#define MS_CLASS "RTC::RtpStream"
#include "RTC/RtpStream.hpp"
#include "Logger.hpp"
#include "RTC/SeqManager.hpp"
namespace RTC
{
// File-local tuning constants used by the sequence-number and score logic below.
namespace
{
	// A packet whose sequence number is fewer than this many steps ahead of the
	// highest one seen is treated by UpdateSeq() as in-order.
	constexpr uint16_t MaxDropout{ 3000 };
	// A packet fewer than this many steps behind the highest sequence number seen
	// is accepted by UpdateSeq() as a tolerable misorder.
	constexpr uint16_t MaxMisorder{ 1500 };
	// Size of the 16-bit sequence number space (2^16); added to `cycles` on wrap.
	constexpr uint32_t RtpSeqMod{ 1u << 16 };
	// Maximum number of entries UpdateScore() keeps in the score history.
	constexpr size_t ScoreHistogramLength{ 24 };
} // namespace
// Constructs a stream that reports score changes to `listener`.
//
// The `params` member is initialized from `params`, the current score starts
// at `initialScore`, and `activeSinceMs` is stamped with the current
// DepLibUV time.
RtpStream::RtpStream(
  RTC::RtpStream::Listener* listener, RTC::RtpStream::Params& params, uint8_t initialScore)
  : listener(listener),
    params(params),
    score(initialScore),
    activeSinceMs(DepLibUV::GetTimeMs())
{
	MS_TRACE();
}
// Destroys the stream and releases the RTX stream created by SetRtx(), if any
// (deleting a null pointer is a no-op).
RtpStream::~RtpStream()
{
	MS_TRACE();

	delete this->rtxStream;
}
// Writes the stream description into `jsonObject`:
//   - "params":    filled by Params::FillJson().
//   - "score":     the current score.
//   - "rtxStream": filled by the RTX stream, only when HasRtx() is true.
void RtpStream::FillJson(json& jsonObject) const
{
	MS_TRACE();

	// Stream parameters.
	this->params.FillJson(jsonObject["params"]);

	// Current score.
	jsonObject["score"] = this->score;

	// Nothing else to add when there is no RTX stream.
	if (!HasRtx())
	{
		return;
	}

	this->rtxStream->FillJson(jsonObject["rtxStream"]);
}
// Writes the stream statistics into `jsonObject`.
//
// Always written: timestamp (current DepLibUV time), ssrc, kind, mimeType, the
// loss/discard/retransmission/repair counters, the NACK/PLI/FIR counters and
// the current score.
// Written only when available: rid, rtxSsrc, rtxPacketsDiscarded and
// roundTripTime.
void RtpStream::FillJsonStats(json& jsonObject)
{
	MS_TRACE();

	const uint64_t nowMs = DepLibUV::GetTimeMs();

	// Identification.
	jsonObject["timestamp"] = nowMs;
	jsonObject["ssrc"]      = this->params.ssrc;
	jsonObject["kind"]      = RtpCodecMimeType::type2String[this->params.mimeType.type];
	jsonObject["mimeType"]  = this->params.mimeType.ToString();

	// Packet counters.
	jsonObject["packetsLost"]          = this->packetsLost;
	jsonObject["fractionLost"]         = this->fractionLost;
	jsonObject["packetsDiscarded"]     = this->packetsDiscarded;
	jsonObject["packetsRetransmitted"] = this->packetsRetransmitted;
	jsonObject["packetsRepaired"]      = this->packetsRepaired;

	// Feedback counters.
	jsonObject["nackCount"]       = this->nackCount;
	jsonObject["nackPacketCount"] = this->nackPacketCount;
	jsonObject["pliCount"]        = this->pliCount;
	jsonObject["firCount"]        = this->firCount;

	jsonObject["score"] = this->score;

	// Optional entries.
	if (!this->params.rid.empty())
	{
		jsonObject["rid"] = this->params.rid;
	}

	if (this->params.rtxSsrc != 0u)
	{
		jsonObject["rtxSsrc"] = this->params.rtxSsrc;
	}

	if (this->rtxStream != nullptr)
	{
		jsonObject["rtxPacketsDiscarded"] = this->rtxStream->GetPacketsDiscarded();
	}

	if (this->hasRtt)
	{
		jsonObject["roundTripTime"] = this->rtt;
	}
}
void RtpStream::SetRtx(uint8_t payloadType, uint32_t ssrc)
{
MS_TRACE();
this->params.rtxPayloadType = payloadType;
this->params.rtxSsrc = ssrc;
if (HasRtx())
{
delete this->rtxStream;
this->rtxStream = nullptr;
}
RTC::RtxStream::Params params;
params.ssrc = ssrc;
params.payloadType = payloadType;
params.mimeType.type = GetMimeType().type;
params.mimeType.subtype = RTC::RtpCodecMimeType::Subtype::RTX;
params.clockRate = GetClockRate();
params.rrid = GetRid();
params.cname = GetCname();
params.mimeType.UpdateMimeType();
this->rtxStream = new RTC::RtxStream(params);
}
// Processes an incoming RTP packet: initializes sequence tracking on the first
// packet, validates the sequence number through UpdateSeq() and keeps track of
// the highest RTP timestamp seen (and the local time at which it was seen).
//
// Returns false when UpdateSeq() rejects the packet, true otherwise.
bool RtpStream::ReceivePacket(RTC::RtpPacket* packet)
{
	MS_TRACE();

	// First packet seen: initialize the sequence and timestamp state.
	if (!this->started)
	{
		const uint16_t seq = packet->GetSequenceNumber();

		// InitSeq() leaves maxSeq == seq, so the UpdateSeq() call below computes
		// a delta of 0 and accepts the packet as in-order.
		//
		// maxSeq must NOT be seeded with `seq - 1` here: if the first sequence
		// number is 0 that wraps to 65535, UpdateSeq() then sees seq < maxSeq and
		// counts a bogus 64K cycle before any real wrap-around has happened.
		InitSeq(seq);

		this->started     = true;
		this->maxPacketTs = packet->GetTimestamp();
		this->maxPacketMs = DepLibUV::GetTimeMs();
	}

	// Ignore the packet if its sequence number is not acceptable.
	if (!UpdateSeq(packet))
	{
		MS_WARN_TAG(
		  rtp,
		  "invalid packet [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
		  packet->GetSsrc(),
		  packet->GetSequenceNumber());

		return false;
	}

	// Update the highest RTP timestamp seen (wrap-around aware comparison).
	if (RTC::SeqManager<uint32_t>::IsSeqHigherThan(packet->GetTimestamp(), this->maxPacketTs))
	{
		this->maxPacketTs = packet->GetTimestamp();
		this->maxPacketMs = DepLibUV::GetTimeMs();
	}

	return true;
}
// Discards the score history and forces the current score to `score`.
//
// If the score actually changes, `activeSinceMs` is refreshed when the old
// score was 0, and the listener is told about the change when `notify` is true.
void RtpStream::ResetScore(uint8_t score, bool notify)
{
	MS_TRACE();

	// The history is always dropped, even if the score value stays the same.
	this->scores.clear();

	// Same score as before: nothing else to do.
	if (this->score == score)
	{
		return;
	}

	const auto oldScore = this->score;

	this->score = score;

	// Leaving score 0 (the new score is necessarily non-zero here).
	if (oldScore == 0u)
	{
		this->activeSinceMs = DepLibUV::GetTimeMs();
	}

	if (notify)
	{
		this->listener->OnRtpStreamScore(this, score, oldScore);
	}
}
// Validates the sequence number of `packet` against the highest one seen and
// updates the tracking state (maxSeq, cycles, badSeq).
//
// Outcomes, by forward distance `udelta` (mod 2^16) from maxSeq:
//   - udelta < MaxDropout: in-order packet, maxSeq advances (a wrap-around
//     adds RtpSeqMod to `cycles`).
//   - udelta > RtpSeqMod - MaxMisorder: slightly old packet, accepted without
//     touching the state.
//   - anything in between: large jump. The first such packet is discarded and
//     the following sequence number is remembered in badSeq; if the next packet
//     carries exactly that number the stream is re-synced to it.
//
// Returns true if the packet is accepted, false if it is discarded.
bool RtpStream::UpdateSeq(RTC::RtpPacket* packet)
{
	MS_TRACE();

	const uint16_t seq    = packet->GetSequenceNumber();
	const uint16_t udelta = seq - this->maxSeq;

	// In order, with permissible gap.
	if (udelta < MaxDropout)
	{
		// A numerically smaller value that is still "ahead" means the 16-bit
		// sequence number wrapped: count another 64K cycle.
		if (seq < this->maxSeq)
		{
			this->cycles += RtpSeqMod;
		}

		this->maxSeq = seq;

		return true;
	}

	// Acceptable misorder: accept the packet, keep the state untouched.
	if (udelta > RtpSeqMod - MaxMisorder)
	{
		return true;
	}

	// From here on the sequence number made a very large jump.

	if (seq != this->badSeq)
	{
		MS_WARN_TAG(
		  rtp,
		  "bad sequence number, ignoring packet [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
		  packet->GetSsrc(),
		  packet->GetSequenceNumber());

		// Remember which sequence number would follow this packet, so that a
		// second consecutive packet can trigger a re-sync.
		this->badSeq = (seq + 1) & (RtpSeqMod - 1);

		// Packet discarded due to arriving too late or too early.
		this->packetsDiscarded++;

		return false;
	}

	// Two sequential packets after the jump: re-sync as if this were the first
	// packet of the stream.
	MS_WARN_TAG(
	  rtp,
	  "too bad sequence number, re-syncing RTP [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
	  packet->GetSsrc(),
	  packet->GetSequenceNumber());

	InitSeq(seq);

	this->maxPacketTs = packet->GetTimestamp();
	this->maxPacketMs = DepLibUV::GetTimeMs();

	return true;
}
// Adds `score` to the score history and recomputes the current score.
//
// The history holds at most ScoreHistogramLength entries (the oldest one is
// dropped when full). The current score is the rounded weighted average of the
// history, where the i-th oldest entry has weight i (1-based), so recent
// entries count more. The listener is notified only if the computed score
// changes; `activeSinceMs` is refreshed when the previous score was 0.
void RtpStream::UpdateScore(uint8_t score)
{
	MS_TRACE();

	const auto previousScore = this->score;

	// Make room for the new entry if the history is full.
	if (this->scores.size() == ScoreHistogramLength)
	{
		this->scores.erase(this->scores.begin());
	}

	this->scores.push_back(score);

	// Weighted average over the history (oldest entry first).
	size_t position{ 0 };
	size_t weightSum{ 0 };
	size_t weightedTotal{ 0 };

	for (const auto entry : this->scores)
	{
		++position;
		weightSum += position;
		weightedTotal += position * entry;
	}

	// The history contains at least the entry just pushed, so weightSum >= 1.
	this->score =
	  static_cast<uint8_t>(std::round(static_cast<double>(weightedTotal) / weightSum));

	// Computed score unchanged: optionally log and leave.
	if (this->score == previousScore)
	{
#if MS_LOG_DEV_LEVEL == 3
		MS_DEBUG_TAG(
		  score,
		  "[added score:%" PRIu8 ", previous computed score:%" PRIu8 ", new computed score:%" PRIu8
		  "] (no change)",
		  score,
		  previousScore,
		  this->score);
#endif

		return;
	}

	MS_DEBUG_TAG(
	  score,
	  "[added score:%" PRIu8 ", previous computed score:%" PRIu8 ", new computed score:%" PRIu8
	  "] (calling listener)",
	  score,
	  previousScore,
	  this->score);

	// Leaving score 0 (the new score is necessarily non-zero here).
	if (previousScore == 0u)
	{
		this->activeSinceMs = DepLibUV::GetTimeMs();
	}

	this->listener->OnRtpStreamScore(this, this->score, previousScore);
}
// Accounts for one retransmitted packet. The packet itself is not inspected.
void RtpStream::PacketRetransmitted(RTC::RtpPacket* /*packet*/)
{
	MS_TRACE();

	++this->packetsRetransmitted;
}
// Accounts for one repaired packet. The packet itself is not inspected.
void RtpStream::PacketRepaired(RTC::RtpPacket* /*packet*/)
{
	MS_TRACE();

	++this->packetsRepaired;
}
// (Re)starts sequence-number tracking at `seq`: it becomes both the base and
// the highest sequence number seen. `cycles` is left untouched.
inline void RtpStream::InitSeq(uint16_t seq)
{
	MS_TRACE();

	this->maxSeq  = seq;
	this->baseSeq = seq;
	// Value outside the 16-bit sequence range, so that `seq == badSeq` in
	// UpdateSeq() cannot match until a bad packet has been recorded (assumes
	// badSeq is wider than 16 bits; confirm in the header).
	this->badSeq = RtpSeqMod + 1;
}
// Writes these stream parameters into `jsonObject`.
//
// "rid" is written only when non-empty; "rtxSsrc" and "rtxPayloadType" are
// written only when an RTX SSRC is set (non-zero). Everything else is always
// written.
void RtpStream::Params::FillJson(json& jsonObject) const
{
	MS_TRACE();

	// Identification and codec.
	jsonObject["encodingIdx"] = this->encodingIdx;
	jsonObject["ssrc"]        = this->ssrc;
	jsonObject["payloadType"] = this->payloadType;
	jsonObject["mimeType"]    = this->mimeType.ToString();
	jsonObject["clockRate"]   = this->clockRate;

	if (!this->rid.empty())
	{
		jsonObject["rid"] = this->rid;
	}

	jsonObject["cname"] = this->cname;

	// RTX information, only if RTX is configured.
	const bool hasRtxSsrc = (this->rtxSsrc != 0);

	if (hasRtxSsrc)
	{
		jsonObject["rtxSsrc"]        = this->rtxSsrc;
		jsonObject["rtxPayloadType"] = this->rtxPayloadType;
	}

	// Feature flags.
	jsonObject["useNack"]      = this->useNack;
	jsonObject["usePli"]       = this->usePli;
	jsonObject["useFir"]       = this->useFir;
	jsonObject["useInBandFec"] = this->useInBandFec;
	jsonObject["useDtx"]       = this->useDtx;

	// Layers.
	jsonObject["spatialLayers"]  = this->spatialLayers;
	jsonObject["temporalLayers"] = this->temporalLayers;
}
}