1use crate::{
2 BackendStreamingTask, BackendTask, DynFutureTask, DynMutationFuture, DynMutationStream,
3 DynStateMutation, DynStreamTask, TaskId,
4};
5use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
6use std::{
7 any::{type_name, TypeId},
8 fmt::Debug,
9 sync::Arc,
10};
11use tokio::{
12 sync::mpsc,
13 task::{AbortHandle, JoinError, JoinHandle},
14};
15
/// A description of asynchronous work plus the bookkeeping needed to run it.
///
/// * `Frntend` is the state that result handlers receive as `&mut Frntend`.
/// * `Bkend` is passed by reference to the request when the work is started.
/// * `Md` is the metadata type reported by the request type.
///
/// Values are created with the `new_*` constructors, combined with `push` or
/// by collecting an iterator, and adapted to another frontend with `map`.
#[must_use = "AsyncTasks do nothing unless you run them"]
pub struct AsyncTask<Frntend, Bkend, Md> {
    /// The work itself: a future, a stream, a group of tasks, or nothing.
    pub(crate) task: AsyncTaskKind<Frntend, Bkend, Md>,
    /// Optional constraint supplied by the caller at construction time.
    /// NOTE(review): presumably handed to `TaskList::handle_constraint` when
    /// the task is spawned - confirm in the code that runs tasks.
    pub(crate) constraint: Option<Constraint<Md>>,
    /// Metadata values; the `new_*` constructors take these from
    /// `R::metadata()` of the request type.
    pub(crate) metadata: Vec<Md>,
}
24
/// The different shapes of work an [`AsyncTask`] can hold.
pub(crate) enum AsyncTaskKind<Frntend, Bkend, Md> {
    /// A request built from a `BackendTask` (see `new_future*`).
    Future(FutureTask<Frntend, Bkend, Md>),
    /// A request built from a `BackendStreamingTask` (see `new_stream*`).
    Stream(StreamTask<Frntend, Bkend, Md>),
    /// A group of tasks, produced by `push` or by collecting an iterator of
    /// tasks.
    Multi(Vec<AsyncTask<Frntend, Bkend, Md>>),
    /// Nothing to run.
    NoOp,
}
31
/// A type-erased streaming request together with details of the concrete
/// request type it was created from.
pub(crate) struct StreamTask<Frntend, Bkend, Md> {
    /// Boxed closure that, given `&Bkend`, starts the stream; each stream item
    /// is a state mutation for the frontend.
    pub(crate) task: DynStreamTask<Frntend, Bkend, Md>,
    /// `TypeId` of the original request type.
    pub(crate) type_id: TypeId,
    /// `std::any::type_name` of the original request type.
    pub(crate) type_name: &'static str,
    /// `Debug` rendering of the original request value.
    pub(crate) type_debug: String,
}
38
/// A type-erased one-shot request together with details of the concrete
/// request type it was created from.
pub(crate) struct FutureTask<Frntend, Bkend, Md> {
    /// Boxed closure that, given `&Bkend`, starts the future; the future
    /// resolves to a state mutation for the frontend.
    pub(crate) task: DynFutureTask<Frntend, Bkend, Md>,
    /// `TypeId` of the original request type.
    pub(crate) type_id: TypeId,
    /// `std::any::type_name` of the original request type.
    pub(crate) type_name: &'static str,
    /// `Debug` rendering of the original request value.
    pub(crate) type_debug: String,
}
45
46impl<Frntend, Bkend, Md> FromIterator<AsyncTask<Frntend, Bkend, Md>>
47 for AsyncTask<Frntend, Bkend, Md>
48{
49 fn from_iter<T: IntoIterator<Item = AsyncTask<Frntend, Bkend, Md>>>(iter: T) -> Self {
50 let v = iter.into_iter().collect();
51 AsyncTask {
53 task: AsyncTaskKind::Multi(v),
54 constraint: None,
55 metadata: vec![],
56 }
57 }
58}
59
60impl<Frntend, Bkend, Md> AsyncTask<Frntend, Bkend, Md> {
61 pub fn push(self, next: AsyncTask<Frntend, Bkend, Md>) -> AsyncTask<Frntend, Bkend, Md> {
62 match self.task {
63 AsyncTaskKind::Future(_) | AsyncTaskKind::Stream(_) => {
64 let v = vec![self, next];
65 AsyncTask {
66 task: AsyncTaskKind::Multi(v),
67 constraint: None,
68 metadata: vec![],
69 }
70 }
71 AsyncTaskKind::Multi(mut m) => {
72 m.push(next);
73 AsyncTask {
74 task: AsyncTaskKind::Multi(m),
75 constraint: self.constraint,
76 metadata: self.metadata,
77 }
78 }
79 AsyncTaskKind::NoOp => next,
80 }
81 }
82 pub fn is_no_op(&self) -> bool {
83 matches!(self.task, AsyncTaskKind::NoOp)
84 }
85 pub fn new_no_op() -> AsyncTask<Frntend, Bkend, Md> {
86 Self {
87 task: AsyncTaskKind::NoOp,
88 constraint: None,
89 metadata: vec![],
90 }
91 }
92 pub fn new_future<R>(
93 request: R,
94 handler: impl FnOnce(&mut Frntend, R::Output) + Send + 'static,
95 constraint: Option<Constraint<Md>>,
96 ) -> AsyncTask<Frntend, Bkend, Md>
97 where
98 R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
99 Bkend: 'static,
100 Frntend: 'static,
101 {
102 let metadata = R::metadata();
103 let type_id = request.type_id();
104 let type_name = type_name::<R>();
105 let type_debug = format!("{:?}", request);
106 let task = Box::new(move |b: &Bkend| {
107 Box::new({
108 let future = request.into_future(b);
109 Box::pin(async move {
110 let output = future.await;
111 Box::new(move |frontend: &mut Frntend| {
112 handler(frontend, output);
113 AsyncTask::new_no_op()
114 }) as DynStateMutation<Frntend, Bkend, Md>
115 })
116 }) as DynMutationFuture<Frntend, Bkend, Md>
117 }) as DynFutureTask<Frntend, Bkend, Md>;
118 let task = FutureTask {
119 task,
120 type_id,
121 type_name,
122 type_debug,
123 };
124 AsyncTask {
125 task: AsyncTaskKind::Future(task),
126 constraint,
127 metadata,
128 }
129 }
130 pub fn new_future_chained<R>(
131 request: R,
132 handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md> + Send + 'static,
133 constraint: Option<Constraint<Md>>,
134 ) -> AsyncTask<Frntend, Bkend, Md>
135 where
136 R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
137 Bkend: 'static,
138 Frntend: 'static,
139 {
140 let metadata = R::metadata();
141 let type_id = request.type_id();
142 let type_name = type_name::<R>();
143 let type_debug = format!("{:?}", request);
144 let task = Box::new(move |b: &Bkend| {
145 Box::new({
146 let future = request.into_future(b);
147 Box::pin(async move {
148 let output = future.await;
149 Box::new(move |frontend: &mut Frntend| handler(frontend, output))
150 as DynStateMutation<Frntend, Bkend, Md>
151 })
152 }) as DynMutationFuture<Frntend, Bkend, Md>
153 }) as DynFutureTask<Frntend, Bkend, Md>;
154 let task = FutureTask {
155 task,
156 type_id,
157 type_name,
158 type_debug,
159 };
160 AsyncTask {
161 task: AsyncTaskKind::Future(task),
162 constraint,
163 metadata,
164 }
165 }
166 pub fn new_stream<R>(
167 request: R,
168 handler: impl FnOnce(&mut Frntend, R::Output) + Send + Clone + 'static,
170 constraint: Option<Constraint<Md>>,
171 ) -> AsyncTask<Frntend, Bkend, Md>
172 where
173 R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
174 Bkend: 'static,
175 Frntend: 'static,
176 {
177 let metadata = R::metadata();
178 let type_id = request.type_id();
179 let type_name = type_name::<R>();
180 let type_debug = format!("{:?}", request);
181 let task = Box::new(move |b: &Bkend| {
182 let stream = request.into_stream(b);
183 Box::new({
184 stream.map(move |output| {
185 Box::new({
186 let handler = handler.clone();
187 move |frontend: &mut Frntend| {
188 handler.clone()(frontend, output);
189 AsyncTask::new_no_op()
190 }
191 }) as DynStateMutation<Frntend, Bkend, Md>
192 })
193 }) as DynMutationStream<Frntend, Bkend, Md>
194 }) as DynStreamTask<Frntend, Bkend, Md>;
195 let task = StreamTask {
196 task,
197 type_id,
198 type_name,
199 type_debug,
200 };
201 AsyncTask {
202 task: AsyncTaskKind::Stream(task),
203 constraint,
204 metadata,
205 }
206 }
207 pub fn new_stream_chained<R>(
208 request: R,
209 handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md>
211 + Send
212 + Clone
213 + 'static,
214 constraint: Option<Constraint<Md>>,
215 ) -> AsyncTask<Frntend, Bkend, Md>
216 where
217 R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
218 Bkend: 'static,
219 Frntend: 'static,
220 {
221 let metadata = R::metadata();
222 let type_id = request.type_id();
223 let type_name = type_name::<R>();
224 let type_debug = format!("{:?}", request);
225 let task = Box::new(move |b: &Bkend| {
226 let stream = request.into_stream(b);
227 Box::new({
228 stream.map(move |output| {
229 Box::new({
230 let handler = handler.clone();
231 move |frontend: &mut Frntend| handler.clone()(frontend, output)
232 }) as DynStateMutation<Frntend, Bkend, Md>
233 })
234 }) as DynMutationStream<Frntend, Bkend, Md>
235 }) as DynStreamTask<Frntend, Bkend, Md>;
236 let task = StreamTask {
237 task,
238 type_id,
239 type_name,
240 type_debug,
241 };
242 AsyncTask {
243 task: AsyncTaskKind::Stream(task),
244 constraint,
245 metadata,
246 }
247 }
248 pub fn map<NewFrntend>(
252 self,
253 f: impl Fn(&mut NewFrntend) -> &mut Frntend + Clone + Send + 'static,
254 ) -> AsyncTask<NewFrntend, Bkend, Md>
255 where
256 Bkend: 'static,
257 Frntend: 'static,
258 Md: 'static,
259 {
260 let Self {
261 task,
262 constraint,
263 metadata,
264 } = self;
265 match task {
266 AsyncTaskKind::Future(FutureTask {
267 task,
268 type_id,
269 type_name,
270 type_debug,
271 }) => {
272 let task = Box::new(|b: &Bkend| {
273 Box::new(task(b).map(|task| {
274 Box::new(|nf: &mut NewFrntend| {
275 let task = task(f(nf));
276 task.map(f)
277 }) as DynStateMutation<NewFrntend, Bkend, Md>
278 })) as DynMutationFuture<NewFrntend, Bkend, Md>
279 }) as DynFutureTask<NewFrntend, Bkend, Md>;
280 let task = FutureTask {
281 task,
282 type_id,
283 type_name,
284 type_debug,
285 };
286 AsyncTask {
287 task: AsyncTaskKind::Future(task),
288 constraint,
289 metadata,
290 }
291 }
292 AsyncTaskKind::Stream(StreamTask {
293 task,
294 type_id,
295 type_name,
296 type_debug,
297 }) => {
298 let task = Box::new(|b: &Bkend| {
299 Box::new({
300 task(b).map(move |task| {
301 Box::new({
302 let f = f.clone();
303 move |nf: &mut NewFrntend| {
304 let task = task(f(nf));
305 task.map(f.clone())
306 }
307 })
308 as DynStateMutation<NewFrntend, Bkend, Md>
309 })
310 }) as DynMutationStream<NewFrntend, Bkend, Md>
311 }) as DynStreamTask<NewFrntend, Bkend, Md>;
312 let stream_task = StreamTask {
313 task,
314 type_id,
315 type_name,
316 type_debug,
317 };
318 AsyncTask {
319 task: AsyncTaskKind::Stream(stream_task),
320 constraint,
321 metadata,
322 }
323 }
324 AsyncTaskKind::NoOp => AsyncTask {
325 task: AsyncTaskKind::NoOp,
326 constraint,
327 metadata,
328 },
329 AsyncTaskKind::Multi(v) => {
330 let mapped = v.into_iter().map(|task| task.map(f.clone())).collect();
331 AsyncTask {
332 task: AsyncTaskKind::Multi(mapped),
333 constraint,
334 metadata,
335 }
336 }
337 }
338 }
339}
340
341pub(crate) struct TaskList<Bkend, Frntend, Md> {
342 pub inner: Vec<SpawnedTask<Bkend, Frntend, Md>>,
343}
344
/// A task that has been started, together with the handle used to receive its
/// results and the details of the request it was created from.
pub(crate) struct SpawnedTask<Frntend, Bkend, Md> {
    /// `TypeId` of the originating request type; compared by the same-type
    /// constraints in `TaskList::handle_constraint`.
    pub(crate) type_id: TypeId,
    /// Name of the originating request type.
    pub(crate) type_name: &'static str,
    /// `Debug` rendering of the originating request; shared so it can be
    /// cloned cheaply into every `TaskOutcome`.
    pub(crate) type_debug: Arc<String>,
    /// Where the task's state mutations arrive.
    pub(crate) receiver: TaskWaiter<Frntend, Bkend, Md>,
    /// Identifier copied into every `TaskOutcome` produced for this task.
    /// NOTE(review): presumably assigned by whoever spawns the task - confirm.
    pub(crate) task_id: TaskId,
    /// Metadata values checked by the metadata constraint in
    /// `TaskList::handle_constraint`.
    pub(crate) metadata: Vec<Md>,
}
353
/// Borrowed, read-only description of a task: the identity of its request
/// type and the constraint it carries.
// NOTE(review): nothing in this file constructs this type; presumably it is
// handed to callers that want to observe tasks - confirm at the use site.
#[derive(Debug, Clone)]
pub struct TaskInformation<'a, Cstrnt> {
    /// `TypeId` of the request type.
    pub type_id: TypeId,
    /// Name of the request type.
    pub type_name: &'static str,
    /// `Debug` rendering of the request value.
    pub type_debug: &'a str,
    /// The task's constraint, if it has one.
    pub constraint: &'a Option<Constraint<Cstrnt>>,
}
362
/// A rule describing which already-tracked tasks should be dropped from the
/// task list (and possibly aborted); see `TaskList::handle_constraint`.
///
/// Build one with `new_block_same_type`, `new_kill_same_type` or
/// `new_block_matching_metadata`.
#[derive(Eq, PartialEq, Debug)]
pub struct Constraint<Cstrnt> {
    /// Which rule this constraint applies.
    pub(crate) constraint_type: ConstraitType<Cstrnt>,
}
367
/// The rule a [`Constraint`] applies, as interpreted by
/// `TaskList::handle_constraint`.
// NOTE(review): `ConstraitType` and `BlockMatchingMetatdata` are misspelled,
// but both are public names, so renaming them would be a breaking change.
#[derive(Eq, PartialEq, Debug)]
pub enum ConstraitType<Cstrnt> {
    /// Stop tracking tasks with the same request `TypeId`; they are removed
    /// from the list without being aborted.
    BlockSameType,
    /// Abort tasks with the same request `TypeId` and stop tracking them.
    KillSameType,
    /// Stop tracking tasks whose metadata contains the given value; they are
    /// removed from the list without being aborted.
    BlockMatchingMetatdata(Cstrnt),
}
374
/// The receiving end of a spawned task.
pub(crate) enum TaskWaiter<Frntend, Bkend, Md> {
    /// A one-shot task: awaiting the join handle yields its single state
    /// mutation, or a `JoinError`.
    Future(JoinHandle<DynStateMutation<Frntend, Bkend, Md>>),
    /// A streaming task.
    Stream {
        /// Channel on which the task's state mutations arrive.
        receiver: mpsc::Receiver<DynStateMutation<Frntend, Bkend, Md>>,
        /// Handle used by `kill` to abort the task.
        abort_handle: AbortHandle,
    },
}
382
383impl<Frntend, Bkend, Md> TaskWaiter<Frntend, Bkend, Md> {
384 fn kill(&mut self) {
385 match self {
386 TaskWaiter::Future(handle) => handle.abort(),
387 TaskWaiter::Stream {
388 abort_handle: abort,
389 ..
390 } => abort.abort(),
391 }
392 }
393}
394
/// What `TaskList::get_next_response` observed from one of the tracked tasks.
pub enum TaskOutcome<Frntend, Bkend, Md> {
    /// A streaming task's channel returned `None`; the task has been removed
    /// from the list.
    StreamClosed,
    /// Awaiting a one-shot task's join handle returned an error; the task has
    /// been removed from the list.
    // NOTE(review): every `JoinError` ends up here. Tokio also reports
    // cancelled tasks through `JoinError`, so this is not necessarily a panic
    // - check `error.is_panic()` / `error.is_cancelled()` if it matters.
    TaskPanicked {
        /// The error returned by the join handle.
        error: JoinError,
        /// `TypeId` of the originating request type.
        type_id: TypeId,
        /// Name of the originating request type.
        type_name: &'static str,
        /// `Debug` rendering of the originating request.
        type_debug: Arc<String>,
        /// Identifier of the task that failed.
        task_id: TaskId,
    },
    /// A task produced a state mutation to apply to the frontend.
    MutationReceived {
        /// The mutation to apply.
        mutation: DynStateMutation<Frntend, Bkend, Md>,
        /// `TypeId` of the originating request type.
        type_id: TypeId,
        /// Name of the originating request type.
        type_name: &'static str,
        /// `Debug` rendering of the originating request.
        type_debug: Arc<String>,
        /// Identifier of the task that produced the mutation.
        task_id: TaskId,
    },
}
418
419impl<Bkend, Frntend, Md: PartialEq> TaskList<Frntend, Bkend, Md> {
420 pub(crate) fn new() -> Self {
421 Self { inner: vec![] }
422 }
423 pub(crate) async fn get_next_response(&mut self) -> Option<TaskOutcome<Frntend, Bkend, Md>> {
425 let task_completed = self
426 .inner
427 .iter_mut()
428 .enumerate()
429 .map(|(idx, task)| async move {
430 match task.receiver {
431 TaskWaiter::Future(ref mut receiver) => match receiver.await {
432 Ok(mutation) => (
433 Some(idx),
434 TaskOutcome::MutationReceived {
435 mutation,
436 type_id: task.type_id,
437 type_debug: task.type_debug.clone(),
438 task_id: task.task_id,
439 type_name: task.type_name,
440 },
441 ),
442 Err(error) => (
443 Some(idx),
444 TaskOutcome::TaskPanicked {
445 type_id: task.type_id,
446 type_name: task.type_name,
447 type_debug: task.type_debug.clone(),
448 task_id: task.task_id,
449 error,
450 },
451 ),
452 },
453 TaskWaiter::Stream {
454 ref mut receiver, ..
455 } => {
456 if let Some(mutation) = receiver.recv().await {
457 return (
458 None,
459 TaskOutcome::MutationReceived {
460 mutation,
461 type_id: task.type_id,
462 type_name: task.type_name,
463 task_id: task.task_id,
464 type_debug: task.type_debug.clone(),
465 },
466 );
467 }
468 (Some(idx), TaskOutcome::StreamClosed)
469 }
470 }
471 })
472 .collect::<FuturesUnordered<_>>()
473 .next()
474 .await;
475 let (maybe_completed_id, outcome) = task_completed?;
476 if let Some(completed_id) = maybe_completed_id {
477 self.inner.swap_remove(completed_id);
480 };
481 Some(outcome)
482 }
483 pub(crate) fn push(&mut self, task: SpawnedTask<Frntend, Bkend, Md>) {
484 self.inner.push(task)
485 }
486 pub(crate) fn handle_constraint(&mut self, constraint: Constraint<Md>, type_id: TypeId) {
488 let task_doesnt_match_constraint = |task: &SpawnedTask<_, _, _>| (task.type_id != type_id);
493 let task_doesnt_match_metadata =
494 |task: &SpawnedTask<_, _, _>, constraint| !task.metadata.contains(constraint);
495 match constraint.constraint_type {
496 ConstraitType::BlockMatchingMetatdata(metadata) => self
497 .inner
498 .retain(|task| task_doesnt_match_metadata(task, &metadata)),
499 ConstraitType::BlockSameType => {
500 self.inner.retain(task_doesnt_match_constraint);
501 }
502 ConstraitType::KillSameType => self.inner.retain_mut(|task| {
503 if !task_doesnt_match_constraint(task) {
504 task.receiver.kill();
505 return false;
506 }
507 true
508 }),
509 }
510 }
511}
512
513impl<Cstrnt> Constraint<Cstrnt> {
514 pub fn new_block_same_type() -> Self {
515 Self {
516 constraint_type: ConstraitType::BlockSameType,
517 }
518 }
519 pub fn new_kill_same_type() -> Self {
520 Self {
521 constraint_type: ConstraitType::KillSameType,
522 }
523 }
524 pub fn new_block_matching_metadata(metadata: Cstrnt) -> Self {
525 Self {
526 constraint_type: ConstraitType::BlockMatchingMetatdata(metadata),
527 }
528 }
529}
530
#[cfg(test)]
mod tests {
    use crate::{AsyncTask, BackendStreamingTask, BackendTask};
    use futures::StreamExt;
    use std::future::Future;

    /// First one-shot request used by the test; completes immediately.
    #[derive(Debug)]
    struct Task1;

    /// Second one-shot request used by the test; completes immediately.
    #[derive(Debug)]
    struct Task2;

    /// Streaming request used by the test; yields a single `()`.
    #[derive(Debug)]
    struct StreamingTask;

    impl BackendTask<()> for Task1 {
        type Output = ();
        type MetadataType = ();
        // Written as `-> impl Future` rather than `async fn` so the `Send` and
        // `'static` bounds can be spelled out.
        #[allow(clippy::manual_async_fn)]
        fn into_future(self, _: &()) -> impl Future<Output = Self::Output> + Send + 'static {
            async {}
        }
    }

    impl BackendTask<()> for Task2 {
        type Output = ();
        type MetadataType = ();
        // Written as `-> impl Future` rather than `async fn` so the `Send` and
        // `'static` bounds can be spelled out.
        #[allow(clippy::manual_async_fn)]
        fn into_future(self, _: &()) -> impl Future<Output = Self::Output> + Send + 'static {
            async {}
        }
    }

    impl BackendStreamingTask<()> for StreamingTask {
        type Output = ();
        type MetadataType = ();
        fn into_stream(
            self,
            _: &(),
        ) -> impl futures::Stream<Item = Self::Output> + Send + Unpin + 'static {
            futures::stream::once(async move {}).boxed()
        }
    }

    /// Innermost level of the chain: a plain future task.
    fn innermost_task() -> AsyncTask<(), (), ()> {
        AsyncTask::new_future(Task2, |_: &mut (), _| {}, None)
    }

    /// Middle level of the chain: a future whose handler returns the innermost
    /// task.
    fn middle_task() -> AsyncTask<(), (), ()> {
        AsyncTask::new_future_chained(Task1, |_: &mut (), _| innermost_task(), None)
    }

    /// Mapping a stream task that chains into further tasks must build without
    /// issue; the mapped task is only constructed, never run.
    #[tokio::test]
    async fn test_recursive_map() {
        let recursive_task =
            AsyncTask::new_stream_chained(StreamingTask, |_: &mut (), _| middle_task(), None);
        // Passing the result to `drop` counts as a use of the `#[must_use]` value.
        drop(recursive_task.map(|tmp: &mut ()| tmp));
    }
}