zest_core/application/task.rs
1//! Side-effecting work container returned from `init`/`update`.
2
3use alloc::{boxed::Box, vec::Vec};
4use core::{
5 future::{Future, pending, poll_fn},
6 pin::Pin,
7 task::Poll,
8};
9
10use crate::application::BoxFuture;
11
12/// A side-effecting future (or set of them) the runtime drives.
13///
14/// `Task` is the runtime's pending-work container: returned from
15/// [`crate::Application::init`] and [`crate::Application::update`], merged into
16/// the runtime's active task, and polled internally. Internally it stores a
17/// flat list of boxed futures; constructors flatten nested batches on
18/// insertion.
19///
20/// Constructors:
21/// - Task::none - no work.
22/// - Task::perform - drive a `Future<Output = Option<M>>`.
23/// - Task::future - drive a `Future<Output = M>`.
24/// - Task::batch - run multiple tasks concurrently.
25
26pub struct Task<M> {
27 pub(crate) futures: Vec<BoxFuture<Option<M>>>,
28}
29
30impl<M: 'static> Task<M> {
31 /// No work.
32 #[must_use]
33 pub fn none() -> Self {
34 Self {
35 futures: Vec::new(),
36 }
37 }
38
39 /// Run `fut`; produce its `Option<M>` result via `update`.
40 #[must_use]
41 pub fn perform<F>(fut: F) -> Self
42 where
43 F: Future<Output = Option<M>> + 'static,
44 {
45 let mut futures = Vec::with_capacity(1);
46 futures.push(Box::pin(fut) as BoxFuture<Option<M>>);
47 Self { futures }
48 }
49
50 /// Run `fut`; always produce its `M` result via `update`.
51 #[must_use]
52 pub fn future<F>(fut: F) -> Self
53 where
54 F: Future<Output = M> + 'static,
55 {
56 Self::perform(async move { Some(fut.await) })
57 }
58
59 /// Combine multiple tasks. All futures are driven concurrently;
60 /// resulting messages flow through `update` independently. Nested
61 /// batches are flattened.
62 #[must_use]
63 pub fn batch(tasks: impl IntoIterator<Item = Task<M>>) -> Self {
64 let mut futures: Vec<BoxFuture<Option<M>>> = Vec::new();
65 for task in tasks {
66 futures.extend(task.futures);
67 }
68
69 Self { futures }
70 }
71
72 /// True if this task has no pending futures.
73 pub(crate) fn is_empty(&self) -> bool {
74 self.futures.is_empty()
75 }
76
77 /// Merge `other`'s futures into `self`.
78 /// Equivalent to `*self = Task::batch([core::mem::take(self), other])`
79 /// without reallocating.
80 pub(crate) fn extend(&mut self, other: Task<M>) {
81 self.futures.extend(other.futures)
82 }
83
84 /// Poll all pending futures concurrently; return the first to
85 /// complete and remove it from the task. If empty, pends forever;
86 /// save to use a `select` arm.
87 pub(crate) async fn next(&mut self) -> Option<M> {
88 if self.is_empty() {
89 return pending::<Option<M>>().await;
90 }
91
92 poll_fn(|cx| {
93 let mut idx = 0;
94 while idx < self.futures.len() {
95 match Pin::as_mut(&mut self.futures[idx]).poll(cx) {
96 Poll::Ready(result) => {
97 let _ = self.futures.swap_remove(idx);
98 return Poll::Ready(result);
99 }
100 Poll::Pending => idx += 1,
101 }
102 }
103
104 Poll::Pending
105 })
106 .await
107 }
108}
109
110impl<M> Default for Task<M> {
111 fn default() -> Self {
112 Self {
113 futures: Vec::new(),
114 }
115 }
116}