1use crate::{
2 BackendStreamingTask, BackendTask, DynFutureTask, DynMutationFuture, DynMutationStream,
3 DynStateMutation, DynStreamTask, TaskId,
4};
5use futures::stream::FuturesUnordered;
6use futures::{FutureExt, StreamExt};
7use std::any::{type_name, TypeId};
8use std::fmt::Debug;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tokio::task::{AbortHandle, JoinError, JoinHandle};
12
13#[must_use = "AsyncTasks do nothing unless you run them"]
16pub struct AsyncTask<Frntend, Bkend, Md> {
17 pub(crate) task: AsyncTaskKind<Frntend, Bkend, Md>,
18 pub(crate) constraint: Option<Constraint<Md>>,
19 pub(crate) metadata: Vec<Md>,
20}
21
22pub(crate) enum AsyncTaskKind<Frntend, Bkend, Md> {
23 Future(FutureTask<Frntend, Bkend, Md>),
24 Stream(StreamTask<Frntend, Bkend, Md>),
25 Multi(Vec<AsyncTask<Frntend, Bkend, Md>>),
26 NoOp,
27}
28
29pub(crate) struct StreamTask<Frntend, Bkend, Md> {
30 pub(crate) task: DynStreamTask<Frntend, Bkend, Md>,
31 pub(crate) type_id: TypeId,
32 pub(crate) type_name: &'static str,
33 pub(crate) type_debug: String,
34}
35
36pub(crate) struct FutureTask<Frntend, Bkend, Md> {
37 pub(crate) task: DynFutureTask<Frntend, Bkend, Md>,
38 pub(crate) type_id: TypeId,
39 pub(crate) type_name: &'static str,
40 pub(crate) type_debug: String,
41}
42
43impl<Frntend, Bkend, Md> FromIterator<AsyncTask<Frntend, Bkend, Md>>
44 for AsyncTask<Frntend, Bkend, Md>
45{
46 fn from_iter<T: IntoIterator<Item = AsyncTask<Frntend, Bkend, Md>>>(iter: T) -> Self {
47 let v = iter.into_iter().collect();
48 AsyncTask {
50 task: AsyncTaskKind::Multi(v),
51 constraint: None,
52 metadata: vec![],
53 }
54 }
55}
56
57impl<Frntend, Bkend, Md> AsyncTask<Frntend, Bkend, Md> {
58 pub fn push(self, next: AsyncTask<Frntend, Bkend, Md>) -> AsyncTask<Frntend, Bkend, Md> {
59 match self.task {
60 AsyncTaskKind::Future(_) | AsyncTaskKind::Stream(_) => {
61 let v = vec![self, next];
62 AsyncTask {
63 task: AsyncTaskKind::Multi(v),
64 constraint: None,
65 metadata: vec![],
66 }
67 }
68 AsyncTaskKind::Multi(mut m) => {
69 m.push(next);
70 AsyncTask {
71 task: AsyncTaskKind::Multi(m),
72 constraint: self.constraint,
73 metadata: self.metadata,
74 }
75 }
76 AsyncTaskKind::NoOp => next,
77 }
78 }
79 pub fn is_no_op(&self) -> bool {
80 matches!(self.task, AsyncTaskKind::NoOp)
81 }
82 pub fn new_no_op() -> AsyncTask<Frntend, Bkend, Md> {
83 Self {
84 task: AsyncTaskKind::NoOp,
85 constraint: None,
86 metadata: vec![],
87 }
88 }
89 pub fn new_future<R>(
90 request: R,
91 handler: impl FnOnce(&mut Frntend, R::Output) + Send + 'static,
92 constraint: Option<Constraint<Md>>,
93 ) -> AsyncTask<Frntend, Bkend, Md>
94 where
95 R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
96 Bkend: 'static,
97 Frntend: 'static,
98 {
99 let metadata = R::metadata();
100 let type_id = request.type_id();
101 let type_name = type_name::<R>();
102 let type_debug = format!("{request:?}");
103 let task = Box::new(move |b: &Bkend| {
104 Box::new({
105 let future = request.into_future(b);
106 Box::pin(async move {
107 let output = future.await;
108 Box::new(move |frontend: &mut Frntend| {
109 handler(frontend, output);
110 AsyncTask::new_no_op()
111 }) as DynStateMutation<Frntend, Bkend, Md>
112 })
113 }) as DynMutationFuture<Frntend, Bkend, Md>
114 }) as DynFutureTask<Frntend, Bkend, Md>;
115 let task = FutureTask {
116 task,
117 type_id,
118 type_name,
119 type_debug,
120 };
121 AsyncTask {
122 task: AsyncTaskKind::Future(task),
123 constraint,
124 metadata,
125 }
126 }
127 pub fn new_future_chained<R>(
128 request: R,
129 handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md> + Send + 'static,
130 constraint: Option<Constraint<Md>>,
131 ) -> AsyncTask<Frntend, Bkend, Md>
132 where
133 R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
134 Bkend: 'static,
135 Frntend: 'static,
136 {
137 let metadata = R::metadata();
138 let type_id = request.type_id();
139 let type_name = type_name::<R>();
140 let type_debug = format!("{request:?}");
141 let task = Box::new(move |b: &Bkend| {
142 Box::new({
143 let future = request.into_future(b);
144 Box::pin(async move {
145 let output = future.await;
146 Box::new(move |frontend: &mut Frntend| handler(frontend, output))
147 as DynStateMutation<Frntend, Bkend, Md>
148 })
149 }) as DynMutationFuture<Frntend, Bkend, Md>
150 }) as DynFutureTask<Frntend, Bkend, Md>;
151 let task = FutureTask {
152 task,
153 type_id,
154 type_name,
155 type_debug,
156 };
157 AsyncTask {
158 task: AsyncTaskKind::Future(task),
159 constraint,
160 metadata,
161 }
162 }
163 pub fn new_stream<R>(
164 request: R,
165 handler: impl FnOnce(&mut Frntend, R::Output) + Send + Clone + 'static,
167 constraint: Option<Constraint<Md>>,
168 ) -> AsyncTask<Frntend, Bkend, Md>
169 where
170 R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
171 Bkend: 'static,
172 Frntend: 'static,
173 {
174 let metadata = R::metadata();
175 let type_id = request.type_id();
176 let type_name = type_name::<R>();
177 let type_debug = format!("{request:?}");
178 let task = Box::new(move |b: &Bkend| {
179 let stream = request.into_stream(b);
180 Box::new({
181 stream.map(move |output| {
182 Box::new({
183 let handler = handler.clone();
184 move |frontend: &mut Frntend| {
185 handler.clone()(frontend, output);
186 AsyncTask::new_no_op()
187 }
188 }) as DynStateMutation<Frntend, Bkend, Md>
189 })
190 }) as DynMutationStream<Frntend, Bkend, Md>
191 }) as DynStreamTask<Frntend, Bkend, Md>;
192 let task = StreamTask {
193 task,
194 type_id,
195 type_name,
196 type_debug,
197 };
198 AsyncTask {
199 task: AsyncTaskKind::Stream(task),
200 constraint,
201 metadata,
202 }
203 }
204 pub fn new_stream_chained<R>(
205 request: R,
206 handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md>
208 + Send
209 + Clone
210 + 'static,
211 constraint: Option<Constraint<Md>>,
212 ) -> AsyncTask<Frntend, Bkend, Md>
213 where
214 R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
215 Bkend: 'static,
216 Frntend: 'static,
217 {
218 let metadata = R::metadata();
219 let type_id = request.type_id();
220 let type_name = type_name::<R>();
221 let type_debug = format!("{request:?}");
222 let task = Box::new(move |b: &Bkend| {
223 let stream = request.into_stream(b);
224 Box::new({
225 stream.map(move |output| {
226 Box::new({
227 let handler = handler.clone();
228 move |frontend: &mut Frntend| handler.clone()(frontend, output)
229 }) as DynStateMutation<Frntend, Bkend, Md>
230 })
231 }) as DynMutationStream<Frntend, Bkend, Md>
232 }) as DynStreamTask<Frntend, Bkend, Md>;
233 let task = StreamTask {
234 task,
235 type_id,
236 type_name,
237 type_debug,
238 };
239 AsyncTask {
240 task: AsyncTaskKind::Stream(task),
241 constraint,
242 metadata,
243 }
244 }
245 pub fn map<NewFrntend>(
249 self,
250 f: impl Fn(&mut NewFrntend) -> &mut Frntend + Clone + Send + 'static,
251 ) -> AsyncTask<NewFrntend, Bkend, Md>
252 where
253 Bkend: 'static,
254 Frntend: 'static,
255 Md: 'static,
256 {
257 let Self {
258 task,
259 constraint,
260 metadata,
261 } = self;
262 match task {
263 AsyncTaskKind::Future(FutureTask {
264 task,
265 type_id,
266 type_name,
267 type_debug,
268 }) => {
269 let task = Box::new(|b: &Bkend| {
270 Box::new(task(b).map(|task| {
271 Box::new(|nf: &mut NewFrntend| {
272 let task = task(f(nf));
273 task.map(f)
274 }) as DynStateMutation<NewFrntend, Bkend, Md>
275 })) as DynMutationFuture<NewFrntend, Bkend, Md>
276 }) as DynFutureTask<NewFrntend, Bkend, Md>;
277 let task = FutureTask {
278 task,
279 type_id,
280 type_name,
281 type_debug,
282 };
283 AsyncTask {
284 task: AsyncTaskKind::Future(task),
285 constraint,
286 metadata,
287 }
288 }
289 AsyncTaskKind::Stream(StreamTask {
290 task,
291 type_id,
292 type_name,
293 type_debug,
294 }) => {
295 let task = Box::new(|b: &Bkend| {
296 Box::new({
297 task(b).map(move |task| {
298 Box::new({
299 let f = f.clone();
300 move |nf: &mut NewFrntend| {
301 let task = task(f(nf));
302 task.map(f.clone())
303 }
304 })
305 as DynStateMutation<NewFrntend, Bkend, Md>
306 })
307 }) as DynMutationStream<NewFrntend, Bkend, Md>
308 }) as DynStreamTask<NewFrntend, Bkend, Md>;
309 let stream_task = StreamTask {
310 task,
311 type_id,
312 type_name,
313 type_debug,
314 };
315 AsyncTask {
316 task: AsyncTaskKind::Stream(stream_task),
317 constraint,
318 metadata,
319 }
320 }
321 AsyncTaskKind::NoOp => AsyncTask {
322 task: AsyncTaskKind::NoOp,
323 constraint,
324 metadata,
325 },
326 AsyncTaskKind::Multi(v) => {
327 let mapped = v.into_iter().map(|task| task.map(f.clone())).collect();
328 AsyncTask {
329 task: AsyncTaskKind::Multi(mapped),
330 constraint,
331 metadata,
332 }
333 }
334 }
335 }
336}
337
338pub(crate) struct TaskList<Bkend, Frntend, Md> {
339 pub inner: Vec<SpawnedTask<Bkend, Frntend, Md>>,
340}
341
342pub(crate) struct SpawnedTask<Frntend, Bkend, Md> {
343 pub(crate) type_id: TypeId,
344 pub(crate) type_name: &'static str,
345 pub(crate) type_debug: Arc<String>,
346 pub(crate) receiver: TaskWaiter<Frntend, Bkend, Md>,
347 pub(crate) task_id: TaskId,
348 pub(crate) metadata: Vec<Md>,
349}
350
351#[derive(Debug, Clone)]
353pub struct TaskInformation<'a, Cstrnt> {
354 pub type_id: TypeId,
355 pub type_name: &'static str,
356 pub type_debug: &'a str,
357 pub constraint: &'a Option<Constraint<Cstrnt>>,
358}
359
360#[derive(Eq, PartialEq, Debug)]
361pub struct Constraint<Cstrnt> {
362 pub(crate) constraint_type: ConstraitType<Cstrnt>,
363}
364
365#[derive(Eq, PartialEq, Debug)]
366pub enum ConstraitType<Cstrnt> {
367 BlockSameType,
368 KillSameType,
369 BlockMatchingMetatdata(Cstrnt),
370}
371
372pub(crate) enum TaskWaiter<Frntend, Bkend, Md> {
373 Future(JoinHandle<DynStateMutation<Frntend, Bkend, Md>>),
374 Stream {
375 receiver: mpsc::Receiver<DynStateMutation<Frntend, Bkend, Md>>,
376 abort_handle: AbortHandle,
377 },
378}
379
380impl<Frntend, Bkend, Md> TaskWaiter<Frntend, Bkend, Md> {
381 fn kill(&mut self) {
382 match self {
383 TaskWaiter::Future(handle) => handle.abort(),
384 TaskWaiter::Stream {
385 abort_handle: abort,
386 ..
387 } => abort.abort(),
388 }
389 }
390}
391
392pub enum TaskOutcome<Frntend, Bkend, Md> {
393 StreamClosed,
396 TaskPanicked {
400 error: JoinError,
401 type_id: TypeId,
402 type_name: &'static str,
403 type_debug: Arc<String>,
404 task_id: TaskId,
405 },
406 MutationReceived {
408 mutation: DynStateMutation<Frntend, Bkend, Md>,
409 type_id: TypeId,
410 type_name: &'static str,
411 type_debug: Arc<String>,
412 task_id: TaskId,
413 },
414}
415
416impl<Bkend, Frntend, Md: PartialEq> TaskList<Frntend, Bkend, Md> {
417 pub(crate) fn new() -> Self {
418 Self { inner: vec![] }
419 }
420 pub(crate) async fn get_next_response(&mut self) -> Option<TaskOutcome<Frntend, Bkend, Md>> {
422 let task_completed = self
423 .inner
424 .iter_mut()
425 .enumerate()
426 .map(|(idx, task)| async move {
427 match task.receiver {
428 TaskWaiter::Future(ref mut receiver) => match receiver.await {
429 Ok(mutation) => (
430 Some(idx),
431 TaskOutcome::MutationReceived {
432 mutation,
433 type_id: task.type_id,
434 type_debug: task.type_debug.clone(),
435 task_id: task.task_id,
436 type_name: task.type_name,
437 },
438 ),
439 Err(error) => (
440 Some(idx),
441 TaskOutcome::TaskPanicked {
442 type_id: task.type_id,
443 type_name: task.type_name,
444 type_debug: task.type_debug.clone(),
445 task_id: task.task_id,
446 error,
447 },
448 ),
449 },
450 TaskWaiter::Stream {
451 ref mut receiver, ..
452 } => {
453 if let Some(mutation) = receiver.recv().await {
454 return (
455 None,
456 TaskOutcome::MutationReceived {
457 mutation,
458 type_id: task.type_id,
459 type_name: task.type_name,
460 task_id: task.task_id,
461 type_debug: task.type_debug.clone(),
462 },
463 );
464 }
465 (Some(idx), TaskOutcome::StreamClosed)
466 }
467 }
468 })
469 .collect::<FuturesUnordered<_>>()
470 .next()
471 .await;
472 let (maybe_completed_id, outcome) = task_completed?;
473 if let Some(completed_id) = maybe_completed_id {
474 self.inner.swap_remove(completed_id);
477 };
478 Some(outcome)
479 }
480 pub(crate) fn push(&mut self, task: SpawnedTask<Frntend, Bkend, Md>) {
481 self.inner.push(task)
482 }
483 pub(crate) fn handle_constraint(&mut self, constraint: Constraint<Md>, type_id: TypeId) {
485 let task_doesnt_match_constraint = |task: &SpawnedTask<_, _, _>| (task.type_id != type_id);
490 let task_doesnt_match_metadata =
491 |task: &SpawnedTask<_, _, _>, constraint| !task.metadata.contains(constraint);
492 match constraint.constraint_type {
493 ConstraitType::BlockMatchingMetatdata(metadata) => self
494 .inner
495 .retain(|task| task_doesnt_match_metadata(task, &metadata)),
496 ConstraitType::BlockSameType => {
497 self.inner.retain(task_doesnt_match_constraint);
498 }
499 ConstraitType::KillSameType => self.inner.retain_mut(|task| {
500 if !task_doesnt_match_constraint(task) {
501 task.receiver.kill();
502 return false;
503 }
504 true
505 }),
506 }
507 }
508}
509
510impl<Cstrnt> Constraint<Cstrnt> {
511 pub fn new_block_same_type() -> Self {
512 Self {
513 constraint_type: ConstraitType::BlockSameType,
514 }
515 }
516 pub fn new_kill_same_type() -> Self {
517 Self {
518 constraint_type: ConstraitType::KillSameType,
519 }
520 }
521 pub fn new_block_matching_metadata(metadata: Cstrnt) -> Self {
522 Self {
523 constraint_type: ConstraitType::BlockMatchingMetatdata(metadata),
524 }
525 }
526}
527
528#[cfg(test)]
529mod tests {
530 use crate::{AsyncTask, BackendStreamingTask, BackendTask};
531 use futures::StreamExt;
532 #[derive(Debug)]
533 struct Task1;
534 #[derive(Debug)]
535 struct Task2;
536 #[derive(Debug)]
537 struct StreamingTask;
538 impl BackendTask<()> for Task1 {
539 type Output = ();
540 type MetadataType = ();
541 #[allow(clippy::manual_async_fn)]
542 fn into_future(
543 self,
544 _: &(),
545 ) -> impl std::future::Future<Output = Self::Output> + Send + 'static {
546 async {}
547 }
548 }
549 impl BackendTask<()> for Task2 {
550 type Output = ();
551 type MetadataType = ();
552 #[allow(clippy::manual_async_fn)]
553 fn into_future(
554 self,
555 _: &(),
556 ) -> impl std::future::Future<Output = Self::Output> + Send + 'static {
557 async {}
558 }
559 }
560 impl BackendStreamingTask<()> for StreamingTask {
561 type Output = ();
562 type MetadataType = ();
563 fn into_stream(
564 self,
565 _: &(),
566 ) -> impl futures::Stream<Item = Self::Output> + Send + Unpin + 'static {
567 futures::stream::once(async move {}).boxed()
568 }
569 }
570 #[tokio::test]
571 async fn test_recursive_map() {
572 let recursive_task = AsyncTask::new_stream_chained(
573 StreamingTask,
574 |_: &mut (), _| {
575 AsyncTask::new_future_chained(
576 Task1,
577 |_: &mut (), _| AsyncTask::new_future(Task2, |_: &mut (), _| {}, None),
578 None,
579 )
580 },
581 None,
582 );
583 #[allow(unused_must_use)]
586 let _ = recursive_task.map(|tmp: &mut ()| tmp);
587 }
588}