#define MS_CLASS "RTC::SCTP::HeartbeatHandler"
#define MS_LOG_DEV_LEVEL 3
#include "RTC/SCTP/association/HeartbeatHandler.hpp"
#include "DepLibUV.hpp"
#include "Logger.hpp"
#include "Utils.hpp"
#include "RTC/SCTP/packet/parameters/HeartbeatInfoParameter.hpp"
#include "RTC/SCTP/public/SctpTypes.hpp"
#include <string>
namespace RTC
{
namespace SCTP
{
// Size in bytes of the Heartbeat Info payload used by this handler: a single
// 64-bit millisecond timestamp, written when a HEARTBEAT_REQUEST is built and
// read back (and length-checked) when the matching HEARTBEAT_ACK is processed.
static constexpr int HeartbeatInfoLength{ 8 };
// Builds the heartbeat handler and its two timers. Neither timer is started
// here; the interval timer is started by RestartTimer() and the timeout timer
// by OnIntervalTimer().
//
// - associationListener: receives OnAssociationError() notifications when a
//   received HEARTBEAT_ACK cannot be interpreted.
// - sctpOptions: source of the heartbeat interval, the "include RTT" flag, the
//   initial RTO and the maximum backoff timeout.
// - tcbContext: used to create/send packets, query the current RTO, report
//   RTT observations and update the TX error counter.
HeartbeatHandler::HeartbeatHandler(
AssociationListener& associationListener, const SctpOptions& sctpOptions, TCBContext* tcbContext)
: associationListener(associationListener),
sctpOptions(sctpOptions),
tcbContext(tcbContext),
intervalDurationMs(sctpOptions.heartbeatIntervalMs),
intervalDurationShouldIncludeRtt(sctpOptions.heartbeatIntervalIncludeRtt),
// Interval timer: decides when the next HEARTBEAT_REQUEST is sent. The base
// timeout given here (initial RTO) is replaced in RestartTimer() before the
// timer is started.
// NOTE(review): this timer uses EXPONENTIAL backoff. Assuming
// BackoffTimerHandle grows the timeout on every automatic re-arm, heartbeats
// on an idle association would be spaced further and further apart until
// RestartTimer() is called again - confirm this is intended.
intervalTimer(
std::make_unique<BackoffTimerHandle>(
this,
sctpOptions.initialRtoMs,
BackoffTimerHandle::BackoffAlgorithm::EXPONENTIAL,
sctpOptions.timerMaxBackoffTimeoutMs,
std::nullopt)),
// Timeout timer: bounds how long we wait for a HEARTBEAT_ACK. Its base timeout
// is replaced with the current RTO in OnIntervalTimer() right before starting
// it. The trailing 0 is presumably "max restarts" (see GetMaxRestarts()), i.e.
// a one-shot timer - verify against BackoffTimerHandle's constructor.
timeoutTimer(
std::make_unique<BackoffTimerHandle>(
this,
sctpOptions.initialRtoMs,
BackoffTimerHandle::BackoffAlgorithm::FIXED,
std::nullopt,
0))
{
MS_TRACE();
}
// Destructor. No explicit cleanup is performed here: both timers are created
// with std::make_unique in the constructor and the body only emits the trace
// log.
HeartbeatHandler::~HeartbeatHandler()
{
MS_TRACE();
}
// (Re)arms the interval timer that triggers the next HEARTBEAT_REQUEST.
//
// Does nothing when the configured heartbeat interval is 0. Otherwise the
// timer's base timeout is set to the configured interval, optionally extended
// by the current RTO when the "include RTT" option is enabled, and the timer
// is started.
void HeartbeatHandler::RestartTimer()
{
	MS_TRACE();

	// An interval of 0 means there is nothing to schedule.
	const bool heartbeatDisabled = (this->intervalDurationMs == 0);

	if (heartbeatDisabled)
	{
		return;
	}

	if (!this->intervalDurationShouldIncludeRtt)
	{
		this->intervalTimer->SetBaseTimeoutMs(this->intervalDurationMs);
	}
	else
	{
		// The current RTO is what gets added to the interval here.
		const auto currentRtoMs = this->tcbContext->GetCurrentRtoMs();

		this->intervalTimer->SetBaseTimeoutMs(this->intervalDurationMs + currentRtoMs);
	}

	this->intervalTimer->Start();
}
void HeartbeatHandler::ProcessReceivedHeartbeatRequestChunk(
const HeartbeatRequestChunk* receivedHeartbeatRequestChunk)
{
MS_TRACE();
auto packet = this->tcbContext->CreatePacket();
auto* heartbeatAckChunk = packet->BuildChunkInPlace<HeartbeatAckChunk>();
for (auto it = receivedHeartbeatRequestChunk->ParametersBegin();
it != receivedHeartbeatRequestChunk->ParametersEnd();
++it)
{
const auto* parameter = *it;
heartbeatAckChunk->AddParameter(parameter);
}
heartbeatAckChunk->Consolidate();
this->tcbContext->Send(packet.get());
}
void HeartbeatHandler::ProcessReceivedHeartbeatAckChunk(
const HeartbeatAckChunk* receivedHeartbeatAckChunk)
{
MS_TRACE();
this->timeoutTimer->Stop();
const auto* heartbeatInfoParameter =
receivedHeartbeatAckChunk->GetFirstParameterOfType<HeartbeatInfoParameter>();
if (!heartbeatInfoParameter)
{
this->associationListener.OnAssociationError(
Types::ErrorKind::PARSE_FAILED,
"ignoring HEARTBEAT_ACK chunk without Heartbeat Info parameter");
return;
}
const auto* info = heartbeatInfoParameter->GetInfo();
const uint16_t infoLen = heartbeatInfoParameter->GetInfoLength();
if (!info)
{
this->associationListener.OnAssociationError(
Types::ErrorKind::PARSE_FAILED, "ignoring Heartbeat Info parameter without info field");
return;
}
else if (infoLen != HeartbeatInfoLength)
{
this->associationListener.OnAssociationError(
Types::ErrorKind::PARSE_FAILED, "ignoring Heartbeat Info parameter with wrong length");
return;
}
const uint64_t createdAtMs = Utils::Byte::Get8Bytes(info, 0);
const uint64_t nowMs = DepLibUV::GetTimeMs();
if (createdAtMs > 0 && createdAtMs <= nowMs)
{
const uint64_t rtt = nowMs - createdAtMs;
MS_DEBUG_DEV("valid HEARTBEAT_ACK Chunk received, calling ObserveRtt(%" PRIu64 ")", rtt);
this->tcbContext->ObserveRtt(rtt);
}
else
{
MS_WARN_DEV(
"ignoring received HEARTBEAT_ACK Chunk with invalid info content [createdAtMs:%" PRIu64
", nowMs:%" PRIu64 "]",
createdAtMs,
nowMs);
}
this->tcbContext->ClearTxErrorCounter();
}
// Called (through OnTimer()) when the interval timer expires: sends a
// HEARTBEAT_REQUEST and arms the timeout timer that waits for its ACK.
//
// Neither out-parameter is modified here, so the timer keeps whatever base
// timeout and stop state the caller passed in.
//
// If the association is not established nothing is sent and the timeout timer
// is not started. Otherwise:
// - the timeout timer is (re)started with the current RTO as base timeout,
// - the current time (ms) is stored as the 8-byte Heartbeat Info payload so
//   that the ACK handler can compute the elapsed time,
// - a packet with a single HEARTBEAT_REQUEST chunk carrying that info is sent.
void HeartbeatHandler::OnIntervalTimer(uint64_t& /*baseTimeoutMs*/, bool& /*stop*/)
{
	MS_TRACE();

	if (!this->tcbContext->IsAssociationEstablished())
	{
		MS_DEBUG_DEV("won't send HEARTBEAT_REQUEST when SCTP Association is not established");

		return;
	}

	const auto maxRestarts = this->intervalTimer->GetMaxRestarts();

	// NOTE: The std::string temporary created by std::to_string() lives until the
	// end of the full expression, so passing its c_str() as an argument is safe.
	MS_DEBUG_TAG(
	  sctp,
	  "interval timer has expired [%zu/%s]",
	  this->intervalTimer->GetExpirationCount(),
	  maxRestarts ? std::to_string(maxRestarts.value()).c_str() : "Infinite");

	// Arm the ACK timeout before sending the request.
	this->timeoutTimer->SetBaseTimeoutMs(this->tcbContext->GetCurrentRtoMs());
	this->timeoutTimer->Start();

	// The info payload is just the send timestamp.
	alignas(8) uint8_t info[HeartbeatInfoLength];
	const uint64_t nowMs = DepLibUV::GetTimeMs();

	Utils::Byte::Set8Bytes(info, 0, nowMs);

	auto packet                 = this->tcbContext->CreatePacket();
	auto* heartbeatRequestChunk = packet->BuildChunkInPlace<HeartbeatRequestChunk>();
	auto* heartbeatInfoParameter =
	  heartbeatRequestChunk->BuildParameterInPlace<HeartbeatInfoParameter>();

	heartbeatInfoParameter->SetInfo(info, HeartbeatInfoLength);
	// Consolidate the parameter first, then the chunk that contains it.
	heartbeatInfoParameter->Consolidate();
	heartbeatRequestChunk->Consolidate();

	MS_DEBUG_DEV("sending HEARTBEAT_REQUEST Chunk with info content [nowMs:%" PRIu64 "]", nowMs);

	this->tcbContext->Send(packet.get());
}
// Called (through OnTimer()) when the timeout timer expires, i.e. no
// HEARTBEAT_ACK stopped it in time. Logs the expiration, asserts that the
// timer is no longer running and increments the TX error counter.
//
// Neither out-parameter is modified here. The timeout timer is started again
// by OnIntervalTimer() when the next HEARTBEAT_REQUEST is sent.
void HeartbeatHandler::OnTimeoutTimer(uint64_t& /*baseTimeoutMs*/, bool& /*stop*/)
{
	MS_TRACE();

	const auto maxRestarts = this->timeoutTimer->GetMaxRestarts();

	// NOTE: The std::string temporary created by std::to_string() lives until the
	// end of the full expression, so passing its c_str() as an argument is safe.
	MS_DEBUG_TAG(
	  sctp,
	  "timeout timer has expired [%zu/%s]",
	  this->timeoutTimer->GetExpirationCount(),
	  maxRestarts ? std::to_string(maxRestarts.value()).c_str() : "Infinite");

	MS_ASSERT(!this->timeoutTimer->IsRunning(), "timeout timer shouldn't be running");

	this->tcbContext->IncrementTxErrorCounter("heartbeat timeout");
}
// BackoffTimerHandle callback shared by both timers. Routes the expiration to
// the handler of whichever timer fired, forwarding the in/out arguments
// untouched. An unknown timer pointer is silently ignored.
void HeartbeatHandler::OnTimer(BackoffTimerHandle* backoffTimer, uint64_t& baseTimeoutMs, bool& stop)
{
	MS_TRACE();

	if (backoffTimer == this->intervalTimer.get())
	{
		this->OnIntervalTimer(baseTimeoutMs, stop);

		return;
	}

	if (backoffTimer == this->timeoutTimer.get())
	{
		this->OnTimeoutTimer(baseTimeoutMs, stop);
	}
}
} }