#define MS_CLASS "RTC::TransportCongestionControlServer"
#include "RTC/TransportCongestionControlServer.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include "RTC/RTCP/FeedbackPsRemb.hpp"
#include <iterator>
#include <sstream>
namespace RTC
{
static constexpr uint64_t TransportCcFeedbackSendInterval{ 100u }; static constexpr uint64_t LimitationRembInterval{ 1500u }; static constexpr uint64_t PacketArrivalTimestampWindow{ 500u }; static constexpr uint8_t UnlimitedRembNumPackets{ 4u };
static constexpr size_t PacketLossHistogramLength{ 24 };
TransportCongestionControlServer::TransportCongestionControlServer(
RTC::TransportCongestionControlServer::Listener* listener,
RTC::BweType bweType,
size_t maxRtcpPacketLen)
: listener(listener), bweType(bweType), maxRtcpPacketLen(maxRtcpPacketLen)
{
MS_TRACE();
switch (this->bweType)
{
case RTC::BweType::TRANSPORT_CC:
{
ResetTransportCcFeedback(0u);
this->transportCcFeedbackSendPeriodicTimer = new TimerHandle(this);
break;
}
case RTC::BweType::REMB:
{
this->rembServer = new webrtc::RemoteBitrateEstimatorAbsSendTime(this);
break;
}
}
}
TransportCongestionControlServer::~TransportCongestionControlServer()
{
MS_TRACE();
delete this->transportCcFeedbackSendPeriodicTimer;
this->transportCcFeedbackSendPeriodicTimer = nullptr;
delete this->rembServer;
this->rembServer = nullptr;
}
void TransportCongestionControlServer::TransportConnected()
{
MS_TRACE();
switch (this->bweType)
{
case RTC::BweType::TRANSPORT_CC:
{
this->transportCcFeedbackSendPeriodicTimer->Start(
TransportCcFeedbackSendInterval, TransportCcFeedbackSendInterval);
break;
}
default:;
}
}
void TransportCongestionControlServer::TransportDisconnected()
{
MS_TRACE();
switch (this->bweType)
{
case RTC::BweType::TRANSPORT_CC:
{
this->transportCcFeedbackSendPeriodicTimer->Stop();
ResetTransportCcFeedback(this->transportCcFeedbackPacketCount);
break;
}
default:;
}
}
double TransportCongestionControlServer::GetPacketLoss() const
{
MS_TRACE();
return this->packetLoss;
}
void TransportCongestionControlServer::IncomingPacket(uint64_t nowMs, const RTC::RtpPacket* packet)
{
MS_TRACE();
switch (this->bweType)
{
case RTC::BweType::TRANSPORT_CC:
{
uint16_t wideSeqNumber{ 0 };
if (!packet->ReadTransportWideCc01(wideSeqNumber))
{
break;
}
if (!this->mapPacketArrivalTimes.try_emplace(wideSeqNumber, nowMs).second)
{
break;
}
if (
!this->transportWideSeqNumberReceived ||
RTC::SeqManager<uint16_t>::IsSeqLowerThan(
wideSeqNumber, this->transportCcFeedbackWideSeqNumStart))
{
this->transportCcFeedbackWideSeqNumStart = wideSeqNumber;
}
this->transportWideSeqNumberReceived = true;
MayDropOldPacketArrivalTimes(wideSeqNumber, nowMs);
this->transportCcFeedbackSenderSsrc = 0u;
this->transportCcFeedbackMediaSsrc = packet->GetSsrc();
MaySendLimitationRembFeedback(nowMs);
break;
}
case RTC::BweType::REMB:
{
uint32_t absSendTime{ 0 };
if (!packet->ReadAbsSendTime(absSendTime))
{
break;
}
auto nowMsInt64 = static_cast<int64_t>(nowMs);
this->rembServer->IncomingPacket(nowMsInt64, packet->GetPayloadLength(), *packet, absSendTime);
break;
}
}
}
void TransportCongestionControlServer::FillAndSendTransportCcFeedback()
{
MS_TRACE();
if (!this->transportWideSeqNumberReceived)
{
return;
}
auto it = this->mapPacketArrivalTimes.lower_bound(this->transportCcFeedbackWideSeqNumStart);
if (it == this->mapPacketArrivalTimes.end())
{
return;
}
for (; it != this->mapPacketArrivalTimes.end(); ++it)
{
auto sequenceNumber = it->first;
auto timestamp = it->second;
if (!this->transportCcFeedbackPacket->IsBaseSet())
{
this->transportCcFeedbackPacket->SetBase(this->transportCcFeedbackWideSeqNumStart, timestamp);
}
auto result = this->transportCcFeedbackPacket->AddPacket(
sequenceNumber, timestamp, this->maxRtcpPacketLen);
switch (result)
{
case RTC::RTCP::FeedbackRtpTransportPacket::AddPacketResult::SUCCESS:
{
if (this->transportCcFeedbackPacket->IsFull())
{
MS_DEBUG_DEV("transport-cc feedback packet is full, sending feedback now");
auto sent = SendTransportCcFeedback();
if (sent)
{
++this->transportCcFeedbackPacketCount;
}
ResetTransportCcFeedback(this->transportCcFeedbackPacketCount);
}
break;
}
case RTC::RTCP::FeedbackRtpTransportPacket::AddPacketResult::MAX_SIZE_EXCEEDED:
{
auto sent = SendTransportCcFeedback();
if (sent)
{
++this->transportCcFeedbackPacketCount;
}
ResetTransportCcFeedback(this->transportCcFeedbackPacketCount);
--it;
break;
}
case RTC::RTCP::FeedbackRtpTransportPacket::AddPacketResult::FATAL:
{
ResetTransportCcFeedback(this->transportCcFeedbackPacketCount);
break;
}
}
}
auto sent = SendTransportCcFeedback();
if (sent)
{
++this->transportCcFeedbackPacketCount;
}
ResetTransportCcFeedback(this->transportCcFeedbackPacketCount);
}
void TransportCongestionControlServer::SetMaxIncomingBitrate(uint32_t bitrate)
{
MS_TRACE();
auto previousMaxIncomingBitrate = this->maxIncomingBitrate;
this->maxIncomingBitrate = bitrate;
if (previousMaxIncomingBitrate != 0u && this->maxIncomingBitrate == 0u)
{
this->unlimitedRembCounter = UnlimitedRembNumPackets;
auto nowMs = DepLibUV::GetTimeMs();
MaySendLimitationRembFeedback(nowMs);
}
}
bool TransportCongestionControlServer::SendTransportCcFeedback()
{
MS_TRACE();
this->transportCcFeedbackPacket->Finish();
if (!this->transportCcFeedbackPacket->IsSerializable())
{
MS_WARN_TAG(rtcp, "couldn't send feedback-cc packet because it is not serializable");
return false;
}
auto latestWideSeqNumber = this->transportCcFeedbackPacket->GetLatestSequenceNumber();
this->listener->OnTransportCongestionControlServerSendRtcpPacket(
this, this->transportCcFeedbackPacket.get());
const size_t expectedPackets = this->transportCcFeedbackPacket->GetPacketStatusCount();
size_t lostPackets = 0;
for (const auto& result : this->transportCcFeedbackPacket->GetPacketResults())
{
if (!result.received)
{
lostPackets += 1;
}
}
if (expectedPackets > 0)
{
this->UpdatePacketLoss(static_cast<double>(lostPackets) / expectedPackets);
}
this->transportCcFeedbackWideSeqNumStart = latestWideSeqNumber + 1;
return true;
}
void TransportCongestionControlServer::MayDropOldPacketArrivalTimes(uint16_t seqNum, uint64_t nowMs)
{
MS_TRACE();
if (nowMs >= PacketArrivalTimestampWindow)
{
uint64_t expiryTimestamp = nowMs - PacketArrivalTimestampWindow;
auto it = this->mapPacketArrivalTimes.begin();
while (it != this->mapPacketArrivalTimes.end() &&
it->first != this->transportCcFeedbackWideSeqNumStart &&
RTC::SeqManager<uint16_t>::IsSeqLowerThan(it->first, seqNum) &&
it->second <= expiryTimestamp)
{
it = this->mapPacketArrivalTimes.erase(it);
}
}
}
void TransportCongestionControlServer::MaySendLimitationRembFeedback(uint64_t nowMs)
{
MS_TRACE();
if (this->unlimitedRembCounter > 0u && this->maxIncomingBitrate != 0u)
{
this->unlimitedRembCounter = 0u;
}
if (
(
(this->bweType != RTC::BweType::REMB && this->maxIncomingBitrate != 0u) ||
this->unlimitedRembCounter > 0u
) &&
(
nowMs - this->limitationRembSentAtMs > LimitationRembInterval ||
this->unlimitedRembCounter == UnlimitedRembNumPackets
)
)
{
MS_DEBUG_DEV(
"sending limitation RTCP REMB packet [bitrate:%" PRIu32 "]", this->maxIncomingBitrate);
RTC::RTCP::FeedbackPsRembPacket packet(0u, 0u);
packet.SetBitrate(this->maxIncomingBitrate);
packet.Serialize(RTC::RTCP::Buffer);
this->listener->OnTransportCongestionControlServerSendRtcpPacket(this, &packet);
this->limitationRembSentAtMs = nowMs;
if (this->unlimitedRembCounter > 0u)
{
this->unlimitedRembCounter--;
}
}
}
void TransportCongestionControlServer::UpdatePacketLoss(double packetLoss)
{
if (this->packetLossHistory.size() == PacketLossHistogramLength)
{
this->packetLossHistory.pop_front();
}
this->packetLossHistory.push_back(packetLoss);
size_t weight{ 0 };
size_t samples{ 0 };
double totalPacketLoss{ 0 };
for (auto packetLossEntry : this->packetLossHistory)
{
weight++;
samples += weight;
totalPacketLoss += weight * packetLossEntry;
}
this->packetLoss = totalPacketLoss / samples;
}
void TransportCongestionControlServer::ResetTransportCcFeedback(uint8_t feedbackPacketCount)
{
MS_TRACE();
this->transportCcFeedbackPacket.reset(new RTC::RTCP::FeedbackRtpTransportPacket(
this->transportCcFeedbackSenderSsrc, this->transportCcFeedbackMediaSsrc));
this->transportCcFeedbackPacket->SetFeedbackPacketCount(feedbackPacketCount);
}
void TransportCongestionControlServer::OnRembServerAvailableBitrate(
const webrtc::RemoteBitrateEstimator* ,
const std::vector<uint32_t>& ssrcs,
uint32_t availableBitrate)
{
MS_TRACE();
if (this->maxIncomingBitrate != 0u)
{
availableBitrate = std::min(availableBitrate, this->maxIncomingBitrate);
}
#if MS_LOG_DEV_LEVEL == 3
std::ostringstream ssrcsStream;
if (!ssrcs.empty())
{
std::copy(ssrcs.begin(), ssrcs.end() - 1, std::ostream_iterator<uint32_t>(ssrcsStream, ","));
ssrcsStream << ssrcs.back();
}
MS_DEBUG_DEV(
"sending RTCP REMB packet [bitrate:%" PRIu32 ", ssrcs:%s]",
availableBitrate,
ssrcsStream.str().c_str());
#endif
RTC::RTCP::FeedbackPsRembPacket packet(0u, 0u);
packet.SetBitrate(availableBitrate);
packet.SetSsrcs(ssrcs);
packet.Serialize(RTC::RTCP::Buffer);
this->listener->OnTransportCongestionControlServerSendRtcpPacket(this, &packet);
}
void TransportCongestionControlServer::OnTimer(TimerHandle* timer)
{
MS_TRACE();
if (timer == this->transportCcFeedbackSendPeriodicTimer)
{
FillAndSendTransportCcFeedback();
}
}
}