#define MS_CLASS "TcpConnectionHandle"
#include "handles/TcpConnectionHandle.hpp"
#include "DepLibUV.hpp"
#ifdef MS_LIBURING_SUPPORTED
#include "DepLibUring.hpp"
#endif
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include <cstring>
/**
 * libuv allocation callback registered through uv_read_start().
 *
 * Forwards the request to the TcpConnectionHandle stored in handle->data.
 * That pointer is reset to nullptr by InternalClose(), in which case the
 * callback does nothing and leaves `buf` untouched.
 */
inline static void onAlloc(uv_handle_t* handle, size_t suggestedSize, uv_buf_t* buf)
{
	auto* self = static_cast<TcpConnectionHandle*>(handle->data);

	// The owning connection has already been closed: nothing to allocate.
	if (self == nullptr)
	{
		return;
	}

	self->OnUvReadAlloc(suggestedSize, buf);
}
/**
 * libuv read callback registered through uv_read_start().
 *
 * Dispatches the read result to the TcpConnectionHandle stored in
 * handle->data, or silently drops it if the connection has already been
 * closed (handle->data is nullptr after InternalClose()).
 */
inline static void onRead(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
	auto* self = static_cast<TcpConnectionHandle*>(handle->data);

	if (self == nullptr)
	{
		return;
	}

	self->OnUvRead(nread, buf);
}
/**
 * libuv write-completion callback for requests queued by Write().
 *
 * req->data carries the UvWriteData allocated in Write(). The completion is
 * reported to the owning connection unless it has already been closed
 * (handle->data is nullptr after InternalClose()); in both cases the
 * UvWriteData is released here.
 */
inline static void onWrite(uv_write_t* req, int status)
{
	auto* pendingWrite = static_cast<TcpConnectionHandle::UvWriteData*>(req->data);
	auto* owner        = static_cast<TcpConnectionHandle*>(req->handle->data);

	// Read the callback pointer while the write data is still alive.
	const auto* sendCb = pendingWrite->cb;

	if (owner != nullptr)
	{
		owner->OnUvWrite(status, sendCb);
	}

	// NOTE(review): presumably UvWriteData's destructor also releases the
	// stored buffer and callback; it is declared in the header, confirm there.
	delete pendingWrite;
}
/**
 * libuv close callback: frees the uv_tcp_t that was allocated with `new` in
 * the TcpConnectionHandle constructor.
 */
inline static void onCloseTcp(uv_handle_t* handle)
{
	auto* tcpHandle = reinterpret_cast<uv_tcp_t*>(handle);

	delete tcpHandle;
}
/**
 * libuv shutdown callback for the request created in InternalClose().
 *
 * Frees the request and then closes the underlying handle; the status of the
 * shutdown itself is ignored.
 */
inline static void onShutdown(uv_shutdown_t* req, int /*status*/)
{
	// Grab the stream before the request that references it is freed.
	auto* stream = reinterpret_cast<uv_handle_t*>(req->handle);

	delete req;

	uv_close(stream, static_cast<uv_close_cb>(onCloseTcp));
}
/**
 * Creates the connection object and its (still uninitialized) libuv TCP
 * handle. The handle is initialized later in Setup().
 *
 * @param bufferSize Size in bytes of the read buffer, which is allocated
 *                   lazily in OnUvReadAlloc().
 */
TcpConnectionHandle::TcpConnectionHandle(size_t bufferSize)
  : bufferSize(bufferSize), uvHandle(new uv_tcp_t)
{
	MS_TRACE();

	// Let the static libuv callbacks find their way back to this instance.
	void* const self     = static_cast<void*>(this);
	this->uvHandle->data = self;
}
/**
 * Closes the connection if nobody did it before (without notifying the
 * listener) and releases the read buffer.
 */
TcpConnectionHandle::~TcpConnectionHandle()
{
	MS_TRACE();

	const bool stillOpen = !this->closed;

	if (stillOpen)
	{
		InternalClose();
	}

	// The buffer may never have been allocated; delete[] on nullptr is a no-op.
	delete[] this->buffer;
}
/**
 * Closes the connection and notifies the listener. Does nothing if the
 * connection is already closed.
 */
void TcpConnectionHandle::TriggerClose()
{
	MS_TRACE();

	if (!this->closed)
	{
		InternalClose();

		// Notify the listener only after the handle has been closed.
		this->listener->OnTcpConnectionClosed(this);
	}
}
/**
 * Logs the local/remote endpoints and the closed state of the connection.
 *
 * @param indentation Indentation level forwarded to MS_DUMP_CLEAN.
 */
void TcpConnectionHandle::Dump(int indentation) const
{
	const char* localIpText  = this->localIp.c_str();
	const char* remoteIpText = this->peerIp.c_str();
	const auto localPortNum  = static_cast<uint16_t>(this->localPort);
	const auto remotePortNum = static_cast<uint16_t>(this->peerPort);
	const char* closedText   = this->closed ? "yes" : "no";

	MS_DUMP_CLEAN(indentation, "<TcpConnectionHandle>");
	MS_DUMP_CLEAN(indentation, " local IP: %s", localIpText);
	MS_DUMP_CLEAN(indentation, " local port: %" PRIu16, localPortNum);
	MS_DUMP_CLEAN(indentation, " remote IP: %s", remoteIpText);
	MS_DUMP_CLEAN(indentation, " remote port: %" PRIu16, remotePortNum);
	MS_DUMP_CLEAN(indentation, " closed: %s", closedText);
	MS_DUMP_CLEAN(indentation, "</TcpConnectionHandle>");
}
/**
 * Initializes the libuv TCP handle on the worker loop and stores the
 * listener and local endpoint information.
 *
 * @param listener  Object notified when the connection gets closed.
 * @param localAddr Local socket address (stored as a pointer, not copied).
 * @param localIp   Local IP in textual form.
 * @param localPort Local port.
 *
 * Throws via MS_THROW_ERROR if uv_tcp_init() fails; in that case the handle
 * is freed and set to nullptr and no other member is modified.
 */
void TcpConnectionHandle::Setup(
  Listener* listener, struct sockaddr_storage* localAddr, const std::string& localIp, uint16_t localPort)
{
	MS_TRACE();

	const int initStatus = uv_tcp_init(DepLibUV::GetLoop(), this->uvHandle);

	if (initStatus != 0)
	{
		// The handle never got registered in the loop, so it can be freed
		// directly instead of going through uv_close().
		delete this->uvHandle;
		this->uvHandle = nullptr;

		MS_THROW_ERROR("uv_tcp_init() failed: %s", uv_strerror(initStatus));
	}

	this->listener = listener;

	// Local endpoint information.
	this->localAddr = localAddr;
	this->localIp   = localIp;
	this->localPort = localPort;
}
void TcpConnectionHandle::Start()
{
MS_TRACE();
if (this->closed)
{
return;
}
int err = uv_read_start(
reinterpret_cast<uv_stream_t*>(this->uvHandle),
static_cast<uv_alloc_cb>(onAlloc),
static_cast<uv_read_cb>(onRead));
if (err != 0)
{
MS_THROW_ERROR("uv_read_start() failed: %s", uv_strerror(err));
}
if (!SetPeerAddress())
{
MS_THROW_ERROR("error setting peer IP and port");
}
#ifdef MS_LIBURING_SUPPORTED
if (DepLibUring::IsEnabled())
{
err = uv_fileno(reinterpret_cast<uv_handle_t*>(this->uvHandle), std::addressof(this->fd));
if (err != 0)
{
MS_THROW_ERROR("uv_fileno() failed: %s", uv_strerror(err));
}
}
#endif
}
/**
 * Sends the concatenation of two buffers over the connection.
 *
 * Strategy:
 *  1. If liburing is compiled in, enabled and active, try to queue the write
 *     through DepLibUring::PrepareWrite(); on success nothing else is done
 *     here.
 *  2. Otherwise try a synchronous uv_try_write().
 *  3. Whatever could not be written synchronously is copied into a
 *     UvWriteData and queued with uv_write(); completion is reported through
 *     onWrite().
 *
 * @param data1 First chunk of data.
 * @param len1  Length of data1 in bytes.
 * @param data2 Second chunk of data, sent right after data1.
 * @param len2  Length of data2 in bytes.
 * @param cb    Optional heap-allocated callback invoked with `true` when the
 *              data was written and `false` otherwise. It is deleted here on
 *              the synchronous paths; on the uv_write() path it is stored in
 *              the UvWriteData (NOTE(review): presumably released by its
 *              destructor, which lives in the header; confirm), and on the
 *              liburing path it is handed to DepLibUring::PrepareWrite().
 *
 * If the connection is closed or both lengths are zero the callback is
 * invoked with `false` and nothing is sent.
 */
void TcpConnectionHandle::Write(
  const uint8_t* data1,
  size_t len1,
  const uint8_t* data2,
  size_t len2,
  TcpConnectionHandle::onSendCallback* cb)
{
	MS_TRACE();

	if (this->closed)
	{
		if (cb)
		{
			(*cb)(false);
			delete cb;
		}

		return;
	}

	// Nothing to send.
	if (len1 == 0 && len2 == 0)
	{
		if (cb)
		{
			(*cb)(false);
			delete cb;
		}

		return;
	}

#ifdef MS_LIBURING_SUPPORTED
	if (DepLibUring::IsEnabled())
	{
		if (!DepLibUring::IsActive())
		{
			goto write_libuv;
		}

		auto prepared = DepLibUring::PrepareWrite(this->fd, data1, len1, data2, len2, cb);

		if (!prepared)
		{
			MS_DEBUG_DEV("cannot write via liburing, fallback to libuv");

			goto write_libuv;
		}

		return;
	}

write_libuv:
#endif
	// First try uv_try_write(); only if it cannot write everything, build a
	// write request and use uv_write() for the remainder.
	const size_t totalLen = len1 + len2;
	uv_buf_t buffers[2];
	int written{ 0 };
	int err;

	buffers[0] = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data1)), len1);
	buffers[1] = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data2)), len2);
	written    = uv_try_write(reinterpret_cast<uv_stream_t*>(this->uvHandle), buffers, 2);

	// All the data was written. Done.
	if (written == static_cast<int>(totalLen))
	{
		this->sentBytes += written;

		if (cb)
		{
			(*cb)(true);
			delete cb;
		}

		return;
	}
	// Nothing could be written right now. Use uv_write() for everything.
	else if (written == UV_EAGAIN || written == UV_ENOSYS)
	{
		// Set written to 0 so pendingLen is computed properly.
		written = 0;
	}
	// Any other error: still give uv_write() a chance.
	else if (written < 0)
	{
		MS_WARN_DEV("uv_try_write() failed, trying uv_write(): %s", uv_strerror(written));

		// Set written to 0 so pendingLen is computed properly.
		written = 0;
	}

	// Bytes accepted by a partial uv_try_write() were already handed to the
	// socket, so account for them now. Otherwise they would never be counted,
	// since only pendingLen is added below.
	this->sentBytes += written;

	const size_t pendingLen = totalLen - written;
	auto* writeData         = new UvWriteData(pendingLen);

	writeData->req.data = static_cast<void*>(writeData);

	// If the first buffer was not entirely written, copy its remainder
	// followed by the whole second buffer.
	if (static_cast<size_t>(written) < len1)
	{
		std::memcpy(
		  writeData->store, data1 + static_cast<size_t>(written), len1 - static_cast<size_t>(written));
		std::memcpy(writeData->store + (len1 - static_cast<size_t>(written)), data2, len2);
	}
	// Otherwise only the tail of the second buffer is pending.
	else
	{
		std::memcpy(
		  writeData->store,
		  data2 + (static_cast<size_t>(written) - len1),
		  len2 - (static_cast<size_t>(written) - len1));
	}

	writeData->cb = cb;

	const uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(writeData->store), pendingLen);

	err = uv_write(
	  &writeData->req,
	  reinterpret_cast<uv_stream_t*>(this->uvHandle),
	  &buffer,
	  1,
	  static_cast<uv_write_cb>(onWrite));

	if (err != 0)
	{
		MS_WARN_DEV("uv_write() failed: %s", uv_strerror(err));

		if (cb)
		{
			(*cb)(false);
		}

		// The request was not queued, so onWrite() will never run for it.
		delete writeData;
	}
	else
	{
		this->sentBytes += pendingLen;
	}
}
/**
 * Closes the connection after a receiving error and notifies the listener.
 *
 * Like TriggerClose(), this does nothing if the connection is already
 * closed, so the listener is never notified twice about the same closure.
 */
void TcpConnectionHandle::ErrorReceiving()
{
	MS_TRACE();

	// Every non-destructor path that closes the connection also notifies the
	// listener, so a closed connection has already been reported.
	if (this->closed)
	{
		return;
	}

	InternalClose();

	this->listener->OnTcpConnectionClosed(this);
}
void TcpConnectionHandle::InternalClose()
{
MS_TRACE();
if (this->closed)
{
return;
}
int err;
this->closed = true;
this->uvHandle->data = nullptr;
err = uv_read_stop(reinterpret_cast<uv_stream_t*>(this->uvHandle));
if (err != 0)
{
try
{
MS_ABORT("uv_read_stop() failed: %s", uv_strerror(err));
}
catch (const std::exception& e)
{
MS_ERROR("%s", e.what());
}
}
if (!this->hasError && !this->isClosedByPeer)
{
auto* req = new uv_shutdown_t;
req->data = static_cast<void*>(this);
err = uv_shutdown(
req, reinterpret_cast<uv_stream_t*>(this->uvHandle), static_cast<uv_shutdown_cb>(onShutdown));
if (err != 0)
{
try
{
MS_ABORT("uv_shutdown() failed: %s", uv_strerror(err));
}
catch (const std::exception& e)
{
MS_ERROR("%s", e.what());
}
}
}
else
{
uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onCloseTcp));
}
}
/**
 * Queries libuv for the remote address of the connection and stores it in
 * peerAddr, peerIp and peerPort.
 *
 * @return true on success, false if uv_tcp_getpeername() fails (the error is
 *         logged).
 */
bool TcpConnectionHandle::SetPeerAddress()
{
	MS_TRACE();

	auto* peerSockAddr = reinterpret_cast<struct sockaddr*>(&this->peerAddr);
	int addrLen        = sizeof(this->peerAddr);

	const int err = uv_tcp_getpeername(this->uvHandle, peerSockAddr, &addrLen);

	if (err != 0)
	{
		MS_ERROR("uv_tcp_getpeername() failed: %s", uv_strerror(err));

		return false;
	}

	// The address family is returned by the helper but not needed here.
	int family;

	Utils::IP::GetAddressInfo(
	  reinterpret_cast<const struct sockaddr*>(peerSockAddr), family, this->peerIp, this->peerPort);

	return true;
}
/**
 * Provides libuv with the free tail of the read buffer.
 *
 * The buffer is allocated on first use with `bufferSize` bytes. `buf->base`
 * points right after the data already stored (bufferDataLen bytes) and
 * `buf->len` is the remaining space, or 0 if the buffer is full.
 */
inline void TcpConnectionHandle::OnUvReadAlloc(size_t /*suggestedSize*/, uv_buf_t* buf)
{
	MS_TRACE();

	// Lazy allocation on the first read.
	if (this->buffer == nullptr)
	{
		this->buffer = new uint8_t[this->bufferSize];
	}

	buf->base = reinterpret_cast<char*>(this->buffer + this->bufferDataLen);

	// Buffer exhausted: report zero available bytes.
	if (this->bufferDataLen >= this->bufferSize)
	{
		buf->len = 0;

		MS_WARN_DEV("no available space in the buffer");

		return;
	}

	buf->len = this->bufferSize - this->bufferDataLen;
}
/**
 * Handles the result of a libuv read.
 *
 * - nread == 0: nothing to do.
 * - nread > 0: updates the byte counters and lets the subclass consume the
 *   data through UserOnTcpConnectionRead().
 * - UV_EOF / UV_ECONNRESET: the peer closed; the connection is closed and the
 *   listener notified.
 * - any other negative value: flagged as an error, then closed and notified
 *   in the same way.
 */
inline void TcpConnectionHandle::OnUvRead(ssize_t nread, const uv_buf_t* /*buf*/)
{
	MS_TRACE();

	if (nread == 0)
	{
		return;
	}

	// Data received.
	if (nread > 0)
	{
		this->recvBytes += nread;
		this->bufferDataLen += static_cast<size_t>(nread);

		UserOnTcpConnectionRead();

		return;
	}

	// From here on nread is negative: decide why the connection is closing.
	const bool closedByPeer = (nread == UV_EOF || nread == UV_ECONNRESET);

	if (closedByPeer)
	{
		MS_DEBUG_DEV("connection closed by peer, closing server side");

		this->isClosedByPeer = true;
	}
	else
	{
		MS_WARN_DEV("read error, closing the connection: %s", uv_strerror(nread));

		this->hasError = true;
	}

	InternalClose();

	this->listener->OnTcpConnectionClosed(this);
}
/**
 * Handles the completion of a write queued with uv_write().
 *
 * On success the callback (if any) is invoked with `true`. On failure the
 * callback is invoked with `false`, the connection is closed and the
 * listener notified; failures other than UV_EPIPE / UV_ENOTCONN also set
 * hasError.
 *
 * The callback is only invoked here, not deleted.
 */
inline void TcpConnectionHandle::OnUvWrite(int status, TcpConnectionHandle::onSendCallback* cb)
{
	MS_TRACE();

	// Successful write.
	if (status == 0)
	{
		if (cb)
		{
			(*cb)(true);
		}

		return;
	}

	const bool peerGone = (status == UV_EPIPE || status == UV_ENOTCONN);

	if (!peerGone)
	{
		this->hasError = true;
	}

	MS_WARN_DEV("write error, closing the connection: %s", uv_strerror(status));

	if (cb)
	{
		(*cb)(false);
	}

	InternalClose();

	this->listener->OnTcpConnectionClosed(this);
}