remoc 0.18.3

🦑 Remote multiplexed objects, channels, observable collections and RPC making remote interactions seamless. Provides multiple remote channels and RPC over TCP, TLS or any other transport.
Documentation
use bytes::{Buf, Bytes, BytesMut};
use std::io::{self, BufWriter};

use crate::chmux::DataBuf;

/// Writes to an internal memory buffer with a limited maximum size.
///
/// A write that would grow the buffer beyond the limit is rejected and marks
/// the writer as overflown; an overflown writer rejects all further writes
/// and no longer hands out its buffer.
pub struct LimitedBytesWriter {
    /// Maximum number of bytes the buffer may hold.
    limit: usize,
    /// Bytes accepted so far.
    buf: BytesMut,
    /// Set when a write has been rejected; nothing in this file resets it.
    overflown: bool,
}

impl LimitedBytesWriter {
    /// Creates an empty writer that accepts at most `limit` bytes in total.
    pub fn new(limit: usize) -> Self {
        let buf = BytesMut::new();
        Self { limit, buf, overflown: false }
    }

    /// Consumes the writer and hands out the bytes collected so far.
    ///
    /// Returns `None` if a write has been rejected, i.e. the writer has
    /// overflown.
    pub fn into_inner(self) -> Option<BytesMut> {
        match self {
            Self { overflown: true, .. } => None,
            Self { buf, .. } => Some(buf),
        }
    }

    /// Whether a write has been rejected.
    ///
    /// Filling the buffer exactly up to the limit does not count as an
    /// overflow; only an attempt to go beyond it does.
    pub fn overflow(&self) -> bool {
        self.overflown
    }
}

impl io::Write for LimitedBytesWriter {
    /// Appends `buf` to the internal buffer if the result stays within the limit.
    ///
    /// The slice is either accepted completely or rejected completely.
    ///
    /// # Errors
    ///
    /// Fails with [`io::ErrorKind::BrokenPipe`] when the write would exceed
    /// the limit or when an earlier write has already been rejected. The
    /// writer is marked as overflown in both cases.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let fits = self.buf.len() + buf.len() <= self.limit;
        if self.overflown || !fits {
            self.overflown = true;
            return Err(io::Error::new(io::ErrorKind::BrokenPipe, "limit reached"));
        }

        self.buf.extend_from_slice(buf);
        Ok(buf.len())
    }

    /// Does nothing, since all data already resides in the memory buffer.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Forwards data over a mpsc channel.
///
/// Every write is sent as a separate `BytesMut` message using a blocking send.
///
/// This must not be used in an async thread.
pub struct ChannelBytesWriter {
    /// Sending half of the channel that the written chunks are forwarded to.
    tx: tokio::sync::mpsc::Sender<BytesMut>,
    /// Number of bytes sent successfully so far; saturates instead of wrapping.
    written: usize,
}

impl ChannelBytesWriter {
    /// Creates a writer that forwards everything written to it through `tx`.
    pub fn new(tx: tokio::sync::mpsc::Sender<BytesMut>) -> Self {
        let written = 0;
        Self { tx, written }
    }

    /// Number of bytes that have been sent successfully so far.
    ///
    /// The counter saturates at `usize::MAX`.
    pub fn written(&self) -> usize {
        self.written
    }
}

impl io::Write for ChannelBytesWriter {
    /// Copies `buf` into a new message and sends it over the channel using a
    /// blocking send.
    ///
    /// On success the whole slice counts as written and the byte counter is
    /// increased accordingly.
    ///
    /// # Errors
    ///
    /// Fails with [`io::ErrorKind::BrokenPipe`] when the send fails.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let len = buf.len();

        if self.tx.blocking_send(buf.into()).is_err() {
            return Err(io::Error::new(io::ErrorKind::BrokenPipe, "channel closed"));
        }

        self.written = self.written.saturating_add(len);
        Ok(len)
    }

    /// Does nothing, since every write is handed to the channel immediately.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writer that forwards to one of the two writer kinds it can borrow.
pub(crate) enum IoWriter<'a> {
    /// Writes into a size-limited memory buffer.
    Limited(&'a mut LimitedBytesWriter),
    /// Writes through a buffered writer on top of a channel-forwarding writer.
    Channel(&'a mut BufWriter<ChannelBytesWriter>),
}

impl<'a> io::Write for IoWriter<'a> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        match self {
            Self::Limited(w) => w.write(buf),
            Self::Channel(w) => w.write(buf),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        match self {
            Self::Limited(w) => w.flush(),
            Self::Channel(w) => w.flush(),
        }
    }
}

/// Reads data from an mpsc channel.
///
/// This must not be used in an async thread.
pub struct ChannelBytesReader {
    /// Receiving half of the channel; each item is either a chunk of data or
    /// an error marker.
    rx: tokio::sync::mpsc::Receiver<Result<Bytes, ()>>,
    /// Part of the most recently received chunk that has not been read yet.
    buf: Bytes,
    /// Set once an `Err(())` item has been received; reads return an error
    /// from then on.
    failed: bool,
}

impl ChannelBytesReader {
    /// Creates a reader that takes its data from `rx`.
    ///
    /// The reader starts out with no buffered data and no recorded failure.
    pub fn new(rx: tokio::sync::mpsc::Receiver<Result<Bytes, ()>>) -> Self {
        let buf = Bytes::new();
        Self { rx, buf, failed: false }
    }
}

impl io::Read for ChannelBytesReader {
    /// Copies buffered data into `buf`, receiving a new chunk from the
    /// channel first when nothing is buffered.
    ///
    /// Returns the number of bytes copied, which is the smaller of the length
    /// of `buf` and the amount of buffered data. Returns `Ok(0)` when the
    /// channel yields no more items.
    ///
    /// # Errors
    ///
    /// Fails with [`io::ErrorKind::BrokenPipe`] once an `Err(())` item has
    /// been received from the channel.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Refill until there is something to hand out; empty chunks are skipped.
        while self.buf.is_empty() {
            if self.failed {
                return Err(io::Error::new(io::ErrorKind::BrokenPipe, "channel closed"));
            }

            match self.rx.blocking_recv() {
                None => return Ok(0),
                Some(Err(())) => self.failed = true,
                Some(Ok(chunk)) => self.buf = chunk,
            }
        }

        // Copy as much as fits and consume it from the buffered chunk.
        let count = self.buf.len().min(buf.len());
        self.buf.copy_to_slice(&mut buf[..count]);
        Ok(count)
    }
}

/// Reader that forwards to one of the two reader kinds it can borrow.
pub(crate) enum IoReader<'a, 'b> {
    /// Reads through a `bytes` reader adapter over a borrowed `DataBuf`.
    DataBuf(&'a mut bytes::buf::Reader<&'b mut DataBuf>),
    /// Reads data received over an mpsc channel.
    Channel(&'a mut ChannelBytesReader),
}

impl<'a, 'b> io::Read for IoReader<'a, 'b> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self {
            Self::DataBuf(r) => r.read(buf),
            Self::Channel(r) => r.read(buf),
        }
    }
}