#define MS_CLASS "Channel::ChannelSocket"
#include "Channel/ChannelSocket.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include <cmath>
#include <cstdio>
#include <cstring>
extern "C"
{
#include <netstring.h>
}
namespace Channel
{
static constexpr size_t NsMessageMaxLen{ 4194313 };
static constexpr size_t NsPayloadMaxLen{ 4194304 };
ChannelSocket::ChannelSocket(int consumerFd, int producerFd)
: consumerSocket(consumerFd, NsMessageMaxLen, this), producerSocket(producerFd, NsMessageMaxLen)
{
MS_TRACE_STD();
this->writeBuffer = static_cast<uint8_t*>(std::malloc(NsMessageMaxLen));
}
ChannelSocket::~ChannelSocket()
{
MS_TRACE_STD();
std::free(this->writeBuffer);
}
void ChannelSocket::Close()
{
MS_TRACE_STD();
this->consumerSocket.Close();
this->producerSocket.Close();
}
void ChannelSocket::SetListener(Listener* listener)
{
MS_TRACE_STD();
this->listener = listener;
}
void ChannelSocket::Send(json& jsonMessage)
{
MS_TRACE_STD();
if (this->producerSocket.IsClosed())
return;
std::string message = jsonMessage.dump();
if (message.length() > NsPayloadMaxLen)
{
MS_ERROR_STD("mesage too big");
return;
}
SendImpl(message.c_str(), message.length());
}
void ChannelSocket::SendLog(char* message, size_t messageLen)
{
MS_TRACE_STD();
if (this->producerSocket.IsClosed())
return;
if (messageLen > NsPayloadMaxLen)
{
MS_ERROR_STD("mesage too big");
return;
}
SendImpl(message, messageLen);
}
inline void ChannelSocket::SendImpl(const void* nsPayload, size_t nsPayloadLen)
{
MS_TRACE_STD();
size_t nsNumLen;
if (nsPayloadLen == 0)
{
nsNumLen = 1;
this->writeBuffer[0] = '0';
this->writeBuffer[1] = ':';
this->writeBuffer[2] = ',';
}
else
{
nsNumLen = static_cast<size_t>(std::ceil(std::log10(static_cast<double>(nsPayloadLen) + 1)));
std::sprintf(reinterpret_cast<char*>(this->writeBuffer), "%zu:", nsPayloadLen);
std::memcpy(this->writeBuffer + nsNumLen + 1, nsPayload, nsPayloadLen);
this->writeBuffer[nsNumLen + nsPayloadLen + 1] = ',';
}
size_t nsLen = nsNumLen + nsPayloadLen + 2;
this->producerSocket.Write(this->writeBuffer, nsLen);
}
void ChannelSocket::OnConsumerSocketMessage(ConsumerSocket* , char* msg, size_t msgLen)
{
MS_TRACE_STD();
try
{
json jsonMessage = json::parse(msg, msg + msgLen);
auto* request = new Channel::ChannelRequest(this, jsonMessage);
try
{
this->listener->OnChannelRequest(this, request);
}
catch (const MediaSoupTypeError& error)
{
request->TypeError(error.what());
}
catch (const MediaSoupError& error)
{
request->Error(error.what());
}
delete request;
}
catch (const json::parse_error& error)
{
MS_ERROR_STD("JSON parsing error: %s", error.what());
}
catch (const MediaSoupError& error)
{
MS_ERROR_STD("discarding wrong Channel request");
}
}
void ChannelSocket::OnConsumerSocketClosed(ConsumerSocket* )
{
MS_TRACE_STD();
this->listener->OnChannelClosed(this);
}
ConsumerSocket::ConsumerSocket(int fd, size_t bufferSize, Listener* listener)
: ::UnixStreamSocket(fd, bufferSize, ::UnixStreamSocket::Role::CONSUMER), listener(listener)
{
MS_TRACE_STD();
}
void ConsumerSocket::UserOnUnixStreamRead()
{
MS_TRACE_STD();
while (true)
{
if (IsClosed())
return;
size_t readLen = this->bufferDataLen - this->msgStart;
char* msgStart = nullptr;
size_t msgLen;
int nsRet = netstring_read(
reinterpret_cast<char*>(this->buffer + this->msgStart), readLen, &msgStart, &msgLen);
if (nsRet != 0)
{
switch (nsRet)
{
case NETSTRING_ERROR_TOO_SHORT:
{
if (this->bufferDataLen == this->bufferSize)
{
if (this->msgStart != 0)
{
std::memmove(this->buffer, this->buffer + this->msgStart, readLen);
this->msgStart = 0;
this->bufferDataLen = readLen;
}
else
{
MS_ERROR_STD(
"no more space in the buffer for the unfinished message being parsed, "
"discarding it");
this->msgStart = 0;
this->bufferDataLen = 0;
}
}
return;
}
case NETSTRING_ERROR_TOO_LONG:
{
MS_ERROR_STD("NETSTRING_ERROR_TOO_LONG");
break;
}
case NETSTRING_ERROR_NO_COLON:
{
MS_ERROR_STD("NETSTRING_ERROR_NO_COLON");
break;
}
case NETSTRING_ERROR_NO_COMMA:
{
MS_ERROR_STD("NETSTRING_ERROR_NO_COMMA");
break;
}
case NETSTRING_ERROR_LEADING_ZERO:
{
MS_ERROR_STD("NETSTRING_ERROR_LEADING_ZERO");
break;
}
case NETSTRING_ERROR_NO_LENGTH:
{
MS_ERROR_STD("NETSTRING_ERROR_NO_LENGTH");
break;
}
}
this->msgStart = 0;
this->bufferDataLen = 0;
return;
}
readLen =
reinterpret_cast<const uint8_t*>(msgStart) - (this->buffer + this->msgStart) + msgLen + 1;
this->listener->OnConsumerSocketMessage(this, msgStart, msgLen);
if ((this->msgStart + readLen) == this->bufferSize)
{
this->msgStart = 0;
this->bufferDataLen = 0;
}
else
{
this->msgStart += readLen;
}
if (this->bufferDataLen > this->msgStart)
{
continue;
}
break;
}
}
void ConsumerSocket::UserOnUnixStreamSocketClosed()
{
MS_TRACE_STD();
this->listener->OnConsumerSocketClosed(this);
}
ProducerSocket::ProducerSocket(int fd, size_t bufferSize)
: ::UnixStreamSocket(fd, bufferSize, ::UnixStreamSocket::Role::PRODUCER)
{
MS_TRACE_STD();
}
}