Skip to main content

common/
lib.rs

1//! Common utilities and types for RCP file operation tools
2//!
3//! This crate provides shared functionality used across all RCP tools (`rcp`, `rrm`, `rlink`, `rcmp`).
4//! It includes core operations (copy, remove, link, compare), progress reporting, metadata preservation, and runtime configuration.
5//!
6//! # Core Modules
7//!
8//! - [`mod@copy`] - File copying operations with metadata preservation and error handling
9//! - [`mod@rm`] - File removal operations
10//! - [`mod@link`] - Hard-linking operations
11//! - [`mod@cmp`] - File comparison operations (metadata-based)
12//! - [`mod@preserve`] - Metadata preservation settings and operations
13//! - [`mod@progress`] - Progress tracking and reporting
14//! - [`mod@filecmp`] - File metadata comparison utilities
15//! - [`mod@remote_tracing`] - Remote tracing support for distributed operations
16//!
17//! # Key Types
18//!
19//! ## `RcpdType`
20//!
21//! Identifies the role of a remote copy daemon:
22//! - `Source` - reads files from source host
23//! - `Destination` - writes files to destination host
24//!
25//! ## `ProgressType`
26//!
27//! Controls progress reporting display:
28//! - `Auto` - automatically choose based on terminal type
29//! - `ProgressBar` - animated progress bar (for interactive terminals)
30//! - `TextUpdates` - periodic text updates (for logging/non-interactive)
31//!
32//! # Progress Reporting
33//!
34//! The crate provides a global progress tracking system accessible via [`get_progress()`].
35//! Progress can be displayed in different formats depending on the execution context.
36//!
37//! Progress output goes to stderr, while logs go to stdout, allowing users to redirect logs to a file while still viewing interactive progress.
38//!
39//! # Runtime Configuration
40//!
41//! The [`run`] function provides a unified entry point for all RCP tools with support for:
42//! - Progress tracking and reporting
43//! - Logging configuration (quiet/verbose modes)
44//! - Resource limits (max workers, open files, throttling)
45//! - Tokio runtime setup
46//! - Remote tracing integration
47//!
48//! # Examples
49//!
50//! ## Basic Copy Operation
51//!
52//! ```rust,no_run
53//! use std::path::Path;
54//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
55//! let src = Path::new("/source");
56//! let dst = Path::new("/destination");
57//!
58//! let settings = common::copy::Settings {
59//!     dereference: false,
60//!     fail_early: false,
61//!     overwrite: false,
62//!     overwrite_compare: Default::default(),
63//!     overwrite_filter: None,
64//!     ignore_existing: false,
65//!     chunk_size: 0,
66//!     skip_specials: false,
67//!     remote_copy_buffer_size: 0,
68//!     filter: None,
69//!     dry_run: None,
70//! };
71//! let preserve = common::preserve::preserve_none();
72//!
73//! let summary = common::copy(src, dst, &settings, &preserve).await?;
74//! println!("Copied {} files", summary.files_copied);
75//! # Ok(())
76//! # }
77//! ```
78//!
79//! ## Metadata Comparison
80//!
81//! ```rust,no_run
82//! use std::path::Path;
83//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
84//! let src = Path::new("/path1");
85//! let dst = Path::new("/path2");
86//!
87//! // output differences to stdout (use false for quiet mode)
88//! let log = common::cmp::LogWriter::new(None, true, common::cmp::OutputFormat::default()).await?;
89//! let settings = common::cmp::Settings {
90//!     fail_early: false,
91//!     exit_early: false,
92//!     expand_missing: false,
93//!     compare: Default::default(),
94//!     filter: None,
95//! };
96//!
97//! let summary = common::cmp(src, dst, &log, &settings).await?;
98//! println!("Comparison complete: {}", summary);
99//! # Ok(())
100//! # }
101//! ```
102
103use crate::cmp::ObjType;
104use anyhow::Context;
105use anyhow::anyhow;
106use std::io::IsTerminal;
107use tracing::instrument;
108use tracing_subscriber::fmt::format::FmtSpan;
109use tracing_subscriber::prelude::*;
110
111mod auto_meta;
112pub mod cli;
113pub mod cmp;
114pub mod config;
115pub mod copy;
116pub mod dry_run;
117pub mod error;
118pub mod error_collector;
119pub mod filegen;
120pub mod filter;
121pub mod histogram_logger;
122pub mod histogram_panel;
123pub mod link;
124pub mod observability;
125pub mod preserve;
126pub mod remote_tracing;
127pub mod rm;
128pub mod version;
129
130pub mod filecmp;
131pub mod progress;
132mod testutils;
133pub mod walk;
134
135pub use config::{
136    AutoMetaThrottleConfig, DryRunMode, DryRunWarnings, OutputConfig, RuntimeConfig,
137    ThrottleConfig, TracingConfig,
138};
139// Re-export `Side` from the congestion crate so downstream binaries
140// (rcp, rrm, …) and integration tests can pass `common::Side::Source` /
141// `common::Side::Destination` to `walk::next_entry_probed` and friends
142// without taking a direct dependency on `congestion`.
143pub use congestion::{MetadataOp, Side};
144pub use progress::{RcpdProgressPrinter, SerializableProgress};
145
/// Role of a remote copy daemon (`rcpd`) taking part in a remote copy.
///
/// Defined in `common` since `remote` depends on `common`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
pub enum RcpdType {
    /// reads files from the source host
    Source,
    /// writes files to the destination host
    Destination,
}
152
153impl std::fmt::Display for RcpdType {
154    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155        match self {
156            RcpdType::Source => write!(f, "source"),
157            RcpdType::Destination => write!(f, "destination"),
158        }
159    }
160}
161
/// Progress snapshot keyed by daemon role: holds one `T` for each [`RcpdType`] variant.
pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
164
/// runtime statistics collected from a process (CPU time, memory usage)
///
/// all fields default to zero; serializable so the values can be exchanged between processes
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
pub struct RuntimeStats {
    /// user-mode CPU time in milliseconds
    pub cpu_time_user_ms: u64,
    /// kernel-mode CPU time in milliseconds
    pub cpu_time_kernel_ms: u64,
    /// peak resident set size in bytes
    pub peak_rss_bytes: u64,
}
175
/// runtime stats collected from remote rcpd processes for display at the end of a remote copy
#[derive(Debug, Default)]
pub struct RemoteRuntimeStats {
    /// host of the source side; checked with `is_localhost` when the stats are printed
    pub source_host: String,
    /// runtime stats reported for the source side
    pub source_stats: RuntimeStats,
    /// host of the destination side; checked with `is_localhost` when the stats are printed
    pub dest_host: String,
    /// runtime stats reported for the destination side
    pub dest_stats: RuntimeStats,
}
184
185/// checks if a host string refers to the local machine.
186/// returns true for `localhost`, `127.0.0.1`, `::1`, `[::1]`, or the actual hostname
187#[must_use]
188pub fn is_localhost(host: &str) -> bool {
189    if host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]" {
190        return true;
191    }
192    // check against actual hostname using gethostname
193    let mut buf = [0u8; 256];
194    // Safety: gethostname writes to buf and returns 0 on success
195    let result = unsafe { libc::gethostname(buf.as_mut_ptr() as *mut libc::c_char, buf.len()) };
196    if result == 0
197        && let Ok(hostname_cstr) = std::ffi::CStr::from_bytes_until_nul(&buf)
198        && let Ok(hostname) = hostname_cstr.to_str()
199        && host == hostname
200    {
201        return true;
202    }
203    false
204}
205
/// process-wide progress state; handed out by `get_progress` and read by the progress loops
static PROGRESS: std::sync::LazyLock<progress::Progress> =
    std::sync::LazyLock::new(progress::Progress::new);
/// shared spinner updated by the interactive progress loops and suspended by `ProgWriter`
/// while it writes
static PBAR: std::sync::LazyLock<indicatif::ProgressBar> =
    std::sync::LazyLock::new(indicatif::ProgressBar::new_spinner);
/// remote runtime stats stored by `set_remote_runtime_stats` and taken by `print_runtime_stats`
static REMOTE_RUNTIME_STATS: std::sync::LazyLock<std::sync::Mutex<Option<RemoteRuntimeStats>>> =
    std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
/// cancel sender for the histogram logger; `signal_logger_cancel` takes it and sends `true`
static HISTOGRAM_LOGGER_CANCEL: std::sync::Mutex<Option<tokio::sync::watch::Sender<bool>>> =
    std::sync::Mutex::new(None);
/// join handle for the histogram logger, stored by `store_logger_handle` and removed by
/// `take_logger_handle`
static HISTOGRAM_LOGGER_HANDLE: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>> =
    std::sync::Mutex::new(None);
216
217fn store_logger_cancel(tx: tokio::sync::watch::Sender<bool>) {
218    *HISTOGRAM_LOGGER_CANCEL
219        .lock()
220        .expect("histogram logger cancel mutex poisoned") = Some(tx);
221}
222
223fn store_logger_handle(handle: tokio::task::JoinHandle<()>) {
224    *HISTOGRAM_LOGGER_HANDLE
225        .lock()
226        .expect("histogram logger handle mutex poisoned") = Some(handle);
227}
228
229fn take_logger_handle() -> Option<tokio::task::JoinHandle<()>> {
230    HISTOGRAM_LOGGER_HANDLE
231        .lock()
232        .expect("histogram logger handle mutex poisoned")
233        .take()
234}
235
236fn signal_logger_cancel() {
237    if let Some(tx) = HISTOGRAM_LOGGER_CANCEL
238        .lock()
239        .expect("histogram logger cancel mutex poisoned")
240        .take()
241        && let Err(err) = tx.send(true)
242    {
243        tracing::debug!("histogram-logger cancel send failed (already gone): {err:#}");
244    }
245}
246
/// returns the process-wide progress tracker (the `PROGRESS` static), which is created on
/// first access
#[must_use]
pub fn get_progress() -> &'static progress::Progress {
    &PROGRESS
}
251
252/// stores remote runtime stats for display at the end of a remote copy operation
253pub fn set_remote_runtime_stats(stats: RemoteRuntimeStats) {
254    *REMOTE_RUNTIME_STATS.lock().unwrap() = Some(stats);
255}
256
257struct LocalTimeFormatter;
258
259impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFormatter {
260    fn format_time(
261        &self,
262        writer: &mut tracing_subscriber::fmt::format::Writer<'_>,
263    ) -> std::fmt::Result {
264        let now = chrono::Local::now();
265        writer.write_str(&now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
266    }
267}
268
/// owns the background progress-reporting thread; dropping the tracker stops and joins it
struct ProgressTracker {
    /// "done" flag plus the condvar used to wake the reporting thread when the flag is set
    lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
    /// handle of the reporting thread; taken (left as `None`) once it has been joined
    pbar_thread: Option<std::thread::JoinHandle<()>>,
}
273
/// how progress is displayed; selectable on the command line via `clap::ValueEnum`
#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
pub enum ProgressType {
    /// choose automatically: progress bar when stderr is a terminal, text updates otherwise
    #[default]
    #[value(name = "auto", alias = "Auto")]
    Auto,
    /// animated progress bar (for interactive terminals)
    #[value(name = "ProgressBar", alias = "progress-bar")]
    ProgressBar,
    /// periodic text updates (for logging/non-interactive use)
    #[value(name = "TextUpdates", alias = "text-updates")]
    TextUpdates,
}
284
/// selects which reporting loop the `ProgressTracker` thread runs
pub enum GeneralProgressType {
    /// render this process's own progress (progress bar or text updates)
    User {
        /// display style
        progress_type: ProgressType,
        /// passed to `progress::make_local_printer` to build the printer
        kind: progress::LocalProgressKind,
    },
    /// forward this process's progress through the given channel instead of rendering it
    Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
    /// render combined source/destination progress obtained from a snapshot callback
    RemoteMaster {
        /// display style
        progress_type: ProgressType,
        /// returns the latest per-role progress; called once per reporting tick
        get_progress_snapshot:
            Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>,
    },
}
297
298impl std::fmt::Debug for GeneralProgressType {
299    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300        match self {
301            GeneralProgressType::User {
302                progress_type,
303                kind,
304            } => write!(f, "User(progress_type: {progress_type:?}, kind: {kind:?})"),
305            GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
306            GeneralProgressType::RemoteMaster { progress_type, .. } => {
307                write!(
308                    f,
309                    "RemoteMaster(progress_type: {progress_type:?}, <function>)"
310                )
311            }
312        }
313    }
314}
315
/// progress reporting configuration
#[derive(Debug)]
pub struct ProgressSettings {
    /// which reporting loop to run and how to display it
    pub progress_type: GeneralProgressType,
    /// delay between progress updates, kept as an unparsed string
    /// NOTE(review): presumably parsed into a duration by the consumer — confirm the accepted format
    pub progress_delay: Option<String>,
}
321
322fn progress_bar(
323    lock: &std::sync::Mutex<bool>,
324    cvar: &std::sync::Condvar,
325    delay_opt: &Option<std::time::Duration>,
326    kind: progress::LocalProgressKind,
327) {
328    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
329    PBAR.set_style(
330        indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
331            .unwrap()
332            .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
333    );
334    let mut prog_printer = progress::make_local_printer(kind, &PROGRESS);
335    let mut is_done = lock.lock().unwrap();
336    loop {
337        PBAR.set_position(PBAR.position() + 1); // do we need to update?
338        let mut msg = prog_printer.print().unwrap();
339        msg.push_str(&observability::render_lines());
340        msg.push_str(&render_panel_from_registry());
341        PBAR.set_message(msg);
342        let result = cvar.wait_timeout(is_done, delay).unwrap();
343        is_done = result.0;
344        if *is_done {
345            break;
346        }
347    }
348    PBAR.finish_and_clear();
349}
350
351fn get_datetime_prefix() -> String {
352    chrono::Local::now()
353        .format("%Y-%m-%dT%H:%M:%S%.3f%:z")
354        .to_string()
355}
356
357fn text_updates(
358    lock: &std::sync::Mutex<bool>,
359    cvar: &std::sync::Condvar,
360    delay_opt: &Option<std::time::Duration>,
361    kind: progress::LocalProgressKind,
362) {
363    let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
364    let mut prog_printer = progress::make_local_printer(kind, &PROGRESS);
365    let mut is_done = lock.lock().unwrap();
366    loop {
367        eprintln!("=======================");
368        eprintln!(
369            "{}\n--{}{}{}",
370            get_datetime_prefix(),
371            prog_printer.print().unwrap(),
372            observability::render_lines(),
373            render_panel_from_registry(),
374        );
375        let result = cvar.wait_timeout(is_done, delay).unwrap();
376        is_done = result.0;
377        if *is_done {
378            break;
379        }
380    }
381}
382
383fn rcpd_updates(
384    lock: &std::sync::Mutex<bool>,
385    cvar: &std::sync::Condvar,
386    delay_opt: &Option<std::time::Duration>,
387    sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
388) {
389    tracing::debug!("Starting rcpd progress updates");
390    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
391    let mut is_done = lock.lock().unwrap();
392    loop {
393        if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
394            // channel closed, receiver is done
395            tracing::debug!("Progress update channel closed, stopping progress updates");
396            break;
397        }
398        let result = cvar.wait_timeout(is_done, delay).unwrap();
399        is_done = result.0;
400        if *is_done {
401            break;
402        }
403    }
404}
405
406fn remote_master_updates<F>(
407    lock: &std::sync::Mutex<bool>,
408    cvar: &std::sync::Condvar,
409    delay_opt: &Option<std::time::Duration>,
410    get_progress_snapshot: F,
411    progress_type: ProgressType,
412) where
413    F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
414{
415    let interactive = match progress_type {
416        ProgressType::Auto => std::io::stderr().is_terminal(),
417        ProgressType::ProgressBar => true,
418        ProgressType::TextUpdates => false,
419    };
420    if interactive {
421        PBAR.set_style(
422            indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
423                .unwrap()
424                .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
425        );
426        let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
427        let mut printer = RcpdProgressPrinter::new();
428        let mut is_done = lock.lock().unwrap();
429        loop {
430            let progress_map = get_progress_snapshot();
431            let source_progress = &progress_map[RcpdType::Source];
432            let destination_progress = &progress_map[RcpdType::Destination];
433            PBAR.set_position(PBAR.position() + 1); // do we need to update?
434            let mut msg = printer
435                .print(source_progress, destination_progress)
436                .unwrap();
437            msg.push_str(&render_panel_from_registry());
438            PBAR.set_message(msg);
439            let result = cvar.wait_timeout(is_done, delay).unwrap();
440            is_done = result.0;
441            if *is_done {
442                break;
443            }
444        }
445        PBAR.finish_and_clear();
446    } else {
447        let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
448        let mut printer = RcpdProgressPrinter::new();
449        let mut is_done = lock.lock().unwrap();
450        loop {
451            let progress_map = get_progress_snapshot();
452            let source_progress = &progress_map[RcpdType::Source];
453            let destination_progress = &progress_map[RcpdType::Destination];
454            eprintln!("=======================");
455            eprintln!(
456                "{}\n--{}{}",
457                get_datetime_prefix(),
458                printer
459                    .print(source_progress, destination_progress)
460                    .unwrap(),
461                render_panel_from_registry(),
462            );
463            let result = cvar.wait_timeout(is_done, delay).unwrap();
464            is_done = result.0;
465            if *is_done {
466                break;
467            }
468        }
469    }
470}
471
472impl ProgressTracker {
473    pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
474        let lock_cvar =
475            std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
476        let lock_cvar_clone = lock_cvar.clone();
477        let pbar_thread = std::thread::spawn(move || {
478            let (lock, cvar) = &*lock_cvar_clone;
479            match progress_type {
480                GeneralProgressType::Remote(sender) => {
481                    rcpd_updates(lock, cvar, &delay_opt, sender);
482                }
483                GeneralProgressType::RemoteMaster {
484                    progress_type,
485                    get_progress_snapshot,
486                } => {
487                    remote_master_updates(
488                        lock,
489                        cvar,
490                        &delay_opt,
491                        get_progress_snapshot,
492                        progress_type,
493                    );
494                }
495                GeneralProgressType::User {
496                    progress_type,
497                    kind,
498                } => {
499                    let interactive = match progress_type {
500                        ProgressType::Auto => std::io::stderr().is_terminal(),
501                        ProgressType::ProgressBar => true,
502                        ProgressType::TextUpdates => false,
503                    };
504                    if interactive {
505                        progress_bar(lock, cvar, &delay_opt, kind);
506                    } else {
507                        text_updates(lock, cvar, &delay_opt, kind);
508                    }
509                }
510            }
511        });
512        Self {
513            lock_cvar,
514            pbar_thread: Some(pbar_thread),
515        }
516    }
517}
518
519impl Drop for ProgressTracker {
520    fn drop(&mut self) {
521        let (lock, cvar) = &*self.lock_cvar;
522        let mut is_done = lock.lock().unwrap();
523        *is_done = true;
524        cvar.notify_one();
525        drop(is_done);
526        if let Some(pbar_thread) = self.pbar_thread.take() {
527            pbar_thread.join().unwrap();
528        }
529    }
530}
531
532pub fn parse_metadata_cmp_settings(
533    settings: &str,
534) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
535    let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
536    for setting in settings.split(',') {
537        match setting {
538            "uid" => metadata_cmp_settings.uid = true,
539            "gid" => metadata_cmp_settings.gid = true,
540            "mode" => metadata_cmp_settings.mode = true,
541            "size" => metadata_cmp_settings.size = true,
542            "mtime" => metadata_cmp_settings.mtime = true,
543            "ctime" => metadata_cmp_settings.ctime = true,
544            _ => {
545                return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
546            }
547        }
548    }
549    Ok(metadata_cmp_settings)
550}
551
552fn parse_type_settings(
553    settings: &str,
554) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
555    let mut user_and_time = preserve::UserAndTimeSettings::default();
556    let mut mode_mask = None;
557    for setting in settings.split(',') {
558        match setting {
559            "uid" => user_and_time.uid = true,
560            "gid" => user_and_time.gid = true,
561            "time" => user_and_time.time = true,
562            _ => {
563                if let Ok(mask) = u32::from_str_radix(setting, 8) {
564                    mode_mask = Some(mask);
565                } else {
566                    return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
567                }
568            }
569        }
570    }
571    Ok((user_and_time, mode_mask))
572}
573
574pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
575    // handle presets
576    match settings {
577        "all" => return Ok(preserve::preserve_all()),
578        "none" => return Ok(preserve::preserve_none()),
579        _ => {}
580    }
581    let mut preserve_settings = preserve::Settings::default();
582    for type_settings in settings.split_whitespace() {
583        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
584            let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
585                format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
586            )?;
587            match obj_type {
588                "f" | "file" => {
589                    preserve_settings.file = preserve::FileSettings::default();
590                    preserve_settings.file.user_and_time = user_and_time_settings;
591                    if let Some(mode) = mode_opt {
592                        preserve_settings.file.mode_mask = mode;
593                    }
594                }
595                "d" | "dir" | "directory" => {
596                    preserve_settings.dir = preserve::DirSettings::default();
597                    preserve_settings.dir.user_and_time = user_and_time_settings;
598                    if let Some(mode) = mode_opt {
599                        preserve_settings.dir.mode_mask = mode;
600                    }
601                }
602                "l" | "link" | "symlink" => {
603                    preserve_settings.symlink = preserve::SymlinkSettings::default();
604                    preserve_settings.symlink.user_and_time = user_and_time_settings;
605                }
606                _ => {
607                    return Err(anyhow!("Unknown object type: {}", obj_type));
608                }
609            }
610        } else {
611            return Err(anyhow!("Invalid preserve settings: {}", settings));
612        }
613    }
614    Ok(preserve_settings)
615}
616
617/// Validates that every attribute checked by --update's comparison is actually being preserved.
618/// Skips size (always preserved via content copy) and ctime (kernel-managed, cannot be set).
619pub fn validate_update_compare_vs_preserve(
620    update_compare: &filecmp::MetadataCmpSettings,
621    preserve: &preserve::Settings,
622) -> Result<(), String> {
623    let mut missing = Vec::new();
624    if update_compare.mtime && !preserve.file.user_and_time.time {
625        missing.push("mtime");
626    }
627    if update_compare.uid && !preserve.file.user_and_time.uid {
628        missing.push("uid");
629    }
630    if update_compare.gid && !preserve.file.user_and_time.gid {
631        missing.push("gid");
632    }
633    // metadata_equal compares full mode (0o7777), so a partial mask is lossy
634    if update_compare.mode && preserve.file.mode_mask != 0o7777 {
635        missing.push("mode");
636    }
637    if missing.is_empty() {
638        Ok(())
639    } else {
640        Err(format!(
641            "--update compares [{}] but --preserve-settings does not preserve them. \
642             Use --allow-lossy-update to override or adjust --preserve-settings.",
643            missing.join(", ")
644        ))
645    }
646}
647
648pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
649    let mut cmp_settings = cmp::ObjSettings::default();
650    for type_settings in settings.split_whitespace() {
651        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
652            let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
653                "parsing preserve settings: {obj_settings}, type: {obj_type}"
654            ))?;
655            let obj_type = match obj_type {
656                "f" | "file" => ObjType::File,
657                "d" | "dir" | "directory" => ObjType::Dir,
658                "l" | "link" | "symlink" => ObjType::Symlink,
659                "o" | "other" => ObjType::Other,
660                _ => {
661                    return Err(anyhow!("Unknown obj type: {}", obj_type));
662                }
663            };
664            cmp_settings[obj_type] = obj_cmp_settings;
665        } else {
666            return Err(anyhow!("Invalid preserve settings: {}", settings));
667        }
668    }
669    Ok(cmp_settings)
670}
671
/// Compares `src` against `dst` by delegating to `cmp::cmp` with the global progress tracker.
///
/// `log` and `settings` are passed through unchanged; the result of `cmp::cmp` is returned
/// as-is.
pub async fn cmp(
    src: &std::path::Path,
    dst: &std::path::Path,
    log: &cmp::LogWriter,
    settings: &cmp::Settings,
) -> Result<cmp::Summary, anyhow::Error> {
    cmp::cmp(&PROGRESS, src, dst, log, settings).await
}
680
/// Copies `src` to `dst` by delegating to `copy::copy` with the global progress tracker.
///
/// `settings` and `preserve` are passed through unchanged; the result of `copy::copy` is
/// returned as-is.
pub async fn copy(
    src: &std::path::Path,
    dst: &std::path::Path,
    settings: &copy::Settings,
    preserve: &preserve::Settings,
) -> Result<copy::Summary, copy::Error> {
    // NOTE(review): the meaning of the trailing `false` is defined by `copy::copy` — confirm there
    copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
}
689
/// Removes `path` by delegating to `rm::rm` with the global progress tracker; the result of
/// `rm::rm` is returned as-is.
pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
    rm::rm(&PROGRESS, path, settings).await
}
693
694pub async fn link(
695    src: &std::path::Path,
696    dst: &std::path::Path,
697    update: &Option<std::path::PathBuf>,
698    settings: &link::Settings,
699) -> Result<link::Summary, link::Error> {
700    let cwd = std::env::current_dir()
701        .with_context(|| "failed to get current working directory")
702        .map_err(|err| link::Error::new(err, link::Summary::default()))?;
703    link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
704}
705
/// Reads environment variable `name` and parses it as `T`.
///
/// Returns `default` when the variable cannot be read (unset or not valid Unicode) or when
/// its value does not parse as `T`.
fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(default)
}
715
716/// collects runtime statistics (CPU time, memory) for the current process
717#[must_use]
718pub fn collect_runtime_stats() -> RuntimeStats {
719    collect_runtime_stats_inner(procfs::process::Process::myself().ok())
720}
721
722fn collect_runtime_stats_inner(process: Option<procfs::process::Process>) -> RuntimeStats {
723    let Some(process) = process else {
724        return RuntimeStats::default();
725    };
726    collect_runtime_stats_for_process(&process).unwrap_or_default()
727}
728
729fn collect_runtime_stats_for_process(
730    process: &procfs::process::Process,
731) -> anyhow::Result<RuntimeStats> {
732    let stat = process.stat()?;
733    let clock_ticks = procfs::ticks_per_second() as f64;
734    // vmhwm from /proc/[pid]/status is in kB, convert to bytes
735    let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
736    Ok(RuntimeStats {
737        cpu_time_user_ms: ((stat.utime as f64 / clock_ticks) * 1000.0) as u64,
738        cpu_time_kernel_ms: ((stat.stime as f64 / clock_ticks) * 1000.0) as u64,
739        peak_rss_bytes: vmhwm_kb * 1024,
740    })
741}
742
743fn print_runtime_stats_for_role(prefix: &str, stats: &RuntimeStats) {
744    let cpu_total =
745        std::time::Duration::from_millis(stats.cpu_time_user_ms + stats.cpu_time_kernel_ms);
746    let cpu_kernel = std::time::Duration::from_millis(stats.cpu_time_kernel_ms);
747    let cpu_user = std::time::Duration::from_millis(stats.cpu_time_user_ms);
748    println!(
749        "{prefix}cpu time : {:.2?} | k: {:.2?} | u: {:.2?}",
750        cpu_total, cpu_kernel, cpu_user
751    );
752    println!(
753        "{prefix}peak RSS : {}",
754        bytesize::ByteSize(stats.peak_rss_bytes)
755    );
756}
757
/// Prints the end-of-run runtime summary (walltime, CPU time, peak RSS) to stdout.
///
/// If remote runtime stats were stored, they are taken (so they print once) and a per-role
/// report is printed. Roles on other hosts get their own section; roles on the local
/// machine are grouped with this (master) process. Otherwise only this process's stats are
/// printed.
///
/// In the local-only path, fails when the process information cannot be read. Panics if the
/// remote stats mutex is poisoned.
#[rustfmt::skip]
fn print_runtime_stats() -> Result<(), anyhow::Error> {
    // check if we have remote runtime stats (from a remote copy operation)
    let remote_stats = REMOTE_RUNTIME_STATS.lock().unwrap().take();
    if let Some(remote) = remote_stats {
        // print global walltime first
        println!("walltime : {:.2?}", &PROGRESS.get_duration());
        println!();
        let source_is_local = is_localhost(&remote.source_host);
        let dest_is_local = is_localhost(&remote.dest_host);
        // collect master stats (this process)
        let master_stats = collect_runtime_stats();
        // print non-localhost roles first
        if !source_is_local {
            println!("SOURCE ({}):", remote.source_host);
            print_runtime_stats_for_role("  ", &remote.source_stats);
            println!();
        }
        if !dest_is_local {
            println!("DESTINATION ({}):", remote.dest_host);
            print_runtime_stats_for_role("  ", &remote.dest_stats);
            println!();
        }
        // print combined localhost section: master plus whichever roles run locally
        match (source_is_local, dest_is_local) {
            (true, true) => {
                println!("MASTER + SOURCE + DESTINATION (localhost):");
                print_runtime_stats_for_role("  master ", &master_stats);
                print_runtime_stats_for_role("  source ", &remote.source_stats);
                print_runtime_stats_for_role("  dest   ", &remote.dest_stats);
            }
            (true, false) => {
                println!("MASTER + SOURCE (localhost):");
                print_runtime_stats_for_role("  master ", &master_stats);
                print_runtime_stats_for_role("  source ", &remote.source_stats);
            }
            (false, true) => {
                println!("MASTER + DESTINATION (localhost):");
                print_runtime_stats_for_role("  master ", &master_stats);
                print_runtime_stats_for_role("  dest   ", &remote.dest_stats);
            }
            (false, false) => {
                println!("MASTER (localhost):");
                print_runtime_stats_for_role("  ", &master_stats);
            }
        }
        return Ok(());
    }
    // local operation - print stats for this process only
    let process = procfs::process::Process::myself()?;
    let stat = process.stat()?;
    // The time is in clock ticks, so we need to convert it to seconds
    let clock_ticks_per_second = procfs::ticks_per_second();
    let ticks_to_duration = |ticks: u64| {
        std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
    };
    // vmhwm from /proc/[pid]/status is in kB, convert to bytes
    let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
    println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
    println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
    println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm_kb * 1024));
    Ok(())
}
821
822fn get_max_open_files() -> Result<u64, std::io::Error> {
823    let mut rlim = libc::rlimit {
824        rlim_cur: 0,
825        rlim_max: 0,
826    };
827    // Safety: we pass a valid "rlim" pointer and the result is checked
828    let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
829    if result == 0 {
830        Ok(rlim.rlim_cur)
831    } else {
832        Err(std::io::Error::last_os_error())
833    }
834}
835
836struct ProgWriter {}
837
838impl ProgWriter {
839    fn new() -> Self {
840        Self {}
841    }
842}
843
844impl std::io::Write for ProgWriter {
845    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
846        PBAR.suspend(|| std::io::stdout().write(buf))
847    }
848    fn flush(&mut self) -> std::io::Result<()> {
849        std::io::stdout().flush()
850    }
851}
852
853fn get_hostname() -> String {
854    nix::unistd::gethostname()
855        .ok()
856        .and_then(|os_str| os_str.into_string().ok())
857        .unwrap_or_else(|| "unknown".to_string())
858}
859
860#[must_use]
861pub fn generate_debug_log_filename(prefix: &str) -> String {
862    let now = chrono::Utc::now();
863    let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
864    let process_id = std::process::id();
865    format!("{prefix}-{timestamp}-{process_id}")
866}
867
868/// Generate a trace filename with identifier, hostname, PID, and timestamp.
869///
870/// `identifier` should be "rcp", "rcpd-source", or "rcpd-destination"
871#[must_use]
872pub fn generate_trace_filename(prefix: &str, identifier: &str, extension: &str) -> String {
873    let hostname = get_hostname();
874    let pid = std::process::id();
875    let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S");
876    format!("{prefix}-{identifier}-{hostname}-{pid}-{timestamp}.{extension}")
877}
878
879/// Build the verbose-level [`tracing_subscriber::EnvFilter`] used by every
880/// non-profile tracing layer (file, fmt, remote). Excludes noisy deps that are
881/// rarely useful when debugging rcp.
882fn build_verbose_env_filter(verbose: u8) -> tracing_subscriber::EnvFilter {
883    let level_directive = match verbose {
884        0 => "error".parse().unwrap(),
885        1 => "info".parse().unwrap(),
886        2 => "debug".parse().unwrap(),
887        _ => "trace".parse().unwrap(),
888    };
889    tracing_subscriber::EnvFilter::from_default_env()
890        .add_directive(level_directive)
891        .add_directive("tokio=info".parse().unwrap())
892        .add_directive("runtime=info".parse().unwrap())
893        .add_directive("quinn=warn".parse().unwrap())
894        .add_directive("rustls=warn".parse().unwrap())
895        .add_directive("h2=warn".parse().unwrap())
896}
897
/// Produce the filter directive string for the chrome/flame profiling layers.
///
/// Profiling layers are governed by `--profile-level` rather than the verbosity flags.
/// The string is returned (instead of a built filter) because callers re-parse it once
/// per layer — EnvFilter isn't Clone.
///
/// `profile_level` defaults to `trace`. It is validated case-insensitively against
/// trace/debug/info/warn/error/off; an unrecognized value prints a message to stderr and
/// terminates the process with exit code 1. The level is appended exactly as supplied.
fn build_profile_filter_str(profile_level: Option<&str>) -> String {
    let level_str = match profile_level {
        Some(requested) => requested,
        None => "trace",
    };
    match level_str.to_lowercase().as_str() {
        "trace" | "debug" | "info" | "warn" | "error" | "off" => {}
        _ => {
            eprintln!(
                "Invalid --profile-level '{level_str}'. Valid values: trace, debug, info, warn, error, off"
            );
            std::process::exit(1);
        }
    }
    format!("tokio=off,quinn=off,h2=off,hyper=off,rustls=off,{level_str}")
}
913
/// Guards from chrome/flame tracing layers that must outlive the runtime to
/// flush traces on shutdown. Hold the returned struct for the lifetime of the
/// run.
///
/// A field is `None` when the corresponding profiling layer was not installed.
#[allow(dead_code)] // fields are kept alive only for their Drop side-effects
struct TracingGuards {
    /// Flush guard returned when the chrome trace layer is built.
    chrome: Option<tracing_chrome::FlushGuard>,
    /// Flush guard for the flamegraph layer, which writes through a buffered file.
    flame: Option<tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>>,
}
922
923/// Install the global [`tracing_subscriber`] registry from a [`TracingConfig`].
924/// Caller must hold the returned [`TracingGuards`] until the run finishes so
925/// that chrome/flame traces are flushed before the file handles close.
926///
927/// In quiet mode this is a no-op (the subscriber is never installed).
928fn install_tracing_subscriber(
929    quiet: bool,
930    verbose: u8,
931    tracing_config: TracingConfig,
932) -> TracingGuards {
933    if quiet {
934        assert!(
935            verbose == 0,
936            "Quiet mode and verbose mode are mutually exclusive"
937        );
938        return TracingGuards {
939            chrome: None,
940            flame: None,
941        };
942    }
943    let TracingConfig {
944        remote_layer: remote_tracing_layer,
945        debug_log_file,
946        chrome_trace_prefix,
947        flamegraph_prefix,
948        trace_identifier,
949        profile_level,
950        tokio_console,
951        tokio_console_port,
952    } = tracing_config;
953    let file_layer = debug_log_file.map(|log_file_path| {
954        let file = std::fs::OpenOptions::new()
955            .create(true)
956            .append(true)
957            .open(&log_file_path)
958            .unwrap_or_else(|e| {
959                panic!("Failed to create debug log file at '{log_file_path}': {e}")
960            });
961        tracing_subscriber::fmt::layer()
962            .with_target(true)
963            .with_line_number(true)
964            .with_thread_ids(true)
965            .with_timer(LocalTimeFormatter)
966            .with_ansi(false)
967            .with_writer(file)
968            .with_filter(build_verbose_env_filter(verbose))
969    });
970    // fmt_layer for local console output (when not using remote tracing)
971    let fmt_layer = if remote_tracing_layer.is_some() {
972        None
973    } else {
974        Some(
975            tracing_subscriber::fmt::layer()
976                .with_target(true)
977                .with_line_number(true)
978                .with_span_events(if verbose > 2 {
979                    FmtSpan::NEW | FmtSpan::CLOSE
980                } else {
981                    FmtSpan::NONE
982                })
983                .with_timer(LocalTimeFormatter)
984                .pretty()
985                .with_writer(ProgWriter::new)
986                .with_filter(build_verbose_env_filter(verbose)),
987        )
988    };
989    // apply env_filter to remote_tracing_layer so it respects verbose level
990    let remote_tracing_layer =
991        remote_tracing_layer.map(|layer| layer.with_filter(build_verbose_env_filter(verbose)));
992    let console_layer = tokio_console.then(|| {
993        let console_port = tokio_console_port.unwrap_or(6669);
994        let retention_seconds: u64 =
995            read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
996        eprintln!("Tokio console server listening on 127.0.0.1:{console_port}");
997        console_subscriber::ConsoleLayer::builder()
998            .retention(std::time::Duration::from_secs(retention_seconds))
999            .server_addr(([127, 0, 0, 1], console_port))
1000            .spawn()
1001    });
1002    // chrome/flame share a profile filter; build the string once and re-parse
1003    // per layer (EnvFilter isn't Clone).
1004    let profile_filter_str = (chrome_trace_prefix.is_some() || flamegraph_prefix.is_some())
1005        .then(|| build_profile_filter_str(profile_level.as_deref()));
1006    let make_profile_filter =
1007        || tracing_subscriber::EnvFilter::new(profile_filter_str.as_ref().unwrap());
1008    let mut chrome_guard = None;
1009    let chrome_layer = chrome_trace_prefix.as_ref().map(|prefix| {
1010        let filename = generate_trace_filename(prefix, &trace_identifier, "json");
1011        eprintln!("Chrome trace will be written to: {filename}");
1012        let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
1013            .file(&filename)
1014            .include_args(true)
1015            .build();
1016        chrome_guard = Some(guard);
1017        layer.with_filter(make_profile_filter())
1018    });
1019    let mut flame_guard = None;
1020    let flame_layer = flamegraph_prefix.as_ref().and_then(|prefix| {
1021        let filename = generate_trace_filename(prefix, &trace_identifier, "folded");
1022        eprintln!("Flamegraph data will be written to: {filename}");
1023        match tracing_flame::FlameLayer::with_file(&filename) {
1024            Ok((layer, guard)) => {
1025                flame_guard = Some(guard);
1026                Some(layer.with_filter(make_profile_filter()))
1027            }
1028            Err(e) => {
1029                eprintln!("Failed to create flamegraph layer: {e}");
1030                None
1031            }
1032        }
1033    });
1034    tracing_subscriber::registry()
1035        .with(file_layer)
1036        .with(fmt_layer)
1037        .with(remote_tracing_layer)
1038        .with(console_layer)
1039        .with(chrome_layer)
1040        .with(flame_layer)
1041        .init();
1042    TracingGuards {
1043        chrome: chrome_guard,
1044        flame: flame_guard,
1045    }
1046}
1047
1048/// Build a multi-threaded tokio runtime configured per `runtime`, and apply
1049/// the `max_open_files` limit from `throttle`. Falls back to ~80% of the
1050/// system rlimit (capped at 4096) when `max_open_files` is unset.
1051fn build_tokio_runtime(
1052    runtime: &RuntimeConfig,
1053    throttle: &ThrottleConfig,
1054) -> tokio::runtime::Runtime {
1055    let mut builder = tokio::runtime::Builder::new_multi_thread();
1056    builder.enable_all();
1057    if runtime.max_workers > 0 {
1058        builder.worker_threads(runtime.max_workers);
1059    }
1060    if runtime.max_blocking_threads > 0 {
1061        builder.max_blocking_threads(runtime.max_blocking_threads);
1062    }
1063    if !sysinfo::set_open_files_limit(usize::MAX) {
1064        tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
1065    }
1066    let set_max_open_files = throttle.max_open_files.unwrap_or_else(|| {
1067        let limit = get_max_open_files().expect(
1068            "We failed to query rlimit, if this is expected try specifying --max-open-files",
1069        ) as usize;
1070        // use ~80% of the system limit, but cap at 4096 to avoid overwhelming
1071        // distributed filesystems
1072        std::cmp::min(limit / 10 * 8, 4096)
1073    });
1074    if set_max_open_files > 0 {
1075        tracing::info!("Setting max open files to: {}", set_max_open_files);
1076        throttle::set_max_open_files(set_max_open_files);
1077    } else {
1078        tracing::info!("Not applying any limit to max open files!");
1079    }
1080    builder.build().expect("Failed to create runtime")
1081}
1082
1083/// Spawn the ops/iops throttle replenisher tasks onto `runtime` if the
1084/// throttles are enabled.
1085///
1086/// When `auto_meta` is set, the ops-throttle is forced to a fixed 100ms
1087/// replenish interval (matching the constant the auto-meta adapter uses
1088/// when converting `Decision::rate_per_sec` → tokens-per-interval) *and*
1089/// is bootstrapped even if `--ops-throttle` was zero. That way:
1090///
1091/// 1. a future rate-aware controller's `rate_per_sec: Some(_)` decisions
1092///    actually gate ops instead of silently no-opping;
1093/// 2. the adapter's 100ms conversion assumption matches the thread's
1094///    real interval, regardless of the user's static `--ops-throttle`
1095///    value.
1096fn spawn_throttle_replenishers(
1097    runtime: &tokio::runtime::Runtime,
1098    throttle: &ThrottleConfig,
1099    trace_identifier: &str,
1100) {
1101    fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
1102        let mut replenish = replenish;
1103        let mut interval = std::time::Duration::from_secs(1);
1104        while replenish > 100 && interval > std::time::Duration::from_millis(1) {
1105            replenish /= 10;
1106            interval /= 10;
1107        }
1108        (replenish, interval)
1109    }
1110    let auto_meta_on = throttle.auto_meta.is_some();
1111    if auto_meta_on {
1112        // Force the fixed 100ms cadence the adapter assumes. Bootstrap
1113        // with at least 1 token so `setup()` enables the semaphore; if
1114        // the user didn't pass `--ops-throttle`, immediately disable —
1115        // the adapter re-enables only when a rate decision arrives.
1116        let interval = std::time::Duration::from_millis(100);
1117        let initial_replenish = (throttle.ops_throttle as f64 * 0.1) as usize;
1118        throttle::init_ops_tokens(initial_replenish.max(1));
1119        if throttle.ops_throttle == 0 {
1120            throttle::disable_ops_throttle();
1121        }
1122        runtime.spawn(throttle::run_ops_replenish_thread(
1123            initial_replenish,
1124            interval,
1125        ));
1126    } else if throttle.ops_throttle > 0 {
1127        let (replenish, interval) = get_replenish_interval(throttle.ops_throttle);
1128        throttle::init_ops_tokens(replenish);
1129        runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
1130    }
1131    if throttle.iops_throttle > 0 {
1132        let (replenish, interval) = get_replenish_interval(throttle.iops_throttle);
1133        throttle::init_iops_tokens(replenish);
1134        runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
1135    }
1136    if let Some(auto) = throttle.auto_meta {
1137        spawn_auto_meta_throttle(
1138            runtime,
1139            auto,
1140            throttle.histogram_enabled,
1141            throttle.histogram_log_path.clone(),
1142            throttle.histogram_interval,
1143            trace_identifier,
1144        );
1145    }
1146}
1147
/// Compute the per-tool resolved log path by inserting `trace_identifier`
/// between the user-supplied stem and extension, i.e.
/// `{parent}/{stem}.{trace_identifier}.{extension}`. Mirrors the
/// chrome_trace_prefix convention so master and rcpds don't collide on
/// localhost runs.
///
/// Fallbacks for absent components:
/// - bare filename (`foo.hdr`): parent → `.`
/// - no extension (`foo`): extension → `hdr`
/// - no file name at all (`..`, `/`): stem → `auto-meta`
///
/// A dotfile such as `.hidden` is *not* the "no stem" case: `Path::file_stem`
/// returns the whole name (`.hidden`) and `Path::extension` returns `None`,
/// so the result is `./.hidden.{trace_identifier}.hdr`.
///
/// Non-UTF-8 stem and extension components (valid on Unix) are preserved
/// unchanged; only genuinely absent components fall back to defaults.
fn resolve_log_path(path: &std::path::Path, trace_identifier: &str) -> std::path::PathBuf {
    // `parent()` yields `Some("")` for a bare file name and `None` for a root path;
    // both are treated as the current directory.
    let parent = path
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .unwrap_or_else(|| std::path::Path::new("."));
    let stem = path
        .file_stem()
        .unwrap_or_else(|| std::ffi::OsStr::new("auto-meta"));
    let extension = path
        .extension()
        .unwrap_or_else(|| std::ffi::OsStr::new("hdr"));

    // two extra bytes for the "." separators
    let mut name = std::ffi::OsString::with_capacity(
        stem.len() + trace_identifier.len() + extension.len() + 2,
    );
    name.push(stem);
    name.push(".");
    name.push(trace_identifier);
    name.push(".");
    name.push(extension);
    parent.join(name)
}
1179
1180/// Verify the resolved histogram log path can actually be opened for
1181/// writing. Called from [`run`] before any work begins so a typo or
1182/// permission issue surfaces as a configuration error rather than a
1183/// silent warning at runtime.
1184///
1185/// The check creates+truncates the file (matching what the logger
1186/// task does later) so it catches "exists as a directory", "exists
1187/// as an unwritable file", and most permission issues. The logger's
1188/// own open later in startup will reuse the same path; the double-
1189/// open is harmless (the logger truncates again).
1190fn validate_histogram_log_target(
1191    throttle: &ThrottleConfig,
1192    trace_identifier: &str,
1193) -> Result<(), String> {
1194    let Some(path) = &throttle.histogram_log_path else {
1195        return Ok(());
1196    };
1197    if path.is_dir() {
1198        return Err(format!(
1199            "--auto-meta-histogram-log {path:?} is a directory; expected a file path",
1200        ));
1201    }
1202    if path.file_name().is_none() {
1203        return Err(format!(
1204            "--auto-meta-histogram-log {path:?} has no filename component",
1205        ));
1206    }
1207    let resolved = resolve_log_path(path, trace_identifier);
1208    let mut open_options = std::fs::OpenOptions::new();
1209    open_options.create(true).write(true).truncate(true);
1210    #[cfg(unix)]
1211    {
1212        use std::os::unix::fs::OpenOptionsExt;
1213        open_options.custom_flags(libc::O_NOFOLLOW);
1214    }
1215    match open_options.open(&resolved) {
1216        Ok(_) => Ok(()),
1217        Err(err) => {
1218            // ELOOP from O_NOFOLLOW reads as "too many levels of symbolic links"
1219            // — make it explicit that this rejection is intentional security.
1220            #[cfg(unix)]
1221            let context = if err.raw_os_error() == Some(libc::ELOOP) {
1222                " (resolved path is a symlink, which would let a local attacker hijack the write)"
1223            } else {
1224                ""
1225            };
1226            #[cfg(not(unix))]
1227            let context = "";
1228            Err(format!(
1229                "--auto-meta-histogram-log cannot create resolved path {resolved:?}: {err:#}{context}",
1230            ))
1231        }
1232    }
1233}
1234
1235/// Stable label for a `(Side, MetadataOp)` controller.
1236///
1237/// Naming rule:
1238/// - **Lookups** (`Stat`, `ReadLink`) happen on either filesystem, so
1239///   the label always carries an explicit `src-` / `dst-` prefix to
1240///   disambiguate. Example: `src-stat`, `dst-read-link`.
1241/// - **Mutations and `open(O_CREAT)`** only ever occur on the
1242///   destination side (sources are immutable in copy/cmp/link/rm).
1243///   These labels drop the side prefix entirely. Example: `mkdir`,
1244///   `unlink`, `rmdir`, `hard-link`, `symlink`, `chmod`, `open-create`.
1245///
1246/// The result: single-filesystem tools like `rrm` show clean labels
1247/// (`src-stat`, `unlink`, `rmdir`) instead of the prior misleading
1248/// `meta-src` / `meta-dst` framing — there is no second filesystem to
1249/// distinguish from. Dual-filesystem tools (rcp, rcmp, rlink) still
1250/// disambiguate the two stat / read-link controllers cleanly.
1251///
1252/// Implemented as a `const fn` over a fixed match table so the label
1253/// set is a compile-time constant — no allocation, no `Box::leak`, and
1254/// no per-`run()` accumulation when callers invoke the runtime more
1255/// than once in a single process.
1256const fn unit_label(side: congestion::Side, op: congestion::MetadataOp) -> &'static str {
1257    use congestion::MetadataOp::*;
1258    use congestion::Side::*;
1259    match (side, op) {
1260        // Lookups: prefix with side because both sides exercise them.
1261        (Source, Stat) => "src-stat",
1262        (Destination, Stat) => "dst-stat",
1263        (Source, ReadLink) => "src-read-link",
1264        (Destination, ReadLink) => "dst-read-link",
1265        // Destination-only ops: no prefix in the active case. The
1266        // (Source, op) slot is wired but never sees a sample under
1267        // normal operation; the renderer hides it. The `src-` label is
1268        // kept so any debugging surface still disambiguates the slot
1269        // from the active destination one if it ever fires.
1270        (Destination, MkDir) => "mkdir",
1271        (Source, MkDir) => "src-mkdir",
1272        (Destination, RmDir) => "rmdir",
1273        (Source, RmDir) => "src-rmdir",
1274        (Destination, Unlink) => "unlink",
1275        (Source, Unlink) => "src-unlink",
1276        (Destination, HardLink) => "hard-link",
1277        (Source, HardLink) => "src-hard-link",
1278        (Destination, Symlink) => "symlink",
1279        (Source, Symlink) => "src-symlink",
1280        (Destination, Chmod) => "chmod",
1281        (Source, Chmod) => "src-chmod",
1282        (Destination, OpenCreate) => "open-create",
1283        (Source, OpenCreate) => "src-open-create",
1284    }
1285}
1286
1287fn build_histogram_header(
1288    auto: &AutoMetaThrottleConfig,
1289    tool_name: &str,
1290    snapshot_interval: std::time::Duration,
1291) -> congestion::format::LogHeader {
1292    use congestion::format::{AutoMetaSnapshot, HdrSnapshot, LogHeader, UnitLabel};
1293    let hostname = nix::unistd::gethostname()
1294        .ok()
1295        .and_then(|h| h.into_string().ok())
1296        .unwrap_or_else(|| "unknown".into());
1297    let mut unit_labels = Vec::with_capacity(congestion::N_META_RESOURCES);
1298    for &side in &congestion::Side::ALL {
1299        for &op in &congestion::MetadataOp::ALL {
1300            unit_labels.push(UnitLabel {
1301                side: side as u8,
1302                op: op as u8,
1303                label: unit_label(side, op).to_string(),
1304            });
1305        }
1306    }
1307    LogHeader {
1308        format_version: congestion::format::FORMAT_VERSION,
1309        tool: tool_name.to_string(),
1310        tool_version: env!("CARGO_PKG_VERSION").to_string(),
1311        hostname,
1312        pid: std::process::id(),
1313        start_unix_micros: std::time::SystemTime::now()
1314            .duration_since(std::time::UNIX_EPOCH)
1315            .map(|d| u64::try_from(d.as_micros()).unwrap_or(u64::MAX))
1316            .unwrap_or(0),
1317        snapshot_interval_micros: u64::try_from(snapshot_interval.as_micros()).unwrap_or(u64::MAX),
1318        auto_meta: AutoMetaSnapshot {
1319            initial_cwnd: auto.initial_cwnd,
1320            min_cwnd: auto.min_cwnd,
1321            max_cwnd: auto.max_cwnd,
1322            alpha: auto.alpha,
1323            beta: auto.beta,
1324            increase_step: auto.increase_step,
1325            decrease_step: auto.decrease_step,
1326            baseline_percentile: auto.baseline_percentile,
1327            current_percentile: auto.current_percentile,
1328            long_window_micros: u64::try_from(auto.long_window.as_micros()).unwrap_or(u64::MAX),
1329            short_window_micros: u64::try_from(auto.short_window.as_micros()).unwrap_or(u64::MAX),
1330            tick_interval_micros: u64::try_from(auto.tick_interval.as_micros()).unwrap_or(u64::MAX),
1331        },
1332        hdr: HdrSnapshot {
1333            lowest_discernible_micros: congestion::HDR_LOWEST_DISCERNIBLE_MICROS,
1334            highest_trackable_micros: congestion::HDR_HIGHEST_TRACKABLE_MICROS,
1335            significant_figures: congestion::HDR_SIGNIFICANT_FIGURES,
1336            unit: "microseconds".into(),
1337        },
1338        unit_labels,
1339    }
1340}
1341
1342fn render_panel_from_registry() -> String {
1343    let entries = observability::registered_histograms();
1344    if entries.is_empty() {
1345        return String::new();
1346    }
1347    let snapshots: Vec<hdrhistogram::Histogram<u64>> = entries
1348        .iter()
1349        .map(|e| (*e.snapshot_rx.borrow()).clone())
1350        .collect();
1351    let units: Vec<histogram_panel::PanelUnit> = entries
1352        .iter()
1353        .zip(snapshots.iter())
1354        .map(|(e, snap)| histogram_panel::PanelUnit {
1355            label: e.label,
1356            histogram: snap,
1357            interval: e.interval,
1358        })
1359        .collect();
1360    histogram_panel::render_histogram_panel(&units)
1361}
1362
1363/// Wire up the adaptive metadata-ops control loops — one per
1364/// `(Side, MetadataOp)` pair (18 in total):
1365///
1366/// 1. Seed every resource's `OPS_IN_FLIGHT_LIMIT_*` semaphore with the
1367///    controller's initial cwnd so the first probe on any resource
1368///    finds a permit available.
1369/// 2. Install one `RoutingSink` that fans metadata samples out to per-
1370///    `(side, op)` channels, each consumed by its own
1371///    `ControlUnit<RatioController>`. Each syscall on each side gets
1372///    an independent latency baseline and an independent cwnd, so a
1373///    saturated `unlink` path doesn't drag down `stat` (or vice versa).
1374/// 3. Spawn one combined adapter/monitor task per resource. By
1375///    convention `(Destination, Stat)` is the rate-driver — the global
1376///    `OPS_THROTTLE` is shared, so only one adapter may translate rate
1377///    decisions; all others apply concurrency only. The current
1378///    `RatioController` doesn't emit rate decisions, so the choice is
1379///    forward-looking.
1380/// 4. Each adapter exits cleanly when its control unit stops
1381///    publishing decisions, so they don't leak as unbounded background
1382///    loops.
1383///
1384/// Only one auto-meta config is supported per process. If a sample sink
1385/// was already installed, it is silently replaced.
1386fn spawn_auto_meta_throttle(
1387    runtime: &tokio::runtime::Runtime,
1388    auto: AutoMetaThrottleConfig,
1389    histogram_enabled: bool,
1390    histogram_log_path: Option<std::path::PathBuf>,
1391    histogram_interval: std::time::Duration,
1392    trace_identifier: &str,
1393) {
1394    let initial_cwnd = auto
1395        .initial_cwnd
1396        .clamp(auto.min_cwnd.max(1), auto.max_cwnd.max(1));
1397    let histogram_active = histogram_enabled || histogram_log_path.is_some();
1398    // Per-tool log path: each rcpd / master writes to its own file by
1399    // suffixing trace_identifier on the user-supplied path, mirroring
1400    // chrome_trace_prefix's convention.
1401    let resolved_log_path = histogram_log_path
1402        .as_ref()
1403        .map(|p| resolve_log_path(p, trace_identifier));
1404
1405    // Build receivers + accumulators in parallel arrays so we can pass
1406    // each accumulator both to a ControlUnit and to a LoggerUnit.
1407    let mut builder = congestion::RoutingSinkBuilder::new();
1408    struct Slot {
1409        label: &'static str,
1410        side: congestion::Side,
1411        op: congestion::MetadataOp,
1412        sample_rx: tokio::sync::mpsc::Receiver<congestion::Sample>,
1413        apply_rate: bool,
1414        accumulator: Option<std::sync::Arc<std::sync::Mutex<congestion::HistogramAccumulator>>>,
1415    }
1416    let mut slots: Vec<Slot> = Vec::with_capacity(congestion::N_META_RESOURCES);
1417    for &side in &congestion::Side::ALL {
1418        for &op in &congestion::MetadataOp::ALL {
1419            let resource = walk::meta_resource(side, op);
1420            throttle::set_max_ops_in_flight(resource, initial_cwnd as usize);
1421            let rx = builder.metadata_receiver(side, op);
1422            let apply_rate = matches!(
1423                (side, op),
1424                (congestion::Side::Destination, congestion::MetadataOp::Stat),
1425            );
1426            let accumulator = if histogram_active {
1427                let acc = std::sync::Arc::new(std::sync::Mutex::new(
1428                    congestion::HistogramAccumulator::new(),
1429                ));
1430                builder.metadata_histogram(side, op, acc.clone());
1431                Some(acc)
1432            } else {
1433                None
1434            };
1435            slots.push(Slot {
1436                label: unit_label(side, op),
1437                side,
1438                op,
1439                sample_rx: rx,
1440                apply_rate,
1441                accumulator,
1442            });
1443        }
1444    }
1445    let sink = std::sync::Arc::new(builder.build());
1446    congestion::install_sample_sink(sink.clone());
1447
1448    // Per-unit watch senders for the live histogram panel; collected into
1449    // a parallel vec so we can also build the logger's `LoggerUnit` list.
1450    let mut logger_units: Vec<histogram_logger::LoggerUnit> = Vec::new();
1451    for slot in slots {
1452        let controller = congestion::RatioController::new(congestion::RatioConfig {
1453            initial_cwnd: auto.initial_cwnd,
1454            min_cwnd: auto.min_cwnd,
1455            max_cwnd: auto.max_cwnd,
1456            alpha: auto.alpha,
1457            beta: auto.beta,
1458            increase_step: auto.increase_step,
1459            decrease_step: auto.decrease_step,
1460            baseline_percentile: auto.baseline_percentile,
1461            current_percentile: auto.current_percentile,
1462            long_window: auto.long_window,
1463            short_window: auto.short_window,
1464        });
1465        let (unit, decision_rx, snapshot_rx) = congestion::ControlUnit::new(
1466            slot.label,
1467            controller,
1468            slot.sample_rx,
1469            auto.tick_interval,
1470        );
1471        observability::register_unit(slot.label, snapshot_rx);
1472        if let Some(acc) = slot.accumulator.as_ref() {
1473            let (snap_tx, snap_rx) = tokio::sync::watch::channel(
1474                hdrhistogram::Histogram::<u64>::new_with_bounds(
1475                    congestion::HDR_LOWEST_DISCERNIBLE_MICROS,
1476                    congestion::HDR_HIGHEST_TRACKABLE_MICROS,
1477                    congestion::HDR_SIGNIFICANT_FIGURES,
1478                )
1479                .expect("histogram bounds valid"),
1480            );
1481            observability::register_histogram(slot.label, snap_rx, histogram_interval);
1482            logger_units.push(histogram_logger::LoggerUnit {
1483                label: slot.label,
1484                side: slot.side,
1485                op: slot.op,
1486                accumulator: acc.clone(),
1487                snapshot_tx: snap_tx,
1488            });
1489        }
1490        runtime.spawn(unit.run());
1491        runtime.spawn(auto_meta::run_adapter(
1492            walk::meta_resource(slot.side, slot.op),
1493            slot.apply_rate,
1494            decision_rx,
1495            sink.clone(),
1496        ));
1497    }
1498
1499    if histogram_active {
1500        let header = build_histogram_header(&auto, trace_identifier, histogram_interval);
1501        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
1502        store_logger_cancel(cancel_tx);
1503        // Snapshot the global PROGRESS counters into JSON each tick so
1504        // the binary log carries throughput/files-copied alongside the
1505        // latency distributions — readers can time-align them via the
1506        // shared unix_micros field. Encoding can't realistically fail
1507        // for this struct shape, but on the off chance it does we log
1508        // and return an empty Vec — the logger treats empty as "skip
1509        // this tick" rather than writing a record readers can't parse.
1510        let progress_source: histogram_logger::ProgressSource = Box::new(|| {
1511            let snapshot = progress::SerializableProgress::from(&*PROGRESS);
1512            serde_json::to_vec(&snapshot).unwrap_or_else(|err| {
1513                tracing::warn!(
1514                    "histogram-logger: SerializableProgress JSON encode failed: {err:#}; \
1515                     dropping this tick's progress record"
1516                );
1517                Vec::new()
1518            })
1519        });
1520        let handle = runtime.spawn(histogram_logger::run_logger(
1521            histogram_logger::LoggerConfig {
1522                interval: histogram_interval,
1523                log_path: resolved_log_path,
1524                header,
1525                progress_source: Some(progress_source),
1526            },
1527            logger_units,
1528            cancel_rx,
1529        ));
1530        store_logger_handle(handle);
1531    }
1532
1533    tracing::info!(
1534        "auto-meta-throttle enabled (per-(side, op) controllers, {} total): \
1535         initial_cwnd={}, max_cwnd={}, alpha={}, beta={}, \
1536         baseline_percentile={}, current_percentile={}, \
1537         long_window={:?}, short_window={:?}, tick={:?}, \
1538         histograms={}",
1539        congestion::N_META_RESOURCES,
1540        auto.initial_cwnd,
1541        auto.max_cwnd,
1542        auto.alpha,
1543        auto.beta,
1544        auto.baseline_percentile,
1545        auto.current_percentile,
1546        auto.long_window,
1547        auto.short_window,
1548        auto.tick_interval,
1549        histogram_active,
1550    );
1551}
1552
1553#[instrument(skip(func))] // "func" is not Debug printable
1554pub fn run<Fut, Summary, Error>(
1555    progress: Option<ProgressSettings>,
1556    output: OutputConfig,
1557    runtime_config: RuntimeConfig,
1558    throttle_config: ThrottleConfig,
1559    tracing_config: TracingConfig,
1560    func: impl FnOnce() -> Fut,
1561) -> Option<Summary>
1562// we return an Option rather than a Result to indicate that callers of this function should NOT print the error
1563where
1564    Summary: std::fmt::Display,
1565    Error: std::fmt::Display + std::fmt::Debug,
1566    Fut: std::future::Future<Output = Result<Summary, Error>>,
1567{
1568    // force initialization of PROGRESS to set start_time at the beginning of the run
1569    // (for remote master operations, PROGRESS is otherwise only accessed at the end in
1570    // print_runtime_stats(), leading to near-zero walltime)
1571    let _ = get_progress();
1572    if let Err(e) = throttle_config.validate() {
1573        eprintln!("Configuration error: {e}");
1574        return None;
1575    }
1576    let OutputConfig {
1577        quiet,
1578        verbose,
1579        print_summary,
1580        suppress_runtime_stats,
1581    } = output;
1582    // tracing guards must outlive the runtime so chrome/flame traces flush
1583    // extract trace_identifier before install_tracing_subscriber consumes tracing_config
1584    let trace_identifier = tracing_config.trace_identifier.clone();
1585    if let Err(e) = validate_histogram_log_target(&throttle_config, &trace_identifier) {
1586        eprintln!("Configuration error: {e}");
1587        return None;
1588    }
1589    let _tracing_guards = install_tracing_subscriber(quiet, verbose, tracing_config);
1590    let res = {
1591        let runtime = build_tokio_runtime(&runtime_config, &throttle_config);
1592        spawn_throttle_replenishers(&runtime, &throttle_config, &trace_identifier);
1593        let res = {
1594            let _progress_tracker = progress.map(|settings| {
1595                tracing::debug!("Requesting progress updates {settings:?}");
1596                let delay = settings.progress_delay.map(|delay_str| {
1597                    humantime::parse_duration(&delay_str)
1598                        .expect("Couldn't parse duration out of --progress-delay")
1599                });
1600                ProgressTracker::new(settings.progress_type, delay)
1601            });
1602            runtime.block_on(func())
1603        };
1604        match &res {
1605            Ok(summary) => {
1606                if print_summary || verbose > 0 {
1607                    println!("{summary}");
1608                }
1609            }
1610            Err(err) => {
1611                if !quiet {
1612                    println!("{err:?}");
1613                }
1614            }
1615        }
1616        if (print_summary || verbose > 0)
1617            && !suppress_runtime_stats
1618            && let Err(err) = print_runtime_stats()
1619        {
1620            println!("Failed to print runtime stats: {err:?}");
1621        }
1622        // Signal the histogram logger to exit cleanly so its final
1623        // snapshot is written before the runtime drops and aborts it.
1624        // No-op when histograms are disabled.
1625        signal_logger_cancel();
1626        if let Some(handle) = take_logger_handle() {
1627            // Bound the wait so a stuck logger can't hang shutdown — 1s is
1628            // generous: the logger only does a snapshot+flush on cancel.
1629            let _ = runtime.block_on(async {
1630                tokio::time::timeout(std::time::Duration::from_secs(1), handle).await
1631            });
1632        }
1633        res
1634        // runtime drops here, cancelling all spawned tasks (control
1635        // loop, adapter, replenishers) and releasing their permits.
1636    };
1637    // Clear process-wide state so a second `run()` in the same process
1638    // starts on a clean slate. Without this, a later run inherits the
1639    // previous auto-meta sample sink and ops-in-flight cap, and its
1640    // probes acquire against stale limits even when auto_meta is off.
1641    reset_process_throttle_state();
1642    res.ok()
1643}
1644
1645/// Reset process-wide throttle + congestion state to its pre-`run()`
1646/// defaults. Called by [`run`] on exit so callers that invoke it more
1647/// than once in a single process (library users, integration tests
1648/// outside this crate) aren't affected by the previous invocation's
1649/// decisions.
1650fn reset_process_throttle_state() {
1651    congestion::clear_sample_sink();
1652    observability::clear();
1653    for &side in &throttle::Side::ALL {
1654        for &op in &throttle::MetadataOp::ALL {
1655            throttle::set_max_ops_in_flight(throttle::Resource::meta(side, op), 0);
1656        }
1657    }
1658    throttle::disable_ops_throttle();
1659    // Without these resets, a second run() in the same process inherits
1660    // the previous run's open-files cap and iops-throttle even when the
1661    // caller passes 0 ("no limit"): `set_max_open_files` / `init_iops_tokens`
1662    // are skipped on 0, leaving the prior `setup(N)` in force. setup(0)
1663    // disables the semaphore, so the next run sees a clean slate and can
1664    // either re-init with a fresh value or stay disabled.
1665    throttle::set_max_open_files(0);
1666    throttle::init_iops_tokens(0);
1667}
1668
#[cfg(test)]
mod unit_label_tests {
    use super::unit_label;
    use congestion::{MetadataOp, Side};

    #[test]
    fn lookup_ops_carry_side_prefix() {
        // Stat and ReadLink may run on either side, hence the side prefix.
        let expectations = [
            (Side::Source, MetadataOp::Stat, "src-stat"),
            (Side::Destination, MetadataOp::Stat, "dst-stat"),
            (Side::Source, MetadataOp::ReadLink, "src-read-link"),
            (Side::Destination, MetadataOp::ReadLink, "dst-read-link"),
        ];
        for (side, op, expected) in expectations {
            assert_eq!(unit_label(side, op), expected);
        }
    }

    #[test]
    fn destination_only_ops_drop_prefix() {
        // Mutations and open-create only happen on the destination, so the
        // active label carries no side prefix: single-FS tools like rrm
        // show "unlink" / "rmdir" rather than "dst-unlink".
        let expectations = [
            (MetadataOp::MkDir, "mkdir"),
            (MetadataOp::RmDir, "rmdir"),
            (MetadataOp::Unlink, "unlink"),
            (MetadataOp::HardLink, "hard-link"),
            (MetadataOp::Symlink, "symlink"),
            (MetadataOp::Chmod, "chmod"),
            (MetadataOp::OpenCreate, "open-create"),
        ];
        for (op, expected) in expectations {
            assert_eq!(unit_label(Side::Destination, op), expected);
        }
    }

    #[test]
    fn unused_source_side_mutation_slots_keep_src_prefix() {
        // A controller is registered for every (side, op) pair, the unused
        // (Source, mutation) slots included. They stay idle and the renderer
        // hides them, but should one ever fire a probe (regression / wiring
        // mistake) its label must differ from the destination-side variant.
        let expectations = [
            (MetadataOp::Unlink, "src-unlink"),
            (MetadataOp::MkDir, "src-mkdir"),
        ];
        for (op, expected) in expectations {
            assert_eq!(unit_label(Side::Source, op), expected);
        }
    }

    #[test]
    fn labels_are_unique_across_all_resources() {
        // Sanity: the 18 distinct (Side, MetadataOp) pairs must map to 18
        // distinct labels, or observability::register_unit would end up
        // with ambiguous panel rows.
        let mut seen = std::collections::HashSet::new();
        for side in Side::ALL.iter().copied() {
            for op in MetadataOp::ALL.iter().copied() {
                let label = unit_label(side, op);
                let newly_inserted = seen.insert(label);
                assert!(newly_inserted, "duplicate label: {label}");
            }
        }
        assert_eq!(seen.len(), congestion::N_META_RESOURCES);
    }
}
1738
#[cfg(test)]
mod runtime_stats_tests {
    use super::*;
    use anyhow::Result;

    /// `collect_runtime_stats()` must agree, within a small tolerance, with
    /// stats computed directly from this process's procfs handle.
    #[test]
    fn collect_runtime_stats_matches_procfs_snapshot() -> Result<()> {
        let cpu_tolerance_ms = 50;
        let rss_tolerance_bytes = 1_000_000;

        // Take the reference snapshot first, then the value under test.
        let myself = procfs::process::Process::myself()?;
        let expected = collect_runtime_stats_for_process(&myself)?;
        let actual = collect_runtime_stats();

        let user_cpu_drift = expected.cpu_time_user_ms.abs_diff(actual.cpu_time_user_ms);
        assert!(
            user_cpu_drift <= cpu_tolerance_ms,
            "user CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
            expected.cpu_time_user_ms,
            actual.cpu_time_user_ms
        );

        let kernel_cpu_drift = expected
            .cpu_time_kernel_ms
            .abs_diff(actual.cpu_time_kernel_ms);
        assert!(
            kernel_cpu_drift <= cpu_tolerance_ms,
            "kernel CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
            expected.cpu_time_kernel_ms,
            actual.cpu_time_kernel_ms
        );

        let peak_rss_drift = expected.peak_rss_bytes.abs_diff(actual.peak_rss_bytes);
        assert!(
            peak_rss_drift <= rss_tolerance_bytes,
            "peak RSS deviated by more than {rss_tolerance_bytes} bytes: expected {}, got {}",
            expected.peak_rss_bytes,
            actual.peak_rss_bytes
        );
        Ok(())
    }

    /// With no process handle, or a handle for a pid that cannot be opened,
    /// the collector returns `RuntimeStats::default()`.
    #[test]
    fn collect_runtime_stats_returns_default_on_error() {
        let without_process = collect_runtime_stats_inner(None);
        assert_eq!(without_process, RuntimeStats::default());

        let missing_process = procfs::process::Process::new(i32::MAX).ok();
        let with_missing_process = collect_runtime_stats_inner(missing_process);
        assert_eq!(with_missing_process, RuntimeStats::default());
    }
}
1785
#[cfg(test)]
mod parse_preserve_settings_tests {
    use super::*;

    /// Collects the `(uid, gid, time)` flags of a `user_and_time` value so a
    /// test can compare all three in a single assertion.
    macro_rules! uid_gid_time {
        ($flags:expr) => {
            ($flags.uid, $flags.gid, $flags.time)
        };
    }

    #[test]
    fn preset_all_returns_preserve_all() {
        let parsed = parse_preserve_settings("all").unwrap();
        let reference = preserve::preserve_all();
        let everything = (true, true, true);

        assert_eq!(parsed.file.mode_mask, reference.file.mode_mask);
        assert_eq!(uid_gid_time!(parsed.file.user_and_time), everything);
        assert_eq!(parsed.dir.mode_mask, reference.dir.mode_mask);
        assert_eq!(uid_gid_time!(parsed.dir.user_and_time), everything);
        assert_eq!(uid_gid_time!(parsed.symlink.user_and_time), everything);
    }

    #[test]
    fn preset_none_returns_preserve_none() {
        let parsed = parse_preserve_settings("none").unwrap();
        let reference = preserve::preserve_none();
        let nothing = (false, false, false);

        assert_eq!(parsed.file.mode_mask, reference.file.mode_mask);
        assert_eq!(uid_gid_time!(parsed.file.user_and_time), nothing);
        assert_eq!(parsed.dir.mode_mask, reference.dir.mode_mask);
        assert_eq!(uid_gid_time!(parsed.dir.user_and_time), nothing);
        assert_eq!(uid_gid_time!(parsed.symlink.user_and_time), nothing);
    }

    #[test]
    fn per_type_settings_still_work() {
        let parsed = parse_preserve_settings("f:uid,time,0777 d:gid").unwrap();

        // files: uid + time requested, gid left off, explicit 0777 mask
        assert_eq!(uid_gid_time!(parsed.file.user_and_time), (true, false, true));
        assert_eq!(parsed.file.mode_mask, 0o777);
        // directories: only gid requested
        assert_eq!(uid_gid_time!(parsed.dir.user_and_time), (false, true, false));
    }

    #[test]
    fn invalid_settings_returns_error() {
        for malformed in ["invalid", "f:unknown_attr"] {
            assert!(parse_preserve_settings(malformed).is_err());
        }
    }
}
1838
#[cfg(test)]
mod validate_update_compare_vs_preserve_tests {
    use super::*;

    /// Validates `compare` against `preserve::preserve_none()` and asserts
    /// the call is rejected with a message containing every entry of
    /// `needles`.
    #[track_caller]
    fn assert_rejected_with_preserve_none(
        compare: &filecmp::MetadataCmpSettings,
        needles: &[&str],
    ) {
        let preserve_settings = preserve::preserve_none();
        let outcome = validate_update_compare_vs_preserve(compare, &preserve_settings);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        for needle in needles {
            assert!(message.contains(*needle));
        }
    }

    #[test]
    fn detects_mtime_mismatch() {
        let compare = filecmp::MetadataCmpSettings {
            mtime: true,
            ..Default::default()
        };
        assert_rejected_with_preserve_none(&compare, &["mtime"]);
    }

    #[test]
    fn detects_uid_mismatch() {
        let compare = filecmp::MetadataCmpSettings {
            uid: true,
            ..Default::default()
        };
        assert_rejected_with_preserve_none(&compare, &["uid"]);
    }

    #[test]
    fn detects_gid_mismatch() {
        let compare = filecmp::MetadataCmpSettings {
            gid: true,
            ..Default::default()
        };
        assert_rejected_with_preserve_none(&compare, &["gid"]);
    }

    #[test]
    fn detects_mode_mismatch() {
        let compare = filecmp::MetadataCmpSettings {
            mode: true,
            ..Default::default()
        };
        // Start from preserve_none() and clear the file mode mask entirely.
        let mut preserve_settings = preserve::preserve_none();
        preserve_settings.file.mode_mask = 0;
        let outcome = validate_update_compare_vs_preserve(&compare, &preserve_settings);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("mode"));
    }

    #[test]
    fn detects_multiple_mismatches() {
        let compare = filecmp::MetadataCmpSettings {
            mtime: true,
            uid: true,
            gid: true,
            ..Default::default()
        };
        assert_rejected_with_preserve_none(&compare, &["mtime", "uid", "gid"]);
    }

    #[test]
    fn passes_when_preserve_covers_all_compared_attrs() {
        let compare = filecmp::MetadataCmpSettings {
            mtime: true,
            uid: true,
            gid: true,
            mode: true,
            size: true,  // always preserved, should not cause error
            ctime: true, // kernel-managed, should not cause error
        };
        let preserve_settings = preserve::preserve_all();
        let outcome = validate_update_compare_vs_preserve(&compare, &preserve_settings);
        assert!(outcome.is_ok());
    }

    #[test]
    fn fails_with_partial_mode_mask_when_mode_compared() {
        // The default mode_mask is 0o0777, which drops the setuid/setgid/sticky
        // bits, whereas metadata_equal compares the full mode (0o7777), so
        // comparing mode under that mask is lossy.
        let compare = filecmp::MetadataCmpSettings {
            mode: true,
            ..Default::default()
        };
        assert_rejected_with_preserve_none(&compare, &["mode"]);
    }
}
1931
#[cfg(test)]
mod resolve_log_path_tests {
    use super::*;
    use std::path::{Path, PathBuf};

    #[test]
    fn full_path_with_extension() {
        let resolved = resolve_log_path(Path::new("/tmp/foo.hdr"), "rcp");
        assert_eq!(resolved, PathBuf::from("/tmp/foo.rcp.hdr"));
    }

    #[test]
    fn bare_filename_resolves_to_current_dir() {
        let resolved = resolve_log_path(Path::new("foo.hdr"), "rcp");
        assert_eq!(resolved, PathBuf::from("./foo.rcp.hdr"));
    }

    #[test]
    fn no_extension_defaults_to_hdr() {
        let resolved = resolve_log_path(Path::new("/tmp/foo"), "rcp");
        assert_eq!(resolved, PathBuf::from("/tmp/foo.rcp.hdr"));
    }

    #[test]
    #[cfg(unix)]
    fn preserves_non_utf8_stem() {
        use std::ffi::OsString;
        use std::os::unix::ffi::{OsStrExt, OsStringExt};

        // Input path whose stem is invalid UTF-8: /tmp/<0xFF><0xFE>.hdr
        let mut raw_path = b"/tmp/".to_vec();
        raw_path.extend_from_slice(&[0xFF, 0xFE]);
        raw_path.extend_from_slice(b".hdr");
        let input = PathBuf::from(OsString::from_vec(raw_path));

        let resolved = resolve_log_path(&input, "rcp");
        let bytes = resolved.as_os_str().as_bytes();

        // The stem bytes have to come through untouched ...
        let stem_survived = bytes.windows(2).any(|pair| pair == [0xFF, 0xFE]);
        assert!(
            stem_survived,
            "non-UTF-8 bytes must survive resolution; got bytes: {bytes:?}",
        );
        // ... with the suffix and extension appended after them.
        assert!(
            bytes.ends_with(b".rcp.hdr"),
            "expected .rcp.hdr suffix; got bytes: {bytes:?}",
        );
    }
}
1986
#[cfg(test)]
mod validate_histogram_log_target_tests {
    use super::*;

    /// Builds a `ThrottleConfig` whose histogram logging is enabled exactly
    /// when a log path is supplied; every other field keeps its default.
    fn throttle_with_log_path(path: Option<std::path::PathBuf>) -> ThrottleConfig {
        let histogram_enabled = path.is_some();
        ThrottleConfig {
            histogram_enabled,
            histogram_log_path: path,
            ..Default::default()
        }
    }

    #[test]
    fn no_log_path_is_ok() {
        let config = throttle_with_log_path(None);
        let outcome = validate_histogram_log_target(&config, "rcp");
        assert!(outcome.is_ok());
    }

    #[test]
    fn writable_resolved_path_is_ok() {
        let scratch = tempfile::tempdir().unwrap();
        let requested = scratch.path().join("foo.hdr");
        let config = throttle_with_log_path(Some(requested));
        let outcome = validate_histogram_log_target(&config, "rcp");
        assert!(outcome.is_ok());
    }

    #[test]
    fn resolved_path_existing_as_directory_is_rejected() {
        // Put a directory at the exact resolved path; OpenOptions::open
        // with create+truncate fails when the target is a directory.
        let scratch = tempfile::tempdir().unwrap();
        let occupied = scratch.path().join("foo.rcp.hdr");
        std::fs::create_dir(&occupied).unwrap();
        let config = throttle_with_log_path(Some(scratch.path().join("foo.hdr")));
        let err = validate_histogram_log_target(&config, "rcp").unwrap_err();
        let names_option = err.contains("histogram-log");
        let names_path = err.contains("foo.rcp.hdr");
        assert!(names_option && names_path, "got: {err}",);
    }

    #[test]
    fn resolved_path_in_missing_parent_is_rejected() {
        let requested = std::path::PathBuf::from("/nonexistent-dir-67890/foo.hdr");
        let config = throttle_with_log_path(Some(requested));
        let err = validate_histogram_log_target(&config, "rcp").unwrap_err();
        assert!(err.contains("histogram-log"), "got: {err}");
    }

    #[test]
    fn log_path_pointing_at_existing_directory_is_rejected() {
        let scratch = tempfile::tempdir().unwrap();
        let requested = scratch.path().to_path_buf();
        let config = throttle_with_log_path(Some(requested));
        let err = validate_histogram_log_target(&config, "rcp").unwrap_err();
        assert!(err.contains("directory"), "got: {err}");
    }

    #[test]
    fn log_path_with_no_filename_is_rejected() {
        // PathBuf::from("/") has parent() == None and file_name() == None.
        let root = std::path::PathBuf::from("/");
        let config = throttle_with_log_path(Some(root));
        let err = validate_histogram_log_target(&config, "rcp").unwrap_err();
        let recognised = ["filename", "directory"]
            .into_iter()
            .any(|needle| err.contains(needle));
        assert!(recognised, "got: {err}",);
    }

    #[test]
    #[cfg(unix)]
    fn resolved_path_existing_as_symlink_is_rejected() {
        // Guards against symlink-based hijacking: a local attacker able to
        // pre-create the predictable suffixed path as a symlink must not be
        // able to redirect the truncating open onto a victim file.
        let scratch = tempfile::tempdir().unwrap();
        // Stand-in victim: a sibling file whose content must stay intact.
        let victim = scratch.path().join("victim.txt");
        std::fs::write(&victim, b"do not clobber").unwrap();
        // The path will resolve to `<dir>/foo.rcp.hdr`; plant a symlink to
        // the victim there before validating.
        let planted_link = scratch.path().join("foo.rcp.hdr");
        std::os::unix::fs::symlink(&victim, &planted_link).unwrap();

        let config = throttle_with_log_path(Some(scratch.path().join("foo.hdr")));
        let err = validate_histogram_log_target(&config, "rcp").unwrap_err();
        let recognised = ["symlink", "ELOOP", "Too many levels"]
            .into_iter()
            .any(|needle| err.contains(needle));
        assert!(recognised, "got: {err}",);

        // The victim's content is untouched (the truncating open never reached it).
        let victim_content = std::fs::read(&victim).unwrap();
        assert_eq!(victim_content, b"do not clobber");
    }
}