Skip to main content

irox_tools/sync/
eventual.rs

1// SPDX-License-Identifier: MIT
2// Copyright 2025 IROX Contributors
3//
4
5//!
6//! Tools to represent a task that's "eventually" complete.  Very similar to a future, but without
7//! the rigidity and infrastructure.
8//!
9//! # Example:
10//! ```no_run
11//! use irox_tools::sync::{Eventual, EventualStatus};
12//! let eventual: Eventual<()> = Eventual::default();
13//! loop {
14//!     match eventual.get() {
15//!         EventualStatus::NotReady => {
16//!             // start the task, put into running state.
17//!             eventual.start();
18//!             // spawn a thread, enqueue a task, make the thing, etc.
19//!             continue;
20//!         }
21//!         EventualStatus::Running => {
22//!             // spin, maybe sleep or busy wait.
23//!             core::hint::spin_loop();
24//!             std::thread::sleep(std::time::Duration::from_millis(100));
25//!             continue;
26//!         }
27//!         EventualStatus::CompleteEmpty => {
28//!             // task is complete, but didn't return a response.
29//!             break;
30//!         }
31//!         EventualStatus::Complete(value) => {
32//!             // task is complete - use the value provided.
33//!             // note - value is behind an Arc, so multiple calls to `get` if complete
34//!             // will return the same value in memory
35//!             break;
36//!         }
37//!     }
38//! }
39//! ```
40extern crate alloc;
41
42use alloc::sync::Arc;
43use core::fmt::{Debug, Formatter};
44use core::future::Future;
45use core::pin::Pin;
46use core::task::{Context, Poll};
47use std::sync::{Condvar, Mutex, RwLock};
48
49///
50/// The current status of the Eventual Task
51#[derive(Default)]
52pub enum EventualStatus<T> {
53    /// Task has not started yet, or has failed and should be retried
54    #[default]
55    NotReady,
56    /// Task is actively running
57    Running,
58    /// Task is complete, and did not return a response (success/fail is not represented here, use a inner `Result`)
59    CompleteEmpty,
60    /// Task is complete, and has returned this response.
61    Complete(T),
62}
63impl<T> Debug for EventualStatus<T> {
64    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
65        match self {
66            EventualStatus::NotReady => write!(f, "NotReady"),
67            EventualStatus::Running => write!(f, "Running"),
68            EventualStatus::CompleteEmpty => write!(f, "CompleteEmpty"),
69            EventualStatus::Complete(_) => write!(f, "Complete"),
70        }
71    }
72}
73impl<T> From<Option<T>> for EventualStatus<T> {
74    fn from(value: Option<T>) -> Self {
75        match value {
76            None => EventualStatus::CompleteEmpty,
77            Some(v) => EventualStatus::Complete(v),
78        }
79    }
80}
81impl<T> From<EventualStatus<T>> for Option<T> {
82    fn from(value: EventualStatus<T>) -> Self {
83        if let EventualStatus::Complete(v) = value {
84            return Some(v);
85        }
86        None
87    }
88}
89impl<T: Clone> Clone for EventualStatus<T> {
90    fn clone(&self) -> Self {
91        match self {
92            EventualStatus::NotReady => EventualStatus::NotReady,
93            EventualStatus::Running => EventualStatus::Running,
94            EventualStatus::CompleteEmpty => EventualStatus::CompleteEmpty,
95            EventualStatus::Complete(c) => EventualStatus::Complete(c.clone()),
96        }
97    }
98}
99impl<T> EventualStatus<T> {
100    /// If the task is complete, return the completed value.  If it's not complete (or completed empty)
101    /// returns [`None`]
102    pub fn take(&mut self) -> Option<T> {
103        if !self.is_complete() {
104            return None;
105        }
106        core::mem::replace(self, EventualStatus::CompleteEmpty).into()
107    }
108    /// True if the task is completed.
109    pub fn is_complete(&self) -> bool {
110        if let EventualStatus::NotReady = self {
111            return false;
112        } else if let EventualStatus::Running = self {
113            return false;
114        }
115        true
116    }
117    /// True if the task is pending
118    pub fn is_pending(&self) -> bool {
119        !self.is_complete()
120    }
121}
122
123struct EventualInner<T> {
124    condvar: Condvar,
125    guard: Mutex<()>,
126    val: RwLock<EventualStatus<Arc<T>>>,
127}
128impl<T> Default for EventualInner<T> {
129    fn default() -> Self {
130        EventualInner {
131            condvar: Default::default(),
132            guard: Default::default(),
133            val: RwLock::new(EventualStatus::NotReady),
134        }
135    }
136}
137impl<T> EventualInner<T> {
138    fn new(status: EventualStatus<Arc<T>>) -> Self {
139        EventualInner {
140            condvar: Default::default(),
141            guard: Default::default(),
142            val: RwLock::new(status),
143        }
144    }
145}
146///
147/// Represents a computation result that may eventually appear.  Semantically equivalent to a Future,
148/// but doesn't necessarily require an Async ecosystem around it.
149#[derive(Clone)]
150pub struct Eventual<T> {
151    inner: Arc<EventualInner<T>>,
152}
153impl<T> Default for Eventual<T> {
154    fn default() -> Self {
155        Eventual {
156            inner: Arc::new(EventualInner::default()),
157        }
158    }
159}
160
161impl<T> Eventual<T> {
162    /// Creates a new already completed task with the specified value
163    pub fn new_loaded(val: T) -> Self {
164        Eventual {
165            inner: Arc::new(EventualInner::new(EventualStatus::Complete(Arc::new(val)))),
166        }
167    }
168    /// Sets the completion status of this task.  [`Some`] becomes [`EventualStatus::Complete`],
169    /// [`None`] becomes [`EventualStatus::CompleteEmpty`]
170    pub fn set(&self, val: Option<T>) {
171        if let Ok(mut write) = self.inner.val.write() {
172            let val = val.map(Arc::new);
173            *write = val.into();
174        }
175        if self.is_ready() {
176            self.inner.condvar.notify_all();
177        }
178    }
179    ///
180    /// Sets the completed status to [`EventualStatus::Complete`] using the specified value.
181    pub fn set_shared(&self, val: Arc<T>) {
182        if let Ok(mut write) = self.inner.val.write() {
183            *write = EventualStatus::Complete(val);
184            self.inner.condvar.notify_all()
185        }
186    }
187    ///
188    /// Gets the current status of this task.  This is the primary method to poll over.
189    /// See the module docs for an example.
190    pub fn get(&self) -> EventualStatus<Arc<T>> {
191        if let Ok(read) = self.inner.val.read() {
192            return read.clone();
193        }
194        EventualStatus::NotReady
195    }
196    ///
197    /// If the task is complete, take the Arc out of storage.  Further calls to `get` or `take` will
198    /// return `None`
199    pub fn take(&self) -> Option<Arc<T>> {
200        if let Ok(mut write) = self.inner.val.write() {
201            return write.take();
202        }
203        None
204    }
205    ///
206    /// Move the task state from `NotReady` or `CompleteEmpty` to `Running`.  Does nothing if already
207    /// complete.
208    pub fn start(&self) {
209        if let Ok(mut write) = self.inner.val.write() {
210            match *write {
211                EventualStatus::NotReady | EventualStatus::CompleteEmpty => {
212                    *write = EventualStatus::Running;
213                }
214                _ => {
215                    //noop
216                }
217            }
218        }
219    }
220    ///
221    /// True if this task is complete with [`EventualStatus::CompleteEmpty`] or [`EventualStatus::Complete`]
222    pub fn is_ready(&self) -> bool {
223        if let Ok(read) = self.inner.val.read() {
224            return read.is_complete();
225        }
226        false
227    }
228    ///
229    /// True if this task has status [`EventualStatus::NotRunning`] or [`EventualStatus::Running`]
230    pub fn is_pending(&self) -> bool {
231        if let Ok(read) = self.inner.val.read() {
232            return read.is_pending();
233        }
234        true
235    }
236    ///
237    /// Blocks until this task transitions into [`EventualStatus::CompleteEmpty`] or
238    /// [`EventualStatus::Complete`].  If already in this state, quick returns.
239    pub fn block_until_ready(&self) -> EventualStatus<Arc<T>> {
240        match self.get() {
241            EventualStatus::CompleteEmpty => return EventualStatus::CompleteEmpty,
242            EventualStatus::Complete(v) => return EventualStatus::Complete(v),
243            _ => {}
244        }
245        if let Ok(guard) = self.inner.guard.lock() {
246            let _unused = self.inner.condvar.wait_while(guard, |()| self.is_pending());
247        }
248        self.get()
249    }
250}
251
252impl<T> Future for Eventual<T> {
253    type Output = EventualStatus<Arc<T>>;
254
255    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
256        let status = self.get();
257        if status.is_complete() {
258            return Poll::Ready(status);
259        }
260        cx.waker().wake_by_ref();
261        Poll::Pending
262    }
263}