use fixedbitset::FixedBitSet;
use futures::Stream;
use slice_dst::SliceWithHeader;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
use try_lock::TryLock;
/// Shared bookkeeping stored in front of the slot slice.
pub struct Header {
/// Number of leading slots that have been published. A dropping `Sentinel`
/// raises it to `slot + 1` after writing its slot; the stream compares it
/// against its own read position to decide whether an item is available.
cursor: AtomicUsize,
/// Waker of the task currently waiting on the stream, if any. The stream
/// stores it while polling; a dropping `Sentinel` takes it out and wakes it.
waker: TryLock<Option<Waker>>,
}
/// Shared allocation: a `Header` followed by one atomic slot per possible
/// sentinel. `Builder::new` fills every slot with `usize::MAX` ("empty"); a
/// dropped `Sentinel` overwrites one slot with its own index.
type Inner = SliceWithHeader<Header, AtomicUsize>;
/// Guard whose `Drop` implementation records its index (field `1`) into the
/// first free slot of the shared state (field `0`) and then wakes the waiting
/// stream task, if one is registered.
pub struct Sentinel(Arc<Inner>, usize);
impl Sentinel {
    /// Creates a sentinel that will report `index` into `inner` when dropped.
    pub fn new(inner: &Arc<Inner>, index: usize) -> Self {
        let shared = Arc::clone(inner);
        Self(shared, index)
    }

    /// Wakes the registered stream task after the cursor was advanced to
    /// `new_cursor`.
    ///
    /// If the waker slot is currently locked by someone else this returns
    /// immediately. Otherwise any stored waker is taken and woken, the lock is
    /// released, and the cursor is re-read: when it moved again in the
    /// meantime the whole procedure repeats for the newer value.
    fn notify(&self, mut new_cursor: usize) {
        let header = &self.0.header;
        loop {
            match header.waker.try_lock() {
                // The guard lives only for this arm, so the lock is released
                // before the cursor is re-read below.
                Some(mut slot) => {
                    if let Some(waker) = slot.take() {
                        waker.wake();
                    }
                }
                None => return,
            }

            let latest = header.cursor.load(Ordering::Acquire);
            if latest == new_cursor {
                return;
            }
            new_cursor = latest;
        }
    }
}
impl Drop for Sentinel {
    /// Records this sentinel's index in the first free slot, publishes the
    /// slot through the shared cursor and notifies the waiting stream task.
    fn drop(&mut self) {
        let header = &self.0.header;
        let slots = &self.0.slice;

        // Lowest slot still worth trying; rises past slots seen occupied.
        let mut floor = 0;
        loop {
            let slot = header.cursor.load(Ordering::Acquire).max(floor);
            let claim = slots[slot].compare_exchange_weak(
                usize::MAX,
                self.1,
                Ordering::SeqCst,
                Ordering::Relaxed,
            );

            if let Err(occupant) = claim {
                // `usize::MAX` means the slot is still empty and the weak
                // exchange failed spuriously: retry the same slot. Any other
                // value means another sentinel owns it: move on to the next.
                floor = if occupant == usize::MAX { slot } else { slot + 1 };
                continue;
            }

            // Slot claimed: make sure the cursor covers it, then notify with
            // the largest cursor value known at this point.
            let published = slot + 1;
            let observed = header
                .cursor
                .fetch_max(published, Ordering::Release)
                .max(published);
            self.notify(observed);
            return;
        }
    }
}
/// Hands out `Sentinel`s that share one state allocation (field `0`) and
/// remembers in a bit set (field `1`) which indices already have a sentinel.
pub struct Builder(Arc<Inner>, FixedBitSet);
/// Stream of sentinel indices, read slot by slot from the shared state.
pub struct DropStream {
/// State shared with the sentinels.
inner: Arc<Inner>,
/// Index of the next slot this stream will read.
cursor: usize,
/// Number of items still to be yielded; `usize::MAX` marks the stream as
/// terminated.
len: usize,
}
impl Builder {
pub fn new(l: usize) -> Self {
Self(
SliceWithHeader::new(
Header {
cursor: AtomicUsize::default(),
waker: TryLock::new(None),
},
std::iter::repeat_n((), l).map(|_| AtomicUsize::new(usize::MAX)),
),
FixedBitSet::with_capacity(l),
)
}
pub fn make_sentinel(&mut self, i: usize) -> Option<Sentinel> {
if self.1.put(i) {
None
} else {
Some(Sentinel::new(&self.0, i))
}
}
pub fn into_stream(self) -> DropStream {
DropStream { inner: self.0, cursor: 0, len: self.1.count_ones(..) }
}
}
impl DropStream {
    /// Reads the next slot if the shared cursor shows it has been published.
    ///
    /// On success the read position advances and the remaining-item count
    /// shrinks by one. Keeping both updates here guarantees that every path
    /// that hands out an item accounts for it.
    fn take_published(&mut self) -> Option<usize> {
        if self.inner.header.cursor.load(Ordering::Acquire) <= self.cursor {
            return None;
        }
        let item = self.inner.slice[self.cursor].load(Ordering::Acquire);
        self.cursor += 1;
        self.len -= 1;
        Some(item)
    }
}

impl Stream for DropStream {
    type Item = usize;

    /// Yields the indices of dropped sentinels in the order their slots were
    /// filled, and `None` once as many items as sentinels were made have been
    /// produced.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<usize>> {
        if self.len == 0 || self.len == usize::MAX {
            // Mark the stream as terminated for `FusedStream::is_terminated`.
            self.len = usize::MAX;
            return Poll::Ready(None);
        }

        // Fast path: an item is already available.
        if let Some(item) = self.take_published() {
            return Poll::Ready(Some(item));
        }

        // Register our waker. The slot may be locked by a sentinel that is in
        // the middle of notifying; in that case nothing gets stored.
        let registered = if let Some(mut maybe_waker) = self.inner.header.waker.try_lock() {
            let park = maybe_waker
                .as_ref()
                .map_or(true, |w| !w.will_wake(cx.waker()));
            if park {
                let old = std::mem::replace(&mut *maybe_waker, Some(cx.waker().clone()));
                // A different task was waiting before; wake it so it does not
                // stall after being displaced.
                if let Some(w) = old {
                    w.wake();
                }
            }
            true
        } else {
            false
        };

        // Re-check after registration: a sentinel may have published an item
        // between the first check and the waker being stored.
        match self.take_published() {
            Some(item) => Poll::Ready(Some(item)),
            None => {
                if !registered {
                    // The lock holder will not wake us, and later sentinels
                    // would find no waker stored. Schedule another poll
                    // ourselves so the wake-up cannot be lost.
                    cx.waker().wake_by_ref();
                }
                Poll::Pending
            }
        }
    }
}
impl futures::stream::FusedStream for DropStream {
    /// Reports whether the stream has already returned `None`, which
    /// `poll_next` records by setting `len` to `usize::MAX`.
    fn is_terminated(&self) -> bool {
        matches!(self.len, usize::MAX)
    }
}