#![forbid(clippy::as_conversions, clippy::cast_ptr_alignment, trivial_casts)]
extern crate alloc;
extern crate core;
use alloc::{sync::Arc, vec::Vec};
use core::{fmt, marker::Unpin};
use event_listener::Event;
use once_cell::sync::OnceCell;
fn perfect_unreachable() -> ! {
if core::cfg!(debug_assertions) {
unreachable!()
} else {
unsafe { core::hint::unreachable_unchecked() }
}
}
type NextRevision<T> = Arc<OnceCell<RevisionNode<T>>>;
#[derive(Clone, Debug)]
struct RevisionNode<T> {
next: NextRevision<T>,
data: T,
}
#[derive(Debug)]
pub struct RevisionRef<T> {
inner: NextRevision<T>,
}
#[derive(Clone, Debug)]
pub struct RevisionDetachError;
impl fmt::Display for RevisionDetachError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "failed to detach revision")
}
}
impl std::error::Error for RevisionDetachError {}
impl<T> Clone for RevisionRef<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T> core::ops::Deref for RevisionRef<T> {
type Target = T;
#[inline]
fn deref(&self) -> &T {
&unsafe { self.inner.get_unchecked() }.data
}
}
unsafe impl<T> stable_deref_trait::StableDeref for RevisionRef<T> {}
unsafe impl<T> stable_deref_trait::CloneStableDeref for RevisionRef<T> {}
impl<T> RevisionRef<T> {
fn new_and_forward(nr: &mut NextRevision<T>) -> Option<Self> {
match nr.get() {
Some(_) => {
let x = Self {
inner: Arc::clone(&nr),
};
*nr = Arc::clone(&unsafe { nr.get_unchecked() }.next);
Some(x)
}
None => None,
}
}
pub fn try_detach<'a>(this: &'a mut Self) -> Result<&'a mut T, RevisionDetachError> {
let mut_this: &mut RevisionNode<T> = Arc::get_mut(&mut this.inner)
.ok_or(RevisionDetachError)?
.get_mut()
.unwrap();
mut_this.next = Arc::new(OnceCell::default());
Ok(&mut mut_this.data)
}
pub fn try_into_inner(mut this: Self) -> Result<T, Self> {
let mut mut_this: RevisionNode<T> = match Arc::get_mut(&mut this.inner) {
Some(x) => x.take().unwrap(),
None => return Err(this),
};
mut_this.next = Arc::new(OnceCell::default());
Ok(mut_this.data)
}
}
#[derive(Debug)]
#[must_use = "Queue does nothing unless you call .next() or some variation of it"]
pub struct Queue<T> {
next: NextRevision<T>,
next_ops: Arc<Event>,
pub pending: Vec<T>,
}
impl<T> Clone for Queue<T> {
#[inline]
fn clone(&self) -> Self {
Queue {
next: Arc::clone(&self.next),
next_ops: Arc::clone(&self.next_ops),
pending: Default::default(),
}
}
}
impl<T> Default for Queue<T> {
#[inline]
fn default() -> Self {
Queue {
next: Arc::new(Default::default()),
next_ops: Arc::new(Default::default()),
pending: Default::default(),
}
}
}
impl<T> Drop for Queue<T> {
fn drop(&mut self) {
if Arc::strong_count(&self.next_ops) == 2 {
self.next_ops.notify(1);
}
}
}
impl<T: Unpin> Unpin for Queue<T> {}
impl<T> Iterator for Queue<T> {
type Item = RevisionRef<T>;
fn next(&mut self) -> Option<RevisionRef<T>> {
let orig_pending_len = self.pending.len();
let ret = match self.publish_intern() {
None => RevisionRef::new_and_forward(&mut self.next),
x => x,
};
if orig_pending_len != self.pending.len() {
self.next_ops.notify(usize::MAX);
}
ret
}
}
impl<T> Queue<T> {
#[inline(always)]
pub fn new() -> Self {
Default::default()
}
fn publish_intern(&mut self) -> Option<RevisionRef<T>> {
enum State<T> {
ToPublish { rest: Vec<T>, last: T },
Published { latest: NextRevision<T> },
}
let last = self.pending.pop()?;
let mut state = State::ToPublish {
rest: core::mem::take(&mut self.pending),
last,
};
let maybe_real_old = self.next.get_or_init(|| {
let latest = Arc::new(OnceCell::default());
if let State::ToPublish { rest, last } = core::mem::replace(
&mut state,
State::Published {
latest: Arc::clone(&latest),
},
) {
let prela = RevisionNode {
data: last,
next: latest,
};
rest.into_iter().rev().fold(prela, |next, i| RevisionNode {
data: i,
next: Arc::new(OnceCell::from(next)),
})
} else {
perfect_unreachable()
}
});
match state {
State::Published { latest } => {
self.next = latest;
None
}
State::ToPublish { mut rest, last } => {
let new_next = Arc::clone(&maybe_real_old.next);
rest.push(last);
self.pending = rest;
Some(RevisionRef {
inner: core::mem::replace(&mut self.next, new_next),
})
}
}
}
pub async fn next_async(&mut self) -> Option<RevisionRef<T>> {
let mut listener = None;
loop {
if let ret @ Some(_) = self.next() {
self.next_ops.notify(1);
return ret;
} else if Arc::get_mut(&mut self.next_ops).is_some() {
return RevisionRef::new_and_forward(&mut self.next);
} else {
match listener.take() {
None => {
listener = Some(self.next_ops.listen());
}
Some(l) => {
l.await;
}
}
}
}
}
#[inline(always)]
pub fn enqueue(&mut self, pending: T) {
self.pending.push(pending);
}
}
impl<T: std::fmt::Debug> Queue<T> {
#[cold]
pub fn print_debug<W: std::io::Write>(
&self,
mut writer: W,
prefix: &str,
) -> std::io::Result<()> {
let mut cur = Arc::clone(&self.next);
let mut fi = true;
let mut tmpstr = String::new();
while let Some(x) = RevisionRef::new_and_forward(&mut cur) {
if !fi {
tmpstr.push(',');
tmpstr.push(' ');
}
tmpstr += &format!("{:?}", &*x);
fi = false;
}
writeln!(
writer,
"{} [{}] pending = {:?}; next_ops x{}",
prefix,
tmpstr,
&self.pending,
Arc::strong_count(&self.next_ops)
)?;
Ok(())
}
}