rcp_tools_rcp/
source.rs

1use async_recursion::async_recursion;
2use tracing::instrument;
3
/// Returns the `'static` progress tracker handed out by `common::get_progress()`.
///
/// Thin convenience wrapper so callers in this file can write `progress()`.
fn progress() -> &'static common::progress::Progress {
    common::get_progress()
}
7
8#[instrument(skip(error_occurred))]
9#[async_recursion]
10async fn send_directories_and_symlinks(
11    settings: &common::copy::Settings,
12    src: &std::path::Path,
13    dst: &std::path::Path,
14    is_root: bool,
15    control_send_stream: &remote::streams::SharedSendStream,
16    connection: &remote::streams::Connection,
17    error_occurred: &std::sync::Arc<std::sync::atomic::AtomicBool>,
18) -> anyhow::Result<()> {
19    tracing::debug!("Sending data from {:?} to {:?}", &src, dst);
20    let src_metadata = match if settings.dereference {
21        tokio::fs::metadata(&src).await
22    } else {
23        tokio::fs::symlink_metadata(&src).await
24    } {
25        Ok(m) => m,
26        Err(e) => {
27            tracing::error!("Failed reading metadata from src {src:?}: {e}");
28            error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
29            if settings.fail_early {
30                return Err(e.into());
31            }
32            return Ok(());
33        }
34    };
35    if src_metadata.is_file() {
36        return Ok(());
37    }
38    if src_metadata.is_symlink() {
39        let target = match tokio::fs::read_link(&src).await {
40            Ok(t) => t,
41            Err(e) => {
42                tracing::error!("Failed reading symlink {src:?}: {e}");
43                error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
44                // notify destination that this symlink was skipped (for directory tracking)
45                if !is_root {
46                    let skip_msg =
47                        remote::protocol::SourceMessage::SymlinkSkipped(remote::protocol::SrcDst {
48                            src: src.to_path_buf(),
49                            dst: dst.to_path_buf(),
50                        });
51                    control_send_stream
52                        .lock()
53                        .await
54                        .send_batch_message(&skip_msg)
55                        .await?;
56                }
57                if settings.fail_early {
58                    return Err(e.into());
59                }
60                return Ok(());
61            }
62        };
63        let symlink = remote::protocol::SourceMessage::Symlink {
64            src: src.to_path_buf(),
65            dst: dst.to_path_buf(),
66            target: target.clone(),
67            metadata: remote::protocol::Metadata::from(&src_metadata),
68            is_root,
69        };
70        return control_send_stream
71            .lock()
72            .await
73            .send_batch_message(&symlink)
74            .await;
75    }
76    if !src_metadata.is_dir() {
77        assert!(
78            src_metadata.is_file(),
79            "Encountered fs object that's not a directory, symlink or a file? {src:?}"
80        );
81        return Ok(());
82    }
83    // we do one more read_dir to count entries; this could be avoided by e.g. modifying
84    // the protocol to send the entry count at a later time
85    let mut entry_count = 0;
86    let mut entries = match tokio::fs::read_dir(&src).await {
87        Ok(e) => e,
88        Err(e) => {
89            tracing::error!("Cannot open directory {src:?} for reading: {e}");
90            error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
91            if settings.fail_early {
92                return Err(e.into());
93            }
94            return Ok(());
95        }
96    };
97    loop {
98        match entries.next_entry().await {
99            Ok(Some(_entry)) => {
100                entry_count += 1;
101            }
102            Ok(None) => break,
103            Err(e) => {
104                tracing::error!("Failed traversing src directory {src:?}: {e}");
105                error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
106                if settings.fail_early {
107                    return Err(e.into());
108                }
109                break;
110            }
111        }
112    }
113    let dir = remote::protocol::SourceMessage::DirStub {
114        src: src.to_path_buf(),
115        dst: dst.to_path_buf(),
116        num_entries: entry_count,
117    };
118    tracing::debug!(
119        "Sending directory stub: {:?} -> {:?}, with {} entries",
120        &src,
121        dst,
122        entry_count
123    );
124    control_send_stream
125        .lock()
126        .await
127        .send_batch_message(&dir)
128        .await?;
129    let mut entries = match tokio::fs::read_dir(&src).await {
130        Ok(e) => e,
131        Err(e) => {
132            tracing::error!("Cannot open directory {src:?} for reading: {e}");
133            error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
134            if settings.fail_early {
135                return Err(e.into());
136            }
137            return Ok(());
138        }
139    };
140    loop {
141        match entries.next_entry().await {
142            Ok(Some(entry)) => {
143                assert!(
144                    entry_count > 0,
145                    "Entry count for {src:?} is out of sync, was it modified during the copy?"
146                );
147                entry_count -= 1;
148                let entry_path = entry.path();
149                let entry_name = entry_path.file_name().unwrap();
150                let dst_path = dst.join(entry_name);
151                if let Err(e) = send_directories_and_symlinks(
152                    settings,
153                    &entry_path,
154                    &dst_path,
155                    false,
156                    control_send_stream,
157                    connection,
158                    error_occurred,
159                )
160                .await
161                {
162                    tracing::error!("Failed to send directory/symlink {entry_path:?}: {e}");
163                    error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
164                    if settings.fail_early {
165                        return Err(e);
166                    }
167                }
168            }
169            Ok(None) => break,
170            Err(e) => {
171                tracing::error!("Failed traversing src directory {src:?}: {e}");
172                error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
173                if settings.fail_early {
174                    return Err(e.into());
175                }
176                break;
177            }
178        }
179    }
180    assert!(
181        entry_count == 0,
182        "Entry count for {src:?} is out of sync, was it modified during the copy?"
183    );
184    Ok(())
185}
186
187#[instrument(skip(error_occurred))]
188#[async_recursion]
189async fn send_fs_objects(
190    settings: &common::copy::Settings,
191    src: &std::path::Path,
192    dst: &std::path::Path,
193    control_send_stream: remote::streams::SharedSendStream,
194    connection: remote::streams::Connection,
195    error_occurred: std::sync::Arc<std::sync::atomic::AtomicBool>,
196) -> anyhow::Result<()> {
197    tracing::info!("Sending data from {:?} to {:?}", src, dst);
198    let src_metadata = match if settings.dereference {
199        tokio::fs::metadata(src).await
200    } else {
201        tokio::fs::symlink_metadata(src).await
202    } {
203        Ok(m) => m,
204        Err(e) => {
205            tracing::error!("Failed reading metadata from src {src:?}: {e}");
206            error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
207            return Err(e.into());
208        }
209    };
210    if !src_metadata.is_file() {
211        if let Err(e) = send_directories_and_symlinks(
212            settings,
213            src,
214            dst,
215            true,
216            &control_send_stream,
217            &connection,
218            &error_occurred,
219        )
220        .await
221        {
222            tracing::error!("Failed to send directories and symlinks: {e}");
223            error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
224            if settings.fail_early {
225                return Err(e);
226            }
227        }
228    }
229    let mut stream = control_send_stream.lock().await;
230    stream
231        .send_control_message(&remote::protocol::SourceMessage::DirStructureComplete)
232        .await?;
233    if src_metadata.is_file() {
234        if let Err(e) = send_file(
235            settings,
236            src,
237            dst,
238            &src_metadata,
239            true,
240            connection,
241            &error_occurred,
242            control_send_stream.clone(),
243        )
244        .await
245        {
246            tracing::error!("Failed to send root file: {e}");
247            error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
248            if settings.fail_early {
249                return Err(e);
250            }
251        }
252    }
253    return Ok(());
254}
255
256#[instrument(skip(error_occurred, control_send_stream))]
257#[async_recursion]
258#[allow(clippy::too_many_arguments)]
259async fn send_file(
260    settings: &common::copy::Settings,
261    src: &std::path::Path,
262    dst: &std::path::Path,
263    src_metadata: &std::fs::Metadata,
264    is_root: bool,
265    connection: remote::streams::Connection,
266    error_occurred: &std::sync::Arc<std::sync::atomic::AtomicBool>,
267    control_send_stream: remote::streams::SharedSendStream,
268) -> anyhow::Result<()> {
269    let prog = progress();
270    let _ops_guard = prog.ops.guard();
271    let _open_file_guard = throttle::open_file_permit().await;
272    tracing::debug!("Sending file content for {:?}", src);
273    throttle::get_file_iops_tokens(settings.chunk_size, src_metadata.len()).await;
274    // open the file BEFORE opening the stream to avoid leaving destination waiting
275    let mut file = match tokio::fs::File::open(src).await {
276        Ok(f) => f,
277        Err(e) => {
278            tracing::error!("Failed to open file {src:?}: {e}");
279            error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
280            // notify destination that this file was skipped (for directory tracking)
281            if !is_root {
282                let skip_msg =
283                    remote::protocol::SourceMessage::FileSkipped(remote::protocol::SrcDst {
284                        src: src.to_path_buf(),
285                        dst: dst.to_path_buf(),
286                    });
287                control_send_stream
288                    .lock()
289                    .await
290                    .send_batch_message(&skip_msg)
291                    .await?;
292            }
293            if settings.fail_early {
294                return Err(e.into());
295            }
296            return Ok(());
297        }
298    };
299    let metadata = remote::protocol::Metadata::from(src_metadata);
300    let file_header = remote::protocol::File {
301        src: src.to_path_buf(),
302        dst: dst.to_path_buf(),
303        size: src_metadata.len(),
304        metadata,
305        is_root,
306    };
307    let mut file_send_stream = connection.open_uni().await?;
308    match file_send_stream
309        .send_message_with_data(&file_header, &mut file)
310        .await
311    {
312        Ok(_bytes_sent) => {
313            file_send_stream.close().await?;
314            prog.files_copied.inc();
315            prog.bytes_copied.add(src_metadata.len());
316            tracing::info!("Sent file: {:?} -> {:?}", src, dst);
317            Ok(())
318        }
319        Err(e) => {
320            tracing::error!("Failed to send file content for {src:?}: {e}");
321            error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
322            file_send_stream.close().await?;
323            if settings.fail_early {
324                Err(e)
325            } else {
326                Ok(())
327            }
328        }
329    }
330}
331
332#[instrument(skip(error_occurred, control_send_stream))]
333async fn send_files_in_directory(
334    settings: common::copy::Settings,
335    src: std::path::PathBuf,
336    dst: std::path::PathBuf,
337    connection: remote::streams::Connection,
338    error_occurred: std::sync::Arc<std::sync::atomic::AtomicBool>,
339    control_send_stream: remote::streams::SharedSendStream,
340) -> anyhow::Result<()> {
341    tracing::info!("Sending files from {src:?}");
342    let mut entries = match tokio::fs::read_dir(&src).await {
343        Ok(e) => e,
344        Err(e) => {
345            tracing::error!("Cannot open directory {src:?} for reading: {e}");
346            error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
347            if settings.fail_early {
348                return Err(e.into());
349            }
350            return Ok(());
351        }
352    };
353    let mut join_set = tokio::task::JoinSet::new();
354    loop {
355        match entries.next_entry().await {
356            Ok(Some(entry)) => {
357                let entry_path = entry.path();
358                let entry_name = entry_path.file_name().unwrap();
359                let dst_path = dst.join(entry_name);
360                let entry_metadata = match if settings.dereference {
361                    tokio::fs::metadata(&entry_path).await
362                } else {
363                    tokio::fs::symlink_metadata(&entry_path).await
364                } {
365                    Ok(m) => m,
366                    Err(e) => {
367                        tracing::error!("Failed reading metadata from {entry_path:?}: {e}");
368                        error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
369                        if settings.fail_early {
370                            return Err(e.into());
371                        }
372                        continue;
373                    }
374                };
375                if !entry_metadata.is_file() {
376                    continue;
377                }
378                throttle::get_ops_token().await;
379                let connection = connection.clone();
380                let error_flag = error_occurred.clone();
381                let control_stream = control_send_stream.clone();
382                join_set.spawn(async move {
383                    send_file(
384                        &settings,
385                        &entry_path,
386                        &dst_path,
387                        &entry_metadata,
388                        false,
389                        connection,
390                        &error_flag,
391                        control_stream,
392                    )
393                    .await
394                });
395            }
396            Ok(None) => break,
397            Err(e) => {
398                tracing::error!("Failed traversing src directory {src:?}: {e}");
399                error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
400                if settings.fail_early {
401                    return Err(e.into());
402                }
403                break;
404            }
405        }
406    }
407    drop(entries);
408    while let Some(res) = join_set.join_next().await {
409        match res {
410            Ok(Ok(())) => {}
411            Ok(Err(e)) => {
412                tracing::error!("Failed to send file from {src:?}: {e}");
413                error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
414                if settings.fail_early {
415                    return Err(e);
416                }
417            }
418            Err(e) => {
419                tracing::error!("Task panicked while sending file from {src:?}: {e}");
420                error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
421                if settings.fail_early {
422                    return Err(e.into());
423                }
424            }
425        }
426    }
427    Ok(())
428}
429
430#[instrument(skip(error_occurred))]
431async fn dispatch_control_messages(
432    settings: common::copy::Settings,
433    mut control_recv_stream: remote::streams::RecvStream,
434    control_send_stream: remote::streams::SharedSendStream,
435    connection: remote::streams::Connection,
436    src_root: std::path::PathBuf,
437    error_occurred: std::sync::Arc<std::sync::atomic::AtomicBool>,
438) -> anyhow::Result<()> {
439    let mut join_set = tokio::task::JoinSet::new();
440    while let Some(message) = control_recv_stream
441        .recv_object::<remote::protocol::DestinationMessage>()
442        .await?
443    {
444        match message {
445            remote::protocol::DestinationMessage::DirectoryCreated(confirmation) => {
446                tracing::info!(
447                    "Received directory creation confirmation for: {:?} -> {:?}",
448                    confirmation.src,
449                    confirmation.dst
450                );
451                let error_flag = error_occurred.clone();
452                join_set.spawn(send_files_in_directory(
453                    settings,
454                    confirmation.src.clone(),
455                    confirmation.dst.clone(),
456                    connection.clone(),
457                    error_flag,
458                    control_send_stream.clone(),
459                ));
460            }
461            remote::protocol::DestinationMessage::DirectoryFailed(failure) => {
462                tracing::warn!(
463                    "Received directory creation failure for: {:?} -> {:?}, skipping its contents",
464                    failure.src,
465                    failure.dst
466                );
467            }
468            remote::protocol::DestinationMessage::DirectoryComplete(completion) => {
469                tracing::info!(
470                    "Received directory completion for: {:?} -> {:?}",
471                    completion.src,
472                    completion.dst
473                );
474                // Send directory metadata
475                match if settings.dereference {
476                    tokio::fs::metadata(&completion.src).await
477                } else {
478                    tokio::fs::symlink_metadata(&completion.src).await
479                } {
480                    Ok(src_metadata) => {
481                        let metadata = remote::protocol::Metadata::from(&src_metadata);
482                        let is_root = completion.src == src_root;
483                        let dir_metadata = remote::protocol::SourceMessage::Directory {
484                            src: completion.src,
485                            dst: completion.dst,
486                            metadata,
487                            is_root,
488                        };
489                        tracing::debug!("Before sending directory metadata");
490                        {
491                            let mut stream = control_send_stream.lock().await;
492                            stream.send_control_message(&dir_metadata).await?;
493                        }
494                        tracing::debug!("Sent directory metadata");
495                    }
496                    Err(e) => {
497                        tracing::error!("Failed to read metadata from {:?}: {e}", completion.src);
498                        error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
499                        if settings.fail_early {
500                            return Err(e.into());
501                        }
502                    }
503                }
504            }
505            remote::protocol::DestinationMessage::DestinationDone => {
506                tracing::info!("Received destination done message");
507                let mut stream = control_send_stream.lock().await;
508                stream
509                    .send_control_message(&remote::protocol::SourceMessage::SourceDone)
510                    .await?;
511                tracing::info!("Closing control send stream");
512                stream.close().await?;
513                tracing::info!("Sent source done message");
514                break;
515            }
516        }
517        // opportunistically cleanup finished tasks
518        while let Some(result) = join_set.try_join_next() {
519            match result {
520                Ok(Ok(())) => {}
521                Ok(Err(e)) => {
522                    tracing::error!("Task failed: {e}");
523                    error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
524                    if settings.fail_early {
525                        return Err(e);
526                    }
527                }
528                Err(e) => {
529                    tracing::error!("Task panicked: {e}");
530                    error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
531                    if settings.fail_early {
532                        return Err(e.into());
533                    }
534                }
535            }
536        }
537    }
538    while let Some(result) = join_set.join_next().await {
539        match result {
540            Ok(Ok(())) => {}
541            Ok(Err(e)) => {
542                tracing::error!("Task failed: {e}");
543                error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
544                if settings.fail_early {
545                    return Err(e);
546                }
547            }
548            Err(e) => {
549                tracing::error!("Task panicked: {e}");
550                error_occurred.store(true, std::sync::atomic::Ordering::Relaxed);
551                if settings.fail_early {
552                    return Err(e.into());
553                }
554            }
555        }
556    }
557    tracing::info!("Closing control recv stream");
558    control_recv_stream.close().await;
559    tracing::info!("Finished dispatching control messages");
560    Ok(())
561}
562
563async fn handle_connection(
564    conn: quinn::Connecting,
565    settings: &common::copy::Settings,
566    src: &std::path::Path,
567    dst: &std::path::Path,
568    error_occurred: std::sync::Arc<std::sync::atomic::AtomicBool>,
569) -> anyhow::Result<()> {
570    let connection = conn.await?;
571    tracing::info!("Destination connection established");
572    let connection = remote::streams::Connection::new(connection);
573    let (control_send_stream, control_recv_stream) = connection.open_bi().await?;
574    tracing::info!("Opened streams for directory transfer");
575    let dispatch_task = tokio::spawn(dispatch_control_messages(
576        *settings,
577        control_recv_stream,
578        control_send_stream.clone(),
579        connection.clone(),
580        src.to_path_buf(),
581        error_occurred.clone(),
582    ));
583    let send_result = send_fs_objects(
584        settings,
585        src,
586        dst,
587        control_send_stream,
588        connection.clone(),
589        error_occurred.clone(),
590    )
591    .await;
592    // if sending failed, close connection to unblock destination immediately
593    if send_result.is_err() {
594        connection.close();
595    }
596    send_result?;
597    dispatch_task.await??;
598    tracing::info!("Data sent successfully");
599    Ok(())
600}
601
602#[instrument]
603pub async fn run_source(
604    master_send_stream: remote::streams::SharedSendStream,
605    src: &std::path::Path,
606    dst: &std::path::Path,
607    settings: &common::copy::Settings,
608    quic_config: &remote::QuicConfig,
609) -> anyhow::Result<(String, common::copy::Summary)> {
610    let (server_endpoint, cert_fingerprint) = remote::get_server_with_port_ranges(
611        quic_config.port_ranges.as_deref(),
612        quic_config.idle_timeout_sec,
613        quic_config.keep_alive_interval_sec,
614    )?;
615    let server_addr = remote::get_endpoint_addr(&server_endpoint)?;
616    tracing::info!("Source server listening on {}", server_addr);
617    let master_hello = remote::protocol::SourceMasterHello {
618        source_addr: server_addr,
619        server_name: remote::get_random_server_name(),
620        cert_fingerprint,
621    };
622    tracing::info!("Sending master hello: {:?}", master_hello);
623    master_send_stream
624        .lock()
625        .await
626        .send_control_message(&master_hello)
627        .await?;
628    tracing::info!("Waiting for connection from destination");
629    // wait for destination to connect with a timeout
630    // destination should connect within a reasonable time after receiving the source address
631    let error_occurred = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
632    let accept_timeout = std::time::Duration::from_secs(quic_config.conn_timeout_sec);
633    match tokio::time::timeout(accept_timeout, server_endpoint.accept()).await {
634        Ok(Some(conn)) => {
635            tracing::info!("New destination connection incoming");
636            handle_connection(conn, settings, src, dst, error_occurred.clone()).await?;
637        }
638        Ok(None) => {
639            tracing::error!("Server endpoint closed unexpectedly");
640            return Err(anyhow::anyhow!("Server endpoint closed unexpectedly"));
641        }
642        Err(_) => {
643            tracing::error!(
644                "Timed out waiting for destination to connect after {:?}. \
645                This usually means the destination cannot reach the source. \
646                Check network connectivity and firewall rules.",
647                accept_timeout
648            );
649            return Err(anyhow::anyhow!(
650                "Timed out waiting for destination to connect after {:?}",
651                accept_timeout
652            ));
653        }
654    }
655    tracing::info!("Source is done");
656    server_endpoint.wait_idle().await;
657    // source doesn't track summary - destination is authoritative
658    if error_occurred.load(std::sync::atomic::Ordering::Relaxed) {
659        Err(common::copy::Error {
660            source: anyhow::anyhow!("Some operations failed during remote copy"),
661            summary: common::copy::Summary::default(),
662        }
663        .into())
664    } else {
665        Ok(("source OK".to_string(), common::copy::Summary::default()))
666    }
667}