#define MS_CLASS "RTC::NackGenerator"
#include "RTC/NackGenerator.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include <iterator>
#include <sstream>
#include <utility>
namespace RTC
{
constexpr size_t MaxPacketAge{ 10000u };
constexpr size_t MaxNackPackets{ 1000u };
constexpr uint32_t DefaultRtt{ 100u };
constexpr uint8_t MaxNackRetries{ 10u };
constexpr uint64_t TimerInterval{ 40u };
NackGenerator::NackGenerator(Listener* listener) : listener(listener), rtt(DefaultRtt)
{
MS_TRACE();
this->timer = new Timer(this);
}
NackGenerator::~NackGenerator()
{
MS_TRACE();
delete this->timer;
}
bool NackGenerator::ReceivePacket(RTC::RtpPacket* packet, bool isRecovered)
{
MS_TRACE();
uint16_t seq = packet->GetSequenceNumber();
bool isKeyFrame = packet->IsKeyFrame();
if (!this->started)
{
this->started = true;
this->lastSeq = seq;
if (isKeyFrame)
this->keyFrameList.insert(seq);
return false;
}
if (seq == this->lastSeq)
return false;
if (SeqManager<uint16_t>::IsSeqLowerThan(seq, this->lastSeq))
{
auto it = this->nackList.find(seq);
if (it != this->nackList.end())
{
MS_DEBUG_DEV(
"NACKed packet received [ssrc:%" PRIu32 ", seq:%" PRIu16 ", recovered:%s]",
packet->GetSsrc(),
packet->GetSequenceNumber(),
isRecovered ? "true" : "false");
this->nackList.erase(it);
return true;
}
if (!isRecovered)
{
MS_WARN_DEV(
"ignoring older packet not present in the NACK list [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
packet->GetSsrc(),
packet->GetSequenceNumber());
}
return false;
}
if (isKeyFrame)
this->keyFrameList.insert(seq);
{
auto it = this->keyFrameList.lower_bound(seq - MaxPacketAge);
if (it != this->keyFrameList.begin())
this->keyFrameList.erase(this->keyFrameList.begin(), it);
}
if (isRecovered)
{
this->recoveredList.insert(seq);
auto it = this->recoveredList.lower_bound(seq - MaxPacketAge);
if (it != this->recoveredList.begin())
this->recoveredList.erase(this->recoveredList.begin(), it);
return false;
}
AddPacketsToNackList(this->lastSeq + 1, seq);
this->lastSeq = seq;
std::vector<uint16_t> nackBatch = GetNackBatch(NackFilter::SEQ);
if (!nackBatch.empty())
this->listener->OnNackGeneratorNackRequired(nackBatch);
if (!this->timer->IsActive())
MayRunTimer();
return false;
}
void NackGenerator::AddPacketsToNackList(uint16_t seqStart, uint16_t seqEnd)
{
MS_TRACE();
auto it = this->nackList.lower_bound(seqEnd - MaxPacketAge);
this->nackList.erase(this->nackList.begin(), it);
uint16_t numNewNacks = seqEnd - seqStart;
if (this->nackList.size() + numNewNacks > MaxNackPackets)
{
while (
RemoveNackItemsUntilKeyFrame() &&
this->nackList.size() + numNewNacks > MaxNackPackets
)
{
}
if (this->nackList.size() + numNewNacks > MaxNackPackets)
{
MS_WARN_TAG(
rtx, "NACK list full, clearing it and requesting a key frame [seqEnd:%" PRIu16 "]", seqEnd);
this->nackList.clear();
this->listener->OnNackGeneratorKeyFrameRequired();
return;
}
}
for (uint16_t seq = seqStart; seq != seqEnd; ++seq)
{
MS_ASSERT(this->nackList.find(seq) == this->nackList.end(), "packet already in the NACK list");
if (this->recoveredList.find(seq) != this->recoveredList.end())
continue;
this->nackList.emplace(std::make_pair(seq, NackInfo{ seq, seq }));
}
}
bool NackGenerator::RemoveNackItemsUntilKeyFrame()
{
MS_TRACE();
while (!this->keyFrameList.empty())
{
auto it = this->nackList.lower_bound(*this->keyFrameList.begin());
if (it != this->nackList.begin())
{
this->nackList.erase(this->nackList.begin(), it);
return true;
}
this->keyFrameList.erase(this->keyFrameList.begin());
}
return false;
}
std::vector<uint16_t> NackGenerator::GetNackBatch(NackFilter filter)
{
MS_TRACE();
uint64_t nowMs = DepLibUV::GetTimeMs();
std::vector<uint16_t> nackBatch;
auto it = this->nackList.begin();
while (it != this->nackList.end())
{
NackInfo& nackInfo = it->second;
uint16_t seq = nackInfo.seq;
if (
filter == NackFilter::SEQ &&
nackInfo.sentAtMs == 0 &&
(
nackInfo.sendAtSeq == this->lastSeq ||
SeqManager<uint16_t>::IsSeqHigherThan(this->lastSeq, nackInfo.sendAtSeq)
)
)
{
nackBatch.emplace_back(seq);
nackInfo.retries++;
nackInfo.sentAtMs = nowMs;
if (nackInfo.retries >= MaxNackRetries)
{
MS_WARN_TAG(
rtx,
"sequence number removed from the NACK list due to max retries [filter:seq, seq:%" PRIu16
"]",
seq);
it = this->nackList.erase(it);
}
else
{
++it;
}
continue;
}
if (filter == NackFilter::TIME && nowMs - nackInfo.sentAtMs >= this->rtt)
{
nackBatch.emplace_back(seq);
nackInfo.retries++;
nackInfo.sentAtMs = nowMs;
if (nackInfo.retries >= MaxNackRetries)
{
MS_WARN_TAG(
rtx,
"sequence number removed from the NACK list due to max retries [filter:time, seq:%" PRIu16
"]",
seq);
it = this->nackList.erase(it);
}
else
{
++it;
}
continue;
}
++it;
}
#if MS_LOG_DEV_LEVEL == 3
if (!nackBatch.empty())
{
std::ostringstream seqsStream;
std::copy(
nackBatch.begin(), nackBatch.end() - 1, std::ostream_iterator<uint32_t>(seqsStream, ","));
seqsStream << nackBatch.back();
if (filter == NackFilter::SEQ)
MS_DEBUG_DEV("[filter:SEQ, asking seqs:%s]", seqsStream.str().c_str());
else
MS_DEBUG_DEV("[filter:TIME, asking seqs:%s]", seqsStream.str().c_str());
}
#endif
return nackBatch;
}
void NackGenerator::Reset()
{
MS_TRACE();
this->nackList.clear();
this->keyFrameList.clear();
this->recoveredList.clear();
this->started = false;
this->lastSeq = 0u;
}
inline void NackGenerator::MayRunTimer() const
{
if (!this->nackList.empty())
this->timer->Start(TimerInterval);
}
inline void NackGenerator::OnTimer(Timer* )
{
MS_TRACE();
std::vector<uint16_t> nackBatch = GetNackBatch(NackFilter::TIME);
if (!nackBatch.empty())
this->listener->OnNackGeneratorNackRequired(nackBatch);
MayRunTimer();
}
}