revenq/lib.rs
/*!
# Nomenclature
This library is designed to handle events. It does not "pin" the user to a
single event container; instead, it abstracts over the container and handles
so-called revisions. A revision may contain a single event or a whole
`Vec<Event>`. The only requirement is that revisions must have a
[size known at compile time](core::marker::Sized)
(due to limitations of the used backend).
*/
10
11#![forbid(clippy::as_conversions, clippy::cast_ptr_alignment, trivial_casts)]
12
13// once_cell doesn't support no_std in the used flavor.
14// #![cfg_attr(not(feature = "std"), no_std)]
15
16extern crate alloc;
17extern crate core;
18
19use alloc::{sync::Arc, vec::Vec};
20use core::{fmt, marker::Unpin};
21use event_listener::Event;
22use once_cell::sync::OnceCell;
23
/// Marks a code path that must never be executed.
///
/// With `debug_assertions` enabled this panics via [`unreachable!`]; without
/// them it calls [`core::hint::unreachable_unchecked`], so actually reaching
/// this function in such a build is undefined behavior.
fn perfect_unreachable() -> ! {
    if !core::cfg!(debug_assertions) {
        // SAFETY: the contract of this helper is that callers only place it on
        // paths they have established to be impossible.
        unsafe { core::hint::unreachable_unchecked() }
    }
    unreachable!()
}
31
32type NextRevision<T> = Arc<OnceCell<RevisionNode<T>>>;
33
34#[derive(Clone, Debug)]
35struct RevisionNode<T> {
36 next: NextRevision<T>,
37 data: T,
38}
39
40/// A owning reference to a revision.
41///
42/// Warning: Objects of this type must not be leaked, otherwise all future
43/// revisions will be leaked, too, and thus the memory of the queue is never freed.
44#[derive(Debug)]
45pub struct RevisionRef<T> {
46 inner: NextRevision<T>,
47}
48
/// Error indicating a failed [`RevisionRef::try_detach`] call.
#[derive(Debug, Clone)]
pub struct RevisionDetachError;

impl fmt::Display for RevisionDetachError {
    /// Writes the fixed, human-readable failure message.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("failed to detach revision")
    }
}

// #[cfg(feature = "std")]
impl std::error::Error for RevisionDetachError {}
61
62impl<T> Clone for RevisionRef<T> {
63 fn clone(&self) -> Self {
64 Self {
65 inner: Arc::clone(&self.inner),
66 }
67 }
68}
69
70impl<T> core::ops::Deref for RevisionRef<T> {
71 type Target = T;
72
73 #[inline]
74 fn deref(&self) -> &T {
75 // This pointer should never change once RevisionRef is created until
76 // it's dropped.
77 &unsafe { self.inner.get_unchecked() }.data
78 }
79}
80
81unsafe impl<T> stable_deref_trait::StableDeref for RevisionRef<T> {}
82unsafe impl<T> stable_deref_trait::CloneStableDeref for RevisionRef<T> {}
83
84impl<T> RevisionRef<T> {
85 fn new_and_forward(nr: &mut NextRevision<T>) -> Option<Self> {
86 match nr.get() {
87 Some(_) => {
88 let x = Self {
89 inner: Arc::clone(&nr),
90 };
91 *nr = Arc::clone(&unsafe { nr.get_unchecked() }.next);
92 Some(x)
93 }
94 None => None,
95 }
96 }
97
98 /// Try to detach this revision from the following.
99 /// Only works if this `RevisionRef` is the last reference to this revision.
100 /// This is the case if no RevisionRef to a revision with precedes this
101 /// revision exist and this is the last ptr to this revision, and all queue
102 /// references have already consumed this revision.
103 /// Use this method to reduce queue memory usage if you want to store this
104 /// object long-term.
105 pub fn try_detach<'a>(this: &'a mut Self) -> Result<&'a mut T, RevisionDetachError> {
106 // get ownership over the Arc of revision $this.inner
107 // (with lifetime = as long as $this.inner exists with the current Arc)
108 let mut_this: &mut RevisionNode<T> = Arc::get_mut(&mut this.inner)
109 .ok_or(RevisionDetachError)?
110 .get_mut()
111 .unwrap();
112 // no other reference to *us* exists.
113 // override our $next ptr, thus decoupling this node from the following
114 mut_this.next = Arc::new(OnceCell::default());
115 Ok(&mut mut_this.data)
116 }
117
118 /// Similiar to [`try_detach`](RevisionRef::try_detach), detach this revision
119 /// if possible, but then unwrap the inner value
120 pub fn try_into_inner(mut this: Self) -> Result<T, Self> {
121 // get ownership over the Arc of revision $this.inner
122 let mut mut_this: RevisionNode<T> = match Arc::get_mut(&mut this.inner) {
123 Some(x) => x.take().unwrap(),
124 None => return Err(this),
125 };
126 // no other reference to *us* exists.
127 // override our $next ptr, thus decoupling this node from the following
128 mut_this.next = Arc::new(OnceCell::default());
129 Ok(mut_this.data)
130 }
131}
132
133/// A simple event / revision queue
134#[derive(Debug)]
135#[must_use = "Queue does nothing unless you call .next() or some variation of it"]
136pub struct Queue<T> {
137 // the $next field is partially shared, e.g. all queues derived from the same
138 // original queue can find the current $next value, but may be a bit behind
139 // (e.g. have unconsumed revisions,
140 // which should be iterated to get the current value)
141 next: NextRevision<T>,
142
143 // waiting next... calls
144 next_ops: Arc<Event>,
145
146 // currently pending revisions
147 pub pending: Vec<T>,
148}
149
150impl<T> Clone for Queue<T> {
151 #[inline]
152 fn clone(&self) -> Self {
153 Queue {
154 next: Arc::clone(&self.next),
155 next_ops: Arc::clone(&self.next_ops),
156 pending: Default::default(),
157 }
158 }
159}
160
161impl<T> Default for Queue<T> {
162 #[inline]
163 fn default() -> Self {
164 Queue {
165 next: Arc::new(Default::default()),
166 next_ops: Arc::new(Default::default()),
167 pending: Default::default(),
168 }
169 }
170}
171
172impl<T> Drop for Queue<T> {
173 fn drop(&mut self) {
174 if Arc::strong_count(&self.next_ops) == 2 {
175 self.next_ops.notify(1);
176 }
177 }
178}
179
180impl<T: Unpin> Unpin for Queue<T> {}
181
182impl<T> Iterator for Queue<T> {
183 type Item = RevisionRef<T>;
184
185 fn next(&mut self) -> Option<RevisionRef<T>> {
186 let orig_pending_len = self.pending.len();
187
188 let ret = match self.publish_intern() {
189 None => RevisionRef::new_and_forward(&mut self.next),
190 x => x,
191 };
192
193 // may have published something
194 if orig_pending_len != self.pending.len() {
195 self.next_ops.notify(usize::MAX);
196 }
197
198 ret
199 }
200}
201
202impl<T> Queue<T> {
203 #[inline(always)]
204 pub fn new() -> Self {
205 Default::default()
206 }
207
208 fn publish_intern(&mut self) -> Option<RevisionRef<T>> {
209 enum State<T> {
210 ToPublish { rest: Vec<T>, last: T },
211 Published { latest: NextRevision<T> },
212 }
213
214 let last = self.pending.pop()?;
215
216 // : try append to the first 'None' ptr in the 'latest' chain
217 // try to append revnode, if CAS succeeds, done, otherwise:
218 // return a RevisionRef for the failed CAS ptr, and the revnode;
219 // set $latest to the next ptr
220
221 let mut state = State::ToPublish {
222 rest: core::mem::take(&mut self.pending),
223 last,
224 };
225 let maybe_real_old = self.next.get_or_init(|| {
226 let latest = Arc::new(OnceCell::default());
227 if let State::ToPublish { rest, last } = core::mem::replace(
228 &mut state,
229 State::Published {
230 latest: Arc::clone(&latest),
231 },
232 ) {
233 // unpack the last one, so we get a node not wrapped in a Arc<OnceCell<_>>
234 let prela = RevisionNode {
235 data: last,
236 next: latest,
237 };
238 rest.into_iter().rev().fold(prela, |next, i| RevisionNode {
239 data: i,
240 next: Arc::new(OnceCell::from(next)),
241 })
242 } else {
243 perfect_unreachable()
244 }
245 });
246 match state {
247 State::Published { latest } => {
248 // CAS / publishing succeeded
249 self.next = latest;
250 None
251 }
252 State::ToPublish { mut rest, last } => {
253 // CAS failed
254 // we need to split this assignment to prevent rustc E0502
255 let new_next = Arc::clone(&maybe_real_old.next);
256
257 // this publishing failed
258 rest.push(last);
259 self.pending = rest;
260
261 // we discovered a new revision, return that
262 Some(RevisionRef {
263 // This is safe since the cell cannot be changed once it is set.
264 // use the next revision
265 inner: core::mem::replace(&mut self.next, new_next),
266 })
267 }
268 }
269 }
270
271 /// Waits asynchronously for an event to be published on the queue.
272 /// Only returns `None` if no other reference to the queue
273 /// exists anymore, because otherwise nothing could wake this up.
274 /// Tries to publish pending revisions while waiting.
275 pub async fn next_async(&mut self) -> Option<RevisionRef<T>> {
276 let mut listener = None;
277
278 loop {
279 if let ret @ Some(_) = self.next() {
280 // we got something, return
281 // notify another blocked receive operation
282 self.next_ops.notify(1);
283 return ret;
284 } else if Arc::get_mut(&mut self.next_ops).is_some() {
285 // cancel if no one is listening
286 // skip publishing + notifying phase bc no one is listening
287 // we need to re-check to catch a race-condition between
288 // the call to $self.next and the check of $self.next_ops
289 // in between other queue instances may have been destroyed,
290 // but messages are still in the queue.
291 return RevisionRef::new_and_forward(&mut self.next);
292 } else {
293 match listener.take() {
294 None => {
295 // Start listening and then try receiving again.
296 listener = Some(self.next_ops.listen());
297 }
298 Some(l) => {
299 // Wait for a notification.
300 l.await;
301 }
302 }
303 }
304 }
305 }
306
307 /// This method enqueues the pending revision for publishing.
308 /// The iterator **must** be "collected"/"polled"
309 /// (calling [`Iterator::next`] until it returns None) to publish them.
310 #[inline(always)]
311 pub fn enqueue(&mut self, pending: T) {
312 self.pending.push(pending);
313 }
314}
315
316// #[cfg(feature = "std")]
317impl<T: std::fmt::Debug> Queue<T> {
318 /// Helper function, prints all unprocessed, newly published revisions
319 #[cold]
320 pub fn print_debug<W: std::io::Write>(
321 &self,
322 mut writer: W,
323 prefix: &str,
324 ) -> std::io::Result<()> {
325 let mut cur = Arc::clone(&self.next);
326 let mut fi = true;
327 let mut tmpstr = String::new();
328 while let Some(x) = RevisionRef::new_and_forward(&mut cur) {
329 if !fi {
330 tmpstr.push(',');
331 tmpstr.push(' ');
332 }
333 tmpstr += &format!("{:?}", &*x);
334 fi = false;
335 }
336 writeln!(
337 writer,
338 "{} [{}] pending = {:?}; next_ops x{}",
339 prefix,
340 tmpstr,
341 &self.pending,
342 Arc::strong_count(&self.next_ops)
343 )?;
344 Ok(())
345 }
346}