1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
/*!
# Nomenclature
This library is designed to handle events. It doesn't "pin" the user to a
single event container; instead, it abstracts away from this and handles
so-called revisions, which may contain a single event or several events at
once (e.g. a `Vec<Event>`). The only requirement is that revisions must
have a [size known at compile time](core::marker::Sized)
(due to limitations of the used backend).
**/

#![forbid(clippy::as_conversions, clippy::cast_ptr_alignment, trivial_casts)]

// once_cell doesn't support no_std in the used flavor.
// #![cfg_attr(not(feature = "std"), no_std)]

extern crate alloc;
extern crate core;

use alloc::{sync::Arc, vec::Vec};
use core::{fmt, marker::Unpin};
use event_listener::Event;
use once_cell::sync::OnceCell;

/// Marks a code path that must never be executed.
///
/// Panics via `unreachable!()` when debug assertions are enabled; otherwise it
/// tells the optimizer that the path cannot be reached.
fn perfect_unreachable() -> ! {
    if !cfg!(debug_assertions) {
        // SAFETY: callers only invoke this on paths they consider impossible;
        // reaching this point without debug assertions is undefined behavior.
        unsafe { core::hint::unreachable_unchecked() }
    }
    unreachable!()
}

// Shared, write-once slot for the revision that follows a given point of the
// chain; it stays empty until that revision has been published.
type NextRevision<T> = Arc<OnceCell<RevisionNode<T>>>;

// One link of the revision chain: a payload plus the slot of its successor.
#[derive(Clone, Debug)]
struct RevisionNode<T> {
    // slot that holds the following revision once it exists
    next: NextRevision<T>,
    // the revision payload, handed out through `RevisionRef`
    data: T,
}

/// An owning reference to a revision.
///
/// Warning: Objects of this type must not be leaked, otherwise all future
/// revisions will be leaked, too, and thus the memory of the queue is never freed.
#[derive(Debug)]
pub struct RevisionRef<T> {
    // Slot containing the referenced revision. The code in this file only
    // creates a `RevisionRef` for a slot that is already filled, and `Deref`
    // relies on that.
    inner: NextRevision<T>,
}

/// Error indicating a failed [`RevisionRef::try_detach`] call.
///
/// It carries no further information; `try_detach` returns it when the
/// `RevisionRef` is not the only handle to its revision.
#[derive(Clone, Debug)]
pub struct RevisionDetachError;

impl fmt::Display for RevisionDetachError {
    /// Writes the fixed, human-readable error message.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("failed to detach revision")
    }
}

// #[cfg(feature = "std")]
// Marker impl so the error can be used wherever `std::error::Error` is
// expected; all trait methods keep their defaults.
impl std::error::Error for RevisionDetachError {}

impl<T> Clone for RevisionRef<T> {
    /// Creates another handle to the same revision; the payload itself is
    /// shared, not copied (hence no `T: Clone` bound).
    fn clone(&self) -> Self {
        let inner = Arc::clone(&self.inner);
        Self { inner }
    }
}

impl<T> core::ops::Deref for RevisionRef<T> {
    type Target = T;

    /// Gives access to the payload of the referenced revision.
    #[inline]
    fn deref(&self) -> &T {
        // SAFETY: a `RevisionRef` is only ever created for a slot that is
        // already filled, and the slot content is not replaced for as long as
        // the `RevisionRef` exists.
        let node = unsafe { self.inner.get_unchecked() };
        &node.data
    }
}

// SAFETY (as far as this file shows): `deref` hands out a reference into the
// node stored behind the `Arc` in `inner`, so the target's address does not
// depend on where the `RevisionRef` value itself lives.
unsafe impl<T> stable_deref_trait::StableDeref for RevisionRef<T> {}
// Clones share the very same `Arc` (see the `Clone` impl), hence they
// dereference to the same target.
unsafe impl<T> stable_deref_trait::CloneStableDeref for RevisionRef<T> {}

impl<T> RevisionRef<T> {
    /// Creates a `RevisionRef` for the revision stored in the slot `nr` points
    /// to and advances `nr` to the slot of the following revision.
    ///
    /// Returns `None` and leaves `nr` untouched if the slot is still empty.
    fn new_and_forward(nr: &mut NextRevision<T>) -> Option<Self> {
        // Reading the successor through the reference returned by `get`
        // needs no `unsafe`.
        let successor = Arc::clone(&nr.get()?.next);
        // The new `RevisionRef` takes over the current slot handle while `nr`
        // moves on to the successor.
        Some(Self {
            inner: core::mem::replace(nr, successor),
        })
    }

    /// Try to detach this revision from the following ones.
    ///
    /// Only works if this `RevisionRef` is the last reference to this revision.
    /// This is the case if no `RevisionRef` to a preceding revision exists,
    /// this is the last `RevisionRef` to this revision, and all queues have
    /// already consumed this revision.
    /// Use this method to reduce queue memory usage if you want to store this
    /// object long-term.
    ///
    /// # Errors
    ///
    /// Returns [`RevisionDetachError`] if another reference to this revision
    /// still exists; `this` is left unchanged in that case.
    pub fn try_detach<'a>(this: &'a mut Self) -> Result<&'a mut T, RevisionDetachError> {
        // Get exclusive access to the node behind `this.inner`; this fails if
        // the `Arc` is shared with anyone else.
        let node: &mut RevisionNode<T> = Arc::get_mut(&mut this.inner)
            .ok_or(RevisionDetachError)?
            .get_mut()
            .expect("a RevisionRef always points to an initialized revision");
        // No other reference to *us* exists.
        // Override our `next` slot, thus decoupling this node from the
        // following ones.
        node.next = Arc::new(OnceCell::default());
        Ok(&mut node.data)
    }

    /// Similar to [`try_detach`](RevisionRef::try_detach): detaches this
    /// revision if possible, but then unwraps the inner value.
    ///
    /// # Errors
    ///
    /// Gives `this` back unchanged if another reference to this revision
    /// still exists.
    pub fn try_into_inner(mut this: Self) -> Result<T, Self> {
        // Get ownership of the node behind `this.inner`.
        let node: RevisionNode<T> = match Arc::get_mut(&mut this.inner) {
            Some(cell) => cell
                .take()
                .expect("a RevisionRef always points to an initialized revision"),
            None => return Err(this),
        };
        // No other reference to *us* exists. Moving the payload out drops the
        // rest of the node, including its link to the following revisions, so
        // no replacement slot has to be allocated.
        Ok(node.data)
    }
}

/// A simple event / revision queue
///
/// Clones of a queue share the same revision chain and wake-up event, but
/// every instance keeps its own read position and its own list of pending
/// (not yet published) revisions.
#[derive(Debug)]
#[must_use = "Queue does nothing unless you call .next() or some variation of it"]
pub struct Queue<T> {
    // the $next field is partially shared, e.g. all queues derived from the same
    // original queue can find the current $next value, but may be a bit behind
    // (e.g. have unconsumed revisions,
    //  which should be iterated to get the current value)
    next: NextRevision<T>,

    // event used to wake up waiting `next_async` calls; shared between all
    // clones of a queue
    next_ops: Arc<Event>,

    /// Revisions enqueued on this instance which are not published yet.
    /// A clone of the queue starts with an empty list.
    pub pending: Vec<T>,
}

impl<T> Clone for Queue<T> {
    #[inline]
    fn clone(&self) -> Self {
        Queue {
            next: Arc::clone(&self.next),
            next_ops: Arc::clone(&self.next_ops),
            pending: Default::default(),
        }
    }
}

impl<T> Default for Queue<T> {
    #[inline]
    fn default() -> Self {
        Queue {
            next: Arc::new(Default::default()),
            next_ops: Arc::new(Default::default()),
            pending: Default::default(),
        }
    }
}

impl<T> Drop for Queue<T> {
    /// Wakes one waiter if, besides the queue being dropped, exactly one
    /// other handle to the shared wake-up event is left.
    fn drop(&mut self) {
        // the count still includes the handle owned by `self`
        let handles = Arc::strong_count(&self.next_ops);
        if handles != 2 {
            return;
        }
        self.next_ops.notify(1);
    }
}

// Explicit `Unpin` impl, restricted to `T: Unpin`.
// NOTE(review): all fields keep `T` behind `Arc`/`Vec`, so the bound looks
// stricter than necessary; confirm whether that is intended.
impl<T: Unpin> Unpin for Queue<T> {}

impl<T> Iterator for Queue<T> {
    type Item = RevisionRef<T>;

    /// Tries to publish the pending revisions and returns the next revision
    /// this queue has not seen yet, if there is one.
    ///
    /// Whenever the list of pending revisions changed during the call,
    /// all waiters on the shared event are notified.
    fn next(&mut self) -> Option<RevisionRef<T>> {
        let pending_before = self.pending.len();

        // a publishing attempt which lost the race yields the revision that
        // won; otherwise look for an already published revision
        let revision = self
            .publish_intern()
            .or_else(|| RevisionRef::new_and_forward(&mut self.next));

        // may have published something
        if self.pending.len() != pending_before {
            self.next_ops.notify(usize::MAX);
        }

        revision
    }
}

impl<T> Queue<T> {
    /// Creates an empty queue; same as [`Queue::default`].
    #[inline(always)]
    pub fn new() -> Self {
        Self::default()
    }

    /// Tries to publish all pending revisions in one go.
    ///
    /// Returns `None` if nothing was pending or if publishing succeeded.
    /// If the slot this queue points to was already filled, the pending
    /// revisions are kept, the queue advances by one revision and that
    /// revision is returned.
    fn publish_intern(&mut self) -> Option<RevisionRef<T>> {
        // Tracks whether the pending revisions were moved into the chain.
        enum Attempt<T> {
            Unsent { older: Vec<T>, newest: T },
            Sent { tail: NextRevision<T> },
        }

        let newest = self.pending.pop()?;
        let older = core::mem::take(&mut self.pending);
        let mut attempt = Attempt::Unsent { older, newest };

        // Try to fill the slot we currently point to. The initializer only
        // runs if the slot is still empty; in that case it consumes the
        // pending revisions and leaves the new tail slot behind in `attempt`.
        let occupant = self.next.get_or_init(|| {
            let tail = Arc::new(OnceCell::default());
            let claimed = Attempt::Sent {
                tail: Arc::clone(&tail),
            };
            match core::mem::replace(&mut attempt, claimed) {
                Attempt::Unsent { older, newest } => {
                    // the newest revision is followed by the (empty) tail slot
                    let seed = RevisionNode {
                        data: newest,
                        next: tail,
                    };
                    // prepend the older revisions back to front, so that the
                    // oldest one ends up as the returned (unwrapped) node
                    older
                        .into_iter()
                        .rev()
                        .fold(seed, |following, data| RevisionNode {
                            data,
                            next: Arc::new(OnceCell::from(following)),
                        })
                }
                Attempt::Sent { .. } => perfect_unreachable(),
            }
        });

        match attempt {
            Attempt::Sent { tail } => {
                // publishing succeeded, continue at the new tail slot
                self.next = tail;
                None
            }
            Attempt::Unsent { mut older, newest } => {
                // the slot was already filled; fetch the successor first
                // (a separate statement prevents rustc E0502)
                let successor = Arc::clone(&occupant.next);

                // publishing failed, restore the pending list
                older.push(newest);
                self.pending = older;

                // we discovered a new revision: return it and move on
                Some(RevisionRef {
                    // the slot is filled and cannot change anymore
                    inner: core::mem::replace(&mut self.next, successor),
                })
            }
        }
    }

    /// Waits asynchronously for an event to be published on the queue.
    /// Only returns `None` if no other reference to the queue
    /// exists anymore, because otherwise nothing could wake this up.
    /// Tries to publish pending revisions while waiting.
    pub async fn next_async(&mut self) -> Option<RevisionRef<T>> {
        let mut listener = None;

        loop {
            if let Some(revision) = self.next() {
                // we got something; pass the wake-up on to another blocked
                // receive operation before returning
                self.next_ops.notify(1);
                return Some(revision);
            }

            if Arc::get_mut(&mut self.next_ops).is_some() {
                // No other queue instance is left, so nobody could wake us up.
                // Re-check the chain once to catch the race between the call
                // to `self.next` and this check: other queue instances may
                // have been destroyed in between, but their messages are
                // still in the queue.
                return RevisionRef::new_and_forward(&mut self.next);
            }

            match listener.take() {
                Some(armed) => {
                    // wait for a notification
                    armed.await;
                }
                None => {
                    // start listening and then try receiving again
                    listener = Some(self.next_ops.listen());
                }
            }
        }
    }

    /// Adds a revision to the list of pending revisions of this queue.
    /// Nothing is published by this call: the iterator **must** be
    /// "collected"/"polled" (calling [`Iterator::next`] until it returns
    /// `None`) to publish the pending revisions.
    #[inline(always)]
    pub fn enqueue(&mut self, pending: T) {
        self.pending.push(pending);
    }
}

// #[cfg(feature = "std")]
impl<T: std::fmt::Debug> Queue<T> {
    /// Debugging helper: writes one line to `writer` consisting of `prefix`,
    /// the published revisions this queue has not consumed yet, the pending
    /// revisions and the number of handles to the shared wake-up event.
    ///
    /// The queue itself is not advanced.
    #[cold]
    pub fn print_debug<W: std::io::Write>(
        &self,
        mut writer: W,
        prefix: &str,
    ) -> std::io::Result<()> {
        // walk the chain with a private cursor, so `self.next` stays put
        let mut cursor = Arc::clone(&self.next);
        let rendered: Vec<String> =
            core::iter::from_fn(|| RevisionRef::new_and_forward(&mut cursor))
                .map(|revision| format!("{:?}", &*revision))
                .collect();
        writeln!(
            writer,
            "{} [{}] pending = {:?}; next_ops x{}",
            prefix,
            rendered.join(", "),
            &self.pending,
            Arc::strong_count(&self.next_ops)
        )
    }
}