mountpoint_s3_crt/io/
event_loop.rs

1//! Infrastructure for asynchronous green-threaded execution
2
3use std::fmt::Debug;
4use std::future::Future;
5use std::pin::Pin;
6use std::ptr::NonNull;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU8, Ordering};
9use std::task::{Context, Poll};
10use std::time::Duration;
11
12use futures::task::{FutureObj, Spawn, SpawnError};
13use mountpoint_s3_crt_sys::*;
14use thiserror::Error;
15
16use crate::CrtError as _;
17use crate::common::allocator::Allocator;
18use crate::common::error::Error;
19use crate::common::task_scheduler::{Task, TaskScheduler, TaskStatus};
20use crate::io::futures::FutureSpawner;
21use crate::io::io_library_init;
22
23/// An event loop that can be used to schedule and execute tasks
24#[derive(Debug)]
25pub struct EventLoop {
26    /// Pointer to the underlying `aws_event_loop`
27    pub(crate) inner: NonNull<aws_event_loop>,
28    /// Hold a cloned copy of the event loop group so that it's not destroyed while this event loop exists
29    _event_loop_group: EventLoopGroup,
30}
31
32// SAFETY: Event Loops are safe to send across threads, since they're the main way to schedule things onto other threads.
33// From aws_c_io README
34// > The functions we specify as thread-safe, we do so because those functions are necessary for abiding by the stated threading model.
35// > For example, since scheduling a task is the main function for addressing cross-threaded operations, it has to be thread-safe.
36// From event_loop.h:aws_event_loop_schedule_task_now
37// >  * This function may be called from outside or inside the event loop thread.
38unsafe impl Send for EventLoop {}
39// SAFETY: See above argument.
40unsafe impl Sync for EventLoop {}
41
42impl EventLoop {
43    /// Schedule a task to execute on this event loop at the specified time
44    fn schedule_task_future(&self, task: Task, when: u64) {
45        // SAFETY: self.inner is a valid aws_event_loop and into_aws_task_ptr leaks memory until
46        // the callback fires, so it will live as long as it needs to.
47        unsafe {
48            aws_event_loop_schedule_task_future(self.inner.as_ptr(), task.into_aws_task_ptr(), when);
49        }
50    }
51
52    /// Get the current timestamp for this event loop's clock
53    fn current_clock_time(&self) -> Result<u64, Error> {
54        // SAFETY: self.inner is a valid aws_event_loop.
55        unsafe {
56            let mut time_nanos: u64 = 0;
57            aws_event_loop_current_clock_time(self.inner.as_ptr(), &mut time_nanos).ok_or_last_error()?;
58            Ok(time_nanos)
59        }
60    }
61}
62
63impl crate::private::Sealed for EventLoop {}
64
65impl TaskScheduler for EventLoop {
66    fn schedule_task_now(&self, task: Task) -> Result<(), Error> {
67        // SAFETY: self.inner is a valid aws_event_loop and into_aws_task_ptr leaks memory until
68        // the callback fires, so it will live as long as it needs to.
69        unsafe {
70            aws_event_loop_schedule_task_now(self.inner.as_ptr(), task.into_aws_task_ptr());
71        }
72        Ok(())
73    }
74}
75
76/// EventLoops don't destroy anything on Drop, so we are free to duplicate the pointer to it. But we
77/// do need to clone the EventLoopGroup reference so that the group won't be Dropped while this
78/// reference to the EventLoop exists.
79impl Clone for EventLoop {
80    fn clone(&self) -> Self {
81        Self {
82            inner: self.inner,
83            _event_loop_group: self._event_loop_group.clone(),
84        }
85    }
86}
87
88/// An event loop group collects one or more [`EventLoop`]s together for processor affiniity and
89/// load balancing purposes
90#[derive(Debug)]
91pub struct EventLoopGroup {
92    pub(crate) inner: NonNull<aws_event_loop_group>,
93}
94
95// SAFETY: EventLoopGroups have to be safe to share across threads since they're the primary
96// mechanism the CRT provides for scheduling things on other threads.
97unsafe impl Send for EventLoopGroup {}
98// SAFETY: See above.
99unsafe impl Sync for EventLoopGroup {}
100
101impl EventLoopGroup {
102    /// Create a new default [EventLoopGroup].
103    /// max_threads: use None for the CRT default
104    /// on_shutdown will be called when the event loop group shuts down.
105    pub fn new_default(
106        allocator: &Allocator,
107        max_threads: Option<u16>,
108        on_shutdown: impl FnOnce() + Send + 'static,
109    ) -> Result<Self, Error> {
110        io_library_init(allocator);
111
112        let loop_count = max_threads.unwrap_or(0);
113
114        let user_data = Box::leak(Box::new(ShutdownCallbackUserData {
115            callback: Box::new(on_shutdown),
116        }));
117        let shutdown_options = aws_shutdown_callback_options {
118            shutdown_callback_fn: Some(shutdown_callback),
119            shutdown_callback_user_data: user_data as *mut ShutdownCallbackUserData as *mut libc::c_void,
120        };
121
122        let event_loop_group_options = aws_event_loop_group_options {
123            loop_count,
124            shutdown_options: &shutdown_options,
125            ..Default::default()
126        };
127
128        // SAFETY: `allocator` is a valid `aws_allocator`. If the creation of the event loop group
129        // fails, then (and only then), we drop the callback user_data, which is safe since we know
130        // the callback won't fire if the event loop group cannot be created.
131        let inner = unsafe {
132            aws_event_loop_group_new(allocator.inner.as_ptr(), &event_loop_group_options)
133                .ok_or_last_error()
134                .inspect_err(|_| {
135                    let user_data: Box<ShutdownCallbackUserData> = Box::from_raw(user_data);
136                    std::mem::drop(user_data);
137                })?
138        };
139
140        Ok(Self { inner })
141    }
142
143    /// Get the next event loop to schedule a task on. (Internally, the CRT will make a choice
144    /// on which loop in the group will be returned.)
145    pub fn get_next_loop(&self) -> Result<EventLoop, Error> {
146        // SAFETY: we make sure to embed a copy of the event loop group into the EventLoop struct so
147        // we don't free the group while we still have a reference to one of its event loops.
148        unsafe {
149            let inner = aws_event_loop_group_get_next_loop(self.inner.as_ptr()).ok_or_last_error()?;
150
151            Ok(EventLoop {
152                inner,
153                _event_loop_group: self.clone(),
154            })
155        }
156    }
157
158    /// Get the number of loops in this group.
159    pub fn get_loop_count(&self) -> usize {
160        // SAFETY: self.inner is a valid event_loop_group.
161        unsafe { aws_event_loop_group_get_loop_count(self.inner.as_ptr()) }
162    }
163}
164
/// Heap-allocated payload passed to the CRT as the shutdown callback's user data.
///
/// The boxed closure is `Send` to match the `on_shutdown: impl FnOnce() + Send + 'static` bound
/// on [EventLoopGroup::new_default]. The closure is handed to the CRT and invoked from its
/// shutdown callback, so the type itself must not allow storing a non-`Send` closure.
struct ShutdownCallbackUserData {
    callback: Box<dyn FnOnce() + Send>,
}
168
169/// SAFETY: not safe to call directly, only let the CRT call this function as a callback.
170unsafe extern "C" fn shutdown_callback(user_data: *mut libc::c_void) {
171    assert!(!user_data.is_null());
172
173    // SAFETY: `user_data` was leaked in `EventLoopGroup::new_default`.
174    let user_data: Box<ShutdownCallbackUserData> = unsafe { Box::from_raw(user_data as *mut ShutdownCallbackUserData) };
175
176    (user_data.callback)();
177}
178
179impl crate::private::Sealed for EventLoopGroup {}
180
181/// Scheduling a task on an [EventLoopGroup] first finds the next [EventLoop] to use (as reported by
182/// the CRT), then uses that one to run the [Task].
183impl TaskScheduler for EventLoopGroup {
184    fn schedule_task_now(&self, task: Task) -> Result<(), Error> {
185        let event_loop = self.get_next_loop()?;
186        event_loop.schedule_task_now(task)
187    }
188}
189
190impl Spawn for EventLoopGroup {
191    fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
192        let _handle = self.spawn_future(future);
193        Ok(())
194    }
195}
196
197impl Clone for EventLoopGroup {
198    fn clone(&self) -> Self {
199        // SAFETY: aws_event_loop_group_acquire returns the same pointer as input, and self.inner is [NonNull].
200        let inner = unsafe { NonNull::new_unchecked(aws_event_loop_group_acquire(self.inner.as_ptr())) };
201        Self { inner }
202    }
203}
204
205impl Drop for EventLoopGroup {
206    fn drop(&mut self) {
207        // SAFETY: In Clone, we call acquire to increment the ref count, so on Drop it is safe to decerement by calling release.
208        unsafe {
209            aws_event_loop_group_release(self.inner.as_ptr());
210        }
211    }
212}
213
214/// EventLoopTimer is a Future that delays for some amount of time.
215/// Internally it schedules the timer on an event loop using schedule_task_future.
216#[derive(Debug)]
217pub struct EventLoopTimer {
218    /// The event loop that the timer task is running on.
219    event_loop: EventLoop,
220    /// How long the timer should run for.
221    duration: Duration,
222
223    /// State of the timer. See associated constants below.
224    /// 0 => Not yet started.
225    /// 1 => Scheduled but not fired.
226    /// 2 => Done.
227    /// 3 => Canceled.
228    state: Arc<AtomicU8>,
229}
230
231impl EventLoopTimer {
232    const TIMER_UNSCHEDULED: u8 = 0;
233    const TIMER_RUNNING: u8 = 1;
234    const TIMER_DONE: u8 = 2;
235    const TIMER_CANCELED: u8 = 3;
236
237    /// Create a new EventLoopTimer.
238    /// event_loop is the loop that the timer thread will run on (which doesn't need to be related
239    /// to the event loop the calling thread or future uses).
240    /// duration is how long the timer should delay for. The timer doesn't start until the first
241    /// time it's been polled / awaited upon.
242    pub fn new(event_loop: &EventLoop, duration: Duration) -> Self {
243        Self {
244            event_loop: event_loop.clone(),
245            duration,
246            state: Arc::new(AtomicU8::new(Self::TIMER_UNSCHEDULED)),
247        }
248    }
249}
250
251impl Future for EventLoopTimer {
252    type Output = Result<(), EventLoopTimerError>;
253
254    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
255        // If the timer callback has already fired, return with [Poll::Ready].
256        match self.state.load(Ordering::SeqCst) {
257            Self::TIMER_DONE => return Poll::Ready(Ok(())),
258            Self::TIMER_CANCELED => return Poll::Ready(Err(EventLoopTimerError::Canceled)),
259            _ => {}
260        }
261
262        // Set the state to RUNNING if the current state is UNSCHEDULED. If this fails (because the
263        // current state is not RUNNING), then another thread got here first so we can return
264        // [Poll::Pending] and this function will be called again when the timer completes.
265        if self
266            .state
267            .compare_exchange(
268                Self::TIMER_UNSCHEDULED,
269                Self::TIMER_RUNNING,
270                Ordering::SeqCst,
271                Ordering::SeqCst,
272            )
273            .is_err()
274        {
275            return Poll::Pending;
276        }
277
278        let now = match self.event_loop.current_clock_time() {
279            Ok(now) => now,
280            Err(e) => return Poll::Ready(Err(e.into())),
281        };
282
283        // 2^64 nanoseconds is almost 600 years, shrug
284        let nanos: u64 = self
285            .duration
286            .as_nanos()
287            .try_into()
288            .expect("cannot set a timer beyond 2^64 nanoseconds");
289
290        let waker = cx.waker().clone();
291        let state = self.state.clone();
292
293        self.event_loop.schedule_task_future(
294            Task::init(
295                &Allocator::default(),
296                move |status| {
297                    // Compute the new state the timer should move into. If the [Task] was canceled,
298                    // the timer Future should complete with an error.
299                    let new_state = match status {
300                        TaskStatus::RunReady => Self::TIMER_DONE,
301                        TaskStatus::Canceled => Self::TIMER_CANCELED,
302                    };
303                    // Store the new state, and wake up the future so that [TimerFuture::poll] gets
304                    // called again.
305                    state.store(new_state, Ordering::SeqCst);
306                    waker.wake()
307                },
308                "event_loop_timer",
309            ),
310            // Schedule the task to execute "nanos" nanoseconds into the future.
311            now + nanos,
312        );
313
314        Poll::Pending
315    }
316}
317
318/// [EventLoopTimer] failure results
319#[derive(Error, Debug)]
320pub enum EventLoopTimerError {
321    /// The timer was cancelled
322    #[error("The timer was cancelled")]
323    Canceled,
324
325    /// An internal error from the AWS Common Runtime
326    #[error("Internal CRT error: {0}")]
327    InternalError(#[from] crate::common::error::Error),
328}
329
#[cfg(test)]
mod test {
    use super::*;
    use crate::common::allocator::Allocator;
    use crate::io::futures::FutureSpawner;
    use futures::executor::block_on;
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::{Arc, mpsc};
    use std::time::Duration;

    /// Upper bound on how long any test waits for a channel message, so that a bug makes the
    /// test fail instead of hanging forever.
    const RECV_TIMEOUT: Duration = Duration::from_secs(5);

    /// Schedules many tasks across the default event loop group, each bumping a shared counter,
    /// and checks that every one of them ran.
    #[test]
    fn test_schedule_tasks_default_el_group() {
        const NUM_TASKS: i32 = 2_000;

        let allocator = Allocator::default();
        let el_group = EventLoopGroup::new_default(&allocator, None, || {}).unwrap();
        let counter = Arc::new(AtomicI32::new(0));
        let (done_tx, done_rx) = mpsc::channel::<i32>();

        for id in 0..NUM_TASKS {
            let event_loop = el_group.get_next_loop().unwrap();
            let task_counter = Arc::clone(&counter);
            let task_tx = done_tx.clone();

            let task = Task::init(
                &allocator,
                move |_| {
                    task_counter.fetch_add(1, Ordering::SeqCst);
                    task_tx.send(id).unwrap();
                },
                "test",
            );
            event_loop.schedule_task_now(task).expect("failed to schedule task");
        }

        // Wait until every task has reported in.
        (0..NUM_TASKS).for_each(|_| {
            done_rx.recv_timeout(RECV_TIMEOUT).unwrap();
        });

        assert_eq!(counter.load(Ordering::SeqCst), NUM_TASKS);
    }

    /// Dropping the only handle to a group must eventually run its shutdown callback.
    #[test]
    fn test_event_loop_group_shutdown() {
        let allocator = Allocator::default();
        let (shutdown_tx, shutdown_rx) = mpsc::channel();

        let el_group =
            EventLoopGroup::new_default(&allocator, None, move || shutdown_tx.send(()).unwrap()).unwrap();
        drop(el_group);

        shutdown_rx.recv_timeout(RECV_TIMEOUT).unwrap();
    }

    /// A default group reports at least one event loop.
    #[test]
    fn test_event_loop_group_get_loop_count() {
        let el_group = EventLoopGroup::new_default(&Allocator::default(), None, || {}).unwrap();

        assert!(el_group.get_loop_count() > 0);
    }

    /// Awaiting two one-second timers back to back should take at least two seconds, because
    /// each timer only starts counting when it is first polled.
    #[test]
    fn test_timer_future() {
        let allocator = Allocator::default();
        let el_group = EventLoopGroup::new_default(&allocator, None, || {}).unwrap();
        let event_loop = el_group.get_next_loop().unwrap();

        let one_second = Duration::from_secs(1);
        let first = EventLoopTimer::new(&event_loop, one_second);
        let second = EventLoopTimer::new(&event_loop, one_second);

        let before_nanos = event_loop
            .current_clock_time()
            .expect("Failed to get current clock time");

        let job = el_group.spawn_future(async {
            first.await.expect("timer1 failed");
            second.await.expect("timer2 failed");
        });
        block_on(job.into_future()).unwrap();

        let after_nanos = event_loop
            .current_clock_time()
            .expect("Failed to get current clock time");

        assert!(after_nanos > before_nanos + u64::try_from(Duration::from_secs(2).as_nanos()).unwrap());
    }
}