quick_file_transfer/
util.rs1use crate::config::transfer::command::{ServerCommand, ServerResult};
2use crate::config::Config;
3use anyhow::{bail, Result};
4use std::io::{Read, Write};
5use std::net::{TcpListener, TcpStream};
6use std::path::Path;
7use std::time::Duration;
8use std::{fmt, fs, io};
9use tiny_rnd::rnd_u32;
10
/// A borrowed host/port pair describing a TCP endpoint.
///
/// The host string is borrowed for `'cfg`, which keeps the type `Copy` and free of allocations.
#[derive(Debug, Clone, Copy)]
pub struct Address<'cfg> {
    /// Host part of the endpoint, stored exactly as supplied by the caller.
    pub ip: &'cfg str,
    /// TCP port of the endpoint.
    pub port: u16,
}

impl<'cfg> Address<'cfg> {
    /// Builds an [`Address`] from a borrowed host string and a port number.
    pub fn new(ip: &'cfg str, port: u16) -> Self {
        Address { ip, port }
    }
}

impl fmt::Display for Address<'_> {
    /// Writes the address as `<ip>:<port>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Address { ip, port } = self;
        write!(f, "{}:{}", ip, port)
    }
}
28
29pub fn connect_tcp_stream(addr: Address) -> Result<TcpStream> {
30 let stream = TcpStream::connect(addr.to_string())?;
31 Ok(stream)
32}
33
34pub fn bind_tcp_listener(addr: Address) -> Result<TcpListener> {
35 let listener = TcpListener::bind(addr.to_string())?;
36 Ok(listener)
37}
38
/// Renders a byte count as a human-readable string using binary (1024-based) units.
///
/// Counts up to and including 1024 are printed as whole bytes. Larger counts are scaled to
/// KiB, MiB or GiB and printed with two decimals. The upper bound of every unit is
/// inclusive, so exactly 1 MiB is rendered as `1024.00 KiB`.
#[must_use]
pub fn format_data_size(size_bytes: u64) -> String {
    const KIB: u64 = 1 << 10;
    const MIB: u64 = 1 << 20;
    const GIB: u64 = 1 << 30;

    if size_bytes <= KIB {
        // Precision is ignored for integers, so this prints the plain byte count.
        return format!("{size_bytes:.2} B");
    }
    if size_bytes <= MIB {
        let kib_bytes = size_bytes as f64 / KIB as f64;
        return format!("{kib_bytes:.2} KiB");
    }
    if size_bytes <= GIB {
        let mib_bytes = size_bytes as f64 / MIB as f64;
        return format!("{mib_bytes:.2} MiB");
    }
    let gib_bytes = size_bytes as f64 / GIB as f64;
    format!("{gib_bytes:.2} GiB")
}
66
67pub fn incremental_rw<const BUFSIZE: usize, W, R>(
68 stream_writer: &mut W,
69 reader: &mut R,
70) -> Result<u64>
71where
72 W: io::Write,
73 R: io::Read,
74{
75 let mut buf = [0; BUFSIZE];
76 let mut total_read = 0;
77 loop {
78 let bytes_read = reader.read(&mut buf)?;
79 log::trace!("Read {bytes_read}");
80 if bytes_read == 0 {
81 log::trace!("Breaking out of transfer");
82 break;
83 }
84 total_read += bytes_read;
85
86 let written_bytes = stream_writer.write(&buf[..bytes_read])?;
87 log::trace!("wrote {written_bytes}");
88 debug_assert_eq!(
89 bytes_read, written_bytes,
90 "Mismatch between bytes read/written, read={bytes_read}, written={written_bytes}"
91 );
92 }
93 Ok(total_read as u64)
94}
95
96pub fn create_file_with_len(path: &Path, len: u64) -> Result<()> {
97 let file = fs::OpenOptions::new()
98 .create(true)
99 .truncate(true)
100 .write(true)
101 .open(path)?;
102 file.set_len(len)?;
103 Ok(())
104}
/// Asks the operating system for a TCP port on `ip` that is currently free.
///
/// A listener is bound to port `0`, the port it actually received is read back, and the
/// listener is dropped before returning. The port is therefore not reserved for the caller.
///
/// Returns `None` if binding fails or the local address cannot be queried.
pub fn get_free_port(ip: &str) -> Option<u16> {
    let probe = TcpListener::bind((ip, 0)).ok()?;
    let bound_addr = probe.local_addr().ok()?;
    Some(bound_addr.port())
}
115
/// First port of the dynamic port range, as named by this constant (IANA recommendation).
pub const IANA_RECOMMEND_DYNAMIC_PORT_RANGE_START: u16 = 49_152;
/// Last port of the dynamic port range; this is also the highest possible port number.
pub const IANA_RECOMMEND_DYNAMIC_PORT_RANGE_END: u16 = u16::MAX;
119
/// Finds the first port in `start_port..=end_port` that can currently be bound on `ip`.
///
/// Ports are probed in ascending order by binding a listener, which is dropped again right
/// away. The returned port is therefore not reserved for the caller.
///
/// Returns `None` if no port in the range could be bound, or if the range is empty
/// (`start_port > end_port`).
pub fn get_free_port_in_range(ip: &str, start_port: u16, end_port: u16) -> Option<u16> {
    (start_port..=end_port).find_map(|candidate| {
        let probe = TcpListener::bind((ip, candidate)).ok()?;
        let bound_addr = probe.local_addr().ok()?;
        Some(bound_addr.port())
    })
}
141
/// Binds a listener to the first port in `start_port..=end_port` that is available on `ip`.
///
/// Ports are tried in ascending order and the first listener that binds is returned, so the
/// port stays held by the caller.
///
/// Returns `None` if no port in the range could be bound, or if the range is empty
/// (`start_port > end_port`).
pub fn bind_listen_to_free_port_in_range(
    ip: &str,
    start_port: u16,
    end_port: u16,
) -> Option<TcpListener> {
    (start_port..=end_port).find_map(|candidate| TcpListener::bind((ip, candidate)).ok())
}
160
161pub fn verbosity_to_args(cfg: &Config) -> &str {
163 if cfg.quiet {
164 "-q"
165 } else {
166 match cfg.verbose {
167 1 => "-v",
168 2 => "-vv",
169 _ => "",
171 }
172 }
173}
174
175pub fn server_handshake(socket: &mut TcpStream) -> anyhow::Result<()> {
177 let handshake_u32 = rnd_u32(std::process::id() as u64);
178 let expect_handshake = rnd_u32(handshake_u32 as u64);
179
180 if let Err(e) = socket.write_all(&handshake_u32.to_be_bytes()) {
181 log::warn!("{}: {e}, retrying in 100 ms ...", e.kind());
182 std::thread::sleep(Duration::from_millis(100));
183 socket.write_all(&handshake_u32.to_be_bytes())?
184 }
185 let mut handshake_buf: [u8; 4] = [0; 4];
186 if let Err(e) = socket.read_exact(&mut handshake_buf) {
187 log::warn!("{}: {e}, retrying in 100 ms ...", e.kind());
188 std::thread::sleep(Duration::from_millis(100));
189 socket.read_exact(&mut handshake_buf)?;
190 }
191 let handshake: u32 = u32::from_be_bytes(handshake_buf);
192
193 if handshake != expect_handshake {
194 bail!("Received unexpected handshake: {handshake}")
195 } else {
196 log::trace!("QFT handshake OK");
197 }
198 Ok(())
199}
200
201pub fn read_server_cmd(
202 socket: &mut TcpStream,
203 cmd_buf: &mut [u8],
204) -> anyhow::Result<Option<ServerCommand>> {
205 let mut header_buf = [0; ServerCommand::HEADER_SIZE];
206 if let Err(e) = socket.read_exact(&mut header_buf) {
208 log::trace!("{e}");
209 if e.kind() == io::ErrorKind::UnexpectedEof {
210 return Ok(None);
212 } else {
213 log::warn!("{}: {e}, retrying in 100 ms ...", e.kind());
214 std::thread::sleep(Duration::from_millis(100));
215 socket.read_exact(&mut header_buf)?;
216 }
217 }
218 let inc_cmd_len = ServerCommand::size_from_bytes(header_buf);
219 debug_assert!(inc_cmd_len <= cmd_buf.len());
220
221 if let Err(e) = socket.read_exact(&mut cmd_buf[..inc_cmd_len]) {
223 log::warn!("{}: {e}, retrying in 100 ms ...", e.kind());
224 std::thread::sleep(Duration::from_millis(100));
225 socket.read_exact(&mut cmd_buf[..inc_cmd_len])?;
226 }
227 let command: ServerCommand = bincode::deserialize(&cmd_buf[..inc_cmd_len])?;
228 Ok(Some(command))
229}
230
231fn read_server_response_header(socket: &mut TcpStream) -> anyhow::Result<usize> {
232 let mut header_buf = [0; ServerResult::HEADER_SIZE];
233 if let Err(e) = socket.read_exact(&mut header_buf) {
235 bail!("{e}");
236 }
237 Ok(ServerResult::size_from_bytes(header_buf))
238}
239
240pub fn read_server_response_with_buf(
242 socket: &mut TcpStream,
243 resp_buf: &mut [u8],
244) -> anyhow::Result<ServerResult> {
245 let inc_resp_len = read_server_response_header(socket)?;
246 debug_assert!(inc_resp_len <= resp_buf.len());
247
248 if let Err(e) = socket.read_exact(&mut resp_buf[..inc_resp_len]) {
250 anyhow::bail!("Error reading command into buffer: {e}");
251 }
252 let resp: ServerResult = bincode::deserialize(&resp_buf[..inc_resp_len])?;
253 Ok(resp)
254}
255
256pub fn read_server_response(socket: &mut TcpStream) -> anyhow::Result<ServerResult> {
257 let inc_resp_len = read_server_response_header(socket)?;
258
259 let mut resp_buf: Vec<u8> = vec![0; inc_resp_len];
261
262 if let Err(e) = socket.read_exact(&mut resp_buf) {
264 anyhow::bail!("Error reading command into buffer: {e}");
265 }
266 let resp: ServerResult = bincode::deserialize(&resp_buf)?;
267 Ok(resp)
268}
269
pub(crate) mod tiny_rnd {

    /// Derives a 32-bit value from `seed` with a few rounds of integer bit mixing.
    ///
    /// The result depends only on `seed`, so equal seeds always give equal outputs. The seed
    /// is offset by a constant, mixed twice with an xor-shift followed by a wrapping
    /// multiplication, and the upper 32 bits of the result are returned.
    pub fn rnd_u32(seed: u64) -> u32 {
        const PHI: u64 = 0x9e37_79b9_7f4a_7c15;
        const FIRST_MULTIPLIER: u64 = 0x62A9_D9ED_7997_05F5;
        const SECOND_MULTIPLIER: u64 = 0xCB24_D0A5_C88C_35B3;

        let z = seed.wrapping_add(PHI);
        let z = (z ^ (z >> 33)).wrapping_mul(FIRST_MULTIPLIER);
        let z = (z ^ (z >> 28)).wrapping_mul(SECOND_MULTIPLIER);
        // After shifting right by 32 only 32 significant bits remain, so the cast is lossless.
        (z >> 32) as u32
    }
}