#include "FBS/consumer.h"
#define MS_CLASS "RTC::SimpleConsumer"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include "RTC/Codecs/Tools.hpp"
#include "RTC/SimpleConsumer.hpp"
#ifdef MS_RTC_LOGGER_RTP
#include "RTC/RtcLogger.hpp"
#endif
#include <limits>
namespace RTC
{
static constexpr size_t TargetLayerRetransmissionBufferSize{ 15u };
SimpleConsumer::SimpleConsumer(
RTC::Shared* shared,
const std::string& id,
const std::string& producerId,
RTC::Consumer::Listener* listener,
const FBS::Transport::ConsumeRequest* data)
: RTC::Consumer::Consumer(shared, id, producerId, listener, data, RTC::RtpParameters::Type::SIMPLE)
{
MS_TRACE();
if (this->consumableRtpEncodings.size() != 1u)
{
MS_THROW_TYPE_ERROR("invalid consumableRtpEncodings with size != 1");
}
auto& encoding = this->rtpParameters.encodings[0];
const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding);
this->keyFrameSupported = RTC::Codecs::Tools::CanBeKeyFrame(mediaCodec->mimeType);
CreateRtpStream();
const uint16_t initialOutputSeq =
Utils::Crypto::GetRandomUInt(1000u, std::numeric_limits<uint16_t>::max() / 2);
this->rtpSeqManager.reset(new RTC::SeqManager<uint16_t>(initialOutputSeq));
if (
mediaCodec->mimeType.type == RTC::RtpCodecMimeType::Type::AUDIO &&
(mediaCodec->mimeType.subtype == RTC::RtpCodecMimeType::Subtype::OPUS ||
mediaCodec->mimeType.subtype == RTC::RtpCodecMimeType::Subtype::MULTIOPUS))
{
RTC::Codecs::EncodingContext::Params params;
this->encodingContext.reset(
RTC::Codecs::Tools::GetEncodingContext(mediaCodec->mimeType, params));
this->encodingContext->SetIgnoreDtx(data->ignoreDtx());
}
this->shared->channelMessageRegistrator->RegisterHandler(
this->id,
this,
nullptr);
}
SimpleConsumer::~SimpleConsumer()
{
MS_TRACE();
this->shared->channelMessageRegistrator->UnregisterHandler(this->id);
delete this->rtpStream;
this->targetLayerRetransmissionBuffer.clear();
}
flatbuffers::Offset<FBS::Consumer::DumpResponse> SimpleConsumer::FillBuffer(
flatbuffers::FlatBufferBuilder& builder) const
{
MS_TRACE();
auto base = RTC::Consumer::FillBuffer(builder);
std::vector<flatbuffers::Offset<FBS::RtpStream::Dump>> rtpStreams;
rtpStreams.emplace_back(this->rtpStream->FillBuffer(builder));
auto dump = FBS::Consumer::CreateConsumerDumpDirect(builder, base, &rtpStreams);
return FBS::Consumer::CreateDumpResponse(builder, dump);
}
flatbuffers::Offset<FBS::Consumer::GetStatsResponse> SimpleConsumer::FillBufferStats(
flatbuffers::FlatBufferBuilder& builder)
{
MS_TRACE();
std::vector<flatbuffers::Offset<FBS::RtpStream::Stats>> rtpStreams;
rtpStreams.emplace_back(this->rtpStream->FillBufferStats(builder));
if (this->producerRtpStream)
{
rtpStreams.emplace_back(this->producerRtpStream->FillBufferStats(builder));
}
return FBS::Consumer::CreateGetStatsResponseDirect(builder, &rtpStreams);
}
flatbuffers::Offset<FBS::Consumer::ConsumerScore> SimpleConsumer::FillBufferScore(
flatbuffers::FlatBufferBuilder& builder) const
{
MS_TRACE();
MS_ASSERT(this->producerRtpStreamScores, "producerRtpStreamScores not set");
uint8_t producerScore{ 0 };
if (this->producerRtpStream)
{
producerScore = this->producerRtpStream->GetScore();
}
return FBS::Consumer::CreateConsumerScoreDirect(
builder, this->rtpStream->GetScore(), producerScore, this->producerRtpStreamScores);
}
void SimpleConsumer::HandleRequest(Channel::ChannelRequest* request)
{
MS_TRACE();
switch (request->method)
{
case Channel::ChannelRequest::Method::CONSUMER_DUMP:
{
auto dumpOffset = FillBuffer(request->GetBufferBuilder());
request->Accept(FBS::Response::Body::Consumer_DumpResponse, dumpOffset);
break;
}
case Channel::ChannelRequest::Method::CONSUMER_REQUEST_KEY_FRAME:
{
if (IsActive())
{
RequestKeyFrame();
}
request->Accept();
break;
}
case Channel::ChannelRequest::Method::CONSUMER_SET_PREFERRED_LAYERS:
{
auto responseOffset =
FBS::Consumer::CreateSetPreferredLayersResponse(request->GetBufferBuilder());
request->Accept(FBS::Response::Body::Consumer_SetPreferredLayersResponse, responseOffset);
break;
}
default:
{
RTC::Consumer::HandleRequest(request);
}
}
}
void SimpleConsumer::ProducerRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t )
{
MS_TRACE();
this->producerRtpStream = rtpStream;
}
void SimpleConsumer::ProducerNewRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t )
{
MS_TRACE();
this->producerRtpStream = rtpStream;
EmitScore();
}
void SimpleConsumer::ProducerRtpStreamScore(
RTC::RtpStreamRecv* , uint8_t , uint8_t )
{
MS_TRACE();
EmitScore();
}
void SimpleConsumer::ProducerRtcpSenderReport(RTC::RtpStreamRecv* , bool )
{
MS_TRACE();
}
uint8_t SimpleConsumer::GetBitratePriority() const
{
MS_TRACE();
MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed");
if (this->kind != RTC::Media::Kind::VIDEO)
{
return 0u;
}
if (!IsActive())
{
return 0u;
}
return this->priority;
}
uint32_t SimpleConsumer::IncreaseLayer(uint32_t bitrate, bool )
{
MS_TRACE();
MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed");
MS_ASSERT(this->kind == RTC::Media::Kind::VIDEO, "should be video");
MS_ASSERT(IsActive(), "should be active");
if (this->managingBitrate)
{
return 0u;
}
this->managingBitrate = true;
auto nowMs = DepLibUV::GetTimeMs();
auto desiredBitrate = this->producerRtpStream->GetBitrate(nowMs);
if (desiredBitrate < bitrate)
{
return desiredBitrate;
}
else
{
return bitrate;
}
}
void SimpleConsumer::ApplyLayers()
{
MS_TRACE();
MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed");
MS_ASSERT(this->kind == RTC::Media::Kind::VIDEO, "should be video");
MS_ASSERT(IsActive(), "should be active");
this->managingBitrate = false;
}
uint32_t SimpleConsumer::GetDesiredBitrate() const
{
MS_TRACE();
MS_ASSERT(this->externallyManagedBitrate, "bitrate is not externally managed");
if (this->kind != RTC::Media::Kind::VIDEO)
{
return 0u;
}
if (!IsActive())
{
return 0u;
}
auto nowMs = DepLibUV::GetTimeMs();
auto desiredBitrate = this->producerRtpStream->GetBitrate(nowMs);
auto maxBitrate = this->rtpParameters.encodings[0].maxBitrate;
if (maxBitrate > desiredBitrate)
{
desiredBitrate = maxBitrate;
}
return desiredBitrate;
}
void SimpleConsumer::SendRtpPacket(RTC::RtpPacket* packet, RTC::SharedRtpPacket& sharedPacket)
{
MS_TRACE();
#ifdef MS_RTC_LOGGER_RTP
packet->logger.consumerId = this->id;
#endif
if (!IsActive())
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::CONSUMER_INACTIVE);
#endif
this->rtpSeqManager->Drop(packet->GetSequenceNumber());
return;
}
if (this->syncRequired && this->keyFrameSupported && !packet->IsKeyFrame())
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::NOT_A_KEYFRAME);
#endif
StorePacketInTargetLayerRetransmissionBuffer(packet, sharedPacket);
return;
}
auto payloadType = packet->GetPayloadType();
if (!this->supportedCodecPayloadTypes[payloadType])
{
MS_WARN_DEV("payload type not supported [payloadType:%" PRIu8 "]", payloadType);
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::UNSUPPORTED_PAYLOAD_TYPE);
#endif
this->rtpSeqManager->Drop(packet->GetSequenceNumber());
return;
}
if (packet->GetPayloadLength() == 0)
{
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::EMPTY_PAYLOAD);
#endif
this->rtpSeqManager->Drop(packet->GetSequenceNumber());
return;
}
bool marker;
if (this->encodingContext && !packet->ProcessPayload(this->encodingContext.get(), marker))
{
MS_DEBUG_DEV(
"discarding packet [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32 "]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp());
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::DROPPED_BY_CODEC);
#endif
this->rtpSeqManager->Drop(packet->GetSequenceNumber());
return;
}
const bool isSyncPacket = this->syncRequired;
bool sendPacketsInTargetLayerRetransmissionBuffer{ false };
if (isSyncPacket)
{
if (packet->IsKeyFrame())
{
MS_DEBUG_TAG(
rtp,
"sync key frame received [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32 "]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp());
sendPacketsInTargetLayerRetransmissionBuffer = true;
}
this->rtpSeqManager->Sync(packet->GetSequenceNumber() - 1);
this->syncRequired = false;
}
uint16_t seq;
this->rtpSeqManager->Input(packet->GetSequenceNumber(), seq);
auto origSsrc = packet->GetSsrc();
auto origSeq = packet->GetSequenceNumber();
packet->SetSsrc(this->rtpParameters.encodings[0].ssrc);
packet->SetSequenceNumber(seq);
#ifdef MS_RTC_LOGGER_RTP
packet->logger.sendRtpTimestamp = packet->GetTimestamp();
packet->logger.sendSeqNumber = seq;
#endif
if (isSyncPacket)
{
MS_DEBUG_TAG(
rtp,
"sending sync packet [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32
"] from original [seq:%" PRIu16 "]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp(),
origSeq);
}
const RTC::RtpStreamSend::ReceivePacketResult result =
this->rtpStream->ReceivePacket(packet, sharedPacket);
if (result != RTC::RtpStreamSend::ReceivePacketResult::DISCARDED)
{
this->listener->OnConsumerSendRtpPacket(this, packet);
EmitTraceEventRtpAndKeyFrameTypes(packet);
}
else
{
MS_WARN_TAG(
rtp,
"failed to send packet [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32
"] from original [seq:%" PRIu16 "]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp(),
origSeq);
#ifdef MS_RTC_LOGGER_RTP
packet->logger.Discarded(RtcLogger::RtpPacket::DiscardReason::SEND_RTP_STREAM_DISCARDED);
#endif
}
packet->SetSsrc(origSsrc);
packet->SetSequenceNumber(origSeq);
if (!sharedPacket.HasPacket() && result == RTC::RtpStreamSend::ReceivePacketResult::ACCEPTED_AND_STORED)
{
sharedPacket.Assign(packet);
}
if (sendPacketsInTargetLayerRetransmissionBuffer)
{
if (result != RTC::RtpStreamSend::ReceivePacketResult::DISCARDED)
{
for (auto& kv : this->targetLayerRetransmissionBuffer)
{
auto& bufferedSharedPacket = kv.second;
auto* bufferedPacket = bufferedSharedPacket.GetPacket();
if (bufferedPacket->GetSequenceNumber() > origSeq)
{
MS_DEBUG_DEV(
"sending packet buffered in the target layer retransmission buffer [ssrc:%" PRIu32
", seq:%" PRIu16 ", ts:%" PRIu32
"] after sending first packet of the key frame [ssrc:%" PRIu32 ", seq:%" PRIu16
", ts:%" PRIu32 "]",
bufferedPacket->GetSsrc(),
bufferedPacket->GetSequenceNumber(),
bufferedPacket->GetTimestamp(),
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp());
SendRtpPacket(bufferedPacket, bufferedSharedPacket);
if (this->targetLayerRetransmissionBuffer.size() == 0)
{
MS_DEBUG_DEV(
"target layer retransmission buffer emptied while iterating it, exiting the loop");
break;
}
}
}
}
this->targetLayerRetransmissionBuffer.clear();
}
}
bool SimpleConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs)
{
MS_TRACE();
if (static_cast<float>((nowMs - this->lastRtcpSentTime) * 1.15) < this->maxRtcpInterval)
{
return true;
}
auto* senderReport = this->rtpStream->GetRtcpSenderReport(nowMs);
if (!senderReport)
{
return true;
}
auto* sdesChunk = this->rtpStream->GetRtcpSdesChunk();
auto* delaySinceLastRrSsrcInfo = this->rtpStream->GetRtcpXrDelaySinceLastRrSsrcInfo(nowMs);
if (!packet->Add(senderReport, sdesChunk, delaySinceLastRrSsrcInfo))
{
return false;
}
this->lastRtcpSentTime = nowMs;
return true;
}
void SimpleConsumer::NeedWorstRemoteFractionLost(
uint32_t , uint8_t& worstRemoteFractionLost)
{
MS_TRACE();
if (!IsActive())
{
return;
}
auto fractionLost = this->rtpStream->GetFractionLost();
if (fractionLost > worstRemoteFractionLost)
{
worstRemoteFractionLost = fractionLost;
}
}
void SimpleConsumer::ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket)
{
MS_TRACE();
if (!IsActive())
{
return;
}
EmitTraceEventNackType();
this->rtpStream->ReceiveNack(nackPacket);
}
void SimpleConsumer::ReceiveKeyFrameRequest(
RTC::RTCP::FeedbackPs::MessageType messageType, uint32_t ssrc)
{
MS_TRACE();
switch (messageType)
{
case RTC::RTCP::FeedbackPs::MessageType::PLI:
{
EmitTraceEventPliType(ssrc);
break;
}
case RTC::RTCP::FeedbackPs::MessageType::FIR:
{
EmitTraceEventFirType(ssrc);
break;
}
default:;
}
this->rtpStream->ReceiveKeyFrameRequest(messageType);
if (IsActive())
{
RequestKeyFrame();
}
}
void SimpleConsumer::ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report)
{
MS_TRACE();
this->rtpStream->ReceiveRtcpReceiverReport(report);
}
void SimpleConsumer::ReceiveRtcpXrReceiverReferenceTime(RTC::RTCP::ReceiverReferenceTime* report)
{
MS_TRACE();
this->rtpStream->ReceiveRtcpXrReceiverReferenceTime(report);
}
uint32_t SimpleConsumer::GetTransmissionRate(uint64_t nowMs)
{
MS_TRACE();
if (!IsActive())
{
return 0u;
}
return this->rtpStream->GetBitrate(nowMs);
}
float SimpleConsumer::GetRtt() const
{
MS_TRACE();
return this->rtpStream->GetRtt();
}
void SimpleConsumer::UserOnTransportConnected()
{
MS_TRACE();
this->syncRequired = true;
if (IsActive())
{
RequestKeyFrame();
}
}
void SimpleConsumer::UserOnTransportDisconnected()
{
MS_TRACE();
this->rtpStream->Pause();
this->targetLayerRetransmissionBuffer.clear();
}
void SimpleConsumer::UserOnPaused()
{
MS_TRACE();
this->rtpStream->Pause();
this->targetLayerRetransmissionBuffer.clear();
if (this->externallyManagedBitrate && this->kind == RTC::Media::Kind::VIDEO)
{
this->listener->OnConsumerNeedZeroBitrate(this);
}
}
void SimpleConsumer::UserOnResumed()
{
MS_TRACE();
this->syncRequired = true;
if (IsActive())
{
RequestKeyFrame();
}
}
void SimpleConsumer::CreateRtpStream()
{
MS_TRACE();
auto& encoding = this->rtpParameters.encodings[0];
const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding);
MS_DEBUG_TAG(
rtp, "[ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]", encoding.ssrc, mediaCodec->payloadType);
RTC::RtpStream::Params params;
params.ssrc = encoding.ssrc;
params.payloadType = mediaCodec->payloadType;
params.mimeType = mediaCodec->mimeType;
params.clockRate = mediaCodec->clockRate;
params.cname = this->rtpParameters.rtcp.cname;
if (mediaCodec->parameters.HasInteger("useinbandfec") && mediaCodec->parameters.GetInteger("useinbandfec") == 1)
{
MS_DEBUG_TAG(rtp, "in band FEC enabled");
params.useInBandFec = true;
}
if (mediaCodec->parameters.HasInteger("usedtx") && mediaCodec->parameters.GetInteger("usedtx") == 1)
{
MS_DEBUG_TAG(rtp, "DTX enabled");
params.useDtx = true;
}
if (encoding.dtx)
{
MS_DEBUG_TAG(rtp, "DTX enabled");
params.useDtx = true;
}
for (const auto& fb : mediaCodec->rtcpFeedback)
{
if (!params.useNack && fb.type == "nack" && fb.parameter.empty())
{
MS_DEBUG_2TAGS(rtp, rtcp, "NACK supported");
params.useNack = true;
}
else if (!params.usePli && fb.type == "nack" && fb.parameter == "pli")
{
MS_DEBUG_2TAGS(rtp, rtcp, "PLI supported");
params.usePli = true;
}
else if (!params.useFir && fb.type == "ccm" && fb.parameter == "fir")
{
MS_DEBUG_2TAGS(rtp, rtcp, "FIR supported");
params.useFir = true;
}
}
this->rtpStream = new RTC::RtpStreamSend(this, params, this->rtpParameters.mid);
this->rtpStreams.push_back(this->rtpStream);
if (IsPaused() || IsProducerPaused())
{
this->rtpStream->Pause();
}
const auto* rtxCodec = this->rtpParameters.GetRtxCodecForEncoding(encoding);
if (rtxCodec && encoding.hasRtx)
{
this->rtpStream->SetRtx(rtxCodec->payloadType, encoding.rtx.ssrc);
}
}
void SimpleConsumer::RequestKeyFrame()
{
MS_TRACE();
if (this->kind != RTC::Media::Kind::VIDEO)
{
return;
}
auto mappedSsrc = this->consumableRtpEncodings[0].ssrc;
this->listener->OnConsumerKeyFrameRequested(this, mappedSsrc);
}
void SimpleConsumer::StorePacketInTargetLayerRetransmissionBuffer(
RTC::RtpPacket* packet, RTC::SharedRtpPacket& sharedPacket)
{
MS_TRACE();
MS_DEBUG_DEV(
"storing packet in target layer retransmission buffer [ssrc:%" PRIu32 ", seq:%" PRIu16
", ts:%" PRIu32 "]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp());
if (!sharedPacket.HasPacket())
{
sharedPacket.Assign(packet);
}
else
{
sharedPacket.AssertSamePacket(packet);
}
this->targetLayerRetransmissionBuffer[packet->GetSequenceNumber()] = sharedPacket;
if (this->targetLayerRetransmissionBuffer.size() > TargetLayerRetransmissionBufferSize)
{
this->targetLayerRetransmissionBuffer.erase(this->targetLayerRetransmissionBuffer.begin());
}
}
void SimpleConsumer::EmitScore() const
{
MS_TRACE();
auto scoreOffset = FillBufferScore(this->shared->channelNotifier->GetBufferBuilder());
auto notificationOffset = FBS::Consumer::CreateScoreNotification(
this->shared->channelNotifier->GetBufferBuilder(), scoreOffset);
this->shared->channelNotifier->Emit(
this->id,
FBS::Notification::Event::CONSUMER_SCORE,
FBS::Notification::Body::Consumer_ScoreNotification,
notificationOffset);
}
void SimpleConsumer::OnRtpStreamScore(
RTC::RtpStream* , uint8_t , uint8_t )
{
MS_TRACE();
EmitScore();
}
void SimpleConsumer::OnRtpStreamRetransmitRtpPacket(
RTC::RtpStreamSend* , RTC::RtpPacket* packet)
{
MS_TRACE();
this->listener->OnConsumerRetransmitRtpPacket(this, packet);
EmitTraceEventRtpAndKeyFrameTypes(packet, this->rtpStream->HasRtx());
}
}