// slip-codec 0.4.0
//
// Serial Line Internet Protocol (SLIP) encoder/decoder
// Documentation
use futures::{ready, SinkExt, StreamExt};
use serialport::SerialPort;
use slip_codec::tokio::SlipCodec;
use std::io::{Read, Write};
use std::task::Context;
use tokio::io::unix::AsyncFd;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::macros::support::{Pin, Poll};

/// Shorthand for a `std::result::Result` whose error type is `std::io::Error`.
type Result<T> = std::result::Result<T, std::io::Error>;

/// A `serialport::TTYPort` wrapped in tokio's `AsyncFd`.
///
/// The `AsyncRead` and `AsyncWrite` implementations in this file wait for
/// readiness on `inner` and then call the port's own `Read`/`Write` methods.
struct AsyncTTYPort {
    /// The underlying TTY port, held inside an `AsyncFd`.
    inner: AsyncFd<serialport::TTYPort>,
}

impl AsyncTTYPort {
    /// Wraps `inner` for use with tokio.
    ///
    /// The port's timeout is set to zero before it is handed to
    /// `AsyncFd::new`.
    ///
    /// # Errors
    ///
    /// Returns an error if setting the timeout fails or if `AsyncFd::new`
    /// rejects the port.
    pub fn new(mut inner: serialport::TTYPort) -> Result<Self> {
        inner.set_timeout(std::time::Duration::ZERO)?;

        let inner = AsyncFd::new(inner)?;
        Ok(Self { inner })
    }

    /// Creates a pair of ports via `serialport::TTYPort::pair` and wraps each
    /// half with [`AsyncTTYPort::new`].
    ///
    /// # Errors
    ///
    /// Returns an error if the pair cannot be created or if wrapping either
    /// half fails.
    pub fn pair() -> std::io::Result<(AsyncTTYPort, AsyncTTYPort)> {
        let (first, second) = serialport::TTYPort::pair()?;

        Ok((Self::new(first)?, Self::new(second)?))
    }
}

impl AsyncRead for AsyncTTYPort {
    /// Reads from the wrapped port into the unfilled part of `buf`.
    ///
    /// Waits for read readiness on the `AsyncFd`, then attempts a single
    /// `read` on the port and advances `buf` by the number of bytes read.
    /// Returns `Poll::Pending` only after the readiness poll itself reported
    /// pending, so the task's waker is registered before the task yields.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<tokio::io::Result<()>> {
        loop {
            let mut guard = ready!(self.inner.poll_read_ready_mut(cx))?;

            let attempt = guard.try_io(|inner| {
                let read = inner
                    .get_mut()
                    .read(buf.initialize_unfilled())
                    .map_err(|err| match err.kind() {
                        // NOTE(review): with a zero timeout the serialport
                        // TTYPort appears to report "not ready" as `TimedOut`
                        // rather than `WouldBlock` — confirm against the
                        // serialport version in use. `try_io` only clears the
                        // readiness flag on `WouldBlock`, so translate it.
                        std::io::ErrorKind::TimedOut => {
                            std::io::Error::from(std::io::ErrorKind::WouldBlock)
                        }
                        _ => err,
                    })?;
                buf.advance(read);
                Ok(())
            });

            match attempt {
                Ok(result) => return Poll::Ready(result),
                // Readiness was stale and has been cleared. Poll readiness
                // again so the waker is registered before returning Pending;
                // returning Pending directly here could leave the task asleep
                // forever.
                Err(_would_block) => continue,
            }
        }
    }
}

impl AsyncWrite for AsyncTTYPort {
    /// Writes `buf` to the wrapped port and reports how many bytes were
    /// accepted.
    ///
    /// Waits for write readiness on the `AsyncFd`, then attempts a single
    /// `write`. Returns `Poll::Pending` only after the readiness poll itself
    /// reported pending, so the task's waker is registered before yielding.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<tokio::io::Result<usize>> {
        loop {
            let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?;

            let attempt = guard.try_io(|io| {
                io.get_mut().write(buf).map_err(|err| match err.kind() {
                    // NOTE(review): with a zero timeout the serialport
                    // TTYPort appears to report "not ready" as `TimedOut`
                    // rather than `WouldBlock` — confirm against the
                    // serialport version in use. `try_io` only clears the
                    // readiness flag on `WouldBlock`, so translate it.
                    std::io::ErrorKind::TimedOut => {
                        std::io::Error::from(std::io::ErrorKind::WouldBlock)
                    }
                    _ => err,
                })
            });

            match attempt {
                Ok(result) => return Poll::Ready(result),
                // Stale readiness was cleared; re-poll so the waker gets
                // registered instead of returning Pending without one.
                Err(_would_block) => continue,
            }
        }
    }

    /// Flushes the wrapped port once it is reported write-ready.
    ///
    /// NOTE(review): the port's `flush` may itself block inside the call —
    /// confirm against the serialport implementation.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<tokio::io::Result<()>> {
        loop {
            let mut guard = ready!(self.inner.poll_write_ready_mut(cx))?;

            match guard.try_io(|io| io.get_mut().flush()) {
                Ok(result) => return Poll::Ready(result),
                // Stale readiness was cleared; re-poll so the waker gets
                // registered instead of returning Pending without one.
                Err(_would_block) => continue,
            }
        }
    }

    /// No shutdown work is performed; always completes immediately with `Ok`.
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<tokio::io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

/// Sends the messages "foo", "bar" and "baz", in that order, as SLIP frames
/// over `port`.
///
/// Each message is printed before it is sent.
///
/// # Panics
///
/// Panics if sending a frame fails.
async fn run_source(port: AsyncTTYPort) {
    let codec = SlipCodec::new();
    let mut sink = tokio_util::codec::Framed::new(port, codec);

    for &text in &["foo", "bar", "baz"] {
        // The target type of `into()` is inferred from what the framed sink accepts.
        let frame = String::from(text).into();

        println!("send {:?}", frame);
        sink.send(frame).await.unwrap();
    }
}

/// Receives SLIP frames from `port` and prints each decoded message.
///
/// Returns when the framed stream ends or yields its first error.
async fn run_sink(port: AsyncTTYPort) {
    let mut source = tokio_util::codec::Framed::new(port, SlipCodec::new());

    // `while let` stops on `None`. A bare `loop` with `if let Some(..)` would
    // spin forever, re-polling a stream that has already finished.
    while let Some(result) = source.next().await {
        match result {
            Ok(message) => println!("recv {:?}", message),
            Err(_) => break,
        }
    }
}

/// Entry point: creates a pair of ports, spawns `run_source` on one half and
/// `run_sink` on the other, then waits for both tasks.
///
/// # Errors
///
/// Returns an error if the port pair cannot be created or if awaiting either
/// spawned task fails.
#[tokio::main]
pub async fn main() -> Result<()> {
    let (tx_port, rx_port) = AsyncTTYPort::pair()?;

    let sender = tokio::spawn(run_source(tx_port));
    let receiver = tokio::spawn(run_sink(rx_port));

    // Await the sender first, then the receiver; the first join error aborts.
    for task in [sender, receiver] {
        task.await?;
    }

    Ok(())
}