remote/protocol/
mod.rs

1//! Remote copy protocol definitions for source-destination communication.
2//!
3//! # Protocol Overview
4//!
5//! The remote copy protocol uses TCP for communication between source and destination.
6//! The source listens on two ports: a control port for bidirectional messages and a
7//! data port for file transfers. Both sides exchange messages to coordinate directory
8//! creation, file transfers, and completion.
9//!
10//! See `docs/remote_protocol.md` for the full protocol specification.
11//!
12//! # Message Flow
13//!
14//! ```text
15//! Source                              Destination
16//!   |                                      |
17//!   |  ---- Directory(root, meta) -------> |  Create root, store metadata
18//!   |  ---- Directory(child, meta) ------> |  Create child, store metadata
19//!   |  ---- Symlink(...) ----------------> |  Create symlink
20//!   |  ---- DirStructureComplete --------> |  Structure complete
21//!   |                                      |
22//!   |  <--- DirectoryCreated(root) ------- |
23//!   |  <--- DirectoryCreated(child) ------ |
24//!   |                                      |
25//!   |  ~~~~ File(f, total=N) ~~~~~~~~~~~~> |  Write file, track count
26//!   |  ~~~~ File(...) ~~~~~~~~~~~~~~~~~~-> |  ...
27//!   |                                      |  All files done → apply metadata
28//!   |                                      |
29//!   |  <--- DestinationDone -------------- |  Close send side
30//!   |  (close send side)                   |  (detect EOF)
31//!   |  (detect EOF)                        |  Close connection
32//! ```
33//!
34//! # Error Communication
35//!
36//! The protocol uses asymmetric error communication:
37//! - **Source → Destination**: Must communicate failures (FileSkipped, SymlinkSkipped)
38//!   so destination can track file counts correctly
39//! - **Destination → Source**: Does NOT communicate failures. Destination handles
40//!   errors locally and source continues sending the full structure.
41//!
42//! # Shutdown Sequence
43//!
44//! Shutdown is coordinated through TCP connection closure:
45//! 1. Destination sends `DestinationDone` and closes its send side
46//! 2. Source detects EOF on recv, closes its send side
47//! 3. Destination detects EOF on recv, closes connection
48
49use serde::{Deserialize, Serialize};
50use std::os::unix::fs::MetadataExt;
51use std::os::unix::prelude::PermissionsExt;
52
53#[derive(Clone, Debug, Deserialize, Serialize)]
54pub struct Metadata {
55    pub mode: u32,
56    pub uid: u32,
57    pub gid: u32,
58    pub atime: i64,
59    pub mtime: i64,
60    pub atime_nsec: i64,
61    pub mtime_nsec: i64,
62}
63
64impl common::preserve::Metadata for Metadata {
65    fn uid(&self) -> u32 {
66        self.uid
67    }
68    fn gid(&self) -> u32 {
69        self.gid
70    }
71    fn atime(&self) -> i64 {
72        self.atime
73    }
74    fn atime_nsec(&self) -> i64 {
75        self.atime_nsec
76    }
77    fn mtime(&self) -> i64 {
78        self.mtime
79    }
80    fn mtime_nsec(&self) -> i64 {
81        self.mtime_nsec
82    }
83    fn permissions(&self) -> std::fs::Permissions {
84        std::fs::Permissions::from_mode(self.mode)
85    }
86}
87
88impl common::preserve::Metadata for &Metadata {
89    fn uid(&self) -> u32 {
90        (*self).uid()
91    }
92    fn gid(&self) -> u32 {
93        (*self).gid()
94    }
95    fn atime(&self) -> i64 {
96        (*self).atime()
97    }
98    fn atime_nsec(&self) -> i64 {
99        (*self).atime_nsec()
100    }
101    fn mtime(&self) -> i64 {
102        (*self).mtime()
103    }
104    fn mtime_nsec(&self) -> i64 {
105        (*self).mtime_nsec()
106    }
107    fn permissions(&self) -> std::fs::Permissions {
108        (*self).permissions()
109    }
110}
111
112impl From<&std::fs::Metadata> for Metadata {
113    fn from(metadata: &std::fs::Metadata) -> Self {
114        Metadata {
115            mode: metadata.mode(),
116            uid: metadata.uid(),
117            gid: metadata.gid(),
118            atime: metadata.atime(),
119            mtime: metadata.mtime(),
120            atime_nsec: metadata.atime_nsec(),
121            mtime_nsec: metadata.mtime_nsec(),
122        }
123    }
124}
125
126/// File header sent on unidirectional streams, followed by raw file data.
127///
128/// The `dir_total_files` field tells destination how many files to expect
129/// for this file's parent directory. This is set when source iterates the
130/// directory (after receiving `DirectoryCreated`), ensuring accuracy even
131/// if directory contents change during the copy.
132#[derive(Debug, Deserialize, Serialize)]
133pub struct File {
134    pub src: std::path::PathBuf,
135    pub dst: std::path::PathBuf,
136    pub size: u64,
137    pub metadata: Metadata,
138    pub is_root: bool,
139    /// Total number of files in the parent directory (for tracking completion)
140    pub dir_total_files: usize,
141}
142
143/// Wrapper that includes size for comparison purposes.
144#[derive(Debug)]
145pub struct FileMetadata<'a> {
146    pub metadata: &'a Metadata,
147    pub size: u64,
148}
149
150impl<'a> common::preserve::Metadata for FileMetadata<'a> {
151    fn uid(&self) -> u32 {
152        self.metadata.uid()
153    }
154    fn gid(&self) -> u32 {
155        self.metadata.gid()
156    }
157    fn atime(&self) -> i64 {
158        self.metadata.atime()
159    }
160    fn atime_nsec(&self) -> i64 {
161        self.metadata.atime_nsec()
162    }
163    fn mtime(&self) -> i64 {
164        self.metadata.mtime()
165    }
166    fn mtime_nsec(&self) -> i64 {
167        self.metadata.mtime_nsec()
168    }
169    fn permissions(&self) -> std::fs::Permissions {
170        self.metadata.permissions()
171    }
172    fn size(&self) -> u64 {
173        self.size
174    }
175}
176
177/// Messages sent from source to destination on the control stream.
178#[derive(Debug, Deserialize, Serialize)]
179pub enum SourceMessage {
180    /// Create directory and store metadata for later application.
181    /// Sent during directory tree traversal in depth-first order.
182    Directory {
183        src: std::path::PathBuf,
184        dst: std::path::PathBuf,
185        metadata: Metadata,
186        is_root: bool,
187    },
188    /// Create symlink with metadata.
189    Symlink {
190        src: std::path::PathBuf,
191        dst: std::path::PathBuf,
192        target: std::path::PathBuf,
193        metadata: Metadata,
194        is_root: bool,
195    },
196    /// Signal that all directories and symlinks have been sent.
197    /// Required before destination can send `DestinationDone`.
198    DirStructureComplete,
199    /// Notify destination that a file failed to send.
200    /// Includes `dir_total_files` so destination can track file counts.
201    FileSkipped {
202        src: std::path::PathBuf,
203        dst: std::path::PathBuf,
204        dir_total_files: usize,
205    },
206    /// Notify destination that a symlink failed to read.
207    /// For logging purposes only (symlinks don't affect file counts).
208    /// If `is_root` is true, this signals that root processing is complete (even if failed).
209    SymlinkSkipped { src_dst: SrcDst, is_root: bool },
210    /// Notify destination that a directory contains no files.
211    /// Sent after receiving `DirectoryCreated` for an empty directory.
212    DirectoryEmpty {
213        src: std::path::PathBuf,
214        dst: std::path::PathBuf,
215    },
216}
217
218#[derive(Clone, Debug, Deserialize, Serialize)]
219pub struct SrcDst {
220    pub src: std::path::PathBuf,
221    pub dst: std::path::PathBuf,
222}
223
224/// Messages sent from destination to source on the control stream.
225#[derive(Clone, Debug, Deserialize, Serialize)]
226pub enum DestinationMessage {
227    /// Confirm directory created, request file transfers.
228    /// Triggers source to send files from this directory.
229    DirectoryCreated(SrcDst),
230    /// Signal destination has finished all operations.
231    /// Initiates graceful shutdown via stream closure.
232    DestinationDone,
233}
234
235#[derive(Clone, Debug, Deserialize, Serialize)]
236pub struct RcpdConfig {
237    pub verbose: u8,
238    pub fail_early: bool,
239    pub max_workers: usize,
240    pub max_blocking_threads: usize,
241    pub max_open_files: Option<usize>,
242    pub ops_throttle: usize,
243    pub iops_throttle: usize,
244    pub chunk_size: usize,
245    // common::copy::Settings
246    pub dereference: bool,
247    pub overwrite: bool,
248    pub overwrite_compare: String,
249    pub debug_log_prefix: Option<String>,
250    /// Port ranges for TCP connections (e.g., "8000-8999,9000-9999")
251    pub port_ranges: Option<String>,
252    pub progress: bool,
253    pub progress_delay: Option<String>,
254    pub remote_copy_conn_timeout_sec: u64,
255    /// Network profile for buffer sizing
256    pub network_profile: crate::NetworkProfile,
257    /// Buffer size for file transfers (defaults to profile-specific value)
258    pub buffer_size: Option<usize>,
259    /// Maximum concurrent connections in the pool
260    pub max_connections: usize,
261    /// Multiplier for pending file writes (max pending = max_connections × multiplier)
262    pub pending_writes_multiplier: usize,
263    /// Chrome trace output prefix for profiling
264    pub chrome_trace_prefix: Option<String>,
265    /// Flamegraph output prefix for profiling
266    pub flamegraph_prefix: Option<String>,
267    /// Log level for profiling (default: trace when profiling is enabled)
268    pub profile_level: Option<String>,
269    /// Enable tokio-console
270    pub tokio_console: bool,
271    /// Port for tokio-console server
272    pub tokio_console_port: Option<u16>,
273    /// Enable TLS encryption (default: true)
274    pub encryption: bool,
275    /// Master's certificate fingerprint for client authentication (when encryption enabled)
276    pub master_cert_fingerprint: Option<CertFingerprint>,
277}
278
279impl RcpdConfig {
280    pub fn to_args(&self) -> Vec<String> {
281        let mut args = vec![
282            format!("--max-workers={}", self.max_workers),
283            format!("--max-blocking-threads={}", self.max_blocking_threads),
284            format!("--ops-throttle={}", self.ops_throttle),
285            format!("--iops-throttle={}", self.iops_throttle),
286            format!("--chunk-size={}", self.chunk_size),
287            format!("--overwrite-compare={}", self.overwrite_compare),
288        ];
289        if self.verbose > 0 {
290            args.push(format!("-{}", "v".repeat(self.verbose as usize)));
291        }
292        if self.fail_early {
293            args.push("--fail-early".to_string());
294        }
295        if let Some(v) = self.max_open_files {
296            args.push(format!("--max-open-files={v}"));
297        }
298        if self.dereference {
299            args.push("--dereference".to_string());
300        }
301        if self.overwrite {
302            args.push("--overwrite".to_string());
303        }
304        if let Some(ref prefix) = self.debug_log_prefix {
305            args.push(format!("--debug-log-prefix={prefix}"));
306        }
307        if let Some(ref ranges) = self.port_ranges {
308            args.push(format!("--port-ranges={ranges}"));
309        }
310        if self.progress {
311            args.push("--progress".to_string());
312        }
313        if let Some(ref delay) = self.progress_delay {
314            args.push(format!("--progress-delay={delay}"));
315        }
316        args.push(format!(
317            "--remote-copy-conn-timeout-sec={}",
318            self.remote_copy_conn_timeout_sec
319        ));
320        // network profile
321        args.push(format!("--network-profile={}", self.network_profile));
322        // tcp tuning (only if set)
323        if let Some(v) = self.buffer_size {
324            args.push(format!("--buffer-size={v}"));
325        }
326        args.push(format!("--max-connections={}", self.max_connections));
327        args.push(format!(
328            "--pending-writes-multiplier={}",
329            self.pending_writes_multiplier
330        ));
331        // profiling options (only add --profile-level when profiling is enabled)
332        let profiling_enabled =
333            self.chrome_trace_prefix.is_some() || self.flamegraph_prefix.is_some();
334        if let Some(ref prefix) = self.chrome_trace_prefix {
335            args.push(format!("--chrome-trace={prefix}"));
336        }
337        if let Some(ref prefix) = self.flamegraph_prefix {
338            args.push(format!("--flamegraph={prefix}"));
339        }
340        if profiling_enabled {
341            if let Some(ref level) = self.profile_level {
342                args.push(format!("--profile-level={level}"));
343            }
344        }
345        if self.tokio_console {
346            args.push("--tokio-console".to_string());
347        }
348        if let Some(port) = self.tokio_console_port {
349            args.push(format!("--tokio-console-port={port}"));
350        }
351        if !self.encryption {
352            args.push("--no-encryption".to_string());
353        }
354        if let Some(fp) = self.master_cert_fingerprint {
355            args.push(format!(
356                "--master-cert-fp={}",
357                crate::tls::fingerprint_to_hex(&fp)
358            ));
359        }
360        args
361    }
362}
363
364#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq)]
365pub enum RcpdRole {
366    Source,
367    Destination,
368}
369
370impl std::fmt::Display for RcpdRole {
371    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
372        match self {
373            RcpdRole::Source => write!(f, "source"),
374            RcpdRole::Destination => write!(f, "destination"),
375        }
376    }
377}
378
379impl std::str::FromStr for RcpdRole {
380    type Err = anyhow::Error;
381    fn from_str(s: &str) -> Result<Self, Self::Err> {
382        match s.to_lowercase().as_str() {
383            "source" => Ok(RcpdRole::Source),
384            "destination" | "dest" => Ok(RcpdRole::Destination),
385            _ => Err(anyhow::anyhow!("invalid role: {}", s)),
386        }
387    }
388}
389
390#[derive(Clone, Debug, Deserialize, Serialize)]
391pub struct TracingHello {
392    pub role: RcpdRole,
393    /// true for tracing/progress connection, false for control connection
394    pub is_tracing: bool,
395}
396
397/// TLS certificate fingerprint (SHA-256 of DER-encoded certificate).
398pub type CertFingerprint = [u8; 32];
399
400#[derive(Clone, Debug, Deserialize, Serialize)]
401pub enum MasterHello {
402    Source {
403        src: std::path::PathBuf,
404        dst: std::path::PathBuf,
405        /// Destination's TLS certificate fingerprint (None if encryption disabled)
406        dest_cert_fingerprint: Option<CertFingerprint>,
407    },
408    Destination {
409        /// TCP address for control connection to source
410        source_control_addr: std::net::SocketAddr,
411        /// TCP address for data connections to source
412        source_data_addr: std::net::SocketAddr,
413        server_name: String,
414        preserve: common::preserve::Settings,
415        /// Source's TLS certificate fingerprint (None if encryption disabled)
416        source_cert_fingerprint: Option<CertFingerprint>,
417    },
418}
419
420#[derive(Clone, Debug, Deserialize, Serialize)]
421pub struct SourceMasterHello {
422    /// TCP address for control connection (bidirectional messages)
423    pub control_addr: std::net::SocketAddr,
424    /// TCP address for data connections (file transfers)
425    pub data_addr: std::net::SocketAddr,
426    pub server_name: String,
427}
428
429// re-export RuntimeStats from common for convenience
430pub use common::RuntimeStats;
431
432#[derive(Clone, Debug, Deserialize, Serialize)]
433pub enum RcpdResult {
434    Success {
435        message: String,
436        summary: common::copy::Summary,
437        runtime_stats: common::RuntimeStats,
438    },
439    Failure {
440        error: String,
441        summary: common::copy::Summary,
442        runtime_stats: common::RuntimeStats,
443    },
444}