common/
lib.rs

1//! Common utilities and types for RCP file operation tools
2//!
3//! This crate provides shared functionality used across all RCP tools (`rcp`, `rrm`, `rlink`, `rcmp`).
4//! It includes core operations (copy, remove, link, compare), progress reporting, metadata preservation, and runtime configuration.
5//!
6//! # Core Modules
7//!
8//! - [`mod@copy`] - File copying operations with metadata preservation and error handling
9//! - [`mod@rm`] - File removal operations
10//! - [`mod@link`] - Hard-linking operations
11//! - [`mod@cmp`] - File comparison operations (metadata-based)
12//! - [`mod@preserve`] - Metadata preservation settings and operations
13//! - [`mod@progress`] - Progress tracking and reporting
14//! - [`mod@filecmp`] - File metadata comparison utilities
15//! - [`mod@remote_tracing`] - Remote tracing support for distributed operations
16//!
17//! # Key Types
18//!
19//! ## `RcpdType`
20//!
21//! Identifies the role of a remote copy daemon:
22//! - `Source` - reads files from source host
23//! - `Destination` - writes files to destination host
24//!
25//! ## `ProgressType`
26//!
27//! Controls progress reporting display:
28//! - `Auto` - automatically choose based on terminal type
29//! - `ProgressBar` - animated progress bar (for interactive terminals)
30//! - `TextUpdates` - periodic text updates (for logging/non-interactive)
31//!
32//! # Progress Reporting
33//!
34//! The crate provides a global progress tracking system accessible via [`get_progress()`].
35//! Progress can be displayed in different formats depending on the execution context.
36//!
37//! Progress output goes to stderr, while logs go to stdout, allowing users to redirect logs to a file while still viewing interactive progress.
38//!
39//! # Runtime Configuration
40//!
41//! The [`run`] function provides a unified entry point for all RCP tools with support for:
42//! - Progress tracking and reporting
43//! - Logging configuration (quiet/verbose modes)
44//! - Resource limits (max workers, open files, throttling)
45//! - Tokio runtime setup
46//! - Remote tracing integration
47//!
48//! # Examples
49//!
50//! ## Basic Copy Operation
51//!
52//! ```rust,no_run
53//! use std::path::Path;
54//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
55//! let src = Path::new("/source");
56//! let dst = Path::new("/destination");
57//!
58//! let settings = common::copy::Settings {
59//!     dereference: false,
60//!     fail_early: false,
61//!     overwrite: false,
62//!     overwrite_compare: Default::default(),
63//!     chunk_size: 0,
64//!     remote_copy_buffer_size: 0,
65//! };
66//! let preserve = common::preserve::preserve_default();
67//!
68//! let summary = common::copy(src, dst, &settings, &preserve).await?;
69//! println!("Copied {} files", summary.files_copied);
70//! # Ok(())
71//! # }
72//! ```
73//!
74//! ## Metadata Comparison
75//!
76//! ```rust,no_run
77//! use std::path::Path;
78//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
79//! let src = Path::new("/path1");
80//! let dst = Path::new("/path2");
81//!
82//! // output differences to stdout (use false for quiet mode)
83//! let log = common::cmp::LogWriter::new(None, true).await?;
84//! let settings = common::cmp::Settings {
85//!     fail_early: false,
86//!     exit_early: false,
87//!     compare: Default::default(),
88//! };
89//!
90//! let summary = common::cmp(src, dst, &log, &settings).await?;
91//! println!("Comparison complete: {}", summary);
92//! # Ok(())
93//! # }
94//! ```
95
96#[macro_use]
97extern crate lazy_static;
98
99use crate::cmp::ObjType;
100use anyhow::anyhow;
101use anyhow::Context;
102use std::io::IsTerminal;
103use tracing::instrument;
104use tracing_subscriber::fmt::format::FmtSpan;
105use tracing_subscriber::prelude::*;
106
107pub mod cmp;
108pub mod config;
109pub mod copy;
110pub mod filegen;
111pub mod link;
112pub mod preserve;
113pub mod remote_tracing;
114pub mod rm;
115pub mod version;
116
117pub mod filecmp;
118pub mod progress;
119mod testutils;
120
121pub use config::{OutputConfig, RuntimeConfig, ThrottleConfig, TracingConfig};
122pub use progress::{RcpdProgressPrinter, SerializableProgress};
123
124// Define RcpdType in common since remote depends on common
125#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
126pub enum RcpdType {
127    Source,
128    Destination,
129}
130
131impl std::fmt::Display for RcpdType {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        match self {
134            RcpdType::Source => write!(f, "source"),
135            RcpdType::Destination => write!(f, "destination"),
136        }
137    }
138}
139
140// Type alias for progress snapshots
141pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
142
143/// runtime statistics collected from a process (CPU time, memory usage)
144#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
145pub struct RuntimeStats {
146    /// user-mode CPU time in milliseconds
147    pub cpu_time_user_ms: u64,
148    /// kernel-mode CPU time in milliseconds
149    pub cpu_time_kernel_ms: u64,
150    /// peak resident set size in bytes
151    pub peak_rss_bytes: u64,
152}
153
154/// runtime stats collected from remote rcpd processes for display at the end of a remote copy
155#[derive(Debug, Default)]
156pub struct RemoteRuntimeStats {
157    pub source_host: String,
158    pub source_stats: RuntimeStats,
159    pub dest_host: String,
160    pub dest_stats: RuntimeStats,
161}
162
163/// checks if a host string refers to the local machine.
164/// returns true for `localhost`, `127.0.0.1`, `::1`, `[::1]`, or the actual hostname
165#[must_use]
166pub fn is_localhost(host: &str) -> bool {
167    if host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]" {
168        return true;
169    }
170    // check against actual hostname using gethostname
171    let mut buf = [0u8; 256];
172    // Safety: gethostname writes to buf and returns 0 on success
173    let result = unsafe { libc::gethostname(buf.as_mut_ptr() as *mut libc::c_char, buf.len()) };
174    if result == 0 {
175        if let Ok(hostname_cstr) = std::ffi::CStr::from_bytes_until_nul(&buf) {
176            if let Ok(hostname) = hostname_cstr.to_str() {
177                if host == hostname {
178                    return true;
179                }
180            }
181        }
182    }
183    false
184}
185
186lazy_static! {
187    static ref PROGRESS: progress::Progress = progress::Progress::new();
188    static ref PBAR: indicatif::ProgressBar = indicatif::ProgressBar::new_spinner();
189    static ref REMOTE_RUNTIME_STATS: std::sync::Mutex<Option<RemoteRuntimeStats>> =
190        std::sync::Mutex::new(None);
191}
192
193#[must_use]
194pub fn get_progress() -> &'static progress::Progress {
195    &PROGRESS
196}
197
198/// stores remote runtime stats for display at the end of a remote copy operation
199pub fn set_remote_runtime_stats(stats: RemoteRuntimeStats) {
200    *REMOTE_RUNTIME_STATS.lock().unwrap() = Some(stats);
201}
202
203struct LocalTimeFormatter;
204
205impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFormatter {
206    fn format_time(
207        &self,
208        writer: &mut tracing_subscriber::fmt::format::Writer<'_>,
209    ) -> std::fmt::Result {
210        let now = chrono::Local::now();
211        writer.write_str(&now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
212    }
213}
214
/// handle to the background thread that reports progress
struct ProgressTracker {
    /// shutdown signal shared with the reporting thread: the flag is set to `true` and the condvar
    /// notified when reporting should stop
    lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
    /// the reporting thread; taken (leaving `None`) when it is joined
    pbar_thread: Option<std::thread::JoinHandle<()>>,
}
219
220#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
221pub enum ProgressType {
222    #[default]
223    #[value(name = "auto", alias = "Auto")]
224    Auto,
225    #[value(name = "ProgressBar", alias = "progress-bar")]
226    ProgressBar,
227    #[value(name = "TextUpdates", alias = "text-updates")]
228    TextUpdates,
229}
230
231pub enum GeneralProgressType {
232    User(ProgressType),
233    Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
234    RemoteMaster {
235        progress_type: ProgressType,
236        get_progress_snapshot:
237            Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>,
238    },
239}
240
241impl std::fmt::Debug for GeneralProgressType {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        match self {
244            GeneralProgressType::User(pt) => write!(f, "User({pt:?})"),
245            GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
246            GeneralProgressType::RemoteMaster { progress_type, .. } => {
247                write!(
248                    f,
249                    "RemoteMaster(progress_type: {progress_type:?}, <function>)"
250                )
251            }
252        }
253    }
254}
255
256#[derive(Debug)]
257pub struct ProgressSettings {
258    pub progress_type: GeneralProgressType,
259    pub progress_delay: Option<String>,
260}
261
262fn progress_bar(
263    lock: &std::sync::Mutex<bool>,
264    cvar: &std::sync::Condvar,
265    delay_opt: &Option<std::time::Duration>,
266) {
267    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
268    PBAR.set_style(
269        indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
270            .unwrap()
271            .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
272    );
273    let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
274    let mut is_done = lock.lock().unwrap();
275    loop {
276        PBAR.set_position(PBAR.position() + 1); // do we need to update?
277        PBAR.set_message(prog_printer.print().unwrap());
278        let result = cvar.wait_timeout(is_done, delay).unwrap();
279        is_done = result.0;
280        if *is_done {
281            break;
282        }
283    }
284    PBAR.finish_and_clear();
285}
286
287fn get_datetime_prefix() -> String {
288    chrono::Local::now()
289        .format("%Y-%m-%dT%H:%M:%S%.3f%:z")
290        .to_string()
291}
292
293fn text_updates(
294    lock: &std::sync::Mutex<bool>,
295    cvar: &std::sync::Condvar,
296    delay_opt: &Option<std::time::Duration>,
297) {
298    let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
299    let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
300    let mut is_done = lock.lock().unwrap();
301    loop {
302        eprintln!("=======================");
303        eprintln!(
304            "{}\n--{}",
305            get_datetime_prefix(),
306            prog_printer.print().unwrap()
307        );
308        let result = cvar.wait_timeout(is_done, delay).unwrap();
309        is_done = result.0;
310        if *is_done {
311            break;
312        }
313    }
314}
315
316fn rcpd_updates(
317    lock: &std::sync::Mutex<bool>,
318    cvar: &std::sync::Condvar,
319    delay_opt: &Option<std::time::Duration>,
320    sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
321) {
322    tracing::debug!("Starting rcpd progress updates");
323    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
324    let mut is_done = lock.lock().unwrap();
325    loop {
326        if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
327            // channel closed, receiver is done
328            tracing::debug!("Progress update channel closed, stopping progress updates");
329            break;
330        }
331        let result = cvar.wait_timeout(is_done, delay).unwrap();
332        is_done = result.0;
333        if *is_done {
334            break;
335        }
336    }
337}
338
339fn remote_master_updates<F>(
340    lock: &std::sync::Mutex<bool>,
341    cvar: &std::sync::Condvar,
342    delay_opt: &Option<std::time::Duration>,
343    get_progress_snapshot: F,
344    progress_type: ProgressType,
345) where
346    F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
347{
348    let interactive = match progress_type {
349        ProgressType::Auto => std::io::stderr().is_terminal(),
350        ProgressType::ProgressBar => true,
351        ProgressType::TextUpdates => false,
352    };
353    if interactive {
354        PBAR.set_style(
355            indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
356                .unwrap()
357                .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
358        );
359        let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
360        let mut printer = RcpdProgressPrinter::new();
361        let mut is_done = lock.lock().unwrap();
362        loop {
363            let progress_map = get_progress_snapshot();
364            let source_progress = &progress_map[RcpdType::Source];
365            let destination_progress = &progress_map[RcpdType::Destination];
366            PBAR.set_position(PBAR.position() + 1); // do we need to update?
367            PBAR.set_message(
368                printer
369                    .print(source_progress, destination_progress)
370                    .unwrap(),
371            );
372            let result = cvar.wait_timeout(is_done, delay).unwrap();
373            is_done = result.0;
374            if *is_done {
375                break;
376            }
377        }
378        PBAR.finish_and_clear();
379    } else {
380        let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
381        let mut printer = RcpdProgressPrinter::new();
382        let mut is_done = lock.lock().unwrap();
383        loop {
384            let progress_map = get_progress_snapshot();
385            let source_progress = &progress_map[RcpdType::Source];
386            let destination_progress = &progress_map[RcpdType::Destination];
387            eprintln!("=======================");
388            eprintln!(
389                "{}\n--{}",
390                get_datetime_prefix(),
391                printer
392                    .print(source_progress, destination_progress)
393                    .unwrap()
394            );
395            let result = cvar.wait_timeout(is_done, delay).unwrap();
396            is_done = result.0;
397            if *is_done {
398                break;
399            }
400        }
401    }
402}
403
404impl ProgressTracker {
405    pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
406        let lock_cvar =
407            std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
408        let lock_cvar_clone = lock_cvar.clone();
409        let pbar_thread = std::thread::spawn(move || {
410            let (lock, cvar) = &*lock_cvar_clone;
411            match progress_type {
412                GeneralProgressType::Remote(sender) => {
413                    rcpd_updates(lock, cvar, &delay_opt, sender);
414                }
415                GeneralProgressType::RemoteMaster {
416                    progress_type,
417                    get_progress_snapshot,
418                } => {
419                    remote_master_updates(
420                        lock,
421                        cvar,
422                        &delay_opt,
423                        get_progress_snapshot,
424                        progress_type,
425                    );
426                }
427                _ => {
428                    let interactive = match progress_type {
429                        GeneralProgressType::User(ProgressType::Auto) => {
430                            std::io::stderr().is_terminal()
431                        }
432                        GeneralProgressType::User(ProgressType::ProgressBar) => true,
433                        GeneralProgressType::User(ProgressType::TextUpdates) => false,
434                        GeneralProgressType::Remote(_)
435                        | GeneralProgressType::RemoteMaster { .. } => {
436                            unreachable!("Invalid progress type: {progress_type:?}")
437                        }
438                    };
439                    if interactive {
440                        progress_bar(lock, cvar, &delay_opt);
441                    } else {
442                        text_updates(lock, cvar, &delay_opt);
443                    }
444                }
445            }
446        });
447        Self {
448            lock_cvar,
449            pbar_thread: Some(pbar_thread),
450        }
451    }
452}
453
454impl Drop for ProgressTracker {
455    fn drop(&mut self) {
456        let (lock, cvar) = &*self.lock_cvar;
457        let mut is_done = lock.lock().unwrap();
458        *is_done = true;
459        cvar.notify_one();
460        drop(is_done);
461        if let Some(pbar_thread) = self.pbar_thread.take() {
462            pbar_thread.join().unwrap();
463        }
464    }
465}
466
467pub fn parse_metadata_cmp_settings(
468    settings: &str,
469) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
470    let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
471    for setting in settings.split(',') {
472        match setting {
473            "uid" => metadata_cmp_settings.uid = true,
474            "gid" => metadata_cmp_settings.gid = true,
475            "mode" => metadata_cmp_settings.mode = true,
476            "size" => metadata_cmp_settings.size = true,
477            "mtime" => metadata_cmp_settings.mtime = true,
478            "ctime" => metadata_cmp_settings.ctime = true,
479            _ => {
480                return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
481            }
482        }
483    }
484    Ok(metadata_cmp_settings)
485}
486
487fn parse_type_settings(
488    settings: &str,
489) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
490    let mut user_and_time = preserve::UserAndTimeSettings::default();
491    let mut mode_mask = None;
492    for setting in settings.split(',') {
493        match setting {
494            "uid" => user_and_time.uid = true,
495            "gid" => user_and_time.gid = true,
496            "time" => user_and_time.time = true,
497            _ => {
498                if let Ok(mask) = u32::from_str_radix(setting, 8) {
499                    mode_mask = Some(mask);
500                } else {
501                    return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
502                }
503            }
504        }
505    }
506    Ok((user_and_time, mode_mask))
507}
508
509pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
510    let mut preserve_settings = preserve::Settings::default();
511    for type_settings in settings.split(' ') {
512        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
513            let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
514                format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
515            )?;
516            match obj_type {
517                "f" | "file" => {
518                    preserve_settings.file = preserve::FileSettings::default();
519                    preserve_settings.file.user_and_time = user_and_time_settings;
520                    if let Some(mode) = mode_opt {
521                        preserve_settings.file.mode_mask = mode;
522                    }
523                }
524                "d" | "dir" | "directory" => {
525                    preserve_settings.dir = preserve::DirSettings::default();
526                    preserve_settings.dir.user_and_time = user_and_time_settings;
527                    if let Some(mode) = mode_opt {
528                        preserve_settings.dir.mode_mask = mode;
529                    }
530                }
531                "l" | "link" | "symlink" => {
532                    preserve_settings.symlink = preserve::SymlinkSettings::default();
533                    preserve_settings.symlink.user_and_time = user_and_time_settings;
534                }
535                _ => {
536                    return Err(anyhow!("Unknown object type: {}", obj_type));
537                }
538            }
539        } else {
540            return Err(anyhow!("Invalid preserve settings: {}", settings));
541        }
542    }
543    Ok(preserve_settings)
544}
545
546pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
547    let mut cmp_settings = cmp::ObjSettings::default();
548    for type_settings in settings.split(' ') {
549        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
550            let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
551                "parsing preserve settings: {obj_settings}, type: {obj_type}"
552            ))?;
553            let obj_type = match obj_type {
554                "f" | "file" => ObjType::File,
555                "d" | "dir" | "directory" => ObjType::Dir,
556                "l" | "link" | "symlink" => ObjType::Symlink,
557                "o" | "other" => ObjType::Other,
558                _ => {
559                    return Err(anyhow!("Unknown obj type: {}", obj_type));
560                }
561            };
562            cmp_settings[obj_type] = obj_cmp_settings;
563        } else {
564            return Err(anyhow!("Invalid preserve settings: {}", settings));
565        }
566    }
567    Ok(cmp_settings)
568}
569
570pub async fn cmp(
571    src: &std::path::Path,
572    dst: &std::path::Path,
573    log: &cmp::LogWriter,
574    settings: &cmp::Settings,
575) -> Result<cmp::Summary, anyhow::Error> {
576    cmp::cmp(&PROGRESS, src, dst, log, settings).await
577}
578
579pub async fn copy(
580    src: &std::path::Path,
581    dst: &std::path::Path,
582    settings: &copy::Settings,
583    preserve: &preserve::Settings,
584) -> Result<copy::Summary, copy::Error> {
585    copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
586}
587
588pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
589    rm::rm(&PROGRESS, path, settings).await
590}
591
592pub async fn link(
593    src: &std::path::Path,
594    dst: &std::path::Path,
595    update: &Option<std::path::PathBuf>,
596    settings: &link::Settings,
597) -> Result<link::Summary, link::Error> {
598    let cwd = std::env::current_dir()
599        .with_context(|| "failed to get current working directory")
600        .map_err(|err| link::Error::new(err, link::Summary::default()))?;
601    link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
602}
603
/// reads the environment variable `name` and parses it as `T`
///
/// falls back to `default` when the variable is unset, is not valid unicode, or cannot be parsed
fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(default)
}
613
614/// collects runtime statistics (CPU time, memory) for the current process
615#[must_use]
616pub fn collect_runtime_stats() -> RuntimeStats {
617    collect_runtime_stats_inner(procfs::process::Process::myself().ok())
618}
619
620fn collect_runtime_stats_inner(process: Option<procfs::process::Process>) -> RuntimeStats {
621    let Some(process) = process else {
622        return RuntimeStats::default();
623    };
624    collect_runtime_stats_for_process(&process).unwrap_or_default()
625}
626
627fn collect_runtime_stats_for_process(
628    process: &procfs::process::Process,
629) -> anyhow::Result<RuntimeStats> {
630    let stat = process.stat()?;
631    let clock_ticks = procfs::ticks_per_second() as f64;
632    // vmhwm from /proc/[pid]/status is in kB, convert to bytes
633    let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
634    Ok(RuntimeStats {
635        cpu_time_user_ms: ((stat.utime as f64 / clock_ticks) * 1000.0) as u64,
636        cpu_time_kernel_ms: ((stat.stime as f64 / clock_ticks) * 1000.0) as u64,
637        peak_rss_bytes: vmhwm_kb * 1024,
638    })
639}
640
641fn print_runtime_stats_for_role(prefix: &str, stats: &RuntimeStats) {
642    let cpu_total =
643        std::time::Duration::from_millis(stats.cpu_time_user_ms + stats.cpu_time_kernel_ms);
644    let cpu_kernel = std::time::Duration::from_millis(stats.cpu_time_kernel_ms);
645    let cpu_user = std::time::Duration::from_millis(stats.cpu_time_user_ms);
646    println!(
647        "{prefix}cpu time : {:.2?} | k: {:.2?} | u: {:.2?}",
648        cpu_total, cpu_kernel, cpu_user
649    );
650    println!(
651        "{prefix}peak RSS : {}",
652        bytesize::ByteSize(stats.peak_rss_bytes)
653    );
654}
655
656#[rustfmt::skip]
657fn print_runtime_stats() -> Result<(), anyhow::Error> {
658    // check if we have remote runtime stats (from a remote copy operation)
659    let remote_stats = REMOTE_RUNTIME_STATS.lock().unwrap().take();
660    if let Some(remote) = remote_stats {
661        // print global walltime first
662        println!("walltime : {:.2?}", &PROGRESS.get_duration());
663        println!();
664        let source_is_local = is_localhost(&remote.source_host);
665        let dest_is_local = is_localhost(&remote.dest_host);
666        // collect master stats
667        let master_stats = collect_runtime_stats();
668        // print non-localhost roles first
669        if !source_is_local {
670            println!("SOURCE ({}):", remote.source_host);
671            print_runtime_stats_for_role("  ", &remote.source_stats);
672            println!();
673        }
674        if !dest_is_local {
675            println!("DESTINATION ({}):", remote.dest_host);
676            print_runtime_stats_for_role("  ", &remote.dest_stats);
677            println!();
678        }
679        // print combined localhost section
680        match (source_is_local, dest_is_local) {
681            (true, true) => {
682                println!("MASTER + SOURCE + DESTINATION (localhost):");
683                print_runtime_stats_for_role("  master ", &master_stats);
684                print_runtime_stats_for_role("  source ", &remote.source_stats);
685                print_runtime_stats_for_role("  dest   ", &remote.dest_stats);
686            }
687            (true, false) => {
688                println!("MASTER + SOURCE (localhost):");
689                print_runtime_stats_for_role("  master ", &master_stats);
690                print_runtime_stats_for_role("  source ", &remote.source_stats);
691            }
692            (false, true) => {
693                println!("MASTER + DESTINATION (localhost):");
694                print_runtime_stats_for_role("  master ", &master_stats);
695                print_runtime_stats_for_role("  dest   ", &remote.dest_stats);
696            }
697            (false, false) => {
698                println!("MASTER (localhost):");
699                print_runtime_stats_for_role("  ", &master_stats);
700            }
701        }
702        return Ok(());
703    }
704    // local operation - print stats for this process only
705    let process = procfs::process::Process::myself()?;
706    let stat = process.stat()?;
707    // The time is in clock ticks, so we need to convert it to seconds
708    let clock_ticks_per_second = procfs::ticks_per_second();
709    let ticks_to_duration = |ticks: u64| {
710        std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
711    };
712    // vmhwm from /proc/[pid]/status is in kB, convert to bytes
713    let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
714    println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
715    println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
716    println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm_kb * 1024));
717    Ok(())
718}
719
720fn get_max_open_files() -> Result<u64, std::io::Error> {
721    let mut rlim = libc::rlimit {
722        rlim_cur: 0,
723        rlim_max: 0,
724    };
725    // Safety: we pass a valid "rlim" pointer and the result is checked
726    let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
727    if result == 0 {
728        Ok(rlim.rlim_cur)
729    } else {
730        Err(std::io::Error::last_os_error())
731    }
732}
733
734struct ProgWriter {}
735
736impl ProgWriter {
737    fn new() -> Self {
738        Self {}
739    }
740}
741
742impl std::io::Write for ProgWriter {
743    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
744        PBAR.suspend(|| std::io::stdout().write(buf))
745    }
746    fn flush(&mut self) -> std::io::Result<()> {
747        std::io::stdout().flush()
748    }
749}
750
751fn get_hostname() -> String {
752    nix::unistd::gethostname()
753        .ok()
754        .and_then(|os_str| os_str.into_string().ok())
755        .unwrap_or_else(|| "unknown".to_string())
756}
757
758#[must_use]
759pub fn generate_debug_log_filename(prefix: &str) -> String {
760    let now = chrono::Utc::now();
761    let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
762    let process_id = std::process::id();
763    format!("{prefix}-{timestamp}-{process_id}")
764}
765
766/// Generate a trace filename with identifier, hostname, PID, and timestamp.
767///
768/// `identifier` should be "rcp", "rcpd-source", or "rcpd-destination"
769#[must_use]
770pub fn generate_trace_filename(prefix: &str, identifier: &str, extension: &str) -> String {
771    let hostname = get_hostname();
772    let pid = std::process::id();
773    let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S");
774    format!("{prefix}-{identifier}-{hostname}-{pid}-{timestamp}.{extension}")
775}
776
777#[instrument(skip(func))] // "func" is not Debug printable
778pub fn run<Fut, Summary, Error>(
779    progress: Option<ProgressSettings>,
780    output: OutputConfig,
781    runtime: RuntimeConfig,
782    throttle: ThrottleConfig,
783    tracing_config: TracingConfig,
784    func: impl FnOnce() -> Fut,
785) -> Option<Summary>
786// we return an Option rather than a Result to indicate that callers of this function should NOT print the error
787where
788    Summary: std::fmt::Display,
789    Error: std::fmt::Display + std::fmt::Debug,
790    Fut: std::future::Future<Output = Result<Summary, Error>>,
791{
792    // force initialization of PROGRESS to set start_time at the beginning of the run
793    // (for remote master operations, PROGRESS is otherwise only accessed at the end in
794    // print_runtime_stats(), leading to near-zero walltime)
795    let _ = get_progress();
796    // validate configuration
797    if let Err(e) = throttle.validate() {
798        eprintln!("Configuration error: {e}");
799        return None;
800    }
801    // unpack configs for internal use
802    let OutputConfig {
803        quiet,
804        verbose,
805        print_summary,
806    } = output;
807    let RuntimeConfig {
808        max_workers,
809        max_blocking_threads,
810    } = runtime;
811    let ThrottleConfig {
812        max_open_files,
813        ops_throttle,
814        iops_throttle,
815        chunk_size: _,
816    } = throttle;
817    let TracingConfig {
818        remote_layer: remote_tracing_layer,
819        debug_log_file,
820        chrome_trace_prefix,
821        flamegraph_prefix,
822        trace_identifier,
823        profile_level,
824        tokio_console,
825        tokio_console_port,
826    } = tracing_config;
827    // guards must be kept alive for the duration of the run to ensure traces are flushed
828    let mut _chrome_guard: Option<tracing_chrome::FlushGuard> = None;
829    let mut _flame_guard: Option<tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>> =
830        None;
831    if quiet {
832        assert!(
833            verbose == 0,
834            "Quiet mode and verbose mode are mutually exclusive"
835        );
836    } else {
837        // helper to create the verbose-level filter consistently
838        let make_env_filter = || {
839            let level_directive = match verbose {
840                0 => "error".parse().unwrap(),
841                1 => "info".parse().unwrap(),
842                2 => "debug".parse().unwrap(),
843                _ => "trace".parse().unwrap(),
844            };
845            // filter out noisy dependencies - they're extremely verbose at DEBUG/TRACE level
846            // and not useful for debugging rcp
847            tracing_subscriber::EnvFilter::from_default_env()
848                .add_directive(level_directive)
849                .add_directive("tokio=info".parse().unwrap())
850                .add_directive("runtime=info".parse().unwrap())
851                .add_directive("quinn=warn".parse().unwrap())
852                .add_directive("rustls=warn".parse().unwrap())
853                .add_directive("h2=warn".parse().unwrap())
854        };
855        let file_layer = if let Some(ref log_file_path) = debug_log_file {
856            let file = std::fs::OpenOptions::new()
857                .create(true)
858                .append(true)
859                .open(log_file_path)
860                .unwrap_or_else(|e| {
861                    panic!("Failed to create debug log file at '{log_file_path}': {e}")
862                });
863            let file_layer = tracing_subscriber::fmt::layer()
864                .with_target(true)
865                .with_line_number(true)
866                .with_thread_ids(true)
867                .with_timer(LocalTimeFormatter)
868                .with_ansi(false)
869                .with_writer(file)
870                .with_filter(make_env_filter());
871            Some(file_layer)
872        } else {
873            None
874        };
875        // fmt_layer for local console output (when not using remote tracing)
876        let fmt_layer = if remote_tracing_layer.is_some() {
877            None
878        } else {
879            let fmt_layer = tracing_subscriber::fmt::layer()
880                .with_target(true)
881                .with_line_number(true)
882                .with_span_events(if verbose > 2 {
883                    FmtSpan::NEW | FmtSpan::CLOSE
884                } else {
885                    FmtSpan::NONE
886                })
887                .with_timer(LocalTimeFormatter)
888                .pretty()
889                .with_writer(ProgWriter::new)
890                .with_filter(make_env_filter());
891            Some(fmt_layer)
892        };
893        // apply env_filter to remote_tracing_layer so it respects verbose level
894        let remote_tracing_layer =
895            remote_tracing_layer.map(|layer| layer.with_filter(make_env_filter()));
896        let console_layer = if tokio_console {
897            let console_port = tokio_console_port.unwrap_or(6669);
898            let retention_seconds: u64 =
899                read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
900            eprintln!("Tokio console server listening on 127.0.0.1:{console_port}");
901            let console_layer = console_subscriber::ConsoleLayer::builder()
902                .retention(std::time::Duration::from_secs(retention_seconds))
903                .server_addr(([127, 0, 0, 1], console_port))
904                .spawn();
905            Some(console_layer)
906        } else {
907            None
908        };
909        // build profile filter for chrome/flame layers
910        // uses EnvFilter to capture spans from our crates at the specified level
911        // while excluding noisy dependencies like tokio, quinn, h2, etc.
912        let profiling_enabled = chrome_trace_prefix.is_some() || flamegraph_prefix.is_some();
913        let profile_filter_str = if profiling_enabled {
914            let level_str = profile_level.as_deref().unwrap_or("trace");
915            // validate level is a known tracing level
916            let valid_levels = ["trace", "debug", "info", "warn", "error", "off"];
917            if !valid_levels.contains(&level_str.to_lowercase().as_str()) {
918                eprintln!(
919                    "Invalid --profile-level '{}'. Valid values: trace, debug, info, warn, error, off",
920                    level_str
921                );
922                std::process::exit(1);
923            }
924            // exclude noisy deps, include everything else at the profile level
925            Some(format!(
926                "tokio=off,quinn=off,h2=off,hyper=off,rustls=off,{}",
927                level_str
928            ))
929        } else {
930            None
931        };
932        // helper to create profile filter (already validated above)
933        let make_profile_filter =
934            || tracing_subscriber::EnvFilter::new(profile_filter_str.as_ref().unwrap());
935        // chrome tracing layer (produces JSON viewable in Perfetto UI)
936        let chrome_layer = if let Some(ref prefix) = chrome_trace_prefix {
937            let filename = generate_trace_filename(prefix, &trace_identifier, "json");
938            eprintln!("Chrome trace will be written to: {filename}");
939            let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
940                .file(&filename)
941                .include_args(true)
942                .build();
943            _chrome_guard = Some(guard);
944            Some(layer.with_filter(make_profile_filter()))
945        } else {
946            None
947        };
948        // flamegraph layer (produces folded stacks for inferno)
949        let flame_layer = if let Some(ref prefix) = flamegraph_prefix {
950            let filename = generate_trace_filename(prefix, &trace_identifier, "folded");
951            eprintln!("Flamegraph data will be written to: {filename}");
952            match tracing_flame::FlameLayer::with_file(&filename) {
953                Ok((layer, guard)) => {
954                    _flame_guard = Some(guard);
955                    Some(layer.with_filter(make_profile_filter()))
956                }
957                Err(e) => {
958                    eprintln!("Failed to create flamegraph layer: {e}");
959                    None
960                }
961            }
962        } else {
963            None
964        };
965        tracing_subscriber::registry()
966            .with(file_layer)
967            .with(fmt_layer)
968            .with(remote_tracing_layer)
969            .with(console_layer)
970            .with(chrome_layer)
971            .with(flame_layer)
972            .init();
973    }
974    let mut builder = tokio::runtime::Builder::new_multi_thread();
975    builder.enable_all();
976    if max_workers > 0 {
977        builder.worker_threads(max_workers);
978    }
979    if max_blocking_threads > 0 {
980        builder.max_blocking_threads(max_blocking_threads);
981    }
982    if !sysinfo::set_open_files_limit(isize::MAX) {
983        tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
984    }
985    let set_max_open_files = max_open_files.unwrap_or_else(|| {
986        let limit = get_max_open_files().expect(
987            "We failed to query rlimit, if this is expected try specifying --max-open-files",
988        ) as usize;
989        80 * limit / 100 // ~80% of the max open files limit
990    });
991    if set_max_open_files > 0 {
992        tracing::info!("Setting max open files to: {}", set_max_open_files);
993        throttle::set_max_open_files(set_max_open_files);
994    } else {
995        tracing::info!("Not applying any limit to max open files!");
996    }
997    let runtime = builder.build().expect("Failed to create runtime");
998    fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
999        let mut replenish = replenish;
1000        let mut interval = std::time::Duration::from_secs(1);
1001        while replenish > 100 && interval > std::time::Duration::from_millis(1) {
1002            replenish /= 10;
1003            interval /= 10;
1004        }
1005        (replenish, interval)
1006    }
1007    if ops_throttle > 0 {
1008        let (replenish, interval) = get_replenish_interval(ops_throttle);
1009        throttle::init_ops_tokens(replenish);
1010        runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
1011    }
1012    if iops_throttle > 0 {
1013        let (replenish, interval) = get_replenish_interval(iops_throttle);
1014        throttle::init_iops_tokens(replenish);
1015        runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
1016    }
1017    let res = {
1018        let _progress = progress.map(|settings| {
1019            tracing::debug!("Requesting progress updates {settings:?}");
1020            let delay = settings.progress_delay.map(|delay_str| {
1021                humantime::parse_duration(&delay_str)
1022                    .expect("Couldn't parse duration out of --progress-delay")
1023            });
1024            ProgressTracker::new(settings.progress_type, delay)
1025        });
1026        runtime.block_on(func())
1027    };
1028    match &res {
1029        Ok(summary) => {
1030            if print_summary || verbose > 0 {
1031                println!("{summary}");
1032            }
1033        }
1034        Err(err) => {
1035            if !quiet {
1036                println!("{err:?}");
1037            }
1038        }
1039    }
1040    if print_summary || verbose > 0 {
1041        if let Err(err) = print_runtime_stats() {
1042            println!("Failed to print runtime stats: {err:?}");
1043        }
1044    }
1045    res.ok()
1046}
1047
#[cfg(test)]
mod runtime_stats_tests {
    use super::*;

    // compares collect_runtime_stats() against stats collected directly from a procfs handle
    // to the current process; the two samples are taken one after the other, so values are
    // compared within a tolerance rather than for exact equality
    #[test]
    fn collect_runtime_stats_matches_procfs_snapshot() -> anyhow::Result<()> {
        let cpu_tolerance_ms = 50;
        let rss_tolerance_bytes = 1_000_000;
        // take the reference sample first, then the sample under test
        let myself = procfs::process::Process::myself()?;
        let expected = collect_runtime_stats_for_process(&myself)?;
        let actual = collect_runtime_stats();
        let user_cpu_delta = expected.cpu_time_user_ms.abs_diff(actual.cpu_time_user_ms);
        assert!(
            user_cpu_delta <= cpu_tolerance_ms,
            "user CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
            expected.cpu_time_user_ms,
            actual.cpu_time_user_ms
        );
        let kernel_cpu_delta = expected
            .cpu_time_kernel_ms
            .abs_diff(actual.cpu_time_kernel_ms);
        assert!(
            kernel_cpu_delta <= cpu_tolerance_ms,
            "kernel CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
            expected.cpu_time_kernel_ms,
            actual.cpu_time_kernel_ms
        );
        let peak_rss_delta = expected.peak_rss_bytes.abs_diff(actual.peak_rss_bytes);
        assert!(
            peak_rss_delta <= rss_tolerance_bytes,
            "peak RSS deviated by more than {rss_tolerance_bytes} bytes: expected {}, got {}",
            expected.peak_rss_bytes,
            actual.peak_rss_bytes
        );
        Ok(())
    }

    // collect_runtime_stats_inner() must yield RuntimeStats::default() both when no process
    // handle is supplied and when the handle is built from a pid that is not expected to exist
    #[test]
    fn collect_runtime_stats_returns_default_on_error() {
        let fallback = RuntimeStats::default();
        assert_eq!(collect_runtime_stats_inner(None), fallback);

        // i32::MAX is used as a pid that should not belong to any live process
        let missing_process = procfs::process::Process::new(i32::MAX).ok();
        assert_eq!(collect_runtime_stats_inner(missing_process), fallback);
    }
}