Skip to main content

veilid_tools/
eventual.rs

1/// Eventual is like Dart's "Completer"
2/// It is a thread-safe concurrent data future that may eventually resolve to a value
3/// Three variants exist
4/// Eventual, which will complete each 'instance' future to that instance's value (can be different per instance) only when 'resolve' is called.
5/// EventualValue, which will complete each 'instance' future when 'resolve' is called with an owned value, and one of those instances may 'take' the value.
6/// EventualValueClone, which will complete each 'instance' future when 'resolve' is called with a Clone-able value, and any of those instances may get a clone of that value.
7/// The future returned from an Eventual::resolve() can also be awaited on to wait until all instances have been completed
8use super::*;
9
10use eventual_base::*;
11
/// Cloneable handle to a shared eventual that resolves to `()`.
///
/// All clones point at the same `EventualBaseInner<()>` through the `Arc`, so
/// instance futures created from any clone share one piece of state.
pub struct Eventual {
    // Shared state behind a mutex; accessed through `base_inner()`.
    inner: Arc<Mutex<EventualBaseInner<()>>>,
}
15
16impl core::fmt::Debug for Eventual {
17    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
18        f.debug_struct("Eventual").finish()
19    }
20}
21
22impl Clone for Eventual {
23    fn clone(&self) -> Self {
24        Self {
25            inner: self.inner.clone(),
26        }
27    }
28}
29
impl EventualBase for Eventual {
    // A plain `Eventual` carries no payload, so it resolves to the unit type.
    type ResolvedType = ();
    /// Locks the shared inner state and hands back the guard.
    ///
    /// The lock stays held for as long as the returned guard is alive, so
    /// callers should keep the guard's scope short.
    fn base_inner(&self) -> MutexGuard<'_, EventualBaseInner<Self::ResolvedType>> {
        self.inner.lock()
    }
}
36
impl Default for Eventual {
    /// Equivalent to [`Eventual::new`].
    fn default() -> Self {
        Self::new()
    }
}
42
43impl Eventual {
44    #[must_use]
45    pub fn new() -> Self {
46        Self {
47            inner: Arc::new(Mutex::new(EventualBaseInner::new())),
48        }
49    }
50
51    pub fn instance_clone<T>(&self, value: T) -> EventualFutureClone<T>
52    where
53        T: Clone + Unpin,
54    {
55        EventualFutureClone {
56            id: None,
57            value,
58            eventual: self.clone(),
59        }
60    }
61    #[must_use]
62    pub fn instance_none<T>(&self) -> EventualFutureNone<T>
63    where
64        T: Unpin,
65    {
66        EventualFutureNone {
67            id: None,
68            eventual: self.clone(),
69            _marker: core::marker::PhantomData {},
70        }
71    }
72    #[must_use]
73    pub fn instance_empty(&self) -> EventualFutureEmpty {
74        EventualFutureEmpty {
75            id: None,
76            eventual: self.clone(),
77        }
78    }
79
80    #[must_use]
81    pub fn resolve(&self) -> EventualResolvedFuture<Self> {
82        self.resolve_to_value(())
83    }
84}
85
86///////
87
/// Instance future created by `Eventual::instance_clone`.
///
/// Its `Future` output is a clone of the stored `value`.
pub struct EventualFutureClone<T>
where
    T: Clone + Unpin,
{
    // Slot passed to `instance_poll`; starts as `None` and is cleared again in `drop`.
    // Presumably holds the id of this instance's registered waker — confirm in `eventual_base`.
    id: Option<usize>,
    // Value cloned into the future's output on completion.
    value: T,
    // Handle to the shared eventual this instance waits on.
    eventual: Eventual,
}
96
97impl<T> Future for EventualFutureClone<T>
98where
99    T: Clone + Unpin,
100{
101    type Output = T;
102    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
103        let this = &mut *self;
104        let out = {
105            let mut inner = this.eventual.base_inner();
106            inner.instance_poll(&mut this.id, cx)
107        };
108        match out {
109            None => task::Poll::<Self::Output>::Pending,
110            Some(wakers) => {
111                // Wake all other instance futures
112                for w in wakers {
113                    w.wake();
114                }
115                task::Poll::<Self::Output>::Ready(this.value.clone())
116            }
117        }
118    }
119}
120
121impl<T> Drop for EventualFutureClone<T>
122where
123    T: Clone + Unpin,
124{
125    fn drop(&mut self) {
126        if let Some(id) = self.id.take() {
127            let wakers = {
128                let mut inner = self.eventual.base_inner();
129                inner.remove_waker(id)
130            };
131            for w in wakers {
132                w.wake();
133            }
134        }
135    }
136}
137
138///////
139
/// Instance future created by `Eventual::instance_none`.
///
/// Its `Future` output is always `None`, typed as `Option<T>`.
pub struct EventualFutureNone<T>
where
    T: Unpin,
{
    // Slot passed to `instance_poll`; starts as `None` and is cleared again in `drop`.
    // Presumably holds the id of this instance's registered waker — confirm in `eventual_base`.
    id: Option<usize>,
    // Handle to the shared eventual this instance waits on.
    eventual: Eventual,
    // No `T` is ever stored; the marker only ties the output type to the struct.
    _marker: core::marker::PhantomData<T>,
}
148
149impl<T> Future for EventualFutureNone<T>
150where
151    T: Unpin,
152{
153    type Output = Option<T>;
154    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
155        let this = &mut *self;
156        let out = {
157            let mut inner = this.eventual.base_inner();
158            inner.instance_poll(&mut this.id, cx)
159        };
160        match out {
161            None => task::Poll::<Self::Output>::Pending,
162            Some(wakers) => {
163                // Wake all EventualResolvedFutures
164                for w in wakers {
165                    w.wake();
166                }
167                task::Poll::<Self::Output>::Ready(None)
168            }
169        }
170    }
171}
172
173impl<T> Drop for EventualFutureNone<T>
174where
175    T: Unpin,
176{
177    fn drop(&mut self) {
178        if let Some(id) = self.id.take() {
179            let wakers = {
180                let mut inner = self.eventual.base_inner();
181                inner.remove_waker(id)
182            };
183            for w in wakers {
184                w.wake();
185            }
186        }
187    }
188}
189
190///////
191
/// Instance future created by `Eventual::instance_empty`.
///
/// Its `Future` output is `()`.
pub struct EventualFutureEmpty {
    // Slot passed to `instance_poll`; starts as `None` and is cleared again in `drop`.
    // Presumably holds the id of this instance's registered waker — confirm in `eventual_base`.
    id: Option<usize>,
    // Handle to the shared eventual this instance waits on.
    eventual: Eventual,
}
196
197impl Future for EventualFutureEmpty {
198    type Output = ();
199    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
200        let this = &mut *self;
201        let out = {
202            let mut inner = this.eventual.base_inner();
203            inner.instance_poll(&mut this.id, cx)
204        };
205        match out {
206            None => task::Poll::<Self::Output>::Pending,
207            Some(wakers) => {
208                // Wake all EventualResolvedFutures
209                for w in wakers {
210                    w.wake();
211                }
212                task::Poll::<Self::Output>::Ready(())
213            }
214        }
215    }
216}
217
218impl Drop for EventualFutureEmpty {
219    fn drop(&mut self) {
220        if let Some(id) = self.id.take() {
221            let wakers = {
222                let mut inner = self.eventual.base_inner();
223                inner.remove_waker(id)
224            };
225            for w in wakers {
226                w.wake();
227            }
228        }
229    }
230}