async_callback_manager/manager/
task_list.rs

1use crate::task::dyn_task::DynStateMutation;
2use crate::{Constraint, ConstraitType, TaskId};
3use futures::stream::FuturesUnordered;
4use std::any::TypeId;
5use std::sync::Arc;
6use tokio::sync::mpsc;
7use tokio::task::{JoinError, JoinHandle};
8use tokio_stream::StreamExt;
9
10pub(crate) struct TaskList<Bkend, Frntend, Md> {
11    pub inner: Vec<SpawnedTask<Bkend, Frntend, Md>>,
12}
13
14pub(crate) struct SpawnedTask<Frntend, Bkend, Md> {
15    pub(crate) type_id: TypeId,
16    pub(crate) type_name: &'static str,
17    pub(crate) type_debug: Arc<String>,
18    pub(crate) receiver: TaskWaiter<Frntend, Bkend, Md>,
19    pub(crate) task_id: TaskId,
20    pub(crate) metadata: Vec<Md>,
21}
22
23/// User visible struct for introspection.
24#[derive(Debug, Clone)]
25pub struct TaskInformation<'a, Cstrnt> {
26    pub type_id: TypeId,
27    pub type_name: &'static str,
28    pub type_debug: &'a str,
29    pub constraint: &'a Option<Constraint<Cstrnt>>,
30}
31
32pub(crate) enum TaskWaiter<Frntend, Bkend, Md> {
33    Future(JoinHandle<DynStateMutation<Frntend, Bkend, Md>>),
34    Stream {
35        receiver: mpsc::Receiver<DynStateMutation<Frntend, Bkend, Md>>,
36        join_handle: JoinHandle<()>,
37    },
38}
39
40impl<Frntend, Bkend, Md> TaskWaiter<Frntend, Bkend, Md> {
41    fn kill(&mut self) {
42        match self {
43            TaskWaiter::Future(handle) => handle.abort(),
44            TaskWaiter::Stream { join_handle, .. } => join_handle.abort_handle().abort(),
45        }
46    }
47}
48
49pub enum TaskOutcome<Frntend, Bkend, Md> {
50    /// The stream has completed, it won't be sending any more tasks.
51    StreamFinished {
52        type_id: TypeId,
53        type_name: &'static str,
54        type_debug: Arc<String>,
55        task_id: TaskId,
56    },
57    /// The stream has panicked, it won't be sending any more tasks.
58    StreamPanicked {
59        error: JoinError,
60        type_id: TypeId,
61        type_name: &'static str,
62        type_debug: Arc<String>,
63        task_id: TaskId,
64    },
65    /// No task was recieved because the next task panicked.
66    TaskPanicked {
67        error: JoinError,
68        type_id: TypeId,
69        type_name: &'static str,
70        type_debug: Arc<String>,
71        task_id: TaskId,
72    },
73    /// Mutation was received from a task.
74    MutationReceived {
75        mutation: DynStateMutation<Frntend, Bkend, Md>,
76        type_id: TypeId,
77        type_name: &'static str,
78        type_debug: Arc<String>,
79        task_id: TaskId,
80    },
81}
82
83impl<Bkend, Frntend, Md: PartialEq> TaskList<Frntend, Bkend, Md> {
84    pub(crate) fn new() -> Self {
85        Self { inner: vec![] }
86    }
87    /// Await for the next response from one of the spawned tasks.
88    pub(crate) async fn get_next_response(&mut self) -> Option<TaskOutcome<Frntend, Bkend, Md>> {
89        let task_completed = self
90            .inner
91            .iter_mut()
92            .enumerate()
93            .map(|(idx, task)| async move {
94                match task.receiver {
95                    TaskWaiter::Future(ref mut receiver) => match receiver.await {
96                        Ok(mutation) => (
97                            Some(idx),
98                            TaskOutcome::MutationReceived {
99                                mutation,
100                                type_id: task.type_id,
101                                type_debug: task.type_debug.clone(),
102                                task_id: task.task_id,
103                                type_name: task.type_name,
104                            },
105                        ),
106                        Err(error) => (
107                            Some(idx),
108                            TaskOutcome::TaskPanicked {
109                                type_id: task.type_id,
110                                type_name: task.type_name,
111                                type_debug: task.type_debug.clone(),
112                                task_id: task.task_id,
113                                error,
114                            },
115                        ),
116                    },
117                    TaskWaiter::Stream {
118                        ref mut receiver,
119                        ref mut join_handle,
120                    } => {
121                        if let Some(mutation) = receiver.recv().await {
122                            return (
123                                None,
124                                TaskOutcome::MutationReceived {
125                                    mutation,
126                                    type_id: task.type_id,
127                                    type_name: task.type_name,
128                                    task_id: task.task_id,
129                                    type_debug: task.type_debug.clone(),
130                                },
131                            );
132                        };
133                        match join_handle.await {
134                            Err(error) if error.is_panic() => (
135                                Some(idx),
136                                TaskOutcome::StreamPanicked {
137                                    error,
138                                    type_id: task.type_id,
139                                    type_name: task.type_name,
140                                    type_debug: task.type_debug.clone(),
141                                    task_id: task.task_id,
142                                },
143                            ),
144                            // Ok case or Err case where Err is not a panic (ie, it's an abort).
145                            _ => (
146                                Some(idx),
147                                TaskOutcome::StreamFinished {
148                                    type_id: task.type_id,
149                                    type_name: task.type_name,
150                                    type_debug: task.type_debug.clone(),
151                                    task_id: task.task_id,
152                                },
153                            ),
154                        }
155                    }
156                }
157            })
158            .collect::<FuturesUnordered<_>>()
159            .next()
160            .await;
161        let (maybe_completed_idx, outcome) = task_completed?;
162        if let Some(completed_idx) = maybe_completed_idx {
163            // Safe - this value is in range as produced from enumerate on
164            // original list.
165            self.inner.swap_remove(completed_idx);
166        };
167        Some(outcome)
168    }
169    pub(crate) fn push(&mut self, task: SpawnedTask<Frntend, Bkend, Md>) {
170        self.inner.push(task)
171    }
172    // TODO: Tests
173    pub(crate) fn handle_constraint(&mut self, constraint: Constraint<Md>, type_id: TypeId) {
174        // TODO: Consider the situation where one component kills tasks belonging to
175        // another component.
176        //
177        // Assuming here that kill implies block also.
178        let task_doesnt_match_constraint = |task: &SpawnedTask<_, _, _>| task.type_id != type_id;
179        let task_doesnt_match_metadata =
180            |task: &SpawnedTask<_, _, _>, constraint| !task.metadata.contains(constraint);
181        match constraint.constraint_type {
182            ConstraitType::BlockMatchingMetatdata(metadata) => self
183                .inner
184                .retain(|task| task_doesnt_match_metadata(task, &metadata)),
185            ConstraitType::BlockSameType => {
186                self.inner.retain(task_doesnt_match_constraint);
187            }
188            ConstraitType::KillSameType => self.inner.retain_mut(|task| {
189                if !task_doesnt_match_constraint(task) {
190                    task.receiver.kill();
191                    return false;
192                }
193                true
194            }),
195        }
196    }
197}