async_callback_manager/
task.rs

1use crate::{
2    BackendStreamingTask, BackendTask, DynFutureTask, DynMutationFuture, DynMutationStream,
3    DynStateMutation, DynStreamTask, TaskId,
4};
5use futures::stream::FuturesUnordered;
6use futures::{FutureExt, StreamExt};
7use std::any::{TypeId, type_name};
8use std::fmt::Debug;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tokio::task::{JoinError, JoinHandle};
12
13/// An asynchrnonous task that can generate state mutations and/or more tasks to
14/// be spawned by an AsyncCallbackManager.
15#[must_use = "AsyncTasks do nothing unless you run them"]
16pub struct AsyncTask<Frntend, Bkend, Md> {
17    pub(crate) task: AsyncTaskKind<Frntend, Bkend, Md>,
18    pub(crate) constraint: Option<Constraint<Md>>,
19    pub(crate) metadata: Vec<Md>,
20}
21
22pub(crate) enum AsyncTaskKind<Frntend, Bkend, Md> {
23    Future(FutureTask<Frntend, Bkend, Md>),
24    Stream(StreamTask<Frntend, Bkend, Md>),
25    Multi(Vec<AsyncTask<Frntend, Bkend, Md>>),
26    NoOp,
27}
28
29pub(crate) struct StreamTask<Frntend, Bkend, Md> {
30    pub(crate) task: DynStreamTask<Frntend, Bkend, Md>,
31    pub(crate) type_id: TypeId,
32    pub(crate) type_name: &'static str,
33    pub(crate) type_debug: String,
34}
35
36pub(crate) struct FutureTask<Frntend, Bkend, Md> {
37    pub(crate) task: DynFutureTask<Frntend, Bkend, Md>,
38    pub(crate) type_id: TypeId,
39    pub(crate) type_name: &'static str,
40    pub(crate) type_debug: String,
41}
42
43impl<Frntend, Bkend, Md> FromIterator<AsyncTask<Frntend, Bkend, Md>>
44    for AsyncTask<Frntend, Bkend, Md>
45{
46    fn from_iter<T: IntoIterator<Item = AsyncTask<Frntend, Bkend, Md>>>(iter: T) -> Self {
47        let v = iter.into_iter().collect();
48        // TODO: Better handle constraints / metadata.
49        AsyncTask {
50            task: AsyncTaskKind::Multi(v),
51            constraint: None,
52            metadata: vec![],
53        }
54    }
55}
56
57impl<Frntend, Bkend, Md> AsyncTask<Frntend, Bkend, Md> {
58    pub fn push(self, next: AsyncTask<Frntend, Bkend, Md>) -> AsyncTask<Frntend, Bkend, Md> {
59        match self.task {
60            AsyncTaskKind::Future(_) | AsyncTaskKind::Stream(_) => {
61                let v = vec![self, next];
62                AsyncTask {
63                    task: AsyncTaskKind::Multi(v),
64                    constraint: None,
65                    metadata: vec![],
66                }
67            }
68            AsyncTaskKind::Multi(mut m) => {
69                m.push(next);
70                AsyncTask {
71                    task: AsyncTaskKind::Multi(m),
72                    constraint: self.constraint,
73                    metadata: self.metadata,
74                }
75            }
76            AsyncTaskKind::NoOp => next,
77        }
78    }
79    pub fn is_no_op(&self) -> bool {
80        matches!(self.task, AsyncTaskKind::NoOp)
81    }
82    pub fn new_no_op() -> AsyncTask<Frntend, Bkend, Md> {
83        Self {
84            task: AsyncTaskKind::NoOp,
85            constraint: None,
86            metadata: vec![],
87        }
88    }
89    pub fn new_future<R>(
90        request: R,
91        handler: impl FnOnce(&mut Frntend, R::Output) + Send + 'static,
92        constraint: Option<Constraint<Md>>,
93    ) -> AsyncTask<Frntend, Bkend, Md>
94    where
95        R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
96        Bkend: 'static,
97        Frntend: 'static,
98    {
99        let metadata = R::metadata();
100        let type_id = request.type_id();
101        let type_name = type_name::<R>();
102        let type_debug = format!("{request:?}");
103        let task = Box::new(move |b: &Bkend| {
104            Box::new({
105                let future = BackendTask::into_future(request, b);
106                Box::pin(async move {
107                    let output = future.await;
108                    Box::new(move |frontend: &mut Frntend| {
109                        handler(frontend, output);
110                        AsyncTask::new_no_op()
111                    }) as DynStateMutation<Frntend, Bkend, Md>
112                })
113            }) as DynMutationFuture<Frntend, Bkend, Md>
114        }) as DynFutureTask<Frntend, Bkend, Md>;
115        let task = FutureTask {
116            task,
117            type_id,
118            type_name,
119            type_debug,
120        };
121        AsyncTask {
122            task: AsyncTaskKind::Future(task),
123            constraint,
124            metadata,
125        }
126    }
127    pub fn new_future_chained<R>(
128        request: R,
129        handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md> + Send + 'static,
130        constraint: Option<Constraint<Md>>,
131    ) -> AsyncTask<Frntend, Bkend, Md>
132    where
133        R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
134        Bkend: 'static,
135        Frntend: 'static,
136    {
137        let metadata = R::metadata();
138        let type_id = request.type_id();
139        let type_name = type_name::<R>();
140        let type_debug = format!("{request:?}");
141        let task = Box::new(move |b: &Bkend| {
142            Box::new({
143                let future = BackendTask::into_future(request, b);
144                Box::pin(async move {
145                    let output = future.await;
146                    Box::new(move |frontend: &mut Frntend| handler(frontend, output))
147                        as DynStateMutation<Frntend, Bkend, Md>
148                })
149            }) as DynMutationFuture<Frntend, Bkend, Md>
150        }) as DynFutureTask<Frntend, Bkend, Md>;
151        let task = FutureTask {
152            task,
153            type_id,
154            type_name,
155            type_debug,
156        };
157        AsyncTask {
158            task: AsyncTaskKind::Future(task),
159            constraint,
160            metadata,
161        }
162    }
163    pub fn new_stream<R>(
164        request: R,
165        // TODO: Review Clone bounds.
166        handler: impl FnOnce(&mut Frntend, R::Output) + Send + Clone + 'static,
167        constraint: Option<Constraint<Md>>,
168    ) -> AsyncTask<Frntend, Bkend, Md>
169    where
170        R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
171        Bkend: 'static,
172        Frntend: 'static,
173    {
174        let metadata = R::metadata();
175        let type_id = request.type_id();
176        let type_name = type_name::<R>();
177        let type_debug = format!("{request:?}");
178        let task = Box::new(move |b: &Bkend| {
179            let stream = request.into_stream(b);
180            Box::new({
181                stream.map(move |output| {
182                    Box::new({
183                        let handler = handler.clone();
184                        move |frontend: &mut Frntend| {
185                            handler.clone()(frontend, output);
186                            AsyncTask::new_no_op()
187                        }
188                    }) as DynStateMutation<Frntend, Bkend, Md>
189                })
190            }) as DynMutationStream<Frntend, Bkend, Md>
191        }) as DynStreamTask<Frntend, Bkend, Md>;
192        let task = StreamTask {
193            task,
194            type_id,
195            type_name,
196            type_debug,
197        };
198        AsyncTask {
199            task: AsyncTaskKind::Stream(task),
200            constraint,
201            metadata,
202        }
203    }
204    pub fn new_stream_chained<R>(
205        request: R,
206        // TODO: Review Clone bounds.
207        handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md>
208        + Send
209        + Clone
210        + 'static,
211        constraint: Option<Constraint<Md>>,
212    ) -> AsyncTask<Frntend, Bkend, Md>
213    where
214        R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
215        Bkend: 'static,
216        Frntend: 'static,
217    {
218        let metadata = R::metadata();
219        let type_id = request.type_id();
220        let type_name = type_name::<R>();
221        let type_debug = format!("{request:?}");
222        let task = Box::new(move |b: &Bkend| {
223            let stream = request.into_stream(b);
224            Box::new({
225                stream.map(move |output| {
226                    Box::new({
227                        let handler = handler.clone();
228                        move |frontend: &mut Frntend| handler.clone()(frontend, output)
229                    }) as DynStateMutation<Frntend, Bkend, Md>
230                })
231            }) as DynMutationStream<Frntend, Bkend, Md>
232        }) as DynStreamTask<Frntend, Bkend, Md>;
233        let task = StreamTask {
234            task,
235            type_id,
236            type_name,
237            type_debug,
238        };
239        AsyncTask {
240            task: AsyncTaskKind::Stream(task),
241            constraint,
242            metadata,
243        }
244    }
245    /// # Warning
246    /// This is recursive, if you have set up a cycle of AsyncTasks, map may
247    /// overflow.
248    pub fn map<NewFrntend>(
249        self,
250        f: impl Fn(&mut NewFrntend) -> &mut Frntend + Clone + Send + 'static,
251    ) -> AsyncTask<NewFrntend, Bkend, Md>
252    where
253        Bkend: 'static,
254        Frntend: 'static,
255        Md: 'static,
256    {
257        let Self {
258            task,
259            constraint,
260            metadata,
261        } = self;
262        match task {
263            AsyncTaskKind::Future(FutureTask {
264                task,
265                type_id,
266                type_name,
267                type_debug,
268            }) => {
269                let task = Box::new(|b: &Bkend| {
270                    Box::new(task(b).map(|task| {
271                        Box::new(|nf: &mut NewFrntend| {
272                            let task = task(f(nf));
273                            task.map(f)
274                        }) as DynStateMutation<NewFrntend, Bkend, Md>
275                    })) as DynMutationFuture<NewFrntend, Bkend, Md>
276                }) as DynFutureTask<NewFrntend, Bkend, Md>;
277                let task = FutureTask {
278                    task,
279                    type_id,
280                    type_name,
281                    type_debug,
282                };
283                AsyncTask {
284                    task: AsyncTaskKind::Future(task),
285                    constraint,
286                    metadata,
287                }
288            }
289            AsyncTaskKind::Stream(StreamTask {
290                task,
291                type_id,
292                type_name,
293                type_debug,
294            }) => {
295                let task = Box::new(|b: &Bkend| {
296                    Box::new({
297                        task(b).map(move |task| {
298                            Box::new({
299                                let f = f.clone();
300                                move |nf: &mut NewFrntend| {
301                                    let task = task(f(nf));
302                                    task.map(f.clone())
303                                }
304                            })
305                                as DynStateMutation<NewFrntend, Bkend, Md>
306                        })
307                    }) as DynMutationStream<NewFrntend, Bkend, Md>
308                }) as DynStreamTask<NewFrntend, Bkend, Md>;
309                let stream_task = StreamTask {
310                    task,
311                    type_id,
312                    type_name,
313                    type_debug,
314                };
315                AsyncTask {
316                    task: AsyncTaskKind::Stream(stream_task),
317                    constraint,
318                    metadata,
319                }
320            }
321            AsyncTaskKind::NoOp => AsyncTask {
322                task: AsyncTaskKind::NoOp,
323                constraint,
324                metadata,
325            },
326            AsyncTaskKind::Multi(v) => {
327                let mapped = v.into_iter().map(|task| task.map(f.clone())).collect();
328                AsyncTask {
329                    task: AsyncTaskKind::Multi(mapped),
330                    constraint,
331                    metadata,
332                }
333            }
334        }
335    }
336}
337
338pub(crate) struct TaskList<Bkend, Frntend, Md> {
339    pub inner: Vec<SpawnedTask<Bkend, Frntend, Md>>,
340}
341
342pub(crate) struct SpawnedTask<Frntend, Bkend, Md> {
343    pub(crate) type_id: TypeId,
344    pub(crate) type_name: &'static str,
345    pub(crate) type_debug: Arc<String>,
346    pub(crate) receiver: TaskWaiter<Frntend, Bkend, Md>,
347    pub(crate) task_id: TaskId,
348    pub(crate) metadata: Vec<Md>,
349}
350
351/// User visible struct for introspection.
352#[derive(Debug, Clone)]
353pub struct TaskInformation<'a, Cstrnt> {
354    pub type_id: TypeId,
355    pub type_name: &'static str,
356    pub type_debug: &'a str,
357    pub constraint: &'a Option<Constraint<Cstrnt>>,
358}
359
360#[derive(Eq, PartialEq, Debug)]
361pub struct Constraint<Cstrnt> {
362    pub(crate) constraint_type: ConstraitType<Cstrnt>,
363}
364
365#[derive(Eq, PartialEq, Debug)]
366pub enum ConstraitType<Cstrnt> {
367    BlockSameType,
368    KillSameType,
369    BlockMatchingMetatdata(Cstrnt),
370}
371
372pub(crate) enum TaskWaiter<Frntend, Bkend, Md> {
373    Future(JoinHandle<DynStateMutation<Frntend, Bkend, Md>>),
374    Stream {
375        receiver: mpsc::Receiver<DynStateMutation<Frntend, Bkend, Md>>,
376        join_handle: JoinHandle<()>,
377    },
378}
379
380impl<Frntend, Bkend, Md> TaskWaiter<Frntend, Bkend, Md> {
381    fn kill(&mut self) {
382        match self {
383            TaskWaiter::Future(handle) => handle.abort(),
384            TaskWaiter::Stream { join_handle, .. } => join_handle.abort_handle().abort(),
385        }
386    }
387}
388
389pub enum TaskOutcome<Frntend, Bkend, Md> {
390    /// The stream has completed, it won't be sending any more tasks.
391    StreamFinished {
392        type_id: TypeId,
393        type_name: &'static str,
394        type_debug: Arc<String>,
395        task_id: TaskId,
396    },
397    /// The stream has panicked, it won't be sending any more tasks.
398    StreamPanicked {
399        error: JoinError,
400        type_id: TypeId,
401        type_name: &'static str,
402        type_debug: Arc<String>,
403        task_id: TaskId,
404    },
405    /// No task was recieved because the next task panicked.
406    TaskPanicked {
407        error: JoinError,
408        type_id: TypeId,
409        type_name: &'static str,
410        type_debug: Arc<String>,
411        task_id: TaskId,
412    },
413    /// Mutation was received from a task.
414    MutationReceived {
415        mutation: DynStateMutation<Frntend, Bkend, Md>,
416        type_id: TypeId,
417        type_name: &'static str,
418        type_debug: Arc<String>,
419        task_id: TaskId,
420    },
421}
422
423impl<Bkend, Frntend, Md: PartialEq> TaskList<Frntend, Bkend, Md> {
424    pub(crate) fn new() -> Self {
425        Self { inner: vec![] }
426    }
427    /// Await for the next response from one of the spawned tasks.
428    pub(crate) async fn get_next_response(&mut self) -> Option<TaskOutcome<Frntend, Bkend, Md>> {
429        let task_completed = self
430            .inner
431            .iter_mut()
432            .enumerate()
433            .map(|(idx, task)| async move {
434                match task.receiver {
435                    TaskWaiter::Future(ref mut receiver) => match receiver.await {
436                        Ok(mutation) => (
437                            Some(idx),
438                            TaskOutcome::MutationReceived {
439                                mutation,
440                                type_id: task.type_id,
441                                type_debug: task.type_debug.clone(),
442                                task_id: task.task_id,
443                                type_name: task.type_name,
444                            },
445                        ),
446                        Err(error) => (
447                            Some(idx),
448                            TaskOutcome::TaskPanicked {
449                                type_id: task.type_id,
450                                type_name: task.type_name,
451                                type_debug: task.type_debug.clone(),
452                                task_id: task.task_id,
453                                error,
454                            },
455                        ),
456                    },
457                    TaskWaiter::Stream {
458                        ref mut receiver,
459                        ref mut join_handle,
460                    } => {
461                        if let Some(mutation) = receiver.recv().await {
462                            return (
463                                None,
464                                TaskOutcome::MutationReceived {
465                                    mutation,
466                                    type_id: task.type_id,
467                                    type_name: task.type_name,
468                                    task_id: task.task_id,
469                                    type_debug: task.type_debug.clone(),
470                                },
471                            );
472                        };
473                        match join_handle.await {
474                            Err(error) if error.is_panic() => (
475                                Some(idx),
476                                TaskOutcome::StreamPanicked {
477                                    error,
478                                    type_id: task.type_id,
479                                    type_name: task.type_name,
480                                    type_debug: task.type_debug.clone(),
481                                    task_id: task.task_id,
482                                },
483                            ),
484                            // Ok case or Err case where Err is not a panic (ie, it's an abort).
485                            _ => (
486                                Some(idx),
487                                TaskOutcome::StreamFinished {
488                                    type_id: task.type_id,
489                                    type_name: task.type_name,
490                                    type_debug: task.type_debug.clone(),
491                                    task_id: task.task_id,
492                                },
493                            ),
494                        }
495                    }
496                }
497            })
498            .collect::<FuturesUnordered<_>>()
499            .next()
500            .await;
501        let (maybe_completed_idx, outcome) = task_completed?;
502        if let Some(completed_idx) = maybe_completed_idx {
503            // Safe - this value is in range as produced from enumerate on
504            // original list.
505            self.inner.swap_remove(completed_idx);
506        };
507        Some(outcome)
508    }
509    pub(crate) fn push(&mut self, task: SpawnedTask<Frntend, Bkend, Md>) {
510        self.inner.push(task)
511    }
512    // TODO: Tests
513    pub(crate) fn handle_constraint(&mut self, constraint: Constraint<Md>, type_id: TypeId) {
514        // TODO: Consider the situation where one component kills tasks belonging to
515        // another component.
516        //
517        // Assuming here that kill implies block also.
518        let task_doesnt_match_constraint = |task: &SpawnedTask<_, _, _>| task.type_id != type_id;
519        let task_doesnt_match_metadata =
520            |task: &SpawnedTask<_, _, _>, constraint| !task.metadata.contains(constraint);
521        match constraint.constraint_type {
522            ConstraitType::BlockMatchingMetatdata(metadata) => self
523                .inner
524                .retain(|task| task_doesnt_match_metadata(task, &metadata)),
525            ConstraitType::BlockSameType => {
526                self.inner.retain(task_doesnt_match_constraint);
527            }
528            ConstraitType::KillSameType => self.inner.retain_mut(|task| {
529                if !task_doesnt_match_constraint(task) {
530                    task.receiver.kill();
531                    return false;
532                }
533                true
534            }),
535        }
536    }
537}
538
539impl<Cstrnt> Constraint<Cstrnt> {
540    pub fn new_block_same_type() -> Self {
541        Self {
542            constraint_type: ConstraitType::BlockSameType,
543        }
544    }
545    pub fn new_kill_same_type() -> Self {
546        Self {
547            constraint_type: ConstraitType::KillSameType,
548        }
549    }
550    pub fn new_block_matching_metadata(metadata: Cstrnt) -> Self {
551        Self {
552            constraint_type: ConstraitType::BlockMatchingMetatdata(metadata),
553        }
554    }
555}
556
557#[cfg(test)]
558mod tests {
559    use crate::{AsyncTask, BackendStreamingTask, BackendTask};
560    use futures::StreamExt;
561    #[derive(Debug)]
562    struct Task1;
563    #[derive(Debug)]
564    struct Task2;
565    #[derive(Debug)]
566    struct StreamingTask;
567    impl BackendTask<()> for Task1 {
568        type Output = ();
569        type MetadataType = ();
570        #[allow(clippy::manual_async_fn)]
571        fn into_future(
572            self,
573            _: &(),
574        ) -> impl std::future::Future<Output = Self::Output> + Send + 'static {
575            async {}
576        }
577    }
578    impl BackendTask<()> for Task2 {
579        type Output = ();
580        type MetadataType = ();
581        #[allow(clippy::manual_async_fn)]
582        fn into_future(
583            self,
584            _: &(),
585        ) -> impl std::future::Future<Output = Self::Output> + Send + 'static {
586            async {}
587        }
588    }
589    impl BackendStreamingTask<()> for StreamingTask {
590        type Output = ();
591        type MetadataType = ();
592        fn into_stream(
593            self,
594            _: &(),
595        ) -> impl futures::Stream<Item = Self::Output> + Send + Unpin + 'static {
596            futures::stream::once(async move {}).boxed()
597        }
598    }
599    #[tokio::test]
600    async fn test_recursive_map() {
601        let recursive_task = AsyncTask::new_stream_chained(
602            StreamingTask,
603            |_: &mut (), _| {
604                AsyncTask::new_future_chained(
605                    Task1,
606                    |_: &mut (), _| AsyncTask::new_future(Task2, |_: &mut (), _| {}, None),
607                    None,
608                )
609            },
610            None,
611        );
612        // Here, it's expected that this is succesful.
613        // TODO: Run the task for an expected outcome.
614        #[allow(unused_must_use)]
615        let _ = recursive_task.map(|tmp: &mut ()| tmp);
616    }
617}