Skip to main content

tycho_collator/utils/
task_descr.rs

1use std::future::Future;
2
3use tokio::sync::oneshot;
4
5pub struct TaskDesc<F: ?Sized, R> {
6    id: u64,
7    descr: String,
8    closure: Box<F>, // closure for execution
9    responder: Option<oneshot::Sender<R>>,
10}
11
12impl<F: ?Sized, R> TaskDesc<F, R> {
13    pub fn create(id: u64, descr: &str, closure: Box<F>) -> Self {
14        Self {
15            id,
16            descr: descr.into(),
17            closure,
18            responder: None,
19        }
20    }
21    pub fn create_with_responder(
22        id: u64,
23        descr: &str,
24        closure: Box<F>,
25    ) -> (Self, oneshot::Receiver<R>) {
26        let (sender, receiver) = oneshot::channel::<R>();
27        let task = Self {
28            id,
29            descr: descr.into(),
30            closure,
31            responder: Some(sender),
32        };
33        (task, receiver)
34    }
35    pub fn id(&self) -> u64 {
36        self.id
37    }
38    pub fn descr(&self) -> &str {
39        &self.descr
40    }
41    pub fn get_descr(&self) -> String {
42        self.descr.clone()
43    }
44    pub fn extract(self) -> (Box<F>, Option<oneshot::Sender<R>>) {
45        (self.closure, self.responder)
46    }
47    pub fn closure(&self) -> &F {
48        &self.closure
49    }
50    pub fn respond(self, res: R) -> Option<R> {
51        self.responder.respond(res)
52    }
53}
54
55pub trait TaskResponder<R> {
56    /// Respond to receiver with result and return None.
57    /// Return Some(res) if no responder or receiver exist
58    fn respond(self, res: R) -> Option<R>;
59}
60impl<R> TaskResponder<R> for Option<oneshot::Sender<R>> {
61    fn respond(self, res: R) -> Option<R> {
62        if let Some(responder) = self {
63            match responder.send(res) {
64                Ok(()) => None,
65                Err(res) => {
66                    tracing::warn!("response receiver dropped");
67                    Some(res)
68                }
69            }
70        } else {
71            Some(res)
72        }
73    }
74}
75
76#[allow(unused)]
77pub struct TaskResponseReceiver<R> {
78    inner_receiver: oneshot::Receiver<anyhow::Result<R>>,
79}
80#[allow(unused)]
81impl<R> TaskResponseReceiver<R>
82where
83    R: Send + 'static,
84{
85    pub fn create(receiver: oneshot::Receiver<anyhow::Result<R>>) -> Self {
86        Self {
87            inner_receiver: receiver,
88        }
89    }
90    pub async fn try_recv(self) -> anyhow::Result<R> {
91        // TODO: awaiting error and error in result are merged here, need to fix
92        self.inner_receiver.await?
93    }
94
95    /// Example:
96    /// ```ignore
97    /// let dispatcher = self.dispatcher.clone();
98    /// receiver.process_on_recv(|res| async move {
99    ///     dispatcher
100    ///        .enqueue_task(method_to_async_task_closure!(
101    ///             refresh_collation_sessions,
102    ///             res
103    ///         ))
104    ///         .await
105    /// });
106    /// ```
107    pub async fn process_on_recv<Fut>(
108        self,
109        process_callback: impl FnOnce(R) -> Fut + Send + 'static,
110    ) where
111        Fut: Future<Output = anyhow::Result<()>> + Send,
112    {
113        tokio::spawn(async move {
114            match self.try_recv().await {
115                Ok(res) => {
116                    if let Err(e) = process_callback(res).await {
117                        tracing::error!("Error processing task response: {e:?}");
118                        // TODO: may be unwind panic?
119                    }
120                }
121                Err(err) => tracing::error!("Error in task result or on receiving: {err:?}"),
122            }
123        });
124    }
125}
126
127#[allow(unused)]
128pub struct TaskResponseReceiverWithConvert<R, T>
129where
130    T: TryFrom<R>,
131{
132    _marker_t: std::marker::PhantomData<T>,
133    inner_receiver: oneshot::Receiver<anyhow::Result<R>>,
134}
135#[allow(unused)]
136impl<R, T> TaskResponseReceiverWithConvert<R, T>
137where
138    R: Send + 'static,
139    T: TryFrom<R, Error = anyhow::Error> + Send + 'static,
140{
141    pub fn create(receiver: oneshot::Receiver<anyhow::Result<R>>) -> Self {
142        Self {
143            _marker_t: std::marker::PhantomData,
144            inner_receiver: receiver,
145        }
146    }
147    pub async fn try_recv(self) -> anyhow::Result<T> {
148        // TODO: awaiting error and error in result are merged here, need to fix
149        self.inner_receiver.await?.and_then(|res| res.try_into())
150    }
151
152    /// Example:
153    /// ```ignore
154    /// let dispatcher = self.dispatcher.clone();
155    /// receiver.process_on_recv(|res| async move {
156    ///     dispatcher
157    ///        .enqueue_task(method_to_async_task_closure!(
158    ///             refresh_collation_sessions,
159    ///             res
160    ///         ))
161    ///         .await
162    /// });
163    /// ```
164    pub async fn process_on_recv<Fut>(
165        self,
166        process_callback: impl FnOnce(T) -> Fut + Send + 'static,
167    ) where
168        Fut: Future<Output = anyhow::Result<()>> + Send,
169    {
170        tokio::spawn(async move {
171            match self.try_recv().await {
172                Ok(res) => {
173                    if let Err(e) = process_callback(res).await {
174                        tracing::error!("Error processing task response: {e:?}");
175                        // TODO: may be unwind panic?
176                    }
177                }
178                Err(err) => tracing::error!("Error in task result or on receiving: {err:?}"),
179            }
180        });
181    }
182}
183
184#[cfg(test)]
185mod tests {
186    use super::TaskDesc;
187
188    #[test]
189    fn void_task_without_responder() {
190        let task = TaskDesc::create(
191            1,
192            "task descr",
193            Box::new(|| {
194                println!("task executed");
195            }),
196        );
197
198        let task_fn = task.closure();
199
200        task_fn();
201
202        let respond_res = task.respond(());
203        println!("resond_res: {respond_res:?}");
204
205        assert!(
206            respond_res.is_some(),
207            "task without responder should return Some(()) when call .respond()"
208        );
209    }
210
211    #[tokio::test]
212    async fn void_task_with_responder() {
213        let (task, receiver) = TaskDesc::create_with_responder(
214            1,
215            "task descr",
216            Box::new(|| {
217                println!("task executed");
218            }),
219        );
220
221        let task_fn = task.closure();
222
223        task_fn();
224
225        let respond_res = task.respond(());
226        println!("resond_res: {respond_res:?}");
227
228        assert!(
229            respond_res.is_none(),
230            "task with responder should return None when call .respond()"
231        );
232
233        let received_res = receiver.await;
234        println!("received_res: {received_res:?}");
235
236        assert!(received_res.is_ok());
237    }
238
239    #[test]
240    fn returning_task_without_responder() {
241        let task = TaskDesc::create(
242            1,
243            "task descr",
244            Box::new(|| {
245                println!("task executed");
246                String::from("task result")
247            }),
248        );
249
250        let task_fn = task.closure();
251
252        let res = task_fn();
253        println!("task res: {res:?}");
254
255        assert_eq!(&res, "task result");
256
257        let respond_res = task.respond(res);
258        println!("resond_res: {respond_res:?}");
259
260        assert!(
261            respond_res.is_some(),
262            "task without responder should return Some(res) when call .respond()"
263        );
264    }
265
266    #[tokio::test]
267    async fn returning_task_with_responder() {
268        let (task, receiver) = TaskDesc::create_with_responder(
269            1,
270            "task descr",
271            Box::new(|| {
272                println!("task executed");
273                String::from("task result")
274            }),
275        );
276
277        let task_fn = task.closure();
278
279        let res = task_fn();
280        println!("task res: {res:?}");
281
282        assert_eq!(&res, "task result");
283
284        let expected_received = res.clone();
285
286        let respond_res = task.respond(res);
287        println!("resond_res: {respond_res:?}");
288
289        assert!(
290            respond_res.is_none(),
291            "task with responder should return None when call .respond()"
292        );
293
294        let received_res = receiver.await;
295        println!("received_res: {received_res:?}");
296
297        assert!(received_res.is_ok());
298
299        let received = received_res.unwrap();
300
301        assert_eq!(received, expected_received);
302    }
303
304    #[tokio::test]
305    async fn returning_task_with_responder_and_dropped_receiver() {
306        let task = {
307            let (task, _receiver) = TaskDesc::create_with_responder(
308                1,
309                "task descr",
310                Box::new(|| {
311                    println!("task executed");
312                    String::from("task result")
313                }),
314            );
315            task
316        };
317
318        let task_fn = task.closure();
319
320        let res = task_fn();
321        println!("task res: {res:?}");
322
323        assert_eq!(&res, "task result");
324
325        let respond_res = task.respond(res);
326        println!("resond_res: {respond_res:?}");
327
328        assert!(
329            respond_res.is_some(),
330            "task with responder should return Some(res) when call .respond() when receiver is dropped"
331        );
332    }
333
334    async fn async_test_void_func() {
335        println!("async test void func executed");
336    }
337
338    async fn async_test_func() -> String {
339        println!("async test func executed");
340        String::from("async task result")
341    }
342
343    #[tokio::test]
344    async fn async_void_task_with_responder() {
345        let (task, receiver) = TaskDesc::create_with_responder(
346            1,
347            "task descr",
348            Box::new(|| {
349                println!("task executed");
350                async_test_void_func()
351            }),
352        );
353
354        let task_fn = task.closure();
355
356        let res = task_fn().await;
357        println!("task res: {res:?}");
358
359        let respond_res = task.respond(res);
360        println!("resond_res: {respond_res:?}");
361
362        assert!(
363            respond_res.is_none(),
364            "task with responder should return None when call .respond()"
365        );
366
367        let received_res = receiver.await;
368        println!("received_res: {received_res:?}");
369
370        assert!(received_res.is_ok());
371    }
372
373    #[tokio::test]
374    async fn returning_void_task_with_responder() {
375        let (task, receiver) = TaskDesc::create_with_responder(
376            1,
377            "task descr",
378            Box::new(|| {
379                println!("task executed");
380                async_test_func()
381            }),
382        );
383
384        let task_fn = task.closure();
385
386        let res = task_fn().await;
387        println!("task res: {res:?}");
388
389        assert_eq!(&res, "async task result");
390
391        let expected_received = res.clone();
392
393        let respond_res = task.respond(res);
394        println!("resond_res: {respond_res:?}");
395
396        assert!(
397            respond_res.is_none(),
398            "task with responder should return None when call .respond()"
399        );
400
401        let received_res = receiver.await;
402        println!("received_res: {received_res:?}");
403
404        assert!(received_res.is_ok());
405
406        let received = received_res.unwrap();
407
408        assert_eq!(received, expected_received);
409    }
410}