#define MS_CLASS "Worker"
#include "Worker.hpp"
#ifdef MS_LIBURING_SUPPORTED
#include "DepLibUring.hpp"
#endif
#include "FBS/response.h"
#include "FBS/worker.h"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Settings.hpp"
/**
 * Constructs the Worker and drives it.
 *
 * Stores the given channel and shared interface, wires this Worker up as the
 * channel listener, creates the signal handle, emits the WORKER_RUNNING
 * notification and finally calls DepLibUV::RunLoop(). The constructor only
 * returns once that call returns.
 *
 * @param channel Channel socket this Worker listens on.
 * @param shared  Shared interface used to reach the channel notifier.
 */
Worker::Worker(::Channel::ChannelSocket* channel, SharedInterface* shared)
	: channel(channel), shared(shared)
{
	MS_TRACE();

	// Hand this Worker to the channel as its listener.
	this->channel->SetListener(this);

	// Create the signal handle, passing this Worker to it.
	this->signalHandle = new SignalHandle(this);

#ifdef MS_EXECUTABLE
	// Signals are only added when MS_EXECUTABLE is defined.
	this->signalHandle->AddSignal(SIGINT, "INT");
	this->signalHandle->AddSignal(SIGTERM, "TERM");
#endif

#ifdef MS_LIBURING_SUPPORTED
	// Start polling CQEs only when liburing is reported as enabled.
	if (DepLibUring::IsEnabled())
	{
		DepLibUring::StartPollingCQEs();
	}
#endif

	// Announce that the Worker is running, using the process PID as target.
	this->shared->GetChannelNotifier()->Emit(
	  std::to_string(Logger::Pid), FBS::Notification::Event::WORKER_RUNNING);

	MS_DEBUG_DEV("starting libuv loop");
	DepLibUV::RunLoop();
	MS_DEBUG_DEV("libuv loop ended");
}
/**
 * Destructor. Calls Close() unless the Worker has already been closed.
 */
Worker::~Worker()
{
	MS_TRACE();

	// Nothing left to release when Close() already ran.
	if (this->closed)
	{
		return;
	}

	Close();
}
/**
 * Tears the Worker down.
 *
 * Does nothing if the Worker is already closed. Otherwise marks it closed,
 * deletes the signal handle, deletes every Router and WebRtcServer held in
 * the maps, stops CQE polling when liburing is enabled and closes the
 * channel.
 */
void Worker::Close()
{
	MS_TRACE();

	// Only the first call does any work.
	if (this->closed)
	{
		return;
	}

	this->closed = true;

	delete this->signalHandle;

	// Delete all Routers, then empty the map of the now-dead pointers.
	for (const auto& routerEntry : this->mapRouters)
	{
		delete routerEntry.second;
	}
	this->mapRouters.clear();

	// Delete all WebRtcServers, then empty their map as well.
	for (const auto& serverEntry : this->mapWebRtcServers)
	{
		delete serverEntry.second;
	}
	this->mapWebRtcServers.clear();

#ifdef MS_LIBURING_SUPPORTED
	// Stop polling CQEs only when liburing is reported as enabled.
	if (DepLibUring::IsEnabled())
	{
		DepLibUring::StopPollingCQEs();
	}
#endif

	// The channel is closed last.
	this->channel->Close();
}
/**
 * Serializes a dump of the Worker into the given FlatBuffers builder.
 *
 * The dump carries the process PID, the ids of all WebRtcServers, the ids of
 * all Routers and the channel message handlers. When liburing support is
 * compiled in and enabled, the liburing dump is appended too.
 *
 * @param builder Builder that receives the serialized data.
 * @return Offset of the created DumpResponse inside the builder.
 */
flatbuffers::Offset<FBS::Worker::DumpResponse> Worker::FillBuffer(
  flatbuffers::FlatBufferBuilder& builder) const
{
	// Ids of the WebRtcServers (map keys).
	std::vector<flatbuffers::Offset<flatbuffers::String>> serverIdOffsets;
	serverIdOffsets.reserve(this->mapWebRtcServers.size());

	for (const auto& serverEntry : this->mapWebRtcServers)
	{
		serverIdOffsets.emplace_back(builder.CreateString(serverEntry.first));
	}

	// Ids of the Routers (map keys).
	std::vector<flatbuffers::Offset<flatbuffers::String>> routerIdOffsets;
	routerIdOffsets.reserve(this->mapRouters.size());

	for (const auto& routerEntry : this->mapRouters)
	{
		routerIdOffsets.emplace_back(builder.CreateString(routerEntry.first));
	}

	auto handlersOffset = this->shared->GetChannelMessageRegistrator()->FillBuffer(builder);

#ifdef MS_LIBURING_SUPPORTED
	// With liburing enabled the response carries one extra field.
	if (DepLibUring::IsEnabled())
	{
		return FBS::Worker::CreateDumpResponseDirect(
		  builder,
		  Logger::Pid,
		  &serverIdOffsets,
		  &routerIdOffsets,
		  handlersOffset,
		  DepLibUring::FillBuffer(builder));
	}
#endif

	// liburing not compiled in, or compiled in but disabled.
	return FBS::Worker::CreateDumpResponseDirect(
	  builder, Logger::Pid, &serverIdOffsets, &routerIdOffsets, handlersOffset);
}
/**
 * Serializes the process resource usage, as reported by uv_getrusage(), into
 * the given FlatBuffers builder.
 *
 * User and system CPU times are converted to milliseconds
 * (seconds * 1000 + microseconds / 1000); the remaining fields are passed
 * through unchanged.
 *
 * @param builder Builder that receives the serialized data.
 * @return Offset of the created ResourceUsageResponse inside the builder.
 * @note Raises through MS_THROW_ERROR when uv_getrusage() reports failure.
 */
flatbuffers::Offset<FBS::Worker::ResourceUsageResponse> Worker::FillBufferResourceUsage(
  flatbuffers::FlatBufferBuilder& builder) const
{
	MS_TRACE();

	uv_rusage_t usage{};
	const int status = uv_getrusage(&usage);

	if (status != 0)
	{
		MS_THROW_ERROR("uv_getrusage() failed: %s", uv_strerror(status));
	}

	// CPU times in milliseconds.
	const auto userTimeMs =
	  (usage.ru_utime.tv_sec * static_cast<uint64_t>(1000)) + (usage.ru_utime.tv_usec / 1000);
	const auto systemTimeMs =
	  (usage.ru_stime.tv_sec * static_cast<uint64_t>(1000)) + (usage.ru_stime.tv_usec / 1000);

	return FBS::Worker::CreateResourceUsageResponse(
	  builder,
	  userTimeMs,
	  systemTimeMs,
	  usage.ru_maxrss,
	  usage.ru_ixrss,
	  usage.ru_idrss,
	  usage.ru_isrss,
	  usage.ru_minflt,
	  usage.ru_majflt,
	  usage.ru_nswap,
	  usage.ru_inblock,
	  usage.ru_oublock,
	  usage.ru_msgsnd,
	  usage.ru_msgrcv,
	  usage.ru_nsignals,
	  usage.ru_nvcsw,
	  usage.ru_nivcsw);
}
/**
 * Looks up a WebRtcServer by id.
 *
 * @param webRtcServerId Id to search for in the WebRtcServers map.
 * @return The WebRtcServer stored under that id.
 * @note Raises through MS_THROW_ERROR when no entry exists for the id.
 */
RTC::WebRtcServer* Worker::AssertAndGetWebRtcServerById(const std::string& webRtcServerId) const
{
	const auto serverEntry = this->mapWebRtcServers.find(webRtcServerId);

	// An unknown id is reported as an error to the caller.
	if (serverEntry == this->mapWebRtcServers.end())
	{
		MS_THROW_ERROR("WebRtcServer not found");
	}

	return serverEntry->second;
}
/**
 * Looks up a Router by id.
 *
 * @param routerId Id to search for in the Routers map.
 * @return The Router stored under that id.
 * @note Raises through MS_THROW_ERROR when no entry exists for the id.
 */
RTC::Router* Worker::AssertAndGetRouterById(const std::string& routerId) const
{
	MS_TRACE();

	const auto routerEntry = this->mapRouters.find(routerId);

	// An unknown id is reported as an error to the caller.
	if (routerEntry == this->mapRouters.end())
	{
		MS_THROW_ERROR("Router not found");
	}

	return routerEntry->second;
}
/**
 * Verifies that no WebRtcServer is registered under the given id.
 *
 * @param webRtcServerId Id that must not be present in the WebRtcServers map.
 * @note Raises through MS_THROW_ERROR when the id is already in use.
 */
void Worker::CheckNoWebRtcServer(const std::string& webRtcServerId) const
{
	const bool alreadyRegistered =
	  this->mapWebRtcServers.find(webRtcServerId) != this->mapWebRtcServers.end();

	if (alreadyRegistered)
	{
		MS_THROW_ERROR("a WebRtcServer with same webRtcServerId already exists");
	}
}
/**
 * Verifies that no Router is registered under the given id.
 *
 * @param routerId Id that must not be present in the Routers map.
 * @note Raises through MS_THROW_ERROR when the id is already in use.
 */
void Worker::CheckNoRouter(const std::string& routerId) const
{
	const bool alreadyRegistered = this->mapRouters.find(routerId) != this->mapRouters.end();

	if (alreadyRegistered)
	{
		MS_THROW_ERROR("a Router with same routerId already exists");
	}
}
/**
 * Dispatches a Channel request.
 *
 * Worker-level methods (dump, resource usage, settings update, creation and
 * closing of WebRtcServers and Routers) are handled here. Any other method is
 * forwarded to the request handler registered under `request->handlerId`.
 *
 * Errors caught here are re-raised through MS_THROW_ERROR /
 * MS_THROW_TYPE_ERROR with the request method name appended to the message.
 *
 * @param request Request to process. It is accepted by this method for the
 *                create/close/dump/resource-usage cases.
 */
void Worker::HandleRequest(Channel::ChannelRequest* request)
{
	MS_TRACE();

	MS_DEBUG_DEV(
	  "Channel request received [method:%s, id:%" PRIu32 "]", request->methodCStr, request->id);

	switch (request->method)
	{
		case Channel::ChannelRequest::Method::WORKER_DUMP:
		{
			auto dumpOffset = FillBuffer(request->GetBufferBuilder());

			request->Accept(FBS::Response::Body::Worker_DumpResponse, dumpOffset);

			break;
		}

		case Channel::ChannelRequest::Method::WORKER_GET_RESOURCE_USAGE:
		{
			auto resourceUsageOffset = FillBufferResourceUsage(request->GetBufferBuilder());

			request->Accept(FBS::Response::Body::Worker_ResourceUsageResponse, resourceUsageOffset);

			break;
		}

		case Channel::ChannelRequest::Method::WORKER_UPDATE_SETTINGS:
		{
			// Settings handles this request entirely on its own.
			Settings::HandleRequest(request);

			break;
		}

		case Channel::ChannelRequest::Method::WORKER_CREATE_WEBRTCSERVER:
		{
			try
			{
				const auto* const body = request->data->body_as<FBS::Worker::CreateWebRtcServerRequest>();
				const std::string webRtcServerId = body->webRtcServerId()->str();

				// Raises if the id is already in use.
				CheckNoWebRtcServer(webRtcServerId);

				auto* webRtcServer = new RTC::WebRtcServer(this->shared, webRtcServerId, body->listenInfos());

				this->mapWebRtcServers[webRtcServerId] = webRtcServer;

				MS_DEBUG_DEV("WebRtcServer created [webRtcServerId:%s]", webRtcServerId.c_str());

				request->Accept();
			}
			catch (const MediaSoupTypeError& error)
			{
				MS_THROW_TYPE_ERROR("%s [method:%s]", error.what(), request->methodCStr);
			}
			catch (const MediaSoupError& error)
			{
				MS_THROW_ERROR("%s [method:%s]", error.what(), request->methodCStr);
			}

			break;
		}

		case Channel::ChannelRequest::Method::WORKER_WEBRTCSERVER_CLOSE:
		{
			const RTC::WebRtcServer* webRtcServer{ nullptr };

			const auto* body = request->data->body_as<FBS::Worker::CloseWebRtcServerRequest>();

			auto webRtcServerId = body->webRtcServerId()->str();

			try
			{
				webRtcServer = AssertAndGetWebRtcServerById(webRtcServerId);
			}
			catch (const MediaSoupError& error)
			{
				MS_THROW_ERROR("%s [method:%s]", error.what(), request->methodCStr);
			}

			// Remove the map entry first, then destroy the object.
			this->mapWebRtcServers.erase(webRtcServer->GetId());

			delete webRtcServer;

			// The object no longer exists: log the id from the request copy (the
			// key it was found under) instead of reading through the freed pointer.
			MS_DEBUG_DEV("WebRtcServer closed [id:%s]", webRtcServerId.c_str());

			request->Accept();

			break;
		}

		case Channel::ChannelRequest::Method::WORKER_CREATE_ROUTER:
		{
			const auto* body = request->data->body_as<FBS::Worker::CreateRouterRequest>();

			auto routerId = body->routerId()->str();

			try
			{
				// Raises if the id is already in use.
				CheckNoRouter(routerId);
			}
			catch (const MediaSoupError& error)
			{
				MS_THROW_ERROR("%s [method:%s]", error.what(), request->methodCStr);
			}

			// This Worker is passed to the Router as its third argument.
			auto* router = new RTC::Router(this->shared, routerId, this);

			this->mapRouters[routerId] = router;

			MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str());

			request->Accept();

			break;
		}

		case Channel::ChannelRequest::Method::WORKER_CLOSE_ROUTER:
		{
			const RTC::Router* router{ nullptr };

			const auto* body = request->data->body_as<FBS::Worker::CloseRouterRequest>();

			auto routerId = body->routerId()->str();

			try
			{
				router = AssertAndGetRouterById(routerId);
			}
			catch (const MediaSoupError& error)
			{
				MS_THROW_ERROR("%s [method:%s]", error.what(), request->methodCStr);
			}

			// Remove the map entry first, then destroy the object.
			this->mapRouters.erase(router->id);

			delete router;

			// The object no longer exists: log the id from the request copy (the
			// key it was found under) instead of reading through the freed pointer.
			MS_DEBUG_DEV("Router closed [id:%s]", routerId.c_str());

			request->Accept();

			break;
		}

		// Any other request is delivered to the handler registered under its
		// handlerId.
		default:
		{
			try
			{
				auto* handler =
				  this->shared->GetChannelMessageRegistrator()->GetChannelRequestHandler(request->handlerId);

				if (handler == nullptr)
				{
					MS_THROW_ERROR("Channel request handler with ID %s not found", request->handlerId.c_str());
				}

				handler->HandleRequest(request);
			}
			catch (const MediaSoupTypeError& error)
			{
				MS_THROW_TYPE_ERROR("%s [method:%s]", error.what(), request->methodCStr);
			}
			catch (const MediaSoupError& error)
			{
				MS_THROW_ERROR("%s [method:%s]", error.what(), request->methodCStr);
			}

			break;
		}
	}
}
/**
 * Dispatches a Channel notification.
 *
 * WORKER_CLOSE closes the Worker (unless it is already closed). Any other
 * event is forwarded to the notification handler registered under
 * `notification->handlerId`.
 *
 * Errors caught while forwarding are re-raised through MS_THROW_ERROR /
 * MS_THROW_TYPE_ERROR with the event name appended to the message.
 *
 * @param notification Notification to process.
 */
void Worker::HandleNotification(Channel::ChannelNotification* notification)
{
	MS_TRACE();

	MS_DEBUG_DEV("Channel notification received [event:%s]", notification->eventCStr);

	switch (notification->event)
	{
		case Channel::ChannelNotification::Event::WORKER_CLOSE:
		{
			// Ignore the notification when the Worker is already closed.
			if (!this->closed)
			{
				MS_DEBUG_DEV("closing Worker");

				Close();
			}

			break;
		}

		// Everything else goes to the handler registered for the handlerId.
		default:
		{
			try
			{
				auto* notificationHandler =
				  this->shared->GetChannelMessageRegistrator()->GetChannelNotificationHandler(
				    notification->handlerId);

				if (notificationHandler == nullptr)
				{
					MS_THROW_ERROR(
					  "Channel notification handler with ID %s not found", notification->handlerId.c_str());
				}

				notificationHandler->HandleNotification(notification);
			}
			catch (const MediaSoupTypeError& error)
			{
				MS_THROW_TYPE_ERROR("%s [event:%s]", error.what(), notification->eventCStr);
			}
			catch (const MediaSoupError& error)
			{
				MS_THROW_ERROR("%s [event:%s]", error.what(), notification->eventCStr);
			}

			break;
		}
	}
}
/**
 * Channel listener callback invoked when the channel has been closed.
 *
 * Closes the Worker. When MS_EXECUTABLE is defined an error line is logged
 * first. The channel argument is not used.
 */
void Worker::OnChannelClosed(Channel::ChannelSocket* /*channel*/)
{
	MS_TRACE_STD();

#ifdef MS_EXECUTABLE
	// This log line is only emitted when MS_EXECUTABLE is defined.
	MS_ERROR_STD("channel remotely closed, closing myself");
#endif

	Close();
}
/**
 * Signal handle callback.
 *
 * SIGINT and SIGTERM close the Worker; any other signal only produces a
 * warning. Nothing happens when the Worker is already closed. The signal
 * handle argument is not used.
 *
 * @param signum Number of the received signal.
 */
void Worker::OnSignal(SignalHandle* /*signalHandle*/, int signum)
{
	MS_TRACE();

	if (this->closed)
	{
		return;
	}

	const bool isInterrupt = (signum == SIGINT);

	// Both termination signals are handled identically.
	if (isInterrupt || signum == SIGTERM)
	{
		MS_DEBUG_DEV("%s signal received, closing myself", isInterrupt ? "INT" : "TERM");

		Close();

		return;
	}

	MS_WARN_DEV("ignoring received non handled signal [signum:%d]", signum);
}
/**
 * Router listener callback that resolves a WebRtcServer by id.
 *
 * Unlike AssertAndGetWebRtcServerById(), a missing id is not an error here.
 * The router argument is not used.
 *
 * @param webRtcServerId Id to search for in the WebRtcServers map.
 * @return The matching WebRtcServer, or nullptr when none is registered.
 */
RTC::WebRtcServer* Worker::OnRouterNeedWebRtcServer(RTC::Router* /*router*/, std::string& webRtcServerId)
{
	MS_TRACE();

	const auto serverEntry = this->mapWebRtcServers.find(webRtcServerId);

	if (serverEntry == this->mapWebRtcServers.end())
	{
		return nullptr;
	}

	return serverEntry->second;
}