1use crate::{
2 BackendStreamingTask, BackendTask, DynFutureTask, DynMutationFuture, DynMutationStream,
3 DynStateMutation, DynStreamTask, TaskId,
4};
5use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
6use std::{
7 any::{type_name, TypeId},
8 fmt::Debug,
9 sync::Arc,
10};
11use tokio::{
12 sync::mpsc,
13 task::{AbortHandle, JoinError, JoinHandle},
14};
15
16#[must_use = "AsyncTasks do nothing unless you run them"]
19pub struct AsyncTask<Frntend, Bkend, Md> {
20 pub(crate) task: AsyncTaskKind<Frntend, Bkend, Md>,
21 pub(crate) constraint: Option<Constraint<Md>>,
22 pub(crate) metadata: Vec<Md>,
23}
24
25pub(crate) enum AsyncTaskKind<Frntend, Bkend, Md> {
26 Future(FutureTask<Frntend, Bkend, Md>),
27 Stream(StreamTask<Frntend, Bkend, Md>),
28 Multi(Vec<AsyncTask<Frntend, Bkend, Md>>),
29 NoOp,
30}
31
32pub(crate) struct StreamTask<Frntend, Bkend, Md> {
33 pub(crate) task: DynStreamTask<Frntend, Bkend, Md>,
34 pub(crate) type_id: TypeId,
35 pub(crate) type_name: &'static str,
36 pub(crate) type_debug: String,
37}
38
39pub(crate) struct FutureTask<Frntend, Bkend, Md> {
40 pub(crate) task: DynFutureTask<Frntend, Bkend, Md>,
41 pub(crate) type_id: TypeId,
42 pub(crate) type_name: &'static str,
43 pub(crate) type_debug: String,
44}
45
46impl<Frntend, Bkend, Md> FromIterator<AsyncTask<Frntend, Bkend, Md>>
47 for AsyncTask<Frntend, Bkend, Md>
48{
49 fn from_iter<T: IntoIterator<Item = AsyncTask<Frntend, Bkend, Md>>>(iter: T) -> Self {
50 let v = iter.into_iter().collect();
51 AsyncTask {
53 task: AsyncTaskKind::Multi(v),
54 constraint: None,
55 metadata: vec![],
56 }
57 }
58}
59
60impl<Frntend, Bkend, Md> AsyncTask<Frntend, Bkend, Md> {
61 pub fn push(self, next: AsyncTask<Frntend, Bkend, Md>) -> AsyncTask<Frntend, Bkend, Md> {
62 match self.task {
63 AsyncTaskKind::Future(_) | AsyncTaskKind::Stream(_) => {
64 let v = vec![self, next];
65 AsyncTask {
66 task: AsyncTaskKind::Multi(v),
67 constraint: None,
68 metadata: vec![],
69 }
70 }
71 AsyncTaskKind::Multi(mut m) => {
72 m.push(next);
73 AsyncTask {
74 task: AsyncTaskKind::Multi(m),
75 constraint: self.constraint,
76 metadata: self.metadata,
77 }
78 }
79 AsyncTaskKind::NoOp => next,
80 }
81 }
82 pub fn new_no_op() -> AsyncTask<Frntend, Bkend, Md> {
83 Self {
84 task: AsyncTaskKind::NoOp,
85 constraint: None,
86 metadata: vec![],
87 }
88 }
89 pub fn new_future<R>(
90 request: R,
91 handler: impl FnOnce(&mut Frntend, R::Output) + Send + 'static,
92 constraint: Option<Constraint<Md>>,
93 ) -> AsyncTask<Frntend, Bkend, Md>
94 where
95 R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
96 Bkend: 'static,
97 Frntend: 'static,
98 {
99 let metadata = R::metadata();
100 let type_id = request.type_id();
101 let type_name = type_name::<R>();
102 let type_debug = format!("{:?}", request);
103 let task = Box::new(move |b: &Bkend| {
104 Box::new({
105 let future = request.into_future(b);
106 Box::pin(async move {
107 let output = future.await;
108 Box::new(move |frontend: &mut Frntend| {
109 handler(frontend, output);
110 AsyncTask::new_no_op()
111 }) as DynStateMutation<Frntend, Bkend, Md>
112 })
113 }) as DynMutationFuture<Frntend, Bkend, Md>
114 }) as DynFutureTask<Frntend, Bkend, Md>;
115 let task = FutureTask {
116 task,
117 type_id,
118 type_name,
119 type_debug,
120 };
121 AsyncTask {
122 task: AsyncTaskKind::Future(task),
123 constraint,
124 metadata,
125 }
126 }
127 pub fn new_future_chained<R>(
128 request: R,
129 handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md> + Send + 'static,
130 constraint: Option<Constraint<Md>>,
131 ) -> AsyncTask<Frntend, Bkend, Md>
132 where
133 R: BackendTask<Bkend, MetadataType = Md> + Debug + 'static,
134 Bkend: 'static,
135 Frntend: 'static,
136 {
137 let metadata = R::metadata();
138 let type_id = request.type_id();
139 let type_name = type_name::<R>();
140 let type_debug = format!("{:?}", request);
141 let task = Box::new(move |b: &Bkend| {
142 Box::new({
143 let future = request.into_future(b);
144 Box::pin(async move {
145 let output = future.await;
146 Box::new(move |frontend: &mut Frntend| handler(frontend, output))
147 as DynStateMutation<Frntend, Bkend, Md>
148 })
149 }) as DynMutationFuture<Frntend, Bkend, Md>
150 }) as DynFutureTask<Frntend, Bkend, Md>;
151 let task = FutureTask {
152 task,
153 type_id,
154 type_name,
155 type_debug,
156 };
157 AsyncTask {
158 task: AsyncTaskKind::Future(task),
159 constraint,
160 metadata,
161 }
162 }
163 pub fn new_stream<R>(
164 request: R,
165 handler: impl FnOnce(&mut Frntend, R::Output) + Send + Clone + 'static,
167 constraint: Option<Constraint<Md>>,
168 ) -> AsyncTask<Frntend, Bkend, Md>
169 where
170 R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
171 Bkend: 'static,
172 Frntend: 'static,
173 {
174 let metadata = R::metadata();
175 let type_id = request.type_id();
176 let type_name = type_name::<R>();
177 let type_debug = format!("{:?}", request);
178 let task = Box::new(move |b: &Bkend| {
179 let stream = request.into_stream(b);
180 Box::new({
181 stream.map(move |output| {
182 Box::new({
183 let handler = handler.clone();
184 move |frontend: &mut Frntend| {
185 handler.clone()(frontend, output);
186 AsyncTask::new_no_op()
187 }
188 }) as DynStateMutation<Frntend, Bkend, Md>
189 })
190 }) as DynMutationStream<Frntend, Bkend, Md>
191 }) as DynStreamTask<Frntend, Bkend, Md>;
192 let task = StreamTask {
193 task,
194 type_id,
195 type_name,
196 type_debug,
197 };
198 AsyncTask {
199 task: AsyncTaskKind::Stream(task),
200 constraint,
201 metadata,
202 }
203 }
204 pub fn new_stream_chained<R>(
205 request: R,
206 handler: impl FnOnce(&mut Frntend, R::Output) -> AsyncTask<Frntend, Bkend, Md>
208 + Send
209 + Clone
210 + 'static,
211 constraint: Option<Constraint<Md>>,
212 ) -> AsyncTask<Frntend, Bkend, Md>
213 where
214 R: BackendStreamingTask<Bkend, MetadataType = Md> + Debug + 'static,
215 Bkend: 'static,
216 Frntend: 'static,
217 {
218 let metadata = R::metadata();
219 let type_id = request.type_id();
220 let type_name = type_name::<R>();
221 let type_debug = format!("{:?}", request);
222 let task = Box::new(move |b: &Bkend| {
223 let stream = request.into_stream(b);
224 Box::new({
225 stream.map(move |output| {
226 Box::new({
227 let handler = handler.clone();
228 move |frontend: &mut Frntend| handler.clone()(frontend, output)
229 }) as DynStateMutation<Frntend, Bkend, Md>
230 })
231 }) as DynMutationStream<Frntend, Bkend, Md>
232 }) as DynStreamTask<Frntend, Bkend, Md>;
233 let task = StreamTask {
234 task,
235 type_id,
236 type_name,
237 type_debug,
238 };
239 AsyncTask {
240 task: AsyncTaskKind::Stream(task),
241 constraint,
242 metadata,
243 }
244 }
245 pub fn map<NewFrntend>(
249 self,
250 f: impl Fn(&mut NewFrntend) -> &mut Frntend + Clone + Send + 'static,
251 ) -> AsyncTask<NewFrntend, Bkend, Md>
252 where
253 Bkend: 'static,
254 Frntend: 'static,
255 Md: 'static,
256 {
257 let Self {
258 task,
259 constraint,
260 metadata,
261 } = self;
262 match task {
263 AsyncTaskKind::Future(FutureTask {
264 task,
265 type_id,
266 type_name,
267 type_debug,
268 }) => {
269 let task = Box::new(|b: &Bkend| {
270 Box::new(task(b).map(|task| {
271 Box::new(|nf: &mut NewFrntend| {
272 let task = task(f(nf));
273 task.map(f)
274 }) as DynStateMutation<NewFrntend, Bkend, Md>
275 })) as DynMutationFuture<NewFrntend, Bkend, Md>
276 }) as DynFutureTask<NewFrntend, Bkend, Md>;
277 let task = FutureTask {
278 task,
279 type_id,
280 type_name,
281 type_debug,
282 };
283 AsyncTask {
284 task: AsyncTaskKind::Future(task),
285 constraint,
286 metadata,
287 }
288 }
289 AsyncTaskKind::Stream(StreamTask {
290 task,
291 type_id,
292 type_name,
293 type_debug,
294 }) => {
295 let task = Box::new(|b: &Bkend| {
296 Box::new({
297 task(b).map(move |task| {
298 Box::new({
299 let f = f.clone();
300 move |nf: &mut NewFrntend| {
301 let task = task(f(nf));
302 task.map(f.clone())
303 }
304 })
305 as DynStateMutation<NewFrntend, Bkend, Md>
306 })
307 }) as DynMutationStream<NewFrntend, Bkend, Md>
308 }) as DynStreamTask<NewFrntend, Bkend, Md>;
309 let stream_task = StreamTask {
310 task,
311 type_id,
312 type_name,
313 type_debug,
314 };
315 AsyncTask {
316 task: AsyncTaskKind::Stream(stream_task),
317 constraint,
318 metadata,
319 }
320 }
321 AsyncTaskKind::NoOp => AsyncTask {
322 task: AsyncTaskKind::NoOp,
323 constraint,
324 metadata,
325 },
326 AsyncTaskKind::Multi(v) => {
327 let mapped = v.into_iter().map(|task| task.map(f.clone())).collect();
328 AsyncTask {
329 task: AsyncTaskKind::Multi(mapped),
330 constraint,
331 metadata,
332 }
333 }
334 }
335 }
336}
337
338pub(crate) struct TaskList<Bkend, Frntend, Md> {
339 pub inner: Vec<SpawnedTask<Bkend, Frntend, Md>>,
340}
341
342pub(crate) struct SpawnedTask<Frntend, Bkend, Md> {
343 pub(crate) type_id: TypeId,
344 pub(crate) type_name: &'static str,
345 pub(crate) type_debug: Arc<String>,
346 pub(crate) receiver: TaskWaiter<Frntend, Bkend, Md>,
347 pub(crate) task_id: TaskId,
348 pub(crate) metadata: Vec<Md>,
349}
350
351#[derive(Debug, Clone)]
353pub struct TaskInformation<'a, Cstrnt> {
354 pub type_id: TypeId,
355 pub type_name: &'static str,
356 pub type_debug: &'a str,
357 pub constraint: &'a Option<Constraint<Cstrnt>>,
358}
359
360#[derive(Eq, PartialEq, Debug)]
361pub struct Constraint<Cstrnt> {
362 pub(crate) constraint_type: ConstraitType<Cstrnt>,
363}
364
/// The rule carried by a `Constraint`.
///
/// NOTE: the spellings `ConstraitType` and `BlockMatchingMetatdata` are part
/// of the public API and are kept as they are.
#[derive(Debug, PartialEq, Eq)]
pub enum ConstraitType<Cstrnt> {
    /// Applies to tracked tasks with the same type id; they are dropped from
    /// the task list.
    BlockSameType,
    /// Applies to tracked tasks with the same type id; they are aborted and
    /// dropped from the task list.
    KillSameType,
    /// Applies to tracked tasks whose metadata contains the given value; they
    /// are dropped from the task list.
    BlockMatchingMetatdata(Cstrnt),
}
371
372pub(crate) enum TaskWaiter<Frntend, Bkend, Md> {
373 Future(JoinHandle<DynStateMutation<Frntend, Bkend, Md>>),
374 Stream {
375 receiver: mpsc::Receiver<DynStateMutation<Frntend, Bkend, Md>>,
376 abort_handle: AbortHandle,
377 },
378}
379
380impl<Frntend, Bkend, Md> TaskWaiter<Frntend, Bkend, Md> {
381 fn kill(&mut self) {
382 match self {
383 TaskWaiter::Future(handle) => handle.abort(),
384 TaskWaiter::Stream {
385 abort_handle: abort,
386 ..
387 } => abort.abort(),
388 }
389 }
390}
391
392pub enum TaskOutcome<Frntend, Bkend, Md> {
393 StreamClosed,
396 TaskPanicked {
400 error: JoinError,
401 type_id: TypeId,
402 type_name: &'static str,
403 type_debug: Arc<String>,
404 task_id: TaskId,
405 },
406 MutationReceived {
408 mutation: DynStateMutation<Frntend, Bkend, Md>,
409 type_id: TypeId,
410 type_name: &'static str,
411 type_debug: Arc<String>,
412 task_id: TaskId,
413 },
414}
415
416impl<Bkend, Frntend, Md: PartialEq> TaskList<Frntend, Bkend, Md> {
417 pub(crate) fn new() -> Self {
418 Self { inner: vec![] }
419 }
420 pub(crate) async fn get_next_response(&mut self) -> Option<TaskOutcome<Frntend, Bkend, Md>> {
422 let task_completed = self
423 .inner
424 .iter_mut()
425 .enumerate()
426 .map(|(idx, task)| async move {
427 match task.receiver {
428 TaskWaiter::Future(ref mut receiver) => match receiver.await {
429 Ok(mutation) => (
430 Some(idx),
431 TaskOutcome::MutationReceived {
432 mutation,
433 type_id: task.type_id,
434 type_debug: task.type_debug.clone(),
435 task_id: task.task_id,
436 type_name: task.type_name,
437 },
438 ),
439 Err(error) => (
440 Some(idx),
441 TaskOutcome::TaskPanicked {
442 type_id: task.type_id,
443 type_name: task.type_name,
444 type_debug: task.type_debug.clone(),
445 task_id: task.task_id,
446 error,
447 },
448 ),
449 },
450 TaskWaiter::Stream {
451 ref mut receiver, ..
452 } => {
453 if let Some(mutation) = receiver.recv().await {
454 return (
455 None,
456 TaskOutcome::MutationReceived {
457 mutation,
458 type_id: task.type_id,
459 type_name: task.type_name,
460 task_id: task.task_id,
461 type_debug: task.type_debug.clone(),
462 },
463 );
464 }
465 (Some(idx), TaskOutcome::StreamClosed)
466 }
467 }
468 })
469 .collect::<FuturesUnordered<_>>()
470 .next()
471 .await;
472 let (maybe_completed_id, outcome) = task_completed?;
473 if let Some(completed_id) = maybe_completed_id {
474 self.inner.swap_remove(completed_id);
477 };
478 Some(outcome)
479 }
480 pub(crate) fn push(&mut self, task: SpawnedTask<Frntend, Bkend, Md>) {
481 self.inner.push(task)
482 }
483 pub(crate) fn handle_constraint(&mut self, constraint: Constraint<Md>, type_id: TypeId) {
485 let task_doesnt_match_constraint = |task: &SpawnedTask<_, _, _>| (task.type_id != type_id);
490 let task_doesnt_match_metadata =
491 |task: &SpawnedTask<_, _, _>, constraint| !task.metadata.contains(constraint);
492 match constraint.constraint_type {
493 ConstraitType::BlockMatchingMetatdata(metadata) => self
494 .inner
495 .retain(|task| task_doesnt_match_metadata(task, &metadata)),
496 ConstraitType::BlockSameType => {
497 self.inner.retain(task_doesnt_match_constraint);
498 }
499 ConstraitType::KillSameType => self.inner.retain_mut(|task| {
500 if !task_doesnt_match_constraint(task) {
501 task.receiver.kill();
502 return false;
503 }
504 true
505 }),
506 }
507 }
508}
509
510impl<Cstrnt> Constraint<Cstrnt> {
511 pub fn new_block_same_type() -> Self {
512 Self {
513 constraint_type: ConstraitType::BlockSameType,
514 }
515 }
516 pub fn new_kill_same_type() -> Self {
517 Self {
518 constraint_type: ConstraitType::KillSameType,
519 }
520 }
521 pub fn new_block_matching_metadata(metadata: Cstrnt) -> Self {
522 Self {
523 constraint_type: ConstraitType::BlockMatchingMetatdata(metadata),
524 }
525 }
526}
527
#[cfg(test)]
mod tests {
    use crate::{AsyncTask, BackendStreamingTask, BackendTask};
    use futures::StreamExt;

    /// Future-backed task that resolves immediately to `()`.
    #[derive(Debug)]
    struct Task1;

    /// Second future-backed task, distinct from `Task1` only by type.
    #[derive(Debug)]
    struct Task2;

    /// Stream-backed task that yields a single `()` item.
    #[derive(Debug)]
    struct StreamingTask;

    impl BackendTask<()> for Task1 {
        type Output = ();
        type MetadataType = ();
        // Keep the explicit `impl Future` return type; the lint would suggest
        // rewriting this as an `async fn`.
        #[allow(clippy::manual_async_fn)]
        fn into_future(
            self,
            _: &(),
        ) -> impl std::future::Future<Output = Self::Output> + Send + 'static {
            async {}
        }
    }

    impl BackendTask<()> for Task2 {
        type Output = ();
        type MetadataType = ();
        // Keep the explicit `impl Future` return type; the lint would suggest
        // rewriting this as an `async fn`.
        #[allow(clippy::manual_async_fn)]
        fn into_future(
            self,
            _: &(),
        ) -> impl std::future::Future<Output = Self::Output> + Send + 'static {
            async {}
        }
    }

    impl BackendStreamingTask<()> for StreamingTask {
        type Output = ();
        type MetadataType = ();
        fn into_stream(
            self,
            _: &(),
        ) -> impl futures::Stream<Item = Self::Output> + Send + Unpin + 'static {
            futures::stream::once(async move {}).boxed()
        }
    }

    /// Builds a stream task whose handler chains a future task, whose handler
    /// chains another future task, and maps the whole nest. The test has no
    /// assertions: it passes if this compiles and `map` returns normally.
    #[tokio::test]
    async fn test_recursive_map() {
        let nested = AsyncTask::new_stream_chained(
            StreamingTask,
            |_: &mut (), _| {
                AsyncTask::new_future_chained(
                    Task1,
                    |_: &mut (), _| AsyncTask::new_future(Task2, |_: &mut (), _| {}, None),
                    None,
                )
            },
            None,
        );
        // The mapped task is intentionally discarded; only building it matters.
        #[allow(unused_must_use)]
        let _ = nested.map(|frontend: &mut ()| frontend);
    }
}