async_callback_manager/
task.rs

1use crate::{
2    BackendStreamingTask, BackendTask, DynFutureTask, DynMutationFuture, DynMutationStream,
3    DynStateMutation, DynStreamTask, TaskId,
4};
5use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
6use std::{
7    any::{type_name, TypeId},
8    fmt::Debug,
9    sync::Arc,
10};
11use tokio::{
12    sync::mpsc,
13    task::{AbortHandle, JoinError, JoinHandle},
14};
15
/// An asynchronous task that can generate state mutations and/or more tasks to
/// be spawned by an AsyncCallbackManager.
#[must_use = "AsyncTasks do nothing unless you run them"]
pub struct AsyncTask<Frntend, Bkend, Md> {
    /// What this task consists of: a future, a stream, a group of tasks, or
    /// nothing at all.
    pub(crate) task: AsyncTaskKind<Frntend, Bkend, Md>,
    /// Optional constraint supplied when the task was constructed.
    pub(crate) constraint: Option<Constraint<Md>>,
    /// Metadata values; the `new_*` constructors fill this from the request
    /// type's `metadata()`.
    pub(crate) metadata: Vec<Md>,
}
24
/// The different shapes the body of an `AsyncTask` can take.
pub(crate) enum AsyncTaskKind<Frntend, Bkend, Md> {
    /// A task backed by a `FutureTask`.
    Future(FutureTask<Frntend, Bkend, Md>),
    /// A task backed by a `StreamTask`.
    Stream(StreamTask<Frntend, Bkend, Md>),
    /// Several tasks grouped together (produced by `AsyncTask::push` and the
    /// `FromIterator` implementation).
    Multi(Vec<AsyncTask<Frntend, Bkend, Md>>),
    /// A task with nothing to run.
    NoOp,
}
31
/// A stream-producing task together with identifying information about the
/// request it was built from.
pub(crate) struct StreamTask<Frntend, Bkend, Md> {
    /// Boxed closure that is given the backend and produces the stream of
    /// state mutations.
    pub(crate) task: DynStreamTask<Frntend, Bkend, Md>,
    /// `TypeId` of the request the task was built from.
    pub(crate) type_id: TypeId,
    /// `std::any::type_name` of the request type.
    pub(crate) type_name: &'static str,
    /// `{:?}` rendering of the request, captured when the task was built.
    pub(crate) type_debug: String,
}
38
/// A future-producing task together with identifying information about the
/// request it was built from.
pub(crate) struct FutureTask<Frntend, Bkend, Md> {
    /// Boxed closure that is given the backend and produces a future
    /// resolving to a state mutation.
    pub(crate) task: DynFutureTask<Frntend, Bkend, Md>,
    /// `TypeId` of the request the task was built from.
    pub(crate) type_id: TypeId,
    /// `std::any::type_name` of the request type.
    pub(crate) type_name: &'static str,
    /// `{:?}` rendering of the request, captured when the task was built.
    pub(crate) type_debug: String,
}
45
46impl<Frntend, Bkend, Md> FromIterator<AsyncTask<Frntend, Bkend, Md>>
47    for AsyncTask<Frntend, Bkend, Md>
48{
49    fn from_iter<T: IntoIterator<Item = AsyncTask<Frntend, Bkend, Md>>>(iter: T) -> Self {
50        let v = iter.into_iter().collect();
51        // TODO: Better handle constraints / metadata.
52        AsyncTask {
53            task: AsyncTaskKind::Multi(v),
54            constraint: None,
55            metadata: vec![],
56        }
57    }
58}
59
60impl<Frntend, Bkend, Md> AsyncTask<Frntend, Bkend, Md> {
61    pub fn push(self, next: AsyncTask<Frntend, Bkend, Md>) -> AsyncTask<Frntend, Bkend, Md> {
62        match self.task {
63            AsyncTaskKind::Future(_) | AsyncTaskKind::Stream(_) => {
64                let v = vec![self, next];
65                AsyncTask {
66                    task: AsyncTaskKind::Multi(v),
67                    constraint: None,
68                    metadata: vec![],
69                }
70            }
71            AsyncTaskKind::Multi(mut m) => {
72                m.push(next);
73                AsyncTask {
74                    task: AsyncTaskKind::Multi(m),
75                    constraint: self.constraint,
76                    metadata: self.metadata,
77                }
78            }
79            AsyncTaskKind::NoOp => next,
80        }
81    }
82    pub fn is_no_op(&self) -> bool {
83        matches!(self.task, AsyncTaskKind::NoOp)
84    }
85    pub fn new_no_op() -> AsyncTask<Frntend, Bkend, Md> {
86        Self {
87            task: AsyncTaskKind::NoOp,
88            constraint: None,
89            metadata: vec![],
90        }
91    }
92    pub fn new_future<R>(
93        request: R,
94        handler: impl FnOnce(&mut Frntend, R::Output) + Send + 'static,
95        constraint: Option<Constraint<Md>>,
96    ) -> AsyncTask<Frntend, Bkend, Md>
97    where
98        R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
99        Bkend: 'static,
100        Frntend: 'static,
101    {
102        let metadata = R::metadata();
103        let type_id = request.type_id();
104        let type_name = type_name::<R>();
105        let type_debug = format!("{:?}", request);
106        let task = Box::new(move |b: &Bkend| {
107            Box::new({
108                let future = request.into_future(b);
109                Box::pin(async move {
110                    let output = future.await;
111                    Box::new(move |frontend: &mut Frntend| {
112                        handler(frontend, output);
113                        AsyncTask::new_no_op()
114                    }) as DynStateMutation<Frntend, Bkend, Md>
115                })
116            }) as DynMutationFuture<Frntend, Bkend, Md>
117        }) as DynFutureTask<Frntend, Bkend, Md>;
118        let task = FutureTask {
119            task,
120            type_id,
121            type_name,
122            type_debug,
123        };
124        AsyncTask {
125            task: AsyncTaskKind::Future(task),
126            constraint,
127            metadata,
128        }
129    }
130    pub fn new_future_chained<R>(
131        request: R,
132        handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md> + Send + 'static,
133        constraint: Option<Constraint<Md>>,
134    ) -> AsyncTask<Frntend, Bkend, Md>
135    where
136        R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
137        Bkend: 'static,
138        Frntend: 'static,
139    {
140        let metadata = R::metadata();
141        let type_id = request.type_id();
142        let type_name = type_name::<R>();
143        let type_debug = format!("{:?}", request);
144        let task = Box::new(move |b: &Bkend| {
145            Box::new({
146                let future = request.into_future(b);
147                Box::pin(async move {
148                    let output = future.await;
149                    Box::new(move |frontend: &mut Frntend| handler(frontend, output))
150                        as DynStateMutation<Frntend, Bkend, Md>
151                })
152            }) as DynMutationFuture<Frntend, Bkend, Md>
153        }) as DynFutureTask<Frntend, Bkend, Md>;
154        let task = FutureTask {
155            task,
156            type_id,
157            type_name,
158            type_debug,
159        };
160        AsyncTask {
161            task: AsyncTaskKind::Future(task),
162            constraint,
163            metadata,
164        }
165    }
166    pub fn new_stream<R>(
167        request: R,
168        // TODO: Review Clone bounds.
169        handler: impl FnOnce(&mut Frntend, R::Output) + Send + Clone + 'static,
170        constraint: Option<Constraint<Md>>,
171    ) -> AsyncTask<Frntend, Bkend, Md>
172    where
173        R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
174        Bkend: 'static,
175        Frntend: 'static,
176    {
177        let metadata = R::metadata();
178        let type_id = request.type_id();
179        let type_name = type_name::<R>();
180        let type_debug = format!("{:?}", request);
181        let task = Box::new(move |b: &Bkend| {
182            let stream = request.into_stream(b);
183            Box::new({
184                stream.map(move |output| {
185                    Box::new({
186                        let handler = handler.clone();
187                        move |frontend: &mut Frntend| {
188                            handler.clone()(frontend, output);
189                            AsyncTask::new_no_op()
190                        }
191                    }) as DynStateMutation<Frntend, Bkend, Md>
192                })
193            }) as DynMutationStream<Frntend, Bkend, Md>
194        }) as DynStreamTask<Frntend, Bkend, Md>;
195        let task = StreamTask {
196            task,
197            type_id,
198            type_name,
199            type_debug,
200        };
201        AsyncTask {
202            task: AsyncTaskKind::Stream(task),
203            constraint,
204            metadata,
205        }
206    }
207    pub fn new_stream_chained<R>(
208        request: R,
209        // TODO: Review Clone bounds.
210        handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md>
211            + Send
212            + Clone
213            + 'static,
214        constraint: Option<Constraint<Md>>,
215    ) -> AsyncTask<Frntend, Bkend, Md>
216    where
217        R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
218        Bkend: 'static,
219        Frntend: 'static,
220    {
221        let metadata = R::metadata();
222        let type_id = request.type_id();
223        let type_name = type_name::<R>();
224        let type_debug = format!("{:?}", request);
225        let task = Box::new(move |b: &Bkend| {
226            let stream = request.into_stream(b);
227            Box::new({
228                stream.map(move |output| {
229                    Box::new({
230                        let handler = handler.clone();
231                        move |frontend: &mut Frntend| handler.clone()(frontend, output)
232                    }) as DynStateMutation<Frntend, Bkend, Md>
233                })
234            }) as DynMutationStream<Frntend, Bkend, Md>
235        }) as DynStreamTask<Frntend, Bkend, Md>;
236        let task = StreamTask {
237            task,
238            type_id,
239            type_name,
240            type_debug,
241        };
242        AsyncTask {
243            task: AsyncTaskKind::Stream(task),
244            constraint,
245            metadata,
246        }
247    }
248    /// # Warning
249    /// This is recursive, if you have set up a cycle of AsyncTasks, map may
250    /// overflow.
251    pub fn map<NewFrntend>(
252        self,
253        f: impl Fn(&mut NewFrntend) -> &mut Frntend + Clone + Send + 'static,
254    ) -> AsyncTask<NewFrntend, Bkend, Md>
255    where
256        Bkend: 'static,
257        Frntend: 'static,
258        Md: 'static,
259    {
260        let Self {
261            task,
262            constraint,
263            metadata,
264        } = self;
265        match task {
266            AsyncTaskKind::Future(FutureTask {
267                task,
268                type_id,
269                type_name,
270                type_debug,
271            }) => {
272                let task = Box::new(|b: &Bkend| {
273                    Box::new(task(b).map(|task| {
274                        Box::new(|nf: &mut NewFrntend| {
275                            let task = task(f(nf));
276                            task.map(f)
277                        }) as DynStateMutation<NewFrntend, Bkend, Md>
278                    })) as DynMutationFuture<NewFrntend, Bkend, Md>
279                }) as DynFutureTask<NewFrntend, Bkend, Md>;
280                let task = FutureTask {
281                    task,
282                    type_id,
283                    type_name,
284                    type_debug,
285                };
286                AsyncTask {
287                    task: AsyncTaskKind::Future(task),
288                    constraint,
289                    metadata,
290                }
291            }
292            AsyncTaskKind::Stream(StreamTask {
293                task,
294                type_id,
295                type_name,
296                type_debug,
297            }) => {
298                let task = Box::new(|b: &Bkend| {
299                    Box::new({
300                        task(b).map(move |task| {
301                            Box::new({
302                                let f = f.clone();
303                                move |nf: &mut NewFrntend| {
304                                    let task = task(f(nf));
305                                    task.map(f.clone())
306                                }
307                            })
308                                as DynStateMutation<NewFrntend, Bkend, Md>
309                        })
310                    }) as DynMutationStream<NewFrntend, Bkend, Md>
311                }) as DynStreamTask<NewFrntend, Bkend, Md>;
312                let stream_task = StreamTask {
313                    task,
314                    type_id,
315                    type_name,
316                    type_debug,
317                };
318                AsyncTask {
319                    task: AsyncTaskKind::Stream(stream_task),
320                    constraint,
321                    metadata,
322                }
323            }
324            AsyncTaskKind::NoOp => AsyncTask {
325                task: AsyncTaskKind::NoOp,
326                constraint,
327                metadata,
328            },
329            AsyncTaskKind::Multi(v) => {
330                let mapped = v.into_iter().map(|task| task.map(f.clone())).collect();
331                AsyncTask {
332                    task: AsyncTaskKind::Multi(mapped),
333                    constraint,
334                    metadata,
335                }
336            }
337        }
338    }
339}
340
341pub(crate) struct TaskList<Bkend, Frntend, Md> {
342    pub inner: Vec<SpawnedTask<Bkend, Frntend, Md>>,
343}
344
/// A task that has been spawned, together with the information needed to
/// receive its output and to identify it.
pub(crate) struct SpawnedTask<Frntend, Bkend, Md> {
    /// `TypeId` recorded for the task; compared against when handling
    /// same-type constraints.
    pub(crate) type_id: TypeId,
    /// Type name recorded for the task.
    pub(crate) type_name: &'static str,
    /// Debug rendering recorded for the task; shared via `Arc` so it can be
    /// handed out with each `TaskOutcome` without copying the string.
    pub(crate) type_debug: Arc<String>,
    /// Handle used to wait for the task's output and to abort it.
    pub(crate) receiver: TaskWaiter<Frntend, Bkend, Md>,
    /// Identifier of this spawned task.
    pub(crate) task_id: TaskId,
    /// Metadata values; compared against when handling metadata constraints.
    pub(crate) metadata: Vec<Md>,
}
353
/// User visible struct for introspection.
///
/// Borrows its debug string and constraint for the lifetime `'a`.
#[derive(Debug, Clone)]
pub struct TaskInformation<'a, Cstrnt> {
    /// `TypeId` of the task being described.
    pub type_id: TypeId,
    /// Type name of the task being described.
    pub type_name: &'static str,
    /// Debug rendering of the task being described.
    pub type_debug: &'a str,
    /// The constraint attached to the task, if any.
    pub constraint: &'a Option<Constraint<Cstrnt>>,
}
362
/// A constraint that can be attached to a task.
///
/// When a constraint is handled by the task list, previously spawned tasks
/// that match it are removed from the list (see `ConstraitType` for the
/// available kinds). Construct one with the `new_*` functions.
#[derive(Eq, PartialEq, Debug)]
pub struct Constraint<Cstrnt> {
    /// Which kind of constraint this is.
    pub(crate) constraint_type: ConstraitType<Cstrnt>,
}
367
/// The kinds of constraint a task can carry.
///
/// The descriptions below reflect how the task list handles each kind.
#[derive(Eq, PartialEq, Debug)]
pub enum ConstraitType<Cstrnt> {
    /// Spawned tasks with the same `TypeId` are removed from the task list.
    BlockSameType,
    /// Spawned tasks with the same `TypeId` are aborted and removed from the
    /// task list.
    KillSameType,
    /// Spawned tasks whose metadata contains the given value are removed from
    /// the task list.
    BlockMatchingMetatdata(Cstrnt),
}
374
/// The handle through which a spawned task's output is received.
pub(crate) enum TaskWaiter<Frntend, Bkend, Md> {
    /// A future task: the join handle resolves to a single state mutation.
    Future(JoinHandle<DynStateMutation<Frntend, Bkend, Md>>),
    /// A stream task: state mutations arrive over a channel.
    Stream {
        /// Receiving end of the channel carrying the state mutations.
        receiver: mpsc::Receiver<DynStateMutation<Frntend, Bkend, Md>>,
        /// Handle used to abort the task.
        abort_handle: AbortHandle,
    },
}
382
383impl<Frntend, Bkend, Md> TaskWaiter<Frntend, Bkend, Md> {
384    fn kill(&mut self) {
385        match self {
386            TaskWaiter::Future(handle) => handle.abort(),
387            TaskWaiter::Stream {
388                abort_handle: abort,
389                ..
390            } => abort.abort(),
391        }
392    }
393}
394
/// The result of waiting for the next response from the spawned tasks.
pub enum TaskOutcome<Frntend, Bkend, Md> {
    /// No task was received because a stream closed, but there are still more
    /// tasks.
    StreamClosed,
    /// No task was received because the next task panicked.
    /// Currently only applicable to Future type tasks.
    // TODO: Implement for Stream type tasks.
    TaskPanicked {
        /// The error returned when joining the task.
        error: JoinError,
        /// `TypeId` recorded for the task.
        type_id: TypeId,
        /// Type name recorded for the task.
        type_name: &'static str,
        /// Debug rendering recorded for the task.
        type_debug: Arc<String>,
        /// Identifier of the spawned task.
        task_id: TaskId,
    },
    /// Mutation was received from a task.
    MutationReceived {
        /// The state mutation produced by the task.
        mutation: DynStateMutation<Frntend, Bkend, Md>,
        /// `TypeId` recorded for the task.
        type_id: TypeId,
        /// Type name recorded for the task.
        type_name: &'static str,
        /// Debug rendering recorded for the task.
        type_debug: Arc<String>,
        /// Identifier of the spawned task.
        task_id: TaskId,
    },
}
418
419impl<Bkend, Frntend, Md: PartialEq> TaskList<Frntend, Bkend, Md> {
420    pub(crate) fn new() -> Self {
421        Self { inner: vec![] }
422    }
423    /// Await for the next response from one of the spawned tasks.
424    pub(crate) async fn get_next_response(&mut self) -> Option<TaskOutcome<Frntend, Bkend, Md>> {
425        let task_completed = self
426            .inner
427            .iter_mut()
428            .enumerate()
429            .map(|(idx, task)| async move {
430                match task.receiver {
431                    TaskWaiter::Future(ref mut receiver) => match receiver.await {
432                        Ok(mutation) => (
433                            Some(idx),
434                            TaskOutcome::MutationReceived {
435                                mutation,
436                                type_id: task.type_id,
437                                type_debug: task.type_debug.clone(),
438                                task_id: task.task_id,
439                                type_name: task.type_name,
440                            },
441                        ),
442                        Err(error) => (
443                            Some(idx),
444                            TaskOutcome::TaskPanicked {
445                                type_id: task.type_id,
446                                type_name: task.type_name,
447                                type_debug: task.type_debug.clone(),
448                                task_id: task.task_id,
449                                error,
450                            },
451                        ),
452                    },
453                    TaskWaiter::Stream {
454                        ref mut receiver, ..
455                    } => {
456                        if let Some(mutation) = receiver.recv().await {
457                            return (
458                                None,
459                                TaskOutcome::MutationReceived {
460                                    mutation,
461                                    type_id: task.type_id,
462                                    type_name: task.type_name,
463                                    task_id: task.task_id,
464                                    type_debug: task.type_debug.clone(),
465                                },
466                            );
467                        }
468                        (Some(idx), TaskOutcome::StreamClosed)
469                    }
470                }
471            })
472            .collect::<FuturesUnordered<_>>()
473            .next()
474            .await;
475        let (maybe_completed_id, outcome) = task_completed?;
476        if let Some(completed_id) = maybe_completed_id {
477            // Safe - this value is in range as produced from enumerate on
478            // original list.
479            self.inner.swap_remove(completed_id);
480        };
481        Some(outcome)
482    }
483    pub(crate) fn push(&mut self, task: SpawnedTask<Frntend, Bkend, Md>) {
484        self.inner.push(task)
485    }
486    // TODO: Tests
487    pub(crate) fn handle_constraint(&mut self, constraint: Constraint<Md>, type_id: TypeId) {
488        // TODO: Consider the situation where one component kills tasks belonging to
489        // another component.
490        //
491        // Assuming here that kill implies block also.
492        let task_doesnt_match_constraint = |task: &SpawnedTask<_, _, _>| (task.type_id != type_id);
493        let task_doesnt_match_metadata =
494            |task: &SpawnedTask<_, _, _>, constraint| !task.metadata.contains(constraint);
495        match constraint.constraint_type {
496            ConstraitType::BlockMatchingMetatdata(metadata) => self
497                .inner
498                .retain(|task| task_doesnt_match_metadata(task, &metadata)),
499            ConstraitType::BlockSameType => {
500                self.inner.retain(task_doesnt_match_constraint);
501            }
502            ConstraitType::KillSameType => self.inner.retain_mut(|task| {
503                if !task_doesnt_match_constraint(task) {
504                    task.receiver.kill();
505                    return false;
506                }
507                true
508            }),
509        }
510    }
511}
512
513impl<Cstrnt> Constraint<Cstrnt> {
514    pub fn new_block_same_type() -> Self {
515        Self {
516            constraint_type: ConstraitType::BlockSameType,
517        }
518    }
519    pub fn new_kill_same_type() -> Self {
520        Self {
521            constraint_type: ConstraitType::KillSameType,
522        }
523    }
524    pub fn new_block_matching_metadata(metadata: Cstrnt) -> Self {
525        Self {
526            constraint_type: ConstraitType::BlockMatchingMetatdata(metadata),
527        }
528    }
529}
530
#[cfg(test)]
mod tests {
    use futures::StreamExt;

    use crate::{AsyncTask, BackendStreamingTask, BackendTask};
    // Minimal request types: each produces `()` and uses `()` as backend and
    // metadata type.
    #[derive(Debug)]
    struct Task1;
    #[derive(Debug)]
    struct Task2;
    #[derive(Debug)]
    struct StreamingTask;
    impl BackendTask<()> for Task1 {
        type Output = ();
        type MetadataType = ();
        #[allow(clippy::manual_async_fn)]
        fn into_future(
            self,
            _: &(),
        ) -> impl std::future::Future<Output = Self::Output> + Send + 'static {
            async {}
        }
    }
    impl BackendTask<()> for Task2 {
        type Output = ();
        type MetadataType = ();
        #[allow(clippy::manual_async_fn)]
        fn into_future(
            self,
            _: &(),
        ) -> impl std::future::Future<Output = Self::Output> + Send + 'static {
            async {}
        }
    }
    impl BackendStreamingTask<()> for StreamingTask {
        type Output = ();
        type MetadataType = ();
        fn into_stream(
            self,
            _: &(),
        ) -> impl futures::Stream<Item = Self::Output> + Send + Unpin + 'static {
            // A stream that yields a single `()` item.
            futures::stream::once(async move {}).boxed()
        }
    }
    /// Mapping a task whose handlers return further tasks (stream -> future
    /// -> future) should complete without error.
    #[tokio::test]
    async fn test_recursive_map() {
        let recursive_task = AsyncTask::new_stream_chained(
            StreamingTask,
            |_: &mut (), _| {
                AsyncTask::new_future_chained(
                    Task1,
                    |_: &mut (), _| AsyncTask::new_future(Task2, |_: &mut (), _| {}, None),
                    None,
                )
            },
            None,
        );
        // Here, it's expected that this is successful.
        // TODO: Run the task for an expected outcome.
        #[allow(unused_must_use)]
        let _ = recursive_task.map(|tmp: &mut ()| tmp);
    }
}