#define MS_CLASS "RTC::TransportCongestionControlClient"
#define USE_TREND_CALCULATOR
#include "RTC/TransportCongestionControlClient.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include <libwebrtc/api/transport/network_types.h>
#include <limits>
namespace RTC
{
static constexpr float MaxBitrateMarginFactor{ 0.1f };
static constexpr float MaxBitrateIncrementFactor{ 1.35f };
static constexpr float MaxPaddingBitrateFactor{ 0.85f };
static constexpr uint64_t AvailableBitrateEventInterval{ 1000u }; static constexpr size_t PacketLossHistogramLength{ 24 };
TransportCongestionControlClient::TransportCongestionControlClient(
RTC::TransportCongestionControlClient::Listener* listener,
RTC::BweType bweType,
uint32_t initialAvailableBitrate,
uint32_t maxOutgoingBitrate,
uint32_t minOutgoingBitrate)
: listener(listener), bweType(bweType),
initialAvailableBitrate(std::max<uint32_t>(
initialAvailableBitrate, RTC::TransportCongestionControlMinOutgoingBitrate)),
maxOutgoingBitrate(maxOutgoingBitrate), minOutgoingBitrate(minOutgoingBitrate)
{
MS_TRACE();
webrtc::GoogCcFactoryConfig config;
config.feedback_only = true;
this->controllerFactory = new webrtc::GoogCcNetworkControllerFactory(std::move(config));
}
TransportCongestionControlClient::~TransportCongestionControlClient()
{
MS_TRACE();
delete this->controllerFactory;
this->controllerFactory = nullptr;
DestroyController();
}
void TransportCongestionControlClient::InitializeController()
{
MS_ASSERT(this->rtpTransportControllerSend == nullptr, "transport controller already initialized");
webrtc::BitrateConstraints bitrateConfig;
bitrateConfig.start_bitrate_bps = static_cast<int>(this->initialAvailableBitrate);
this->rtpTransportControllerSend =
new webrtc::RtpTransportControllerSend(this, nullptr, this->controllerFactory, bitrateConfig);
this->rtpTransportControllerSend->RegisterTargetTransferRateObserver(this);
this->probationGenerator = new RTC::RtpProbationGenerator();
this->rtpTransportControllerSend->EnablePeriodicAlrProbing(true);
this->processTimer = new TimerHandle(this);
this->processTimer->Start(std::min(
this->rtpTransportControllerSend->packet_sender()->TimeUntilNextProcess(),
this->controllerFactory->GetProcessInterval().ms()
));
}
void TransportCongestionControlClient::DestroyController()
{
delete this->rtpTransportControllerSend;
this->rtpTransportControllerSend = nullptr;
delete this->probationGenerator;
this->probationGenerator = nullptr;
delete this->processTimer;
this->processTimer = nullptr;
}
void TransportCongestionControlClient::TransportConnected()
{
MS_TRACE();
if (this->rtpTransportControllerSend == nullptr)
{
InitializeController();
}
this->rtpTransportControllerSend->OnNetworkAvailability(true);
}
void TransportCongestionControlClient::TransportDisconnected()
{
MS_TRACE();
#ifdef USE_TREND_CALCULATOR
auto nowMs = DepLibUV::GetTimeMsInt64();
#endif
this->bitrates.desiredBitrate = 0u;
this->bitrates.effectiveDesiredBitrate = 0u;
#ifdef USE_TREND_CALCULATOR
this->desiredBitrateTrend.ForceUpdate(0u, nowMs);
#endif
this->rtpTransportControllerSend->OnNetworkAvailability(false);
}
void TransportCongestionControlClient::InsertPacket(webrtc::RtpPacketSendInfo& packetInfo)
{
MS_TRACE();
if (this->rtpTransportControllerSend == nullptr)
{
return;
}
this->rtpTransportControllerSend->packet_sender()->InsertPacket(packetInfo.length);
this->rtpTransportControllerSend->OnAddPacket(packetInfo);
}
webrtc::PacedPacketInfo TransportCongestionControlClient::GetPacingInfo()
{
MS_TRACE();
if (this->rtpTransportControllerSend == nullptr)
{
return {};
}
return this->rtpTransportControllerSend->packet_sender()->GetPacingInfo();
}
void TransportCongestionControlClient::PacketSent(
const webrtc::RtpPacketSendInfo& packetInfo, int64_t nowMs)
{
MS_TRACE();
if (this->rtpTransportControllerSend == nullptr)
{
return;
}
rtc::SentPacket const sentPacket(packetInfo.transport_sequence_number, nowMs);
this->rtpTransportControllerSend->OnSentPacket(sentPacket, packetInfo.length);
}
void TransportCongestionControlClient::ReceiveEstimatedBitrate(uint32_t bitrate)
{
MS_TRACE();
if (this->rtpTransportControllerSend == nullptr)
{
return;
}
this->rtpTransportControllerSend->OnReceivedEstimatedBitrate(bitrate);
}
void TransportCongestionControlClient::ReceiveRtcpReceiverReport(
RTC::RTCP::ReceiverReportPacket* packet, float rtt, int64_t nowMs)
{
MS_TRACE();
webrtc::ReportBlockList reportBlockList;
for (auto it = packet->Begin(); it != packet->End(); ++it)
{
auto& report = *it;
reportBlockList.emplace_back(
packet->GetSsrc(),
report->GetSsrc(),
report->GetFractionLost(),
report->GetTotalLost(),
report->GetLastSeq(),
report->GetJitter(),
report->GetLastSenderReport(),
report->GetDelaySinceLastSenderReport());
}
if (this->rtpTransportControllerSend == nullptr)
{
return;
}
this->rtpTransportControllerSend->OnReceivedRtcpReceiverReport(
reportBlockList, static_cast<int64_t>(rtt), nowMs);
}
void TransportCongestionControlClient::ReceiveRtcpTransportFeedback(
const RTC::RTCP::FeedbackRtpTransportPacket* feedback)
{
MS_TRACE();
const size_t expectedPackets = feedback->GetPacketStatusCount();
size_t lostPackets = 0;
for (const auto& result : feedback->GetPacketResults())
{
if (!result.received)
{
lostPackets += 1;
}
}
if (expectedPackets > 0)
{
this->UpdatePacketLoss(static_cast<double>(lostPackets) / expectedPackets);
}
if (this->rtpTransportControllerSend == nullptr)
{
return;
}
this->rtpTransportControllerSend->OnTransportFeedback(*feedback);
}
void TransportCongestionControlClient::UpdatePacketLoss(double packetLoss)
{
if (this->packetLossHistory.size() == PacketLossHistogramLength)
{
this->packetLossHistory.pop_front();
}
this->packetLossHistory.push_back(packetLoss);
size_t weight{ 0 };
size_t samples{ 0 };
double totalPacketLoss{ 0 };
for (auto packetLossEntry : this->packetLossHistory)
{
weight++;
samples += weight;
totalPacketLoss += weight * packetLossEntry;
}
this->packetLoss = totalPacketLoss / samples;
}
void TransportCongestionControlClient::SetMaxOutgoingBitrate(uint32_t maxBitrate)
{
this->maxOutgoingBitrate = maxBitrate;
ApplyBitrateUpdates();
if (this->maxOutgoingBitrate > 0u)
{
this->bitrates.availableBitrate =
std::min<uint32_t>(this->maxOutgoingBitrate, this->bitrates.availableBitrate);
}
}
void TransportCongestionControlClient::SetMinOutgoingBitrate(uint32_t minBitrate)
{
this->minOutgoingBitrate = minBitrate;
ApplyBitrateUpdates();
this->bitrates.minBitrate = std::max<uint32_t>(
this->minOutgoingBitrate, RTC::TransportCongestionControlMinOutgoingBitrate);
}
void TransportCongestionControlClient::SetDesiredBitrate(uint32_t desiredBitrate, bool force)
{
MS_TRACE();
#ifdef USE_TREND_CALCULATOR
auto nowMs = DepLibUV::GetTimeMsInt64();
#endif
#ifdef USE_TREND_CALCULATOR
if (!force)
{
this->desiredBitrateTrend.Update(desiredBitrate, nowMs);
}
else
{
this->desiredBitrateTrend.ForceUpdate(desiredBitrate, nowMs);
}
#endif
this->bitrates.desiredBitrate = desiredBitrate;
#ifdef USE_TREND_CALCULATOR
this->bitrates.effectiveDesiredBitrate = this->desiredBitrateTrend.GetValue();
#else
this->bitrates.effectiveDesiredBitrate = desiredBitrate;
#endif
this->bitrates.minBitrate = std::max<uint32_t>(
this->minOutgoingBitrate, RTC::TransportCongestionControlMinOutgoingBitrate);
this->bitrates.startBitrate = std::max<uint32_t>(
RTC::TransportCongestionControlMinOutgoingBitrate, this->bitrates.availableBitrate);
ApplyBitrateUpdates();
}
void TransportCongestionControlClient::ApplyBitrateUpdates()
{
auto currentMaxBitrate = this->bitrates.maxBitrate;
uint32_t newMaxBitrate = 0;
#ifdef USE_TREND_CALCULATOR
if (this->desiredBitrateTrend.GetValue() > 0u)
#else
if (this->bitrates.desiredBitrate > 0u)
#endif
{
newMaxBitrate = std::max<uint32_t>(
this->initialAvailableBitrate,
#ifdef USE_TREND_CALCULATOR
this->desiredBitrateTrend.GetValue() * MaxBitrateIncrementFactor);
#else
this->bitrates.desiredBitrate * MaxBitrateIncrementFactor);
#endif
auto maxBitrateMargin = newMaxBitrate * MaxBitrateMarginFactor;
if (currentMaxBitrate > newMaxBitrate - maxBitrateMargin && currentMaxBitrate < newMaxBitrate + maxBitrateMargin)
{
newMaxBitrate = currentMaxBitrate;
}
}
else
{
newMaxBitrate = this->initialAvailableBitrate;
}
if (this->maxOutgoingBitrate > 0u)
{
newMaxBitrate = std::min<uint32_t>(this->maxOutgoingBitrate, newMaxBitrate);
}
if (newMaxBitrate != currentMaxBitrate)
{
this->bitrates.maxPaddingBitrate = newMaxBitrate * MaxPaddingBitrateFactor;
this->bitrates.maxBitrate = newMaxBitrate;
}
this->bitrates.minBitrate = std::max<uint32_t>(
this->minOutgoingBitrate, RTC::TransportCongestionControlMinOutgoingBitrate);
MS_DEBUG_DEV(
"[desiredBitrate:%" PRIu32 ", desiredBitrateTrend:%" PRIu32 ", startBitrate:%" PRIu32
", minBitrate:%" PRIu32 ", maxBitrate:%" PRIu32 ", maxPaddingBitrate:%" PRIu32 "]",
this->bitrates.desiredBitrate,
this->desiredBitrateTrend.GetValue(),
this->bitrates.startBitrate,
this->bitrates.minBitrate,
this->bitrates.maxBitrate,
this->bitrates.maxPaddingBitrate);
if (this->rtpTransportControllerSend == nullptr)
{
return;
}
this->rtpTransportControllerSend->SetAllocatedSendBitrateLimits(
this->bitrates.minBitrate, this->bitrates.maxPaddingBitrate, this->bitrates.maxBitrate);
webrtc::TargetRateConstraints constraints;
constraints.at_time = webrtc::Timestamp::ms(DepLibUV::GetTimeMs());
constraints.min_data_rate = webrtc::DataRate::bps(this->bitrates.minBitrate);
constraints.max_data_rate = webrtc::DataRate::bps(this->bitrates.maxBitrate);
constraints.starting_rate = webrtc::DataRate::bps(this->bitrates.startBitrate);
this->rtpTransportControllerSend->SetClientBitratePreferences(constraints);
}
uint32_t TransportCongestionControlClient::GetAvailableBitrate() const
{
MS_TRACE();
return this->bitrates.availableBitrate;
}
double TransportCongestionControlClient::GetPacketLoss() const
{
MS_TRACE();
return this->packetLoss;
}
void TransportCongestionControlClient::RescheduleNextAvailableBitrateEvent()
{
MS_TRACE();
this->lastAvailableBitrateEventAtMs = DepLibUV::GetTimeMs();
}
void TransportCongestionControlClient::MayEmitAvailableBitrateEvent(uint32_t previousAvailableBitrate)
{
MS_TRACE();
const uint64_t nowMs = DepLibUV::GetTimeMsInt64();
bool notify{ false };
if (this->lastAvailableBitrateEventAtMs == 0u)
{
this->lastAvailableBitrateEventAtMs = nowMs;
return;
}
if (!this->availableBitrateEventCalled)
{
this->availableBitrateEventCalled = true;
notify = true;
}
else if (nowMs - this->lastAvailableBitrateEventAtMs >= AvailableBitrateEventInterval)
{
notify = true;
}
else if (this->bitrates.availableBitrate < previousAvailableBitrate * 0.75)
{
MS_WARN_TAG(
bwe,
"high BWE value decrease detected, notifying the listener [now:%" PRIu32 ", before:%" PRIu32
"]",
this->bitrates.availableBitrate,
previousAvailableBitrate);
notify = true;
}
else if (this->bitrates.availableBitrate > previousAvailableBitrate * 1.50)
{
MS_DEBUG_TAG(
bwe,
"high BWE value increase detected, notifying the listener [now:%" PRIu32 ", before:%" PRIu32
"]",
this->bitrates.availableBitrate,
previousAvailableBitrate);
notify = true;
}
if (notify)
{
MS_DEBUG_DEV(
"notifying the listener with new available bitrate:%" PRIu32,
this->bitrates.availableBitrate);
this->lastAvailableBitrateEventAtMs = nowMs;
this->listener->OnTransportCongestionControlClientBitrates(this, this->bitrates);
}
}
void TransportCongestionControlClient::OnTargetTransferRate(webrtc::TargetTransferRate targetTransferRate)
{
MS_TRACE();
if (
this->availableBitrateEventCalled &&
targetTransferRate.target_rate.bps() == this->initialAvailableBitrate
)
{
return;
}
auto previousAvailableBitrate = this->bitrates.availableBitrate;
if (targetTransferRate.target_rate.bps() > std::numeric_limits<uint32_t>::max())
{
this->bitrates.availableBitrate = std::numeric_limits<uint32_t>::max();
}
else
{
this->bitrates.availableBitrate = static_cast<uint32_t>(targetTransferRate.target_rate.bps());
}
MS_DEBUG_DEV("new available bitrate:%" PRIu32, this->bitrates.availableBitrate);
MayEmitAvailableBitrateEvent(previousAvailableBitrate);
}
void TransportCongestionControlClient::SendPacket(
RTC::RtpPacket* packet, const webrtc::PacedPacketInfo& pacingInfo)
{
MS_TRACE();
this->listener->OnTransportCongestionControlClientSendRtpPacket(this, packet, pacingInfo);
}
RTC::RtpPacket* TransportCongestionControlClient::GeneratePadding(size_t size)
{
MS_TRACE();
MS_ASSERT(this->probationGenerator, "probation generator not initialized")
return this->probationGenerator->GetNextPacket(size);
}
void TransportCongestionControlClient::OnTimer(TimerHandle* timer)
{
MS_TRACE();
if (timer == this->processTimer)
{
this->rtpTransportControllerSend->Process();
this->rtpTransportControllerSend->packet_sender()->Process();
this->processTimer->Start(std::min<uint64_t>(
this->rtpTransportControllerSend->packet_sender()->TimeUntilNextProcess(),
this->controllerFactory->GetProcessInterval().ms()
));
MayEmitAvailableBitrateEvent(this->bitrates.availableBitrate);
}
}
}