communication_layer_request_reply/
tcp.rs

1use std::{
2    io::{Read, Write},
3    net::{SocketAddr, TcpListener, TcpStream},
4};
5
6use crate::{ListenConnection, RequestReplyConnection, RequestReplyLayer};
7
8pub type TcpRequestReplyConnection =
9    dyn RequestReplyConnection<RequestData = Vec<u8>, ReplyData = Vec<u8>, Error = std::io::Error>;
10
/// Request/reply layer that runs over plain TCP sockets.
///
/// The type carries no state; all work happens in its trait implementation.
pub struct TcpLayer {}

impl TcpLayer {
    /// Creates a new TCP layer.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for TcpLayer {
    /// Builds the (stateless) layer value.
    fn default() -> Self {
        TcpLayer {}
    }
}
24
25impl RequestReplyLayer for TcpLayer {
26    type Address = SocketAddr;
27    type RequestData = Vec<u8>;
28    type ReplyData = Vec<u8>;
29    type Error = std::io::Error;
30
31    fn listen(
32        &mut self,
33        addr: Self::Address,
34    ) -> Result<
35        Box<
36            dyn Iterator<
37                Item = Result<
38                    Box<
39                        dyn crate::ListenConnection<
40                            RequestData = Self::RequestData,
41                            ReplyData = Self::ReplyData,
42                            Error = Self::Error,
43                        >,
44                    >,
45                    Self::Error,
46                >,
47            >,
48        >,
49        Self::Error,
50    > {
51        let incoming: Box<dyn Iterator<Item = Result<_, _>>> = Box::new(
52            IntoIncoming {
53                listener: TcpListener::bind(addr)?,
54            }
55            .map(|r| {
56                r.map(|stream| {
57                    let connection: Box<
58                        dyn ListenConnection<
59                            RequestData = Self::RequestData,
60                            ReplyData = Self::ReplyData,
61                            Error = Self::Error,
62                        >,
63                    > = Box::new(TcpConnection { stream });
64                    connection
65                })
66            }),
67        );
68        Ok(incoming)
69    }
70
71    fn connect(
72        &mut self,
73        addr: Self::Address,
74    ) -> Result<
75        Box<
76            dyn crate::RequestReplyConnection<
77                RequestData = Self::RequestData,
78                ReplyData = Self::ReplyData,
79                Error = Self::Error,
80            >,
81        >,
82        Self::Error,
83    > {
84        TcpStream::connect(addr).map(|s| {
85            let connection: Box<
86                dyn RequestReplyConnection<
87                    RequestData = Self::RequestData,
88                    ReplyData = Self::ReplyData,
89                    Error = Self::Error,
90                >,
91            > = Box::new(TcpConnection { stream: s });
92            connection
93        })
94    }
95}
96
/// One end of an established TCP connection used for request/reply traffic.
pub struct TcpConnection {
    /// The underlying socket that messages are written to and read from.
    pub stream: TcpStream,
}
100
101impl ListenConnection for TcpConnection {
102    type RequestData = Vec<u8>;
103    type ReplyData = Vec<u8>;
104    type Error = std::io::Error;
105
106    fn handle_next(
107        &mut self,
108        handler: Box<dyn FnOnce(Self::RequestData) -> Result<Self::ReplyData, Self::Error>>,
109    ) -> Result<(), Self::Error> {
110        let request = self.receive()?;
111        let reply = handler(request)?;
112        self.send(&reply)?;
113        Ok(())
114    }
115}
116
117impl RequestReplyConnection for TcpConnection {
118    type RequestData = Vec<u8>;
119    type ReplyData = Vec<u8>;
120    type Error = std::io::Error;
121
122    fn request(&mut self, request: &Self::RequestData) -> Result<Self::ReplyData, Self::Error> {
123        self.send(request)?;
124        let reply = self.receive()?;
125
126        Ok(reply)
127    }
128}
129
130impl TcpConnection {
131    pub fn send(&mut self, request: &[u8]) -> std::io::Result<()> {
132        let len_raw = (request.len() as u64).to_le_bytes();
133        self.stream.write_all(&len_raw)?;
134        self.stream.write_all(request)?;
135        Ok(())
136    }
137
138    pub fn receive(&mut self) -> std::io::Result<Vec<u8>> {
139        let reply_len = {
140            let mut raw = [0; 8];
141            self.stream.read_exact(&mut raw)?;
142            u64::from_le_bytes(raw) as usize
143        };
144        let mut reply = vec![0; reply_len];
145        self.stream.read_exact(&mut reply)?;
146        Ok(reply)
147    }
148}
149
/// Owning iterator over the connections accepted by a [`TcpListener`].
///
/// Modelled on the `IntoIncoming` iterator of the standard library's
/// `std::net::tcp` module, which is still unstable there.
pub struct IntoIncoming {
    listener: TcpListener,
}

impl Iterator for IntoIncoming {
    type Item = std::io::Result<TcpStream>;

    /// Calls `accept` on the owned listener and yields its outcome.
    ///
    /// The peer address reported by `accept` is discarded. This method never
    /// returns `None`; accept failures are yielded as `Some(Err(..))`.
    fn next(&mut self) -> Option<Self::Item> {
        let accepted = match self.listener.accept() {
            Ok((stream, _peer_addr)) => Ok(stream),
            Err(error) => Err(error),
        };
        Some(accepted)
    }
}

// `next` never returns `None`, so the fused-iterator contract holds trivially.
impl std::iter::FusedIterator for IntoIncoming {}