#define MS_CLASS "RTC::RtpStream"
#include "RTC/RtpStream.hpp"
#include "Logger.hpp"
#include "Utils.hpp"
namespace RTC
{
/* Static. */

// Upper bound (exclusive) on the forward distance between an incoming
// sequence number and maxSeq for UpdateSeq() to treat the packet as in order.
static constexpr uint16_t MaxDropout{ 3000 };
// UpdateSeq() tolerates (without updating any state) packets that are fewer
// than MaxMisorder sequence numbers behind maxSeq.
static constexpr uint16_t MaxMisorder{ 1500 };
// Size of the 16 bit sequence number space (65536). Added to `cycles` by
// UpdateSeq() each time the sequence number wraps.
static constexpr uint32_t RtpSeqMod{ 1 << 16 };
// Maximum number of entries kept in `scores`; UpdateScore() erases the first
// entry before appending once this size is reached.
static constexpr size_t ScoreHistogramLength{ 24 };
/* Instance methods. */

// Creates a stream that reports to `listener`. The initial score is
// `initialScore` and activeSinceMs is stamped with the current DepLibUV time.
RtpStream::RtpStream(
  RTC::RtpStream::Listener* listener, RTC::RtpStream::Params& params, uint8_t initialScore)
  : listener(listener),
    params(params),
    score(initialScore),
    activeSinceMs(DepLibUV::GetTimeMs())
{
	MS_TRACE();
}
// Releases the associated RTX stream (if one was ever created by SetRtx()).
RtpStream::~RtpStream()
{
	MS_TRACE();

	// Detach the pointer from the member first, then free it. Deleting a null
	// pointer is a no-op, so no explicit check is required.
	auto* rtx       = this->rtxStream;
	this->rtxStream = nullptr;

	delete rtx;
}
// Serializes this stream into a FBS::RtpStream::Dump table: its params, the
// current score and, when HasRtx() is true, the dump of the RTX stream.
flatbuffers::Offset<FBS::RtpStream::Dump> RtpStream::FillBuffer(
  flatbuffers::FlatBufferBuilder& builder) const
{
	MS_TRACE();

	// Nested tables are written into the builder before the enclosing Dump.
	auto paramsOffset = this->params.FillBuffer(builder);

	// Stays default-constructed when there is no RTX stream.
	flatbuffers::Offset<FBS::RtxStream::RtxDump> rtxOffset;

	if (HasRtx())
	{
		rtxOffset = this->rtxStream->FillBuffer(builder);
	}

	return FBS::RtpStream::CreateDump(builder, paramsOffset, this->score, rtxOffset);
}
// Serializes the current counters of this stream into a FBS::RtpStream::Stats
// table whose data union holds a BaseStats entry.
//
// Optional fields:
// - rid is omitted (nullptr) when params.rid is empty.
// - rtxSsrc is omitted (nullopt) when params.rtxSsrc is 0.
// - the RTX discarded-packet count is 0 when there is no RTX stream.
// - rtt is reported as 0 unless the stored value is positive.
flatbuffers::Offset<FBS::RtpStream::Stats> RtpStream::FillBufferStats(
  flatbuffers::FlatBufferBuilder& builder)
{
	MS_TRACE();

	const uint64_t nowMs = DepLibUV::GetTimeMs();

	// Anything that is not audio is reported as video.
	auto mediaKind = FBS::RtpParameters::MediaKind::VIDEO;

	if (this->params.mimeType.type == RTC::RtpCodecMimeType::Type::AUDIO)
	{
		mediaKind = FBS::RtpParameters::MediaKind::AUDIO;
	}

	const auto mimeTypeStr = this->params.mimeType.ToString();
	const char* ridPtr     = this->params.rid.empty() ? nullptr : this->params.rid.c_str();
	const auto rtxSsrc     = this->params.rtxSsrc
	                           ? flatbuffers::Optional<uint32_t>(this->params.rtxSsrc)
	                           : flatbuffers::nullopt;
	const auto rtxPacketsDiscarded = this->rtxStream ? this->rtxStream->GetPacketsDiscarded() : 0;
	const auto roundTripTime       = this->rtt > 0.0f ? this->rtt : 0;

	auto baseStats = FBS::RtpStream::CreateBaseStatsDirect(
	  builder,
	  nowMs,
	  this->params.ssrc,
	  mediaKind,
	  mimeTypeStr.c_str(),
	  this->packetsLost,
	  this->fractionLost,
	  this->packetsDiscarded,
	  this->packetsRetransmitted,
	  this->packetsRepaired,
	  this->nackCount,
	  this->nackPacketCount,
	  this->pliCount,
	  this->firCount,
	  this->score,
	  ridPtr,
	  rtxSsrc,
	  rtxPacketsDiscarded,
	  roundTripTime);

	return FBS::RtpStream::CreateStats(
	  builder, FBS::RtpStream::StatsData::BaseStats, baseStats.Union());
}
void RtpStream::SetRtx(uint8_t payloadType, uint32_t ssrc)
{
MS_TRACE();
this->params.rtxPayloadType = payloadType;
this->params.rtxSsrc = ssrc;
if (HasRtx())
{
delete this->rtxStream;
this->rtxStream = nullptr;
}
RTC::RtxStream::Params params;
params.ssrc = ssrc;
params.payloadType = payloadType;
params.mimeType.type = GetMimeType().type;
params.mimeType.subtype = RTC::RtpCodecMimeType::Subtype::RTX;
params.clockRate = GetClockRate();
params.rrid = GetRid();
params.cname = GetCname();
params.mimeType.UpdateMimeType();
this->rtxStream = new RTC::RtxStream(params);
}
// Feeds an incoming RTP packet into the sequence number and timestamp
// tracking of this stream.
//
// @param packet  The received RTP packet.
// @return true if the packet is accepted, false if UpdateSeq() rejected it
//         (the caller is expected to ignore the packet in that case).
bool RtpStream::ReceiveStreamPacket(RTC::RtpPacket* packet)
{
	MS_TRACE();

	const uint16_t seq = packet->GetSequenceNumber();

	// If this is the first packet seen, initialize stuff.
	if (!this->started)
	{
		// InitSeq() sets both baseSeq and maxSeq to `seq`. maxSeq must stay at
		// `seq` here: rewinding it to `seq - 1` wraps to 65535 when seq is 0,
		// which makes UpdateSeq() below see `seq < maxSeq` and count a bogus
		// 64K cycle for the very first packet. With maxSeq == seq the first
		// packet goes through the same in-order path of UpdateSeq() (delta 0)
		// and ends up with the same state for any other initial seq.
		InitSeq(seq);

		this->started     = true;
		this->maxPacketTs = packet->GetTimestamp();
		this->maxPacketMs = DepLibUV::GetTimeMs();
	}

	// If not a valid packet ignore it.
	if (!UpdateSeq(packet))
	{
		MS_WARN_TAG(
		  rtp,
		  "invalid packet [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
		  packet->GetSsrc(),
		  packet->GetSequenceNumber());

		return false;
	}

	// Track the highest RTP timestamp seen and the time at which it arrived.
	if (Utils::Number<uint32_t>::IsHigherThan(packet->GetTimestamp(), this->maxPacketTs))
	{
		this->maxPacketTs = packet->GetTimestamp();
		this->maxPacketMs = DepLibUV::GetTimeMs();
	}

	return true;
}
// Discards the score history and forces the stream score to `score`.
// If the value actually changes, the listener is notified only when `notify`
// is true.
void RtpStream::ResetScore(uint8_t score, bool notify)
{
	MS_TRACE();

	this->scores.clear();

	// Nothing else to do if the effective score does not change.
	if (this->score == score)
	{
		return;
	}

	const auto oldScore = this->score;

	this->score = score;

	// Leaving score 0: restart the "active since" timestamp.
	if (oldScore == 0u)
	{
		this->activeSinceMs = DepLibUV::GetTimeMs();
	}

	if (notify)
	{
		this->listener->OnRtpStreamScore(this, score, oldScore);
	}
}
// Validates the sequence number of `packet` against the tracked state and
// updates that state.
//
// The 16 bit distance from maxSeq to the packet's sequence number selects one
// of three cases:
// - below MaxDropout: in order (possibly with a gap or a wrap); accepted.
// - above RtpSeqMod - MaxMisorder: slightly behind maxSeq; accepted, no state
//   is touched.
// - anything else: large jump. The packet is discarded unless it directly
//   follows the previously discarded one, in which case the stream re-syncs.
//
// @return false if the packet must be discarded, true otherwise.
bool RtpStream::UpdateSeq(RTC::RtpPacket* packet)
{
	MS_TRACE();

	const uint16_t seq = packet->GetSequenceNumber();
	// Unsigned 16 bit arithmetic, so a wrap (e.g. maxSeq 65535, seq 0) yields 1.
	const uint16_t distance = seq - this->maxSeq;

	// In order, with permissible gap.
	if (distance < MaxDropout)
	{
		// Sequence number wrapped: count another 64K cycle.
		if (seq < this->maxSeq)
		{
			this->cycles += RtpSeqMod;
		}

		this->maxSeq = seq;

		const auto packetTs = packet->GetTimestamp();

		// An in-order packet carrying an older timestamp than the tracked
		// maximum: follow the packet.
		if (Utils::Number<uint32_t>::IsLowerThan(packetTs, this->maxPacketTs))
		{
			MS_DEBUG_TAG(
			  rtp,
			  "timestamp moved backwards, updating [ssrc:%" PRIu32 ", seq:%" PRIu16
			  ", old maxPacketTs:%" PRIu32 ", new maxPacketTs:%" PRIu32 "]",
			  packet->GetSsrc(),
			  packet->GetSequenceNumber(),
			  this->maxPacketTs,
			  packetTs);

			this->maxPacketTs = packetTs;
			this->maxPacketMs = DepLibUV::GetTimeMs();
		}

		return true;
	}

	// Acceptable misorder: nothing to update.
	if (distance > RtpSeqMod - MaxMisorder)
	{
		return true;
	}

	// The sequence number made a very large jump (too old or too new).
	if (seq != this->badSeq)
	{
		MS_WARN_TAG(
		  rtp,
		  "bad sequence number, ignoring packet [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
		  packet->GetSsrc(),
		  packet->GetSequenceNumber());

		// Remember which sequence number would follow this packet, so that a
		// consecutive one triggers the re-sync below.
		this->badSeq = (seq + 1) & (RtpSeqMod - 1);
		this->packetsDiscarded++;

		return false;
	}

	// Second consecutive packet after a large jump: re-sync on it as if it
	// were the first packet.
	MS_WARN_TAG(
	  rtp,
	  "too bad sequence number, re-syncing RTP [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
	  packet->GetSsrc(),
	  packet->GetSequenceNumber());

	InitSeq(seq);

	this->maxPacketTs = packet->GetTimestamp();
	this->maxPacketMs = DepLibUV::GetTimeMs();

	UserOnSequenceNumberReset();

	return true;
}
// Appends `score` to the score history (dropping the first entry when the
// history already holds ScoreHistogramLength entries) and recomputes the
// effective stream score as a weighted average in which the i-th entry
// (1-based, in container order) has weight i. The listener is notified only
// when the effective score changes.
void RtpStream::UpdateScore(uint8_t score)
{
	MS_TRACE();

	// Make room for the new entry.
	if (this->scores.size() == ScoreHistogramLength)
	{
		this->scores.erase(this->scores.begin());
	}

	const auto oldScore = this->score;

	this->scores.push_back(score);

	size_t position{ 0 };
	size_t weightSum{ 0 };
	size_t weightedTotal{ 0 };

	for (const auto entry : this->scores)
	{
		++position;
		weightSum += position;
		weightedTotal += position * entry;
	}

	// The history holds at least the entry just pushed, so weightSum is never 0.
	this->score =
	  static_cast<uint8_t>(std::round(static_cast<double>(weightedTotal) / weightSum));

	if (this->score == oldScore)
	{
#if MS_LOG_DEV_LEVEL == 3
		MS_DEBUG_TAG(
		  score,
		  "[added score:%" PRIu8 ", previous computed score:%" PRIu8 ", new computed score:%" PRIu8
		  "] (no change)",
		  score,
		  oldScore,
		  this->score);
#endif

		return;
	}

	MS_DEBUG_TAG(
	  score,
	  "[added score:%" PRIu8 ", previous computed score:%" PRIu8 ", new computed score:%" PRIu8
	  "] (calling listener)",
	  score,
	  oldScore,
	  this->score);

	// Leaving score 0: restart the "active since" timestamp.
	if (oldScore == 0u)
	{
		this->activeSinceMs = DepLibUV::GetTimeMs();
	}

	this->listener->OnRtpStreamScore(this, this->score, oldScore);
}
// Counts one more retransmitted packet. The packet itself is not inspected.
void RtpStream::PacketRetransmitted(RTC::RtpPacket* /*packet*/)
{
	MS_TRACE();

	++this->packetsRetransmitted;
}
// Counts one more repaired packet. The packet itself is not inspected.
void RtpStream::PacketRepaired(RTC::RtpPacket* /*packet*/)
{
	MS_TRACE();

	++this->packetsRepaired;
}
// (Re)starts sequence number tracking at `seq`: it becomes both the base and
// the highest sequence number seen. `cycles` is left untouched.
inline void RtpStream::InitSeq(uint16_t seq)
{
	MS_TRACE();

	// RtpSeqMod + 1 (65537) is outside the 16 bit sequence number range, so
	// the `seq == badSeq` test in UpdateSeq() cannot match until a bad packet
	// sets badSeq (assumes badSeq is wider than 16 bits; TODO confirm in the
	// header).
	this->badSeq = RtpSeqMod + 1;

	this->maxSeq  = seq;
	this->baseSeq = seq;
}
// Serializes these stream parameters into a FBS::RtpStream::Params table.
// The RTX SSRC and RTX payload type are both omitted (nullopt) when rtxSsrc
// is 0.
flatbuffers::Offset<FBS::RtpStream::Params> RtpStream::Params::FillBuffer(
  flatbuffers::FlatBufferBuilder& builder) const
{
	MS_TRACE();

	// A zero RTX SSRC means that no RTX is configured.
	const bool rtxConfigured = this->rtxSsrc != 0;

	const auto rtxSsrcField =
	  rtxConfigured ? flatbuffers::Optional<uint32_t>(this->rtxSsrc) : flatbuffers::nullopt;
	const auto rtxPayloadTypeField =
	  rtxConfigured ? flatbuffers::Optional<uint8_t>(this->rtxPayloadType) : flatbuffers::nullopt;
	const auto mimeTypeStr = this->mimeType.ToString();

	return FBS::RtpStream::CreateParamsDirect(
	  builder,
	  this->encodingIdx,
	  this->ssrc,
	  this->payloadType,
	  mimeTypeStr.c_str(),
	  this->clockRate,
	  this->rid.c_str(),
	  this->cname.c_str(),
	  rtxSsrcField,
	  rtxPayloadTypeField,
	  this->useNack,
	  this->usePli,
	  this->useFir,
	  this->useInBandFec,
	  this->useDtx,
	  this->spatialLayers,
	  this->temporalLayers);
}
}