#define MS_CLASS "UdpSocketHandler"
#include "handles/UdpSocketHandler.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include <cstring>
// Size in bytes of the receive buffer handed to libuv (see OnUvRecvAlloc()).
static constexpr size_t ReadBufferSize{ 65536 };
// One receive buffer per thread; every UdpSocketHandler in that thread hands
// this same storage to libuv from OnUvRecvAlloc().
thread_local static uint8_t ReadBuffer[ReadBufferSize];
// libuv allocation callback: forwards the request to the UdpSocketHandler
// stored in handle->data. Does nothing when that pointer is null (Close()
// resets it to nullptr).
inline static void onAlloc(uv_handle_t* handle, size_t suggestedSize, uv_buf_t* buf)
{
  auto* owner = static_cast<UdpSocketHandler*>(handle->data);

  if (owner == nullptr)
    return;

  owner->OnUvRecvAlloc(suggestedSize, buf);
}
// libuv receive callback: forwards the read result to the UdpSocketHandler
// stored in handle->data. Does nothing when that pointer is null (Close()
// resets it to nullptr).
inline static void onRecv(
  uv_udp_t* handle, ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned int flags)
{
  auto* owner = static_cast<UdpSocketHandler*>(handle->data);

  if (owner == nullptr)
    return;

  owner->OnUvRecv(nread, buf, addr, flags);
}
// libuv send-completion callback for requests queued by Send() through
// uv_udp_send(). Reports the status to the owning UdpSocketHandler (when it
// is still attached to the handle) and then releases the per-request data.
inline static void onSend(uv_udp_send_t* req, int status)
{
  auto* pending = static_cast<UdpSocketHandler::UvSendData*>(req->data);
  auto* owner   = static_cast<UdpSocketHandler*>(req->handle->data);

  // handle->data is nullptr once Close() has run; the callback is then not
  // invoked.
  if (owner != nullptr)
    owner->OnUvSend(status, pending->cb);

  delete pending;
}
// libuv close callback: frees the handle once libuv has finished with it.
//
// Every uv_close() call in this file passes a uv_udp_t*, so the pointer is
// cast back to that type before deleting. Deleting through uv_handle_t*
// would use a static type that differs from the allocated object, which is
// undefined behaviour and hands the wrong size to sized deallocation.
// NOTE(review): assumes the handle was allocated with `new uv_udp_t` by the
// code that created it (not visible in this file) - confirm.
inline static void onClose(uv_handle_t* handle)
{
  delete reinterpret_cast<uv_udp_t*>(handle);
}
// Wraps an existing UDP handle: attaches this object to it, starts receiving
// and caches the local address via SetLocalAddress().
//
// @param uvHandle  UDP handle to wrap. It is closed with uv_close() (and
//                  freed in onClose) if construction fails.
// @throws via MS_THROW_ERROR when uv_udp_recv_start() or SetLocalAddress()
//         fails.
UdpSocketHandler::UdpSocketHandler(uv_udp_t* uvHandle) : uvHandle(uvHandle)
{
  MS_TRACE();

  // Shared by both failure paths below. Logging/throwing stays outside the
  // lambda on purpose.
  const auto disposeHandle = [this]() {
    uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onClose));
  };

  // Let the static libuv callbacks find this instance.
  this->uvHandle->data = static_cast<void*>(this);

  const int startResult = uv_udp_recv_start(
    this->uvHandle, static_cast<uv_alloc_cb>(onAlloc), static_cast<uv_udp_recv_cb>(onRecv));

  if (startResult != 0)
  {
    disposeHandle();

    MS_THROW_ERROR("uv_udp_recv_start() failed: %s", uv_strerror(startResult));
  }

  if (SetLocalAddress())
    return;

  disposeHandle();

  MS_THROW_ERROR("error setting local IP and port");
}
// Closes the socket on destruction unless Close() has already been called.
UdpSocketHandler::~UdpSocketHandler()
{
  MS_TRACE();

  if (this->closed)
    return;

  Close();
}
// Stops receiving and closes the underlying handle. Calling it again after
// the first time has no effect. Aborts (MS_ABORT) if uv_udp_recv_stop()
// fails.
void UdpSocketHandler::Close()
{
  MS_TRACE();

  if (!this->closed)
  {
    this->closed = true;

    // Detach from the handle so the static libuv callbacks skip this object.
    this->uvHandle->data = nullptr;

    const int stopResult = uv_udp_recv_stop(this->uvHandle);

    if (stopResult != 0)
    {
      MS_ABORT("uv_udp_recv_stop() failed: %s", uv_strerror(stopResult));
    }

    // The handle itself is freed later, in onClose.
    uv_close(reinterpret_cast<uv_handle_t*>(this->uvHandle), static_cast<uv_close_cb>(onClose));
  }
}
// Dumps the local IP, local port and open/closed state through MS_DUMP.
void UdpSocketHandler::Dump() const
{
  const char* state = this->closed ? "closed" : "open";
  const auto port   = static_cast<uint16_t>(this->localPort);

  MS_DUMP("<UdpSocketHandler>");
  MS_DUMP(" localIp : %s", this->localIp.c_str());
  MS_DUMP(" localPort : %" PRIu16, port);
  MS_DUMP(" closed : %s", state);
  MS_DUMP("</UdpSocketHandler>");
}
void UdpSocketHandler::Send(
const uint8_t* data, size_t len, const struct sockaddr* addr, UdpSocketHandler::onSendCallback* cb)
{
MS_TRACE();
if (this->closed)
{
if (cb)
{
(*cb)(false);
delete cb;
}
return;
}
if (len == 0)
{
if (cb)
{
(*cb)(false);
delete cb;
}
return;
}
uv_buf_t buffer = uv_buf_init(reinterpret_cast<char*>(const_cast<uint8_t*>(data)), len);
int sent = uv_udp_try_send(this->uvHandle, &buffer, 1, addr);
if (sent == static_cast<int>(len))
{
this->sentBytes += sent;
if (cb)
{
(*cb)(true);
delete cb;
}
return;
}
else if (sent >= 0)
{
MS_WARN_DEV("datagram truncated (just %d of %zu bytes were sent)", sent, len);
this->sentBytes += sent;
if (cb)
{
(*cb)(false);
delete cb;
}
return;
}
else if (sent != UV_EAGAIN)
{
MS_WARN_DEV("uv_udp_try_send() failed, trying uv_udp_send(): %s", uv_strerror(sent));
}
auto* sendData = new UvSendData(len);
sendData->req.data = static_cast<void*>(sendData);
std::memcpy(sendData->store, data, len);
sendData->cb = cb;
buffer = uv_buf_init(reinterpret_cast<char*>(sendData->store), len);
int err = uv_udp_send(
&sendData->req, this->uvHandle, &buffer, 1, addr, static_cast<uv_udp_send_cb>(onSend));
if (err != 0)
{
MS_WARN_DEV("uv_udp_send() failed: %s", uv_strerror(err));
if (cb)
(*cb)(false);
delete sendData;
}
else
{
this->sentBytes += len;
}
}
// Queries the handle's bound address with uv_udp_getsockname() and stores it
// in localAddr, then derives localIp and localPort from it.
//
// @return true on success; false (after logging) if uv_udp_getsockname()
//         fails.
bool UdpSocketHandler::SetLocalAddress()
{
  MS_TRACE();

  int addrLen = sizeof(this->localAddr);
  const int result = uv_udp_getsockname(
    this->uvHandle, reinterpret_cast<struct sockaddr*>(&this->localAddr), &addrLen);

  if (result != 0)
  {
    MS_ERROR("uv_udp_getsockname() failed: %s", uv_strerror(result));

    return false;
  }

  // The address family is produced by the helper but not used here.
  int family;

  Utils::IP::GetAddressInfo(
    reinterpret_cast<const struct sockaddr*>(&this->localAddr), family, this->localIp, this->localPort);

  return true;
}
// Supplies libuv with the buffer to read the next datagram into. The
// suggested size is ignored: the file-level ReadBuffer is always handed out
// in full.
inline void UdpSocketHandler::OnUvRecvAlloc(size_t /*suggestedSize*/, uv_buf_t* buf)
{
  MS_TRACE();

  buf->len  = ReadBufferSize;
  buf->base = reinterpret_cast<char*>(ReadBuffer);
}
// Handles the result of a libuv UDP read.
//
// - nread == 0: ignored.
// - UV_UDP_PARTIAL set in flags: the datagram is logged and dropped.
// - nread < 0: the read error is logged (dev debug level).
// - otherwise: the bytes are counted in recvBytes and passed to
//   UserOnUdpDatagramReceived().
inline void UdpSocketHandler::OnUvRecv(
  ssize_t nread, const uv_buf_t* buf, const struct sockaddr* addr, unsigned int flags)
{
  MS_TRACE();

  if (nread == 0)
    return;

  const bool truncated = (flags & UV_UDP_PARTIAL) != 0u;

  if (truncated)
  {
    MS_ERROR("received datagram was truncated due to insufficient buffer, ignoring it");

    return;
  }

  if (nread < 0)
  {
    MS_DEBUG_DEV("read error: %s", uv_strerror(nread));

    return;
  }

  this->recvBytes += nread;

  UserOnUdpDatagramReceived(reinterpret_cast<uint8_t*>(buf->base), nread, addr);
}
// Reports the result of an asynchronous send to the caller's callback.
//
// @param status  libuv completion status; 0 means success.
// @param cb      Optional callback, invoked with true on success and false
//                otherwise. It is not deleted here.
inline void UdpSocketHandler::OnUvSend(int status, UdpSocketHandler::onSendCallback* cb)
{
  MS_TRACE();

  const bool delivered = (status == 0);

#if MS_LOG_DEV_LEVEL == 3
  if (!delivered)
  {
    MS_DEBUG_DEV("send error: %s", uv_strerror(status));
  }
#endif

  if (cb)
    (*cb)(delivered);
}