#define MS_CLASS "RTC::SimulcastConsumer"
#include "RTC/SimulcastConsumer.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include "RTC/Codecs/Tools.hpp"
#ifdef MS_RTC_LOGGER_RTP
#include "RTC/RtcLogger.hpp"
#endif
#include <limits>
namespace RTC
{
static constexpr uint64_t StreamMinActiveMs{ 2000u };
static constexpr uint64_t BweDowngradeConservativeMs{ 10000u };
static constexpr uint64_t BweDowngradeMinActiveMs{ 8000u };
static constexpr uint16_t MaxSequenceNumberGap{ 100u };
static constexpr size_t TargetLayerRetransmissionBufferSize{ 30u };
SimulcastConsumer::SimulcastConsumer(
RTC::Shared* shared,
const std::string& id,
const std::string& producerId,
RTC::Consumer::Listener* listener,
const FBS::Transport::ConsumeRequest* data)
: RTC::Consumer::Consumer(
shared, id, producerId, listener, data, RTC::RtpParameters::Type::SIMULCAST)
{
MS_TRACE();
auto& encoding = this->rtpParameters.encodings[0];
if (encoding.spatialLayers != this->consumableRtpEncodings.size())
{
MS_THROW_TYPE_ERROR("encoding.spatialLayers does not match number of consumableRtpEncodings");
}
for (size_t idx{ 0u }; idx < this->consumableRtpEncodings.size(); ++idx)
{
auto& encoding = this->consumableRtpEncodings[idx];
this->mapMappedSsrcSpatialLayer[encoding.ssrc] = static_cast<int16_t>(idx);
}
if (flatbuffers::IsFieldPresent(data, FBS::Transport::ConsumeRequest::VT_PREFERREDLAYERS))
{
const auto* preferredLayers = data->preferredLayers();
this->preferredSpatialLayer = preferredLayers->spatialLayer();
if (this->preferredSpatialLayer > encoding.spatialLayers - 1)
{
this->preferredSpatialLayer = static_cast<int16_t>(encoding.spatialLayers - 1);
}
if (preferredLayers->temporalLayer().has_value())
{
this->preferredTemporalLayer = preferredLayers->temporalLayer().value();
if (this->preferredTemporalLayer > encoding.temporalLayers - 1)
{
this->preferredTemporalLayer = static_cast<int16_t>(encoding.temporalLayers - 1);
}
}
else
{
this->preferredTemporalLayer = static_cast<int16_t>(encoding.temporalLayers - 1);
}
}
else
{
this->preferredSpatialLayer = static_cast<int16_t>(encoding.spatialLayers - 1);
this->preferredTemporalLayer = static_cast<int16_t>(encoding.temporalLayers - 1);
}
this->producerRtpStreams.insert(
this->producerRtpStreams.begin(), this->consumableRtpEncodings.size(), nullptr);
const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding);
if (!RTC::Codecs::Tools::IsValidTypeForCodec(this->type, mediaCodec->mimeType))
{
MS_THROW_TYPE_ERROR(
"%s codec not supported for simulcast", mediaCodec->mimeType.ToString().c_str());
}
const uint16_t initialOutputSeq =
Utils::Crypto::GetRandomUInt(1000u, std::numeric_limits<uint16_t>::max() / 2);
this->rtpSeqManager.reset(new RTC::SeqManager<uint16_t>(initialOutputSeq));
RTC::Codecs::EncodingContext::Params params;
params.spatialLayers = encoding.spatialLayers;
params.temporalLayers = encoding.temporalLayers;
this->encodingContext.reset(RTC::Codecs::Tools::GetEncodingContext(mediaCodec->mimeType, params));
MS_ASSERT(this->encodingContext, "no encoding context for this codec");
CreateRtpStream();
this->shared->channelMessageRegistrator->RegisterHandler(
this->id,
this,
nullptr);
}
SimulcastConsumer::~SimulcastConsumer()
{
MS_TRACE();
this->shared->channelMessageRegistrator->UnregisterHandler(this->id);
delete this->rtpStream;
this->targetLayerRetransmissionBuffer.clear();
}
flatbuffers::Offset<FBS::Consumer::DumpResponse> SimulcastConsumer::FillBuffer(
flatbuffers::FlatBufferBuilder& builder) const
{
MS_TRACE();
auto base = RTC::Consumer::FillBuffer(builder);
std::vector<flatbuffers::Offset<FBS::RtpStream::Dump>> rtpStreams;
rtpStreams.emplace_back(this->rtpStream->FillBuffer(builder));
auto dump = FBS::Consumer::CreateConsumerDumpDirect(
builder,
base,
&rtpStreams,
this->preferredSpatialLayer,
this->targetSpatialLayer,
this->currentSpatialLayer,
this->preferredTemporalLayer,
this->targetTemporalLayer,
this->encodingContext->GetCurrentTemporalLayer());
return FBS::Consumer::CreateDumpResponse(builder, dump);
}
flatbuffers::Offset<FBS::Consumer::GetStatsResponse> SimulcastConsumer::FillBufferStats(
flatbuffers::FlatBufferBuilder& builder)
{
MS_TRACE();
std::vector<flatbuffers::Offset<FBS::RtpStream::Stats>> rtpStreams;
rtpStreams.emplace_back(this->rtpStream->FillBufferStats(builder));
auto* producerCurrentRtpStream = GetProducerCurrentRtpStream();
if (producerCurrentRtpStream)
{
rtpStreams.emplace_back(producerCurrentRtpStream->FillBufferStats(builder));
}
return FBS::Consumer::CreateGetStatsResponseDirect(builder, &rtpStreams);
}
flatbuffers::Offset<FBS::Consumer::ConsumerScore> SimulcastConsumer::FillBufferScore(
flatbuffers::FlatBufferBuilder& builder) const
{
MS_TRACE();
MS_ASSERT(this->producerRtpStreamScores, "producerRtpStreamScores not set");
auto* producerCurrentRtpStream = GetProducerCurrentRtpStream();
uint8_t producerScore{ 0 };
if (producerCurrentRtpStream)
{
producerScore = producerCurrentRtpStream->GetScore();
}
else
{
producerScore = 0;
}
return FBS::Consumer::CreateConsumerScoreDirect(
builder, this->rtpStream->GetScore(), producerScore, this->producerRtpStreamScores);
}
void SimulcastConsumer::HandleRequest(Channel::ChannelRequest* request)
{
MS_TRACE();
switch (request->method)
{
case Channel::ChannelRequest::Method::CONSUMER_DUMP:
{
auto dumpOffset = FillBuffer(request->GetBufferBuilder());
request->Accept(FBS::Response::Body::Consumer_DumpResponse, dumpOffset);
break;
}
case Channel::ChannelRequest::Method::CONSUMER_REQUEST_KEY_FRAME:
{
if (IsActive())
{
RequestKeyFrames();
}
request->Accept();
break;
}
case Channel::ChannelRequest::Method::CONSUMER_SET_PREFERRED_LAYERS:
{
auto previousPreferredSpatialLayer = this->preferredSpatialLayer;
auto previousPreferredTemporalLayer = this->preferredTemporalLayer;
const auto* body = request->data->body_as<FBS::Consumer::SetPreferredLayersRequest>();
const auto* preferredLayers = body->preferredLayers();
this->preferredSpatialLayer = preferredLayers->spatialLayer();
if (this->preferredSpatialLayer > this->rtpStream->GetSpatialLayers() - 1)
{
this->preferredSpatialLayer = static_cast<int16_t>(this->rtpStream->GetSpatialLayers() - 1);
}
if (preferredLayers->temporalLayer().has_value())
{
this->preferredTemporalLayer = preferredLayers->temporalLayer().value();
if (this->preferredTemporalLayer > this->rtpStream->GetTemporalLayers() - 1)
{
this->preferredTemporalLayer =
static_cast<int16_t>(this->rtpStream->GetTemporalLayers() - 1);
}
}
else
{
this->preferredTemporalLayer =
static_cast<int16_t>(this->rtpStream->GetTemporalLayers() - 1);
}
MS_DEBUG_DEV(
"preferred layers changed [spatial:%" PRIi16 ", temporal:%" PRIi16 ", consumerId:%s]",
this->preferredSpatialLayer,
this->preferredTemporalLayer,
this->id.c_str());
const flatbuffers::Optional<int16_t> preferredTemporalLayer{ this->preferredTemporalLayer };
auto preferredLayersOffset = FBS::Consumer::CreateConsumerLayers(
request->GetBufferBuilder(), this->preferredSpatialLayer, preferredTemporalLayer);
auto responseOffset = FBS::Consumer::CreateSetPreferredLayersResponse(
request->GetBufferBuilder(), preferredLayersOffset);
request->Accept(FBS::Response::Body::Consumer_SetPreferredLayersResponse, responseOffset);
if (
IsActive() &&
(
this->preferredSpatialLayer != previousPreferredSpatialLayer ||
this->preferredTemporalLayer != previousPreferredTemporalLayer
)
)
{
MayChangeLayers( true);
}
break;
}
default:
{
RTC::Consumer::HandleRequest(request);
}
}
}
void SimulcastConsumer::ProducerRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc)
{
MS_TRACE();
auto it = this->mapMappedSsrcSpatialLayer.find(mappedSsrc);
MS_ASSERT(it != this->mapMappedSsrcSpatialLayer.end(), "unknown mappedSsrc");
const int16_t spatialLayer = it->second;
this->producerRtpStreams[spatialLayer] = rtpStream;
}
void SimulcastConsumer::ProducerNewRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc)
{
MS_TRACE();
auto it = this->mapMappedSsrcSpatialLayer.find(mappedSsrc);
MS_ASSERT(it != this->mapMappedSsrcSpatialLayer.end(), "unknown mappedSsrc");
const int16_t spatialLayer = it->second;
this->producerRtpStreams[spatialLayer] = rtpStream;
EmitScore();
if (IsActive())
{
MayChangeLayers();
}
}
void SimulcastConsumer::ProducerRtpStreamScore(
RTC::RtpStreamRecv* , uint8_t score, uint8_t previousScore)
{
MS_TRACE();
EmitScore();
if (RTC::Consumer::IsActive())
{
if (!IsActive())
{
UpdateTargetLayers(-1, -1);
}
else if (
!this->externallyManagedBitrate ||
(score == 0u || previousScore == 0u)
)
{
MayChangeLayers();
}
}
}
void SimulcastConsumer::ProducerRtcpSenderReport(RTC::RtpStreamRecv* rtpStream, bool first)
{
MS_TRACE();
if (!first)
{
return;
}
MS_DEBUG_TAG(simulcast, "first SenderReport [ssrc:%" PRIu32 "]", rtpStream->GetSsrc());
auto* producerTsReferenceRtpStream = GetProducerTsReferenceRtpStream();
if (!producerTsReferenceRtpStream || !producerTsReferenceRtpStream->GetSenderReportNtpMs())
{
return;
}
if (IsActive())
{
MayChangeLayers();
}
}
uint8_t SimulcastConsumer::GetBitratePriority() const
{
MS_TRACE();
MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed");
if (!IsActive())
{
return 0u;
}
return this->priority;
}
uint32_t SimulcastConsumer::IncreaseLayer(uint32_t bitrate, bool considerLoss)
{
MS_TRACE();
MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed");
MS_ASSERT(IsActive(), "should be active");
if (
this->provisionalTargetSpatialLayer == this->preferredSpatialLayer &&
this->provisionalTargetTemporalLayer == this->preferredTemporalLayer
)
{
return 0u;
}
uint32_t virtualBitrate;
if (considerLoss)
{
auto lossPercentage = this->rtpStream->GetLossPercentage();
if (lossPercentage < 2)
{
virtualBitrate = 1.08 * bitrate;
}
else if (lossPercentage > 10)
{
virtualBitrate = (1 - 0.5 * (lossPercentage / 100)) * bitrate;
}
else
{
virtualBitrate = bitrate;
}
}
else
{
virtualBitrate = bitrate;
}
uint32_t requiredBitrate{ 0u };
int16_t spatialLayer{ 0 };
int16_t temporalLayer{ 0 };
auto nowMs = DepLibUV::GetTimeMs();
for (size_t sIdx{ 0u }; sIdx < this->producerRtpStreams.size(); ++sIdx)
{
spatialLayer = static_cast<int16_t>(sIdx);
if (nowMs - this->lastBweDowngradeAtMs < BweDowngradeConservativeMs)
{
if (this->provisionalTargetSpatialLayer > -1 && spatialLayer > this->currentSpatialLayer)
{
MS_DEBUG_DEV(
"avoid upgrading to spatial layer %" PRIi16 " due to recent BWE downgrade", spatialLayer);
goto done;
}
}
if (spatialLayer < this->provisionalTargetSpatialLayer)
{
continue;
}
else if (spatialLayer > this->preferredSpatialLayer)
{
MS_DEBUG_DEV(
"avoid upgrading to spatial layer %" PRIi16
" since it's higher than preferred spatial layer %" PRIi16,
spatialLayer,
this->preferredSpatialLayer);
goto done;
}
auto* producerRtpStream = this->producerRtpStreams.at(spatialLayer);
if (!producerRtpStream)
{
continue;
}
if (producerRtpStream->GetScore() == 0)
{
continue;
}
if (
spatialLayer != this->provisionalTargetSpatialLayer &&
this->provisionalTargetSpatialLayer != -1 &&
producerRtpStream->GetActiveMs() < StreamMinActiveMs
)
{
const auto* provisionalProducerRtpStream =
this->producerRtpStreams.at(this->provisionalTargetSpatialLayer);
if (provisionalProducerRtpStream->GetActiveMs() >= StreamMinActiveMs)
{
continue;
}
}
if (!CanSwitchToSpatialLayer(spatialLayer))
{
continue;
}
temporalLayer = 0;
for (; temporalLayer < producerRtpStream->GetTemporalLayers(); ++temporalLayer)
{
if (
spatialLayer == this->provisionalTargetSpatialLayer &&
temporalLayer <= this->provisionalTargetTemporalLayer
)
{
continue;
}
requiredBitrate = producerRtpStream->GetLayerBitrate(nowMs, 0, temporalLayer);
if (
requiredBitrate &&
temporalLayer == 0 &&
this->provisionalTargetSpatialLayer > -1 &&
spatialLayer > this->provisionalTargetSpatialLayer
)
{
auto* provisionalProducerRtpStream =
this->producerRtpStreams.at(this->provisionalTargetSpatialLayer);
auto provisionalRequiredBitrate =
provisionalProducerRtpStream->GetBitrate(nowMs, 0, this->provisionalTargetTemporalLayer);
if (requiredBitrate > provisionalRequiredBitrate)
{
requiredBitrate -= provisionalRequiredBitrate;
}
else
{
requiredBitrate = 1u; }
}
MS_DEBUG_DEV(
"testing layers %" PRIi16 ":%" PRIi16 " [virtual bitrate:%" PRIu32
", required bitrate:%" PRIu32 "]",
spatialLayer,
temporalLayer,
virtualBitrate,
requiredBitrate);
if (requiredBitrate)
{
goto done;
}
else
{
break;
}
}
if (spatialLayer >= this->preferredSpatialLayer)
{
break;
}
}
done:
if (!requiredBitrate)
{
return 0u;
}
if (requiredBitrate > virtualBitrate)
{
return 0u;
}
this->provisionalTargetSpatialLayer = spatialLayer;
this->provisionalTargetTemporalLayer = temporalLayer;
MS_DEBUG_DEV(
"setting provisional layers to %" PRIi16 ":%" PRIi16 " [virtual bitrate:%" PRIu32
", required bitrate:%" PRIu32 "]",
this->provisionalTargetSpatialLayer,
this->provisionalTargetTemporalLayer,
virtualBitrate,
requiredBitrate);
if (requiredBitrate <= bitrate)
{
return requiredBitrate;
}
else if (requiredBitrate <= virtualBitrate)
{
return bitrate;
}
else
{
return requiredBitrate; }
}
void SimulcastConsumer::ApplyLayers()
{
MS_TRACE();
MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed");
MS_ASSERT(IsActive(), "should be active");
auto provisionalTargetSpatialLayer = this->provisionalTargetSpatialLayer;
auto provisionalTargetTemporalLayer = this->provisionalTargetTemporalLayer;
this->provisionalTargetSpatialLayer = -1;
this->provisionalTargetTemporalLayer = -1;
if (
provisionalTargetSpatialLayer != this->targetSpatialLayer ||
provisionalTargetTemporalLayer != this->targetTemporalLayer
)
{
UpdateTargetLayers(provisionalTargetSpatialLayer, provisionalTargetTemporalLayer);
if (
this->rtpStream->GetActiveMs() > BweDowngradeMinActiveMs &&
this->targetSpatialLayer < this->currentSpatialLayer &&
this->currentSpatialLayer <= this->preferredSpatialLayer
)
{
MS_DEBUG_DEV(
"possible target spatial layer downgrade (from %" PRIi16 " to %" PRIi16
") due to BWE limitation",
this->currentSpatialLayer,
this->targetSpatialLayer);
this->lastBweDowngradeAtMs = DepLibUV::GetTimeMs();
}
}
}
uint32_t SimulcastConsumer::GetDesiredBitrate() const
{
MS_TRACE();
MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed");
if (!IsActive())
{
return 0u;
}
auto nowMs = DepLibUV::GetTimeMs();
uint32_t desiredBitrate{ 0u };
for (auto sIdx{ static_cast<int16_t>(this->producerRtpStreams.size() - 1) }; sIdx >= 0; --sIdx)
{
auto* producerRtpStream = this->producerRtpStreams.at(sIdx);
if (!producerRtpStream)
{
continue;
}
auto streamBitrate = producerRtpStream->GetBitrate(nowMs);
if (streamBitrate > desiredBitrate)
{
desiredBitrate = streamBitrate;
}
}
auto maxBitrate = this->rtpParameters.encodings[0].maxBitrate;
if (maxBitrate > desiredBitrate)
{
desiredBitrate = maxBitrate;
}
return desiredBitrate;
}
void SimulcastConsumer::SendRtpPacket(RTC::RtpPacket* packet, RTC::SharedRtpPacket& sharedPacket)
{
MS_TRACE();
#ifdef MS_RTC_LOGGER_RTP
packet->logger.consumerId = this->id;
#endif
auto spatialLayer = this->mapMappedSsrcSpatialLayer.at(packet->GetSsrc());
if (!IsActive())
{
if (spatialLayer == this->currentSpatialLayer)
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::CONSUMER_INACTIVE);
#endif
this->rtpSeqManager->Drop(packet->GetSequenceNumber());
}
return;
}
if (this->targetTemporalLayer == -1)
{
if (spatialLayer == this->currentSpatialLayer)
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::INVALID_TARGET_LAYER);
#endif
this->rtpSeqManager->Drop(packet->GetSequenceNumber());
}
return;
}
auto payloadType = packet->GetPayloadType();
if (!this->supportedCodecPayloadTypes[payloadType])
{
if (spatialLayer == this->currentSpatialLayer)
{
MS_WARN_DEV("payload type not supported [payloadType:%" PRIu8 "]", payloadType);
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::UNSUPPORTED_PAYLOAD_TYPE);
#endif
this->rtpSeqManager->Drop(packet->GetSequenceNumber());
}
return;
}
bool shouldSwitchCurrentSpatialLayer{ false };
if (this->currentSpatialLayer != this->targetSpatialLayer && spatialLayer == this->targetSpatialLayer)
{
if (!packet->IsKeyFrame())
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::NOT_A_KEYFRAME);
#endif
StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket);
return;
}
shouldSwitchCurrentSpatialLayer = true;
this->syncRequired = true;
this->spatialLayerToSync = spatialLayer;
}
else if (spatialLayer != this->currentSpatialLayer)
{
return;
}
if (this->syncRequired && !packet->IsKeyFrame())
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::NOT_A_KEYFRAME);
#endif
StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket);
return;
}
if (packet->GetPayloadLength() == 0)
{
if (spatialLayer == this->currentSpatialLayer)
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::EMPTY_PAYLOAD);
#endif
this->rtpSeqManager->Drop(packet->GetSequenceNumber());
}
return;
}
const bool isSyncPacket = this->syncRequired;
bool sendPacketsInTargetLayerRetransmissionBuffer{ false };
if (isSyncPacket && (this->spatialLayerToSync == -1 || spatialLayer == this->spatialLayerToSync))
{
if (packet->IsKeyFrame())
{
MS_DEBUG_TAG(
rtp,
"sync key frame received [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32 "]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp());
sendPacketsInTargetLayerRetransmissionBuffer = true;
}
uint32_t tsOffset{ 0u };
if (spatialLayer == this->tsReferenceSpatialLayer)
{
tsOffset = 0u;
}
else
{
auto* producerTsReferenceRtpStream = GetProducerTsReferenceRtpStream();
auto* producerTargetRtpStream = GetProducerTargetRtpStream();
MS_ASSERT(
producerTsReferenceRtpStream->GetSenderReportNtpMs(),
"no Sender Report for TS reference RTP stream");
MS_ASSERT(
producerTargetRtpStream->GetSenderReportNtpMs(), "no Sender Report for current RTP stream");
auto ntpMs1 = producerTsReferenceRtpStream->GetSenderReportNtpMs();
auto ts1 = producerTsReferenceRtpStream->GetSenderReportTs();
auto ntpMs2 = producerTargetRtpStream->GetSenderReportNtpMs();
auto ts2 = producerTargetRtpStream->GetSenderReportTs();
int64_t diffMs;
if (ntpMs2 >= ntpMs1)
{
diffMs = ntpMs2 - ntpMs1;
}
else
{
diffMs = -1 * (ntpMs1 - ntpMs2);
}
const int64_t diffTs = diffMs * this->rtpStream->GetClockRate() / 1000;
const uint32_t newTs2 = ts2 - diffTs;
tsOffset = newTs2 - ts1;
}
if (
shouldSwitchCurrentSpatialLayer &&
(packet->GetTimestamp() - tsOffset <= this->rtpStream->GetMaxPacketTs())
)
{
static const uint32_t MaxExtraOffsetMs{ 75u };
static const uint8_t MsOffset{ 33u };
const int64_t maxTsExtraOffset = MaxExtraOffsetMs * this->rtpStream->GetClockRate() / 1000;
uint32_t tsExtraOffset = this->rtpStream->GetMaxPacketTs() - packet->GetTimestamp() +
tsOffset + MsOffset * this->rtpStream->GetClockRate() / 1000;
if (this->keyFrameForTsOffsetRequested)
{
if (tsExtraOffset > maxTsExtraOffset)
{
MS_WARN_TAG(
simulcast,
"giving up on proper stream switching after got a requested keyframe for which still too high RTP timestamp extra offset is needed (%" PRIu32
")",
tsExtraOffset);
tsExtraOffset = 1u;
}
}
else if (tsExtraOffset > maxTsExtraOffset)
{
MS_WARN_TAG(
simulcast,
"cannot switch stream due to too high RTP timestamp extra offset needed (%" PRIu32
"), requesting keyframe",
tsExtraOffset);
RequestKeyFrameForTargetSpatialLayer();
this->keyFrameForTsOffsetRequested = true;
this->syncRequired = false;
this->spatialLayerToSync = -1;
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(
RtcLogger::RtpPacket::DiscardReason::TOO_HIGH_TIMESTAMP_EXTRA_NEEDED);
#endif
return;
}
if (tsExtraOffset > 0u)
{
MS_DEBUG_TAG(
simulcast,
"RTP timestamp extra offset generated for stream switching: %" PRIu32,
tsExtraOffset);
tsOffset -= tsExtraOffset;
}
}
this->tsOffset = tsOffset;
this->rtpSeqManager->Sync(packet->GetSequenceNumber() - (this->lastSentPacketHasMarker ? 1 : 2));
this->encodingContext->SyncRequired();
this->syncRequired = false;
this->spatialLayerToSync = -1;
this->keyFrameForTsOffsetRequested = false;
}
if (!shouldSwitchCurrentSpatialLayer && this->checkingForOldPacketsInSpatialLayer)
{
if (SeqManager<uint16_t>::IsSeqLowerThan(
packet->GetSequenceNumber(), this->snReferenceSpatialLayer))
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(
RtcLogger::RtpPacket::DiscardReason::PACKET_PREVIOUS_TO_SPATIAL_LAYER_SWITCH);
#endif
this->rtpSeqManager->Drop(packet->GetSequenceNumber());
return;
}
else if (SeqManager<uint16_t>::IsSeqHigherThan(
packet->GetSequenceNumber(), this->snReferenceSpatialLayer + MaxSequenceNumberGap))
{
this->checkingForOldPacketsInSpatialLayer = false;
}
}
bool marker{ false };
if (shouldSwitchCurrentSpatialLayer)
{
this->currentSpatialLayer = this->targetSpatialLayer;
this->snReferenceSpatialLayer = packet->GetSequenceNumber();
this->checkingForOldPacketsInSpatialLayer = true;
this->encodingContext->SetTargetTemporalLayer(this->targetTemporalLayer);
this->encodingContext->SetCurrentTemporalLayer(packet->GetTemporalLayer());
this->rtpStream->ResetScore(10u, false);
EmitLayersChange();
EmitScore();
packet->ProcessPayload(this->encodingContext.get(), marker);
}
else
{
auto previousTemporalLayer = this->encodingContext->GetCurrentTemporalLayer();
if (!packet->ProcessPayload(this->encodingContext.get(), marker))
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::DROPPED_BY_CODEC);
#endif
this->rtpSeqManager->Drop(packet->GetSequenceNumber());
return;
}
if (previousTemporalLayer != this->encodingContext->GetCurrentTemporalLayer())
{
EmitLayersChange();
}
}
uint16_t seq;
const uint32_t timestamp = packet->GetTimestamp() - this->tsOffset;
this->rtpSeqManager->Input(packet->GetSequenceNumber(), seq);
auto origSsrc = packet->GetSsrc();
auto origSeq = packet->GetSequenceNumber();
auto origTimestamp = packet->GetTimestamp();
packet->SetSsrc(this->rtpParameters.encodings[0].ssrc);
packet->SetSequenceNumber(seq);
packet->SetTimestamp(timestamp);
#ifdef MS_RTC_LOGGER_RTP
packet->logger.sendRtpTimestamp = timestamp;
packet->logger.sendSeqNumber = seq;
#endif
if (isSyncPacket)
{
MS_DEBUG_TAG(
rtp,
"sending sync packet [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32
"] from original [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32 "]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp(),
origSsrc,
origSeq,
origTimestamp);
}
const RTC::RtpStreamSend::ReceivePacketResult result =
this->rtpStream->ReceivePacket(packet, sharedPacket);
if (result != RTC::RtpStreamSend::ReceivePacketResult::DISCARDED)
{
if (this->rtpSeqManager->GetMaxOutput() == packet->GetSequenceNumber())
{
this->lastSentPacketHasMarker = packet->HasMarker();
}
this->listener->OnConsumerSendRtpPacket(this, packet);
EmitTraceEventRtpAndKeyFrameTypes(packet);
}
else
{
MS_WARN_TAG(
rtp,
"failed to send packet [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32
"] from original [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32 "]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp(),
origSsrc,
origSeq,
origTimestamp);
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::SEND_RTP_STREAM_DISCARDED);
#endif
}
packet->SetSsrc(origSsrc);
packet->SetSequenceNumber(origSeq);
packet->SetTimestamp(origTimestamp);
packet->RestorePayload();
if (!sharedPacket.HasPacket() && result == RTC::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED)
{
sharedPacket.Assign(packet);
}
if (sendPacketsInTargetLayerRetransmissionBuffer)
{
if (result != RTC::RtpStreamSend::ReceivePacketResult::DISCARDED)
{
for (auto& kv : this->targetLayerRetransmissionBuffer)
{
auto& bufferedSharedPacket = kv.second;
auto* bufferedPacket = bufferedSharedPacket.GetPacket();
if (bufferedPacket->GetSequenceNumber() > origSeq)
{
MS_DEBUG_DEV(
"sending packet buffered in the target layer retransmission buffer [ssrc:%" PRIu32
", seq:%" PRIu16 ", ts:%" PRIu32
"] after sending first packet of the key frame [ssrc:%" PRIu32 ", seq:%" PRIu16
", ts:%" PRIu32 "]",
bufferedPacket->GetSsrc(),
bufferedPacket->GetSequenceNumber(),
bufferedPacket->GetTimestamp(),
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp());
SendRtpPacket(bufferedPacket, bufferedSharedPacket);
if (this->targetLayerRetransmissionBuffer.size() == 0)
{
MS_DEBUG_DEV(
"target layer retransmission buffer emptied while iterating it, exiting the loop");
break;
}
}
}
}
this->targetLayerRetransmissionBuffer.clear();
}
}
bool SimulcastConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs)
{
MS_TRACE();
if (static_cast<float>((nowMs - this->lastRtcpSentTime) * 1.15) < this->maxRtcpInterval)
{
return true;
}
auto* senderReport = this->rtpStream->GetRtcpSenderReport(nowMs);
if (!senderReport)
{
return true;
}
auto* sdesChunk = this->rtpStream->GetRtcpSdesChunk();
auto* delaySinceLastRrSsrcInfo = this->rtpStream->GetRtcpXrDelaySinceLastRrSsrcInfo(nowMs);
if (!packet->Add(senderReport, sdesChunk, delaySinceLastRrSsrcInfo))
{
return false;
}
this->lastRtcpSentTime = nowMs;
return true;
}
void SimulcastConsumer::NeedWorstRemoteFractionLost(
uint32_t , uint8_t& worstRemoteFractionLost)
{
MS_TRACE();
if (!IsActive())
{
return;
}
auto fractionLost = this->rtpStream->GetFractionLost();
if (fractionLost > worstRemoteFractionLost)
{
worstRemoteFractionLost = fractionLost;
}
}
void SimulcastConsumer::ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket)
{
MS_TRACE();
if (!IsActive())
{
return;
}
EmitTraceEventNackType();
this->rtpStream->ReceiveNack(nackPacket);
}
void SimulcastConsumer::ReceiveKeyFrameRequest(
RTC::RTCP::FeedbackPs::MessageType messageType, uint32_t ssrc)
{
MS_TRACE();
switch (messageType)
{
case RTC::RTCP::FeedbackPs::MessageType::PLI:
{
EmitTraceEventPliType(ssrc);
break;
}
case RTC::RTCP::FeedbackPs::MessageType::FIR:
{
EmitTraceEventFirType(ssrc);
break;
}
default:;
}
this->rtpStream->ReceiveKeyFrameRequest(messageType);
if (IsActive())
{
RequestKeyFrameForCurrentSpatialLayer();
}
}
void SimulcastConsumer::ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report)
{
MS_TRACE();
this->rtpStream->ReceiveRtcpReceiverReport(report);
}
void SimulcastConsumer::ReceiveRtcpXrReceiverReferenceTime(RTC::RTCP::ReceiverReferenceTime* report)
{
MS_TRACE();
this->rtpStream->ReceiveRtcpXrReceiverReferenceTime(report);
}
uint32_t SimulcastConsumer::GetTransmissionRate(uint64_t nowMs)
{
MS_TRACE();
if (!IsActive())
{
return 0u;
}
return this->rtpStream->GetBitrate(nowMs);
}
float SimulcastConsumer::GetRtt() const
{
MS_TRACE();
return this->rtpStream->GetRtt();
}
void SimulcastConsumer::UserOnTransportConnected()
{
MS_TRACE();
this->syncRequired = true;
this->spatialLayerToSync = -1;
this->keyFrameForTsOffsetRequested = false;
if (IsActive())
{
MayChangeLayers();
}
}
void SimulcastConsumer::UserOnTransportDisconnected()
{
MS_TRACE();
this->lastBweDowngradeAtMs = 0u;
this->rtpStream->Pause();
this->targetLayerRetransmissionBuffer.clear();
UpdateTargetLayers(-1, -1);
}
void SimulcastConsumer::UserOnPaused()
{
MS_TRACE();
this->lastBweDowngradeAtMs = 0u;
this->rtpStream->Pause();
this->targetLayerRetransmissionBuffer.clear();
UpdateTargetLayers(-1, -1);
if (this->externallyManagedBitrate)
{
this->listener->OnConsumerNeedZeroBitrate(this);
}
}
void SimulcastConsumer::UserOnResumed()
{
MS_TRACE();
this->syncRequired = true;
this->spatialLayerToSync = -1;
this->keyFrameForTsOffsetRequested = false;
this->checkingForOldPacketsInSpatialLayer = false;
if (IsActive())
{
MayChangeLayers();
}
}
void SimulcastConsumer::CreateRtpStream()
{
MS_TRACE();
auto& encoding = this->rtpParameters.encodings[0];
const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding);
MS_DEBUG_TAG(
rtp, "[ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]", encoding.ssrc, mediaCodec->payloadType);
RTC::RtpStream::Params params;
params.ssrc = encoding.ssrc;
params.payloadType = mediaCodec->payloadType;
params.mimeType = mediaCodec->mimeType;
params.clockRate = mediaCodec->clockRate;
params.cname = this->rtpParameters.rtcp.cname;
params.spatialLayers = encoding.spatialLayers;
params.temporalLayers = encoding.temporalLayers;
if (mediaCodec->parameters.HasInteger("useinbandfec") && mediaCodec->parameters.GetInteger("useinbandfec") == 1)
{
MS_DEBUG_TAG(rtp, "in band FEC enabled");
params.useInBandFec = true;
}
if (mediaCodec->parameters.HasInteger("usedtx") && mediaCodec->parameters.GetInteger("usedtx") == 1)
{
MS_DEBUG_TAG(rtp, "DTX enabled");
params.useDtx = true;
}
if (encoding.dtx)
{
MS_DEBUG_TAG(rtp, "DTX enabled");
params.useDtx = true;
}
for (const auto& fb : mediaCodec->rtcpFeedback)
{
if (!params.useNack && fb.type == "nack" && fb.parameter.empty())
{
MS_DEBUG_2TAGS(rtp, rtcp, "NACK supported");
params.useNack = true;
}
else if (!params.usePli && fb.type == "nack" && fb.parameter == "pli")
{
MS_DEBUG_2TAGS(rtp, rtcp, "PLI supported");
params.usePli = true;
}
else if (!params.useFir && fb.type == "ccm" && fb.parameter == "fir")
{
MS_DEBUG_2TAGS(rtp, rtcp, "FIR supported");
params.useFir = true;
}
}
this->rtpStream = new RTC::RtpStreamSend(this, params, this->rtpParameters.mid);
this->rtpStreams.push_back(this->rtpStream);
if (IsPaused() || IsProducerPaused())
{
this->rtpStream->Pause();
}
const auto* rtxCodec = this->rtpParameters.GetRtxCodecForEncoding(encoding);
if (rtxCodec && encoding.hasRtx)
{
this->rtpStream->SetRtx(rtxCodec->payloadType, encoding.rtx.ssrc);
}
}
void SimulcastConsumer::RequestKeyFrames()
{
MS_TRACE();
if (this->kind != RTC::Media::Kind::VIDEO)
{
return;
}
auto* producerTargetRtpStream = GetProducerTargetRtpStream();
auto* producerCurrentRtpStream = GetProducerCurrentRtpStream();
if (producerTargetRtpStream)
{
auto mappedSsrc = this->consumableRtpEncodings[this->targetSpatialLayer].ssrc;
this->listener->OnConsumerKeyFrameRequested(this, mappedSsrc);
}
if (producerCurrentRtpStream && producerCurrentRtpStream != producerTargetRtpStream)
{
auto mappedSsrc = this->consumableRtpEncodings[this->currentSpatialLayer].ssrc;
this->listener->OnConsumerKeyFrameRequested(this, mappedSsrc);
}
}
void SimulcastConsumer::RequestKeyFrameForTargetSpatialLayer()
{
MS_TRACE();
if (this->kind != RTC::Media::Kind::VIDEO)
{
return;
}
auto* producerTargetRtpStream = GetProducerTargetRtpStream();
if (!producerTargetRtpStream)
{
return;
}
auto mappedSsrc = this->consumableRtpEncodings[this->targetSpatialLayer].ssrc;
this->listener->OnConsumerKeyFrameRequested(this, mappedSsrc);
}
void SimulcastConsumer::RequestKeyFrameForCurrentSpatialLayer()
{
MS_TRACE();
if (this->kind != RTC::Media::Kind::VIDEO)
{
return;
}
auto* producerCurrentRtpStream = GetProducerCurrentRtpStream();
if (!producerCurrentRtpStream)
{
return;
}
auto mappedSsrc = this->consumableRtpEncodings[this->currentSpatialLayer].ssrc;
this->listener->OnConsumerKeyFrameRequested(this, mappedSsrc);
}
void SimulcastConsumer::MayChangeLayers(bool force)
{
MS_TRACE();
int16_t newTargetSpatialLayer;
int16_t newTargetTemporalLayer;
if (RecalculateTargetLayers(newTargetSpatialLayer, newTargetTemporalLayer))
{
if (this->externallyManagedBitrate)
{
if (newTargetSpatialLayer != this->targetSpatialLayer || force)
{
this->listener->OnConsumerNeedBitrateChange(this);
}
}
else
{
UpdateTargetLayers(newTargetSpatialLayer, newTargetTemporalLayer);
}
}
}
bool SimulcastConsumer::RecalculateTargetLayers(
int16_t& newTargetSpatialLayer, int16_t& newTargetTemporalLayer) const
{
MS_TRACE();
newTargetSpatialLayer = -1;
newTargetTemporalLayer = -1;
auto nowMs = DepLibUV::GetTimeMs();
for (size_t sIdx{ 0u }; sIdx < this->producerRtpStreams.size(); ++sIdx)
{
auto spatialLayer = static_cast<int16_t>(sIdx);
auto* producerRtpStream = this->producerRtpStreams.at(sIdx);
auto producerScore = producerRtpStream ? producerRtpStream->GetScore() : 0u;
if (nowMs - this->lastBweDowngradeAtMs < BweDowngradeConservativeMs)
{
if (newTargetSpatialLayer > -1 && spatialLayer > this->currentSpatialLayer)
{
continue;
}
}
if (producerScore == 0u)
{
continue;
}
if (
this->externallyManagedBitrate &&
newTargetSpatialLayer != -1 &&
producerRtpStream->GetActiveMs() < StreamMinActiveMs
)
{
continue;
}
if (!CanSwitchToSpatialLayer(spatialLayer))
{
continue;
}
newTargetSpatialLayer = spatialLayer;
if (spatialLayer >= this->preferredSpatialLayer)
{
break;
}
}
if (newTargetSpatialLayer != -1)
{
if (newTargetSpatialLayer == this->preferredSpatialLayer)
{
newTargetTemporalLayer = this->preferredTemporalLayer;
}
else if (newTargetSpatialLayer < this->preferredSpatialLayer)
{
newTargetTemporalLayer = static_cast<int16_t>(this->rtpStream->GetTemporalLayers() - 1);
}
else
{
newTargetTemporalLayer = 0;
}
}
return (
newTargetSpatialLayer != this->targetSpatialLayer ||
newTargetTemporalLayer != this->targetTemporalLayer
);
}
void SimulcastConsumer::UpdateTargetLayers(int16_t newTargetSpatialLayer, int16_t newTargetTemporalLayer)
{
MS_TRACE();
if (
newTargetSpatialLayer != -1 && (this->tsReferenceSpatialLayer == -1 ||
!GetProducerTsReferenceRtpStream()->GetSenderReportNtpMs()))
{
MS_DEBUG_TAG(
simulcast, "using spatial layer %" PRIi16 " as RTP timestamp reference", newTargetSpatialLayer);
this->tsReferenceSpatialLayer = newTargetSpatialLayer;
}
if (newTargetSpatialLayer != this->targetSpatialLayer)
{
this->targetLayerRetransmissionBuffer.clear();
}
if (newTargetSpatialLayer == -1)
{
this->targetSpatialLayer = -1;
this->targetTemporalLayer = -1;
this->currentSpatialLayer = -1;
this->encodingContext->SetTargetTemporalLayer(-1);
this->encodingContext->SetCurrentTemporalLayer(-1);
MS_DEBUG_TAG(
simulcast, "target layers changed [spatial:-1, temporal:-1, consumerId:%s]", this->id.c_str());
EmitLayersChange();
return;
}
this->targetSpatialLayer = newTargetSpatialLayer;
this->targetTemporalLayer = newTargetTemporalLayer;
if (this->targetSpatialLayer == this->currentSpatialLayer)
{
this->encodingContext->SetTargetTemporalLayer(this->targetTemporalLayer);
}
MS_DEBUG_TAG(
simulcast,
"target layers changed [spatial:%" PRIi16 ", temporal:%" PRIi16 ", consumerId:%s]",
this->targetSpatialLayer,
this->targetTemporalLayer,
this->id.c_str());
if (this->targetSpatialLayer != this->currentSpatialLayer)
{
RequestKeyFrameForTargetSpatialLayer();
}
}
bool SimulcastConsumer::CanSwitchToSpatialLayer(int16_t spatialLayer) const
{
MS_TRACE();
MS_ASSERT(
this->producerRtpStreams.at(spatialLayer),
"no Producer RtpStream for the given spatialLayer:%" PRIi16,
spatialLayer);
return (
this->tsReferenceSpatialLayer == -1 ||
spatialLayer == this->tsReferenceSpatialLayer ||
this->producerRtpStreams.at(spatialLayer)->GetSenderReportNtpMs()
);
}
void SimulcastConsumer::StorePacketInTargetLayerRetransmissionBuffer(
RTC::RtpPacket* packet, RTC::SharedRtpPacket& sharedPacket)
{
MS_TRACE();
MS_DEBUG_DEV(
"storing packet in target layer retransmission buffer [ssrc:%" PRIu32 ", seq:%" PRIu16
", ts:%" PRIu32 "]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp());
if (!sharedPacket.HasPacket())
{
sharedPacket.Assign(packet);
}
else
{
sharedPacket.AssertSamePacket(packet);
}
this->targetLayerRetransmissionBuffer[packet->GetSequenceNumber()] = sharedPacket;
if (this->targetLayerRetransmissionBuffer.size() > TargetLayerRetransmissionBufferSize)
{
this->targetLayerRetransmissionBuffer.erase(this->targetLayerRetransmissionBuffer.begin());
}
}
void SimulcastConsumer::EmitScore() const
{
MS_TRACE();
auto scoreOffset = FillBufferScore(this->shared->channelNotifier->GetBufferBuilder());
auto notificationOffset = FBS::Consumer::CreateScoreNotification(
this->shared->channelNotifier->GetBufferBuilder(), scoreOffset);
this->shared->channelNotifier->Emit(
this->id,
FBS::Notification::Event::CONSUMER_SCORE,
FBS::Notification::Body::Consumer_ScoreNotification,
notificationOffset);
}
void SimulcastConsumer::EmitLayersChange() const
{
MS_TRACE();
MS_DEBUG_DEV(
"current layers changed to [spatial:%" PRIi16 ", temporal:%" PRIi16 ", consumerId:%s]",
this->currentSpatialLayer,
this->encodingContext->GetCurrentTemporalLayer(),
this->id.c_str());
flatbuffers::Offset<FBS::Consumer::ConsumerLayers> layersOffset;
if (this->currentSpatialLayer >= 0)
{
layersOffset = FBS::Consumer::CreateConsumerLayers(
this->shared->channelNotifier->GetBufferBuilder(),
this->currentSpatialLayer,
this->encodingContext->GetCurrentTemporalLayer());
}
auto notificationOffset = FBS::Consumer::CreateLayersChangeNotification(
this->shared->channelNotifier->GetBufferBuilder(), layersOffset);
this->shared->channelNotifier->Emit(
this->id,
FBS::Notification::Event::CONSUMER_LAYERS_CHANGE,
FBS::Notification::Body::Consumer_LayersChangeNotification,
notificationOffset);
}
RTC::RtpStreamRecv* SimulcastConsumer::GetProducerCurrentRtpStream() const
{
MS_TRACE();
if (this->currentSpatialLayer == -1)
{
return nullptr;
}
return this->producerRtpStreams.at(this->currentSpatialLayer);
}
RTC::RtpStreamRecv* SimulcastConsumer::GetProducerTargetRtpStream() const
{
MS_TRACE();
if (this->targetSpatialLayer == -1)
{
return nullptr;
}
return this->producerRtpStreams.at(this->targetSpatialLayer);
}
RTC::RtpStreamRecv* SimulcastConsumer::GetProducerTsReferenceRtpStream() const
{
MS_TRACE();
if (this->tsReferenceSpatialLayer == -1)
{
return nullptr;
}
return this->producerRtpStreams.at(this->tsReferenceSpatialLayer);
}
void SimulcastConsumer::OnRtpStreamScore(
RTC::RtpStream* , uint8_t , uint8_t )
{
MS_TRACE();
EmitScore();
if (IsActive())
{
if (!this->externallyManagedBitrate)
{
MayChangeLayers();
}
}
}
void SimulcastConsumer::OnRtpStreamRetransmitRtpPacket(
RTC::RtpStreamSend* , RTC::RtpPacket* packet)
{
MS_TRACE();
this->listener->OnConsumerRetransmitRtpPacket(this, packet);
EmitTraceEventRtpAndKeyFrameTypes(packet, this->rtpStream->HasRtx());
}
}