veilid_tools/
eventual.rs

1/// Eventual is like Dart's "Completer"
2/// It is a thread-safe concurrent data future that may eventually resolve to a value
3/// Three variants exist
4/// Eventual, which will complete each 'instance' future to that instance's value (can be different per instance) only when 'resolve' is called.
5/// EventualValue, which will complete each 'instance' future when 'resolve' is called with an owned value, and one of those instances may 'take' the value.
6/// EventualValueClone, which will complete each 'instance' future when 'resolve' is called with a Clone-able value, and any of those instances may get a clone of that value.
7/// The future returned from an Eventual::resolve() can also be awaited on to wait until all instances have been completed
8use super::*;
9
10use eventual_base::*;
11
12pub struct Eventual {
13    inner: Arc<Mutex<EventualBaseInner<()>>>,
14}
15
16impl core::fmt::Debug for Eventual {
17    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
18        f.debug_struct("Eventual").finish()
19    }
20}
21
22impl Clone for Eventual {
23    fn clone(&self) -> Self {
24        Self {
25            inner: self.inner.clone(),
26        }
27    }
28}
29
30impl EventualBase for Eventual {
31    type ResolvedType = ();
32    fn base_inner(&self) -> MutexGuard<EventualBaseInner<Self::ResolvedType>> {
33        self.inner.lock()
34    }
35}
36
37impl Default for Eventual {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl Eventual {
44    pub fn new() -> Self {
45        Self {
46            inner: Arc::new(Mutex::new(EventualBaseInner::new())),
47        }
48    }
49
50    pub fn instance_clone<T>(&self, value: T) -> EventualFutureClone<T>
51    where
52        T: Clone + Unpin,
53    {
54        EventualFutureClone {
55            id: None,
56            value,
57            eventual: self.clone(),
58        }
59    }
60    pub fn instance_none<T>(&self) -> EventualFutureNone<T>
61    where
62        T: Unpin,
63    {
64        EventualFutureNone {
65            id: None,
66            eventual: self.clone(),
67            _marker: core::marker::PhantomData {},
68        }
69    }
70    pub fn instance_empty(&self) -> EventualFutureEmpty {
71        EventualFutureEmpty {
72            id: None,
73            eventual: self.clone(),
74        }
75    }
76
77    pub fn resolve(&self) -> EventualResolvedFuture<Self> {
78        self.resolve_to_value(())
79    }
80}
81
82///////
83
84pub struct EventualFutureClone<T>
85where
86    T: Clone + Unpin,
87{
88    id: Option<usize>,
89    value: T,
90    eventual: Eventual,
91}
92
93impl<T> Future for EventualFutureClone<T>
94where
95    T: Clone + Unpin,
96{
97    type Output = T;
98    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
99        let this = &mut *self;
100        let out = {
101            let mut inner = this.eventual.base_inner();
102            inner.instance_poll(&mut this.id, cx)
103        };
104        match out {
105            None => task::Poll::<Self::Output>::Pending,
106            Some(wakers) => {
107                // Wake all other instance futures
108                for w in wakers {
109                    w.wake();
110                }
111                task::Poll::<Self::Output>::Ready(this.value.clone())
112            }
113        }
114    }
115}
116
117impl<T> Drop for EventualFutureClone<T>
118where
119    T: Clone + Unpin,
120{
121    fn drop(&mut self) {
122        if let Some(id) = self.id.take() {
123            let wakers = {
124                let mut inner = self.eventual.base_inner();
125                inner.remove_waker(id)
126            };
127            for w in wakers {
128                w.wake();
129            }
130        }
131    }
132}
133
134///////
135
136pub struct EventualFutureNone<T>
137where
138    T: Unpin,
139{
140    id: Option<usize>,
141    eventual: Eventual,
142    _marker: core::marker::PhantomData<T>,
143}
144
145impl<T> Future for EventualFutureNone<T>
146where
147    T: Unpin,
148{
149    type Output = Option<T>;
150    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
151        let this = &mut *self;
152        let out = {
153            let mut inner = this.eventual.base_inner();
154            inner.instance_poll(&mut this.id, cx)
155        };
156        match out {
157            None => task::Poll::<Self::Output>::Pending,
158            Some(wakers) => {
159                // Wake all EventualResolvedFutures
160                for w in wakers {
161                    w.wake();
162                }
163                task::Poll::<Self::Output>::Ready(None)
164            }
165        }
166    }
167}
168
169impl<T> Drop for EventualFutureNone<T>
170where
171    T: Unpin,
172{
173    fn drop(&mut self) {
174        if let Some(id) = self.id.take() {
175            let wakers = {
176                let mut inner = self.eventual.base_inner();
177                inner.remove_waker(id)
178            };
179            for w in wakers {
180                w.wake();
181            }
182        }
183    }
184}
185
186///////
187
188pub struct EventualFutureEmpty {
189    id: Option<usize>,
190    eventual: Eventual,
191}
192
193impl Future for EventualFutureEmpty {
194    type Output = ();
195    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
196        let this = &mut *self;
197        let out = {
198            let mut inner = this.eventual.base_inner();
199            inner.instance_poll(&mut this.id, cx)
200        };
201        match out {
202            None => task::Poll::<Self::Output>::Pending,
203            Some(wakers) => {
204                // Wake all EventualResolvedFutures
205                for w in wakers {
206                    w.wake();
207                }
208                task::Poll::<Self::Output>::Ready(())
209            }
210        }
211    }
212}
213
214impl Drop for EventualFutureEmpty {
215    fn drop(&mut self) {
216        if let Some(id) = self.id.take() {
217            let wakers = {
218                let mut inner = self.eventual.base_inner();
219                inner.remove_waker(id)
220            };
221            for w in wakers {
222                w.wake();
223            }
224        }
225    }
226}