#define MS_CLASS "RTC::Transport"
#include "RTC/Transport.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#ifdef MS_LIBURING_SUPPORTED
#include "DepLibUring.hpp"
#endif
#include "FBS/sctpAssociation.h"
#include "FBS/transport.h"
#include "RTC/BweType.hpp"
#include "RTC/Consts.hpp"
#include "RTC/PipeConsumer.hpp"
#include "RTC/RTCP/FeedbackPs.hpp"
#include "RTC/RTCP/FeedbackPsAfb.hpp"
#include "RTC/RTCP/FeedbackPsRemb.hpp"
#include "RTC/RTCP/FeedbackRtpNack.hpp"
#include "RTC/RTCP/FeedbackRtpTransport.hpp"
#include "RTC/RTCP/XrDelaySinceLastRr.hpp"
#include "RTC/RtpDictionaries.hpp"
#include "RTC/SCTP/association/Association.hpp"
#include "RTC/SCTP/public/SctpOptions.hpp"
#include "RTC/SimpleConsumer.hpp"
#include "RTC/SimulcastConsumer.hpp"
#include "RTC/SvcConsumer.hpp"
#ifdef MS_RTC_LOGGER_RTP
#include "RTC/RtcLogger.hpp"
#endif
#include <libwebrtc/modules/rtp_rtcp/include/rtp_rtcp_defines.h>
#include <array>
#include <iterator>
#include <map>
namespace RTC
{
/**
 * Builds the common Transport state from the options of the creation request.
 *
 * @param shared   Shared services; also used to create the RTCP timer and
 *                 handed to the transmission counters and SCTP association.
 * @param id       Identifier stored as the Transport id.
 * @param listener Receiver of the Transport events.
 * @param options  Serialized options (message sizes, direct flag, SCTP
 *                 settings and optional initial outgoing bitrate).
 *
 * Raises a type error (MS_THROW_TYPE_ERROR) when SCTP is requested on a
 * direct Transport.
 */
Transport::Transport(
  SharedInterface* shared,
  const std::string& id,
  RTC::Transport::Listener* listener,
  const FBS::Transport::Options* options)
  : id(id),
    shared(shared),
    listener(listener),
    recvRtpTransmission(shared, false),
    sendRtpTransmission(shared, false),
    recvRtxTransmission(shared, false, 1000u),
    sendRtxTransmission(shared, false, 1000u),
    sendProbationTransmission(shared, false, 100u)
{
  MS_TRACE();

  // Message size limits are read regardless of the Transport kind.
  this->maxSendMessageSize    = options->maxSendMessageSize();
  this->maxReceiveMessageSize = options->maxReceiveMessageSize();

  if (!options->direct())
  {
    // SCTP buffer settings are only read for non direct Transports.
    this->sctpSendBufferSize              = options->sctpSendBufferSize();
    this->sctpPerStreamSendQueueLimit     = options->sctpPerStreamSendQueueLimit();
    this->sctpMaxReceiverWindowBufferSize = options->sctpMaxReceiverWindowBufferSize();
  }
  else
  {
    this->direct = true;
  }

  // Override the initial outgoing bitrate only when the option carries a value.
  const auto requestedInitialBitrate = options->initialAvailableOutgoingBitrate();

  if (requestedInitialBitrate.has_value())
  {
    this->initialAvailableOutgoingBitrate = requestedInitialBitrate.value();
  }

  if (options->enableSctp())
  {
    if (this->direct)
    {
      MS_THROW_TYPE_ERROR("cannot enable SCTP in a direct Transport");
    }

    // The association is configured from the values stored above.
    const RTC::SCTP::SctpOptions sctpOptions = {
      .mtu                         = RTC::Consts::MaxSafeMtuSizeForSctp,
      .maxSendMessageSize          = this->maxSendMessageSize,
      .maxSendBufferSize           = this->sctpSendBufferSize,
      .perStreamSendQueueLimit     = this->sctpPerStreamSendQueueLimit,
      .maxReceiveMessageSize       = this->maxReceiveMessageSize,
      .maxReceiverWindowBufferSize = this->sctpMaxReceiverWindowBufferSize
    };

    const auto isDataChannel = options->isDataChannel();

    this->sctpAssociation =
      std::make_unique<RTC::SCTP::Association>(sctpOptions, this, this->shared, isDataChannel);
  }

  // The RTCP timer is created last, once no further throw is possible here.
  this->rtcpTimer = this->shared->CreateTimer(this);
}
/**
 * Destroys the Transport, deleting every Producer, Consumer, DataProducer and
 * DataConsumer it still holds plus the RTCP timer. No listener callback is
 * invoked from here.
 */
Transport::~Transport()
{
  MS_TRACE();

  // Deletes every pointer stored as mapped value and then empties the map.
  const auto destroyAll = [](auto& entries)
  {
    for (auto& entry : entries)
    {
      delete entry.second;
    }

    entries.clear();
  };

  destroyAll(this->mapProducers);

  destroyAll(this->mapConsumers);
  // These maps only reference the Consumers deleted above.
  this->mapSsrcConsumer.clear();
  this->mapRtxSsrcConsumer.clear();

  destroyAll(this->mapDataProducers);

  destroyAll(this->mapDataConsumers);
  // This map only references the DataConsumers deleted above.
  this->mapSctpStreamIdDataConsumers.clear();

  delete this->rtcpTimer;
  this->rtcpTimer = nullptr;
}
void Transport::CloseProducersAndConsumers()
{
MS_TRACE();
for (auto& kv : this->mapProducers)
{
auto* producer = kv.second;
this->listener->OnTransportProducerClosed(this, producer);
delete producer;
}
this->mapProducers.clear();
for (auto& kv : this->mapConsumers)
{
auto* consumer = kv.second;
this->listener->OnTransportConsumerClosed(this, consumer);
delete consumer;
}
this->mapConsumers.clear();
this->mapSsrcConsumer.clear();
this->mapRtxSsrcConsumer.clear();
for (auto& kv : this->mapDataProducers)
{
auto* dataProducer = kv.second;
this->listener->OnTransportDataProducerClosed(this, dataProducer);
delete dataProducer;
}
this->mapDataProducers.clear();
for (auto& kv : this->mapDataConsumers)
{
auto* dataConsumer = kv.second;
this->listener->OnTransportDataConsumerClosed(this, dataConsumer);
delete dataConsumer;
}
this->mapDataConsumers.clear();
this->mapSctpStreamIdDataConsumers.clear();
}
/**
 * Forwards the "listen server closed" event to the listener, passing this
 * Transport as the source of the event.
 */
void Transport::ListenServerClosed()
{
  MS_TRACE();

  auto* const transportListener = this->listener;

  transportListener->OnTransportListenServerClosed(this);
}
/**
 * Serializes a dump of this Transport into the given builder.
 *
 * The dump carries the ids of all Producers/Consumers/DataProducers/
 * DataConsumers, the SSRC -> Consumer id tables, the receive RTP header
 * extension ids, the RTP listener, the message size limits, the SCTP data
 * (only when an SCTP association exists) and the enabled trace event types.
 *
 * @param builder Builder in which every nested object is created.
 * @return Offset of the created FBS::Transport::Dump.
 */
flatbuffers::Offset<FBS::Transport::Dump> Transport::FillBuffer(
  flatbuffers::FlatBufferBuilder& builder) const
{
  MS_TRACE();

  using IdOffsets     = std::vector<flatbuffers::Offset<flatbuffers::String>>;
  using SsrcIdOffsets = std::vector<flatbuffers::Offset<FBS::Common::Uint32String>>;

  // Serializes the keys (ids) of a map as strings.
  const auto serializeIds = [&builder](const auto& entries)
  {
    IdOffsets ids;

    for (const auto& entry : entries)
    {
      ids.emplace_back(builder.CreateString(entry.first));
    }

    return ids;
  };

  // Serializes a SSRC -> Consumer map as (ssrc, consumer id) pairs.
  const auto serializeSsrcs = [&builder](const auto& entries)
  {
    SsrcIdOffsets pairs;

    for (const auto& entry : entries)
    {
      pairs.emplace_back(
        FBS::Common::CreateUint32StringDirect(builder, entry.first, entry.second->id.c_str()));
    }

    return pairs;
  };

  // A header extension id of 0 is serialized as "no value".
  const auto idOrNone = [](auto extensionId)
  {
    return extensionId != 0u ? flatbuffers::Optional<uint8_t>(extensionId) : flatbuffers::nullopt;
  };

  auto producerIds          = serializeIds(this->mapProducers);
  auto consumerIds          = serializeIds(this->mapConsumers);
  auto mapSsrcConsumerId    = serializeSsrcs(this->mapSsrcConsumer);
  auto mapRtxSsrcConsumerId = serializeSsrcs(this->mapRtxSsrcConsumer);
  auto dataProducerIds      = serializeIds(this->mapDataProducers);
  auto dataConsumerIds      = serializeIds(this->mapDataConsumers);

  const auto& extensionIds = this->recvRtpHeaderExtensionIds;

  auto recvRtpHeaderExtensions = FBS::Transport::CreateRecvRtpHeaderExtensions(
    builder,
    idOrNone(extensionIds.mid),
    idOrNone(extensionIds.rid),
    idOrNone(extensionIds.rrid),
    idOrNone(extensionIds.absSendTime),
    idOrNone(extensionIds.transportWideCc01));

  auto rtpListenerOffset = this->rtpListener.FillBuffer(builder);

  // SCTP related fields keep their empty defaults unless there is an
  // SCTP association.
  flatbuffers::Offset<FBS::SctpParameters::SctpParameters> sctpParameters;
  flatbuffers::Optional<FBS::SctpAssociation::SctpState> sctpState = flatbuffers::nullopt;
  flatbuffers::Offset<FBS::Transport::SctpListener> sctpListenerOffset;

  if (this->sctpAssociation)
  {
    using AssociationState = RTC::SCTP::Types::AssociationState;
    using SctpState        = FBS::SctpAssociation::SctpState;

    sctpParameters = this->sctpAssociation->FillBuffer(builder);

    // NEW is reported unless one of the cases below says otherwise.
    sctpState = SctpState::NEW;

    switch (this->sctpAssociation->GetAssociationState())
    {
      case AssociationState::NEW:
        break;

      case AssociationState::CONNECTING:
        sctpState = SctpState::CONNECTING;
        break;

      case AssociationState::CONNECTED:
        sctpState = SctpState::CONNECTED;
        break;

      // Both states are exposed as CLOSED.
      case AssociationState::SHUTTING_DOWN:
      case AssociationState::CLOSED:
        sctpState = SctpState::CLOSED;
        break;
    }

    sctpListenerOffset = this->sctpListener.FillBuffer(builder);
  }

  std::vector<FBS::Transport::TraceEventType> enabledTraceEvents;

  if (this->traceEventTypes.probation)
  {
    enabledTraceEvents.emplace_back(FBS::Transport::TraceEventType::PROBATION);
  }

  if (this->traceEventTypes.bwe)
  {
    enabledTraceEvents.emplace_back(FBS::Transport::TraceEventType::BWE);
  }

  return FBS::Transport::CreateDumpDirect(
    builder,
    this->id.c_str(),
    this->direct,
    &producerIds,
    &consumerIds,
    &mapSsrcConsumerId,
    &mapRtxSsrcConsumerId,
    &dataProducerIds,
    &dataConsumerIds,
    recvRtpHeaderExtensions,
    rtpListenerOffset,
    this->maxSendMessageSize,
    this->maxReceiveMessageSize,
    sctpParameters,
    sctpState,
    sctpListenerOffset,
    &enabledTraceEvents);
}
/**
 * Serializes the statistics of this Transport into the given builder.
 *
 * All the rates are computed for the same timestamp, which is also written
 * into the stats. Values that depend on an optional component (SCTP
 * association, congestion control client/server) or that are 0 (bitrate
 * limits) are serialized as "no value".
 *
 * @param builder Builder in which the stats table is created.
 * @return Offset of the created FBS::Transport::Stats.
 */
flatbuffers::Offset<FBS::Transport::Stats> Transport::FillBufferStats(
  flatbuffers::FlatBufferBuilder& builder)
{
  MS_TRACE();

  auto nowMs = this->shared->GetTimeMs();

  // Remains empty when there is no SCTP association.
  flatbuffers::Optional<FBS::SctpAssociation::SctpState> sctpState = flatbuffers::nullopt;

  if (this->sctpAssociation)
  {
    using AssociationState = RTC::SCTP::Types::AssociationState;
    using SctpState        = FBS::SctpAssociation::SctpState;

    // NEW is reported unless one of the cases below says otherwise.
    sctpState = SctpState::NEW;

    switch (this->sctpAssociation->GetAssociationState())
    {
      case AssociationState::NEW:
        break;

      case AssociationState::CONNECTING:
        sctpState = SctpState::CONNECTING;
        break;

      case AssociationState::CONNECTED:
        sctpState = SctpState::CONNECTED;
        break;

      // Both states are exposed as CLOSED.
      case AssociationState::SHUTTING_DOWN:
      case AssociationState::CLOSED:
        sctpState = SctpState::CLOSED;
        break;
    }
  }

  return FBS::Transport::CreateStatsDirect(
    builder,
    this->id.c_str(),
    nowMs,
    sctpState,
    // Overall received and sent traffic.
    this->recvTransmission.GetBytes(),
    this->recvTransmission.GetRate(nowMs),
    this->sendTransmission.GetBytes(),
    this->sendTransmission.GetRate(nowMs),
    // RTP traffic.
    this->recvRtpTransmission.GetBytes(),
    this->recvRtpTransmission.GetBitrate(nowMs),
    this->sendRtpTransmission.GetBytes(),
    this->sendRtpTransmission.GetBitrate(nowMs),
    // RTX traffic.
    this->recvRtxTransmission.GetBytes(),
    this->recvRtxTransmission.GetBitrate(nowMs),
    this->sendRtxTransmission.GetBytes(),
    this->sendRtxTransmission.GetBitrate(nowMs),
    // Probation traffic.
    this->sendProbationTransmission.GetBytes(),
    this->sendProbationTransmission.GetBitrate(nowMs),
    // Available bitrates as seen by the congestion control client and server.
    this->tccClient ? flatbuffers::Optional<uint32_t>(this->tccClient->GetAvailableBitrate())
                    : flatbuffers::nullopt,
    this->tccServer ? flatbuffers::Optional<uint32_t>(this->tccServer->GetAvailableBitrate())
                    : flatbuffers::nullopt,
    // Configured bitrate limits (0 means not set).
    this->maxIncomingBitrate != 0u ? flatbuffers::Optional<uint32_t>(this->maxIncomingBitrate)
                                   : flatbuffers::nullopt,
    this->maxOutgoingBitrate != 0u ? flatbuffers::Optional<uint32_t>(this->maxOutgoingBitrate)
                                   : flatbuffers::nullopt,
    this->minOutgoingBitrate != 0u ? flatbuffers::Optional<uint32_t>(this->minOutgoingBitrate)
                                   : flatbuffers::nullopt,
    // Packet loss as seen by the congestion control server and client.
    this->tccServer ? flatbuffers::Optional<double>(this->tccServer->GetPacketLoss())
                    : flatbuffers::nullopt,
    this->tccClient ? flatbuffers::Optional<double>(this->tccClient->GetPacketLoss())
                    : flatbuffers::nullopt);
}
/**
 * Handles a channel request addressed to this Transport.
 *
 * Dispatches on `request->method`. Every handled method ends up accepting the
 * request; failures are reported through MS_THROW_ERROR / MS_THROW_TYPE_ERROR
 * before any state of the failed operation is kept. Objects created while
 * serving a request (Producer, Consumer, DataProducer, DataConsumer) are
 * deleted again if a later step of the same request throws a MediaSoupError.
 *
 * @param request Request to handle. An unknown method is reported with
 *                MS_THROW_ERROR.
 */
void Transport::HandleRequest(Channel::ChannelRequest* request)
{
  MS_TRACE();

  // Tells whether any of the given codecs announces the given RTCP feedback
  // type.
  const auto hasRtcpFeedback = [](const auto& codecs, const char* feedbackType)
  {
    return std::any_of(
      codecs.begin(),
      codecs.end(),
      [feedbackType](const RTC::RtpCodecParameters& codec)
      {
        return std::any_of(
          codec.rtcpFeedback.begin(),
          codec.rtcpFeedback.end(),
          [feedbackType](const RTC::RtcpFeedback& fb)
          {
            return fb.type == feedbackType;
          });
      });
  };

  switch (request->method)
  {
    case Channel::ChannelRequest::Method::TRANSPORT_SET_MAX_INCOMING_BITRATE:
    {
      const auto* body = request->data->body_as<FBS::Transport::SetMaxIncomingBitrateRequest>();

      this->maxIncomingBitrate = body->maxIncomingBitrate();

      MS_DEBUG_TAG(bwe, "maximum incoming bitrate set to %" PRIu32, this->maxIncomingBitrate);

      request->Accept();

      if (this->tccServer)
      {
        this->tccServer->SetMaxIncomingBitrate(this->maxIncomingBitrate);
      }

      break;
    }

    case Channel::ChannelRequest::Method::TRANSPORT_SET_MAX_OUTGOING_BITRATE:
    {
      const auto* body = request->data->body_as<FBS::Transport::SetMaxOutgoingBitrateRequest>();
      const uint32_t bitrate = body->maxOutgoingBitrate();

      // 0 means unlimited. Any other value must respect both the global
      // minimum and the currently configured minimum.
      if (bitrate > 0u && bitrate < RTC::TransportCongestionControlMinOutgoingBitrate)
      {
        MS_THROW_TYPE_ERROR(
          "bitrate must be >= %" PRIu32 " or 0 (unlimited)",
          RTC::TransportCongestionControlMinOutgoingBitrate);
      }
      else if (bitrate > 0u && bitrate < this->minOutgoingBitrate)
      {
        MS_THROW_TYPE_ERROR(
          "bitrate must be >= current min outgoing bitrate (%" PRIu32 ") or 0 (unlimited)",
          this->minOutgoingBitrate);
      }

      if (this->tccClient)
      {
        // The member is updated after the congestion control client has been
        // given the new value.
        this->tccClient->SetMaxOutgoingBitrate(bitrate);
        this->maxOutgoingBitrate = bitrate;

        MS_DEBUG_TAG(bwe, "maximum outgoing bitrate set to %" PRIu32, this->maxOutgoingBitrate);

        ComputeOutgoingDesiredBitrate();
      }
      else
      {
        this->maxOutgoingBitrate = bitrate;
      }

      request->Accept();

      break;
    }

    case Channel::ChannelRequest::Method::TRANSPORT_SET_MIN_OUTGOING_BITRATE:
    {
      const auto* body = request->data->body_as<FBS::Transport::SetMinOutgoingBitrateRequest>();
      const uint32_t bitrate = body->minOutgoingBitrate();

      // 0 means unlimited. Any other value must respect the global minimum and
      // must not exceed the currently configured maximum (if any).
      if (bitrate > 0u && bitrate < RTC::TransportCongestionControlMinOutgoingBitrate)
      {
        MS_THROW_TYPE_ERROR(
          "bitrate must be >= %" PRIu32 " or 0 (unlimited)",
          RTC::TransportCongestionControlMinOutgoingBitrate);
      }
      else if (bitrate > 0u && this->maxOutgoingBitrate > 0 && bitrate > this->maxOutgoingBitrate)
      {
        MS_THROW_TYPE_ERROR(
          "bitrate must be <= current max outgoing bitrate (%" PRIu32 ") or 0 (unlimited)",
          this->maxOutgoingBitrate);
      }

      if (this->tccClient)
      {
        // The member is updated after the congestion control client has been
        // given the new value.
        this->tccClient->SetMinOutgoingBitrate(bitrate);
        this->minOutgoingBitrate = bitrate;

        MS_DEBUG_TAG(bwe, "minimum outgoing bitrate set to %" PRIu32, this->minOutgoingBitrate);

        ComputeOutgoingDesiredBitrate();
      }
      else
      {
        this->minOutgoingBitrate = bitrate;
      }

      request->Accept();

      break;
    }

    case Channel::ChannelRequest::Method::TRANSPORT_PRODUCE:
    {
      const auto* body = request->data->body_as<FBS::Transport::ProduceRequest>();
      auto producerId  = body->producerId()->str();

      if (this->mapProducers.find(producerId) != this->mapProducers.end())
      {
        MS_THROW_ERROR("a Producer with same producerId already exists");
      }

      auto* producer = new RTC::Producer(this->shared, producerId, this, body);

      // Register the Producer in the RTP listener. On failure the Producer is
      // deleted and the error propagated.
      try
      {
        this->rtpListener.AddProducer(producer);
      }
      catch (const MediaSoupError&)
      {
        delete producer;

        throw;
      }

      // Announce the Producer to the listener. On failure the registration
      // above is undone, the Producer is deleted and the error propagated.
      try
      {
        this->listener->OnTransportNewProducer(this, producer);
      }
      catch (const MediaSoupError&)
      {
        this->rtpListener.RemoveProducer(producer);

        delete producer;

        throw;
      }

      this->mapProducers[producerId] = producer;

      MS_DEBUG_DEV("Producer created [producerId:%s]", producerId.c_str());

      // Adopt every non zero header extension id of the Producer as receive
      // header extension id of the Transport.
      const auto& producerRtpHeaderExtensionIds = producer->GetRtpHeaderExtensionIds();

      if (producerRtpHeaderExtensionIds.mid != 0u)
      {
        this->recvRtpHeaderExtensionIds.mid = producerRtpHeaderExtensionIds.mid;
      }

      if (producerRtpHeaderExtensionIds.rid != 0u)
      {
        this->recvRtpHeaderExtensionIds.rid = producerRtpHeaderExtensionIds.rid;
      }

      if (producerRtpHeaderExtensionIds.rrid != 0u)
      {
        this->recvRtpHeaderExtensionIds.rrid = producerRtpHeaderExtensionIds.rrid;
      }

      if (producerRtpHeaderExtensionIds.absSendTime != 0u)
      {
        this->recvRtpHeaderExtensionIds.absSendTime = producerRtpHeaderExtensionIds.absSendTime;
      }

      if (producerRtpHeaderExtensionIds.transportWideCc01 != 0u)
      {
        this->recvRtpHeaderExtensionIds.transportWideCc01 =
          producerRtpHeaderExtensionIds.transportWideCc01;
      }

      if (producerRtpHeaderExtensionIds.dependencyDescriptor != 0u)
      {
        this->recvRtpHeaderExtensionIds.dependencyDescriptor =
          producerRtpHeaderExtensionIds.dependencyDescriptor;
      }

      auto responseOffset = FBS::Transport::CreateProduceResponse(
        request->GetBufferBuilder(), FBS::RtpParameters::Type(producer->GetType()));

      request->Accept(FBS::Response::Body::Transport_ProduceResponse, responseOffset);

      const auto& codecs = producer->GetRtpParameters().codecs;

      // Create the congestion control server the first time a Producer
      // supports transport-cc or, otherwise, REMB.
      if (!this->tccServer)
      {
        bool createTccServer{ false };
        // Only read when createTccServer is true, in which case it has been
        // assigned in one of the branches below.
        RTC::BweType bweType{ RTC::BweType::TRANSPORT_CC };

        if (
          producerRtpHeaderExtensionIds.transportWideCc01 != 0u &&
          hasRtcpFeedback(codecs, "transport-cc"))
        {
          MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlServer with transport-cc");

          createTccServer = true;
          bweType         = RTC::BweType::TRANSPORT_CC;
        }
        else if (
          producerRtpHeaderExtensionIds.absSendTime != 0u && hasRtcpFeedback(codecs, "goog-remb"))
        {
          MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlServer with REMB");

          createTccServer = true;
          bweType         = RTC::BweType::REMB;
        }

        if (createTccServer)
        {
          this->tccServer = std::make_shared<RTC::TransportCongestionControlServer>(
            this, this->shared, bweType, RTC::Consts::RtcpPacketMaxSize);

          if (this->maxIncomingBitrate != 0u)
          {
            this->tccServer->SetMaxIncomingBitrate(this->maxIncomingBitrate);
          }

          if (IsConnected())
          {
            this->tccServer->TransportConnected();
          }
        }
      }

      break;
    }

    case Channel::ChannelRequest::Method::TRANSPORT_CONSUME:
    {
      const auto* body = request->data->body_as<FBS::Transport::ConsumeRequest>();
      const std::string producerId = body->producerId()->str();
      const std::string consumerId = body->consumerId()->str();

      if (this->mapConsumers.find(consumerId) != this->mapConsumers.end())
      {
        MS_THROW_ERROR("a Consumer with same consumerId already exists");
      }

      auto type = RTC::RtpParameters::Type(body->type());
      RTC::Consumer* consumer{ nullptr };

      switch (type)
      {
        case RTC::RtpParameters::Type::SIMPLE:
        {
          consumer = new RTC::SimpleConsumer(this->shared, consumerId, producerId, this, body);

          break;
        }

        case RTC::RtpParameters::Type::SIMULCAST:
        {
          consumer = new RTC::SimulcastConsumer(this->shared, consumerId, producerId, this, body);

          break;
        }

        case RTC::RtpParameters::Type::SVC:
        {
          consumer = new RTC::SvcConsumer(this->shared, consumerId, producerId, this, body);

          break;
        }

        case RTC::RtpParameters::Type::PIPE:
        {
          consumer = new RTC::PipeConsumer(this->shared, consumerId, producerId, this, body);

          break;
        }
      }

      // A type value not covered above creates no Consumer. Reject it rather
      // than handing a null Consumer to the listener.
      if (!consumer)
      {
        MS_THROW_TYPE_ERROR("invalid Consumer type");
      }

      // Announce the Consumer to the listener. On failure the Consumer is
      // deleted and the error propagated.
      try
      {
        this->listener->OnTransportNewConsumer(this, consumer, producerId);
      }
      catch (const MediaSoupError&)
      {
        delete consumer;

        throw;
      }

      this->mapConsumers[consumerId] = consumer;

      for (auto ssrc : consumer->GetMediaSsrcs())
      {
        this->mapSsrcConsumer[ssrc] = consumer;
      }

      for (auto ssrc : consumer->GetRtxSsrcs())
      {
        this->mapRtxSsrcConsumer[ssrc] = consumer;
      }

      MS_DEBUG_DEV(
        "Consumer created [consumerId:%s, producerId:%s]", consumerId.c_str(), producerId.c_str());

      // Preferred layers are only included in the response when both the
      // spatial and the temporal one are set (> -1).
      flatbuffers::Offset<FBS::Consumer::ConsumerLayers> preferredLayersOffset;
      auto preferredLayers = consumer->GetPreferredLayers();

      if (preferredLayers.spatial > -1 && preferredLayers.temporal > -1)
      {
        const flatbuffers::Optional<int16_t> preferredTemporalLayer{ preferredLayers.temporal };

        preferredLayersOffset = FBS::Consumer::CreateConsumerLayers(
          request->GetBufferBuilder(), preferredLayers.spatial, preferredTemporalLayer);
      }

      auto scoreOffset    = consumer->FillBufferScore(request->GetBufferBuilder());
      auto responseOffset = FBS::Transport::CreateConsumeResponse(
        request->GetBufferBuilder(),
        consumer->IsPaused(),
        consumer->IsProducerPaused(),
        scoreOffset,
        preferredLayersOffset);

      request->Accept(FBS::Response::Body::Transport_ConsumeResponse, responseOffset);

      const auto& rtpHeaderExtensionIds = consumer->GetRtpHeaderExtensionIds();
      const auto& codecs                = consumer->GetRtpParameters().codecs;

      // Create the congestion control client the first time a video Consumer
      // supports transport-cc or, otherwise, REMB.
      if (!this->tccClient)
      {
        bool createTccClient{ false };
        // Only read when createTccClient is true, in which case it has been
        // assigned in one of the branches below.
        RTC::BweType bweType{ RTC::BweType::TRANSPORT_CC };

        if (
          consumer->GetKind() == RTC::Media::Kind::VIDEO &&
          rtpHeaderExtensionIds.transportWideCc01 != 0u && hasRtcpFeedback(codecs, "transport-cc"))
        {
          MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlClient with transport-cc");

          createTccClient = true;
          bweType         = RTC::BweType::TRANSPORT_CC;
        }
        else if (
          consumer->GetKind() == RTC::Media::Kind::VIDEO &&
          rtpHeaderExtensionIds.absSendTime != 0u && hasRtcpFeedback(codecs, "goog-remb"))
        {
          MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlClient with REMB");

          createTccClient = true;
          bweType         = RTC::BweType::REMB;
        }

        if (createTccClient)
        {
          // From now on the bitrate of every Consumer is managed externally.
          for (auto& kv : this->mapConsumers)
          {
            auto* existingConsumer = kv.second;

            existingConsumer->SetExternallyManagedBitrate();
          }

          this->tccClient = std::make_shared<RTC::TransportCongestionControlClient>(
            this,
            this->shared,
            bweType,
            this->initialAvailableOutgoingBitrate,
            this->maxOutgoingBitrate,
            this->minOutgoingBitrate);

          if (IsConnected())
          {
            this->tccClient->TransportConnected();
          }
        }
      }

      if (this->tccClient)
      {
        consumer->SetExternallyManagedBitrate();
      }

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
      // Create the sender bandwidth estimator the first time a video Consumer
      // supports transport-cc.
      if (
        !this->senderBwe && consumer->GetKind() == RTC::Media::Kind::VIDEO &&
        rtpHeaderExtensionIds.transportWideCc01 != 0u && hasRtcpFeedback(codecs, "transport-cc"))
      {
        MS_DEBUG_TAG(bwe, "enabling SenderBandwidthEstimator");

        // From now on the bitrate of every Consumer is managed externally.
        for (auto& kv : this->mapConsumers)
        {
          auto* existingConsumer = kv.second;

          existingConsumer->SetExternallyManagedBitrate();
        }

        this->senderBwe = std::make_shared<RTC::SenderBandwidthEstimator>(
          this, this->shared, this->initialAvailableOutgoingBitrate);

        if (IsConnected())
        {
          this->senderBwe->TransportConnected();
        }
      }

      if (this->senderBwe)
      {
        consumer->SetExternallyManagedBitrate();
      }
#endif

      if (IsConnected())
      {
        consumer->TransportConnected();
      }

      break;
    }

    case Channel::ChannelRequest::Method::TRANSPORT_PRODUCE_DATA:
    {
      if (!this->sctpAssociation && !this->direct)
      {
        MS_THROW_ERROR("SCTP not enabled and not a direct Transport");
      }

      const auto* body    = request->data->body_as<FBS::Transport::ProduceDataRequest>();
      auto dataProducerId = body->dataProducerId()->str();

      CheckNoDataProducer(dataProducerId);

      auto* dataProducer = new RTC::DataProducer(
        this->shared, dataProducerId, this->maxReceiveMessageSize, this, body);

      // The type of the DataProducer must match what this Transport supports.
      switch (dataProducer->GetType())
      {
        case RTC::DataProducer::Type::SCTP:
        {
          if (!this->sctpAssociation)
          {
            delete dataProducer;

            MS_THROW_TYPE_ERROR(
              "cannot create a DataProducer of type 'sctp', SCTP not enabled in this Transport");
          }

          break;
        }

        case RTC::DataProducer::Type::DIRECT:
        {
          if (!this->direct)
          {
            delete dataProducer;

            MS_THROW_TYPE_ERROR(
              "cannot create a DataProducer of type 'direct', not a direct Transport");
          }

          break;
        }
      }

      if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP)
      {
        // Register the DataProducer in the SCTP listener. On failure it is
        // deleted and the error propagated.
        try
        {
          this->sctpListener.AddDataProducer(dataProducer);
        }
        catch (const MediaSoupError&)
        {
          delete dataProducer;

          throw;
        }
      }

      // Announce the DataProducer to the listener. On failure the
      // registration above (if any) is undone, the DataProducer is deleted
      // and the error propagated.
      try
      {
        this->listener->OnTransportNewDataProducer(this, dataProducer);
      }
      catch (const MediaSoupError&)
      {
        if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP)
        {
          this->sctpListener.RemoveDataProducer(dataProducer);
        }

        delete dataProducer;

        throw;
      }

      this->mapDataProducers[dataProducerId] = dataProducer;

      MS_DEBUG_DEV("DataProducer created [dataProducerId:%s]", dataProducerId.c_str());

      auto dumpOffset = dataProducer->FillBuffer(request->GetBufferBuilder());

      request->Accept(FBS::Response::Body::DataProducer_DumpResponse, dumpOffset);

      if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP)
      {
        this->sctpAssociation->MayConnect();
      }

      break;
    }

    case Channel::ChannelRequest::Method::TRANSPORT_CONSUME_DATA:
    {
      if (!this->sctpAssociation && !this->direct)
      {
        MS_THROW_ERROR("SCTP not enabled and not a direct Transport");
      }

      const auto* body    = request->data->body_as<FBS::Transport::ConsumeDataRequest>();
      auto dataProducerId = body->dataProducerId()->str();
      auto dataConsumerId = body->dataConsumerId()->str();

      CheckNoDataConsumer(dataConsumerId);

      auto* dataConsumer = new RTC::DataConsumer(
        this->shared, dataConsumerId, dataProducerId, this, body, this->maxSendMessageSize);

      // The type of the DataConsumer must match what this Transport supports.
      switch (dataConsumer->GetType())
      {
        case RTC::DataConsumer::Type::SCTP:
        {
          if (!this->sctpAssociation)
          {
            delete dataConsumer;

            MS_THROW_TYPE_ERROR(
              "cannot create a DataConsumer of type 'sctp', SCTP not enabled in this Transport");
          }

          // On failure the DataConsumer is deleted and the error propagated.
          try
          {
            CheckNoSctpDataConsumer(dataConsumer->GetSctpStreamParameters().streamId);
          }
          catch (const MediaSoupError&)
          {
            delete dataConsumer;

            throw;
          }

          break;
        }

        case RTC::DataConsumer::Type::DIRECT:
        {
          if (!this->direct)
          {
            delete dataConsumer;

            MS_THROW_TYPE_ERROR(
              "cannot create a DataConsumer of type 'direct', not a direct Transport");
          }

          break;
        }
      }

      // Announce the DataConsumer to the listener. On failure the
      // DataConsumer is deleted and the error propagated.
      try
      {
        this->listener->OnTransportNewDataConsumer(this, dataConsumer, dataProducerId);
      }
      catch (const MediaSoupError&)
      {
        delete dataConsumer;

        throw;
      }

      this->mapDataConsumers[dataConsumerId] = dataConsumer;

      if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
      {
        this->mapSctpStreamIdDataConsumers[dataConsumer->GetSctpStreamParameters().streamId] =
          dataConsumer;
      }

      MS_DEBUG_DEV(
        "DataConsumer created [dataConsumerId:%s, dataProducerId:%s]",
        dataConsumerId.c_str(),
        dataProducerId.c_str());

      auto dumpOffset = dataConsumer->FillBuffer(request->GetBufferBuilder());

      request->Accept(FBS::Response::Body::DataConsumer_DumpResponse, dumpOffset);

      if (IsConnected())
      {
        dataConsumer->TransportConnected();
      }

      if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
      {
        if (this->sctpAssociation->GetAssociationState() == RTC::SCTP::Types::AssociationState::CONNECTED)
        {
          dataConsumer->SctpAssociationConnected();
        }

        this->sctpAssociation->MayConnect();
      }

      break;
    }

    case Channel::ChannelRequest::Method::TRANSPORT_ENABLE_TRACE_EVENT:
    {
      const auto* body = request->data->body_as<FBS::Transport::EnableTraceEventRequest>();

      // The new set replaces the current one as a whole.
      struct TraceEventTypes newTraceEventTypes;

      for (const auto& type : *body->events())
      {
        switch (type)
        {
          case FBS::Transport::TraceEventType::PROBATION:
          {
            newTraceEventTypes.probation = true;

            break;
          }

          case FBS::Transport::TraceEventType::BWE:
          {
            newTraceEventTypes.bwe = true;

            break;
          }
        }
      }

      this->traceEventTypes = newTraceEventTypes;

      request->Accept();

      break;
    }

    case Channel::ChannelRequest::Method::TRANSPORT_CLOSE_PRODUCER:
    {
      const auto* body = request->data->body_as<FBS::Transport::CloseProducerRequest>();

      RTC::Producer* producer = AssertAndGetProducerById(body->producerId()->str());

      this->rtpListener.RemoveProducer(producer);
      this->mapProducers.erase(producer->id);

      // Report every stream SSRC (and its RTX SSRC, if any) as closed.
      for (const auto& kv : producer->GetRtpStreams())
      {
        auto* rtpStream = kv.first;

        RecvStreamClosed(rtpStream->GetSsrc());

        if (rtpStream->HasRtx())
        {
          RecvStreamClosed(rtpStream->GetRtxSsrc());
        }
      }

      this->listener->OnTransportProducerClosed(this, producer);

      MS_DEBUG_DEV("Producer closed [producerId:%s]", producer->id.c_str());

      delete producer;

      request->Accept();

      break;
    }

    case Channel::ChannelRequest::Method::TRANSPORT_CLOSE_CONSUMER:
    {
      const auto* body = request->data->body_as<FBS::Transport::CloseConsumerRequest>();

      RTC::Consumer* consumer = AssertAndGetConsumerById(body->consumerId()->str());

      this->mapConsumers.erase(consumer->id);

      // Forget every media and RTX SSRC of the Consumer and report each one
      // as closed.
      for (auto ssrc : consumer->GetMediaSsrcs())
      {
        this->mapSsrcConsumer.erase(ssrc);

        SendStreamClosed(ssrc);
      }

      for (auto ssrc : consumer->GetRtxSsrcs())
      {
        this->mapRtxSsrcConsumer.erase(ssrc);

        SendStreamClosed(ssrc);
      }

      this->listener->OnTransportConsumerClosed(this, consumer);

      MS_DEBUG_DEV("Consumer closed [consumerId:%s]", consumer->id.c_str());

      delete consumer;

      request->Accept();

      // Recompute the desired bitrate now that this Consumer is gone.
      if (this->tccClient)
      {
        ComputeOutgoingDesiredBitrate(true);
      }

      break;
    }

    case Channel::ChannelRequest::Method::TRANSPORT_CLOSE_DATAPRODUCER:
    {
      if (!this->sctpAssociation && !this->direct)
      {
        MS_THROW_ERROR("cannot close DataProducer, SCTP not enabled and not a direct Transport");
      }

      const auto* body = request->data->body_as<FBS::Transport::CloseDataProducerRequest>();

      RTC::DataProducer* dataProducer = AssertAndGetDataProducerById(body->dataProducerId()->str());

      if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP)
      {
        this->sctpListener.RemoveDataProducer(dataProducer);
      }

      this->mapDataProducers.erase(dataProducer->id);

      // The SCTP stream is only reset when the association works in data
      // channel mode.
      if (this->sctpAssociation && this->sctpAssociation->IsDataChannel())
      {
        this->sctpAssociation->ResetStreams(
          std::array<uint16_t, 1>{ dataProducer->GetSctpStreamParameters().streamId });
      }

      this->listener->OnTransportDataProducerClosed(this, dataProducer);

      MS_DEBUG_DEV("DataProducer closed [dataProducerId:%s]", dataProducer->id.c_str());

      delete dataProducer;

      request->Accept();

      break;
    }

    case Channel::ChannelRequest::Method::TRANSPORT_CLOSE_DATACONSUMER:
    {
      if (!this->sctpAssociation && !this->direct)
      {
        MS_THROW_ERROR("cannot close DataConsumer, SCTP not enabled and not a direct Transport");
      }

      const auto* body = request->data->body_as<FBS::Transport::CloseDataConsumerRequest>();

      RTC::DataConsumer* dataConsumer = AssertAndGetDataConsumerById(body->dataConsumerId()->str());

      this->mapDataConsumers.erase(dataConsumer->id);

      if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
      {
        this->mapSctpStreamIdDataConsumers.erase(dataConsumer->GetSctpStreamParameters().streamId);
      }

      if (this->sctpAssociation)
      {
        this->sctpAssociation->ResetStreams(
          std::array<uint16_t, 1>{ dataConsumer->GetSctpStreamParameters().streamId });
      }

      this->listener->OnTransportDataConsumerClosed(this, dataConsumer);

      MS_DEBUG_DEV("DataConsumer closed [dataConsumerId:%s]", dataConsumer->id.c_str());

      delete dataConsumer;

      request->Accept();

      break;
    }

    default:
    {
      MS_THROW_ERROR("unknown method '%s'", request->methodCStr);
    }
  }
}
/**
 * Handles a channel notification addressed to this Transport.
 *
 * No notification event is recognized here, so every notification reaching
 * this method is logged as an unknown event.
 *
 * @param notification Notification whose event name is logged.
 */
void Transport::HandleNotification(Channel::ChannelNotification* notification)
{
  MS_TRACE();

  MS_ERROR("unknown event '%s'", notification->eventCStr);
}
/**
 * Marks this Transport as being destroyed. If there is an SCTP association it
 * is closed first; the flag is raised afterwards.
 */
void Transport::SetDestroying()
{
  MS_TRACE();

  if (this->sctpAssociation != nullptr)
  {
    this->sctpAssociation->Close();
  }

  // Raised only once the association (if any) has been closed.
  this->isDestroying = true;
}
/**
 * Propagates the "connected" state of the Transport: notifies every Consumer
 * and DataConsumer, lets the SCTP association connect, starts the RTCP timer
 * and notifies the bandwidth estimation components that exist.
 */
void Transport::Connected()
{
  MS_TRACE();

  for (auto& entry : this->mapConsumers)
  {
    entry.second->TransportConnected();
  }

  for (auto& entry : this->mapDataConsumers)
  {
    entry.second->TransportConnected();
  }

  if (this->sctpAssociation != nullptr)
  {
    this->sctpAssociation->MayConnect();
  }

  // The timer is started with half of the maximum RTCP interval for video.
  const auto rtcpTimeoutMs = static_cast<uint64_t>(RTC::RTCP::MaxVideoIntervalMs / 2);

  this->rtcpTimer->Start(rtcpTimeoutMs);

  if (this->tccClient != nullptr)
  {
    this->tccClient->TransportConnected();
  }

  if (this->tccServer != nullptr)
  {
    this->tccServer->TransportConnected();
  }

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
  if (this->senderBwe != nullptr)
  {
    this->senderBwe->TransportConnected();
  }
#endif
}
/**
 * Propagates the "disconnected" state of the Transport: notifies every
 * Consumer and DataConsumer, stops the RTCP timer and notifies the bandwidth
 * estimation components that exist.
 */
void Transport::Disconnected()
{
  MS_TRACE();

  for (auto& entry : this->mapConsumers)
  {
    entry.second->TransportDisconnected();
  }

  for (auto& entry : this->mapDataConsumers)
  {
    entry.second->TransportDisconnected();
  }

  this->rtcpTimer->Stop();

  if (this->tccClient != nullptr)
  {
    this->tccClient->TransportDisconnected();
  }

  if (this->tccServer != nullptr)
  {
    this->tccServer->TransportDisconnected();
  }

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
  if (this->senderBwe != nullptr)
  {
    this->senderBwe->TransportDisconnected();
  }
#endif
}
/**
 * Processes a received RTP packet and deletes it before returning.
 *
 * The packet gets the receive header extension ids of the Transport, is fed
 * to the congestion control server (if any) and is then handed to the
 * Producer selected by the RTP listener. Depending on the Producer result the
 * RTP or RTX receive counters are updated, or the SSRC is reported as closed.
 * If no Producer matches, a warning is logged and the SSRC is reported as
 * closed.
 *
 * @param packet Received packet. It is always deleted by this method.
 */
void Transport::ReceiveRtpPacket(RTC::RTP::Packet* packet)
{
  MS_TRACE();

#ifdef MS_RTC_LOGGER_RTP
  packet->logger.recvTransportId = this->id;
#endif

  packet->AssignExtensionIds(this->recvRtpHeaderExtensionIds);

  auto nowMs = this->shared->GetTimeMs();

  if (this->tccServer != nullptr)
  {
    this->tccServer->IncomingPacket(nowMs, packet);
  }

  RTC::Producer* const targetProducer = this->rtpListener.GetProducer(packet);

  if (targetProducer != nullptr)
  {
    using Result = RTC::Producer::ReceiveRtpPacketResult;

    const auto outcome = targetProducer->ReceiveRtpPacket(packet);

    if (outcome == Result::MEDIA)
    {
      this->recvRtpTransmission.Update(packet);
    }
    else if (outcome == Result::RETRANSMISSION)
    {
      this->recvRtxTransmission.Update(packet);
    }
    else if (outcome == Result::DISCARDED)
    {
      RecvStreamClosed(packet->GetSsrc());
    }
    // Any other result requires no action.

    delete packet;

    return;
  }

  // No Producer matches the packet.
#ifdef MS_RTC_LOGGER_RTP
  packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::PRODUCER_NOT_FOUND);
#endif

  MS_WARN_TAG(
    rtp,
    "no suitable Producer for received RTP packet [ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]",
    packet->GetSsrc(),
    packet->GetPayloadType());

  RecvStreamClosed(packet->GetSsrc());

  delete packet;
}
/**
 * Handles a chain of received RTCP packets.
 *
 * Each packet is passed to HandleRtcpPacket() and then deleted; the chain is
 * followed through Packet::GetNext() until it yields a null pointer.
 *
 * @param packet First packet of the chain (may be nullptr, in which case
 *   nothing happens).
 */
void Transport::ReceiveRtcpPacket(RTC::RTCP::Packet* packet)
{
	MS_TRACE();

	auto* current = packet;

	while (current != nullptr)
	{
		HandleRtcpPacket(current);

		// Fetch the successor before freeing the packet that links to it.
		auto* next = current->GetNext();

		delete current;
		current = next;
	}
}
/**
 * Passes received SCTP data to the SCTP association.
 *
 * When this Transport has no SCTP association the data is ignored and a debug
 * message is logged.
 *
 * @param data Pointer to the received bytes.
 * @param len Number of bytes in `data`.
 */
void Transport::ReceiveSctpData(const uint8_t* data, size_t len)
{
	MS_TRACE();

	if (this->sctpAssociation)
	{
		this->sctpAssociation->ReceiveSctpData(data, len);

		return;
	}

	MS_DEBUG_TAG(sctp, "ignoring SCTP packet (SCTP not enabled)");
}
/**
 * Sends an SCTP message on behalf of a DataConsumer through the SCTP
 * association of this Transport.
 *
 * Takes ownership of `cb`: when given, it is invoked once with
 * (queued, sctpSendBufferFull) and then freed, whatever the outcome.
 *
 * @param dataConsumer DataConsumer whose SCTP stream parameters select the
 *   send options. Its SctpSendBufferFull() is called when the send fails with
 *   ERROR_RESOURCE_EXHAUSTION.
 * @param message Message to send (moved into the association).
 * @param cb Optional heap-allocated callback; may be nullptr.
 */
void Transport::SendSctpMessage(
  RTC::DataConsumer* dataConsumer, RTC::SCTP::Message message, onQueuedCallback* cb)
{
	MS_TRACE();

	// Own the callback so it is released on every exit path, including an
	// exception raised while sending.
	const std::unique_ptr<onQueuedCallback> cbOwner(cb);

	if (!this->sctpAssociation)
	{
		// Do not throw here: throwing made the callback notification below
		// unreachable and leaked the callback. The failure is reported through
		// the callback instead.
		MS_WARN_TAG(sctp, "SCTP not enabled");

		if (cbOwner)
		{
			(*cbOwner)(false, false);
		}

		return;
	}

	const auto& sctpStreamParameters = dataConsumer->GetSctpStreamParameters();
	// Lifetime and retransmission limits are only passed for unordered streams.
	const RTC::SCTP::SendMessageOptions sendMessageOptions{
		.unordered  = !sctpStreamParameters.ordered,
		.lifetimeMs = sctpStreamParameters.ordered
		                ? std::nullopt
		                : std::optional<uint64_t>(sctpStreamParameters.maxPacketLifeTime),
		.maxRetransmissions = sctpStreamParameters.ordered
		                        ? std::nullopt
		                        : std::optional<uint64_t>(sctpStreamParameters.maxRetransmits),
	};

	const auto sendStatus =
	  this->sctpAssociation->SendMessage(std::move(message), sendMessageOptions);

	if (sendStatus == RTC::SCTP::Types::SendMessageStatus::SUCCESS)
	{
		if (cbOwner)
		{
			(*cbOwner)(true, false);
		}

		return;
	}

	// Any other status is a failure. Resource exhaustion is additionally
	// reported as "send buffer full" to both the callback and the DataConsumer.
	const bool sendBufferFull =
	  (sendStatus == RTC::SCTP::Types::SendMessageStatus::ERROR_RESOURCE_EXHAUSTION);
	const auto sendStatusStringView = RTC::SCTP::Types::SendMessageStatusToString(sendStatus);

	MS_WARN_TAG(
	  sctp,
	  "failed to send SCTP message [sendStatus:%.*s]",
	  static_cast<int>(sendStatusStringView.size()),
	  sendStatusStringView.data());

	if (cbOwner)
	{
		(*cbOwner)(false, sendBufferFull);
	}

	if (sendBufferFull)
	{
		dataConsumer->SctpSendBufferFull();
	}
}
void Transport::CheckNoDataProducer(const std::string& dataProducerId) const
{
if (this->mapDataProducers.find(dataProducerId) != this->mapDataProducers.end())
{
MS_THROW_ERROR("a DataProducer with same dataProducerId already exists");
}
}
/**
 * Ensures that no DataConsumer with the given id exists in this Transport.
 *
 * @param dataConsumerId Id to look up in the DataConsumers map.
 * @throws An error (via MS_THROW_ERROR) if a DataConsumer with that id is
 *   already registered.
 */
void Transport::CheckNoDataConsumer(const std::string& dataConsumerId) const
{
	MS_TRACE();

	const bool alreadyExists = this->mapDataConsumers.count(dataConsumerId) != 0u;

	if (alreadyExists)
	{
		MS_THROW_ERROR("a DataConsumer with same dataConsumerId already exists");
	}
}
/**
 * Ensures that no SCTP DataConsumer is registered for the given SCTP stream id.
 *
 * @param streamId SCTP stream id to look up.
 * @throws An error (via MS_THROW_ERROR) if the stream id is already in use.
 */
void Transport::CheckNoSctpDataConsumer(uint16_t streamId) const
{
	MS_TRACE();

	const bool alreadyExists = this->mapSctpStreamIdDataConsumers.count(streamId) != 0u;

	if (alreadyExists)
	{
		MS_THROW_ERROR("an SCTP DataConsumer with same streamId %" PRIu16 " already exists", streamId);
	}
}
/**
 * Returns the Producer registered with the given id.
 *
 * @param producerId Id of the Producer.
 * @return The Producer stored in the Producers map for that id.
 * @throws An error (via MS_THROW_ERROR) if there is no such Producer.
 */
RTC::Producer* Transport::AssertAndGetProducerById(const std::string& producerId) const
{
	MS_TRACE();

	const auto& producers = this->mapProducers;
	const auto producerIt = producers.find(producerId);

	if (producerIt == producers.end())
	{
		MS_THROW_ERROR("Producer not found");
	}

	return producerIt->second;
}
/**
 * Returns the Consumer registered with the given id.
 *
 * @param consumerId Id of the Consumer.
 * @return The Consumer stored in the Consumers map for that id.
 * @throws An error (via MS_THROW_ERROR) if there is no such Consumer.
 */
RTC::Consumer* Transport::AssertAndGetConsumerById(const std::string& consumerId) const
{
	MS_TRACE();

	const auto& consumers = this->mapConsumers;
	const auto consumerIt = consumers.find(consumerId);

	if (consumerIt == consumers.end())
	{
		MS_THROW_ERROR("Consumer not found");
	}

	return consumerIt->second;
}
/**
 * Looks up the Consumer that sends media with the given SSRC.
 *
 * @param ssrc Media SSRC.
 * @return The matching Consumer, or nullptr if the SSRC is unknown.
 */
inline RTC::Consumer* Transport::GetConsumerByMediaSsrc(uint32_t ssrc) const
{
	MS_TRACE();

	const auto it = this->mapSsrcConsumer.find(ssrc);

	return (it != this->mapSsrcConsumer.end()) ? it->second : nullptr;
}
/**
 * Looks up the Consumer that uses the given SSRC for RTX.
 *
 * @param ssrc RTX SSRC.
 * @return The matching Consumer, or nullptr if the SSRC is unknown.
 */
inline RTC::Consumer* Transport::GetConsumerByRtxSsrc(uint32_t ssrc) const
{
	MS_TRACE();

	const auto it = this->mapRtxSsrcConsumer.find(ssrc);

	return (it != this->mapRtxSsrcConsumer.end()) ? it->second : nullptr;
}
/**
 * Returns the DataProducer registered with the given id.
 *
 * @param dataProducerId Id of the DataProducer.
 * @return The DataProducer stored in the DataProducers map for that id.
 * @throws An error (via MS_THROW_ERROR) if there is no such DataProducer.
 */
RTC::DataProducer* Transport::AssertAndGetDataProducerById(const std::string& dataProducerId) const
{
	MS_TRACE();

	const auto& dataProducers = this->mapDataProducers;
	const auto dataProducerIt = dataProducers.find(dataProducerId);

	if (dataProducerIt == dataProducers.end())
	{
		MS_THROW_ERROR("DataProducer not found");
	}

	return dataProducerIt->second;
}
/**
 * Returns the DataConsumer registered with the given id.
 *
 * @param dataConsumerId Id of the DataConsumer.
 * @return The DataConsumer stored in the DataConsumers map for that id.
 * @throws An error (via MS_THROW_ERROR) if there is no such DataConsumer.
 */
RTC::DataConsumer* Transport::AssertAndGetDataConsumerById(const std::string& dataConsumerId) const
{
	MS_TRACE();

	const auto& dataConsumers = this->mapDataConsumers;
	const auto dataConsumerIt = dataConsumers.find(dataConsumerId);

	if (dataConsumerIt == dataConsumers.end())
	{
		MS_THROW_ERROR("DataConsumer not found");
	}

	return dataConsumerIt->second;
}
/**
 * Looks up the SCTP DataConsumer bound to the given SCTP stream id.
 *
 * Like the other Get*() lookups of this class (and unlike the AssertAndGet*()
 * ones) a miss is reported with nullptr rather than by throwing, so it can be
 * called from SCTP association callbacks, which check the result for null.
 *
 * @param streamId SCTP stream id.
 * @return The matching DataConsumer, or nullptr if no DataConsumer is
 *   registered for that stream id.
 */
RTC::DataConsumer* Transport::GetSctpDataConsumerByStreamId(uint16_t streamId) const
{
	MS_TRACE();

	const auto it = this->mapSctpStreamIdDataConsumers.find(streamId);

	if (it == this->mapSctpStreamIdDataConsumers.end())
	{
		return nullptr;
	}

	return it->second;
}
// Dispatches a single received RTCP packet according to its type.
//
// The packet is neither deleted nor are chained packets followed here; that
// is done by ReceiveRtcpPacket().
void Transport::HandleRtcpPacket(RTC::RTCP::Packet* packet)
{
MS_TRACE();
switch (packet->GetType())
{
// Receiver Report: hand each report to the Consumer that owns its SSRC.
case RTC::RTCP::Type::RR:
{
auto* rr = static_cast<RTC::RTCP::ReceiverReportPacket*>(packet);
for (auto it = rr->Begin(); it != rr->End(); ++it)
{
auto& report = *it;
auto* consumer = GetConsumerByMediaSsrc(report->GetSsrc());
if (!consumer)
{
// Reports about the probation SSRC are skipped without logging.
if (report->GetSsrc() == RTC::RTP::ProbationGenerator::Ssrc)
{
continue;
}
// So are reports about an RTX SSRC of a known Consumer.
if (GetConsumerByRtxSsrc(report->GetSsrc()) != nullptr)
{
continue;
}
MS_DEBUG_TAG(
rtcp,
"no Consumer found for received Receiver Report [ssrc:%" PRIu32 "]",
report->GetSsrc());
continue;
}
consumer->ReceiveRtcpReceiverReport(report);
}
if (this->tccClient && !this->mapConsumers.empty())
{
// Use the RTT of the first active Consumer found while iterating the
// map; it stays 0 if no Consumer is active.
float rtt = 0;
for (auto& kv : this->mapConsumers)
{
auto* consumer = kv.second;
if (consumer->IsActive())
{
rtt = consumer->GetRtt();
break;
}
}
this->tccClient->ReceiveRtcpReceiverReport(rr, rtt, this->shared->GetTimeMsInt64());
}
break;
}
// Payload-specific feedback.
case RTC::RTCP::Type::PSFB:
{
auto* feedback = static_cast<RTC::RTCP::FeedbackPsPacket*>(packet);
switch (feedback->GetMessageType())
{
case RTC::RTCP::FeedbackPs::MessageType::PLI:
{
auto* consumer = GetConsumerByMediaSsrc(feedback->GetMediaSsrc());
// PLI for the probation SSRC is ignored without logging.
if (feedback->GetMediaSsrc() == RTC::RTP::ProbationGenerator::Ssrc)
{
break;
}
else if (!consumer)
{
MS_DEBUG_TAG(
rtcp,
"no Consumer found for received PLI feedback packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
break;
}
MS_DEBUG_TAG(
rtcp,
"PLI received, requesting key frame for Consumer "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
consumer->ReceiveKeyFrameRequest(
RTC::RTCP::FeedbackPs::MessageType::PLI, feedback->GetMediaSsrc());
break;
}
case RTC::RTCP::FeedbackPs::MessageType::FIR:
{
// Each FIR item carries its own SSRC, so the Consumer is resolved per item.
auto* fir = static_cast<RTC::RTCP::FeedbackPsFirPacket*>(packet);
for (auto it = fir->Begin(); it != fir->End(); ++it)
{
auto& item = *it;
auto* consumer = GetConsumerByMediaSsrc(item->GetSsrc());
// FIR items for the probation SSRC are ignored without logging.
if (item->GetSsrc() == RTC::RTP::ProbationGenerator::Ssrc)
{
continue;
}
else if (!consumer)
{
MS_DEBUG_TAG(
rtcp,
"no Consumer found for received FIR feedback packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 ", item ssrc:%" PRIu32 "]",
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc(),
item->GetSsrc());
continue;
}
MS_DEBUG_TAG(
rtcp,
"FIR received, requesting key frame for Consumer "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 ", item ssrc:%" PRIu32 "]",
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc(),
item->GetSsrc());
consumer->ReceiveKeyFrameRequest(feedback->GetMessageType(), item->GetSsrc());
}
break;
}
case RTC::RTCP::FeedbackPs::MessageType::AFB:
{
auto* afb = static_cast<RTC::RTCP::FeedbackPsAfbPacket*>(feedback);
if (afb->GetApplication() == RTC::RTCP::FeedbackPsAfbPacket::Application::REMB)
{
auto* remb = static_cast<RTC::RTCP::FeedbackPsRembPacket*>(afb);
// The REMB bitrate is only passed on when a congestion control client
// exists and its BWE type is REMB.
if (this->tccClient && this->tccClient->GetBweType() == RTC::BweType::REMB)
{
this->tccClient->ReceiveEstimatedBitrate(remb->GetBitrate());
}
break;
}
else
{
MS_DEBUG_TAG(
rtcp,
"ignoring unsupported %s feedback PS AFB packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
RTC::RTCP::FeedbackPsPacket::MessageTypeToString(feedback->GetMessageType()).c_str(),
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
break;
}
}
default:
{
MS_DEBUG_TAG(
rtcp,
"ignoring unsupported %s feedback packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
RTC::RTCP::FeedbackPsPacket::MessageTypeToString(feedback->GetMessageType()).c_str(),
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
}
}
break;
}
// Transport layer feedback (NACK, transport-cc).
case RTC::RTCP::Type::RTPFB:
{
auto* feedback = static_cast<RTC::RTCP::FeedbackRtpPacket*>(packet);
auto* consumer = GetConsumerByMediaSsrc(feedback->GetMediaSsrc());
// Give up when there is no Consumer for the media SSRC, the message is not
// transport-cc and the media SSRC is not the probation SSRC or is not an
// RTX SSRC of any Consumer.
// NOTE(review): because of the `||`, the last part is only false when the
// media SSRC is both the probation SSRC and a known RTX SSRC; confirm that
// this is the intended condition.
if (
!consumer && feedback->GetMessageType() != RTC::RTCP::FeedbackRtp::MessageType::TCC &&
(feedback->GetMediaSsrc() != RTC::RTP::ProbationGenerator::Ssrc ||
!GetConsumerByRtxSsrc(feedback->GetMediaSsrc())))
{
MS_DEBUG_TAG(
rtcp,
"no Consumer found for received feedback packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
break;
}
switch (feedback->GetMessageType())
{
case RTC::RTCP::FeedbackRtp::MessageType::NACK:
{
// The check above may let a null Consumer through, so check again.
if (!consumer)
{
MS_DEBUG_TAG(
rtcp,
"no Consumer found for received NACK feedback packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
break;
}
auto* nackPacket = static_cast<RTC::RTCP::FeedbackRtpNackPacket*>(packet);
consumer->ReceiveNack(nackPacket);
break;
}
case RTC::RTCP::FeedbackRtp::MessageType::TCC:
{
// NOTE: this `feedback` shadows the outer one within this case block.
auto* feedback = static_cast<RTC::RTCP::FeedbackRtpTransportPacket*>(packet);
if (this->tccClient)
{
this->tccClient->ReceiveRtcpTransportFeedback(feedback);
}
#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
if (this->senderBwe)
{
this->senderBwe->ReceiveRtcpTransportFeedback(feedback);
}
#endif
break;
}
default:
{
MS_DEBUG_TAG(
rtcp,
"ignoring unsupported %s feedback packet "
"[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
RTC::RTCP::FeedbackRtpPacket::MessageTypeToString(feedback->GetMessageType()).c_str(),
feedback->GetSenderSsrc(),
feedback->GetMediaSsrc());
}
}
break;
}
// Sender Report: hand each report to the Producer that owns its SSRC.
case RTC::RTCP::Type::SR:
{
auto* sr = static_cast<RTC::RTCP::SenderReportPacket*>(packet);
for (auto it = sr->Begin(); it != sr->End(); ++it)
{
auto& report = *it;
auto* producer = this->rtpListener.GetProducer(report->GetSsrc());
if (!producer)
{
MS_DEBUG_TAG(
rtcp,
"no Producer found for received Sender Report [ssrc:%" PRIu32 "]",
report->GetSsrc());
continue;
}
producer->ReceiveRtcpSenderReport(report);
}
break;
}
// SDES packets are accepted but not processed.
case RTC::RTCP::Type::SDES:
{
break;
}
case RTC::RTCP::Type::BYE:
{
MS_DEBUG_TAG(rtcp, "ignoring received RTCP BYE");
break;
}
// Extended Reports: only DLRR and RRT blocks are processed.
case RTC::RTCP::Type::XR:
{
auto* xr = static_cast<RTC::RTCP::ExtendedReportPacket*>(packet);
for (auto it = xr->Begin(); it != xr->End(); ++it)
{
auto& report = *it;
switch (report->GetType())
{
case RTC::RTCP::ExtendedReportBlock::Type::DLRR:
{
auto* dlrr = static_cast<RTC::RTCP::DelaySinceLastRr*>(report);
for (auto it2 = dlrr->Begin(); it2 != dlrr->End(); ++it2)
{
auto& ssrcInfo = *it2;
// A sub-block with SSRC 0 gets the SSRC of the XR packet itself.
if (ssrcInfo->GetSsrc() == 0)
{
ssrcInfo->SetSsrc(xr->GetSsrc());
}
auto* producer = this->rtpListener.GetProducer(ssrcInfo->GetSsrc());
if (!producer)
{
MS_DEBUG_TAG(
rtcp,
"no Producer found for received Sender Extended Report [ssrc:%" PRIu32 "]",
ssrcInfo->GetSsrc());
continue;
}
producer->ReceiveRtcpXrDelaySinceLastRr(ssrcInfo);
}
break;
}
case RTC::RTCP::ExtendedReportBlock::Type::RRT:
{
// The receiver reference time is given to every Consumer.
auto* rrt = static_cast<RTC::RTCP::ReceiverReferenceTime*>(report);
for (auto& kv : this->mapConsumers)
{
auto* consumer = kv.second;
consumer->ReceiveRtcpXrReceiverReferenceTime(rrt);
}
break;
}
default:;
}
}
break;
}
default:
{
MS_DEBUG_TAG(
rtcp,
"unhandled RTCP type received [type:%" PRIu8 "]",
static_cast<uint8_t>(packet->GetType()));
}
}
}
void Transport::SendRtcp(uint64_t nowMs)
{
MS_TRACE();
std::unique_ptr<RTC::RTCP::CompoundPacket> packet{ new RTC::RTCP::CompoundPacket() };
#ifdef MS_LIBURING_SUPPORTED
if (DepLibUring::IsEnabled())
{
DepLibUring::SetActive();
}
#endif
for (auto& kv : this->mapConsumers)
{
auto* consumer = kv.second;
auto rtcpAdded = consumer->GetRtcp(packet.get(), nowMs);
if (!rtcpAdded)
{
SendRtcpCompoundPacket(packet.get());
packet.reset(new RTC::RTCP::CompoundPacket());
consumer->GetRtcp(packet.get(), nowMs);
}
}
for (auto& kv : this->mapProducers)
{
auto* producer = kv.second;
auto rtcpAdded = producer->GetRtcp(packet.get(), nowMs);
if (!rtcpAdded)
{
SendRtcpCompoundPacket(packet.get());
packet.reset(new RTC::RTCP::CompoundPacket());
producer->GetRtcp(packet.get(), nowMs);
}
}
if (packet->GetReceiverReportCount() > 0u || packet->GetSenderReportCount() > 0u)
{
SendRtcpCompoundPacket(packet.get());
}
#ifdef MS_LIBURING_SUPPORTED
if (DepLibUring::IsEnabled())
{
DepLibUring::Submit();
}
#endif
}
/**
 * Distributes the bitrate reported by the congestion control client among the
 * Consumers that have a bitrate priority greater than zero.
 *
 * Consumers are visited from highest to lowest priority. In the first pass
 * every Consumer may increase one layer; in later passes each Consumer may
 * increase up to `priority` layers. Passes are repeated until the bitrate is
 * exhausted or a whole pass uses none. Finally ApplyLayers() is called on
 * every participating Consumer.
 *
 * Requires a congestion control client (asserted).
 */
void Transport::DistributeAvailableOutgoingBitrate()
{
	MS_TRACE();

	MS_ASSERT(this->tccClient, "no TransportCongestionClient");

	std::multimap<uint8_t, RTC::Consumer*> multimapPriorityConsumer;

	// Fill the map with Consumers and their priority (if > 0).
	for (auto& kv : this->mapConsumers)
	{
		auto* consumer = kv.second;
		auto priority  = consumer->GetBitratePriority();

		if (priority > 0u)
		{
			multimapPriorityConsumer.emplace(priority, consumer);
		}
	}

	// Nobody wants bitrate.
	if (multimapPriorityConsumer.empty())
	{
		return;
	}

	bool baseAllocation       = true;
	uint32_t availableBitrate = this->tccClient->GetAvailableBitrate();

	this->tccClient->RescheduleNextAvailableBitrateEvent();

	MS_DEBUG_DEV("before layer-by-layer iterations [availableBitrate:%" PRIu32 "]", availableBitrate);

	while (availableBitrate > 0u)
	{
		auto previousAvailableBitrate = availableBitrate;

		for (auto it = multimapPriorityConsumer.rbegin(); it != multimapPriorityConsumer.rend(); ++it)
		{
			auto priority  = it->first;
			auto* consumer = it->second;
			auto bweType   = this->tccClient->GetBweType();

			// Use a counter wider than the 8-bit priority: with priority 255 a
			// uint8_t counter wraps to 0 and `i <= priority` never becomes false.
			const uint32_t maxIncrements = baseAllocation ? 1u : static_cast<uint32_t>(priority);

			for (uint32_t i{ 1u }; i <= maxIncrements; ++i)
			{
				uint32_t usedBitrate{ 0u };
				const bool considerLoss = (bweType == RTC::BweType::REMB);

				usedBitrate = consumer->IncreaseLayer(availableBitrate, considerLoss);

				MS_ASSERT(usedBitrate <= availableBitrate, "Consumer used more layer bitrate than given");

				availableBitrate -= usedBitrate;

				// This Consumer cannot use more bitrate now; move to the next one.
				if (usedBitrate == 0u)
				{
					break;
				}
			}
		}

		// If no Consumer used bitrate in this pass, stop.
		if (availableBitrate == previousAvailableBitrate)
		{
			break;
		}

		baseAllocation = false;
	}

	MS_DEBUG_DEV("after layer-by-layer iterations [availableBitrate:%" PRIu32 "]", availableBitrate);

	// Finally instruct Consumers to apply their computed layers.
	for (auto it = multimapPriorityConsumer.rbegin(); it != multimapPriorityConsumer.rend(); ++it)
	{
		auto* consumer = it->second;

		consumer->ApplyLayers();
	}
}
/**
 * Sums the desired bitrate of every Consumer and passes the total to the
 * congestion control client.
 *
 * Requires a congestion control client (asserted).
 *
 * @param forceBitrate Forwarded unchanged to SetDesiredBitrate().
 */
void Transport::ComputeOutgoingDesiredBitrate(bool forceBitrate)
{
	MS_TRACE();

	MS_ASSERT(this->tccClient, "no TransportCongestionClient");

	uint32_t desiredBitrateSum{ 0u };

	for (const auto& entry : this->mapConsumers)
	{
		desiredBitrateSum += entry.second->GetDesiredBitrate();
	}

	MS_DEBUG_DEV("total desired bitrate: %" PRIu32, desiredBitrateSum);

	this->tccClient->SetDesiredBitrate(desiredBitrateSum, forceBitrate);
}
/**
 * Emits a "probation" trace notification (direction out) for this Transport,
 * provided that the probation trace event type is enabled. The packet argument
 * is not used.
 */
inline void Transport::EmitTraceEventProbationType(RTC::RTP::Packet* /*packet*/) const
{
	MS_TRACE();

	if (this->traceEventTypes.probation)
	{
		auto traceNotification = FBS::Transport::CreateTraceNotification(
		  this->shared->GetChannelNotifier()->GetBufferBuilder(),
		  FBS::Transport::TraceEventType::PROBATION,
		  this->shared->GetTimeMs(),
		  FBS::Common::TraceDirection::DIRECTION_OUT);

		this->shared->GetChannelNotifier()->Emit(
		  this->id,
		  FBS::Notification::Event::TRANSPORT_TRACE,
		  FBS::Notification::Body::Transport_TraceNotification,
		  traceNotification);
	}
}
/**
 * Emits a "bwe" trace notification (direction out) carrying the given
 * bitrates, provided that the bwe trace event type is enabled.
 *
 * @param bitrates Bitrates reported by the congestion control client; its
 *   fields are copied into the trace info.
 */
inline void Transport::EmitTraceEventBweType(
  RTC::TransportCongestionControlClient::Bitrates& bitrates) const
{
	MS_TRACE();

	if (!this->traceEventTypes.bwe)
	{
		return;
	}

	// Anything that is not transport-cc is reported as REMB.
	const auto fbsBweType = (this->tccClient->GetBweType() == RTC::BweType::TRANSPORT_CC)
	                          ? FBS::Transport::BweType::TRANSPORT_CC
	                          : FBS::Transport::BweType::REMB;

	auto bweTraceInfo = FBS::Transport::CreateBweTraceInfo(
	  this->shared->GetChannelNotifier()->GetBufferBuilder(),
	  fbsBweType,
	  bitrates.desiredBitrate,
	  bitrates.effectiveDesiredBitrate,
	  bitrates.minBitrate,
	  bitrates.maxBitrate,
	  bitrates.startBitrate,
	  bitrates.maxPaddingBitrate,
	  bitrates.availableBitrate);

	auto traceNotification = FBS::Transport::CreateTraceNotification(
	  this->shared->GetChannelNotifier()->GetBufferBuilder(),
	  FBS::Transport::TraceEventType::BWE,
	  this->shared->GetTimeMs(),
	  FBS::Common::TraceDirection::DIRECTION_OUT,
	  FBS::Transport::TraceInfo::BweTraceInfo,
	  bweTraceInfo.Union());

	this->shared->GetChannelNotifier()->Emit(
	  this->id,
	  FBS::Notification::Event::TRANSPORT_TRACE,
	  FBS::Notification::Body::Transport_TraceNotification,
	  traceNotification);
}
// Producer callback: forwards the "paused" event to the Transport listener,
// adding this Transport as first argument.
void Transport::OnProducerPaused(RTC::Producer* producer)
{
MS_TRACE();
this->listener->OnTransportProducerPaused(this, producer);
}
// Producer callback: forwards the "resumed" event to the Transport listener,
// adding this Transport as first argument.
void Transport::OnProducerResumed(RTC::Producer* producer)
{
MS_TRACE();
this->listener->OnTransportProducerResumed(this, producer);
}
// Producer callback: forwards the new receiving RTP stream and its mapped SSRC
// to the Transport listener.
void Transport::OnProducerNewRtpStream(
RTC::Producer* producer, RTC::RTP::RtpStreamRecv* rtpStream, uint32_t mappedSsrc)
{
MS_TRACE();
this->listener->OnTransportProducerNewRtpStream(this, producer, rtpStream, mappedSsrc);
}
// Producer callback: forwards the current and previous score of a receiving
// RTP stream to the Transport listener.
void Transport::OnProducerRtpStreamScore(
RTC::Producer* producer, RTC::RTP::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore)
{
MS_TRACE();
this->listener->OnTransportProducerRtpStreamScore(this, producer, rtpStream, score, previousScore);
}
// Producer callback: forwards the RTCP Sender Report event (with its `first`
// flag) for the given RTP stream to the Transport listener.
void Transport::OnProducerRtcpSenderReport(
RTC::Producer* producer, RTC::RTP::RtpStreamRecv* rtpStream, bool first)
{
MS_TRACE();
this->listener->OnTransportProducerRtcpSenderReport(this, producer, rtpStream, first);
}
// Producer callback: forwards an RTP packet received by the Producer to the
// Transport listener.
void Transport::OnProducerRtpPacketReceived(RTC::Producer* producer, RTC::RTP::Packet* packet)
{
MS_TRACE();
this->listener->OnTransportProducerRtpPacketReceived(this, producer, packet);
}
// Producer callback: sends the given RTCP packet through SendRtcpPacket().
// The Producer argument is not used.
void Transport::OnProducerSendRtcpPacket(RTC::Producer* /*producer*/, RTC::RTCP::Packet* packet)
{
MS_TRACE();
SendRtcpPacket(packet);
}
// Producer callback: asks the Transport listener for the worst remote fraction
// lost of the given mapped SSRC. `worstRemoteFractionLost` is passed by
// reference so the listener can write the result into it.
void Transport::OnProducerNeedWorstRemoteFractionLost(
RTC::Producer* producer, uint32_t mappedSsrc, uint8_t& worstRemoteFractionLost)
{
MS_TRACE();
this->listener->OnTransportNeedWorstRemoteFractionLost(
this, producer, mappedSsrc, worstRemoteFractionLost);
}
// Consumer callback: sends an RTP packet produced by a Consumer.
//
// When transport-cc is in use and the packet accepts the transport-wide
// sequence number, the packet is registered in the congestion control client
// and sent with a callback that reports the actual send; otherwise it is sent
// without a callback. The media send transmission counter is updated in both
// cases.
void Transport::OnConsumerSendRtpPacket(RTC::Consumer* consumer, RTC::RTP::Packet* packet)
{
MS_TRACE();
#ifdef MS_RTC_LOGGER_RTP
packet->logger.sendTransportId = this->id;
packet->logger.Sent();
#endif
packet->UpdateAbsSendTime(this->shared->GetTimeMs());
// The next sequence number is offered to the packet first and the counter is
// only advanced if UpdateTransportWideCc01() returns true.
if (
this->tccClient && this->tccClient->GetBweType() == RTC::BweType::TRANSPORT_CC &&
packet->UpdateTransportWideCc01(this->transportWideCcSeq + 1))
{
this->transportWideCcSeq++;
webrtc::RtpPacketSendInfo packetInfo;
packetInfo.ssrc = packet->GetSsrc();
packetInfo.transport_sequence_number = this->transportWideCcSeq;
packetInfo.has_rtp_sequence_number = true;
packetInfo.rtp_sequence_number = packet->GetSequenceNumber();
packetInfo.length = packet->GetLength();
packetInfo.pacing_info = this->tccClient->GetPacingInfo();
// Tell the congestion control client that this packet is about to be sent.
this->tccClient->InsertPacket(packetInfo);
// The send callback only holds a weak_ptr to the congestion control client
// and checks it with lock(), so it does nothing for it if the client no
// longer exists when the callback runs.
const std::weak_ptr<RTC::TransportCongestionControlClient> tccClientWeakPtr(this->tccClient);
auto* shared = this->shared;
#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
std::weak_ptr<RTC::SenderBandwidthEstimator> senderBweWeakPtr(this->senderBwe);
RTC::SenderBandwidthEstimator::SentInfo sentInfo;
sentInfo.wideSeq = this->transportWideCcSeq;
sentInfo.size = packet->GetLength();
sentInfo.sendingAtMs = this->shared->GetTimeMs();
// `mutable` because the callback writes sentAtMs into its copy of sentInfo.
const auto* cb = new onSendCallback(
[tccClientWeakPtr, shared, packetInfo, senderBweWeakPtr, sentInfo](bool sent) mutable
{
if (sent)
{
auto tccClient = tccClientWeakPtr.lock();
if (tccClient)
{
tccClient->PacketSent(packetInfo, shared->GetTimeMsInt64());
}
auto senderBwe = senderBweWeakPtr.lock();
if (senderBwe)
{
sentInfo.sentAtMs = shared->GetTimeMs();
senderBwe->RtpPacketSent(sentInfo);
}
}
});
// NOTE(review): `cb` is not deleted here; presumably SendRtpPacket() takes
// ownership of it. Confirm against its implementations.
SendRtpPacket(consumer, packet, cb);
#else
const auto* cb = new onSendCallback(
[tccClientWeakPtr, shared, packetInfo](bool sent)
{
if (sent)
{
auto tccClient = tccClientWeakPtr.lock();
if (tccClient)
{
tccClient->PacketSent(packetInfo, shared->GetTimeMsInt64());
}
}
});
// NOTE(review): `cb` is not deleted here; presumably SendRtpPacket() takes
// ownership of it. Confirm against its implementations.
SendRtpPacket(consumer, packet, cb);
#endif
}
else
{
SendRtpPacket(consumer, packet);
}
this->sendRtpTransmission.Update(packet);
}
// Consumer callback: sends a retransmitted RTP packet.
//
// Follows the same steps as OnConsumerSendRtpPacket(), except that the RTX
// send transmission counter is updated and no RTP logger fields are set.
void Transport::OnConsumerRetransmitRtpPacket(RTC::Consumer* consumer, RTC::RTP::Packet* packet)
{
MS_TRACE();
packet->UpdateAbsSendTime(this->shared->GetTimeMs());
// The next sequence number is offered to the packet first and the counter is
// only advanced if UpdateTransportWideCc01() returns true.
if (
this->tccClient && this->tccClient->GetBweType() == RTC::BweType::TRANSPORT_CC &&
packet->UpdateTransportWideCc01(this->transportWideCcSeq + 1))
{
this->transportWideCcSeq++;
webrtc::RtpPacketSendInfo packetInfo;
packetInfo.ssrc = packet->GetSsrc();
packetInfo.transport_sequence_number = this->transportWideCcSeq;
packetInfo.has_rtp_sequence_number = true;
packetInfo.rtp_sequence_number = packet->GetSequenceNumber();
packetInfo.length = packet->GetLength();
packetInfo.pacing_info = this->tccClient->GetPacingInfo();
// Tell the congestion control client that this packet is about to be sent.
this->tccClient->InsertPacket(packetInfo);
// The send callback only holds a weak_ptr to the congestion control client
// and checks it with lock(), so it does nothing for it if the client no
// longer exists when the callback runs.
const std::weak_ptr<RTC::TransportCongestionControlClient> tccClientWeakPtr(this->tccClient);
auto* shared = this->shared;
#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
std::weak_ptr<RTC::SenderBandwidthEstimator> senderBweWeakPtr = this->senderBwe;
RTC::SenderBandwidthEstimator::SentInfo sentInfo;
sentInfo.wideSeq = this->transportWideCcSeq;
sentInfo.size = packet->GetLength();
sentInfo.sendingAtMs = this->shared->GetTimeMs();
// `mutable` because the callback writes sentAtMs into its copy of sentInfo.
const auto* cb = new onSendCallback(
[tccClientWeakPtr, shared, packetInfo, senderBweWeakPtr, sentInfo](bool sent) mutable
{
if (sent)
{
auto tccClient = tccClientWeakPtr.lock();
if (tccClient)
{
tccClient->PacketSent(packetInfo, shared->GetTimeMsInt64());
}
auto senderBwe = senderBweWeakPtr.lock();
if (senderBwe)
{
sentInfo.sentAtMs = shared->GetTimeMs();
senderBwe->RtpPacketSent(sentInfo);
}
}
});
// NOTE(review): `cb` is not deleted here; presumably SendRtpPacket() takes
// ownership of it. Confirm against its implementations.
SendRtpPacket(consumer, packet, cb);
#else
const auto* cb = new onSendCallback(
[tccClientWeakPtr, shared, packetInfo](bool sent)
{
if (sent)
{
auto tccClient = tccClientWeakPtr.lock();
if (tccClient)
{
tccClient->PacketSent(packetInfo, shared->GetTimeMsInt64());
}
}
});
// NOTE(review): `cb` is not deleted here; presumably SendRtpPacket() takes
// ownership of it. Confirm against its implementations.
SendRtpPacket(consumer, packet, cb);
#endif
}
else
{
SendRtpPacket(consumer, packet);
}
this->sendRtxTransmission.Update(packet);
}
/**
 * Consumer callback: forwards a key frame request to the Transport listener.
 *
 * The request is dropped (with a warning) while the Transport is not
 * connected.
 *
 * @param consumer Consumer requesting the key frame.
 * @param mappedSsrc Mapped SSRC the key frame is requested for.
 */
void Transport::OnConsumerKeyFrameRequested(RTC::Consumer* consumer, uint32_t mappedSsrc)
{
	MS_TRACE();

	if (!IsConnected())
	{
		MS_WARN_TAG(rtcp, "ignoring key frame request (transport not connected)");

		return;
	}

	this->listener->OnTransportConsumerKeyFrameRequested(this, consumer, mappedSsrc);
}
// Consumer callback: redistributes the available outgoing bitrate among
// Consumers and recomputes the total desired bitrate. Requires a congestion
// control client (asserted). The Consumer argument is not used.
void Transport::OnConsumerNeedBitrateChange(RTC::Consumer* /*consumer*/)
{
MS_TRACE();
MS_ASSERT(this->tccClient, "no TransportCongestionClient");
DistributeAvailableOutgoingBitrate();
ComputeOutgoingDesiredBitrate();
}
// Consumer callback: same as OnConsumerNeedBitrateChange(), except that the
// desired bitrate is recomputed with forceBitrate set to true. Requires a
// congestion control client (asserted). The Consumer argument is not used.
void Transport::OnConsumerNeedZeroBitrate(RTC::Consumer* /*consumer*/)
{
MS_TRACE();
MS_ASSERT(this->tccClient, "no TransportCongestionClient");
DistributeAvailableOutgoingBitrate();
ComputeOutgoingDesiredBitrate(/*forceBitrate*/ true);
}
// Consumer callback: the Producer of the given Consumer was closed.
//
// Removes the Consumer from every map, reports each of its SSRCs as closed,
// notifies the listener and deletes the Consumer, so the pointer must not be
// used by the caller afterwards.
void Transport::OnConsumerProducerClosed(RTC::Consumer* consumer)
{
MS_TRACE();
// Remove it from the maps.
this->mapConsumers.erase(consumer->id);
for (auto ssrc : consumer->GetMediaSsrcs())
{
this->mapSsrcConsumer.erase(ssrc);
SendStreamClosed(ssrc);
}
for (auto ssrc : consumer->GetRtxSsrcs())
{
this->mapRtxSsrcConsumer.erase(ssrc);
SendStreamClosed(ssrc);
}
// Notify the listener while the Consumer is still alive.
this->listener->OnTransportConsumerProducerClosed(this, consumer);
MS_DEBUG_DEV("Consumer closed [consumerId:%s]", consumer->id.c_str());
delete consumer;
// With one Consumer less the total desired bitrate changes; recompute it
// with forceBitrate set to true.
if (this->tccClient)
{
ComputeOutgoingDesiredBitrate(/*forceBitrate*/ true);
}
}
// DataProducer callback: forwards a received message, together with its
// subchannels and optional required subchannel, to the Transport listener.
// The message is moved into the listener call.
void Transport::OnDataProducerMessageReceived(
RTC::DataProducer* dataProducer,
RTC::SCTP::Message message,
std::vector<uint16_t>& subchannels,
std::optional<uint16_t> requiredSubchannel)
{
MS_TRACE();
this->listener->OnTransportDataProducerMessageReceived(
this, dataProducer, std::move(message), subchannels, requiredSubchannel);
}
// DataProducer callback: forwards the "paused" event to the Transport listener.
void Transport::OnDataProducerPaused(RTC::DataProducer* dataProducer)
{
MS_TRACE();
this->listener->OnTransportDataProducerPaused(this, dataProducer);
}
// DataProducer callback: forwards the "resumed" event to the Transport listener.
void Transport::OnDataProducerResumed(RTC::DataProducer* dataProducer)
{
MS_TRACE();
this->listener->OnTransportDataProducerResumed(this, dataProducer);
}
// DataConsumer callback: sends the message through SendMessage(), moving the
// message and passing the callback pointer along unchanged.
void Transport::OnDataConsumerSendMessage(
RTC::DataConsumer* dataConsumer, RTC::SCTP::Message message, onQueuedCallback* cb)
{
MS_TRACE();
SendMessage(dataConsumer, std::move(message), cb);
}
/**
 * DataConsumer callback: reports the buffered amount of the SCTP stream used
 * by the given DataConsumer.
 *
 * @param dataConsumer DataConsumer whose SCTP stream id is queried.
 * @param bufferedAmount Output: the stream buffered amount reported by the
 *   SCTP association, or 0 when this Transport has no SCTP association.
 */
void Transport::OnDataConsumerNeedBufferedAmount(
  const RTC::DataConsumer* dataConsumer, uint32_t& bufferedAmount) const
{
	MS_TRACE();

	if (!this->sctpAssociation)
	{
		bufferedAmount = 0;

		return;
	}

	const auto streamBufferedAmount = this->sctpAssociation->GetStreamBufferedAmount(
	  dataConsumer->GetSctpStreamParameters().streamId);

	bufferedAmount = static_cast<uint32_t>(streamBufferedAmount);
}
/**
 * DataConsumer callback: reports the "buffered amount low" threshold of the
 * SCTP stream used by the given DataConsumer.
 *
 * @param dataConsumer DataConsumer whose SCTP stream id is queried.
 * @param bufferedAmountLowThreshold Output: the threshold reported by the SCTP
 *   association, or 0 when this Transport has no SCTP association.
 */
void Transport::OnDataConsumerNeedBufferedAmountLowThreshold(
  const RTC::DataConsumer* dataConsumer, uint32_t& bufferedAmountLowThreshold) const
{
	// Trace the call like the sibling DataConsumer callbacks do.
	MS_TRACE();

	if (this->sctpAssociation)
	{
		bufferedAmountLowThreshold =
		  static_cast<uint32_t>(this->sctpAssociation->GetStreamBufferedAmountLowThreshold(
		    dataConsumer->GetSctpStreamParameters().streamId));
	}
	else
	{
		bufferedAmountLowThreshold = 0;
	}
}
/**
 * DataConsumer callback: sets the "buffered amount low" threshold of the SCTP
 * stream used by the given DataConsumer.
 *
 * The DataConsumer must be of SCTP type (asserted). Nothing is done when this
 * Transport has no SCTP association.
 *
 * @param dataConsumer DataConsumer whose SCTP stream id is used.
 * @param bytes New threshold in bytes.
 */
void Transport::OnDataConsumerSetBufferedAmountLowThreshold(
  const RTC::DataConsumer* dataConsumer, uint32_t bytes) const
{
	MS_TRACE();

	MS_ASSERT(
	  dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP, "DataConsumer must have type SCTP");

	if (!this->sctpAssociation)
	{
		return;
	}

	this->sctpAssociation->SetStreamBufferedAmountLowThreshold(
	  dataConsumer->GetSctpStreamParameters().streamId, static_cast<size_t>(bytes));
}
/**
 * DataConsumer callback: the DataProducer of the given DataConsumer was closed.
 *
 * Removes the DataConsumer from the maps, resets its SCTP stream (only for
 * DataConsumers of SCTP type and when an SCTP association exists), notifies
 * the listener and deletes the DataConsumer, so the pointer must not be used
 * by the caller afterwards.
 *
 * @param dataConsumer DataConsumer to close and delete.
 */
void Transport::OnDataConsumerDataProducerClosed(RTC::DataConsumer* dataConsumer)
{
	MS_TRACE();

	// Remove it from the maps.
	this->mapDataConsumers.erase(dataConsumer->id);

	// The SCTP stream parameters are only read for DataConsumers of SCTP type,
	// so both the stream id map and the stream reset are guarded by it.
	if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
	{
		const auto streamId = dataConsumer->GetSctpStreamParameters().streamId;

		this->mapSctpStreamIdDataConsumers.erase(streamId);

		if (this->sctpAssociation)
		{
			this->sctpAssociation->ResetStreams(std::array<uint16_t, 1>{ streamId });
		}
	}

	// Notify the listener while the DataConsumer is still alive.
	this->listener->OnTransportDataConsumerDataProducerClosed(this, dataConsumer);

	MS_DEBUG_DEV("DataConsumer closed [dataConsumerId:%s]", dataConsumer->id.c_str());

	delete dataConsumer;
}
/**
 * SCTP association callback: sends SCTP data through SendData().
 *
 * @param data Pointer to the bytes to send.
 * @param len Number of bytes in `data`.
 * @return The result of SendData(), or false (without sending) while the
 *   Transport is being destroyed.
 */
bool Transport::OnAssociationSendData(const uint8_t* data, size_t len)
{
	MS_TRACE();

	if (!this->isDestroying)
	{
		return SendData(data, len);
	}

	MS_WARN_DEV("ignoring sending data because Transport is being destroying");

	return false;
}
// SCTP association callback: emits an SCTP state change notification with
// state CONNECTING for this Transport.
void Transport::OnAssociationConnecting()
{
MS_TRACE();
auto sctpStateChangeNotification = FBS::Transport::CreateSctpStateChangeNotification(
this->shared->GetChannelNotifier()->GetBufferBuilder(),
FBS::SctpAssociation::SctpState::CONNECTING);
this->shared->GetChannelNotifier()->Emit(
this->id,
FBS::Notification::Event::TRANSPORT_SCTP_STATE_CHANGE,
FBS::Notification::Body::Transport_SctpStateChangeNotification,
sctpStateChangeNotification);
}
// SCTP association callback: the association is connected.
//
// Tells every DataConsumer of SCTP type, then emits two notifications in this
// order: the negotiated capabilities and the state change to CONNECTED.
void Transport::OnAssociationConnected()
{
MS_TRACE();
// Tell all SCTP DataConsumers.
for (auto& kv : this->mapDataConsumers)
{
auto* dataConsumer = kv.second;
if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
{
dataConsumer->SctpAssociationConnected();
}
}
// Notify the negotiated number of outbound and inbound streams.
auto sctpNegotiatedCapabilitiesOffset = FBS::SctpAssociation::CreateSctpNegotiatedCapabilities(
this->shared->GetChannelNotifier()->GetBufferBuilder(),
this->sctpAssociation->GetNegotiatedMaxOutboundStreams(),
this->sctpAssociation->GetNegotiatedMaxInboundStreams());
auto sctpNegotiatedCapabilitiesNotification =
FBS::Transport::CreateSctpNegotiatedCapabilitiesNotification(
this->shared->GetChannelNotifier()->GetBufferBuilder(), sctpNegotiatedCapabilitiesOffset);
this->shared->GetChannelNotifier()->Emit(
this->id,
FBS::Notification::Event::TRANSPORT_SCTP_NEGOTIATED_CAPABILITIES,
FBS::Notification::Body::Transport_SctpNegotiatedCapabilitiesNotification,
sctpNegotiatedCapabilitiesNotification);
// Notify the new SCTP state.
auto sctpStateChangeNotification = FBS::Transport::CreateSctpStateChangeNotification(
this->shared->GetChannelNotifier()->GetBufferBuilder(),
FBS::SctpAssociation::SctpState::CONNECTED);
this->shared->GetChannelNotifier()->Emit(
this->id,
FBS::Notification::Event::TRANSPORT_SCTP_STATE_CHANGE,
FBS::Notification::Body::Transport_SctpStateChangeNotification,
sctpStateChangeNotification);
// Dump the association only when MS_LOG_DEV_LEVEL is 3.
#if MS_LOG_DEV_LEVEL == 3
MS_DUMP("--- SCTP association connected:");
this->sctpAssociation->Dump();
#endif
}
/**
 * SCTP association callback: the association failed.
 *
 * Logs the failure (debug level for SUCCESS and PEER_REPORTED error kinds,
 * warning level otherwise), tells every DataConsumer of SCTP type that the
 * association is closed and emits an SCTP state change notification with state
 * FAILED.
 *
 * @param errorKind Kind of error reported by the association.
 * @param errorMessage Error description (not assumed to be null terminated).
 */
void Transport::OnAssociationFailed(RTC::SCTP::Types::ErrorKind errorKind, std::string_view errorMessage)
{
	MS_TRACE();

	const auto errorKindStringView = RTC::SCTP::Types::ErrorKindToString(errorKind);
	const auto errorKindLen        = static_cast<int>(errorKindStringView.size());
	const auto errorMessageLen     = static_cast<int>(errorMessage.size());
	const bool logAsDebug          = errorKind == RTC::SCTP::Types::ErrorKind::SUCCESS ||
	                        errorKind == RTC::SCTP::Types::ErrorKind::PEER_REPORTED;

	if (logAsDebug)
	{
		MS_DEBUG_TAG(
		  sctp,
		  "SCTP association failed [errorKind:%.*s, message:%.*s]",
		  errorKindLen,
		  errorKindStringView.data(),
		  errorMessageLen,
		  errorMessage.data());
	}
	else
	{
		MS_WARN_TAG(
		  sctp,
		  "SCTP association failed [errorKind:%.*s, message:%.*s]",
		  errorKindLen,
		  errorKindStringView.data(),
		  errorMessageLen,
		  errorMessage.data());
	}

	// Tell all SCTP DataConsumers.
	for (const auto& entry : this->mapDataConsumers)
	{
		auto* sctpCandidate = entry.second;

		if (sctpCandidate->GetType() != RTC::DataConsumer::Type::SCTP)
		{
			continue;
		}

		sctpCandidate->SctpAssociationClosed();
	}

	auto stateNotification = FBS::Transport::CreateSctpStateChangeNotification(
	  this->shared->GetChannelNotifier()->GetBufferBuilder(),
	  FBS::SctpAssociation::SctpState::FAILED);

	this->shared->GetChannelNotifier()->Emit(
	  this->id,
	  FBS::Notification::Event::TRANSPORT_SCTP_STATE_CHANGE,
	  FBS::Notification::Body::Transport_SctpStateChangeNotification,
	  stateNotification);
}
/**
 * SCTP association callback: the association was closed.
 *
 * Logs the closure (debug level for SUCCESS and PEER_REPORTED error kinds,
 * warning level otherwise), tells every DataConsumer of SCTP type that the
 * association is closed and emits an SCTP state change notification with state
 * CLOSED.
 *
 * @param errorKind Kind of error reported by the association.
 * @param errorMessage Error description (not assumed to be null terminated).
 */
void Transport::OnAssociationClosed(RTC::SCTP::Types::ErrorKind errorKind, std::string_view errorMessage)
{
	MS_TRACE();

	const auto errorKindStringView = RTC::SCTP::Types::ErrorKindToString(errorKind);
	const auto errorKindLen        = static_cast<int>(errorKindStringView.size());
	const auto errorMessageLen     = static_cast<int>(errorMessage.size());
	const bool logAsDebug          = errorKind == RTC::SCTP::Types::ErrorKind::SUCCESS ||
	                        errorKind == RTC::SCTP::Types::ErrorKind::PEER_REPORTED;

	if (logAsDebug)
	{
		MS_DEBUG_TAG(
		  sctp,
		  "SCTP association closed [errorKind:%.*s, message:%.*s]",
		  errorKindLen,
		  errorKindStringView.data(),
		  errorMessageLen,
		  errorMessage.data());
	}
	else
	{
		MS_WARN_TAG(
		  sctp,
		  "SCTP association closed [errorKind:%.*s, message:%.*s]",
		  errorKindLen,
		  errorKindStringView.data(),
		  errorMessageLen,
		  errorMessage.data());
	}

	// Tell all SCTP DataConsumers.
	for (const auto& entry : this->mapDataConsumers)
	{
		auto* sctpCandidate = entry.second;

		if (sctpCandidate->GetType() != RTC::DataConsumer::Type::SCTP)
		{
			continue;
		}

		sctpCandidate->SctpAssociationClosed();
	}

	auto stateNotification = FBS::Transport::CreateSctpStateChangeNotification(
	  this->shared->GetChannelNotifier()->GetBufferBuilder(),
	  FBS::SctpAssociation::SctpState::CLOSED);

	this->shared->GetChannelNotifier()->Emit(
	  this->id,
	  FBS::Notification::Event::TRANSPORT_SCTP_STATE_CHANGE,
	  FBS::Notification::Body::Transport_SctpStateChangeNotification,
	  stateNotification);
}
// SCTP association callback: the association was restarted. Only logged.
void Transport::OnAssociationRestarted()
{
MS_TRACE();
MS_DEBUG_TAG(sctp, "SCTP association restarted");
}
/**
 * SCTP association callback: the association reported an error. The error is
 * logged as a warning and nothing else is done.
 *
 * @param errorKind Kind of error reported by the association.
 * @param errorMessage Error description (not assumed to be null terminated).
 */
void Transport::OnAssociationError(RTC::SCTP::Types::ErrorKind errorKind, std::string_view errorMessage)
{
	MS_TRACE();

	const auto kindText      = RTC::SCTP::Types::ErrorKindToString(errorKind);
	const auto kindTextLen   = static_cast<int>(kindText.size());
	const auto messageLength = static_cast<int>(errorMessage.size());

	MS_WARN_TAG(
	  sctp,
	  "SCTP association error [errorKind:%.*s, message:%.*s]",
	  kindTextLen,
	  kindText.data(),
	  messageLength,
	  errorMessage.data());
}
/**
 * SCTP association callback: a complete SCTP message was received.
 *
 * The message is handed to the DataProducer registered for its stream id. It
 * is dropped with a warning when there is no such DataProducer. Exceptions
 * thrown while the DataProducer processes the message are caught and logged so
 * they do not propagate into the SCTP association.
 *
 * @param message Received message; it is moved into the DataProducer.
 */
void Transport::OnAssociationMessageReceived(RTC::SCTP::Message message)
{
	MS_TRACE();

	// Read the stream id up front: the message is moved below and must not be
	// queried afterwards (the error log needs the id too).
	const auto streamId = message.GetStreamId();

	RTC::DataProducer* dataProducer = this->sctpListener.GetDataProducer(streamId);

	if (!dataProducer)
	{
		MS_WARN_TAG(
		  sctp, "no suitable DataProducer for received SCTP message [streamId:%" PRIu16 "]", streamId);

		return;
	}

	try
	{
		// Messages received over SCTP carry no subchannels.
		static thread_local std::vector<uint16_t> emptySubchannels;

		dataProducer->ReceiveMessage(std::move(message), emptySubchannels, std::nullopt);
	}
	catch (const std::exception& error)
	{
		MS_WARN_TAG(
		  sctp,
		  "DataProducer::ReceiveMessage() failed for received SCTP message [streamId:%" PRIu16 "]: %s",
		  streamId,
		  error.what());
	}
}
// SCTP association callback: a streams reset was performed. Only logged; the
// stream ids argument is not used.
void Transport::OnAssociationStreamsResetPerformed(std::span<const uint16_t> /*unused*/)
{
MS_TRACE();
MS_DEBUG_DEV("SCTP association streams reset performed");
}
/**
 * Handler for the SCTP association "streams reset failed" event.
 *
 * The span argument is intentionally unnamed and ignored; only the error
 * message is logged (warning level, `sctp` tag). No retry or other recovery is
 * attempted here.
 *
 * @param errorMessage Failure description.
 */
void Transport::OnAssociationStreamsResetFailed(
std::span<const uint16_t> , std::string_view errorMessage)
{
MS_TRACE();
// "%.*s" with an explicit length: a std::string_view is not guaranteed to be
// NUL-terminated.
MS_WARN_TAG(
sctp,
"SCTP association streams reset failed [message:%.*s]",
static_cast<int>(errorMessage.size()),
errorMessage.data());
}
/**
 * Handler for the SCTP association "inbound streams reset" event.
 *
 * Does nothing unless `sctpAssociation->IsDataChannel()` is true. Otherwise,
 * every DataConsumer registered in `mapSctpStreamIdDataConsumers` under one of
 * the given stream ids is closed:
 *  - `ResetStreams()` is called on the association with the matched stream ids,
 *  - the DataConsumer is removed from the Transport maps,
 *  - the listener is notified via OnTransportDataConsumerClosed(),
 *  - the DataConsumer is deleted.
 *
 * @param inboundStreamIds Stream ids reported as reset. The list is not assumed
 *                         to be free of duplicates (NOTE(review): whether the
 *                         association already de-duplicates is not visible
 *                         here, so this handler protects itself).
 */
void Transport::OnAssociationInboundStreamsReset(std::span<const uint16_t> inboundStreamIds)
{
	MS_TRACE();

	if (!this->sctpAssociation->IsDataChannel())
	{
		return;
	}

	std::vector<RTC::DataConsumer*> dataConsumersToClose;
	std::vector<uint16_t> streamsToReset;

	for (const auto streamId : inboundStreamIds)
	{
		const auto it = this->mapSctpStreamIdDataConsumers.find(streamId);

		if (it == this->mapSctpStreamIdDataConsumers.end())
		{
			continue;
		}

		auto* dataConsumer = it->second;

		// A repeated stream id would queue the same DataConsumer twice and the
		// loop below would then use and delete an already freed object. Skip
		// anything that is already queued.
		bool alreadyQueued = false;

		for (const auto* queuedDataConsumer : dataConsumersToClose)
		{
			if (queuedDataConsumer == dataConsumer)
			{
				alreadyQueued = true;

				break;
			}
		}

		if (alreadyQueued)
		{
			continue;
		}

		dataConsumersToClose.push_back(dataConsumer);
		streamsToReset.push_back(streamId);
	}

	if (dataConsumersToClose.empty())
	{
		return;
	}

	this->sctpAssociation->ResetStreams(streamsToReset);

	for (auto* dataConsumer : dataConsumersToClose)
	{
		// Drop the DataConsumer from the Transport maps before notifying the
		// listener and destroying it.
		this->mapDataConsumers.erase(dataConsumer->id);

		if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
		{
			this->mapSctpStreamIdDataConsumers.erase(dataConsumer->GetSctpStreamParameters().streamId);
		}

		this->listener->OnTransportDataConsumerClosed(this, dataConsumer);

		// Log before deleting: the message reads fields of the DataConsumer.
		MS_DEBUG_DEV(
		  "SCTP DataConsumer closed via SCTP inbound stream reset [dataConsumerId:%s, streamId:%" PRIu16
		  "]",
		  dataConsumer->id.c_str(),
		  dataConsumer->GetSctpStreamParameters().streamId);

		delete dataConsumer;
	}
}
/**
 * Handler for the SCTP association per-stream "buffered amount low" event.
 *
 * If a DataConsumer is found for the given stream id (via
 * GetSctpDataConsumerByStreamId()), it is told the stream's current buffered
 * amount as reported by the association. Unknown stream ids are silently
 * ignored.
 *
 * @param streamId SCTP stream id the event refers to.
 */
void Transport::OnAssociationStreamBufferedAmountLow(uint16_t streamId)
{
	MS_TRACE();

	const auto* dataConsumer = GetSctpDataConsumerByStreamId(streamId);

	if (dataConsumer)
	{
		dataConsumer->SctpBufferedAmountLow(this->sctpAssociation->GetStreamBufferedAmount(streamId));
	}
}
/**
 * Handler for the SCTP association "total buffered amount low" event.
 *
 * Intentionally a no-op apart from tracing: the Transport takes no action on
 * this event.
 */
void Transport::OnAssociationTotalBufferedAmountLow()
{
MS_TRACE();
}
/**
 * Tells the SCTP association whether the Transport is ready for SCTP.
 *
 * @return true only if the Transport is connected (IsConnected()) and it holds
 *         at least one DataProducer or DataConsumer; false otherwise.
 */
bool Transport::OnAssociationIsTransportReadyForSctp()
{
	MS_TRACE();

	if (!IsConnected())
	{
		return false;
	}

	// Ready as soon as there is any data endpoint on this Transport.
	return !(this->mapDataProducers.empty() && this->mapDataConsumers.empty());
}
/**
 * Handler invoked by the transport congestion control client when it reports
 * new bitrates.
 *
 * Logs the reported available bitrate, then calls, in this order,
 * DistributeAvailableOutgoingBitrate(), ComputeOutgoingDesiredBitrate() and
 * EmitTraceEventBweType().
 *
 * The first (unnamed) argument, the client that raised the event, is ignored.
 *
 * @param bitrates Bitrates reported by the client; only `availableBitrate` is
 *                 read directly here, the whole struct is forwarded to
 *                 EmitTraceEventBweType().
 */
void Transport::OnTransportCongestionControlClientBitrates(
RTC::TransportCongestionControlClient* ,
RTC::TransportCongestionControlClient::Bitrates& bitrates)
{
MS_TRACE();
MS_DEBUG_DEV("outgoing available bitrate:%" PRIu32, bitrates.availableBitrate);
DistributeAvailableOutgoingBitrate();
ComputeOutgoingDesiredBitrate();
EmitTraceEventBweType(bitrates);
}
/**
 * Handler invoked by the transport congestion control client when it wants an
 * RTP packet to be sent. The packet is treated as a probation packet: it is
 * traced via EmitTraceEventProbationType(), accounted in
 * `sendProbationTransmission` and logged as "probation sent".
 *
 * Two paths exist:
 *  - BWE type is TRANSPORT_CC and the packet accepts a transport-wide sequence
 *    number: the packet is registered with `tccClient` and sent with a
 *    completion callback that reports the send time back to `tccClient` (and
 *    to `senderBwe` when ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR is defined).
 *  - Otherwise: the packet is sent without a completion callback.
 *
 * The first (unnamed) argument, the client that raised the event, is ignored.
 *
 * @param packet     RTP packet to send; its header extensions are updated in
 *                   place before sending.
 * @param pacingInfo Pacing information stored alongside the packet info that
 *                   is handed to `tccClient`.
 */
void Transport::OnTransportCongestionControlClientSendRtpPacket(
RTC::TransportCongestionControlClient* ,
RTC::RTP::Packet* packet,
const webrtc::PacedPacketInfo& pacingInfo)
{
MS_TRACE();
// Stamp the packet with the current time.
packet->UpdateAbsSendTime(this->shared->GetTimeMs());
// The candidate sequence number (current + 1) is only committed below, once
// UpdateTransportWideCc01() has returned true for this packet.
if (
this->tccClient->GetBweType() == RTC::BweType::TRANSPORT_CC &&
packet->UpdateTransportWideCc01(this->transportWideCcSeq + 1))
{
this->transportWideCcSeq++;
EmitTraceEventProbationType(packet);
// Describe the packet to the congestion control client before sending it.
webrtc::RtpPacketSendInfo packetInfo;
packetInfo.ssrc = packet->GetSsrc();
packetInfo.transport_sequence_number = this->transportWideCcSeq;
packetInfo.has_rtp_sequence_number = true;
packetInfo.rtp_sequence_number = packet->GetSequenceNumber();
packetInfo.length = packet->GetLength();
packetInfo.pacing_info = pacingInfo;
this->tccClient->InsertPacket(packetInfo);
// The callback holds only a weak reference to the client, so it does nothing
// if the client no longer exists when the callback runs.
const std::weak_ptr<RTC::TransportCongestionControlClient> tccClientWeakPtr(this->tccClient);
// NOTE(review): `shared` is captured as a raw pointer; this assumes it
// outlives any pending send callback - confirm.
auto* shared = this->shared;
#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
std::weak_ptr<RTC::SenderBandwidthEstimator> senderBweWeakPtr = this->senderBwe;
RTC::SenderBandwidthEstimator::SentInfo sentInfo;
sentInfo.wideSeq = this->transportWideCcSeq;
sentInfo.size = packet->GetLength();
sentInfo.isProbation = true;
sentInfo.sendingAtMs = this->shared->GetTimeMs();
// NOTE(review): `cb` is allocated with `new` and never deleted here;
// presumably SendRtpPacket() (or the layer below it) takes ownership - confirm.
// The lambda is `mutable` because it writes `sentInfo.sentAtMs` on its copy.
const auto* cb = new onSendCallback(
[tccClientWeakPtr, shared, packetInfo, senderBweWeakPtr, sentInfo](bool sent) mutable
{
// Only packets that were effectively sent are reported.
if (sent)
{
auto tccClient = tccClientWeakPtr.lock();
if (tccClient)
{
tccClient->PacketSent(packetInfo, shared->GetTimeMsInt64());
}
auto senderBwe = senderBweWeakPtr.lock();
if (senderBwe)
{
sentInfo.sentAtMs = shared->GetTimeMs();
senderBwe->RtpPacketSent(sentInfo);
}
}
});
SendRtpPacket(nullptr, packet, cb);
#else
// NOTE(review): `cb` is allocated with `new` and never deleted here;
// presumably SendRtpPacket() (or the layer below it) takes ownership - confirm.
const auto* cb = new onSendCallback(
[tccClientWeakPtr, shared, packetInfo](bool sent)
{
// Only packets that were effectively sent are reported.
if (sent)
{
auto tccClient = tccClientWeakPtr.lock();
if (tccClient)
{
tccClient->PacketSent(packetInfo, shared->GetTimeMsInt64());
}
}
});
SendRtpPacket(nullptr, packet, cb);
#endif
}
else
{
// No transport-wide sequence number was set: send without a completion
// callback and leave `transportWideCcSeq` untouched.
EmitTraceEventProbationType(packet);
SendRtpPacket(nullptr, packet);
}
// Account the packet in the probation transmission counter in both paths.
this->sendProbationTransmission.Update(packet);
MS_DEBUG_DEV(
"probation sent [seq:%" PRIu16 ", wideSeq:%" PRIu16 ", size:%zu, bitrate:%" PRIu32 "]",
packet->GetSequenceNumber(),
this->transportWideCcSeq,
packet->GetLength(),
this->sendProbationTransmission.GetBitrate(this->shared->GetTimeMs()));
}
/**
 * Handler invoked by the transport congestion control server when it wants an
 * RTCP packet to be sent.
 *
 * The packet is serialized into RTC::RTCP::SerializationBuffer and then handed
 * to SendRtcpPacket(). The first (unnamed) argument, the server that raised
 * the event, is ignored.
 *
 * @param packet RTCP packet to serialize and send.
 */
void Transport::OnTransportCongestionControlServerSendRtcpPacket(
RTC::TransportCongestionControlServer* , RTC::RTCP::Packet* packet)
{
MS_TRACE();
packet->Serialize(RTC::RTCP::SerializationBuffer);
SendRtcpPacket(packet);
}
#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
/**
 * Handler invoked by the sender bandwidth estimator when it reports a new
 * available bitrate. Only compiled when ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
 * is defined.
 *
 * The values are only logged (development-level debug); no Transport state is
 * changed here. The first (unnamed) argument, the estimator that raised the
 * event, is ignored.
 *
 * @param availableBitrate         Newly reported available bitrate.
 * @param previousAvailableBitrate Previously reported available bitrate.
 */
void Transport::OnSenderBandwidthEstimatorAvailableBitrate(
RTC::SenderBandwidthEstimator* ,
uint32_t availableBitrate,
uint32_t previousAvailableBitrate)
{
MS_TRACE();
MS_DEBUG_DEV(
"outgoing available bitrate [now:%" PRIu32 ", before:%" PRIu32 "]",
availableBitrate,
previousAvailableBitrate);
}
#endif
/**
 * Timer callback.
 *
 * Only the RTCP timer is handled: RTCP is sent for the current time and the
 * timer is re-armed with RTC::RTCP::MaxVideoIntervalMs scaled by a random
 * factor (a random integer requested in the range 10..15, divided by 10).
 * Any other timer is ignored.
 *
 * @param timer Timer that fired.
 */
void Transport::OnTimer(TimerHandleInterface* timer)
{
	MS_TRACE();

	if (timer != this->rtcpTimer)
	{
		return;
	}

	const uint64_t nowMs = this->shared->GetTimeMs();

	SendRtcp(nowMs);

	// Randomize the next interval. The product is computed in float and
	// truncated back to an integer number of milliseconds.
	const float randomFactor =
	  static_cast<float>(Utils::Crypto::GetRandomUInt<uint16_t>(10, 15)) / 10;
	const auto baseIntervalMs = static_cast<uint64_t>(RTC::RTCP::MaxVideoIntervalMs);
	const auto nextIntervalMs =
	  static_cast<uint64_t>(static_cast<float>(baseIntervalMs) * randomFactor);

	this->rtcpTimer->Start(nextIntervalMs);
}
}