Skip to main content

remote/protocol/
mod.rs

1//! Remote copy protocol definitions for source-destination communication.
2//!
3//! # Protocol Overview
4//!
5//! The remote copy protocol uses TCP for communication between source and destination.
6//! The source listens on two ports: a control port for bidirectional messages and a
7//! data port for file transfers. Both sides exchange messages to coordinate directory
8//! creation, file transfers, and completion.
9//!
10//! See `docs/remote_protocol.md` for the full protocol specification.
11//!
12//! # Message Flow
13//!
14//! ```text
15//! Source                              Destination
16//!   |                                      |
17//!   |  ---- Directory(root, meta) -------> |  Create root, store metadata
18//!   |  ---- Directory(child, meta) ------> |  Create child, store metadata
19//!   |  ---- Symlink(...) ----------------> |  Create symlink
20//!   |  ---- DirStructureComplete --------> |  Structure complete
21//!   |                                      |
22//!   |  <--- DirectoryCreated(root) ------- |
23//!   |  <--- DirectoryCreated(child) ------ |
24//!   |                                      |
25//!   |  ~~~~ File(f, total=N) ~~~~~~~~~~~~> |  Write file, track count
26//!   |  ~~~~ File(...) ~~~~~~~~~~~~~~~~~~-> |  ...
27//!   |                                      |  All files done → apply metadata
28//!   |                                      |
29//!   |  <--- DestinationDone -------------- |  Close send side
30//!   |  (close send side)                   |  (detect EOF)
31//!   |  (detect EOF)                        |  Close connection
32//! ```
33//!
34//! # Error Communication
35//!
36//! The protocol uses asymmetric error communication:
37//! - **Source → Destination**: Must communicate failures (FileSkipped, SymlinkSkipped)
38//!   so destination can track file counts correctly
39//! - **Destination → Source**: Does NOT communicate failures. Destination handles
40//!   errors locally and source continues sending the full structure.
41//!
42//! # Shutdown Sequence
43//!
44//! Shutdown is coordinated through TCP connection closure:
45//! 1. Destination sends `DestinationDone` and closes its send side
46//! 2. Source detects EOF on recv, closes its send side
47//! 3. Destination detects EOF on recv, closes connection
48
49use serde::{Deserialize, Serialize};
50use std::os::unix::fs::MetadataExt;
51use std::os::unix::prelude::PermissionsExt;
52
53#[derive(Clone, Debug, Deserialize, Serialize)]
54pub struct Metadata {
55    pub mode: u32,
56    pub uid: u32,
57    pub gid: u32,
58    pub atime: i64,
59    pub mtime: i64,
60    pub atime_nsec: i64,
61    pub mtime_nsec: i64,
62}
63
64impl common::preserve::Metadata for Metadata {
65    fn uid(&self) -> u32 {
66        self.uid
67    }
68    fn gid(&self) -> u32 {
69        self.gid
70    }
71    fn atime(&self) -> i64 {
72        self.atime
73    }
74    fn atime_nsec(&self) -> i64 {
75        self.atime_nsec
76    }
77    fn mtime(&self) -> i64 {
78        self.mtime
79    }
80    fn mtime_nsec(&self) -> i64 {
81        self.mtime_nsec
82    }
83    fn permissions(&self) -> std::fs::Permissions {
84        std::fs::Permissions::from_mode(self.mode)
85    }
86}
87
88impl common::preserve::Metadata for &Metadata {
89    fn uid(&self) -> u32 {
90        (*self).uid()
91    }
92    fn gid(&self) -> u32 {
93        (*self).gid()
94    }
95    fn atime(&self) -> i64 {
96        (*self).atime()
97    }
98    fn atime_nsec(&self) -> i64 {
99        (*self).atime_nsec()
100    }
101    fn mtime(&self) -> i64 {
102        (*self).mtime()
103    }
104    fn mtime_nsec(&self) -> i64 {
105        (*self).mtime_nsec()
106    }
107    fn permissions(&self) -> std::fs::Permissions {
108        (*self).permissions()
109    }
110}
111
112impl From<&std::fs::Metadata> for Metadata {
113    fn from(metadata: &std::fs::Metadata) -> Self {
114        Metadata {
115            mode: metadata.mode(),
116            uid: metadata.uid(),
117            gid: metadata.gid(),
118            atime: metadata.atime(),
119            mtime: metadata.mtime(),
120            atime_nsec: metadata.atime_nsec(),
121            mtime_nsec: metadata.mtime_nsec(),
122        }
123    }
124}
125
126/// File header sent on unidirectional streams, followed by raw file data.
127#[derive(Debug, Deserialize, Serialize)]
128pub struct File {
129    pub src: std::path::PathBuf,
130    pub dst: std::path::PathBuf,
131    pub size: u64,
132    pub metadata: Metadata,
133    pub is_root: bool,
134}
135
136/// Wrapper that includes size for comparison purposes.
137#[derive(Debug)]
138pub struct FileMetadata<'a> {
139    pub metadata: &'a Metadata,
140    pub size: u64,
141}
142
143impl<'a> common::preserve::Metadata for FileMetadata<'a> {
144    fn uid(&self) -> u32 {
145        self.metadata.uid()
146    }
147    fn gid(&self) -> u32 {
148        self.metadata.gid()
149    }
150    fn atime(&self) -> i64 {
151        self.metadata.atime()
152    }
153    fn atime_nsec(&self) -> i64 {
154        self.metadata.atime_nsec()
155    }
156    fn mtime(&self) -> i64 {
157        self.metadata.mtime()
158    }
159    fn mtime_nsec(&self) -> i64 {
160        self.metadata.mtime_nsec()
161    }
162    fn permissions(&self) -> std::fs::Permissions {
163        self.metadata.permissions()
164    }
165    fn size(&self) -> u64 {
166        self.size
167    }
168}
169
170/// Messages sent from source to destination on the control stream.
171#[derive(Debug, Deserialize, Serialize)]
172pub enum SourceMessage {
173    /// Create directory, store metadata, and declare entry counts for completion tracking.
174    /// Sent during directory tree traversal in depth-first order. Source pre-reads the
175    /// directory children before sending, so counts are known at send time.
176    Directory {
177        src: std::path::PathBuf,
178        dst: std::path::PathBuf,
179        metadata: Metadata,
180        is_root: bool,
181        /// total child entries (files + directories + symlinks) for completion tracking
182        entry_count: usize,
183        /// number of child files, echoed back via `DirectoryCreated` for file sending
184        file_count: usize,
185        /// whether to keep this directory if it ends up empty after filtering
186        keep_if_empty: bool,
187    },
188    /// Create symlink with metadata.
189    Symlink {
190        src: std::path::PathBuf,
191        dst: std::path::PathBuf,
192        target: std::path::PathBuf,
193        metadata: Metadata,
194        is_root: bool,
195    },
196    /// Signal that all directories and symlinks have been sent.
197    /// Required before destination can send `DestinationDone`.
198    /// `has_root_item` indicates whether a root file/directory/symlink will be sent.
199    /// When false (dry-run or filtered root), destination can mark root as complete.
200    DirStructureComplete { has_root_item: bool },
201    /// Notify destination that a file failed to send.
202    /// Counts as a processed entry for the parent directory's completion tracking.
203    FileSkipped {
204        src: std::path::PathBuf,
205        dst: std::path::PathBuf,
206    },
207    /// Notify destination that a symlink failed to read.
208    /// If `is_root` is true, this signals that root processing is complete (even if failed).
209    /// Non-root skipped symlinks count as a processed entry for the parent directory.
210    SymlinkSkipped { src_dst: SrcDst, is_root: bool },
211}
212
213#[derive(Clone, Debug, Deserialize, Serialize)]
214pub struct SrcDst {
215    pub src: std::path::PathBuf,
216    pub dst: std::path::PathBuf,
217}
218
219/// Messages sent from destination to source on the control stream.
220#[derive(Clone, Debug, Deserialize, Serialize)]
221pub enum DestinationMessage {
222    /// Confirm directory created, request file transfers.
223    /// `file_count` is echoed back from the `Directory` message so source knows
224    /// how many files to send from this directory.
225    DirectoryCreated {
226        src: std::path::PathBuf,
227        dst: std::path::PathBuf,
228        file_count: usize,
229    },
230    /// Signal destination has finished all operations.
231    /// Initiates graceful shutdown via stream closure.
232    DestinationDone,
233}
234
235#[derive(Clone, Debug, Deserialize, Serialize)]
236pub struct RcpdConfig {
237    pub verbose: u8,
238    pub fail_early: bool,
239    pub max_workers: usize,
240    pub max_blocking_threads: usize,
241    pub max_open_files: Option<usize>,
242    pub ops_throttle: usize,
243    pub iops_throttle: usize,
244    pub chunk_size: usize,
245    // common::copy::Settings
246    pub dereference: bool,
247    pub overwrite: bool,
248    pub overwrite_compare: String,
249    pub debug_log_prefix: Option<String>,
250    /// Port ranges for TCP connections (e.g., "8000-8999,9000-9999")
251    pub port_ranges: Option<String>,
252    pub progress: bool,
253    pub progress_delay: Option<String>,
254    pub remote_copy_conn_timeout_sec: u64,
255    /// Network profile for buffer sizing
256    pub network_profile: crate::NetworkProfile,
257    /// Buffer size for file transfers (defaults to profile-specific value)
258    pub buffer_size: Option<usize>,
259    /// Maximum concurrent connections in the pool
260    pub max_connections: usize,
261    /// Multiplier for pending file writes (max pending = max_connections × multiplier)
262    pub pending_writes_multiplier: usize,
263    /// Chrome trace output prefix for profiling
264    pub chrome_trace_prefix: Option<String>,
265    /// Flamegraph output prefix for profiling
266    pub flamegraph_prefix: Option<String>,
267    /// Log level for profiling (default: trace when profiling is enabled)
268    pub profile_level: Option<String>,
269    /// Enable tokio-console
270    pub tokio_console: bool,
271    /// Port for tokio-console server
272    pub tokio_console_port: Option<u16>,
273    /// Enable TLS encryption (default: true)
274    pub encryption: bool,
275    /// Master's certificate fingerprint for client authentication (when encryption enabled)
276    pub master_cert_fingerprint: Option<CertFingerprint>,
277}
278
279impl RcpdConfig {
280    pub fn to_args(&self) -> Vec<String> {
281        let mut args = vec![
282            format!("--max-workers={}", self.max_workers),
283            format!("--max-blocking-threads={}", self.max_blocking_threads),
284            format!("--ops-throttle={}", self.ops_throttle),
285            format!("--iops-throttle={}", self.iops_throttle),
286            format!("--chunk-size={}", self.chunk_size),
287            format!("--overwrite-compare={}", self.overwrite_compare),
288        ];
289        if self.verbose > 0 {
290            args.push(format!("-{}", "v".repeat(self.verbose as usize)));
291        }
292        if self.fail_early {
293            args.push("--fail-early".to_string());
294        }
295        if let Some(v) = self.max_open_files {
296            args.push(format!("--max-open-files={v}"));
297        }
298        if self.dereference {
299            args.push("--dereference".to_string());
300        }
301        if self.overwrite {
302            args.push("--overwrite".to_string());
303        }
304        if let Some(ref prefix) = self.debug_log_prefix {
305            args.push(format!("--debug-log-prefix={prefix}"));
306        }
307        if let Some(ref ranges) = self.port_ranges {
308            args.push(format!("--port-ranges={ranges}"));
309        }
310        if self.progress {
311            args.push("--progress".to_string());
312        }
313        if let Some(ref delay) = self.progress_delay {
314            args.push(format!("--progress-delay={delay}"));
315        }
316        args.push(format!(
317            "--remote-copy-conn-timeout-sec={}",
318            self.remote_copy_conn_timeout_sec
319        ));
320        // network profile
321        args.push(format!("--network-profile={}", self.network_profile));
322        // tcp tuning (only if set)
323        if let Some(v) = self.buffer_size {
324            args.push(format!("--buffer-size={v}"));
325        }
326        args.push(format!("--max-connections={}", self.max_connections));
327        args.push(format!(
328            "--pending-writes-multiplier={}",
329            self.pending_writes_multiplier
330        ));
331        // profiling options (only add --profile-level when profiling is enabled)
332        let profiling_enabled =
333            self.chrome_trace_prefix.is_some() || self.flamegraph_prefix.is_some();
334        if let Some(ref prefix) = self.chrome_trace_prefix {
335            args.push(format!("--chrome-trace={prefix}"));
336        }
337        if let Some(ref prefix) = self.flamegraph_prefix {
338            args.push(format!("--flamegraph={prefix}"));
339        }
340        if profiling_enabled {
341            if let Some(ref level) = self.profile_level {
342                args.push(format!("--profile-level={level}"));
343            }
344        }
345        if self.tokio_console {
346            args.push("--tokio-console".to_string());
347        }
348        if let Some(port) = self.tokio_console_port {
349            args.push(format!("--tokio-console-port={port}"));
350        }
351        if !self.encryption {
352            args.push("--no-encryption".to_string());
353        }
354        if let Some(fp) = self.master_cert_fingerprint {
355            args.push(format!(
356                "--master-cert-fp={}",
357                crate::tls::fingerprint_to_hex(&fp)
358            ));
359        }
360        args
361    }
362}
363
364#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq)]
365pub enum RcpdRole {
366    Source,
367    Destination,
368}
369
370impl std::fmt::Display for RcpdRole {
371    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
372        match self {
373            RcpdRole::Source => write!(f, "source"),
374            RcpdRole::Destination => write!(f, "destination"),
375        }
376    }
377}
378
379impl std::str::FromStr for RcpdRole {
380    type Err = anyhow::Error;
381    fn from_str(s: &str) -> Result<Self, Self::Err> {
382        match s.to_lowercase().as_str() {
383            "source" => Ok(RcpdRole::Source),
384            "destination" | "dest" => Ok(RcpdRole::Destination),
385            _ => Err(anyhow::anyhow!("invalid role: {}", s)),
386        }
387    }
388}
389
390#[derive(Clone, Debug, Deserialize, Serialize)]
391pub struct TracingHello {
392    pub role: RcpdRole,
393    /// true for tracing/progress connection, false for control connection
394    pub is_tracing: bool,
395}
396
397/// TLS certificate fingerprint (SHA-256 of DER-encoded certificate).
398pub type CertFingerprint = [u8; 32];
399
400#[derive(Clone, Debug, Deserialize, Serialize)]
401pub enum MasterHello {
402    Source {
403        src: std::path::PathBuf,
404        dst: std::path::PathBuf,
405        /// Destination's TLS certificate fingerprint (None if encryption disabled)
406        dest_cert_fingerprint: Option<CertFingerprint>,
407        /// Filter settings for include/exclude patterns (source-side filtering)
408        filter: Option<common::filter::FilterSettings>,
409        /// Dry-run mode for previewing operations
410        dry_run: Option<common::config::DryRunMode>,
411    },
412    Destination {
413        /// TCP address for control connection to source
414        source_control_addr: std::net::SocketAddr,
415        /// TCP address for data connections to source
416        source_data_addr: std::net::SocketAddr,
417        server_name: String,
418        preserve: common::preserve::Settings,
419        /// Source's TLS certificate fingerprint (None if encryption disabled)
420        source_cert_fingerprint: Option<CertFingerprint>,
421    },
422}
423
424#[derive(Clone, Debug, Deserialize, Serialize)]
425pub struct SourceMasterHello {
426    /// TCP address for control connection (bidirectional messages)
427    pub control_addr: std::net::SocketAddr,
428    /// TCP address for data connections (file transfers)
429    pub data_addr: std::net::SocketAddr,
430    pub server_name: String,
431}
432
433// re-export RuntimeStats from common for convenience
434pub use common::RuntimeStats;
435
436#[derive(Clone, Debug, Deserialize, Serialize)]
437pub enum RcpdResult {
438    Success {
439        message: String,
440        summary: common::copy::Summary,
441        runtime_stats: common::RuntimeStats,
442    },
443    Failure {
444        error: String,
445        summary: common::copy::Summary,
446        runtime_stats: common::RuntimeStats,
447    },
448}