async_callback_manager/manager/
task_list.rs1use crate::task::dyn_task::DynStateMutation;
2use crate::{Constraint, ConstraitType, TaskId};
3use futures::stream::FuturesUnordered;
4use std::any::TypeId;
5use std::sync::Arc;
6use tokio::sync::mpsc;
7use tokio::task::{JoinError, JoinHandle};
8use tokio_stream::StreamExt;
9
10pub(crate) struct TaskList<Bkend, Frntend, Md> {
11 pub inner: Vec<SpawnedTask<Bkend, Frntend, Md>>,
12}
13
14pub(crate) struct SpawnedTask<Frntend, Bkend, Md> {
15 pub(crate) type_id: TypeId,
16 pub(crate) type_name: &'static str,
17 pub(crate) type_debug: Arc<String>,
18 pub(crate) receiver: TaskWaiter<Frntend, Bkend, Md>,
19 pub(crate) task_id: TaskId,
20 pub(crate) metadata: Vec<Md>,
21}
22
23#[derive(Debug, Clone)]
25pub struct TaskInformation<'a, Cstrnt> {
26 pub type_id: TypeId,
27 pub type_name: &'static str,
28 pub type_debug: &'a str,
29 pub constraint: &'a Option<Constraint<Cstrnt>>,
30}
31
32pub(crate) enum TaskWaiter<Frntend, Bkend, Md> {
33 Future(JoinHandle<DynStateMutation<Frntend, Bkend, Md>>),
34 Stream {
35 receiver: mpsc::Receiver<DynStateMutation<Frntend, Bkend, Md>>,
36 join_handle: JoinHandle<()>,
37 },
38}
39
40impl<Frntend, Bkend, Md> TaskWaiter<Frntend, Bkend, Md> {
41 fn kill(&mut self) {
42 match self {
43 TaskWaiter::Future(handle) => handle.abort(),
44 TaskWaiter::Stream { join_handle, .. } => join_handle.abort_handle().abort(),
45 }
46 }
47}
48
49pub enum TaskOutcome<Frntend, Bkend, Md> {
50 StreamFinished {
52 type_id: TypeId,
53 type_name: &'static str,
54 type_debug: Arc<String>,
55 task_id: TaskId,
56 },
57 StreamPanicked {
59 error: JoinError,
60 type_id: TypeId,
61 type_name: &'static str,
62 type_debug: Arc<String>,
63 task_id: TaskId,
64 },
65 TaskPanicked {
67 error: JoinError,
68 type_id: TypeId,
69 type_name: &'static str,
70 type_debug: Arc<String>,
71 task_id: TaskId,
72 },
73 MutationReceived {
75 mutation: DynStateMutation<Frntend, Bkend, Md>,
76 type_id: TypeId,
77 type_name: &'static str,
78 type_debug: Arc<String>,
79 task_id: TaskId,
80 },
81}
82
83impl<Bkend, Frntend, Md: PartialEq> TaskList<Frntend, Bkend, Md> {
84 pub(crate) fn new() -> Self {
85 Self { inner: vec![] }
86 }
87 pub(crate) async fn get_next_response(&mut self) -> Option<TaskOutcome<Frntend, Bkend, Md>> {
89 let task_completed = self
90 .inner
91 .iter_mut()
92 .enumerate()
93 .map(|(idx, task)| async move {
94 match task.receiver {
95 TaskWaiter::Future(ref mut receiver) => match receiver.await {
96 Ok(mutation) => (
97 Some(idx),
98 TaskOutcome::MutationReceived {
99 mutation,
100 type_id: task.type_id,
101 type_debug: task.type_debug.clone(),
102 task_id: task.task_id,
103 type_name: task.type_name,
104 },
105 ),
106 Err(error) => (
107 Some(idx),
108 TaskOutcome::TaskPanicked {
109 type_id: task.type_id,
110 type_name: task.type_name,
111 type_debug: task.type_debug.clone(),
112 task_id: task.task_id,
113 error,
114 },
115 ),
116 },
117 TaskWaiter::Stream {
118 ref mut receiver,
119 ref mut join_handle,
120 } => {
121 if let Some(mutation) = receiver.recv().await {
122 return (
123 None,
124 TaskOutcome::MutationReceived {
125 mutation,
126 type_id: task.type_id,
127 type_name: task.type_name,
128 task_id: task.task_id,
129 type_debug: task.type_debug.clone(),
130 },
131 );
132 };
133 match join_handle.await {
134 Err(error) if error.is_panic() => (
135 Some(idx),
136 TaskOutcome::StreamPanicked {
137 error,
138 type_id: task.type_id,
139 type_name: task.type_name,
140 type_debug: task.type_debug.clone(),
141 task_id: task.task_id,
142 },
143 ),
144 _ => (
146 Some(idx),
147 TaskOutcome::StreamFinished {
148 type_id: task.type_id,
149 type_name: task.type_name,
150 type_debug: task.type_debug.clone(),
151 task_id: task.task_id,
152 },
153 ),
154 }
155 }
156 }
157 })
158 .collect::<FuturesUnordered<_>>()
159 .next()
160 .await;
161 let (maybe_completed_idx, outcome) = task_completed?;
162 if let Some(completed_idx) = maybe_completed_idx {
163 self.inner.swap_remove(completed_idx);
166 };
167 Some(outcome)
168 }
169 pub(crate) fn push(&mut self, task: SpawnedTask<Frntend, Bkend, Md>) {
170 self.inner.push(task)
171 }
172 pub(crate) fn handle_constraint(&mut self, constraint: Constraint<Md>, type_id: TypeId) {
174 let task_doesnt_match_constraint = |task: &SpawnedTask<_, _, _>| task.type_id != type_id;
179 let task_doesnt_match_metadata =
180 |task: &SpawnedTask<_, _, _>, constraint| !task.metadata.contains(constraint);
181 match constraint.constraint_type {
182 ConstraitType::BlockMatchingMetatdata(metadata) => self
183 .inner
184 .retain(|task| task_doesnt_match_metadata(task, &metadata)),
185 ConstraitType::BlockSameType => {
186 self.inner.retain(task_doesnt_match_constraint);
187 }
188 ConstraitType::KillSameType => self.inner.retain_mut(|task| {
189 if !task_doesnt_match_constraint(task) {
190 task.receiver.kill();
191 return false;
192 }
193 true
194 }),
195 }
196 }
197}