1use serde::{Deserialize, Serialize};
50use std::os::unix::fs::MetadataExt;
51use std::os::unix::prelude::PermissionsExt;
52
53#[derive(Clone, Debug, Deserialize, Serialize)]
54pub struct Metadata {
55 pub mode: u32,
56 pub uid: u32,
57 pub gid: u32,
58 pub atime: i64,
59 pub mtime: i64,
60 pub atime_nsec: i64,
61 pub mtime_nsec: i64,
62}
63
64impl common::preserve::Metadata for Metadata {
65 fn uid(&self) -> u32 {
66 self.uid
67 }
68 fn gid(&self) -> u32 {
69 self.gid
70 }
71 fn atime(&self) -> i64 {
72 self.atime
73 }
74 fn atime_nsec(&self) -> i64 {
75 self.atime_nsec
76 }
77 fn mtime(&self) -> i64 {
78 self.mtime
79 }
80 fn mtime_nsec(&self) -> i64 {
81 self.mtime_nsec
82 }
83 fn permissions(&self) -> std::fs::Permissions {
84 std::fs::Permissions::from_mode(self.mode)
85 }
86}
87
88impl common::preserve::Metadata for &Metadata {
89 fn uid(&self) -> u32 {
90 (*self).uid()
91 }
92 fn gid(&self) -> u32 {
93 (*self).gid()
94 }
95 fn atime(&self) -> i64 {
96 (*self).atime()
97 }
98 fn atime_nsec(&self) -> i64 {
99 (*self).atime_nsec()
100 }
101 fn mtime(&self) -> i64 {
102 (*self).mtime()
103 }
104 fn mtime_nsec(&self) -> i64 {
105 (*self).mtime_nsec()
106 }
107 fn permissions(&self) -> std::fs::Permissions {
108 (*self).permissions()
109 }
110}
111
112impl From<&std::fs::Metadata> for Metadata {
113 fn from(metadata: &std::fs::Metadata) -> Self {
114 Metadata {
115 mode: metadata.mode(),
116 uid: metadata.uid(),
117 gid: metadata.gid(),
118 atime: metadata.atime(),
119 mtime: metadata.mtime(),
120 atime_nsec: metadata.atime_nsec(),
121 mtime_nsec: metadata.mtime_nsec(),
122 }
123 }
124}
125
126#[derive(Debug, Deserialize, Serialize)]
128pub struct File {
129 pub src: std::path::PathBuf,
130 pub dst: std::path::PathBuf,
131 pub size: u64,
132 pub metadata: Metadata,
133 pub is_root: bool,
134}
135
136#[derive(Debug)]
138pub struct FileMetadata<'a> {
139 pub metadata: &'a Metadata,
140 pub size: u64,
141}
142
143impl<'a> common::preserve::Metadata for FileMetadata<'a> {
144 fn uid(&self) -> u32 {
145 self.metadata.uid()
146 }
147 fn gid(&self) -> u32 {
148 self.metadata.gid()
149 }
150 fn atime(&self) -> i64 {
151 self.metadata.atime()
152 }
153 fn atime_nsec(&self) -> i64 {
154 self.metadata.atime_nsec()
155 }
156 fn mtime(&self) -> i64 {
157 self.metadata.mtime()
158 }
159 fn mtime_nsec(&self) -> i64 {
160 self.metadata.mtime_nsec()
161 }
162 fn permissions(&self) -> std::fs::Permissions {
163 self.metadata.permissions()
164 }
165 fn size(&self) -> u64 {
166 self.size
167 }
168}
169
170#[derive(Debug, Deserialize, Serialize)]
172pub enum SourceMessage {
173 Directory {
177 src: std::path::PathBuf,
178 dst: std::path::PathBuf,
179 metadata: Metadata,
180 is_root: bool,
181 entry_count: usize,
183 file_count: usize,
185 keep_if_empty: bool,
187 },
188 Symlink {
190 src: std::path::PathBuf,
191 dst: std::path::PathBuf,
192 target: std::path::PathBuf,
193 metadata: Metadata,
194 is_root: bool,
195 },
196 DirStructureComplete { has_root_item: bool },
201 FileSkipped {
204 src: std::path::PathBuf,
205 dst: std::path::PathBuf,
206 },
207 SymlinkSkipped { src_dst: SrcDst, is_root: bool },
211}
212
213#[derive(Clone, Debug, Deserialize, Serialize)]
214pub struct SrcDst {
215 pub src: std::path::PathBuf,
216 pub dst: std::path::PathBuf,
217}
218
219#[derive(Clone, Debug, Deserialize, Serialize)]
221pub enum DestinationMessage {
222 DirectoryCreated {
226 src: std::path::PathBuf,
227 dst: std::path::PathBuf,
228 file_count: usize,
229 },
230 DestinationDone,
233}
234
235#[derive(Clone, Debug, Deserialize, Serialize)]
236pub struct RcpdConfig {
237 pub verbose: u8,
238 pub fail_early: bool,
239 pub max_workers: usize,
240 pub max_blocking_threads: usize,
241 pub max_open_files: Option<usize>,
242 pub ops_throttle: usize,
243 pub iops_throttle: usize,
244 pub chunk_size: usize,
245 pub auto_meta: Option<common::AutoMetaThrottleConfig>,
249 pub auto_meta_histogram: bool,
251 pub auto_meta_histogram_log: Option<String>,
255 pub auto_meta_histogram_interval: std::time::Duration,
257 pub dereference: bool,
259 pub overwrite: bool,
260 pub overwrite_compare: String,
261 pub overwrite_filter: Option<String>,
262 pub ignore_existing: bool,
263 pub skip_specials: bool,
264 pub debug_log_prefix: Option<String>,
265 pub port_ranges: Option<String>,
267 pub progress: bool,
268 pub progress_delay: Option<String>,
269 pub remote_copy_conn_timeout_sec: u64,
270 pub network_profile: crate::NetworkProfile,
272 pub buffer_size: Option<usize>,
274 pub max_connections: usize,
276 pub pending_writes_multiplier: usize,
278 pub chrome_trace_prefix: Option<String>,
280 pub flamegraph_prefix: Option<String>,
282 pub profile_level: Option<String>,
284 pub tokio_console: bool,
286 pub tokio_console_port: Option<u16>,
288 pub encryption: bool,
290 pub master_cert_fingerprint: Option<CertFingerprint>,
292}
293
294impl RcpdConfig {
295 pub fn to_args(&self) -> Vec<String> {
296 let mut args = vec![
297 format!("--max-workers={}", self.max_workers),
298 format!("--max-blocking-threads={}", self.max_blocking_threads),
299 format!("--ops-throttle={}", self.ops_throttle),
300 format!("--iops-throttle={}", self.iops_throttle),
301 format!("--chunk-size={}", self.chunk_size),
302 format!("--overwrite-compare={}", self.overwrite_compare),
303 ];
304 if self.verbose > 0 {
305 args.push(format!("-{}", "v".repeat(self.verbose as usize)));
306 }
307 if self.fail_early {
308 args.push("--fail-early".to_string());
309 }
310 if let Some(v) = self.max_open_files {
311 args.push(format!("--max-open-files={v}"));
312 }
313 if self.dereference {
314 args.push("--dereference".to_string());
315 }
316 if self.overwrite {
317 args.push("--overwrite".to_string());
318 if let Some(ref filter) = self.overwrite_filter {
319 args.push(format!("--overwrite-filter={filter}"));
320 }
321 }
322 if self.ignore_existing {
323 args.push("--ignore-existing".to_string());
324 }
325 if self.skip_specials {
326 args.push("--skip-specials".to_string());
327 }
328 if let Some(ref prefix) = self.debug_log_prefix {
329 args.push(format!("--debug-log-prefix={prefix}"));
330 }
331 if let Some(ref ranges) = self.port_ranges {
332 args.push(format!("--port-ranges={ranges}"));
333 }
334 if self.progress {
335 args.push("--progress".to_string());
336 }
337 if let Some(ref delay) = self.progress_delay {
338 args.push(format!("--progress-delay={delay}"));
339 }
340 args.push(format!(
341 "--remote-copy-conn-timeout-sec={}",
342 self.remote_copy_conn_timeout_sec
343 ));
344 args.push(format!("--network-profile={}", self.network_profile));
346 if let Some(v) = self.buffer_size {
348 args.push(format!("--buffer-size={v}"));
349 }
350 args.push(format!("--max-connections={}", self.max_connections));
351 args.push(format!(
352 "--pending-writes-multiplier={}",
353 self.pending_writes_multiplier
354 ));
355 let profiling_enabled =
357 self.chrome_trace_prefix.is_some() || self.flamegraph_prefix.is_some();
358 if let Some(ref prefix) = self.chrome_trace_prefix {
359 args.push(format!("--chrome-trace={prefix}"));
360 }
361 if let Some(ref prefix) = self.flamegraph_prefix {
362 args.push(format!("--flamegraph={prefix}"));
363 }
364 if profiling_enabled && let Some(level) = &self.profile_level {
365 args.push(format!("--profile-level={level}"));
366 }
367 if self.tokio_console {
368 args.push("--tokio-console".to_string());
369 }
370 if let Some(port) = self.tokio_console_port {
371 args.push(format!("--tokio-console-port={port}"));
372 }
373 if !self.encryption {
374 args.push("--no-encryption".to_string());
375 }
376 if let Some(fp) = self.master_cert_fingerprint {
377 args.push(format!(
378 "--master-cert-fp={}",
379 crate::tls::fingerprint_to_hex(&fp)
380 ));
381 }
382 if let Some(auto) = &self.auto_meta {
385 args.push("--auto-meta-throttle".to_string());
386 args.push(format!("--auto-meta-initial-cwnd={}", auto.initial_cwnd));
387 args.push(format!("--auto-meta-min-cwnd={}", auto.min_cwnd));
388 args.push(format!("--auto-meta-max-cwnd={}", auto.max_cwnd));
389 args.push(format!("--auto-meta-alpha={}", auto.alpha));
390 args.push(format!("--auto-meta-beta={}", auto.beta));
391 args.push(format!(
392 "--auto-meta-baseline-percentile={}",
393 auto.baseline_percentile,
394 ));
395 args.push(format!(
396 "--auto-meta-current-percentile={}",
397 auto.current_percentile,
398 ));
399 args.push(format!("--auto-meta-increase-step={}", auto.increase_step));
400 args.push(format!("--auto-meta-decrease-step={}", auto.decrease_step));
401 args.push(format!(
402 "--auto-meta-long-window={}",
403 humantime::format_duration(auto.long_window),
404 ));
405 args.push(format!(
406 "--auto-meta-short-window={}",
407 humantime::format_duration(auto.short_window),
408 ));
409 args.push(format!(
410 "--auto-meta-tick-interval={}",
411 humantime::format_duration(auto.tick_interval),
412 ));
413 }
414 if let Some(path) = &self.auto_meta_histogram_log {
422 args.push(format!("--auto-meta-histogram-log={path}"));
423 args.push(format!(
424 "--auto-meta-histogram-interval={}",
425 humantime::format_duration(self.auto_meta_histogram_interval),
426 ));
427 }
428 args
429 }
430}
431
432#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq)]
433pub enum RcpdRole {
434 Source,
435 Destination,
436}
437
438impl std::fmt::Display for RcpdRole {
439 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
440 match self {
441 RcpdRole::Source => write!(f, "source"),
442 RcpdRole::Destination => write!(f, "destination"),
443 }
444 }
445}
446
447impl std::str::FromStr for RcpdRole {
448 type Err = anyhow::Error;
449 fn from_str(s: &str) -> Result<Self, Self::Err> {
450 match s.to_lowercase().as_str() {
451 "source" => Ok(RcpdRole::Source),
452 "destination" | "dest" => Ok(RcpdRole::Destination),
453 _ => Err(anyhow::anyhow!("invalid role: {}", s)),
454 }
455 }
456}
457
458#[derive(Clone, Debug, Deserialize, Serialize)]
459pub struct TracingHello {
460 pub role: RcpdRole,
461 pub is_tracing: bool,
463}
464
465pub type CertFingerprint = [u8; 32];
467
468#[derive(Clone, Debug, Deserialize, Serialize)]
469pub enum MasterHello {
470 Source {
471 src: std::path::PathBuf,
472 dst: std::path::PathBuf,
473 dest_cert_fingerprint: Option<CertFingerprint>,
475 filter: Option<common::filter::FilterSettings>,
477 dry_run: Option<common::config::DryRunMode>,
479 },
480 Destination {
481 source_control_addr: std::net::SocketAddr,
483 source_data_addr: std::net::SocketAddr,
485 server_name: String,
486 preserve: common::preserve::Settings,
487 source_cert_fingerprint: Option<CertFingerprint>,
489 },
490}
491
492#[derive(Clone, Debug, Deserialize, Serialize)]
493pub struct SourceMasterHello {
494 pub control_addr: std::net::SocketAddr,
496 pub data_addr: std::net::SocketAddr,
498 pub server_name: String,
499}
500
501pub use common::RuntimeStats;
503
504#[derive(Clone, Debug, Deserialize, Serialize)]
505pub enum RcpdResult {
506 Success {
507 message: String,
508 summary: common::copy::Summary,
509 runtime_stats: common::RuntimeStats,
510 },
511 Failure {
512 error: String,
513 summary: common::copy::Summary,
514 runtime_stats: common::RuntimeStats,
515 },
516}
517
518#[cfg(test)]
519mod tests {
520 use super::*;
521
522 fn minimal_rcpd_config() -> RcpdConfig {
523 RcpdConfig {
524 verbose: 0,
525 fail_early: false,
526 max_workers: 0,
527 max_blocking_threads: 0,
528 max_open_files: None,
529 ops_throttle: 0,
530 iops_throttle: 0,
531 chunk_size: 0,
532 auto_meta: None,
533 auto_meta_histogram: false,
534 auto_meta_histogram_log: None,
535 auto_meta_histogram_interval: std::time::Duration::from_secs(1),
536 dereference: false,
537 overwrite: false,
538 overwrite_compare: "size,mtime".to_string(),
539 overwrite_filter: None,
540 ignore_existing: false,
541 skip_specials: false,
542 debug_log_prefix: None,
543 port_ranges: None,
544 progress: false,
545 progress_delay: None,
546 remote_copy_conn_timeout_sec: 30,
547 network_profile: crate::NetworkProfile::default(),
548 buffer_size: None,
549 max_connections: 1,
550 pending_writes_multiplier: 1,
551 chrome_trace_prefix: None,
552 flamegraph_prefix: None,
553 profile_level: None,
554 tokio_console: false,
555 tokio_console_port: None,
556 encryption: true,
557 master_cert_fingerprint: None,
558 }
559 }
560
561 #[test]
562 fn to_args_omits_auto_meta_throttle_when_none() {
563 let args = minimal_rcpd_config().to_args();
564 let throttle_flags = [
566 "--auto-meta-throttle",
567 "--auto-meta-initial-cwnd",
568 "--auto-meta-min-cwnd",
569 "--auto-meta-max-cwnd",
570 "--auto-meta-alpha",
571 "--auto-meta-beta",
572 "--auto-meta-baseline-percentile",
573 "--auto-meta-current-percentile",
574 "--auto-meta-increase-step",
575 "--auto-meta-decrease-step",
576 "--auto-meta-long-window",
577 "--auto-meta-short-window",
578 "--auto-meta-tick-interval",
579 ];
580 for flag in throttle_flags {
581 assert!(
582 !args.iter().any(|a| a.starts_with(flag)),
583 "throttle flag {flag} should not be emitted when auto_meta is None: {args:?}",
584 );
585 }
586 for arg in &args {
588 assert!(
589 !arg.starts_with("--auto-meta-histogram"),
590 "must not emit any histogram flag when histograms are off, found: {arg}",
591 );
592 }
593 }
594
595 #[test]
596 fn to_args_propagates_all_auto_meta_fields() {
597 let mut config = minimal_rcpd_config();
598 config.auto_meta = Some(common::AutoMetaThrottleConfig {
599 initial_cwnd: 8,
600 min_cwnd: 2,
601 max_cwnd: 128,
602 alpha: 1.2,
603 beta: 1.6,
604 increase_step: 2,
605 decrease_step: 3,
606 baseline_percentile: 0.4,
607 current_percentile: 0.6,
608 long_window: std::time::Duration::from_secs(20),
609 short_window: std::time::Duration::from_secs(2),
610 tick_interval: std::time::Duration::from_millis(75),
611 });
612 let args = config.to_args();
613 let has = |needle: &str| args.iter().any(|a| a == needle);
614 let has_prefix = |needle: &str| args.iter().any(|a| a.starts_with(needle));
615 assert!(has("--auto-meta-throttle"));
616 assert!(has("--auto-meta-initial-cwnd=8"));
617 assert!(has("--auto-meta-min-cwnd=2"));
618 assert!(has("--auto-meta-max-cwnd=128"));
619 assert!(has_prefix("--auto-meta-alpha=1.2"));
620 assert!(has_prefix("--auto-meta-beta=1.6"));
621 assert!(has_prefix("--auto-meta-baseline-percentile=0.4"));
622 assert!(has_prefix("--auto-meta-current-percentile=0.6"));
623 assert!(has("--auto-meta-increase-step=2"));
624 assert!(has("--auto-meta-decrease-step=3"));
625 assert!(has_prefix("--auto-meta-long-window="));
626 assert!(has_prefix("--auto-meta-short-window="));
627 assert!(has_prefix("--auto-meta-tick-interval="));
628 }
629
630 #[test]
631 fn to_args_omits_histogram_flags_when_disabled() {
632 let mut config = minimal_rcpd_config();
636 config.auto_meta_histogram = false;
637 config.auto_meta_histogram_log = None;
638 let args = config.to_args();
639 for arg in &args {
640 assert!(
641 !arg.starts_with("--auto-meta-histogram"),
642 "must not emit histogram flag when disabled, found: {arg}",
643 );
644 }
645 }
646
647 #[test]
648 fn to_args_omits_panel_only_flag_when_no_log_path() {
649 let mut config = minimal_rcpd_config();
653 config.auto_meta_histogram = true;
654 config.auto_meta_histogram_log = None;
655 let args = config.to_args();
656 for arg in &args {
657 assert!(
658 !arg.starts_with("--auto-meta-histogram"),
659 "panel-only flag must not be forwarded to rcpd, found: {arg}",
660 );
661 }
662 }
663
664 #[test]
665 fn to_args_forwards_histogram_log_and_interval_when_log_path_set() {
666 let mut config = minimal_rcpd_config();
667 config.auto_meta_histogram = false; config.auto_meta_histogram_log = Some("/tmp/foo.hdr".into());
669 config.auto_meta_histogram_interval = std::time::Duration::from_millis(500);
670 let args = config.to_args();
671 assert!(
672 args.iter()
673 .any(|a| a == "--auto-meta-histogram-log=/tmp/foo.hdr")
674 );
675 assert!(
676 args.iter()
677 .any(|a| a.starts_with("--auto-meta-histogram-interval="))
678 );
679 assert!(
682 !args.iter().any(|a| a == "--auto-meta-histogram"),
683 "panel-only flag must not be forwarded; the log flag implies the pipeline",
684 );
685 }
686}