#define MS_CLASS "RTC::Router"
#include "RTC/Router.hpp"
#ifdef MS_LIBURING_SUPPORTED
#include "DepLibUring.hpp"
#endif
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "RTC/ActiveSpeakerObserver.hpp"
#include "RTC/AudioLevelObserver.hpp"
#include "RTC/DirectTransport.hpp"
#include "RTC/PipeTransport.hpp"
#include "RTC/PlainTransport.hpp"
#include "RTC/SharedRtpPacket.hpp"
#include "RTC/WebRtcTransport.hpp"
namespace RTC
{
// Builds a Router identified by `id`, keeping the given `shared` and `listener`
// pointers, and registers the Router with the channel message registrator
// under that id.
Router::Router(RTC::Shared* shared, const std::string& id, Listener* listener)
  : id(id), shared(shared), listener(listener)
{
	MS_TRACE();

	auto& registrator = *this->shared->channelMessageRegistrator;

	// NOTE(review): the third argument is left null; presumably it is the
	// notification-handler slot, meaning the Router only handles requests —
	// confirm against ChannelMessageRegistrator::RegisterHandler().
	registrator.RegisterHandler(this->id, this, nullptr);
}
// Unregisters the Router from the channel message registrator, deletes every
// Transport and RtpObserver it holds, and empties all bookkeeping maps.
Router::~Router()
{
	MS_TRACE();

	auto& registrator = *this->shared->channelMessageRegistrator;

	registrator.UnregisterHandler(this->id);

	// Delete all Transports, then drop the (now dangling) map entries.
	for (auto& entry : this->mapTransports)
	{
		delete entry.second;
	}
	this->mapTransports.clear();

	// Delete all RtpObservers, then drop the (now dangling) map entries.
	for (auto& entry : this->mapRtpObservers)
	{
		delete entry.second;
	}
	this->mapRtpObservers.clear();

	// The remaining maps only hold pointers that are not deleted here; just
	// empty them.
	this->mapProducerConsumers.clear();
	this->mapConsumerProducer.clear();
	this->mapProducerRtpObservers.clear();
	this->mapProducers.clear();
	this->mapDataProducerDataConsumers.clear();
	this->mapDataConsumerDataProducer.clear();
	this->mapDataProducers.clear();
}
// Serializes the Router's id and the contents of its bookkeeping maps (as ids)
// into a FBS::Router::DumpResponse created with `builder`.
//
// @param builder  FlatBuffers builder that receives every string and table.
// @return Offset of the DumpResponse table inside `builder`.
flatbuffers::Offset<FBS::Router::DumpResponse> Router::FillBuffer(
  flatbuffers::FlatBufferBuilder& builder) const
{
	MS_TRACE();

	using StringOffset      = flatbuffers::Offset<flatbuffers::String>;
	using StringArrayOffset = flatbuffers::Offset<FBS::Common::StringStringArray>;
	using StringPairOffset  = flatbuffers::Offset<FBS::Common::StringString>;

	// Ids of every entry in mapTransports.
	std::vector<StringOffset> transportIds;
	transportIds.reserve(this->mapTransports.size());

	for (const auto& entry : this->mapTransports)
	{
		transportIds.emplace_back(builder.CreateString(entry.first));
	}

	// Ids of every entry in mapRtpObservers.
	std::vector<StringOffset> rtpObserverIds;
	rtpObserverIds.reserve(this->mapRtpObservers.size());

	for (const auto& entry : this->mapRtpObservers)
	{
		rtpObserverIds.emplace_back(builder.CreateString(entry.first));
	}

	// Producer id -> ids of its Consumers.
	std::vector<StringArrayOffset> mapProducerIdConsumerIds;
	mapProducerIdConsumerIds.reserve(this->mapProducerConsumers.size());

	for (const auto& [producer, consumers] : this->mapProducerConsumers)
	{
		std::vector<StringOffset> consumerIds;
		consumerIds.reserve(consumers.size());

		for (auto* consumer : consumers)
		{
			consumerIds.push_back(builder.CreateString(consumer->id));
		}

		mapProducerIdConsumerIds.push_back(
		  FBS::Common::CreateStringStringArrayDirect(builder, producer->id.c_str(), &consumerIds));
	}

	// Consumer id -> id of its Producer.
	std::vector<StringPairOffset> mapConsumerIdProducerId;
	mapConsumerIdProducerId.reserve(this->mapConsumerProducer.size());

	for (const auto& [consumer, producer] : this->mapConsumerProducer)
	{
		mapConsumerIdProducerId.push_back(
		  FBS::Common::CreateStringStringDirect(builder, consumer->id.c_str(), producer->id.c_str()));
	}

	// Producer id -> ids of the RtpObservers attached to it.
	std::vector<StringArrayOffset> mapProducerIdObserverIds;
	mapProducerIdObserverIds.reserve(this->mapProducerRtpObservers.size());

	for (const auto& [producer, rtpObservers] : this->mapProducerRtpObservers)
	{
		std::vector<StringOffset> observerIds;
		observerIds.reserve(rtpObservers.size());

		for (auto* rtpObserver : rtpObservers)
		{
			observerIds.push_back(builder.CreateString(rtpObserver->id));
		}

		mapProducerIdObserverIds.push_back(
		  FBS::Common::CreateStringStringArrayDirect(builder, producer->id.c_str(), &observerIds));
	}

	// DataProducer id -> ids of its DataConsumers.
	std::vector<StringArrayOffset> mapDataProducerIdDataConsumerIds;
	mapDataProducerIdDataConsumerIds.reserve(this->mapDataProducerDataConsumers.size());

	for (const auto& [dataProducer, dataConsumers] : this->mapDataProducerDataConsumers)
	{
		std::vector<StringOffset> dataConsumerIds;
		dataConsumerIds.reserve(dataConsumers.size());

		for (auto* dataConsumer : dataConsumers)
		{
			dataConsumerIds.push_back(builder.CreateString(dataConsumer->id));
		}

		mapDataProducerIdDataConsumerIds.push_back(FBS::Common::CreateStringStringArrayDirect(
		  builder, dataProducer->id.c_str(), &dataConsumerIds));
	}

	// DataConsumer id -> id of its DataProducer.
	std::vector<StringPairOffset> mapDataConsumerIdDataProducerId;
	mapDataConsumerIdDataProducerId.reserve(this->mapDataConsumerDataProducer.size());

	for (const auto& [dataConsumer, dataProducer] : this->mapDataConsumerDataProducer)
	{
		mapDataConsumerIdDataProducerId.push_back(FBS::Common::CreateStringStringDirect(
		  builder, dataConsumer->id.c_str(), dataProducer->id.c_str()));
	}

	return FBS::Router::CreateDumpResponseDirect(
	  builder,
	  this->id.c_str(),
	  &transportIds,
	  &rtpObserverIds,
	  &mapProducerIdConsumerIds,
	  &mapConsumerIdProducerId,
	  &mapProducerIdObserverIds,
	  &mapDataProducerIdDataConsumerIds,
	  &mapDataConsumerIdDataProducerId);
}
// Dispatches a channel request on `request->method`.
//
// Handled methods: dump the Router, create a Transport (WebRtc, WebRtc with
// server, Plain, Pipe, Direct), create an RtpObserver (ActiveSpeaker,
// AudioLevel), close a Transport, close an RtpObserver. Each handled case ends
// by calling request->Accept(). Any other method raises via MS_THROW_ERROR.
// The id checks and lookups used below (CheckNoTransport, CheckNoRtpObserver,
// GetTransportById, GetRtpObserverById) also raise via MS_THROW_ERROR.
void Router::HandleRequest(Channel::ChannelRequest* request)
{
MS_TRACE();
switch (request->method)
{
// Reply with a dump of this Router built by FillBuffer().
case Channel::ChannelRequest::Method::ROUTER_DUMP:
{
auto dumpOffset = FillBuffer(request->GetBufferBuilder());
request->Accept(FBS::Response::Body::Router_DumpResponse, dumpOffset);
break;
}
// Create a WebRtcTransport, store it in mapTransports and reply with its dump.
case Channel::ChannelRequest::Method::ROUTER_CREATE_WEBRTCTRANSPORT:
{
const auto* body = request->data->body_as<FBS::Router::CreateWebRtcTransportRequest>();
auto transportId = body->transportId()->str();
// Throws if the id is already in use, before anything is allocated.
CheckNoTransport(transportId);
auto* webRtcTransport =
new RTC::WebRtcTransport(this->shared, transportId, this, body->options());
// From here on the Router holds the pointer (it is deleted in ~Router or on close).
this->mapTransports[transportId] = webRtcTransport;
MS_DEBUG_DEV("WebRtcTransport created [transportId:%s]", transportId.c_str());
auto dumpOffset = webRtcTransport->FillBuffer(request->GetBufferBuilder());
request->Accept(FBS::Response::Body::WebRtcTransport_DumpResponse, dumpOffset);
break;
}
// Same as above, but the transport is bound to a WebRtcServer that the
// listener resolves from the id carried in the request options.
case Channel::ChannelRequest::Method::ROUTER_CREATE_WEBRTCTRANSPORT_WITH_SERVER:
{
const auto* body = request->data->body_as<FBS::Router::CreateWebRtcTransportRequest>();
auto transportId = body->transportId()->str();
// Throws if the id is already in use.
CheckNoTransport(transportId);
const auto* options = body->options();
// NOTE(review): the result of listen_as<>() is dereferenced without a null
// check — confirm callers only send this method with a ListenServer union.
const auto* listenInfo = options->listen_as<FBS::WebRtcTransport::ListenServer>();
auto webRtcServerId = listenInfo->webRtcServerId()->str();
auto* webRtcServer = this->listener->OnRouterNeedWebRtcServer(this, webRtcServerId);
if (!webRtcServer)
{
MS_THROW_ERROR("wrong webRtcServerId (no associated WebRtcServer found)");
}
auto iceCandidates = webRtcServer->GetIceCandidates(
options->enableUdp(), options->enableTcp(), options->preferUdp(), options->preferTcp());
auto* webRtcTransport = new RTC::WebRtcTransport(
this->shared, transportId, this, webRtcServer, iceCandidates, options);
this->mapTransports[transportId] = webRtcTransport;
MS_DEBUG_DEV(
"WebRtcTransport with WebRtcServer created [transportId:%s]", transportId.c_str());
auto dumpOffset = webRtcTransport->FillBuffer(request->GetBufferBuilder());
request->Accept(FBS::Response::Body::WebRtcTransport_DumpResponse, dumpOffset);
break;
}
// Create a PlainTransport, store it in mapTransports and reply with its dump.
case Channel::ChannelRequest::Method::ROUTER_CREATE_PLAINTRANSPORT:
{
const auto* body = request->data->body_as<FBS::Router::CreatePlainTransportRequest>();
auto transportId = body->transportId()->str();
// Throws if the id is already in use.
CheckNoTransport(transportId);
auto* plainTransport =
new RTC::PlainTransport(this->shared, transportId, this, body->options());
this->mapTransports[transportId] = plainTransport;
MS_DEBUG_DEV("PlainTransport created [transportId:%s]", transportId.c_str());
auto dumpOffset = plainTransport->FillBuffer(request->GetBufferBuilder());
request->Accept(FBS::Response::Body::PlainTransport_DumpResponse, dumpOffset);
break;
}
// Create a PipeTransport, store it in mapTransports and reply with its dump.
case Channel::ChannelRequest::Method::ROUTER_CREATE_PIPETRANSPORT:
{
const auto* body = request->data->body_as<FBS::Router::CreatePipeTransportRequest>();
auto transportId = body->transportId()->str();
// Throws if the id is already in use.
CheckNoTransport(transportId);
auto* pipeTransport =
new RTC::PipeTransport(this->shared, transportId, this, body->options());
this->mapTransports[transportId] = pipeTransport;
MS_DEBUG_DEV("PipeTransport created [transportId:%s]", transportId.c_str());
auto dumpOffset = pipeTransport->FillBuffer(request->GetBufferBuilder());
request->Accept(FBS::Response::Body::PipeTransport_DumpResponse, dumpOffset);
break;
}
// Create a DirectTransport, store it in mapTransports and reply with its dump.
case Channel::ChannelRequest::Method::ROUTER_CREATE_DIRECTTRANSPORT:
{
const auto* body = request->data->body_as<FBS::Router::CreateDirectTransportRequest>();
auto transportId = body->transportId()->str();
// Throws if the id is already in use.
CheckNoTransport(transportId);
auto* directTransport =
new RTC::DirectTransport(this->shared, transportId, this, body->options());
this->mapTransports[transportId] = directTransport;
MS_DEBUG_DEV("DirectTransport created [transportId:%s]", transportId.c_str());
auto dumpOffset = directTransport->FillBuffer(request->GetBufferBuilder());
request->Accept(FBS::Response::Body::DirectTransport_DumpResponse, dumpOffset);
break;
}
// Create an ActiveSpeakerObserver and store it in mapRtpObservers; the reply
// carries no body.
case Channel::ChannelRequest::Method::ROUTER_CREATE_ACTIVESPEAKEROBSERVER:
{
const auto* body = request->data->body_as<FBS::Router::CreateActiveSpeakerObserverRequest>();
auto rtpObserverId = body->rtpObserverId()->str();
// Throws if the id is already in use.
CheckNoRtpObserver(rtpObserverId);
auto* activeSpeakerObserver =
new RTC::ActiveSpeakerObserver(this->shared, rtpObserverId, this, body->options());
this->mapRtpObservers[rtpObserverId] = activeSpeakerObserver;
MS_DEBUG_DEV("ActiveSpeakerObserver created [rtpObserverId:%s]", rtpObserverId.c_str());
request->Accept();
break;
}
// Create an AudioLevelObserver and store it in mapRtpObservers; the reply
// carries no body.
case Channel::ChannelRequest::Method::ROUTER_CREATE_AUDIOLEVELOBSERVER:
{
const auto* body = request->data->body_as<FBS::Router::CreateAudioLevelObserverRequest>();
auto rtpObserverId = body->rtpObserverId()->str();
// Throws if the id is already in use.
CheckNoRtpObserver(rtpObserverId);
auto* audioLevelObserver =
new RTC::AudioLevelObserver(this->shared, rtpObserverId, this, body->options());
this->mapRtpObservers[rtpObserverId] = audioLevelObserver;
MS_DEBUG_DEV("AudioLevelObserver created [rtpObserverId:%s]", rtpObserverId.c_str());
request->Accept();
break;
}
// Close and delete an existing Transport.
case Channel::ChannelRequest::Method::ROUTER_CLOSE_TRANSPORT:
{
const auto* body = request->data->body_as<FBS::Router::CloseTransportRequest>();
auto transportId = body->transportId()->str();
// Throws if no Transport has this id.
RTC::Transport* transport = GetTransportById(transportId);
// Ask the Transport to close its Producers and Consumers while it is still
// registered in mapTransports.
transport->CloseProducersAndConsumers();
// Remove the map entry before deleting so the map never holds a dangling pointer.
this->mapTransports.erase(transport->id);
// The log line reads transport->id, so it must run before the delete below.
MS_DEBUG_DEV("Transport closed [transportId:%s]", transport->id.c_str());
delete transport;
request->Accept();
break;
}
// Close and delete an existing RtpObserver.
case Channel::ChannelRequest::Method::ROUTER_CLOSE_RTPOBSERVER:
{
const auto* body = request->data->body_as<FBS::Router::CloseRtpObserverRequest>();
auto rtpObserverId = body->rtpObserverId()->str();
// Throws if no RtpObserver has this id.
RTC::RtpObserver* rtpObserver = GetRtpObserverById(rtpObserverId);
this->mapRtpObservers.erase(rtpObserver->id);
// Also detach the observer from every Producer's observer set.
for (auto& kv : this->mapProducerRtpObservers)
{
auto& rtpObservers = kv.second;
rtpObservers.erase(rtpObserver);
}
// The log line reads rtpObserver->id, so it must run before the delete below.
MS_DEBUG_DEV("RtpObserver closed [rtpObserverId:%s]", rtpObserver->id.c_str());
delete rtpObserver;
request->Accept();
break;
}
// Any other method is rejected.
default:
{
MS_THROW_ERROR("unknown method '%s'", Channel::ChannelRequest::method2String[request->method]);
}
}
}
// Raises (via MS_THROW_ERROR) when `transportId` is already a key of
// mapTransports; otherwise does nothing.
void Router::CheckNoTransport(const std::string& transportId) const
{
	const auto existingIt    = this->mapTransports.find(transportId);
	const bool alreadyExists = (existingIt != this->mapTransports.end());

	if (alreadyExists)
	{
		MS_THROW_ERROR("a Transport with same id already exists");
	}
}
// Raises (via MS_THROW_ERROR) when `rtpObserverId` is already a key of
// mapRtpObservers; otherwise does nothing.
void Router::CheckNoRtpObserver(const std::string& rtpObserverId) const
{
	const auto existingIt    = this->mapRtpObservers.find(rtpObserverId);
	const bool alreadyExists = (existingIt != this->mapRtpObservers.end());

	if (alreadyExists)
	{
		MS_THROW_ERROR("an RtpObserver with same id already exists");
	}
}
// Looks up a Transport by id in mapTransports.
//
// @param transportId  Id of the Transport to find.
// @return Pointer stored in mapTransports for that id.
// @throws Raises via MS_THROW_ERROR when no entry has that id.
RTC::Transport* Router::GetTransportById(const std::string& transportId) const
{
	MS_TRACE();

	auto it = this->mapTransports.find(transportId);

	// Reuse the iterator from the lookup above instead of searching the map a
	// second time.
	if (it == this->mapTransports.end())
	{
		MS_THROW_ERROR("Transport not found");
	}

	return it->second;
}
// Looks up an RtpObserver by id in mapRtpObservers.
//
// @param rtpObserverId  Id of the RtpObserver to find.
// @return Pointer stored in mapRtpObservers for that id.
// @throws Raises via MS_THROW_ERROR when no entry has that id.
RTC::RtpObserver* Router::GetRtpObserverById(const std::string& rtpObserverId) const
{
	MS_TRACE();

	auto it = this->mapRtpObservers.find(rtpObserverId);

	// Reuse the iterator from the lookup above instead of searching the map a
	// second time.
	if (it == this->mapRtpObservers.end())
	{
		MS_THROW_ERROR("RtpObserver not found");
	}

	return it->second;
}
// Registers `producer` in the Router's bookkeeping: indexes it by id in
// mapProducers and creates empty entries for it in mapProducerConsumers and
// mapProducerRtpObservers. Raises via MS_THROW_ERROR if its id is already
// present in mapProducers.
inline void Router::OnTransportNewProducer(RTC::Transport* /*transport*/, RTC::Producer* producer)
{
	MS_TRACE();

	MS_ASSERT(
	  this->mapProducerConsumers.find(producer) == this->mapProducerConsumers.end(),
	  "Producer already present in mapProducerConsumers");

	const auto& newProducerId = producer->id;
	const bool idAlreadyUsed =
	  (this->mapProducers.find(newProducerId) != this->mapProducers.end());

	if (idAlreadyUsed)
	{
		MS_THROW_ERROR("Producer already present in mapProducers [producerId:%s]", producer->id.c_str());
	}

	// Index the Producer by id.
	this->mapProducers[newProducerId] = producer;

	// operator[] is used only for its side effect: create empty entries for
	// this Producer.
	(void)this->mapProducerConsumers[producer];
	(void)this->mapProducerRtpObservers[producer];
}
// Handles the closure of `producer`: tells each of its Consumers that the
// Producer closed, removes it from each RtpObserver attached to it, and erases
// its entries from mapProducers, mapProducerConsumers and
// mapProducerRtpObservers. The Producer must be present in all three maps.
inline void Router::OnTransportProducerClosed(RTC::Transport* /*transport*/, RTC::Producer* producer)
{
	MS_TRACE();

	// Locate the Producer in the three maps that index it.
	auto mapProducersIt            = this->mapProducers.find(producer->id);
	auto mapProducerConsumersIt    = this->mapProducerConsumers.find(producer);
	auto mapProducerRtpObserversIt = this->mapProducerRtpObservers.find(producer);

	MS_ASSERT(
	  mapProducerConsumersIt != this->mapProducerConsumers.end(),
	  "Producer not present in mapProducerConsumers");
	MS_ASSERT(mapProducersIt != this->mapProducers.end(), "Producer not present in mapProducers");
	MS_ASSERT(
	  mapProducerRtpObserversIt != this->mapProducerRtpObservers.end(),
	  "Producer not present in mapProducerRtpObservers");

	// NOTE(review): the Consumers are not removed from this set here; the whole
	// map entry is erased below. Presumably ProducerClosed() ends up in
	// OnTransportConsumerProducerClosed(), which only touches
	// mapConsumerProducer — confirm.
	for (auto* attachedConsumer : mapProducerConsumersIt->second)
	{
		attachedConsumer->ProducerClosed();
	}

	// Detach the Producer from each RtpObserver watching it.
	for (auto* observer : mapProducerRtpObserversIt->second)
	{
		observer->RemoveProducer(producer);
	}

	// Finally drop the Producer from all three maps.
	this->mapProducers.erase(mapProducersIt);
	this->mapProducerConsumers.erase(mapProducerConsumersIt);
	this->mapProducerRtpObservers.erase(mapProducerRtpObserversIt);
}
// Forwards a pause of `producer` to each of its Consumers and, if the Producer
// has an entry in mapProducerRtpObservers, to each RtpObserver in that entry.
inline void Router::OnTransportProducerPaused(RTC::Transport* /*transport*/, RTC::Producer* producer)
{
	MS_TRACE();

	// at() requires the Producer to have an entry in mapProducerConsumers.
	for (auto* attachedConsumer : this->mapProducerConsumers.at(producer))
	{
		attachedConsumer->ProducerPaused();
	}

	const auto observersIt = this->mapProducerRtpObservers.find(producer);

	// Nothing else to notify when the Producer has no observers entry.
	if (observersIt == this->mapProducerRtpObservers.end())
	{
		return;
	}

	for (auto* observer : observersIt->second)
	{
		observer->ProducerPaused(producer);
	}
}
// Forwards a resume of `producer` to each of its Consumers and, if the
// Producer has an entry in mapProducerRtpObservers, to each RtpObserver in
// that entry.
inline void Router::OnTransportProducerResumed(RTC::Transport* /*transport*/, RTC::Producer* producer)
{
	MS_TRACE();

	// at() requires the Producer to have an entry in mapProducerConsumers.
	for (auto* attachedConsumer : this->mapProducerConsumers.at(producer))
	{
		attachedConsumer->ProducerResumed();
	}

	const auto observersIt = this->mapProducerRtpObservers.find(producer);

	// Nothing else to notify when the Producer has no observers entry.
	if (observersIt == this->mapProducerRtpObservers.end())
	{
		return;
	}

	for (auto* observer : observersIt->second)
	{
		observer->ProducerResumed(producer);
	}
}
// Passes a new RTP stream of `producer` (with its mapped SSRC) on to each
// Consumer registered for that Producer.
inline void Router::OnTransportProducerNewRtpStream(
  RTC::Transport* /*transport*/,
  RTC::Producer* producer,
  RTC::RtpStreamRecv* rtpStream,
  uint32_t mappedSsrc)
{
	MS_TRACE();

	// at() requires the Producer to have an entry in mapProducerConsumers.
	for (auto* attachedConsumer : this->mapProducerConsumers.at(producer))
	{
		attachedConsumer->ProducerNewRtpStream(rtpStream, mappedSsrc);
	}
}
// Passes a score change of one of `producer`'s RTP streams (new and previous
// score) on to each Consumer registered for that Producer.
inline void Router::OnTransportProducerRtpStreamScore(
  RTC::Transport* /*transport*/,
  RTC::Producer* producer,
  RTC::RtpStreamRecv* rtpStream,
  uint8_t score,
  uint8_t previousScore)
{
	MS_TRACE();

	// at() requires the Producer to have an entry in mapProducerConsumers.
	for (auto* attachedConsumer : this->mapProducerConsumers.at(producer))
	{
		attachedConsumer->ProducerRtpStreamScore(rtpStream, score, previousScore);
	}
}
// Passes an RTCP Sender Report event for one of `producer`'s RTP streams on to
// each Consumer registered for that Producer, along with the `first` flag.
inline void Router::OnTransportProducerRtcpSenderReport(
  RTC::Transport* /*transport*/, RTC::Producer* producer, RTC::RtpStreamRecv* rtpStream, bool first)
{
	MS_TRACE();

	// at() requires the Producer to have an entry in mapProducerConsumers.
	for (auto* attachedConsumer : this->mapProducerConsumers.at(producer))
	{
		attachedConsumer->ProducerRtcpSenderReport(rtpStream, first);
	}
}
// Distributes an RTP packet received from `producer`: first to every Consumer
// registered for that Producer, then to every RtpObserver attached to it.
inline void Router::OnTransportProducerRtpPacketReceived(
  RTC::Transport* /*transport*/, RTC::Producer* producer, RTC::RtpPacket* packet)
{
	MS_TRACE();

#ifdef MS_RTC_LOGGER_RTP
	packet->logger.routerId = this->id;
#endif

	// at() requires the Producer to have an entry in mapProducerConsumers.
	auto& targetConsumers   = this->mapProducerConsumers.at(producer);
	const bool hasConsumers = !targetConsumers.empty();

	if (hasConsumers)
	{
		// One shared packet holder is handed to every Consumer of this packet.
		RTC::SharedRtpPacket sharedPacket;

#ifdef MS_LIBURING_SUPPORTED
		if (DepLibUring::IsEnabled())
		{
			// Activate liburing usage before the send loop.
			DepLibUring::SetActive();
		}
#endif

		for (auto* targetConsumer : targetConsumers)
		{
			// Write the Consumer's own MID into the packet before sending, if it
			// has one.
			const auto& consumerMid = targetConsumer->GetRtpParameters().mid;

			if (!consumerMid.empty())
			{
				packet->UpdateMid(consumerMid);
			}

			targetConsumer->SendRtpPacket(packet, sharedPacket);

			// Once the holder has a packet, verify it matches the one being
			// routed.
			if (sharedPacket.HasPacket())
			{
				sharedPacket.AssertSamePacket(packet);
			}
		}

#ifdef MS_LIBURING_SUPPORTED
		if (DepLibUring::IsEnabled())
		{
			// Submit after the send loop.
			DepLibUring::Submit();
		}
#endif
	}

	const auto observersIt = this->mapProducerRtpObservers.find(producer);

	// Nothing else to do when the Producer has no observers entry.
	if (observersIt == this->mapProducerRtpObservers.end())
	{
		return;
	}

	for (auto* observer : observersIt->second)
	{
		observer->ReceiveRtpPacket(producer, packet);
	}
}
// Asks every Consumer of `producer` to contribute to `worstRemoteFractionLost`
// for the given mapped SSRC. The value is passed by reference, so each
// Consumer may update it in turn.
inline void Router::OnTransportNeedWorstRemoteFractionLost(
  RTC::Transport* /*transport*/,
  RTC::Producer* producer,
  uint32_t mappedSsrc,
  uint8_t& worstRemoteFractionLost)
{
	MS_TRACE();

	// at() requires the Producer to have an entry in mapProducerConsumers.
	for (auto* attachedConsumer : this->mapProducerConsumers.at(producer))
	{
		attachedConsumer->NeedWorstRemoteFractionLost(mappedSsrc, worstRemoteFractionLost);
	}
}
// Links a new `consumer` to the Producer identified by `producerId`.
//
// Raises via MS_THROW_ERROR when no Producer has that id. Otherwise records
// the Consumer in mapProducerConsumers and mapConsumerProducer and brings it
// up to date with the Producer's paused state, RTP streams and stream scores.
inline void Router::OnTransportNewConsumer(
  RTC::Transport* /*transport*/, RTC::Consumer* consumer, const std::string& producerId)
{
	MS_TRACE();

	const auto producerIt = this->mapProducers.find(producerId);

	if (producerIt == this->mapProducers.end())
	{
		MS_THROW_ERROR("Producer not found [producerId:%s]", producerId.c_str());
	}

	RTC::Producer* const producer = producerIt->second;

	auto mapProducerConsumersIt = this->mapProducerConsumers.find(producer);

	MS_ASSERT(
	  mapProducerConsumersIt != this->mapProducerConsumers.end(),
	  "Producer not present in mapProducerConsumers");
	MS_ASSERT(
	  this->mapConsumerProducer.find(consumer) == this->mapConsumerProducer.end(),
	  "Consumer already present in mapConsumerProducer");

	// Tell the Consumer up front if its Producer is currently paused.
	if (producer->IsPaused())
	{
		consumer->ProducerPaused();
	}

	// Record the relation in both directions.
	mapProducerConsumersIt->second.insert(consumer);
	this->mapConsumerProducer[consumer] = producer;

	// Hand the Producer's existing RTP streams (with their mapped SSRCs) to the
	// Consumer.
	for (const auto& [rtpStream, mappedSsrc] : producer->GetRtpStreams())
	{
		consumer->ProducerRtpStream(rtpStream, mappedSsrc);
	}

	// Then hand over the Producer's stream scores.
	consumer->ProducerRtpStreamScores(producer->GetRtpStreamScores());
}
// Unlinks a closed `consumer`: removes it from its Producer's set in
// mapProducerConsumers and erases its mapConsumerProducer entry. The Consumer
// must be present in mapConsumerProducer.
inline void Router::OnTransportConsumerClosed(RTC::Transport* /*transport*/, RTC::Consumer* consumer)
{
	MS_TRACE();

	auto mapConsumerProducerIt = this->mapConsumerProducer.find(consumer);

	MS_ASSERT(
	  mapConsumerProducerIt != this->mapConsumerProducer.end(),
	  "Consumer not present in mapConsumerProducer");

	// Producer this Consumer was linked to.
	auto* producer = mapConsumerProducerIt->second;

	MS_ASSERT(
	  this->mapProducerConsumers.find(producer) != this->mapProducerConsumers.end(),
	  "Producer not present in mapProducerConsumers");

	// Remove the Consumer from its Producer's set, then drop the reverse
	// mapping.
	this->mapProducerConsumers.at(producer).erase(consumer);
	this->mapConsumerProducer.erase(mapConsumerProducerIt);
}
// Erases `consumer` from mapConsumerProducer. Unlike
// OnTransportConsumerClosed(), the Producer's set in mapProducerConsumers is
// not touched here. The Consumer must be present in mapConsumerProducer.
inline void Router::OnTransportConsumerProducerClosed(
  RTC::Transport* /*transport*/, RTC::Consumer* consumer)
{
	MS_TRACE();

	auto mapConsumerProducerIt = this->mapConsumerProducer.find(consumer);

	MS_ASSERT(
	  mapConsumerProducerIt != this->mapConsumerProducer.end(),
	  "Consumer not present in mapConsumerProducer");

	// Drop the Consumer -> Producer mapping.
	this->mapConsumerProducer.erase(mapConsumerProducerIt);
}
// Relays a key frame request from `consumer` to the Producer it is linked to
// in mapConsumerProducer, for the given mapped SSRC.
inline void Router::OnTransportConsumerKeyFrameRequested(
  RTC::Transport* /*transport*/, RTC::Consumer* consumer, uint32_t mappedSsrc)
{
	MS_TRACE();

	// at() requires the Consumer to have an entry in mapConsumerProducer.
	this->mapConsumerProducer.at(consumer)->RequestKeyFrame(mappedSsrc);
}
// Registers `dataProducer` in the Router's bookkeeping: indexes it by id in
// mapDataProducers and creates an empty entry for it in
// mapDataProducerDataConsumers. Raises via MS_THROW_ERROR if its id is already
// present in mapDataProducers.
inline void Router::OnTransportNewDataProducer(
  RTC::Transport* /*transport*/, RTC::DataProducer* dataProducer)
{
	MS_TRACE();

	MS_ASSERT(
	  this->mapDataProducerDataConsumers.find(dataProducer) ==
	    this->mapDataProducerDataConsumers.end(),
	  "DataProducer already present in mapDataProducerDataConsumers");

	const auto& newDataProducerId = dataProducer->id;
	const bool idAlreadyUsed =
	  (this->mapDataProducers.find(newDataProducerId) != this->mapDataProducers.end());

	if (idAlreadyUsed)
	{
		MS_THROW_ERROR(
		  "DataProducer already present in mapDataProducers [dataProducerId:%s]",
		  dataProducer->id.c_str());
	}

	// Index the DataProducer by id.
	this->mapDataProducers[newDataProducerId] = dataProducer;

	// operator[] is used only for its side effect: create an empty entry for
	// this DataProducer.
	(void)this->mapDataProducerDataConsumers[dataProducer];
}
// Handles the closure of `dataProducer`: tells each of its DataConsumers that
// the DataProducer closed and erases its entries from mapDataProducers and
// mapDataProducerDataConsumers. The DataProducer must be present in both maps.
inline void Router::OnTransportDataProducerClosed(
  RTC::Transport* /*transport*/, RTC::DataProducer* dataProducer)
{
	MS_TRACE();

	// Locate the DataProducer in the two maps that index it.
	auto mapDataProducersIt             = this->mapDataProducers.find(dataProducer->id);
	auto mapDataProducerDataConsumersIt = this->mapDataProducerDataConsumers.find(dataProducer);

	MS_ASSERT(
	  mapDataProducerDataConsumersIt != this->mapDataProducerDataConsumers.end(),
	  "DataProducer not present in mapDataProducerDataConsumers");
	MS_ASSERT(
	  mapDataProducersIt != this->mapDataProducers.end(),
	  "DataProducer not present in mapDataProducers");

	// The DataConsumers are not removed from this set here; the whole map entry
	// is erased below.
	for (auto* attachedDataConsumer : mapDataProducerDataConsumersIt->second)
	{
		attachedDataConsumer->DataProducerClosed();
	}

	// Finally drop the DataProducer from both maps.
	this->mapDataProducers.erase(mapDataProducersIt);
	this->mapDataProducerDataConsumers.erase(mapDataProducerDataConsumersIt);
}
// Forwards a pause of `dataProducer` to each DataConsumer registered for it.
inline void Router::OnTransportDataProducerPaused(
  RTC::Transport* /*transport*/, RTC::DataProducer* dataProducer)
{
	MS_TRACE();

	// at() requires the DataProducer to have an entry in
	// mapDataProducerDataConsumers.
	for (auto* attachedDataConsumer : this->mapDataProducerDataConsumers.at(dataProducer))
	{
		attachedDataConsumer->DataProducerPaused();
	}
}
// Forwards a resume of `dataProducer` to each DataConsumer registered for it.
inline void Router::OnTransportDataProducerResumed(
  RTC::Transport* /*transport*/, RTC::DataProducer* dataProducer)
{
	MS_TRACE();

	// at() requires the DataProducer to have an entry in
	// mapDataProducerDataConsumers.
	for (auto* attachedDataConsumer : this->mapDataProducerDataConsumers.at(dataProducer))
	{
		attachedDataConsumer->DataProducerResumed();
	}
}
// Delivers a message received from `dataProducer` to every DataConsumer
// registered for it. The message bytes, length, ppid, subchannels and required
// subchannel are passed through to each DataConsumer unchanged.
inline void Router::OnTransportDataProducerMessageReceived(
  RTC::Transport* /*transport*/,
  RTC::DataProducer* dataProducer,
  const uint8_t* msg,
  size_t len,
  uint32_t ppid,
  std::vector<uint16_t>& subchannels,
  std::optional<uint16_t> requiredSubchannel)
{
	MS_TRACE();

	// at() requires the DataProducer to have an entry in
	// mapDataProducerDataConsumers.
	auto& targetDataConsumers = this->mapDataProducerDataConsumers.at(dataProducer);

	// No receivers: nothing to send.
	if (targetDataConsumers.empty())
	{
		return;
	}

#ifdef MS_LIBURING_SUPPORTED
	if (DepLibUring::IsEnabled())
	{
		// Activate liburing usage before the send loop.
		DepLibUring::SetActive();
	}
#endif

	for (auto* targetDataConsumer : targetDataConsumers)
	{
		targetDataConsumer->SendMessage(msg, len, ppid, subchannels, requiredSubchannel);
	}

#ifdef MS_LIBURING_SUPPORTED
	if (DepLibUring::IsEnabled())
	{
		// Submit after the send loop.
		DepLibUring::Submit();
	}
#endif
}
// Links a new `dataConsumer` to the DataProducer identified by
// `dataProducerId`.
//
// Raises via MS_THROW_ERROR when no DataProducer has that id. Otherwise
// records the DataConsumer in mapDataProducerDataConsumers and
// mapDataConsumerDataProducer, after telling it if the DataProducer is paused.
inline void Router::OnTransportNewDataConsumer(
  RTC::Transport* /*transport*/, RTC::DataConsumer* dataConsumer, std::string& dataProducerId)
{
	MS_TRACE();

	const auto dataProducerIt = this->mapDataProducers.find(dataProducerId);

	if (dataProducerIt == this->mapDataProducers.end())
	{
		MS_THROW_ERROR("DataProducer not found [dataProducerId:%s]", dataProducerId.c_str());
	}

	RTC::DataProducer* const dataProducer = dataProducerIt->second;

	auto mapDataProducerDataConsumersIt = this->mapDataProducerDataConsumers.find(dataProducer);

	MS_ASSERT(
	  mapDataProducerDataConsumersIt != this->mapDataProducerDataConsumers.end(),
	  "DataProducer not present in mapDataProducerDataConsumers");
	MS_ASSERT(
	  this->mapDataConsumerDataProducer.find(dataConsumer) == this->mapDataConsumerDataProducer.end(),
	  "DataConsumer already present in mapDataConsumerDataProducer");

	// Tell the DataConsumer up front if its DataProducer is currently paused.
	if (dataProducer->IsPaused())
	{
		dataConsumer->DataProducerPaused();
	}

	// Record the relation in both directions.
	mapDataProducerDataConsumersIt->second.insert(dataConsumer);
	this->mapDataConsumerDataProducer[dataConsumer] = dataProducer;
}
// Unlinks a closed `dataConsumer`: removes it from its DataProducer's set in
// mapDataProducerDataConsumers and erases its mapDataConsumerDataProducer
// entry. The DataConsumer must be present in mapDataConsumerDataProducer.
inline void Router::OnTransportDataConsumerClosed(
  RTC::Transport* /*transport*/, RTC::DataConsumer* dataConsumer)
{
	MS_TRACE();

	auto mapDataConsumerDataProducerIt = this->mapDataConsumerDataProducer.find(dataConsumer);

	MS_ASSERT(
	  mapDataConsumerDataProducerIt != this->mapDataConsumerDataProducer.end(),
	  "DataConsumer not present in mapDataConsumerDataProducer");

	// DataProducer this DataConsumer was linked to.
	auto* dataProducer = mapDataConsumerDataProducerIt->second;

	MS_ASSERT(
	  this->mapDataProducerDataConsumers.find(dataProducer) !=
	    this->mapDataProducerDataConsumers.end(),
	  "DataProducer not present in mapDataProducerDataConsumers");

	// Remove the DataConsumer from its DataProducer's set, then drop the
	// reverse mapping.
	this->mapDataProducerDataConsumers.at(dataProducer).erase(dataConsumer);
	this->mapDataConsumerDataProducer.erase(mapDataConsumerDataProducerIt);
}
// Erases `dataConsumer` from mapDataConsumerDataProducer. Unlike
// OnTransportDataConsumerClosed(), the DataProducer's set in
// mapDataProducerDataConsumers is not touched here. The DataConsumer must be
// present in mapDataConsumerDataProducer.
inline void Router::OnTransportDataConsumerDataProducerClosed(
  RTC::Transport* /*transport*/, RTC::DataConsumer* dataConsumer)
{
	MS_TRACE();

	auto mapDataConsumerDataProducerIt = this->mapDataConsumerDataProducer.find(dataConsumer);

	MS_ASSERT(
	  mapDataConsumerDataProducerIt != this->mapDataConsumerDataProducer.end(),
	  "DataConsumer not present in mapDataConsumerDataProducer");

	// Drop the DataConsumer -> DataProducer mapping.
	this->mapDataConsumerDataProducer.erase(mapDataConsumerDataProducerIt);
}
// Closes and deletes `transport`: asks it to close its Producers and
// Consumers, removes it from mapTransports, then deletes it. The Transport
// must be present in mapTransports.
inline void Router::OnTransportListenServerClosed(RTC::Transport* transport)
{
	MS_TRACE();

	MS_ASSERT(
	  this->mapTransports.find(transport->id) != this->mapTransports.end(),
	  "Transport not present in mapTransports");

	// Let the Transport close its Producers and Consumers while it is still
	// registered in mapTransports.
	transport->CloseProducersAndConsumers();

	// Remove the map entry before deleting so the map never holds a dangling
	// pointer. The id reference stays valid until the delete below.
	const auto& closedTransportId = transport->id;

	this->mapTransports.erase(closedTransportId);

	delete transport;
}
// Adds `rtpObserver` to the set of observers recorded for `producer` in
// mapProducerRtpObservers. operator[] creates the entry if it does not exist
// yet.
void Router::OnRtpObserverAddProducer(RTC::RtpObserver* rtpObserver, RTC::Producer* producer)
{
	auto& observersOfProducer = this->mapProducerRtpObservers[producer];

	observersOfProducer.insert(rtpObserver);
}
// Removes `rtpObserver` from the set of observers recorded for `producer` in
// mapProducerRtpObservers.
//
// Uses find() rather than operator[] so that a removal for a Producer with no
// entry is a no-op instead of inserting an empty entry keyed by that pointer.
void Router::OnRtpObserverRemoveProducer(RTC::RtpObserver* rtpObserver, RTC::Producer* producer)
{
	auto it = this->mapProducerRtpObservers.find(producer);

	// Unknown Producer: nothing to remove.
	if (it == this->mapProducerRtpObservers.end())
	{
		return;
	}

	it->second.erase(rtpObserver);
}
// Resolves a Producer id on behalf of an RtpObserver.
//
// @param id  Producer id to look up in mapProducers.
// @return Pointer stored in mapProducers for that id.
// @throws Raises via MS_THROW_ERROR when no Producer has that id.
RTC::Producer* Router::RtpObserverGetProducer(
  RTC::RtpObserver* /*rtpObserver*/, const std::string& id)
{
	const auto producerIt = this->mapProducers.find(id);

	if (producerIt == this->mapProducers.end())
	{
		MS_THROW_ERROR("Producer not found");
	}

	return producerIt->second;
}
}