#define MS_CLASS "RTC::RTP::RtpStreamSend"
#include "RTC/RTP/RtpStreamSend.hpp"
#ifdef MS_LIBURING_SUPPORTED
#include "DepLibUring.hpp"
#endif
#include "Logger.hpp"
#include "Utils.hpp"
#include "RTC/Consts.hpp"
#include "RTC/RtpDictionaries.hpp"
#include <vector>
namespace RTC
{
namespace RTP
{
// Maximum number of items the retransmission buffer is created with.
static constexpr size_t RetransmissionBufferMaxItems{ 2500u };
// Upper bound of packets a single NACK item can request: the base sequence
// number plus one packet per bit of the 16-bit lost-packet bitmask.
static constexpr size_t MaxRequestedPackets{ 17u };
// Per-thread scratch list filled by FillRetransmissionContainer() and consumed
// by ReceiveNack(). It has one extra slot so a nullptr terminator always fits
// after MaxRequestedPackets entries.
// Declared `static` (internal linkage) like the other file-local helpers here,
// so this mutable global cannot clash with a same-named symbol in another
// translation unit.
// NOTE(review): assumes no other file refers to it via `extern` - confirm.
static thread_local std::vector<RTP::RetransmissionBuffer::Item*> RetransmissionContainer(
  MaxRequestedPackets + 1);
// RTT (in ms) used by FillRetransmissionContainer() while no positive RTT has
// been measured.
static constexpr uint32_t DefaultRtt{ 100u };
// Max retransmission delays (ms) handed to the retransmission buffer by the
// constructor, depending on the media kind.
const uint32_t RtpStreamSend::MaxRetransmissionDelayForVideoMs{ 2000u };
const uint32_t RtpStreamSend::MaxRetransmissionDelayForAudioMs{ 1000u };
// Constructs a send stream.
//
// The base stream is built with `listener`, `params` and 10 as third argument
// (presumably the initial score - confirm against RtpStream), `mid` is stored
// in this->mid and the transmission counter is constructed with `true`.
//
// When NACK is enabled in the stream params a retransmission buffer is
// allocated. Its max retransmission delay depends on the media kind (video or
// audio); for any other kind the delay passed is 0.
RtpStreamSend::RtpStreamSend(
  RTP::RtpStreamSend::Listener* listener, RTP::RtpStream::Params& params, std::string& mid)
  : RTP::RtpStream::RtpStream(listener, params, 10),
    mid(mid),
    transmissionCounter( true)
{
	MS_TRACE();

	// Without NACK there is nothing to retransmit, so no buffer is created.
	if (!this->params.useNack)
	{
		return;
	}

	uint32_t maxDelayMs{ 0 };
	const auto mediaKind = params.mimeType.type;

	if (mediaKind == RTC::RtpCodecMimeType::Type::VIDEO)
	{
		maxDelayMs = RtpStreamSend::MaxRetransmissionDelayForVideoMs;
	}
	else if (mediaKind == RTC::RtpCodecMimeType::Type::AUDIO)
	{
		maxDelayMs = RtpStreamSend::MaxRetransmissionDelayForAudioMs;
	}

	this->retransmissionBuffer = new RTC::RTP::RetransmissionBuffer(
	  RetransmissionBufferMaxItems, maxDelayMs, params.clockRate);
}
// Destructor: releases the retransmission buffer allocated by the constructor
// when NACK is enabled and resets the member to nullptr.
// Deleting a null pointer is a no-op, so this is also fine when no buffer was
// allocated (assuming the member defaults to nullptr - its in-class
// initializer is not visible in this file).
RtpStreamSend::~RtpStreamSend()
{
MS_TRACE();
delete this->retransmissionBuffer;
this->retransmissionBuffer = nullptr;
}
// Serializes the stream statistics into `builder`.
//
// The base stream stats are written first and then wrapped into a SendStats
// table together with the transmission counter values (packet count, bytes and
// bitrate at the current time). Returns the offset of the resulting Stats
// table, whose data union holds the SendStats.
flatbuffers::Offset<FBS::RtpStream::Stats> RtpStreamSend::FillBufferStats(
  flatbuffers::FlatBufferBuilder& builder)
{
	MS_TRACE();

	// Take the timestamp before serializing anything, as the original flow does.
	const uint64_t nowMs = DepLibUV::GetTimeMs();

	auto baseOffset = RTP::RtpStream::FillBufferStats(builder);

	const auto packetCount = this->transmissionCounter.GetPacketCount();
	const auto byteCount   = this->transmissionCounter.GetBytes();
	const auto bitrate     = this->transmissionCounter.GetBitrate(nowMs);

	auto sendStats =
	  FBS::RtpStream::CreateSendStats(builder, baseOffset, packetCount, byteCount, bitrate);

	return FBS::RtpStream::CreateStats(
	  builder, FBS::RtpStream::StatsData::SendStats, sendStats.Union());
}
// Enables RTX for this stream with the given RTX payload type and SSRC.
//
// Delegates to the base stream and then picks a random starting value in
// [0, 0xFFFF] for the RTX sequence number, which ReceiveNack() increments for
// every RTX-encoded retransmission.
void RtpStreamSend::SetRtx(uint8_t payloadType, uint32_t ssrc)
{
	MS_TRACE();

	RTP::RtpStream::SetRtx(payloadType, ssrc);

	const auto initialRtxSeq = Utils::Crypto::GetRandomUInt<uint16_t>(0u, 0xFFFF);

	this->rtxSeq = initialRtxSeq;
}
// Processes an outgoing RTP packet.
//
// Returns DISCARDED when the base stream rejects the packet or when a packet
// with the same sequence number is already in the retransmission buffer.
// Otherwise the packet is offered to the retransmission buffer (if there is
// one), the transmission counter is updated and the result tells whether the
// buffer stored the packet.
//
// The packet SSRC must match the stream SSRC (asserted).
RtpStreamSend::ReceivePacketResult RtpStreamSend::ReceivePacket(
  RTP::Packet* packet, const RTP::SharedPacket& sharedPacket)
{
	MS_TRACE();

	MS_ASSERT(
	  packet->GetSsrc() == this->params.ssrc, "RTP packet SSRC does not match the encodings SSRC");

	// Let the base stream see the packet first.
	if (!RtpStream::ReceiveStreamPacket(packet))
	{
		return ReceivePacketResult::DISCARDED;
	}

	bool wasStored{ false };

	// The buffer only exists when NACK is enabled.
	if (auto* buffer = this->retransmissionBuffer)
	{
		const bool alreadyBuffered = buffer->Get(packet->GetSequenceNumber()) != nullptr;

		if (alreadyBuffered)
		{
			MS_DEBUG_DEV(
			  "packet already stored in retransmission buffer [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
			  packet->GetSsrc(),
			  packet->GetSequenceNumber());

			return ReceivePacketResult::DISCARDED;
		}

		wasStored = buffer->Insert(packet, sharedPacket);
	}

	this->transmissionCounter.Update(packet);

	if (wasStored)
	{
		return ReceivePacketResult::ACCEPTED_AND_STORED;
	}

	return ReceivePacketResult::ACCEPTED_AND_NOT_STORED;
}
// Handles an RTCP NACK feedback packet.
//
// For every NACK item the requested packets are looked up through
// FillRetransmissionContainer() and each one found is retransmitted via the
// listener. The stored packet object is temporarily rewritten with the values
// kept in the buffer item (SSRC, sequence number, timestamp, marker, optional
// payload encoding, MID and RTX encoding), handed to the listener, and then
// restored in exactly the reverse order.
void RtpStreamSend::ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket)
{
	MS_TRACE();

	this->nackCount++;

#ifdef MS_LIBURING_SUPPORTED
	// Mark liburing as active before sending; the matching Submit() call is at
	// the end of this method.
	if (DepLibUring::IsEnabled())
	{
		DepLibUring::SetActive();
	}
#endif

	for (auto nackIt = nackPacket->Begin(); nackIt != nackPacket->End(); ++nackIt)
	{
		const RTC::RTCP::FeedbackRtpNackItem* nackItem = *nackIt;

		this->nackPacketCount += nackItem->CountRequestedPackets();

		// Resolve the requested sequence numbers into the nullptr-terminated
		// thread-local container.
		FillRetransmissionContainer(nackItem->GetPacketId(), nackItem->GetLostPacketBitmask());

		for (auto* item : RetransmissionContainer)
		{
			// A null entry marks the end of the list.
			if (item == nullptr)
			{
				break;
			}

			MS_ASSERT(
			  item->sharedPacket.HasPacket(),
			  "item in retransmission container doesn't contain a packet [ssrc:%" PRIu32
			  ", seq:%" PRIu16 ", timestamp:%" PRIu32 "]",
			  item->ssrc,
			  item->sequenceNumber,
			  item->timestamp);

			auto* packet = item->sharedPacket.GetPacket();

			// Remember the current packet values so they can be restored below.
			auto savedSsrc      = packet->GetSsrc();
			auto savedSeq       = packet->GetSequenceNumber();
			auto savedTimestamp = packet->GetTimestamp();
			auto savedMarker    = packet->HasMarker();
			std::string savedMid;

			// Apply the values recorded in the buffer item.
			packet->SetSsrc(item->ssrc);
			packet->SetSequenceNumber(item->sequenceNumber);
			packet->SetTimestamp(item->timestamp);
			packet->SetMarker(item->marker);

			if (item->encoder != nullptr)
			{
				packet->EncodePayload(item->encoder.get());
			}

			if (!this->mid.empty())
			{
				packet->ReadMid(savedMid);
				packet->UpdateMid(this->mid);
			}

			if (HasRtx())
			{
				// Each RTX retransmission consumes a new RTX sequence number.
				this->rtxSeq++;

				packet->RtxEncode(this->params.rtxPayloadType, this->params.rtxSsrc, this->rtxSeq);
			}

			// Hand the packet over for retransmission.
			static_cast<RTP::RtpStreamSend::Listener*>(this->listener)
			  ->OnRtpStreamRetransmitRtpPacket(this, packet);

			RTP::RtpStream::PacketRetransmitted(packet);

			// Only the first retransmission of an item counts as a repair.
			if (item->sentTimes == 1)
			{
				RTP::RtpStream::PacketRepaired(packet);
			}

			// Undo every modification, in reverse order.
			if (HasRtx())
			{
				packet->RtxDecode(RtpStream::GetPayloadType(), item->ssrc);
			}

			if (!this->mid.empty())
			{
				packet->UpdateMid(savedMid);
			}

			if (item->encoder != nullptr)
			{
				packet->RestorePayload();
			}

			packet->SetSsrc(savedSsrc);
			packet->SetSequenceNumber(savedSeq);
			packet->SetTimestamp(savedTimestamp);
			packet->SetMarker(savedMarker);
		}
	}

#ifdef MS_LIBURING_SUPPORTED
	if (DepLibUring::IsEnabled())
	{
		DepLibUring::Submit();
	}
#endif
}
// Accounts for a received key frame request: PLI requests increment pliCount,
// FIR requests increment firCount and any other message type is ignored.
void RtpStreamSend::ReceiveKeyFrameRequest(RTC::RTCP::FeedbackPs::MessageType messageType)
{
	MS_TRACE();

	if (messageType == RTC::RTCP::FeedbackPs::MessageType::PLI)
	{
		this->pliCount++;
	}
	else if (messageType == RTC::RTCP::FeedbackPs::MessageType::FIR)
	{
		this->firCount++;
	}
}
// Processes an RTCP Receiver Report for this stream.
//
// Computes the RTT from the current time and the LSR / DLSR values of the
// report, stores it (in milliseconds) together with the reported loss and
// jitter values, and finally updates the stream score.
void RtpStreamSend::ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report)
{
	MS_TRACE();

	// Current time in compact NTP form: low 16 bits of the seconds followed by
	// the high 16 bits of the fraction.
	const uint64_t nowMs = DepLibUV::GetTimeMs();
	const auto ntpNow    = Utils::Time::TimeMs2Ntp(nowMs);
	const uint32_t compactNow =
	  ((ntpNow.seconds & 0x0000FFFF) << 16) | ((ntpNow.fractions & 0xFFFF0000) >> 16);

	const uint32_t lastSr = report->GetLastSenderReport();
	const uint32_t dlsr   = report->GetDelaySinceLastSenderReport();

	// The RTT (in 1/65536 second units) is only computed when both report
	// fields are non-zero and the current time is beyond their sum; otherwise
	// it is 0.
	const bool rttComputable = lastSr != 0u && dlsr != 0u && compactNow > dlsr + lastSr;
	const uint32_t fixedRtt  = rttComputable ? compactNow - dlsr - lastSr : 0u;

	// Convert to milliseconds: whole seconds plus the fractional part.
	const float wholeSecondsMs = static_cast<float>(fixedRtt >> 16) * 1000;
	const float fractionMs     = (static_cast<float>(fixedRtt & 0x0000FFFF) / 65536) * 1000;

	// Never store a negative RTT.
	this->rtt = std::max(wholeSecondsMs + fractionMs, 0.0f);

	this->packetsLost  = report->GetTotalLost();
	this->fractionLost = report->GetFractionLost();
	this->jitter       = static_cast<float>(report->GetJitter());

	UpdateScore(report);
}
// Processes an RTCP XR Receiver Reference Time report.
//
// Records when the report arrived and its NTP timestamp in compact form (the
// seconds shifted into the upper bits plus the upper 16 bits of the fraction).
// Both values are used later by GetRtcpXrDelaySinceLastRrSsrcInfo().
void RtpStreamSend::ReceiveRtcpXrReceiverReferenceTime(RTC::RTCP::ReceiverReferenceTime* report)
{
	MS_TRACE();

	this->lastRrReceivedMs = DepLibUV::GetTimeMs();

	const auto secondsPart  = report->GetNtpSec() << 16;
	const auto fractionPart = report->GetNtpFrac() >> 16;

	this->lastRrTimestamp = secondsPart + fractionPart;
}
// Builds an RTCP Sender Report for the time `nowMs`.
//
// Returns nullptr if no packet has been sent yet. Otherwise returns a newly
// allocated report (allocated with `new`; presumably owned by the caller -
// confirm) carrying the SSRC, packet and octet counters, the NTP time of
// `nowMs` and an RTP timestamp extrapolated from the latest packet seen. The
// report time and RTP timestamp are also stored in the stream.
RTC::RTCP::SenderReport* RtpStreamSend::GetRtcpSenderReport(uint64_t nowMs)
{
	MS_TRACE();

	const auto packetCount = this->transmissionCounter.GetPacketCount();

	if (packetCount == 0u)
	{
		return nullptr;
	}

	const auto ntpNow = Utils::Time::TimeMs2Ntp(nowMs);
	auto* senderReport = new RTC::RTCP::SenderReport();

	// Extrapolate the RTP timestamp: time elapsed since the packet with the
	// highest timestamp, converted to clock rate units.
	const auto elapsedMs = nowMs - this->maxPacketMs;
	const auto elapsedTs = elapsedMs * GetClockRate() / 1000;
	const auto rtpTs     = this->maxPacketTs + elapsedTs;

	senderReport->SetSsrc(GetSsrc());
	senderReport->SetPacketCount(packetCount);
	senderReport->SetOctetCount(this->transmissionCounter.GetBytes());
	senderReport->SetNtpSec(ntpNow.seconds);
	senderReport->SetNtpFrac(ntpNow.fractions);
	senderReport->SetRtpTs(rtpTs);

	// Remember what was reported.
	this->lastSenderReportNtpMs = nowMs;
	this->lastSenderReportTs    = rtpTs;

	return senderReport;
}
// Builds the XR "delay since last RR" SSRC info for the time `nowMs`.
//
// Returns nullptr if no Receiver Reference Time report has been received yet.
// Otherwise returns a newly allocated SsrcInfo (allocated with `new`;
// presumably owned by the caller - confirm) holding the stream SSRC, the
// stored compact timestamp of the last report and the delay since its
// reception expressed in 1/65536 second units.
RTC::RTCP::DelaySinceLastRr::SsrcInfo* RtpStreamSend::GetRtcpXrDelaySinceLastRrSsrcInfo(uint64_t nowMs)
{
	MS_TRACE();

	if (this->lastRrReceivedMs == 0u)
	{
		return nullptr;
	}

	// Delay in milliseconds, split into whole seconds and remaining ms.
	const auto delayMs          = static_cast<uint32_t>(nowMs - this->lastRrReceivedMs);
	const uint32_t wholeSeconds = delayMs / 1000;
	const uint32_t remainderMs  = delayMs % 1000;

	// Seconds go to the upper 16 bits, the fraction of a second (in 1/65536
	// units) to the lower 16 bits.
	const uint32_t dlrr = (wholeSeconds << 16) | uint32_t{ remainderMs * 65536 / 1000 };

	auto* info = new RTC::RTCP::DelaySinceLastRr::SsrcInfo();

	info->SetSsrc(GetSsrc());
	info->SetDelaySinceLastReceiverReport(dlrr);
	info->SetLastReceiverReport(this->lastRrTimestamp);

	return info;
}
// Builds an RTCP SDES chunk for this stream's SSRC containing a single CNAME
// item with the stream CNAME. The chunk is allocated with `new` (presumably
// owned by the caller - confirm).
RTC::RTCP::SdesChunk* RtpStreamSend::GetRtcpSdesChunk()
{
	MS_TRACE();

	const auto& cname = GetCname();
	auto* chunk       = new RTC::RTCP::SdesChunk(GetSsrc());

	chunk->AddItem(
	  new RTC::RTCP::SdesItem(RTC::RTCP::SdesItem::Type::CNAME, cname.size(), cname.c_str()));

	return chunk;
}
// Pauses the stream: empties the retransmission buffer (if there is one) and
// resets the stored jitter to 0.
void RtpStreamSend::Pause()
{
	MS_TRACE();

	if (auto* buffer = this->retransmissionBuffer)
	{
		buffer->Clear();
	}

	this->jitter = 0;
}
// Resumes the stream. Nothing needs to be done here besides tracing; the
// state cleared by Pause() is not restored.
void RtpStreamSend::Resume()
{
MS_TRACE();
}
// This overload must not be called on a send stream: it ignores its (unnamed)
// arguments and always invokes MS_ABORT("invalid method call").
// There is no return statement, so MS_ABORT is presumably non-returning -
// confirm against the Logger macros.
uint32_t RtpStreamSend::GetBitrate(
uint64_t , uint8_t , uint8_t )
{
MS_TRACE();
MS_ABORT("invalid method call");
}
// Must not be called on a send stream: it ignores its (unnamed) arguments and
// always invokes MS_ABORT("invalid method call").
// There is no return statement, so MS_ABORT is presumably non-returning -
// confirm against the Logger macros.
uint32_t RtpStreamSend::GetSpatialLayerBitrate(uint64_t , uint8_t )
{
MS_TRACE();
MS_ABORT("invalid method call");
}
// Must not be called on a send stream: it ignores its (unnamed) arguments and
// always invokes MS_ABORT("invalid method call").
// There is no return statement, so MS_ABORT is presumably non-returning -
// confirm against the Logger macros.
uint32_t RtpStreamSend::GetLayerBitrate(
uint64_t , uint8_t , uint8_t )
{
MS_TRACE();
MS_ABORT("invalid method call");
}
// Fills the thread-local RetransmissionContainer with the buffered items that
// have to be resent for one NACK item.
//
// `seq` is the first requested sequence number. Each set bit N of `bitmask`
// (starting at the least significant bit) additionally requests the packet
// `seq + N + 1`.
//
// A requested packet is added to the container only if it is present in the
// retransmission buffer and was not already resent within the last RTT
// milliseconds. For every added item `resentAtMs` is set to the current time
// and `sentTimes` is incremented.
//
// On return the container holds the selected items followed by a nullptr
// terminator. If there is no retransmission buffer the container is left empty.
void RtpStreamSend::FillRetransmissionContainer(uint16_t seq, uint16_t bitmask)
{
	MS_TRACE();

	// Start with an empty (immediately terminated) container so that the early
	// return below leaves nothing to resend.
	RetransmissionContainer[0] = nullptr;

	if (!this->retransmissionBuffer)
	{
		MS_WARN_TAG(rtx, "NACK not supported");

		return;
	}

	const uint64_t nowMs = DepLibUV::GetTimeMs();

	// Use the measured RTT if there is a positive one, DefaultRtt otherwise.
	// The value is clamped before being narrowed: converting a float whose
	// truncated value does not fit into uint16_t is undefined behavior, and the
	// stored RTT is derived from fields of received RTCP Receiver Reports, so
	// it can be arbitrarily large.
	static constexpr float MaxRttMs{ 65535.0f };
	const float rttMs  = this->rtt > 0.0f ? this->rtt : static_cast<float>(DefaultRtt);
	const uint16_t rtt = static_cast<uint16_t>(rttMs < MaxRttMs ? rttMs : MaxRttMs);

	uint16_t currentSeq = seq;
	// The first sequence number is always requested; the following ones depend
	// on the bitmask.
	bool requested{ true };
	size_t containerIdx{ 0 };

	// Values only needed for the log messages at the end.
	const uint16_t origBitmask = bitmask;
	uint16_t sentBitmask{ 0b0000000000000000 };
	bool isFirstPacket{ true };
	bool firstPacketSent{ false };
	uint8_t bitmaskCounter{ 0 };

	// At most 17 iterations: the first packet plus one per bitmask bit, which
	// is why the container has room for MaxRequestedPackets entries plus the
	// terminator.
	while (requested || bitmask != 0)
	{
		bool sent = false;

		if (requested)
		{
			auto* item = this->retransmissionBuffer->Get(currentSeq);

			if (!item)
			{
				// Packet not in the buffer: nothing to resend.
			}
			else if (item->resentAtMs != 0u && nowMs - item->resentAtMs <= static_cast<uint64_t>(rtt))
			{
				MS_DEBUG_TAG(
				  rtx,
				  "ignoring retransmission for a packet already resent in the last RTT ms "
				  "[seq:%" PRIu16 ", rtt:%" PRIu16 "]",
				  item->sequenceNumber,
				  rtt);
			}
			else
			{
				// Record this retransmission and queue the item.
				item->resentAtMs = nowMs;
				item->sentTimes++;

				RetransmissionContainer[containerIdx++] = item;

				sent = true;

				if (isFirstPacket)
				{
					firstPacketSent = true;
				}
			}
		}

		// Move on to the next sequence number / bitmask bit.
		requested = (bitmask & 1) != 0;
		bitmask >>= 1;
		currentSeq++;

		// The first packet is not part of the bitmask, so it is tracked apart.
		if (!isFirstPacket)
		{
			sentBitmask |= (sent ? 1 : 0) << bitmaskCounter;
			bitmaskCounter++;
		}
		else
		{
			isFirstPacket = false;
		}
	}

	if (!firstPacketSent || origBitmask != sentBitmask)
	{
		MS_WARN_DEV(
		  "could not resend all packets [seq:%" PRIu16
		  ", first:%s, "
		  "bitmask:" MS_UINT16_TO_BINARY_PATTERN ", sent bitmask:" MS_UINT16_TO_BINARY_PATTERN "]",
		  seq,
		  firstPacketSent ? "yes" : "no",
		  MS_UINT16_TO_BINARY(origBitmask),
		  MS_UINT16_TO_BINARY(sentBitmask));
	}
	else
	{
		MS_DEBUG_DEV(
		  "all packets resent [seq:%" PRIu16 ", bitmask:" MS_UINT16_TO_BINARY_PATTERN "]",
		  seq,
		  MS_UINT16_TO_BINARY(origBitmask));
	}

	// Terminate the list after the last queued item.
	RetransmissionContainer[containerIdx] = nullptr;
}
// Recomputes the stream score from a Receiver Report.
//
// Works on the deltas since the previous call: packets sent, lost (as reported
// by the receiver), repaired and retransmitted. If nothing was sent in the
// interval the score is set to 10. Otherwise the lost count is reduced
// according to how effective retransmissions were and the score is the
// delivered ratio raised to the 4th power, scaled to 0..10 and rounded.
void RtpStreamSend::UpdateScore(RTC::RTCP::ReceiverReport* report)
{
	MS_TRACE();

	// Packets sent in this interval.
	const auto totalSent = this->transmissionCounter.GetPacketCount();
	const auto sent      = totalSent - this->sentPriorScore;

	this->sentPriorScore = totalSent;

	// Packets lost in this interval. A negative reported total counts as 0 and
	// a total lower than the previous one yields no loss.
	const auto reportedLost = report->GetTotalLost();
	const int32_t totalLost = reportedLost > 0 ? reportedLost : 0;
	uint32_t lost{ 0 };

	if (totalLost >= this->lostPriorScore)
	{
		lost = totalLost - this->lostPriorScore;
	}

	this->lostPriorScore = totalLost;

	// Packets repaired in this interval.
	const auto totalRepaired = this->packetsRepaired;
	uint32_t repaired        = totalRepaired - this->repairedPriorScore;

	this->repairedPriorScore = totalRepaired;

	// Packets retransmitted in this interval.
	const auto totalRetransmitted = this->packetsRetransmitted;
	const uint32_t retransmitted  = totalRetransmitted - this->retransmittedPriorScore;

	this->retransmittedPriorScore = totalRetransmitted;

	// Nothing was sent in this interval.
	if (sent == 0)
	{
		RTP::RtpStream::UpdateScore(10);

		return;
	}

	// Keep the values consistent: lost <= sent and repaired <= lost.
	lost     = std::min<size_t>(lost, sent);
	repaired = std::min(repaired, lost);

#if MS_LOG_DEV_LEVEL == 3
	MS_DEBUG_TAG(
	  score,
	  "[totalSent:%zu, totalLost:%" PRIi32 ", totalRepaired:%zu",
	  totalSent,
	  totalLost,
	  totalRepaired);

	MS_DEBUG_TAG(
	  score,
	  "fixed values [sent:%zu, lost:%" PRIu32 ", repaired:%" PRIu32 ", retransmitted:%" PRIu32,
	  sent,
	  lost,
	  repaired,
	  retransmitted);
#endif

	// The more packets had to be repaired (relative to those sent), the less
	// each repaired packet compensates for a lost one.
	auto repairedRatio  = static_cast<float>(repaired) / static_cast<float>(sent);
	auto repairedWeight = std::pow(1 / (repairedRatio + 1), 4);

	MS_ASSERT(retransmitted >= repaired, "repaired packets cannot be more than retransmitted ones");

	// Also weight by the share of retransmissions that were counted as repairs.
	if (retransmitted > 0)
	{
		repairedWeight *= static_cast<float>(repaired) / retransmitted;
	}

	lost -= repaired * repairedWeight;

	auto deliveredRatio = static_cast<float>(sent - lost) / static_cast<float>(sent);
	auto score          = static_cast<uint8_t>(std::round(std::pow(deliveredRatio, 4) * 10));

#if MS_LOG_DEV_LEVEL == 3
	MS_DEBUG_TAG(
	  score,
	  "[deliveredRatio:%f, repairedRatio:%f, repairedWeight:%f, new lost:%" PRIu32
	  ", score:%" PRIu8 "]",
	  deliveredRatio,
	  repairedRatio,
	  repairedWeight,
	  lost,
	  score);
#endif

	RTP::RtpStream::UpdateScore(score);
}
// Called when the sequence number is reset: empties the retransmission buffer
// (if there is one), since its stored items refer to the previous sequence.
void RtpStreamSend::UserOnSequenceNumberReset()
{
	MS_TRACE();

	// No buffer means NACK is disabled and there is nothing to clear.
	if (!this->retransmissionBuffer)
	{
		return;
	}

	this->retransmissionBuffer->Clear();
}
} }