#define MS_CLASS "RTC::RtxStream"
#include "RTC/RtxStream.hpp"
#include "Logger.hpp"
#include "Utils.hpp"
namespace RTC
{
// Size of the 16-bit RTP sequence number space (2^16). Added to the cycle
// counter on wrap-around and used as a mask/bound in UpdateSeq().
static constexpr uint32_t RtpSeqMod = 1 << 16;
// A forward jump of fewer than this many sequence numbers past the highest
// one seen is accepted by UpdateSeq() as a regular in-order advance.
static constexpr uint16_t MaxDropout = 3000;
// A packet that is fewer than this many sequence numbers behind the highest
// one seen is still accepted by UpdateSeq() (without moving the maximum).
static constexpr uint16_t MaxMisorder = 1500;
// Constructs an RTX stream, initializing the `params` member from the given
// parameters.
//
// @param params  Stream parameters; their codec mime subtype must be RTX.
//
// The MS_ASSERT below checks that precondition on the constructor argument.
RtxStream::RtxStream(RTC::RtxStream::Params& params) : params(params)
{
MS_TRACE();
// Reject construction with a non-RTX codec subtype.
MS_ASSERT(
params.mimeType.subtype == RTC::RtpCodecMimeType::Subtype::RTX, "mimeType.subtype is not RTX");
}
// Destructor. The body only invokes MS_TRACE(); no explicit cleanup is
// performed here.
RtxStream::~RtxStream()
{
MS_TRACE();
}
// Serializes this stream into a FBS::RtxStream::RtxDump table.
//
// @param builder  FlatBuffers builder that receives the serialized data.
// @return Offset of the created RtxDump table inside `builder`.
flatbuffers::Offset<FBS::RtxStream::RtxDump> RtxStream::FillBuffer(
  flatbuffers::FlatBufferBuilder& builder) const
{
	MS_TRACE();

	// Serialize the stream parameters first; the dump table references them.
	const auto paramsOffset = this->params.FillBuffer(builder);

	return FBS::RtxStream::CreateRtxDump(builder, paramsOffset);
}
// Processes a received RTX packet and updates the reception state of the
// stream (sequence tracking, highest timestamp seen and packet counter).
//
// @param packet  Received RTP packet. Must not be null (it is dereferenced
//                unconditionally).
// @return true if the packet was accepted and counted; false if UpdateSeq()
//         rejected its sequence number (a warning is logged in that case).
bool RtxStream::ReceivePacket(RTC::RtpPacket* packet)
{
	MS_TRACE();

	const uint16_t seq = packet->GetSequenceNumber();

	// First packet ever seen: seed sequence and timestamp tracking.
	if (!this->started)
	{
		// InitSeq() leaves maxSeq == seq, so the UpdateSeq() call below sees a
		// delta of 0 and accepts the packet. maxSeq must NOT be pre-set to
		// `seq - 1` here: for seq == 0 that value wraps around (to 65535 for a
		// 16-bit maxSeq), UpdateSeq() then observes `seq < maxSeq` and adds a
		// bogus RtpSeqMod to `cycles`, which corrupts the extended highest
		// sequence number placed in receiver reports. For every other first
		// sequence number the resulting state is the same as before.
		InitSeq(seq);

		this->started     = true;
		this->maxPacketTs = packet->GetTimestamp();
		this->maxPacketMs = DepLibUV::GetTimeMs();
	}

	// Validate the sequence number; drop the packet if it is not acceptable.
	if (!UpdateSeq(packet))
	{
		MS_WARN_TAG(
		  rtx,
		  "invalid packet [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
		  packet->GetSsrc(),
		  packet->GetSequenceNumber());

		return false;
	}

	// Track the highest RTP timestamp seen and the local time it arrived at.
	if (Utils::Number<uint32_t>::IsHigherThan(packet->GetTimestamp(), this->maxPacketTs))
	{
		this->maxPacketTs = packet->GetTimestamp();
		this->maxPacketMs = DepLibUV::GetTimeMs();
	}

	// Only accepted packets are counted.
	this->packetsCount++;

	return true;
}
// Builds an RTCP receiver report describing the current reception state.
//
// Side effects: refreshes packetsLost, fractionLost and reportedPacketLost,
// and moves the expectedPrior/receivedPrior snapshots forward so that the next
// call measures a new interval.
//
// @return Report allocated with `new`; the raw pointer is handed to the caller.
RTC::RTCP::ReceiverReport* RtxStream::GetRtcpReceiverReport()
{
	MS_TRACE();

	auto* receiverReport = new RTC::RTCP::ReceiverReport();

	receiverReport->SetSsrc(GetSsrc());

	// Cumulative loss: expected minus received, never below zero.
	const uint32_t previousLost = this->packetsLost;
	auto expectedTotal          = GetExpectedPackets();

	if (expectedTotal <= this->packetsCount)
	{
		this->packetsLost = 0u;
	}
	else
	{
		this->packetsLost = expectedTotal - this->packetsCount;
	}

	// Deltas since the previous report, computed before the snapshots move.
	const uint32_t expectedDelta = expectedTotal - this->expectedPrior;
	const uint32_t receivedDelta = this->packetsCount - this->receivedPrior;
	const int32_t lostDelta      = expectedDelta - receivedDelta;

	this->expectedPrior = expectedTotal;
	this->receivedPrior = this->packetsCount;

	// Fraction lost is lostDelta / expectedDelta scaled by 256 (hence << 8);
	// it is zero when nothing was expected or nothing was lost.
	const bool lossInInterval = expectedDelta != 0 && lostDelta > 0;

	if (lossInInterval)
	{
		this->fractionLost = std::round((static_cast<double>(lostDelta << 8) / expectedDelta));
	}
	else
	{
		this->fractionLost = 0;
	}

	// Accumulate the change in cumulative loss since the previous report.
	this->reportedPacketLost += (this->packetsLost - previousLost);

	receiverReport->SetTotalLost(this->reportedPacketLost);
	receiverReport->SetFractionLost(this->fractionLost);
	// Highest sequence number seen, extended with the wrap-around count.
	receiverReport->SetLastSeq(static_cast<uint32_t>(this->maxSeq) + this->cycles);
	receiverReport->SetJitter(0);

	if (this->lastSrReceived == 0)
	{
		// No sender report stored yet: both fields are reported as zero.
		receiverReport->SetDelaySinceLastSenderReport(0);
		receiverReport->SetLastSenderReport(0);
	}
	else
	{
		// Delay since the last sender report: whole seconds in the upper 16
		// bits, the sub-second remainder scaled to 1/65536 s in the lower bits.
		const uint32_t elapsedMs = DepLibUV::GetTimeMs() - this->lastSrReceived;
		uint32_t delaySinceSr    = (elapsedMs / 1000) << 16;

		delaySinceSr |= uint32_t{ (elapsedMs % 1000) * 65536 / 1000 };

		receiverReport->SetDelaySinceLastSenderReport(delaySinceSr);
		receiverReport->SetLastSenderReport(this->lastSrTimestamp);
	}

	return receiverReport;
}
// Records the arrival of an RTCP sender report.
//
// Stores the local reception time and a timestamp built from the report's NTP
// seconds shifted left by 16 bits plus its NTP fraction shifted right by 16
// bits. Both values are later echoed by GetRtcpReceiverReport().
//
// @param report  Received sender report. Must not be null.
void RtxStream::ReceiveRtcpSenderReport(RTC::RTCP::SenderReport* report)
{
	MS_TRACE();

	this->lastSrReceived = DepLibUV::GetTimeMs();

	const auto secondsPart  = report->GetNtpSec() << 16;
	const auto fractionPart = report->GetNtpFrac() >> 16;

	this->lastSrTimestamp = secondsPart;
	this->lastSrTimestamp += fractionPart;
}
// Validates the sequence number of a packet against the highest one seen so
// far and updates the sequence tracking state accordingly.
//
// With udelta = seq - maxSeq (mod 2^16), three cases are distinguished:
//  - udelta < MaxDropout: in-order advance (possibly wrapping around).
//  - MaxDropout <= udelta <= RtpSeqMod - MaxMisorder: large jump. The packet
//    is rejected unless it directly follows a previously rejected one, in
//    which case tracking is re-synchronized to it.
//  - otherwise: the packet is fewer than MaxMisorder behind maxSeq; it is
//    accepted without touching the state.
//
// @param packet  Packet to validate. Must not be null.
// @return true if the packet is accepted, false if it must be ignored.
bool RtxStream::UpdateSeq(RTC::RtpPacket* packet)
{
	MS_TRACE();

	const uint16_t seq = packet->GetSequenceNumber();
	// 16-bit modular distance from the highest sequence number seen.
	const uint16_t udelta = seq - this->maxSeq;

	// In order, with a permissible gap.
	if (udelta < MaxDropout)
	{
		// A numerically smaller value here means the sequence number wrapped.
		if (seq < this->maxSeq)
		{
			this->cycles += RtpSeqMod;
		}

		this->maxSeq = seq;

		// The newest packet carries an older timestamp than the stored
		// maximum: follow it so the maximum keeps matching the newest sequence.
		if (Utils::Number<uint32_t>::IsLowerThan(packet->GetTimestamp(), this->maxPacketTs))
		{
			MS_DEBUG_TAG(
			  rtx,
			  "timestamp moved backwards, updating [ssrc:%" PRIu32 ", seq:%" PRIu16
			  ", old maxPacketTs:%" PRIu32 ", new maxPacketTs:%" PRIu32 "]",
			  packet->GetSsrc(),
			  packet->GetSequenceNumber(),
			  this->maxPacketTs,
			  packet->GetTimestamp());

			this->maxPacketTs = packet->GetTimestamp();
			this->maxPacketMs = DepLibUV::GetTimeMs();
		}

		return true;
	}

	// Slightly behind the maximum: accept, nothing to update.
	if (udelta > RtpSeqMod - MaxMisorder)
	{
		return true;
	}

	// The sequence number made a very large jump.
	if (seq != this->badSeq)
	{
		MS_WARN_TAG(
		  rtx,
		  "bad sequence number, ignoring packet [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
		  packet->GetSsrc(),
		  packet->GetSequenceNumber());

		// Remember the successor of this packet so that a second consecutive
		// packet from the new range triggers a re-sync.
		this->badSeq = (seq + 1) & (RtpSeqMod - 1);
		this->packetsDiscarded++;

		return false;
	}

	// Second sequential packet of the new range: restart tracking from it.
	MS_WARN_TAG(
	  rtx,
	  "too bad sequence number, re-syncing RTP [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
	  packet->GetSsrc(),
	  packet->GetSequenceNumber());

	InitSeq(seq);

	this->maxPacketTs = packet->GetTimestamp();
	this->maxPacketMs = DepLibUV::GetTimeMs();

	return true;
}
// (Re)starts sequence number tracking at the given sequence number.
//
// @param seq  Sequence number that becomes both the base and the highest seen.
inline void RtxStream::InitSeq(uint16_t seq)
{
	MS_TRACE();

	// RtpSeqMod + 1 lies outside the 16-bit range, so no real sequence number
	// compares equal to badSeq until UpdateSeq() stores one
	// (assumes badSeq is wide enough to hold the value; verify in the header).
	this->badSeq  = RtpSeqMod + 1;
	this->maxSeq  = seq;
	this->baseSeq = seq;
}
// Serializes these stream parameters into a FBS::RtxStream::Params table.
//
// @param builder  FlatBuffers builder that receives the serialized data.
// @return Offset of the created Params table inside `builder`.
flatbuffers::Offset<FBS::RtxStream::Params> RtxStream::Params::FillBuffer(
  flatbuffers::FlatBufferBuilder& builder) const
{
	MS_TRACE();

	// Bind the mime type string so its C string stays valid across the call.
	const auto& mimeTypeText = this->mimeType.ToString();
	const char* rridText     = this->rrid.c_str();
	const char* cnameText    = this->cname.c_str();

	return FBS::RtxStream::CreateParamsDirect(
	  builder,
	  this->ssrc,
	  this->payloadType,
	  mimeTypeText.c_str(),
	  this->clockRate,
	  rridText,
	  cnameText);
}
}