1use async_recursion::async_recursion;
2use tracing::instrument;
3
/// Returns the progress tracker obtained from `common::get_progress()`.
///
/// Kept as a small local accessor so call sites in this file can simply write
/// `progress()`.
fn progress() -> &'static common::progress::Progress {
    common::get_progress()
}
7
8#[instrument(skip(error_occurred))]
9#[async_recursion]
10async fn send_directories_and_symlinks(
11 settings: &common::copy::Settings,
12 src: &std::path::Path,
13 dst: &std::path::Path,
14 is_root: bool,
15 control_send_stream: &remote::streams::SharedSendStream,
16 connection: &remote::streams::Connection,
17 error_occurred: &std::sync::Arc<std::sync::atomic::AtomicBool>,
18) -> anyhow::Result<()> {
19 tracing::debug!("Sending data from {:?} to {:?}", &src, dst);
20 let src_metadata = match if settings.dereference {
21 tokio::fs::metadata(&src).await
22 } else {
23 tokio::fs::symlink_metadata(&src).await
24 } {
25 Ok(m) => m,
26 Err(e) => {
27 tracing::error!("Failed reading metadata from src {src:?}: {e}");
28 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
29 if settings.fail_early {
30 return Err(e.into());
31 }
32 return Ok(());
33 }
34 };
35 if src_metadata.is_file() {
36 return Ok(());
37 }
38 if src_metadata.is_symlink() {
39 let target = match tokio::fs::read_link(&src).await {
40 Ok(t) => t,
41 Err(e) => {
42 tracing::error!("Failed reading symlink {src:?}: {e}");
43 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
44 if !is_root {
46 let skip_msg =
47 remote::protocol::SourceMessage::SymlinkSkipped(remote::protocol::SrcDst {
48 src: src.to_path_buf(),
49 dst: dst.to_path_buf(),
50 });
51 control_send_stream
52 .lock()
53 .await
54 .send_batch_message(&skip_msg)
55 .await?;
56 }
57 if settings.fail_early {
58 return Err(e.into());
59 }
60 return Ok(());
61 }
62 };
63 let symlink = remote::protocol::SourceMessage::Symlink {
64 src: src.to_path_buf(),
65 dst: dst.to_path_buf(),
66 target: target.clone(),
67 metadata: remote::protocol::Metadata::from(&src_metadata),
68 is_root,
69 };
70 return control_send_stream
71 .lock()
72 .await
73 .send_batch_message(&symlink)
74 .await;
75 }
76 if !src_metadata.is_dir() {
77 assert!(
78 src_metadata.is_file(),
79 "Encountered fs object that's not a directory, symlink or a file? {src:?}"
80 );
81 return Ok(());
82 }
83 let mut entry_count = 0;
86 let mut entries = match tokio::fs::read_dir(&src).await {
87 Ok(e) => e,
88 Err(e) => {
89 tracing::error!("Cannot open directory {src:?} for reading: {e}");
90 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
91 if settings.fail_early {
92 return Err(e.into());
93 }
94 return Ok(());
95 }
96 };
97 loop {
98 match entries.next_entry().await {
99 Ok(Some(_entry)) => {
100 entry_count += 1;
101 }
102 Ok(None) => break,
103 Err(e) => {
104 tracing::error!("Failed traversing src directory {src:?}: {e}");
105 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
106 if settings.fail_early {
107 return Err(e.into());
108 }
109 break;
110 }
111 }
112 }
113 let dir = remote::protocol::SourceMessage::DirStub {
114 src: src.to_path_buf(),
115 dst: dst.to_path_buf(),
116 num_entries: entry_count,
117 };
118 tracing::debug!(
119 "Sending directory stub: {:?} -> {:?}, with {} entries",
120 &src,
121 dst,
122 entry_count
123 );
124 control_send_stream
125 .lock()
126 .await
127 .send_batch_message(&dir)
128 .await?;
129 let mut entries = match tokio::fs::read_dir(&src).await {
130 Ok(e) => e,
131 Err(e) => {
132 tracing::error!("Cannot open directory {src:?} for reading: {e}");
133 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
134 if settings.fail_early {
135 return Err(e.into());
136 }
137 return Ok(());
138 }
139 };
140 loop {
141 match entries.next_entry().await {
142 Ok(Some(entry)) => {
143 assert!(
144 entry_count > 0,
145 "Entry count for {src:?} is out of sync, was it modified during the copy?"
146 );
147 entry_count -= 1;
148 let entry_path = entry.path();
149 let entry_name = entry_path.file_name().unwrap();
150 let dst_path = dst.join(entry_name);
151 if let Err(e) = send_directories_and_symlinks(
152 settings,
153 &entry_path,
154 &dst_path,
155 false,
156 control_send_stream,
157 connection,
158 error_occurred,
159 )
160 .await
161 {
162 tracing::error!("Failed to send directory/symlink {entry_path:?}: {e}");
163 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
164 if settings.fail_early {
165 return Err(e);
166 }
167 }
168 }
169 Ok(None) => break,
170 Err(e) => {
171 tracing::error!("Failed traversing src directory {src:?}: {e}");
172 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
173 if settings.fail_early {
174 return Err(e.into());
175 }
176 break;
177 }
178 }
179 }
180 assert!(
181 entry_count == 0,
182 "Entry count for {src:?} is out of sync, was it modified during the copy?"
183 );
184 Ok(())
185}
186
187#[instrument(skip(error_occurred))]
188#[async_recursion]
189async fn send_fs_objects(
190 settings: &common::copy::Settings,
191 src: &std::path::Path,
192 dst: &std::path::Path,
193 control_send_stream: remote::streams::SharedSendStream,
194 connection: remote::streams::Connection,
195 error_occurred: std::sync::Arc<std::sync::atomic::AtomicBool>,
196) -> anyhow::Result<()> {
197 tracing::info!("Sending data from {:?} to {:?}", src, dst);
198 let src_metadata = match if settings.dereference {
199 tokio::fs::metadata(src).await
200 } else {
201 tokio::fs::symlink_metadata(src).await
202 } {
203 Ok(m) => m,
204 Err(e) => {
205 tracing::error!("Failed reading metadata from src {src:?}: {e}");
206 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
207 return Err(e.into());
208 }
209 };
210 if !src_metadata.is_file() {
211 if let Err(e) = send_directories_and_symlinks(
212 settings,
213 src,
214 dst,
215 true,
216 &control_send_stream,
217 &connection,
218 &error_occurred,
219 )
220 .await
221 {
222 tracing::error!("Failed to send directories and symlinks: {e}");
223 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
224 if settings.fail_early {
225 return Err(e);
226 }
227 }
228 }
229 let mut stream = control_send_stream.lock().await;
230 stream
231 .send_control_message(&remote::protocol::SourceMessage::DirStructureComplete)
232 .await?;
233 if src_metadata.is_file() {
234 if let Err(e) = send_file(
235 settings,
236 src,
237 dst,
238 &src_metadata,
239 true,
240 connection,
241 &error_occurred,
242 control_send_stream.clone(),
243 )
244 .await
245 {
246 tracing::error!("Failed to send root file: {e}");
247 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
248 if settings.fail_early {
249 return Err(e);
250 }
251 }
252 }
253 return Ok(());
254}
255
256#[instrument(skip(error_occurred, control_send_stream))]
257#[async_recursion]
258#[allow(clippy::too_many_arguments)]
259async fn send_file(
260 settings: &common::copy::Settings,
261 src: &std::path::Path,
262 dst: &std::path::Path,
263 src_metadata: &std::fs::Metadata,
264 is_root: bool,
265 connection: remote::streams::Connection,
266 error_occurred: &std::sync::Arc<std::sync::atomic::AtomicBool>,
267 control_send_stream: remote::streams::SharedSendStream,
268) -> anyhow::Result<()> {
269 let prog = progress();
270 let _ops_guard = prog.ops.guard();
271 let _open_file_guard = throttle::open_file_permit().await;
272 tracing::debug!("Sending file content for {:?}", src);
273 throttle::get_file_iops_tokens(settings.chunk_size, src_metadata.len()).await;
274 let mut file = match tokio::fs::File::open(src).await {
276 Ok(f) => f,
277 Err(e) => {
278 tracing::error!("Failed to open file {src:?}: {e}");
279 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
280 if !is_root {
282 let skip_msg =
283 remote::protocol::SourceMessage::FileSkipped(remote::protocol::SrcDst {
284 src: src.to_path_buf(),
285 dst: dst.to_path_buf(),
286 });
287 control_send_stream
288 .lock()
289 .await
290 .send_batch_message(&skip_msg)
291 .await?;
292 }
293 if settings.fail_early {
294 return Err(e.into());
295 }
296 return Ok(());
297 }
298 };
299 let metadata = remote::protocol::Metadata::from(src_metadata);
300 let file_header = remote::protocol::File {
301 src: src.to_path_buf(),
302 dst: dst.to_path_buf(),
303 size: src_metadata.len(),
304 metadata,
305 is_root,
306 };
307 let mut file_send_stream = connection.open_uni().await?;
308 match file_send_stream
309 .send_message_with_data(&file_header, &mut file)
310 .await
311 {
312 Ok(_bytes_sent) => {
313 file_send_stream.close().await?;
314 prog.files_copied.inc();
315 prog.bytes_copied.add(src_metadata.len());
316 tracing::info!("Sent file: {:?} -> {:?}", src, dst);
317 Ok(())
318 }
319 Err(e) => {
320 tracing::error!("Failed to send file content for {src:?}: {e}");
321 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
322 file_send_stream.close().await?;
323 if settings.fail_early {
324 Err(e)
325 } else {
326 Ok(())
327 }
328 }
329 }
330}
331
332#[instrument(skip(error_occurred, control_send_stream))]
333async fn send_files_in_directory(
334 settings: common::copy::Settings,
335 src: std::path::PathBuf,
336 dst: std::path::PathBuf,
337 connection: remote::streams::Connection,
338 error_occurred: std::sync::Arc<std::sync::atomic::AtomicBool>,
339 control_send_stream: remote::streams::SharedSendStream,
340) -> anyhow::Result<()> {
341 tracing::info!("Sending files from {src:?}");
342 let mut entries = match tokio::fs::read_dir(&src).await {
343 Ok(e) => e,
344 Err(e) => {
345 tracing::error!("Cannot open directory {src:?} for reading: {e}");
346 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
347 if settings.fail_early {
348 return Err(e.into());
349 }
350 return Ok(());
351 }
352 };
353 let mut join_set = tokio::task::JoinSet::new();
354 loop {
355 match entries.next_entry().await {
356 Ok(Some(entry)) => {
357 let entry_path = entry.path();
358 let entry_name = entry_path.file_name().unwrap();
359 let dst_path = dst.join(entry_name);
360 let entry_metadata = match if settings.dereference {
361 tokio::fs::metadata(&entry_path).await
362 } else {
363 tokio::fs::symlink_metadata(&entry_path).await
364 } {
365 Ok(m) => m,
366 Err(e) => {
367 tracing::error!("Failed reading metadata from {entry_path:?}: {e}");
368 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
369 if settings.fail_early {
370 return Err(e.into());
371 }
372 continue;
373 }
374 };
375 if !entry_metadata.is_file() {
376 continue;
377 }
378 throttle::get_ops_token().await;
379 let connection = connection.clone();
380 let error_flag = error_occurred.clone();
381 let control_stream = control_send_stream.clone();
382 join_set.spawn(async move {
383 send_file(
384 &settings,
385 &entry_path,
386 &dst_path,
387 &entry_metadata,
388 false,
389 connection,
390 &error_flag,
391 control_stream,
392 )
393 .await
394 });
395 }
396 Ok(None) => break,
397 Err(e) => {
398 tracing::error!("Failed traversing src directory {src:?}: {e}");
399 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
400 if settings.fail_early {
401 return Err(e.into());
402 }
403 break;
404 }
405 }
406 }
407 drop(entries);
408 while let Some(res) = join_set.join_next().await {
409 match res {
410 Ok(Ok(())) => {}
411 Ok(Err(e)) => {
412 tracing::error!("Failed to send file from {src:?}: {e}");
413 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
414 if settings.fail_early {
415 return Err(e);
416 }
417 }
418 Err(e) => {
419 tracing::error!("Task panicked while sending file from {src:?}: {e}");
420 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
421 if settings.fail_early {
422 return Err(e.into());
423 }
424 }
425 }
426 }
427 Ok(())
428}
429
430#[instrument(skip(error_occurred))]
431async fn dispatch_control_messages(
432 settings: common::copy::Settings,
433 mut control_recv_stream: remote::streams::RecvStream,
434 control_send_stream: remote::streams::SharedSendStream,
435 connection: remote::streams::Connection,
436 src_root: std::path::PathBuf,
437 error_occurred: std::sync::Arc<std::sync::atomic::AtomicBool>,
438) -> anyhow::Result<()> {
439 let mut join_set = tokio::task::JoinSet::new();
440 while let Some(message) = control_recv_stream
441 .recv_object::<remote::protocol::DestinationMessage>()
442 .await?
443 {
444 match message {
445 remote::protocol::DestinationMessage::DirectoryCreated(confirmation) => {
446 tracing::info!(
447 "Received directory creation confirmation for: {:?} -> {:?}",
448 confirmation.src,
449 confirmation.dst
450 );
451 let error_flag = error_occurred.clone();
452 join_set.spawn(send_files_in_directory(
453 settings,
454 confirmation.src.clone(),
455 confirmation.dst.clone(),
456 connection.clone(),
457 error_flag,
458 control_send_stream.clone(),
459 ));
460 }
461 remote::protocol::DestinationMessage::DirectoryFailed(failure) => {
462 tracing::warn!(
463 "Received directory creation failure for: {:?} -> {:?}, skipping its contents",
464 failure.src,
465 failure.dst
466 );
467 }
468 remote::protocol::DestinationMessage::DirectoryComplete(completion) => {
469 tracing::info!(
470 "Received directory completion for: {:?} -> {:?}",
471 completion.src,
472 completion.dst
473 );
474 match if settings.dereference {
476 tokio::fs::metadata(&completion.src).await
477 } else {
478 tokio::fs::symlink_metadata(&completion.src).await
479 } {
480 Ok(src_metadata) => {
481 let metadata = remote::protocol::Metadata::from(&src_metadata);
482 let is_root = completion.src == src_root;
483 let dir_metadata = remote::protocol::SourceMessage::Directory {
484 src: completion.src,
485 dst: completion.dst,
486 metadata,
487 is_root,
488 };
489 tracing::debug!("Before sending directory metadata");
490 {
491 let mut stream = control_send_stream.lock().await;
492 stream.send_control_message(&dir_metadata).await?;
493 }
494 tracing::debug!("Sent directory metadata");
495 }
496 Err(e) => {
497 tracing::error!("Failed to read metadata from {:?}: {e}", completion.src);
498 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
499 if settings.fail_early {
500 return Err(e.into());
501 }
502 }
503 }
504 }
505 remote::protocol::DestinationMessage::DestinationDone => {
506 tracing::info!("Received destination done message");
507 let mut stream = control_send_stream.lock().await;
508 stream
509 .send_control_message(&remote::protocol::SourceMessage::SourceDone)
510 .await?;
511 tracing::info!("Closing control send stream");
512 stream.close().await?;
513 tracing::info!("Sent source done message");
514 break;
515 }
516 }
517 while let Some(result) = join_set.try_join_next() {
519 match result {
520 Ok(Ok(())) => {}
521 Ok(Err(e)) => {
522 tracing::error!("Task failed: {e}");
523 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
524 if settings.fail_early {
525 return Err(e);
526 }
527 }
528 Err(e) => {
529 tracing::error!("Task panicked: {e}");
530 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
531 if settings.fail_early {
532 return Err(e.into());
533 }
534 }
535 }
536 }
537 }
538 while let Some(result) = join_set.join_next().await {
539 match result {
540 Ok(Ok(())) => {}
541 Ok(Err(e)) => {
542 tracing::error!("Task failed: {e}");
543 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
544 if settings.fail_early {
545 return Err(e);
546 }
547 }
548 Err(e) => {
549 tracing::error!("Task panicked: {e}");
550 error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
551 if settings.fail_early {
552 return Err(e.into());
553 }
554 }
555 }
556 }
557 tracing::info!("Closing control recv stream");
558 control_recv_stream.close().await;
559 tracing::info!("Finished dispatching control messages");
560 Ok(())
561}
562
563async fn handle_connection(
564 conn: quinn::Connecting,
565 settings: &common::copy::Settings,
566 src: &std::path::Path,
567 dst: &std::path::Path,
568 error_occurred: std::sync::Arc<std::sync::atomic::AtomicBool>,
569) -> anyhow::Result<()> {
570 let connection = conn.await?;
571 tracing::info!("Destination connection established");
572 let connection = remote::streams::Connection::new(connection);
573 let (control_send_stream, control_recv_stream) = connection.open_bi().await?;
574 tracing::info!("Opened streams for directory transfer");
575 let dispatch_task = tokio::spawn(dispatch_control_messages(
576 *settings,
577 control_recv_stream,
578 control_send_stream.clone(),
579 connection.clone(),
580 src.to_path_buf(),
581 error_occurred.clone(),
582 ));
583 let send_result = send_fs_objects(
584 settings,
585 src,
586 dst,
587 control_send_stream,
588 connection.clone(),
589 error_occurred.clone(),
590 )
591 .await;
592 if send_result.is_err() {
594 connection.close();
595 }
596 send_result?;
597 dispatch_task.await??;
598 tracing::info!("Data sent successfully");
599 Ok(())
600}
601
602#[instrument]
603pub async fn run_source(
604 master_send_stream: remote::streams::SharedSendStream,
605 src: &std::path::Path,
606 dst: &std::path::Path,
607 settings: &common::copy::Settings,
608 quic_config: &remote::QuicConfig,
609) -> anyhow::Result<(String, common::copy::Summary)> {
610 let (server_endpoint, cert_fingerprint) = remote::get_server_with_port_ranges(
611 quic_config.port_ranges.as_deref(),
612 quic_config.idle_timeout_sec,
613 quic_config.keep_alive_interval_sec,
614 )?;
615 let server_addr = remote::get_endpoint_addr(&server_endpoint)?;
616 tracing::info!("Source server listening on {}", server_addr);
617 let master_hello = remote::protocol::SourceMasterHello {
618 source_addr: server_addr,
619 server_name: remote::get_random_server_name(),
620 cert_fingerprint,
621 };
622 tracing::info!("Sending master hello: {:?}", master_hello);
623 master_send_stream
624 .lock()
625 .await
626 .send_control_message(&master_hello)
627 .await?;
628 tracing::info!("Waiting for connection from destination");
629 let error_occurred = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
632 let accept_timeout = std::time::Duration::from_secs(quic_config.conn_timeout_sec);
633 match tokio::time::timeout(accept_timeout, server_endpoint.accept()).await {
634 Ok(Some(conn)) => {
635 tracing::info!("New destination connection incoming");
636 handle_connection(conn, settings, src, dst, error_occurred.clone()).await?;
637 }
638 Ok(None) => {
639 tracing::error!("Server endpoint closed unexpectedly");
640 return Err(anyhow::anyhow!("Server endpoint closed unexpectedly"));
641 }
642 Err(_) => {
643 tracing::error!(
644 "Timed out waiting for destination to connect after {:?}. \
645 This usually means the destination cannot reach the source. \
646 Check network connectivity and firewall rules.",
647 accept_timeout
648 );
649 return Err(anyhow::anyhow!(
650 "Timed out waiting for destination to connect after {:?}",
651 accept_timeout
652 ));
653 }
654 }
655 tracing::info!("Source is done");
656 server_endpoint.wait_idle().await;
657 if error_occurred.load(std::sync::atomic::Ordering::Relaxed) {
659 Err(common::copy::Error {
660 source: anyhow::anyhow!("Some operations failed during remote copy"),
661 summary: common::copy::Summary::default(),
662 }
663 .into())
664 } else {
665 Ok(("source OK".to_string(), common::copy::Summary::default()))
666 }
667}