1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use crate::{
endpoint::Endpoint,
event::{self, EndpointPublisher, IntoEvent as _},
inet::SocketAddress,
io::{rx::Rx, tx::Tx},
task::cooldown::Cooldown,
time::clock::{ClockWithTimer, Timer},
};
use core::pin::Pin;
pub mod select;
use select::Select;
pub trait Stats {
fn publish<P: event::EndpointPublisher>(&mut self, publisher: &mut P);
}
pub struct EventLoop<E, C, R, T, S> {
pub endpoint: E,
pub clock: C,
pub rx: R,
pub tx: T,
pub cooldown: Cooldown,
pub stats: S,
}
impl<E, C, R, T, S> EventLoop<E, C, R, T, S>
where
E: Endpoint,
C: ClockWithTimer,
R: Rx<PathHandle = E::PathHandle>,
T: Tx<PathHandle = E::PathHandle>,
S: Stats,
{
/// Starts running the endpoint event loop in an async task
pub async fn start(self, local_addr: SocketAddress) {
let Self {
mut endpoint,
clock,
mut rx,
mut tx,
mut cooldown,
mut stats,
} = self;
/// Creates a event publisher with the endpoint's subscriber
macro_rules! publisher {
($timestamp:expr) => {{
let timestamp = $timestamp;
let subscriber = endpoint.subscriber();
event::EndpointPublisherSubscriber::new(
event::builder::EndpointMeta {
endpoint_type: E::ENDPOINT_TYPE,
timestamp,
},
None,
subscriber,
)
}};
}
publisher!(clock.get_time()).on_platform_event_loop_started(
event::builder::PlatformEventLoopStarted {
local_address: local_addr.into_event(),
},
);
let mut timer = clock.timer();
loop {
// Poll for RX readiness
let rx_ready = rx.ready();
// Poll for TX readiness
let tx_ready = tx.ready();
// Poll for any application-driven updates
let mut wakeups = endpoint.wakeups(&clock);
// TODO use the [pin macro](https://doc.rust-lang.org/std/pin/macro.pin.html) once
// available in MSRV
//
// See https://github.com/aws/s2n-quic/issues/1751
let wakeups = unsafe {
// Safety: the wakeups future is on the stack and won't move
Pin::new_unchecked(&mut wakeups)
};
// Poll for timer expiration
let timer_ready = timer.ready();
// Concurrently poll all of the futures and wake up on the first one that's ready
let select = Select::new(rx_ready, tx_ready, wakeups, timer_ready);
let select = cooldown.wrap(select);
let select::Outcome {
rx_result,
tx_result,
timeout_expired,
application_wakeup,
} = if let Ok(outcome) = select.await {
outcome
} else {
// The endpoint has shut down; stop the event loop
return;
};
// notify the application that we woke up and why
let wakeup_timestamp = clock.get_time();
{
let mut publisher = publisher!(wakeup_timestamp);
publisher.on_platform_event_loop_wakeup(event::builder::PlatformEventLoopWakeup {
timeout_expired,
rx_ready: rx_result.is_some(),
tx_ready: tx_result.is_some(),
application_wakeup,
});
stats.publish(&mut publisher);
}
match rx_result {
Some(Ok(())) => {
// we received some packets. give them to the endpoint.
rx.queue(|queue| {
endpoint.receive(queue, &clock);
});
}
Some(Err(error)) => {
// The RX provider has encountered an error. shut down the event loop
let mut publisher = publisher!(clock.get_time());
rx.handle_error(error, &mut publisher);
return;
}
None => {
// We didn't receive any packets; nothing to do
}
}
match tx_result {
Some(Ok(())) => {
// The TX queue was full and now has capacity. The endpoint can now continue to
// transmit
}
Some(Err(error)) => {
// The RX provider has encountered an error. shut down the event loop
let mut publisher = publisher!(clock.get_time());
tx.handle_error(error, &mut publisher);
return;
}
None => {
// The TX queue is either waiting to be flushed or has capacity. Either way, we
// call `endpoint.transmit` to at least update the clock and poll any timer
// expirations.
}
}
// Let the endpoint transmit, if possible
tx.queue(|queue| {
endpoint.transmit(queue, &clock);
});
// Get the next expiration from the endpoint and update the timer
let timeout = endpoint.timeout();
if let Some(timeout) = timeout {
timer.update(timeout);
}
let sleep_timestamp = clock.get_time();
// compute the relative timeout to the current time
let timeout = timeout.map(|t| t.saturating_duration_since(sleep_timestamp));
// compute how long it took to process the current iteration
let processing_duration = sleep_timestamp.saturating_duration_since(wakeup_timestamp);
// publish the event to the application
publisher!(sleep_timestamp).on_platform_event_loop_sleep(
event::builder::PlatformEventLoopSleep {
timeout,
processing_duration,
},
);
}
}
}