#define MS_CLASS "RTC::NackGenerator"
#include "RTC/NackGenerator.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include <iterator>
#include <sstream>
#include <utility>
namespace RTC
{
// Entries whose sequence number lies more than this many positions behind the
// newest one are pruned from the NACK, key frame and recovered lists.
static constexpr size_t MaxPacketAge = 10000u;
// Upper bound for the pending NACK entries plus the ones about to be added.
static constexpr size_t MaxNackPackets = 1000u;
// Initial RTT, also used by the time based filter whenever the stored RTT is 0.
// It is compared against differences of millisecond timestamps.
static constexpr uint32_t DefaultRtt = 100u;
// A sequence number is dropped from the NACK list once it has been requested
// this many times.
static constexpr uint8_t MaxNackRetries = 10u;
// Value handed to the timer each time it is started (presumably milliseconds;
// confirm against TimerHandle::Start()).
static constexpr uint64_t TimerInterval = 40u;
// Creates a generator that reports to the given listener.
//
// - listener:        receives the "NACK required" and "key frame required"
//                    notifications.
// - sendNackDelayMs: minimum age, in milliseconds, a NACK entry must have
//                    before it is included in a batch (0 means no delay).
//
// A timer bound to this instance is allocated here and the RTT starts at
// DefaultRtt.
NackGenerator::NackGenerator(Listener* listener, unsigned int sendNackDelayMs)
  : listener(listener),
    sendNackDelayMs(sendNackDelayMs),
    timer(new TimerHandle(this)),
    rtt(DefaultRtt)
{
	MS_TRACE();
}
// Releases the timer that was allocated by the constructor.
NackGenerator::~NackGenerator()
{
	MS_TRACE();

	delete this->timer;
}
// Feeds a received RTP packet into the NACK bookkeeping.
//
// - packet:      received packet; its sequence number, SSRC and key frame flag
//                are read.
// - isRecovered: flag given by the caller for recovered packets (presumably
//                packets that arrived via RTX; confirm against the caller).
//
// Returns true only when the packet fills an entry of the NACK list that had
// already been requested at least once. Returns false in every other case.
//
// May notify the listener (NACK required / key frame required) and may start
// or stop the timer.
bool NackGenerator::ReceivePacket(RTC::RtpPacket* packet, bool isRecovered)
{
	MS_TRACE();

	const uint16_t seq    = packet->GetSequenceNumber();
	const bool isKeyFrame = packet->IsKeyFrame();

	// Very first packet (or first one after Reset()): take it as the reference.
	if (!this->started)
	{
		this->started = true;
		this->lastSeq = seq;

		if (isKeyFrame)
		{
			this->keyFrameList.insert(seq);
		}

		return false;
	}

	// Same sequence number as the newest one seen, nothing to do.
	if (seq == this->lastSeq)
	{
		return false;
	}

	// The packet is older than the newest one seen.
	if (SeqManager<uint16_t>::IsSeqLowerThan(seq, this->lastSeq))
	{
		auto pendingIt = this->nackList.find(seq);

		// Not a packet we were waiting for.
		if (pendingIt == this->nackList.end())
		{
			if (!isRecovered)
			{
				MS_WARN_DEV(
				  "ignoring older packet not present in the NACK list [ssrc:%" PRIu32 ", seq:%" PRIu16 "]",
				  packet->GetSsrc(),
				  packet->GetSequenceNumber());
			}

			return false;
		}

		MS_DEBUG_DEV(
		  "NACKed packet received [ssrc:%" PRIu32 ", seq:%" PRIu16 ", recovered:%s]",
		  packet->GetSsrc(),
		  packet->GetSequenceNumber(),
		  isRecovered ? "true" : "false");

		// Read the retry counter before the entry goes away.
		const bool alreadyRequested = pendingIt->second.retries != 0;

		this->nackList.erase(pendingIt);

		return alreadyRequested;
	}

	// From here on the packet is newer than the newest one seen so far.

	if (isKeyFrame)
	{
		this->keyFrameList.insert(seq);
	}

	// Forget key frames that fell out of the age window.
	{
		auto firstKeptIt = this->keyFrameList.lower_bound(seq - MaxPacketAge);

		if (firstKeptIt != this->keyFrameList.begin())
		{
			this->keyFrameList.erase(this->keyFrameList.begin(), firstKeptIt);
		}
	}

	if (isRecovered)
	{
		this->recoveredList.insert(seq);

		// Same pruning for the recovered list so it does not grow forever.
		auto firstKeptIt = this->recoveredList.lower_bound(seq - MaxPacketAge);

		if (firstKeptIt != this->recoveredList.begin())
		{
			this->recoveredList.erase(this->recoveredList.begin(), firstKeptIt);
		}

		// A recovered packet never advances lastSeq.
		return false;
	}

	// Everything between the previous newest packet and this one is missing.
	AddPacketsToNackList(this->lastSeq + 1, seq);

	this->lastSeq = seq;

	// Ask for whatever became due now that lastSeq has moved.
	const std::vector<uint16_t> dueSeqs = GetNackBatch(NackFilter::SEQ);

	if (!dueSeqs.empty())
	{
		this->listener->OnNackGeneratorNackRequired(dueSeqs);
	}

	// Only touch the timer when it is not already running, so a pending tick
	// is left as it is.
	if (!this->timer->IsActive())
	{
		MayRunTimer();
	}

	return false;
}
// Registers every sequence number in [seqStart, seqEnd) as missing.
//
// Before adding anything, entries older than MaxPacketAge (relative to seqEnd)
// are pruned. If the list plus the new entries would exceed MaxNackPackets,
// entries preceding known key frames are discarded; if that is not enough the
// whole list is cleared, a key frame is requested through the listener and
// nothing is added.
void NackGenerator::AddPacketsToNackList(uint16_t seqStart, uint16_t seqEnd)
{
	MS_TRACE();

	// Prune entries that fell out of the age window.
	auto firstKeptIt = this->nackList.lower_bound(seqEnd - MaxPacketAge);

	this->nackList.erase(this->nackList.begin(), firstKeptIt);

	const uint16_t numNewNacks = seqEnd - seqStart;

	// Whether the current entries plus the new ones go beyond the limit.
	bool tooMany = static_cast<uint16_t>(this->nackList.size()) + numNewNacks > MaxNackPackets;

	// Keep making room while it is needed and while key frames allow it.
	// RemoveNackItemsUntilKeyFrame() only returns false without having touched
	// the NACK list, so the flag stays accurate when the loop ends that way.
	while (tooMany && RemoveNackItemsUntilKeyFrame())
	{
		tooMany = static_cast<uint16_t>(this->nackList.size()) + numNewNacks > MaxNackPackets;
	}

	if (tooMany)
	{
		MS_WARN_TAG(
		  rtx, "NACK list full, clearing it and requesting a key frame [seqEnd:%" PRIu16 "]", seqEnd);

		this->nackList.clear();
		this->listener->OnNackGeneratorKeyFrameRequired();

		return;
	}

	for (uint16_t seq = seqStart; seq != seqEnd; ++seq)
	{
		MS_ASSERT(this->nackList.find(seq) == this->nackList.end(), "packet already in the NACK list");

		const bool alreadyRecovered = this->recoveredList.find(seq) != this->recoveredList.end();

		// Only packets not present in the recovered list need to be requested.
		if (!alreadyRecovered)
		{
			// NackInfo arguments are presumably (createdAtMs, seq, sendAtSeq);
			// confirm against the NackInfo declaration.
			this->nackList.emplace(std::make_pair(
			  seq,
			  NackInfo{
			    DepLibUV::GetTimeMs(),
			    seq,
			    seq,
			  }));
		}
	}
}
// Discards the NACK entries that precede the oldest known key frame.
//
// Key frames with no NACK entry before them are removed from the key frame
// list and the next one is tried. Returns true as soon as at least one NACK
// entry has been removed (the key frame used is kept in its list), or false
// when the key frame list runs empty without removing any NACK entry.
bool NackGenerator::RemoveNackItemsUntilKeyFrame()
{
	MS_TRACE();

	while (!this->keyFrameList.empty())
	{
		auto firstKeptIt = this->nackList.lower_bound(*this->keyFrameList.begin());

		// Nothing in the NACK list precedes this key frame: it is of no use
		// here, so drop it and try with the next one.
		if (firstKeptIt == this->nackList.begin())
		{
			this->keyFrameList.erase(this->keyFrameList.begin());

			continue;
		}

		this->nackList.erase(this->nackList.begin(), firstKeptIt);

		return true;
	}

	return false;
}
// Collects the sequence numbers that must be requested right now.
//
// Entries younger than sendNackDelayMs (when it is greater than 0) are always
// skipped. For the rest:
// - NackFilter::SEQ selects entries never requested so far (sentAtMs == 0)
//   whose sendAtSeq equals lastSeq or for which
//   IsSeqHigherThan(lastSeq, sendAtSeq) holds.
// - NackFilter::TIME selects entries never requested so far, or whose last
//   request is at least one RTT old (DefaultRtt when the stored RTT is 0).
//
// Each selected entry gets its retry counter incremented and its sentAtMs
// updated, and it is removed from the NACK list once the counter reaches
// MaxNackRetries (it is still part of the returned batch).
std::vector<uint16_t> NackGenerator::GetNackBatch(NackFilter filter)
{
	MS_TRACE();

	const uint64_t nowMs = DepLibUV::GetTimeMs();
	std::vector<uint16_t> nackBatch;

	// No increment clause: the iterator advances differently when an entry is
	// erased.
	for (auto it = this->nackList.begin(); it != this->nackList.end();)
	{
		NackInfo& nackInfo = it->second;
		const uint16_t seq = nackInfo.seq;

		// Entry still too young to be requested.
		if (this->sendNackDelayMs > 0 && nowMs - nackInfo.createdAtMs < this->sendNackDelayMs)
		{
			++it;

			continue;
		}

		// clang-format off
		const bool dueBySeq =
			filter == NackFilter::SEQ &&
			nackInfo.sentAtMs == 0 &&
			(
				nackInfo.sendAtSeq == this->lastSeq ||
				SeqManager<uint16_t>::IsSeqHigherThan(this->lastSeq, nackInfo.sendAtSeq)
			);

		// Only evaluated when the sequence based rule did not match.
		const bool dueByTime =
			!dueBySeq &&
			filter == NackFilter::TIME &&
			(
				nackInfo.sentAtMs == 0 ||
				nowMs - nackInfo.sentAtMs >= (this->rtt > 0u ? this->rtt : DefaultRtt)
			);
		// clang-format on

		if (!dueBySeq && !dueByTime)
		{
			++it;

			continue;
		}

		nackBatch.emplace_back(seq);
		nackInfo.retries++;
		nackInfo.sentAtMs = nowMs;

		// Still has retries left, keep it in the list.
		if (nackInfo.retries < MaxNackRetries)
		{
			++it;

			continue;
		}

		if (dueBySeq)
		{
			MS_WARN_TAG(
			  rtx,
			  "sequence number removed from the NACK list due to max retries [filter:seq, seq:%" PRIu16
			  "]",
			  seq);
		}
		else
		{
			MS_WARN_TAG(
			  rtx,
			  "sequence number removed from the NACK list due to max retries [filter:time, seq:%" PRIu16
			  "]",
			  seq);
		}

		// nackInfo must not be used after this point.
		it = this->nackList.erase(it);
	}

#if MS_LOG_DEV_LEVEL == 3
	// Comma separated dump of the batch, only built at the highest dev log level.
	if (!nackBatch.empty())
	{
		std::ostringstream joinedSeqs;

		std::copy(
		  nackBatch.begin(), nackBatch.end() - 1, std::ostream_iterator<uint32_t>(joinedSeqs, ","));
		joinedSeqs << nackBatch.back();

		if (filter == NackFilter::SEQ)
		{
			MS_DEBUG_DEV("[filter:SEQ, asking seqs:%s]", joinedSeqs.str().c_str());
		}
		else
		{
			MS_DEBUG_DEV("[filter:TIME, asking seqs:%s]", joinedSeqs.str().c_str());
		}
	}
#endif

	return nackBatch;
}
// Goes back to the "not started" state: the next received packet is taken as
// the new reference and every tracked sequence number is forgotten. The timer
// and the RTT are not touched here.
void NackGenerator::Reset()
{
	MS_TRACE();

	this->started = false;
	this->lastSeq = 0u;

	this->nackList.clear();
	this->keyFrameList.clear();
	this->recoveredList.clear();
}
// Starts the timer with TimerInterval while there are pending NACK entries,
// stops it otherwise.
inline void NackGenerator::MayRunTimer() const
{
	if (!this->nackList.empty())
	{
		this->timer->Start(TimerInterval);
	}
	else
	{
		this->timer->Stop();
	}
}
// Timer callback: requests the entries selected by the time based filter and
// then starts or stops the timer depending on whether entries remain.
inline void NackGenerator::OnTimer(TimerHandle* /*timer*/)
{
	MS_TRACE();

	const std::vector<uint16_t> dueSeqs = GetNackBatch(NackFilter::TIME);

	if (!dueSeqs.empty())
	{
		this->listener->OnNackGeneratorNackRequired(dueSeqs);
	}

	MayRunTimer();
}
}