use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll, Wake},
};
use futures_util::{
Sink, Stream, StreamExt, TryStream, ready,
stream::{Fuse, FusedStream},
task::AtomicWaker,
};
use pin_project::pin_project;
/// Waker slots shared between the adaptor and the internal `Context` it
/// builds when polling its inner streams/sinks.
///
/// Each slot holds the most recently registered waker for one of the public
/// poll entry points; waking this struct wakes all of them.
#[derive(Debug, Default)]
struct Wakers {
/// Waker registered by `Stream::poll_next`.
next: AtomicWaker,
/// Waker registered by `Sink::poll_ready`.
ready: AtomicWaker,
/// Waker registered by the `Sink` flush path.
flush: AtomicWaker,
}
impl Wake for Wakers {
    /// Wakes every registered task, consuming the handle.
    ///
    /// Delegates to [`Wake::wake_by_ref`] so both entry points share one
    /// implementation.
    fn wake(self: Arc<Self>) {
        Self::wake_by_ref(&self);
    }

    /// Wakes every registered task without consuming the handle.
    ///
    /// Implemented explicitly because the default `wake_by_ref` clones the
    /// `Arc` and calls `wake`, which costs an unnecessary reference-count
    /// round trip on every internal `wake_by_ref()` call.
    fn wake_by_ref(self: &Arc<Self>) {
        self.next.wake();
        self.ready.wake();
        self.flush.wake();
    }
}
/// Combined `Stream` + `Sink` adaptor over a stream `R` of inner
/// stream/sink values `S`.
///
/// Items are read from, and sent to, the `current` inner value. When
/// `incoming` yields a new inner value it is parked in `swap`; the old
/// `current` is closed and then replaced by it.
#[pin_project]
#[derive(Debug)]
pub struct UseLatest<R, Out, S = <R as Stream>::Item> {
/// Fused source of inner stream/sink values.
#[pin]
incoming: Fuse<R>,
/// The inner value currently being read from and written to, if any.
#[pin]
current: Option<S>,
/// A newly received inner value waiting to replace `current`.
swap: Option<S>,
/// Shared waker slots; also the waker used for internal polls.
w: Arc<Wakers>,
/// At most one item accepted by `start_send` but not yet forwarded to
/// `current`.
buffer: Option<Out>,
}
impl<R: Default + Stream, Out, S> Default for UseLatest<R, Out, S> {
    /// Builds the adaptor around a default-constructed source stream.
    fn default() -> Self {
        let source = R::default();
        Self::from(source)
    }
}
impl<In, Out, E, S: TryStream<Ok = In, Error = E> + Sink<Out, Error = E>, R: Stream<Item = S>>
UseLatest<R, Out, S>
{
fn poll_current(self: Pin<&mut Self>) -> Poll<Result<(), E>> {
let mut this = self.project();
let waker = this.w.clone().into();
let cx = &mut Context::from_waker(&waker);
loop {
match this.swap {
Some(_) => {
if let Some(current) = this.current.as_mut().as_pin_mut() {
ready!(current.poll_close(cx))?;
}
this.current.as_mut().set(this.swap.take());
this.w.wake_by_ref();
}
None => {
if !this.incoming.is_terminated()
&& let Poll::Ready(Some(swap)) = this.incoming.as_mut().poll_next(cx)
{
*this.swap = Some(swap);
this.w.wake_by_ref();
continue;
}
break Poll::Ready(Ok(()));
}
}
}
}
fn poll_buffer(self: Pin<&mut Self>) -> Poll<Result<(), E>> {
let mut this = self.project();
let waker = this.w.clone().into();
let cx = &mut Context::from_waker(&waker);
if this.buffer.is_some() {
if let Some(current) = this.current.as_mut().as_pin_mut() {
ready!(current.poll_ready(cx))?;
} else {
return Poll::Pending;
}
}
if let Some(item) = this.buffer.take()
&& let Some(current) = this.current.as_mut().as_pin_mut()
{
current.start_send(item)?;
}
Poll::Ready(Ok(()))
}
}
impl<In, Out, E, S: TryStream<Ok = In, Error = E> + Sink<Out, Error = E>, R: Stream<Item = S>>
Stream for UseLatest<R, Out, S>
{
type Item = Result<In, E>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.w.next.register(cx.waker());
ready!(self.as_mut().poll_current())?;
let _ = self.as_mut().poll_buffer()?;
let mut this = self.project();
if let Some(current) = this.current.as_mut().as_pin_mut() {
if let Some(r) = ready!(current.try_poll_next(cx)) {
return Poll::Ready(Some(r));
}
this.current.as_mut().set(None)
}
if this.incoming.is_terminated() {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}
impl<In, Out, E, S: TryStream<Ok = In, Error = E> + Sink<Out, Error = E>, R: Stream<Item = S>>
FusedStream for UseLatest<R, Out, S>
{
fn is_terminated(&self) -> bool {
self.current.is_none() && self.swap.is_none() && self.incoming.is_terminated()
}
}
impl<In, Out, E, S: TryStream<Ok = In, Error = E> + Sink<Out, Error = E>, R: Stream<Item = S>>
    Sink<Out> for UseLatest<R, Out, S>
{
    type Error = E;

    /// Ready once the single-item buffer is empty, i.e. any previously
    /// accepted item has been handed to the current inner sink.
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.w.ready.register(cx.waker());
        ready!(self.as_mut().poll_current())?;
        self.as_mut().poll_buffer()
    }

    /// Stores `item` in the buffer; it is forwarded to the current inner sink
    /// by a later poll.
    ///
    /// # Panics
    ///
    /// Panics if an item is already buffered, which means `poll_ready` did
    /// not return `Ready(Ok(()))` before this call.
    fn start_send(self: Pin<&mut Self>, item: Out) -> Result<(), Self::Error> {
        let this = self.project();
        assert!(
            this.buffer.is_none(),
            "start_send called while an item is still buffered; poll_ready must succeed first"
        );
        *this.buffer = Some(item);
        Ok(())
    }

    /// Forwards the buffered item and flushes the current inner sink.
    /// Completes immediately when there is no current inner sink and nothing
    /// is buffered.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.w.flush.register(cx.waker());
        ready!(self.as_mut().poll_current())?;
        ready!(self.as_mut().poll_buffer())?;
        let this = self.project();
        match this.current.as_pin_mut() {
            Some(current) => current.poll_flush(cx),
            None => Poll::Ready(Ok(())),
        }
    }

    /// Forwards the buffered item, closes the current inner sink, and
    /// completes once the source of inner values has terminated.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // `poll_current`, `poll_buffer` and the final `Pending` below only
        // arrange wake-ups through the shared wakers, so the caller's waker
        // must be registered there or a pending close is never woken. The
        // flush slot is reused: flush and close are driven by the same
        // sink-owning task and are never awaited concurrently.
        self.w.flush.register(cx.waker());
        ready!(self.as_mut().poll_current())?;
        ready!(self.as_mut().poll_buffer())?;
        let mut this = self.project();
        if let Some(current) = this.current.as_mut().as_pin_mut() {
            ready!(current.poll_close(cx))?;
            this.current.as_mut().set(None)
        }
        if this.incoming.is_terminated() {
            Poll::Ready(Ok(()))
        } else {
            Poll::Pending
        }
    }
}
impl<R: Stream, Out, S> From<R> for UseLatest<R, Out, S> {
fn from(incoming: R) -> Self {
UseLatest {
incoming: incoming.fuse(),
current: None,
swap: None,
w: Default::default(),
buffer: None,
}
}
}
/// Extension trait adding [`use_latest`](UseLatestExt::use_latest) to any
/// stream whose items are themselves both a `TryStream` and a `Sink<Out>`
/// sharing one error type.
pub trait UseLatestExt<Out>:
Sized + Stream<Item: TryStream<Error = Self::E> + Sink<Out, Error = Self::E>>
{
/// Error type shared by the inner values' stream and sink halves.
type E;
/// Converts this stream of inner stream/sink values into a [`UseLatest`]
/// adaptor.
#[must_use]
fn use_latest(self) -> UseLatest<Self, Out> {
self.into()
}
}
impl<In, Out, E, S: TryStream<Ok = In, Error = E> + Sink<Out, Error = E>, R: Stream<Item = S>>
UseLatestExt<Out> for R
{
type E = E;
}