#define MS_CLASS "RTC::PipeConsumer"
#include "RTC/PipeConsumer.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "RTC/Codecs/Tools.hpp"
namespace RTC
{
PipeConsumer::PipeConsumer(
RTC::Shared* shared,
const std::string& id,
const std::string& producerId,
RTC::Consumer::Listener* listener,
json& data)
: RTC::Consumer::Consumer(shared, id, producerId, listener, data, RTC::RtpParameters::Type::PIPE)
{
MS_TRACE();
if (this->rtpParameters.encodings.size() != this->consumableRtpEncodings.size())
MS_THROW_TYPE_ERROR("number of rtpParameters.encodings and consumableRtpEncodings do not match");
auto& encoding = this->rtpParameters.encodings[0];
auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding);
this->keyFrameSupported = RTC::Codecs::Tools::CanBeKeyFrame(mediaCodec->mimeType);
CreateRtpStreams();
this->shared->channelMessageRegistrator->RegisterHandler(
this->id,
this,
nullptr,
nullptr);
}
PipeConsumer::~PipeConsumer()
{
MS_TRACE();
this->shared->channelMessageRegistrator->UnregisterHandler(this->id);
for (auto* rtpStream : this->rtpStreams)
{
delete rtpStream;
}
this->rtpStreams.clear();
this->mapMappedSsrcSsrc.clear();
this->mapSsrcRtpStream.clear();
}
void PipeConsumer::FillJson(json& jsonObject) const
{
MS_TRACE();
RTC::Consumer::FillJson(jsonObject);
jsonObject["rtpStreams"] = json::array();
auto jsonRtpStreamsIt = jsonObject.find("rtpStreams");
for (auto* rtpStream : this->rtpStreams)
{
jsonRtpStreamsIt->emplace_back(json::value_t::object);
auto& jsonEntry = (*jsonRtpStreamsIt)[jsonRtpStreamsIt->size() - 1];
rtpStream->FillJson(jsonEntry);
}
}
void PipeConsumer::FillJsonStats(json& jsonArray) const
{
MS_TRACE();
for (auto* rtpStream : this->rtpStreams)
{
jsonArray.emplace_back(json::value_t::object);
auto& jsonEntry = jsonArray[jsonArray.size() - 1];
rtpStream->FillJsonStats(jsonEntry);
}
}
void PipeConsumer::FillJsonScore(json& jsonObject) const
{
MS_TRACE();
MS_ASSERT(this->producerRtpStreamScores, "producerRtpStreamScores not set");
jsonObject["score"] = 10;
jsonObject["producerScore"] = 10;
jsonObject["producerScores"] = *this->producerRtpStreamScores;
}
void PipeConsumer::HandleRequest(Channel::ChannelRequest* request)
{
MS_TRACE();
switch (request->methodId)
{
case Channel::ChannelRequest::MethodId::CONSUMER_REQUEST_KEY_FRAME:
{
if (IsActive())
RequestKeyFrame();
request->Accept();
break;
}
case Channel::ChannelRequest::MethodId::CONSUMER_SET_PREFERRED_LAYERS:
{
request->Accept();
break;
}
default:
{
RTC::Consumer::HandleRequest(request);
}
}
}
void PipeConsumer::ProducerRtpStream(RTC::RtpStream* , uint32_t )
{
MS_TRACE();
}
void PipeConsumer::ProducerNewRtpStream(RTC::RtpStream* , uint32_t )
{
MS_TRACE();
}
void PipeConsumer::ProducerRtpStreamScore(
RTC::RtpStream* , uint8_t , uint8_t )
{
MS_TRACE();
}
void PipeConsumer::ProducerRtcpSenderReport(RTC::RtpStream* , bool )
{
MS_TRACE();
}
uint8_t PipeConsumer::GetBitratePriority() const
{
MS_TRACE();
return 0u;
}
uint32_t PipeConsumer::IncreaseLayer(uint32_t , bool )
{
MS_TRACE();
return 0u;
}
void PipeConsumer::ApplyLayers()
{
MS_TRACE();
}
uint32_t PipeConsumer::GetDesiredBitrate() const
{
MS_TRACE();
return 0u;
}
void PipeConsumer::SendRtpPacket(RTC::RtpPacket* packet, std::shared_ptr<RTC::RtpPacket>& sharedPacket)
{
MS_TRACE();
if (!IsActive())
return;
auto payloadType = packet->GetPayloadType();
if (!this->supportedCodecPayloadTypes[payloadType])
{
MS_DEBUG_DEV("payload type not supported [payloadType:%" PRIu8 "]", payloadType);
return;
}
auto ssrc = this->mapMappedSsrcSsrc.at(packet->GetSsrc());
auto* rtpStream = this->mapSsrcRtpStream.at(ssrc);
auto& syncRequired = this->mapRtpStreamSyncRequired.at(rtpStream);
auto& rtpSeqManager = this->mapRtpStreamRtpSeqManager.at(rtpStream);
if (syncRequired && this->keyFrameSupported && !packet->IsKeyFrame())
return;
bool isSyncPacket = syncRequired;
if (isSyncPacket)
{
if (packet->IsKeyFrame())
MS_DEBUG_TAG(rtp, "sync key frame received");
rtpSeqManager.Sync(packet->GetSequenceNumber() - 1);
syncRequired = false;
}
uint16_t seq;
rtpSeqManager.Input(packet->GetSequenceNumber(), seq);
auto origSsrc = packet->GetSsrc();
auto origSeq = packet->GetSequenceNumber();
packet->SetSsrc(ssrc);
packet->SetSequenceNumber(seq);
if (isSyncPacket)
{
MS_DEBUG_TAG(
rtp,
"sending sync packet [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32
"] from original [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp(),
origSsrc,
origSeq);
}
if (rtpStream->ReceivePacket(packet, sharedPacket))
{
this->listener->OnConsumerSendRtpPacket(this, packet);
EmitTraceEventRtpAndKeyFrameTypes(packet);
}
else
{
MS_WARN_TAG(
rtp,
"failed to send packet [ssrc:%" PRIu32 ", seq:%" PRIu16 ", ts:%" PRIu32
"] from original [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
packet->GetTimestamp(),
origSsrc,
origSeq);
}
packet->SetSsrc(origSsrc);
packet->SetSequenceNumber(origSeq);
}
bool PipeConsumer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs)
{
MS_TRACE();
if (
nowMs != this->lastRtcpSentTime &&
static_cast<float>((nowMs - this->lastRtcpSentTime) * 1.15) < this->maxRtcpInterval
)
{
return true;
}
std::vector<RTCP::SenderReport*> senderReports;
std::vector<RTCP::SdesChunk*> sdesChunks;
std::vector<RTCP::DelaySinceLastRr*> xrReports;
for (auto* rtpStream : this->rtpStreams)
{
auto* report = rtpStream->GetRtcpSenderReport(nowMs);
if (!report)
continue;
senderReports.push_back(report);
auto* sdesChunk = rtpStream->GetRtcpSdesChunk();
sdesChunks.push_back(sdesChunk);
auto* dlrr = rtpStream->GetRtcpXrDelaySinceLastRr(nowMs);
if (dlrr)
{
auto* report = new RTC::RTCP::DelaySinceLastRr();
report->AddSsrcInfo(dlrr);
xrReports.push_back(report);
}
}
if (!packet->Add(senderReports, sdesChunks, xrReports))
return false;
this->lastRtcpSentTime = nowMs;
return true;
}
void PipeConsumer::NeedWorstRemoteFractionLost(uint32_t , uint8_t& worstRemoteFractionLost)
{
MS_TRACE();
if (!IsActive())
return;
for (auto* rtpStream : this->rtpStreams)
{
auto fractionLost = rtpStream->GetFractionLost();
if (fractionLost > worstRemoteFractionLost)
worstRemoteFractionLost = fractionLost;
}
}
void PipeConsumer::ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket)
{
MS_TRACE();
if (!IsActive())
return;
EmitTraceEventNackType();
auto ssrc = nackPacket->GetMediaSsrc();
auto* rtpStream = this->mapSsrcRtpStream.at(ssrc);
rtpStream->ReceiveNack(nackPacket);
}
void PipeConsumer::ReceiveKeyFrameRequest(RTC::RTCP::FeedbackPs::MessageType messageType, uint32_t ssrc)
{
MS_TRACE();
switch (messageType)
{
case RTC::RTCP::FeedbackPs::MessageType::PLI:
{
EmitTraceEventPliType(ssrc);
break;
}
case RTC::RTCP::FeedbackPs::MessageType::FIR:
{
EmitTraceEventFirType(ssrc);
break;
}
default:;
}
auto* rtpStream = this->mapSsrcRtpStream.at(ssrc);
rtpStream->ReceiveKeyFrameRequest(messageType);
if (IsActive())
RequestKeyFrame();
}
void PipeConsumer::ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report)
{
MS_TRACE();
auto* rtpStream = this->mapSsrcRtpStream.at(report->GetSsrc());
rtpStream->ReceiveRtcpReceiverReport(report);
}
void PipeConsumer::ReceiveRtcpXrReceiverReferenceTime(RTC::RTCP::ReceiverReferenceTime* report)
{
MS_TRACE();
for (auto* rtpStream : this->rtpStreams)
{
rtpStream->ReceiveRtcpXrReceiverReferenceTime(report);
}
}
uint32_t PipeConsumer::GetTransmissionRate(uint64_t nowMs)
{
MS_TRACE();
if (!IsActive())
return 0u;
uint32_t rate{ 0u };
for (auto* rtpStream : this->rtpStreams)
{
rate += rtpStream->GetBitrate(nowMs);
}
return rate;
}
float PipeConsumer::GetRtt() const
{
MS_TRACE();
float rtt{ 0 };
for (auto* rtpStream : this->rtpStreams)
{
if (rtpStream->GetRtt() > rtt)
rtt = rtpStream->GetRtt();
}
return rtt;
}
void PipeConsumer::UserOnTransportConnected()
{
MS_TRACE();
for (auto& kv : this->mapRtpStreamSyncRequired)
{
kv.second = true;
}
if (IsActive())
{
for (auto* rtpStream : this->rtpStreams)
{
rtpStream->Resume();
}
RequestKeyFrame();
}
}
void PipeConsumer::UserOnTransportDisconnected()
{
MS_TRACE();
for (auto* rtpStream : this->rtpStreams)
{
rtpStream->Pause();
}
}
void PipeConsumer::UserOnPaused()
{
MS_TRACE();
for (auto* rtpStream : this->rtpStreams)
{
rtpStream->Pause();
}
}
void PipeConsumer::UserOnResumed()
{
MS_TRACE();
for (auto& kv : this->mapRtpStreamSyncRequired)
{
kv.second = true;
}
if (IsActive())
{
for (auto* rtpStream : this->rtpStreams)
{
rtpStream->Resume();
}
RequestKeyFrame();
}
}
void PipeConsumer::CreateRtpStreams()
{
MS_TRACE();
for (size_t idx{ 0u }; idx < this->rtpParameters.encodings.size(); ++idx)
{
auto& encoding = this->rtpParameters.encodings[idx];
const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding);
auto& consumableEncoding = this->consumableRtpEncodings[idx];
MS_DEBUG_TAG(
rtp, "[ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]", encoding.ssrc, mediaCodec->payloadType);
RTC::RtpStream::Params params;
params.encodingIdx = idx;
params.ssrc = encoding.ssrc;
params.payloadType = mediaCodec->payloadType;
params.mimeType = mediaCodec->mimeType;
params.clockRate = mediaCodec->clockRate;
params.cname = this->rtpParameters.rtcp.cname;
params.spatialLayers = encoding.spatialLayers;
params.temporalLayers = encoding.temporalLayers;
if (mediaCodec->parameters.HasInteger("useinbandfec") && mediaCodec->parameters.GetInteger("useinbandfec") == 1)
{
MS_DEBUG_TAG(rtp, "in band FEC enabled");
params.useInBandFec = true;
}
if (mediaCodec->parameters.HasInteger("usedtx") && mediaCodec->parameters.GetInteger("usedtx") == 1)
{
MS_DEBUG_TAG(rtp, "DTX enabled");
params.useDtx = true;
}
if (encoding.dtx)
{
MS_DEBUG_TAG(rtp, "DTX enabled");
params.useDtx = true;
}
for (const auto& fb : mediaCodec->rtcpFeedback)
{
if (!params.useNack && fb.type == "nack" && fb.parameter.empty())
{
MS_DEBUG_2TAGS(rtp, rtcp, "NACK supported");
params.useNack = true;
}
else if (!params.usePli && fb.type == "nack" && fb.parameter == "pli")
{
MS_DEBUG_2TAGS(rtp, rtcp, "PLI supported");
params.usePli = true;
}
else if (!params.useFir && fb.type == "ccm" && fb.parameter == "fir")
{
MS_DEBUG_2TAGS(rtp, rtcp, "FIR supported");
params.useFir = true;
}
}
auto* rtpStream = new RTC::RtpStreamSend(this, params, this->rtpParameters.mid);
if (IsPaused() || IsProducerPaused())
rtpStream->Pause();
const auto* rtxCodec = this->rtpParameters.GetRtxCodecForEncoding(encoding);
if (rtxCodec && encoding.hasRtx)
rtpStream->SetRtx(rtxCodec->payloadType, encoding.rtx.ssrc);
this->rtpStreams.push_back(rtpStream);
this->mapMappedSsrcSsrc[consumableEncoding.ssrc] = encoding.ssrc;
this->mapSsrcRtpStream[encoding.ssrc] = rtpStream;
this->mapRtpStreamSyncRequired[rtpStream] = false;
this->mapRtpStreamRtpSeqManager[rtpStream];
}
}
void PipeConsumer::RequestKeyFrame()
{
MS_TRACE();
if (this->kind != RTC::Media::Kind::VIDEO)
return;
for (auto& consumableRtpEncoding : this->consumableRtpEncodings)
{
auto mappedSsrc = consumableRtpEncoding.ssrc;
this->listener->OnConsumerKeyFrameRequested(this, mappedSsrc);
}
}
inline void PipeConsumer::OnRtpStreamScore(
RTC::RtpStream* , uint8_t , uint8_t )
{
MS_TRACE();
}
inline void PipeConsumer::OnRtpStreamRetransmitRtpPacket(
RTC::RtpStreamSend* rtpStream, RTC::RtpPacket* packet)
{
MS_TRACE();
this->listener->OnConsumerRetransmitRtpPacket(this, packet);
EmitTraceEventRtpAndKeyFrameTypes(packet, rtpStream->HasRtx());
}
}