s2n-quic-core 0.81.0

Internal crate used by s2n-quic
Documentation
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
    endpoint::Endpoint,
    event::{self, EndpointPublisher, IntoEvent as _},
    inet::SocketAddress,
    io::{rx::Rx, tx::Tx},
    task::cooldown::Cooldown,
    time::clock::{ClockWithTimer, Timer},
};
use core::pin::Pin;

pub mod select;
use select::Select;

pub trait Stats {
    fn publish<P: event::EndpointPublisher>(&mut self, publisher: &mut P);
}

pub struct EventLoop<E, C, R, T, S> {
    pub endpoint: E,
    pub clock: C,
    pub rx: R,
    pub tx: T,
    pub cooldown: Cooldown,
    pub stats: S,
}

impl<E, C, R, T, S> EventLoop<E, C, R, T, S>
where
    E: Endpoint,
    C: ClockWithTimer,
    R: Rx<PathHandle = E::PathHandle>,
    T: Tx<PathHandle = E::PathHandle>,
    S: Stats,
{
    /// Starts running the endpoint event loop in an async task
    pub async fn start(self, local_addr: SocketAddress) {
        let Self {
            mut endpoint,
            clock,
            mut rx,
            mut tx,
            mut cooldown,
            mut stats,
        } = self;

        /// Creates a event publisher with the endpoint's subscriber
        macro_rules! publisher {
            ($timestamp:expr) => {{
                let timestamp = $timestamp;
                let subscriber = endpoint.subscriber();
                event::EndpointPublisherSubscriber::new(
                    event::builder::EndpointMeta {
                        endpoint_type: E::ENDPOINT_TYPE,
                        timestamp,
                    },
                    None,
                    subscriber,
                )
            }};
        }

        publisher!(clock.get_time()).on_platform_event_loop_started(
            event::builder::PlatformEventLoopStarted {
                local_address: local_addr.into_event(),
            },
        );

        let mut timer = clock.timer();

        loop {
            // Poll for RX readiness
            let rx_ready = rx.ready();

            // Poll for TX readiness
            let tx_ready = tx.ready();

            // Poll for any application-driven updates
            let mut wakeups = endpoint.wakeups(&clock);

            // TODO use the [pin macro](https://doc.rust-lang.org/std/pin/macro.pin.html) once
            // available in MSRV
            //
            // See https://github.com/aws/s2n-quic/issues/1751
            let wakeups = unsafe {
                // Safety: the wakeups future is on the stack and won't move
                Pin::new_unchecked(&mut wakeups)
            };

            // Poll for timer expiration
            let timer_ready = timer.ready();

            // Concurrently poll all of the futures and wake up on the first one that's ready
            let select = Select::new(rx_ready, tx_ready, wakeups, timer_ready);

            let select = cooldown.wrap(select);

            let select::Outcome {
                rx_result,
                tx_result,
                timeout_expired,
                application_wakeup,
            } = if let Ok(outcome) = select.await {
                outcome
            } else {
                // The endpoint has shut down; stop the event loop
                return;
            };

            // notify the application that we woke up and why
            let wakeup_timestamp = clock.get_time();
            {
                let mut publisher = publisher!(wakeup_timestamp);

                publisher.on_platform_event_loop_wakeup(event::builder::PlatformEventLoopWakeup {
                    timeout_expired,
                    rx_ready: rx_result.is_some(),
                    tx_ready: tx_result.is_some(),
                    application_wakeup,
                });

                stats.publish(&mut publisher);
            }

            match rx_result {
                Some(Ok(())) => {
                    // we received some packets. give them to the endpoint.
                    rx.queue(|queue| {
                        endpoint.receive(queue, &clock);
                    });
                }
                Some(Err(error)) => {
                    // The RX provider has encountered an error. shut down the event loop
                    let mut publisher = publisher!(clock.get_time());
                    rx.handle_error(error, &mut publisher);
                    return;
                }
                None => {
                    // We didn't receive any packets; nothing to do
                }
            }

            match tx_result {
                Some(Ok(())) => {
                    // The TX queue was full and now has capacity. The endpoint can now continue to
                    // transmit
                }
                Some(Err(error)) => {
                    // The RX provider has encountered an error. shut down the event loop
                    let mut publisher = publisher!(clock.get_time());
                    tx.handle_error(error, &mut publisher);
                    return;
                }
                None => {
                    // The TX queue is either waiting to be flushed or has capacity. Either way, we
                    // call `endpoint.transmit` to at least update the clock and poll any timer
                    // expirations.
                }
            }

            // Let the endpoint transmit, if possible
            tx.queue(|queue| {
                endpoint.transmit(queue, &clock);
            });

            // Get the next expiration from the endpoint and update the timer
            let timeout = endpoint.timeout();
            if let Some(timeout) = timeout {
                timer.update(timeout);
            }

            let sleep_timestamp = clock.get_time();
            // compute the relative timeout to the current time
            let timeout = timeout.map(|t| t.saturating_duration_since(sleep_timestamp));
            // compute how long it took to process the current iteration
            let processing_duration = sleep_timestamp.saturating_duration_since(wakeup_timestamp);

            // publish the event to the application
            publisher!(sleep_timestamp).on_platform_event_loop_sleep(
                event::builder::PlatformEventLoopSleep {
                    timeout,
                    processing_duration,
                },
            );
        }
    }
}