#define MS_CLASS "Worker"
#include "Worker.hpp"
#include "DepLibUV.hpp"
#include "DepUsrSCTP.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Settings.hpp"
#include "Channel/ChannelNotifier.hpp"
// Builds the Worker and drives it for its whole lifetime.
//
// Registers this instance as listener of both sockets, creates the
// SignalsHandler (subscribing to INT/TERM only in MS_EXECUTABLE builds),
// creates the DepUsrSCTP checker, emits the "running" notification and finally
// calls DepLibUV::RunLoop(). The constructor therefore does not return until
// RunLoop() returns.
//
// channel / payloadChannel are stored as-is; Close() later calls Close() on
// both of them.
Worker::Worker(::Channel::ChannelSocket* channel, PayloadChannel::PayloadChannelSocket* payloadChannel)
  : channel(channel), payloadChannel(payloadChannel)
{
	MS_TRACE();

	// Become the listener of both sockets.
	this->channel->SetListener(this);
	this->payloadChannel->SetListener(this);

	// Released in Close().
	this->signalsHandler = new SignalsHandler(this);

#ifdef MS_EXECUTABLE
	// Signal subscriptions are only installed in MS_EXECUTABLE builds.
	this->signalsHandler->AddSignal(SIGINT, "INT");
	this->signalsHandler->AddSignal(SIGTERM, "TERM");
#endif

	// Counterpart of the DepUsrSCTP::CloseChecker() call in Close().
	DepUsrSCTP::CreateChecker();

	// Emit the "running" notification, identified by Logger::pid.
	Channel::ChannelNotifier::Emit(Logger::pid, "running");

	MS_DEBUG_DEV("starting libuv loop");
	DepLibUV::RunLoop();
	MS_DEBUG_DEV("libuv loop ended");
}
// Destructor: makes sure Close() has run exactly once before the object dies.
Worker::~Worker()
{
	MS_TRACE();

	// Already closed explicitly; nothing left to release.
	if (this->closed)
	{
		return;
	}

	Close();
}
// Tears the Worker down. Safe to call repeatedly: only the first call does
// any work, later calls return immediately.
//
// Order: SignalsHandler, all Routers, the DepUsrSCTP checker, then both sockets.
void Worker::Close()
{
	MS_TRACE();

	if (this->closed)
	{
		return;
	}

	// Flag first so that any re-entrant Close() call becomes a no-op.
	this->closed = true;

	delete this->signalsHandler;

	// The map holds raw Router pointers: destroy each one, then drop the entries.
	for (auto& entry : this->mapRouters)
	{
		delete entry.second;
	}
	this->mapRouters.clear();

	DepUsrSCTP::CloseChecker();

	this->channel->Close();
	this->payloadChannel->Close();
}
// Writes a dump of this Worker into jsonObject:
//   - "pid":       Logger::pid
//   - "routerIds": array with the key of every entry in mapRouters
void Worker::FillJson(json& jsonObject) const
{
	MS_TRACE();

	jsonObject["pid"] = Logger::pid;

	// Keep a reference to the array slot instead of looking it up again by key.
	// Nothing else is inserted into jsonObject below, so the reference stays valid.
	auto& jsonRouterIds = jsonObject["routerIds"];
	jsonRouterIds       = json::array();

	for (auto& entry : this->mapRouters)
	{
		jsonRouterIds.emplace_back(entry.first);
	}
}
// Fills jsonObject with the values reported by uv_getrusage().
//
// "ru_utime" and "ru_stime" are derived from the corresponding timeval fields
// as (tv_sec * 1000) + (tv_usec / 1000); every other key is copied verbatim
// from the uv_rusage_t field of the same name.
//
// Throws (via MS_THROW_ERROR) if uv_getrusage() returns a non-zero code.
void Worker::FillJsonResourceUsage(json& jsonObject) const
{
	MS_TRACE();

	uv_rusage_t uvRusage;
	const int err = uv_getrusage(std::addressof(uvRusage));

	// Report the function that actually failed (the previous message named a
	// non-existent "uv_getrusagerequest()").
	if (err != 0)
		MS_THROW_ERROR("uv_getrusage() failed: %s", uv_strerror(err));

	// The uint64_t cast widens the multiplication before the sub-second part is added.
	jsonObject["ru_utime"] =
	  (uvRusage.ru_utime.tv_sec * static_cast<uint64_t>(1000)) + (uvRusage.ru_utime.tv_usec / 1000);
	jsonObject["ru_stime"] =
	  (uvRusage.ru_stime.tv_sec * static_cast<uint64_t>(1000)) + (uvRusage.ru_stime.tv_usec / 1000);

	// Remaining counters are passed through unchanged.
	jsonObject["ru_maxrss"]   = uvRusage.ru_maxrss;
	jsonObject["ru_ixrss"]    = uvRusage.ru_ixrss;
	jsonObject["ru_idrss"]    = uvRusage.ru_idrss;
	jsonObject["ru_isrss"]    = uvRusage.ru_isrss;
	jsonObject["ru_minflt"]   = uvRusage.ru_minflt;
	jsonObject["ru_majflt"]   = uvRusage.ru_majflt;
	jsonObject["ru_nswap"]    = uvRusage.ru_nswap;
	jsonObject["ru_inblock"]  = uvRusage.ru_inblock;
	jsonObject["ru_oublock"]  = uvRusage.ru_oublock;
	jsonObject["ru_msgsnd"]   = uvRusage.ru_msgsnd;
	jsonObject["ru_msgrcv"]   = uvRusage.ru_msgrcv;
	jsonObject["ru_nsignals"] = uvRusage.ru_nsignals;
	jsonObject["ru_nvcsw"]    = uvRusage.ru_nvcsw;
	jsonObject["ru_nivcsw"]   = uvRusage.ru_nivcsw;
}
// Reads internal["routerId"] into routerId and verifies that no Router is
// registered under that id yet.
//
// Throws (via MS_THROW_ERROR) when the field is absent or not a string, or when
// mapRouters already contains the id. Note that routerId has already been
// overwritten by the time the duplicate check throws.
void Worker::SetNewRouterIdFromInternal(json& internal, std::string& routerId) const
{
	MS_TRACE();

	auto routerIdField = internal.find("routerId");
	const bool isValidField = routerIdField != internal.end() && routerIdField->is_string();

	if (!isValidField)
		MS_THROW_ERROR("missing internal.routerId");

	routerId = routerIdField->get<std::string>();

	const bool alreadyRegistered = this->mapRouters.find(routerId) != this->mapRouters.end();

	if (alreadyRegistered)
		MS_THROW_ERROR("a Router with same routerId already exists");
}
// Resolves internal["routerId"] to the Router stored in mapRouters.
//
// Returns the stored pointer. Throws (via MS_THROW_ERROR) when the field is
// absent or not a string, or when no Router is registered under that id.
RTC::Router* Worker::GetRouterFromInternal(json& internal) const
{
	MS_TRACE();

	auto routerIdField = internal.find("routerId");
	const bool isValidField = routerIdField != internal.end() && routerIdField->is_string();

	if (!isValidField)
		MS_THROW_ERROR("missing internal.routerId");

	auto routerEntry = this->mapRouters.find(routerIdField->get<std::string>());

	if (routerEntry == this->mapRouters.end())
		MS_THROW_ERROR("Router not found");

	return routerEntry->second;
}
// Dispatches a Channel request according to request->methodId.
//
// Worker-level methods (close, dump, resource usage, settings update, router
// creation and router close) are handled here; any other method is forwarded
// to the Router named by request->internal["routerId"].
//
// Errors raised while resolving or handling are rethrown with the request
// method appended to the message.
inline void Worker::OnChannelRequest(Channel::ChannelSocket* /*channel*/, Channel::ChannelRequest* request)
{
	MS_TRACE();

	MS_DEBUG_DEV(
	  "Channel request received [method:%s, id:%" PRIu32 "]", request->method.c_str(), request->id);

	switch (request->methodId)
	{
		case Channel::ChannelRequest::MethodId::WORKER_CLOSE:
		{
			if (this->closed)
				return;

			MS_DEBUG_DEV("Worker close request, stopping");

			// NOTE: the request is neither accepted nor rejected on this path.
			Close();

			break;
		}

		case Channel::ChannelRequest::MethodId::WORKER_DUMP:
		{
			json data = json::object();

			FillJson(data);
			request->Accept(data);

			break;
		}

		case Channel::ChannelRequest::MethodId::WORKER_GET_RESOURCE_USAGE:
		{
			json data = json::object();

			FillJsonResourceUsage(data);
			request->Accept(data);

			break;
		}

		case Channel::ChannelRequest::MethodId::WORKER_UPDATE_SETTINGS:
		{
			Settings::HandleRequest(request);

			break;
		}

		case Channel::ChannelRequest::MethodId::WORKER_CREATE_ROUTER:
		{
			std::string routerId;

			try
			{
				// Also rejects ids that are already registered.
				SetNewRouterIdFromInternal(request->internal, routerId);
			}
			catch (const MediaSoupError& error)
			{
				MS_THROW_ERROR("%s [method:%s]", error.what(), request->method.c_str());
			}

			auto* router = new RTC::Router(routerId);

			this->mapRouters[routerId] = router;

			MS_DEBUG_DEV("Router created [routerId:%s]", routerId.c_str());

			request->Accept();

			break;
		}

		case Channel::ChannelRequest::MethodId::ROUTER_CLOSE:
		{
			RTC::Router* router{ nullptr };

			try
			{
				router = GetRouterFromInternal(request->internal);
			}
			catch (const MediaSoupError& error)
			{
				MS_THROW_ERROR("%s [method:%s]", error.what(), request->method.c_str());
			}

			// Log while the Router is still alive: the message reads router->id,
			// which must not be touched once the object has been deleted.
			MS_DEBUG_DEV("Router closed [id:%s]", router->id.c_str());

			// Unregister first (the key is read from the live object), then destroy.
			this->mapRouters.erase(router->id);
			delete router;

			request->Accept();

			break;
		}

		// Any other request is delegated to the Router it targets.
		default:
		{
			try
			{
				RTC::Router* router = GetRouterFromInternal(request->internal);

				router->HandleRequest(request);
			}
			catch (const MediaSoupTypeError& error)
			{
				MS_THROW_TYPE_ERROR("%s [method:%s]", error.what(), request->method.c_str());
			}
			catch (const MediaSoupError& error)
			{
				MS_THROW_ERROR("%s [method:%s]", error.what(), request->method.c_str());
			}

			break;
		}
	}
}
// Channel listener callback: the Channel socket has been closed, so shut the
// Worker down. The error line is only emitted in MS_EXECUTABLE builds.
inline void Worker::OnChannelClosed(Channel::ChannelSocket* /*channel*/)
{
	// This handler uses the *_STD logging macros.
	// NOTE(review): presumably because the Channel itself is no longer usable - confirm.
	MS_TRACE_STD();

#ifdef MS_EXECUTABLE
	MS_ERROR_STD("channel remotely closed, closing myself");
#endif

	this->Close();
}
// Forwards a PayloadChannel notification to the Router named by
// notification->internal["routerId"].
//
// Errors raised while resolving the Router or handling the notification are
// rethrown with the notification event appended to the message.
inline void Worker::OnPayloadChannelNotification(
  PayloadChannel::PayloadChannelSocket* /*payloadChannel*/, PayloadChannel::Notification* notification)
{
	MS_TRACE();

	MS_DEBUG_DEV("PayloadChannel notification received [event:%s]", notification->event.c_str());

	try
	{
		RTC::Router* router = GetRouterFromInternal(notification->internal);

		router->HandleNotification(notification);
	}
	catch (const MediaSoupTypeError& error)
	{
		MS_THROW_TYPE_ERROR("%s [event:%s]", error.what(), notification->event.c_str());
	}
	catch (const MediaSoupError& error)
	{
		// The appended value is the notification event, so label it "event"
		// exactly like the type-error branch above (it used to say "method").
		MS_THROW_ERROR("%s [event:%s]", error.what(), notification->event.c_str());
	}
}
// Forwards a PayloadChannel request to the Router named by
// request->internal["routerId"].
//
// Errors raised while resolving the Router or handling the request are
// rethrown with the request method appended to the message.
inline void Worker::OnPayloadChannelRequest(
  PayloadChannel::PayloadChannelSocket* /*payloadChannel*/,
  PayloadChannel::PayloadChannelRequest* request)
{
	MS_TRACE();

	MS_DEBUG_DEV(
	  "PayloadChannel request received [method:%s, id:%" PRIu32 "]",
	  request->method.c_str(),
	  request->id);

	try
	{
		// Resolve the target Router and hand the request over in one step.
		GetRouterFromInternal(request->internal)->HandleRequest(request);
	}
	catch (const MediaSoupTypeError& typeError)
	{
		MS_THROW_TYPE_ERROR("%s [method:%s]", typeError.what(), request->method.c_str());
	}
	catch (const MediaSoupError& genericError)
	{
		MS_THROW_ERROR("%s [method:%s]", genericError.what(), request->method.c_str());
	}
}
// PayloadChannel listener callback: the PayloadChannel socket has been closed,
// so shut the Worker down. The error line is only emitted in MS_EXECUTABLE builds.
inline void Worker::OnPayloadChannelClosed(PayloadChannel::PayloadChannelSocket* /*payloadChannel*/)
{
	MS_TRACE();

#ifdef MS_EXECUTABLE
	MS_ERROR_STD("payloadChannel remotely closed, closing myself");
#endif

	this->Close();
}
// SignalsHandler callback. SIGINT and SIGTERM close the Worker; any other
// signal is only logged. Does nothing once the Worker is closed.
inline void Worker::OnSignal(SignalsHandler* /*signalsHandler*/, int signum)
{
	MS_TRACE();

	// Single guard for every case below; the per-case re-checks that used to
	// follow could never be true after this point.
	if (this->closed)
		return;

	switch (signum)
	{
		case SIGINT:
		{
			MS_DEBUG_DEV("INT signal received, closing myself");

			Close();

			break;
		}

		case SIGTERM:
		{
			MS_DEBUG_DEV("TERM signal received, closing myself");

			Close();

			break;
		}

		default:
		{
			MS_WARN_DEV("received a non handled signal [signum:%d]", signum);

			break;
		}
	}
}