quick_file_transfer/send/
client.rs

1use std::{
2    fs::File,
3    io::{Read, Write},
4    net::{IpAddr, TcpStream},
5    path::{Path, PathBuf},
6    thread,
7    time::Duration,
8};
9
10use anyhow::bail;
11
12use crate::{
13    config::{
14        self,
15        compression::{Bzip2Args, Compression, GzipArgs, XzArgs},
16        transfer::{
17            command::{DestinationMode, ServerCommand, ServerResult},
18            util::TcpConnectMode,
19        },
20    },
21    mmap_reader::MemoryMapWrapper,
22    send::util::{file_with_bufreader, qft_connect_to_server, send_command, tcp_bufwriter},
23    util::{format_data_size, incremental_rw, read_server_response},
24    TCP_STREAM_BUFSIZE,
25};
26
27/// If poll is specified, poll the server with the specified interval, else exut on the first failure to establish a connection.
28#[allow(clippy::too_many_arguments)]
29pub fn run_client(
30    ip: IpAddr,
31    port: u16,
32    use_mmap: bool,
33    input_files: &[PathBuf],
34    prealloc: bool,
35    compression: Option<Compression>,
36    connect_mode: TcpConnectMode,
37    remote_dest: Option<&Path>,
38) -> anyhow::Result<()> {
39    let mut initial_tcp_stream = qft_connect_to_server((ip, port), connect_mode)?;
40
41    // Validate remote path before start
42    if let Some(remote_dest) = remote_dest {
43        tracing::info!("Running client in remote mode targeting destination: {remote_dest:?}");
44        if input_files.is_empty() {
45            bail!("Error: no files to send");
46        }
47        let dest_mode: DestinationMode = if input_files.len() == 1 {
48            DestinationMode::SingleFile
49        } else {
50            DestinationMode::MultipleFiles
51        };
52
53        send_command(
54            &mut initial_tcp_stream,
55            &ServerCommand::IsDestinationValid(
56                dest_mode,
57                remote_dest.to_string_lossy().into_owned(),
58            ),
59        )?;
60
61        match read_server_response(&mut initial_tcp_stream)? {
62            ServerResult::Ok => log::trace!("Remote path is valid"),
63            ServerResult::Err(e) => {
64                bail!(e);
65            }
66        }
67    }
68
69    let cmd_free_port = ServerCommand::GetFreePort((None, None));
70    send_command(&mut initial_tcp_stream, &cmd_free_port)?;
71    let mut free_port_buf: [u8; 2] = [0; 2];
72    if let Err(e) = initial_tcp_stream.read_exact(&mut free_port_buf) {
73        log::trace!("Initial tcp read of free port response failed: {e}, retrying in 100 ms...");
74        thread::sleep(Duration::from_millis(100));
75        initial_tcp_stream.read_exact(&mut free_port_buf)?;
76    }
77    let free_port = u16::from_be_bytes(free_port_buf);
78    tracing::info!("Got free port: {free_port}");
79
80    if input_files.is_empty() {
81        let mut tcp_stream = qft_connect_to_server((ip, free_port), connect_mode)?;
82        let cmd_receive_data =
83            ServerCommand::ReceiveData(0, "stdin".to_string(), compression.map(|c| c.variant()));
84        send_command(&mut tcp_stream, &cmd_receive_data)?;
85        let transferred_len =
86            transfer_data((ip, port), &mut tcp_stream, compression, None, use_mmap)?;
87        log::info!(
88            "Sent {} [{transferred_len} B]",
89            format_data_size(transferred_len)
90        );
91    } else {
92        let mut fcount = input_files.len();
93        log::info!("Sending {fcount} file(s)");
94
95        for f in input_files {
96            let mut tcp_stream = qft_connect_to_server((ip, free_port), connect_mode)?;
97
98            fcount -= 1;
99            let fname: String = f.file_name().unwrap().to_str().unwrap().to_owned();
100            if prealloc {
101                let file_size = File::open(f)?.metadata()?.len();
102                tracing::debug!(
103                    "Requesting preallocation of file of size {} [{file_size} B]",
104                    format_data_size(file_size)
105                );
106                send_command(
107                    &mut tcp_stream,
108                    &ServerCommand::Prealloc(
109                        file_size,
110                        f.file_name().unwrap().to_string_lossy().into(),
111                    ),
112                )?;
113            }
114
115            log::trace!("Sending receive data command");
116            let cmd_receive_data =
117                ServerCommand::ReceiveData(fcount as u32, fname, compression.map(|c| c.variant()));
118            send_command(&mut tcp_stream, &cmd_receive_data)?;
119
120            let transferred_len =
121                transfer_data((ip, port), &mut tcp_stream, compression, Some(f), use_mmap)?;
122            tcp_stream.flush()?;
123
124            log::info!(
125                "Sent {file} {} [{transferred_len} B]",
126                format_data_size(transferred_len),
127                file = f.display()
128            );
129        }
130    }
131
132    send_command(&mut initial_tcp_stream, &ServerCommand::EndOfTransfer)?;
133    query_server_result(&mut initial_tcp_stream)?;
134
135    Ok(())
136}
137
138pub fn query_server_result(initial_tcp_stream: &mut TcpStream) -> anyhow::Result<()> {
139    use config::transfer::command::ServerResult;
140    let mut header_buf = [0; ServerResult::HEADER_SIZE];
141    // Read the header to determine the size of the incoming command/data
142    if let Err(e) = initial_tcp_stream.read_exact(&mut header_buf) {
143        log::warn!("{}: {e}, retrying in 100 ms ...", e.kind());
144        std::thread::sleep(Duration::from_millis(100));
145        initial_tcp_stream.read_exact(&mut header_buf)?;
146    }
147    let inc_cmd_len = ServerResult::size_from_bytes(header_buf);
148
149    let mut resp_buf = vec![0; inc_cmd_len];
150
151    // Read the actual command/data based on the size
152    if let Err(e) = initial_tcp_stream.read_exact(&mut resp_buf[..inc_cmd_len]) {
153        anyhow::bail!("Error reading command into buffer: {e}");
154    }
155    let resp: ServerResult = bincode::deserialize(&resp_buf[..inc_cmd_len])?;
156    log::debug!("Server response: {resp:?}");
157
158    match resp {
159        ServerResult::Ok => Ok(()),
160        ServerResult::Err(err_str) => bail!("Server responded with an error: {err_str}"),
161    }
162}
163
164fn transfer_data(
165    (ip, port): (IpAddr, u16),
166    tcp_stream: &mut TcpStream,
167    compression: Option<Compression>,
168    file: Option<&Path>,
169    use_mmap: bool,
170) -> anyhow::Result<u64> {
171    log::debug!("Sending to: {ip}:{port}");
172
173    let mut buf_tcp_stream = tcp_bufwriter(tcp_stream);
174
175    if use_mmap && file.is_some() {
176        log::debug!("Using mmap");
177        let mmap = MemoryMapWrapper::new(file.unwrap())?;
178        let target_read = mmap.flen();
179
180        let transferred_bytes = match compression {
181            None => {
182                let mut total_written = 0;
183                let chunks = mmap.borrow_full().chunks(TCP_STREAM_BUFSIZE);
184                for chunk in chunks {
185                    let mut chunk_written = 0;
186                    let chunk_len = chunk.len();
187                    while chunk_written != chunk_len {
188                        let bytes_written = buf_tcp_stream.write(chunk)?;
189                        if bytes_written == 0 {
190                            bail!("Wrote 0 bytes to socket, server disconnected?");
191                        }
192                        chunk_written += bytes_written;
193                    }
194                    total_written += chunk_written;
195                }
196
197                total_written.try_into()?
198            }
199            Some(c) => match c {
200                config::compression::Compression::Bzip2(Bzip2Args { compression_level }) => {
201                    let mut encoder = bzip2::read::BzEncoder::new(
202                        mmap.borrow_full(),
203                        bzip2::Compression::new(compression_level.into()),
204                    );
205                    incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut buf_tcp_stream, &mut encoder)?
206                }
207                config::compression::Compression::Lz4 => {
208                    let mut lz4_writer = lz4_flex::frame::FrameEncoder::new(&mut buf_tcp_stream);
209                    let mut total_read = 0;
210                    while total_read < target_read {
211                        let remaining = target_read - total_read;
212                        let chunk_size = remaining.min(TCP_STREAM_BUFSIZE);
213                        let chunk = mmap.borrow_slice(total_read..total_read + chunk_size)?;
214                        let written_bytes = lz4_writer.write(chunk)?;
215                        total_read += written_bytes;
216                    }
217                    lz4_writer.flush()?; // Needed to ensure the entire content is written
218                    total_read as u64
219                }
220                config::compression::Compression::Gzip(GzipArgs { compression_level }) => {
221                    let mut encoder = flate2::read::GzEncoder::new(
222                        mmap.borrow_full(),
223                        flate2::Compression::new(compression_level.into()),
224                    );
225                    incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut buf_tcp_stream, &mut encoder)?
226                }
227                config::compression::Compression::Xz(XzArgs { compression_level }) => {
228                    let mut compressor =
229                        xz2::read::XzEncoder::new(mmap.borrow_full(), compression_level.into());
230                    incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(
231                        &mut buf_tcp_stream,
232                        &mut compressor,
233                    )?
234                }
235            },
236        };
237        return Ok(transferred_bytes);
238    }
239
240    // On-stack dynamic dispatch
241    let mut bufreader = file_with_bufreader(file.unwrap())?;
242    if let Some(compression) = compression {
243        log::debug!("Compression mode: {compression}");
244    };
245
246    let transferred_bytes = match compression {
247        Some(compression) => match compression {
248            config::compression::Compression::Bzip2(Bzip2Args { compression_level }) => {
249                let mut encoder = bzip2::read::BzEncoder::new(
250                    bufreader,
251                    bzip2::Compression::new(compression_level.into()),
252                );
253                incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut buf_tcp_stream, &mut encoder)?
254            }
255            config::compression::Compression::Lz4 => {
256                let mut lz4_writer = lz4_flex::frame::FrameEncoder::new(&mut buf_tcp_stream);
257                let len: u64 =
258                    incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut lz4_writer, &mut bufreader)?;
259                lz4_writer.flush()?; // Needed to ensure the entire content is written
260                len
261            }
262            config::compression::Compression::Gzip(GzipArgs { compression_level }) => {
263                let mut encoder = flate2::read::GzEncoder::new(
264                    bufreader,
265                    flate2::Compression::new(compression_level.into()),
266                );
267                incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut buf_tcp_stream, &mut encoder)?
268            }
269            config::compression::Compression::Xz(XzArgs { compression_level }) => {
270                let mut compressor = xz2::read::XzEncoder::new(bufreader, compression_level.into());
271                incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut buf_tcp_stream, &mut compressor)?
272            }
273        },
274        None => incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut buf_tcp_stream, &mut bufreader)?,
275    };
276
277    Ok(transferred_bytes)
278}