collab-common 0.0.7

Code shared by collab's client and server
Documentation
use core::pin::Pin;
use core::task::Context;

use async_compat::Compat;
use futures::task::Poll;
use futures::{io, AsyncRead, AsyncWrite, Sink, Stream};
use pin_project_lite::pin_project;
use tokio_util::bytes::{Bytes, BytesMut};
use tokio_util::codec::length_delimited::Builder;
use tokio_util::codec::{self, LengthDelimitedCodec};

pin_project! {
    /// A reader that yields length-delimited frames from an underlying
    /// [`AsyncRead`].
    ///
    /// This is a thin wrapper around `tokio_util`'s [`codec::FramedRead`]
    /// paired with a [`LengthDelimitedCodec`]. The wrapped reader is stored
    /// inside a [`Compat`] adapter.
    pub struct FramedRead<R> {
        // Structurally pinned so that polls can be forwarded to the inner
        // framed reader through the generated projection.
        #[pin]
        inner: codec::FramedRead<Compat<R>, LengthDelimitedCodec>,
    }
}

impl<R: AsyncRead> FramedRead<R> {
    /// Creates a new [`FramedRead`] that decodes frames from `reader`.
    ///
    /// The reader is wrapped in a [`Compat`] adapter and handed to the
    /// builder returned by `length_delimited_codec`, so the framing
    /// parameters are whatever that function configures.
    #[inline]
    pub fn new(reader: R) -> Self {
        let compat_reader = Compat::new(reader);
        let inner = length_delimited_codec().new_read(compat_reader);
        Self { inner }
    }
}

impl<R: AsyncRead> Stream for FramedRead<R> {
    type Item = io::Result<BytesMut>;

    #[inline]
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.project().inner.poll_next(cx)
    }
}

pin_project! {
    /// A writer that sends length-delimited frames to an underlying
    /// [`AsyncWrite`].
    ///
    /// This is a thin wrapper around `tokio_util`'s [`codec::FramedWrite`]
    /// paired with a [`LengthDelimitedCodec`]. The wrapped writer is stored
    /// inside a [`Compat`] adapter.
    pub struct FramedWrite<W> {
        // Structurally pinned so that polls can be forwarded to the inner
        // framed writer through the generated projection.
        #[pin]
        inner: codec::FramedWrite<Compat<W>, LengthDelimitedCodec>,
    }
}

impl<W: AsyncWrite> FramedWrite<W> {
    /// Creates a new [`FramedWrite`] that encodes frames into `writer`.
    ///
    /// The writer is wrapped in a [`Compat`] adapter and handed to the
    /// builder returned by `length_delimited_codec`, so the framing
    /// parameters are whatever that function configures.
    #[inline]
    pub fn new(writer: W) -> Self {
        let compat_writer = Compat::new(writer);
        let inner = length_delimited_codec().new_write(compat_writer);
        Self { inner }
    }
}

impl<W: AsyncWrite> Sink<Vec<u8>> for FramedWrite<W> {
    type Error = io::Error;

    #[inline]
    fn poll_ready(
        self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_ready(ctx)
    }

    #[inline]
    fn start_send(
        self: Pin<&mut Self>,
        item: Vec<u8>,
    ) -> Result<(), Self::Error> {
        self.project().inner.start_send(Bytes::from(item))
    }

    #[inline]
    fn poll_flush(
        self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_flush(ctx)
    }

    #[inline]
    fn poll_close(
        self: Pin<&mut Self>,
        ctx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_close(ctx)
    }
}

/// Returns the [`Builder`] used to construct both the framed reader and the
/// framed writer in this module, so the two ends share one configuration.
///
/// The builder is set up with a `u64` length field and a maximum frame
/// length of `u64::MAX`.
///
/// This function is only compiled when the target's pointer width is 64 or
/// 128 bits.
// NOTE(review): a maximum of `u64::MAX` places no practical cap on the frame
// size a peer may announce; confirm that this is intended for untrusted peers.
#[cfg(any(target_pointer_width = "64", target_pointer_width = "128"))]
#[inline]
fn length_delimited_codec() -> Builder {
    // The `as usize` cast cannot truncate: the `cfg` gate above keeps this
    // function from compiling on 32-bit platforms.
    const MAX_FRAME_LENGTH: usize = u64::MAX as usize;

    let mut codec_builder = LengthDelimitedCodec::builder();
    codec_builder.length_field_type::<u64>();
    codec_builder.max_frame_length(MAX_FRAME_LENGTH);
    codec_builder
}