1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{
    endpoint::Endpoint,
    event::{self, EndpointPublisher},
    io::{rx::Rx, tx::Tx},
    time::clock::{ClockWithTimer, Timer},
};
use core::pin::Pin;

pub mod select;
use select::Select;

pub struct EventLoop<E, C, R, T> {
    pub endpoint: E,
    pub clock: C,
    pub rx: R,
    pub tx: T,
}

impl<E, C, R, T> EventLoop<E, C, R, T>
where
    E: Endpoint,
    C: ClockWithTimer,
    R: Rx<PathHandle = E::PathHandle>,
    T: Tx<PathHandle = E::PathHandle>,
{
    /// Starts running the endpoint event loop in an async task
    pub async fn start(self) {
        let Self {
            mut endpoint,
            clock,
            mut rx,
            mut tx,
        } = self;

        /// Creates a event publisher with the endpoint's subscriber
        macro_rules! publisher {
            ($timestamp:expr) => {{
                let timestamp = $timestamp;
                let subscriber = endpoint.subscriber();
                event::EndpointPublisherSubscriber::new(
                    event::builder::EndpointMeta {
                        endpoint_type: E::ENDPOINT_TYPE,
                        timestamp,
                    },
                    None,
                    subscriber,
                )
            }};
        }

        let mut timer = clock.timer();

        loop {
            // Poll for RX readiness
            let rx_ready = rx.ready();

            // Poll for TX readiness
            let tx_ready = tx.ready();

            // Poll for any application-driven updates
            let mut wakeups = endpoint.wakeups(&clock);

            // TODO use the [pin macro](https://doc.rust-lang.org/std/pin/macro.pin.html) once
            // available in MSRV
            //
            // See https://github.com/aws/s2n-quic/issues/1751
            let wakeups = unsafe {
                // Safety: the wakeups future is on the stack and won't move
                Pin::new_unchecked(&mut wakeups)
            };

            // Poll for timer expiration
            let timer_ready = timer.ready();

            // Concurrently poll all of the futures and wake up on the first one that's ready
            let select = Select::new(rx_ready, tx_ready, wakeups, timer_ready);

            let select::Outcome {
                rx_result,
                tx_result,
                timeout_expired,
                application_wakeup,
            } = if let Ok(outcome) = select.await {
                outcome
            } else {
                // The endpoint has shut down; stop the event loop
                return;
            };

            // notify the application that we woke up and why
            let wakeup_timestamp = clock.get_time();
            publisher!(wakeup_timestamp).on_platform_event_loop_wakeup(
                event::builder::PlatformEventLoopWakeup {
                    timeout_expired,
                    rx_ready: rx_result.is_some(),
                    tx_ready: tx_result.is_some(),
                    application_wakeup,
                },
            );

            match rx_result {
                Some(Ok(())) => {
                    // we received some packets. give them to the endpoint.
                    rx.queue(|queue| {
                        endpoint.receive(queue, &clock);
                    });
                }
                Some(Err(error)) => {
                    // The RX provider has encountered an error. shut down the event loop
                    let mut publisher = publisher!(clock.get_time());
                    rx.handle_error(error, &mut publisher);
                    return;
                }
                None => {
                    // We didn't receive any packets; nothing to do
                }
            }

            match tx_result {
                Some(Ok(())) => {
                    // The TX queue was full and now has capacity. The endpoint can now continue to
                    // transmit
                }
                Some(Err(error)) => {
                    // The RX provider has encountered an error. shut down the event loop
                    let mut publisher = publisher!(clock.get_time());
                    tx.handle_error(error, &mut publisher);
                    return;
                }
                None => {
                    // The TX queue is either waiting to be flushed or has capacity. Either way, we
                    // call `endpoint.transmit` to at least update the clock and poll any timer
                    // expirations.
                }
            }

            // Let the endpoint transmit, if possible
            tx.queue(|queue| {
                endpoint.transmit(queue, &clock);
            });

            // Get the next expiration from the endpoint and update the timer
            let timeout = endpoint.timeout();
            if let Some(timeout) = timeout {
                timer.update(timeout);
            }

            let sleep_timestamp = clock.get_time();
            // compute the relative timeout to the current time
            let timeout = timeout.map(|t| t.saturating_duration_since(sleep_timestamp));
            // compute how long it took to process the current iteration
            let processing_duration = sleep_timestamp.saturating_duration_since(wakeup_timestamp);

            // publish the event to the application
            publisher!(sleep_timestamp).on_platform_event_loop_sleep(
                event::builder::PlatformEventLoopSleep {
                    timeout,
                    processing_duration,
                },
            );
        }
    }
}