#define MS_CLASS "RTC::Transport"
#include "RTC/Transport.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include "Channel/ChannelNotifier.hpp"
#include "PayloadChannel/PayloadChannelNotifier.hpp"
#include "RTC/BweType.hpp"
#include "RTC/PipeConsumer.hpp"
#include "RTC/RTCP/FeedbackPs.hpp"
#include "RTC/RTCP/FeedbackPsAfb.hpp"
#include "RTC/RTCP/FeedbackPsRemb.hpp"
#include "RTC/RTCP/FeedbackRtp.hpp"
#include "RTC/RTCP/FeedbackRtpNack.hpp"
#include "RTC/RTCP/FeedbackRtpTransport.hpp"
#include "RTC/RTCP/XrDelaySinceLastRr.hpp"
#include "RTC/RtpDictionaries.hpp"
#include "RTC/SimpleConsumer.hpp"
#include "RTC/SimulcastConsumer.hpp"
#include "RTC/SvcConsumer.hpp"
#include <libwebrtc/modules/rtp_rtcp/include/rtp_rtcp_defines.h>
#include <iterator>
#include <map>
#include <sstream>
namespace RTC
{
// Size of the SCTP send buffer (262144 bytes = 256 KiB) used when the Transport
// options carry no "sctpSendBufferSize".
static constexpr size_t DefaultSctpSendBufferSize{ 262144 };
// Largest "sctpSendBufferSize" accepted from the Transport options
// (268435456 bytes = 256 MiB).
static constexpr size_t MaxSctpSendBufferSize{ 268435456 };
// Builds the base Transport from the JSON options in `data`.
//
// Options read here:
//   - "direct": when it is the boolean true the Transport is flagged as direct
//     and "maxMessageSize" becomes mandatory.
//   - "initialAvailableOutgoingBitrate": optional positive integer.
//   - "enableSctp": when it is the boolean true an SctpAssociation is created
//     out of "numSctpStreams" ({ OS, MIS }), "maxSctpMessageSize", the optional
//     "sctpSendBufferSize" and the optional "isDataChannel". Not allowed in a
//     direct Transport.
//
// A type error is raised through MS_THROW_TYPE_ERROR whenever one of those
// options is missing or fails its validation. The RTCP timer is always created.
Transport::Transport(const std::string& id, Listener* listener, json& data)
  : id(id), listener(listener), recvRtxTransmission(1000u), sendRtxTransmission(1000u),
    sendProbationTransmission(100u)
{
	MS_TRACE();

	// Direct Transport handling.
	auto directIt = data.find("direct");
	const bool directRequested =
	  directIt != data.end() && directIt->is_boolean() && directIt->get<bool>();

	if (directRequested)
	{
		this->direct = true;

		// A direct Transport must tell its maximum message size.
		auto maxMessageSizeIt = data.find("maxMessageSize");
		const bool maxMessageSizeOk =
		  maxMessageSizeIt != data.end() && Utils::Json::IsPositiveInteger(*maxMessageSizeIt);

		if (!maxMessageSizeOk)
		{
			MS_THROW_TYPE_ERROR("wrong maxMessageSize (not a number)");
		}

		this->maxMessageSize = maxMessageSizeIt->get<size_t>();
	}

	// Optional initial estimation of the outgoing bitrate.
	auto initialBitrateIt = data.find("initialAvailableOutgoingBitrate");

	if (initialBitrateIt != data.end())
	{
		if (!Utils::Json::IsPositiveInteger(*initialBitrateIt))
		{
			MS_THROW_TYPE_ERROR("wrong initialAvailableOutgoingBitrate (not a number)");
		}

		this->initialAvailableOutgoingBitrate = initialBitrateIt->get<uint32_t>();
	}

	// SCTP handling.
	auto enableSctpIt = data.find("enableSctp");
	const bool sctpRequested =
	  enableSctpIt != data.end() && enableSctpIt->is_boolean() && enableSctpIt->get<bool>();

	if (sctpRequested)
	{
		if (this->direct)
		{
			MS_THROW_TYPE_ERROR("cannot enable SCTP in a direct Transport");
		}

		// numSctpStreams must be an object holding OS and MIS.
		auto numSctpStreamsIt = data.find("numSctpStreams");

		if (numSctpStreamsIt == data.end() || !numSctpStreamsIt->is_object())
		{
			MS_THROW_TYPE_ERROR("wrong numSctpStreams (not an object)");
		}

		auto osIt  = numSctpStreamsIt->find("OS");
		auto misIt = numSctpStreamsIt->find("MIS");
		const bool osOk =
		  osIt != numSctpStreamsIt->end() && Utils::Json::IsPositiveInteger(*osIt);
		const bool misOk =
		  misIt != numSctpStreamsIt->end() && Utils::Json::IsPositiveInteger(*misIt);

		if (!osOk || !misOk)
		{
			MS_THROW_TYPE_ERROR("wrong numSctpStreams.OS and/or numSctpStreams.MIS (not a number)");
		}

		const auto os  = osIt->get<uint16_t>();
		const auto mis = misIt->get<uint16_t>();

		// maxSctpMessageSize is mandatory when SCTP is enabled.
		auto maxSctpMessageSizeIt = data.find("maxSctpMessageSize");

		if (
		  maxSctpMessageSizeIt == data.end() ||
		  !Utils::Json::IsPositiveInteger(*maxSctpMessageSizeIt))
		{
			MS_THROW_TYPE_ERROR("wrong maxSctpMessageSize (not a number)");
		}

		this->maxMessageSize = maxSctpMessageSizeIt->get<size_t>();

		// sctpSendBufferSize is optional: start from the default and override it
		// only with a validated value.
		size_t sctpSendBufferSize{ DefaultSctpSendBufferSize };
		auto sctpSendBufferSizeIt = data.find("sctpSendBufferSize");

		if (sctpSendBufferSizeIt != data.end())
		{
			if (!Utils::Json::IsPositiveInteger(*sctpSendBufferSizeIt))
			{
				MS_THROW_TYPE_ERROR("wrong sctpSendBufferSize (not a number)");
			}

			sctpSendBufferSize = sctpSendBufferSizeIt->get<size_t>();

			if (sctpSendBufferSize > MaxSctpSendBufferSize)
			{
				MS_THROW_TYPE_ERROR("wrong sctpSendBufferSize (maximum value exceeded)");
			}
		}

		// isDataChannel is optional and defaults to false; a non boolean value is
		// ignored.
		auto isDataChannelIt = data.find("isDataChannel");
		const bool isDataChannel = isDataChannelIt != data.end() &&
		                           isDataChannelIt->is_boolean() && isDataChannelIt->get<bool>();

		this->sctpAssociation = new RTC::SctpAssociation(
		  this, os, mis, this->maxMessageSize, sctpSendBufferSize, isDataChannel);
	}

	// The RTCP timer exists for every Transport; it is started in Connected().
	this->rtcpTimer = new Timer(this);
}
// Releases everything owned by the Transport without notifying the listener:
// Producers, Consumers, DataProducers, DataConsumers, the SCTP association, the
// RTCP timer and the congestion control helpers.
Transport::~Transport()
{
	MS_TRACE();

	// Mark the instance as being destroyed before touching anything else.
	this->destroying = true;

	// Producers.
	for (const auto& entry : this->mapProducers)
	{
		delete entry.second;
	}
	this->mapProducers.clear();

	// Consumers, plus the SSRC lookup tables that point to them.
	for (const auto& entry : this->mapConsumers)
	{
		delete entry.second;
	}
	this->mapConsumers.clear();
	this->mapSsrcConsumer.clear();
	this->mapRtxSsrcConsumer.clear();

	// DataProducers.
	for (const auto& entry : this->mapDataProducers)
	{
		delete entry.second;
	}
	this->mapDataProducers.clear();

	// DataConsumers.
	for (const auto& entry : this->mapDataConsumers)
	{
		delete entry.second;
	}
	this->mapDataConsumers.clear();

	// SCTP association (may be null, deleting a null pointer is a no-op).
	delete this->sctpAssociation;
	this->sctpAssociation = nullptr;

	// RTCP timer.
	delete this->rtcpTimer;
	this->rtcpTimer = nullptr;

	// Transport congestion control client and server.
	delete this->tccClient;
	this->tccClient = nullptr;

	delete this->tccServer;
	this->tccServer = nullptr;

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
	// Sender bandwidth estimator.
	delete this->senderBwe;
	this->senderBwe = nullptr;
#endif
}
void Transport::CloseProducersAndConsumers()
{
MS_TRACE();
for (auto& kv : this->mapProducers)
{
auto* producer = kv.second;
this->listener->OnTransportProducerClosed(this, producer);
delete producer;
}
this->mapProducers.clear();
for (auto& kv : this->mapConsumers)
{
auto* consumer = kv.second;
this->listener->OnTransportConsumerClosed(this, consumer);
delete consumer;
}
this->mapConsumers.clear();
this->mapSsrcConsumer.clear();
this->mapRtxSsrcConsumer.clear();
for (auto& kv : this->mapDataProducers)
{
auto* dataProducer = kv.second;
this->listener->OnTransportDataProducerClosed(this, dataProducer);
delete dataProducer;
}
this->mapDataProducers.clear();
for (auto& kv : this->mapDataConsumers)
{
auto* dataConsumer = kv.second;
this->listener->OnTransportDataConsumerClosed(this, dataConsumer);
delete dataConsumer;
}
this->mapDataConsumers.clear();
}
// Dumps the state of the Transport into `jsonObject`: its id, the direct flag,
// the ids of all Producers/Consumers/DataProducers/DataConsumers, the SSRC to
// Consumer id tables, the non zero receive RTP header extension ids, the RTP
// listener, maxMessageSize, the SCTP related entries (only when an SCTP
// association exists) and the enabled trace event types as a comma separated
// string.
void Transport::FillJson(json& jsonObject) const
{
	MS_TRACE();

	jsonObject["id"]     = this->id;
	jsonObject["direct"] = this->direct;

	// producerIds.
	jsonObject["producerIds"] = json::array();
	auto& jsonProducerIds     = jsonObject["producerIds"];

	for (const auto& entry : this->mapProducers)
	{
		jsonProducerIds.emplace_back(entry.first);
	}

	// consumerIds.
	jsonObject["consumerIds"] = json::array();
	auto& jsonConsumerIds     = jsonObject["consumerIds"];

	for (const auto& entry : this->mapConsumers)
	{
		jsonConsumerIds.emplace_back(entry.first);
	}

	// mapSsrcConsumerId: media SSRC (as string) => Consumer id.
	jsonObject["mapSsrcConsumerId"] = json::object();
	auto& jsonSsrcToConsumerId      = jsonObject["mapSsrcConsumerId"];

	for (const auto& entry : this->mapSsrcConsumer)
	{
		jsonSsrcToConsumerId[std::to_string(entry.first)] = entry.second->id;
	}

	// mapRtxSsrcConsumerId: RTX SSRC (as string) => Consumer id.
	jsonObject["mapRtxSsrcConsumerId"] = json::object();
	auto& jsonRtxSsrcToConsumerId      = jsonObject["mapRtxSsrcConsumerId"];

	for (const auto& entry : this->mapRtxSsrcConsumer)
	{
		jsonRtxSsrcToConsumerId[std::to_string(entry.first)] = entry.second->id;
	}

	// dataProducerIds.
	jsonObject["dataProducerIds"] = json::array();
	auto& jsonDataProducerIds     = jsonObject["dataProducerIds"];

	for (const auto& entry : this->mapDataProducers)
	{
		jsonDataProducerIds.emplace_back(entry.first);
	}

	// dataConsumerIds.
	jsonObject["dataConsumerIds"] = json::array();
	auto& jsonDataConsumerIds     = jsonObject["dataConsumerIds"];

	for (const auto& entry : this->mapDataConsumers)
	{
		jsonDataConsumerIds.emplace_back(entry.first);
	}

	// recvRtpHeaderExtensions: only the ids different from 0 are reported.
	jsonObject["recvRtpHeaderExtensions"] = json::object();
	auto& jsonRecvExtensions              = jsonObject["recvRtpHeaderExtensions"];
	const auto& recvExtensionIds          = this->recvRtpHeaderExtensionIds;

	if (recvExtensionIds.mid != 0u)
	{
		jsonRecvExtensions["mid"] = recvExtensionIds.mid;
	}

	if (recvExtensionIds.rid != 0u)
	{
		jsonRecvExtensions["rid"] = recvExtensionIds.rid;
	}

	if (recvExtensionIds.rrid != 0u)
	{
		jsonRecvExtensions["rrid"] = recvExtensionIds.rrid;
	}

	if (recvExtensionIds.absSendTime != 0u)
	{
		jsonRecvExtensions["absSendTime"] = recvExtensionIds.absSendTime;
	}

	if (recvExtensionIds.transportWideCc01 != 0u)
	{
		jsonRecvExtensions["transportWideCc01"] = recvExtensionIds.transportWideCc01;
	}

	// rtpListener.
	this->rtpListener.FillJson(jsonObject["rtpListener"]);

	// maxMessageSize.
	jsonObject["maxMessageSize"] = this->maxMessageSize;

	// SCTP entries exist only when an association was created.
	if (this->sctpAssociation)
	{
		this->sctpAssociation->FillJson(jsonObject["sctpParameters"]);

		// Name of the current SCTP state; stays null (and "sctpState" is not
		// written) for a value not listed below.
		const char* sctpStateName{ nullptr };

		switch (this->sctpAssociation->GetState())
		{
			case RTC::SctpAssociation::SctpState::NEW:
				sctpStateName = "new";
				break;
			case RTC::SctpAssociation::SctpState::CONNECTING:
				sctpStateName = "connecting";
				break;
			case RTC::SctpAssociation::SctpState::CONNECTED:
				sctpStateName = "connected";
				break;
			case RTC::SctpAssociation::SctpState::FAILED:
				sctpStateName = "failed";
				break;
			case RTC::SctpAssociation::SctpState::CLOSED:
				sctpStateName = "closed";
				break;
		}

		if (sctpStateName)
		{
			jsonObject["sctpState"] = sctpStateName;
		}

		this->sctpListener.FillJson(jsonObject["sctpListener"]);
	}

	// traceEventTypes: comma separated list of the enabled types, empty string
	// when none is enabled.
	std::string enabledTraceEventTypes;

	if (this->traceEventTypes.probation)
	{
		enabledTraceEventTypes += "probation";
	}

	if (this->traceEventTypes.bwe)
	{
		if (!enabledTraceEventTypes.empty())
		{
			enabledTraceEventTypes += ",";
		}

		enabledTraceEventTypes += "bwe";
	}

	jsonObject["traceEventTypes"] = enabledTraceEventTypes;
}
// Appends an object with the Transport statistics to `jsonArray`.
//
// The object is reached through jsonArray[0] right after being appended, so the
// array is expected to be empty on entry.
// "sctpState" is written only when an SCTP association exists,
// "availableOutgoingBitrate" only with a congestion control client,
// "availableIncomingBitrate" only with a congestion control server reporting a
// non zero value and "maxIncomingBitrate" only when it is not zero.
void Transport::FillJsonStats(json& jsonArray)
{
	MS_TRACE();

	// Single timestamp shared by all the rate computations below.
	const auto nowMs = DepLibUV::GetTimeMs();

	jsonArray.emplace_back(json::value_t::object);

	auto& jsonStats = jsonArray[0];

	jsonStats["transportId"] = this->id;
	jsonStats["timestamp"]   = nowMs;

	if (this->sctpAssociation)
	{
		// Name of the current SCTP state; stays null (and "sctpState" is not
		// written) for a value not listed below.
		const char* sctpStateName{ nullptr };

		switch (this->sctpAssociation->GetState())
		{
			case RTC::SctpAssociation::SctpState::NEW:
				sctpStateName = "new";
				break;
			case RTC::SctpAssociation::SctpState::CONNECTING:
				sctpStateName = "connecting";
				break;
			case RTC::SctpAssociation::SctpState::CONNECTED:
				sctpStateName = "connected";
				break;
			case RTC::SctpAssociation::SctpState::FAILED:
				sctpStateName = "failed";
				break;
			case RTC::SctpAssociation::SctpState::CLOSED:
				sctpStateName = "closed";
				break;
		}

		if (sctpStateName)
		{
			jsonStats["sctpState"] = sctpStateName;
		}
	}

	// Whole transport traffic.
	jsonStats["bytesReceived"] = this->recvTransmission.GetBytes();
	jsonStats["recvBitrate"]   = this->recvTransmission.GetRate(nowMs);
	jsonStats["bytesSent"]     = this->sendTransmission.GetBytes();
	jsonStats["sendBitrate"]   = this->sendTransmission.GetRate(nowMs);

	// RTP media.
	jsonStats["rtpBytesReceived"] = this->recvRtpTransmission.GetBytes();
	jsonStats["rtpRecvBitrate"]   = this->recvRtpTransmission.GetBitrate(nowMs);
	jsonStats["rtpBytesSent"]     = this->sendRtpTransmission.GetBytes();
	jsonStats["rtpSendBitrate"]   = this->sendRtpTransmission.GetBitrate(nowMs);

	// RTX retransmissions.
	jsonStats["rtxBytesReceived"] = this->recvRtxTransmission.GetBytes();
	jsonStats["rtxRecvBitrate"]   = this->recvRtxTransmission.GetBitrate(nowMs);
	jsonStats["rtxBytesSent"]     = this->sendRtxTransmission.GetBytes();
	jsonStats["rtxSendBitrate"]   = this->sendRtxTransmission.GetBitrate(nowMs);

	// Probation.
	jsonStats["probationBytesSent"]   = this->sendProbationTransmission.GetBytes();
	jsonStats["probationSendBitrate"] = this->sendProbationTransmission.GetBitrate(nowMs);

	// Bandwidth estimation.
	if (this->tccClient)
	{
		jsonStats["availableOutgoingBitrate"] = this->tccClient->GetAvailableBitrate();
	}

	if (this->tccServer && this->tccServer->GetAvailableBitrate() != 0u)
	{
		jsonStats["availableIncomingBitrate"] = this->tccServer->GetAvailableBitrate();
	}

	if (this->maxIncomingBitrate != 0u)
	{
		jsonStats["maxIncomingBitrate"] = this->maxIncomingBitrate;
	}
}
// Dispatches a Channel request addressed to this Transport or to one of the
// Producers, Consumers, DataProducers or DataConsumers it owns.
//
// Every successfully handled request is answered with request->Accept(), either
// here or by the entity the request is forwarded to. Failures are reported by
// throwing through MS_THROW_ERROR / MS_THROW_TYPE_ERROR (or by letting an
// exception raised by a callee propagate); an unknown method always throws.
void Transport::HandleRequest(Channel::ChannelRequest* request)
{
	MS_TRACE();

	switch (request->methodId)
	{
		case Channel::ChannelRequest::MethodId::TRANSPORT_DUMP:
		{
			json data = json::object();

			FillJson(data);

			request->Accept(data);

			break;
		}

		case Channel::ChannelRequest::MethodId::TRANSPORT_GET_STATS:
		{
			json data = json::array();

			FillJsonStats(data);

			request->Accept(data);

			break;
		}

		case Channel::ChannelRequest::MethodId::TRANSPORT_SET_MAX_INCOMING_BITRATE:
		{
			auto jsonBitrateIt = request->data.find("bitrate");

			if (
			  jsonBitrateIt == request->data.end() ||
			  !Utils::Json::IsPositiveInteger(*jsonBitrateIt))
			{
				MS_THROW_TYPE_ERROR("missing bitrate");
			}

			this->maxIncomingBitrate = jsonBitrateIt->get<uint32_t>();

			MS_DEBUG_TAG(bwe, "maximum incoming bitrate set to %" PRIu32, this->maxIncomingBitrate);

			request->Accept();

			// The value is always stored; an already existing server is updated
			// now, a later one receives it when created in TRANSPORT_PRODUCE.
			if (this->tccServer)
				this->tccServer->SetMaxIncomingBitrate(this->maxIncomingBitrate);

			break;
		}

		case Channel::ChannelRequest::MethodId::TRANSPORT_SET_MAX_OUTGOING_BITRATE:
		{
			auto jsonBitrateIt = request->data.find("bitrate");

			if (
			  jsonBitrateIt == request->data.end() ||
			  !Utils::Json::IsPositiveInteger(*jsonBitrateIt))
			{
				MS_THROW_TYPE_ERROR("missing bitrate");
			}

			const uint32_t bitrate = jsonBitrateIt->get<uint32_t>();

			// Hand the value to an existing client before storing it, so that
			// nothing is updated if the client rejects it by throwing.
			if (this->tccClient)
				this->tccClient->SetMaxOutgoingBitrate(bitrate);

			// Always remember the cap. Without a client yet, this is the value
			// that TRANSPORT_CONSUME passes to the
			// TransportCongestionControlClient constructor; dropping it here would
			// accept the request while silently ignoring it.
			this->maxOutgoingBitrate = bitrate;

			MS_DEBUG_TAG(bwe, "maximum outgoing bitrate set to %" PRIu32, this->maxOutgoingBitrate);

			if (this->tccClient)
				ComputeOutgoingDesiredBitrate();

			request->Accept();

			break;
		}

		case Channel::ChannelRequest::MethodId::TRANSPORT_PRODUCE:
		{
			std::string producerId;

			// Throws if the id is missing or already in use.
			SetNewProducerIdFromInternal(request->internal, producerId);

			auto* producer = new RTC::Producer(producerId, this, request->data);

			// Register the Producer into the RTP listener. On failure delete the
			// Producer and let the error propagate.
			try
			{
				this->rtpListener.AddProducer(producer);
			}
			catch (const MediaSoupError&)
			{
				delete producer;

				throw;
			}

			// Notify the listener. On failure undo the registration above, delete
			// the Producer and let the error propagate.
			try
			{
				this->listener->OnTransportNewProducer(this, producer);
			}
			catch (const MediaSoupError&)
			{
				this->rtpListener.RemoveProducer(producer);

				delete producer;

				throw;
			}

			this->mapProducers[producerId] = producer;

			MS_DEBUG_DEV("Producer created [producerId:%s]", producerId.c_str());

			// Copy the non zero RTP header extension ids of the Producer into the
			// ids applied to received packets (see ReceiveRtpPacket()).
			const auto& producerRtpHeaderExtensionIds = producer->GetRtpHeaderExtensionIds();

			if (producerRtpHeaderExtensionIds.mid != 0u)
			{
				this->recvRtpHeaderExtensionIds.mid = producerRtpHeaderExtensionIds.mid;
			}

			if (producerRtpHeaderExtensionIds.rid != 0u)
			{
				this->recvRtpHeaderExtensionIds.rid = producerRtpHeaderExtensionIds.rid;
			}

			if (producerRtpHeaderExtensionIds.rrid != 0u)
			{
				this->recvRtpHeaderExtensionIds.rrid = producerRtpHeaderExtensionIds.rrid;
			}

			if (producerRtpHeaderExtensionIds.absSendTime != 0u)
			{
				this->recvRtpHeaderExtensionIds.absSendTime = producerRtpHeaderExtensionIds.absSendTime;
			}

			if (producerRtpHeaderExtensionIds.transportWideCc01 != 0u)
			{
				this->recvRtpHeaderExtensionIds.transportWideCc01 =
				  producerRtpHeaderExtensionIds.transportWideCc01;
			}

			// Response.
			json data = json::object();

			data["type"] = RTC::RtpParameters::GetTypeString(producer->GetType());

			request->Accept(data);

			// Decide whether a TransportCongestionControlServer must be created.
			const auto& rtpHeaderExtensionIds = producer->GetRtpHeaderExtensionIds();
			const auto& codecs                = producer->GetRtpParameters().codecs;

			if (!this->tccServer)
			{
				bool createTccServer{ false };
				// Only read when createTccServer is true, in which case it has been
				// assigned.
				RTC::BweType bweType;

				// transport-cc: needs the transportWideCc01 extension and a codec
				// with "transport-cc" RTCP feedback.
				if (
				  rtpHeaderExtensionIds.transportWideCc01 != 0u &&
				  std::any_of(
				    codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec)
				    {
					    return std::any_of(
					      codec.rtcpFeedback.begin(), codec.rtcpFeedback.end(), [](const RTC::RtcpFeedback& fb)
					      {
						      return fb.type == "transport-cc";
					      });
				    }))
				{
					MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlServer with transport-cc");

					createTccServer = true;
					bweType         = RTC::BweType::TRANSPORT_CC;
				}
				// REMB: needs the absSendTime extension and a codec with
				// "goog-remb" RTCP feedback.
				else if (
				  rtpHeaderExtensionIds.absSendTime != 0u &&
				  std::any_of(
				    codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec)
				    {
					    return std::any_of(
					      codec.rtcpFeedback.begin(), codec.rtcpFeedback.end(), [](const RTC::RtcpFeedback& fb)
					      {
						      return fb.type == "goog-remb";
					      });
				    }))
				{
					MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlServer with REMB");

					createTccServer = true;
					bweType         = RTC::BweType::REMB;
				}

				if (createTccServer)
				{
					this->tccServer = new RTC::TransportCongestionControlServer(this, bweType, RTC::MtuSize);

					if (this->maxIncomingBitrate != 0u)
						this->tccServer->SetMaxIncomingBitrate(this->maxIncomingBitrate);

					if (IsConnected())
						this->tccServer->TransportConnected();
				}
			}

			break;
		}

		case Channel::ChannelRequest::MethodId::TRANSPORT_CONSUME:
		{
			auto jsonProducerIdIt = request->internal.find("producerId");

			if (jsonProducerIdIt == request->internal.end() || !jsonProducerIdIt->is_string())
				MS_THROW_ERROR("missing internal.producerId");

			std::string producerId = jsonProducerIdIt->get<std::string>();
			std::string consumerId;

			SetNewConsumerIdFromInternal(request->internal, consumerId);

			auto jsonTypeIt = request->data.find("type");

			if (jsonTypeIt == request->data.end() || !jsonTypeIt->is_string())
				MS_THROW_TYPE_ERROR("missing type");

			auto type = RTC::RtpParameters::GetType(jsonTypeIt->get<std::string>());

			RTC::Consumer* consumer{ nullptr };

			// Instantiate the Consumer class matching the requested type.
			switch (type)
			{
				case RTC::RtpParameters::Type::NONE:
				{
					MS_THROW_TYPE_ERROR("invalid type 'none'");

					break;
				}

				case RTC::RtpParameters::Type::SIMPLE:
				{
					consumer = new RTC::SimpleConsumer(consumerId, producerId, this, request->data);

					break;
				}

				case RTC::RtpParameters::Type::SIMULCAST:
				{
					consumer = new RTC::SimulcastConsumer(consumerId, producerId, this, request->data);

					break;
				}

				case RTC::RtpParameters::Type::SVC:
				{
					consumer = new RTC::SvcConsumer(consumerId, producerId, this, request->data);

					break;
				}

				case RTC::RtpParameters::Type::PIPE:
				{
					consumer = new RTC::PipeConsumer(consumerId, producerId, this, request->data);

					break;
				}
			}

			// Notify the listener. On failure delete the Consumer and let the
			// error propagate.
			try
			{
				this->listener->OnTransportNewConsumer(this, consumer, producerId);
			}
			catch (const MediaSoupError&)
			{
				delete consumer;

				throw;
			}

			// Register the Consumer by id and by each of its SSRCs.
			this->mapConsumers[consumerId] = consumer;

			for (auto ssrc : consumer->GetMediaSsrcs())
			{
				this->mapSsrcConsumer[ssrc] = consumer;
			}

			for (auto ssrc : consumer->GetRtxSsrcs())
			{
				this->mapRtxSsrcConsumer[ssrc] = consumer;
			}

			MS_DEBUG_DEV(
			  "Consumer created [consumerId:%s, producerId:%s]", consumerId.c_str(), producerId.c_str());

			// Response.
			json data = json::object();

			data["paused"]         = consumer->IsPaused();
			data["producerPaused"] = consumer->IsProducerPaused();

			consumer->FillJsonScore(data["score"]);

			auto preferredLayers = consumer->GetPreferredLayers();

			// preferredLayers is reported only when both layers are set (> -1).
			if (preferredLayers.spatial > -1 && preferredLayers.temporal > -1)
			{
				data["preferredLayers"]["spatialLayer"]  = preferredLayers.spatial;
				data["preferredLayers"]["temporalLayer"] = preferredLayers.temporal;
			}

			request->Accept(data);

			// Decide whether a TransportCongestionControlClient must be created.
			const auto& rtpHeaderExtensionIds = consumer->GetRtpHeaderExtensionIds();
			const auto& codecs                = consumer->GetRtpParameters().codecs;

			if (!this->tccClient)
			{
				bool createTccClient{ false };
				// Only read when createTccClient is true, in which case it has been
				// assigned.
				RTC::BweType bweType;

				// transport-cc: video Consumer with the transportWideCc01 extension
				// and a codec with "transport-cc" RTCP feedback.
				if (
				  consumer->GetKind() == RTC::Media::Kind::VIDEO &&
				  rtpHeaderExtensionIds.transportWideCc01 != 0u &&
				  std::any_of(
				    codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec)
				    {
					    return std::any_of(
					      codec.rtcpFeedback.begin(), codec.rtcpFeedback.end(), [](const RTC::RtcpFeedback& fb)
					      {
						      return fb.type == "transport-cc";
					      });
				    }))
				{
					MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlClient with transport-cc");

					createTccClient = true;
					bweType         = RTC::BweType::TRANSPORT_CC;
				}
				// REMB: video Consumer with the absSendTime extension and a codec
				// with "goog-remb" RTCP feedback.
				else if (
				  consumer->GetKind() == RTC::Media::Kind::VIDEO &&
				  rtpHeaderExtensionIds.absSendTime != 0u &&
				  std::any_of(
				    codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec)
				    {
					    return std::any_of(
					      codec.rtcpFeedback.begin(), codec.rtcpFeedback.end(), [](const RTC::RtcpFeedback& fb)
					      {
						      return fb.type == "goog-remb";
					      });
				    }))
				{
					MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlClient with REMB");

					createTccClient = true;
					bweType         = RTC::BweType::REMB;
				}

				if (createTccClient)
				{
					// Every Consumer already in the map (the new one included) gets
					// its bitrate managed externally from now on.
					for (auto& kv : this->mapConsumers)
					{
						auto* existingConsumer = kv.second;

						existingConsumer->SetExternallyManagedBitrate();
					}

					this->tccClient = new RTC::TransportCongestionControlClient(
					  this, bweType, this->initialAvailableOutgoingBitrate, this->maxOutgoingBitrate);

					if (IsConnected())
						this->tccClient->TransportConnected();
				}
			}

			// With a client in place (new or previous) the new Consumer has its
			// bitrate managed externally.
			if (this->tccClient)
				consumer->SetExternallyManagedBitrate();

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
			// Same decision for the SenderBandwidthEstimator (transport-cc only).
			if (
			  !this->senderBwe &&
			  consumer->GetKind() == RTC::Media::Kind::VIDEO &&
			  rtpHeaderExtensionIds.transportWideCc01 != 0u &&
			  std::any_of(
			    codecs.begin(), codecs.end(), [](const RTC::RtpCodecParameters& codec)
			    {
				    return std::any_of(
				      codec.rtcpFeedback.begin(), codec.rtcpFeedback.end(), [](const RTC::RtcpFeedback& fb)
				      {
					      return fb.type == "transport-cc";
				      });
			    }))
			{
				MS_DEBUG_TAG(bwe, "enabling SenderBandwidthEstimator");

				for (auto& kv : this->mapConsumers)
				{
					auto* existingConsumer = kv.second;

					existingConsumer->SetExternallyManagedBitrate();
				}

				this->senderBwe =
				  new RTC::SenderBandwidthEstimator(this, this->initialAvailableOutgoingBitrate);

				if (IsConnected())
					this->senderBwe->TransportConnected();
			}

			if (this->senderBwe)
				consumer->SetExternallyManagedBitrate();
#endif

			if (IsConnected())
				consumer->TransportConnected();

			break;
		}

		case Channel::ChannelRequest::MethodId::TRANSPORT_PRODUCE_DATA:
		{
			// Data needs either SCTP or a direct Transport.
			if (!this->sctpAssociation && !this->direct)
			{
				MS_THROW_ERROR("SCTP not enabled and not a direct Transport");
			}

			std::string dataProducerId;

			SetNewDataProducerIdFromInternal(request->internal, dataProducerId);

			auto* dataProducer = new RTC::DataProducer(dataProducerId, this, request->data);

			// The type of the DataProducer must match what the Transport offers.
			switch (dataProducer->GetType())
			{
				case RTC::DataProducer::Type::SCTP:
				{
					if (!this->sctpAssociation)
					{
						delete dataProducer;

						MS_THROW_TYPE_ERROR(
						  "cannot create a DataProducer of type 'sctp', SCTP not enabled in this Transport");
					}

					break;
				}

				case RTC::DataProducer::Type::DIRECT:
				{
					if (!this->direct)
					{
						delete dataProducer;

						MS_THROW_TYPE_ERROR(
						  "cannot create a DataProducer of type 'direct', not a direct Transport");
					}

					break;
				}
			}

			// Only SCTP DataProducers are registered into the SCTP listener. On
			// failure delete the DataProducer and let the error propagate.
			if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP)
			{
				try
				{
					this->sctpListener.AddDataProducer(dataProducer);
				}
				catch (const MediaSoupError&)
				{
					delete dataProducer;

					throw;
				}
			}

			// Notify the listener. On failure undo the registration above (if
			// any), delete the DataProducer and let the error propagate.
			try
			{
				this->listener->OnTransportNewDataProducer(this, dataProducer);
			}
			catch (const MediaSoupError&)
			{
				if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP)
				{
					this->sctpListener.RemoveDataProducer(dataProducer);
				}

				delete dataProducer;

				throw;
			}

			this->mapDataProducers[dataProducerId] = dataProducer;

			MS_DEBUG_DEV("DataProducer created [dataProducerId:%s]", dataProducerId.c_str());

			json data = json::object();

			dataProducer->FillJson(data);

			request->Accept(data);

			break;
		}

		case Channel::ChannelRequest::MethodId::TRANSPORT_CONSUME_DATA:
		{
			// Data needs either SCTP or a direct Transport.
			if (!this->sctpAssociation && !this->direct)
			{
				MS_THROW_ERROR("SCTP not enabled and not a direct Transport");
			}

			auto jsonDataProducerIdIt = request->internal.find("dataProducerId");

			if (jsonDataProducerIdIt == request->internal.end() || !jsonDataProducerIdIt->is_string())
			{
				MS_THROW_ERROR("missing internal.dataProducerId");
			}

			std::string dataProducerId = jsonDataProducerIdIt->get<std::string>();
			std::string dataConsumerId;

			SetNewDataConsumerIdFromInternal(request->internal, dataConsumerId);

			auto* dataConsumer = new RTC::DataConsumer(
			  dataConsumerId, dataProducerId, this, request->data, this->maxMessageSize);

			// The type of the DataConsumer must match what the Transport offers.
			switch (dataConsumer->GetType())
			{
				case RTC::DataConsumer::Type::SCTP:
				{
					if (!this->sctpAssociation)
					{
						delete dataConsumer;

						MS_THROW_TYPE_ERROR(
						  "cannot create a DataConsumer of type 'sctp', SCTP not enabled in this Transport");
					}

					break;
				}

				case RTC::DataConsumer::Type::DIRECT:
				{
					if (!this->direct)
					{
						delete dataConsumer;

						MS_THROW_TYPE_ERROR(
						  "cannot create a DataConsumer of type 'direct', not a direct Transport");
					}

					break;
				}
			}

			// Notify the listener. On failure delete the DataConsumer and let the
			// error propagate.
			try
			{
				this->listener->OnTransportNewDataConsumer(this, dataConsumer, dataProducerId);
			}
			catch (const MediaSoupError&)
			{
				delete dataConsumer;

				throw;
			}

			this->mapDataConsumers[dataConsumerId] = dataConsumer;

			MS_DEBUG_DEV(
			  "DataConsumer created [dataConsumerId:%s, dataProducerId:%s]",
			  dataConsumerId.c_str(),
			  dataProducerId.c_str());

			json data = json::object();

			dataConsumer->FillJson(data);

			request->Accept(data);

			if (IsConnected())
				dataConsumer->TransportConnected();

			// An SCTP DataConsumer implies that sctpAssociation is set (verified
			// in the switch above).
			if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
			{
				if (this->sctpAssociation->GetState() == RTC::SctpAssociation::SctpState::CONNECTED)
				{
					dataConsumer->SctpAssociationConnected();
				}

				this->sctpAssociation->HandleDataConsumer(dataConsumer);
			}

			break;
		}

		case Channel::ChannelRequest::MethodId::TRANSPORT_ENABLE_TRACE_EVENT:
		{
			auto jsonTypesIt = request->data.find("types");

			if (jsonTypesIt == request->data.end() || !jsonTypesIt->is_array())
				MS_THROW_TYPE_ERROR("wrong types (not an array)");

			// Build the new set apart so the current one is kept untouched if the
			// validation below throws.
			struct TraceEventTypes newTraceEventTypes;

			for (const auto& type : *jsonTypesIt)
			{
				if (!type.is_string())
					MS_THROW_TYPE_ERROR("wrong type (not a string)");

				std::string typeStr = type.get<std::string>();

				// Types other than these two are ignored.
				if (typeStr == "probation")
					newTraceEventTypes.probation = true;
				if (typeStr == "bwe")
					newTraceEventTypes.bwe = true;
			}

			this->traceEventTypes = newTraceEventTypes;

			request->Accept();

			break;
		}

		case Channel::ChannelRequest::MethodId::PRODUCER_CLOSE:
		{
			RTC::Producer* producer = GetProducerFromInternal(request->internal);

			// Unregister it from the RTP listener and from the map.
			this->rtpListener.RemoveProducer(producer);
			this->mapProducers.erase(producer->id);

			// Report every receive stream (and its RTX stream, if any) as closed.
			for (const auto& kv : producer->GetRtpStreams())
			{
				auto* rtpStream = kv.first;

				RecvStreamClosed(rtpStream->GetSsrc());

				if (rtpStream->HasRtx())
					RecvStreamClosed(rtpStream->GetRtxSsrc());
			}

			this->listener->OnTransportProducerClosed(this, producer);

			MS_DEBUG_DEV("Producer closed [producerId:%s]", producer->id.c_str());

			delete producer;

			request->Accept();

			break;
		}

		case Channel::ChannelRequest::MethodId::CONSUMER_CLOSE:
		{
			RTC::Consumer* consumer = GetConsumerFromInternal(request->internal);

			// Unregister it from the maps and report its send streams as closed.
			this->mapConsumers.erase(consumer->id);

			for (auto ssrc : consumer->GetMediaSsrcs())
			{
				this->mapSsrcConsumer.erase(ssrc);

				SendStreamClosed(ssrc);
			}

			for (auto ssrc : consumer->GetRtxSsrcs())
			{
				this->mapRtxSsrcConsumer.erase(ssrc);

				SendStreamClosed(ssrc);
			}

			this->listener->OnTransportConsumerClosed(this, consumer);

			MS_DEBUG_DEV("Consumer closed [consumerId:%s]", consumer->id.c_str());

			delete consumer;

			request->Accept();

			// Recompute the desired outgoing bitrate now that a Consumer is gone.
			// NOTE(review): the meaning of the `true` argument is defined by
			// ComputeOutgoingDesiredBitrate(), not visible here.
			if (this->tccClient)
				ComputeOutgoingDesiredBitrate(true);

			break;
		}

		// Requests handled by the Producer itself.
		case Channel::ChannelRequest::MethodId::PRODUCER_DUMP:
		case Channel::ChannelRequest::MethodId::PRODUCER_GET_STATS:
		case Channel::ChannelRequest::MethodId::PRODUCER_PAUSE:
		case Channel::ChannelRequest::MethodId::PRODUCER_RESUME:
		case Channel::ChannelRequest::MethodId::PRODUCER_ENABLE_TRACE_EVENT:
		{
			RTC::Producer* producer = GetProducerFromInternal(request->internal);

			producer->HandleRequest(request);

			break;
		}

		// Requests handled by the Consumer itself.
		case Channel::ChannelRequest::MethodId::CONSUMER_DUMP:
		case Channel::ChannelRequest::MethodId::CONSUMER_GET_STATS:
		case Channel::ChannelRequest::MethodId::CONSUMER_PAUSE:
		case Channel::ChannelRequest::MethodId::CONSUMER_RESUME:
		case Channel::ChannelRequest::MethodId::CONSUMER_SET_PREFERRED_LAYERS:
		case Channel::ChannelRequest::MethodId::CONSUMER_SET_PRIORITY:
		case Channel::ChannelRequest::MethodId::CONSUMER_REQUEST_KEY_FRAME:
		case Channel::ChannelRequest::MethodId::CONSUMER_ENABLE_TRACE_EVENT:
		{
			RTC::Consumer* consumer = GetConsumerFromInternal(request->internal);

			consumer->HandleRequest(request);

			break;
		}

		case Channel::ChannelRequest::MethodId::DATA_PRODUCER_CLOSE:
		{
			RTC::DataProducer* dataProducer = GetDataProducerFromInternal(request->internal);

			if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP)
			{
				this->sctpListener.RemoveDataProducer(dataProducer);
			}

			this->mapDataProducers.erase(dataProducer->id);

			this->listener->OnTransportDataProducerClosed(this, dataProducer);

			MS_DEBUG_DEV("DataProducer closed [dataProducerId:%s]", dataProducer->id.c_str());

			// Let the SCTP association know before the DataProducer is deleted.
			if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP)
			{
				this->sctpAssociation->DataProducerClosed(dataProducer);
			}

			delete dataProducer;

			request->Accept();

			break;
		}

		case Channel::ChannelRequest::MethodId::DATA_CONSUMER_CLOSE:
		{
			RTC::DataConsumer* dataConsumer = GetDataConsumerFromInternal(request->internal);

			this->mapDataConsumers.erase(dataConsumer->id);

			this->listener->OnTransportDataConsumerClosed(this, dataConsumer);

			MS_DEBUG_DEV("DataConsumer closed [dataConsumerId:%s]", dataConsumer->id.c_str());

			// Let the SCTP association know before the DataConsumer is deleted.
			if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
			{
				this->sctpAssociation->DataConsumerClosed(dataConsumer);
			}

			delete dataConsumer;

			request->Accept();

			break;
		}

		// Requests handled by the DataProducer itself.
		case Channel::ChannelRequest::MethodId::DATA_PRODUCER_DUMP:
		case Channel::ChannelRequest::MethodId::DATA_PRODUCER_GET_STATS:
		{
			RTC::DataProducer* dataProducer = GetDataProducerFromInternal(request->internal);

			dataProducer->HandleRequest(request);

			break;
		}

		// Requests handled by the DataConsumer itself.
		case Channel::ChannelRequest::MethodId::DATA_CONSUMER_DUMP:
		case Channel::ChannelRequest::MethodId::DATA_CONSUMER_GET_STATS:
		{
			RTC::DataConsumer* dataConsumer = GetDataConsumerFromInternal(request->internal);

			dataConsumer->HandleRequest(request);

			break;
		}

		case Channel::ChannelRequest::MethodId::DATA_CONSUMER_GET_BUFFERED_AMOUNT:
		{
			RTC::DataConsumer* dataConsumer = GetDataConsumerFromInternal(request->internal);

			// The buffered amount is read from the SCTP association, so it only
			// applies to SCTP DataConsumers.
			if (dataConsumer->GetType() != RTC::DataConsumer::Type::SCTP)
			{
				MS_THROW_ERROR("invalid DataConsumer type");
			}

			if (!this->sctpAssociation)
			{
				MS_THROW_ERROR("no SCTP association present");
			}

			json data = json::object();

			data["bufferedAmount"] = this->sctpAssociation->GetSctpBufferedAmount();

			request->Accept(data);

			break;
		}

		case Channel::ChannelRequest::MethodId::DATA_CONSUMER_SET_BUFFERED_AMOUNT_LOW_THRESHOLD:
		{
			RTC::DataConsumer* dataConsumer = GetDataConsumerFromInternal(request->internal);

			if (dataConsumer->GetType() != RTC::DataConsumer::Type::SCTP)
			{
				MS_THROW_ERROR("invalid DataConsumer type");
			}

			dataConsumer->HandleRequest(request);

			break;
		}

		default:
		{
			MS_THROW_ERROR("unknown method '%s'", request->method.c_str());
		}
	}
}
// Handles a PayloadChannel request. DATA_CONSUMER_SEND is the only method
// accepted: it is forwarded to the addressed DataConsumer, which must be of
// type SCTP, and requires an SCTP association. Any other method throws.
void Transport::HandleRequest(PayloadChannel::PayloadChannelRequest* request)
{
	MS_TRACE();

	const bool isDataConsumerSend =
	  request->methodId == PayloadChannel::PayloadChannelRequest::MethodId::DATA_CONSUMER_SEND;

	if (isDataConsumerSend)
	{
		RTC::DataConsumer* targetDataConsumer = GetDataConsumerFromInternal(request->internal);

		if (targetDataConsumer->GetType() != RTC::DataConsumer::Type::SCTP)
		{
			MS_THROW_ERROR("invalid DataConsumer type");
		}

		if (!this->sctpAssociation)
		{
			MS_THROW_ERROR("no SCTP association present");
		}

		targetDataConsumer->HandleRequest(request);
	}
	else
	{
		MS_THROW_ERROR("unknown method '%s'", request->method.c_str());
	}
}
// Handles a PayloadChannel notification. No event is handled here: whatever
// arrives is logged as an unknown event.
void Transport::HandleNotification(PayloadChannel::Notification* notification)
{
	MS_TRACE();

	MS_ERROR("unknown event '%s'", notification->event.c_str());
}
// Propagates the "transport connected" event: to every Consumer and
// DataConsumer, to the SCTP association (if any), then starts the RTCP timer and
// finally tells the congestion control helpers that exist.
void Transport::Connected()
{
	MS_TRACE();

	for (const auto& entry : this->mapConsumers)
	{
		entry.second->TransportConnected();
	}

	for (const auto& entry : this->mapDataConsumers)
	{
		entry.second->TransportConnected();
	}

	if (this->sctpAssociation)
	{
		this->sctpAssociation->TransportConnected();
	}

	// The timer is started with half of RTC::RTCP::MaxVideoIntervalMs.
	this->rtcpTimer->Start(static_cast<uint64_t>(RTC::RTCP::MaxVideoIntervalMs / 2));

	if (this->tccClient)
	{
		this->tccClient->TransportConnected();
	}

	if (this->tccServer)
	{
		this->tccServer->TransportConnected();
	}

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
	if (this->senderBwe)
	{
		this->senderBwe->TransportConnected();
	}
#endif
}
// Propagates the "transport disconnected" event: to every Consumer and
// DataConsumer, then stops the RTCP timer and finally tells the congestion
// control helpers that exist. The SCTP association is not told here.
void Transport::Disconnected()
{
	MS_TRACE();

	for (const auto& entry : this->mapConsumers)
	{
		entry.second->TransportDisconnected();
	}

	for (const auto& entry : this->mapDataConsumers)
	{
		entry.second->TransportDisconnected();
	}

	this->rtcpTimer->Stop();

	if (this->tccClient)
	{
		this->tccClient->TransportDisconnected();
	}

	if (this->tccServer)
	{
		this->tccServer->TransportDisconnected();
	}

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
	if (this->senderBwe)
	{
		this->senderBwe->TransportDisconnected();
	}
#endif
}
// Processes a received RTP packet and deletes it before returning.
//
// The receive RTP header extension ids of the Transport are applied to the
// packet, the congestion control server (if any) is fed with it and the packet
// is handed to the Producer chosen by the RTP listener. When no Producer matches
// or the Producer discards the packet, the SSRC is reported through
// RecvStreamClosed().
void Transport::ReceiveRtpPacket(RTC::RtpPacket* packet)
{
	MS_TRACE();

	// Apply the header extension ids known by the Transport.
	const auto& extensionIds = this->recvRtpHeaderExtensionIds;

	packet->SetMidExtensionId(extensionIds.mid);
	packet->SetRidExtensionId(extensionIds.rid);
	packet->SetRepairedRidExtensionId(extensionIds.rrid);
	packet->SetAbsSendTimeExtensionId(extensionIds.absSendTime);
	packet->SetTransportWideCc01ExtensionId(extensionIds.transportWideCc01);

	auto nowMs = DepLibUV::GetTimeMs();

	if (this->tccServer)
	{
		this->tccServer->IncomingPacket(nowMs, packet);
	}

	RTC::Producer* targetProducer = this->rtpListener.GetProducer(packet);

	if (targetProducer)
	{
		const auto outcome = targetProducer->ReceiveRtpPacket(packet);

		if (outcome == RTC::Producer::ReceiveRtpPacketResult::MEDIA)
		{
			// Account it as regular media.
			this->recvRtpTransmission.Update(packet);
		}
		else if (outcome == RTC::Producer::ReceiveRtpPacketResult::RETRANSMISSION)
		{
			// Account it as a retransmission.
			this->recvRtxTransmission.Update(packet);
		}
		else if (outcome == RTC::Producer::ReceiveRtpPacketResult::DISCARDED)
		{
			RecvStreamClosed(packet->GetSsrc());
		}
		// Any other result needs no action.
	}
	else
	{
		MS_WARN_TAG(
		  rtp,
		  "no suitable Producer for received RTP packet [ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]",
		  packet->GetSsrc(),
		  packet->GetPayloadType());

		RecvStreamClosed(packet->GetSsrc());
	}

	delete packet;
}
// Handles a chain of received RTCP packets.
//
// Each packet in the chain (linked through GetNext()) is passed to
// HandleRtcpPacket() and then deleted.
void Transport::ReceiveRtcpPacket(RTC::RTCP::Packet* packet)
{
	MS_TRACE();

	for (auto* current = packet; current != nullptr;)
	{
		HandleRtcpPacket(current);

		// Fetch the next packet before destroying the current one.
		auto* next = current->GetNext();

		delete current;

		current = next;
	}
}
// Passes received SCTP data to the SCTP association.
//
// When there is no SCTP association the data is ignored (a debug trace is
// logged).
void Transport::ReceiveSctpData(const uint8_t* data, size_t len)
{
	MS_TRACE();

	if (this->sctpAssociation)
	{
		this->sctpAssociation->ProcessSctpData(data, len);

		return;
	}

	MS_DEBUG_TAG(sctp, "ignoring SCTP packet (SCTP not enabled)");
}
// Reads `internal.producerId` into `producerId` for a Producer to be created.
//
// Throws if the field is missing or is not a string, or if a Producer with
// that id is already registered in this Transport.
void Transport::SetNewProducerIdFromInternal(json& internal, std::string& producerId) const
{
	MS_TRACE();

	auto idIt = internal.find("producerId");

	if (!(idIt != internal.end() && idIt->is_string()))
	{
		MS_THROW_ERROR("missing internal.producerId");
	}

	producerId.assign(idIt->get<std::string>());

	// The id must not be in use yet.
	auto existingIt = this->mapProducers.find(producerId);

	if (existingIt != this->mapProducers.end())
	{
		MS_THROW_ERROR("a Producer with same producerId already exists");
	}
}
// Returns the Producer whose id is given in `internal.producerId`.
//
// Throws if the field is missing or is not a string, or if no such Producer
// is registered in this Transport.
RTC::Producer* Transport::GetProducerFromInternal(json& internal) const
{
	MS_TRACE();

	auto idIt = internal.find("producerId");

	if (!(idIt != internal.end() && idIt->is_string()))
	{
		MS_THROW_ERROR("missing internal.producerId");
	}

	auto producerIt = this->mapProducers.find(idIt->get<std::string>());

	if (producerIt == this->mapProducers.end())
	{
		MS_THROW_ERROR("Producer not found");
	}

	return producerIt->second;
}
// Reads `internal.consumerId` into `consumerId` for a Consumer to be created.
//
// Throws if the field is missing or is not a string, or if a Consumer with
// that id is already registered in this Transport.
void Transport::SetNewConsumerIdFromInternal(json& internal, std::string& consumerId) const
{
	MS_TRACE();

	auto idIt = internal.find("consumerId");

	if (!(idIt != internal.end() && idIt->is_string()))
	{
		MS_THROW_ERROR("missing internal.consumerId");
	}

	consumerId.assign(idIt->get<std::string>());

	// The id must not be in use yet.
	auto existingIt = this->mapConsumers.find(consumerId);

	if (existingIt != this->mapConsumers.end())
	{
		MS_THROW_ERROR("a Consumer with same consumerId already exists");
	}
}
// Returns the Consumer whose id is given in `internal.consumerId`.
//
// Throws if the field is missing or is not a string, or if no such Consumer
// is registered in this Transport.
RTC::Consumer* Transport::GetConsumerFromInternal(json& internal) const
{
	MS_TRACE();

	auto idIt = internal.find("consumerId");

	if (!(idIt != internal.end() && idIt->is_string()))
	{
		MS_THROW_ERROR("missing internal.consumerId");
	}

	auto consumerIt = this->mapConsumers.find(idIt->get<std::string>());

	if (consumerIt == this->mapConsumers.end())
	{
		MS_THROW_ERROR("Consumer not found");
	}

	return consumerIt->second;
}
// Looks up a Consumer by media SSRC.
// Returns nullptr when no Consumer is mapped to the given SSRC.
inline RTC::Consumer* Transport::GetConsumerByMediaSsrc(uint32_t ssrc) const
{
	MS_TRACE();

	auto found = this->mapSsrcConsumer.find(ssrc);

	if (found != this->mapSsrcConsumer.end())
	{
		return found->second;
	}

	return nullptr;
}
// Looks up a Consumer by RTX SSRC.
// Returns nullptr when no Consumer is mapped to the given SSRC.
inline RTC::Consumer* Transport::GetConsumerByRtxSsrc(uint32_t ssrc) const
{
	MS_TRACE();

	auto found = this->mapRtxSsrcConsumer.find(ssrc);

	if (found != this->mapRtxSsrcConsumer.end())
	{
		return found->second;
	}

	return nullptr;
}
// Reads `internal.dataProducerId` into `dataProducerId` for a DataProducer to
// be created.
//
// Throws if the field is missing or is not a string, or if a DataProducer
// with that id is already registered in this Transport.
void Transport::SetNewDataProducerIdFromInternal(json& internal, std::string& dataProducerId) const
{
	MS_TRACE();

	auto idIt = internal.find("dataProducerId");

	if (!(idIt != internal.end() && idIt->is_string()))
	{
		MS_THROW_ERROR("missing internal.dataProducerId");
	}

	dataProducerId.assign(idIt->get<std::string>());

	// The id must not be in use yet.
	auto existingIt = this->mapDataProducers.find(dataProducerId);

	if (existingIt != this->mapDataProducers.end())
	{
		MS_THROW_ERROR("a DataProducer with same dataProducerId already exists");
	}
}
// Returns the DataProducer whose id is given in `internal.dataProducerId`.
//
// Throws if the field is missing or is not a string, or if no such
// DataProducer is registered in this Transport.
RTC::DataProducer* Transport::GetDataProducerFromInternal(json& internal) const
{
	MS_TRACE();

	auto idIt = internal.find("dataProducerId");

	if (!(idIt != internal.end() && idIt->is_string()))
	{
		MS_THROW_ERROR("missing internal.dataProducerId");
	}

	auto dataProducerIt = this->mapDataProducers.find(idIt->get<std::string>());

	if (dataProducerIt == this->mapDataProducers.end())
	{
		MS_THROW_ERROR("DataProducer not found");
	}

	return dataProducerIt->second;
}
// Reads `internal.dataConsumerId` into `dataConsumerId` for a DataConsumer to
// be created.
//
// Throws if the field is missing or is not a string, or if a DataConsumer
// with that id is already registered in this Transport.
void Transport::SetNewDataConsumerIdFromInternal(json& internal, std::string& dataConsumerId) const
{
	MS_TRACE();

	auto idIt = internal.find("dataConsumerId");

	if (!(idIt != internal.end() && idIt->is_string()))
	{
		MS_THROW_ERROR("missing internal.dataConsumerId");
	}

	dataConsumerId.assign(idIt->get<std::string>());

	// The id must not be in use yet.
	auto existingIt = this->mapDataConsumers.find(dataConsumerId);

	if (existingIt != this->mapDataConsumers.end())
	{
		MS_THROW_ERROR("a DataConsumer with same dataConsumerId already exists");
	}
}
// Returns the DataConsumer whose id is given in `internal.dataConsumerId`.
//
// Throws if the field is missing or is not a string, or if no such
// DataConsumer is registered in this Transport.
RTC::DataConsumer* Transport::GetDataConsumerFromInternal(json& internal) const
{
	MS_TRACE();

	auto idIt = internal.find("dataConsumerId");

	if (!(idIt != internal.end() && idIt->is_string()))
	{
		MS_THROW_ERROR("missing internal.dataConsumerId");
	}

	auto dataConsumerIt = this->mapDataConsumers.find(idIt->get<std::string>());

	if (dataConsumerIt == this->mapDataConsumers.end())
	{
		MS_THROW_ERROR("DataConsumer not found");
	}

	return dataConsumerIt->second;
}
// Dispatches a single received RTCP packet according to its type:
// - RR:    each report goes to the Consumer mapped to the report SSRC, and the
//          packet is also given to the congestion control client (if any).
// - PSFB:  PLI and FIR become key frame requests on the matching Consumer;
//          AFB/REMB updates the congestion control client when it is in REMB
//          mode.
// - RTPFB: NACK goes to the matching Consumer; TCC feedback goes to the
//          congestion control client (and sender BWE when compiled in).
// - SR:    each report goes to the Producer found by the RTP listener.
// - XR:    DLRR sub-blocks go to the Producer found by the RTP listener.
// - SDES is silently ignored, BYE is logged and ignored, anything else is
//   logged as unhandled.
// This function does not delete the packet.
void Transport::HandleRtcpPacket(RTC::RTCP::Packet* packet)
{
MS_TRACE();
switch (packet->GetType())
{
case RTC::RTCP::Type::RR:
{
auto* rr = static_cast<RTC::RTCP::ReceiverReportPacket*>(packet);
// Deliver each receiver report to the Consumer mapped to its SSRC.
for (auto it = rr->Begin(); it != rr->End(); ++it)
{
auto& report = *it;
auto* consumer = GetConsumerByMediaSsrc(report->GetSsrc());
if (!consumer)
{
// Reports for the probation SSRC have no Consumer; skip them without logging.
if (report->GetSsrc() == RTC::RtpProbationSsrc)
{
continue;
}
MS_DEBUG_TAG(
rtcp,
"no Consumer found for received Receiver Report [ssrc:%" PRIu32 "]",
report->GetSsrc());
continue;
}
consumer->ReceiveRtcpReceiverReport(report);
}
// Also feed the congestion control client, using the RTT of the first
// active Consumer found while iterating the map (0 if none is active).
if (this->tccClient && !this->mapConsumers.empty())
{
float rtt = 0;
for (auto& kv : this->mapConsumers)
{
auto* consumer = kv.second;
if (consumer->IsActive())
{
rtt = consumer->GetRtt();
break;
}
}
this->tccClient->ReceiveRtcpReceiverReport(rr, rtt, DepLibUV::GetTimeMsInt64());
}
break;
}
case RTC::RTCP::Type::PSFB:
{
auto* feedback = static_cast<RTC::RTCP::FeedbackPsPacket*>(packet);
switch (feedback->GetMessageType())
{
case RTC::RTCP::FeedbackPs::MessageType::PLI:
{
auto* consumer = GetConsumerByMediaSsrc(feedback->GetMediaSsrc());
// PLI for the probation SSRC is ignored without logging.
if (feedback->GetMediaSsrc() == RTC::RtpProbationSsrc)
{
break;
}
else if (!consumer)
{
MS_DEBUG_TAG(
rtcp,
"no Consumer found for received PLI Feedback packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
break;
}
MS_DEBUG_TAG(
rtcp,
"PLI received, requesting key frame for Consumer "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
consumer->ReceiveKeyFrameRequest(
RTC::RTCP::FeedbackPs::MessageType::PLI, feedback->GetMediaSsrc());
break;
}
case RTC::RTCP::FeedbackPs::MessageType::FIR:
{
auto* fir = static_cast<RTC::RTCP::FeedbackPsFirPacket*>(packet);
// Each FIR item carries its own SSRC, so the Consumer is resolved per item.
for (auto it = fir->Begin(); it != fir->End(); ++it)
{
auto& item = *it;
auto* consumer = GetConsumerByMediaSsrc(item->GetSsrc());
// Items for the probation SSRC are ignored without logging.
if (item->GetSsrc() == RTC::RtpProbationSsrc)
{
continue;
}
else if (!consumer)
{
MS_DEBUG_TAG(
rtcp,
"no Consumer found for received FIR Feedback packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 ", item ssrc:%" PRIu32 "]",
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc(),
item->GetSsrc());
continue;
}
MS_DEBUG_TAG(
rtcp,
"FIR received, requesting key frame for Consumer "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 ", item ssrc:%" PRIu32 "]",
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc(),
item->GetSsrc());
consumer->ReceiveKeyFrameRequest(feedback->GetMessageType(), item->GetSsrc());
}
break;
}
case RTC::RTCP::FeedbackPs::MessageType::AFB:
{
auto* afb = static_cast<RTC::RTCP::FeedbackPsAfbPacket*>(feedback);
// Only the REMB application is supported within AFB.
if (afb->GetApplication() == RTC::RTCP::FeedbackPsAfbPacket::Application::REMB)
{
auto* remb = static_cast<RTC::RTCP::FeedbackPsRembPacket*>(afb);
// The REMB bitrate is only used when the congestion control client
// exists and is working in REMB mode.
if (
this->tccClient &&
this->tccClient->GetBweType() == RTC::BweType::REMB
)
{
this->tccClient->ReceiveEstimatedBitrate(remb->GetBitrate());
}
break;
}
else
{
MS_DEBUG_TAG(
rtcp,
"ignoring unsupported %s Feedback PS AFB packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
RTC::RTCP::FeedbackPsPacket::MessageType2String(feedback->GetMessageType()).c_str(),
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
break;
}
}
// Both AFB branches above break, so AFB never falls through to here.
default:
{
MS_DEBUG_TAG(
rtcp,
"ignoring unsupported %s Feedback packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
RTC::RTCP::FeedbackPsPacket::MessageType2String(feedback->GetMessageType()).c_str(),
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
}
}
break;
}
case RTC::RTCP::Type::RTPFB:
{
auto* feedback = static_cast<RTC::RTCP::FeedbackRtpPacket*>(packet);
auto* consumer = GetConsumerByMediaSsrc(feedback->GetMediaSsrc());
// Drop the packet early when there is no Consumer for the media SSRC,
// unless it is TCC feedback (which does not need a Consumer). The last
// clause additionally lets through a packet whose media SSRC is the
// probation SSRC and also maps to a Consumer RTX SSRC.
if (
!consumer &&
feedback->GetMessageType() != RTC::RTCP::FeedbackRtp::MessageType::TCC &&
(
feedback->GetMediaSsrc() != RTC::RtpProbationSsrc ||
!GetConsumerByRtxSsrc(feedback->GetMediaSsrc())
)
)
{
MS_DEBUG_TAG(
rtcp,
"no Consumer found for received Feedback packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
break;
}
switch (feedback->GetMessageType())
{
case RTC::RTCP::FeedbackRtp::MessageType::NACK:
{
// The early check above can let a packet through without a Consumer,
// so it must be checked again before dereferencing.
if (!consumer)
{
MS_DEBUG_TAG(
rtcp,
"no Consumer found for received NACK Feedback packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
break;
}
auto* nackPacket = static_cast<RTC::RTCP::FeedbackRtpNackPacket*>(packet);
consumer->ReceiveNack(nackPacket);
break;
}
case RTC::RTCP::FeedbackRtp::MessageType::TCC:
{
// NOTE: this local `feedback` shadows the outer one within this case.
auto* feedback = static_cast<RTC::RTCP::FeedbackRtpTransportPacket*>(packet);
if (this->tccClient)
this->tccClient->ReceiveRtcpTransportFeedback(feedback);
#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
if (this->senderBwe)
this->senderBwe->ReceiveRtcpTransportFeedback(feedback);
#endif
break;
}
default:
{
MS_DEBUG_TAG(
rtcp,
"ignoring unsupported %s Feedback packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
RTC::RTCP::FeedbackRtpPacket::MessageType2String(feedback->GetMessageType()).c_str(),
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
}
}
break;
}
case RTC::RTCP::Type::SR:
{
auto* sr = static_cast<RTC::RTCP::SenderReportPacket*>(packet);
// Deliver each sender report to the Producer the RTP listener returns
// for the report SSRC.
for (auto it = sr->Begin(); it != sr->End(); ++it)
{
auto& report = *it;
auto* producer = this->rtpListener.GetProducer(report->GetSsrc());
if (!producer)
{
MS_DEBUG_TAG(
rtcp,
"no Producer found for received Sender Report [ssrc:%" PRIu32 "]",
report->GetSsrc());
continue;
}
producer->ReceiveRtcpSenderReport(report);
}
break;
}
case RTC::RTCP::Type::SDES:
{
// SDES packets are ignored without logging.
break;
}
case RTC::RTCP::Type::BYE:
{
MS_DEBUG_TAG(rtcp, "ignoring received RTCP BYE");
break;
}
case RTC::RTCP::Type::XR:
{
auto* xr = static_cast<RTC::RTCP::ExtendedReportPacket*>(packet);
// Only DLRR report blocks are processed; other block types are skipped.
for (auto it = xr->Begin(); it != xr->End(); ++it)
{
auto& report = *it;
switch (report->GetType())
{
case RTC::RTCP::ExtendedReportBlock::Type::DLRR:
{
auto* dlrr = static_cast<RTC::RTCP::DelaySinceLastRr*>(report);
for (auto it2 = dlrr->Begin(); it2 != dlrr->End(); ++it2)
{
auto& ssrcInfo = *it2;
// An SSRC of 0 in the sub-block is replaced with the SSRC of the XR packet.
if (ssrcInfo->GetSsrc() == 0)
ssrcInfo->SetSsrc(xr->GetSsrc());
auto* producer = this->rtpListener.GetProducer(ssrcInfo->GetSsrc());
if (!producer)
{
MS_DEBUG_TAG(
rtcp,
"no Producer found for received Sender Extended Report [ssrc:%" PRIu32 "]",
ssrcInfo->GetSsrc());
continue;
}
producer->ReceiveRtcpXrDelaySinceLastRr(ssrcInfo);
}
break;
}
default:;
}
}
break;
}
default:
{
MS_DEBUG_TAG(
rtcp,
"unhandled RTCP type received [type:%" PRIu8 "]",
static_cast<uint8_t>(packet->GetType()));
}
}
}
void Transport::SendRtcp(uint64_t nowMs)
{
MS_TRACE();
std::unique_ptr<RTC::RTCP::CompoundPacket> packet{ nullptr };
for (auto& kv : this->mapConsumers)
{
auto* consumer = kv.second;
for (auto* rtpStream : consumer->GetRtpStreams())
{
packet.reset(new RTC::RTCP::CompoundPacket());
consumer->GetRtcp(packet.get(), rtpStream, nowMs);
if (packet->HasSenderReport())
{
packet->Serialize(RTC::RTCP::Buffer);
SendRtcpCompoundPacket(packet.get());
}
}
}
packet.reset(new RTC::RTCP::CompoundPacket());
for (auto& kv : this->mapProducers)
{
auto* producer = kv.second;
producer->GetRtcp(packet.get(), nowMs);
if (packet->GetSize() + sizeof(RTCP::ReceiverReport::Header) > RTC::MtuSize)
{
packet->Serialize(RTC::RTCP::Buffer);
SendRtcpCompoundPacket(packet.get());
packet.reset(new RTC::RTCP::CompoundPacket());
}
}
if (packet->GetReceiverReportCount() != 0u)
{
packet->Serialize(RTC::RTCP::Buffer);
SendRtcpCompoundPacket(packet.get());
}
}
// Distributes the bitrate reported as available by the congestion control
// client among the Consumers of this Transport.
//
// Consumers with bitrate priority 0 are left out. The rest are visited from
// highest to lowest priority; on each round a Consumer is offered up to
// `priority` layer increases. Rounds repeat until the available bitrate is
// exhausted or a whole round consumes nothing. Finally ApplyLayers() is
// called on every participating Consumer.
//
// Requires a congestion control client (asserted).
void Transport::DistributeAvailableOutgoingBitrate()
{
	MS_TRACE();

	MS_ASSERT(this->tccClient, "no TransportCongestionClient");

	// Consumers sorted by priority (ascending); iterated in reverse below.
	std::multimap<uint8_t, RTC::Consumer*> multimapPriorityConsumer;

	for (auto& kv : this->mapConsumers)
	{
		auto* consumer = kv.second;
		auto priority  = consumer->GetBitratePriority();

		if (priority > 0u)
			multimapPriorityConsumer.emplace(priority, consumer);
	}

	// Nobody wants bitrate.
	if (multimapPriorityConsumer.empty())
		return;

	uint32_t availableBitrate = this->tccClient->GetAvailableBitrate();

	this->tccClient->RescheduleNextAvailableBitrateEvent();

	MS_DEBUG_DEV("before layer-by-layer iterations [availableBitrate:%" PRIu32 "]", availableBitrate);

	while (availableBitrate > 0u)
	{
		auto previousAvailableBitrate = availableBitrate;

		for (auto it = multimapPriorityConsumer.rbegin(); it != multimapPriorityConsumer.rend(); ++it)
		{
			auto priority  = it->first;
			auto* consumer = it->second;
			auto bweType   = this->tccClient->GetBweType();

			// The counter is wider than the uint8_t priority on purpose: with a
			// priority of 255 a uint8_t counter would wrap to 0 and the loop
			// condition could never become false.
			for (uint16_t i{ 1u }; i <= priority; ++i)
			{
				// Zero-initialized so that an unexpected BWE type means "nothing
				// used" instead of reading an indeterminate value.
				uint32_t usedBitrate{ 0u };

				switch (bweType)
				{
					case RTC::BweType::TRANSPORT_CC:
						usedBitrate = consumer->IncreaseLayer(availableBitrate, false);
						break;
					case RTC::BweType::REMB:
						usedBitrate = consumer->IncreaseLayer(availableBitrate, true);
						break;
				}

				MS_ASSERT(usedBitrate <= availableBitrate, "Consumer used more layer bitrate than given");

				availableBitrate -= usedBitrate;

				// This Consumer cannot take more; move on to the next one.
				if (usedBitrate == 0u)
					break;
			}
		}

		// A full round without any consumption: stop iterating.
		if (availableBitrate == previousAvailableBitrate)
			break;
	}

	MS_DEBUG_DEV("after layer-by-layer iterations [availableBitrate:%" PRIu32 "]", availableBitrate);

	// Make the participating Consumers apply the layers they were given.
	for (auto it = multimapPriorityConsumer.rbegin(); it != multimapPriorityConsumer.rend(); ++it)
	{
		auto* consumer = it->second;

		consumer->ApplyLayers();
	}
}
// Adds up the desired bitrate of every Consumer and passes the total, along
// with `forceBitrate`, to the congestion control client.
//
// Requires a congestion control client (asserted).
void Transport::ComputeOutgoingDesiredBitrate(bool forceBitrate)
{
	MS_TRACE();

	MS_ASSERT(this->tccClient, "no TransportCongestionClient");

	uint32_t desiredBitrateSum{ 0u };

	for (auto& entry : this->mapConsumers)
	{
		desiredBitrateSum += entry.second->GetDesiredBitrate();
	}

	MS_DEBUG_DEV("total desired bitrate: %" PRIu32, desiredBitrateSum);

	this->tccClient->SetDesiredBitrate(desiredBitrateSum, forceBitrate);
}
// Emits a "trace" notification of type "probation" describing the given
// outgoing packet. Does nothing when probation tracing is not enabled.
inline void Transport::EmitTraceEventProbationType(RTC::RtpPacket* packet) const
{
	MS_TRACE();

	if (this->traceEventTypes.probation)
	{
		json traceData = json::object();

		traceData["type"]      = "probation";
		traceData["timestamp"] = DepLibUV::GetTimeMs();
		traceData["direction"] = "out";

		// The packet describes itself under "info".
		packet->FillJson(traceData["info"]);

		Channel::ChannelNotifier::Emit(this->id, "trace", traceData);
	}
}
// Emits a "trace" notification of type "bwe" carrying the given bitrates and
// the BWE type of the congestion control client. Does nothing when BWE
// tracing is not enabled.
inline void Transport::EmitTraceEventBweType(
  RTC::TransportCongestionControlClient::Bitrates& bitrates) const
{
	MS_TRACE();

	if (!this->traceEventTypes.bwe)
	{
		return;
	}

	json traceData = json::object();

	traceData["type"]      = "bwe";
	traceData["timestamp"] = DepLibUV::GetTimeMs();
	traceData["direction"] = "out";

	// Bitrate details.
	traceData["info"]["desiredBitrate"]          = bitrates.desiredBitrate;
	traceData["info"]["effectiveDesiredBitrate"] = bitrates.effectiveDesiredBitrate;
	traceData["info"]["minBitrate"]              = bitrates.minBitrate;
	traceData["info"]["maxBitrate"]              = bitrates.maxBitrate;
	traceData["info"]["startBitrate"]            = bitrates.startBitrate;
	traceData["info"]["maxPaddingBitrate"]       = bitrates.maxPaddingBitrate;
	traceData["info"]["availableBitrate"]        = bitrates.availableBitrate;

	// BWE type as a string.
	switch (this->tccClient->GetBweType())
	{
		case RTC::BweType::TRANSPORT_CC:
		{
			traceData["info"]["type"] = "transport-cc";

			break;
		}

		case RTC::BweType::REMB:
		{
			traceData["info"]["type"] = "remb";

			break;
		}
	}

	Channel::ChannelNotifier::Emit(this->id, "trace", traceData);
}
// Producer callback: relays the "paused" event to the Transport listener.
inline void Transport::OnProducerPaused(RTC::Producer* producer)
{
	MS_TRACE();

	auto* transportListener = this->listener;

	transportListener->OnTransportProducerPaused(this, producer);
}
// Producer callback: relays the "resumed" event to the Transport listener.
inline void Transport::OnProducerResumed(RTC::Producer* producer)
{
	MS_TRACE();

	auto* transportListener = this->listener;

	transportListener->OnTransportProducerResumed(this, producer);
}
// Producer callback: relays a new RTP stream (and its mapped SSRC) to the
// Transport listener.
inline void Transport::OnProducerNewRtpStream(
  RTC::Producer* producer, RTC::RtpStream* rtpStream, uint32_t mappedSsrc)
{
	MS_TRACE();

	auto* transportListener = this->listener;

	transportListener->OnTransportProducerNewRtpStream(this, producer, rtpStream, mappedSsrc);
}
// Producer callback: relays an RTP stream score change (new and previous
// score) to the Transport listener.
inline void Transport::OnProducerRtpStreamScore(
  RTC::Producer* producer, RTC::RtpStream* rtpStream, uint8_t score, uint8_t previousScore)
{
	MS_TRACE();

	auto* transportListener = this->listener;

	transportListener->OnTransportProducerRtpStreamScore(
	  this, producer, rtpStream, score, previousScore);
}
// Producer callback: relays a received RTCP sender report event to the
// Transport listener.
inline void Transport::OnProducerRtcpSenderReport(
  RTC::Producer* producer, RTC::RtpStream* rtpStream, bool first)
{
	MS_TRACE();

	auto* transportListener = this->listener;

	transportListener->OnTransportProducerRtcpSenderReport(this, producer, rtpStream, first);
}
// Producer callback: relays a received RTP packet to the Transport listener.
inline void Transport::OnProducerRtpPacketReceived(RTC::Producer* producer, RTC::RtpPacket* packet)
{
	MS_TRACE();

	auto* transportListener = this->listener;

	transportListener->OnTransportProducerRtpPacketReceived(this, producer, packet);
}
// Producer callback: sends the given RTCP packet through this Transport.
// The Producer argument is not used.
inline void Transport::OnProducerSendRtcpPacket(
  RTC::Producer* /*producer*/, RTC::RTCP::Packet* packet)
{
	MS_TRACE();

	SendRtcpPacket(packet);
}
// Producer callback: asks the Transport listener for the worst remote
// fraction lost of the given mapped SSRC. The listener receives
// `worstRemoteFractionLost` by reference.
inline void Transport::OnProducerNeedWorstRemoteFractionLost(
  RTC::Producer* producer, uint32_t mappedSsrc, uint8_t& worstRemoteFractionLost)
{
	MS_TRACE();

	auto* transportListener = this->listener;

	transportListener->OnTransportNeedWorstRemoteFractionLost(
	  this, producer, mappedSsrc, worstRemoteFractionLost);
}
// Consumer callback: sends an RTP packet produced by the given Consumer.
//
// The packet's abs-send-time is updated with the current time. When a
// congestion control client exists, works in transport-cc mode and the packet
// accepts the next transport-wide sequence number, the packet is registered
// in the client and sent with a completion callback that reports whether it
// was actually sent. Otherwise it is sent without callback. In both cases the
// send RTP transmission counter is updated.
inline void Transport::OnConsumerSendRtpPacket(RTC::Consumer* consumer, RTC::RtpPacket* packet)
{
	MS_TRACE();

	packet->UpdateAbsSendTime(DepLibUV::GetTimeMs());

	if (
	  this->tccClient &&
	  this->tccClient->GetBweType() == RTC::BweType::TRANSPORT_CC &&
	  packet->UpdateTransportWideCc01(this->transportWideCcSeq + 1))
	{
		// The packet took the new sequence number, so commit it.
		this->transportWideCcSeq++;

		auto* tccClient = this->tccClient;
		webrtc::RtpPacketSendInfo packetInfo;

		packetInfo.ssrc                      = packet->GetSsrc();
		packetInfo.transport_sequence_number = this->transportWideCcSeq;
		packetInfo.has_rtp_sequence_number   = true;
		packetInfo.rtp_sequence_number       = packet->GetSequenceNumber();
		packetInfo.length                    = packet->GetSize();
		packetInfo.pacing_info               = this->tccClient->GetPacingInfo();

		// Indicate the pacer (and prober) that a packet is to be sent.
		this->tccClient->InsertPacket(packetInfo);

		// NOTE: The callback is heap-allocated and handed over to SendRtpPacket(),
		// so it may run after this function has returned. It must therefore own
		// copies of the locals it uses (capturing them by reference would leave
		// dangling references). The lambda is `mutable` so the copies can be
		// modified and passed as non-const lvalues.
#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
		auto* senderBwe = this->senderBwe;
		RTC::SenderBandwidthEstimator::SentInfo sentInfo;

		sentInfo.wideSeq     = this->transportWideCcSeq;
		sentInfo.size        = packet->GetSize();
		sentInfo.sendingAtMs = DepLibUV::GetTimeMs();

		auto* cb = new onSendCallback([tccClient, packetInfo, senderBwe, sentInfo](bool sent) mutable {
			if (sent)
			{
				tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64());

				sentInfo.sentAtMs = DepLibUV::GetTimeMs();

				senderBwe->RtpPacketSent(sentInfo);
			}
		});

		SendRtpPacket(consumer, packet, cb);
#else
		const auto* cb = new onSendCallback([tccClient, packetInfo](bool sent) mutable {
			if (sent)
				tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64());
		});

		SendRtpPacket(consumer, packet, cb);
#endif
	}
	else
	{
		SendRtpPacket(consumer, packet);
	}

	this->sendRtpTransmission.Update(packet);
}
// Consumer callback: sends a retransmitted RTP packet of the given Consumer.
//
// The packet's abs-send-time is updated with the current time. When a
// congestion control client exists, works in transport-cc mode and the packet
// accepts the next transport-wide sequence number, the packet is registered
// in the client and sent with a completion callback that reports whether it
// was actually sent. Otherwise it is sent without callback. In both cases the
// send RTX transmission counter is updated.
inline void Transport::OnConsumerRetransmitRtpPacket(RTC::Consumer* consumer, RTC::RtpPacket* packet)
{
	MS_TRACE();

	packet->UpdateAbsSendTime(DepLibUV::GetTimeMs());

	if (
	  this->tccClient &&
	  this->tccClient->GetBweType() == RTC::BweType::TRANSPORT_CC &&
	  packet->UpdateTransportWideCc01(this->transportWideCcSeq + 1))
	{
		// The packet took the new sequence number, so commit it.
		this->transportWideCcSeq++;

		auto* tccClient = this->tccClient;
		webrtc::RtpPacketSendInfo packetInfo;

		packetInfo.ssrc                      = packet->GetSsrc();
		packetInfo.transport_sequence_number = this->transportWideCcSeq;
		packetInfo.has_rtp_sequence_number   = true;
		packetInfo.rtp_sequence_number       = packet->GetSequenceNumber();
		packetInfo.length                    = packet->GetSize();
		packetInfo.pacing_info               = this->tccClient->GetPacingInfo();

		// Indicate the pacer (and prober) that a packet is to be sent.
		this->tccClient->InsertPacket(packetInfo);

		// NOTE: The callback is heap-allocated and handed over to SendRtpPacket(),
		// so it may run after this function has returned. It must therefore own
		// copies of the locals it uses (capturing them by reference would leave
		// dangling references). The lambda is `mutable` so the copies can be
		// modified and passed as non-const lvalues.
#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
		auto* senderBwe = this->senderBwe;
		RTC::SenderBandwidthEstimator::SentInfo sentInfo;

		sentInfo.wideSeq     = this->transportWideCcSeq;
		sentInfo.size        = packet->GetSize();
		sentInfo.sendingAtMs = DepLibUV::GetTimeMs();

		auto* cb = new onSendCallback([tccClient, packetInfo, senderBwe, sentInfo](bool sent) mutable {
			if (sent)
			{
				tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64());

				sentInfo.sentAtMs = DepLibUV::GetTimeMs();

				senderBwe->RtpPacketSent(sentInfo);
			}
		});

		SendRtpPacket(consumer, packet, cb);
#else
		const auto* cb = new onSendCallback([tccClient, packetInfo](bool sent) mutable {
			if (sent)
				tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64());
		});

		SendRtpPacket(consumer, packet, cb);
#endif
	}
	else
	{
		SendRtpPacket(consumer, packet);
	}

	this->sendRtxTransmission.Update(packet);
}
// Consumer callback: relays a key frame request for the given mapped SSRC to
// the Transport listener. The request is dropped (with a warning) while the
// Transport is not connected.
inline void Transport::OnConsumerKeyFrameRequested(RTC::Consumer* consumer, uint32_t mappedSsrc)
{
	MS_TRACE();

	if (!IsConnected())
	{
		MS_WARN_TAG(rtcp, "ignoring key frame request (transport not connected)");

		return;
	}

	this->listener->OnTransportConsumerKeyFrameRequested(this, consumer, mappedSsrc);
}
// Consumer callback: a Consumer needs a bitrate change, so the available
// outgoing bitrate is redistributed and the desired bitrate recomputed.
// The Consumer argument is not used.
//
// Requires a congestion control client (asserted).
inline void Transport::OnConsumerNeedBitrateChange(RTC::Consumer* /*consumer*/)
{
	MS_TRACE();

	MS_ASSERT(this->tccClient, "no TransportCongestionClient");

	// First share out what is available, then report what is wanted.
	DistributeAvailableOutgoingBitrate();

	ComputeOutgoingDesiredBitrate();
}
// Consumer callback: a Consumer needs zero bitrate, so the available outgoing
// bitrate is redistributed and the desired bitrate recomputed with the
// `forceBitrate` flag set. The Consumer argument is not used.
//
// Requires a congestion control client (asserted).
inline void Transport::OnConsumerNeedZeroBitrate(RTC::Consumer* /*consumer*/)
{
	MS_TRACE();

	MS_ASSERT(this->tccClient, "no TransportCongestionClient");

	// First share out what is available, then report what is wanted.
	DistributeAvailableOutgoingBitrate();

	ComputeOutgoingDesiredBitrate(/*forceBitrate*/ true);
}
// Consumer callback: the Producer of the given Consumer was closed.
//
// The Consumer is removed from every map of this Transport, its media and RTX
// streams are reported as closed, the listener is notified and the Consumer
// is deleted. If there is a congestion control client the desired bitrate is
// then recomputed with the `forceBitrate` flag set.
inline void Transport::OnConsumerProducerClosed(RTC::Consumer* consumer)
{
	MS_TRACE();

	// Forget the Consumer.
	this->mapConsumers.erase(consumer->id);

	// Forget its media SSRCs.
	for (auto mediaSsrc : consumer->GetMediaSsrcs())
	{
		this->mapSsrcConsumer.erase(mediaSsrc);

		SendStreamClosed(mediaSsrc);
	}

	// Forget its RTX SSRCs.
	for (auto rtxSsrc : consumer->GetRtxSsrcs())
	{
		this->mapRtxSsrcConsumer.erase(rtxSsrc);

		SendStreamClosed(rtxSsrc);
	}

	// The listener must be told while the Consumer is still alive.
	this->listener->OnTransportConsumerProducerClosed(this, consumer);

	delete consumer;

	if (this->tccClient)
	{
		ComputeOutgoingDesiredBitrate(/*forceBitrate*/ true);
	}
}
// DataProducer callback: relays a received message (ppid, payload and length)
// to the Transport listener.
inline void Transport::OnDataProducerMessageReceived(
  RTC::DataProducer* dataProducer, uint32_t ppid, const uint8_t* msg, size_t len)
{
	MS_TRACE();

	auto* transportListener = this->listener;

	transportListener->OnTransportDataProducerMessageReceived(this, dataProducer, ppid, msg, len);
}
// DataConsumer callback: sends a message on behalf of the given DataConsumer
// by forwarding every argument, including the queued callback, to
// SendMessage().
inline void Transport::OnDataConsumerSendMessage(
  RTC::DataConsumer* dataConsumer,
  uint32_t ppid,
  const uint8_t* msg,
  size_t len,
  onQueuedCallback* cb)
{
	MS_TRACE();

	SendMessage(dataConsumer, ppid, msg, len, cb);
}
// DataConsumer callback: the DataProducer of the given DataConsumer was
// closed.
//
// The DataConsumer is removed from the map, the listener is notified, the
// SCTP association is told about the closure when the DataConsumer is of SCTP
// type, and finally the DataConsumer is deleted.
inline void Transport::OnDataConsumerDataProducerClosed(RTC::DataConsumer* dataConsumer)
{
	MS_TRACE();

	// Forget the DataConsumer.
	this->mapDataConsumers.erase(dataConsumer->id);

	// The listener must be told while the DataConsumer is still alive.
	this->listener->OnTransportDataConsumerDataProducerClosed(this, dataConsumer);

	const bool isSctp = dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP;

	if (isSctp)
	{
		this->sctpAssociation->DataConsumerClosed(dataConsumer);
	}

	delete dataConsumer;
}
// SctpAssociation callback: emits a "sctpstatechange" notification with
// sctpState "connecting". The association argument is not used.
inline void Transport::OnSctpAssociationConnecting(RTC::SctpAssociation* /*sctpAssociation*/)
{
	MS_TRACE();

	json stateData = json::object();

	stateData["sctpState"] = "connecting";

	Channel::ChannelNotifier::Emit(this->id, "sctpstatechange", stateData);
}
// SctpAssociation callback: tells every SCTP DataConsumer that the
// association is connected and emits a "sctpstatechange" notification with
// sctpState "connected". The association argument is not used.
inline void Transport::OnSctpAssociationConnected(RTC::SctpAssociation* /*sctpAssociation*/)
{
	MS_TRACE();

	for (auto& entry : this->mapDataConsumers)
	{
		auto* dataConsumer = entry.second;

		// Only SCTP DataConsumers care.
		if (dataConsumer->GetType() != RTC::DataConsumer::Type::SCTP)
		{
			continue;
		}

		dataConsumer->SctpAssociationConnected();
	}

	json stateData = json::object();

	stateData["sctpState"] = "connected";

	Channel::ChannelNotifier::Emit(this->id, "sctpstatechange", stateData);
}
// SctpAssociation callback: tells every SCTP DataConsumer that the
// association is closed and emits a "sctpstatechange" notification with
// sctpState "failed". The association argument is not used.
inline void Transport::OnSctpAssociationFailed(RTC::SctpAssociation* /*sctpAssociation*/)
{
	MS_TRACE();

	for (auto& entry : this->mapDataConsumers)
	{
		auto* dataConsumer = entry.second;

		// Only SCTP DataConsumers care.
		if (dataConsumer->GetType() != RTC::DataConsumer::Type::SCTP)
		{
			continue;
		}

		dataConsumer->SctpAssociationClosed();
	}

	json stateData = json::object();

	stateData["sctpState"] = "failed";

	Channel::ChannelNotifier::Emit(this->id, "sctpstatechange", stateData);
}
// SctpAssociation callback: tells every SCTP DataConsumer that the
// association is closed and emits a "sctpstatechange" notification with
// sctpState "closed". The association argument is not used.
inline void Transport::OnSctpAssociationClosed(RTC::SctpAssociation* /*sctpAssociation*/)
{
	MS_TRACE();

	for (auto& entry : this->mapDataConsumers)
	{
		auto* dataConsumer = entry.second;

		// Only SCTP DataConsumers care.
		if (dataConsumer->GetType() != RTC::DataConsumer::Type::SCTP)
		{
			continue;
		}

		dataConsumer->SctpAssociationClosed();
	}

	json stateData = json::object();

	stateData["sctpState"] = "closed";

	Channel::ChannelNotifier::Emit(this->id, "sctpstatechange", stateData);
}
// SctpAssociation callback: the association wants to send data.
//
// The data is sent through SendSctpData() unless this Transport is being
// destroyed or has no SCTP association. The association argument is not used.
inline void Transport::OnSctpAssociationSendData(
  RTC::SctpAssociation* /*sctpAssociation*/, const uint8_t* data, size_t len)
{
	MS_TRACE();

	if (!this->destroying && this->sctpAssociation)
	{
		SendSctpData(data, len);
	}
}
// SctpAssociation callback: an SCTP message was received on `streamId`.
//
// The message is passed to the DataProducer the SCTP listener returns for the
// stream. If there is none, a warning is logged and the message is dropped.
// Exceptions thrown while the DataProducer processes the message do not
// propagate out of this callback; they are traced at debug level so the drop
// is not completely silent. The association argument is not used.
inline void Transport::OnSctpAssociationMessageReceived(
  RTC::SctpAssociation* /*sctpAssociation*/,
  uint16_t streamId,
  uint32_t ppid,
  const uint8_t* msg,
  size_t len)
{
	MS_TRACE();

	RTC::DataProducer* dataProducer = this->sctpListener.GetDataProducer(streamId);

	if (!dataProducer)
	{
		MS_WARN_TAG(
		  sctp, "no suitable DataProducer for received SCTP message [streamId:%" PRIu16 "]", streamId);

		return;
	}

	try
	{
		dataProducer->ReceiveMessage(ppid, msg, len);
	}
	catch (const std::exception& error)
	{
		// Best effort: the message is dropped, but leave a trace of why.
		MS_DEBUG_TAG(
		  sctp,
		  "DataProducer failed to process SCTP message [streamId:%" PRIu16 "]: %s",
		  streamId,
		  error.what());
	}
}
// SctpAssociation callback: passes the reported buffered amount to every
// SCTP DataConsumer. The association argument is not used.
inline void Transport::OnSctpAssociationBufferedAmount(
  RTC::SctpAssociation* /*sctpAssociation*/, uint32_t bufferedAmount)
{
	MS_TRACE();

	for (const auto& entry : this->mapDataConsumers)
	{
		auto* dataConsumer = entry.second;

		// Only SCTP DataConsumers care.
		if (dataConsumer->GetType() != RTC::DataConsumer::Type::SCTP)
		{
			continue;
		}

		dataConsumer->SctpAssociationBufferedAmount(bufferedAmount);
	}
}
// Congestion control client callback: new bitrates are available.
//
// The available outgoing bitrate is redistributed among Consumers, the
// desired bitrate is recomputed and a "bwe" trace event is emitted (if
// enabled). The client argument is not used.
inline void Transport::OnTransportCongestionControlClientBitrates(
  RTC::TransportCongestionControlClient* /*tccClient*/,
  RTC::TransportCongestionControlClient::Bitrates& bitrates)
{
	MS_TRACE();

	MS_DEBUG_DEV("outgoing available bitrate:%" PRIu32, bitrates.availableBitrate);

	// First share out what is available, then report what is wanted.
	DistributeAvailableOutgoingBitrate();

	ComputeOutgoingDesiredBitrate();

	// Tracing comes last.
	EmitTraceEventBweType(bitrates);
}
// Congestion control client callback: the client wants to send a probation
// RTP packet with the given pacing info.
//
// The packet's abs-send-time is updated with the current time. When the
// client works in transport-cc mode and the packet accepts the next
// transport-wide sequence number, the packet is registered in the client and
// sent with a completion callback that reports whether it was actually sent.
// Otherwise it is sent without callback. In both cases a "probation" trace
// event is emitted (if enabled) and the send probation transmission counter
// is updated.
inline void Transport::OnTransportCongestionControlClientSendRtpPacket(
  RTC::TransportCongestionControlClient* tccClient,
  RTC::RtpPacket* packet,
  const webrtc::PacedPacketInfo& pacingInfo)
{
	MS_TRACE();

	packet->UpdateAbsSendTime(DepLibUV::GetTimeMs());

	if (
	  this->tccClient->GetBweType() == RTC::BweType::TRANSPORT_CC &&
	  packet->UpdateTransportWideCc01(this->transportWideCcSeq + 1))
	{
		// The packet took the new sequence number, so commit it.
		this->transportWideCcSeq++;

		// May emit 'trace' event.
		EmitTraceEventProbationType(packet);

		webrtc::RtpPacketSendInfo packetInfo;

		packetInfo.ssrc                      = packet->GetSsrc();
		packetInfo.transport_sequence_number = this->transportWideCcSeq;
		packetInfo.has_rtp_sequence_number   = true;
		packetInfo.rtp_sequence_number       = packet->GetSequenceNumber();
		packetInfo.length                    = packet->GetSize();
		packetInfo.pacing_info               = pacingInfo;

		// Indicate the pacer (and prober) that a packet is to be sent.
		this->tccClient->InsertPacket(packetInfo);

		// NOTE: The callback is heap-allocated and handed over to SendRtpPacket(),
		// so it may run after this function has returned. It must therefore own
		// copies of the locals it uses (capturing them by reference would leave
		// dangling references). The lambda is `mutable` so the copies can be
		// modified and passed as non-const lvalues.
#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
		auto* senderBwe = this->senderBwe;
		RTC::SenderBandwidthEstimator::SentInfo sentInfo;

		sentInfo.wideSeq     = this->transportWideCcSeq;
		sentInfo.size        = packet->GetSize();
		sentInfo.isProbation = true;
		sentInfo.sendingAtMs = DepLibUV::GetTimeMs();

		auto* cb = new onSendCallback([tccClient, packetInfo, senderBwe, sentInfo](bool sent) mutable {
			if (sent)
			{
				tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64());

				sentInfo.sentAtMs = DepLibUV::GetTimeMs();

				senderBwe->RtpPacketSent(sentInfo);
			}
		});

		SendRtpPacket(nullptr, packet, cb);
#else
		const auto* cb = new onSendCallback([tccClient, packetInfo](bool sent) mutable {
			if (sent)
				tccClient->PacketSent(packetInfo, DepLibUV::GetTimeMsInt64());
		});

		SendRtpPacket(nullptr, packet, cb);
#endif
	}
	else
	{
		// May emit 'trace' event.
		EmitTraceEventProbationType(packet);

		SendRtpPacket(nullptr, packet);
	}

	this->sendProbationTransmission.Update(packet);

	MS_DEBUG_DEV(
	  "probation sent [seq:%" PRIu16 ", wideSeq:%" PRIu16 ", size:%zu, bitrate:%" PRIu32 "]",
	  packet->GetSequenceNumber(),
	  this->transportWideCcSeq,
	  packet->GetSize(),
	  this->sendProbationTransmission.GetBitrate(DepLibUV::GetTimeMs()));
}
// Congestion control server callback: serializes the given RTCP packet into
// the shared RTCP buffer and sends it. The server argument is not used.
inline void Transport::OnTransportCongestionControlServerSendRtcpPacket(
  RTC::TransportCongestionControlServer* /*tccServer*/, RTC::RTCP::Packet* packet)
{
	MS_TRACE();

	// The packet must be serialized before being sent.
	packet->Serialize(RTC::RTCP::Buffer);

	SendRtcpPacket(packet);
}
#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
// Sender bandwidth estimator callback: only logs (development level) the new
// and the previous available bitrate. The estimator argument is not used.
inline void Transport::OnSenderBandwidthEstimatorAvailableBitrate(
  RTC::SenderBandwidthEstimator* /*senderBwe*/,
  uint32_t availableBitrate,
  uint32_t previousAvailableBitrate)
{
	MS_TRACE();

	MS_DEBUG_DEV(
	  "outgoing available bitrate [now:%" PRIu32 ", before:%" PRIu32 "]",
	  availableBitrate,
	  previousAvailableBitrate);
}
#endif
// Timer callback. Only the RTCP timer is handled.
//
// RTCP is sent and the timer is restarted with a new interval: it starts at
// RTC::RTCP::MaxVideoIntervalMs; if there are Consumers with a non-zero total
// transmission rate it becomes 360000 / rate (rate being the sum of each
// Consumer's transmission rate divided by 1000), capped at the maximum; the
// result is finally scaled by a random factor between 0.5 and 1.5.
inline void Transport::OnTimer(Timer* timer)
{
	MS_TRACE();

	// Not the RTCP timer: nothing to do.
	if (timer != this->rtcpTimer)
	{
		return;
	}

	auto interval  = static_cast<uint64_t>(RTC::RTCP::MaxVideoIntervalMs);
	uint64_t nowMs = DepLibUV::GetTimeMs();

	SendRtcp(nowMs);

	// Recompute the interval from the current outgoing rate.
	if (!this->mapConsumers.empty())
	{
		uint32_t rate{ 0 };

		for (auto& entry : this->mapConsumers)
		{
			rate += entry.second->GetTransmissionRate(nowMs) / 1000;
		}

		if (rate != 0u)
		{
			interval = 360000 / rate;
		}

		// Never wait longer than the maximum interval.
		if (interval > RTC::RTCP::MaxVideoIntervalMs)
		{
			interval = RTC::RTCP::MaxVideoIntervalMs;
		}
	}

	// Randomize the interval (factor in the 0.5 - 1.5 range).
	interval *= static_cast<float>(Utils::Crypto::GetRandomUInt(5, 15)) / 10;

	this->rtcpTimer->Start(interval);
}
}