Skip to main content

common/
lib.rs

1//! Common utilities and types for RCP file operation tools
2//!
3//! This crate provides shared functionality used across all RCP tools (`rcp`, `rrm`, `rlink`, `rcmp`).
4//! It includes core operations (copy, remove, link, compare), progress reporting, metadata preservation, and runtime configuration.
5//!
6//! # Core Modules
7//!
8//! - [`mod@copy`] - File copying operations with metadata preservation and error handling
9//! - [`mod@rm`] - File removal operations
10//! - [`mod@link`] - Hard-linking operations
11//! - [`mod@cmp`] - File comparison operations (metadata-based)
12//! - [`mod@preserve`] - Metadata preservation settings and operations
13//! - [`mod@progress`] - Progress tracking and reporting
14//! - [`mod@filecmp`] - File metadata comparison utilities
15//! - [`mod@remote_tracing`] - Remote tracing support for distributed operations
16//!
17//! # Key Types
18//!
19//! ## `RcpdType`
20//!
21//! Identifies the role of a remote copy daemon:
22//! - `Source` - reads files from source host
23//! - `Destination` - writes files to destination host
24//!
25//! ## `ProgressType`
26//!
27//! Controls progress reporting display:
28//! - `Auto` - automatically choose based on terminal type
29//! - `ProgressBar` - animated progress bar (for interactive terminals)
30//! - `TextUpdates` - periodic text updates (for logging/non-interactive)
31//!
32//! # Progress Reporting
33//!
34//! The crate provides a global progress tracking system accessible via [`get_progress()`].
35//! Progress can be displayed in different formats depending on the execution context.
36//!
37//! Progress output goes to stderr, while logs go to stdout, allowing users to redirect logs to a file while still viewing interactive progress.
38//!
39//! # Runtime Configuration
40//!
41//! The [`run`] function provides a unified entry point for all RCP tools with support for:
42//! - Progress tracking and reporting
43//! - Logging configuration (quiet/verbose modes)
44//! - Resource limits (max workers, open files, throttling)
45//! - Tokio runtime setup
46//! - Remote tracing integration
47//!
48//! # Examples
49//!
50//! ## Basic Copy Operation
51//!
52//! ```rust,no_run
53//! use std::path::Path;
54//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
55//! let src = Path::new("/source");
56//! let dst = Path::new("/destination");
57//!
58//! let settings = common::copy::Settings {
59//!     dereference: false,
60//!     fail_early: false,
61//!     overwrite: false,
62//!     overwrite_compare: Default::default(),
63//!     overwrite_filter: None,
64//!     ignore_existing: false,
65//!     chunk_size: 0,
66//!     skip_specials: false,
67//!     remote_copy_buffer_size: 0,
68//!     filter: None,
69//!     dry_run: None,
70//!     delete: None,
71//! };
72//! let preserve = common::preserve::preserve_none();
73//!
74//! let summary = common::copy(src, dst, &settings, &preserve).await?;
75//! println!("Copied {} files", summary.files_copied);
76//! # Ok(())
77//! # }
78//! ```
79//!
80//! ## Metadata Comparison
81//!
82//! ```rust,no_run
83//! use std::path::Path;
84//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
85//! let src = Path::new("/path1");
86//! let dst = Path::new("/path2");
87//!
88//! // output differences to stdout (use false for quiet mode)
89//! let log = common::cmp::LogWriter::new(None, true, common::cmp::OutputFormat::default()).await?;
90//! let settings = common::cmp::Settings {
91//!     fail_early: false,
92//!     exit_early: false,
93//!     expand_missing: false,
94//!     compare: Default::default(),
95//!     filter: None,
96//! };
97//!
98//! let summary = common::cmp(src, dst, &log, &settings).await?;
99//! println!("Comparison complete: {}", summary);
100//! # Ok(())
101//! # }
102//! ```
103
104use crate::cmp::ObjType;
105use anyhow::Context;
106use anyhow::anyhow;
107use std::io::IsTerminal;
108use tracing::instrument;
109use tracing_subscriber::fmt::format::FmtSpan;
110use tracing_subscriber::prelude::*;
111
112mod auto_meta;
113pub mod chmod;
114pub mod cli;
115pub mod cmp;
116pub mod config;
117pub mod copy;
118pub mod delete;
119pub mod dry_run;
120pub mod error;
121pub mod error_collector;
122pub mod filegen;
123pub mod filter;
124pub mod histogram_logger;
125pub mod histogram_panel;
126pub mod link;
127pub mod observability;
128pub mod preserve;
129pub mod remote_tracing;
130pub mod rm;
131pub mod version;
132
133pub mod filecmp;
134pub mod progress;
135mod testutils;
136pub mod walk;
137
138pub use config::{
139    AutoMetaThrottleConfig, DryRunMode, DryRunWarnings, OutputConfig, RuntimeConfig,
140    ThrottleConfig, TracingConfig,
141};
142// Re-export `Side` from the congestion crate so downstream binaries
143// (rcp, rrm, …) and integration tests can pass `common::Side::Source` /
144// `common::Side::Destination` to `walk::next_entry_probed` and friends
145// without taking a direct dependency on `congestion`.
146pub use congestion::{MetadataOp, Side};
147pub use progress::{RcpdProgressPrinter, SerializableProgress};
148
149// Define RcpdType in common since remote depends on common
150#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
151pub enum RcpdType {
152    Source,
153    Destination,
154}
155
156impl std::fmt::Display for RcpdType {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        match self {
159            RcpdType::Source => write!(f, "source"),
160            RcpdType::Destination => write!(f, "destination"),
161        }
162    }
163}
164
165// Type alias for progress snapshots
166pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
167
168/// runtime statistics collected from a process (CPU time, memory usage)
169#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
170pub struct RuntimeStats {
171    /// user-mode CPU time in milliseconds
172    pub cpu_time_user_ms: u64,
173    /// kernel-mode CPU time in milliseconds
174    pub cpu_time_kernel_ms: u64,
175    /// peak resident set size in bytes
176    pub peak_rss_bytes: u64,
177}
178
179/// runtime stats collected from remote rcpd processes for display at the end of a remote copy
180#[derive(Debug, Default)]
181pub struct RemoteRuntimeStats {
182    pub source_host: String,
183    pub source_stats: RuntimeStats,
184    pub dest_host: String,
185    pub dest_stats: RuntimeStats,
186}
187
188/// checks if a host string refers to the local machine.
189/// returns true for `localhost`, `127.0.0.1`, `::1`, `[::1]`, or the actual hostname
190#[must_use]
191pub fn is_localhost(host: &str) -> bool {
192    if host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]" {
193        return true;
194    }
195    // check against actual hostname using gethostname
196    let mut buf = [0u8; 256];
197    // Safety: gethostname writes to buf and returns 0 on success
198    let result = unsafe { libc::gethostname(buf.as_mut_ptr() as *mut libc::c_char, buf.len()) };
199    if result == 0
200        && let Ok(hostname_cstr) = std::ffi::CStr::from_bytes_until_nul(&buf)
201        && let Ok(hostname) = hostname_cstr.to_str()
202        && host == hostname
203    {
204        return true;
205    }
206    false
207}
208
209static PROGRESS: std::sync::LazyLock<progress::Progress> =
210    std::sync::LazyLock::new(progress::Progress::new);
211static PBAR: std::sync::LazyLock<indicatif::ProgressBar> =
212    std::sync::LazyLock::new(indicatif::ProgressBar::new_spinner);
213static REMOTE_RUNTIME_STATS: std::sync::LazyLock<std::sync::Mutex<Option<RemoteRuntimeStats>>> =
214    std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
215static HISTOGRAM_LOGGER_CANCEL: std::sync::Mutex<Option<tokio::sync::watch::Sender<bool>>> =
216    std::sync::Mutex::new(None);
217static HISTOGRAM_LOGGER_HANDLE: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>> =
218    std::sync::Mutex::new(None);
219
220fn store_logger_cancel(tx: tokio::sync::watch::Sender<bool>) {
221    *HISTOGRAM_LOGGER_CANCEL
222        .lock()
223        .expect("histogram logger cancel mutex poisoned") = Some(tx);
224}
225
226fn store_logger_handle(handle: tokio::task::JoinHandle<()>) {
227    *HISTOGRAM_LOGGER_HANDLE
228        .lock()
229        .expect("histogram logger handle mutex poisoned") = Some(handle);
230}
231
232fn take_logger_handle() -> Option<tokio::task::JoinHandle<()>> {
233    HISTOGRAM_LOGGER_HANDLE
234        .lock()
235        .expect("histogram logger handle mutex poisoned")
236        .take()
237}
238
239fn signal_logger_cancel() {
240    if let Some(tx) = HISTOGRAM_LOGGER_CANCEL
241        .lock()
242        .expect("histogram logger cancel mutex poisoned")
243        .take()
244        && let Err(err) = tx.send(true)
245    {
246        tracing::debug!("histogram-logger cancel send failed (already gone): {err:#}");
247    }
248}
249
250#[must_use]
251pub fn get_progress() -> &'static progress::Progress {
252    &PROGRESS
253}
254
255/// stores remote runtime stats for display at the end of a remote copy operation
256pub fn set_remote_runtime_stats(stats: RemoteRuntimeStats) {
257    *REMOTE_RUNTIME_STATS.lock().unwrap() = Some(stats);
258}
259
260struct LocalTimeFormatter;
261
262impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFormatter {
263    fn format_time(
264        &self,
265        writer: &mut tracing_subscriber::fmt::format::Writer<'_>,
266    ) -> std::fmt::Result {
267        let now = chrono::Local::now();
268        writer.write_str(&now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
269    }
270}
271
/// owns the background progress-reporting thread and the flag used to stop it
struct ProgressTracker {
    /// "done" flag plus the condvar the progress thread waits on between refreshes
    lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
    /// handle of the progress thread; taken (and joined) when the tracker is dropped
    pbar_thread: Option<std::thread::JoinHandle<()>>,
}
276
277#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
278pub enum ProgressType {
279    #[default]
280    #[value(name = "auto", alias = "Auto")]
281    Auto,
282    #[value(name = "ProgressBar", alias = "progress-bar")]
283    ProgressBar,
284    #[value(name = "TextUpdates", alias = "text-updates")]
285    TextUpdates,
286}
287
288pub enum GeneralProgressType {
289    User {
290        progress_type: ProgressType,
291        kind: progress::LocalProgressKind,
292    },
293    Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
294    RemoteMaster {
295        progress_type: ProgressType,
296        get_progress_snapshot:
297            Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>,
298    },
299}
300
301impl std::fmt::Debug for GeneralProgressType {
302    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303        match self {
304            GeneralProgressType::User {
305                progress_type,
306                kind,
307            } => write!(f, "User(progress_type: {progress_type:?}, kind: {kind:?})"),
308            GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
309            GeneralProgressType::RemoteMaster { progress_type, .. } => {
310                write!(
311                    f,
312                    "RemoteMaster(progress_type: {progress_type:?}, <function>)"
313                )
314            }
315        }
316    }
317}
318
319#[derive(Debug)]
320pub struct ProgressSettings {
321    pub progress_type: GeneralProgressType,
322    pub progress_delay: Option<String>,
323}
324
325fn progress_bar(
326    lock: &std::sync::Mutex<bool>,
327    cvar: &std::sync::Condvar,
328    delay_opt: &Option<std::time::Duration>,
329    kind: progress::LocalProgressKind,
330) {
331    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
332    PBAR.set_style(
333        indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
334            .unwrap()
335            .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
336    );
337    let mut prog_printer = progress::make_local_printer(kind, &PROGRESS);
338    let mut is_done = lock.lock().unwrap();
339    loop {
340        PBAR.set_position(PBAR.position() + 1); // do we need to update?
341        let mut msg = prog_printer.print().unwrap();
342        msg.push_str(&observability::render_lines());
343        msg.push_str(&render_panel_from_registry());
344        PBAR.set_message(msg);
345        let result = cvar.wait_timeout(is_done, delay).unwrap();
346        is_done = result.0;
347        if *is_done {
348            break;
349        }
350    }
351    PBAR.finish_and_clear();
352}
353
354fn get_datetime_prefix() -> String {
355    chrono::Local::now()
356        .format("%Y-%m-%dT%H:%M:%S%.3f%:z")
357        .to_string()
358}
359
360fn text_updates(
361    lock: &std::sync::Mutex<bool>,
362    cvar: &std::sync::Condvar,
363    delay_opt: &Option<std::time::Duration>,
364    kind: progress::LocalProgressKind,
365) {
366    let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
367    let mut prog_printer = progress::make_local_printer(kind, &PROGRESS);
368    let mut is_done = lock.lock().unwrap();
369    loop {
370        eprintln!("=======================");
371        eprintln!(
372            "{}\n--{}{}{}",
373            get_datetime_prefix(),
374            prog_printer.print().unwrap(),
375            observability::render_lines(),
376            render_panel_from_registry(),
377        );
378        let result = cvar.wait_timeout(is_done, delay).unwrap();
379        is_done = result.0;
380        if *is_done {
381            break;
382        }
383    }
384}
385
386fn rcpd_updates(
387    lock: &std::sync::Mutex<bool>,
388    cvar: &std::sync::Condvar,
389    delay_opt: &Option<std::time::Duration>,
390    sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
391) {
392    tracing::debug!("Starting rcpd progress updates");
393    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
394    let mut is_done = lock.lock().unwrap();
395    loop {
396        if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
397            // channel closed, receiver is done
398            tracing::debug!("Progress update channel closed, stopping progress updates");
399            break;
400        }
401        let result = cvar.wait_timeout(is_done, delay).unwrap();
402        is_done = result.0;
403        if *is_done {
404            break;
405        }
406    }
407}
408
409fn remote_master_updates<F>(
410    lock: &std::sync::Mutex<bool>,
411    cvar: &std::sync::Condvar,
412    delay_opt: &Option<std::time::Duration>,
413    get_progress_snapshot: F,
414    progress_type: ProgressType,
415) where
416    F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
417{
418    let interactive = match progress_type {
419        ProgressType::Auto => std::io::stderr().is_terminal(),
420        ProgressType::ProgressBar => true,
421        ProgressType::TextUpdates => false,
422    };
423    if interactive {
424        PBAR.set_style(
425            indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
426                .unwrap()
427                .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
428        );
429        let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
430        let mut printer = RcpdProgressPrinter::new();
431        let mut is_done = lock.lock().unwrap();
432        loop {
433            let progress_map = get_progress_snapshot();
434            let source_progress = &progress_map[RcpdType::Source];
435            let destination_progress = &progress_map[RcpdType::Destination];
436            PBAR.set_position(PBAR.position() + 1); // do we need to update?
437            let mut msg = printer
438                .print(source_progress, destination_progress)
439                .unwrap();
440            msg.push_str(&render_panel_from_registry());
441            PBAR.set_message(msg);
442            let result = cvar.wait_timeout(is_done, delay).unwrap();
443            is_done = result.0;
444            if *is_done {
445                break;
446            }
447        }
448        PBAR.finish_and_clear();
449    } else {
450        let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
451        let mut printer = RcpdProgressPrinter::new();
452        let mut is_done = lock.lock().unwrap();
453        loop {
454            let progress_map = get_progress_snapshot();
455            let source_progress = &progress_map[RcpdType::Source];
456            let destination_progress = &progress_map[RcpdType::Destination];
457            eprintln!("=======================");
458            eprintln!(
459                "{}\n--{}{}",
460                get_datetime_prefix(),
461                printer
462                    .print(source_progress, destination_progress)
463                    .unwrap(),
464                render_panel_from_registry(),
465            );
466            let result = cvar.wait_timeout(is_done, delay).unwrap();
467            is_done = result.0;
468            if *is_done {
469                break;
470            }
471        }
472    }
473}
474
475impl ProgressTracker {
476    pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
477        let lock_cvar =
478            std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
479        let lock_cvar_clone = lock_cvar.clone();
480        let pbar_thread = std::thread::spawn(move || {
481            let (lock, cvar) = &*lock_cvar_clone;
482            match progress_type {
483                GeneralProgressType::Remote(sender) => {
484                    rcpd_updates(lock, cvar, &delay_opt, sender);
485                }
486                GeneralProgressType::RemoteMaster {
487                    progress_type,
488                    get_progress_snapshot,
489                } => {
490                    remote_master_updates(
491                        lock,
492                        cvar,
493                        &delay_opt,
494                        get_progress_snapshot,
495                        progress_type,
496                    );
497                }
498                GeneralProgressType::User {
499                    progress_type,
500                    kind,
501                } => {
502                    let interactive = match progress_type {
503                        ProgressType::Auto => std::io::stderr().is_terminal(),
504                        ProgressType::ProgressBar => true,
505                        ProgressType::TextUpdates => false,
506                    };
507                    if interactive {
508                        progress_bar(lock, cvar, &delay_opt, kind);
509                    } else {
510                        text_updates(lock, cvar, &delay_opt, kind);
511                    }
512                }
513            }
514        });
515        Self {
516            lock_cvar,
517            pbar_thread: Some(pbar_thread),
518        }
519    }
520}
521
522impl Drop for ProgressTracker {
523    fn drop(&mut self) {
524        let (lock, cvar) = &*self.lock_cvar;
525        let mut is_done = lock.lock().unwrap();
526        *is_done = true;
527        cvar.notify_one();
528        drop(is_done);
529        if let Some(pbar_thread) = self.pbar_thread.take() {
530            pbar_thread.join().unwrap();
531        }
532    }
533}
534
535pub fn parse_metadata_cmp_settings(
536    settings: &str,
537) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
538    let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
539    for setting in settings.split(',') {
540        match setting {
541            "uid" => metadata_cmp_settings.uid = true,
542            "gid" => metadata_cmp_settings.gid = true,
543            "mode" => metadata_cmp_settings.mode = true,
544            "size" => metadata_cmp_settings.size = true,
545            "mtime" => metadata_cmp_settings.mtime = true,
546            "ctime" => metadata_cmp_settings.ctime = true,
547            _ => {
548                return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
549            }
550        }
551    }
552    Ok(metadata_cmp_settings)
553}
554
555fn parse_type_settings(
556    settings: &str,
557) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
558    let mut user_and_time = preserve::UserAndTimeSettings::default();
559    let mut mode_mask = None;
560    for setting in settings.split(',') {
561        match setting {
562            "uid" => user_and_time.uid = true,
563            "gid" => user_and_time.gid = true,
564            "time" => user_and_time.time = true,
565            _ => {
566                if let Ok(mask) = u32::from_str_radix(setting, 8) {
567                    mode_mask = Some(mask);
568                } else {
569                    return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
570                }
571            }
572        }
573    }
574    Ok((user_and_time, mode_mask))
575}
576
577pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
578    // handle presets
579    match settings {
580        "all" => return Ok(preserve::preserve_all()),
581        "none" => return Ok(preserve::preserve_none()),
582        _ => {}
583    }
584    let mut preserve_settings = preserve::Settings::default();
585    for type_settings in settings.split_whitespace() {
586        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
587            let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
588                format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
589            )?;
590            match obj_type {
591                "f" | "file" => {
592                    preserve_settings.file = preserve::FileSettings::default();
593                    preserve_settings.file.user_and_time = user_and_time_settings;
594                    if let Some(mode) = mode_opt {
595                        preserve_settings.file.mode_mask = mode;
596                    }
597                }
598                "d" | "dir" | "directory" => {
599                    preserve_settings.dir = preserve::DirSettings::default();
600                    preserve_settings.dir.user_and_time = user_and_time_settings;
601                    if let Some(mode) = mode_opt {
602                        preserve_settings.dir.mode_mask = mode;
603                    }
604                }
605                "l" | "link" | "symlink" => {
606                    preserve_settings.symlink = preserve::SymlinkSettings::default();
607                    preserve_settings.symlink.user_and_time = user_and_time_settings;
608                }
609                _ => {
610                    return Err(anyhow!("Unknown object type: {}", obj_type));
611                }
612            }
613        } else {
614            return Err(anyhow!("Invalid preserve settings: {}", settings));
615        }
616    }
617    Ok(preserve_settings)
618}
619
620/// Validates that every attribute checked by --update's comparison is actually being preserved.
621/// Skips size (always preserved via content copy) and ctime (kernel-managed, cannot be set).
622pub fn validate_update_compare_vs_preserve(
623    update_compare: &filecmp::MetadataCmpSettings,
624    preserve: &preserve::Settings,
625) -> Result<(), String> {
626    let mut missing = Vec::new();
627    if update_compare.mtime && !preserve.file.user_and_time.time {
628        missing.push("mtime");
629    }
630    if update_compare.uid && !preserve.file.user_and_time.uid {
631        missing.push("uid");
632    }
633    if update_compare.gid && !preserve.file.user_and_time.gid {
634        missing.push("gid");
635    }
636    // metadata_equal compares full mode (0o7777), so a partial mask is lossy
637    if update_compare.mode && preserve.file.mode_mask != 0o7777 {
638        missing.push("mode");
639    }
640    if missing.is_empty() {
641        Ok(())
642    } else {
643        Err(format!(
644            "--update compares [{}] but --preserve-settings does not preserve them. \
645             Use --allow-lossy-update to override or adjust --preserve-settings.",
646            missing.join(", ")
647        ))
648    }
649}
650
651pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
652    let mut cmp_settings = cmp::ObjSettings::default();
653    for type_settings in settings.split_whitespace() {
654        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
655            let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
656                "parsing preserve settings: {obj_settings}, type: {obj_type}"
657            ))?;
658            let obj_type = match obj_type {
659                "f" | "file" => ObjType::File,
660                "d" | "dir" | "directory" => ObjType::Dir,
661                "l" | "link" | "symlink" => ObjType::Symlink,
662                "o" | "other" => ObjType::Other,
663                _ => {
664                    return Err(anyhow!("Unknown obj type: {}", obj_type));
665                }
666            };
667            cmp_settings[obj_type] = obj_cmp_settings;
668        } else {
669            return Err(anyhow!("Invalid preserve settings: {}", settings));
670        }
671    }
672    Ok(cmp_settings)
673}
674
675pub async fn cmp(
676    src: &std::path::Path,
677    dst: &std::path::Path,
678    log: &cmp::LogWriter,
679    settings: &cmp::Settings,
680) -> Result<cmp::Summary, anyhow::Error> {
681    cmp::cmp(&PROGRESS, src, dst, log, settings).await
682}
683
684pub async fn copy(
685    src: &std::path::Path,
686    dst: &std::path::Path,
687    settings: &copy::Settings,
688    preserve: &preserve::Settings,
689) -> Result<copy::Summary, copy::Error> {
690    copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
691}
692
693pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
694    rm::rm(&PROGRESS, path, settings).await
695}
696
697pub async fn chmod(
698    path: &std::path::Path,
699    settings: &chmod::Settings,
700) -> Result<chmod::Summary, chmod::Error> {
701    chmod::chmod(&PROGRESS, path, settings).await
702}
703
704pub async fn link(
705    src: &std::path::Path,
706    dst: &std::path::Path,
707    update: &Option<std::path::PathBuf>,
708    settings: &link::Settings,
709) -> Result<link::Summary, link::Error> {
710    let cwd = std::env::current_dir()
711        .with_context(|| "failed to get current working directory")
712        .map_err(|err| link::Error::new(err, link::Summary::default()))?;
713    link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
714}
715
/// reads environment variable `name` and parses it as `T`.
///
/// returns `default` when the variable is unset, is not valid unicode, or fails to parse.
fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.parse::<T>().ok())
        .unwrap_or(default)
}
725
726/// collects runtime statistics (CPU time, memory) for the current process
727#[must_use]
728pub fn collect_runtime_stats() -> RuntimeStats {
729    collect_runtime_stats_inner(procfs::process::Process::myself().ok())
730}
731
732fn collect_runtime_stats_inner(process: Option<procfs::process::Process>) -> RuntimeStats {
733    let Some(process) = process else {
734        return RuntimeStats::default();
735    };
736    collect_runtime_stats_for_process(&process).unwrap_or_default()
737}
738
739fn collect_runtime_stats_for_process(
740    process: &procfs::process::Process,
741) -> anyhow::Result<RuntimeStats> {
742    let stat = process.stat()?;
743    let clock_ticks = procfs::ticks_per_second() as f64;
744    // vmhwm from /proc/[pid]/status is in kB, convert to bytes
745    let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
746    Ok(RuntimeStats {
747        cpu_time_user_ms: ((stat.utime as f64 / clock_ticks) * 1000.0) as u64,
748        cpu_time_kernel_ms: ((stat.stime as f64 / clock_ticks) * 1000.0) as u64,
749        peak_rss_bytes: vmhwm_kb * 1024,
750    })
751}
752
753fn print_runtime_stats_for_role(prefix: &str, stats: &RuntimeStats) {
754    let cpu_total =
755        std::time::Duration::from_millis(stats.cpu_time_user_ms + stats.cpu_time_kernel_ms);
756    let cpu_kernel = std::time::Duration::from_millis(stats.cpu_time_kernel_ms);
757    let cpu_user = std::time::Duration::from_millis(stats.cpu_time_user_ms);
758    println!(
759        "{prefix}cpu time : {:.2?} | k: {:.2?} | u: {:.2?}",
760        cpu_total, cpu_kernel, cpu_user
761    );
762    println!(
763        "{prefix}peak RSS : {}",
764        bytesize::ByteSize(stats.peak_rss_bytes)
765    );
766}
767
768#[rustfmt::skip]
769fn print_runtime_stats() -> Result<(), anyhow::Error> {
770    // check if we have remote runtime stats (from a remote copy operation)
771    let remote_stats = REMOTE_RUNTIME_STATS.lock().unwrap().take();
772    if let Some(remote) = remote_stats {
773        // print global walltime first
774        println!("walltime : {:.2?}", &PROGRESS.get_duration());
775        println!();
776        let source_is_local = is_localhost(&remote.source_host);
777        let dest_is_local = is_localhost(&remote.dest_host);
778        // collect master stats
779        let master_stats = collect_runtime_stats();
780        // print non-localhost roles first
781        if !source_is_local {
782            println!("SOURCE ({}):", remote.source_host);
783            print_runtime_stats_for_role("  ", &remote.source_stats);
784            println!();
785        }
786        if !dest_is_local {
787            println!("DESTINATION ({}):", remote.dest_host);
788            print_runtime_stats_for_role("  ", &remote.dest_stats);
789            println!();
790        }
791        // print combined localhost section
792        match (source_is_local, dest_is_local) {
793            (true, true) => {
794                println!("MASTER + SOURCE + DESTINATION (localhost):");
795                print_runtime_stats_for_role("  master ", &master_stats);
796                print_runtime_stats_for_role("  source ", &remote.source_stats);
797                print_runtime_stats_for_role("  dest   ", &remote.dest_stats);
798            }
799            (true, false) => {
800                println!("MASTER + SOURCE (localhost):");
801                print_runtime_stats_for_role("  master ", &master_stats);
802                print_runtime_stats_for_role("  source ", &remote.source_stats);
803            }
804            (false, true) => {
805                println!("MASTER + DESTINATION (localhost):");
806                print_runtime_stats_for_role("  master ", &master_stats);
807                print_runtime_stats_for_role("  dest   ", &remote.dest_stats);
808            }
809            (false, false) => {
810                println!("MASTER (localhost):");
811                print_runtime_stats_for_role("  ", &master_stats);
812            }
813        }
814        return Ok(());
815    }
816    // local operation - print stats for this process only
817    let process = procfs::process::Process::myself()?;
818    let stat = process.stat()?;
819    // The time is in clock ticks, so we need to convert it to seconds
820    let clock_ticks_per_second = procfs::ticks_per_second();
821    let ticks_to_duration = |ticks: u64| {
822        std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
823    };
824    // vmhwm from /proc/[pid]/status is in kB, convert to bytes
825    let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
826    println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
827    println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
828    println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm_kb * 1024));
829    Ok(())
830}
831
832fn get_max_open_files() -> Result<u64, std::io::Error> {
833    let mut rlim = libc::rlimit {
834        rlim_cur: 0,
835        rlim_max: 0,
836    };
837    // Safety: we pass a valid "rlim" pointer and the result is checked
838    let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
839    if result == 0 {
840        Ok(rlim.rlim_cur)
841    } else {
842        Err(std::io::Error::last_os_error())
843    }
844}
845
846struct ProgWriter {}
847
848impl ProgWriter {
849    fn new() -> Self {
850        Self {}
851    }
852}
853
854impl std::io::Write for ProgWriter {
855    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
856        PBAR.suspend(|| std::io::stdout().write(buf))
857    }
858    fn flush(&mut self) -> std::io::Result<()> {
859        std::io::stdout().flush()
860    }
861}
862
863fn get_hostname() -> String {
864    nix::unistd::gethostname()
865        .ok()
866        .and_then(|os_str| os_str.into_string().ok())
867        .unwrap_or_else(|| "unknown".to_string())
868}
869
870#[must_use]
871pub fn generate_debug_log_filename(prefix: &str) -> String {
872    let now = chrono::Utc::now();
873    let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
874    let process_id = std::process::id();
875    format!("{prefix}-{timestamp}-{process_id}")
876}
877
878/// Generate a trace filename with identifier, hostname, PID, and timestamp.
879///
880/// `identifier` should be "rcp", "rcpd-source", or "rcpd-destination"
881#[must_use]
882pub fn generate_trace_filename(prefix: &str, identifier: &str, extension: &str) -> String {
883    let hostname = get_hostname();
884    let pid = std::process::id();
885    let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S");
886    format!("{prefix}-{identifier}-{hostname}-{pid}-{timestamp}.{extension}")
887}
888
889/// Build the verbose-level [`tracing_subscriber::EnvFilter`] used by every
890/// non-profile tracing layer (file, fmt, remote). Excludes noisy deps that are
891/// rarely useful when debugging rcp.
892fn build_verbose_env_filter(verbose: u8) -> tracing_subscriber::EnvFilter {
893    let level_directive = match verbose {
894        0 => "error".parse().unwrap(),
895        1 => "info".parse().unwrap(),
896        2 => "debug".parse().unwrap(),
897        _ => "trace".parse().unwrap(),
898    };
899    tracing_subscriber::EnvFilter::from_default_env()
900        .add_directive(level_directive)
901        .add_directive("tokio=info".parse().unwrap())
902        .add_directive("runtime=info".parse().unwrap())
903        .add_directive("quinn=warn".parse().unwrap())
904        .add_directive("rustls=warn".parse().unwrap())
905        .add_directive("h2=warn".parse().unwrap())
906}
907
/// Produce the filter string for the chrome/flame profiling layers.
///
/// Profiling layers are driven by `--profile-level` rather than the verbose
/// level, so they get their own filter. The string is returned (instead of an
/// `EnvFilter`) because callers re-parse it once per layer — `EnvFilter` isn't
/// `Clone`.
///
/// `profile_level` defaults to `"trace"`. It is validated case-insensitively
/// but appended to the result exactly as given. An unrecognised level prints
/// an error to stderr and terminates the process with exit code 1.
fn build_profile_filter_str(profile_level: Option<&str>) -> String {
    let level_str = profile_level.unwrap_or("trace");
    let recognised = matches!(
        level_str.to_lowercase().as_str(),
        "trace" | "debug" | "info" | "warn" | "error" | "off"
    );
    if !recognised {
        eprintln!(
            "Invalid --profile-level '{level_str}'. Valid values: trace, debug, info, warn, error, off"
        );
        std::process::exit(1);
    }
    format!("tokio=off,quinn=off,h2=off,hyper=off,rustls=off,{level_str}")
}
923
/// Guards from chrome/flame tracing layers that must outlive the runtime to
/// flush traces on shutdown. Hold the returned struct for the lifetime of the
/// run.
///
/// Both fields are `None` when the corresponding layer was not installed
/// (quiet mode, no prefix configured, or the flame layer failed to open its
/// output file).
#[allow(dead_code)] // fields are kept alive only for their Drop side-effects
struct TracingGuards {
    /// Flush guard returned by the chrome trace layer builder.
    chrome: Option<tracing_chrome::FlushGuard>,
    /// Flush guard returned by the flamegraph layer for its output file.
    flame: Option<tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>>,
}
932
933/// Install the global [`tracing_subscriber`] registry from a [`TracingConfig`].
934/// Caller must hold the returned [`TracingGuards`] until the run finishes so
935/// that chrome/flame traces are flushed before the file handles close.
936///
937/// In quiet mode this is a no-op (the subscriber is never installed).
938fn install_tracing_subscriber(
939    quiet: bool,
940    verbose: u8,
941    tracing_config: TracingConfig,
942) -> TracingGuards {
943    if quiet {
944        assert!(
945            verbose == 0,
946            "Quiet mode and verbose mode are mutually exclusive"
947        );
948        return TracingGuards {
949            chrome: None,
950            flame: None,
951        };
952    }
953    let TracingConfig {
954        remote_layer: remote_tracing_layer,
955        debug_log_file,
956        chrome_trace_prefix,
957        flamegraph_prefix,
958        trace_identifier,
959        profile_level,
960        tokio_console,
961        tokio_console_port,
962    } = tracing_config;
963    let file_layer = debug_log_file.map(|log_file_path| {
964        let file = std::fs::OpenOptions::new()
965            .create(true)
966            .append(true)
967            .open(&log_file_path)
968            .unwrap_or_else(|e| {
969                panic!("Failed to create debug log file at '{log_file_path}': {e}")
970            });
971        tracing_subscriber::fmt::layer()
972            .with_target(true)
973            .with_line_number(true)
974            .with_thread_ids(true)
975            .with_timer(LocalTimeFormatter)
976            .with_ansi(false)
977            .with_writer(file)
978            .with_filter(build_verbose_env_filter(verbose))
979    });
980    // fmt_layer for local console output (when not using remote tracing)
981    let fmt_layer = if remote_tracing_layer.is_some() {
982        None
983    } else {
984        Some(
985            tracing_subscriber::fmt::layer()
986                .with_target(true)
987                .with_line_number(true)
988                .with_span_events(if verbose > 2 {
989                    FmtSpan::NEW | FmtSpan::CLOSE
990                } else {
991                    FmtSpan::NONE
992                })
993                .with_timer(LocalTimeFormatter)
994                .pretty()
995                .with_writer(ProgWriter::new)
996                .with_filter(build_verbose_env_filter(verbose)),
997        )
998    };
999    // apply env_filter to remote_tracing_layer so it respects verbose level
1000    let remote_tracing_layer =
1001        remote_tracing_layer.map(|layer| layer.with_filter(build_verbose_env_filter(verbose)));
1002    let console_layer = tokio_console.then(|| {
1003        let console_port = tokio_console_port.unwrap_or(6669);
1004        let retention_seconds: u64 =
1005            read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
1006        eprintln!("Tokio console server listening on 127.0.0.1:{console_port}");
1007        console_subscriber::ConsoleLayer::builder()
1008            .retention(std::time::Duration::from_secs(retention_seconds))
1009            .server_addr(([127, 0, 0, 1], console_port))
1010            .spawn()
1011    });
1012    // chrome/flame share a profile filter; build the string once and re-parse
1013    // per layer (EnvFilter isn't Clone).
1014    let profile_filter_str = (chrome_trace_prefix.is_some() || flamegraph_prefix.is_some())
1015        .then(|| build_profile_filter_str(profile_level.as_deref()));
1016    let make_profile_filter =
1017        || tracing_subscriber::EnvFilter::new(profile_filter_str.as_ref().unwrap());
1018    let mut chrome_guard = None;
1019    let chrome_layer = chrome_trace_prefix.as_ref().map(|prefix| {
1020        let filename = generate_trace_filename(prefix, &trace_identifier, "json");
1021        eprintln!("Chrome trace will be written to: {filename}");
1022        let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
1023            .file(&filename)
1024            .include_args(true)
1025            .build();
1026        chrome_guard = Some(guard);
1027        layer.with_filter(make_profile_filter())
1028    });
1029    let mut flame_guard = None;
1030    let flame_layer = flamegraph_prefix.as_ref().and_then(|prefix| {
1031        let filename = generate_trace_filename(prefix, &trace_identifier, "folded");
1032        eprintln!("Flamegraph data will be written to: {filename}");
1033        match tracing_flame::FlameLayer::with_file(&filename) {
1034            Ok((layer, guard)) => {
1035                flame_guard = Some(guard);
1036                Some(layer.with_filter(make_profile_filter()))
1037            }
1038            Err(e) => {
1039                eprintln!("Failed to create flamegraph layer: {e}");
1040                None
1041            }
1042        }
1043    });
1044    tracing_subscriber::registry()
1045        .with(file_layer)
1046        .with(fmt_layer)
1047        .with(remote_tracing_layer)
1048        .with(console_layer)
1049        .with(chrome_layer)
1050        .with(flame_layer)
1051        .init();
1052    TracingGuards {
1053        chrome: chrome_guard,
1054        flame: flame_guard,
1055    }
1056}
1057
1058/// Build a multi-threaded tokio runtime configured per `runtime`, and apply
1059/// the `max_open_files` limit from `throttle`. Falls back to ~80% of the
1060/// system rlimit (capped at 4096) when `max_open_files` is unset.
1061fn build_tokio_runtime(
1062    runtime: &RuntimeConfig,
1063    throttle: &ThrottleConfig,
1064) -> tokio::runtime::Runtime {
1065    let mut builder = tokio::runtime::Builder::new_multi_thread();
1066    builder.enable_all();
1067    if runtime.max_workers > 0 {
1068        builder.worker_threads(runtime.max_workers);
1069    }
1070    if runtime.max_blocking_threads > 0 {
1071        builder.max_blocking_threads(runtime.max_blocking_threads);
1072    }
1073    if !sysinfo::set_open_files_limit(usize::MAX) {
1074        tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
1075    }
1076    let set_max_open_files = throttle.max_open_files.unwrap_or_else(|| {
1077        let limit = get_max_open_files().expect(
1078            "We failed to query rlimit, if this is expected try specifying --max-open-files",
1079        ) as usize;
1080        // use ~80% of the system limit, but cap at 4096 to avoid overwhelming
1081        // distributed filesystems
1082        std::cmp::min(limit / 10 * 8, 4096)
1083    });
1084    if set_max_open_files > 0 {
1085        tracing::info!("Setting max open files to: {}", set_max_open_files);
1086        throttle::set_max_open_files(set_max_open_files);
1087    } else {
1088        tracing::info!("Not applying any limit to max open files!");
1089    }
1090    builder.build().expect("Failed to create runtime")
1091}
1092
1093/// Spawn the ops/iops throttle replenisher tasks onto `runtime` if the
1094/// throttles are enabled.
1095///
1096/// When `auto_meta` is set, the ops-throttle is forced to a fixed 100ms
1097/// replenish interval (matching the constant the auto-meta adapter uses
1098/// when converting `Decision::rate_per_sec` → tokens-per-interval) *and*
1099/// is bootstrapped even if `--ops-throttle` was zero. That way:
1100///
1101/// 1. a future rate-aware controller's `rate_per_sec: Some(_)` decisions
1102///    actually gate ops instead of silently no-opping;
1103/// 2. the adapter's 100ms conversion assumption matches the thread's
1104///    real interval, regardless of the user's static `--ops-throttle`
1105///    value.
1106fn spawn_throttle_replenishers(
1107    runtime: &tokio::runtime::Runtime,
1108    throttle: &ThrottleConfig,
1109    trace_identifier: &str,
1110) {
1111    fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
1112        let mut replenish = replenish;
1113        let mut interval = std::time::Duration::from_secs(1);
1114        while replenish > 100 && interval > std::time::Duration::from_millis(1) {
1115            replenish /= 10;
1116            interval /= 10;
1117        }
1118        (replenish, interval)
1119    }
1120    let auto_meta_on = throttle.auto_meta.is_some();
1121    if auto_meta_on {
1122        // Force the fixed 100ms cadence the adapter assumes. Bootstrap
1123        // with at least 1 token so `setup()` enables the semaphore; if
1124        // the user didn't pass `--ops-throttle`, immediately disable —
1125        // the adapter re-enables only when a rate decision arrives.
1126        let interval = std::time::Duration::from_millis(100);
1127        let initial_replenish = (throttle.ops_throttle as f64 * 0.1) as usize;
1128        throttle::init_ops_tokens(initial_replenish.max(1));
1129        if throttle.ops_throttle == 0 {
1130            throttle::disable_ops_throttle();
1131        }
1132        runtime.spawn(throttle::run_ops_replenish_thread(
1133            initial_replenish,
1134            interval,
1135        ));
1136    } else if throttle.ops_throttle > 0 {
1137        let (replenish, interval) = get_replenish_interval(throttle.ops_throttle);
1138        throttle::init_ops_tokens(replenish);
1139        runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
1140    }
1141    if throttle.iops_throttle > 0 {
1142        let (replenish, interval) = get_replenish_interval(throttle.iops_throttle);
1143        throttle::init_iops_tokens(replenish);
1144        runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
1145    }
1146    if let Some(auto) = throttle.auto_meta {
1147        spawn_auto_meta_throttle(
1148            runtime,
1149            auto,
1150            throttle.histogram_enabled,
1151            throttle.histogram_log_path.clone(),
1152            throttle.histogram_interval,
1153            trace_identifier,
1154        );
1155    }
1156}
1157
/// Derive the per-tool log path by inserting `trace_identifier` between the
/// stem and the extension of the user-supplied path, yielding
/// `<parent>/<stem>.<trace_identifier>.<extension>`. This mirrors the
/// chrome_trace_prefix convention so master and rcpds don't collide on
/// localhost runs.
///
/// Missing components fall back to defaults:
/// - no parent, or an empty one (bare filename such as `foo.hdr`): `.`
/// - no extension (`foo`, and also dot-files such as `.hidden`): `hdr`
/// - no file name at all (e.g. `/`): stem `auto-meta`
///
/// A dot-file keeps its whole name as the stem, so `.hidden` becomes
/// `./.hidden.<trace_identifier>.hdr`.
///
/// Stem and extension are handled as `OsStr`, so non-UTF-8 components (valid
/// on Unix) pass through unchanged.
fn resolve_log_path(path: &std::path::Path, trace_identifier: &str) -> std::path::PathBuf {
    use std::ffi::{OsStr, OsString};

    let directory = path
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .unwrap_or_else(|| std::path::Path::new("."));
    let stem = path.file_stem().unwrap_or_else(|| OsStr::new("auto-meta"));
    let extension = path.extension().unwrap_or_else(|| OsStr::new("hdr"));

    let mut file_name = OsString::new();
    file_name.push(stem);
    file_name.push(".");
    file_name.push(trace_identifier);
    file_name.push(".");
    file_name.push(extension);
    directory.join(file_name)
}
1189
1190/// Verify the resolved histogram log path can actually be opened for
1191/// writing. Called from [`run`] before any work begins so a typo or
1192/// permission issue surfaces as a configuration error rather than a
1193/// silent warning at runtime.
1194///
1195/// The check creates+truncates the file (matching what the logger
1196/// task does later) so it catches "exists as a directory", "exists
1197/// as an unwritable file", and most permission issues. The logger's
1198/// own open later in startup will reuse the same path; the double-
1199/// open is harmless (the logger truncates again).
1200fn validate_histogram_log_target(
1201    throttle: &ThrottleConfig,
1202    trace_identifier: &str,
1203) -> Result<(), String> {
1204    let Some(path) = &throttle.histogram_log_path else {
1205        return Ok(());
1206    };
1207    if path.is_dir() {
1208        return Err(format!(
1209            "--auto-meta-histogram-log {path:?} is a directory; expected a file path",
1210        ));
1211    }
1212    if path.file_name().is_none() {
1213        return Err(format!(
1214            "--auto-meta-histogram-log {path:?} has no filename component",
1215        ));
1216    }
1217    let resolved = resolve_log_path(path, trace_identifier);
1218    let mut open_options = std::fs::OpenOptions::new();
1219    open_options.create(true).write(true).truncate(true);
1220    #[cfg(unix)]
1221    {
1222        use std::os::unix::fs::OpenOptionsExt;
1223        open_options.custom_flags(libc::O_NOFOLLOW);
1224    }
1225    match open_options.open(&resolved) {
1226        Ok(_) => Ok(()),
1227        Err(err) => {
1228            // ELOOP from O_NOFOLLOW reads as "too many levels of symbolic links"
1229            // — make it explicit that this rejection is intentional security.
1230            #[cfg(unix)]
1231            let context = if err.raw_os_error() == Some(libc::ELOOP) {
1232                " (resolved path is a symlink, which would let a local attacker hijack the write)"
1233            } else {
1234                ""
1235            };
1236            #[cfg(not(unix))]
1237            let context = "";
1238            Err(format!(
1239                "--auto-meta-histogram-log cannot create resolved path {resolved:?}: {err:#}{context}",
1240            ))
1241        }
1242    }
1243}
1244
1245/// Stable label for a `(Side, MetadataOp)` controller.
1246///
1247/// Naming rule:
1248/// - **Lookups** (`Stat`, `ReadLink`) happen on either filesystem, so
1249///   the label always carries an explicit `src-` / `dst-` prefix to
1250///   disambiguate. Example: `src-stat`, `dst-read-link`.
1251/// - **Mutations and `open(O_CREAT)`** only ever occur on the
1252///   destination side (sources are immutable in copy/cmp/link/rm).
1253///   These labels drop the side prefix entirely. Example: `mkdir`,
1254///   `unlink`, `rmdir`, `hard-link`, `symlink`, `chmod`, `open-create`.
1255///
1256/// The result: single-filesystem tools like `rrm` show clean labels
1257/// (`src-stat`, `unlink`, `rmdir`) instead of the prior misleading
1258/// `meta-src` / `meta-dst` framing — there is no second filesystem to
1259/// distinguish from. Dual-filesystem tools (rcp, rcmp, rlink) still
1260/// disambiguate the two stat / read-link controllers cleanly.
1261///
1262/// Implemented as a `const fn` over a fixed match table so the label
1263/// set is a compile-time constant — no allocation, no `Box::leak`, and
1264/// no per-`run()` accumulation when callers invoke the runtime more
1265/// than once in a single process.
1266const fn unit_label(side: congestion::Side, op: congestion::MetadataOp) -> &'static str {
1267    use congestion::MetadataOp::*;
1268    use congestion::Side::*;
1269    match (side, op) {
1270        // Lookups: prefix with side because both sides exercise them.
1271        (Source, Stat) => "src-stat",
1272        (Destination, Stat) => "dst-stat",
1273        (Source, ReadLink) => "src-read-link",
1274        (Destination, ReadLink) => "dst-read-link",
1275        // Destination-only ops: no prefix in the active case. The
1276        // (Source, op) slot is wired but never sees a sample under
1277        // normal operation; the renderer hides it. The `src-` label is
1278        // kept so any debugging surface still disambiguates the slot
1279        // from the active destination one if it ever fires.
1280        (Destination, MkDir) => "mkdir",
1281        (Source, MkDir) => "src-mkdir",
1282        (Destination, RmDir) => "rmdir",
1283        (Source, RmDir) => "src-rmdir",
1284        (Destination, Unlink) => "unlink",
1285        (Source, Unlink) => "src-unlink",
1286        (Destination, HardLink) => "hard-link",
1287        (Source, HardLink) => "src-hard-link",
1288        (Destination, Symlink) => "symlink",
1289        (Source, Symlink) => "src-symlink",
1290        (Destination, Chmod) => "chmod",
1291        (Source, Chmod) => "src-chmod",
1292        (Destination, OpenCreate) => "open-create",
1293        (Source, OpenCreate) => "src-open-create",
1294    }
1295}
1296
1297fn build_histogram_header(
1298    auto: &AutoMetaThrottleConfig,
1299    tool_name: &str,
1300    snapshot_interval: std::time::Duration,
1301) -> congestion::format::LogHeader {
1302    use congestion::format::{AutoMetaSnapshot, HdrSnapshot, LogHeader, UnitLabel};
1303    let hostname = nix::unistd::gethostname()
1304        .ok()
1305        .and_then(|h| h.into_string().ok())
1306        .unwrap_or_else(|| "unknown".into());
1307    let mut unit_labels = Vec::with_capacity(congestion::N_META_RESOURCES);
1308    for &side in &congestion::Side::ALL {
1309        for &op in &congestion::MetadataOp::ALL {
1310            unit_labels.push(UnitLabel {
1311                side: side as u8,
1312                op: op as u8,
1313                label: unit_label(side, op).to_string(),
1314            });
1315        }
1316    }
1317    LogHeader {
1318        format_version: congestion::format::FORMAT_VERSION,
1319        tool: tool_name.to_string(),
1320        tool_version: env!("CARGO_PKG_VERSION").to_string(),
1321        hostname,
1322        pid: std::process::id(),
1323        start_unix_micros: std::time::SystemTime::now()
1324            .duration_since(std::time::UNIX_EPOCH)
1325            .map(|d| u64::try_from(d.as_micros()).unwrap_or(u64::MAX))
1326            .unwrap_or(0),
1327        snapshot_interval_micros: u64::try_from(snapshot_interval.as_micros()).unwrap_or(u64::MAX),
1328        auto_meta: AutoMetaSnapshot {
1329            initial_cwnd: auto.initial_cwnd,
1330            min_cwnd: auto.min_cwnd,
1331            max_cwnd: auto.max_cwnd,
1332            alpha: auto.alpha,
1333            beta: auto.beta,
1334            increase_step: auto.increase_step,
1335            decrease_step: auto.decrease_step,
1336            baseline_percentile: auto.baseline_percentile,
1337            current_percentile: auto.current_percentile,
1338            long_window_micros: u64::try_from(auto.long_window.as_micros()).unwrap_or(u64::MAX),
1339            short_window_micros: u64::try_from(auto.short_window.as_micros()).unwrap_or(u64::MAX),
1340            tick_interval_micros: u64::try_from(auto.tick_interval.as_micros()).unwrap_or(u64::MAX),
1341        },
1342        hdr: HdrSnapshot {
1343            lowest_discernible_micros: congestion::HDR_LOWEST_DISCERNIBLE_MICROS,
1344            highest_trackable_micros: congestion::HDR_HIGHEST_TRACKABLE_MICROS,
1345            significant_figures: congestion::HDR_SIGNIFICANT_FIGURES,
1346            unit: "microseconds".into(),
1347        },
1348        unit_labels,
1349    }
1350}
1351
1352fn render_panel_from_registry() -> String {
1353    let entries = observability::registered_histograms();
1354    if entries.is_empty() {
1355        return String::new();
1356    }
1357    let snapshots: Vec<hdrhistogram::Histogram<u64>> = entries
1358        .iter()
1359        .map(|e| (*e.snapshot_rx.borrow()).clone())
1360        .collect();
1361    let units: Vec<histogram_panel::PanelUnit> = entries
1362        .iter()
1363        .zip(snapshots.iter())
1364        .map(|(e, snap)| histogram_panel::PanelUnit {
1365            label: e.label,
1366            histogram: snap,
1367            interval: e.interval,
1368        })
1369        .collect();
1370    histogram_panel::render_histogram_panel(&units)
1371}
1372
1373/// Wire up the adaptive metadata-ops control loops — one per
1374/// `(Side, MetadataOp)` pair (18 in total):
1375///
1376/// 1. Seed every resource's `OPS_IN_FLIGHT_LIMIT_*` semaphore with the
1377///    controller's initial cwnd so the first probe on any resource
1378///    finds a permit available.
1379/// 2. Install one `RoutingSink` that fans metadata samples out to per-
1380///    `(side, op)` channels, each consumed by its own
1381///    `ControlUnit<RatioController>`. Each syscall on each side gets
1382///    an independent latency baseline and an independent cwnd, so a
1383///    saturated `unlink` path doesn't drag down `stat` (or vice versa).
1384/// 3. Spawn one combined adapter/monitor task per resource. By
1385///    convention `(Destination, Stat)` is the rate-driver — the global
1386///    `OPS_THROTTLE` is shared, so only one adapter may translate rate
1387///    decisions; all others apply concurrency only. The current
1388///    `RatioController` doesn't emit rate decisions, so the choice is
1389///    forward-looking.
1390/// 4. Each adapter exits cleanly when its control unit stops
1391///    publishing decisions, so they don't leak as unbounded background
1392///    loops.
1393///
1394/// Only one auto-meta config is supported per process. If a sample sink
1395/// was already installed, it is silently replaced.
1396fn spawn_auto_meta_throttle(
1397    runtime: &tokio::runtime::Runtime,
1398    auto: AutoMetaThrottleConfig,
1399    histogram_enabled: bool,
1400    histogram_log_path: Option<std::path::PathBuf>,
1401    histogram_interval: std::time::Duration,
1402    trace_identifier: &str,
1403) {
1404    let initial_cwnd = auto
1405        .initial_cwnd
1406        .clamp(auto.min_cwnd.max(1), auto.max_cwnd.max(1));
1407    let histogram_active = histogram_enabled || histogram_log_path.is_some();
1408    // Per-tool log path: each rcpd / master writes to its own file by
1409    // suffixing trace_identifier on the user-supplied path, mirroring
1410    // chrome_trace_prefix's convention.
1411    let resolved_log_path = histogram_log_path
1412        .as_ref()
1413        .map(|p| resolve_log_path(p, trace_identifier));
1414
1415    // Build receivers + accumulators in parallel arrays so we can pass
1416    // each accumulator both to a ControlUnit and to a LoggerUnit.
1417    let mut builder = congestion::RoutingSinkBuilder::new();
1418    struct Slot {
1419        label: &'static str,
1420        side: congestion::Side,
1421        op: congestion::MetadataOp,
1422        sample_rx: tokio::sync::mpsc::Receiver<congestion::Sample>,
1423        apply_rate: bool,
1424        accumulator: Option<std::sync::Arc<std::sync::Mutex<congestion::HistogramAccumulator>>>,
1425    }
1426    let mut slots: Vec<Slot> = Vec::with_capacity(congestion::N_META_RESOURCES);
1427    for &side in &congestion::Side::ALL {
1428        for &op in &congestion::MetadataOp::ALL {
1429            let resource = walk::meta_resource(side, op);
1430            throttle::set_max_ops_in_flight(resource, initial_cwnd as usize);
1431            let rx = builder.metadata_receiver(side, op);
1432            let apply_rate = matches!(
1433                (side, op),
1434                (congestion::Side::Destination, congestion::MetadataOp::Stat),
1435            );
1436            let accumulator = if histogram_active {
1437                let acc = std::sync::Arc::new(std::sync::Mutex::new(
1438                    congestion::HistogramAccumulator::new(),
1439                ));
1440                builder.metadata_histogram(side, op, acc.clone());
1441                Some(acc)
1442            } else {
1443                None
1444            };
1445            slots.push(Slot {
1446                label: unit_label(side, op),
1447                side,
1448                op,
1449                sample_rx: rx,
1450                apply_rate,
1451                accumulator,
1452            });
1453        }
1454    }
1455    let sink = std::sync::Arc::new(builder.build());
1456    congestion::install_sample_sink(sink.clone());
1457
1458    // Per-unit watch senders for the live histogram panel; collected into
1459    // a parallel vec so we can also build the logger's `LoggerUnit` list.
1460    let mut logger_units: Vec<histogram_logger::LoggerUnit> = Vec::new();
1461    for slot in slots {
1462        let controller = congestion::RatioController::new(congestion::RatioConfig {
1463            initial_cwnd: auto.initial_cwnd,
1464            min_cwnd: auto.min_cwnd,
1465            max_cwnd: auto.max_cwnd,
1466            alpha: auto.alpha,
1467            beta: auto.beta,
1468            increase_step: auto.increase_step,
1469            decrease_step: auto.decrease_step,
1470            baseline_percentile: auto.baseline_percentile,
1471            current_percentile: auto.current_percentile,
1472            long_window: auto.long_window,
1473            short_window: auto.short_window,
1474        });
1475        let (unit, decision_rx, snapshot_rx) = congestion::ControlUnit::new(
1476            slot.label,
1477            controller,
1478            slot.sample_rx,
1479            auto.tick_interval,
1480        );
1481        observability::register_unit(slot.label, snapshot_rx);
1482        if let Some(acc) = slot.accumulator.as_ref() {
1483            let (snap_tx, snap_rx) = tokio::sync::watch::channel(
1484                hdrhistogram::Histogram::<u64>::new_with_bounds(
1485                    congestion::HDR_LOWEST_DISCERNIBLE_MICROS,
1486                    congestion::HDR_HIGHEST_TRACKABLE_MICROS,
1487                    congestion::HDR_SIGNIFICANT_FIGURES,
1488                )
1489                .expect("histogram bounds valid"),
1490            );
1491            observability::register_histogram(slot.label, snap_rx, histogram_interval);
1492            logger_units.push(histogram_logger::LoggerUnit {
1493                label: slot.label,
1494                side: slot.side,
1495                op: slot.op,
1496                accumulator: acc.clone(),
1497                snapshot_tx: snap_tx,
1498            });
1499        }
1500        runtime.spawn(unit.run());
1501        runtime.spawn(auto_meta::run_adapter(
1502            walk::meta_resource(slot.side, slot.op),
1503            slot.apply_rate,
1504            decision_rx,
1505            sink.clone(),
1506        ));
1507    }
1508
1509    if histogram_active {
1510        let header = build_histogram_header(&auto, trace_identifier, histogram_interval);
1511        let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
1512        store_logger_cancel(cancel_tx);
1513        // Snapshot the global PROGRESS counters into JSON each tick so
1514        // the binary log carries throughput/files-copied alongside the
1515        // latency distributions — readers can time-align them via the
1516        // shared unix_micros field. Encoding can't realistically fail
1517        // for this struct shape, but on the off chance it does we log
1518        // and return an empty Vec — the logger treats empty as "skip
1519        // this tick" rather than writing a record readers can't parse.
1520        let progress_source: histogram_logger::ProgressSource = Box::new(|| {
1521            let snapshot = progress::SerializableProgress::from(&*PROGRESS);
1522            serde_json::to_vec(&snapshot).unwrap_or_else(|err| {
1523                tracing::warn!(
1524                    "histogram-logger: SerializableProgress JSON encode failed: {err:#}; \
1525                     dropping this tick's progress record"
1526                );
1527                Vec::new()
1528            })
1529        });
1530        let handle = runtime.spawn(histogram_logger::run_logger(
1531            histogram_logger::LoggerConfig {
1532                interval: histogram_interval,
1533                log_path: resolved_log_path,
1534                header,
1535                progress_source: Some(progress_source),
1536            },
1537            logger_units,
1538            cancel_rx,
1539        ));
1540        store_logger_handle(handle);
1541    }
1542
1543    tracing::info!(
1544        "auto-meta-throttle enabled (per-(side, op) controllers, {} total): \
1545         initial_cwnd={}, max_cwnd={}, alpha={}, beta={}, \
1546         baseline_percentile={}, current_percentile={}, \
1547         long_window={:?}, short_window={:?}, tick={:?}, \
1548         histograms={}",
1549        congestion::N_META_RESOURCES,
1550        auto.initial_cwnd,
1551        auto.max_cwnd,
1552        auto.alpha,
1553        auto.beta,
1554        auto.baseline_percentile,
1555        auto.current_percentile,
1556        auto.long_window,
1557        auto.short_window,
1558        auto.tick_interval,
1559        histogram_active,
1560    );
1561}
1562
1563#[instrument(skip(func))] // "func" is not Debug printable
1564pub fn run<Fut, Summary, Error>(
1565    progress: Option<ProgressSettings>,
1566    output: OutputConfig,
1567    runtime_config: RuntimeConfig,
1568    throttle_config: ThrottleConfig,
1569    tracing_config: TracingConfig,
1570    func: impl FnOnce() -> Fut,
1571) -> Option<Summary>
1572// we return an Option rather than a Result to indicate that callers of this function should NOT print the error
1573where
1574    Summary: std::fmt::Display,
1575    Error: std::fmt::Display + std::fmt::Debug,
1576    Fut: std::future::Future<Output = Result<Summary, Error>>,
1577{
1578    // force initialization of PROGRESS to set start_time at the beginning of the run
1579    // (for remote master operations, PROGRESS is otherwise only accessed at the end in
1580    // print_runtime_stats(), leading to near-zero walltime)
1581    let _ = get_progress();
1582    if let Err(e) = throttle_config.validate() {
1583        eprintln!("Configuration error: {e}");
1584        return None;
1585    }
1586    let OutputConfig {
1587        quiet,
1588        verbose,
1589        print_summary,
1590        suppress_runtime_stats,
1591    } = output;
1592    // tracing guards must outlive the runtime so chrome/flame traces flush
1593    // extract trace_identifier before install_tracing_subscriber consumes tracing_config
1594    let trace_identifier = tracing_config.trace_identifier.clone();
1595    if let Err(e) = validate_histogram_log_target(&throttle_config, &trace_identifier) {
1596        eprintln!("Configuration error: {e}");
1597        return None;
1598    }
1599    let _tracing_guards = install_tracing_subscriber(quiet, verbose, tracing_config);
1600    let res = {
1601        let runtime = build_tokio_runtime(&runtime_config, &throttle_config);
1602        spawn_throttle_replenishers(&runtime, &throttle_config, &trace_identifier);
1603        let res = {
1604            let _progress_tracker = progress.map(|settings| {
1605                tracing::debug!("Requesting progress updates {settings:?}");
1606                let delay = settings.progress_delay.map(|delay_str| {
1607                    humantime::parse_duration(&delay_str)
1608                        .expect("Couldn't parse duration out of --progress-delay")
1609                });
1610                ProgressTracker::new(settings.progress_type, delay)
1611            });
1612            runtime.block_on(func())
1613        };
1614        match &res {
1615            Ok(summary) => {
1616                if print_summary || verbose > 0 {
1617                    println!("{summary}");
1618                }
1619            }
1620            Err(err) => {
1621                if !quiet {
1622                    println!("{err:?}");
1623                }
1624            }
1625        }
1626        if (print_summary || verbose > 0)
1627            && !suppress_runtime_stats
1628            && let Err(err) = print_runtime_stats()
1629        {
1630            println!("Failed to print runtime stats: {err:?}");
1631        }
1632        // Signal the histogram logger to exit cleanly so its final
1633        // snapshot is written before the runtime drops and aborts it.
1634        // No-op when histograms are disabled.
1635        signal_logger_cancel();
1636        if let Some(handle) = take_logger_handle() {
1637            // Bound the wait so a stuck logger can't hang shutdown — 1s is
1638            // generous: the logger only does a snapshot+flush on cancel.
1639            let _ = runtime.block_on(async {
1640                tokio::time::timeout(std::time::Duration::from_secs(1), handle).await
1641            });
1642        }
1643        res
1644        // runtime drops here, cancelling all spawned tasks (control
1645        // loop, adapter, replenishers) and releasing their permits.
1646    };
1647    // Clear process-wide state so a second `run()` in the same process
1648    // starts on a clean slate. Without this, a later run inherits the
1649    // previous auto-meta sample sink and ops-in-flight cap, and its
1650    // probes acquire against stale limits even when auto_meta is off.
1651    reset_process_throttle_state();
1652    res.ok()
1653}
1654
1655/// Reset process-wide throttle + congestion state to its pre-`run()`
1656/// defaults. Called by [`run`] on exit so callers that invoke it more
1657/// than once in a single process (library users, integration tests
1658/// outside this crate) aren't affected by the previous invocation's
1659/// decisions.
1660fn reset_process_throttle_state() {
1661    congestion::clear_sample_sink();
1662    observability::clear();
1663    for &side in &throttle::Side::ALL {
1664        for &op in &throttle::MetadataOp::ALL {
1665            throttle::set_max_ops_in_flight(throttle::Resource::meta(side, op), 0);
1666        }
1667    }
1668    throttle::disable_ops_throttle();
1669    // Without these resets, a second run() in the same process inherits
1670    // the previous run's open-files cap and iops-throttle even when the
1671    // caller passes 0 ("no limit"): `set_max_open_files` / `init_iops_tokens`
1672    // are skipped on 0, leaving the prior `setup(N)` in force. setup(0)
1673    // disables the semaphore, so the next run sees a clean slate and can
1674    // either re-init with a fresh value or stay disabled.
1675    throttle::set_max_open_files(0);
1676    throttle::init_iops_tokens(0);
1677}
1678
#[cfg(test)]
mod unit_label_tests {
    use super::unit_label;
    use congestion::{MetadataOp, Side};

    /// Stat and ReadLink may run on either side, so their labels must say which.
    #[test]
    fn lookup_ops_carry_side_prefix() {
        let cases = [
            (Side::Source, MetadataOp::Stat, "src-stat"),
            (Side::Destination, MetadataOp::Stat, "dst-stat"),
            (Side::Source, MetadataOp::ReadLink, "src-read-link"),
            (Side::Destination, MetadataOp::ReadLink, "dst-read-link"),
        ];
        for (side, op, expected) in cases {
            assert_eq!(unit_label(side, op), expected);
        }
    }

    /// Mutations and open-create only fire on the destination, so the active
    /// label drops the side prefix — single-FS tools like rrm show "unlink"
    /// and "rmdir" rather than "dst-unlink".
    #[test]
    fn destination_only_ops_drop_prefix() {
        let cases = [
            (MetadataOp::MkDir, "mkdir"),
            (MetadataOp::RmDir, "rmdir"),
            (MetadataOp::Unlink, "unlink"),
            (MetadataOp::HardLink, "hard-link"),
            (MetadataOp::Symlink, "symlink"),
            (MetadataOp::Chmod, "chmod"),
            (MetadataOp::OpenCreate, "open-create"),
        ];
        for (op, expected) in cases {
            assert_eq!(unit_label(Side::Destination, op), expected);
        }
    }

    /// Every (side, op) pair gets a controller, including the unused
    /// (Source, mutation) slots. They stay idle and the renderer hides them,
    /// but should one ever fire a probe (regression / wiring mistake) its
    /// label must differ from the active destination-side variant.
    #[test]
    fn unused_source_side_mutation_slots_keep_src_prefix() {
        let cases = [
            (MetadataOp::Unlink, "src-unlink"),
            (MetadataOp::MkDir, "src-mkdir"),
        ];
        for (op, expected) in cases {
            assert_eq!(unit_label(Side::Source, op), expected);
        }
    }

    /// Sanity check: the 18 distinct (Side, MetadataOp) pairs must map to 18
    /// distinct labels, otherwise observability::register_unit would end up
    /// with ambiguous panel rows.
    #[test]
    fn labels_are_unique_across_all_resources() {
        let mut seen = std::collections::HashSet::new();
        for side in Side::ALL {
            for op in MetadataOp::ALL {
                let label = unit_label(side, op);
                let newly_inserted = seen.insert(label);
                assert!(newly_inserted, "duplicate label: {label}");
            }
        }
        assert_eq!(seen.len(), congestion::N_META_RESOURCES);
    }
}
1748
#[cfg(test)]
mod runtime_stats_tests {
    use super::*;
    use anyhow::Result;

    /// `collect_runtime_stats()` must agree, within a small tolerance, with a
    /// snapshot taken directly from this process's procfs entry.
    #[test]
    fn collect_runtime_stats_matches_procfs_snapshot() -> Result<()> {
        let cpu_tolerance_ms = 50;
        let rss_tolerance_bytes = 1_000_000;

        let process = procfs::process::Process::myself()?;
        let expected = collect_runtime_stats_for_process(&process)?;
        let actual = collect_runtime_stats();

        let user_cpu_drift = expected.cpu_time_user_ms.abs_diff(actual.cpu_time_user_ms);
        let kernel_cpu_drift = expected
            .cpu_time_kernel_ms
            .abs_diff(actual.cpu_time_kernel_ms);
        let peak_rss_drift = expected.peak_rss_bytes.abs_diff(actual.peak_rss_bytes);

        assert!(
            user_cpu_drift <= cpu_tolerance_ms,
            "user CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
            expected.cpu_time_user_ms,
            actual.cpu_time_user_ms
        );
        assert!(
            kernel_cpu_drift <= cpu_tolerance_ms,
            "kernel CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
            expected.cpu_time_kernel_ms,
            actual.cpu_time_kernel_ms
        );
        assert!(
            peak_rss_drift <= rss_tolerance_bytes,
            "peak RSS deviated by more than {rss_tolerance_bytes} bytes: expected {}, got {}",
            expected.peak_rss_bytes,
            actual.peak_rss_bytes
        );
        Ok(())
    }

    /// Both a missing process handle and a handle for a pid that does not
    /// exist must yield the default (zeroed) stats.
    #[test]
    fn collect_runtime_stats_returns_default_on_error() {
        assert_eq!(collect_runtime_stats_inner(None), RuntimeStats::default());

        let nonexistent_process = procfs::process::Process::new(i32::MAX).ok();
        assert_eq!(
            collect_runtime_stats_inner(nonexistent_process),
            RuntimeStats::default()
        );
    }
}
1795
#[cfg(test)]
mod parse_preserve_settings_tests {
    use super::*;

    /// Asserts that the `uid`, `gid` and `time` flags of `$entry.user_and_time`
    /// all equal `$want`.
    macro_rules! assert_user_and_time_all {
        ($entry:expr, $want:expr) => {{
            let flags = &$entry.user_and_time;
            let want: bool = $want;
            assert_eq!((flags.uid, flags.gid, flags.time), (want, want, want));
        }};
    }

    /// The "all" preset matches `preserve_all()` and turns every flag on.
    #[test]
    fn preset_all_returns_preserve_all() {
        let expected = preserve::preserve_all();
        let settings = parse_preserve_settings("all").unwrap();
        assert_eq!(settings.file.mode_mask, expected.file.mode_mask);
        assert_user_and_time_all!(settings.file, true);
        assert_eq!(settings.dir.mode_mask, expected.dir.mode_mask);
        assert_user_and_time_all!(settings.dir, true);
        assert_user_and_time_all!(settings.symlink, true);
    }

    /// The "none" preset matches `preserve_none()` and turns every flag off.
    #[test]
    fn preset_none_returns_preserve_none() {
        let expected = preserve::preserve_none();
        let settings = parse_preserve_settings("none").unwrap();
        assert_eq!(settings.file.mode_mask, expected.file.mode_mask);
        assert_user_and_time_all!(settings.file, false);
        assert_eq!(settings.dir.mode_mask, expected.dir.mode_mask);
        assert_user_and_time_all!(settings.dir, false);
        assert_user_and_time_all!(settings.symlink, false);
    }

    /// Explicit per-type specs only enable the attributes they list.
    #[test]
    fn per_type_settings_still_work() {
        let settings = parse_preserve_settings("f:uid,time,0777 d:gid").unwrap();

        let file_flags = &settings.file.user_and_time;
        assert_eq!(
            (file_flags.uid, file_flags.gid, file_flags.time),
            (true, false, true)
        );
        assert_eq!(settings.file.mode_mask, 0o777);

        let dir_flags = &settings.dir.user_and_time;
        assert_eq!(
            (dir_flags.uid, dir_flags.gid, dir_flags.time),
            (false, true, false)
        );
    }

    /// Unknown presets and unknown attribute names are rejected.
    #[test]
    fn invalid_settings_returns_error() {
        for spec in ["invalid", "f:unknown_attr"] {
            assert!(parse_preserve_settings(spec).is_err());
        }
    }
}
1848
#[cfg(test)]
mod validate_update_compare_vs_preserve_tests {
    use super::*;

    /// Comparing mtime while preserving nothing is reported.
    #[test]
    fn detects_mtime_mismatch() {
        let preserve_settings = preserve::preserve_none();
        let compare = filecmp::MetadataCmpSettings {
            mtime: true,
            ..Default::default()
        };
        let reason = validate_update_compare_vs_preserve(&compare, &preserve_settings).unwrap_err();
        assert!(reason.contains("mtime"));
    }

    /// Comparing uid while preserving nothing is reported.
    #[test]
    fn detects_uid_mismatch() {
        let preserve_settings = preserve::preserve_none();
        let compare = filecmp::MetadataCmpSettings {
            uid: true,
            ..Default::default()
        };
        let reason = validate_update_compare_vs_preserve(&compare, &preserve_settings).unwrap_err();
        assert!(reason.contains("uid"));
    }

    /// Comparing gid while preserving nothing is reported.
    #[test]
    fn detects_gid_mismatch() {
        let preserve_settings = preserve::preserve_none();
        let compare = filecmp::MetadataCmpSettings {
            gid: true,
            ..Default::default()
        };
        let reason = validate_update_compare_vs_preserve(&compare, &preserve_settings).unwrap_err();
        assert!(reason.contains("gid"));
    }

    /// Comparing mode with an empty file mode mask is reported.
    #[test]
    fn detects_mode_mismatch() {
        let mut preserve_settings = preserve::preserve_none();
        preserve_settings.file.mode_mask = 0;
        let compare = filecmp::MetadataCmpSettings {
            mode: true,
            ..Default::default()
        };
        let reason = validate_update_compare_vs_preserve(&compare, &preserve_settings).unwrap_err();
        assert!(reason.contains("mode"));
    }

    /// Every offending attribute shows up in a single error message.
    #[test]
    fn detects_multiple_mismatches() {
        let preserve_settings = preserve::preserve_none();
        let compare = filecmp::MetadataCmpSettings {
            mtime: true,
            uid: true,
            gid: true,
            ..Default::default()
        };
        let reason = validate_update_compare_vs_preserve(&compare, &preserve_settings).unwrap_err();
        for attr in ["mtime", "uid", "gid"] {
            assert!(reason.contains(attr));
        }
    }

    /// Preserving everything is compatible with comparing everything.
    #[test]
    fn passes_when_preserve_covers_all_compared_attrs() {
        let preserve_settings = preserve::preserve_all();
        let compare = filecmp::MetadataCmpSettings {
            mtime: true,
            uid: true,
            gid: true,
            mode: true,
            size: true,  // always preserved, should not cause error
            ctime: true, // kernel-managed, should not cause error
        };
        let outcome = validate_update_compare_vs_preserve(&compare, &preserve_settings);
        assert!(outcome.is_ok());
    }

    /// The default mode_mask (0o0777) drops the setuid/setgid/sticky bits,
    /// whereas metadata_equal compares the full mode (0o7777) — a lossy
    /// combination that must be rejected.
    #[test]
    fn fails_with_partial_mode_mask_when_mode_compared() {
        let preserve_settings = preserve::preserve_none();
        let compare = filecmp::MetadataCmpSettings {
            mode: true,
            ..Default::default()
        };
        let reason = validate_update_compare_vs_preserve(&compare, &preserve_settings).unwrap_err();
        assert!(reason.contains("mode"));
    }
}
1941
#[cfg(test)]
mod resolve_log_path_tests {
    use super::*;
    use std::path::{Path, PathBuf};

    /// Resolves `input` with the "rcp" identifier and compares the result
    /// against `expected`.
    fn assert_resolves_to(input: &str, expected: &str) {
        assert_eq!(
            resolve_log_path(Path::new(input), "rcp"),
            PathBuf::from(expected),
        );
    }

    #[test]
    fn full_path_with_extension() {
        assert_resolves_to("/tmp/foo.hdr", "/tmp/foo.rcp.hdr");
    }

    #[test]
    fn bare_filename_resolves_to_current_dir() {
        assert_resolves_to("foo.hdr", "./foo.rcp.hdr");
    }

    #[test]
    fn no_extension_defaults_to_hdr() {
        assert_resolves_to("/tmp/foo", "/tmp/foo.rcp.hdr");
    }

    #[test]
    #[cfg(unix)]
    fn preserves_non_utf8_stem() {
        use std::os::unix::ffi::{OsStrExt, OsStringExt};
        // A path whose stem is not valid UTF-8: /tmp/<0xFF><0xFE>.hdr
        let raw_name: Vec<u8> = [&b"/tmp/"[..], &[0xFF, 0xFE][..], &b".hdr"[..]].concat();
        let p = PathBuf::from(std::ffi::OsString::from_vec(raw_name));
        let resolved = resolve_log_path(&p, "rcp");
        // The invalid-UTF-8 stem has to come through untouched, with the
        // suffix and extension appended after it.
        let bytes = resolved.as_os_str().as_bytes();
        let stem_survived = bytes.windows(2).any(|pair| pair == [0xFF, 0xFE]);
        assert!(
            stem_survived,
            "non-UTF-8 bytes must survive resolution; got bytes: {bytes:?}",
        );
        assert!(
            bytes.ends_with(b".rcp.hdr"),
            "expected .rcp.hdr suffix; got bytes: {bytes:?}",
        );
    }
}
1996
#[cfg(test)]
mod validate_histogram_log_target_tests {
    use super::*;
    use std::path::PathBuf;

    /// Builds a default `ThrottleConfig` whose histogram logging is enabled
    /// exactly when a log path is supplied.
    fn throttle_with_log_path(path: Option<PathBuf>) -> ThrottleConfig {
        let histogram_enabled = path.is_some();
        ThrottleConfig {
            histogram_enabled,
            histogram_log_path: path,
            ..Default::default()
        }
    }

    #[test]
    fn no_log_path_is_ok() {
        let config = throttle_with_log_path(None);
        let outcome = validate_histogram_log_target(&config, "rcp");
        assert!(outcome.is_ok());
    }

    #[test]
    fn writable_resolved_path_is_ok() {
        let scratch = tempfile::tempdir().unwrap();
        let config = throttle_with_log_path(Some(scratch.path().join("foo.hdr")));
        let outcome = validate_histogram_log_target(&config, "rcp");
        assert!(outcome.is_ok());
    }

    #[test]
    fn resolved_path_existing_as_directory_is_rejected() {
        // Occupy the exact resolved path with a directory; OpenOptions::open
        // with create+truncate fails when the target is a directory.
        let scratch = tempfile::tempdir().unwrap();
        std::fs::create_dir(scratch.path().join("foo.rcp.hdr")).unwrap();
        let config = throttle_with_log_path(Some(scratch.path().join("foo.hdr")));
        let err = validate_histogram_log_target(&config, "rcp").unwrap_err();
        let names_flag_and_path = err.contains("histogram-log") && err.contains("foo.rcp.hdr");
        assert!(names_flag_and_path, "got: {err}");
    }

    #[test]
    fn resolved_path_in_missing_parent_is_rejected() {
        let missing = PathBuf::from("/nonexistent-dir-67890/foo.hdr");
        let config = throttle_with_log_path(Some(missing));
        let err = validate_histogram_log_target(&config, "rcp").unwrap_err();
        assert!(err.contains("histogram-log"), "got: {err}");
    }

    #[test]
    fn log_path_pointing_at_existing_directory_is_rejected() {
        let scratch = tempfile::tempdir().unwrap();
        let config = throttle_with_log_path(Some(scratch.path().to_path_buf()));
        let err = validate_histogram_log_target(&config, "rcp").unwrap_err();
        assert!(err.contains("directory"), "got: {err}");
    }

    #[test]
    fn log_path_with_no_filename_is_rejected() {
        // PathBuf::from("/") has parent() == None and file_name() == None.
        let config = throttle_with_log_path(Some(PathBuf::from("/")));
        let err = validate_histogram_log_target(&config, "rcp").unwrap_err();
        let mentions_cause = err.contains("filename") || err.contains("directory");
        assert!(mentions_cause, "got: {err}");
    }

    #[test]
    #[cfg(unix)]
    fn resolved_path_existing_as_symlink_is_rejected() {
        // Defense against symlink-based hijacking: a local attacker able to
        // pre-create the predictable suffixed path as a symlink must not be
        // able to redirect the truncating open onto a victim file.
        let scratch = tempfile::tempdir().unwrap();
        // The resolved path will be `<dir>/foo.rcp.hdr`; plant a symlink
        // there that points at a sibling file standing in for the victim.
        let victim = scratch.path().join("victim.txt");
        std::fs::write(&victim, b"do not clobber").unwrap();
        let planted_link = scratch.path().join("foo.rcp.hdr");
        std::os::unix::fs::symlink(&victim, &planted_link).unwrap();

        let config = throttle_with_log_path(Some(scratch.path().join("foo.hdr")));
        let err = validate_histogram_log_target(&config, "rcp").unwrap_err();
        let mentions_symlink =
            err.contains("symlink") || err.contains("ELOOP") || err.contains("Too many levels");
        assert!(mentions_symlink, "got: {err}");
        // The victim's content is intact: the truncating open never reached it.
        let preserved = std::fs::read(&victim).unwrap();
        assert_eq!(preserved, b"do not clobber");
    }
}