s2n-quic-transport 0.16.0

Internal crate used by s2n-quic
Documentation
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

//! `StreamContainer` is a container for all Streams. It manages the permanent
//! map of all active Streams, as well as a variety of dynamic Stream lists.

// hide warnings from the intrusive_collections crate
#![allow(unknown_lints, clippy::non_send_fields_in_send_ty)]

use crate::{
    stream,
    stream::{stream_impl::StreamTrait, stream_interests::StreamInterests},
    transmission,
};
use alloc::rc::Rc;
use core::{cell::RefCell, ops::Deref};
use intrusive_collections::{
    intrusive_adapter, KeyAdapter, LinkedList, LinkedListLink, RBTree, RBTreeLink,
};
use s2n_quic_core::{stream::StreamId, time::timer};

// Intrusive list adapter for managing the list of `done` streams
intrusive_adapter!(DoneStreamsAdapter<S> = Rc<StreamNode<S>>: StreamNode<S> {
    done_streams_link: LinkedListLink
});

// Intrusive list adapter for managing the list of `waiting_for_frame_delivery` streams
intrusive_adapter!(WaitingForFrameDeliveryAdapter<S> = Rc<StreamNode<S>>: StreamNode<S> {
    waiting_for_frame_delivery_link: LinkedListLink
});

// Intrusive list adapter for managing the list of
// `waiting_for_transmission` streams
intrusive_adapter!(WaitingForTransmissionAdapter<S> = Rc<StreamNode<S>>: StreamNode<S> {
    waiting_for_transmission_link: LinkedListLink
});

// Intrusive list adapter for managing the list of
// `waiting_for_retransmission` streams
intrusive_adapter!(WaitingForRetransmissionAdapter<S> = Rc<StreamNode<S>>: StreamNode<S> {
    waiting_for_retransmission_link: LinkedListLink
});

// Intrusive list adapter for managing the list of
// `waiting_for_connection_flow_control_credits` streams
intrusive_adapter!(WaitingForConnectionFlowControlCreditsAdapter<S> = Rc<StreamNode<S>>: StreamNode<S> {
    waiting_for_connection_flow_control_credits_link: LinkedListLink
});

// Intrusive list adapter for managing the list of
// `waiting_for_stream_flow_control_credits` streams
intrusive_adapter!(WaitingForStreamFlowControlCreditsAdapter<S> = Rc<StreamNode<S>>: StreamNode<S> {
    waiting_for_stream_flow_control_credits_link: LinkedListLink
});

// Intrusive red black tree adapter for managing all streams in a tree for
// lookup by Stream ID
intrusive_adapter!(StreamTreeAdapter<S> = Rc<StreamNode<S>>: StreamNode<S> {
    tree_link: RBTreeLink
});

/// A wrapper around a `Stream` implementation which allows to insert the
/// it in multiple intrusive collections. The collections into which the `Stream`
/// gets inserted are referenced inside this `StreamNode`.
struct StreamNode<S> {
    /// This contains the actual implementation of the `Stream`
    inner: RefCell<S>,
    /// Allows the Stream to be part of the `stream_map` collection
    tree_link: RBTreeLink,
    /// Allows the Stream to be part of the `done_streams` collection
    done_streams_link: LinkedListLink,
    /// Allows the Stream to be part of the `waiting_for_frame_delivery` collection
    waiting_for_frame_delivery_link: LinkedListLink,
    /// Allows the Stream to be part of the `waiting_for_transmission` collection
    waiting_for_transmission_link: LinkedListLink,
    /// Allows the Stream to be part of the `waiting_for_transmission` collection
    waiting_for_retransmission_link: LinkedListLink,
    /// Allows the Stream to be part of the `waiting_for_connection_flow_control_credits` collection
    waiting_for_connection_flow_control_credits_link: LinkedListLink,
    /// Allows the Stream to be part of the `waiting_for_stream_flow_control_credits` collection
    waiting_for_stream_flow_control_credits_link: LinkedListLink,
}

impl<S> StreamNode<S> {
    /// Creates a new `StreamNode` which wraps the given Stream implementation of type `S`
    pub fn new(stream_impl: S) -> StreamNode<S> {
        StreamNode {
            inner: RefCell::new(stream_impl),
            tree_link: RBTreeLink::new(),
            done_streams_link: LinkedListLink::new(),
            waiting_for_frame_delivery_link: LinkedListLink::new(),
            waiting_for_transmission_link: LinkedListLink::new(),
            waiting_for_retransmission_link: LinkedListLink::new(),
            waiting_for_connection_flow_control_credits_link: LinkedListLink::new(),
            waiting_for_stream_flow_control_credits_link: LinkedListLink::new(),
        }
    }
}

// This is required to build an intrusive `RBTree` of `StreamNode`s which
// utilizes `StreamId`s as a key.
impl<'a, S: StreamTrait> KeyAdapter<'a> for StreamTreeAdapter<S> {
    type Key = StreamId;

    fn get_key(&self, x: &'a StreamNode<S>) -> StreamId {
        x.inner.borrow().stream_id()
    }
}

/// Obtains a `Rc<StreamNode>` from a `&StreamNode`.
///
/// This method is only safe to be called if the `StreamNode` is known to be
/// stored inside a `Rc`.
unsafe fn stream_node_rc_from_ref<S>(stream_node: &StreamNode<S>) -> Rc<StreamNode<S>> {
    // In order to be able to to get a `Rc` we construct a temporary `Rc`
    // from it using the `Rc::from_raw` API and clone the `Rc`.
    // The temporary `Rc` must be released without calling `drop`,
    // because this would decrement and thereby invalidate the refcount
    // (which wasn't changed by calling `Rc::from_raw`).
    let temp_node_ptr: core::mem::ManuallyDrop<Rc<StreamNode<S>>> = core::mem::ManuallyDrop::new(
        Rc::<StreamNode<S>>::from_raw(stream_node as *const StreamNode<S>),
    );
    temp_node_ptr.deref().clone()
}

/// Contains all secondary lists of Streams.
///
/// A Stream can be a member in any of those, in addition to being a member of
/// `StreamContainer::stream_map`.
struct InterestLists<S> {
    /// Streams which have been finalized
    done_streams: LinkedList<DoneStreamsAdapter<S>>,
    /// Streams which are waiting for packet acknowledgements and
    /// packet loss notifications
    waiting_for_frame_delivery: LinkedList<WaitingForFrameDeliveryAdapter<S>>,
    /// Streams which need to transmit data
    waiting_for_transmission: LinkedList<WaitingForTransmissionAdapter<S>>,
    /// Streams which need to transmit data
    waiting_for_retransmission: LinkedList<WaitingForRetransmissionAdapter<S>>,
    /// Streams which are blocked on transmission due to waiting on the
    /// connection flow control window to increase
    waiting_for_connection_flow_control_credits:
        LinkedList<WaitingForConnectionFlowControlCreditsAdapter<S>>,
    /// Streams which are blocked on transmission due to waiting on the
    /// stream flow control window to increase
    waiting_for_stream_flow_control_credits:
        LinkedList<WaitingForStreamFlowControlCreditsAdapter<S>>,
}

impl<S: StreamTrait> InterestLists<S> {
    fn new() -> Self {
        Self {
            done_streams: LinkedList::new(DoneStreamsAdapter::new()),
            waiting_for_frame_delivery: LinkedList::new(WaitingForFrameDeliveryAdapter::new()),
            waiting_for_transmission: LinkedList::new(WaitingForTransmissionAdapter::new()),
            waiting_for_retransmission: LinkedList::new(WaitingForRetransmissionAdapter::new()),
            waiting_for_connection_flow_control_credits: LinkedList::new(
                WaitingForConnectionFlowControlCreditsAdapter::new(),
            ),
            waiting_for_stream_flow_control_credits: LinkedList::new(
                WaitingForStreamFlowControlCreditsAdapter::new(),
            ),
        }
    }

    /// Update all interest lists based on latest interest reported by a Node
    fn update_interests(
        &mut self,
        node: &Rc<StreamNode<S>>,
        interests: StreamInterests,
        result: StreamContainerIterationResult,
    ) -> bool {
        // Note that all comparisons start by checking whether the stream is
        // already part of the given list. This is required in order for the
        // following operation to be safe. Inserting an element in a list while
        // it is already part of a (different) list can panic. Trying to remove
        // an element from a list while it is not actually part of the list
        // is undefined.

        macro_rules! sync_interests {
            ($interest:expr, $link_name:ident, $list_name:ident) => {
                if $interest != node.$link_name.is_linked() {
                    if $interest {
                        if matches!(result, StreamContainerIterationResult::Continue) {
                            self.$list_name.push_back(node.clone());
                        } else {
                            self.$list_name.push_front(node.clone());
                        }
                    } else {
                        // Safety: We know that the node is only ever part of this list.
                        // While elements are in temporary lists, they always get unlinked
                        // from those temporary lists while their interest is updated.
                        let mut cursor = unsafe {
                            self.$list_name
                                .cursor_mut_from_ptr(node.deref() as *const StreamNode<S>)
                        };
                        cursor.remove();
                    }
                }
                debug_assert_eq!($interest, node.$link_name.is_linked());
            };
        }

        sync_interests!(
            interests.delivery_notifications,
            waiting_for_frame_delivery_link,
            waiting_for_frame_delivery
        );
        sync_interests!(
            matches!(interests.transmission, transmission::Interest::NewData),
            waiting_for_transmission_link,
            waiting_for_transmission
        );
        sync_interests!(
            matches!(interests.transmission, transmission::Interest::LostData),
            waiting_for_retransmission_link,
            waiting_for_retransmission
        );
        sync_interests!(
            interests.connection_flow_control_credits,
            waiting_for_connection_flow_control_credits_link,
            waiting_for_connection_flow_control_credits
        );
        sync_interests!(
            interests.stream_flow_control_credits,
            waiting_for_stream_flow_control_credits_link,
            waiting_for_stream_flow_control_credits
        );

        if !interests.retained != node.done_streams_link.is_linked() {
            if !interests.retained {
                self.done_streams.push_back(node.clone());
            } else {
                panic!("Done streams should never report not done later");
            }
            true
        } else {
            false
        }
    }
}

/// A collection of all intrusive lists Streams are part of.
///
/// The container will automatically update the membership of a `Stream` in a
/// variety of interest lists after each interaction with the `Stream`.
///
/// The Stream container can be interacted with in 2 fashions:
/// - The `with_stream()` method allows users to obtain a mutable reference to
///   a single `Stream`. After the interaction was completed, the `Stream` will
///   be queried for its interests again.
/// - There exist a variety of iteration methods, which allow to iterate over
///   all or a subset of streams in each interest list.
pub struct StreamContainer<S> {
    /// Streams organized as a tree, for lookup by Stream ID
    stream_map: RBTree<StreamTreeAdapter<S>>,
    /// The number of streams which are tracked by the Container.
    /// This needs to be in-sync with Streams that get inserted into `stream_map`.
    nr_active_streams: usize,
    /// Additional interest lists in which Streams will be placed dynamically
    interest_lists: InterestLists<S>,
}

impl<S> core::fmt::Debug for StreamContainer<S> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
        f.debug_struct("StreamContainer")
            .field("nr_active_streams", &self.nr_active_streams)
            .finish()
    }
}

macro_rules! iterate_uninterruptible {
    ($sel:ident, $list_name:tt, $link_name:ident, $controller:ident, $func:ident) => {
        for stream in $sel.interest_lists.$list_name.take() {
            debug_assert!(!stream.$link_name.is_linked());

            let interests = {
                let mut mut_stream = stream.inner.borrow_mut();
                $func(&mut *mut_stream);
                mut_stream.get_stream_interests()
            };

            $sel.interest_lists.update_interests(
                &stream,
                interests,
                StreamContainerIterationResult::Continue,
            );
        }

        if !$sel.interest_lists.done_streams.is_empty() {
            $sel.finalize_done_streams($controller);
        }
    };
}

macro_rules! iterate_interruptible {
    ($sel:ident, $list_name:tt, $link_name:ident, $controller:ident, $func:ident) => {
        let mut extracted_list = $sel.interest_lists.$list_name.take();
        let mut cursor = extracted_list.front_mut();

        while let Some(stream) = cursor.remove() {
            // Note that while we iterate over the intrusive lists here
            // `stream` is part of no list anymore, since it also got dropped
            // from list that is described by the `cursor`.
            debug_assert!(!stream.$link_name.is_linked());
            let mut mut_stream = stream.inner.borrow_mut();
            let result = $func(&mut *mut_stream);

            // Update the interests after the interaction
            let interests = mut_stream.get_stream_interests();
            $sel.interest_lists
                .update_interests(&stream, interests, result);

            match result {
                StreamContainerIterationResult::BreakAndInsertAtBack => {
                    $sel.interest_lists
                        .$list_name
                        .front_mut()
                        .splice_after(extracted_list);
                    break;
                }
                StreamContainerIterationResult::Continue => {}
            }
        }

        if !$sel.interest_lists.done_streams.is_empty() {
            $sel.finalize_done_streams($controller);
        }
    };
}

impl<S: StreamTrait> StreamContainer<S> {
    /// Creates a new `StreamContainer`
    pub fn new() -> Self {
        Self {
            stream_map: RBTree::new(StreamTreeAdapter::new()),
            nr_active_streams: 0,
            interest_lists: InterestLists::new(),
        }
    }

    /// Insert a new Stream into the container
    pub fn insert_stream(&mut self, stream: S) {
        // Even though it likely might have none, it seems like it
        // would be better to avoid future bugs
        let interests = stream.get_stream_interests();

        let new_stream = Rc::new(StreamNode::new(stream));

        self.interest_lists.update_interests(
            &new_stream,
            interests,
            StreamContainerIterationResult::Continue,
        );

        self.stream_map.insert(new_stream);
        self.nr_active_streams += 1;
    }

    /// Returns the amount of streams which are tracked by the `StreamContainer`
    pub fn nr_active_streams(&self) -> usize {
        self.nr_active_streams
    }

    /// Returns true if the container contains a Stream with the given ID
    pub fn contains(&self, stream_id: StreamId) -> bool {
        !self.stream_map.find(&stream_id).is_null()
    }

    /// Looks up the `Stream` with the given ID and executes the provided function
    /// on it.
    ///
    /// After the transaction with the `Stream` had been completed, the `Stream`
    /// will get queried for its new interests, and all lists will be updated
    /// according to those.
    ///
    /// The `stream::Controller` will be notified of streams that have been
    /// closed to allow for further streams to be opened.
    ///
    /// `Stream`s which signal finalization interest will be removed from the
    /// `StreamContainer`.
    pub fn with_stream<F, R>(
        &mut self,
        stream_id: StreamId,
        controller: &mut stream::Controller,
        func: F,
    ) -> Option<R>
    where
        F: FnOnce(&mut S) -> R,
    {
        let node_ptr: Rc<StreamNode<S>>;
        let result: R;
        let interests;

        // This block is required since we mutably borrow `self` inside the
        // block in order to obtain a Stream reference and to executing the
        // provided method.
        // We need to release the borrow in order to be able to update the
        // Streams interests after having executed the method.
        {
            let node = self.stream_map.find(&stream_id).get()?;

            // We have to obtain an `Rc<StreamNode>` in order to be able to
            // perform interest updates later on. However the intrusive tree
            // API only provides us a raw reference.
            // Safety: We know that all of our StreamNode's are stored in
            // `Rc` pointers.
            node_ptr = unsafe { stream_node_rc_from_ref(node) };

            let stream: &mut S = &mut node.inner.borrow_mut();
            result = func(stream);
            interests = stream.get_stream_interests();
        }

        // Update the interest lists after the interactions and then remove
        // all finalized streams
        if self.interest_lists.update_interests(
            &node_ptr,
            interests,
            StreamContainerIterationResult::Continue,
        ) {
            self.finalize_done_streams(controller);
        }

        Some(result)
    }

    /// Removes all Streams in the `done` state from the `StreamManager`.
    ///
    /// The `stream::Controller` will be notified of streams that have been
    /// closed to allow for further streams to be opened.
    pub fn finalize_done_streams(&mut self, controller: &mut stream::Controller) {
        for stream in self.interest_lists.done_streams.take() {
            // Remove the Stream from `stream_map`
            let mut cursor = self.stream_map.find_mut(&stream.inner.borrow().stream_id());
            let remove_result = cursor.remove();
            debug_assert!(remove_result.is_some());
            self.nr_active_streams -= 1;

            // And remove the Stream from all other interest lists it might be
            // part of.
            let stream_ptr = &*stream as *const StreamNode<S>;

            macro_rules! remove_stream_from_list {
                ($list_name:ident, $link_name:ident) => {
                    if stream.$link_name.is_linked() {
                        // Safety: We know that the Stream is part of the list,
                        // because it is linked, and we never place Streams in
                        // other lists when `finalize_done_streams` is called.
                        let mut cursor = unsafe {
                            self.interest_lists
                                .$list_name
                                .cursor_mut_from_ptr(stream_ptr)
                        };
                        let remove_result = cursor.remove();
                        debug_assert!(remove_result.is_some());
                    }
                };
            }

            remove_stream_from_list!(waiting_for_frame_delivery, waiting_for_frame_delivery_link);
            remove_stream_from_list!(waiting_for_transmission, waiting_for_transmission_link);
            remove_stream_from_list!(waiting_for_retransmission, waiting_for_retransmission_link);
            remove_stream_from_list!(
                waiting_for_connection_flow_control_credits,
                waiting_for_connection_flow_control_credits_link
            );
            remove_stream_from_list!(
                waiting_for_stream_flow_control_credits,
                waiting_for_stream_flow_control_credits_link
            );

            controller.on_close_stream(stream.inner.borrow().stream_id());
        }
    }

    /// Iterates over all `Stream`s which are waiting for frame delivery,
    /// and executes the given function on each `Stream`
    ///
    /// The `stream::Controller` will be notified of streams that have been
    /// closed to allow for further streams to be opened.
    pub fn iterate_frame_delivery_list<F>(
        &mut self,
        controller: &mut stream::Controller,
        mut func: F,
    ) where
        F: FnMut(&mut S),
    {
        iterate_uninterruptible!(
            self,
            waiting_for_frame_delivery,
            waiting_for_frame_delivery_link,
            controller,
            func
        );
    }

    /// Iterates over all `Stream`s which waiting for connection flow control
    /// credits, and executes the given function on each `Stream`
    ///
    /// The `stream::Controller` will be notified of streams that have been
    /// closed to allow for further streams to be opened.
    pub fn iterate_connection_flow_credits_list<F>(
        &mut self,
        controller: &mut stream::Controller,
        mut func: F,
    ) where
        F: FnMut(&mut S) -> StreamContainerIterationResult,
    {
        iterate_interruptible!(
            self,
            waiting_for_connection_flow_control_credits,
            waiting_for_connection_flow_control_credits_link,
            controller,
            func
        );
    }

    /// Iterates over all `Stream`s which are waiting for stream flow control
    /// credits, and executes the given function on each `Stream`
    ///
    /// The `stream::Controller` will be notified of streams that have been
    /// closed to allow for further streams to be opened.
    pub fn iterate_stream_flow_credits_list<F>(
        &mut self,
        controller: &mut stream::Controller,
        mut func: F,
    ) where
        F: FnMut(&mut S) -> StreamContainerIterationResult,
    {
        iterate_interruptible!(
            self,
            waiting_for_stream_flow_control_credits,
            waiting_for_stream_flow_control_credits_link,
            controller,
            func
        );
    }

    /// Iterates over all `Stream`s which are waiting for transmission,
    /// and executes the given function on each `Stream`
    ///
    /// The `stream::Controller` will be notified of streams that have been
    /// closed to allow for further streams to be opened.
    pub fn iterate_transmission_list<F>(&mut self, controller: &mut stream::Controller, mut func: F)
    where
        F: FnMut(&mut S) -> StreamContainerIterationResult,
    {
        iterate_interruptible!(
            self,
            waiting_for_transmission,
            waiting_for_transmission_link,
            controller,
            func
        );
    }

    /// Iterates over all `Stream`s which are waiting for retransmission,
    /// and executes the given function on each `Stream`
    ///
    /// The `stream::Controller` will be notified of streams that have been
    /// closed to allow for further streams to be opened.
    pub fn iterate_retransmission_list<F>(
        &mut self,
        controller: &mut stream::Controller,
        mut func: F,
    ) where
        F: FnMut(&mut S) -> StreamContainerIterationResult,
    {
        iterate_interruptible!(
            self,
            waiting_for_retransmission,
            waiting_for_retransmission_link,
            controller,
            func
        );
    }

    /// Iterates over all `Stream`s which are part of this container, and executes
    /// the given function on each `Stream`
    ///
    /// The `stream::Controller` will be notified of streams that have been
    /// closed to allow for further streams to be opened.
    pub fn iterate_streams<F>(&mut self, controller: &mut stream::Controller, mut func: F)
    where
        F: FnMut(&mut S),
    {
        // Note: We can not use iterate_uninterruptible here, because that
        // iteration will extract nodes from the list but not automatically insert
        // all Nodes back into the main interest list. `stream_map` is not
        // populated by the interest maps.

        for stream in self.stream_map.iter() {
            debug_assert!(stream.tree_link.is_linked());

            let mut mut_stream = stream.inner.borrow_mut();
            func(&mut *mut_stream);
            let interests = mut_stream.get_stream_interests();

            // Update the interest lists here
            // Safety: The stream reference is obtained from the RBTree, which
            // stores it's nodes as `Rc`
            let stream_node_rc = unsafe { stream_node_rc_from_ref(stream) };
            self.interest_lists.update_interests(
                &stream_node_rc,
                interests,
                StreamContainerIterationResult::Continue,
            );
        }

        if !self.interest_lists.done_streams.is_empty() {
            // Cleanup all `done` streams after we finished interacting with all
            // of them.
            self.finalize_done_streams(controller);
        }
    }

    /// Returns whether or not streams have data to send
    pub fn has_pending_streams(&self) -> bool {
        !self.interest_lists.waiting_for_transmission.is_empty()
            || !self.interest_lists.waiting_for_retransmission.is_empty()
    }
}

impl<S: StreamTrait> timer::Provider for StreamContainer<S> {
    #[inline]
    fn timers<Q: timer::Query>(&self, query: &mut Q) -> timer::Result {
        // TODO denormalize this into a single value
        for stream in self
            .interest_lists
            .waiting_for_stream_flow_control_credits
            .iter()
        {
            stream.inner.borrow().timers(query)?;
        }
        Ok(())
    }
}

impl<S: StreamTrait> transmission::interest::Provider for StreamContainer<S> {
    #[inline]
    fn transmission_interest<Q: transmission::interest::Query>(
        &self,
        query: &mut Q,
    ) -> transmission::interest::Result {
        if !self.interest_lists.waiting_for_retransmission.is_empty() {
            query.on_lost_data()?;
        } else if !self.interest_lists.waiting_for_transmission.is_empty() {
            query.on_new_data()?;
        }

        Ok(())
    }
}

/// Return values for iterations over a `Stream` list.
/// The value instructs the iterator whether iteration will be continued.
#[derive(Clone, Copy, Debug)]
pub enum StreamContainerIterationResult {
    /// Continue iteration over the list
    Continue,
    /// Aborts the iteration over a list and add the remaining items at the
    /// back of the list
    BreakAndInsertAtBack,
}