#define MS_CLASS "RTC::SCTP::StreamResetHandler"
#include "RTC/SCTP/association/StreamResetHandler.hpp"
#include "Logger.hpp"
#include "RTC/SCTP/packet/Parameter.hpp"
#include "RTC/SCTP/packet/parameters/ReconfigurationResponseParameter.hpp"
namespace RTC
{
namespace SCTP
{
StreamResetHandler::StreamResetHandler(
AssociationListenerInterface& associationListener,
SharedInterface* shared,
TransmissionControlBlockContextInterface* tcbContext,
DataTracker* dataTracker,
ReassemblyQueue* reassemblyQueue,
RetransmissionQueue* retransmissionQueue)
: associationListener(associationListener),
shared(shared),
tcbContext(tcbContext),
dataTracker(dataTracker),
reassemblyQueue(reassemblyQueue),
retransmissionQueue(retransmissionQueue),
reConfigTimer(this->shared->CreateBackoffTimer(
BackoffTimerHandleInterface::BackoffTimerHandleOptions{
.listener = this,
.label = "sctp-re-config",
.baseTimeoutMs = 0,
.backoffAlgorithm = BackoffTimerHandleInterface::BackoffAlgorithm::EXPONENTIAL,
.maxBackoffTimeoutMs = std::nullopt,
.maxRestarts = std::nullopt,
})),
nextOutgoingReqSeqNbr(tcbContext->GetLocalInitialTsn()),
lastProcessedReqSeqNbr(
this->incomingReConfigRequestSnUnwrapper.Unwrap(tcbContext->GetRemoteInitialTsn() - 1)),
lastProcessedReqResult(ReconfigurationResponseParameter::Result::SUCCESS_NOTHING_TO_DO)
{
MS_TRACE();
}
StreamResetHandler::~StreamResetHandler()
{
MS_TRACE();
}
void StreamResetHandler::ResetStreams(std::span<const uint16_t> outgoingStreamIds)
{
MS_TRACE();
for (const auto streamId : outgoingStreamIds)
{
this->retransmissionQueue->PrepareResetStream(streamId);
}
}
bool StreamResetHandler::ShouldSendStreamResetRequest() const
{
MS_TRACE();
return !this->currentRequest.has_value() &&
this->retransmissionQueue->HasStreamsReadyToBeReset();
}
void StreamResetHandler::AddStreamResetRequest(Packet* packet)
{
MS_TRACE();
MS_ASSERT(ShouldSendStreamResetRequest(), "should not send a stream reset request");
this->currentRequest.emplace(
this->retransmissionQueue->GetLastAssignedTsn(),
this->retransmissionQueue->BeginResetStreams());
this->reConfigTimer->SetBaseTimeoutMs(this->tcbContext->GetCurrentRtoMs());
this->reConfigTimer->Start();
AddReConfigChunk(packet);
}
void StreamResetHandler::HandleReceivedReConfigChunk(const ReConfigChunk* receivedReConfigChunk)
{
MS_TRACE();
if (!ValidateReceivedReConfigChunk(receivedReConfigChunk))
{
this->associationListener.OnAssociationError(
Types::ErrorKind::PARSE_FAILED, "invalid RE-CONFIG command received");
return;
}
auto packet = this->tcbContext->CreatePacket();
auto* reConfigChunk = packet->BuildChunkInPlace<ReConfigChunk>();
for (auto it = receivedReConfigChunk->ParametersBegin();
it != receivedReConfigChunk->ParametersEnd();
++it)
{
const auto* parameter = *it;
switch (parameter->GetType())
{
case Parameter::ParameterType::OUTGOING_SSN_RESET_REQUEST:
{
HandleReceivedOutgoingSsnResetRequestParameter(
reinterpret_cast<const OutgoingSsnResetRequestParameter*>(parameter), reConfigChunk);
break;
}
case Parameter::ParameterType::INCOMING_SSN_RESET_REQUEST:
{
HandleReceivedIncomingSsnResetRequestParameter(
reinterpret_cast<const IncomingSsnResetRequestParameter*>(parameter), reConfigChunk);
break;
}
case Parameter::ParameterType::RECONFIGURATION_RESPONSE:
{
HandleReceivedReconfigurationResponseParameter(
reinterpret_cast<const ReconfigurationResponseParameter*>(parameter));
break;
}
default:;
}
}
reConfigChunk->Consolidate();
if (reConfigChunk->GetParametersCount() > 0)
{
this->tcbContext->SendPacket(packet.get());
}
}
bool StreamResetHandler::ValidateReceivedReConfigChunk(const ReConfigChunk* receivedReConfigChunk)
{
MS_TRACE();
if (receivedReConfigChunk->GetParametersCount() == 1)
{
const auto* firstParameter = receivedReConfigChunk->GetParameterAt(0);
if (
firstParameter->GetType() == Parameter::ParameterType::OUTGOING_SSN_RESET_REQUEST ||
firstParameter->GetType() == Parameter::ParameterType::INCOMING_SSN_RESET_REQUEST ||
firstParameter->GetType() == Parameter::ParameterType::SSN_TSN_RESET_REQUEST ||
firstParameter->GetType() == Parameter::ParameterType::ADD_OUTGOING_STREAMS_REQUEST ||
firstParameter->GetType() == Parameter::ParameterType::ADD_INCOMING_STREAMS_REQUEST ||
firstParameter->GetType() == Parameter::ParameterType::RECONFIGURATION_RESPONSE)
{
return true;
}
}
else if (receivedReConfigChunk->GetParametersCount() == 2)
{
const auto* firstParameter = receivedReConfigChunk->GetParameterAt(0);
const auto* secondParameter = receivedReConfigChunk->GetParameterAt(1);
if (
(firstParameter->GetType() == Parameter::ParameterType::OUTGOING_SSN_RESET_REQUEST &&
secondParameter->GetType() == Parameter::ParameterType::INCOMING_SSN_RESET_REQUEST) ||
(firstParameter->GetType() == Parameter::ParameterType::INCOMING_SSN_RESET_REQUEST &&
secondParameter->GetType() == Parameter::ParameterType::OUTGOING_SSN_RESET_REQUEST) ||
(firstParameter->GetType() == Parameter::ParameterType::ADD_OUTGOING_STREAMS_REQUEST &&
secondParameter->GetType() == Parameter::ParameterType::ADD_INCOMING_STREAMS_REQUEST) ||
(firstParameter->GetType() == Parameter::ParameterType::ADD_INCOMING_STREAMS_REQUEST &&
secondParameter->GetType() == Parameter::ParameterType::ADD_OUTGOING_STREAMS_REQUEST) ||
(firstParameter->GetType() == Parameter::ParameterType::RECONFIGURATION_RESPONSE &&
secondParameter->GetType() == Parameter::ParameterType::OUTGOING_SSN_RESET_REQUEST) ||
(firstParameter->GetType() == Parameter::ParameterType::OUTGOING_SSN_RESET_REQUEST &&
secondParameter->GetType() == Parameter::ParameterType::RECONFIGURATION_RESPONSE) ||
(firstParameter->GetType() == Parameter::ParameterType::RECONFIGURATION_RESPONSE &&
secondParameter->GetType() == Parameter::ParameterType::RECONFIGURATION_RESPONSE) ||
(firstParameter->GetType() == Parameter::ParameterType::RECONFIGURATION_RESPONSE &&
secondParameter->GetType() == Parameter::ParameterType::RECONFIGURATION_RESPONSE))
{
return true;
}
}
MS_WARN_TAG(sctp, "invalid set of RE-CONFIG parameters");
return false;
}
void StreamResetHandler::AddReConfigChunk(Packet* packet)
{
MS_TRACE();
MS_ASSERT(this->currentRequest.has_value(), "currentRequest optional must have value");
if (!this->currentRequest->HasBeenSent())
{
this->currentRequest->PrepareToSend(this->nextOutgoingReqSeqNbr);
this->nextOutgoingReqSeqNbr = uint32_t{ this->nextOutgoingReqSeqNbr + 1 };
}
auto* reConfigChunk = packet->BuildChunkInPlace<ReConfigChunk>();
auto* outgoingSsnResetRequestParameter =
reConfigChunk->BuildParameterInPlace<OutgoingSsnResetRequestParameter>();
outgoingSsnResetRequestParameter->SetReconfigurationRequestSequenceNumber(
this->currentRequest->GetReqSeqNbr());
outgoingSsnResetRequestParameter->SetReconfigurationResponseSequenceNumber(
this->currentRequest->GetReqSeqNbr());
outgoingSsnResetRequestParameter->SetSenderLastAssignedTsn(
this->currentRequest->GetSenderLastAssignedTsn());
for (const auto& streamId : this->currentRequest->GetStreamIds())
{
outgoingSsnResetRequestParameter->AddStreamId(streamId);
}
outgoingSsnResetRequestParameter->Consolidate();
reConfigChunk->Consolidate();
}
StreamResetHandler::ReqSeqNbrValidationResult StreamResetHandler::ValidateReqSeqNbr(
StreamResetHandler::UnwrappedReConfigRequestSn reqSeqNbr)
{
MS_TRACE();
if (reqSeqNbr == this->lastProcessedReqSeqNbr)
{
return ReqSeqNbrValidationResult::RETRANSMISSION;
}
else if (reqSeqNbr != this->lastProcessedReqSeqNbr.GetNextValue())
{
MS_WARN_TAG(sctp, "bad reqSeqNbr: %" PRIu32, reqSeqNbr.Wrap());
return ReqSeqNbrValidationResult::BAD_SEQUENCE_NUMBER;
}
else
{
return ReqSeqNbrValidationResult::VALID;
}
}
void StreamResetHandler::HandleReceivedOutgoingSsnResetRequestParameter(
const OutgoingSsnResetRequestParameter* receivedOutgoingSsnResetRequestParameter,
ReConfigChunk* reConfigChunk)
{
MS_TRACE();
const UnwrappedReConfigRequestSn requestSn = this->incomingReConfigRequestSnUnwrapper.Unwrap(
receivedOutgoingSsnResetRequestParameter->GetReconfigurationRequestSequenceNumber());
const ReqSeqNbrValidationResult validationResult = ValidateReqSeqNbr(requestSn);
if (validationResult == ReqSeqNbrValidationResult::BAD_SEQUENCE_NUMBER)
{
auto* reconfigurationResponseParameter =
reConfigChunk->BuildParameterInPlace<ReconfigurationResponseParameter>();
reconfigurationResponseParameter->SetReconfigurationResponseSequenceNumber(
receivedOutgoingSsnResetRequestParameter->GetReconfigurationRequestSequenceNumber());
reconfigurationResponseParameter->SetResult(
ReconfigurationResponseParameter::Result::ERROR_BAD_SEQUENCE_NUMBER);
reconfigurationResponseParameter->Consolidate();
return;
}
if (
validationResult == ReqSeqNbrValidationResult::RETRANSMISSION &&
this->lastProcessedReqResult != ReconfigurationResponseParameter::Result::IN_PROGRESS)
{
auto* reconfigurationResponseParameter =
reConfigChunk->BuildParameterInPlace<ReconfigurationResponseParameter>();
reconfigurationResponseParameter->SetReconfigurationResponseSequenceNumber(
receivedOutgoingSsnResetRequestParameter->GetReconfigurationRequestSequenceNumber());
reconfigurationResponseParameter->SetResult(this->lastProcessedReqResult);
reconfigurationResponseParameter->Consolidate();
return;
}
this->lastProcessedReqSeqNbr = requestSn;
if (
this->dataTracker->IsLaterThanCumulativeAckedTsn(
receivedOutgoingSsnResetRequestParameter->GetSenderLastAssignedTsn()))
{
this->reassemblyQueue->EnterDeferredReset(
receivedOutgoingSsnResetRequestParameter->GetSenderLastAssignedTsn(),
receivedOutgoingSsnResetRequestParameter->GetStreamIds());
this->lastProcessedReqResult = ReconfigurationResponseParameter::Result::IN_PROGRESS;
MS_DEBUG_DEV(
"reset outgoing in progress, sender last assigned tsn %" PRIu32 " not yet reached",
receivedOutgoingSsnResetRequestParameter->GetSenderLastAssignedTsn());
}
else
{
this->reassemblyQueue->ResetStreamsAndLeaveDeferredReset(
receivedOutgoingSsnResetRequestParameter->GetStreamIds());
this->associationListener.OnAssociationInboundStreamsReset(
receivedOutgoingSsnResetRequestParameter->GetStreamIds());
this->lastProcessedReqResult = ReconfigurationResponseParameter::Result::SUCCESS_PERFORMED;
MS_DEBUG_DEV("reset outgoing performed");
MS_DEBUG_DEV(
"reset outgoing performed, sender last assigned tsn %" PRIu32 " reached",
receivedOutgoingSsnResetRequestParameter->GetSenderLastAssignedTsn());
}
auto* reconfigurationResponseParameter =
reConfigChunk->BuildParameterInPlace<ReconfigurationResponseParameter>();
reconfigurationResponseParameter->SetReconfigurationResponseSequenceNumber(
receivedOutgoingSsnResetRequestParameter->GetReconfigurationRequestSequenceNumber());
reconfigurationResponseParameter->SetResult(this->lastProcessedReqResult);
reconfigurationResponseParameter->Consolidate();
}
void StreamResetHandler::HandleReceivedIncomingSsnResetRequestParameter(
const IncomingSsnResetRequestParameter* receivedIncomingSsnResetRequestParameter,
ReConfigChunk* reConfigChunk)
{
MS_TRACE();
const UnwrappedReConfigRequestSn requestSn = this->incomingReConfigRequestSnUnwrapper.Unwrap(
receivedIncomingSsnResetRequestParameter->GetReconfigurationRequestSequenceNumber());
const ReqSeqNbrValidationResult validationResult = ValidateReqSeqNbr(requestSn);
if (validationResult == ReqSeqNbrValidationResult::VALID || validationResult == ReqSeqNbrValidationResult::RETRANSMISSION)
{
auto* reconfigurationResponseParameter =
reConfigChunk->BuildParameterInPlace<ReconfigurationResponseParameter>();
reconfigurationResponseParameter->SetReconfigurationResponseSequenceNumber(
receivedIncomingSsnResetRequestParameter->GetReconfigurationRequestSequenceNumber());
reconfigurationResponseParameter->SetResult(
ReconfigurationResponseParameter::Result::SUCCESS_NOTHING_TO_DO);
reconfigurationResponseParameter->Consolidate();
}
else
{
auto* reconfigurationResponseParameter =
reConfigChunk->BuildParameterInPlace<ReconfigurationResponseParameter>();
reconfigurationResponseParameter->SetReconfigurationResponseSequenceNumber(
receivedIncomingSsnResetRequestParameter->GetReconfigurationRequestSequenceNumber());
reconfigurationResponseParameter->SetResult(
ReconfigurationResponseParameter::Result::ERROR_BAD_SEQUENCE_NUMBER);
reconfigurationResponseParameter->Consolidate();
}
}
void StreamResetHandler::HandleReceivedReconfigurationResponseParameter(
const ReconfigurationResponseParameter* receivedReconfigurationResponseParameter)
{
MS_TRACE();
if (
this->currentRequest.has_value() && this->currentRequest->HasBeenSent() &&
receivedReconfigurationResponseParameter->GetReconfigurationResponseSequenceNumber() ==
this->currentRequest->GetReqSeqNbr())
{
this->reConfigTimer->Stop();
switch (receivedReconfigurationResponseParameter->GetResult())
{
case RTC::SCTP::ReconfigurationResponseParameter::Result::SUCCESS_NOTHING_TO_DO:
case RTC::SCTP::ReconfigurationResponseParameter::Result::SUCCESS_PERFORMED:
{
MS_DEBUG_DEV(
"reset stream success [reqSeqNbr:%" PRIu32 "]", this->currentRequest->GetReqSeqNbr());
this->associationListener.OnAssociationStreamsResetPerformed(
this->currentRequest->GetStreamIds());
this->currentRequest = std::nullopt;
this->retransmissionQueue->CommitResetStreams();
break;
}
case RTC::SCTP::ReconfigurationResponseParameter::Result::IN_PROGRESS:
{
MS_DEBUG_DEV(
"reset stream still pending [reqSeqNbr:%" PRIu32 "]",
this->currentRequest->GetReqSeqNbr());
this->currentRequest->SetDeferred(true);
this->reConfigTimer->SetBaseTimeoutMs(this->tcbContext->GetCurrentRtoMs());
this->reConfigTimer->Start();
break;
}
case RTC::SCTP::ReconfigurationResponseParameter::Result::ERROR_REQUEST_ALREADY_IN_PROGRESS:
case RTC::SCTP::ReconfigurationResponseParameter::Result::DENIED:
case RTC::SCTP::ReconfigurationResponseParameter::Result::ERROR_WRONG_SSN:
case RTC::SCTP::ReconfigurationResponseParameter::Result::ERROR_BAD_SEQUENCE_NUMBER:
{
MS_WARN_TAG(
sctp,
"reset stream error [reqSeqNbr:%" PRIu32 ", result:%s]",
this->currentRequest->GetReqSeqNbr(),
ReconfigurationResponseParameter::ResultToString(
receivedReconfigurationResponseParameter->GetResult())
.c_str());
this->associationListener.OnAssociationStreamsResetFailed(
this->currentRequest->GetStreamIds(),
ReconfigurationResponseParameter::ResultToString(
receivedReconfigurationResponseParameter->GetResult()));
this->currentRequest = std::nullopt;
this->retransmissionQueue->RollbackResetStreams();
break;
}
}
}
}
void StreamResetHandler::OnReConfigTimer(uint64_t& baseTimeoutMs, bool& )
{
MS_TRACE();
if (this->currentRequest && this->currentRequest->HasBeenSent())
{
if (this->currentRequest->IsDeferred())
{
this->currentRequest->SetDeferred(false);
}
else if (!this->tcbContext->IncrementTxErrorCounter("RECONFIG timeout"))
{
return;
}
}
else
{
}
auto packet = this->tcbContext->CreatePacket();
AddReConfigChunk(packet.get());
this->tcbContext->SendPacket(packet.get());
baseTimeoutMs = this->tcbContext->GetCurrentRtoMs();
}
void StreamResetHandler::OnBackoffTimer(
BackoffTimerHandleInterface* backoffTimer, uint64_t& baseTimeoutMs, bool& stop)
{
MS_TRACE();
const auto maxRestarts = backoffTimer->GetMaxRestarts();
MS_DEBUG_TAG(
sctp,
"%s timer has expired [expìrations:%zu/%s]",
backoffTimer->GetLabel().c_str(),
backoffTimer->GetExpirationCount(),
maxRestarts ? std::to_string(maxRestarts.value()).c_str() : "Infinite");
if (backoffTimer == this->reConfigTimer.get())
{
OnReConfigTimer(baseTimeoutMs, stop);
}
}
} }