#define MS_CLASS "RTC::TransportCongestionControlServer"
#include "RTC/TransportCongestionControlServer.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include "RTC/RTCP/FeedbackPsRemb.hpp"
#include <iterator>
#include <sstream>
namespace RTC
{
// Interval handed to the periodic Transport-CC feedback timer in
// TransportConnected() (used as both arguments of Timer::Start(); presumably
// milliseconds).
static constexpr uint64_t TransportCcFeedbackSendInterval{ 100u };
// Minimum elapsed time between two limitation REMB packets. Compared against
// differences of DepLibUV::GetTimeMs() values in MaySendLimitationRembFeedback().
static constexpr uint64_t LimitationRembInterval{ 1500u };
// Value loaded into unlimitedRembCounter when the max incoming bitrate goes
// from non-zero to zero; the counter is decremented on each limitation REMB sent.
static constexpr uint8_t UnlimitedRembNumPackets{ 4u };
// Maximum number of entries kept in packetLossHistory by UpdatePacketLoss().
static constexpr size_t PacketLossHistogramLength{ 24 };
// Builds the server for the given bandwidth estimation type.
//
// - listener: stored and later notified whenever an RTCP packet must be sent.
// - bweType: selects which resources are created below.
// - maxRtcpPacketLen: stored and later passed to AddPacket() calls on the
//   Transport-CC feedback packet.
TransportCongestionControlServer::TransportCongestionControlServer(
  RTC::TransportCongestionControlServer::Listener* listener,
  RTC::BweType bweType,
  size_t maxRtcpPacketLen)
  : listener(listener), bweType(bweType), maxRtcpPacketLen(maxRtcpPacketLen)
{
	MS_TRACE();

	if (this->bweType == RTC::BweType::TRANSPORT_CC)
	{
		// Start with an empty feedback packet (sender and media SSRC are filled
		// in later by IncomingPacket()).
		this->transportCcFeedbackPacket.reset(new RTC::RTCP::FeedbackRtpTransportPacket(0u, 0u));

		// Stamp it with the initial feedback packet count.
		this->transportCcFeedbackPacket->SetFeedbackPacketCount(this->transportCcFeedbackPacketCount);

		// Timer used to send feedback periodically; it is started in
		// TransportConnected().
		this->transportCcFeedbackSendPeriodicTimer = new Timer(this);
	}
	else if (this->bweType == RTC::BweType::REMB)
	{
		// Bitrate estimator fed from IncomingPacket(); it is constructed with
		// `this` as its argument.
		this->rembServer = new webrtc::RemoteBitrateEstimatorAbsSendTime(this);
	}
}
// Releases the resources created by the constructor. Only one of the two
// pointers is set by the constructor for a given BWE type; deleting the other
// one is harmless if it is null.
TransportCongestionControlServer::~TransportCongestionControlServer()
{
	MS_TRACE();

	// Free the periodic feedback timer first, then the REMB server.
	delete this->transportCcFeedbackSendPeriodicTimer;
	delete this->rembServer;

	// Do not leave dangling pointers behind.
	this->transportCcFeedbackSendPeriodicTimer = nullptr;
	this->rembServer                           = nullptr;
}
// Called when the transport becomes connected. For Transport-CC it starts the
// periodic feedback timer; for any other BWE type it does nothing.
void TransportCongestionControlServer::TransportConnected()
{
	MS_TRACE();

	// Only Transport-CC uses the periodic feedback timer.
	if (this->bweType != RTC::BweType::TRANSPORT_CC)
		return;

	// The same interval is passed as both arguments of Start().
	this->transportCcFeedbackSendPeriodicTimer->Start(
	  TransportCcFeedbackSendInterval, TransportCcFeedbackSendInterval);
}
// Called when the transport gets disconnected. For Transport-CC it stops the
// periodic feedback timer and replaces the ongoing feedback packet with an
// empty one; for any other BWE type it does nothing.
void TransportCongestionControlServer::TransportDisconnected()
{
	MS_TRACE();

	switch (this->bweType)
	{
		case RTC::BweType::TRANSPORT_CC:
		{
			this->transportCcFeedbackSendPeriodicTimer->Stop();

			// Discard the ongoing feedback packet and create a new one, built
			// the same way as in the FATAL branch of IncomingPacket().
			this->transportCcFeedbackPacket.reset(new RTC::RTCP::FeedbackRtpTransportPacket(
			  this->transportCcFeedbackSenderSsrc, this->transportCcFeedbackMediaSsrc));

			// Keep the feedback packet count sequence going. Without this the
			// new packet would carry its default count rather than the current
			// one. The count is not incremented because the discarded packet
			// was never sent.
			this->transportCcFeedbackPacket->SetFeedbackPacketCount(
			  this->transportCcFeedbackPacketCount);

			break;
		}

		default:;
	}
}
// Returns the stored packet loss value, i.e. the weighted average last
// computed by UpdatePacketLoss().
double TransportCongestionControlServer::GetPacketLoss() const
{
	MS_TRACE();

	const double currentPacketLoss = this->packetLoss;

	return currentPacketLoss;
}
// Processes a received RTP packet according to the configured BWE type.
//
// - nowMs: reception time of the packet, as given by the caller.
// - packet: the received RTP packet.
//
// Transport-CC: records the packet's transport-wide sequence number into the
// ongoing feedback packet (sending/regenerating it when needed) and then
// checks whether a limitation REMB must be sent.
// REMB: forwards the packet to the REMB server.
// In both cases packets lacking the required header extension are ignored.
void TransportCongestionControlServer::IncomingPacket(uint64_t nowMs, const RTC::RtpPacket* packet)
{
	MS_TRACE();

	using AddPacketResult = RTC::RTCP::FeedbackRtpTransportPacket::AddPacketResult;

	switch (this->bweType)
	{
		case RTC::BweType::TRANSPORT_CC:
		{
			uint16_t transportWideSeq;

			// Nothing to report if the packet has no transport-wide-cc-01 value.
			if (!packet->ReadTransportWideCc01(transportWideSeq))
				break;

			// Remember the SSRCs so that regenerated feedback packets use them too.
			this->transportCcFeedbackSenderSsrc = 0u;
			this->transportCcFeedbackMediaSsrc  = packet->GetSsrc();

			// Update the SSRCs of the ongoing feedback packet.
			this->transportCcFeedbackPacket->SetSenderSsrc(0u);
			this->transportCcFeedbackPacket->SetMediaSsrc(this->transportCcFeedbackMediaSsrc);

			const auto addResult = this->transportCcFeedbackPacket->AddPacket(
			  transportWideSeq, nowMs, this->maxRtcpPacketLen);

			switch (addResult)
			{
				case AddPacketResult::SUCCESS:
				{
					// Flush right away once the feedback packet reports being full.
					if (this->transportCcFeedbackPacket->IsFull())
					{
						MS_DEBUG_DEV("transport-cc feedback packet is full, sending feedback now");

						SendTransportCcFeedback();
					}

					break;
				}

				case AddPacketResult::MAX_SIZE_EXCEEDED:
				{
					// Send the ongoing feedback packet and add this packet's info
					// to the regenerated one.
					SendTransportCcFeedback();

					this->transportCcFeedbackPacket->AddPacket(
					  transportWideSeq, nowMs, this->maxRtcpPacketLen);

					break;
				}

				case AddPacketResult::FATAL:
				{
					// Throw away the ongoing feedback packet and start a new one.
					this->transportCcFeedbackPacket.reset(new RTC::RTCP::FeedbackRtpTransportPacket(
					  this->transportCcFeedbackSenderSsrc, this->transportCcFeedbackMediaSsrc));

					// Reuse the current count without incrementing it, since the
					// discarded feedback packet was not sent.
					this->transportCcFeedbackPacket->SetFeedbackPacketCount(
					  this->transportCcFeedbackPacketCount);

					break;
				}
			}

			MaySendLimitationRembFeedback();

			break;
		}

		case RTC::BweType::REMB:
		{
			uint32_t absSendTimeValue;

			// Packets without an abs-send-time value are not given to the estimator.
			if (!packet->ReadAbsSendTime(absSendTimeValue))
				break;

			// Make the uint64_t -> int64_t conversion explicit before handing the
			// arrival time to the REMB server.
			const auto arrivalTimeMs = static_cast<int64_t>(nowMs);

			this->rembServer->IncomingPacket(
			  arrivalTimeMs, packet->GetPayloadLength(), *packet, absSendTimeValue);

			break;
		}
	}
}
// Stores the new maximum incoming bitrate (0 is treated as "no limit" by the
// rest of this class). When an existing limit is removed, arms the unlimited
// REMB counter and tries to send the first limitation REMB immediately.
void TransportCongestionControlServer::SetMaxIncomingBitrate(uint32_t bitrate)
{
	MS_TRACE();

	const bool hadLimit = this->maxIncomingBitrate != 0u;

	this->maxIncomingBitrate = bitrate;

	// Only the "limited -> unlimited" transition needs extra work.
	if (!hadLimit || this->maxIncomingBitrate != 0u)
		return;

	// Arm the counter so that UnlimitedRembNumPackets REMB packets carrying
	// bitrate 0 get sent.
	this->unlimitedRembCounter = UnlimitedRembNumPackets;

	// Send the first of them now.
	MaySendLimitationRembFeedback();
}
// Sends the ongoing Transport-CC feedback packet (if it is serializable) via
// the listener, updates the packet loss history from its contents and then
// replaces it with a new feedback packet carrying the next feedback packet
// count.
inline void TransportCongestionControlServer::SendTransportCcFeedback()
{
	MS_TRACE();

	// Nothing to send yet.
	if (!this->transportCcFeedbackPacket->IsSerializable())
		return;

	// Saved now because the packet is replaced below.
	auto latestWideSeqNumber = this->transportCcFeedbackPacket->GetLatestSequenceNumber();
	auto latestTimestamp     = this->transportCcFeedbackPacket->GetLatestTimestamp();

	// Notify the listener so it sends the packet. This is deliberately kept
	// before the packet loss computation, as in the original ordering.
	this->listener->OnTransportCongestionControlServerSendRtcpPacket(
	  this, this->transportCcFeedbackPacket.get());

	// Count how many of the reported packets are marked as not received.
	const size_t expectedPackets = this->transportCcFeedbackPacket->GetPacketStatusCount();
	size_t lostPackets{ 0 };

	for (const auto& result : this->transportCcFeedbackPacket->GetPacketResults())
	{
		if (!result.received)
			lostPackets += 1;
	}

	// Skip the update when there is nothing to average: 0/0 would yield NaN,
	// which would then contaminate the weighted packet loss until that entry
	// leaves the history.
	if (expectedPackets > 0u)
		this->UpdatePacketLoss(static_cast<double>(lostPackets) / expectedPackets);

	// Create a new feedback packet.
	this->transportCcFeedbackPacket.reset(new RTC::RTCP::FeedbackRtpTransportPacket(
	  this->transportCcFeedbackSenderSsrc, this->transportCcFeedbackMediaSsrc));

	// The previous packet was sent, so the new one gets the incremented count.
	this->transportCcFeedbackPacket->SetFeedbackPacketCount(++this->transportCcFeedbackPacketCount);

	// Seed the new feedback packet with the latest packet info (if any).
	if (latestTimestamp > 0u)
	{
		this->transportCcFeedbackPacket->AddPacket(
		  latestWideSeqNumber, latestTimestamp, this->maxRtcpPacketLen);
	}
}
// Sends a REMB packet carrying maxIncomingBitrate when one is due.
//
// A packet is due when there is a reason to send one (a limit is set and the
// BWE type is not REMB, or unlimited REMB packets are still pending) AND
// the timing allows it (LimitationRembInterval has elapsed since the last one,
// or this is the very first pending unlimited REMB).
inline void TransportCongestionControlServer::MaySendLimitationRembFeedback()
{
	MS_TRACE();

	auto nowMs = DepLibUV::GetTimeMs();

	// A limit is set again, so pending unlimited REMB packets no longer apply.
	if (this->unlimitedRembCounter > 0u && this->maxIncomingBitrate != 0u)
		this->unlimitedRembCounter = 0u;

	const bool limitMustBeAnnounced =
	  this->bweType != RTC::BweType::REMB && this->maxIncomingBitrate != 0u;
	const bool unlimitedPending = this->unlimitedRembCounter > 0u;
	const bool intervalElapsed  = nowMs - this->limitationRembSentAtMs > LimitationRembInterval;
	// The counter still holds its initial value, so no unlimited REMB has been
	// sent yet: send this one without waiting for the interval.
	const bool firstUnlimited = this->unlimitedRembCounter == UnlimitedRembNumPackets;

	if (!(limitMustBeAnnounced || unlimitedPending))
		return;

	if (!(intervalElapsed || firstUnlimited))
		return;

	MS_DEBUG_DEV(
	  "sending limitation RTCP REMB packet [bitrate:%" PRIu32 "]", this->maxIncomingBitrate);

	RTC::RTCP::FeedbackPsRembPacket packet(0u, 0u);

	packet.SetBitrate(this->maxIncomingBitrate);
	packet.Serialize(RTC::RTCP::Buffer);

	// Notify the listener so it sends the packet.
	this->listener->OnTransportCongestionControlServerSendRtcpPacket(this, &packet);

	this->limitationRembSentAtMs = nowMs;

	// One fewer unlimited REMB packet left to send.
	if (this->unlimitedRembCounter > 0u)
		this->unlimitedRembCounter--;
}
// Appends a packet loss sample to the history (dropping the oldest entry once
// PacketLossHistogramLength entries are stored) and recomputes packetLoss as a
// weighted average of the history.
//
// Weights grow with recency: the oldest entry weighs 1, the next one 2, and so
// on. E.g. for history [a, b, c] the result is (1*a + 2*b + 3*c) / (1+2+3).
void TransportCongestionControlServer::UpdatePacketLoss(double packetLoss)
{
	// Make room for the new sample if the history is full.
	if (this->packetLossHistory.size() == PacketLossHistogramLength)
		this->packetLossHistory.pop_front();

	this->packetLossHistory.push_back(packetLoss);

	size_t position{ 0 };
	size_t weightSum{ 0 };
	double weightedLossSum{ 0 };

	// Walk from oldest to newest; `position` doubles as the entry's weight.
	for (const auto historyEntry : this->packetLossHistory)
	{
		++position;
		weightSum += position;
		weightedLossSum += position * historyEntry;
	}

	// The history holds at least the sample just pushed, so weightSum >= 1.
	this->packetLoss = weightedLossSum / weightSum;
}
// Callback invoked with a new available bitrate for the given SSRCs. Caps the
// bitrate to maxIncomingBitrate (when set) and sends a REMB packet with the
// resulting value and SSRCs through the listener.
inline void TransportCongestionControlServer::OnRembServerAvailableBitrate(
  const webrtc::RemoteBitrateEstimator* /*rembServer*/,
  const std::vector<uint32_t>& ssrcs,
  uint32_t availableBitrate)
{
	MS_TRACE();

	// Never announce more than the configured limit (0 means no limit).
	if (this->maxIncomingBitrate != 0u && this->maxIncomingBitrate < availableBitrate)
		availableBitrate = this->maxIncomingBitrate;

#if MS_LOG_DEV_LEVEL == 3
	// Build a comma separated list of the SSRCs for the debug log only.
	std::ostringstream ssrcsStream;

	for (auto it = ssrcs.begin(); it != ssrcs.end(); ++it)
	{
		if (it != ssrcs.begin())
			ssrcsStream << ",";

		ssrcsStream << *it;
	}

	MS_DEBUG_DEV(
	  "sending RTCP REMB packet [bitrate:%" PRIu32 ", ssrcs:%s]",
	  availableBitrate,
	  ssrcsStream.str().c_str());
#endif

	RTC::RTCP::FeedbackPsRembPacket rembPacket(0u, 0u);

	rembPacket.SetBitrate(availableBitrate);
	rembPacket.SetSsrcs(ssrcs);
	rembPacket.Serialize(RTC::RTCP::Buffer);

	// Notify the listener so it sends the packet.
	this->listener->OnTransportCongestionControlServerSendRtcpPacket(this, &rembPacket);
}
// Timer callback. When the periodic Transport-CC feedback timer fires, sends
// the ongoing feedback packet; any other timer is ignored.
inline void TransportCongestionControlServer::OnTimer(Timer* timer)
{
	MS_TRACE();

	if (timer != this->transportCcFeedbackSendPeriodicTimer)
		return;

	SendTransportCcFeedback();
}
}