#define MS_CLASS "RTC::Producer"
#include "RTC/Producer.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#include "Channel/ChannelNotifier.hpp"
#include "RTC/Codecs/Tools.hpp"
#include "RTC/RTCP/FeedbackPs.hpp"
#include "RTC/RTCP/FeedbackRtp.hpp"
#include "RTC/RTCP/XrReceiverReferenceTime.hpp"
#include <cstring>
#include <iterator>
#include <sstream>
namespace RTC
{
/**
 * Builds a Producer from the JSON description given in `data`.
 *
 * Fields read from `data`:
 *  - "kind" (string, required); a value resolving to Kind::ALL is rejected.
 *  - "rtpParameters" (object, required).
 *  - "rtpMapping" (object, required) holding the "codecs" and "encodings" arrays.
 *  - "paused" (boolean, optional).
 *  - "keyFrameRequestDelay" (integer, optional, only read for video).
 *
 * @param id        Producer id, stored as is.
 * @param listener  Receiver of the Producer events, stored as is.
 * @param data      JSON object with the fields listed above.
 *
 * Raises a type error (MS_THROW_TYPE_ERROR) when a required field is missing
 * or malformed.
 */
Producer::Producer(const std::string& id, RTC::Producer::Listener* listener, json& data)
  : id(id), listener(listener)
{
	MS_TRACE();

	auto jsonKindIt = data.find("kind");

	if (jsonKindIt == data.end() || !jsonKindIt->is_string())
	{
		MS_THROW_TYPE_ERROR("missing kind");
	}

	this->kind = RTC::Media::GetKind(jsonKindIt->get<std::string>());

	if (this->kind == RTC::Media::Kind::ALL)
	{
		MS_THROW_TYPE_ERROR("invalid empty kind");
	}

	auto jsonRtpParametersIt = data.find("rtpParameters");

	if (jsonRtpParametersIt == data.end() || !jsonRtpParametersIt->is_object())
	{
		MS_THROW_TYPE_ERROR("missing rtpParameters");
	}

	this->rtpParameters = RTC::RtpParameters(*jsonRtpParametersIt);

	// encodings[0] is indexed below, so make sure there is at least one entry
	// instead of relying on the RtpParameters parser having checked it.
	if (this->rtpParameters.encodings.empty())
	{
		MS_THROW_TYPE_ERROR("empty rtpParameters.encodings");
	}

	this->type = RTC::RtpParameters::GetType(this->rtpParameters);

	// One slot per encoding; slots are filled by CreateRtpStream().
	this->rtpStreamByEncodingIdx.resize(this->rtpParameters.encodings.size(), nullptr);
	this->rtpStreamScores.resize(this->rtpParameters.encodings.size(), 0u);

	// The media codec of the first encoding must be valid for the producer type.
	{
		auto& firstEncoding = this->rtpParameters.encodings[0];
		auto* mediaCodec    = this->rtpParameters.GetCodecForEncoding(firstEncoding);

		if (!RTC::Codecs::Tools::IsValidTypeForCodec(this->type, mediaCodec->mimeType))
		{
			MS_THROW_TYPE_ERROR(
			  "%s codec not supported for %s",
			  mediaCodec->mimeType.ToString().c_str(),
			  RTC::RtpParameters::GetTypeString(this->type).c_str());
		}
	}

	auto jsonRtpMappingIt = data.find("rtpMapping");

	if (jsonRtpMappingIt == data.end() || !jsonRtpMappingIt->is_object())
	{
		MS_THROW_TYPE_ERROR("missing rtpMapping");
	}

	// rtpMapping.codecs: payloadType => mappedPayloadType.
	auto jsonCodecsIt = jsonRtpMappingIt->find("codecs");

	if (jsonCodecsIt == jsonRtpMappingIt->end() || !jsonCodecsIt->is_array())
	{
		MS_THROW_TYPE_ERROR("missing rtpMapping.codecs");
	}

	for (auto& jsonCodec : *jsonCodecsIt)
	{
		if (!jsonCodec.is_object())
		{
			MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.codecs (not an object)");
		}

		auto jsonPayloadTypeIt = jsonCodec.find("payloadType");

		if (
		  jsonPayloadTypeIt == jsonCodec.end() ||
		  !Utils::Json::IsPositiveInteger(*jsonPayloadTypeIt))
		{
			MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.codecs (missing payloadType)");
		}

		auto jsonMappedPayloadTypeIt = jsonCodec.find("mappedPayloadType");

		if (
		  jsonMappedPayloadTypeIt == jsonCodec.end() ||
		  !Utils::Json::IsPositiveInteger(*jsonMappedPayloadTypeIt))
		{
			MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.codecs (missing mappedPayloadType)");
		}

		this->rtpMapping.codecs[jsonPayloadTypeIt->get<uint8_t>()] =
		  jsonMappedPayloadTypeIt->get<uint8_t>();
	}

	// rtpMapping.encodings: optional ssrc / rid plus the mandatory mappedSsrc.
	auto jsonEncodingsIt = jsonRtpMappingIt->find("encodings");

	if (jsonEncodingsIt == jsonRtpMappingIt->end() || !jsonEncodingsIt->is_array())
	{
		MS_THROW_TYPE_ERROR("missing rtpMapping.encodings");
	}

	this->rtpMapping.encodings.reserve(jsonEncodingsIt->size());

	for (auto& jsonEncoding : *jsonEncodingsIt)
	{
		if (!jsonEncoding.is_object())
		{
			MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.encodings");
		}

		this->rtpMapping.encodings.emplace_back();

		auto& encodingMapping = this->rtpMapping.encodings.back();

		// ssrc is optional.
		auto jsonSsrcIt = jsonEncoding.find("ssrc");

		if (jsonSsrcIt != jsonEncoding.end() && Utils::Json::IsPositiveInteger(*jsonSsrcIt))
		{
			encodingMapping.ssrc = jsonSsrcIt->get<uint32_t>();
		}

		// rid is optional.
		auto jsonRidIt = jsonEncoding.find("rid");

		if (jsonRidIt != jsonEncoding.end() && jsonRidIt->is_string())
		{
			encodingMapping.rid = jsonRidIt->get<std::string>();
		}

		const bool hasNoSsrcNorRid =
		  jsonSsrcIt == jsonEncoding.end() && jsonRidIt == jsonEncoding.end();

		// With more than one encoding each entry needs an ssrc or a rid.
		if (jsonEncodingsIt->size() > 1 && hasNoSsrcNorRid)
		{
			MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.encodings (missing ssrc or rid)");
		}

		// A single encoding may omit both only if rtpParameters.mid is set.
		if (this->rtpParameters.mid.empty() && jsonEncodingsIt->size() == 1 && hasNoSsrcNorRid)
		{
			MS_THROW_TYPE_ERROR(
			  "wrong entry in rtpMapping.encodings (missing ssrc or rid, or rtpParameters.mid)");
		}

		// mappedSsrc is mandatory.
		auto jsonMappedSsrcIt = jsonEncoding.find("mappedSsrc");

		if (
		  jsonMappedSsrcIt == jsonEncoding.end() ||
		  !Utils::Json::IsPositiveInteger(*jsonMappedSsrcIt))
		{
			MS_THROW_TYPE_ERROR("wrong entry in rtpMapping.encodings (missing mappedSsrc)");
		}

		encodingMapping.mappedSsrc = jsonMappedSsrcIt->get<uint32_t>();
	}

	auto jsonPausedIt = data.find("paused");

	if (jsonPausedIt != data.end() && jsonPausedIt->is_boolean())
	{
		this->paused = jsonPausedIt->get<bool>();
	}

	// CreateRtpStream() indexes rtpMapping.encodings by encoding index.
	if (this->rtpParameters.encodings.size() != this->rtpMapping.encodings.size())
	{
		MS_THROW_TYPE_ERROR("rtpParameters.encodings size does not match rtpMapping.encodings size");
	}

	// Remember the id of the first header extension of each known type
	// (a value of 0 in rtpHeaderExtensionIds means "not set yet").
	for (auto& exten : this->rtpParameters.headerExtensions)
	{
		if (exten.id == 0u)
		{
			MS_THROW_TYPE_ERROR("RTP extension id cannot be 0");
		}

		if (this->rtpHeaderExtensionIds.mid == 0u && exten.type == RTC::RtpHeaderExtensionUri::Type::MID)
		{
			this->rtpHeaderExtensionIds.mid = exten.id;
		}

		if (
		  this->rtpHeaderExtensionIds.rid == 0u &&
		  exten.type == RTC::RtpHeaderExtensionUri::Type::RTP_STREAM_ID)
		{
			this->rtpHeaderExtensionIds.rid = exten.id;
		}

		if (
		  this->rtpHeaderExtensionIds.rrid == 0u &&
		  exten.type == RTC::RtpHeaderExtensionUri::Type::REPAIRED_RTP_STREAM_ID)
		{
			this->rtpHeaderExtensionIds.rrid = exten.id;
		}

		if (
		  this->rtpHeaderExtensionIds.absSendTime == 0u &&
		  exten.type == RTC::RtpHeaderExtensionUri::Type::ABS_SEND_TIME)
		{
			this->rtpHeaderExtensionIds.absSendTime = exten.id;
		}

		if (
		  this->rtpHeaderExtensionIds.transportWideCc01 == 0u &&
		  exten.type == RTC::RtpHeaderExtensionUri::Type::TRANSPORT_WIDE_CC_01)
		{
			this->rtpHeaderExtensionIds.transportWideCc01 = exten.id;
		}

		if (
		  this->rtpHeaderExtensionIds.frameMarking07 == 0u &&
		  exten.type == RTC::RtpHeaderExtensionUri::Type::FRAME_MARKING_07)
		{
			this->rtpHeaderExtensionIds.frameMarking07 = exten.id;
		}

		if (
		  this->rtpHeaderExtensionIds.frameMarking == 0u &&
		  exten.type == RTC::RtpHeaderExtensionUri::Type::FRAME_MARKING)
		{
			this->rtpHeaderExtensionIds.frameMarking = exten.id;
		}

		if (
		  this->rtpHeaderExtensionIds.ssrcAudioLevel == 0u &&
		  exten.type == RTC::RtpHeaderExtensionUri::Type::SSRC_AUDIO_LEVEL)
		{
			this->rtpHeaderExtensionIds.ssrcAudioLevel = exten.id;
		}

		if (
		  this->rtpHeaderExtensionIds.videoOrientation == 0u &&
		  exten.type == RTC::RtpHeaderExtensionUri::Type::VIDEO_ORIENTATION)
		{
			this->rtpHeaderExtensionIds.videoOrientation = exten.id;
		}

		if (
		  this->rtpHeaderExtensionIds.toffset == 0u &&
		  exten.type == RTC::RtpHeaderExtensionUri::Type::TOFFSET)
		{
			this->rtpHeaderExtensionIds.toffset = exten.id;
		}
	}

	// Maximum RTCP interval depends on the media kind.
	if (this->kind == RTC::Media::Kind::AUDIO)
		this->maxRtcpInterval = RTC::RTCP::MaxAudioIntervalMs;
	else
		this->maxRtcpInterval = RTC::RTCP::MaxVideoIntervalMs;

	// Only video producers get a KeyFrameRequestManager.
	if (this->kind == RTC::Media::Kind::VIDEO)
	{
		auto jsonKeyFrameRequestDelayIt = data.find("keyFrameRequestDelay");
		uint32_t keyFrameRequestDelay   = 0u;

		// Validate with the same helper used for the other unsigned fields: a
		// plain is_number_integer() check would let a negative value through and
		// get<uint32_t>() would then turn it into a huge delay.
		if (
		  jsonKeyFrameRequestDelayIt != data.end() &&
		  Utils::Json::IsPositiveInteger(*jsonKeyFrameRequestDelayIt))
		{
			keyFrameRequestDelay = jsonKeyFrameRequestDelayIt->get<uint32_t>();
		}

		// Created last so that no earlier throw in this constructor can leak it.
		this->keyFrameRequestManager = new RTC::KeyFrameRequestManager(this, keyFrameRequestDelay);
	}
}
/**
 * Deletes every RtpStreamRecv stored in mapSsrcRtpStream, empties all the
 * lookup containers and finally deletes the key frame request manager.
 */
Producer::~Producer()
{
	MS_TRACE();

	// The streams are deleted through mapSsrcRtpStream only; the remaining
	// containers hold the same pointers (or plain values) and are just cleared.
	for (auto it = this->mapSsrcRtpStream.begin(); it != this->mapSsrcRtpStream.end(); ++it)
	{
		delete it->second;
	}

	this->mapSsrcRtpStream.clear();
	this->rtpStreamByEncodingIdx.clear();
	this->rtpStreamScores.clear();
	this->mapRtxSsrcRtpStream.clear();
	this->mapRtpStreamMappedSsrc.clear();
	this->mapMappedSsrcSsrc.clear();

	// Only allocated by the constructor for video producers.
	delete this->keyFrameRequestManager;
}
/**
 * Dumps the Producer state into `jsonObject`.
 *
 * Keys written: "id", "kind", "rtpParameters", "type", "rtpMapping" (with
 * "codecs" and "encodings"), "rtpStreams", "paused" and "traceEventTypes"
 * (comma separated list of the enabled trace event types).
 *
 * @param jsonObject  JSON object to be filled.
 */
void Producer::FillJson(json& jsonObject) const
{
	MS_TRACE();

	jsonObject["id"]   = this->id;
	jsonObject["kind"] = RTC::Media::GetString(this->kind);
	this->rtpParameters.FillJson(jsonObject["rtpParameters"]);
	jsonObject["type"] = RTC::RtpParameters::GetTypeString(this->type);

	// rtpMapping.
	{
		json& jsonRtpMapping = jsonObject["rtpMapping"];

		jsonRtpMapping = json::object();

		// rtpMapping.codecs.
		{
			json& jsonCodecs = jsonRtpMapping["codecs"];

			jsonCodecs = json::array();

			for (const auto& codecMapping : this->rtpMapping.codecs)
			{
				jsonCodecs.emplace_back(json::value_t::object);

				json& jsonCodec = jsonCodecs.back();

				jsonCodec["payloadType"]       = codecMapping.first;
				jsonCodec["mappedPayloadType"] = codecMapping.second;
			}
		}

		// rtpMapping.encodings.
		{
			json& jsonEncodings = jsonRtpMapping["encodings"];

			jsonEncodings = json::array();

			for (const auto& encodingMapping : this->rtpMapping.encodings)
			{
				jsonEncodings.emplace_back(json::value_t::object);

				json& jsonEncoding = jsonEncodings.back();

				// An empty rid / zero ssrc is reported as JSON null.
				if (encodingMapping.rid.empty())
					jsonEncoding["rid"] = nullptr;
				else
					jsonEncoding["rid"] = encodingMapping.rid;

				if (encodingMapping.ssrc == 0u)
					jsonEncoding["ssrc"] = nullptr;
				else
					jsonEncoding["ssrc"] = encodingMapping.ssrc;

				jsonEncoding["mappedSsrc"] = encodingMapping.mappedSsrc;
			}
		}
	}

	// rtpStreams: one entry per already created stream, in encoding index order.
	{
		json& jsonRtpStreams = jsonObject["rtpStreams"];

		jsonRtpStreams = json::array();

		for (auto* stream : this->rtpStreamByEncodingIdx)
		{
			if (stream)
			{
				jsonRtpStreams.emplace_back(json::value_t::object);
				stream->FillJson(jsonRtpStreams.back());
			}
		}
	}

	jsonObject["paused"] = this->paused;

	// traceEventTypes: enabled types joined with ',' (empty string if none).
	std::string enabledTraceEventTypes;

	const auto appendTraceEventType = [&enabledTraceEventTypes](const char* name) {
		if (!enabledTraceEventTypes.empty())
			enabledTraceEventTypes += ',';

		enabledTraceEventTypes += name;
	};

	if (this->traceEventTypes.rtp)
		appendTraceEventType("rtp");
	if (this->traceEventTypes.keyframe)
		appendTraceEventType("keyframe");
	if (this->traceEventTypes.nack)
		appendTraceEventType("nack");
	if (this->traceEventTypes.pli)
		appendTraceEventType("pli");
	if (this->traceEventTypes.fir)
		appendTraceEventType("fir");

	jsonObject["traceEventTypes"] = enabledTraceEventTypes;
}
/**
 * Appends to `jsonArray` one stats object per existing RTP stream, following
 * the encoding index order.
 *
 * @param jsonArray  JSON array the stats objects are appended to.
 */
void Producer::FillJsonStats(json& jsonArray) const
{
	MS_TRACE();

	for (auto* stream : this->rtpStreamByEncodingIdx)
	{
		// Slots of encodings without a created stream are nullptr.
		if (stream)
		{
			jsonArray.emplace_back(json::value_t::object);
			stream->FillJsonStats(jsonArray.back());
		}
	}
}
/**
 * Dispatches a channel request addressed to this Producer.
 *
 * Handled methods: PRODUCER_DUMP, PRODUCER_GET_STATS, PRODUCER_PAUSE,
 * PRODUCER_RESUME and PRODUCER_ENABLE_TRACE_EVENT. Any other method raises an
 * error (MS_THROW_ERROR). A malformed "types" field in
 * PRODUCER_ENABLE_TRACE_EVENT raises a type error (MS_THROW_TYPE_ERROR).
 *
 * @param request  The request; it is accepted once handled.
 */
void Producer::HandleRequest(Channel::ChannelRequest* request)
{
	MS_TRACE();

	switch (request->methodId)
	{
		case Channel::ChannelRequest::MethodId::PRODUCER_DUMP:
		{
			json dump = json::object();

			FillJson(dump);
			request->Accept(dump);

			break;
		}

		case Channel::ChannelRequest::MethodId::PRODUCER_GET_STATS:
		{
			json stats = json::array();

			FillJsonStats(stats);
			request->Accept(stats);

			break;
		}

		case Channel::ChannelRequest::MethodId::PRODUCER_PAUSE:
		{
			// Pausing an already paused Producer is just accepted.
			if (!this->paused)
			{
				for (auto& entry : this->mapSsrcRtpStream)
				{
					entry.second->Pause();
				}

				this->paused = true;

				MS_DEBUG_DEV("Producer paused [producerId:%s]", this->id.c_str());

				this->listener->OnProducerPaused(this);
			}

			request->Accept();

			break;
		}

		case Channel::ChannelRequest::MethodId::PRODUCER_RESUME:
		{
			// Resuming a Producer that is not paused is just accepted.
			if (this->paused)
			{
				for (auto& entry : this->mapSsrcRtpStream)
				{
					entry.second->Resume();
				}

				this->paused = false;

				MS_DEBUG_DEV("Producer resumed [producerId:%s]", this->id.c_str());

				this->listener->OnProducerResumed(this);

				// Video producers force a key frame request for every stream.
				if (this->keyFrameRequestManager)
				{
					MS_DEBUG_2TAGS(rtcp, rtx, "requesting forced key frame(s) after resumed");

					for (auto& entry : this->mapSsrcRtpStream)
					{
						this->keyFrameRequestManager->ForceKeyFrameNeeded(entry.first);
					}
				}
			}

			request->Accept();

			break;
		}

		case Channel::ChannelRequest::MethodId::PRODUCER_ENABLE_TRACE_EVENT:
		{
			auto jsonTypesIt = request->data.find("types");

			if (jsonTypesIt == request->data.end() || !jsonTypesIt->is_array())
			{
				MS_THROW_TYPE_ERROR("wrong types (not an array)");
			}

			// Built apart and assigned at the end, so a throw while parsing leaves
			// the current settings untouched.
			struct TraceEventTypes requestedTypes;

			for (const auto& jsonType : *jsonTypesIt)
			{
				if (!jsonType.is_string())
				{
					MS_THROW_TYPE_ERROR("wrong type (not a string)");
				}

				const std::string name = jsonType.get<std::string>();

				// Unknown names are ignored.
				if (name == "rtp")
				{
					requestedTypes.rtp = true;
				}
				else if (name == "keyframe")
				{
					requestedTypes.keyframe = true;
				}
				else if (name == "nack")
				{
					requestedTypes.nack = true;
				}
				else if (name == "pli")
				{
					requestedTypes.pli = true;
				}
				else if (name == "fir")
				{
					requestedTypes.fir = true;
				}
			}

			this->traceEventTypes = requestedTypes;

			request->Accept();

			break;
		}

		default:
		{
			MS_THROW_ERROR("unknown method '%s'", request->method.c_str());
		}
	}
}
// Processes a RTP packet received for this Producer.
//
// Returns DISCARDED when no stream matches the packet or when the packet
// cannot be mangled, MEDIA when the packet belongs to the media ssrc of the
// stream and RETRANSMISSION when it belongs to its RTX ssrc. The packet is
// only handed to the listener when the stream accepts it and the Producer is
// not paused.
Producer::ReceiveRtpPacketResult Producer::ReceiveRtpPacket(RTC::RtpPacket* packet)
{
MS_TRACE();
// currentRtpPacket is only set while NotifyNewRtpStream() runs below.
this->currentRtpPacket = nullptr;
// Remember the number of streams so a stream created by GetRtpStream() (via
// CreateRtpStream()) can be detected later.
auto numRtpStreamsBefore = this->mapSsrcRtpStream.size();
auto* rtpStream = GetRtpStream(packet);
if (!rtpStream)
{
MS_WARN_TAG(rtp, "no stream found for received packet [ssrc:%" PRIu32 "]", packet->GetSsrc());
return ReceiveRtpPacketResult::DISCARDED;
}
// Sets the frame marking extension ids on video packets.
PreProcessRtpPacket(packet);
ReceiveRtpPacketResult result;
bool isRtx{ false };
// Media packet.
if (packet->GetSsrc() == rtpStream->GetSsrc())
{
result = ReceiveRtpPacketResult::MEDIA;
// The stream rejected the packet: still announce the stream if it was just
// created, then stop processing.
if (!rtpStream->ReceivePacket(packet))
{
if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)
NotifyNewRtpStream(rtpStream);
return result;
}
}
// RTX packet.
else if (packet->GetSsrc() == rtpStream->GetRtxSsrc())
{
result = ReceiveRtpPacketResult::RETRANSMISSION;
isRtx = true;
if (!rtpStream->ReceiveRtxPacket(packet))
return result;
}
// GetRtpStream() only returns streams matching the packet by media or RTX
// ssrc, so this branch is not expected to be reached.
else
{
MS_ABORT("found stream does not match received packet");
}
if (packet->IsKeyFrame())
{
MS_DEBUG_TAG(
rtp,
"key frame received [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
packet->GetSsrc(),
packet->GetSequenceNumber());
// keyFrameRequestManager only exists for video producers.
if (this->keyFrameRequestManager)
this->keyFrameRequestManager->KeyFrameReceived(packet->GetSsrc());
}
// A new stream was created for this packet.
if (this->mapSsrcRtpStream.size() > numRtpStreamsBefore)
{
// Force a key frame request unless this very packet is a key frame.
if (this->keyFrameRequestManager && !this->paused && !packet->IsKeyFrame())
this->keyFrameRequestManager->ForceKeyFrameNeeded(packet->GetSsrc());
// Expose the packet during the notification: RequestKeyFrame() checks
// currentRtpPacket and skips the request if it is a key frame of that ssrc.
this->currentRtpPacket = packet;
NotifyNewRtpStream(rtpStream);
this->currentRtpPacket = nullptr;
}
// While paused the packet is not traced, mangled nor forwarded.
if (this->paused)
return result;
EmitTraceEventRtpAndKeyFrameTypes(packet, isRtx);
// Rewrite payload type, ssrc and header extensions using the mapped values.
if (!MangleRtpPacket(packet, rtpStream))
return ReceiveRtpPacketResult::DISCARDED;
// May emit "videoorientationchange" for video packets.
PostProcessRtpPacket(packet);
this->listener->OnProducerRtpPacketReceived(this, packet);
return result;
}
/**
 * Delivers a RTCP Sender Report to the stream owning its ssrc.
 *
 * A report for a media ssrc is passed to the stream and then notified to the
 * listener; a report for a RTX ssrc is only passed to the stream. Reports
 * with an unknown ssrc are logged and ignored.
 *
 * @param report  The received Sender Report.
 */
void Producer::ReceiveRtcpSenderReport(RTC::RTCP::SenderReport* report)
{
	MS_TRACE();

	const uint32_t ssrc = report->GetSsrc();

	// Media stream lookup.
	auto mediaIt = this->mapSsrcRtpStream.find(ssrc);

	if (mediaIt != this->mapSsrcRtpStream.end())
	{
		auto* stream = mediaIt->second;

		// Must be read before the stream processes the report.
		const bool isFirstReport = (rtpStreamIsFresh(stream));

		stream->ReceiveRtcpSenderReport(report);

		this->listener->OnProducerRtcpSenderReport(this, stream, isFirstReport);

		return;
	}

	// RTX stream lookup.
	auto rtxIt = this->mapRtxSsrcRtpStream.find(ssrc);

	if (rtxIt != this->mapRtxSsrcRtpStream.end())
	{
		rtxIt->second->ReceiveRtxRtcpSenderReport(report);

		return;
	}

	MS_DEBUG_TAG(rtcp, "RtpStream not found [ssrc:%" PRIu32 "]", ssrc);
}
/**
 * Delivers a RTCP XR "delay since last RR" ssrc info block to the media
 * stream with that ssrc. Unknown ssrcs are logged and ignored.
 *
 * @param ssrcInfo  The received ssrc info block.
 */
void Producer::ReceiveRtcpXrDelaySinceLastRr(RTC::RTCP::DelaySinceLastRr::SsrcInfo* ssrcInfo)
{
	MS_TRACE();

	auto streamIt = this->mapSsrcRtpStream.find(ssrcInfo->GetSsrc());

	if (streamIt != this->mapSsrcRtpStream.end())
	{
		streamIt->second->ReceiveRtcpXrDelaySinceLastRr(ssrcInfo);
	}
	else
	{
		MS_WARN_TAG(rtcp, "RtpStream not found [ssrc:%" PRIu32 "]", ssrcInfo->GetSsrc());
	}
}
/**
 * Adds the receiver reports of every stream (and of its RTX, if any) to the
 * given compound packet, plus a Receiver Reference Time report if the packet
 * does not have one yet. Does nothing if the RTCP interval has not elapsed.
 *
 * @param packet  Compound packet being built.
 * @param nowMs   Current time in milliseconds.
 */
void Producer::GetRtcp(RTC::RTCP::CompoundPacket* packet, uint64_t nowMs)
{
	MS_TRACE();

	// NOTE(review): the 1.15 factor looks like a tolerance on the elapsed time;
	// confirm the intended meaning.
	const auto scaledElapsedMs = static_cast<float>((nowMs - this->lastRtcpSentTime) * 1.15);

	if (scaledElapsedMs < this->maxRtcpInterval)
	{
		return;
	}

	for (auto& entry : this->mapSsrcRtpStream)
	{
		auto* stream = entry.second;

		packet->AddReceiverReport(stream->GetRtcpReceiverReport());

		// The RTX report is optional.
		if (auto* rtxReport = stream->GetRtxRtcpReceiverReport())
		{
			packet->AddReceiverReport(rtxReport);
		}
	}

	// Add a Receiver Reference Time report unless the packet already has one.
	if (!packet->HasReceiverReferenceTime())
	{
		const auto ntp = Utils::Time::TimeMs2Ntp(nowMs);
		auto* referenceTime = new RTC::RTCP::ReceiverReferenceTime();

		referenceTime->SetNtpSec(ntp.seconds);
		referenceTime->SetNtpFrac(ntp.fractions);

		// NOTE(review): the packet presumably takes ownership of the report.
		packet->AddReceiverReferenceTime(referenceTime);
	}

	this->lastRtcpSentTime = nowMs;
}
/**
 * Requests a key frame for the stream identified by its mapped ssrc.
 *
 * Ignored when there is no key frame request manager, when the Producer is
 * paused, when the mapped ssrc is unknown, or when the packet currently
 * being processed is already a key frame of that stream.
 *
 * @param mappedSsrc  Mapped ssrc of the stream.
 */
void Producer::RequestKeyFrame(uint32_t mappedSsrc)
{
	MS_TRACE();

	if (this->paused || !this->keyFrameRequestManager)
	{
		return;
	}

	// Translate the mapped ssrc into the ssrc of the received stream.
	auto ssrcIt = this->mapMappedSsrcSsrc.find(mappedSsrc);

	if (ssrcIt == this->mapMappedSsrcSsrc.end())
	{
		MS_WARN_2TAGS(rtcp, rtx, "given mappedSsrc not found, ignoring");

		return;
	}

	const uint32_t ssrc = ssrcIt->second;
	auto* currentPacket = this->currentRtpPacket;

	// currentRtpPacket is set by ReceiveRtpPacket() while a new stream is being
	// notified; no request is needed if that packet is a key frame of this ssrc.
	const bool keyFrameInFlight =
	  currentPacket && currentPacket->GetSsrc() == ssrc && currentPacket->IsKeyFrame();

	if (!keyFrameInFlight)
	{
		this->keyFrameRequestManager->KeyFrameNeeded(ssrc);
	}
}
// Returns the RtpStreamRecv the given packet belongs to, creating the stream
// (or registering its RTX ssrc) when the packet is the first one matching an
// encoding. Returns nullptr if the packet must be ignored.
//
// Lookup order:
//  1. Existing stream by media ssrc.
//  2. Existing stream by RTX ssrc.
//  3. Encoding whose ssrc (or rtx.ssrc) equals the packet ssrc.
//  4. Encoding whose rid equals the RID read from the packet.
//  5. Single encoding declaring neither ssrc nor rid.
RTC::RtpStreamRecv* Producer::GetRtpStream(RTC::RtpPacket* packet)
{
MS_TRACE();
uint32_t ssrc = packet->GetSsrc();
uint8_t payloadType = packet->GetPayloadType();
// 1. Existing stream with this media ssrc.
{
auto it = this->mapSsrcRtpStream.find(ssrc);
if (it != this->mapSsrcRtpStream.end())
{
auto* rtpStream = it->second;
return rtpStream;
}
}
// 2. Existing stream with this RTX ssrc.
{
auto it = this->mapRtxSsrcRtpStream.find(ssrc);
if (it != this->mapRtxSsrcRtpStream.end())
{
auto* rtpStream = it->second;
return rtpStream;
}
}
// 3. Encodings declaring the ssrc (or the RTX ssrc) of the packet.
for (size_t i{ 0 }; i < this->rtpParameters.encodings.size(); ++i)
{
auto& encoding = this->rtpParameters.encodings[i];
const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding);
const auto* rtxCodec = this->rtpParameters.GetRtxCodecForEncoding(encoding);
bool isMediaPacket = (mediaCodec->payloadType == payloadType);
bool isRtxPacket = (rtxCodec && rtxCodec->payloadType == payloadType);
// First media packet of this encoding: create its stream.
if (isMediaPacket && encoding.ssrc == ssrc)
{
auto* rtpStream = CreateRtpStream(packet, *mediaCodec, i);
return rtpStream;
}
// RTX packet: the media stream must exist and have no RTX assigned yet.
else if (isRtxPacket && encoding.hasRtx && encoding.rtx.ssrc == ssrc)
{
auto it = this->mapSsrcRtpStream.find(encoding.ssrc);
if (it == this->mapSsrcRtpStream.end())
{
MS_DEBUG_2TAGS(rtp, rtx, "ignoring RTX packet for not yet created RtpStream (ssrc lookup)");
return nullptr;
}
auto* rtpStream = it->second;
if (rtpStream->HasRtx())
{
MS_DEBUG_2TAGS(rtp, rtx, "ignoring RTX packet with new ssrc (ssrc lookup)");
return nullptr;
}
rtpStream->SetRtx(payloadType, ssrc);
this->mapRtxSsrcRtpStream[ssrc] = rtpStream;
return rtpStream;
}
}
// 4. RID lookup. Once a RID is read from the packet this stage always returns.
std::string rid;
if (packet->ReadRid(rid))
{
for (size_t i{ 0 }; i < this->rtpParameters.encodings.size(); ++i)
{
auto& encoding = this->rtpParameters.encodings[i];
if (encoding.rid != rid)
continue;
const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding);
const auto* rtxCodec = this->rtpParameters.GetRtxCodecForEncoding(encoding);
bool isMediaPacket = (mediaCodec->payloadType == payloadType);
bool isRtxPacket = (rtxCodec && rtxCodec->payloadType == payloadType);
if (isMediaPacket)
{
// A stream with this RID already exists under a different ssrc (the
// ssrc lookups above failed), so the packet is ignored.
for (auto& kv : this->mapSsrcRtpStream)
{
auto* rtpStream = kv.second;
if (rtpStream->GetRid() == rid)
{
MS_WARN_TAG(
rtp, "ignoring packet with unknown ssrc but already handled RID (RID lookup)");
return nullptr;
}
}
auto* rtpStream = CreateRtpStream(packet, *mediaCodec, i);
return rtpStream;
}
else if (isRtxPacket)
{
// Attach the RTX ssrc to the existing stream with this RID, if any.
for (auto& kv : this->mapSsrcRtpStream)
{
auto* rtpStream = kv.second;
if (rtpStream->GetRid() == rid)
{
if (rtpStream->HasRtx())
{
MS_DEBUG_2TAGS(rtp, rtx, "ignoring RTX packet with new SSRC (RID lookup)");
return nullptr;
}
rtpStream->SetRtx(payloadType, ssrc);
this->mapRtxSsrcRtpStream[ssrc] = rtpStream;
return rtpStream;
}
}
MS_DEBUG_2TAGS(rtp, rtx, "ignoring RTX packet for not yet created RtpStream (RID lookup)");
return nullptr;
}
}
MS_WARN_TAG(rtp, "ignoring packet with unknown RID (RID lookup)");
return nullptr;
}
// 5. Single encoding declaring neither ssrc nor rid: the first media packet
// creates the only stream and the first RTX packet sets its RTX ssrc.
if (
this->rtpParameters.encodings.size() == 1 &&
!this->rtpParameters.encodings[0].ssrc &&
this->rtpParameters.encodings[0].rid.empty()
)
{
auto& encoding = this->rtpParameters.encodings[0];
const auto* mediaCodec = this->rtpParameters.GetCodecForEncoding(encoding);
const auto* rtxCodec = this->rtpParameters.GetRtxCodecForEncoding(encoding);
bool isMediaPacket = (mediaCodec->payloadType == payloadType);
bool isRtxPacket = (rtxCodec && rtxCodec->payloadType == payloadType);
if (isMediaPacket)
{
// Only one stream is allowed in this mode.
if (!this->mapSsrcRtpStream.empty())
{
MS_WARN_TAG(
rtp,
"ignoring packet with unknown ssrc not matching the already existing stream (single RtpStream lookup)");
return nullptr;
}
auto* rtpStream = CreateRtpStream(packet, *mediaCodec, 0);
return rtpStream;
}
else if (isRtxPacket)
{
// The only possible stream is the first (and single) entry of the map.
auto it = this->mapSsrcRtpStream.begin();
if (it == this->mapSsrcRtpStream.end())
{
MS_DEBUG_2TAGS(
rtp, rtx, "ignoring RTX packet for not yet created RtpStream (single stream lookup)");
return nullptr;
}
auto* rtpStream = it->second;
if (rtpStream->HasRtx())
{
MS_DEBUG_2TAGS(rtp, rtx, "ignoring RTX packet with new SSRC (single stream lookup)");
return nullptr;
}
rtpStream->SetRtx(payloadType, ssrc);
this->mapRtxSsrcRtpStream[ssrc] = rtpStream;
return rtpStream;
}
}
// No lookup matched.
return nullptr;
}
/**
 * Creates the RtpStreamRecv of the given encoding using the ssrc of `packet`,
 * registers it in every lookup container, pauses it if the Producer is paused
 * and emits the "score" notification.
 *
 * Asserts that no stream exists yet for that ssrc nor for that encoding index.
 *
 * @param packet       First media packet of the stream; only its ssrc is read.
 * @param mediaCodec   Media codec of the encoding.
 * @param encodingIdx  Index of the encoding in rtpParameters.encodings.
 * @return The newly created stream.
 */
RTC::RtpStreamRecv* Producer::CreateRtpStream(
  RTC::RtpPacket* packet, const RTC::RtpCodecParameters& mediaCodec, size_t encodingIdx)
{
	MS_TRACE();

	const uint32_t ssrc = packet->GetSsrc();

	MS_ASSERT(
	  this->mapSsrcRtpStream.find(ssrc) == this->mapSsrcRtpStream.end(),
	  "RtpStream with given SSRC already exists");
	MS_ASSERT(
	  !this->rtpStreamByEncodingIdx[encodingIdx],
	  "RtpStream for given encoding index already exists");

	auto& encoding        = this->rtpParameters.encodings[encodingIdx];
	auto& encodingMapping = this->rtpMapping.encodings[encodingIdx];

	MS_DEBUG_TAG(
	  rtp,
	  "[encodingIdx:%zu, ssrc:%" PRIu32 ", rid:%s, payloadType:%" PRIu8 "]",
	  encodingIdx,
	  ssrc,
	  encoding.rid.c_str(),
	  mediaCodec.payloadType);

	// Stream parameters taken from the encoding and its media codec.
	RTC::RtpStream::Params params;

	params.encodingIdx    = encodingIdx;
	params.ssrc           = ssrc;
	params.payloadType    = mediaCodec.payloadType;
	params.mimeType       = mediaCodec.mimeType;
	params.clockRate      = mediaCodec.clockRate;
	params.rid            = encoding.rid;
	params.cname          = this->rtpParameters.rtcp.cname;
	params.spatialLayers  = encoding.spatialLayers;
	params.temporalLayers = encoding.temporalLayers;

	// True when the codec has the given integer parameter set to 1.
	const auto codecFlagIsSet = [&mediaCodec](const char* name) {
		return mediaCodec.parameters.HasInteger(name) && mediaCodec.parameters.GetInteger(name) == 1;
	};

	if (codecFlagIsSet("useinbandfec"))
	{
		MS_DEBUG_TAG(rtcp, "in band FEC enabled");

		params.useInBandFec = true;
	}

	// DTX can be enabled by the codec parameter and/or by the encoding.
	if (codecFlagIsSet("usedtx"))
	{
		MS_DEBUG_TAG(rtp, "DTX enabled");

		params.useDtx = true;
	}

	if (encoding.dtx)
	{
		MS_DEBUG_TAG(rtp, "DTX enabled");

		params.useDtx = true;
	}

	// Enable each RTCP feedback mechanism the first time it is announced.
	for (const auto& feedback : mediaCodec.rtcpFeedback)
	{
		const bool isNackType = (feedback.type == "nack");

		if (isNackType && feedback.parameter.empty() && !params.useNack)
		{
			MS_DEBUG_2TAGS(rtp, rtcp, "NACK supported");

			params.useNack = true;
		}
		else if (isNackType && feedback.parameter == "pli" && !params.usePli)
		{
			MS_DEBUG_2TAGS(rtp, rtcp, "PLI supported");

			params.usePli = true;
		}
		else if (feedback.type == "ccm" && feedback.parameter == "fir" && !params.useFir)
		{
			MS_DEBUG_2TAGS(rtp, rtcp, "FIR supported");

			params.useFir = true;
		}
	}

	auto* newStream = new RTC::RtpStreamRecv(this, params);

	// Register the stream in every lookup container.
	this->mapSsrcRtpStream[ssrc]                        = newStream;
	this->rtpStreamByEncodingIdx[encodingIdx]           = newStream;
	this->rtpStreamScores[encodingIdx]                  = newStream->GetScore();
	this->mapRtpStreamMappedSsrc[newStream]             = encodingMapping.mappedSsrc;
	this->mapMappedSsrcSsrc[encodingMapping.mappedSsrc] = ssrc;

	// A stream created while the Producer is paused starts paused.
	if (this->paused)
	{
		newStream->Pause();
	}

	EmitScore();

	return newStream;
}
/**
 * Notifies the listener about a newly created stream together with its
 * mapped ssrc.
 *
 * @param rtpStream  The new stream; it must be registered in
 *                   mapRtpStreamMappedSsrc (looked up with at()).
 */
void Producer::NotifyNewRtpStream(RTC::RtpStreamRecv* rtpStream)
{
	MS_TRACE();

	const auto streamMappedSsrc = this->mapRtpStreamMappedSsrc.at(rtpStream);
	auto* baseStream            = static_cast<RTC::RtpStream*>(rtpStream);

	this->listener->OnProducerNewRtpStream(this, baseStream, streamMappedSsrc);
}
/**
 * Tells a video packet which header extension ids carry the frame marking
 * extensions negotiated for this Producer. Does nothing for other kinds.
 *
 * @param packet  The received packet.
 */
inline void Producer::PreProcessRtpPacket(RTC::RtpPacket* packet)
{
	MS_TRACE();

	if (this->kind != RTC::Media::Kind::VIDEO)
	{
		return;
	}

	const auto& extensionIds = this->rtpHeaderExtensionIds;

	packet->SetFrameMarking07ExtensionId(extensionIds.frameMarking07);
	packet->SetFrameMarkingExtensionId(extensionIds.frameMarking);
}
// Rewrites the received packet so it carries the mapped payload type, the
// mapped ssrc and a rebuilt set of header extensions whose ids are the
// numeric values of RTC::RtpHeaderExtensionUri::Type.
//
// Returns false, leaving the packet untouched, if the payload type has no
// entry in rtpMapping.codecs. Returns true otherwise.
inline bool Producer::MangleRtpPacket(RTC::RtpPacket* packet, RTC::RtpStreamRecv* rtpStream) const
{
MS_TRACE();
// Mangle the payload type.
{
uint8_t payloadType = packet->GetPayloadType();
auto it = this->rtpMapping.codecs.find(payloadType);
if (it == this->rtpMapping.codecs.end())
{
MS_WARN_TAG(rtp, "unknown payload type [payloadType:%" PRIu8 "]", payloadType);
return false;
}
uint8_t mappedPayloadType = it->second;
packet->SetPayloadType(mappedPayloadType);
}
// Mangle the ssrc.
{
uint32_t mappedSsrc = this->mapRtpStreamMappedSsrc.at(rtpStream);
packet->SetSsrc(mappedSsrc);
}
// Mangle the RTP header extensions.
{
// Scratch storage reused across calls (one instance per thread): the
// extension values are laid out consecutively in `buffer` and each entry of
// `extensions` points into it.
thread_local static uint8_t buffer[4096];
thread_local static std::vector<RTC::RtpPacket::GenericExtension> extensions;
if (extensions.capacity() != 24)
extensions.reserve(24);
extensions.clear();
uint8_t* extenValue;
uint8_t extenLen;
uint8_t* bufferPtr{ buffer };
// MID: reserve RTC::MidMaxLength bytes; no value is written here.
{
extenLen = RTC::MidMaxLength;
extensions.emplace_back(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::MID), extenLen, bufferPtr);
bufferPtr += extenLen;
}
if (this->kind == RTC::Media::Kind::AUDIO)
{
// Copy the ssrc-audio-level extension if the packet carries it.
// bufferPtr is not advanced afterwards: this is the last value written
// for audio packets.
extenValue = packet->GetExtension(this->rtpHeaderExtensionIds.ssrcAudioLevel, extenLen);
if (extenValue)
{
std::memcpy(bufferPtr, extenValue, extenLen);
extensions.emplace_back(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::SSRC_AUDIO_LEVEL),
extenLen,
bufferPtr);
}
}
else if (this->kind == RTC::Media::Kind::VIDEO)
{
// abs-send-time: always added, 3 bytes set to zero.
{
extenLen = 3u;
uint32_t absSendTime{ 0u };
Utils::Byte::Set3Bytes(bufferPtr, 0, absSendTime);
extensions.emplace_back(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::ABS_SEND_TIME), extenLen, bufferPtr);
bufferPtr += extenLen;
}
// transport-wide-cc-01: always added, 2 bytes set to zero.
{
extenLen = 2u;
uint16_t wideSeqNumber{ 0u };
Utils::Byte::Set2Bytes(bufferPtr, 0, wideSeqNumber);
extensions.emplace_back(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::TRANSPORT_WIDE_CC_01),
extenLen,
bufferPtr);
bufferPtr += extenLen;
}
// The following extensions are copied only if present in the packet.
// frame-marking-07.
extenValue = packet->GetExtension(this->rtpHeaderExtensionIds.frameMarking07, extenLen);
if (extenValue)
{
std::memcpy(bufferPtr, extenValue, extenLen);
extensions.emplace_back(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::FRAME_MARKING_07),
extenLen,
bufferPtr);
bufferPtr += extenLen;
}
// frame-marking.
extenValue = packet->GetExtension(this->rtpHeaderExtensionIds.frameMarking, extenLen);
if (extenValue)
{
std::memcpy(bufferPtr, extenValue, extenLen);
extensions.emplace_back(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::FRAME_MARKING), extenLen, bufferPtr);
bufferPtr += extenLen;
}
// video-orientation.
extenValue = packet->GetExtension(this->rtpHeaderExtensionIds.videoOrientation, extenLen);
if (extenValue)
{
std::memcpy(bufferPtr, extenValue, extenLen);
extensions.emplace_back(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::VIDEO_ORIENTATION),
extenLen,
bufferPtr);
bufferPtr += extenLen;
}
// toffset. bufferPtr is not advanced afterwards: this is the last value
// written for video packets.
extenValue = packet->GetExtension(this->rtpHeaderExtensionIds.toffset, extenLen);
if (extenValue)
{
std::memcpy(bufferPtr, extenValue, extenLen);
extensions.emplace_back(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::TOFFSET), extenLen, bufferPtr);
}
}
// Replace the extensions of the packet with the rebuilt list.
// NOTE(review): the first argument (1) presumably selects the one-byte
// header extension format - confirm against RtpPacket::SetExtensions().
packet->SetExtensions(1, extensions);
// Point the packet accessors to the new (mapped) extension ids.
packet->SetMidExtensionId(static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::MID));
packet->SetAbsSendTimeExtensionId(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::ABS_SEND_TIME));
packet->SetTransportWideCc01ExtensionId(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::TRANSPORT_WIDE_CC_01));
packet->SetFrameMarking07ExtensionId(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::FRAME_MARKING_07));
packet->SetFrameMarkingExtensionId(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::FRAME_MARKING));
packet->SetSsrcAudioLevelExtensionId(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::SSRC_AUDIO_LEVEL));
packet->SetVideoOrientationExtensionId(
static_cast<uint8_t>(RTC::RtpHeaderExtensionUri::Type::VIDEO_ORIENTATION));
}
return true;
}
/**
 * For video packets carrying the video orientation extension, stores the
 * orientation and emits a "videoorientationchange" notification the first
 * time it is seen and every time any of its values changes.
 *
 * @param packet  The already mangled packet.
 */
inline void Producer::PostProcessRtpPacket(RTC::RtpPacket* packet)
{
	MS_TRACE();

	if (this->kind != RTC::Media::Kind::VIDEO)
	{
		return;
	}

	bool camera;
	bool flip;
	uint16_t rotation;

	// Nothing to do if the packet has no video orientation.
	if (!packet->ReadVideoOrientation(camera, flip, rotation))
	{
		return;
	}

	const bool sameAsStored = this->videoOrientationDetected &&
	                          camera == this->videoOrientation.camera &&
	                          flip == this->videoOrientation.flip &&
	                          rotation == this->videoOrientation.rotation;

	if (sameAsStored)
	{
		return;
	}

	this->videoOrientationDetected  = true;
	this->videoOrientation.camera   = camera;
	this->videoOrientation.flip     = flip;
	this->videoOrientation.rotation = rotation;

	json notification = json::object();

	notification["camera"]   = this->videoOrientation.camera;
	notification["flip"]     = this->videoOrientation.flip;
	notification["rotation"] = this->videoOrientation.rotation;

	Channel::ChannelNotifier::Emit(this->id, "videoorientationchange", notification);
}
inline void Producer::EmitScore() const
{
MS_TRACE();
json data = json::array();
for (auto* rtpStream : this->rtpStreamByEncodingIdx)
{
if (!rtpStream)
continue;
data.emplace_back(json::value_t::object);
auto& jsonEntry = data[data.size() - 1];
jsonEntry["encodingIdx"] = rtpStream->GetEncodingIdx();
jsonEntry["ssrc"] = rtpStream->GetSsrc();
if (!rtpStream->GetRid().empty())
jsonEntry["rid"] = rtpStream->GetRid();
jsonEntry["score"] = rtpStream->GetScore();
}
Channel::ChannelNotifier::Emit(this->id, "score", data);
}
/**
 * Emits at most one incoming "trace" notification for the packet: of type
 * "keyframe" if keyframe tracing is enabled and the packet is a key frame,
 * otherwise of type "rtp" if rtp tracing is enabled.
 *
 * @param packet  The received packet.
 * @param isRtx   Whether the packet was received as a retransmission; adds
 *                "isRtx": true to the event info.
 */
inline void Producer::EmitTraceEventRtpAndKeyFrameTypes(RTC::RtpPacket* packet, bool isRtx) const
{
	MS_TRACE();

	// Select the event type; "keyframe" takes precedence over "rtp".
	const char* eventType = nullptr;

	if (this->traceEventTypes.keyframe && packet->IsKeyFrame())
	{
		eventType = "keyframe";
	}
	else if (this->traceEventTypes.rtp)
	{
		eventType = "rtp";
	}

	if (!eventType)
	{
		return;
	}

	json event = json::object();

	event["type"]      = eventType;
	event["timestamp"] = DepLibUV::GetTimeMs();
	event["direction"] = "in";

	packet->FillJson(event["info"]);

	if (isRtx)
	{
		event["info"]["isRtx"] = true;
	}

	Channel::ChannelNotifier::Emit(this->id, "trace", event);
}
inline void Producer::EmitTraceEventPliType(uint32_t ssrc) const
{
MS_TRACE();
if (!this->traceEventTypes.pli)
return;
json data = json::object();
data["type"] = "pli";
data["timestamp"] = DepLibUV::GetTimeMs();
data["direction"] = "out";
data["info"]["ssrc"] = ssrc;
Channel::ChannelNotifier::Emit(this->id, "trace", data);
}
inline void Producer::EmitTraceEventFirType(uint32_t ssrc) const
{
MS_TRACE();
if (!this->traceEventTypes.fir)
return;
json data = json::object();
data["type"] = "fir";
data["timestamp"] = DepLibUV::GetTimeMs();
data["direction"] = "out";
data["info"]["ssrc"] = ssrc;
Channel::ChannelNotifier::Emit(this->id, "trace", data);
}
inline void Producer::EmitTraceEventNackType() const
{
MS_TRACE();
if (!this->traceEventTypes.nack)
return;
json data = json::object();
data["type"] = "nack";
data["timestamp"] = DepLibUV::GetTimeMs();
data["direction"] = "out";
data["info"] = json::object();
Channel::ChannelNotifier::Emit(this->id, "trace", data);
}
/**
 * Stream callback invoked with a new score: stores it in the slot of the
 * stream's encoding, notifies the listener and emits the "score" notification.
 *
 * @param rtpStream      Stream whose score changed.
 * @param score          New score.
 * @param previousScore  Score before the change.
 */
inline void Producer::OnRtpStreamScore(RTC::RtpStream* rtpStream, uint8_t score, uint8_t previousScore)
{
	MS_TRACE();

	const auto encodingIdx = rtpStream->GetEncodingIdx();

	this->rtpStreamScores[encodingIdx] = score;

	// Notify the listener before emitting the updated scores.
	this->listener->OnProducerRtpStreamScore(this, rtpStream, score, previousScore);

	EmitScore();
}
/**
 * Stream callback invoked when a stream wants to send a RTCP packet.
 *
 * May emit a "pli", "fir" or "nack" trace event depending on the feedback
 * packet type, and then always forwards the packet to the listener.
 *
 * @param rtpStream  Unused.
 * @param packet     RTCP packet to be sent.
 */
inline void Producer::OnRtpStreamSendRtcpPacket(
  RTC::RtpStreamRecv* /*rtpStream*/, RTC::RTCP::Packet* packet)
{
	switch (packet->GetType())
	{
		case RTC::RTCP::Type::PSFB:
		{
			auto* feedback = static_cast<RTC::RTCP::FeedbackPsPacket*>(packet);

			switch (feedback->GetMessageType())
			{
				case RTC::RTCP::FeedbackPs::MessageType::PLI:
				{
					// May emit 'trace' event.
					EmitTraceEventPliType(feedback->GetMediaSsrc());

					break;
				}

				case RTC::RTCP::FeedbackPs::MessageType::FIR:
				{
					// May emit 'trace' event.
					EmitTraceEventFirType(feedback->GetMediaSsrc());

					break;
				}

				default:;
			}

			// Must not fall through: otherwise the PSFB packet would be cast to a
			// FeedbackRtpPacket below and could produce a bogus "nack" trace event.
			break;
		}

		case RTC::RTCP::Type::RTPFB:
		{
			auto* feedback = static_cast<RTC::RTCP::FeedbackRtpPacket*>(packet);

			switch (feedback->GetMessageType())
			{
				case RTC::RTCP::FeedbackRtp::MessageType::NACK:
				{
					// May emit 'trace' event.
					EmitTraceEventNackType();

					break;
				}

				default:;
			}

			break;
		}

		default:;
	}

	// Notify the listener.
	this->listener->OnProducerSendRtcpPacket(this, packet);
}
/**
 * Stream callback asking for the worst remote fraction lost: forwards the
 * question to the listener using the mapped ssrc of the stream.
 *
 * @param rtpStream                Stream asking for the value; it must be
 *                                 registered in mapRtpStreamMappedSsrc.
 * @param worstRemoteFractionLost  Output value, passed by reference to the listener.
 */
inline void Producer::OnRtpStreamNeedWorstRemoteFractionLost(
  RTC::RtpStreamRecv* rtpStream, uint8_t& worstRemoteFractionLost)
{
	const auto streamMappedSsrc = this->mapRtpStreamMappedSsrc.at(rtpStream);

	this->listener->OnProducerNeedWorstRemoteFractionLost(
	  this, streamMappedSsrc, worstRemoteFractionLost);
}
/**
 * KeyFrameRequestManager callback: asks the stream with the given ssrc to
 * request a key frame. Unknown ssrcs are logged and ignored.
 *
 * @param keyFrameRequestManager  Unused.
 * @param ssrc                    Ssrc of the stream needing a key frame.
 */
inline void Producer::OnKeyFrameNeeded(
  RTC::KeyFrameRequestManager* /*keyFrameRequestManager*/, uint32_t ssrc)
{
	MS_TRACE();

	auto streamIt = this->mapSsrcRtpStream.find(ssrc);

	if (streamIt != this->mapSsrcRtpStream.end())
	{
		streamIt->second->RequestKeyFrame();
	}
	else
	{
		MS_WARN_2TAGS(rtcp, rtx, "no associated RtpStream found [ssrc:%" PRIu32 "]", ssrc);
	}
}
}