veilid_tools/
eventual_base.rs

1use super::*;
2
3#[derive(ThisError, Debug, Clone, PartialEq, Eq)]
4pub enum EventualError {
5    #[error("Try failed: {0}")]
6    TryFailed(String),
7}
8
9pub struct EventualBaseInner<T> {
10    resolved: Option<T>,
11    wakers: BTreeMap<usize, task::Waker>,
12    resolved_wakers: BTreeMap<usize, task::Waker>,
13    freelist: Vec<usize>,
14    resolved_freelist: Vec<usize>,
15}
16
17impl<T> EventualBaseInner<T> {
18    pub(super) fn new() -> Self {
19        EventualBaseInner {
20            resolved: None,
21            wakers: BTreeMap::new(),
22            resolved_wakers: BTreeMap::new(),
23            freelist: Vec::new(),
24            resolved_freelist: Vec::new(),
25        }
26    }
27
28    pub(super) fn insert_waker(&mut self, waker: task::Waker) -> usize {
29        let id = match self.freelist.pop() {
30            Some(id) => id,
31            None => self.wakers.len(),
32        };
33        self.wakers.insert(id, waker);
34        id
35    }
36
37    #[must_use]
38    pub(super) fn remove_waker(&mut self, id: usize) -> Vec<task::Waker> {
39        self.freelist.push(id);
40        self.wakers.remove(&id);
41        // See if we should complete the EventualResolvedFutures
42        let mut resolved_waker_list = Vec::new();
43        if self.wakers.is_empty() && self.resolved.is_some() {
44            for w in &self.resolved_wakers {
45                resolved_waker_list.push(w.1.clone());
46            }
47        }
48        resolved_waker_list
49    }
50
51    pub(super) fn insert_resolved_waker(&mut self, waker: task::Waker) -> usize {
52        let id = match self.resolved_freelist.pop() {
53            Some(id) => id,
54            None => self.resolved_wakers.len(),
55        };
56        self.resolved_wakers.insert(id, waker);
57        id
58    }
59
60    pub(super) fn remove_resolved_waker(&mut self, id: usize) {
61        self.resolved_freelist.push(id);
62        self.resolved_wakers.remove(&id);
63    }
64
65    #[must_use]
66    pub(super) fn resolve_and_get_wakers(&mut self, value: T) -> Option<Vec<task::Waker>> {
67        if self.resolved.is_some() {
68            // Already resolved
69            return None;
70        }
71
72        // Store resolved value
73        self.resolved = Some(value);
74
75        // Return a copy of the waker list so the caller can wake all the EventualFutures
76        let mut waker_list = Vec::new();
77        for w in &self.wakers {
78            waker_list.push(w.1.clone());
79        }
80        Some(waker_list)
81    }
82
83    pub(super) fn is_resolved(&self) -> bool {
84        self.resolved.is_some()
85    }
86    pub(super) fn resolved_value_ref(&self) -> &Option<T> {
87        &self.resolved
88    }
89    pub(super) fn resolved_value_mut(&mut self) -> &mut Option<T> {
90        &mut self.resolved
91    }
92
93    pub(super) fn reset(&mut self) {
94        assert_eq!(self.wakers.len(), 0);
95        assert_eq!(self.resolved_wakers.len(), 0);
96        self.resolved = None;
97        self.freelist.clear();
98        self.resolved_freelist.clear();
99    }
100
101    pub(super) fn try_reset(&mut self) -> Result<(), EventualError> {
102        if !self.wakers.is_empty() {
103            return Err(EventualError::TryFailed(
104                "wakers not empty during reset".to_owned(),
105            ));
106        }
107        if !self.resolved_wakers.is_empty() {
108            return Err(EventualError::TryFailed(
109                "Resolved wakers not empty during reset".to_owned(),
110            ));
111        }
112        self.reset();
113        Ok(())
114    }
115
116    // Resolved future helpers
117    pub(super) fn resolved_poll(
118        &mut self,
119        id: &mut Option<usize>,
120        cx: &mut task::Context<'_>,
121    ) -> task::Poll<()> {
122        // If there are any instance futures still waiting, we resolution isn't finished
123        if !self.wakers.is_empty() {
124            if id.is_none() {
125                *id = Some(self.insert_resolved_waker(cx.waker().clone()));
126            }
127            task::Poll::<()>::Pending
128        } else {
129            if let Some(id) = id.take() {
130                self.remove_resolved_waker(id);
131            }
132            task::Poll::<()>::Ready(())
133        }
134    }
135
136    // Instance future helpers
137    #[must_use]
138    pub(super) fn instance_poll(
139        &mut self,
140        id: &mut Option<usize>,
141        cx: &mut task::Context<'_>,
142    ) -> Option<Vec<task::Waker>> {
143        // If the resolved value hasn't showed up then we can't wake the instance futures
144        if self.resolved.is_none() {
145            if id.is_none() {
146                *id = Some(self.insert_waker(cx.waker().clone()));
147            }
148            None
149        } else if let Some(id) = id.take() {
150            Some(self.remove_waker(id))
151        } else {
152            Some(Vec::new())
153        }
154    }
155}
156
157// xxx: this would love to be 'pub(super)' instead of pub, to ensure nobody else touches resolve_to_value directly
158pub trait EventualBase: Clone + Unpin {
159    type ResolvedType;
160
161    fn base_inner(&self) -> MutexGuard<EventualBaseInner<Self::ResolvedType>>;
162
163    fn resolve_to_value(&self, value: Self::ResolvedType) -> EventualResolvedFuture<Self> {
164        let wakers = {
165            let mut inner = self.base_inner();
166            inner.resolve_and_get_wakers(value)
167        };
168        if let Some(wakers) = wakers {
169            for w in wakers {
170                w.wake();
171            }
172        }
173        EventualResolvedFuture {
174            id: None,
175            eventual: self.clone(),
176        }
177    }
178}
179
180pub struct EventualResolvedFuture<B: EventualBase> {
181    id: Option<usize>,
182    eventual: B,
183}
184
185impl<B: EventualBase> Future for EventualResolvedFuture<B> {
186    type Output = ();
187    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
188        let this = &mut *self;
189        let mut inner = this.eventual.base_inner();
190        inner.resolved_poll(&mut this.id, cx)
191    }
192}
193
194impl<B: EventualBase> Drop for EventualResolvedFuture<B> {
195    fn drop(&mut self) {
196        if let Some(id) = self.id.take() {
197            let mut inner = self.eventual.base_inner();
198            inner.remove_resolved_waker(id);
199        }
200    }
201}
202
203pub trait EventualCommon: EventualBase {
204    fn is_resolved(&self) -> bool {
205        self.base_inner().is_resolved()
206    }
207
208    fn reset(&self) {
209        self.base_inner().reset()
210    }
211
212    fn try_reset(&self) -> Result<(), EventualError> {
213        self.base_inner().try_reset()
214    }
215}
216
217impl<T> EventualCommon for T where T: EventualBase {}