quick_file_transfer/
util.rs

1use crate::config::transfer::command::{ServerCommand, ServerResult};
2use crate::config::Config;
3use anyhow::{bail, Result};
4use std::io::{Read, Write};
5use std::net::{TcpListener, TcpStream};
6use std::path::Path;
7use std::time::Duration;
8use std::{fmt, fs, io};
9use tiny_rnd::rnd_u32;
10
/// A host string paired with a port number.
///
/// The host is borrowed for the `'cfg` lifetime and is stored exactly as given;
/// no parsing or validation happens here.
#[derive(Debug, Clone, Copy)]
pub struct Address<'cfg> {
    /// Host part of the address (stored verbatim).
    pub ip: &'cfg str,
    /// Port part of the address.
    pub port: u16,
}

impl<'cfg> Address<'cfg> {
    /// Pair `ip` with `port`.
    pub fn new(ip: &'cfg str, port: u16) -> Self {
        Address { ip, port }
    }
}

impl<'cfg> fmt::Display for Address<'cfg> {
    /// Renders the address as `<ip>:<port>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Address { ip, port } = *self;
        write!(f, "{ip}:{port}")
    }
}
28
29pub fn connect_tcp_stream(addr: Address) -> Result<TcpStream> {
30    let stream = TcpStream::connect(addr.to_string())?;
31    Ok(stream)
32}
33
34pub fn bind_tcp_listener(addr: Address) -> Result<TcpListener> {
35    let listener = TcpListener::bind(addr.to_string())?;
36    Ok(listener)
37}
38
/// Format a value to a human readable byte magnitude description
///
/// Sizes below 1 KiB are printed as a whole number of bytes (`"512 B"`); larger sizes
/// are printed with two decimals in the largest binary unit that does not exceed the
/// value, up to GiB (`"1.50 KiB"`, `"1.00 MiB"`, `"5.00 GiB"`).
///
/// A unit takes over exactly at its own size, so 1024 bytes is `"1.00 KiB"` rather
/// than `"1024 B"`, and 1 MiB is `"1.00 MiB"` rather than `"1024.00 KiB"`.
#[must_use]
pub fn format_data_size(size_bytes: u64) -> String {
    const KIB: u64 = 1024;
    const MIB: u64 = 1024 * KIB;
    const GIB: u64 = 1024 * MIB;

    // Half-open comparisons derived from the constants: no hand-maintained
    // "unit + 1" literals that can drift out of sync with the unit sizes.
    if size_bytes < KIB {
        format!("{size_bytes} B")
    } else if size_bytes < MIB {
        format!("{:.2} KiB", size_bytes as f64 / KIB as f64)
    } else if size_bytes < GIB {
        format!("{:.2} MiB", size_bytes as f64 / MIB as f64)
    } else {
        format!("{:.2} GiB", size_bytes as f64 / GIB as f64)
    }
}
66
67pub fn incremental_rw<const BUFSIZE: usize, W, R>(
68    stream_writer: &mut W,
69    reader: &mut R,
70) -> Result<u64>
71where
72    W: io::Write,
73    R: io::Read,
74{
75    let mut buf = [0; BUFSIZE];
76    let mut total_read = 0;
77    loop {
78        let bytes_read = reader.read(&mut buf)?;
79        log::trace!("Read {bytes_read}");
80        if bytes_read == 0 {
81            log::trace!("Breaking out of transfer");
82            break;
83        }
84        total_read += bytes_read;
85
86        let written_bytes = stream_writer.write(&buf[..bytes_read])?;
87        log::trace!("wrote {written_bytes}");
88        debug_assert_eq!(
89            bytes_read, written_bytes,
90            "Mismatch between bytes read/written, read={bytes_read}, written={written_bytes}"
91        );
92    }
93    Ok(total_read as u64)
94}
95
/// Create the file at `path` (truncating it if it already exists) and resize it to
/// `len` bytes.
///
/// # Errors
///
/// Propagates any error from opening the file or from setting its length.
pub fn create_file_with_len(path: &Path, len: u64) -> Result<()> {
    let mut options = fs::OpenOptions::new();
    options.write(true).create(true).truncate(true);
    options.open(path)?.set_len(len)?;
    Ok(())
}
/// Ask the OS for any available port on `ip`.
///
/// Binds a listener to port 0, which lets the OS pick the port, and reads the chosen
/// port back from the listener's local address. The probing listener is dropped when
/// this function returns, so the port is reported but not held.
///
/// Returns `None` if binding fails or the local address cannot be retrieved.
pub fn get_free_port(ip: &str) -> Option<u16> {
    let probe = TcpListener::bind((ip, 0)).ok()?;
    let bound = probe.local_addr().ok()?;
    Some(bound.port())
}
115
/// First port (49152) of the range IANA suggests for dynamic/ephemeral use.
///
/// see more: <https://www.rfc-editor.org/rfc/rfc6335.html#section-6>
pub const IANA_RECOMMEND_DYNAMIC_PORT_RANGE_START: u16 = 0xC000;
/// Last port (65535) of the range IANA suggests for dynamic/ephemeral use.
///
/// see more: <https://www.rfc-editor.org/rfc/rfc6335.html#section-6>
pub const IANA_RECOMMEND_DYNAMIC_PORT_RANGE_END: u16 = u16::MAX;
119
/// Find the first port in `start_port..=end_port` that can be bound on `ip`.
///
/// Ports are tried in ascending order. For the first one that binds, the port is read
/// back from the listener's local address and returned. The probing listener is
/// dropped before returning, so the port is reported but not held.
///
/// Returns `None` if no port in the range could be bound (including when the range
/// is empty because `start_port > end_port`).
///
/// # Note
///
/// Internet Assigned Numbers Authority (IANA) suggests 49152-65535 for dynamic/ephemeral use.
/// Linux distros often use 32768-61000 though, so a conservative/robust range of
/// 49152-61000 is preferable.
///
/// see more: <https://www.rfc-editor.org/rfc/rfc6335.html#section-6>
pub fn get_free_port_in_range(ip: &str, start_port: u16, end_port: u16) -> Option<u16> {
    (start_port..=end_port).find_map(|candidate| {
        let probe = TcpListener::bind((ip, candidate)).ok()?;
        let bound = probe.local_addr().ok()?;
        Some(bound.port())
    })
}
141
/// Bind a listener to the first port in `start_port..=end_port` that is available on `ip`
/// and return that listener.
///
/// Ports are tried in ascending order. Returns `None` if none of them could be bound
/// (including when the range is empty because `start_port > end_port`).
///
/// # Note
///
/// See [`get_free_port_in_range`] for notes about port ranges
pub fn bind_listen_to_free_port_in_range(
    ip: &str,
    start_port: u16,
    end_port: u16,
) -> Option<TcpListener> {
    (start_port..=end_port).find_map(|candidate| TcpListener::bind((ip, candidate)).ok())
}
160
161/// Converts the verbosity from the config back to the command-line arguments that would produce that verbosity
162pub fn verbosity_to_args(cfg: &Config) -> &str {
163    if cfg.quiet {
164        "-q"
165    } else {
166        match cfg.verbose {
167            1 => "-v",
168            2 => "-vv",
169            // Verbose is not set
170            _ => "",
171        }
172    }
173}
174
175/// Do the basic handshake from the serverside to ensure we're talking to a QFT client
176pub fn server_handshake(socket: &mut TcpStream) -> anyhow::Result<()> {
177    let handshake_u32 = rnd_u32(std::process::id() as u64);
178    let expect_handshake = rnd_u32(handshake_u32 as u64);
179
180    if let Err(e) = socket.write_all(&handshake_u32.to_be_bytes()) {
181        log::warn!("{}: {e}, retrying in 100 ms ...", e.kind());
182        std::thread::sleep(Duration::from_millis(100));
183        socket.write_all(&handshake_u32.to_be_bytes())?
184    }
185    let mut handshake_buf: [u8; 4] = [0; 4];
186    if let Err(e) = socket.read_exact(&mut handshake_buf) {
187        log::warn!("{}: {e}, retrying in 100 ms ...", e.kind());
188        std::thread::sleep(Duration::from_millis(100));
189        socket.read_exact(&mut handshake_buf)?;
190    }
191    let handshake: u32 = u32::from_be_bytes(handshake_buf);
192
193    if handshake != expect_handshake {
194        bail!("Received unexpected handshake: {handshake}")
195    } else {
196        log::trace!("QFT handshake OK");
197    }
198    Ok(())
199}
200
201pub fn read_server_cmd(
202    socket: &mut TcpStream,
203    cmd_buf: &mut [u8],
204) -> anyhow::Result<Option<ServerCommand>> {
205    let mut header_buf = [0; ServerCommand::HEADER_SIZE];
206    // Read the header to determine the size of the incoming command/data
207    if let Err(e) = socket.read_exact(&mut header_buf) {
208        log::trace!("{e}");
209        if e.kind() == io::ErrorKind::UnexpectedEof {
210            // Ok but no command indicates the client disconnected
211            return Ok(None);
212        } else {
213            log::warn!("{}: {e}, retrying in 100 ms ...", e.kind());
214            std::thread::sleep(Duration::from_millis(100));
215            socket.read_exact(&mut header_buf)?;
216        }
217    }
218    let inc_cmd_len = ServerCommand::size_from_bytes(header_buf);
219    debug_assert!(inc_cmd_len <= cmd_buf.len());
220
221    // Read the actual command/data based on the size
222    if let Err(e) = socket.read_exact(&mut cmd_buf[..inc_cmd_len]) {
223        log::warn!("{}: {e}, retrying in 100 ms ...", e.kind());
224        std::thread::sleep(Duration::from_millis(100));
225        socket.read_exact(&mut cmd_buf[..inc_cmd_len])?;
226    }
227    let command: ServerCommand = bincode::deserialize(&cmd_buf[..inc_cmd_len])?;
228    Ok(Some(command))
229}
230
231fn read_server_response_header(socket: &mut TcpStream) -> anyhow::Result<usize> {
232    let mut header_buf = [0; ServerResult::HEADER_SIZE];
233    // Read the header to determine the size of the incoming command/data
234    if let Err(e) = socket.read_exact(&mut header_buf) {
235        bail!("{e}");
236    }
237    Ok(ServerResult::size_from_bytes(header_buf))
238}
239
240/// Provide your own buffer to allow for buffer reuse
241pub fn read_server_response_with_buf(
242    socket: &mut TcpStream,
243    resp_buf: &mut [u8],
244) -> anyhow::Result<ServerResult> {
245    let inc_resp_len = read_server_response_header(socket)?;
246    debug_assert!(inc_resp_len <= resp_buf.len());
247
248    // Read the actual command/data based on the size
249    if let Err(e) = socket.read_exact(&mut resp_buf[..inc_resp_len]) {
250        anyhow::bail!("Error reading command into buffer: {e}");
251    }
252    let resp: ServerResult = bincode::deserialize(&resp_buf[..inc_resp_len])?;
253    Ok(resp)
254}
255
256pub fn read_server_response(socket: &mut TcpStream) -> anyhow::Result<ServerResult> {
257    let inc_resp_len = read_server_response_header(socket)?;
258
259    // Candidate for unsafe uninitialized read
260    let mut resp_buf: Vec<u8> = vec![0; inc_resp_len];
261
262    // Read the actual command/data based on the size
263    if let Err(e) = socket.read_exact(&mut resp_buf) {
264        anyhow::bail!("Error reading command into buffer: {e}");
265    }
266    let resp: ServerResult = bincode::deserialize(&resp_buf)?;
267    Ok(resp)
268}
269
/// Pseudo-random number generation for the application-level client/server handshake.
///
/// Adapted (and gutted) from: <https://docs.rs/rand_xoshiro/latest/src/rand_xoshiro/splitmix64.rs.html>
///
/// The only requirement on these numbers is that they are cheap to produce, so pulling
/// in the rand crate as a dependency just for this is not worth it.
pub(crate) mod tiny_rnd {

    /// Derive a "random" `u32` from `seed` in one shot.
    ///
    /// The same seed always produces the same output; no state is kept between calls.
    ///
    /// # Note
    ///
    /// This is a stateless, gutted variant of the splitmix64 generator, adapted from:
    /// <https://docs.rs/rand_xoshiro/latest/src/rand_xoshiro/splitmix64.rs.html>
    ///
    /// It is NOT suitable for cryptographic purposes, but it is very fast.
    pub fn rnd_u32(seed: u64) -> u32 {
        const PHI: u64 = 0x9e37_79b9_7f4a_7c15;
        // Multipliers of David Stafford's "Mix4" variant
        // (http://zimbry.blogspot.com/2011/09/better-bit-mixing-improving-on.html)
        // of the 64-bit finalizer in Austin Appleby's MurmurHash3 algorithm.
        const MIX_FIRST: u64 = 0x62A9_D9ED_7997_05F5;
        const MIX_SECOND: u64 = 0xCB24_D0A5_C88C_35B3;

        let seeded = seed.wrapping_add(PHI);
        let round_one = (seeded ^ (seeded >> 33)).wrapping_mul(MIX_FIRST);
        let round_two = (round_one ^ (round_one >> 28)).wrapping_mul(MIX_SECOND);
        // Only the upper 32 bits of the mixed state are returned.
        (round_two >> 32) as u32
    }
}
299}