#define MS_CLASS "TcpConnectionHandler"
#include "handles/TcpConnectionHandler.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include <cstring>
// libuv allocation callback. Forwards the buffer request to the
// TcpConnectionHandler stored in handle->data. When that pointer is null
// (Close() clears it) the callback does nothing and leaves buf untouched.
inline static void onAlloc(uv_handle_t* handle, size_t suggestedSize, uv_buf_t* buf)
{
	auto* self = static_cast<TcpConnectionHandler*>(handle->data);

	if (self == nullptr)
	{
		return;
	}

	self->OnUvReadAlloc(suggestedSize, buf);
}
// libuv read callback. Hands the read result to the TcpConnectionHandler
// stored in handle->data, or does nothing when that pointer is null
// (Close() clears it).
inline static void onRead(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf)
{
	auto* self = static_cast<TcpConnectionHandler*>(handle->data);

	if (self == nullptr)
	{
		return;
	}

	self->OnUvRead(nread, buf);
}
// libuv write-completion callback for requests queued by Write().
// req->data points at the UvWriteData that embeds the request; the owning
// TcpConnectionHandler (if it has not been closed) is reached through the
// stream handle's data pointer.
inline static void onWrite(uv_write_t* req, int status)
{
	auto* pending = static_cast<TcpConnectionHandler::UvWriteData*>(req->data);
	auto* owner   = static_cast<TcpConnectionHandler*>(req->handle->data);

	// Only report the result while the connection object is still attached.
	if (owner != nullptr)
	{
		owner->OnUvWrite(status, pending->cb);
	}

	// Release the write data. NOTE(review): presumably the UvWriteData
	// destructor also releases the store and the callback - confirm in header.
	delete pending;
}
// libuv close callback: frees the handle once libuv has finished with it.
//
// Every handle closed with this callback in this file is the uv_tcp_t
// allocated in the TcpConnectionHandler constructor, so it must be deleted
// through a uv_tcp_t pointer. Deleting it as a uv_handle_t (a different,
// smaller type) is undefined behaviour and passes the wrong size to sized
// deallocation functions.
inline static void onClose(uv_handle_t* handle)
{
	delete reinterpret_cast<uv_tcp_t*>(handle);
}
// libuv shutdown callback (the status argument is ignored). Frees the
// shutdown request allocated in Close() and then closes the underlying
// handle; onClose() frees it.
inline static void onShutdown(uv_shutdown_t* req, int /*status*/)
{
	// Grab the stream before the request that references it is released.
	auto* stream = reinterpret_cast<uv_handle_t*>(req->handle);

	delete req;

	uv_close(stream, static_cast<uv_close_cb>(onClose));
}
// Creates the handler with the given receive buffer size. Only the libuv
// TCP handle is allocated here; it is initialized later in Setup() and the
// receive buffer is allocated lazily in OnUvReadAlloc().
TcpConnectionHandler::TcpConnectionHandler(size_t bufferSize) : bufferSize(bufferSize)
{
	MS_TRACE();

	auto* tcpHandle = new uv_tcp_t;

	// Let the static libuv callbacks find this object again.
	tcpHandle->data = static_cast<void*>(this);

	this->uvHandle = tcpHandle;
}
// Closes the connection if that has not happened yet and releases the
// receive buffer. The libuv handle itself is freed asynchronously by the
// close callback, not here.
TcpConnectionHandler::~TcpConnectionHandler()
{
	MS_TRACE();

	if (!this->closed)
	{
		Close();
	}

	delete[] this->buffer;
}
void TcpConnectionHandler::Close()
{
MS_TRACE();
if (this->closed)
return;
int err;
this->closed = true;
this->uvHandle->data = nullptr;
err = uv_read_stop(reinterpret_cast<uv_stream_t*>(this->uvHandle));
if (err != 0)
MS_ABORT("uv_read_stop() failed: %s", uv_strerror(err));
if (!this->hasError && !this->isClosedByPeer)
{
auto req = new uv_shutdown_t;
req->data = static_cast<void*>(this);
err = uv_shutdown(
req, reinterpret_cast<uv_stream_t*>(this->uvHandle), static_cast<uv_shutdown_cb>(onShutdown));
if (err != 0)
MS_ABORT("uv_shutdown() failed: %s", uv_strerror(err));
}
else
{
uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onClose));
}
}
// Dumps the local/peer endpoints and the open/closed state via MS_DUMP.
void TcpConnectionHandler::Dump() const
{
	const auto localPortValue = static_cast<uint16_t>(this->localPort);
	const auto peerPortValue  = static_cast<uint16_t>(this->peerPort);
	const char* state         = this->closed ? "closed" : "open";

	MS_DUMP("<TcpConnectionHandler>");
	MS_DUMP(" localIp : %s", this->localIp.c_str());
	MS_DUMP(" localPort : %" PRIu16, localPortValue);
	MS_DUMP(" remoteIp : %s", this->peerIp.c_str());
	MS_DUMP(" remotePort : %" PRIu16, peerPortValue);
	MS_DUMP(" closed : %s", state);
	MS_DUMP("</TcpConnectionHandler>");
}
// Initializes the libuv TCP handle on the DepLibUV loop and stores the
// listener and local address information.
//
// @param listener   Object notified when the connection is closed.
// @param localAddr  Local socket address (stored as a pointer, not copied).
// @param localIp    Local IP in text form.
// @param localPort  Local port.
//
// Throws (via MS_THROW_ERROR) if uv_tcp_init() fails. In that case the handle
// is released and the connection is marked as closed, so that destroying the
// object afterwards does not run Close() on a null handle.
void TcpConnectionHandler::Setup(
  Listener* listener, struct sockaddr_storage* localAddr, const std::string& localIp, uint16_t localPort)
{
	MS_TRACE();

	const int err = uv_tcp_init(DepLibUV::GetLoop(), this->uvHandle);

	if (err != 0)
	{
		delete this->uvHandle;
		this->uvHandle = nullptr;

		// There is no handle left to close. Without this flag the destructor
		// would call Close(), which dereferences uvHandle unconditionally.
		this->closed = true;

		MS_THROW_ERROR("uv_tcp_init() failed: %s", uv_strerror(err));
	}

	this->listener  = listener;
	this->localAddr = localAddr;
	this->localIp   = localIp;
	this->localPort = localPort;
}
void TcpConnectionHandler::Start()
{
MS_TRACE();
if (this->closed)
return;
int err = uv_read_start(
reinterpret_cast<uv_stream_t*>(this->uvHandle),
static_cast<uv_alloc_cb>(onAlloc),
static_cast<uv_read_cb>(onRead));
if (err != 0)
MS_THROW_ERROR("uv_read_start() failed: %s", uv_strerror(err));
if (!SetPeerAddress())
MS_THROW_ERROR("error setting peer IP and port");
}
void TcpConnectionHandler::Write(
const uint8_t* data, size_t len, TcpConnectionHandler::onSendCallback* cb)
{
MS_TRACE();
if (this->closed)
{
if (cb)
{
(*cb)(false);
delete cb;
}
return;
}
if (len == 0)
{
if (cb)
{
(*cb)(false);
delete cb;
}
return;
}
uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);
int written = uv_try_write(reinterpret_cast<uv_stream_t*>(this->uvHandle), &buffer, 1);
if (written == static_cast<int>(len))
{
this->sentBytes += written;
if (cb)
{
(*cb)(true);
delete cb;
}
return;
}
else if (written == UV_EAGAIN || written == UV_ENOSYS)
{
written = 0;
}
else if (written < 0)
{
MS_WARN_DEV("uv_try_write() failed, trying uv_write(): %s", uv_strerror(written));
written = 0;
}
size_t pendingLen = len - written;
auto* writeData = new UvWriteData(pendingLen);
writeData->req.data = static_cast<void*>(writeData);
std::memcpy(writeData->store, data + written, pendingLen);
writeData->cb = cb;
buffer = uv_buf_init(reinterpret_cast<char*>(writeData->store), pendingLen);
int err = uv_write(
&writeData->req,
reinterpret_cast<uv_stream_t*>(this->uvHandle),
&buffer,
1,
static_cast<uv_write_cb>(onWrite));
if (err != 0)
{
MS_WARN_DEV("uv_write() failed: %s", uv_strerror(err));
if (cb)
(*cb)(false);
delete writeData;
}
else
{
this->sentBytes += pendingLen;
}
}
void TcpConnectionHandler::Write(
const uint8_t* data1,
size_t len1,
const uint8_t* data2,
size_t len2,
TcpConnectionHandler::onSendCallback* cb)
{
MS_TRACE();
if (this->closed)
{
if (cb)
{
(*cb)(false);
delete cb;
}
return;
}
if (len1 == 0 && len2 == 0)
{
if (cb)
{
(*cb)(false);
delete cb;
}
return;
}
size_t totalLen = len1 + len2;
uv_buf_t buffers[2];
int written{ 0 };
int err;
buffers[0] = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data1)), len1);
buffers[1] = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data2)), len2);
written = uv_try_write(reinterpret_cast<uv_stream_t*>(this->uvHandle), buffers, 2);
if (written == static_cast<int>(totalLen))
{
this->sentBytes += written;
if (cb)
{
(*cb)(true);
delete cb;
}
return;
}
else if (written == UV_EAGAIN || written == UV_ENOSYS)
{
written = 0;
}
else if (written < 0)
{
MS_WARN_DEV("uv_try_write() failed, trying uv_write(): %s", uv_strerror(written));
written = 0;
}
size_t pendingLen = totalLen - written;
auto* writeData = new UvWriteData(pendingLen);
writeData->req.data = static_cast<void*>(writeData);
if (static_cast<size_t>(written) < len1)
{
std::memcpy(
writeData->store, data1 + static_cast<size_t>(written), len1 - static_cast<size_t>(written));
std::memcpy(writeData->store + (len1 - static_cast<size_t>(written)), data2, len2);
}
else
{
std::memcpy(
writeData->store,
data2 + (static_cast<size_t>(written) - len1),
len2 - (static_cast<size_t>(written) - len1));
}
writeData->cb = cb;
uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(writeData->store), pendingLen);
err = uv_write(
&writeData->req,
reinterpret_cast<uv_stream_t*>(this->uvHandle),
&buffer,
1,
static_cast<uv_write_cb>(onWrite));
if (err != 0)
{
MS_WARN_DEV("uv_write() failed: %s", uv_strerror(err));
if (cb)
(*cb)(false);
delete writeData;
}
else
{
this->sentBytes += pendingLen;
}
}
// Closes the connection and then tells the listener that it has been closed.
void TcpConnectionHandler::ErrorReceiving()
{
	MS_TRACE();

	// Tear down our side first ...
	Close();

	// ... then let the listener know this connection is gone.
	this->listener->OnTcpConnectionClosed(this);
}
// Queries libuv for the peer socket address and stores it in peerAddr,
// peerIp and peerPort.
//
// Returns false (after logging an error) if uv_tcp_getpeername() fails,
// true otherwise.
bool TcpConnectionHandler::SetPeerAddress()
{
	MS_TRACE();

	auto* peerSockAddr = reinterpret_cast<struct sockaddr*>(&this->peerAddr);
	int addrLen        = sizeof(this->peerAddr);

	const int peerErr = uv_tcp_getpeername(this->uvHandle, peerSockAddr, &addrLen);

	if (peerErr != 0)
	{
		MS_ERROR("uv_tcp_getpeername() failed: %s", uv_strerror(peerErr));

		return false;
	}

	// The address family is reported by the helper but not needed here.
	int family;

	Utils::IP::GetAddressInfo(
	  reinterpret_cast<const struct sockaddr*>(&this->peerAddr), family, this->peerIp, this->peerPort);

	return true;
}
// Supplies libuv with the region of the receive buffer to read into.
// The buffer is allocated on first use (bufferSize bytes). libuv's suggested
// size is ignored: the region handed out always starts right after the data
// already stored and spans the remaining space, or has length 0 (with a
// warning) when the buffer is full.
inline void TcpConnectionHandler::OnUvReadAlloc(size_t /*suggestedSize*/, uv_buf_t* buf)
{
	MS_TRACE();

	if (this->buffer == nullptr)
	{
		this->buffer = new uint8_t[this->bufferSize];
	}

	// Write position: just past the bytes already held in the buffer.
	buf->base = reinterpret_cast<char*>(this->buffer + this->bufferDataLen);

	const bool hasRoom = this->bufferDataLen < this->bufferSize;

	if (!hasRoom)
	{
		buf->len = 0;
		MS_WARN_DEV("no available space in the buffer");

		return;
	}

	buf->len = this->bufferSize - this->bufferDataLen;
}
// Handles the result of a libuv read.
//   nread == 0 : nothing to do.
//   nread  > 0 : account for the new bytes and invoke UserOnTcpConnectionRead().
//   nread  < 0 : UV_EOF / UV_ECONNRESET are recorded as a peer-initiated close,
//                anything else as an error; in both cases the connection is
//                closed and the listener is notified.
inline void TcpConnectionHandler::OnUvRead(ssize_t nread, const uv_buf_t* /*buf*/)
{
	MS_TRACE();

	if (nread == 0)
	{
		return;
	}

	if (nread > 0)
	{
		this->recvBytes += nread;
		this->bufferDataLen += static_cast<size_t>(nread);

		UserOnTcpConnectionRead();

		return;
	}

	// From here on nread is negative: decide why the connection is going away.
	const bool peerClosed = (nread == UV_EOF || nread == UV_ECONNRESET);

	if (peerClosed)
	{
		MS_DEBUG_DEV("connection closed by peer, closing server side");
		this->isClosedByPeer = true;
	}
	else
	{
		MS_WARN_DEV("read error, closing the connection: %s", uv_strerror(nread));
		this->hasError = true;
	}

	// Both outcomes end the same way: close, then notify the listener.
	Close();
	this->listener->OnTcpConnectionClosed(this);
}
// Handles completion of a write queued with uv_write().
// On success the callback (if any) is invoked with true. On failure the
// callback is invoked with false, the connection is closed and the listener
// is notified; statuses other than UV_EPIPE / UV_ENOTCONN also set hasError.
// The callback is not deleted here (onWrite() deletes the UvWriteData that
// carries it).
inline void TcpConnectionHandler::OnUvWrite(int status, TcpConnectionHandler::onSendCallback* cb)
{
	MS_TRACE();

	if (status == 0)
	{
		if (cb)
		{
			(*cb)(true);
		}

		return;
	}

	const bool isDisconnect = (status == UV_EPIPE || status == UV_ENOTCONN);

	if (!isDisconnect)
	{
		this->hasError = true;
	}

	MS_WARN_DEV("write error, closing the connection: %s", uv_strerror(status));

	if (cb)
	{
		(*cb)(false);
	}

	Close();
	this->listener->OnTcpConnectionClosed(this);
}