#define MS_CLASS "RTC::RtpStreamSend"
#include "RTC/RtpStreamSend.hpp"
#include "Logger.hpp"
#include "Utils.hpp"
#include "RTC/SeqManager.hpp"
namespace RTC
{
// Maximum number of packets that a single NACK item can request: the packet
// identified by the item's sequence number plus one per bit of its 16 bit mask.
static constexpr size_t MaxRequestedPackets{ 17u };
// Per-thread scratch list filled by FillRetransmissionContainer() and consumed
// by ReceiveNack(). It holds one extra slot because the list is terminated by a
// nullptr entry.
thread_local static std::vector<RTC::RtpStreamSend::StorageItem*> RetransmissionContainer(
MaxRequestedPackets + 1);
// RTT (in ms) used by FillRetransmissionContainer() while this->rtt is still 0.
static constexpr uint32_t DefaultRtt{ 100u };
// Highest 16 bit sequence number; upper bound asserted on the storage buffer size.
static constexpr uint16_t MaxSeq = std::numeric_limits<uint16_t>::max();
// Lower and upper bounds (in ms) applied to retransmissionBufferSize. The upper
// bound is also the maximum packet age accepted when serving a NACK.
const uint32_t RtpStreamSend::MinRetransmissionDelay{ 200u };
const uint32_t RtpStreamSend::MaxRetransmissionDelay{ 2000u };
// Releases the stored packet reference and zeroes every bookkeeping field so
// the item can be filled again.
void RtpStreamSend::StorageItem::Reset()
{
	MS_TRACE();

	// Retransmission bookkeeping.
	this->sentTimes  = 0;
	this->resentAtMs = 0;

	// Original RTP header values.
	this->timestamp      = 0;
	this->sequenceNumber = 0;
	this->ssrc           = 0;

	// Drop our reference to the packet.
	this->packet = nullptr;
}
// Returns the item stored for the first sequence number of the buffer
// (startSeq), or nullptr when Get() finds nothing there.
RtpStreamSend::StorageItem* RtpStreamSend::StorageItemBuffer::GetFirst() const
{
	const auto firstSeq{ this->startSeq };

	return Get(firstSeq);
}
// Looks up the item stored for the given sequence number.
//
// Returns nullptr when seq is lower than startSeq, when the buffer is empty,
// when seq lies beyond the last slot, or when the slot itself is empty.
RtpStreamSend::StorageItem* RtpStreamSend::StorageItemBuffer::Get(uint16_t seq) const
{
	const bool beforeStart = RTC::SeqManager<uint16_t>::IsSeqLowerThan(seq, this->startSeq);

	if (beforeStart || this->buffer.empty())
		return nullptr;

	// Distance from the first slot, computed modulo 2^16.
	const auto offset  = static_cast<uint16_t>(seq - this->startSeq);
	const auto lastIdx = static_cast<uint16_t>(this->buffer.size() - 1);

	return (offset > lastIdx) ? nullptr : this->buffer.at(offset);
}
// Returns the number of slots currently held by the buffer. Empty (nullptr)
// slots between stored items are counted too.
size_t RtpStreamSend::StorageItemBuffer::GetBufferSize() const
{
	const size_t slotCount = this->buffer.size();

	return slotCount;
}
// Stores storageItem in the slot that corresponds to seq, growing the buffer at
// the front or at the back (padding with nullptr slots) when seq falls outside
// the range currently covered.
//
// The caller must not insert into an occupied slot. The buffer must never hold
// more than MaxSeq slots.
void RtpStreamSend::StorageItemBuffer::Insert(uint16_t seq, StorageItem* storageItem)
{
	if (this->buffer.empty())
	{
		// The very first item defines where the buffer starts.
		this->buffer.push_back(storageItem);
		this->startSeq = seq;
	}
	else if (!RTC::SeqManager<uint16_t>::IsSeqHigherThan(seq, this->startSeq))
	{
		// seq is equal to or lower than startSeq: grow at the front.
		const auto gap = static_cast<uint16_t>(this->startSeq - seq);

		// Blank slots for the sequence numbers between seq and the old start.
		for (auto missing = gap; missing > 1; --missing)
			this->buffer.push_front(nullptr);

		this->buffer.push_front(storageItem);
		this->startSeq = seq;
	}
	else
	{
		// seq is higher than startSeq.
		const auto offset  = static_cast<uint16_t>(seq - this->startSeq);
		const auto lastIdx = static_cast<uint16_t>(this->buffer.size() - 1);

		if (offset > lastIdx)
		{
			// Beyond the last slot: append blank slots, then the item.
			const auto distance = static_cast<uint16_t>(offset - lastIdx);

			for (auto missing = distance; missing > 1; --missing)
				this->buffer.push_back(nullptr);

			this->buffer.push_back(storageItem);
		}
		else
		{
			// A slot for this sequence number was already allocated.
			MS_ASSERT(this->buffer[offset] == nullptr, "Must insert into empty slot");

			this->buffer[offset] = storageItem;
		}
	}

	MS_ASSERT(
	  this->buffer.size() <= MaxSeq,
	  "StorageItemBuffer contains more than %" PRIu16 " entries",
	  MaxSeq);
}
// Deletes the first stored item and then drops every leading empty slot, so the
// buffer (when not empty) starts again at a stored item. startSeq advances by
// one for each slot removed.
void RtpStreamSend::StorageItemBuffer::RemoveFirst()
{
	MS_ASSERT(!this->buffer.empty(), "buffer is empty");

	delete this->buffer.front();
	this->buffer.front() = nullptr;

	// The first slot is empty now, so at least one slot is always popped.
	do
	{
		this->buffer.pop_front();
		this->startSeq++;
	} while (!this->buffer.empty() && this->buffer.front() == nullptr);
}
// Resets and deletes every stored item, empties the buffer and sets startSeq
// back to 0.
void RtpStreamSend::StorageItemBuffer::Clear()
{
	for (auto it = this->buffer.begin(); it != this->buffer.end(); ++it)
	{
		auto* item = *it;

		// Blank slots have nothing to free.
		if (item != nullptr)
		{
			item->Reset();
			delete item;
		}
	}

	this->buffer.clear();
	this->startSeq = 0;
}
// Frees every item still stored in the buffer.
RtpStreamSend::StorageItemBuffer::~StorageItemBuffer()
{
	this->Clear();
}
// Builds a sending stream on top of RTC::RtpStream.
//
// The retransmission buffer size starts at MaxRetransmissionDelay and is later
// adjusted by ReceiveRtcpReceiverReport().
// NOTE(review): the literal 10 passed to the base class is presumably the
// initial score - confirm against RtpStream's constructor.
RtpStreamSend::RtpStreamSend(
  RTC::RtpStreamSend::Listener* listener, RTC::RtpStream::Params& params, std::string& mid)
  : RTC::RtpStream::RtpStream(listener, params, 10),
    mid(mid),
    retransmissionBufferSize(RtpStreamSend::MaxRetransmissionDelay)
{
	MS_TRACE();
}
// Releases every packet kept for retransmission.
RtpStreamSend::~RtpStreamSend()
{
	MS_TRACE();

	this->ClearBuffer();
}
// Adds the outbound statistics of this stream to jsonObject: the fields written
// by the base class plus "type", "packetCount", "byteCount" and "bitrate".
void RtpStreamSend::FillJsonStats(json& jsonObject)
{
	MS_TRACE();

	const uint64_t currentMs = DepLibUV::GetTimeMs();

	// Let the base class write its fields first.
	RTC::RtpStream::FillJsonStats(jsonObject);

	auto& counter = this->transmissionCounter;

	jsonObject["type"]        = "outbound-rtp";
	jsonObject["packetCount"] = counter.GetPacketCount();
	jsonObject["byteCount"]   = counter.GetBytes();
	jsonObject["bitrate"]     = counter.GetBitrate(currentMs);
}
// Enables RTX through the base class and picks a random initial RTX sequence
// number in the range [0, 0xFFFF].
void RtpStreamSend::SetRtx(uint8_t payloadType, uint32_t ssrc)
{
	MS_TRACE();

	RTC::RtpStream::SetRtx(payloadType, ssrc);

	const auto initialRtxSeq = Utils::Crypto::GetRandomUInt(0u, 0xFFFF);

	this->rtxSeq = initialRtxSeq;
}
// Processes an RTP packet that is about to be sent through this stream.
//
// Returns false when the base class rejects the packet. Otherwise the packet is
// stored for retransmission (only when NACK is in use), the transmission
// counter is updated and true is returned.
bool RtpStreamSend::ReceivePacket(std::shared_ptr<RTC::RtpPacket> packet)
{
	MS_TRACE();

	auto* rawPacket = packet.get();

	if (!RtpStream::ReceiveStreamPacket(rawPacket))
	{
		return false;
	}

	// Packets are only kept when they may be asked for again.
	if (this->params.useNack)
	{
		StorePacket(packet);
	}

	this->transmissionCounter.Update(rawPacket);

	return true;
}
// Handles an RTCP NACK packet: for each item in it, collects the stored packets
// that can be resent and hands them to the listener for retransmission.
void RtpStreamSend::ReceiveNack(RTC::RTCP::FeedbackRtpNackPacket* nackPacket)
{
	MS_TRACE();

	this->nackCount++;

	for (auto itemIt = nackPacket->Begin(); itemIt != nackPacket->End(); ++itemIt)
	{
		RTC::RTCP::FeedbackRtpNackItem* nackItem = *itemIt;

		this->nackPacketCount += nackItem->CountRequestedPackets();

		FillRetransmissionContainer(nackItem->GetPacketId(), nackItem->GetLostPacketBitmask());

		// The container is a nullptr terminated list.
		for (size_t idx{ 0 }; idx < RetransmissionContainer.size(); ++idx)
		{
			auto* storageItem = RetransmissionContainer[idx];

			if (storageItem == nullptr)
				break;

			// Hold our own reference to the packet while it is being processed.
			std::shared_ptr<RTC::RtpPacket> packet{ storageItem->packet };
			auto* rawPacket = packet.get();

			static_cast<RTC::RtpStreamSend::Listener*>(this->listener)
			  ->OnRtpStreamRetransmitRtpPacket(this, rawPacket);

			RTC::RtpStream::PacketRetransmitted(rawPacket);

			// Only the first retransmission of a packet counts as a repair.
			if (storageItem->sentTimes == 1)
				RTC::RtpStream::PacketRepaired(rawPacket);

			// Undo the RTX encoding applied by FillRetransmissionContainer().
			if (HasRtx())
				packet->RtxDecode(RtpStream::GetPayloadType(), storageItem->ssrc);
		}
	}
}
// Counts a received key frame request. Only PLI and FIR are counted, any other
// message type is ignored.
void RtpStreamSend::ReceiveKeyFrameRequest(RTC::RTCP::FeedbackPs::MessageType messageType)
{
	MS_TRACE();

	if (messageType == RTC::RTCP::FeedbackPs::MessageType::PLI)
	{
		this->pliCount++;
	}
	else if (messageType == RTC::RTCP::FeedbackPs::MessageType::FIR)
	{
		this->firCount++;
	}
}
// Handles an RTCP Receiver Report: derives the RTT from it, adapts the
// retransmission buffer size, stores the reported loss values and updates the
// stream score.
void RtpStreamSend::ReceiveRtcpReceiverReport(RTC::RTCP::ReceiverReport* report)
{
	MS_TRACE();

	// Current time as a compact NTP value: low 16 bits of the seconds followed by
	// the high 16 bits of the fractions.
	const uint64_t currentMs = DepLibUV::GetTimeMs();
	const auto ntpNow        = Utils::Time::TimeMs2Ntp(currentMs);
	uint32_t compactNtp      = (ntpNow.seconds & 0x0000FFFF) << 16;

	compactNtp |= (ntpNow.fractions & 0xFFFF0000) >> 16;

	const uint32_t lastSr = report->GetLastSenderReport();
	const uint32_t dlsr   = report->GetDelaySinceLastSenderReport();

	// RTT in 1/2^16 second units. It stays 0 unless both report fields are set
	// and the current time is past them.
	uint32_t compactRtt{ 0 };

	if (lastSr && dlsr && (compactNtp > dlsr + lastSr))
	{
		compactRtt = compactNtp - dlsr - lastSr;
	}

	// Convert to milliseconds: whole seconds first, then the fractional part.
	this->rtt = static_cast<float>(compactRtt >> 16) * 1000;
	this->rtt += (static_cast<float>(compactRtt & 0x0000FFFF) / 65536) * 1000;

	if (this->rtt > 0.0f)
		this->hasRtt = true;

	// Smooth the buffer size towards (RTT + 100) ms and keep it between
	// MinRetransmissionDelay and MaxRetransmissionDelay.
	auto targetSize   = static_cast<uint32_t>(this->rtt + 100.0);
	auto smoothedSize = (this->retransmissionBufferSize * 7 + targetSize) / 8;
	auto cappedSize   = std::min(smoothedSize, RtpStreamSend::MaxRetransmissionDelay);

	this->retransmissionBufferSize = std::max(cappedSize, RtpStreamSend::MinRetransmissionDelay);

	this->packetsLost  = report->GetTotalLost();
	this->fractionLost = report->GetFractionLost();

	UpdateScore(report);
}
// Records when an RTCP XR Receiver Reference Time report arrived and stores its
// NTP timestamp in compact form (seconds shifted left by 16 plus fractions
// shifted right by 16).
void RtpStreamSend::ReceiveRtcpXrReceiverReferenceTime(RTC::RTCP::ReceiverReferenceTime* report)
{
	MS_TRACE();

	this->lastRrReceivedMs = DepLibUV::GetTimeMs();

	const auto secondsPart = report->GetNtpSec() << 16;

	this->lastRrTimestamp = secondsPart;

	const auto fractionsPart = report->GetNtpFrac() >> 16;

	this->lastRrTimestamp += fractionsPart;
}
// Creates an RTCP Sender Report for this stream at time nowMs.
//
// Returns nullptr when no packet has been transmitted yet. Otherwise returns a
// report allocated with new; ownership passes to the caller.
RTC::RTCP::SenderReport* RtpStreamSend::GetRtcpSenderReport(uint64_t nowMs)
{
	MS_TRACE();

	if (this->transmissionCounter.GetPacketCount() == 0u)
	{
		return nullptr;
	}

	const auto ntpNow = Utils::Time::TimeMs2Ntp(nowMs);
	auto* senderReport = new RTC::RTCP::SenderReport();

	// Project the highest RTP timestamp seen so far to the current time.
	const auto elapsedMs = nowMs - this->maxPacketMs;
	const auto elapsedTs = elapsedMs * GetClockRate() / 1000;
	const auto rtpTs     = this->maxPacketTs + elapsedTs;

	senderReport->SetSsrc(GetSsrc());
	senderReport->SetPacketCount(this->transmissionCounter.GetPacketCount());
	senderReport->SetOctetCount(this->transmissionCounter.GetBytes());
	senderReport->SetNtpSec(ntpNow.seconds);
	senderReport->SetNtpFrac(ntpNow.fractions);
	senderReport->SetRtpTs(rtpTs);

	// Remember what was reported.
	this->lastSenderReportNtpMs = nowMs;
	this->lastSenderReportTs    = rtpTs;

	return senderReport;
}
// Creates the DLRR SSRC info for this stream at time nowMs.
//
// Returns nullptr when no Receiver Reference Time report has been received.
// Otherwise returns an object allocated with new; ownership passes to the
// caller.
RTC::RTCP::DelaySinceLastRr::SsrcInfo* RtpStreamSend::GetRtcpXrDelaySinceLastRr(uint64_t nowMs)
{
	MS_TRACE();

	if (this->lastRrReceivedMs == 0u)
	{
		return nullptr;
	}

	// Delay since the last report, expressed in 1/65536 second units: whole
	// seconds in the upper 16 bits, fraction of a second in the lower 16 bits.
	const auto delayMs           = static_cast<uint32_t>(nowMs - this->lastRrReceivedMs);
	const uint32_t wholeSeconds  = delayMs / 1000;
	const uint32_t secondsFrac   = (delayMs % 1000) * 65536 / 1000;
	const uint32_t dlrr          = (wholeSeconds << 16) | secondsFrac;

	auto* info = new RTC::RTCP::DelaySinceLastRr::SsrcInfo();

	info->SetSsrc(GetSsrc());
	info->SetDelaySinceLastReceiverReport(dlrr);
	info->SetLastReceiverReport(this->lastRrTimestamp);

	return info;
}
// Creates an RTCP SDES chunk for this stream's SSRC containing a single CNAME
// item. The chunk is allocated with new; ownership passes to the caller.
RTC::RTCP::SdesChunk* RtpStreamSend::GetRtcpSdesChunk()
{
	MS_TRACE();

	const auto& cname = GetCname();

	auto* cnameItem =
	  new RTC::RTCP::SdesItem(RTC::RTCP::SdesItem::Type::CNAME, cname.size(), cname.c_str());
	auto* chunk = new RTC::RTCP::SdesChunk(GetSsrc());

	chunk->AddItem(cnameItem);

	return chunk;
}
void RtpStreamSend::Pause()
{
MS_TRACE();
ClearBuffer();
}
// Resumes the stream. Nothing needs to be restored here.
void RtpStreamSend::Resume()
{
	MS_TRACE();
}
// Not supported by this class: calling it aborts. The arguments are ignored.
uint32_t RtpStreamSend::GetBitrate(uint64_t, uint8_t, uint8_t)
{
	MS_TRACE();

	MS_ABORT("invalid method call");
}
// Not supported by this class: calling it aborts. The arguments are ignored.
uint32_t RtpStreamSend::GetSpatialLayerBitrate(uint64_t, uint8_t)
{
	MS_TRACE();

	MS_ABORT("invalid method call");
}
// Not supported by this class: calling it aborts. The arguments are ignored.
uint32_t RtpStreamSend::GetLayerBitrate(uint64_t, uint8_t, uint8_t)
{
	MS_TRACE();

	MS_ABORT("invalid method call");
}
// Keeps a reference to the packet so it can be resent if a NACK asks for it.
//
// Packets bigger than RTC::MtuSize are not stored. Packets that are too old are
// evicted before storing. A packet whose sequence number and timestamp match an
// already stored one is ignored; if only the sequence number matches, the
// stored entry is overwritten.
void RtpStreamSend::StorePacket(std::shared_ptr<RTC::RtpPacket> packet)
{
	MS_TRACE();

	MS_ASSERT(
	  packet->GetSsrc() == this->params.ssrc, "RTP packet SSRC does not match the encodings SSRC");

	const bool fitsInMtu = !(packet->GetSize() > RTC::MtuSize);

	if (!fitsInMtu)
	{
		MS_WARN_TAG(
		  rtp,
		  "packet too big [ssrc:%" PRIu32 ", seq:%" PRIu16 ", size:%zu]",
		  packet->GetSsrc(),
		  packet->GetSequenceNumber(),
		  packet->GetSize());

		return;
	}

	// Make room first by dropping what is too old relative to this packet.
	this->ClearOldPackets(packet.get());

	const auto seq = packet->GetSequenceNumber();
	auto* slot     = this->storageItemBuffer.Get(seq);

	if (slot == nullptr)
	{
		// Nothing stored for this sequence number yet.
		slot = new StorageItem();
		this->storageItemBuffer.Insert(seq, slot);
	}
	else if (packet->GetTimestamp() == slot->timestamp)
	{
		// Same packet already stored.
		return;
	}
	else
	{
		// Same sequence number but different timestamp: reuse the entry.
		slot->Reset();
	}

	slot->packet         = packet;
	slot->ssrc           = packet->GetSsrc();
	slot->sequenceNumber = packet->GetSequenceNumber();
	slot->timestamp      = packet->GetTimestamp();
}
// Evicts, starting from the front of the storage buffer, every stored packet
// that is at least retransmissionBufferSize milliseconds older than the given
// packet (age measured through RTP timestamps and the stream clock rate).
//
// Eviction stops at the first stored packet that is newer than the given one or
// recent enough to be kept.
void RtpStreamSend::ClearOldPackets(const RtpPacket* packet)
{
	MS_TRACE();

	auto packetTs{ packet->GetTimestamp() };
	auto clockRate{ this->params.clockRate };

	// RemoveFirst() may drop more than one slot at once, hence the extra check
	// on the current buffer size.
	const auto bufferSize = this->storageItemBuffer.GetBufferSize();

	for (size_t i{ 0 }; i < bufferSize && this->storageItemBuffer.GetBufferSize() != 0; ++i)
	{
		auto* firstStorageItem = this->storageItemBuffer.GetFirst();

		MS_ASSERT(firstStorageItem, "first storage item is missing");
		MS_ASSERT(firstStorageItem->packet, "storage item does not contain original packet");

		auto firstPacketTs{ firstStorageItem->timestamp };

		// The given packet is older than the first stored one.
		if (RTC::SeqManager<uint32_t>::IsSeqLowerThan(packetTs, firstPacketTs))
			break;

		uint32_t diffTs{ packetTs - firstPacketTs };

		// Convert ticks to milliseconds in 64 bits: in 32 bits diffTs * 1000 wraps
		// once diffTs exceeds ~4.29M ticks (about 47 seconds at 90 kHz), which made
		// very old packets look recent and kept them in the buffer.
		const uint64_t diffMs = static_cast<uint64_t>(diffTs) * 1000 / clockRate;

		// The first stored packet is recent enough, so are the following ones.
		if (diffMs < this->retransmissionBufferSize)
			break;

		this->storageItemBuffer.RemoveFirst();
	}
}
// Discards every packet kept for retransmission.
void RtpStreamSend::ClearBuffer()
{
	MS_TRACE();

	auto& storage = this->storageItemBuffer;

	storage.Clear();
}
// Fills RetransmissionContainer with the stored items that must be resent for a
// NACK item, and terminates the list with a nullptr entry.
//
// seq is the first requested sequence number. Bit N of bitmask (N starting at
// 0) requests sequence number seq + N + 1.
//
// A requested packet is skipped when it is not stored, when it is older than
// MaxRetransmissionDelay, or when it was already resent within the last RTT.
// Each selected packet gets its original header values restored and, if RTX is
// in use, is RTX encoded before being added to the container.
void RtpStreamSend::FillRetransmissionContainer(uint16_t seq, uint16_t bitmask)
{
	MS_TRACE();

	// Leave the container as an empty list in case we return early.
	RetransmissionContainer[0] = nullptr;

	if (!this->params.useNack)
	{
		MS_WARN_TAG(rtx, "NACK not supported");

		return;
	}

	uint64_t nowMs = DepLibUV::GetTimeMs();
	// this->rtt is a floating point number of milliseconds that can reach tens of
	// millions (16 bits of seconds times 1000), so it needs 32 bits: converting an
	// out of range floating point value to uint16_t is undefined behavior.
	const auto rtt = static_cast<uint32_t>(this->rtt != 0u ? this->rtt : DefaultRtt);
	uint16_t currentSeq = seq;
	// The first sequence number is always requested.
	bool requested{ true };
	size_t containerIdx{ 0 };

	// Values used only for logging.
	uint16_t origBitmask = bitmask;
	uint16_t sentBitmask{ 0b0000000000000000 };
	bool isFirstPacket{ true };
	bool firstPacketSent{ false };
	uint8_t bitmaskCounter{ 0 };
	bool tooOldPacketFound{ false };

	while (requested || bitmask != 0)
	{
		bool sent = false;

		if (requested)
		{
			auto* storageItem = this->storageItemBuffer.Get(currentSeq);
			std::shared_ptr<RTC::RtpPacket> packet{ nullptr };
			uint64_t diffMs{ 0 };

			if (storageItem)
			{
				packet = storageItem->packet;

				// Put the original header values back in the packet.
				packet->SetSsrc(storageItem->ssrc);
				packet->SetSequenceNumber(storageItem->sequenceNumber);
				packet->SetTimestamp(storageItem->timestamp);

				if (!this->mid.empty())
					packet->UpdateMid(mid);

				// Age of the packet relative to the highest timestamp seen. Computed
				// in 64 bits: in 32 bits diffTs * 1000 wraps once diffTs exceeds
				// ~4.29M ticks, letting a too old packet pass the age check below.
				uint32_t diffTs = this->maxPacketTs - packet->GetTimestamp();

				diffMs = static_cast<uint64_t>(diffTs) * 1000 / this->params.clockRate;
			}

			if (!storageItem)
			{
				// The packet is not stored, nothing to resend.
			}
			else if (diffMs > MaxRetransmissionDelay)
			{
				// Warn just once per NACK item.
				if (!tooOldPacketFound)
				{
					MS_WARN_TAG(
					  rtx,
					  "ignoring retransmission for too old packet "
					  "[seq:%" PRIu16 ", max age:%" PRIu32 "ms, packet age:%" PRIu64 "ms]",
					  packet->GetSequenceNumber(),
					  MaxRetransmissionDelay,
					  diffMs);

					tooOldPacketFound = true;
				}
			}
			else if (
			  storageItem->resentAtMs != 0u &&
			  nowMs - storageItem->resentAtMs <= static_cast<uint64_t>(rtt))
			{
				MS_DEBUG_TAG(
				  rtx,
				  "ignoring retransmission for a packet already resent in the last RTT ms "
				  "[seq:%" PRIu16 ", rtt:%" PRIu32 "]",
				  packet->GetSequenceNumber(),
				  rtt);
			}
			else
			{
				// The packet will be resent. ReceiveNack() undoes the RTX encoding
				// after handing the packet to the listener.
				if (HasRtx())
				{
					++this->rtxSeq;

					packet->RtxEncode(this->params.rtxPayloadType, this->params.rtxSsrc, this->rtxSeq);
				}

				storageItem->resentAtMs = nowMs;
				storageItem->sentTimes++;

				RetransmissionContainer[containerIdx++] = storageItem;

				sent = true;

				if (isFirstPacket)
					firstPacketSent = true;
			}
		}

		// Move on to the next sequence number / bit of the mask.
		requested = (bitmask & 1) != 0;
		bitmask >>= 1;
		++currentSeq;

		// The first packet is not part of the bitmask.
		if (!isFirstPacket)
		{
			sentBitmask |= (sent ? 1 : 0) << bitmaskCounter;
			++bitmaskCounter;
		}
		else
		{
			isFirstPacket = false;
		}
	}

	if (!firstPacketSent || origBitmask != sentBitmask)
	{
		MS_WARN_DEV(
		  "could not resend all packets [seq:%" PRIu16
		  ", first:%s, "
		  "bitmask:" MS_UINT16_TO_BINARY_PATTERN ", sent bitmask:" MS_UINT16_TO_BINARY_PATTERN "]",
		  seq,
		  firstPacketSent ? "yes" : "no",
		  MS_UINT16_TO_BINARY(origBitmask),
		  MS_UINT16_TO_BINARY(sentBitmask));
	}
	else
	{
		MS_DEBUG_DEV(
		  "all packets resent [seq:%" PRIu16 ", bitmask:" MS_UINT16_TO_BINARY_PATTERN "]",
		  seq,
		  MS_UINT16_TO_BINARY(origBitmask));
	}

	// Terminate the list.
	RetransmissionContainer[containerIdx] = nullptr;
}
// Computes a new stream score (0 to 10) from the packets sent, lost, repaired
// and retransmitted since the previous call, and passes it to the base class.
//
// When nothing was sent since the previous call the score is 10.
void RtpStreamSend::UpdateScore(RTC::RTCP::ReceiverReport* report)
{
	MS_TRACE();

	// Packets sent since the previous score.
	auto totalSent       = this->transmissionCounter.GetPacketCount();
	auto sent            = totalSent - this->sentPriorScore;
	this->sentPriorScore = totalSent;

	// Packets lost since the previous score. A negative total is taken as 0 and
	// a total lower than the previous one yields 0 lost.
	const auto reportedLost = report->GetTotalLost();
	uint32_t totalLost      = reportedLost > 0 ? reportedLost : 0;
	uint32_t lost{ 0 };

	if (!(totalLost < this->lostPriorScore))
		lost = totalLost - this->lostPriorScore;

	this->lostPriorScore = totalLost;

	// Packets repaired since the previous score.
	auto totalRepaired       = this->packetsRepaired;
	uint32_t repaired        = totalRepaired - this->repairedPriorScore;
	this->repairedPriorScore = totalRepaired;

	// Packets retransmitted since the previous score.
	auto totalRetransmitted       = this->packetsRetransmitted;
	uint32_t retransmitted        = totalRetransmitted - this->retransmittedPriorScore;
	this->retransmittedPriorScore = totalRetransmitted;

	if (sent == 0)
	{
		RTC::RtpStream::UpdateScore(10);

		return;
	}

	// Keep the values coherent: lost <= sent and repaired <= lost.
	if (lost > sent)
		lost = sent;

	if (repaired > lost)
		repaired = lost;

#if MS_LOG_DEV_LEVEL == 3
	MS_DEBUG_TAG(
	  score,
	  "[totalSent:%zu, totalLost:%" PRIi32 ", totalRepaired:%zu",
	  totalSent,
	  totalLost,
	  totalRepaired);

	MS_DEBUG_TAG(
	  score,
	  "fixed values [sent:%zu, lost:%" PRIu32 ", repaired:%" PRIu32 ", retransmitted:%" PRIu32,
	  sent,
	  lost,
	  repaired,
	  retransmitted);
#endif

	// The more packets had to be repaired, the less each repair counts.
	auto repairedRatio  = static_cast<float>(repaired) / static_cast<float>(sent);
	auto repairedWeight = std::pow(1 / (repairedRatio + 1), 4);

	MS_ASSERT(retransmitted >= repaired, "repaired packets cannot be more than retransmitted ones");

	// Scale by how many of the retransmissions actually repaired a loss.
	if (retransmitted > 0)
		repairedWeight *= static_cast<float>(repaired) / retransmitted;

	// Repaired packets reduce the number of packets considered lost.
	lost -= repaired * repairedWeight;

	auto deliveredRatio = static_cast<float>(sent - lost) / static_cast<float>(sent);
	auto score          = static_cast<uint8_t>(std::round(std::pow(deliveredRatio, 4) * 10));

#if MS_LOG_DEV_LEVEL == 3
	MS_DEBUG_TAG(
	  score,
	  "[deliveredRatio:%f, repairedRatio:%f, repairedWeight:%f, new lost:%" PRIu32 ", score:%" PRIu8
	  "]",
	  deliveredRatio,
	  repairedRatio,
	  repairedWeight,
	  lost,
	  score);
#endif

	RtpStream::UpdateScore(score);
}
}