tycho_collator/utils/
async_dispatcher.rs1use std::pin::Pin;
2use std::sync::Arc;
3
4use anyhow::{Result, anyhow};
5use futures_util::StreamExt;
6use futures_util::future::Future;
7use futures_util::stream::FuturesUnordered;
8use tokio::sync::mpsc;
9
/// Standard capacity, in tasks, for an async dispatcher queue.
///
/// NOTE(review): presumably meant to be passed as `queue_buffer_size` to
/// `AsyncDispatcher::new`; nothing in this file uses it directly — confirm
/// against callers.
pub const STANDARD_ASYNC_DISPATCHER_BUFFER_SIZE: usize = 100;
11
12pub type TaskFunc<W> = Box<dyn FnOnce(Arc<W>) -> Fut + Send>;
13pub type Fut = Pin<Box<dyn Future<Output = Result<()>> + Send>>;
14
15pub enum AsyncTask<W> {
16 Spawn(TaskFunc<W>),
17 Enqueue(TaskFunc<W>),
18}
19
20pub struct AsyncDispatcherContext<W> {
21 pub spawned_tasks_receiver: mpsc::Receiver<TaskFunc<W>>,
22 pub queued_tasks_receiver: mpsc::Receiver<TaskFunc<W>>,
23}
24
25pub struct AsyncDispatcher<W> {
26 descr: String,
27 queue_buffer_size: usize,
28 spawned_tasks_sender: mpsc::Sender<TaskFunc<W>>,
29 queued_tasks_sender: mpsc::Sender<TaskFunc<W>>,
30}
31
32impl<W> Clone for AsyncDispatcher<W> {
33 fn clone(&self) -> Self {
34 Self {
35 descr: self.descr.clone(),
36 queue_buffer_size: self.queue_buffer_size,
37 spawned_tasks_sender: self.spawned_tasks_sender.clone(),
38 queued_tasks_sender: self.queued_tasks_sender.clone(),
39 }
40 }
41}
42
43impl<W> AsyncDispatcher<W>
44where
45 W: Send + Sync + 'static,
46{
47 pub fn new(descr: &str, queue_buffer_size: usize) -> (Self, AsyncDispatcherContext<W>) {
48 let (spawned_tasks_sender, spawned_tasks_receiver) =
49 mpsc::channel::<TaskFunc<W>>(queue_buffer_size);
50 let (queued_tasks_sender, queued_tasks_receiver) =
51 mpsc::channel::<TaskFunc<W>>(queue_buffer_size);
52 let dispatcher = Self {
53 descr: descr.to_owned(),
54 queue_buffer_size,
55 spawned_tasks_sender,
56 queued_tasks_sender,
57 };
58 (dispatcher, AsyncDispatcherContext {
59 spawned_tasks_receiver,
60 queued_tasks_receiver,
61 })
62 }
63
64 pub fn run(&self, worker: Arc<W>, ctx: AsyncDispatcherContext<W>) {
65 let AsyncDispatcherContext {
66 mut spawned_tasks_receiver,
67 mut queued_tasks_receiver,
68 } = ctx;
69
70 let dispatcher_descr = self.descr.clone();
72 let queue_worker = worker.clone();
73 tokio::spawn(async move {
74 while let Some(func) = queued_tasks_receiver.recv().await {
75 let task_res = func(queue_worker.clone()).await;
76 if let Err(err) = task_res {
77 panic!(
78 "async dispatcher: {dispatcher_descr}: queued task result error! {err:?}",
79 )
80 }
81 }
82 });
83
84 let dispatcher_descr = self.descr.clone();
86 tokio::spawn(async move {
87 let mut futures = FuturesUnordered::new();
88 loop {
89 tokio::select! {
90 task_opt = spawned_tasks_receiver.recv() => match task_opt {
91 Some(func) => {
92 let join_task = tycho_util::futures::JoinTask::new(func(worker.clone()));
93 futures.push(join_task);
94 }
95 None => {
96 panic!("async dispatcher: {dispatcher_descr}: tasks channel closed!")
97 }
98 },
99 task_res = async {
100 if futures.is_empty() {
101 futures_util::future::pending::<Result<()>>().await
102 } else {
103 futures.next().await.unwrap()
104 }
105 } => {
106 if let Err(err) = task_res {
107 panic!(
108 "async dispatcher: {dispatcher_descr}: spawned task result error! {err:?}",
109 )
110 }
111 }
112 }
113 }
114 });
115 }
116
117 pub async fn spawn_task<F>(&self, func: F) -> Result<()>
118 where
119 F: FnOnce(Arc<W>) -> Fut + Send + 'static,
120 {
121 self.spawned_tasks_sender
122 .send(Box::new(func))
123 .await
124 .map_err(|err| {
125 anyhow!(
126 "async dispatcher: {}: spawned tasks receiver dropped {err:?}",
127 self.descr,
128 )
129 })
130 }
131
132 pub fn spawn_task_blocking<F>(&self, func: F) -> Result<()>
133 where
134 F: FnOnce(Arc<W>) -> Fut + Send + 'static,
135 {
136 self.spawned_tasks_sender
137 .blocking_send(Box::new(func))
138 .map_err(|err| {
139 anyhow!(
140 "async dispatcher: {}: spawned tasks receiver dropped {err:?}",
141 self.descr,
142 )
143 })
144 }
145
146 pub async fn enqueue_task<F>(&self, func: F) -> Result<()>
147 where
148 F: FnOnce(Arc<W>) -> Fut + Send + 'static,
149 {
150 self.queued_tasks_sender
151 .send(Box::new(func))
152 .await
153 .map_err(|err| {
154 anyhow!(
155 "async dispatcher: {}: queued tasks receiver dropped {err:?}",
156 self.descr,
157 )
158 })
159 }
160
161 pub fn enqueue_task_blocking<F>(&self, func: F) -> Result<()>
162 where
163 F: FnOnce(Arc<W>) -> Fut + Send + 'static,
164 {
165 self.queued_tasks_sender
166 .blocking_send(Box::new(func))
167 .map_err(|err| {
168 anyhow!(
169 "async dispatcher: {}: queued tasks receiver dropped {err:?}",
170 self.descr,
171 )
172 })
173 }
174}
175
/// Builds a `move` closure that calls an async method on the worker it is
/// given and returns the resulting future boxed and pinned.
///
/// The first argument is the method name; any further arguments are
/// expressions forwarded to the method. They are moved into the closure and
/// then into the future.
///
/// Accepted forms:
///
/// ```ignore
/// method_to_async_closure!(method)             // no arguments
/// method_to_async_closure!(method,)            // no arguments, legacy form
/// method_to_async_closure!(method, a, b)       // with arguments
/// method_to_async_closure!(method, a, b,)      // trailing comma allowed
/// ```
#[macro_export]
macro_rules! method_to_async_closure {
    // The comma after the method name is part of the repetition, so it is
    // only required when arguments follow; `$(,)?` keeps the legacy
    // `method,` form working and permits a trailing comma.
    ($method:ident $(, $arg:expr)* $(,)?) => {
        move |worker| {
            Box::pin(async move { worker.$method($($arg),*).await })
        }
    };
}