1use crate::{
2 BackendStreamingTask, BackendTask, DynFutureTask, DynMutationFuture, DynMutationStream,
3 DynStateMutation, DynStreamTask, TaskId,
4};
5use futures::stream::FuturesUnordered;
6use futures::{FutureExt, StreamExt};
7use std::any::{TypeId, type_name};
8use std::fmt::Debug;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use tokio::task::{JoinError, JoinHandle};
12
/// A description of asynchronous work, generic over the frontend state type
/// (`Frntend`), the backend type (`Bkend`) and the metadata type (`Md`).
///
/// A task is a single future, a single stream, a collection of other tasks,
/// or a no-op (see `AsyncTaskKind`). Constructing one only builds the
/// description, hence the `must_use` attribute below.
#[must_use = "AsyncTasks do nothing unless you run them"]
pub struct AsyncTask<Frntend, Bkend, Md> {
    /// The kind of work this task performs.
    pub(crate) task: AsyncTaskKind<Frntend, Bkend, Md>,
    /// Optional constraint carried alongside the task.
    // NOTE(review): presumably applied against already-running tasks when this
    // task is spawned — confirm at the call site, which is outside this file section.
    pub(crate) constraint: Option<Constraint<Md>>,
    /// Metadata values attached to the task; the `new_*` constructors fill
    /// this from the request type's `metadata()` function.
    pub(crate) metadata: Vec<Md>,
}
21
/// The different shapes of work an `AsyncTask` can hold.
pub(crate) enum AsyncTaskKind<Frntend, Bkend, Md> {
    /// A single request that resolves to one state mutation.
    Future(FutureTask<Frntend, Bkend, Md>),
    /// A single request that yields a stream of state mutations.
    Stream(StreamTask<Frntend, Bkend, Md>),
    /// A collection of tasks grouped into one (built by `push` or `collect`).
    Multi(Vec<AsyncTask<Frntend, Bkend, Md>>),
    /// A task that does nothing.
    NoOp,
}
28
/// A stream-producing task together with identifying information about the
/// request type it was created from.
pub(crate) struct StreamTask<Frntend, Bkend, Md> {
    /// Boxed closure that, given the backend, produces the stream of state
    /// mutations.
    pub(crate) task: DynStreamTask<Frntend, Bkend, Md>,
    /// `TypeId` of the request type.
    pub(crate) type_id: TypeId,
    /// `std::any::type_name` of the request type.
    pub(crate) type_name: &'static str,
    /// `Debug` rendering of the request, captured when the task was created.
    pub(crate) type_debug: String,
}
35
/// A future-producing task together with identifying information about the
/// request type it was created from.
pub(crate) struct FutureTask<Frntend, Bkend, Md> {
    /// Boxed closure that, given the backend, produces a future resolving to
    /// a single state mutation.
    pub(crate) task: DynFutureTask<Frntend, Bkend, Md>,
    /// `TypeId` of the request type.
    pub(crate) type_id: TypeId,
    /// `std::any::type_name` of the request type.
    pub(crate) type_name: &'static str,
    /// `Debug` rendering of the request, captured when the task was created.
    pub(crate) type_debug: String,
}
42
43impl<Frntend, Bkend, Md> FromIterator<AsyncTask<Frntend, Bkend, Md>>
44 for AsyncTask<Frntend, Bkend, Md>
45{
46 fn from_iter<T: IntoIterator<Item = AsyncTask<Frntend, Bkend, Md>>>(iter: T) -> Self {
47 let v = iter.into_iter().collect();
48 AsyncTask {
50 task: AsyncTaskKind::Multi(v),
51 constraint: None,
52 metadata: vec![],
53 }
54 }
55}
56
57impl<Frntend, Bkend, Md> AsyncTask<Frntend, Bkend, Md> {
58 pub fn push(self, next: AsyncTask<Frntend, Bkend, Md>) -> AsyncTask<Frntend, Bkend, Md> {
59 match self.task {
60 AsyncTaskKind::Future(_) | AsyncTaskKind::Stream(_) => {
61 let v = vec![self, next];
62 AsyncTask {
63 task: AsyncTaskKind::Multi(v),
64 constraint: None,
65 metadata: vec![],
66 }
67 }
68 AsyncTaskKind::Multi(mut m) => {
69 m.push(next);
70 AsyncTask {
71 task: AsyncTaskKind::Multi(m),
72 constraint: self.constraint,
73 metadata: self.metadata,
74 }
75 }
76 AsyncTaskKind::NoOp => next,
77 }
78 }
79 pub fn is_no_op(&self) -> bool {
80 matches!(self.task, AsyncTaskKind::NoOp)
81 }
82 pub fn new_no_op() -> AsyncTask<Frntend, Bkend, Md> {
83 Self {
84 task: AsyncTaskKind::NoOp,
85 constraint: None,
86 metadata: vec![],
87 }
88 }
89 pub fn new_future<R>(
90 request: R,
91 handler: impl FnOnce(&mut Frntend, R::Output) + Send + 'static,
92 constraint: Option<Constraint<Md>>,
93 ) -> AsyncTask<Frntend, Bkend, Md>
94 where
95 R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
96 Bkend: 'static,
97 Frntend: 'static,
98 {
99 let metadata = R::metadata();
100 let type_id = request.type_id();
101 let type_name = type_name::<R>();
102 let type_debug = format!("{request:?}");
103 let task = Box::new(move |b: &Bkend| {
104 Box::new({
105 let future = BackendTask::into_future(request, b);
106 Box::pin(async move {
107 let output = future.await;
108 Box::new(move |frontend: &mut Frntend| {
109 handler(frontend, output);
110 AsyncTask::new_no_op()
111 }) as DynStateMutation<Frntend, Bkend, Md>
112 })
113 }) as DynMutationFuture<Frntend, Bkend, Md>
114 }) as DynFutureTask<Frntend, Bkend, Md>;
115 let task = FutureTask {
116 task,
117 type_id,
118 type_name,
119 type_debug,
120 };
121 AsyncTask {
122 task: AsyncTaskKind::Future(task),
123 constraint,
124 metadata,
125 }
126 }
127 pub fn new_future_chained<R>(
128 request: R,
129 handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md> + Send + 'static,
130 constraint: Option<Constraint<Md>>,
131 ) -> AsyncTask<Frntend, Bkend, Md>
132 where
133 R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
134 Bkend: 'static,
135 Frntend: 'static,
136 {
137 let metadata = R::metadata();
138 let type_id = request.type_id();
139 let type_name = type_name::<R>();
140 let type_debug = format!("{request:?}");
141 let task = Box::new(move |b: &Bkend| {
142 Box::new({
143 let future = BackendTask::into_future(request, b);
144 Box::pin(async move {
145 let output = future.await;
146 Box::new(move |frontend: &mut Frntend| handler(frontend, output))
147 as DynStateMutation<Frntend, Bkend, Md>
148 })
149 }) as DynMutationFuture<Frntend, Bkend, Md>
150 }) as DynFutureTask<Frntend, Bkend, Md>;
151 let task = FutureTask {
152 task,
153 type_id,
154 type_name,
155 type_debug,
156 };
157 AsyncTask {
158 task: AsyncTaskKind::Future(task),
159 constraint,
160 metadata,
161 }
162 }
163 pub fn new_stream<R>(
164 request: R,
165 handler: impl FnOnce(&mut Frntend, R::Output) + Send + Clone + 'static,
167 constraint: Option<Constraint<Md>>,
168 ) -> AsyncTask<Frntend, Bkend, Md>
169 where
170 R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
171 Bkend: 'static,
172 Frntend: 'static,
173 {
174 let metadata = R::metadata();
175 let type_id = request.type_id();
176 let type_name = type_name::<R>();
177 let type_debug = format!("{request:?}");
178 let task = Box::new(move |b: &Bkend| {
179 let stream = request.into_stream(b);
180 Box::new({
181 stream.map(move |output| {
182 Box::new({
183 let handler = handler.clone();
184 move |frontend: &mut Frntend| {
185 handler.clone()(frontend, output);
186 AsyncTask::new_no_op()
187 }
188 }) as DynStateMutation<Frntend, Bkend, Md>
189 })
190 }) as DynMutationStream<Frntend, Bkend, Md>
191 }) as DynStreamTask<Frntend, Bkend, Md>;
192 let task = StreamTask {
193 task,
194 type_id,
195 type_name,
196 type_debug,
197 };
198 AsyncTask {
199 task: AsyncTaskKind::Stream(task),
200 constraint,
201 metadata,
202 }
203 }
204 pub fn new_stream_chained<R>(
205 request: R,
206 handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md>
208 + Send
209 + Clone
210 + 'static,
211 constraint: Option<Constraint<Md>>,
212 ) -> AsyncTask<Frntend, Bkend, Md>
213 where
214 R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
215 Bkend: 'static,
216 Frntend: 'static,
217 {
218 let metadata = R::metadata();
219 let type_id = request.type_id();
220 let type_name = type_name::<R>();
221 let type_debug = format!("{request:?}");
222 let task = Box::new(move |b: &Bkend| {
223 let stream = request.into_stream(b);
224 Box::new({
225 stream.map(move |output| {
226 Box::new({
227 let handler = handler.clone();
228 move |frontend: &mut Frntend| handler.clone()(frontend, output)
229 }) as DynStateMutation<Frntend, Bkend, Md>
230 })
231 }) as DynMutationStream<Frntend, Bkend, Md>
232 }) as DynStreamTask<Frntend, Bkend, Md>;
233 let task = StreamTask {
234 task,
235 type_id,
236 type_name,
237 type_debug,
238 };
239 AsyncTask {
240 task: AsyncTaskKind::Stream(task),
241 constraint,
242 metadata,
243 }
244 }
245 pub fn map<NewFrntend>(
249 self,
250 f: impl Fn(&mut NewFrntend) -> &mut Frntend + Clone + Send + 'static,
251 ) -> AsyncTask<NewFrntend, Bkend, Md>
252 where
253 Bkend: 'static,
254 Frntend: 'static,
255 Md: 'static,
256 {
257 let Self {
258 task,
259 constraint,
260 metadata,
261 } = self;
262 match task {
263 AsyncTaskKind::Future(FutureTask {
264 task,
265 type_id,
266 type_name,
267 type_debug,
268 }) => {
269 let task = Box::new(|b: &Bkend| {
270 Box::new(task(b).map(|task| {
271 Box::new(|nf: &mut NewFrntend| {
272 let task = task(f(nf));
273 task.map(f)
274 }) as DynStateMutation<NewFrntend, Bkend, Md>
275 })) as DynMutationFuture<NewFrntend, Bkend, Md>
276 }) as DynFutureTask<NewFrntend, Bkend, Md>;
277 let task = FutureTask {
278 task,
279 type_id,
280 type_name,
281 type_debug,
282 };
283 AsyncTask {
284 task: AsyncTaskKind::Future(task),
285 constraint,
286 metadata,
287 }
288 }
289 AsyncTaskKind::Stream(StreamTask {
290 task,
291 type_id,
292 type_name,
293 type_debug,
294 }) => {
295 let task = Box::new(|b: &Bkend| {
296 Box::new({
297 task(b).map(move |task| {
298 Box::new({
299 let f = f.clone();
300 move |nf: &mut NewFrntend| {
301 let task = task(f(nf));
302 task.map(f.clone())
303 }
304 })
305 as DynStateMutation<NewFrntend, Bkend, Md>
306 })
307 }) as DynMutationStream<NewFrntend, Bkend, Md>
308 }) as DynStreamTask<NewFrntend, Bkend, Md>;
309 let stream_task = StreamTask {
310 task,
311 type_id,
312 type_name,
313 type_debug,
314 };
315 AsyncTask {
316 task: AsyncTaskKind::Stream(stream_task),
317 constraint,
318 metadata,
319 }
320 }
321 AsyncTaskKind::NoOp => AsyncTask {
322 task: AsyncTaskKind::NoOp,
323 constraint,
324 metadata,
325 },
326 AsyncTaskKind::Multi(v) => {
327 let mapped = v.into_iter().map(|task| task.map(f.clone())).collect();
328 AsyncTask {
329 task: AsyncTaskKind::Multi(mapped),
330 constraint,
331 metadata,
332 }
333 }
334 }
335 }
336}
337
338pub(crate) struct TaskList<Bkend, Frntend, Md> {
339 pub inner: Vec<SpawnedTask<Bkend, Frntend, Md>>,
340}
341
/// A task that has been spawned, together with the handle used to receive
/// its results and the information used to identify it.
pub(crate) struct SpawnedTask<Frntend, Bkend, Md> {
    /// `TypeId` of the request type; compared by the "same type" constraints.
    pub(crate) type_id: TypeId,
    /// Type name of the request type.
    pub(crate) type_name: &'static str,
    /// `Debug` rendering of the request; shared via `Arc` because it is
    /// cloned into every `TaskOutcome` produced for this task.
    pub(crate) type_debug: Arc<String>,
    /// Handle(s) through which the task's state mutations are received.
    pub(crate) receiver: TaskWaiter<Frntend, Bkend, Md>,
    /// Identifier of this spawned task, copied into each `TaskOutcome`.
    pub(crate) task_id: TaskId,
    /// Metadata values; checked by the matching-metadata constraint.
    pub(crate) metadata: Vec<Md>,
}
350
/// Borrowed, read-only description of a task: its type identity, its names
/// and its constraint.
// NOTE(review): not constructed in this part of the file; presumably handed
// to callers for logging or inspection — confirm at the construction site.
#[derive(Debug, Clone)]
pub struct TaskInformation<'a, Cstrnt> {
    /// `TypeId` of the request type.
    pub type_id: TypeId,
    /// Type name of the request type.
    pub type_name: &'static str,
    /// `Debug` rendering of the request.
    pub type_debug: &'a str,
    /// The task's constraint, if it has one.
    pub constraint: &'a Option<Constraint<Cstrnt>>,
}
359
/// A rule describing which tracked tasks should be removed from the task
/// list. Build one with the `new_*` constructors.
#[derive(Eq, PartialEq, Debug)]
pub struct Constraint<Cstrnt> {
    /// The specific rule to apply.
    pub(crate) constraint_type: ConstraitType<Cstrnt>,
}
364
/// The available constraint rules, as applied by
/// `TaskList::handle_constraint`.
// NOTE(review): `ConstraitType` and `BlockMatchingMetatdata` are misspelled,
// but both are public names, so renaming them would be a breaking change.
#[derive(Eq, PartialEq, Debug)]
pub enum ConstraitType<Cstrnt> {
    /// Remove tracked tasks with the same `TypeId` from the list, without
    /// explicitly aborting them.
    BlockSameType,
    /// Abort tracked tasks with the same `TypeId` and remove them from the
    /// list.
    KillSameType,
    /// Remove tracked tasks whose metadata contains the given value, without
    /// explicitly aborting them.
    BlockMatchingMetatdata(Cstrnt),
}
371
/// The receiving side of a spawned task.
pub(crate) enum TaskWaiter<Frntend, Bkend, Md> {
    /// A future task: the join handle resolves to its single state mutation.
    Future(JoinHandle<DynStateMutation<Frntend, Bkend, Md>>),
    /// A stream task: mutations arrive over a channel, and the join handle
    /// reports how the producing task ended.
    Stream {
        /// Channel on which the stream's state mutations are received.
        receiver: mpsc::Receiver<DynStateMutation<Frntend, Bkend, Md>>,
        /// Handle of the task feeding `receiver`.
        join_handle: JoinHandle<()>,
    },
}
379
380impl<Frntend, Bkend, Md> TaskWaiter<Frntend, Bkend, Md> {
381 fn kill(&mut self) {
382 match self {
383 TaskWaiter::Future(handle) => handle.abort(),
384 TaskWaiter::Stream { join_handle, .. } => join_handle.abort_handle().abort(),
385 }
386 }
387}
388
/// What happened the last time the task list was polled (see
/// `TaskList::get_next_response`). Every variant carries the identifying
/// information of the task it relates to.
pub enum TaskOutcome<Frntend, Bkend, Md> {
    /// A stream task's channel yielded no further mutation and its join
    /// handle resolved without reporting a panic.
    StreamFinished {
        /// `TypeId` of the request type.
        type_id: TypeId,
        /// Type name of the request type.
        type_name: &'static str,
        /// `Debug` rendering of the request.
        type_debug: Arc<String>,
        /// Identifier of the spawned task.
        task_id: TaskId,
    },
    /// A stream task's join handle returned an error for which `is_panic()`
    /// is true.
    StreamPanicked {
        /// The join error reported by the stream's task.
        error: JoinError,
        /// `TypeId` of the request type.
        type_id: TypeId,
        /// Type name of the request type.
        type_name: &'static str,
        /// `Debug` rendering of the request.
        type_debug: Arc<String>,
        /// Identifier of the spawned task.
        task_id: TaskId,
    },
    /// A future task's join handle returned an error.
    // NOTE(review): `get_next_response` emits this for any `JoinError` from a
    // future task; unlike the stream case it does not check `is_panic()`.
    TaskPanicked {
        /// The join error reported by the task.
        error: JoinError,
        /// `TypeId` of the request type.
        type_id: TypeId,
        /// Type name of the request type.
        type_name: &'static str,
        /// `Debug` rendering of the request.
        type_debug: Arc<String>,
        /// Identifier of the spawned task.
        task_id: TaskId,
    },
    /// A task produced a state mutation.
    MutationReceived {
        /// The mutation received from the task.
        mutation: DynStateMutation<Frntend, Bkend, Md>,
        /// `TypeId` of the request type.
        type_id: TypeId,
        /// Type name of the request type.
        type_name: &'static str,
        /// `Debug` rendering of the request.
        type_debug: Arc<String>,
        /// Identifier of the spawned task.
        task_id: TaskId,
    },
}
422
423impl<Bkend, Frntend, Md: PartialEq> TaskList<Frntend, Bkend, Md> {
424 pub(crate) fn new() -> Self {
425 Self { inner: vec![] }
426 }
    /// Waits until any tracked task produces an outcome and returns it.
    ///
    /// Returns `None` when there is nothing to wait on (the `?` below
    /// propagates the `None` yielded by an empty `FuturesUnordered`).
    /// Tasks that have finished — a resolved future, or a stream whose
    /// channel is exhausted — are removed from the list before returning.
    pub(crate) async fn get_next_response(&mut self) -> Option<TaskOutcome<Frntend, Bkend, Md>> {
        // Build one future per tracked task and race them. Each resolves to
        // `(index to remove, outcome)`; the index is `None` when the task
        // should stay in the list.
        let task_completed = self
            .inner
            .iter_mut()
            .enumerate()
            .map(|(idx, task)| async move {
                match task.receiver {
                    // A future task is done as soon as its join handle
                    // resolves, whether successfully or with an error.
                    TaskWaiter::Future(ref mut receiver) => match receiver.await {
                        Ok(mutation) => (
                            Some(idx),
                            TaskOutcome::MutationReceived {
                                mutation,
                                type_id: task.type_id,
                                type_debug: task.type_debug.clone(),
                                task_id: task.task_id,
                                type_name: task.type_name,
                            },
                        ),
                        Err(error) => (
                            Some(idx),
                            TaskOutcome::TaskPanicked {
                                type_id: task.type_id,
                                type_name: task.type_name,
                                type_debug: task.type_debug.clone(),
                                task_id: task.task_id,
                                error,
                            },
                        ),
                    },
                    TaskWaiter::Stream {
                        ref mut receiver,
                        ref mut join_handle,
                    } => {
                        // A received mutation does not end the stream, so the
                        // task is kept (`None` index).
                        if let Some(mutation) = receiver.recv().await {
                            return (
                                None,
                                TaskOutcome::MutationReceived {
                                    mutation,
                                    type_id: task.type_id,
                                    type_name: task.type_name,
                                    task_id: task.task_id,
                                    type_debug: task.type_debug.clone(),
                                },
                            );
                        };
                        // `recv` returned `None`: no more mutations will
                        // arrive. Await the join handle to learn whether the
                        // producing task panicked.
                        match join_handle.await {
                            Err(error) if error.is_panic() => (
                                Some(idx),
                                TaskOutcome::StreamPanicked {
                                    error,
                                    type_id: task.type_id,
                                    type_name: task.type_name,
                                    type_debug: task.type_debug.clone(),
                                    task_id: task.task_id,
                                },
                            ),
                            // Normal completion and non-panic join errors are
                            // both reported as a finished stream.
                            _ => (
                                Some(idx),
                                TaskOutcome::StreamFinished {
                                    type_id: task.type_id,
                                    type_name: task.type_name,
                                    type_debug: task.type_debug.clone(),
                                    task_id: task.task_id,
                                },
                            ),
                        }
                    }
                }
            })
            // Only the first future to complete is consumed; the rest are
            // dropped at the end of this statement.
            // NOTE(review): presumably relies on `recv` and awaiting a
            // `&mut JoinHandle` being safe to drop and retry — confirm
            // against the tokio documentation.
            .collect::<FuturesUnordered<_>>()
            .next()
            .await;
        let (maybe_completed_idx, outcome) = task_completed?;
        if let Some(completed_idx) = maybe_completed_idx {
            // `swap_remove` does not preserve the order of `inner`.
            self.inner.swap_remove(completed_idx);
        };
        Some(outcome)
    }
509 pub(crate) fn push(&mut self, task: SpawnedTask<Frntend, Bkend, Md>) {
510 self.inner.push(task)
511 }
512 pub(crate) fn handle_constraint(&mut self, constraint: Constraint<Md>, type_id: TypeId) {
514 let task_doesnt_match_constraint = |task: &SpawnedTask<_, _, _>| task.type_id != type_id;
519 let task_doesnt_match_metadata =
520 |task: &SpawnedTask<_, _, _>, constraint| !task.metadata.contains(constraint);
521 match constraint.constraint_type {
522 ConstraitType::BlockMatchingMetatdata(metadata) => self
523 .inner
524 .retain(|task| task_doesnt_match_metadata(task, &metadata)),
525 ConstraitType::BlockSameType => {
526 self.inner.retain(task_doesnt_match_constraint);
527 }
528 ConstraitType::KillSameType => self.inner.retain_mut(|task| {
529 if !task_doesnt_match_constraint(task) {
530 task.receiver.kill();
531 return false;
532 }
533 true
534 }),
535 }
536 }
537}
538
539impl<Cstrnt> Constraint<Cstrnt> {
540 pub fn new_block_same_type() -> Self {
541 Self {
542 constraint_type: ConstraitType::BlockSameType,
543 }
544 }
545 pub fn new_kill_same_type() -> Self {
546 Self {
547 constraint_type: ConstraitType::KillSameType,
548 }
549 }
550 pub fn new_block_matching_metadata(metadata: Cstrnt) -> Self {
551 Self {
552 constraint_type: ConstraitType::BlockMatchingMetatdata(metadata),
553 }
554 }
555}
556
#[cfg(test)]
mod tests {
    use crate::{AsyncTask, BackendStreamingTask, BackendTask};
    use futures::StreamExt;

    /// Task type used by these tests: unit frontend, backend and metadata.
    type UnitTask = AsyncTask<(), (), ()>;

    #[derive(Debug)]
    struct Task1;
    #[derive(Debug)]
    struct Task2;
    #[derive(Debug)]
    struct StreamingTask;

    impl BackendTask<()> for Task1 {
        type Output = ();
        type MetadataType = ();
        // The trait signature is written with `impl Future`, so the impl mirrors it.
        #[allow(clippy::manual_async_fn)]
        fn into_future(
            self,
            _backend: &(),
        ) -> impl std::future::Future<Output = Self::Output> + Send + 'static {
            async {}
        }
    }

    impl BackendTask<()> for Task2 {
        type Output = ();
        type MetadataType = ();
        // The trait signature is written with `impl Future`, so the impl mirrors it.
        #[allow(clippy::manual_async_fn)]
        fn into_future(
            self,
            _backend: &(),
        ) -> impl std::future::Future<Output = Self::Output> + Send + 'static {
            async {}
        }
    }

    impl BackendStreamingTask<()> for StreamingTask {
        type Output = ();
        type MetadataType = ();
        fn into_stream(
            self,
            _backend: &(),
        ) -> impl futures::Stream<Item = Self::Output> + Send + Unpin + 'static {
            futures::stream::once(async move {}).boxed()
        }
    }

    /// Innermost task of the chain: a plain future with no follow-up.
    fn innermost_task() -> UnitTask {
        AsyncTask::new_future(Task2, |_: &mut (), _| {}, None)
    }

    /// Middle task of the chain: a future whose handler returns the innermost task.
    fn middle_task() -> UnitTask {
        AsyncTask::new_future_chained(Task1, |_: &mut (), _| innermost_task(), None)
    }

    /// Builds a stream task whose handler returns a chain of further tasks,
    /// then maps it, checking that `map` works on nested tasks.
    #[tokio::test]
    async fn test_recursive_map() {
        let recursive_task =
            AsyncTask::new_stream_chained(StreamingTask, |_: &mut (), _| middle_task(), None);
        // `map` returns a `#[must_use]` task that this test deliberately discards.
        #[allow(unused_must_use)]
        let _ = recursive_task.map(|state: &mut ()| state);
    }
}