revenq/
lib.rs

/*!
# Nomenclature
This library is designed to handle events. It doesn't "pin" the user to a
single event container. Instead, it abstracts over the container and handles
so-called revisions, each of which may contain a single event or several
(e.g. a `Vec<Event>`). The only requirement is that a revision must have a
[size known at compile time](core::marker::Sized)
(due to limitations of the used backend).
**/
10
11#![forbid(clippy::as_conversions, clippy::cast_ptr_alignment, trivial_casts)]
12
13// once_cell doesn't support no_std in the used flavor.
14// #![cfg_attr(not(feature = "std"), no_std)]
15
16extern crate alloc;
17extern crate core;
18
19use alloc::{sync::Arc, vec::Vec};
20use core::{fmt, marker::Unpin};
21use event_listener::Event;
22use once_cell::sync::OnceCell;
23
/// Marks a code path that must never be executed.
///
/// With `debug_assertions` enabled this panics via [`unreachable!`], so a
/// violated assumption is reported loudly. Without them it hands the optimizer
/// [`core::hint::unreachable_unchecked`] instead, which makes actually reaching
/// this call undefined behavior.
fn perfect_unreachable() -> ! {
    if !core::cfg!(debug_assertions) {
        // SAFETY: not provable here; every call site has to guarantee that it
        // can never be reached.
        unsafe { core::hint::unreachable_unchecked() }
    }
    unreachable!()
}
31
/// Shared, write-once slot for the revision that follows a given position in
/// the chain. The slot is empty until such a revision has been published.
type NextRevision<T> = Arc<OnceCell<RevisionNode<T>>>;

/// One link of the revision chain: a payload plus the slot of its successor.
#[derive(Clone, Debug)]
struct RevisionNode<T> {
    // slot of the revision published after this one
    next: NextRevision<T>,
    // the revision payload itself
    data: T,
}
39
/// An owning reference to a revision.
///
/// Warning: Objects of this type must not be leaked, otherwise all future
/// revisions will be leaked, too, and thus the memory of the queue is never freed.
#[derive(Debug)]
pub struct RevisionRef<T> {
    // Invariant: the cell behind this pointer is initialized for as long as
    // the `RevisionRef` exists; the `Deref` impl reads it without a check.
    inner: NextRevision<T>,
}
48
/// Error returned by [`RevisionRef::try_detach`] when the revision cannot be
/// detached because other references to it still exist.
#[derive(Debug, Clone)]
pub struct RevisionDetachError;

impl fmt::Display for RevisionDetachError {
    /// Writes the fixed, human-readable failure message.
    fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result {
        out.write_str("failed to detach revision")
    }
}

// #[cfg(feature = "std")]
impl std::error::Error for RevisionDetachError {}
61
62impl<T> Clone for RevisionRef<T> {
63    fn clone(&self) -> Self {
64        Self {
65            inner: Arc::clone(&self.inner),
66        }
67    }
68}
69
70impl<T> core::ops::Deref for RevisionRef<T> {
71    type Target = T;
72
73    #[inline]
74    fn deref(&self) -> &T {
75        // This pointer should never change once RevisionRef is created until
76        // it's dropped.
77        &unsafe { self.inner.get_unchecked() }.data
78    }
79}
80
// SAFETY (both impls): `deref` returns a reference into the node stored behind
// the `Arc`, so moving the handle itself does not move the payload, and a clone
// shares that very same `Arc`.
unsafe impl<T> stable_deref_trait::StableDeref for RevisionRef<T> {}
unsafe impl<T> stable_deref_trait::CloneStableDeref for RevisionRef<T> {}
83
84impl<T> RevisionRef<T> {
85    fn new_and_forward(nr: &mut NextRevision<T>) -> Option<Self> {
86        match nr.get() {
87            Some(_) => {
88                let x = Self {
89                    inner: Arc::clone(&nr),
90                };
91                *nr = Arc::clone(&unsafe { nr.get_unchecked() }.next);
92                Some(x)
93            }
94            None => None,
95        }
96    }
97
98    /// Try to detach this revision from the following.
99    /// Only works if this `RevisionRef` is the last reference to this revision.
100    /// This is the case if no RevisionRef to a revision with precedes this
101    /// revision exist and this is the last ptr to this revision, and all queue
102    /// references have already consumed this revision.
103    /// Use this method to reduce queue memory usage if you want to store this
104    /// object long-term.
105    pub fn try_detach<'a>(this: &'a mut Self) -> Result<&'a mut T, RevisionDetachError> {
106        // get ownership over the Arc of revision $this.inner
107        // (with lifetime = as long as $this.inner exists with the current Arc)
108        let mut_this: &mut RevisionNode<T> = Arc::get_mut(&mut this.inner)
109            .ok_or(RevisionDetachError)?
110            .get_mut()
111            .unwrap();
112        // no other reference to *us* exists.
113        // override our $next ptr, thus decoupling this node from the following
114        mut_this.next = Arc::new(OnceCell::default());
115        Ok(&mut mut_this.data)
116    }
117
118    /// Similiar to [`try_detach`](RevisionRef::try_detach), detach this revision
119    /// if possible, but then unwrap the inner value
120    pub fn try_into_inner(mut this: Self) -> Result<T, Self> {
121        // get ownership over the Arc of revision $this.inner
122        let mut mut_this: RevisionNode<T> = match Arc::get_mut(&mut this.inner) {
123            Some(x) => x.take().unwrap(),
124            None => return Err(this),
125        };
126        // no other reference to *us* exists.
127        // override our $next ptr, thus decoupling this node from the following
128        mut_this.next = Arc::new(OnceCell::default());
129        Ok(mut_this.data)
130    }
131}
132
/// A simple event / revision queue.
///
/// Revisions are staged with [`Queue::enqueue`] and published when the queue is
/// polled, either through its [`Iterator`] implementation or through
/// [`Queue::next_async`].
#[derive(Debug)]
#[must_use = "Queue does nothing unless you call .next() or some variation of it"]
pub struct Queue<T> {
    // The $next field is partially shared: all queues derived from the same
    // original queue can reach the current $next value, but an individual
    // queue may lag behind (i.e. have unconsumed revisions which must be
    // iterated over first to get to the current value).
    next: NextRevision<T>,

    // Event used to wake up waiting `next_async` calls.
    next_ops: Arc<Event>,

    // Revisions which were enqueued but are not published yet.
    pub pending: Vec<T>,
}
149
150impl<T> Clone for Queue<T> {
151    #[inline]
152    fn clone(&self) -> Self {
153        Queue {
154            next: Arc::clone(&self.next),
155            next_ops: Arc::clone(&self.next_ops),
156            pending: Default::default(),
157        }
158    }
159}
160
161impl<T> Default for Queue<T> {
162    #[inline]
163    fn default() -> Self {
164        Queue {
165            next: Arc::new(Default::default()),
166            next_ops: Arc::new(Default::default()),
167            pending: Default::default(),
168        }
169    }
170}
171
172impl<T> Drop for Queue<T> {
173    fn drop(&mut self) {
174        if Arc::strong_count(&self.next_ops) == 2 {
175            self.next_ops.notify(1);
176        }
177    }
178}
179
180impl<T: Unpin> Unpin for Queue<T> {}
181
182impl<T> Iterator for Queue<T> {
183    type Item = RevisionRef<T>;
184
185    fn next(&mut self) -> Option<RevisionRef<T>> {
186        let orig_pending_len = self.pending.len();
187
188        let ret = match self.publish_intern() {
189            None => RevisionRef::new_and_forward(&mut self.next),
190            x => x,
191        };
192
193        // may have published something
194        if orig_pending_len != self.pending.len() {
195            self.next_ops.notify(usize::MAX);
196        }
197
198        ret
199    }
200}
201
202impl<T> Queue<T> {
203    #[inline(always)]
204    pub fn new() -> Self {
205        Default::default()
206    }
207
208    fn publish_intern(&mut self) -> Option<RevisionRef<T>> {
209        enum State<T> {
210            ToPublish { rest: Vec<T>, last: T },
211            Published { latest: NextRevision<T> },
212        }
213
214        let last = self.pending.pop()?;
215
216        // : try append to the first 'None' ptr in the 'latest' chain
217        // try to append revnode, if CAS succeeds, done, otherwise:
218        // return a RevisionRef for the failed CAS ptr, and the revnode;
219        // set $latest to the next ptr
220
221        let mut state = State::ToPublish {
222            rest: core::mem::take(&mut self.pending),
223            last,
224        };
225        let maybe_real_old = self.next.get_or_init(|| {
226            let latest = Arc::new(OnceCell::default());
227            if let State::ToPublish { rest, last } = core::mem::replace(
228                &mut state,
229                State::Published {
230                    latest: Arc::clone(&latest),
231                },
232            ) {
233                // unpack the last one, so we get a node not wrapped in a Arc<OnceCell<_>>
234                let prela = RevisionNode {
235                    data: last,
236                    next: latest,
237                };
238                rest.into_iter().rev().fold(prela, |next, i| RevisionNode {
239                    data: i,
240                    next: Arc::new(OnceCell::from(next)),
241                })
242            } else {
243                perfect_unreachable()
244            }
245        });
246        match state {
247            State::Published { latest } => {
248                // CAS / publishing succeeded
249                self.next = latest;
250                None
251            }
252            State::ToPublish { mut rest, last } => {
253                // CAS failed
254                // we need to split this assignment to prevent rustc E0502
255                let new_next = Arc::clone(&maybe_real_old.next);
256
257                // this publishing failed
258                rest.push(last);
259                self.pending = rest;
260
261                // we discovered a new revision, return that
262                Some(RevisionRef {
263                    // This is safe since the cell cannot be changed once it is set.
264                    // use the next revision
265                    inner: core::mem::replace(&mut self.next, new_next),
266                })
267            }
268        }
269    }
270
271    /// Waits asynchronously for an event to be published on the queue.
272    /// Only returns `None` if no other reference to the queue
273    /// exists anymore, because otherwise nothing could wake this up.
274    /// Tries to publish pending revisions while waiting.
275    pub async fn next_async(&mut self) -> Option<RevisionRef<T>> {
276        let mut listener = None;
277
278        loop {
279            if let ret @ Some(_) = self.next() {
280                // we got something, return
281                // notify another blocked receive operation
282                self.next_ops.notify(1);
283                return ret;
284            } else if Arc::get_mut(&mut self.next_ops).is_some() {
285                // cancel if no one is listening
286                // skip publishing + notifying phase bc no one is listening
287                // we need to re-check to catch a race-condition between
288                // the call to $self.next and the check of $self.next_ops
289                // in between other queue instances may have been destroyed,
290                // but messages are still in the queue.
291                return RevisionRef::new_and_forward(&mut self.next);
292            } else {
293                match listener.take() {
294                    None => {
295                        // Start listening and then try receiving again.
296                        listener = Some(self.next_ops.listen());
297                    }
298                    Some(l) => {
299                        // Wait for a notification.
300                        l.await;
301                    }
302                }
303            }
304        }
305    }
306
307    /// This method enqueues the pending revision for publishing.
308    /// The iterator **must** be "collected"/"polled"
309    /// (calling [`Iterator::next`] until it returns None) to publish them.
310    #[inline(always)]
311    pub fn enqueue(&mut self, pending: T) {
312        self.pending.push(pending);
313    }
314}
315
316// #[cfg(feature = "std")]
317impl<T: std::fmt::Debug> Queue<T> {
318    /// Helper function, prints all unprocessed, newly published revisions
319    #[cold]
320    pub fn print_debug<W: std::io::Write>(
321        &self,
322        mut writer: W,
323        prefix: &str,
324    ) -> std::io::Result<()> {
325        let mut cur = Arc::clone(&self.next);
326        let mut fi = true;
327        let mut tmpstr = String::new();
328        while let Some(x) = RevisionRef::new_and_forward(&mut cur) {
329            if !fi {
330                tmpstr.push(',');
331                tmpstr.push(' ');
332            }
333            tmpstr += &format!("{:?}", &*x);
334            fi = false;
335        }
336        writeln!(
337            writer,
338            "{} [{}] pending = {:?}; next_ops x{}",
339            prefix,
340            tmpstr,
341            &self.pending,
342            Arc::strong_count(&self.next_ops)
343        )?;
344        Ok(())
345    }
346}