async_callback_manager/
task.rs

1use crate::task::dyn_task::{
2    FusedTask, IntoDynFutureTask, IntoDynStreamTask, OptionHandler, TryHandler,
3};
4use crate::task::map::{MapDynFutureTask, MapDynStreamTask};
5use crate::{BackendStreamingTask, BackendTask, Constraint, TaskHandler};
6use std::any::{TypeId, type_name};
7use std::boxed::Box;
8use std::fmt::Debug;
9
10pub mod dyn_task;
11mod map;
12#[cfg(test)]
13mod tests;
14
15/// An asynchrnonous task that can generate state mutations and/or more tasks to
16/// be spawned by an AsyncCallbackManager.
17#[must_use = "AsyncTasks do nothing unless you run them"]
18pub struct AsyncTask<Frntend, Bkend, Md> {
19    pub(crate) task: AsyncTaskKind<Frntend, Bkend, Md>,
20    pub(crate) constraint: Option<Constraint<Md>>,
21    pub(crate) metadata: Vec<Md>,
22}
23pub(crate) enum AsyncTaskKind<Frntend, Bkend, Md> {
24    Future(FutureTask<Frntend, Bkend, Md>),
25    Stream(StreamTask<Frntend, Bkend, Md>),
26    Multi(Vec<AsyncTask<Frntend, Bkend, Md>>),
27    NoOp,
28}
29pub(crate) struct FutureTask<Frntend, Bkend, Md> {
30    pub(crate) task: Box<dyn IntoDynFutureTask<Frntend, Bkend, Md>>,
31    pub(crate) type_id: TypeId,
32    pub(crate) type_name: &'static str,
33    pub(crate) type_debug: String,
34}
35pub(crate) struct StreamTask<Frntend, Bkend, Md> {
36    pub(crate) task: Box<dyn IntoDynStreamTask<Frntend, Bkend, Md>>,
37    pub(crate) type_id: TypeId,
38    pub(crate) type_name: &'static str,
39    pub(crate) type_debug: String,
40}
41
42// Allow conversion of () into no-op Task.
43impl<Frntend, Bkend, Md> From<()> for AsyncTask<Frntend, Bkend, Md> {
44    fn from(_: ()) -> Self {
45        AsyncTask::new_no_op()
46    }
47}
48
49// Debug must be implemented manually to remove Frntend, Bkend Debug bounds.
50impl<Frntend, Bkend, Md> Debug for AsyncTask<Frntend, Bkend, Md>
51where
52    Md: Debug,
53    AsyncTaskKind<Frntend, Bkend, Md>: Debug,
54{
55    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
56        f.debug_struct("AsyncTask")
57            .field("task", &self.task)
58            .field("constraint", &self.constraint)
59            .field("metadata", &self.metadata)
60            .finish()
61    }
62}
63// Debug must be implemented manually to remove Frntend, Bkend Debug bounds.
64impl<Frntend, Bkend, Md> Debug for AsyncTaskKind<Frntend, Bkend, Md>
65where
66    Md: Debug,
67    FutureTask<Frntend, Bkend, Md>: Debug,
68    StreamTask<Frntend, Bkend, Md>: Debug,
69{
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        match self {
72            Self::Future(arg0) => f.debug_tuple("Future").field(arg0).finish(),
73            Self::Stream(arg0) => f.debug_tuple("Stream").field(arg0).finish(),
74            Self::Multi(arg0) => f.debug_tuple("Multi").field(arg0).finish(),
75            Self::NoOp => write!(f, "NoOp"),
76        }
77    }
78}
79// Debug must be implemented manually to remove Frntend, Bkend Debug bounds.
80impl<Frntend, Bkend, Md> Debug for FutureTask<Frntend, Bkend, Md>
81where
82    dyn IntoDynFutureTask<Frntend, Bkend, Md>: Debug,
83{
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct("FutureTask")
86            .field("task", &self.task)
87            .field("type_id", &self.type_id)
88            .field("type_name", &self.type_name)
89            .field("type_debug", &self.type_debug)
90            .finish()
91    }
92}
93// Debug must be implemented manually to remove Frntend, Bkend Debug bounds.
94impl<Frntend, Bkend, Md> Debug for StreamTask<Frntend, Bkend, Md>
95where
96    dyn IntoDynStreamTask<Frntend, Bkend, Md>: Debug,
97{
98    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
99        f.debug_struct("StreamTask")
100            .field("task", &self.task)
101            .field("type_id", &self.type_id)
102            .field("type_name", &self.type_name)
103            .field("type_debug", &self.type_debug)
104            .finish()
105    }
106}
107
108// PartialEq must be implemented manually to remove Frntend, Bkend PartialEq
109// bounds.
110impl<Frntend, Bkend, Md> PartialEq for AsyncTask<Frntend, Bkend, Md>
111where
112    Md: PartialEq + 'static,
113    Frntend: 'static,
114    Bkend: 'static,
115    AsyncTaskKind<Frntend, Bkend, Md>: PartialEq,
116{
117    fn eq(&self, other: &Self) -> bool {
118        self.task == other.task
119            && self.constraint == other.constraint
120            && self.metadata == other.metadata
121    }
122}
123// PartialEq must be implemented manually to remove Frntend, Bkend PartialEq
124// bounds.
125impl<Frntend, Bkend, Md> PartialEq for AsyncTaskKind<Frntend, Bkend, Md>
126where
127    Md: PartialEq + 'static,
128    Frntend: 'static,
129    Bkend: 'static,
130    FutureTask<Frntend, Bkend, Md>: PartialEq,
131    StreamTask<Frntend, Bkend, Md>: PartialEq,
132{
133    fn eq(&self, other: &Self) -> bool {
134        match (self, other) {
135            (Self::Future(l0), Self::Future(r0)) => l0 == r0,
136            (Self::Stream(l0), Self::Stream(r0)) => l0 == r0,
137            (Self::Multi(l0), Self::Multi(r0)) => l0 == r0,
138            (Self::NoOp, Self::NoOp) => true,
139            _ => false,
140        }
141    }
142}
143// PartialEq must be implemented manually to remove Frntend, Bkend PartialEq
144// bounds and use dyn_partial_eq function.
145#[cfg(feature = "task-equality")]
146impl<Frntend, Bkend, Md> PartialEq for FutureTask<Frntend, Bkend, Md>
147where
148    Frntend: 'static,
149    Bkend: 'static,
150    Md: 'static,
151{
152    fn eq(&self, other: &Self) -> bool {
153        self.task.dyn_partial_eq(other.task.as_ref())
154            && self.type_id == other.type_id
155            && self.type_name == other.type_name
156            && self.type_debug == other.type_debug
157    }
158}
159// PartialEq must be implemented manually to remove Frntend, Bkend PartialEq
160// bounds and use dyn_partial_eq function.
161#[cfg(feature = "task-equality")]
162impl<Frntend, Bkend, Md> PartialEq for StreamTask<Frntend, Bkend, Md>
163where
164    Frntend: 'static,
165    Bkend: 'static,
166    Md: 'static,
167{
168    fn eq(&self, other: &Self) -> bool {
169        self.task.dyn_partial_eq(other.task.as_ref())
170            && self.type_id == other.type_id
171            && self.type_name == other.type_name
172            && self.type_debug == other.type_debug
173    }
174}
175
176impl<Frntend, Bkend, Md> FromIterator<AsyncTask<Frntend, Bkend, Md>>
177    for AsyncTask<Frntend, Bkend, Md>
178{
179    fn from_iter<T: IntoIterator<Item = AsyncTask<Frntend, Bkend, Md>>>(iter: T) -> Self {
180        let v = iter.into_iter().collect();
181        // TODO: Better handle constraints / metadata.
182        AsyncTask {
183            task: AsyncTaskKind::Multi(v),
184            constraint: None,
185            metadata: vec![],
186        }
187    }
188}
189
190impl<Frntend, Bkend, Md> AsyncTask<Frntend, Bkend, Md>
191where
192    Md: PartialEq + 'static,
193    Frntend: 'static,
194    Bkend: 'static,
195    Self: PartialEq,
196{
197    /// Assert that this effect contains at least other effect (it may contain
198    /// multiple effects).
199    pub fn contains(&self, other: &AsyncTask<Frntend, Bkend, Md>) -> bool {
200        match &self.task {
201            AsyncTaskKind::Multi(self_tasks) => {
202                // Contains is used here to guard against nested multi tasks
203                self_tasks.iter().any(|self_task| self_task.contains(other))
204            }
205            _ => self == other,
206        }
207    }
208}
209
210impl<Frntend, Bkend, Md> AsyncTask<Frntend, Bkend, Md> {
211    pub fn push(self, next: AsyncTask<Frntend, Bkend, Md>) -> AsyncTask<Frntend, Bkend, Md> {
212        match self.task {
213            AsyncTaskKind::Future(_) | AsyncTaskKind::Stream(_) => {
214                let v = vec![self, next];
215                AsyncTask {
216                    task: AsyncTaskKind::Multi(v),
217                    constraint: None,
218                    metadata: vec![],
219                }
220            }
221            AsyncTaskKind::Multi(mut m) => {
222                m.push(next);
223                AsyncTask {
224                    task: AsyncTaskKind::Multi(m),
225                    constraint: self.constraint,
226                    metadata: self.metadata,
227                }
228            }
229            AsyncTaskKind::NoOp => next,
230        }
231    }
232    pub fn is_no_op(&self) -> bool {
233        matches!(self.task, AsyncTaskKind::NoOp)
234    }
235    pub fn new_no_op() -> AsyncTask<Frntend, Bkend, Md> {
236        Self {
237            task: AsyncTaskKind::NoOp,
238            constraint: None,
239            metadata: vec![],
240        }
241    }
242    pub fn new_future<R>(
243        request: R,
244        handler: impl TaskHandler<R::Output, Frntend, Bkend, Md> + Send + 'static,
245        constraint: Option<Constraint<Md>>,
246    ) -> AsyncTask<Frntend, Bkend, Md>
247    where
248        R: BackendTask<Bkend, MetadataType = Md> + Send + Debug + 'static,
249        Bkend: 'static,
250        Frntend: 'static,
251    {
252        let metadata = R::metadata();
253        let type_id = request.type_id();
254        let type_name = type_name::<R>();
255        let type_debug = format!("{request:?}");
256        let task = Box::new(FusedTask::new_future(request, handler));
257        let task = FutureTask {
258            task,
259            type_id,
260            type_name,
261            type_debug,
262        };
263        AsyncTask {
264            task: AsyncTaskKind::Future(task),
265            constraint,
266            metadata,
267        }
268    }
269    pub fn new_future_try<R, T, E>(
270        request: R,
271        ok_handler: impl TaskHandler<T, Frntend, Bkend, Md> + Send + 'static,
272        err_handler: impl TaskHandler<E, Frntend, Bkend, Md> + Send + 'static,
273        constraint: Option<Constraint<Md>>,
274    ) -> AsyncTask<Frntend, Bkend, Md>
275    where
276        R: BackendTask<Bkend, MetadataType = Md, Output = Result<T, E>> + Send + Debug + 'static,
277        Bkend: 'static,
278        Frntend: 'static,
279        E: 'static,
280        T: 'static,
281    {
282        let metadata = R::metadata();
283        let type_id = request.type_id();
284        let type_name = type_name::<R>();
285        let type_debug = format!("{request:?}");
286        let task = Box::new(FusedTask::new_future(
287            request,
288            TryHandler {
289                ok_handler,
290                err_handler,
291            },
292        ));
293        let task = FutureTask {
294            task,
295            type_id,
296            type_name,
297            type_debug,
298        };
299        AsyncTask {
300            task: AsyncTaskKind::Future(task),
301            constraint,
302            metadata,
303        }
304    }
305    pub fn new_future_option<R, T>(
306        request: R,
307        some_handler: impl TaskHandler<T, Frntend, Bkend, Md> + Send + 'static,
308        constraint: Option<Constraint<Md>>,
309    ) -> AsyncTask<Frntend, Bkend, Md>
310    where
311        R: BackendTask<Bkend, MetadataType = Md, Output = Option<T>> + Send + Debug + 'static,
312        Bkend: 'static,
313        Frntend: 'static,
314        T: 'static,
315    {
316        let metadata = R::metadata();
317        let type_id = request.type_id();
318        let type_name = type_name::<R>();
319        let type_debug = format!("{request:?}");
320        let task = Box::new(FusedTask::new_future(request, OptionHandler(some_handler)));
321        let task = FutureTask {
322            task,
323            type_id,
324            type_name,
325            type_debug,
326        };
327        AsyncTask {
328            task: AsyncTaskKind::Future(task),
329            constraint,
330            metadata,
331        }
332    }
333    pub fn new_stream<R>(
334        request: R,
335        // TODO: Review Clone bounds.
336        handler: impl TaskHandler<R::Output, Frntend, Bkend, Md> + Send + Clone + 'static,
337        constraint: Option<Constraint<Md>>,
338    ) -> AsyncTask<Frntend, Bkend, Md>
339    where
340        R: BackendStreamingTask<Bkend, MetadataType = Md> + Send + Debug + 'static,
341        Bkend: 'static,
342        Frntend: 'static,
343    {
344        let metadata = R::metadata();
345        let type_id = request.type_id();
346        let type_name = type_name::<R>();
347        let type_debug = format!("{request:?}");
348        let task = Box::new(FusedTask::new_stream(request, handler));
349        let task = StreamTask {
350            task,
351            type_id,
352            type_name,
353            type_debug,
354        };
355        AsyncTask {
356            task: AsyncTaskKind::Stream(task),
357            constraint,
358            metadata,
359        }
360    }
361    pub fn new_stream_try<R, T, E>(
362        request: R,
363        ok_handler: impl TaskHandler<T, Frntend, Bkend, Md> + Send + Clone + 'static,
364        err_handler: impl TaskHandler<E, Frntend, Bkend, Md> + Send + Clone + 'static,
365        constraint: Option<Constraint<Md>>,
366    ) -> AsyncTask<Frntend, Bkend, Md>
367    where
368        R: BackendStreamingTask<Bkend, MetadataType = Md, Output = Result<T, E>>
369            + Send
370            + Debug
371            + 'static,
372        Bkend: 'static,
373        Frntend: 'static,
374        E: 'static,
375        T: 'static,
376    {
377        let metadata = R::metadata();
378        let type_id = request.type_id();
379        let type_name = type_name::<R>();
380        let type_debug = format!("{request:?}");
381        let task = Box::new(FusedTask::new_stream(
382            request,
383            TryHandler {
384                ok_handler,
385                err_handler,
386            },
387        ));
388        let task = StreamTask {
389            task,
390            type_id,
391            type_name,
392            type_debug,
393        };
394        AsyncTask {
395            task: AsyncTaskKind::Stream(task),
396            constraint,
397            metadata,
398        }
399    }
400    pub fn new_stream_option<R, T>(
401        request: R,
402        some_handler: impl TaskHandler<T, Frntend, Bkend, Md> + Send + Clone + 'static,
403        constraint: Option<Constraint<Md>>,
404    ) -> AsyncTask<Frntend, Bkend, Md>
405    where
406        R: BackendStreamingTask<Bkend, MetadataType = Md, Output = Option<T>>
407            + Send
408            + Debug
409            + 'static,
410        Bkend: 'static,
411        Frntend: 'static,
412        T: 'static,
413    {
414        let metadata = R::metadata();
415        let type_id = request.type_id();
416        let type_name = type_name::<R>();
417        let type_debug = format!("{request:?}");
418        let task = Box::new(FusedTask::new_stream(request, OptionHandler(some_handler)));
419        let task = StreamTask {
420            task,
421            type_id,
422            type_name,
423            type_debug,
424        };
425        AsyncTask {
426            task: AsyncTaskKind::Stream(task),
427            constraint,
428            metadata,
429        }
430    }
431    /// # Warning
432    /// This is recursive, if you have set up a cycle of AsyncTasks, map may
433    /// overflow.
434    pub fn map_frontend<NewFrntend>(
435        self,
436        f: impl FnOnce(&mut NewFrntend) -> &mut Frntend + Clone + Send + 'static,
437    ) -> AsyncTask<NewFrntend, Bkend, Md>
438    where
439        Bkend: 'static,
440        Frntend: 'static,
441        Md: 'static,
442    {
443        let Self {
444            task,
445            constraint,
446            metadata,
447        } = self;
448        match task {
449            AsyncTaskKind::Future(FutureTask {
450                task,
451                type_id,
452                type_name,
453                type_debug,
454            }) => {
455                let map = MapDynFutureTask { task, map_fn: f };
456                let task = Box::new(map);
457                let task = FutureTask {
458                    task,
459                    type_id,
460                    type_name,
461                    type_debug,
462                };
463                AsyncTask {
464                    task: AsyncTaskKind::Future(task),
465                    constraint,
466                    metadata,
467                }
468            }
469            AsyncTaskKind::Stream(StreamTask {
470                task,
471                type_id,
472                type_name,
473                type_debug,
474            }) => {
475                let map = MapDynStreamTask { task, map_fn: f };
476                let task = Box::new(map);
477                let task = StreamTask {
478                    task,
479                    type_id,
480                    type_name,
481                    type_debug,
482                };
483                AsyncTask {
484                    task: AsyncTaskKind::Stream(task),
485                    constraint,
486                    metadata,
487                }
488            }
489            AsyncTaskKind::NoOp => AsyncTask {
490                task: AsyncTaskKind::NoOp,
491                constraint,
492                metadata,
493            },
494            AsyncTaskKind::Multi(v) => {
495                let mapped = v
496                    .into_iter()
497                    .map(|task| task.map_frontend(f.clone()))
498                    .collect();
499                AsyncTask {
500                    task: AsyncTaskKind::Multi(mapped),
501                    constraint,
502                    metadata,
503                }
504            }
505        }
506    }
507}