#define MS_CLASS "RTC::RtpRetransmissionBuffer"
#include "RTC/RtpRetransmissionBuffer.hpp"
#include "Logger.hpp"
#include "Utils.hpp"
#include "RTC/SeqManager.hpp"
namespace RTC
{
RtpRetransmissionBuffer::Item* RtpRetransmissionBuffer::FillItem(
RtpRetransmissionBuffer::Item* item,
RTC::RtpPacket* packet,
const RTC::SharedRtpPacket& sharedPacket)
{
MS_TRACE();
item->sharedPacket = sharedPacket;
item->encoder = packet->GetPayloadEncoder();
item->ssrc = packet->GetSsrc();
item->sequenceNumber = packet->GetSequenceNumber();
item->timestamp = packet->GetTimestamp();
return item;
}
RtpRetransmissionBuffer::RtpRetransmissionBuffer(
uint16_t maxItems, uint32_t maxRetransmissionDelayMs, uint32_t clockRate)
: maxItems(maxItems), maxRetransmissionDelayMs(maxRetransmissionDelayMs), clockRate(clockRate)
{
MS_TRACE();
MS_ASSERT(maxItems > 0u, "maxItems must be greater than 0");
}
RtpRetransmissionBuffer::~RtpRetransmissionBuffer()
{
MS_TRACE();
Clear();
}
RtpRetransmissionBuffer::Item* RtpRetransmissionBuffer::Get(uint16_t seq) const
{
MS_TRACE();
const auto* oldestItem = GetOldest();
if (!oldestItem)
{
return nullptr;
}
if (RTC::SeqManager<uint16_t>::IsSeqLowerThan(seq, oldestItem->sequenceNumber))
{
return nullptr;
}
const uint16_t idx = seq - oldestItem->sequenceNumber;
if (static_cast<size_t>(idx) > this->buffer.size() - 1)
{
return nullptr;
}
return this->buffer.at(idx);
}
bool RtpRetransmissionBuffer::Insert(RTC::RtpPacket* packet, const RTC::SharedRtpPacket& sharedPacket)
{
MS_TRACE();
const auto ssrc = packet->GetSsrc();
const auto seq = packet->GetSequenceNumber();
const auto timestamp = packet->GetTimestamp();
MS_DEBUG_DEV("packet [seq:%" PRIu16 ", timestamp:%" PRIu32 "]", seq, timestamp);
if (this->buffer.empty())
{
MS_DEBUG_DEV("buffer empty [seq:%" PRIu16 ", timestamp:%" PRIu32 "]", seq, timestamp);
auto* item = new Item();
this->buffer.push_back(RtpRetransmissionBuffer::FillItem(item, packet, sharedPacket));
return true;
}
auto* oldestItem = GetOldest();
auto* newestItem = GetNewest();
if (
RTC::SeqManager<uint16_t>::IsSeqLowerThan(seq, newestItem->sequenceNumber) &&
Utils::Number<uint32_t>::IsHigherThan(timestamp, newestItem->timestamp))
{
MS_WARN_TAG(
rtp,
"packet has lower seq but higher timestamp than newest packet in the buffer, emptying the buffer [ssrc:%" PRIu32
", seq:%" PRIu16 ", timestamp:%" PRIu32 "]",
ssrc,
seq,
timestamp);
Clear();
auto* item = new Item();
this->buffer.push_back(RtpRetransmissionBuffer::FillItem(item, packet, sharedPacket));
return true;
}
auto newestTimestamp = Utils::Number<uint32_t>::IsHigherThan(timestamp, newestItem->timestamp)
? timestamp
: newestItem->timestamp;
if (ClearTooOldByTimestamp(newestTimestamp))
{
if (this->buffer.empty())
{
MS_WARN_TAG(
rtp,
"buffer empty after clearing too old packets [seq:%" PRIu16 ", timestamp:%" PRIu32 "]",
seq,
timestamp);
auto* item = new Item();
this->buffer.push_back(RtpRetransmissionBuffer::FillItem(item, packet, sharedPacket));
return true;
}
oldestItem = GetOldest();
newestItem = GetNewest();
}
MS_ASSERT(oldestItem != nullptr, "oldest item doesn't exist");
MS_ASSERT(newestItem != nullptr, "newest item doesn't exist");
if (RTC::SeqManager<uint16_t>::IsSeqHigherThan(seq, newestItem->sequenceNumber))
{
MS_DEBUG_DEV("packet in order [seq:%" PRIu16 ", timestamp:%" PRIu32 "]", seq, timestamp);
if (Utils::Number<uint32_t>::IsLowerThan(timestamp, newestItem->timestamp))
{
MS_WARN_TAG(
rtp,
"packet has higher seq but lower timestamp than newest packet in the buffer, discarding it [ssrc:%" PRIu32
", seq:%" PRIu16 ", timestamp:%" PRIu32 "]",
ssrc,
seq,
timestamp);
return false;
}
uint16_t numBlankSlots = seq - newestItem->sequenceNumber - 1;
if (this->buffer.size() + numBlankSlots + 1 > this->maxItems)
{
const uint16_t numItemsToRemove = this->buffer.size() + numBlankSlots + 1 - this->maxItems;
if (numItemsToRemove > this->buffer.size() - 1)
{
MS_WARN_TAG(
rtp,
"packet has too high seq and forces buffer emptying [ssrc:%" PRIu32 ", seq:%" PRIu16
", timestamp:%" PRIu32 "]",
ssrc,
seq,
timestamp);
numBlankSlots = 0u;
Clear();
}
else
{
MS_DEBUG_DEV(
"calling RemoveOldest(%" PRIu16 ") [bufferSize:%zu, numBlankSlots:%" PRIu16
", maxItems:%" PRIu16 "]",
numItemsToRemove,
this->buffer.size(),
numBlankSlots,
this->maxItems);
RemoveOldest(numItemsToRemove);
}
}
for (uint16_t i{ 0u }; i < numBlankSlots; ++i)
{
this->buffer.push_back(nullptr);
}
auto* item = new Item();
this->buffer.push_back(RtpRetransmissionBuffer::FillItem(item, packet, sharedPacket));
}
else if (RTC::SeqManager<uint16_t>::IsSeqLowerThan(seq, oldestItem->sequenceNumber))
{
MS_DEBUG_DEV(
"packet out of order and older than oldest packet in the buffer [seq:%" PRIu16
", timestamp:%" PRIu32 "]",
seq,
timestamp);
if (IsTooOldTimestamp(timestamp, newestItem->timestamp))
{
MS_WARN_DEV(
"packet's timestamp too old, discarding it [seq:%" PRIu16 ", timestamp:%" PRIu32 "]",
seq,
timestamp);
return false;
}
if (Utils::Number<uint32_t>::IsHigherThan(timestamp, oldestItem->timestamp))
{
MS_WARN_TAG(
rtp,
"packet has lower seq but higher timestamp than oldest packet in the buffer, discarding it [ssrc:%" PRIu32
", seq:%" PRIu16 ", timestamp:%" PRIu32 "]",
ssrc,
seq,
timestamp);
return false;
}
const uint16_t numBlankSlots = oldestItem->sequenceNumber - seq - 1;
if (this->buffer.size() + numBlankSlots + 1 > this->maxItems)
{
MS_WARN_TAG(
rtp,
"discarding received old packet to not exceed max buffer size [ssrc:%" PRIu32
", seq:%" PRIu16 ", timestamp:%" PRIu32 "]",
ssrc,
seq,
timestamp);
return false;
}
for (uint16_t i{ 0u }; i < numBlankSlots; ++i)
{
this->buffer.push_front(nullptr);
}
auto* item = new Item();
this->buffer.push_front(RtpRetransmissionBuffer::FillItem(item, packet, sharedPacket));
}
else
{
MS_DEBUG_DEV(
"packet out of order and in between oldest and newest packets in the buffer [seq:%" PRIu16
", timestamp:%" PRIu32 "]",
seq,
timestamp);
auto* item = Get(seq);
if (item)
{
MS_DEBUG_DEV(
"packet already in the buffer, discarding [seq:%" PRIu16 ", timestamp:%" PRIu32 "]",
seq,
timestamp);
return false;
}
const uint16_t idx = seq - oldestItem->sequenceNumber;
for (int32_t idx2 = idx - 1; idx2 >= 0; --idx2)
{
const auto* olderItem = this->buffer.at(idx2);
if (!olderItem)
{
continue;
}
if (timestamp >= olderItem->timestamp)
{
break;
}
else
{
MS_WARN_TAG(
rtp,
"packet timestamp is lower than timestamp of immediate older packet in the buffer, discarding it [ssrc:%" PRIu32
", seq:%" PRIu16 ", timestamp:%" PRIu32 "]",
ssrc,
seq,
timestamp);
return false;
}
}
for (size_t idx2 = idx + 1; idx2 < this->buffer.size(); ++idx2)
{
const auto* newerItem = this->buffer.at(idx2);
if (!newerItem)
{
continue;
}
if (timestamp <= newerItem->timestamp)
{
break;
}
else
{
MS_WARN_TAG(
rtp,
"packet timestamp is higher than timestamp of immediate newer packet in the buffer, discarding it [ssrc:%" PRIu32
", seq:%" PRIu16 ", timestamp:%" PRIu32 "]",
ssrc,
seq,
timestamp);
return false;
}
}
item = new Item();
this->buffer[idx] = RtpRetransmissionBuffer::FillItem(item, packet, sharedPacket);
}
MS_ASSERT(
this->buffer.size() <= this->maxItems,
"buffer contains %zu items (more than %" PRIu16 " max items)",
this->buffer.size(),
this->maxItems);
return true;
}
void RtpRetransmissionBuffer::Clear()
{
MS_TRACE();
for (auto* item : this->buffer)
{
if (!item)
{
continue;
}
item->Reset();
delete item;
}
this->buffer.clear();
}
void RtpRetransmissionBuffer::Dump() const
{
MS_TRACE();
MS_DUMP("<RtpRetransmissionBuffer>");
MS_DUMP(" buffer [size:%zu, maxSize:%" PRIu16 "]", this->buffer.size(), this->maxItems);
if (!this->buffer.empty())
{
const auto* oldestItem = GetOldest();
const auto* newestItem = GetNewest();
MS_DUMP(
" oldest item [seq:%" PRIu16 ", timestamp:%" PRIu32 "]",
oldestItem->sequenceNumber,
oldestItem->timestamp);
MS_DUMP(
" newest item [seq:%" PRIu16 ", timestamp:%" PRIu32 "]",
newestItem->sequenceNumber,
newestItem->timestamp);
MS_DUMP(
" buffer window: %" PRIu32 "ms",
static_cast<uint32_t>(newestItem->timestamp * 1000 / this->clockRate) -
static_cast<uint32_t>(oldestItem->timestamp * 1000 / this->clockRate));
}
MS_DUMP("</RtpRetransmissionBuffer>");
}
RtpRetransmissionBuffer::Item* RtpRetransmissionBuffer::GetOldest() const
{
MS_TRACE();
if (this->buffer.empty())
{
return nullptr;
}
return this->buffer.front();
}
RtpRetransmissionBuffer::Item* RtpRetransmissionBuffer::GetNewest() const
{
MS_TRACE();
if (this->buffer.empty())
{
return nullptr;
}
return this->buffer.back();
}
void RtpRetransmissionBuffer::RemoveOldest()
{
MS_TRACE();
if (this->buffer.empty())
{
return;
}
auto* item = this->buffer.front();
item->Reset();
delete item;
this->buffer.pop_front();
MS_DEBUG_DEV("removed 1 item from the front");
size_t numItemsRemoved{ 0u };
while (!this->buffer.empty() && this->buffer.front() == nullptr)
{
this->buffer.pop_front();
++numItemsRemoved;
}
if (numItemsRemoved)
{
MS_DEBUG_DEV("removed %zu blank slots from the front", numItemsRemoved);
}
}
void RtpRetransmissionBuffer::RemoveOldest(uint16_t numItems)
{
MS_TRACE();
MS_ASSERT(
numItems <= this->buffer.size(),
"attempting to remove more items than current buffer size [numItems:%" PRIu16
", bufferSize:%zu]",
numItems,
this->buffer.size());
const size_t intendedBufferSize = this->buffer.size() - numItems;
while (this->buffer.size() > intendedBufferSize)
{
RemoveOldest();
}
}
bool RtpRetransmissionBuffer::ClearTooOldByTimestamp(uint32_t newestTimestamp)
{
MS_TRACE();
RtpRetransmissionBuffer::Item* oldestItem{ nullptr };
bool itemsRemoved{ false };
while ((oldestItem = GetOldest()))
{
if (IsTooOldTimestamp(oldestItem->timestamp, newestTimestamp))
{
RemoveOldest();
itemsRemoved = true;
}
else
{
break;
}
}
return itemsRemoved;
}
bool RtpRetransmissionBuffer::IsTooOldTimestamp(uint32_t timestamp, uint32_t newestTimestamp) const
{
MS_TRACE();
if (Utils::Number<uint32_t>::IsHigherThan(timestamp, newestTimestamp))
{
return false;
}
const int64_t diffTs = newestTimestamp - timestamp;
return static_cast<uint32_t>(diffTs * 1000 / this->clockRate) > this->maxRetransmissionDelayMs;
}
void RtpRetransmissionBuffer::Item::Reset()
{
MS_TRACE();
this->sharedPacket = RTC::SharedRtpPacket();
this->ssrc = 0u;
this->sequenceNumber = 0u;
this->timestamp = 0u;
this->resentAtMs = 0u;
this->sentTimes = 0u;
}
}