ogurpchik 0.1.0

A transport-agnostic RPC framework for stream and memory-based communication. Built with high-performance primitives to deliver medium-performance results.
pub mod general;
#[cfg(unix)]
pub mod linux;
#[cfg(windows)]
pub mod windows;

use crate::transport::stream::vsock::general::{VListener, VStream};
use crate::transport::stream::{Acceptor, AcceptorBuilder, Connector, Splitable};
use compio::buf::{IoBuf, IoBufMut};
use compio::io::{AsyncRead, AsyncWrite};
use compio::BufResult;
use socket2::SockAddr;
use std::io;

pub struct VsockConnector {
    pub cid: u32,
    pub port: u32,
}

impl VsockConnector {
    pub fn new(cid: u32, port: u32) -> Self {
        Self { cid, port }
    }
}

impl Connector for VsockConnector {
    type Stream = VStream;

    async fn connect(&self) -> io::Result<Self::Stream> {
        VStream::connect(self.cid, self.port)
            .await
            .map_err(|e| e.into())
    }
}

impl Acceptor for VListener {
    type Stream = VStream;
    async fn accept(&self) -> io::Result<(Self::Stream, SockAddr)> {
        Ok(self.accept().await?)
    }
}

pub struct VsockAcceptorBuilder {
    cid: u32,
    pub port: u32,
}

impl VsockAcceptorBuilder {
    pub fn new(cid: u32, port: u32) -> Self {
        Self { port, cid }
    }
}

impl AcceptorBuilder for VsockAcceptorBuilder {
    type Stream = VStream;
    type Acceptor = VListener;

    async fn bind(self) -> io::Result<Self::Acceptor> {
        VListener::bind(self.port).map_err(Into::into)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use compio::io::{AsyncReadExt, AsyncWriteExt};

    const TEST_PORT: u32 = 12345;

    #[compio::test]
    async fn test_stream_full_cycle() {
        let listener = VListener::bind(TEST_PORT).expect("Failed to bind listener");

        let client_task = async {
            let mut client = VStream::connect(1, TEST_PORT)
                .await
                .expect("Client failed to connect");

            let msg = b"hello from client";
            let BufResult(res, _) = client.write_all(msg).await;
            res.expect("Client write failed");

            let buf = vec![0u8; 17];
            let BufResult(res, buf) = client.read_exact(buf).await;
            res.expect("Client read failed");
            assert_eq!(&buf, b"hello from server");
        };

        let server_task = async {
            let (mut server_stream, _) = listener.accept().await.expect("Accept failed");

            let buf = vec![0u8; 17];
            let BufResult(res, buf) = server_stream.read_exact(buf).await;
            res.expect("Server read failed");
            assert_eq!(&buf, b"hello from client");

            let BufResult(res, _) = server_stream.write_all(b"hello from server").await;
            res.expect("Server write failed");
        };

        futures::join!(client_task, server_task);
    }

    #[compio::test]
    async fn test_stream_split() {
        let listener = VListener::bind(TEST_PORT + 1).expect("Bind failed");

        let client_fut = async {
            let stream = VStream::connect(1, TEST_PORT + 1).await.unwrap();
            let (mut reader, mut writer) = stream.split().expect("Split failed");

            writer.write_all(b"ping").await.0.unwrap();
            let buf = vec![0u8; 4];
            let BufResult(res, buf) = reader.read_exact(buf).await;
            res.expect("Split failed");
            assert_eq!(&buf, b"pong");
        };

        let server_fut = async {
            let (mut server, _) = listener.accept().await.unwrap();
            let buf = vec![0u8; 4];
            let BufResult(res, buf) = server.read_exact(buf).await;
            assert_eq!(&buf, b"ping");
            server.write_all(b"pong").await.0.unwrap();
        };

        futures::join!(client_fut, server_fut);
    }
}