communication_layer_request_reply/
tcp.rs1use std::{
2 io::{Read, Write},
3 net::{SocketAddr, TcpListener, TcpStream},
4};
5
6use crate::{ListenConnection, RequestReplyConnection, RequestReplyLayer};
7
8pub type TcpRequestReplyConnection =
9 dyn RequestReplyConnection<RequestData = Vec<u8>, ReplyData = Vec<u8>, Error = std::io::Error>;
10
11pub struct TcpLayer {}
12
13impl TcpLayer {
14 pub fn new() -> Self {
15 Self {}
16 }
17}
18
19impl Default for TcpLayer {
20 fn default() -> Self {
21 Self::new()
22 }
23}
24
25impl RequestReplyLayer for TcpLayer {
26 type Address = SocketAddr;
27 type RequestData = Vec<u8>;
28 type ReplyData = Vec<u8>;
29 type Error = std::io::Error;
30
31 fn listen(
32 &mut self,
33 addr: Self::Address,
34 ) -> Result<
35 Box<
36 dyn Iterator<
37 Item = Result<
38 Box<
39 dyn crate::ListenConnection<
40 RequestData = Self::RequestData,
41 ReplyData = Self::ReplyData,
42 Error = Self::Error,
43 >,
44 >,
45 Self::Error,
46 >,
47 >,
48 >,
49 Self::Error,
50 > {
51 let incoming: Box<dyn Iterator<Item = Result<_, _>>> = Box::new(
52 IntoIncoming {
53 listener: TcpListener::bind(addr)?,
54 }
55 .map(|r| {
56 r.map(|stream| {
57 let connection: Box<
58 dyn ListenConnection<
59 RequestData = Self::RequestData,
60 ReplyData = Self::ReplyData,
61 Error = Self::Error,
62 >,
63 > = Box::new(TcpConnection { stream });
64 connection
65 })
66 }),
67 );
68 Ok(incoming)
69 }
70
71 fn connect(
72 &mut self,
73 addr: Self::Address,
74 ) -> Result<
75 Box<
76 dyn crate::RequestReplyConnection<
77 RequestData = Self::RequestData,
78 ReplyData = Self::ReplyData,
79 Error = Self::Error,
80 >,
81 >,
82 Self::Error,
83 > {
84 TcpStream::connect(addr).map(|s| {
85 let connection: Box<
86 dyn RequestReplyConnection<
87 RequestData = Self::RequestData,
88 ReplyData = Self::ReplyData,
89 Error = Self::Error,
90 >,
91 > = Box::new(TcpConnection { stream: s });
92 connection
93 })
94 }
95}
96
97pub struct TcpConnection {
98 pub stream: TcpStream,
99}
100
101impl ListenConnection for TcpConnection {
102 type RequestData = Vec<u8>;
103 type ReplyData = Vec<u8>;
104 type Error = std::io::Error;
105
106 fn handle_next(
107 &mut self,
108 handler: Box<dyn FnOnce(Self::RequestData) -> Result<Self::ReplyData, Self::Error>>,
109 ) -> Result<(), Self::Error> {
110 let request = self.receive()?;
111 let reply = handler(request)?;
112 self.send(&reply)?;
113 Ok(())
114 }
115}
116
117impl RequestReplyConnection for TcpConnection {
118 type RequestData = Vec<u8>;
119 type ReplyData = Vec<u8>;
120 type Error = std::io::Error;
121
122 fn request(&mut self, request: &Self::RequestData) -> Result<Self::ReplyData, Self::Error> {
123 self.send(request)?;
124 let reply = self.receive()?;
125
126 Ok(reply)
127 }
128}
129
130impl TcpConnection {
131 pub fn send(&mut self, request: &[u8]) -> std::io::Result<()> {
132 let len_raw = (request.len() as u64).to_le_bytes();
133 self.stream.write_all(&len_raw)?;
134 self.stream.write_all(request)?;
135 Ok(())
136 }
137
138 pub fn receive(&mut self) -> std::io::Result<Vec<u8>> {
139 let reply_len = {
140 let mut raw = [0; 8];
141 self.stream.read_exact(&mut raw)?;
142 u64::from_le_bytes(raw) as usize
143 };
144 let mut reply = vec![0; reply_len];
145 self.stream.read_exact(&mut reply)?;
146 Ok(reply)
147 }
148}
149
150pub struct IntoIncoming {
152 listener: TcpListener,
153}
154
155impl Iterator for IntoIncoming {
156 type Item = std::io::Result<TcpStream>;
157 fn next(&mut self) -> Option<std::io::Result<TcpStream>> {
158 Some(self.listener.accept().map(|p| p.0))
159 }
160}
161
162impl std::iter::FusedIterator for IntoIncoming {}