tcp_std/
lib.rs

1#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
2
3//! The [TCP flows] project is a set of libraries to manage TCP
4//! streams in an I/O-agnostic way. It is highly recommended that you
5//! first read about the project in order to understand `tcp-std`.
6//!
7//! This library exposes an I/O handler for that project, based on the
8//! Rust standard library (sync).
9//!
10//! [TCP flows]: https://github.com/pimalaya/tcp
11
12use std::{
13    io::{self, Read, Result, Write},
14    net::{SocketAddr, TcpStream},
15};
16
17use tcp_lib::{read, write};
18use tracing::{debug, instrument};
19
/// A blocking TCP I/O handler built on the Rust standard library.
///
/// The handler owns a [`std::net::TcpStream`] and uses it to read
/// bytes from, and write bytes to, the remote peer synchronously.
#[derive(Debug)]
pub struct Handler {
    /// The TCP stream every read and write goes through.
    stream: TcpStream,
}
28
29impl Handler {
30    /// Builds a new handler.
31    ///
32    /// This function does perform I/O, as it connects to the TCP
33    /// stream matching the given hostname and port.
34    #[instrument("tcp/std/new", skip_all)]
35    pub fn new(host: impl AsRef<str>, port: u16) -> Result<Self> {
36        let host = host.as_ref();
37        debug!(?host, port, "connecting TCP stream");
38        let stream = TcpStream::connect((host, port))?;
39        debug!("connected");
40        Ok(Self { stream })
41    }
42
43    /// Processes the [`read::Io::Read`] request.
44    ///
45    /// This function reads synchronously a chunk of bytes from the
46    /// inner TCP stream to its inner state read buffer, then set how
47    /// many bytes have been read.
48    #[instrument(skip_all)]
49    pub fn read(&mut self, mut flow: impl AsMut<read::State>) -> Result<()> {
50        let state = flow.as_mut();
51        let bytes_count = self.stream.read(state.get_buffer_mut())?;
52        state.set_bytes_count(bytes_count);
53        Ok(())
54    }
55
56    /// Processes the [`write::Io::Write`] request.
57    ///
58    /// This function writes synchronously bytes to the inner TCP
59    /// stream from its inner state write buffer, then set how many
60    /// bytes have been written.
61    #[instrument(skip_all)]
62    pub fn write(&mut self, mut flow: impl AsMut<write::State>) -> Result<()> {
63        let state = flow.as_mut();
64        let bytes_count = self.stream.write(state.get_buffer())?;
65        state.set_bytes_count(bytes_count);
66        Ok(())
67    }
68}
69
70impl From<TcpStream> for Handler {
71    fn from(stream: TcpStream) -> Self {
72        Self { stream }
73    }
74}
75
76impl TryFrom<SocketAddr> for Handler {
77    type Error = io::Error;
78
79    fn try_from(addr: SocketAddr) -> io::Result<Self> {
80        let host = addr.ip();
81        let port = addr.port();
82        debug!(?host, port, "connecting TCP stream");
83        let stream = TcpStream::connect(addr)?;
84        debug!("connected");
85        Ok(Self { stream })
86    }
87}
88
#[cfg(test)]
mod tests {
    use std::{
        io::{Read, Write},
        net::{TcpListener, TcpStream},
        thread,
    };

    use tcp_lib::{read, write};

    use crate::Handler;

    /// Returns a connected `(client, server)` pair of TCP streams.
    ///
    /// The listener binds to the loopback interface on an OS-assigned
    /// port and accepts on a helper thread while the client connects.
    fn new_tcp_stream_pair() -> (TcpStream, TcpStream) {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let accept = thread::spawn(move || listener.accept().unwrap().0);
        let client = TcpStream::connect(addr).unwrap();
        let server = accept.join().unwrap();
        (client, server)
    }

    /// A single read request yields the bytes sent by the peer.
    #[test]
    fn read() {
        let (mut client, server) = new_tcp_stream_pair();
        let mut handler = Handler::from(server);

        let written_bytes = b"data".to_vec();
        // `write_all` rather than `write`: a bare `write` may send
        // only part of the buffer and its count was being ignored.
        client.write_all(&written_bytes).unwrap();

        let mut flow = read::Flow::new();

        let read_bytes: Vec<u8> = loop {
            match flow.next() {
                Ok(bytes) => {
                    break bytes.to_vec();
                }
                Err(read::Io::Read) => {
                    handler.read(&mut flow).unwrap();
                }
            }
        };

        assert_eq!(written_bytes, read_bytes);
    }

    /// A payload larger than the flow capacity is received across
    /// several read requests and reassembled in order.
    #[test]
    fn read_chunks() {
        let (mut client, server) = new_tcp_stream_pair();
        let mut handler = Handler::from(server);

        let written_bytes = b"big data ended by dollar$".to_vec();
        client.write_all(&written_bytes).unwrap();

        let mut flow = read::Flow::with_capacity(3);
        let mut read_bytes = Vec::new();

        loop {
            let bytes = match flow.next() {
                Ok(bytes) => bytes.to_vec(),
                Err(read::Io::Read) => {
                    handler.read(&mut flow).unwrap();
                    continue;
                }
            };

            read_bytes.extend(bytes);

            // The payload is terminated by a dollar sign.
            if read_bytes.ends_with(b"$") {
                break;
            }
        }

        assert_eq!(written_bytes, read_bytes);
    }

    /// A write request sends the flow's bytes to the peer.
    #[test]
    fn write() {
        let (mut client, server) = new_tcp_stream_pair();
        let mut handler = Handler::from(server);

        let mut flow = write::Flow::new(b"data".to_vec());

        let written_bytes: Vec<u8> = loop {
            match flow.next() {
                Ok(bytes) => {
                    break bytes.to_vec();
                }
                Err(write::Io::Write) => {
                    handler.write(&mut flow).unwrap();
                }
            }
        };

        let mut read_bytes = [0; 4];
        // `read_exact` rather than `read`: a bare `read` may return
        // fewer than four bytes and its count was being ignored.
        client.read_exact(&mut read_bytes).unwrap();

        assert_eq!(written_bytes, read_bytes);
    }
}
189}