async_defer/
defer.rs

1#![allow(unused_variables)]
2
3use super::async_fn::*;
4use super::{sleep, Caller, Dispatcher, LockRef, ReturnType};
5use futures_lite::future::or;
6use std::time::{Duration, Instant};
7
8macro_rules! listen_impl {
9    ($dispatcher:ident, $callback:ident, $($p:ident),*) => {{
10        let (tx, rx) = async_channel::unbounded();
11        let lock = $dispatcher.get_lock();
12
13        let task = async move {
14            let mut pending = Vec::new();
15
16            loop {
17                // find earliest scheduled task
18                let next_exec_instant = pending
19                    .iter()
20                    .map(|(instant, $($p),*)| instant)
21                    .enumerate()
22                    .min_by(|(_, a), (_, b)| Instant::cmp(a, b));
23
24                let mut sleep_time = Duration::MAX;
25
26                if let Some((i, instant)) = next_exec_instant {
27                    let now = Instant::now();
28
29                    // is it time to execute the task?
30                    if *instant < now {
31                        // execute the task
32
33                        let (_, $($p),*) = pending.remove(i);
34                        let locked = lock.lock_ref().await;
35                        let ret = $callback(&*locked, $($p),*).await;
36                        ret.log();
37                        continue;
38
39                    } else {
40                        // sleep until the next scheduled execution
41                        sleep_time = *instant - now;
42                    }
43                }
44
45                let pause = async {
46                    sleep(sleep_time).await;
47                    None
48                };
49
50                let recv = async {
51                    Some(rx.recv().await)
52                };
53
54                match or(pause, recv).await {
55                    Some(Ok(task)) => pending.push(task),
56                    Some(_) => break,
57                    None => (/* end of pause */),
58                }
59            }
60
61            log::error!("Exiting deferred task");
62        };
63
64        let boxed = Box::pin(task);
65        $dispatcher.spawn(boxed);
66
67        Caller {
68            inner: Some(tx),
69        }
70    }}
71}
72
73macro_rules! defer_impl {
74    ($method:ident, $async_fn:ident, $($name:ident: $p:ident),*) => {
75        impl<T: Send, L: LockRef<Inner = T>> Dispatcher<L> {
76            pub fn $method<F, R, $($p: Send + 'static, )*>(&mut self, callback: F) -> Caller<(Instant, $($p, )*)>
77            where
78                for<'a> F: $async_fn<&'a T, $($p, )* Output = R>,
79                R: ReturnType
80            {
81                listen_impl!(self, callback, $($name),*)
82            }
83        }
84    }
85}
86
87defer_impl! {listen_ref_0, AsyncFn1, }
88defer_impl! {listen_ref_1, AsyncFn2, a: P1}
89defer_impl! {listen_ref_2, AsyncFn3, a: P1, b: P2}
90defer_impl! {listen_ref_3, AsyncFn4, a: P1, b: P2, c: P3}
91defer_impl! {listen_ref_4, AsyncFn5, a: P1, b: P2, c: P3, d: P4}
92defer_impl! {listen_ref_5, AsyncFn6, a: P1, b: P2, c: P3, d: P4, e: P5}
93defer_impl! {listen_ref_6, AsyncFn7, a: P1, b: P2, c: P3, d: P4, e: P5, f: P6}
94defer_impl! {listen_ref_7, AsyncFn8, a: P1, b: P2, c: P3, d: P4, e: P5, f: P6, g: P7}