1#![allow(unused_variables)]
2
3use super::async_fn::*;
4use super::{sleep, Caller, Dispatcher, LockRef, ReturnType};
5use futures_lite::future::or;
6use std::time::{Duration, Instant};
7
8macro_rules! listen_impl {
9 ($dispatcher:ident, $callback:ident, $($p:ident),*) => {{
10 let (tx, rx) = async_channel::unbounded();
11 let lock = $dispatcher.get_lock();
12
13 let task = async move {
14 let mut pending = Vec::new();
15
16 loop {
17 let next_exec_instant = pending
19 .iter()
20 .map(|(instant, $($p),*)| instant)
21 .enumerate()
22 .min_by(|(_, a), (_, b)| Instant::cmp(a, b));
23
24 let mut sleep_time = Duration::MAX;
25
26 if let Some((i, instant)) = next_exec_instant {
27 let now = Instant::now();
28
29 if *instant < now {
31 let (_, $($p),*) = pending.remove(i);
34 let locked = lock.lock_ref().await;
35 let ret = $callback(&*locked, $($p),*).await;
36 ret.log();
37 continue;
38
39 } else {
40 sleep_time = *instant - now;
42 }
43 }
44
45 let pause = async {
46 sleep(sleep_time).await;
47 None
48 };
49
50 let recv = async {
51 Some(rx.recv().await)
52 };
53
54 match or(pause, recv).await {
55 Some(Ok(task)) => pending.push(task),
56 Some(_) => break,
57 None => (),
58 }
59 }
60
61 log::error!("Exiting deferred task");
62 };
63
64 let boxed = Box::pin(task);
65 $dispatcher.spawn(boxed);
66
67 Caller {
68 inner: Some(tx),
69 }
70 }}
71}
72
73macro_rules! defer_impl {
74 ($method:ident, $async_fn:ident, $($name:ident: $p:ident),*) => {
75 impl<T: Send, L: LockRef<Inner = T>> Dispatcher<L> {
76 pub fn $method<F, R, $($p: Send + 'static, )*>(&mut self, callback: F) -> Caller<(Instant, $($p, )*)>
77 where
78 for<'a> F: $async_fn<&'a T, $($p, )* Output = R>,
79 R: ReturnType
80 {
81 listen_impl!(self, callback, $($name),*)
82 }
83 }
84 }
85}
86
87defer_impl! {listen_ref_0, AsyncFn1, }
88defer_impl! {listen_ref_1, AsyncFn2, a: P1}
89defer_impl! {listen_ref_2, AsyncFn3, a: P1, b: P2}
90defer_impl! {listen_ref_3, AsyncFn4, a: P1, b: P2, c: P3}
91defer_impl! {listen_ref_4, AsyncFn5, a: P1, b: P2, c: P3, d: P4}
92defer_impl! {listen_ref_5, AsyncFn6, a: P1, b: P2, c: P3, d: P4, e: P5}
93defer_impl! {listen_ref_6, AsyncFn7, a: P1, b: P2, c: P3, d: P4, e: P5, f: P6}
94defer_impl! {listen_ref_7, AsyncFn8, a: P1, b: P2, c: P3, d: P4, e: P5, f: P6, g: P7}