1use std::fmt::Debug;
4use std::future::Future;
5use std::pin::Pin;
6use std::ptr::NonNull;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU8, Ordering};
9use std::task::{Context, Poll};
10use std::time::Duration;
11
12use futures::task::{FutureObj, Spawn, SpawnError};
13use mountpoint_s3_crt_sys::*;
14use thiserror::Error;
15
16use crate::CrtError as _;
17use crate::common::allocator::Allocator;
18use crate::common::error::Error;
19use crate::common::task_scheduler::{Task, TaskScheduler, TaskStatus};
20use crate::io::futures::FutureSpawner;
21use crate::io::io_library_init;
22
/// Handle to a single `aws_event_loop` owned by the AWS CRT.
///
/// Within this file, instances are only created by [`EventLoopGroup::get_next_loop`].
#[derive(Debug)]
pub struct EventLoop {
    /// Raw, non-null pointer to the underlying CRT event loop.
    pub(crate) inner: NonNull<aws_event_loop>,
    /// Handle to the group this loop was obtained from. The field is never read; presumably it
    /// is held only so the group (and therefore `inner`) stays alive while this handle exists —
    /// TODO confirm against the CRT's ownership rules.
    _event_loop_group: EventLoopGroup,
}
31
// NOTE(review): these impls assert that the raw `aws_event_loop` pointer may be moved to and
// shared between threads. Nothing in this file proves that; it presumably relies on the CRT's
// event loop API being thread-safe — confirm against the CRT documentation.
unsafe impl Send for EventLoop {}
unsafe impl Sync for EventLoop {}
41
42impl EventLoop {
43 fn schedule_task_future(&self, task: Task, when: u64) {
45 unsafe {
48 aws_event_loop_schedule_task_future(self.inner.as_ptr(), task.into_aws_task_ptr(), when);
49 }
50 }
51
52 fn current_clock_time(&self) -> Result<u64, Error> {
54 unsafe {
56 let mut time_nanos: u64 = 0;
57 aws_event_loop_current_clock_time(self.inner.as_ptr(), &mut time_nanos).ok_or_last_error()?;
58 Ok(time_nanos)
59 }
60 }
61}
62
// Marks `EventLoop` with the crate-private `Sealed` trait. Presumably this is what allows it to
// implement `TaskScheduler` (sealed-trait pattern) — TODO confirm against the trait definition.
impl crate::private::Sealed for EventLoop {}
64
65impl TaskScheduler for EventLoop {
66 fn schedule_task_now(&self, task: Task) -> Result<(), Error> {
67 unsafe {
70 aws_event_loop_schedule_task_now(self.inner.as_ptr(), task.into_aws_task_ptr());
71 }
72 Ok(())
73 }
74}
75
76impl Clone for EventLoop {
80 fn clone(&self) -> Self {
81 Self {
82 inner: self.inner,
83 _event_loop_group: self._event_loop_group.clone(),
84 }
85 }
86}
87
/// Handle to an `aws_event_loop_group` owned by the AWS CRT.
///
/// Cloning calls `aws_event_loop_group_acquire` and dropping calls
/// `aws_event_loop_group_release` on the underlying group.
#[derive(Debug)]
pub struct EventLoopGroup {
    /// Raw, non-null pointer to the underlying CRT event loop group.
    pub(crate) inner: NonNull<aws_event_loop_group>,
}
94
// NOTE(review): these impls assert that the raw `aws_event_loop_group` pointer may be moved to
// and shared between threads. This presumably relies on the CRT's group API (including
// acquire/release) being thread-safe — confirm against the CRT documentation.
unsafe impl Send for EventLoopGroup {}
unsafe impl Sync for EventLoopGroup {}
100
101impl EventLoopGroup {
102 pub fn new_default(
106 allocator: &Allocator,
107 max_threads: Option<u16>,
108 on_shutdown: impl FnOnce() + Send + 'static,
109 ) -> Result<Self, Error> {
110 io_library_init(allocator);
111
112 let loop_count = max_threads.unwrap_or(0);
113
114 let user_data = Box::leak(Box::new(ShutdownCallbackUserData {
115 callback: Box::new(on_shutdown),
116 }));
117 let shutdown_options = aws_shutdown_callback_options {
118 shutdown_callback_fn: Some(shutdown_callback),
119 shutdown_callback_user_data: user_data as *mut ShutdownCallbackUserData as *mut libc::c_void,
120 };
121
122 let event_loop_group_options = aws_event_loop_group_options {
123 loop_count,
124 shutdown_options: &shutdown_options,
125 ..Default::default()
126 };
127
128 let inner = unsafe {
132 aws_event_loop_group_new(allocator.inner.as_ptr(), &event_loop_group_options)
133 .ok_or_last_error()
134 .inspect_err(|_| {
135 let user_data: Box<ShutdownCallbackUserData> = Box::from_raw(user_data);
136 std::mem::drop(user_data);
137 })?
138 };
139
140 Ok(Self { inner })
141 }
142
143 pub fn get_next_loop(&self) -> Result<EventLoop, Error> {
146 unsafe {
149 let inner = aws_event_loop_group_get_next_loop(self.inner.as_ptr()).ok_or_last_error()?;
150
151 Ok(EventLoop {
152 inner,
153 _event_loop_group: self.clone(),
154 })
155 }
156 }
157
158 pub fn get_loop_count(&self) -> usize {
160 unsafe { aws_event_loop_group_get_loop_count(self.inner.as_ptr()) }
162 }
163}
164
/// Heap-allocated payload handed to the CRT as the `user_data` of the group shutdown callback.
struct ShutdownCallbackUserData {
    /// The user's shutdown closure; `shutdown_callback` invokes it exactly once.
    callback: Box<dyn FnOnce()>,
}
168
169unsafe extern "C" fn shutdown_callback(user_data: *mut libc::c_void) {
171 assert!(!user_data.is_null());
172
173 let user_data: Box<ShutdownCallbackUserData> = unsafe { Box::from_raw(user_data as *mut ShutdownCallbackUserData) };
175
176 (user_data.callback)();
177}
178
// Marks `EventLoopGroup` with the crate-private `Sealed` trait. Presumably this is what allows
// it to implement `TaskScheduler` (sealed-trait pattern) — TODO confirm against the trait
// definition.
impl crate::private::Sealed for EventLoopGroup {}
180
181impl TaskScheduler for EventLoopGroup {
184 fn schedule_task_now(&self, task: Task) -> Result<(), Error> {
185 let event_loop = self.get_next_loop()?;
186 event_loop.schedule_task_now(task)
187 }
188}
189
190impl Spawn for EventLoopGroup {
191 fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
192 let _handle = self.spawn_future(future);
193 Ok(())
194 }
195}
196
197impl Clone for EventLoopGroup {
198 fn clone(&self) -> Self {
199 let inner = unsafe { NonNull::new_unchecked(aws_event_loop_group_acquire(self.inner.as_ptr())) };
201 Self { inner }
202 }
203}
204
205impl Drop for EventLoopGroup {
206 fn drop(&mut self) {
207 unsafe {
209 aws_event_loop_group_release(self.inner.as_ptr());
210 }
211 }
212}
213
/// A [`Future`] that resolves once `duration` has elapsed on an [`EventLoop`]'s clock.
///
/// The underlying timer task is only scheduled when the future is first polled.
#[derive(Debug)]
pub struct EventLoopTimer {
    /// Event loop on which the timer task is scheduled and whose clock is read.
    event_loop: EventLoop,
    /// How long after the first poll the timer should fire.
    duration: Duration,

    /// Current timer state (one of the `TIMER_*` constants). It is shared with the scheduled
    /// task so the task can publish completion or cancellation.
    state: Arc<AtomicU8>,
}
230
231impl EventLoopTimer {
232 const TIMER_UNSCHEDULED: u8 = 0;
233 const TIMER_RUNNING: u8 = 1;
234 const TIMER_DONE: u8 = 2;
235 const TIMER_CANCELED: u8 = 3;
236
237 pub fn new(event_loop: &EventLoop, duration: Duration) -> Self {
243 Self {
244 event_loop: event_loop.clone(),
245 duration,
246 state: Arc::new(AtomicU8::new(Self::TIMER_UNSCHEDULED)),
247 }
248 }
249}
250
251impl Future for EventLoopTimer {
252 type Output = Result<(), EventLoopTimerError>;
253
254 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
255 match self.state.load(Ordering::SeqCst) {
257 Self::TIMER_DONE => return Poll::Ready(Ok(())),
258 Self::TIMER_CANCELED => return Poll::Ready(Err(EventLoopTimerError::Canceled)),
259 _ => {}
260 }
261
262 if self
266 .state
267 .compare_exchange(
268 Self::TIMER_UNSCHEDULED,
269 Self::TIMER_RUNNING,
270 Ordering::SeqCst,
271 Ordering::SeqCst,
272 )
273 .is_err()
274 {
275 return Poll::Pending;
276 }
277
278 let now = match self.event_loop.current_clock_time() {
279 Ok(now) => now,
280 Err(e) => return Poll::Ready(Err(e.into())),
281 };
282
283 let nanos: u64 = self
285 .duration
286 .as_nanos()
287 .try_into()
288 .expect("cannot set a timer beyond 2^64 nanoseconds");
289
290 let waker = cx.waker().clone();
291 let state = self.state.clone();
292
293 self.event_loop.schedule_task_future(
294 Task::init(
295 &Allocator::default(),
296 move |status| {
297 let new_state = match status {
300 TaskStatus::RunReady => Self::TIMER_DONE,
301 TaskStatus::Canceled => Self::TIMER_CANCELED,
302 };
303 state.store(new_state, Ordering::SeqCst);
306 waker.wake()
307 },
308 "event_loop_timer",
309 ),
310 now + nanos,
312 );
313
314 Poll::Pending
315 }
316}
317
/// Errors an [`EventLoopTimer`] can resolve to.
#[derive(Error, Debug)]
pub enum EventLoopTimerError {
    /// The scheduled timer task was invoked with a canceled status instead of running normally.
    #[error("The timer was cancelled")]
    Canceled,

    /// A CRT call made on behalf of the timer failed; wraps the underlying CRT error.
    #[error("Internal CRT error: {0}")]
    InternalError(#[from] crate::common::error::Error),
}
329
#[cfg(test)]
mod test {
    use super::*;
    use crate::common::allocator::Allocator;
    use crate::io::futures::FutureSpawner;
    use futures::executor::block_on;
    use std::sync::atomic::{AtomicI32, Ordering};
    use std::sync::{Arc, mpsc};
    use std::time::Duration;

    /// Upper bound on how long a test waits for a single message from an event loop thread, so a
    /// lost task fails the test instead of hanging it.
    const RECV_TIMEOUT: Duration = Duration::from_secs(5);

    /// Schedules many tasks across the loops of a default group and checks that each one runs.
    #[test]
    fn test_schedule_tasks_default_el_group() {
        const NUM_TASKS: i32 = 2_000;

        let allocator = Allocator::default();
        let el_group = EventLoopGroup::new_default(&allocator, None, || {}).unwrap();

        // Incremented once by every task that runs.
        let counter = Arc::new(AtomicI32::new(0));

        // Each task reports its id over this channel when it runs.
        let (tx, rx) = mpsc::channel::<i32>();

        for id in 0..NUM_TASKS {
            let el = el_group.get_next_loop().unwrap();

            let counter = counter.clone();
            let tx = tx.clone();

            let task = Task::init(
                &allocator,
                move |_| {
                    counter.fetch_add(1, Ordering::SeqCst);
                    tx.send(id).unwrap();
                },
                "test",
            );

            el.schedule_task_now(task).expect("failed to schedule task");
        }

        // Wait for one message per task; any single wait exceeding the timeout fails the test.
        for _ in 0..NUM_TASKS {
            rx.recv_timeout(RECV_TIMEOUT).unwrap();
        }

        let final_result = counter.load(Ordering::SeqCst);

        assert_eq!(final_result, NUM_TASKS);
    }

    /// Checks that the shutdown callback fires after the only handle to a group is dropped.
    #[test]
    fn test_event_loop_group_shutdown() {
        let allocator = Allocator::default();

        let (tx, rx) = mpsc::channel();

        {
            // The group is dropped at the end of this scope.
            let _el_group = EventLoopGroup::new_default(&allocator, None, move || tx.send(()).unwrap()).unwrap();
        }

        // The callback signals over the channel; time out rather than hang if it never fires.
        rx.recv_timeout(RECV_TIMEOUT).unwrap();
    }

    /// Checks that a default group reports at least one event loop.
    #[test]
    fn test_event_loop_group_get_loop_count() {
        let allocator = Allocator::default();

        let el_group = EventLoopGroup::new_default(&allocator, None, || {}).unwrap();

        assert!(el_group.get_loop_count() > 0);
    }

    /// Awaits two one-second timers back to back and checks that more than two seconds elapse on
    /// the event loop's clock.
    #[test]
    fn test_timer_future() {
        let allocator = Allocator::default();

        let el_group = EventLoopGroup::new_default(&allocator, None, || {}).unwrap();
        let event_loop = el_group.get_next_loop().unwrap();

        // Creating a timer does not start it; each one is scheduled on its first poll.
        let timer1 = EventLoopTimer::new(&event_loop, Duration::from_secs(1));
        let timer2 = EventLoopTimer::new(&event_loop, Duration::from_secs(1));

        let before_nanos = event_loop
            .current_clock_time()
            .expect("Failed to get current clock time");

        let handle = el_group.spawn_future(async {
            // `timer2` is first polled only after `timer1` resolves, so the waits are sequential.
            timer1.await.expect("timer1 failed");
            timer2.await.expect("timer2 failed");
        });

        // Block the test thread until the spawned future has finished.
        block_on(handle.into_future()).unwrap();

        let after_nanos = event_loop
            .current_clock_time()
            .expect("Failed to get current clock time");

        assert!(after_nanos > before_nanos + u64::try_from(Duration::from_secs(2).as_nanos()).unwrap());
    }
}