Skip to main content

tycho_collator/utils/
async_dispatcher.rs

1use std::pin::Pin;
2use std::sync::Arc;
3
4use anyhow::{Result, anyhow};
5use futures_util::StreamExt;
6use futures_util::future::Future;
7use futures_util::stream::FuturesUnordered;
8use tokio::sync::mpsc;
9
/// Standard buffer size for an async dispatcher.
///
/// NOTE(review): presumably meant to be passed as `queue_buffer_size` to
/// `AsyncDispatcher::new`; nothing in this file references it, so confirm
/// against the callers.
pub const STANDARD_ASYNC_DISPATCHER_BUFFER_SIZE: usize = 100;
11
12pub type TaskFunc<W> = Box<dyn FnOnce(Arc<W>) -> Fut + Send>;
13pub type Fut = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
14
/// A task closure tagged with one of two dispatch kinds.
///
/// NOTE(review): the dispatcher code visible in this file sends bare
/// `TaskFunc` values over its channels and never constructs or matches this
/// enum; confirm whether it is still used elsewhere.
pub enum AsyncTask<W> {
    /// Task tagged `Spawn`; the name mirrors `AsyncDispatcher::spawn_task`.
    Spawn(TaskFunc<W>),
    /// Task tagged `Enqueue`; the name mirrors `AsyncDispatcher::enqueue_task`.
    Enqueue(TaskFunc<W>),
}
19
/// Receiving halves of the two task channels created by
/// `AsyncDispatcher::new`.
///
/// The value is consumed by `AsyncDispatcher::run`, which moves each receiver
/// into its own background task.
pub struct AsyncDispatcherContext<W> {
    /// Receives closures sent by `spawn_task` / `spawn_task_blocking`.
    pub spawned_tasks_receiver: mpsc::Receiver<TaskFunc<W>>,
    /// Receives closures sent by `enqueue_task` / `enqueue_task_blocking`.
    pub queued_tasks_receiver: mpsc::Receiver<TaskFunc<W>>,
}
24
/// Handle used to submit task closures for execution against a shared worker
/// of type `W`.
///
/// The handle only owns the sending halves of two bounded channels; the
/// receiving halves live in `AsyncDispatcherContext` and are drained by the
/// background tasks started in `run`.
pub struct AsyncDispatcher<W> {
    /// Human-readable label included in every error and panic message.
    descr: String,
    /// Capacity that both channels were created with in `new`.
    queue_buffer_size: usize,
    /// Sender feeding the "spawned tasks" channel.
    spawned_tasks_sender: mpsc::Sender<TaskFunc<W>>,
    /// Sender feeding the "queued tasks" channel.
    queued_tasks_sender: mpsc::Sender<TaskFunc<W>>,
}
31
32impl<W> Clone for AsyncDispatcher<W> {
33    fn clone(&self) -> Self {
34        Self {
35            descr: self.descr.clone(),
36            queue_buffer_size: self.queue_buffer_size,
37            spawned_tasks_sender: self.spawned_tasks_sender.clone(),
38            queued_tasks_sender: self.queued_tasks_sender.clone(),
39        }
40    }
41}
42
43impl<W> AsyncDispatcher<W>
44where
45    W: Send + Sync + 'static,
46{
47    pub fn new(descr: &str, queue_buffer_size: usize) -> (Self, AsyncDispatcherContext<W>) {
48        let (spawned_tasks_sender, spawned_tasks_receiver) =
49            mpsc::channel::<TaskFunc<W>>(queue_buffer_size);
50        let (queued_tasks_sender, queued_tasks_receiver) =
51            mpsc::channel::<TaskFunc<W>>(queue_buffer_size);
52        let dispatcher = Self {
53            descr: descr.to_owned(),
54            queue_buffer_size,
55            spawned_tasks_sender,
56            queued_tasks_sender,
57        };
58        (dispatcher, AsyncDispatcherContext {
59            spawned_tasks_receiver,
60            queued_tasks_receiver,
61        })
62    }
63
64    pub fn run(&self, worker: Arc<W>, ctx: AsyncDispatcherContext<W>) {
65        let AsyncDispatcherContext {
66            mut spawned_tasks_receiver,
67            mut queued_tasks_receiver,
68        } = ctx;
69
70        // queued tasks
71        let dispatcher_descr = self.descr.clone();
72        let queue_worker = worker.clone();
73        tokio::spawn(async move {
74            while let Some(func) = queued_tasks_receiver.recv().await {
75                let task_res = func(queue_worker.clone()).await;
76                if let Err(err) = task_res {
77                    panic!(
78                        "async dispatcher: {dispatcher_descr}: queued task result error! {err:?}",
79                    )
80                }
81            }
82        });
83
84        // async tasks
85        let dispatcher_descr = self.descr.clone();
86        tokio::spawn(async move {
87            let mut futures = FuturesUnordered::new();
88            loop {
89                tokio::select! {
90                    task_opt = spawned_tasks_receiver.recv() => match task_opt {
91                        Some(func) => {
92                            let join_task = tycho_util::futures::JoinTask::new(func(worker.clone()));
93                            futures.push(join_task);
94                        }
95                        None => {
96                            panic!("async dispatcher: {dispatcher_descr}: tasks channel closed!")
97                        }
98                    },
99                    task_res = async {
100                        if futures.is_empty() {
101                            futures_util::future::pending::<Result<()>>().await
102                        } else {
103                            futures.next().await.unwrap()
104                        }
105                    } => {
106                        if let Err(err) = task_res {
107                            panic!(
108                                "async dispatcher: {dispatcher_descr}: spawned task result error! {err:?}",
109                            )
110                        }
111                    }
112                }
113            }
114        });
115    }
116
117    pub async fn spawn_task<F>(&self, func: F) -> Result<()>
118    where
119        F: FnOnce(Arc<W>) -> Fut + Send + 'static,
120    {
121        self.spawned_tasks_sender
122            .send(Box::new(func))
123            .await
124            .map_err(|err| {
125                anyhow!(
126                    "async dispatcher: {}: spawned tasks receiver dropped {err:?}",
127                    self.descr,
128                )
129            })
130    }
131
132    pub fn spawn_task_blocking<F>(&self, func: F) -> Result<()>
133    where
134        F: FnOnce(Arc<W>) -> Fut + Send + 'static,
135    {
136        self.spawned_tasks_sender
137            .blocking_send(Box::new(func))
138            .map_err(|err| {
139                anyhow!(
140                    "async dispatcher: {}: spawned tasks receiver dropped {err:?}",
141                    self.descr,
142                )
143            })
144    }
145
146    pub async fn enqueue_task<F>(&self, func: F) -> Result<()>
147    where
148        F: FnOnce(Arc<W>) -> Fut + Send + 'static,
149    {
150        self.queued_tasks_sender
151            .send(Box::new(func))
152            .await
153            .map_err(|err| {
154                anyhow!(
155                    "async dispatcher: {}: queued tasks receiver dropped {err:?}",
156                    self.descr,
157                )
158            })
159    }
160
161    pub fn enqueue_task_blocking<F>(&self, func: F) -> Result<()>
162    where
163        F: FnOnce(Arc<W>) -> Fut + Send + 'static,
164    {
165        self.queued_tasks_sender
166            .blocking_send(Box::new(func))
167            .map_err(|err| {
168                anyhow!(
169                    "async dispatcher: {}: queued tasks receiver dropped {err:?}",
170                    self.descr,
171                )
172            })
173    }
174}
175
/// Builds a task closure that calls an async method on the worker.
///
/// `method_to_async_closure!(method, a, b)` expands to
/// `move |worker| Box::pin(async move { worker.method(a, b).await })`, which
/// is the shape expected by the dispatcher's `spawn_task` / `enqueue_task`
/// family.
///
/// The argument list is optional and may end with a trailing comma, so all of
/// `m!(method)`, `m!(method,)`, `m!(method, a)` and `m!(method, a,)` are
/// accepted.
///
/// The argument expressions are moved into the closure and evaluated when the
/// returned future is polled, not when the macro is invoked.
#[macro_export]
macro_rules! method_to_async_closure {
    ($method:ident $(, $arg:expr)* $(,)?) => {
        move |worker| {
            Box::pin(async move { worker.$method($($arg),*).await })
        }
    };
}