common/
lib.rs

1//! Common utilities and types for RCP file operation tools
2//!
3//! This crate provides shared functionality used across all RCP tools (`rcp`, `rrm`, `rlink`, `rcmp`).
4//! It includes core operations (copy, remove, link, compare), progress reporting, metadata preservation, and runtime configuration.
5//!
6//! # Core Modules
7//!
8//! - [`mod@copy`] - File copying operations with metadata preservation and error handling
9//! - [`mod@rm`] - File removal operations
10//! - [`mod@link`] - Hard-linking operations
11//! - [`mod@cmp`] - File comparison operations (metadata-based)
12//! - [`mod@preserve`] - Metadata preservation settings and operations
13//! - [`mod@progress`] - Progress tracking and reporting
14//! - [`mod@filecmp`] - File metadata comparison utilities
15//! - [`mod@remote_tracing`] - Remote tracing support for distributed operations
16//!
17//! # Key Types
18//!
19//! ## `RcpdType`
20//!
21//! Identifies the role of a remote copy daemon:
22//! - `Source` - reads files from source host
23//! - `Destination` - writes files to destination host
24//!
25//! ## `ProgressType`
26//!
27//! Controls progress reporting display:
28//! - `Auto` - automatically choose based on terminal type
29//! - `ProgressBar` - animated progress bar (for interactive terminals)
30//! - `TextUpdates` - periodic text updates (for logging/non-interactive)
31//!
32//! # Progress Reporting
33//!
34//! The crate provides a global progress tracking system accessible via [`get_progress()`].
35//! Progress can be displayed in different formats depending on the execution context.
36//!
37//! Progress output goes to stderr, while logs go to stdout, allowing users to redirect logs to a file while still viewing interactive progress.
38//!
39//! # Runtime Configuration
40//!
41//! The [`run`] function provides a unified entry point for all RCP tools with support for:
42//! - Progress tracking and reporting
43//! - Logging configuration (quiet/verbose modes)
44//! - Resource limits (max workers, open files, throttling)
45//! - Tokio runtime setup
46//! - Remote tracing integration
47//!
48//! # Examples
49//!
50//! ## Basic Copy Operation
51//!
52//! ```rust,no_run
53//! use std::path::Path;
54//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
55//! let src = Path::new("/source");
56//! let dst = Path::new("/destination");
57//!
58//! let settings = common::copy::Settings {
59//!     dereference: false,
60//!     fail_early: false,
61//!     overwrite: false,
62//!     overwrite_compare: Default::default(),
63//!     chunk_size: 0,
64//!     remote_copy_buffer_size: 0,
65//! };
66//! let preserve = common::preserve::preserve_default();
67//!
68//! let summary = common::copy(src, dst, &settings, &preserve).await?;
69//! println!("Copied {} files", summary.files_copied);
70//! # Ok(())
71//! # }
72//! ```
73//!
74//! ## Metadata Comparison
75//!
76//! ```rust,no_run
77//! use std::path::Path;
78//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
79//! let src = Path::new("/path1");
80//! let dst = Path::new("/path2");
81//!
82//! let log = common::cmp::LogWriter::new(None).await?;
83//! let settings = common::cmp::Settings {
84//!     fail_early: false,
85//!     exit_early: false,
86//!     compare: Default::default(),
87//! };
88//!
89//! let summary = common::cmp(src, dst, &log, &settings).await?;
90//! println!("Comparison complete: {}", summary);
91//! # Ok(())
92//! # }
93//! ```
94
95#[macro_use]
96extern crate lazy_static;
97
98use crate::cmp::ObjType;
99use anyhow::anyhow;
100use anyhow::Context;
101use std::io::IsTerminal;
102use tracing::instrument;
103use tracing_subscriber::fmt::format::FmtSpan;
104use tracing_subscriber::prelude::*;
105
106pub mod cmp;
107pub mod config;
108pub mod copy;
109pub mod filegen;
110pub mod link;
111pub mod preserve;
112pub mod remote_tracing;
113pub mod rm;
114pub mod version;
115
116pub mod filecmp;
117pub mod progress;
118mod testutils;
119
120pub use config::{OutputConfig, RuntimeConfig, ThrottleConfig, TracingConfig};
121pub use progress::{RcpdProgressPrinter, SerializableProgress};
122
/// Role of a remote copy daemon process.
///
/// Defined in `common` since remote depends on common.
/// The enum is usable as an `enum_map` key (see [`ProgressSnapshot`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
pub enum RcpdType {
    /// source side; displayed as `source`
    Source,
    /// destination side; displayed as `destination`
    Destination,
}
129
130impl std::fmt::Display for RcpdType {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        match self {
133            RcpdType::Source => write!(f, "source"),
134            RcpdType::Destination => write!(f, "destination"),
135        }
136    }
137}
138
/// Type alias for progress snapshots: an `EnumMap` holding one `T` per [`RcpdType`] variant.
pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
141
/// runtime statistics collected from a process (CPU time, memory usage)
///
/// the `Default` value (all zeros) is what [`collect_runtime_stats`] falls back to when the
/// statistics cannot be read
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
pub struct RuntimeStats {
    /// user-mode CPU time in milliseconds
    pub cpu_time_user_ms: u64,
    /// kernel-mode CPU time in milliseconds
    pub cpu_time_kernel_ms: u64,
    /// peak resident set size in bytes
    pub peak_rss_bytes: u64,
}
152
/// runtime stats collected from remote rcpd processes for display at the end of a remote copy
#[derive(Debug, Default)]
pub struct RemoteRuntimeStats {
    /// host string of the source side; checked with [`is_localhost`] when the stats are printed
    pub source_host: String,
    /// stats reported for the source side
    pub source_stats: RuntimeStats,
    /// host string of the destination side; checked with [`is_localhost`] when the stats are printed
    pub dest_host: String,
    /// stats reported for the destination side
    pub dest_stats: RuntimeStats,
}
161
162/// checks if a host string refers to the local machine.
163/// returns true for `localhost`, `127.0.0.1`, `::1`, `[::1]`, or the actual hostname
164#[must_use]
165pub fn is_localhost(host: &str) -> bool {
166    if host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]" {
167        return true;
168    }
169    // check against actual hostname using gethostname
170    let mut buf = [0u8; 256];
171    // Safety: gethostname writes to buf and returns 0 on success
172    let result = unsafe { libc::gethostname(buf.as_mut_ptr() as *mut libc::c_char, buf.len()) };
173    if result == 0 {
174        if let Ok(hostname_cstr) = std::ffi::CStr::from_bytes_until_nul(&buf) {
175            if let Ok(hostname) = hostname_cstr.to_str() {
176                if host == hostname {
177                    return true;
178                }
179            }
180        }
181    }
182    false
183}
184
// process-wide singletons, created lazily on first access
lazy_static! {
    // global progress tracker handed out by `get_progress()`
    static ref PROGRESS: progress::Progress = progress::Progress::new();
    // spinner shared by the progress-reporting loops and by `ProgWriter`
    static ref PBAR: indicatif::ProgressBar = indicatif::ProgressBar::new_spinner();
    // filled by `set_remote_runtime_stats()` and taken by `print_runtime_stats()`
    static ref REMOTE_RUNTIME_STATS: std::sync::Mutex<Option<RemoteRuntimeStats>> =
        std::sync::Mutex::new(None);
}
191
/// returns the process-wide progress tracker.
///
/// the tracker lives in a lazily-initialized static, so the first access creates it
#[must_use]
pub fn get_progress() -> &'static progress::Progress {
    &PROGRESS
}
196
197/// stores remote runtime stats for display at the end of a remote copy operation
198pub fn set_remote_runtime_stats(stats: RemoteRuntimeStats) {
199    *REMOTE_RUNTIME_STATS.lock().unwrap() = Some(stats);
200}
201
202struct LocalTimeFormatter;
203
204impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFormatter {
205    fn format_time(
206        &self,
207        writer: &mut tracing_subscriber::fmt::format::Writer<'_>,
208    ) -> std::fmt::Result {
209        let now = chrono::Local::now();
210        writer.write_str(&now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
211    }
212}
213
/// owns the background thread that reports progress; dropping the tracker sets the done-flag,
/// wakes the thread and joins it
struct ProgressTracker {
    /// done-flag (`true` once reporting should stop) paired with the condvar used to wake the
    /// reporting thread
    lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
    /// handle of the reporting thread; taken in `Drop` so the thread can be joined
    pbar_thread: Option<std::thread::JoinHandle<()>>,
}
218
// how progress is displayed to the user; parsed from the command line through `clap::ValueEnum`
#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
pub enum ProgressType {
    // default: progress bar when stderr is a terminal, text updates otherwise
    #[default]
    #[value(name = "auto", alias = "Auto")]
    Auto,
    // animated progress bar (for interactive terminals)
    #[value(name = "ProgressBar", alias = "progress-bar")]
    ProgressBar,
    // periodic text updates (for logging/non-interactive use)
    #[value(name = "TextUpdates", alias = "text-updates")]
    TextUpdates,
}
229
/// selects which progress-reporting loop the background progress thread runs
pub enum GeneralProgressType {
    /// report the local, process-wide progress as a progress bar or as text updates
    User(ProgressType),
    /// send progress updates through the given channel instead of printing them
    Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
    /// print progress obtained from a snapshot callback that yields one entry per [`RcpdType`]
    RemoteMaster {
        /// display mode used for the printed output
        progress_type: ProgressType,
        /// callback invoked on every update to fetch the current source/destination progress
        get_progress_snapshot:
            Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>,
    },
}
239
240impl std::fmt::Debug for GeneralProgressType {
241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242        match self {
243            GeneralProgressType::User(pt) => write!(f, "User({pt:?})"),
244            GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
245            GeneralProgressType::RemoteMaster { progress_type, .. } => {
246                write!(
247                    f,
248                    "RemoteMaster(progress_type: {progress_type:?}, <function>)"
249                )
250            }
251        }
252    }
253}
254
/// progress-reporting options passed to [`run`]
#[derive(Debug)]
pub struct ProgressSettings {
    /// which reporting loop to run and how to display it
    pub progress_type: GeneralProgressType,
    /// optional delay between progress updates, kept as an unparsed string
    /// NOTE(review): the accepted format is decided where this string is parsed — not visible here
    pub progress_delay: Option<String>,
}
260
261fn progress_bar(
262    lock: &std::sync::Mutex<bool>,
263    cvar: &std::sync::Condvar,
264    delay_opt: &Option<std::time::Duration>,
265) {
266    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
267    PBAR.set_style(
268        indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
269            .unwrap()
270            .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
271    );
272    let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
273    let mut is_done = lock.lock().unwrap();
274    loop {
275        PBAR.set_position(PBAR.position() + 1); // do we need to update?
276        PBAR.set_message(prog_printer.print().unwrap());
277        let result = cvar.wait_timeout(is_done, delay).unwrap();
278        is_done = result.0;
279        if *is_done {
280            break;
281        }
282    }
283    PBAR.finish_and_clear();
284}
285
286fn get_datetime_prefix() -> String {
287    chrono::Local::now()
288        .format("%Y-%m-%dT%H:%M:%S%.3f%:z")
289        .to_string()
290}
291
292fn text_updates(
293    lock: &std::sync::Mutex<bool>,
294    cvar: &std::sync::Condvar,
295    delay_opt: &Option<std::time::Duration>,
296) {
297    let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
298    let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
299    let mut is_done = lock.lock().unwrap();
300    loop {
301        eprintln!("=======================");
302        eprintln!(
303            "{}\n--{}",
304            get_datetime_prefix(),
305            prog_printer.print().unwrap()
306        );
307        let result = cvar.wait_timeout(is_done, delay).unwrap();
308        is_done = result.0;
309        if *is_done {
310            break;
311        }
312    }
313}
314
315fn rcpd_updates(
316    lock: &std::sync::Mutex<bool>,
317    cvar: &std::sync::Condvar,
318    delay_opt: &Option<std::time::Duration>,
319    sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
320) {
321    tracing::debug!("Starting rcpd progress updates");
322    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
323    let mut is_done = lock.lock().unwrap();
324    loop {
325        if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
326            // channel closed, receiver is done
327            tracing::debug!("Progress update channel closed, stopping progress updates");
328            break;
329        }
330        let result = cvar.wait_timeout(is_done, delay).unwrap();
331        is_done = result.0;
332        if *is_done {
333            break;
334        }
335    }
336}
337
338fn remote_master_updates<F>(
339    lock: &std::sync::Mutex<bool>,
340    cvar: &std::sync::Condvar,
341    delay_opt: &Option<std::time::Duration>,
342    get_progress_snapshot: F,
343    progress_type: ProgressType,
344) where
345    F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
346{
347    let interactive = match progress_type {
348        ProgressType::Auto => std::io::stderr().is_terminal(),
349        ProgressType::ProgressBar => true,
350        ProgressType::TextUpdates => false,
351    };
352    if interactive {
353        PBAR.set_style(
354            indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
355                .unwrap()
356                .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
357        );
358        let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
359        let mut printer = RcpdProgressPrinter::new();
360        let mut is_done = lock.lock().unwrap();
361        loop {
362            let progress_map = get_progress_snapshot();
363            let source_progress = &progress_map[RcpdType::Source];
364            let destination_progress = &progress_map[RcpdType::Destination];
365            PBAR.set_position(PBAR.position() + 1); // do we need to update?
366            PBAR.set_message(
367                printer
368                    .print(source_progress, destination_progress)
369                    .unwrap(),
370            );
371            let result = cvar.wait_timeout(is_done, delay).unwrap();
372            is_done = result.0;
373            if *is_done {
374                break;
375            }
376        }
377        PBAR.finish_and_clear();
378    } else {
379        let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
380        let mut printer = RcpdProgressPrinter::new();
381        let mut is_done = lock.lock().unwrap();
382        loop {
383            let progress_map = get_progress_snapshot();
384            let source_progress = &progress_map[RcpdType::Source];
385            let destination_progress = &progress_map[RcpdType::Destination];
386            eprintln!("=======================");
387            eprintln!(
388                "{}\n--{}",
389                get_datetime_prefix(),
390                printer
391                    .print(source_progress, destination_progress)
392                    .unwrap()
393            );
394            let result = cvar.wait_timeout(is_done, delay).unwrap();
395            is_done = result.0;
396            if *is_done {
397                break;
398            }
399        }
400    }
401}
402
403impl ProgressTracker {
404    pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
405        let lock_cvar =
406            std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
407        let lock_cvar_clone = lock_cvar.clone();
408        let pbar_thread = std::thread::spawn(move || {
409            let (lock, cvar) = &*lock_cvar_clone;
410            match progress_type {
411                GeneralProgressType::Remote(sender) => {
412                    rcpd_updates(lock, cvar, &delay_opt, sender);
413                }
414                GeneralProgressType::RemoteMaster {
415                    progress_type,
416                    get_progress_snapshot,
417                } => {
418                    remote_master_updates(
419                        lock,
420                        cvar,
421                        &delay_opt,
422                        get_progress_snapshot,
423                        progress_type,
424                    );
425                }
426                _ => {
427                    let interactive = match progress_type {
428                        GeneralProgressType::User(ProgressType::Auto) => {
429                            std::io::stderr().is_terminal()
430                        }
431                        GeneralProgressType::User(ProgressType::ProgressBar) => true,
432                        GeneralProgressType::User(ProgressType::TextUpdates) => false,
433                        GeneralProgressType::Remote(_)
434                        | GeneralProgressType::RemoteMaster { .. } => {
435                            unreachable!("Invalid progress type: {progress_type:?}")
436                        }
437                    };
438                    if interactive {
439                        progress_bar(lock, cvar, &delay_opt);
440                    } else {
441                        text_updates(lock, cvar, &delay_opt);
442                    }
443                }
444            }
445        });
446        Self {
447            lock_cvar,
448            pbar_thread: Some(pbar_thread),
449        }
450    }
451}
452
453impl Drop for ProgressTracker {
454    fn drop(&mut self) {
455        let (lock, cvar) = &*self.lock_cvar;
456        let mut is_done = lock.lock().unwrap();
457        *is_done = true;
458        cvar.notify_one();
459        drop(is_done);
460        if let Some(pbar_thread) = self.pbar_thread.take() {
461            pbar_thread.join().unwrap();
462        }
463    }
464}
465
466pub fn parse_metadata_cmp_settings(
467    settings: &str,
468) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
469    let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
470    for setting in settings.split(',') {
471        match setting {
472            "uid" => metadata_cmp_settings.uid = true,
473            "gid" => metadata_cmp_settings.gid = true,
474            "mode" => metadata_cmp_settings.mode = true,
475            "size" => metadata_cmp_settings.size = true,
476            "mtime" => metadata_cmp_settings.mtime = true,
477            "ctime" => metadata_cmp_settings.ctime = true,
478            _ => {
479                return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
480            }
481        }
482    }
483    Ok(metadata_cmp_settings)
484}
485
486fn parse_type_settings(
487    settings: &str,
488) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
489    let mut user_and_time = preserve::UserAndTimeSettings::default();
490    let mut mode_mask = None;
491    for setting in settings.split(',') {
492        match setting {
493            "uid" => user_and_time.uid = true,
494            "gid" => user_and_time.gid = true,
495            "time" => user_and_time.time = true,
496            _ => {
497                if let Ok(mask) = u32::from_str_radix(setting, 8) {
498                    mode_mask = Some(mask);
499                } else {
500                    return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
501                }
502            }
503        }
504    }
505    Ok((user_and_time, mode_mask))
506}
507
508pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
509    let mut preserve_settings = preserve::Settings::default();
510    for type_settings in settings.split(' ') {
511        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
512            let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
513                format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
514            )?;
515            match obj_type {
516                "f" | "file" => {
517                    preserve_settings.file = preserve::FileSettings::default();
518                    preserve_settings.file.user_and_time = user_and_time_settings;
519                    if let Some(mode) = mode_opt {
520                        preserve_settings.file.mode_mask = mode;
521                    }
522                }
523                "d" | "dir" | "directory" => {
524                    preserve_settings.dir = preserve::DirSettings::default();
525                    preserve_settings.dir.user_and_time = user_and_time_settings;
526                    if let Some(mode) = mode_opt {
527                        preserve_settings.dir.mode_mask = mode;
528                    }
529                }
530                "l" | "link" | "symlink" => {
531                    preserve_settings.symlink = preserve::SymlinkSettings::default();
532                    preserve_settings.symlink.user_and_time = user_and_time_settings;
533                }
534                _ => {
535                    return Err(anyhow!("Unknown object type: {}", obj_type));
536                }
537            }
538        } else {
539            return Err(anyhow!("Invalid preserve settings: {}", settings));
540        }
541    }
542    Ok(preserve_settings)
543}
544
545pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
546    let mut cmp_settings = cmp::ObjSettings::default();
547    for type_settings in settings.split(' ') {
548        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
549            let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
550                "parsing preserve settings: {obj_settings}, type: {obj_type}"
551            ))?;
552            let obj_type = match obj_type {
553                "f" | "file" => ObjType::File,
554                "d" | "dir" | "directory" => ObjType::Dir,
555                "l" | "link" | "symlink" => ObjType::Symlink,
556                "o" | "other" => ObjType::Other,
557                _ => {
558                    return Err(anyhow!("Unknown obj type: {}", obj_type));
559                }
560            };
561            cmp_settings[obj_type] = obj_cmp_settings;
562        } else {
563            return Err(anyhow!("Invalid preserve settings: {}", settings));
564        }
565    }
566    Ok(cmp_settings)
567}
568
569pub async fn cmp(
570    src: &std::path::Path,
571    dst: &std::path::Path,
572    log: &cmp::LogWriter,
573    settings: &cmp::Settings,
574) -> Result<cmp::Summary, anyhow::Error> {
575    cmp::cmp(&PROGRESS, src, dst, log, settings).await
576}
577
578pub async fn copy(
579    src: &std::path::Path,
580    dst: &std::path::Path,
581    settings: &copy::Settings,
582    preserve: &preserve::Settings,
583) -> Result<copy::Summary, copy::Error> {
584    copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
585}
586
587pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
588    rm::rm(&PROGRESS, path, settings).await
589}
590
591pub async fn link(
592    src: &std::path::Path,
593    dst: &std::path::Path,
594    update: &Option<std::path::PathBuf>,
595    settings: &link::Settings,
596) -> Result<link::Summary, link::Error> {
597    let cwd = std::env::current_dir()
598        .with_context(|| "failed to get current working directory")
599        .map_err(|err| link::Error::new(err, link::Summary::default()))?;
600    link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
601}
602
/// reads the environment variable `name` and parses it as `T`.
///
/// `default` is returned when the variable is unset, is not valid unicode or fails to parse
fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.parse::<T>().ok())
        .unwrap_or(default)
}
612
613/// collects runtime statistics (CPU time, memory) for the current process
614#[must_use]
615pub fn collect_runtime_stats() -> RuntimeStats {
616    collect_runtime_stats_inner(procfs::process::Process::myself().ok())
617}
618
619fn collect_runtime_stats_inner(process: Option<procfs::process::Process>) -> RuntimeStats {
620    let Some(process) = process else {
621        return RuntimeStats::default();
622    };
623    collect_runtime_stats_for_process(&process).unwrap_or_default()
624}
625
626fn collect_runtime_stats_for_process(
627    process: &procfs::process::Process,
628) -> anyhow::Result<RuntimeStats> {
629    let stat = process.stat()?;
630    let clock_ticks = procfs::ticks_per_second() as f64;
631    // vmhwm from /proc/[pid]/status is in kB, convert to bytes
632    let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
633    Ok(RuntimeStats {
634        cpu_time_user_ms: ((stat.utime as f64 / clock_ticks) * 1000.0) as u64,
635        cpu_time_kernel_ms: ((stat.stime as f64 / clock_ticks) * 1000.0) as u64,
636        peak_rss_bytes: vmhwm_kb * 1024,
637    })
638}
639
640fn print_runtime_stats_for_role(prefix: &str, stats: &RuntimeStats) {
641    let cpu_total =
642        std::time::Duration::from_millis(stats.cpu_time_user_ms + stats.cpu_time_kernel_ms);
643    let cpu_kernel = std::time::Duration::from_millis(stats.cpu_time_kernel_ms);
644    let cpu_user = std::time::Duration::from_millis(stats.cpu_time_user_ms);
645    println!(
646        "{prefix}cpu time : {:.2?} | k: {:.2?} | u: {:.2?}",
647        cpu_total, cpu_kernel, cpu_user
648    );
649    println!(
650        "{prefix}peak RSS : {}",
651        bytesize::ByteSize(stats.peak_rss_bytes)
652    );
653}
654
655#[rustfmt::skip]
656fn print_runtime_stats() -> Result<(), anyhow::Error> {
657    // check if we have remote runtime stats (from a remote copy operation)
658    let remote_stats = REMOTE_RUNTIME_STATS.lock().unwrap().take();
659    if let Some(remote) = remote_stats {
660        // print global walltime first
661        println!("walltime : {:.2?}", &PROGRESS.get_duration());
662        println!();
663        let source_is_local = is_localhost(&remote.source_host);
664        let dest_is_local = is_localhost(&remote.dest_host);
665        // collect master stats
666        let master_stats = collect_runtime_stats();
667        // print non-localhost roles first
668        if !source_is_local {
669            println!("SOURCE ({}):", remote.source_host);
670            print_runtime_stats_for_role("  ", &remote.source_stats);
671            println!();
672        }
673        if !dest_is_local {
674            println!("DESTINATION ({}):", remote.dest_host);
675            print_runtime_stats_for_role("  ", &remote.dest_stats);
676            println!();
677        }
678        // print combined localhost section
679        match (source_is_local, dest_is_local) {
680            (true, true) => {
681                println!("MASTER + SOURCE + DESTINATION (localhost):");
682                print_runtime_stats_for_role("  master ", &master_stats);
683                print_runtime_stats_for_role("  source ", &remote.source_stats);
684                print_runtime_stats_for_role("  dest   ", &remote.dest_stats);
685            }
686            (true, false) => {
687                println!("MASTER + SOURCE (localhost):");
688                print_runtime_stats_for_role("  master ", &master_stats);
689                print_runtime_stats_for_role("  source ", &remote.source_stats);
690            }
691            (false, true) => {
692                println!("MASTER + DESTINATION (localhost):");
693                print_runtime_stats_for_role("  master ", &master_stats);
694                print_runtime_stats_for_role("  dest   ", &remote.dest_stats);
695            }
696            (false, false) => {
697                println!("MASTER (localhost):");
698                print_runtime_stats_for_role("  ", &master_stats);
699            }
700        }
701        return Ok(());
702    }
703    // local operation - print stats for this process only
704    let process = procfs::process::Process::myself()?;
705    let stat = process.stat()?;
706    // The time is in clock ticks, so we need to convert it to seconds
707    let clock_ticks_per_second = procfs::ticks_per_second();
708    let ticks_to_duration = |ticks: u64| {
709        std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
710    };
711    // vmhwm from /proc/[pid]/status is in kB, convert to bytes
712    let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
713    println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
714    println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
715    println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm_kb * 1024));
716    Ok(())
717}
718
719fn get_max_open_files() -> Result<u64, std::io::Error> {
720    let mut rlim = libc::rlimit {
721        rlim_cur: 0,
722        rlim_max: 0,
723    };
724    // Safety: we pass a valid "rlim" pointer and the result is checked
725    let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
726    if result == 0 {
727        Ok(rlim.rlim_cur)
728    } else {
729        Err(std::io::Error::last_os_error())
730    }
731}
732
/// stateless writer that forwards to stdout while the progress spinner is suspended
struct ProgWriter {}

impl ProgWriter {
    /// creates a writer; it carries no state
    fn new() -> Self {
        ProgWriter {}
    }
}
740
741impl std::io::Write for ProgWriter {
742    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
743        PBAR.suspend(|| std::io::stdout().write(buf))
744    }
745    fn flush(&mut self) -> std::io::Result<()> {
746        std::io::stdout().flush()
747    }
748}
749
750fn get_hostname() -> String {
751    nix::unistd::gethostname()
752        .ok()
753        .and_then(|os_str| os_str.into_string().ok())
754        .unwrap_or_else(|| "unknown".to_string())
755}
756
757#[must_use]
758pub fn generate_debug_log_filename(prefix: &str) -> String {
759    let now = chrono::Utc::now();
760    let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
761    let process_id = std::process::id();
762    format!("{prefix}-{timestamp}-{process_id}")
763}
764
765/// Generate a trace filename with identifier, hostname, PID, and timestamp.
766///
767/// `identifier` should be "rcp", "rcpd-source", or "rcpd-destination"
768#[must_use]
769pub fn generate_trace_filename(prefix: &str, identifier: &str, extension: &str) -> String {
770    let hostname = get_hostname();
771    let pid = std::process::id();
772    let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S");
773    format!("{prefix}-{identifier}-{hostname}-{pid}-{timestamp}.{extension}")
774}
775
776#[instrument(skip(func))] // "func" is not Debug printable
777pub fn run<Fut, Summary, Error>(
778    progress: Option<ProgressSettings>,
779    output: OutputConfig,
780    runtime: RuntimeConfig,
781    throttle: ThrottleConfig,
782    tracing_config: TracingConfig,
783    func: impl FnOnce() -> Fut,
784) -> Option<Summary>
785// we return an Option rather than a Result to indicate that callers of this function should NOT print the error
786where
787    Summary: std::fmt::Display,
788    Error: std::fmt::Display + std::fmt::Debug,
789    Fut: std::future::Future<Output = Result<Summary, Error>>,
790{
791    // force initialization of PROGRESS to set start_time at the beginning of the run
792    // (for remote master operations, PROGRESS is otherwise only accessed at the end in
793    // print_runtime_stats(), leading to near-zero walltime)
794    let _ = get_progress();
795    // validate configuration
796    if let Err(e) = throttle.validate() {
797        eprintln!("Configuration error: {e}");
798        return None;
799    }
800    // unpack configs for internal use
801    let OutputConfig {
802        quiet,
803        verbose,
804        print_summary,
805    } = output;
806    let RuntimeConfig {
807        max_workers,
808        max_blocking_threads,
809    } = runtime;
810    let ThrottleConfig {
811        max_open_files,
812        ops_throttle,
813        iops_throttle,
814        chunk_size: _,
815    } = throttle;
816    let TracingConfig {
817        remote_layer: remote_tracing_layer,
818        debug_log_file,
819        chrome_trace_prefix,
820        flamegraph_prefix,
821        trace_identifier,
822        profile_level,
823        tokio_console,
824        tokio_console_port,
825    } = tracing_config;
826    // guards must be kept alive for the duration of the run to ensure traces are flushed
827    let mut _chrome_guard: Option<tracing_chrome::FlushGuard> = None;
828    let mut _flame_guard: Option<tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>> =
829        None;
830    if quiet {
831        assert!(
832            verbose == 0,
833            "Quiet mode and verbose mode are mutually exclusive"
834        );
835    } else {
836        // helper to create the verbose-level filter consistently
837        let make_env_filter = || {
838            let level_directive = match verbose {
839                0 => "error".parse().unwrap(),
840                1 => "info".parse().unwrap(),
841                2 => "debug".parse().unwrap(),
842                _ => "trace".parse().unwrap(),
843            };
844            // filter out noisy dependencies - they're extremely verbose at DEBUG/TRACE level
845            // and not useful for debugging rcp
846            tracing_subscriber::EnvFilter::from_default_env()
847                .add_directive(level_directive)
848                .add_directive("tokio=info".parse().unwrap())
849                .add_directive("runtime=info".parse().unwrap())
850                .add_directive("quinn=warn".parse().unwrap())
851                .add_directive("rustls=warn".parse().unwrap())
852                .add_directive("h2=warn".parse().unwrap())
853        };
854        let file_layer = if let Some(ref log_file_path) = debug_log_file {
855            let file = std::fs::OpenOptions::new()
856                .create(true)
857                .append(true)
858                .open(log_file_path)
859                .unwrap_or_else(|e| {
860                    panic!("Failed to create debug log file at '{log_file_path}': {e}")
861                });
862            let file_layer = tracing_subscriber::fmt::layer()
863                .with_target(true)
864                .with_line_number(true)
865                .with_thread_ids(true)
866                .with_timer(LocalTimeFormatter)
867                .with_ansi(false)
868                .with_writer(file)
869                .with_filter(make_env_filter());
870            Some(file_layer)
871        } else {
872            None
873        };
874        // fmt_layer for local console output (when not using remote tracing)
875        let fmt_layer = if remote_tracing_layer.is_some() {
876            None
877        } else {
878            let fmt_layer = tracing_subscriber::fmt::layer()
879                .with_target(true)
880                .with_line_number(true)
881                .with_span_events(if verbose > 2 {
882                    FmtSpan::NEW | FmtSpan::CLOSE
883                } else {
884                    FmtSpan::NONE
885                })
886                .with_timer(LocalTimeFormatter)
887                .pretty()
888                .with_writer(ProgWriter::new)
889                .with_filter(make_env_filter());
890            Some(fmt_layer)
891        };
892        // apply env_filter to remote_tracing_layer so it respects verbose level
893        let remote_tracing_layer =
894            remote_tracing_layer.map(|layer| layer.with_filter(make_env_filter()));
895        let console_layer = if tokio_console {
896            let console_port = tokio_console_port.unwrap_or(6669);
897            let retention_seconds: u64 =
898                read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
899            eprintln!("Tokio console server listening on 127.0.0.1:{console_port}");
900            let console_layer = console_subscriber::ConsoleLayer::builder()
901                .retention(std::time::Duration::from_secs(retention_seconds))
902                .server_addr(([127, 0, 0, 1], console_port))
903                .spawn();
904            Some(console_layer)
905        } else {
906            None
907        };
908        // build profile filter for chrome/flame layers
909        // uses EnvFilter to capture spans from our crates at the specified level
910        // while excluding noisy dependencies like tokio, quinn, h2, etc.
911        let profiling_enabled = chrome_trace_prefix.is_some() || flamegraph_prefix.is_some();
912        let profile_filter_str = if profiling_enabled {
913            let level_str = profile_level.as_deref().unwrap_or("trace");
914            // validate level is a known tracing level
915            let valid_levels = ["trace", "debug", "info", "warn", "error", "off"];
916            if !valid_levels.contains(&level_str.to_lowercase().as_str()) {
917                eprintln!(
918                    "Invalid --profile-level '{}'. Valid values: trace, debug, info, warn, error, off",
919                    level_str
920                );
921                std::process::exit(1);
922            }
923            // exclude noisy deps, include everything else at the profile level
924            Some(format!(
925                "tokio=off,quinn=off,h2=off,hyper=off,rustls=off,{}",
926                level_str
927            ))
928        } else {
929            None
930        };
931        // helper to create profile filter (already validated above)
932        let make_profile_filter =
933            || tracing_subscriber::EnvFilter::new(profile_filter_str.as_ref().unwrap());
934        // chrome tracing layer (produces JSON viewable in Perfetto UI)
935        let chrome_layer = if let Some(ref prefix) = chrome_trace_prefix {
936            let filename = generate_trace_filename(prefix, &trace_identifier, "json");
937            eprintln!("Chrome trace will be written to: {filename}");
938            let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
939                .file(&filename)
940                .include_args(true)
941                .build();
942            _chrome_guard = Some(guard);
943            Some(layer.with_filter(make_profile_filter()))
944        } else {
945            None
946        };
947        // flamegraph layer (produces folded stacks for inferno)
948        let flame_layer = if let Some(ref prefix) = flamegraph_prefix {
949            let filename = generate_trace_filename(prefix, &trace_identifier, "folded");
950            eprintln!("Flamegraph data will be written to: {filename}");
951            match tracing_flame::FlameLayer::with_file(&filename) {
952                Ok((layer, guard)) => {
953                    _flame_guard = Some(guard);
954                    Some(layer.with_filter(make_profile_filter()))
955                }
956                Err(e) => {
957                    eprintln!("Failed to create flamegraph layer: {e}");
958                    None
959                }
960            }
961        } else {
962            None
963        };
964        tracing_subscriber::registry()
965            .with(file_layer)
966            .with(fmt_layer)
967            .with(remote_tracing_layer)
968            .with(console_layer)
969            .with(chrome_layer)
970            .with(flame_layer)
971            .init();
972    }
973    let mut builder = tokio::runtime::Builder::new_multi_thread();
974    builder.enable_all();
975    if max_workers > 0 {
976        builder.worker_threads(max_workers);
977    }
978    if max_blocking_threads > 0 {
979        builder.max_blocking_threads(max_blocking_threads);
980    }
981    if !sysinfo::set_open_files_limit(isize::MAX) {
982        tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
983    }
984    let set_max_open_files = max_open_files.unwrap_or_else(|| {
985        let limit = get_max_open_files().expect(
986            "We failed to query rlimit, if this is expected try specifying --max-open-files",
987        ) as usize;
988        80 * limit / 100 // ~80% of the max open files limit
989    });
990    if set_max_open_files > 0 {
991        tracing::info!("Setting max open files to: {}", set_max_open_files);
992        throttle::set_max_open_files(set_max_open_files);
993    } else {
994        tracing::info!("Not applying any limit to max open files!");
995    }
996    let runtime = builder.build().expect("Failed to create runtime");
997    fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
998        let mut replenish = replenish;
999        let mut interval = std::time::Duration::from_secs(1);
1000        while replenish > 100 && interval > std::time::Duration::from_millis(1) {
1001            replenish /= 10;
1002            interval /= 10;
1003        }
1004        (replenish, interval)
1005    }
1006    if ops_throttle > 0 {
1007        let (replenish, interval) = get_replenish_interval(ops_throttle);
1008        throttle::init_ops_tokens(replenish);
1009        runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
1010    }
1011    if iops_throttle > 0 {
1012        let (replenish, interval) = get_replenish_interval(iops_throttle);
1013        throttle::init_iops_tokens(replenish);
1014        runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
1015    }
1016    let res = {
1017        let _progress = progress.map(|settings| {
1018            tracing::debug!("Requesting progress updates {settings:?}");
1019            let delay = settings.progress_delay.map(|delay_str| {
1020                humantime::parse_duration(&delay_str)
1021                    .expect("Couldn't parse duration out of --progress-delay")
1022            });
1023            ProgressTracker::new(settings.progress_type, delay)
1024        });
1025        runtime.block_on(func())
1026    };
1027    match &res {
1028        Ok(summary) => {
1029            if print_summary || verbose > 0 {
1030                println!("{summary}");
1031            }
1032        }
1033        Err(err) => {
1034            if !quiet {
1035                println!("{err:?}");
1036            }
1037        }
1038    }
1039    if print_summary || verbose > 0 {
1040        if let Err(err) = print_runtime_stats() {
1041            println!("Failed to print runtime stats: {err:?}");
1042        }
1043    }
1044    res.ok()
1045}
1046
/// Tests for the runtime statistics collection helpers.
#[cfg(test)]
mod runtime_stats_tests {
    use super::*;
    use procfs::process::Process;

    /// `collect_runtime_stats()` should agree, within a small tolerance, with
    /// `collect_runtime_stats_for_process()` invoked on the current process.
    #[test]
    fn collect_runtime_stats_matches_procfs_snapshot() -> anyhow::Result<()> {
        let me = Process::myself()?;
        let reference = collect_runtime_stats_for_process(&me)?;
        let observed = collect_runtime_stats();

        // the two readings are taken one after the other, so the comparison is not exact
        // NOTE(review): presumably the slack absorbs drift between the two samples - confirm
        let cpu_tolerance_ms = 50;
        let rss_tolerance_bytes = 1_000_000;

        // user-mode CPU time
        let user_cpu_drift = reference
            .cpu_time_user_ms
            .abs_diff(observed.cpu_time_user_ms);
        assert!(
            user_cpu_drift <= cpu_tolerance_ms,
            "user CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
            reference.cpu_time_user_ms,
            observed.cpu_time_user_ms
        );

        // kernel-mode CPU time
        let kernel_cpu_drift = reference
            .cpu_time_kernel_ms
            .abs_diff(observed.cpu_time_kernel_ms);
        assert!(
            kernel_cpu_drift <= cpu_tolerance_ms,
            "kernel CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
            reference.cpu_time_kernel_ms,
            observed.cpu_time_kernel_ms
        );

        // peak resident set size
        let peak_rss_drift = reference.peak_rss_bytes.abs_diff(observed.peak_rss_bytes);
        assert!(
            peak_rss_drift <= rss_tolerance_bytes,
            "peak RSS deviated by more than {rss_tolerance_bytes} bytes: expected {}, got {}",
            reference.peak_rss_bytes,
            observed.peak_rss_bytes
        );

        Ok(())
    }

    /// `collect_runtime_stats_inner()` should yield `RuntimeStats::default()` both when it is
    /// given no process at all and when it is given the result of looking up a PID that is
    /// expected not to exist.
    #[test]
    fn collect_runtime_stats_returns_default_on_error() {
        // no process handle supplied
        assert_eq!(collect_runtime_stats_inner(None), RuntimeStats::default());

        // i32::MAX stands in for a PID with no live process behind it; a failed lookup
        // is turned into `None` by `.ok()`
        let missing_process = Process::new(i32::MAX).ok();
        assert_eq!(
            collect_runtime_stats_inner(missing_process),
            RuntimeStats::default()
        );
    }
}