use serde::{Deserialize, Serialize};
use std::os::unix::fs::MetadataExt;
use std::os::unix::prelude::PermissionsExt;
/// Serializable snapshot of a file's Unix attributes.
///
/// Instances are built from [`std::fs::Metadata`] by the
/// `From<&std::fs::Metadata>` impl in this file, which reads each value through
/// the `std::os::unix::fs::MetadataExt` accessor of the same name.
///
/// NOTE(review): depending on the serde format in use, field order may be part
/// of the serialized layout — confirm before reordering fields.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Metadata {
/// Raw mode value from `MetadataExt::mode`; `permissions()` feeds it to
/// `Permissions::from_mode`.
pub mode: u32,
/// Owner user ID (`MetadataExt::uid`).
pub uid: u32,
/// Owner group ID (`MetadataExt::gid`).
pub gid: u32,
/// Last access time, whole-seconds part (`MetadataExt::atime`).
pub atime: i64,
/// Last modification time, whole-seconds part (`MetadataExt::mtime`).
pub mtime: i64,
/// Nanosecond component of the last access time (`MetadataExt::atime_nsec`).
pub atime_nsec: i64,
/// Nanosecond component of the last modification time (`MetadataExt::mtime_nsec`).
pub mtime_nsec: i64,
}
/// Exposes the stored attribute snapshot through the `common::preserve::Metadata` trait.
impl common::preserve::Metadata for Metadata {
    /// Owner user ID as stored in the snapshot.
    fn uid(&self) -> u32 {
        self.uid
    }

    /// Owner group ID as stored in the snapshot.
    fn gid(&self) -> u32 {
        self.gid
    }

    /// Seconds component of the last access time.
    fn atime(&self) -> i64 {
        self.atime
    }

    /// Nanoseconds component of the last access time.
    fn atime_nsec(&self) -> i64 {
        self.atime_nsec
    }

    /// Seconds component of the last modification time.
    fn mtime(&self) -> i64 {
        self.mtime
    }

    /// Nanoseconds component of the last modification time.
    fn mtime_nsec(&self) -> i64 {
        self.mtime_nsec
    }

    /// Rebuilds a `std::fs::Permissions` value from the stored raw mode.
    fn permissions(&self) -> std::fs::Permissions {
        let raw_mode = self.mode;
        <std::fs::Permissions as PermissionsExt>::from_mode(raw_mode)
    }
}
/// Lets a borrowed `&Metadata` be used wherever the trait is expected.
///
/// Every method names the `Metadata` impl explicitly, so the call can never
/// resolve back to this reference impl and recurse.
impl common::preserve::Metadata for &Metadata {
    /// Forwards to the owned impl's `uid`.
    fn uid(&self) -> u32 {
        <Metadata as common::preserve::Metadata>::uid(self)
    }

    /// Forwards to the owned impl's `gid`.
    fn gid(&self) -> u32 {
        <Metadata as common::preserve::Metadata>::gid(self)
    }

    /// Forwards to the owned impl's `atime`.
    fn atime(&self) -> i64 {
        <Metadata as common::preserve::Metadata>::atime(self)
    }

    /// Forwards to the owned impl's `atime_nsec`.
    fn atime_nsec(&self) -> i64 {
        <Metadata as common::preserve::Metadata>::atime_nsec(self)
    }

    /// Forwards to the owned impl's `mtime`.
    fn mtime(&self) -> i64 {
        <Metadata as common::preserve::Metadata>::mtime(self)
    }

    /// Forwards to the owned impl's `mtime_nsec`.
    fn mtime_nsec(&self) -> i64 {
        <Metadata as common::preserve::Metadata>::mtime_nsec(self)
    }

    /// Forwards to the owned impl's `permissions`.
    fn permissions(&self) -> std::fs::Permissions {
        <Metadata as common::preserve::Metadata>::permissions(self)
    }
}
impl From<&std::fs::Metadata> for Metadata {
fn from(metadata: &std::fs::Metadata) -> Self {
Metadata {
mode: metadata.mode(),
uid: metadata.uid(),
gid: metadata.gid(),
atime: metadata.atime(),
mtime: metadata.mtime(),
atime_nsec: metadata.atime_nsec(),
mtime_nsec: metadata.mtime_nsec(),
}
}
}
/// Serializable record describing one file: a source path, a destination path,
/// a size, the captured [`Metadata`], and a root marker.
///
/// NOTE(review): where this record is produced and consumed is not visible in
/// this file — confirm against the protocol code before relying on direction.
#[derive(Debug, Deserialize, Serialize)]
pub struct File {
/// Path of the file on the source side.
pub src: std::path::PathBuf,
/// Path the file should have on the destination side.
pub dst: std::path::PathBuf,
/// File size; presumably in bytes — TODO confirm against the producer.
pub size: u64,
/// Attribute snapshot for the file.
pub metadata: Metadata,
/// Presumably true when this file is the top-level item of the copy — confirm.
pub is_root: bool,
}
/// Borrowed [`Metadata`] paired with a size.
///
/// Its `common::preserve::Metadata` impl delegates every attribute to the
/// borrowed snapshot and answers `size()` from the `size` field.
#[derive(Debug)]
pub struct FileMetadata<'a> {
/// Attribute snapshot all getters delegate to.
pub metadata: &'a Metadata,
/// Value returned by `size()`.
pub size: u64,
}
impl<'a> common::preserve::Metadata for FileMetadata<'a> {
fn uid(&self) -> u32 {
self.metadata.uid()
}
fn gid(&self) -> u32 {
self.metadata.gid()
}
fn atime(&self) -> i64 {
self.metadata.atime()
}
fn atime_nsec(&self) -> i64 {
self.metadata.atime_nsec()
}
fn mtime(&self) -> i64 {
self.metadata.mtime()
}
fn mtime_nsec(&self) -> i64 {
self.metadata.mtime_nsec()
}
fn permissions(&self) -> std::fs::Permissions {
self.metadata.permissions()
}
fn size(&self) -> u64 {
self.size
}
}
/// Serializable messages of the `Source*` half of the protocol.
///
/// NOTE(review): the name suggests these are emitted by the source side; the
/// sender and receiver are not visible in this file — confirm.
#[derive(Debug, Deserialize, Serialize)]
pub enum SourceMessage {
/// Describes a directory: source and destination paths plus its metadata.
Directory {
src: std::path::PathBuf,
dst: std::path::PathBuf,
metadata: Metadata,
/// Presumably true for the top-level item of the copy — confirm.
is_root: bool,
/// NOTE(review): looks like the number of entries in the directory — confirm.
entry_count: usize,
/// NOTE(review): looks like the number of files in the directory; compare
/// `DestinationMessage::DirectoryCreated::file_count` — confirm.
file_count: usize,
/// NOTE(review): semantics are defined by the consumer — confirm there.
keep_if_empty: bool,
},
/// Describes a symlink: source and destination paths, link target, and metadata.
Symlink {
src: std::path::PathBuf,
dst: std::path::PathBuf,
target: std::path::PathBuf,
metadata: Metadata,
is_root: bool,
},
/// Marker carrying only a `has_root_item` flag.
/// NOTE(review): presumably signals that all directory messages were sent — confirm.
DirStructureComplete { has_root_item: bool },
/// Reports a file identified by its source and destination paths as skipped.
FileSkipped {
src: std::path::PathBuf,
dst: std::path::PathBuf,
},
/// Reports a symlink (paths bundled in [`SrcDst`]) as skipped.
SymlinkSkipped { src_dst: SrcDst, is_root: bool },
}
/// A source path paired with a destination path.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SrcDst {
/// Path on the source side.
pub src: std::path::PathBuf,
/// Corresponding path on the destination side.
pub dst: std::path::PathBuf,
}
/// Serializable messages of the `Destination*` half of the protocol.
///
/// NOTE(review): the name suggests these are emitted by the destination side;
/// the sender and receiver are not visible in this file — confirm.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum DestinationMessage {
/// Reports a directory, identified by its source and destination paths,
/// together with a file count.
/// NOTE(review): presumably echoes `SourceMessage::Directory::file_count` — confirm.
DirectoryCreated {
src: std::path::PathBuf,
dst: std::path::PathBuf,
file_count: usize,
},
/// Payload-free marker; presumably the destination's end-of-work signal — confirm.
DestinationDone,
}
/// Settings that [`RcpdConfig::to_args`] renders into command-line arguments.
///
/// Each field's comment states how `to_args` renders it; the meaning of a flag
/// on the receiving side is defined by the rcpd argument parser, not here.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RcpdConfig {
/// Verbosity level: rendered as `-` followed by this many `v`s; omitted when 0.
pub verbose: u8,
/// Adds `--fail-early` when true.
pub fail_early: bool,
/// Always rendered as `--max-workers=N`.
pub max_workers: usize,
/// Always rendered as `--max-blocking-threads=N`.
pub max_blocking_threads: usize,
/// Rendered as `--max-open-files=N` when set.
pub max_open_files: Option<usize>,
/// Always rendered as `--ops-throttle=N`.
pub ops_throttle: usize,
/// Always rendered as `--iops-throttle=N`.
pub iops_throttle: usize,
/// Always rendered as `--chunk-size=N`.
pub chunk_size: usize,
/// When set, adds `--auto-meta-throttle` plus one `--auto-meta-*` flag per tuning field.
pub auto_meta: Option<common::AutoMetaThrottleConfig>,
/// Not read by `to_args`; the unit tests call it a "panel-only" flag that
/// must not be forwarded to rcpd.
pub auto_meta_histogram: bool,
/// When set, rendered as `--auto-meta-histogram-log=PATH` together with the interval below.
pub auto_meta_histogram_log: Option<String>,
/// Rendered as `--auto-meta-histogram-interval=…` (humantime format), but only
/// when `auto_meta_histogram_log` is set.
pub auto_meta_histogram_interval: std::time::Duration,
/// Adds `--dereference` when true.
pub dereference: bool,
/// Adds `--overwrite` when true.
pub overwrite: bool,
/// Always rendered as `--overwrite-compare=VALUE`.
pub overwrite_compare: String,
/// Rendered as `--overwrite-filter=VALUE`, only when `overwrite` is true and this is set.
pub overwrite_filter: Option<String>,
/// Adds `--ignore-existing` when true.
pub ignore_existing: bool,
/// Adds `--skip-specials` when true.
pub skip_specials: bool,
/// Rendered as `--debug-log-prefix=VALUE` when set.
pub debug_log_prefix: Option<String>,
/// Rendered as `--port-ranges=VALUE` when set.
pub port_ranges: Option<String>,
/// Adds `--progress` when true.
pub progress: bool,
/// Rendered as `--progress-delay=VALUE` when set.
pub progress_delay: Option<String>,
/// Always rendered as `--remote-copy-conn-timeout-sec=N`.
pub remote_copy_conn_timeout_sec: u64,
/// Always rendered as `--network-profile=…` using the profile's `Display` output.
pub network_profile: crate::NetworkProfile,
/// Rendered as `--buffer-size=N` when set.
pub buffer_size: Option<usize>,
/// Always rendered as `--max-connections=N`.
pub max_connections: usize,
/// Always rendered as `--pending-writes-multiplier=N`.
pub pending_writes_multiplier: usize,
/// Rendered as `--chrome-trace=PREFIX` when set.
pub chrome_trace_prefix: Option<String>,
/// Rendered as `--flamegraph=PREFIX` when set.
pub flamegraph_prefix: Option<String>,
/// Rendered as `--profile-level=VALUE`, only when a chrome-trace or flamegraph prefix is set.
pub profile_level: Option<String>,
/// Adds `--tokio-console` when true.
pub tokio_console: bool,
/// Rendered as `--tokio-console-port=N` when set (independently of `tokio_console`).
pub tokio_console_port: Option<u16>,
/// Adds `--no-encryption` when false; nothing is emitted when true.
pub encryption: bool,
/// Rendered as `--master-cert-fp=HEX` (via `crate::tls::fingerprint_to_hex`) when set.
pub master_cert_fingerprint: Option<CertFingerprint>,
}
impl RcpdConfig {
    /// Renders this configuration as a list of command-line arguments.
    ///
    /// Numeric tuning values, `--overwrite-compare`, the connection timeout, the
    /// network profile, `--max-connections` and `--pending-writes-multiplier`
    /// are always present. Boolean switches and optional values appear only
    /// when enabled / set. Arguments are emitted in a fixed order.
    ///
    /// Conditional rules worth knowing:
    /// - `--overwrite-filter` is emitted only together with `--overwrite`.
    /// - `--profile-level` is emitted only when a chrome-trace or flamegraph
    ///   prefix is configured.
    /// - `--no-encryption` is emitted when `encryption` is `false`.
    /// - `auto_meta_histogram` is never forwarded; the histogram interval is
    ///   forwarded only alongside `--auto-meta-histogram-log`.
    pub fn to_args(&self) -> Vec<String> {
        // A boolean switch contributes its bare flag only when it is turned on.
        let switch = |enabled: bool, flag: &str| enabled.then(|| flag.to_string());

        let mut args: Vec<String> = Vec::new();

        // Unconditional tuning values come first.
        args.extend([
            format!("--max-workers={}", self.max_workers),
            format!("--max-blocking-threads={}", self.max_blocking_threads),
            format!("--ops-throttle={}", self.ops_throttle),
            format!("--iops-throttle={}", self.iops_throttle),
            format!("--chunk-size={}", self.chunk_size),
            format!("--overwrite-compare={}", self.overwrite_compare),
        ]);

        // Verbosity N becomes a single `-vv…v` argument with N `v`s.
        let verbosity = usize::from(self.verbose);
        if verbosity > 0 {
            args.push(format!("-{}", "v".repeat(verbosity)));
        }

        args.extend(switch(self.fail_early, "--fail-early"));
        args.extend(
            self.max_open_files
                .map(|limit| format!("--max-open-files={limit}")),
        );
        args.extend(switch(self.dereference, "--dereference"));

        // The overwrite filter is only forwarded when overwriting is enabled.
        if self.overwrite {
            args.push("--overwrite".to_string());
            args.extend(
                self.overwrite_filter
                    .as_ref()
                    .map(|filter| format!("--overwrite-filter={filter}")),
            );
        }

        args.extend(switch(self.ignore_existing, "--ignore-existing"));
        args.extend(switch(self.skip_specials, "--skip-specials"));
        args.extend(
            self.debug_log_prefix
                .as_ref()
                .map(|prefix| format!("--debug-log-prefix={prefix}")),
        );
        args.extend(
            self.port_ranges
                .as_ref()
                .map(|ranges| format!("--port-ranges={ranges}")),
        );
        args.extend(switch(self.progress, "--progress"));
        args.extend(
            self.progress_delay
                .as_ref()
                .map(|delay| format!("--progress-delay={delay}")),
        );

        args.extend([
            format!(
                "--remote-copy-conn-timeout-sec={}",
                self.remote_copy_conn_timeout_sec
            ),
            format!("--network-profile={}", self.network_profile),
        ]);
        args.extend(self.buffer_size.map(|size| format!("--buffer-size={size}")));
        args.extend([
            format!("--max-connections={}", self.max_connections),
            format!(
                "--pending-writes-multiplier={}",
                self.pending_writes_multiplier
            ),
        ]);

        // Profiling outputs; the level is meaningful only if at least one is requested.
        let chrome_trace = self
            .chrome_trace_prefix
            .as_ref()
            .map(|prefix| format!("--chrome-trace={prefix}"));
        let flamegraph = self
            .flamegraph_prefix
            .as_ref()
            .map(|prefix| format!("--flamegraph={prefix}"));
        let profiling_enabled = chrome_trace.is_some() || flamegraph.is_some();
        args.extend(chrome_trace);
        args.extend(flamegraph);
        if profiling_enabled {
            args.extend(
                self.profile_level
                    .as_ref()
                    .map(|level| format!("--profile-level={level}")),
            );
        }

        args.extend(switch(self.tokio_console, "--tokio-console"));
        args.extend(
            self.tokio_console_port
                .map(|port| format!("--tokio-console-port={port}")),
        );

        // Encryption is the default; only its absence needs a flag.
        args.extend(switch(!self.encryption, "--no-encryption"));
        args.extend(self.master_cert_fingerprint.map(|fp| {
            format!(
                "--master-cert-fp={}",
                crate::tls::fingerprint_to_hex(&fp)
            )
        }));

        // The throttle is all-or-nothing: the enabling switch plus every tuning field.
        if let Some(auto) = self.auto_meta.as_ref() {
            args.extend([
                "--auto-meta-throttle".to_string(),
                format!("--auto-meta-initial-cwnd={}", auto.initial_cwnd),
                format!("--auto-meta-min-cwnd={}", auto.min_cwnd),
                format!("--auto-meta-max-cwnd={}", auto.max_cwnd),
                format!("--auto-meta-alpha={}", auto.alpha),
                format!("--auto-meta-beta={}", auto.beta),
                format!(
                    "--auto-meta-baseline-percentile={}",
                    auto.baseline_percentile,
                ),
                format!(
                    "--auto-meta-current-percentile={}",
                    auto.current_percentile,
                ),
                format!("--auto-meta-increase-step={}", auto.increase_step),
                format!("--auto-meta-decrease-step={}", auto.decrease_step),
                format!(
                    "--auto-meta-long-window={}",
                    humantime::format_duration(auto.long_window),
                ),
                format!(
                    "--auto-meta-short-window={}",
                    humantime::format_duration(auto.short_window),
                ),
                format!(
                    "--auto-meta-tick-interval={}",
                    humantime::format_duration(auto.tick_interval),
                ),
            ]);
        }

        // The histogram interval travels only together with a log path.
        if let Some(path) = self.auto_meta_histogram_log.as_ref() {
            args.extend([
                format!("--auto-meta-histogram-log={path}"),
                format!(
                    "--auto-meta-histogram-interval={}",
                    humantime::format_duration(self.auto_meta_histogram_interval),
                ),
            ]);
        }

        args
    }
}
/// Which side of a copy a process acts as.
///
/// Displays as `source` / `destination`; parsing (see the `FromStr` impl) is
/// case-insensitive and also accepts `dest`.
#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialEq, Eq)]
pub enum RcpdRole {
/// The source side.
Source,
/// The destination side.
Destination,
}
impl std::fmt::Display for RcpdRole {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RcpdRole::Source => write!(f, "source"),
RcpdRole::Destination => write!(f, "destination"),
}
}
}
/// Parses a role name case-insensitively: `source`, `destination`, or the
/// shorthand `dest`.
impl std::str::FromStr for RcpdRole {
    type Err = anyhow::Error;

    /// Returns an `invalid role` error for any other input.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let normalized = s.to_lowercase();
        if normalized == "source" {
            return Ok(Self::Source);
        }
        if matches!(normalized.as_str(), "destination" | "dest") {
            return Ok(Self::Destination);
        }
        // Report the input exactly as given, not the lowercased form.
        Err(anyhow::anyhow!("invalid role: {}", s))
    }
}
/// Serializable hello message carrying a role and a tracing flag.
///
/// NOTE(review): presumably the first message on a tracing connection — the
/// exchange itself is not visible in this file; confirm.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct TracingHello {
/// Role of the process sending the hello.
pub role: RcpdRole,
/// NOTE(review): looks like it marks the connection as a tracing channel — confirm.
pub is_tracing: bool,
}
/// Fixed-size, 32-byte certificate fingerprint.
///
/// NOTE(review): 32 bytes matches a SHA-256 digest, but the hash function is
/// chosen outside this file — confirm in `crate::tls`.
pub type CertFingerprint = [u8; 32];
/// Serializable hello message with one variant per [`RcpdRole`]-like side.
///
/// NOTE(review): the name suggests the master process sends this to each rcpd;
/// the sender is not visible in this file — confirm.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum MasterHello {
/// Parameters for the source side.
Source {
/// Source path of the copy.
src: std::path::PathBuf,
/// Destination path of the copy.
dst: std::path::PathBuf,
/// Optional fingerprint of the destination's certificate.
dest_cert_fingerprint: Option<CertFingerprint>,
/// Optional filter settings.
filter: Option<common::filter::FilterSettings>,
/// Optional dry-run mode.
dry_run: Option<common::config::DryRunMode>,
},
/// Parameters for the destination side.
Destination {
/// Socket address of the source's control endpoint.
source_control_addr: std::net::SocketAddr,
/// Socket address of the source's data endpoint.
source_data_addr: std::net::SocketAddr,
/// NOTE(review): presumably the name used when validating the source's
/// certificate — confirm against the TLS setup.
server_name: String,
/// Preservation settings.
preserve: common::preserve::Settings,
/// Optional fingerprint of the source's certificate.
source_cert_fingerprint: Option<CertFingerprint>,
},
}
/// Serializable message carrying a control address, a data address and a server name.
///
/// The three fields mirror `source_control_addr`, `source_data_addr` and
/// `server_name` of `MasterHello::Destination`.
/// NOTE(review): presumably the source reports these to the master, which
/// relays them to the destination — confirm against the callers.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SourceMasterHello {
/// Socket address of the control endpoint.
pub control_addr: std::net::SocketAddr,
/// Socket address of the data endpoint.
pub data_addr: std::net::SocketAddr,
/// Server name accompanying the addresses.
pub server_name: String,
}
pub use common::RuntimeStats;
/// Serializable outcome report.
///
/// Both variants carry a copy summary and runtime statistics; they differ only
/// in whether the accompanying text is a message or an error.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub enum RcpdResult {
/// Successful outcome.
Success {
/// Human-readable message.
message: String,
/// Copy summary.
summary: common::copy::Summary,
/// Runtime statistics.
runtime_stats: common::RuntimeStats,
},
/// Failed outcome; a summary and statistics are still included.
Failure {
/// Error description as text.
error: String,
/// Copy summary.
summary: common::copy::Summary,
/// Runtime statistics.
runtime_stats: common::RuntimeStats,
},
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline config for the `to_args` tests: every switch off and every
    /// optional value unset, except `encryption`, which stays on so that
    /// `--no-encryption` is not emitted.
    fn minimal_rcpd_config() -> RcpdConfig {
        RcpdConfig {
            verbose: 0,
            fail_early: false,
            max_workers: 0,
            max_blocking_threads: 0,
            max_open_files: None,
            ops_throttle: 0,
            iops_throttle: 0,
            chunk_size: 0,
            auto_meta: None,
            auto_meta_histogram: false,
            auto_meta_histogram_log: None,
            auto_meta_histogram_interval: std::time::Duration::from_secs(1),
            dereference: false,
            overwrite: false,
            overwrite_compare: "size,mtime".to_string(),
            overwrite_filter: None,
            ignore_existing: false,
            skip_specials: false,
            debug_log_prefix: None,
            port_ranges: None,
            progress: false,
            progress_delay: None,
            remote_copy_conn_timeout_sec: 30,
            network_profile: crate::NetworkProfile::default(),
            buffer_size: None,
            max_connections: 1,
            pending_writes_multiplier: 1,
            chrome_trace_prefix: None,
            flamegraph_prefix: None,
            profile_level: None,
            tokio_console: false,
            tokio_console_port: None,
            encryption: true,
            master_cert_fingerprint: None,
        }
    }

    /// With `auto_meta` unset, no throttle flag and no histogram flag may appear.
    #[test]
    fn to_args_omits_auto_meta_throttle_when_none() {
        let args = minimal_rcpd_config().to_args();
        let throttle_flags = [
            "--auto-meta-throttle",
            "--auto-meta-initial-cwnd",
            "--auto-meta-min-cwnd",
            "--auto-meta-max-cwnd",
            "--auto-meta-alpha",
            "--auto-meta-beta",
            "--auto-meta-baseline-percentile",
            "--auto-meta-current-percentile",
            "--auto-meta-increase-step",
            "--auto-meta-decrease-step",
            "--auto-meta-long-window",
            "--auto-meta-short-window",
            "--auto-meta-tick-interval",
        ];
        for flag in throttle_flags {
            assert!(
                !args.iter().any(|a| a.starts_with(flag)),
                "throttle flag {flag} should not be emitted when auto_meta is None: {args:?}",
            );
        }
        for arg in &args {
            assert!(
                !arg.starts_with("--auto-meta-histogram"),
                "must not emit any histogram flag when histograms are off, found: {arg}",
            );
        }
    }

    /// With `auto_meta` set, the switch and every tuning field must be forwarded.
    #[test]
    fn to_args_propagates_all_auto_meta_fields() {
        let mut config = minimal_rcpd_config();
        config.auto_meta = Some(common::AutoMetaThrottleConfig {
            initial_cwnd: 8,
            min_cwnd: 2,
            max_cwnd: 128,
            alpha: 1.2,
            beta: 1.6,
            increase_step: 2,
            decrease_step: 3,
            baseline_percentile: 0.4,
            current_percentile: 0.6,
            long_window: std::time::Duration::from_secs(20),
            short_window: std::time::Duration::from_secs(2),
            tick_interval: std::time::Duration::from_millis(75),
        });
        let args = config.to_args();

        // Integers and humantime-formatted durations have one canonical
        // rendering, so the whole argument is pinned.
        let exact = [
            "--auto-meta-throttle",
            "--auto-meta-initial-cwnd=8",
            "--auto-meta-min-cwnd=2",
            "--auto-meta-max-cwnd=128",
            "--auto-meta-increase-step=2",
            "--auto-meta-decrease-step=3",
            "--auto-meta-long-window=20s",
            "--auto-meta-short-window=2s",
            "--auto-meta-tick-interval=75ms",
        ];
        for expected in exact {
            assert!(
                args.iter().any(|a| a == expected),
                "expected argument {expected} is missing: {args:?}",
            );
        }

        // Floating-point values are matched by prefix only, as before, to stay
        // independent of the exact float rendering.
        let prefixed = [
            "--auto-meta-alpha=1.2",
            "--auto-meta-beta=1.6",
            "--auto-meta-baseline-percentile=0.4",
            "--auto-meta-current-percentile=0.6",
        ];
        for prefix in prefixed {
            assert!(
                args.iter().any(|a| a.starts_with(prefix)),
                "no argument starts with {prefix}: {args:?}",
            );
        }
    }

    /// Histograms fully disabled: nothing histogram-related is forwarded.
    #[test]
    fn to_args_omits_histogram_flags_when_disabled() {
        let mut config = minimal_rcpd_config();
        config.auto_meta_histogram = false;
        config.auto_meta_histogram_log = None;
        let args = config.to_args();
        for arg in &args {
            assert!(
                !arg.starts_with("--auto-meta-histogram"),
                "must not emit histogram flag when disabled, found: {arg}",
            );
        }
    }

    /// The panel-only switch alone (no log path) must not reach rcpd.
    #[test]
    fn to_args_omits_panel_only_flag_when_no_log_path() {
        let mut config = minimal_rcpd_config();
        config.auto_meta_histogram = true;
        config.auto_meta_histogram_log = None;
        let args = config.to_args();
        for arg in &args {
            assert!(
                !arg.starts_with("--auto-meta-histogram"),
                "panel-only flag must not be forwarded to rcpd, found: {arg}",
            );
        }
    }

    /// A log path forwards the log flag and the interval, but never the bare switch.
    #[test]
    fn to_args_forwards_histogram_log_and_interval_when_log_path_set() {
        let mut config = minimal_rcpd_config();
        config.auto_meta_histogram = false;
        config.auto_meta_histogram_log = Some("/tmp/foo.hdr".into());
        config.auto_meta_histogram_interval = std::time::Duration::from_millis(500);
        let args = config.to_args();
        assert!(
            args.iter()
                .any(|a| a == "--auto-meta-histogram-log=/tmp/foo.hdr"),
            "histogram log path was not forwarded: {args:?}",
        );
        assert!(
            args.iter()
                .any(|a| a == "--auto-meta-histogram-interval=500ms"),
            "histogram interval was not forwarded as 500ms: {args:?}",
        );
        assert!(
            !args.iter().any(|a| a == "--auto-meta-histogram"),
            "panel-only flag must not be forwarded; the log flag implies the pipeline",
        );
    }
}