1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
2
3use std::{
13 io::{self, Read, Result, Write},
14 net::{SocketAddr, TcpStream},
15};
16
17use tcp_lib::{read, write};
18use tracing::{debug, instrument};
19
20#[derive(Debug)]
25pub struct Handler {
26 stream: TcpStream,
27}
28
29impl Handler {
30 #[instrument("tcp/std/new", skip_all)]
35 pub fn new(host: impl AsRef<str>, port: u16) -> Result<Self> {
36 let host = host.as_ref();
37 debug!(?host, port, "connecting TCP stream");
38 let stream = TcpStream::connect((host, port))?;
39 debug!("connected");
40 Ok(Self { stream })
41 }
42
43 #[instrument(skip_all)]
49 pub fn read(&mut self, mut flow: impl AsMut<read::State>) -> Result<()> {
50 let state = flow.as_mut();
51 let bytes_count = self.stream.read(state.get_buffer_mut())?;
52 state.set_bytes_count(bytes_count);
53 Ok(())
54 }
55
56 #[instrument(skip_all)]
62 pub fn write(&mut self, mut flow: impl AsMut<write::State>) -> Result<()> {
63 let state = flow.as_mut();
64 let bytes_count = self.stream.write(state.get_buffer())?;
65 state.set_bytes_count(bytes_count);
66 Ok(())
67 }
68}
69
70impl From<TcpStream> for Handler {
71 fn from(stream: TcpStream) -> Self {
72 Self { stream }
73 }
74}
75
76impl TryFrom<SocketAddr> for Handler {
77 type Error = io::Error;
78
79 fn try_from(addr: SocketAddr) -> io::Result<Self> {
80 let host = addr.ip();
81 let port = addr.port();
82 debug!(?host, port, "connecting TCP stream");
83 let stream = TcpStream::connect(addr)?;
84 debug!("connected");
85 Ok(Self { stream })
86 }
87}
88
89#[cfg(test)]
90mod tests {
91 use std::{
92 io::{Read, Write},
93 net::{TcpListener, TcpStream},
94 thread,
95 };
96
97 use tcp_lib::{read, write};
98
99 use crate::Handler;
100
101 fn new_tcp_stream_pair() -> (TcpStream, TcpStream) {
102 let listener = TcpListener::bind("127.0.0.1:0").unwrap();
103 let addr = listener.local_addr().unwrap();
104 let accept = thread::spawn(move || listener.accept().unwrap().0);
105 let client = TcpStream::connect(addr).unwrap();
106 let server = accept.join().unwrap();
107 (client, server)
108 }
109
110 #[test]
111 fn read() {
112 let (mut client, server) = new_tcp_stream_pair();
113 let mut handler = Handler::from(server);
114
115 let written_bytes = b"data".to_vec();
116 client.write(&written_bytes).unwrap();
117
118 let mut flow = read::Flow::new();
119
120 let read_bytes: Vec<u8> = loop {
121 match flow.next() {
122 Ok(bytes) => {
123 break bytes.to_vec();
124 }
125 Err(read::Io::Read) => {
126 handler.read(&mut flow).unwrap();
127 }
128 }
129 };
130
131 assert_eq!(written_bytes, read_bytes)
132 }
133
134 #[test]
135 fn read_chunks() {
136 let (mut client, server) = new_tcp_stream_pair();
137 let mut handler = Handler::from(server);
138
139 let written_bytes = b"big data ended by dollar$".to_vec();
140 client.write(&written_bytes).unwrap();
141
142 let mut flow = read::Flow::with_capacity(3);
143 let mut read_bytes = Vec::new();
144
145 loop {
146 let bytes = match flow.next() {
147 Ok(bytes) => bytes.to_vec(),
148 Err(read::Io::Read) => {
149 handler.read(&mut flow).unwrap();
150 continue;
151 }
152 };
153
154 println!("bytes: {read_bytes:?}");
155
156 read_bytes.extend(bytes);
157
158 if let Some(b'$') = read_bytes.last() {
159 break;
160 }
161 }
162
163 assert_eq!(written_bytes, read_bytes);
164 }
165
166 #[test]
167 fn write() {
168 let (mut client, server) = new_tcp_stream_pair();
169 let mut handler = Handler::from(server);
170
171 let mut flow = write::Flow::new(b"data".to_vec());
172
173 let written_bytes: Vec<u8> = loop {
174 match flow.next() {
175 Ok(bytes) => {
176 break bytes.to_vec();
177 }
178 Err(write::Io::Write) => {
179 handler.write(&mut flow).unwrap();
180 }
181 }
182 };
183
184 let mut read_bytes = [0; 4];
185 client.read(&mut read_bytes).unwrap();
186
187 assert_eq!(written_bytes, read_bytes)
188 }
189}