irox_tools/sync/
eventual.rs1extern crate alloc;
41
42use alloc::sync::Arc;
43use core::fmt::{Debug, Formatter};
44use core::future::Future;
45use core::pin::Pin;
46use core::task::{Context, Poll};
47use std::sync::{Condvar, Mutex, RwLock};
48
/// Lifecycle of a value that is produced at some later point in time.
///
/// The [`Default`] status is [`EventualStatus::NotReady`].
#[derive(Default)]
pub enum EventualStatus<T> {
    /// Nothing has been started yet. This is the default state.
    #[default]
    NotReady,
    /// Work has been started but no result has been stored.
    Running,
    /// Finished, but there is no value to hand out (none was produced, or it
    /// has already been taken).
    CompleteEmpty,
    /// Finished, carrying the produced value.
    Complete(T),
}
63impl<T> Debug for EventualStatus<T> {
64 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
65 match self {
66 EventualStatus::NotReady => write!(f, "NotReady"),
67 EventualStatus::Running => write!(f, "Running"),
68 EventualStatus::CompleteEmpty => write!(f, "CompleteEmpty"),
69 EventualStatus::Complete(_) => write!(f, "Complete"),
70 }
71 }
72}
73impl<T> From<Option<T>> for EventualStatus<T> {
74 fn from(value: Option<T>) -> Self {
75 match value {
76 None => EventualStatus::CompleteEmpty,
77 Some(v) => EventualStatus::Complete(v),
78 }
79 }
80}
81impl<T> From<EventualStatus<T>> for Option<T> {
82 fn from(value: EventualStatus<T>) -> Self {
83 if let EventualStatus::Complete(v) = value {
84 return Some(v);
85 }
86 None
87 }
88}
89impl<T: Clone> Clone for EventualStatus<T> {
90 fn clone(&self) -> Self {
91 match self {
92 EventualStatus::NotReady => EventualStatus::NotReady,
93 EventualStatus::Running => EventualStatus::Running,
94 EventualStatus::CompleteEmpty => EventualStatus::CompleteEmpty,
95 EventualStatus::Complete(c) => EventualStatus::Complete(c.clone()),
96 }
97 }
98}
99impl<T> EventualStatus<T> {
100 pub fn take(&mut self) -> Option<T> {
103 if !self.is_complete() {
104 return None;
105 }
106 core::mem::replace(self, EventualStatus::CompleteEmpty).into()
107 }
108 pub fn is_complete(&self) -> bool {
110 if let EventualStatus::NotReady = self {
111 return false;
112 } else if let EventualStatus::Running = self {
113 return false;
114 }
115 true
116 }
117 pub fn is_pending(&self) -> bool {
119 !self.is_complete()
120 }
121}
122
123struct EventualInner<T> {
124 condvar: Condvar,
125 guard: Mutex<()>,
126 val: RwLock<EventualStatus<Arc<T>>>,
127}
128impl<T> Default for EventualInner<T> {
129 fn default() -> Self {
130 EventualInner {
131 condvar: Default::default(),
132 guard: Default::default(),
133 val: RwLock::new(EventualStatus::NotReady),
134 }
135 }
136}
137impl<T> EventualInner<T> {
138 fn new(status: EventualStatus<Arc<T>>) -> Self {
139 EventualInner {
140 condvar: Default::default(),
141 guard: Default::default(),
142 val: RwLock::new(status),
143 }
144 }
145}
146#[derive(Clone)]
150pub struct Eventual<T> {
151 inner: Arc<EventualInner<T>>,
152}
153impl<T> Default for Eventual<T> {
154 fn default() -> Self {
155 Eventual {
156 inner: Arc::new(EventualInner::default()),
157 }
158 }
159}
160
161impl<T> Eventual<T> {
162 pub fn new_loaded(val: T) -> Self {
164 Eventual {
165 inner: Arc::new(EventualInner::new(EventualStatus::Complete(Arc::new(val)))),
166 }
167 }
168 pub fn set(&self, val: Option<T>) {
171 if let Ok(mut write) = self.inner.val.write() {
172 let val = val.map(Arc::new);
173 *write = val.into();
174 }
175 if self.is_ready() {
176 self.inner.condvar.notify_all();
177 }
178 }
179 pub fn set_shared(&self, val: Arc<T>) {
182 if let Ok(mut write) = self.inner.val.write() {
183 *write = EventualStatus::Complete(val);
184 self.inner.condvar.notify_all()
185 }
186 }
187 pub fn get(&self) -> EventualStatus<Arc<T>> {
191 if let Ok(read) = self.inner.val.read() {
192 return read.clone();
193 }
194 EventualStatus::NotReady
195 }
196 pub fn take(&self) -> Option<Arc<T>> {
200 if let Ok(mut write) = self.inner.val.write() {
201 return write.take();
202 }
203 None
204 }
205 pub fn start(&self) {
209 if let Ok(mut write) = self.inner.val.write() {
210 match *write {
211 EventualStatus::NotReady | EventualStatus::CompleteEmpty => {
212 *write = EventualStatus::Running;
213 }
214 _ => {
215 }
217 }
218 }
219 }
220 pub fn is_ready(&self) -> bool {
223 if let Ok(read) = self.inner.val.read() {
224 return read.is_complete();
225 }
226 false
227 }
228 pub fn is_pending(&self) -> bool {
231 if let Ok(read) = self.inner.val.read() {
232 return read.is_pending();
233 }
234 true
235 }
236 pub fn block_until_ready(&self) -> EventualStatus<Arc<T>> {
240 match self.get() {
241 EventualStatus::CompleteEmpty => return EventualStatus::CompleteEmpty,
242 EventualStatus::Complete(v) => return EventualStatus::Complete(v),
243 _ => {}
244 }
245 if let Ok(guard) = self.inner.guard.lock() {
246 let _unused = self.inner.condvar.wait_while(guard, |()| self.is_pending());
247 }
248 self.get()
249 }
250}
251
252impl<T> Future for Eventual<T> {
253 type Output = EventualStatus<Arc<T>>;
254
255 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
256 let status = self.get();
257 if status.is_complete() {
258 return Poll::Ready(status);
259 }
260 cx.waker().wake_by_ref();
261 Poll::Pending
262 }
263}