1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
use super::codec::LengthBasedFrameCodec;
use async_trait::async_trait;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use rsocket_rust::frame::Frame;
use rsocket_rust::transport::{Connection, Reader, Writer};
use rsocket_rust::{error::RSocketError, Result};
use tokio::net::TcpStream;
use tokio_util::codec::Framed;

#[derive(Debug)]
pub struct TcpConnection {
    stream: TcpStream,
}

struct InnerWriter {
    sink: SplitSink<Framed<TcpStream, LengthBasedFrameCodec>, Frame>,
}

struct InnerReader {
    stream: SplitStream<Framed<TcpStream, LengthBasedFrameCodec>>,
}

#[async_trait]
impl Writer for InnerWriter {
    async fn write(&mut self, frame: Frame) -> Result<()> {
        match self.sink.send(frame).await {
            Ok(()) => Ok(()),
            Err(e) => Err(RSocketError::IO(e).into()),
        }
    }
}

#[async_trait]
impl Reader for InnerReader {
    async fn read(&mut self) -> Option<Result<Frame>> {
        match self.stream.next().await {
            Some(Ok(frame)) => Some(Ok(frame)),
            Some(Err(e)) => Some(Err(RSocketError::IO(e).into())),
            None => None,
        }
    }
}

impl Connection for TcpConnection {
    fn split(
        self,
    ) -> (
        Box<dyn Writer + Send + Unpin>,
        Box<dyn Reader + Send + Unpin>,
    ) {
        let (sink, stream) = Framed::new(self.stream, LengthBasedFrameCodec).split();
        (
            Box::new(InnerWriter { sink }),
            Box::new(InnerReader { stream }),
        )
    }
}

impl From<TcpStream> for TcpConnection {
    fn from(stream: TcpStream) -> TcpConnection {
        TcpConnection { stream }
    }
}