quick_file_transfer/send/
client.rs1use std::{
2 fs::File,
3 io::{Read, Write},
4 net::{IpAddr, TcpStream},
5 path::{Path, PathBuf},
6 thread,
7 time::Duration,
8};
9
10use anyhow::bail;
11
12use crate::{
13 config::{
14 self,
15 compression::{Bzip2Args, Compression, GzipArgs, XzArgs},
16 transfer::{
17 command::{DestinationMode, ServerCommand, ServerResult},
18 util::TcpConnectMode,
19 },
20 },
21 mmap_reader::MemoryMapWrapper,
22 send::util::{file_with_bufreader, qft_connect_to_server, send_command, tcp_bufwriter},
23 util::{format_data_size, incremental_rw, read_server_response},
24 TCP_STREAM_BUFSIZE,
25};
26
27#[allow(clippy::too_many_arguments)]
29pub fn run_client(
30 ip: IpAddr,
31 port: u16,
32 use_mmap: bool,
33 input_files: &[PathBuf],
34 prealloc: bool,
35 compression: Option<Compression>,
36 connect_mode: TcpConnectMode,
37 remote_dest: Option<&Path>,
38) -> anyhow::Result<()> {
39 let mut initial_tcp_stream = qft_connect_to_server((ip, port), connect_mode)?;
40
41 if let Some(remote_dest) = remote_dest {
43 tracing::info!("Running client in remote mode targeting destination: {remote_dest:?}");
44 if input_files.is_empty() {
45 bail!("Error: no files to send");
46 }
47 let dest_mode: DestinationMode = if input_files.len() == 1 {
48 DestinationMode::SingleFile
49 } else {
50 DestinationMode::MultipleFiles
51 };
52
53 send_command(
54 &mut initial_tcp_stream,
55 &ServerCommand::IsDestinationValid(
56 dest_mode,
57 remote_dest.to_string_lossy().into_owned(),
58 ),
59 )?;
60
61 match read_server_response(&mut initial_tcp_stream)? {
62 ServerResult::Ok => log::trace!("Remote path is valid"),
63 ServerResult::Err(e) => {
64 bail!(e);
65 }
66 }
67 }
68
69 let cmd_free_port = ServerCommand::GetFreePort((None, None));
70 send_command(&mut initial_tcp_stream, &cmd_free_port)?;
71 let mut free_port_buf: [u8; 2] = [0; 2];
72 if let Err(e) = initial_tcp_stream.read_exact(&mut free_port_buf) {
73 log::trace!("Initial tcp read of free port response failed: {e}, retrying in 100 ms...");
74 thread::sleep(Duration::from_millis(100));
75 initial_tcp_stream.read_exact(&mut free_port_buf)?;
76 }
77 let free_port = u16::from_be_bytes(free_port_buf);
78 tracing::info!("Got free port: {free_port}");
79
80 if input_files.is_empty() {
81 let mut tcp_stream = qft_connect_to_server((ip, free_port), connect_mode)?;
82 let cmd_receive_data =
83 ServerCommand::ReceiveData(0, "stdin".to_string(), compression.map(|c| c.variant()));
84 send_command(&mut tcp_stream, &cmd_receive_data)?;
85 let transferred_len =
86 transfer_data((ip, port), &mut tcp_stream, compression, None, use_mmap)?;
87 log::info!(
88 "Sent {} [{transferred_len} B]",
89 format_data_size(transferred_len)
90 );
91 } else {
92 let mut fcount = input_files.len();
93 log::info!("Sending {fcount} file(s)");
94
95 for f in input_files {
96 let mut tcp_stream = qft_connect_to_server((ip, free_port), connect_mode)?;
97
98 fcount -= 1;
99 let fname: String = f.file_name().unwrap().to_str().unwrap().to_owned();
100 if prealloc {
101 let file_size = File::open(f)?.metadata()?.len();
102 tracing::debug!(
103 "Requesting preallocation of file of size {} [{file_size} B]",
104 format_data_size(file_size)
105 );
106 send_command(
107 &mut tcp_stream,
108 &ServerCommand::Prealloc(
109 file_size,
110 f.file_name().unwrap().to_string_lossy().into(),
111 ),
112 )?;
113 }
114
115 log::trace!("Sending receive data command");
116 let cmd_receive_data =
117 ServerCommand::ReceiveData(fcount as u32, fname, compression.map(|c| c.variant()));
118 send_command(&mut tcp_stream, &cmd_receive_data)?;
119
120 let transferred_len =
121 transfer_data((ip, port), &mut tcp_stream, compression, Some(f), use_mmap)?;
122 tcp_stream.flush()?;
123
124 log::info!(
125 "Sent {file} {} [{transferred_len} B]",
126 format_data_size(transferred_len),
127 file = f.display()
128 );
129 }
130 }
131
132 send_command(&mut initial_tcp_stream, &ServerCommand::EndOfTransfer)?;
133 query_server_result(&mut initial_tcp_stream)?;
134
135 Ok(())
136}
137
138pub fn query_server_result(initial_tcp_stream: &mut TcpStream) -> anyhow::Result<()> {
139 use config::transfer::command::ServerResult;
140 let mut header_buf = [0; ServerResult::HEADER_SIZE];
141 if let Err(e) = initial_tcp_stream.read_exact(&mut header_buf) {
143 log::warn!("{}: {e}, retrying in 100 ms ...", e.kind());
144 std::thread::sleep(Duration::from_millis(100));
145 initial_tcp_stream.read_exact(&mut header_buf)?;
146 }
147 let inc_cmd_len = ServerResult::size_from_bytes(header_buf);
148
149 let mut resp_buf = vec![0; inc_cmd_len];
150
151 if let Err(e) = initial_tcp_stream.read_exact(&mut resp_buf[..inc_cmd_len]) {
153 anyhow::bail!("Error reading command into buffer: {e}");
154 }
155 let resp: ServerResult = bincode::deserialize(&resp_buf[..inc_cmd_len])?;
156 log::debug!("Server response: {resp:?}");
157
158 match resp {
159 ServerResult::Ok => Ok(()),
160 ServerResult::Err(err_str) => bail!("Server responded with an error: {err_str}"),
161 }
162}
163
164fn transfer_data(
165 (ip, port): (IpAddr, u16),
166 tcp_stream: &mut TcpStream,
167 compression: Option<Compression>,
168 file: Option<&Path>,
169 use_mmap: bool,
170) -> anyhow::Result<u64> {
171 log::debug!("Sending to: {ip}:{port}");
172
173 let mut buf_tcp_stream = tcp_bufwriter(tcp_stream);
174
175 if use_mmap && file.is_some() {
176 log::debug!("Using mmap");
177 let mmap = MemoryMapWrapper::new(file.unwrap())?;
178 let target_read = mmap.flen();
179
180 let transferred_bytes = match compression {
181 None => {
182 let mut total_written = 0;
183 let chunks = mmap.borrow_full().chunks(TCP_STREAM_BUFSIZE);
184 for chunk in chunks {
185 let mut chunk_written = 0;
186 let chunk_len = chunk.len();
187 while chunk_written != chunk_len {
188 let bytes_written = buf_tcp_stream.write(chunk)?;
189 if bytes_written == 0 {
190 bail!("Wrote 0 bytes to socket, server disconnected?");
191 }
192 chunk_written += bytes_written;
193 }
194 total_written += chunk_written;
195 }
196
197 total_written.try_into()?
198 }
199 Some(c) => match c {
200 config::compression::Compression::Bzip2(Bzip2Args { compression_level }) => {
201 let mut encoder = bzip2::read::BzEncoder::new(
202 mmap.borrow_full(),
203 bzip2::Compression::new(compression_level.into()),
204 );
205 incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut buf_tcp_stream, &mut encoder)?
206 }
207 config::compression::Compression::Lz4 => {
208 let mut lz4_writer = lz4_flex::frame::FrameEncoder::new(&mut buf_tcp_stream);
209 let mut total_read = 0;
210 while total_read < target_read {
211 let remaining = target_read - total_read;
212 let chunk_size = remaining.min(TCP_STREAM_BUFSIZE);
213 let chunk = mmap.borrow_slice(total_read..total_read + chunk_size)?;
214 let written_bytes = lz4_writer.write(chunk)?;
215 total_read += written_bytes;
216 }
217 lz4_writer.flush()?; total_read as u64
219 }
220 config::compression::Compression::Gzip(GzipArgs { compression_level }) => {
221 let mut encoder = flate2::read::GzEncoder::new(
222 mmap.borrow_full(),
223 flate2::Compression::new(compression_level.into()),
224 );
225 incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut buf_tcp_stream, &mut encoder)?
226 }
227 config::compression::Compression::Xz(XzArgs { compression_level }) => {
228 let mut compressor =
229 xz2::read::XzEncoder::new(mmap.borrow_full(), compression_level.into());
230 incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(
231 &mut buf_tcp_stream,
232 &mut compressor,
233 )?
234 }
235 },
236 };
237 return Ok(transferred_bytes);
238 }
239
240 let mut bufreader = file_with_bufreader(file.unwrap())?;
242 if let Some(compression) = compression {
243 log::debug!("Compression mode: {compression}");
244 };
245
246 let transferred_bytes = match compression {
247 Some(compression) => match compression {
248 config::compression::Compression::Bzip2(Bzip2Args { compression_level }) => {
249 let mut encoder = bzip2::read::BzEncoder::new(
250 bufreader,
251 bzip2::Compression::new(compression_level.into()),
252 );
253 incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut buf_tcp_stream, &mut encoder)?
254 }
255 config::compression::Compression::Lz4 => {
256 let mut lz4_writer = lz4_flex::frame::FrameEncoder::new(&mut buf_tcp_stream);
257 let len: u64 =
258 incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut lz4_writer, &mut bufreader)?;
259 lz4_writer.flush()?; len
261 }
262 config::compression::Compression::Gzip(GzipArgs { compression_level }) => {
263 let mut encoder = flate2::read::GzEncoder::new(
264 bufreader,
265 flate2::Compression::new(compression_level.into()),
266 );
267 incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut buf_tcp_stream, &mut encoder)?
268 }
269 config::compression::Compression::Xz(XzArgs { compression_level }) => {
270 let mut compressor = xz2::read::XzEncoder::new(bufreader, compression_level.into());
271 incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut buf_tcp_stream, &mut compressor)?
272 }
273 },
274 None => incremental_rw::<TCP_STREAM_BUFSIZE, _, _>(&mut buf_tcp_stream, &mut bufreader)?,
275 };
276
277 Ok(transferred_bytes)
278}