#![forbid(clippy::as_conversions, clippy::cast_ptr_alignment, trivial_casts)]
#![cfg_attr(not(feature = "std"), no_std)]
extern crate alloc;
extern crate core;
use alloc::{collections::VecDeque, sync::Arc};
use core::{fmt, marker::Unpin};
use event_listener::Event;
use once_cell::sync::OnceCell;
type NextRevision<T> = Arc<OnceCell<RevisionNode<T>>>;
#[derive(Clone, Debug)]
struct RevisionNode<T> {
next: NextRevision<T>,
data: T,
}
#[derive(Debug)]
pub struct RevisionRef<T> {
inner: NextRevision<T>,
}
#[derive(Clone, Debug)]
pub struct RevisionDetachError;
impl fmt::Display for RevisionDetachError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "failed to detach revision")
}
}
#[cfg(feature = "std")]
impl std::error::Error for RevisionDetachError {}
impl<T> Clone for RevisionRef<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T> core::ops::Deref for RevisionRef<T> {
type Target = T;
#[inline]
fn deref(&self) -> &T {
&unsafe { self.inner.get_unchecked() }.data
}
}
unsafe impl<T> stable_deref_trait::StableDeref for RevisionRef<T> {}
unsafe impl<T> stable_deref_trait::CloneStableDeref for RevisionRef<T> {}
impl<T> RevisionRef<T> {
fn new_and_forward(nr: &mut NextRevision<T>) -> Option<Self> {
match nr.get() {
Some(_) => {
let x = Self {
inner: Arc::clone(&nr),
};
*nr = Arc::clone(&unsafe { nr.get_unchecked() }.next);
Some(x)
}
None => None,
}
}
pub fn try_detach<'a>(this: &'a mut Self) -> Result<&'a mut T, RevisionDetachError> {
let mut_this: &mut RevisionNode<T> = Arc::get_mut(&mut this.inner)
.ok_or(RevisionDetachError)?
.get_mut()
.unwrap();
mut_this.next = Arc::new(OnceCell::default());
Ok(&mut mut_this.data)
}
pub fn try_into_inner(mut this: Self) -> Result<T, Self> {
let mut mut_this: RevisionNode<T> = match Arc::get_mut(&mut this.inner) {
Some(x) => x.take().unwrap(),
None => return Err(this),
};
mut_this.next = Arc::new(OnceCell::default());
Ok(mut_this.data)
}
}
#[derive(Debug)]
#[must_use = "Queue does nothing unless you call .next() or some variation of it"]
pub struct Queue<T> {
next: NextRevision<T>,
next_ops: Arc<Event>,
pub pending: VecDeque<T>,
}
impl<T> Clone for Queue<T> {
#[inline]
fn clone(&self) -> Self {
Queue {
next: Arc::clone(&self.next),
next_ops: Arc::clone(&self.next_ops),
pending: Default::default(),
}
}
}
impl<T> Default for Queue<T> {
#[inline]
fn default() -> Self {
Queue {
next: Arc::new(Default::default()),
next_ops: Arc::new(Default::default()),
pending: Default::default(),
}
}
}
impl<T> Drop for Queue<T> {
fn drop(&mut self) {
if Arc::strong_count(&self.next_ops) == 2 {
self.next_ops.notify(1);
}
}
}
impl<T: Unpin> Unpin for Queue<T> {}
impl<T> Iterator for Queue<T> {
type Item = RevisionRef<T>;
fn next(&mut self) -> Option<RevisionRef<T>> {
let orig_pending_len = self.pending.len();
let ret = match self.publish_intern() {
None => RevisionRef::new_and_forward(&mut self.next),
x => x,
};
if orig_pending_len != self.pending.len() {
self.next_ops.notify(usize::MAX);
}
ret
}
}
impl<T> Queue<T> {
#[inline(always)]
pub fn new() -> Self {
Default::default()
}
fn publish_intern(&mut self) -> Option<RevisionRef<T>> {
while let Some(data) = self.pending.pop_front() {
let latest = Arc::new(OnceCell::default());
let mut revnode = Some(RevisionNode {
data,
next: Arc::clone(&latest),
});
let maybe_real_old = self.next.get_or_init(|| revnode.take().unwrap());
match revnode {
None => {
self.next = latest;
}
Some(new) => {
let real_old = maybe_real_old;
let new_next = Arc::clone(&real_old.next);
self.pending.push_front(new.data);
return Some(RevisionRef {
inner: core::mem::replace(&mut self.next, new_next),
});
}
}
}
None
}
pub async fn next_async(&mut self) -> Option<RevisionRef<T>> {
loop {
let listener = self.next_ops.listen();
if let ret @ Some(_) = self.next() {
return ret;
} else if Arc::get_mut(&mut self.next_ops).is_some() {
return RevisionRef::new_and_forward(&mut self.next);
} else {
listener.await;
}
}
}
#[inline(always)]
pub fn enqueue(&mut self, pending: T) {
self.pending.push_back(pending);
}
}
#[cfg(feature = "std")]
impl<T: std::fmt::Debug> Queue<T> {
#[cold]
pub fn print_debug<W: std::io::Write>(
&self,
mut writer: W,
prefix: &str,
) -> std::io::Result<()> {
#[cold]
let mut cur = Arc::clone(&self.next);
let mut fi = true;
let mut tmpstr = String::new();
while let Some(x) = RevisionRef::new_and_forward(&mut cur) {
if !fi {
tmpstr.push(',');
tmpstr.push(' ');
}
tmpstr += &format!("{:?}", &*x);
fi = false;
}
writeln!(
writer,
"{} [{}] pending = {:?}; next_ops x{}",
prefix,
tmpstr,
&self.pending,
Arc::strong_count(&self.next_ops)
)?;
Ok(())
}
}