use core::pin::Pin;
use core::task::Context;
use futures::task::Poll;
use futures::{Sink, Stream};
use pin_project_lite::pin_project;
pin_project! {
pub struct CallbackStream<S, T: ?Sized> {
#[pin]
inner: S,
callback: fn(&T),
}
}
impl<S, T: ?Sized> CallbackStream<S, T> {
#[inline]
pub fn inner_mut(&mut self) -> &mut S {
&mut self.inner
}
#[inline]
pub fn new(stream: S, callback: fn(&T)) -> Self {
Self { inner: stream, callback }
}
}
impl<S, T, U, E> Stream for CallbackStream<S, T>
where
S: Stream<Item = Result<U, E>>,
U: AsRef<T>,
T: ?Sized,
{
type Item = S::Item;
#[inline]
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.project();
let out = this.inner.poll_next(cx);
if let Poll::Ready(Some(Ok(item))) = &out {
(this.callback)(item.as_ref());
}
out
}
}
pin_project! {
pub struct CallbackSink<S, T:?Sized> {
#[pin]
inner: S,
callback: fn(&T),
}
}
impl<S, T: ?Sized> CallbackSink<S, T> {
#[inline]
pub fn inner_mut(&mut self) -> &mut S {
&mut self.inner
}
#[inline]
pub fn new(sink: S, callback: fn(&T)) -> Self {
Self { inner: sink, callback }
}
}
impl<S, T, U> Sink<U> for CallbackSink<S, T>
where
S: Sink<U>,
U: AsRef<T>,
T: ?Sized,
{
type Error = S::Error;
#[inline]
fn poll_ready(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_ready(ctx)
}
#[inline]
fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> {
let this = self.project();
(this.callback)(item.as_ref());
this.inner.start_send(item)
}
#[inline]
fn poll_flush(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_flush(ctx)
}
#[inline]
fn poll_close(
self: Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
self.project().inner.poll_close(ctx)
}
}