veilid_tools/
eventual_base.rs1use super::*;
2
/// Errors reported by the fallible operations on an eventual.
#[derive(ThisError, Debug, Clone, PartialEq, Eq)]
pub enum EventualError {
    /// A `try_*` operation could not be carried out; the payload is a
    /// human-readable reason (for example, wakers were still registered
    /// when a reset was attempted).
    #[error("Try failed: {0}")]
    TryFailed(String),
}
8
/// Shared bookkeeping for an eventual: the resolved value (if any) plus two
/// independent waker registries, each with its own pool of reusable ids.
pub struct EventualBaseInner<T> {
    /// The resolved value; `None` until `resolve_and_get_wakers` stores one,
    /// and `None` again after `reset`.
    resolved: Option<T>,
    /// Wakers registered through `instance_poll`, keyed by the id handed back
    /// to the polling future.
    wakers: BTreeMap<usize, task::Waker>,
    /// Wakers registered through `resolved_poll`, keyed by the id handed back
    /// to the polling future.
    resolved_wakers: BTreeMap<usize, task::Waker>,
    /// Ids released by `remove_waker`, reused by `insert_waker` before new
    /// ids are minted.
    freelist: Vec<usize>,
    /// Ids released by `remove_resolved_waker`, reused by
    /// `insert_resolved_waker` before new ids are minted.
    resolved_freelist: Vec<usize>,
}
16
17impl<T> EventualBaseInner<T> {
18 pub(super) fn new() -> Self {
19 EventualBaseInner {
20 resolved: None,
21 wakers: BTreeMap::new(),
22 resolved_wakers: BTreeMap::new(),
23 freelist: Vec::new(),
24 resolved_freelist: Vec::new(),
25 }
26 }
27
28 pub(super) fn insert_waker(&mut self, waker: task::Waker) -> usize {
29 let id = match self.freelist.pop() {
30 Some(id) => id,
31 None => self.wakers.len(),
32 };
33 self.wakers.insert(id, waker);
34 id
35 }
36
37 #[must_use]
38 pub(super) fn remove_waker(&mut self, id: usize) -> Vec<task::Waker> {
39 self.freelist.push(id);
40 self.wakers.remove(&id);
41 let mut resolved_waker_list = Vec::new();
43 if self.wakers.is_empty() && self.resolved.is_some() {
44 for w in &self.resolved_wakers {
45 resolved_waker_list.push(w.1.clone());
46 }
47 }
48 resolved_waker_list
49 }
50
51 pub(super) fn insert_resolved_waker(&mut self, waker: task::Waker) -> usize {
52 let id = match self.resolved_freelist.pop() {
53 Some(id) => id,
54 None => self.resolved_wakers.len(),
55 };
56 self.resolved_wakers.insert(id, waker);
57 id
58 }
59
60 pub(super) fn remove_resolved_waker(&mut self, id: usize) {
61 self.resolved_freelist.push(id);
62 self.resolved_wakers.remove(&id);
63 }
64
65 #[must_use]
66 pub(super) fn resolve_and_get_wakers(&mut self, value: T) -> Option<Vec<task::Waker>> {
67 if self.resolved.is_some() {
68 return None;
70 }
71
72 self.resolved = Some(value);
74
75 let mut waker_list = Vec::new();
77 for w in &self.wakers {
78 waker_list.push(w.1.clone());
79 }
80 Some(waker_list)
81 }
82
83 pub(super) fn is_resolved(&self) -> bool {
84 self.resolved.is_some()
85 }
86 pub(super) fn resolved_value_ref(&self) -> &Option<T> {
87 &self.resolved
88 }
89 pub(super) fn resolved_value_mut(&mut self) -> &mut Option<T> {
90 &mut self.resolved
91 }
92
93 pub(super) fn reset(&mut self) {
94 assert_eq!(self.wakers.len(), 0);
95 assert_eq!(self.resolved_wakers.len(), 0);
96 self.resolved = None;
97 self.freelist.clear();
98 self.resolved_freelist.clear();
99 }
100
101 pub(super) fn try_reset(&mut self) -> Result<(), EventualError> {
102 if !self.wakers.is_empty() {
103 return Err(EventualError::TryFailed(
104 "wakers not empty during reset".to_owned(),
105 ));
106 }
107 if !self.resolved_wakers.is_empty() {
108 return Err(EventualError::TryFailed(
109 "Resolved wakers not empty during reset".to_owned(),
110 ));
111 }
112 self.reset();
113 Ok(())
114 }
115
116 pub(super) fn resolved_poll(
118 &mut self,
119 id: &mut Option<usize>,
120 cx: &mut task::Context<'_>,
121 ) -> task::Poll<()> {
122 if !self.wakers.is_empty() {
124 if id.is_none() {
125 *id = Some(self.insert_resolved_waker(cx.waker().clone()));
126 }
127 task::Poll::<()>::Pending
128 } else {
129 if let Some(id) = id.take() {
130 self.remove_resolved_waker(id);
131 }
132 task::Poll::<()>::Ready(())
133 }
134 }
135
136 #[must_use]
138 pub(super) fn instance_poll(
139 &mut self,
140 id: &mut Option<usize>,
141 cx: &mut task::Context<'_>,
142 ) -> Option<Vec<task::Waker>> {
143 if self.resolved.is_none() {
145 if id.is_none() {
146 *id = Some(self.insert_waker(cx.waker().clone()));
147 }
148 None
149 } else if let Some(id) = id.take() {
150 Some(self.remove_waker(id))
151 } else {
152 Some(Vec::new())
153 }
154 }
155}
156
157pub trait EventualBase: Clone + Unpin {
159 type ResolvedType;
160
161 fn base_inner(&self) -> MutexGuard<'_, EventualBaseInner<Self::ResolvedType>>;
162
163 fn resolve_to_value(&self, value: Self::ResolvedType) -> EventualResolvedFuture<Self> {
164 let wakers = {
165 let mut inner = self.base_inner();
166 inner.resolve_and_get_wakers(value)
167 };
168 if let Some(wakers) = wakers {
169 for w in wakers {
170 w.wake();
171 }
172 }
173 EventualResolvedFuture {
174 id: None,
175 eventual: self.clone(),
176 }
177 }
178}
179
180pub struct EventualResolvedFuture<B: EventualBase> {
181 id: Option<usize>,
182 eventual: B,
183}
184
185impl<B: EventualBase> Future for EventualResolvedFuture<B> {
186 type Output = ();
187 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
188 let this = &mut *self;
189 let mut inner = this.eventual.base_inner();
190 inner.resolved_poll(&mut this.id, cx)
191 }
192}
193
194impl<B: EventualBase> Drop for EventualResolvedFuture<B> {
195 fn drop(&mut self) {
196 if let Some(id) = self.id.take() {
197 let mut inner = self.eventual.base_inner();
198 inner.remove_resolved_waker(id);
199 }
200 }
201}
202
203pub trait EventualCommon: EventualBase {
204 fn is_resolved(&self) -> bool {
205 self.base_inner().is_resolved()
206 }
207
208 fn reset(&self) {
209 self.base_inner().reset()
210 }
211
212 fn try_reset(&self) -> Result<(), EventualError> {
213 self.base_inner().try_reset()
214 }
215}
216
217impl<T> EventualCommon for T where T: EventualBase {}