Skip to main content

zest_core/application/
task.rs

1//! Side-effecting work container returned from `init`/`update`.
2
3use alloc::{boxed::Box, vec::Vec};
4use core::{
5    future::{Future, pending, poll_fn},
6    pin::Pin,
7    task::Poll,
8};
9
10use crate::application::BoxFuture;
11
12/// A side-effecting future (or set of them) the runtime drives.
13///
14/// `Task` is the runtime's pending-work container: returned from
15/// [`crate::Application::init`] and [`crate::Application::update`], merged into
16/// the runtime's active task, and polled internally. Internally it stores a
17/// flat list of boxed futures; constructors flatten nested batches on
18/// insertion.
19///
20/// Constructors:
21/// - Task::none - no work.
22/// - Task::perform - drive a `Future<Output = Option<M>>`.
23/// - Task::future - drive a `Future<Output = M>`.
24/// - Task::batch - run multiple tasks concurrently.
25
26pub struct Task<M> {
27    pub(crate) futures: Vec<BoxFuture<Option<M>>>,
28}
29
30impl<M: 'static> Task<M> {
31    /// No work.
32    #[must_use]
33    pub fn none() -> Self {
34        Self {
35            futures: Vec::new(),
36        }
37    }
38
39    /// Run `fut`; produce its `Option<M>` result via `update`.
40    #[must_use]
41    pub fn perform<F>(fut: F) -> Self
42    where
43        F: Future<Output = Option<M>> + 'static,
44    {
45        let mut futures = Vec::with_capacity(1);
46        futures.push(Box::pin(fut) as BoxFuture<Option<M>>);
47        Self { futures }
48    }
49
50    /// Run `fut`; always produce its `M` result via `update`.
51    #[must_use]
52    pub fn future<F>(fut: F) -> Self
53    where
54        F: Future<Output = M> + 'static,
55    {
56        Self::perform(async move { Some(fut.await) })
57    }
58
59    /// Combine multiple tasks. All futures are driven concurrently;
60    /// resulting messages flow through `update` independently. Nested
61    /// batches are flattened.
62    #[must_use]
63    pub fn batch(tasks: impl IntoIterator<Item = Task<M>>) -> Self {
64        let mut futures: Vec<BoxFuture<Option<M>>> = Vec::new();
65        for task in tasks {
66            futures.extend(task.futures);
67        }
68
69        Self { futures }
70    }
71
72    /// True if this task has no pending futures.
73    pub(crate) fn is_empty(&self) -> bool {
74        self.futures.is_empty()
75    }
76
77    /// Merge `other`'s futures into `self`.
78    /// Equivalent to `*self = Task::batch([core::mem::take(self), other])`
79    /// without reallocating.
80    pub(crate) fn extend(&mut self, other: Task<M>) {
81        self.futures.extend(other.futures)
82    }
83
84    /// Poll all pending futures concurrently; return the first to
85    /// complete and remove it from the task. If empty, pends forever;
86    /// save to use a `select` arm.
87    pub(crate) async fn next(&mut self) -> Option<M> {
88        if self.is_empty() {
89            return pending::<Option<M>>().await;
90        }
91
92        poll_fn(|cx| {
93            let mut idx = 0;
94            while idx < self.futures.len() {
95                match Pin::as_mut(&mut self.futures[idx]).poll(cx) {
96                    Poll::Ready(result) => {
97                        let _ = self.futures.swap_remove(idx);
98                        return Poll::Ready(result);
99                    }
100                    Poll::Pending => idx += 1,
101                }
102            }
103
104            Poll::Pending
105        })
106        .await
107    }
108}
109
110impl<M> Default for Task<M> {
111    fn default() -> Self {
112        Self {
113            futures: Vec::new(),
114        }
115    }
116}