jetstream_iroh 9.2.0

Jetstream is a RPC framework for Rust, based on the 9P protocol and QUIC.
Documentation
use std::{
    io,
    pin::Pin,
    task::{Context, Poll},
};

use futures::{Sink, SinkExt, Stream, StreamExt};
use iroh::endpoint::{RecvStream, SendStream};
use jetstream_rpc::{client::ClientCodec, Protocol};
use tokio_util::codec::{FramedRead, FramedWrite};

pub struct IrohTransport<P: Protocol> {
    send_stream: FramedWrite<SendStream, ClientCodec<P>>,
    recv_stream: FramedRead<RecvStream, ClientCodec<P>>,
}

impl<P: Protocol> From<(SendStream, RecvStream)> for IrohTransport<P> {
    fn from(value: (SendStream, RecvStream)) -> Self {
        let (send_stream, recv_stream) = value;
        let send_stream = FramedWrite::new(send_stream, ClientCodec::default());
        let recv_stream = FramedRead::new(recv_stream, ClientCodec::default());
        Self {
            send_stream,
            recv_stream,
        }
    }
}

impl<P: Protocol> Sink<jetstream_rpc::Frame<P::Request>> for IrohTransport<P>
where
    Self: Unpin,
{
    type Error = io::Error;

    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.get_mut().send_stream.poll_ready_unpin(cx)
    }

    fn start_send(
        self: Pin<&mut Self>,
        item: jetstream_rpc::Frame<P::Request>,
    ) -> Result<(), Self::Error> {
        self.get_mut().send_stream.start_send_unpin(item)
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.get_mut().send_stream.poll_flush_unpin(cx)
    }

    fn poll_close(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.get_mut().send_stream.poll_close_unpin(cx)
    }
}

impl<P: Protocol> Stream for IrohTransport<P>
where
    Self: Unpin,
{
    type Item = Result<jetstream_rpc::Frame<P::Response>, io::Error>;

    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.get_mut().recv_stream.poll_next_unpin(cx)
    }
}