1use std::future::Future;
2
3use tokio::sync::oneshot;
4
5pub struct TaskDesc<F: ?Sized, R> {
6 id: u64,
7 descr: String,
8 closure: Box<F>, responder: Option<oneshot::Sender<R>>,
10}
11
12impl<F: ?Sized, R> TaskDesc<F, R> {
13 pub fn create(id: u64, descr: &str, closure: Box<F>) -> Self {
14 Self {
15 id,
16 descr: descr.into(),
17 closure,
18 responder: None,
19 }
20 }
21 pub fn create_with_responder(
22 id: u64,
23 descr: &str,
24 closure: Box<F>,
25 ) -> (Self, oneshot::Receiver<R>) {
26 let (sender, receiver) = oneshot::channel::<R>();
27 let task = Self {
28 id,
29 descr: descr.into(),
30 closure,
31 responder: Some(sender),
32 };
33 (task, receiver)
34 }
35 pub fn id(&self) -> u64 {
36 self.id
37 }
38 pub fn descr(&self) -> &str {
39 &self.descr
40 }
41 pub fn get_descr(&self) -> String {
42 self.descr.clone()
43 }
44 pub fn extract(self) -> (Box<F>, Option<oneshot::Sender<R>>) {
45 (self.closure, self.responder)
46 }
47 pub fn closure(&self) -> &F {
48 &self.closure
49 }
50 pub fn respond(self, res: R) -> Option<R> {
51 self.responder.respond(res)
52 }
53}
54
/// Something that can be handed a task result of type `R`.
pub trait TaskResponder<R> {
    /// Consumes the responder and offers it `res`.
    ///
    /// The `Option<R>` return value lets an implementation give the value back
    /// to the caller. The implementation for `Option<oneshot::Sender<R>>` in
    /// this file returns `None` once the value has been sent and `Some(res)`
    /// otherwise.
    fn respond(self, res: R) -> Option<R>;
}
60impl<R> TaskResponder<R> for Option<oneshot::Sender<R>> {
61 fn respond(self, res: R) -> Option<R> {
62 if let Some(responder) = self {
63 match responder.send(res) {
64 Ok(()) => None,
65 Err(res) => {
66 tracing::warn!("response receiver dropped");
67 Some(res)
68 }
69 }
70 } else {
71 Some(res)
72 }
73 }
74}
75
/// Wrapper around the receiving half of a one-shot channel that carries an
/// `anyhow::Result<R>`.
// NOTE(review): `allow(unused)` presumably silences dead-code warnings while
// this type has no callers — confirm and remove once it is in use.
#[allow(unused)]
pub struct TaskResponseReceiver<R> {
    /// The wrapped one-shot receiver.
    inner_receiver: oneshot::Receiver<anyhow::Result<R>>,
}
80#[allow(unused)]
81impl<R> TaskResponseReceiver<R>
82where
83 R: Send + 'static,
84{
85 pub fn create(receiver: oneshot::Receiver<anyhow::Result<R>>) -> Self {
86 Self {
87 inner_receiver: receiver,
88 }
89 }
90 pub async fn try_recv(self) -> anyhow::Result<R> {
91 self.inner_receiver.await?
93 }
94
95 pub async fn process_on_recv<Fut>(
108 self,
109 process_callback: impl FnOnce(R) -> Fut + Send + 'static,
110 ) where
111 Fut: Future<Output = anyhow::Result<()>> + Send,
112 {
113 tokio::spawn(async move {
114 match self.try_recv().await {
115 Ok(res) => {
116 if let Err(e) = process_callback(res).await {
117 tracing::error!("Error processing task response: {e:?}");
118 }
120 }
121 Err(err) => tracing::error!("Error in task result or on receiving: {err:?}"),
122 }
123 });
124 }
125}
126
/// Wrapper around the receiving half of a one-shot channel that carries an
/// `anyhow::Result<R>`, for use when the received `R` is to be converted into
/// a `T` via `TryFrom<R>`.
// NOTE(review): `allow(unused)` presumably silences dead-code warnings while
// this type has no callers — confirm and remove once it is in use.
#[allow(unused)]
pub struct TaskResponseReceiverWithConvert<R, T>
where
    T: TryFrom<R>,
{
    /// Ties the otherwise unused type parameter `T` to the struct; no `T` is
    /// stored.
    _marker_t: std::marker::PhantomData<T>,
    /// The wrapped one-shot receiver.
    inner_receiver: oneshot::Receiver<anyhow::Result<R>>,
}
135#[allow(unused)]
136impl<R, T> TaskResponseReceiverWithConvert<R, T>
137where
138 R: Send + 'static,
139 T: TryFrom<R, Error = anyhow::Error> + Send + 'static,
140{
141 pub fn create(receiver: oneshot::Receiver<anyhow::Result<R>>) -> Self {
142 Self {
143 _marker_t: std::marker::PhantomData,
144 inner_receiver: receiver,
145 }
146 }
147 pub async fn try_recv(self) -> anyhow::Result<T> {
148 self.inner_receiver.await?.and_then(|res| res.try_into())
150 }
151
152 pub async fn process_on_recv<Fut>(
165 self,
166 process_callback: impl FnOnce(T) -> Fut + Send + 'static,
167 ) where
168 Fut: Future<Output = anyhow::Result<()>> + Send,
169 {
170 tokio::spawn(async move {
171 match self.try_recv().await {
172 Ok(res) => {
173 if let Err(e) = process_callback(res).await {
174 tracing::error!("Error processing task response: {e:?}");
175 }
177 }
178 Err(err) => tracing::error!("Error in task result or on receiving: {err:?}"),
179 }
180 });
181 }
182}
183
#[cfg(test)]
mod tests {
    use super::TaskDesc;

    /// A task without a responder must hand the unit value back.
    #[test]
    fn void_task_without_responder() {
        let task = TaskDesc::create(
            1,
            "task descr",
            Box::new(|| {
                println!("task executed");
            }),
        );

        let task_fn = task.closure();

        task_fn();

        let respond_res = task.respond(());
        println!("respond_res: {respond_res:?}");

        assert_eq!(
            respond_res,
            Some(()),
            "task without responder should return Some(()) when call .respond()"
        );
    }

    /// A task with a live receiver must deliver the unit value and keep
    /// nothing back.
    #[tokio::test]
    async fn void_task_with_responder() {
        let (task, receiver) = TaskDesc::create_with_responder(
            1,
            "task descr",
            Box::new(|| {
                println!("task executed");
            }),
        );

        let task_fn = task.closure();

        task_fn();

        let respond_res = task.respond(());
        println!("respond_res: {respond_res:?}");

        assert!(
            respond_res.is_none(),
            "task with responder should return None when call .respond()"
        );

        let received_res = receiver.await;
        println!("received_res: {received_res:?}");

        assert!(
            received_res.is_ok(),
            "receiver should get the value that was sent"
        );
    }

    /// A task without a responder must hand back exactly the value it was
    /// given, not just *some* value.
    #[test]
    fn returning_task_without_responder() {
        let task = TaskDesc::create(
            1,
            "task descr",
            Box::new(|| {
                println!("task executed");
                String::from("task result")
            }),
        );

        let task_fn = task.closure();

        let res = task_fn();
        println!("task res: {res:?}");

        assert_eq!(&res, "task result");

        let respond_res = task.respond(res);
        println!("respond_res: {respond_res:?}");

        assert_eq!(
            respond_res.as_deref(),
            Some("task result"),
            "task without responder should return Some(res) when call .respond()"
        );
    }

    /// A task with a live receiver must deliver the produced value unchanged.
    #[tokio::test]
    async fn returning_task_with_responder() {
        let (task, receiver) = TaskDesc::create_with_responder(
            1,
            "task descr",
            Box::new(|| {
                println!("task executed");
                String::from("task result")
            }),
        );

        let task_fn = task.closure();

        let res = task_fn();
        println!("task res: {res:?}");

        assert_eq!(&res, "task result");

        let expected_received = res.clone();

        let respond_res = task.respond(res);
        println!("respond_res: {respond_res:?}");

        assert!(
            respond_res.is_none(),
            "task with responder should return None when call .respond()"
        );

        let received_res = receiver.await;
        println!("received_res: {received_res:?}");

        let received = received_res.expect("receiver should get the value that was sent");

        assert_eq!(received, expected_received);
    }

    /// When the receiver is already gone, the value must come back to the
    /// caller unchanged.
    #[tokio::test]
    async fn returning_task_with_responder_and_dropped_receiver() {
        let task = {
            // `_receiver` is dropped at the end of this block, before `respond`.
            let (task, _receiver) = TaskDesc::create_with_responder(
                1,
                "task descr",
                Box::new(|| {
                    println!("task executed");
                    String::from("task result")
                }),
            );
            task
        };

        let task_fn = task.closure();

        let res = task_fn();
        println!("task res: {res:?}");

        assert_eq!(&res, "task result");

        let respond_res = task.respond(res);
        println!("respond_res: {respond_res:?}");

        assert_eq!(
            respond_res.as_deref(),
            Some("task result"),
            "task with responder should return Some(res) when call .respond() when receiver is dropped"
        );
    }

    /// Async helper that produces no value.
    async fn async_test_void_func() {
        println!("async test void func executed");
    }

    /// Async helper that produces a fixed string.
    async fn async_test_func() -> String {
        println!("async test func executed");
        String::from("async task result")
    }

    /// A closure returning a unit future: the awaited result is delivered.
    #[tokio::test]
    async fn async_void_task_with_responder() {
        let (task, receiver) = TaskDesc::create_with_responder(
            1,
            "task descr",
            Box::new(|| {
                println!("task executed");
                async_test_void_func()
            }),
        );

        let task_fn = task.closure();

        let res = task_fn().await;
        println!("task res: {res:?}");

        let respond_res = task.respond(res);
        println!("respond_res: {respond_res:?}");

        assert!(
            respond_res.is_none(),
            "task with responder should return None when call .respond()"
        );

        let received_res = receiver.await;
        println!("received_res: {received_res:?}");

        assert!(
            received_res.is_ok(),
            "receiver should get the value that was sent"
        );
    }

    /// A closure returning a `String` future: the awaited value is delivered
    /// unchanged.
    #[tokio::test]
    async fn async_returning_task_with_responder() {
        let (task, receiver) = TaskDesc::create_with_responder(
            1,
            "task descr",
            Box::new(|| {
                println!("task executed");
                async_test_func()
            }),
        );

        let task_fn = task.closure();

        let res = task_fn().await;
        println!("task res: {res:?}");

        assert_eq!(&res, "async task result");

        let expected_received = res.clone();

        let respond_res = task.respond(res);
        println!("respond_res: {respond_res:?}");

        assert!(
            respond_res.is_none(),
            "task with responder should return None when call .respond()"
        );

        let received_res = receiver.await;
        println!("received_res: {received_res:?}");

        let received = received_res.expect("receiver should get the value that was sent");

        assert_eq!(received, expected_received);
    }
}