collab-common 0.0.7

Code shared by collab's client and server
Documentation
use core::pin::Pin;
use core::task::Context;

use futures::task::Poll;
use futures::{Sink, Stream};
use pin_project_lite::pin_project;

pin_project! {
    /// A [`Stream`] adapter that executes a callback with the buffer returned
    /// by the underlying stream.
    pub struct CallbackStream<S, T: ?Sized> {
        #[pin]
        inner: S,
        callback: fn(&T),
    }
}

impl<S, T: ?Sized> CallbackStream<S, T> {
    /// TODO: docs
    #[inline]
    pub fn inner_mut(&mut self) -> &mut S {
        &mut self.inner
    }

    /// Creates a new [`CallbackStream`] wrapping the given stream.
    #[inline]
    pub fn new(stream: S, callback: fn(&T)) -> Self {
        Self { inner: stream, callback }
    }
}

impl<S, T, U, E> Stream for CallbackStream<S, T>
where
    // We have force the `Item` of the underlying stream to be a `Result<T, E>`
    // because this struct usually wraps such streams.
    //
    // Ideally we'd like to have a separate impl for `Stream<Item = U>` where
    // `U: AsRef<T>`, but that's not possible without specialization.
    S: Stream<Item = Result<U, E>>,
    U: AsRef<T>,
    T: ?Sized,
{
    type Item = S::Item;

    #[inline]
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let this = self.project();

        let out = this.inner.poll_next(cx);

        if let Poll::Ready(Some(Ok(item))) = &out {
            (this.callback)(item.as_ref());
        }

        out
    }
}

pin_project! {
    /// A [`Sink`] adapter that executes a callback with the buffer given to
    /// `start_send` before forwarding it to the underlying sink.
    pub struct CallbackSink<S, T:?Sized> {
        #[pin]
        inner: S,
        callback: fn(&T),
    }
}

impl<S, T: ?Sized> CallbackSink<S, T> {
    /// TODO: docs
    #[inline]
    pub fn inner_mut(&mut self) -> &mut S {
        &mut self.inner
    }

    /// Creates a new [`CallbackSink`] wrapping the given sink.
    #[inline]
    pub fn new(sink: S, callback: fn(&T)) -> Self {
        Self { inner: sink, callback }
    }
}

impl<S, T, U> Sink<U> for CallbackSink<S, T>
where
    S: Sink<U>,
    U: AsRef<T>,
    T: ?Sized,
{
    type Error = S::Error;

    #[inline]
    fn poll_ready(
        self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_ready(ctx)
    }

    #[inline]
    fn start_send(self: Pin<&mut Self>, item: U) -> Result<(), Self::Error> {
        let this = self.project();
        (this.callback)(item.as_ref());
        this.inner.start_send(item)
    }

    #[inline]
    fn poll_flush(
        self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_flush(ctx)
    }

    #[inline]
    fn poll_close(
        self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_close(ctx)
    }
}