async_callback_manager/
task.rs

1use crate::{
2    BackendStreamingTask, BackendTask, DynFutureTask, DynMutationFuture, DynMutationStream,
3    DynStateMutation, DynStreamTask, TaskId,
4};
5use futures::stream::FuturesUnordered;
6use futures::{FutureExt, StreamExt};
7use std::any::{type_name, TypeId};
8use std::fmt::Debug;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tokio::task::{AbortHandle, JoinError, JoinHandle};
12
/// An asynchronous task that can generate state mutations and/or more tasks to
/// be spawned by an AsyncCallbackManager.
///
/// Build one with [`AsyncTask::new_future`], [`AsyncTask::new_stream`], their
/// `_chained` variants, or [`AsyncTask::new_no_op`]; combine several with
/// [`AsyncTask::push`] or by collecting an iterator of tasks.
#[must_use = "AsyncTasks do nothing unless you run them"]
pub struct AsyncTask<Frntend, Bkend, Md> {
    /// What this task runs: a future, a stream, a group of tasks, or nothing.
    pub(crate) task: AsyncTaskKind<Frntend, Bkend, Md>,
    /// Optional constraint passed to the constructor. Always `None` for
    /// no-op tasks and for groups built by `FromIterator`.
    pub(crate) constraint: Option<Constraint<Md>>,
    /// Metadata reported by the request type (`R::metadata()`); empty for
    /// no-op tasks and for groups built by `FromIterator`.
    pub(crate) metadata: Vec<Md>,
}
21
/// The different shapes of work an [`AsyncTask`] can carry.
pub(crate) enum AsyncTaskKind<Frntend, Bkend, Md> {
    /// A task built from a `BackendTask` (see `AsyncTask::new_future`).
    Future(FutureTask<Frntend, Bkend, Md>),
    /// A task built from a `BackendStreamingTask` (see
    /// `AsyncTask::new_stream`).
    Stream(StreamTask<Frntend, Bkend, Md>),
    /// A group of tasks; each member keeps its own constraint and metadata.
    Multi(Vec<AsyncTask<Frntend, Bkend, Md>>),
    /// A task that carries no work.
    NoOp,
}
28
/// A stream-producing task together with introspection details about the
/// request it was built from.
pub(crate) struct StreamTask<Frntend, Bkend, Md> {
    /// Boxed closure that, given the backend, produces the stream of state
    /// mutations.
    pub(crate) task: DynStreamTask<Frntend, Bkend, Md>,
    /// Value of `request.type_id()` taken when the task was constructed.
    pub(crate) type_id: TypeId,
    /// Name of the request type, from `std::any::type_name`.
    pub(crate) type_name: &'static str,
    /// `Debug` rendering of the request, captured at construction time.
    pub(crate) type_debug: String,
}
35
/// A future-producing task together with introspection details about the
/// request it was built from.
pub(crate) struct FutureTask<Frntend, Bkend, Md> {
    /// Boxed closure that, given the backend, produces the future resolving
    /// to a state mutation.
    pub(crate) task: DynFutureTask<Frntend, Bkend, Md>,
    /// Value of `request.type_id()` taken when the task was constructed.
    pub(crate) type_id: TypeId,
    /// Name of the request type, from `std::any::type_name`.
    pub(crate) type_name: &'static str,
    /// `Debug` rendering of the request, captured at construction time.
    pub(crate) type_debug: String,
}
42
43impl<Frntend, Bkend, Md> FromIterator<AsyncTask<Frntend, Bkend, Md>>
44    for AsyncTask<Frntend, Bkend, Md>
45{
46    fn from_iter<T: IntoIterator<Item = AsyncTask<Frntend, Bkend, Md>>>(iter: T) -> Self {
47        let v = iter.into_iter().collect();
48        // TODO: Better handle constraints / metadata.
49        AsyncTask {
50            task: AsyncTaskKind::Multi(v),
51            constraint: None,
52            metadata: vec![],
53        }
54    }
55}
56
57impl<Frntend, Bkend, Md> AsyncTask<Frntend, Bkend, Md> {
58    pub fn push(self, next: AsyncTask<Frntend, Bkend, Md>) -> AsyncTask<Frntend, Bkend, Md> {
59        match self.task {
60            AsyncTaskKind::Future(_) | AsyncTaskKind::Stream(_) => {
61                let v = vec![self, next];
62                AsyncTask {
63                    task: AsyncTaskKind::Multi(v),
64                    constraint: None,
65                    metadata: vec![],
66                }
67            }
68            AsyncTaskKind::Multi(mut m) => {
69                m.push(next);
70                AsyncTask {
71                    task: AsyncTaskKind::Multi(m),
72                    constraint: self.constraint,
73                    metadata: self.metadata,
74                }
75            }
76            AsyncTaskKind::NoOp => next,
77        }
78    }
79    pub fn is_no_op(&self) -> bool {
80        matches!(self.task, AsyncTaskKind::NoOp)
81    }
82    pub fn new_no_op() -> AsyncTask<Frntend, Bkend, Md> {
83        Self {
84            task: AsyncTaskKind::NoOp,
85            constraint: None,
86            metadata: vec![],
87        }
88    }
89    pub fn new_future<R>(
90        request: R,
91        handler: impl FnOnce(&mut Frntend, R::Output) + Send + 'static,
92        constraint: Option<Constraint<Md>>,
93    ) -> AsyncTask<Frntend, Bkend, Md>
94    where
95        R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
96        Bkend: 'static,
97        Frntend: 'static,
98    {
99        let metadata = R::metadata();
100        let type_id = request.type_id();
101        let type_name = type_name::<R>();
102        let type_debug = format!("{request:?}");
103        let task = Box::new(move |b: &Bkend| {
104            Box::new({
105                let future = request.into_future(b);
106                Box::pin(async move {
107                    let output = future.await;
108                    Box::new(move |frontend: &mut Frntend| {
109                        handler(frontend, output);
110                        AsyncTask::new_no_op()
111                    }) as DynStateMutation<Frntend, Bkend, Md>
112                })
113            }) as DynMutationFuture<Frntend, Bkend, Md>
114        }) as DynFutureTask<Frntend, Bkend, Md>;
115        let task = FutureTask {
116            task,
117            type_id,
118            type_name,
119            type_debug,
120        };
121        AsyncTask {
122            task: AsyncTaskKind::Future(task),
123            constraint,
124            metadata,
125        }
126    }
127    pub fn new_future_chained<R>(
128        request: R,
129        handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md> + Send + 'static,
130        constraint: Option<Constraint<Md>>,
131    ) -> AsyncTask<Frntend, Bkend, Md>
132    where
133        R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
134        Bkend: 'static,
135        Frntend: 'static,
136    {
137        let metadata = R::metadata();
138        let type_id = request.type_id();
139        let type_name = type_name::<R>();
140        let type_debug = format!("{request:?}");
141        let task = Box::new(move |b: &Bkend| {
142            Box::new({
143                let future = request.into_future(b);
144                Box::pin(async move {
145                    let output = future.await;
146                    Box::new(move |frontend: &mut Frntend| handler(frontend, output))
147                        as DynStateMutation<Frntend, Bkend, Md>
148                })
149            }) as DynMutationFuture<Frntend, Bkend, Md>
150        }) as DynFutureTask<Frntend, Bkend, Md>;
151        let task = FutureTask {
152            task,
153            type_id,
154            type_name,
155            type_debug,
156        };
157        AsyncTask {
158            task: AsyncTaskKind::Future(task),
159            constraint,
160            metadata,
161        }
162    }
163    pub fn new_stream<R>(
164        request: R,
165        // TODO: Review Clone bounds.
166        handler: impl FnOnce(&mut Frntend, R::Output) + Send + Clone + 'static,
167        constraint: Option<Constraint<Md>>,
168    ) -> AsyncTask<Frntend, Bkend, Md>
169    where
170        R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
171        Bkend: 'static,
172        Frntend: 'static,
173    {
174        let metadata = R::metadata();
175        let type_id = request.type_id();
176        let type_name = type_name::<R>();
177        let type_debug = format!("{request:?}");
178        let task = Box::new(move |b: &Bkend| {
179            let stream = request.into_stream(b);
180            Box::new({
181                stream.map(move |output| {
182                    Box::new({
183                        let handler = handler.clone();
184                        move |frontend: &mut Frntend| {
185                            handler.clone()(frontend, output);
186                            AsyncTask::new_no_op()
187                        }
188                    }) as DynStateMutation<Frntend, Bkend, Md>
189                })
190            }) as DynMutationStream<Frntend, Bkend, Md>
191        }) as DynStreamTask<Frntend, Bkend, Md>;
192        let task = StreamTask {
193            task,
194            type_id,
195            type_name,
196            type_debug,
197        };
198        AsyncTask {
199            task: AsyncTaskKind::Stream(task),
200            constraint,
201            metadata,
202        }
203    }
204    pub fn new_stream_chained<R>(
205        request: R,
206        // TODO: Review Clone bounds.
207        handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md>
208            + Send
209            + Clone
210            + 'static,
211        constraint: Option<Constraint<Md>>,
212    ) -> AsyncTask<Frntend, Bkend, Md>
213    where
214        R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
215        Bkend: 'static,
216        Frntend: 'static,
217    {
218        let metadata = R::metadata();
219        let type_id = request.type_id();
220        let type_name = type_name::<R>();
221        let type_debug = format!("{request:?}");
222        let task = Box::new(move |b: &Bkend| {
223            let stream = request.into_stream(b);
224            Box::new({
225                stream.map(move |output| {
226                    Box::new({
227                        let handler = handler.clone();
228                        move |frontend: &mut Frntend| handler.clone()(frontend, output)
229                    }) as DynStateMutation<Frntend, Bkend, Md>
230                })
231            }) as DynMutationStream<Frntend, Bkend, Md>
232        }) as DynStreamTask<Frntend, Bkend, Md>;
233        let task = StreamTask {
234            task,
235            type_id,
236            type_name,
237            type_debug,
238        };
239        AsyncTask {
240            task: AsyncTaskKind::Stream(task),
241            constraint,
242            metadata,
243        }
244    }
245    /// # Warning
246    /// This is recursive, if you have set up a cycle of AsyncTasks, map may
247    /// overflow.
248    pub fn map<NewFrntend>(
249        self,
250        f: impl Fn(&mut NewFrntend) -> &mut Frntend + Clone + Send + 'static,
251    ) -> AsyncTask<NewFrntend, Bkend, Md>
252    where
253        Bkend: 'static,
254        Frntend: 'static,
255        Md: 'static,
256    {
257        let Self {
258            task,
259            constraint,
260            metadata,
261        } = self;
262        match task {
263            AsyncTaskKind::Future(FutureTask {
264                task,
265                type_id,
266                type_name,
267                type_debug,
268            }) => {
269                let task = Box::new(|b: &Bkend| {
270                    Box::new(task(b).map(|task| {
271                        Box::new(|nf: &mut NewFrntend| {
272                            let task = task(f(nf));
273                            task.map(f)
274                        }) as DynStateMutation<NewFrntend, Bkend, Md>
275                    })) as DynMutationFuture<NewFrntend, Bkend, Md>
276                }) as DynFutureTask<NewFrntend, Bkend, Md>;
277                let task = FutureTask {
278                    task,
279                    type_id,
280                    type_name,
281                    type_debug,
282                };
283                AsyncTask {
284                    task: AsyncTaskKind::Future(task),
285                    constraint,
286                    metadata,
287                }
288            }
289            AsyncTaskKind::Stream(StreamTask {
290                task,
291                type_id,
292                type_name,
293                type_debug,
294            }) => {
295                let task = Box::new(|b: &Bkend| {
296                    Box::new({
297                        task(b).map(move |task| {
298                            Box::new({
299                                let f = f.clone();
300                                move |nf: &mut NewFrntend| {
301                                    let task = task(f(nf));
302                                    task.map(f.clone())
303                                }
304                            })
305                                as DynStateMutation<NewFrntend, Bkend, Md>
306                        })
307                    }) as DynMutationStream<NewFrntend, Bkend, Md>
308                }) as DynStreamTask<NewFrntend, Bkend, Md>;
309                let stream_task = StreamTask {
310                    task,
311                    type_id,
312                    type_name,
313                    type_debug,
314                };
315                AsyncTask {
316                    task: AsyncTaskKind::Stream(stream_task),
317                    constraint,
318                    metadata,
319                }
320            }
321            AsyncTaskKind::NoOp => AsyncTask {
322                task: AsyncTaskKind::NoOp,
323                constraint,
324                metadata,
325            },
326            AsyncTaskKind::Multi(v) => {
327                let mapped = v.into_iter().map(|task| task.map(f.clone())).collect();
328                AsyncTask {
329                    task: AsyncTaskKind::Multi(mapped),
330                    constraint,
331                    metadata,
332                }
333            }
334        }
335    }
336}
337
338pub(crate) struct TaskList<Bkend, Frntend, Md> {
339    pub inner: Vec<SpawnedTask<Bkend, Frntend, Md>>,
340}
341
/// A task that has been spawned, together with the handle used to receive its
/// output and the details used to identify it.
pub(crate) struct SpawnedTask<Frntend, Bkend, Md> {
    /// `TypeId` recorded for the request; compared by the same-type
    /// constraints in `TaskList::handle_constraint`.
    pub(crate) type_id: TypeId,
    /// Name of the request type.
    pub(crate) type_name: &'static str,
    /// `Debug` rendering of the request; shared via `Arc` so it can be cloned
    /// cheaply into each `TaskOutcome`.
    pub(crate) type_debug: Arc<String>,
    /// Handle used to wait for (or kill) the running task.
    pub(crate) receiver: TaskWaiter<Frntend, Bkend, Md>,
    /// Identifier of this spawned task, reported back in `TaskOutcome`.
    pub(crate) task_id: TaskId,
    /// Metadata values checked by the metadata-matching constraint in
    /// `TaskList::handle_constraint`.
    pub(crate) metadata: Vec<Md>,
}
350
/// User visible struct for introspection.
///
/// Borrows its string and constraint data for the lifetime `'a`.
#[derive(Debug, Clone)]
pub struct TaskInformation<'a, Cstrnt> {
    /// `TypeId` recorded for the task's request.
    pub type_id: TypeId,
    /// Name of the task's request type.
    pub type_name: &'static str,
    /// `Debug` rendering of the task's request.
    pub type_debug: &'a str,
    /// The constraint attached to the task, if any.
    pub constraint: &'a Option<Constraint<Cstrnt>>,
}
359
/// A rule attached to a task describing how it interacts with tasks that are
/// already running.
///
/// Construct one with `new_block_same_type`, `new_kill_same_type` or
/// `new_block_matching_metadata`.
#[derive(Eq, PartialEq, Debug)]
pub struct Constraint<Cstrnt> {
    /// Which rule this constraint applies.
    pub(crate) constraint_type: ConstraitType<Cstrnt>,
}
364
/// The kinds of rule a [`Constraint`] can apply, as interpreted by
/// `TaskList::handle_constraint`.
///
/// NOTE: the spelling of `ConstraitType` and `BlockMatchingMetatdata` is part
/// of the public API and is therefore left unchanged.
#[derive(Eq, PartialEq, Debug)]
pub enum ConstraitType<Cstrnt> {
    /// Stop tracking every spawned task with the same `TypeId`, without
    /// aborting it.
    BlockSameType,
    /// Abort every spawned task with the same `TypeId` and stop tracking it.
    KillSameType,
    /// Stop tracking every spawned task whose metadata contains the given
    /// value, without aborting it.
    BlockMatchingMetatdata(Cstrnt),
}
371
/// Handle used to wait for the output of a spawned task.
pub(crate) enum TaskWaiter<Frntend, Bkend, Md> {
    /// A future-based task: awaiting the join handle yields its single state
    /// mutation.
    Future(JoinHandle<DynStateMutation<Frntend, Bkend, Md>>),
    /// A stream-based task: state mutations arrive over a channel.
    Stream {
        /// Receiving end of the channel carrying the state mutations.
        receiver: mpsc::Receiver<DynStateMutation<Frntend, Bkend, Md>>,
        /// Handle used by `kill` to abort the task.
        abort_handle: AbortHandle,
    },
}
379
380impl<Frntend, Bkend, Md> TaskWaiter<Frntend, Bkend, Md> {
381    fn kill(&mut self) {
382        match self {
383            TaskWaiter::Future(handle) => handle.abort(),
384            TaskWaiter::Stream {
385                abort_handle: abort,
386                ..
387            } => abort.abort(),
388        }
389    }
390}
391
/// The result of waiting for the next response from the spawned tasks.
pub enum TaskOutcome<Frntend, Bkend, Md> {
    /// No task was received because a stream closed, but there are still more
    /// tasks.
    StreamClosed,
    /// No task was received because the next task panicked.
    /// Currently only applicable to Future type tasks.
    ///
    /// NOTE(review): this is produced for any `JoinError` returned when
    /// awaiting the task's join handle; confirm whether a cancelled task can
    /// also surface here.
    // TODO: Implement for Stream type tasks.
    TaskPanicked {
        /// The error returned when awaiting the task's join handle.
        error: JoinError,
        /// `TypeId` recorded for the task's request.
        type_id: TypeId,
        /// Name of the task's request type.
        type_name: &'static str,
        /// `Debug` rendering of the task's request.
        type_debug: Arc<String>,
        /// Identifier of the spawned task.
        task_id: TaskId,
    },
    /// Mutation was received from a task.
    MutationReceived {
        /// The state mutation to apply to the frontend.
        mutation: DynStateMutation<Frntend, Bkend, Md>,
        /// `TypeId` recorded for the task's request.
        type_id: TypeId,
        /// Name of the task's request type.
        type_name: &'static str,
        /// `Debug` rendering of the task's request.
        type_debug: Arc<String>,
        /// Identifier of the spawned task.
        task_id: TaskId,
    },
}
415
416impl<Bkend, Frntend, Md: PartialEq> TaskList<Frntend, Bkend, Md> {
417    pub(crate) fn new() -> Self {
418        Self { inner: vec![] }
419    }
420    /// Await for the next response from one of the spawned tasks.
421    pub(crate) async fn get_next_response(&mut self) -> Option<TaskOutcome<Frntend, Bkend, Md>> {
422        let task_completed = self
423            .inner
424            .iter_mut()
425            .enumerate()
426            .map(|(idx, task)| async move {
427                match task.receiver {
428                    TaskWaiter::Future(ref mut receiver) => match receiver.await {
429                        Ok(mutation) => (
430                            Some(idx),
431                            TaskOutcome::MutationReceived {
432                                mutation,
433                                type_id: task.type_id,
434                                type_debug: task.type_debug.clone(),
435                                task_id: task.task_id,
436                                type_name: task.type_name,
437                            },
438                        ),
439                        Err(error) => (
440                            Some(idx),
441                            TaskOutcome::TaskPanicked {
442                                type_id: task.type_id,
443                                type_name: task.type_name,
444                                type_debug: task.type_debug.clone(),
445                                task_id: task.task_id,
446                                error,
447                            },
448                        ),
449                    },
450                    TaskWaiter::Stream {
451                        ref mut receiver, ..
452                    } => {
453                        if let Some(mutation) = receiver.recv().await {
454                            return (
455                                None,
456                                TaskOutcome::MutationReceived {
457                                    mutation,
458                                    type_id: task.type_id,
459                                    type_name: task.type_name,
460                                    task_id: task.task_id,
461                                    type_debug: task.type_debug.clone(),
462                                },
463                            );
464                        }
465                        (Some(idx), TaskOutcome::StreamClosed)
466                    }
467                }
468            })
469            .collect::<FuturesUnordered<_>>()
470            .next()
471            .await;
472        let (maybe_completed_id, outcome) = task_completed?;
473        if let Some(completed_id) = maybe_completed_id {
474            // Safe - this value is in range as produced from enumerate on
475            // original list.
476            self.inner.swap_remove(completed_id);
477        };
478        Some(outcome)
479    }
480    pub(crate) fn push(&mut self, task: SpawnedTask<Frntend, Bkend, Md>) {
481        self.inner.push(task)
482    }
483    // TODO: Tests
484    pub(crate) fn handle_constraint(&mut self, constraint: Constraint<Md>, type_id: TypeId) {
485        // TODO: Consider the situation where one component kills tasks belonging to
486        // another component.
487        //
488        // Assuming here that kill implies block also.
489        let task_doesnt_match_constraint = |task: &SpawnedTask<_, _, _>| (task.type_id != type_id);
490        let task_doesnt_match_metadata =
491            |task: &SpawnedTask<_, _, _>, constraint| !task.metadata.contains(constraint);
492        match constraint.constraint_type {
493            ConstraitType::BlockMatchingMetatdata(metadata) => self
494                .inner
495                .retain(|task| task_doesnt_match_metadata(task, &metadata)),
496            ConstraitType::BlockSameType => {
497                self.inner.retain(task_doesnt_match_constraint);
498            }
499            ConstraitType::KillSameType => self.inner.retain_mut(|task| {
500                if !task_doesnt_match_constraint(task) {
501                    task.receiver.kill();
502                    return false;
503                }
504                true
505            }),
506        }
507    }
508}
509
510impl<Cstrnt> Constraint<Cstrnt> {
511    pub fn new_block_same_type() -> Self {
512        Self {
513            constraint_type: ConstraitType::BlockSameType,
514        }
515    }
516    pub fn new_kill_same_type() -> Self {
517        Self {
518            constraint_type: ConstraitType::KillSameType,
519        }
520    }
521    pub fn new_block_matching_metadata(metadata: Cstrnt) -> Self {
522        Self {
523            constraint_type: ConstraitType::BlockMatchingMetatdata(metadata),
524        }
525    }
526}
527
#[cfg(test)]
mod tests {
    use crate::{AsyncTask, BackendStreamingTask, BackendTask};
    use futures::StreamExt;
    /// Future-based request used as the first chained follow-up.
    #[derive(Debug)]
    struct Task1;
    /// Future-based request used as the innermost follow-up.
    #[derive(Debug)]
    struct Task2;
    /// Stream-based request used as the outermost task.
    #[derive(Debug)]
    struct StreamingTask;
    impl BackendTask<()> for Task1 {
        type Output = ();
        type MetadataType = ();
        // Written with `impl Future` rather than `async fn` to match the
        // trait's signature.
        #[allow(clippy::manual_async_fn)]
        fn into_future(
            self,
            _: &(),
        ) -> impl std::future::Future<Output = Self::Output> + Send + 'static {
            async {}
        }
    }
    impl BackendTask<()> for Task2 {
        type Output = ();
        type MetadataType = ();
        // Written with `impl Future` rather than `async fn` to match the
        // trait's signature.
        #[allow(clippy::manual_async_fn)]
        fn into_future(
            self,
            _: &(),
        ) -> impl std::future::Future<Output = Self::Output> + Send + 'static {
            async {}
        }
    }
    impl BackendStreamingTask<()> for StreamingTask {
        type Output = ();
        type MetadataType = ();
        fn into_stream(
            self,
            _: &(),
        ) -> impl futures::Stream<Item = Self::Output> + Send + Unpin + 'static {
            // A stream that yields a single unit item.
            futures::stream::once(async move {}).boxed()
        }
    }
    /// Builds a stream task whose handler chains a future task, whose handler
    /// chains another future task, and checks that `map` can be applied to
    /// the whole chain.
    #[tokio::test]
    async fn test_recursive_map() {
        let recursive_task = AsyncTask::new_stream_chained(
            StreamingTask,
            |_: &mut (), _| {
                AsyncTask::new_future_chained(
                    Task1,
                    |_: &mut (), _| AsyncTask::new_future(Task2, |_: &mut (), _| {}, None),
                    None,
                )
            },
            None,
        );
        // Here, it's expected that this is successful.
        // TODO: Run the task for an expected outcome.
        // The mapped task is deliberately discarded: only the call to `map`
        // itself is under test, so the `must_use` warning is silenced.
        #[allow(unused_must_use)]
        let _ = recursive_task.map(|tmp: &mut ()| tmp);
    }
}