ogurpchik 0.1.0

A transport-agnostic RPC framework for stream and memory-based communication. Built with high-performance primitives to deliver medium-performance results.
use crate::align_buffer::AlignedBuffer;
use crate::transport::raw::peer::{Peer, PeerConfig};
use crate::transport::raw::{
    RawTransport, SendHandle, TransportAcceptor, TransportBuilder, TransportConnector,
};
use crate::transport::stream::{Acceptor, AcceptorBuilder, AsyncStream, Connector};
use std::io;

pub struct StreamTransport<S: AsyncStream> {
    pub stream: S,
    pub config: PeerConfig,
}

impl<S: AsyncStream> RawTransport for StreamTransport<S> {
    fn decompose(self) -> anyhow::Result<(SendHandle, flume::Receiver<AlignedBuffer>)> {
        Peer::new(self.stream, self.config).map_err(Into::into)
    }
}

pub struct GenericStreamAcceptor<A: Acceptor> {
    inner: A,
    config: PeerConfig,
}

impl<A: Acceptor> TransportAcceptor for GenericStreamAcceptor<A> {
    type Transport = StreamTransport<A::Stream>;

    async fn accept(&self) -> io::Result<Self::Transport> {
        let (stream, _) = self.inner.accept().await?;
        let transport = StreamTransport {
            stream,
            config: self.config.clone(),
        };
        Ok(transport)
    }
}

pub struct GenericStreamBuilder<B: AcceptorBuilder> {
    inner_builder: B,
    config: PeerConfig,
}

impl<B: AcceptorBuilder> GenericStreamBuilder<B> {
    pub fn new(inner_builder: B, config: PeerConfig) -> Self {
        Self {
            inner_builder,
            config,
        }
    }
}

impl<B: AcceptorBuilder> TransportBuilder for GenericStreamBuilder<B> {
    type Transport = StreamTransport<B::Stream>;
    type Acceptor = GenericStreamAcceptor<B::Acceptor>;

    async fn bind(self) -> io::Result<Self::Acceptor> {
        let acceptor = self.inner_builder.bind().await?;
        Ok(GenericStreamAcceptor {
            inner: acceptor,
            config: self.config,
        })
    }
}

pub struct GenericStreamConnector<C: Connector> {
    inner: C,
    config: PeerConfig,
}

impl<C: Connector> GenericStreamConnector<C> {
    pub fn new(inner: C, config: PeerConfig) -> Self {
        Self { inner, config }
    }
}

impl<C: Connector> TransportConnector for GenericStreamConnector<C> {
    type Transport = StreamTransport<C::Stream>;

    async fn connect(&self) -> anyhow::Result<Self::Transport> {
        let stream = self.inner.connect().await?;
        Ok(StreamTransport {
            stream,
            config: self.config.clone(),
        })
    }
}