// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

//! `StreamManager` manages the lifecycle of all `Stream`s inside a `Connection`
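//!
//! A rough usage sketch (assuming a concrete `Stream` implementation `S` and
//! hypothetical `connection_limits`, `initial_local_limits` and
//! `initial_peer_limits` values obtained during connection setup; not a
//! complete program):
//!
//! ```ignore
//! let mut manager: AbstractStreamManager<S> = AbstractStreamManager::new(
//!     &connection_limits,
//!     endpoint::Type::Client,
//!     initial_local_limits,
//!     initial_peer_limits,
//! );
//!
//! // Locally initiated Streams are opened via `poll_open_local_stream`,
//! // remotely initiated Streams are surfaced via `poll_accept`, and incoming
//! // frames are dispatched through the `on_*` handlers (`on_data`,
//! // `on_max_data`, `on_reset_stream`, ...).
//! ```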

use crate::{
    connection,
    contexts::{ConnectionApiCallContext, OnTransmitError, WriteContext},
    recovery::RttEstimator,
    stream::{
        self,
        incoming_connection_flow_controller::IncomingConnectionFlowController,
        outgoing_connection_flow_controller::OutgoingConnectionFlowController,
        stream_container::{StreamContainer, StreamContainerIterationResult},
        stream_events::StreamEvents,
        stream_impl::StreamConfig,
        StreamError, StreamTrait,
    },
    transmission::{self, interest::Provider as _},
};
use core::{
    task::{Context, Poll, Waker},
    time::Duration,
};
use futures_core::ready;
use s2n_quic_core::{
    ack, endpoint,
    frame::{
        stream::StreamRef, DataBlocked, MaxData, MaxStreamData, MaxStreams, ResetStream,
        StopSending, StreamDataBlocked, StreamsBlocked,
    },
    packet::number::PacketNumberSpace,
    stream::{iter::StreamIter, ops, StreamId, StreamType},
    time::{timer, Timestamp},
    transport::{self, parameters::InitialFlowControlLimits},
    varint::VarInt,
};

/// Holds one Stream ID of each type (initiator/stream type)
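///
/// The initiator and stream type are encoded in the two least significant bits
/// of a Stream ID (RFC 9000, Section 2.1): `0x00` client-initiated
/// bidirectional, `0x01` server-initiated bidirectional, `0x02`
/// client-initiated unidirectional, `0x03` server-initiated unidirectional.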
#[derive(Debug)]
pub(super) struct StreamIdSet {
    server_initiated_unidirectional: Option<StreamId>,
    client_initiated_unidirectional: Option<StreamId>,
    server_initiated_bidirectional: Option<StreamId>,
    client_initiated_bidirectional: Option<StreamId>,
}

impl StreamIdSet {
    /// Returns the `StreamIdSet` where each `StreamId` is initialized to its
    /// initial value.
    pub fn initial() -> Self {
        Self {
            server_initiated_bidirectional: Some(StreamId::initial(
                endpoint::Type::Server,
                StreamType::Bidirectional,
            )),
            client_initiated_bidirectional: Some(StreamId::initial(
                endpoint::Type::Client,
                StreamType::Bidirectional,
            )),
            server_initiated_unidirectional: Some(StreamId::initial(
                endpoint::Type::Server,
                StreamType::Unidirectional,
            )),
            client_initiated_unidirectional: Some(StreamId::initial(
                endpoint::Type::Client,
                StreamType::Unidirectional,
            )),
        }
    }

    /// Returns a mutable reference to the `StreamId` inside the set for the
    /// given initiator and stream type
    pub fn get_mut(
        &mut self,
        initiator: endpoint::Type,
        stream_type: StreamType,
    ) -> &mut Option<StreamId> {
        match (initiator, stream_type) {
            (endpoint::Type::Server, StreamType::Unidirectional) => {
                &mut self.server_initiated_unidirectional
            }
            (endpoint::Type::Client, StreamType::Unidirectional) => {
                &mut self.client_initiated_unidirectional
            }
            (endpoint::Type::Server, StreamType::Bidirectional) => {
                &mut self.server_initiated_bidirectional
            }
            (endpoint::Type::Client, StreamType::Bidirectional) => {
                &mut self.client_initiated_bidirectional
            }
        }
    }
}

/// Stores all required state for accepting incoming Streams via the
/// `accept()` method
#[derive(Debug)]
pub(super) struct AcceptState {
    /// The ID of the next bidirectional Stream that an `accept()` call
    /// should return.
    next_bidi_stream_to_accept: Option<StreamId>,
    /// The ID of the next unidirectional Stream that an `accept()` call
    /// should return.
    next_uni_stream_to_accept: Option<StreamId>,
    /// The `Waker` for the task which needs to get woken when the next
    /// bidirectional stream can be accepted
    bidi_waker: Option<Waker>,
    /// The `Waker` for the task which needs to get woken when the next
    /// unidirectional stream can be accepted
    uni_waker: Option<Waker>,
}

impl AcceptState {
    pub fn new(local_endpoint_type: endpoint::Type) -> AcceptState {
        let peer_type = local_endpoint_type.peer_type();

        AcceptState {
            next_bidi_stream_to_accept: Some(StreamId::initial(
                peer_type,
                StreamType::Bidirectional,
            )),
            next_uni_stream_to_accept: Some(StreamId::initial(
                peer_type,
                StreamType::Unidirectional,
            )),
            bidi_waker: None,
            uni_waker: None,
        }
    }

    /// Returns a mutable reference to the `Waker` for the given Stream type
    pub fn waker_mut(&mut self, stream_type: StreamType) -> &mut Option<Waker> {
        match stream_type {
            StreamType::Bidirectional => &mut self.bidi_waker,
            StreamType::Unidirectional => &mut self.uni_waker,
        }
    }

    /// Returns the ID of the next Stream that needs to get accepted through
    /// an `accept()` call.
    pub fn next_stream_id(&self, stream_type: StreamType) -> Option<StreamId> {
        match stream_type {
            StreamType::Bidirectional => self.next_bidi_stream_to_accept,
            StreamType::Unidirectional => self.next_uni_stream_to_accept,
        }
    }

    /// Returns a mutable reference to the ID of the next Stream that needs to
    /// get accepted through an `accept()` call.
    pub fn next_stream_mut(&mut self, stream_type: StreamType) -> &mut Option<StreamId> {
        match stream_type {
            StreamType::Bidirectional => &mut self.next_bidi_stream_to_accept,
            StreamType::Unidirectional => &mut self.next_uni_stream_to_accept,
        }
    }
}

/// Manages all active `Stream`s inside a connection
#[derive(Debug)]
pub struct StreamManagerState<S> {
    /// Flow control credit manager for receiving data
    pub(super) incoming_connection_flow_controller: IncomingConnectionFlowController,
    /// Flow control credit manager for sending data
    pub(super) outgoing_connection_flow_controller: OutgoingConnectionFlowController,
    /// Controller for managing streams concurrency limits
    stream_controller: stream::Controller,
    /// A container which contains all Streams
    streams: StreamContainer<S>,
    /// The next Stream ID which was not yet used for an initiated stream
    /// for each stream type
    pub(super) next_stream_ids: StreamIdSet,
    /// The type of our local endpoint (client or server)
    local_endpoint_type: endpoint::Type,
    /// The initial flow control limits which we advertised towards the peer
    /// via transport parameters
    initial_local_limits: InitialFlowControlLimits,
    /// The initial flow control limits we received from the peer via transport
    /// parameters
    initial_peer_limits: InitialFlowControlLimits,
    /// If the `StreamManager` was closed, this contains the error which was
    /// passed to the `close()` call
    close_reason: Option<connection::Error>,
    /// All state for accepting remotely initiated streams
    pub(super) accept_state: AcceptState,
    /// Limits for the Stream manager. Since only Stream limits are utilized at
    /// the moment, we only store those
    stream_limits: stream::Limits,
}

impl<S: StreamTrait> StreamManagerState<S> {
    /// Performs the given transaction on the `StreamManagerState`.
    /// If an error occurs, all Streams will be reset with an internal reset.
    pub fn reset_streams_on_error<F, R>(&mut self, func: F) -> Result<R, transport::Error>
    where
        F: FnOnce(&mut Self) -> Result<R, transport::Error>,
    {
        let result = func(self);
        if let Err(err) = result.as_ref() {
            self.close((*err).into(), false);
        }
        result
    }

    /// Inserts the `Stream` into the StreamContainer.
    ///
    /// This method does not perform any validation of whether it is allowed to
    /// open the `Stream`.
    fn insert_stream(&mut self, stream_id: StreamId) {
        // The receive window is announced by us to the peer
        let initial_receive_window = self
            .initial_local_limits
            .stream_limits
            .max_data(self.local_endpoint_type, stream_id);
        // The send window is announced to us by the peer
        let initial_send_window = self
            .initial_peer_limits
            .stream_limits
            .max_data(self.local_endpoint_type.peer_type(), stream_id);

        // We pass the initial_receive_window also as the desired flow control
        // window. Thereby we will maintain the same flow control window over
        // the lifetime of the Stream.
        // If we wanted a different limit, we would need separate limits for
        // the various combinations of unidirectional/bidirectional Streams.
        // Those would bloat up the config, and essentially just duplicate the
        // transport parameters.
        debug_assert!(
            initial_receive_window <= VarInt::from_u32(core::u32::MAX),
            "Receive window must not exceed 32bit range"
        );

        self.streams.insert_stream(S::new(StreamConfig {
            incoming_connection_flow_controller: self.incoming_connection_flow_controller.clone(),
            outgoing_connection_flow_controller: self.outgoing_connection_flow_controller.clone(),
            local_endpoint_type: self.local_endpoint_type,
            stream_id,
            initial_receive_window,
            desired_flow_control_window: initial_receive_window.as_u64() as u32,
            initial_send_window,
            max_send_buffer_size: self.stream_limits.max_send_buffer_size.as_u32(),
        }));
    }

    /// Opens a Stream which is referenced in a frame if it has not yet been
    /// opened so far. This will also open all unopened Streams with a lower
    /// Stream ID of the same type, as required by the QUIC specification.
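    ///
    /// For example, if we are the server and the highest client-initiated
    /// bidirectional Stream opened so far is 4, receiving a frame which
    /// references Stream 12 will open Streams 8 and 12.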
    fn open_stream_if_necessary(&mut self, stream_id: StreamId) -> Result<(), transport::Error> {
        // If the stream ID is higher than any Stream ID we observed so far, we
        // need to open all Stream IDs of the same type. Otherwise we need to
        // look up the Stream ID in the map.

        let first_unopened_id: StreamId = if let Some(first_unopened_id) = *self
            .next_stream_ids
            .get_mut(stream_id.initiator(), stream_id.stream_type())
        {
            first_unopened_id
        } else {
            // All Streams for the particular initiator and stream type have
            // already been opened. In this case we don't have to open a
            // Stream, and the referenced Stream ID also cannot be higher
            // than a previously used outgoing Stream ID.
            return Ok(());
        };

        if stream_id.initiator() != self.local_endpoint_type {
            if stream_id >= first_unopened_id {
                // This Stream ID is first referenced here. This means we have
                // to create a new Stream instance

                if self.close_reason.is_some() {
                    return Err(transport::Error::NO_ERROR.with_reason("Connection was closed"));
                }

                //= https://www.rfc-editor.org/rfc/rfc9000#section-4.6
                //# Endpoints MUST NOT exceed the limit set by their peer.  An endpoint
                //# that receives a frame with a stream ID exceeding the limit it has
                //# sent MUST treat this as a connection error of type
                //# STREAM_LIMIT_ERROR; see Section 11 for details on error handling.
                let stream_iter = StreamIter::new(first_unopened_id, stream_id);

                // Validate that there is enough capacity to open all streams.
                self.stream_controller.on_open_remote_stream(stream_iter)?;

                // We must create ALL streams with a lower Stream ID too:
                //
                //= https://www.rfc-editor.org/rfc/rfc9000#section-3.2
                //# Before a stream is created, all streams of the same type with lower-
                //# numbered stream IDs MUST be created.  This ensures that the creation
                //# order for streams is consistent on both endpoints.
                for stream_id in stream_iter {
                    self.insert_stream(stream_id);
                }

                //= https://www.rfc-editor.org/rfc/rfc9000#section-2.1
                //# A QUIC
                //# endpoint MUST NOT reuse a stream ID within a connection.

                // Increase the next expected Stream ID. We might thereby exhaust
                // the Stream ID range, which means we can no longer accept a
                // further Stream.
                *self
                    .next_stream_ids
                    .get_mut(stream_id.initiator(), stream_id.stream_type()) =
                    stream_id.next_of_type();

                // Wake up the application if it is waiting on new incoming Streams
                if let Some(waker) = self.accept_state.waker_mut(stream_id.stream_type()).take() {
                    waker.wake();
                }
            }
        } else {
            // Check if the peer is sending us a frame for a locally initiated Stream with
            // a higher Stream ID than we ever used.
            // In this case the peer seems to be time-travelling and knows about
            // future Stream IDs we might use. We will not accept this and
            // close the connection.
            if stream_id >= first_unopened_id {
                return Err(
                    transport::Error::STREAM_STATE_ERROR.with_reason("Stream was not yet opened")
                );
            }
        }

        Ok(())
    }

    fn poll_open_local_stream(
        &mut self,
        stream_type: StreamType,
        open_token: &mut connection::OpenToken,
        context: &Context,
    ) -> Poll<Result<StreamId, connection::Error>> {
        let first_unopened_id = self
            .next_stream_ids
            .get_mut(self.local_endpoint_type, stream_type)
            .ok_or_else(connection::Error::stream_id_exhausted)?;

        //= https://www.rfc-editor.org/rfc/rfc9000#section-4.6
        //# Endpoints MUST NOT exceed the limit set by their peer.
        //
        //= https://www.rfc-editor.org/rfc/rfc9000#section-19.11
        //# An endpoint MUST NOT open more streams than permitted by the current
        //# stream limit set by its peer.
        let poll_open =
            self.stream_controller
                .poll_open_local_stream(stream_type, open_token, context);

        // returns Pending if there is no capacity available
        ready!(poll_open);

        self.insert_stream(first_unopened_id);
        Poll::Ready(Ok(first_unopened_id))
    }

    fn close(&mut self, error: connection::Error, flush: bool) {
        if self.close_reason.is_some() {
            return;
        }
        self.close_reason = Some(error);

        self.streams
            .iterate_streams(&mut self.stream_controller, |stream| {
                // We have to wake inside the lock, since `StreamEvent`s has no capacity
                // to carry wakers in another iteration
                let mut events = StreamEvents::new();
                if flush {
                    stream.on_flush(error.into(), &mut events);
                } else {
                    stream.on_internal_reset(error.into(), &mut events);
                }
                events.wake_all();
            });

        // If the connection gets closed we need to notify tasks which are blocked
        // on `accept()`.

        if let Some(waker) = self
            .accept_state
            .waker_mut(StreamType::Bidirectional)
            .take()
        {
            waker.wake();
        }
        if let Some(waker) = self
            .accept_state
            .waker_mut(StreamType::Unidirectional)
            .take()
        {
            waker.wake();
        }

        self.stream_controller.close();
    }

    fn flush(&mut self, error: connection::Error) -> Poll<()> {
        self.close(error, true);

        // if we still have active streams, we're not done flushing
        if self.streams.nr_active_streams() > 0 {
            Poll::Pending
        } else {
            Poll::Ready(())
        }
    }
}

/// Manages all active `Stream`s inside a connection.
/// `AbstractStreamManager` is parameterized over the `Stream` type.
#[derive(Debug)]
pub struct AbstractStreamManager<S> {
    pub(super) inner: StreamManagerState<S>,
    last_blocked_sync_period: Duration,
}

// Sending the `AbstractStreamManager` between threads is safe, since we never expose the `Rc`s
// outside of the container
#[allow(unknown_lints, clippy::non_send_fields_in_send_ty)]
unsafe impl<S> Send for AbstractStreamManager<S> {}

impl<S: StreamTrait> AbstractStreamManager<S> {
    /// Creates a new `StreamManager` using the provided configuration parameters
    pub fn new(
        connection_limits: &connection::Limits,
        local_endpoint_type: endpoint::Type,
        initial_local_limits: InitialFlowControlLimits,
        initial_peer_limits: InitialFlowControlLimits,
    ) -> Self {
        debug_assert!(
            initial_local_limits.max_data <= VarInt::from_u32(core::u32::MAX),
            "Receive window must not exceed 32bit range"
        );

        Self {
            inner: StreamManagerState {
                incoming_connection_flow_controller: IncomingConnectionFlowController::new(
                    initial_local_limits.max_data,
                    initial_local_limits.max_data.as_u64() as u32,
                ),
                outgoing_connection_flow_controller: OutgoingConnectionFlowController::new(
                    initial_peer_limits.max_data,
                ),
                stream_controller: stream::Controller::new(
                    local_endpoint_type,
                    initial_peer_limits,
                    initial_local_limits,
                    connection_limits.stream_limits(),
                ),
                streams: StreamContainer::new(),
                next_stream_ids: StreamIdSet::initial(),
                local_endpoint_type,
                initial_local_limits,
                initial_peer_limits,
                close_reason: None,
                accept_state: AcceptState::new(local_endpoint_type),
                stream_limits: connection_limits.stream_limits(),
            },
            last_blocked_sync_period: Duration::ZERO,
        }
    }

    /// The number of bytes of forward progress the peer has made on incoming streams
    pub fn incoming_bytes_progressed(&self) -> VarInt {
        self.inner
            .incoming_connection_flow_controller
            .acquired_window()
    }

    /// The number of bytes of forward progress the local endpoint has made on outgoing streams
    pub fn outgoing_bytes_progressed(&self) -> VarInt {
        self.inner
            .outgoing_connection_flow_controller
            .acquired_window()
    }

    /// Accepts the next incoming stream of a given type
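    ///
    /// A minimal polling sketch (assuming a `manager: AbstractStreamManager<S>`
    /// and a task `Context` named `cx`; not a complete program):
    ///
    /// ```ignore
    /// match manager.poll_accept(Some(StreamType::Bidirectional), cx) {
    ///     Poll::Ready(Ok(Some(stream_id))) => { /* a peer-initiated Stream is ready */ }
    ///     Poll::Ready(Ok(None)) => { /* the connection was closed without an error */ }
    ///     Poll::Ready(Err(error)) => { /* the connection was closed with an error */ }
    ///     Poll::Pending => { /* no Stream available yet; the `Waker` was stored */ }
    /// }
    /// ```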
    pub fn poll_accept(
        &mut self,
        stream_type: Option<StreamType>,
        context: &Context,
    ) -> Poll<Result<Option<StreamId>, connection::Error>> {
        macro_rules! with_stream_type {
            (| $stream_type:ident | $block:stmt) => {
                if stream_type == None || stream_type == Some(StreamType::Bidirectional) {
                    let $stream_type = StreamType::Bidirectional;
                    $block
                }
                if stream_type == None || stream_type == Some(StreamType::Unidirectional) {
                    let $stream_type = StreamType::Unidirectional;
                    $block
                }
            };
        }

        // Clear a stored Waker
        with_stream_type!(|stream_type| *self.inner.accept_state.waker_mut(stream_type) = None);

        // If the connection was closed we still allow the application to accept
        // Streams which are already known to the StreamManager.
        // This is done for 2 reasons:
        // 1. If the application doesn't interact with the Streams and observe
        //    their close status, they won't get removed from the StreamManager
        //    due to missing finalization interest
        // 2. The streams might already have received all data from the peer at
        //    this point, and for applications it can be helpful to act on this
        //    data.

        with_stream_type!(|stream_type| if let Some(stream_id) =
            self.accept_stream_with_type(stream_type)?
        {
            return Ok(Some(stream_id)).into();
        });

        match self.inner.close_reason {
            // The connection closed without an error
            Some(connection::Error::Closed { .. }) => return Ok(None).into(),
            // Translate application closes to end of stream
            Some(connection::Error::Transport { code, .. })
                if code == transport::Error::APPLICATION_ERROR.code =>
            {
                return Ok(None).into()
            }
            // Translate idle timer expiration to end of stream
            Some(connection::Error::IdleTimerExpired { .. }) => return Ok(None).into(),
            Some(reason) => return Err(reason).into(),
            None => {}
        }

        // Store the `Waker` for notifying the application if we accept a Stream
        with_stream_type!(
            |stream_type| *self.inner.accept_state.waker_mut(stream_type) =
                Some(context.waker().clone())
        );

        Poll::Pending
    }

    fn accept_stream_with_type(
        &mut self,
        stream_type: StreamType,
    ) -> Result<Option<StreamId>, connection::Error> {
        // Check if the Stream exists
        let next_id_to_accept = self
            .inner
            .accept_state
            .next_stream_id(stream_type)
            .ok_or_else(connection::Error::stream_id_exhausted)?;

        if self.inner.streams.contains(next_id_to_accept) {
            *self.inner.accept_state.next_stream_mut(stream_type) =
                next_id_to_accept.next_of_type();
            Ok(Some(next_id_to_accept))
        } else {
            Ok(None)
        }
    }

    /// Opens the next locally initiated stream of a certain type
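    ///
    /// A minimal usage sketch (assuming a `manager: AbstractStreamManager<S>`,
    /// an `open_token: connection::OpenToken`, and a task `Context` named `cx`):
    ///
    /// ```ignore
    /// if let Poll::Ready(result) =
    ///     manager.poll_open_local_stream(StreamType::Unidirectional, &mut open_token, cx)
    /// {
    ///     let stream_id = result?;
    ///     // `stream_id` can now be used for `poll_request` calls
    /// }
    /// ```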
    pub fn poll_open_local_stream(
        &mut self,
        stream_type: StreamType,
        open_token: &mut connection::OpenToken,
        context: &Context,
    ) -> Poll<Result<StreamId, connection::Error>> {
        // If StreamManager was closed, return the error
        if let Some(error) = self.inner.close_reason {
            return Err(error).into();
        }

        let first_unopened_id =
            ready!(self
                .inner
                .poll_open_local_stream(stream_type, open_token, context))?;

        // Increase the next utilized Stream ID
        *self
            .inner
            .next_stream_ids
            .get_mut(self.inner.local_endpoint_type, stream_type) =
            first_unopened_id.next_of_type();

        Ok(first_unopened_id).into()
    }

    /// This method gets called when a packet delivery got acknowledged
    pub fn on_packet_ack<A: ack::Set>(&mut self, ack_set: &A) {
        self.inner
            .incoming_connection_flow_controller
            .on_packet_ack(ack_set);
        self.inner
            .outgoing_connection_flow_controller
            .on_packet_ack(ack_set);
        self.inner.stream_controller.on_packet_ack(ack_set);

        self.inner.streams.iterate_frame_delivery_list(
            &mut self.inner.stream_controller,
            |stream| {
                // We have to wake inside the lock, since `StreamEvent`s has no capacity
                // to carry wakers in another iteration
                let mut events = StreamEvents::new();
                stream.on_packet_ack(ack_set, &mut events);
                events.wake_all();
            },
        );
    }

    /// This method gets called when a packet loss is reported
    pub fn on_packet_loss<A: ack::Set>(&mut self, ack_set: &A) {
        self.inner
            .incoming_connection_flow_controller
            .on_packet_loss(ack_set);
        self.inner
            .outgoing_connection_flow_controller
            .on_packet_loss(ack_set);
        self.inner.stream_controller.on_packet_loss(ack_set);

        self.inner.streams.iterate_frame_delivery_list(
            &mut self.inner.stream_controller,
            |stream| {
                // We have to wake inside the lock, since `StreamEvent`s has no capacity
                // to carry wakers in another iteration
                let mut events = StreamEvents::new();
                stream.on_packet_loss(ack_set, &mut events);
                events.wake_all();
            },
        );
    }

    /// This method gets called when the RTT estimate is updated for the active path
    pub fn on_rtt_update(&mut self, rtt_estimator: &RttEstimator) {
        let blocked_sync_period = self.blocked_sync_period(rtt_estimator);

        {
            let last_blocked_sync_period = self.last_blocked_sync_period.as_millis() as u64;
            let current_blocked_sync_period = blocked_sync_period.as_millis() as u64;

            /// The number of milliseconds to which the change comparison is configured
            ///
            /// Ideally this number is a power of 2 so the computation is efficient
            const SENSITIVITY_MS: u64 = 16;

            // If we haven't changed a significant amount, there's no point in updating everything
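            // e.g. with a 16ms sensitivity, a change from 20ms to 28ms falls into the
            // same bucket and is ignored, while a change from 20ms to 35ms crosses a
            // bucket boundary and triggers an update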
            if last_blocked_sync_period / SENSITIVITY_MS
                == current_blocked_sync_period / SENSITIVITY_MS
            {
                return;
            }
        }

        self.last_blocked_sync_period = blocked_sync_period;

        self.inner
            .stream_controller
            .update_blocked_sync_period(blocked_sync_period);
        self.inner
            .outgoing_connection_flow_controller
            .update_blocked_sync_period(blocked_sync_period);
        self.inner.streams.iterate_stream_flow_credits_list(
            &mut self.inner.stream_controller,
            |stream| {
                stream.update_blocked_sync_period(blocked_sync_period);
                StreamContainerIterationResult::Continue
            },
        );
    }

    /// Called when the connection timer expires
    pub fn on_timeout(&mut self, now: Timestamp) {
        self.inner.stream_controller.on_timeout(now);
        self.inner
            .outgoing_connection_flow_controller
            .on_timeout(now);
        self.inner.streams.iterate_stream_flow_credits_list(
            &mut self.inner.stream_controller,
            |stream| {
                stream.on_timeout(now);
                StreamContainerIterationResult::Continue
            },
        );
    }

    /// Closes the [`AbstractStreamManager`] and resets all streams with the
    /// given error. The current implementation will still allow frames to be
    /// forwarded to the contained Streams, as well as querying them for data.
    /// However, new Streams can not be created.
    pub fn close(&mut self, error: connection::Error) {
        self.inner.close(error, false);
    }

    /// If the `StreamManager` is closed, this returns the error which was
    /// used to close it.
    pub fn close_reason(&self) -> Option<connection::Error> {
        self.inner.close_reason
    }

    /// Closes the [`AbstractStreamManager`], flushes all send streams and resets all receive streams.
    ///
    /// This is used for when the application drops the connection but still has pending data to
    /// transmit.
    pub fn flush(&mut self, error: connection::Error) -> Poll<()> {
        self.inner.flush(error)
    }

    /// Queries the component for any outgoing frames that need to get sent
    pub fn on_transmit<W: WriteContext>(&mut self, context: &mut W) -> Result<(), OnTransmitError> {
        self.inner
            .incoming_connection_flow_controller
            .on_transmit(context)?;
        self.inner
            .outgoing_connection_flow_controller
            .on_transmit(context)?;
        self.inner.stream_controller.on_transmit(context)?;

        // If an error prevents us from transmitting all data, we add the
        // streams which could not send data back into the
        // waiting_for_transmission list, so that they will be queried again
        // the next time transmission capacity is available.
        // We actually add those Streams to the end of the list, since the
        // earlier entries are from Streams which were not able to write all
        // the desired data and added themselves as transmit interested again
        let mut transmit_result = Ok(());

        if context.transmission_constraint().can_retransmit() {
            // ensure components only retransmit in this phase
            let mut retransmission_context =
                transmission::context::RetransmissionContext::new(context);

            // Prioritize retransmitting lost data
            self.inner.streams.iterate_retransmission_list(
                &mut self.inner.stream_controller,
                |stream: &mut S| {
                    transmit_result = stream.on_transmit(&mut retransmission_context);
                    if transmit_result.is_err() {
                        StreamContainerIterationResult::BreakAndInsertAtBack
                    } else {
                        StreamContainerIterationResult::Continue
                    }
                },
            );

            // return if there were any errors
            transmit_result?;
        }

        if context.transmission_constraint().can_transmit() {
            self.inner.streams.iterate_transmission_list(
                &mut self.inner.stream_controller,
                |stream: &mut S| {
                    transmit_result = stream.on_transmit(context);
                    if transmit_result.is_err() {
                        StreamContainerIterationResult::BreakAndInsertAtBack
                    } else {
                        StreamContainerIterationResult::Continue
                    }
                },
            );
        }

        // There is no `finalize_done_streams` here, since we do not expect to
        // perform an operation which brings us into a finalization state

        transmit_result
    }

    /// Calculates the period for sending STREAMS_BLOCKED, STREAM_DATA_BLOCKED and
    /// DATA_BLOCKED frames when blocked, according to the idle timeout and latest RTT estimates
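    ///
    /// The returned period is the PTO period of the application data packet
    /// number space (computed with a PTO backoff count of 1), bounded below by
    /// a 5ms minimum so blocked frames are not sent too frequently on
    /// extremely low RTT networks.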
    fn blocked_sync_period(&self, rtt_estimator: &RttEstimator) -> Duration {
        //= https://www.rfc-editor.org/rfc/rfc9000#section-4.1
        //# To keep the
        //# connection from closing, a sender that is flow control limited SHOULD
        //# periodically send a STREAM_DATA_BLOCKED or DATA_BLOCKED frame when it
        //# has no ack-eliciting packets in flight.

        // STREAMS_BLOCKED, DATA_BLOCKED, and STREAM_DATA_BLOCKED frames are
        // sent to prevent the connection from closing due to an idle timeout
        // when we are blocked from opening or sending on streams. We use a pto count
        // of 1 so the periodic components can track backoff independently.

        // For extremely low RTT networks, this will ensure we do not send blocked
        // frames too frequently.
        const MIN_BLOCKED_SYNC_PERIOD: Duration = Duration::from_millis(5);

        let pto = rtt_estimator.pto_period(1, PacketNumberSpace::ApplicationData);

        pto.max(MIN_BLOCKED_SYNC_PERIOD)
    }

    // Frame reception
    // These functions are called from the packet delivery thread

    /// This method encapsulates all common actions for handling incoming frames
    /// which target a specific `Stream`.
    /// It will open unopened Streams, look up the required `Stream`,
    /// and then call the provided function on the Stream.
    /// If this leads to a connection error it will reset all internal Streams.
    fn handle_stream_frame<F>(
        &mut self,
        stream_id: StreamId,
        mut func: F,
    ) -> Result<(), transport::Error>
    where
        F: FnMut(&mut S, &mut StreamEvents) -> Result<(), transport::Error>,
    {
        let mut events = StreamEvents::new();

        let result = {
            // If Stream handling causes an error, trigger an internal reset
            self.inner.reset_streams_on_error(|state| {
                // Open streams if necessary
                state.open_stream_if_necessary(stream_id)?;
                // Apply the provided function on the Stream.
                // If the Stream does not exist it is no error.
                state
                    .streams
                    .with_stream(stream_id, &mut state.stream_controller, |stream| {
                        func(stream, &mut events)
                    })
                    .unwrap_or(Ok(()))
            })
        };

        // We wake `Waker`s outside of the Mutex to reduce contention.
        // TODO: This is now no longer outside the Mutex
        events.wake_all();
        result
    }

    /// This is called when a `STREAM` frame has been received for
    /// a stream
    pub fn on_data(&mut self, frame: &StreamRef) -> Result<(), transport::Error> {
        let stream_id = StreamId::from_varint(frame.stream_id);
        self.handle_stream_frame(stream_id, |stream, events| stream.on_data(frame, events))
    }

    /// This is called when a `DATA_BLOCKED` frame has been received
    pub fn on_data_blocked(&mut self, _frame: DataBlocked) -> Result<(), transport::Error> {
        Ok(()) // This is currently ignored
    }

    /// This is called when a `STREAM_DATA_BLOCKED` frame has been received for
    /// a stream
    pub fn on_stream_data_blocked(
        &mut self,
        frame: &StreamDataBlocked,
    ) -> Result<(), transport::Error> {
        let stream_id = StreamId::from_varint(frame.stream_id);
        self.handle_stream_frame(stream_id, |stream, events| {
            stream.on_stream_data_blocked(frame, events)
        })
    }

    /// This is called when a `RESET_STREAM` frame has been received for
    /// a stream
    pub fn on_reset_stream(&mut self, frame: &ResetStream) -> Result<(), transport::Error> {
        let stream_id = StreamId::from_varint(frame.stream_id);
        self.handle_stream_frame(stream_id, |stream, events| stream.on_reset(frame, events))
    }

    /// This is called when a `MAX_STREAM_DATA` frame has been received for
    /// a stream
    pub fn on_max_stream_data(&mut self, frame: &MaxStreamData) -> Result<(), transport::Error> {
        let stream_id = StreamId::from_varint(frame.stream_id);
        self.handle_stream_frame(stream_id, |stream, events| {
            stream.on_max_stream_data(frame, events)
        })
    }

    /// This is called when a `STOP_SENDING` frame has been received for
    /// a stream
    pub fn on_stop_sending(&mut self, frame: &StopSending) -> Result<(), transport::Error> {
        let stream_id = StreamId::from_varint(frame.stream_id);
        self.handle_stream_frame(stream_id, |stream, events| {
            stream.on_stop_sending(frame, events)
        })
    }

    /// This is called when a `MAX_DATA` frame has been received
    pub fn on_max_data(&mut self, frame: MaxData) -> Result<(), transport::Error> {
        self.inner
            .outgoing_connection_flow_controller
            .on_max_data(frame);

        if self
            .inner
            .outgoing_connection_flow_controller
            .available_window()
            == VarInt::from_u32(0)
        {
            return Ok(());
        }

        // Iterate over streams and allow them to grab credits from the
        // connection window. As soon as we run out of credits we stop
        // iterating and insert the remaining streams to the end of the list
        // again.
        let conn_flow = &mut self.inner.outgoing_connection_flow_controller;
        self.inner.streams.iterate_connection_flow_credits_list(
            &mut self.inner.stream_controller,
            |stream| {
                stream.on_connection_window_available();

                if conn_flow.available_window() == VarInt::from_u32(0) {
                    StreamContainerIterationResult::BreakAndInsertAtBack
                } else {
                    StreamContainerIterationResult::Continue
                }
            },
        );

        Ok(())
    }

    /// This is called when a `STREAMS_BLOCKED` frame has been received
    pub fn on_streams_blocked(&mut self, _frame: &StreamsBlocked) -> Result<(), transport::Error> {
        //= https://www.rfc-editor.org/rfc/rfc9000#section-4.6
        //= type=TODO
        //= tracking-issue=244
        //= feature=Stream concurrency
        Ok(()) // TODO: Implement me
    }

    /// This is called when a `MAX_STREAMS` frame has been received
    pub fn on_max_streams(&mut self, frame: &MaxStreams) -> Result<(), transport::Error> {
        self.inner.stream_controller.on_max_streams(frame);

        Ok(())
    }

    // User APIs

    /// Executes an application API call on the given Stream if the Stream exists
    /// and returns the result of the API call.
    ///
    /// If the Stream does not exist `unknown_stream_result` will be returned.
    ///
    /// If the application call requires transmission of data, the QUIC connection
    /// thread will be notified through the [`WakeHandle`] in the provided [`ConnectionApiCallContext`].
    fn perform_api_call<F, R>(
        &mut self,
        stream_id: StreamId,
        unknown_stream_result: R,
        api_call_context: &mut ConnectionApiCallContext,
        func: F,
    ) -> R
    where
        F: FnOnce(&mut S) -> R,
    {
        let had_transmission_interest = self.inner.streams.has_transmission_interest();

        let result = self
            .inner
            .streams
            .with_stream(stream_id, &mut self.inner.stream_controller, |stream| {
                func(stream)
            })
            .unwrap_or(unknown_stream_result);

        // A wakeup is only triggered if there is transmission interest
        // now, but there was none previously. The edge triggered behavior
        // minimizes the amount of necessary wakeups.
        let require_wakeup =
            !had_transmission_interest && self.inner.streams.has_transmission_interest();

        // TODO: This currently wakes the connection task while inside the connection Mutex.
        // It will be better if we return the `Waker` instead and perform the wakeup afterwards.
        if require_wakeup {
            api_call_context.wakeup_handle().wakeup();
        }

        result
    }

    pub fn poll_request(
        &mut self,
        stream_id: StreamId,
        api_call_context: &mut ConnectionApiCallContext,
        request: &mut ops::Request,
        context: Option<&Context>,
    ) -> Result<ops::Response, StreamError> {
        self.perform_api_call(
            stream_id,
            Err(StreamError::invalid_stream()),
            api_call_context,
            |stream| stream.poll_request(request, context),
        )
    }

    /// Returns whether or not streams have data to send
    pub fn has_pending_streams(&self) -> bool {
        self.inner.streams.has_pending_streams()
    }
}

impl<S: StreamTrait> timer::Provider for AbstractStreamManager<S> {
    #[inline]
    fn timers<Q: timer::Query>(&self, query: &mut Q) -> timer::Result {
        self.inner.stream_controller.timers(query)?;
        self.inner
            .outgoing_connection_flow_controller
            .timers(query)?;
        self.inner.streams.timers(query)?;
        Ok(())
    }
}

impl<S: StreamTrait> transmission::interest::Provider for AbstractStreamManager<S> {
    #[inline]
    fn transmission_interest<Q: transmission::interest::Query>(
        &self,
        query: &mut Q,
    ) -> transmission::interest::Result {
        self.inner.streams.transmission_interest(query)?;
        self.inner.stream_controller.transmission_interest(query)?;
        self.inner
            .incoming_connection_flow_controller
            .transmission_interest(query)?;
        self.inner
            .outgoing_connection_flow_controller
            .transmission_interest(query)?;

        Ok(())
    }
}

impl<S: StreamTrait> connection::finalization::Provider for AbstractStreamManager<S> {
    fn finalization_status(&self) -> connection::finalization::Status {
        if self.inner.close_reason.is_some() && self.inner.streams.nr_active_streams() == 0 {
            connection::finalization::Status::Final
        } else if self.inner.close_reason.is_some() && self.inner.streams.nr_active_streams() > 0 {
            connection::finalization::Status::Draining
        } else {
            connection::finalization::Status::Idle
        }
    }
}

// These are methods that StreamManager only exposes for test purposes.
//
// They might perform additional allocations, and may not be as safe to call
// due to being allowed to panic! when invariants are violated.

#[cfg(test)]
impl<S: StreamTrait> AbstractStreamManager<S> {
    /// Executes the given function using the outgoing flow controller
    pub fn with_outgoing_connection_flow_controller<F, R>(&mut self, func: F) -> R
    where
        F: FnOnce(&mut OutgoingConnectionFlowController) -> R,
    {
        func(&mut self.inner.outgoing_connection_flow_controller)
    }

    /// Executes the given function using the stream controller
    pub fn with_stream_controller<F, R>(&mut self, func: F) -> R
    where
        F: FnOnce(&mut stream::Controller) -> R,
    {
        func(&mut self.inner.stream_controller)
    }

    /// Asserts that a Stream with the given ID exists, and executes the provided
    /// function on it
    pub fn with_asserted_stream<F, R>(&mut self, stream_id: StreamId, func: F) -> R
    where
        F: FnOnce(&mut S) -> R,
    {
        self.inner
            .streams
            .with_stream(stream_id, &mut self.inner.stream_controller, func)
            .expect("Stream is open")
    }

    /// Returns the list of Stream IDs which are currently tracked by the
    /// [`StreamManager`].
    pub fn active_streams(&mut self) -> Vec<StreamId> {
        let mut results = Vec::new();
        self.inner
            .streams
            .iterate_streams(&mut self.inner.stream_controller, |stream| {
                results.push(stream.stream_id())
            });
        results
    }

    /// Returns the list of Stream IDs for Streams which are waiting for
    /// connection flow control credits.
    pub fn streams_waiting_for_connection_flow_control_credits(&mut self) -> Vec<StreamId> {
        let mut results = Vec::new();
        self.inner.streams.iterate_connection_flow_credits_list(
            &mut self.inner.stream_controller,
            |stream| {
                results.push(stream.stream_id());
                StreamContainerIterationResult::Continue
            },
        );
        results
    }

    /// Returns the list of Stream IDs for Streams which are waiting for
    /// delivery notifications.
    pub fn streams_waiting_for_delivery_notifications(&mut self) -> Vec<StreamId> {
        let mut results = Vec::new();
        self.inner.streams.iterate_frame_delivery_list(
            &mut self.inner.stream_controller,
            |stream| {
                results.push(stream.stream_id());
            },
        );
        results
    }

    /// Returns the list of Stream IDs for Streams which are waiting for
    /// transmission.
    pub fn streams_waiting_for_transmission(&mut self) -> Vec<StreamId> {
        let mut results = Vec::new();
        self.inner
            .streams
            .iterate_transmission_list(&mut self.inner.stream_controller, |stream| {
                results.push(stream.stream_id());
                StreamContainerIterationResult::Continue
            });
        results
    }

    /// Returns the list of Stream IDs for Streams which are waiting for
    /// retransmission.
    pub fn streams_waiting_for_retransmission(&mut self) -> Vec<StreamId> {
        let mut results = Vec::new();
        self.inner.streams.iterate_retransmission_list(
            &mut self.inner.stream_controller,
            |stream| {
                results.push(stream.stream_id());
                StreamContainerIterationResult::Continue
            },
        );
        results
    }
}

#[cfg(test)]
mod tests;