common/
lib.rs

1//! Common utilities and types for RCP file operation tools
2//!
3//! This crate provides shared functionality used across all RCP tools (`rcp`, `rrm`, `rlink`, `rcmp`).
4//! It includes core operations (copy, remove, link, compare), progress reporting, metadata preservation,
5//! and runtime configuration.
6//!
7//! # Core Modules
8//!
9//! - [`mod@copy`] - File copying operations with metadata preservation and error handling
10//! - [`mod@rm`] - File removal operations
11//! - [`mod@link`] - Hard-linking operations
12//! - [`mod@cmp`] - File comparison operations (metadata-based)
13//! - [`mod@preserve`] - Metadata preservation settings and operations
14//! - [`mod@progress`] - Progress tracking and reporting
15//! - [`mod@filecmp`] - File metadata comparison utilities
16//! - [`mod@remote_tracing`] - Remote tracing support for distributed operations
17//!
18//! # Key Types
19//!
20//! ## `RcpdType`
21//!
22//! Identifies the role of a remote copy daemon:
23//! - `Source` - reads files from source host
24//! - `Destination` - writes files to destination host
25//!
26//! ## `ProgressType`
27//!
28//! Controls progress reporting display:
29//! - `Auto` - automatically choose based on terminal type
30//! - `ProgressBar` - animated progress bar (for interactive terminals)
31//! - `TextUpdates` - periodic text updates (for logging/non-interactive)
32//!
33//! # Progress Reporting
34//!
35//! The crate provides a global progress tracking system accessible via [`get_progress()`].
36//! Progress can be displayed in different formats depending on the execution context.
37//!
38//! Progress output goes to stderr, while logs go to stdout, allowing users to redirect
39//! logs to a file while still viewing interactive progress.
40//!
41//! # Runtime Configuration
42//!
43//! The [`run`] function provides a unified entry point for all RCP tools with support for:
44//! - Progress tracking and reporting
45//! - Logging configuration (quiet/verbose modes)
46//! - Resource limits (max workers, open files, throttling)
47//! - Tokio runtime setup
48//! - Remote tracing integration
49//!
50//! # Examples
51//!
52//! ## Basic Copy Operation
53//!
54//! ```rust,no_run
55//! use std::path::Path;
56//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
57//! let src = Path::new("/source");
58//! let dst = Path::new("/destination");
59//!
60//! let settings = common::copy::Settings {
61//!     dereference: false,
62//!     fail_early: false,
63//!     overwrite: false,
64//!     overwrite_compare: Default::default(),
65//!     chunk_size: 0,
66//! };
67//! let preserve = common::preserve::preserve_default();
68//!
69//! let summary = common::copy(src, dst, &settings, &preserve).await?;
70//! println!("Copied {} files", summary.files_copied);
71//! # Ok(())
72//! # }
73//! ```
74//!
75//! ## Metadata Comparison
76//!
77//! ```rust,no_run
78//! use std::path::Path;
79//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
80//! let src = Path::new("/path1");
81//! let dst = Path::new("/path2");
82//!
83//! let log = common::cmp::LogWriter::new(None).await?;
84//! let settings = common::cmp::Settings {
85//!     fail_early: false,
86//!     exit_early: false,
87//!     compare: Default::default(),
88//! };
89//!
90//! let summary = common::cmp(src, dst, &log, &settings).await?;
91//! println!("Comparison complete: {}", summary);
92//! # Ok(())
93//! # }
94//! ```
95
96#[macro_use]
97extern crate lazy_static;
98
99use crate::cmp::ObjType;
100use anyhow::anyhow;
101use anyhow::Context;
102use std::io::IsTerminal;
103use tracing::instrument;
104use tracing_subscriber::fmt::format::FmtSpan;
105use tracing_subscriber::prelude::*;
106
107pub mod cmp;
108pub mod copy;
109pub mod filegen;
110pub mod link;
111pub mod preserve;
112pub mod remote_tracing;
113pub mod rm;
114
115pub mod filecmp;
116pub mod progress;
117mod testutils;
118
119pub use progress::{RcpdProgressPrinter, SerializableProgress};
120
121// Define RcpdType in common since remote depends on common
122#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
123pub enum RcpdType {
124    Source,
125    Destination,
126}
127
128impl std::fmt::Display for RcpdType {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        match self {
131            RcpdType::Source => write!(f, "source"),
132            RcpdType::Destination => write!(f, "destination"),
133        }
134    }
135}
136
137// Type alias for progress snapshots
138pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
139
140lazy_static! {
141    static ref PROGRESS: progress::Progress = progress::Progress::new();
142    static ref PBAR: indicatif::ProgressBar = indicatif::ProgressBar::new_spinner();
143}
144
145#[must_use]
146pub fn get_progress() -> &'static progress::Progress {
147    &PROGRESS
148}
149
150struct LocalTimeFormatter;
151
152impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFormatter {
153    fn format_time(
154        &self,
155        writer: &mut tracing_subscriber::fmt::format::Writer<'_>,
156    ) -> std::fmt::Result {
157        let now = chrono::Local::now();
158        writer.write_str(&now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
159    }
160}
161
162struct ProgressTracker {
163    lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
164    pbar_thread: Option<std::thread::JoinHandle<()>>,
165}
166
167#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
168pub enum ProgressType {
169    #[default]
170    #[value(name = "auto", alias = "Auto")]
171    Auto,
172    #[value(name = "ProgressBar", alias = "progress-bar")]
173    ProgressBar,
174    #[value(name = "TextUpdates", alias = "text-updates")]
175    TextUpdates,
176}
177
178pub enum GeneralProgressType {
179    User(ProgressType),
180    Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
181    RemoteMaster {
182        progress_type: ProgressType,
183        get_progress_snapshot:
184            Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>,
185    },
186}
187
188impl std::fmt::Debug for GeneralProgressType {
189    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190        match self {
191            GeneralProgressType::User(pt) => write!(f, "User({pt:?})"),
192            GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
193            GeneralProgressType::RemoteMaster { progress_type, .. } => {
194                write!(
195                    f,
196                    "RemoteMaster(progress_type: {progress_type:?}, <function>)"
197                )
198            }
199        }
200    }
201}
202
203#[derive(Debug)]
204pub struct ProgressSettings {
205    pub progress_type: GeneralProgressType,
206    pub progress_delay: Option<String>,
207}
208
209fn progress_bar(
210    lock: &std::sync::Mutex<bool>,
211    cvar: &std::sync::Condvar,
212    delay_opt: &Option<std::time::Duration>,
213) {
214    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
215    PBAR.set_style(
216        indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
217            .unwrap()
218            .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
219    );
220    let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
221    let mut is_done = lock.lock().unwrap();
222    loop {
223        PBAR.set_position(PBAR.position() + 1); // do we need to update?
224        PBAR.set_message(prog_printer.print().unwrap());
225        let result = cvar.wait_timeout(is_done, delay).unwrap();
226        is_done = result.0;
227        if *is_done {
228            break;
229        }
230    }
231    PBAR.finish_and_clear();
232}
233
234fn get_datetime_prefix() -> String {
235    chrono::Local::now()
236        .format("%Y-%m-%dT%H:%M:%S%.3f%:z")
237        .to_string()
238}
239
240fn text_updates(
241    lock: &std::sync::Mutex<bool>,
242    cvar: &std::sync::Condvar,
243    delay_opt: &Option<std::time::Duration>,
244) {
245    let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
246    let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
247    let mut is_done = lock.lock().unwrap();
248    loop {
249        eprintln!("=======================");
250        eprintln!(
251            "{}\n--{}",
252            get_datetime_prefix(),
253            prog_printer.print().unwrap()
254        );
255        let result = cvar.wait_timeout(is_done, delay).unwrap();
256        is_done = result.0;
257        if *is_done {
258            break;
259        }
260    }
261}
262
263fn rcpd_updates(
264    lock: &std::sync::Mutex<bool>,
265    cvar: &std::sync::Condvar,
266    delay_opt: &Option<std::time::Duration>,
267    sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
268) {
269    tracing::debug!("Starting rcpd progress updates");
270    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
271    let mut is_done = lock.lock().unwrap();
272    loop {
273        if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
274            // channel closed, receiver is done
275            tracing::debug!("Progress update channel closed, stopping progress updates");
276            break;
277        }
278        let result = cvar.wait_timeout(is_done, delay).unwrap();
279        is_done = result.0;
280        if *is_done {
281            break;
282        }
283    }
284}
285
286fn remote_master_updates<F>(
287    lock: &std::sync::Mutex<bool>,
288    cvar: &std::sync::Condvar,
289    delay_opt: &Option<std::time::Duration>,
290    get_progress_snapshot: F,
291    progress_type: ProgressType,
292) where
293    F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
294{
295    let interactive = match progress_type {
296        ProgressType::Auto => std::io::stderr().is_terminal(),
297        ProgressType::ProgressBar => true,
298        ProgressType::TextUpdates => false,
299    };
300    if interactive {
301        PBAR.set_style(
302            indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
303                .unwrap()
304                .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
305        );
306        let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
307        let mut printer = RcpdProgressPrinter::new();
308        let mut is_done = lock.lock().unwrap();
309        loop {
310            let progress_map = get_progress_snapshot();
311            let source_progress = &progress_map[RcpdType::Source];
312            let destination_progress = &progress_map[RcpdType::Destination];
313            PBAR.set_position(PBAR.position() + 1); // do we need to update?
314            PBAR.set_message(
315                printer
316                    .print(source_progress, destination_progress)
317                    .unwrap(),
318            );
319            let result = cvar.wait_timeout(is_done, delay).unwrap();
320            is_done = result.0;
321            if *is_done {
322                break;
323            }
324        }
325        PBAR.finish_and_clear();
326    } else {
327        let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
328        let mut printer = RcpdProgressPrinter::new();
329        let mut is_done = lock.lock().unwrap();
330        loop {
331            let progress_map = get_progress_snapshot();
332            let source_progress = &progress_map[RcpdType::Source];
333            let destination_progress = &progress_map[RcpdType::Destination];
334            eprintln!("=======================");
335            eprintln!(
336                "{}\n--{}",
337                get_datetime_prefix(),
338                printer
339                    .print(source_progress, destination_progress)
340                    .unwrap()
341            );
342            let result = cvar.wait_timeout(is_done, delay).unwrap();
343            is_done = result.0;
344            if *is_done {
345                break;
346            }
347        }
348    }
349}
350
351impl ProgressTracker {
352    pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
353        let lock_cvar =
354            std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
355        let lock_cvar_clone = lock_cvar.clone();
356        let pbar_thread = std::thread::spawn(move || {
357            let (lock, cvar) = &*lock_cvar_clone;
358            match progress_type {
359                GeneralProgressType::Remote(sender) => {
360                    rcpd_updates(lock, cvar, &delay_opt, sender);
361                }
362                GeneralProgressType::RemoteMaster {
363                    progress_type,
364                    get_progress_snapshot,
365                } => {
366                    remote_master_updates(
367                        lock,
368                        cvar,
369                        &delay_opt,
370                        get_progress_snapshot,
371                        progress_type,
372                    );
373                }
374                _ => {
375                    let interactive = match progress_type {
376                        GeneralProgressType::User(ProgressType::Auto) => {
377                            std::io::stderr().is_terminal()
378                        }
379                        GeneralProgressType::User(ProgressType::ProgressBar) => true,
380                        GeneralProgressType::User(ProgressType::TextUpdates) => false,
381                        GeneralProgressType::Remote(_)
382                        | GeneralProgressType::RemoteMaster { .. } => {
383                            unreachable!("Invalid progress type: {progress_type:?}")
384                        }
385                    };
386                    if interactive {
387                        progress_bar(lock, cvar, &delay_opt);
388                    } else {
389                        text_updates(lock, cvar, &delay_opt);
390                    }
391                }
392            }
393        });
394        Self {
395            lock_cvar,
396            pbar_thread: Some(pbar_thread),
397        }
398    }
399}
400
401impl Drop for ProgressTracker {
402    fn drop(&mut self) {
403        let (lock, cvar) = &*self.lock_cvar;
404        let mut is_done = lock.lock().unwrap();
405        *is_done = true;
406        cvar.notify_one();
407        drop(is_done);
408        if let Some(pbar_thread) = self.pbar_thread.take() {
409            pbar_thread.join().unwrap();
410        }
411    }
412}
413
414pub fn parse_metadata_cmp_settings(
415    settings: &str,
416) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
417    let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
418    for setting in settings.split(',') {
419        match setting {
420            "uid" => metadata_cmp_settings.uid = true,
421            "gid" => metadata_cmp_settings.gid = true,
422            "mode" => metadata_cmp_settings.mode = true,
423            "size" => metadata_cmp_settings.size = true,
424            "mtime" => metadata_cmp_settings.mtime = true,
425            "ctime" => metadata_cmp_settings.ctime = true,
426            _ => {
427                return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
428            }
429        }
430    }
431    Ok(metadata_cmp_settings)
432}
433
434fn parse_type_settings(
435    settings: &str,
436) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
437    let mut user_and_time = preserve::UserAndTimeSettings::default();
438    let mut mode_mask = None;
439    for setting in settings.split(',') {
440        match setting {
441            "uid" => user_and_time.uid = true,
442            "gid" => user_and_time.gid = true,
443            "time" => user_and_time.time = true,
444            _ => {
445                if let Ok(mask) = u32::from_str_radix(setting, 8) {
446                    mode_mask = Some(mask);
447                } else {
448                    return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
449                }
450            }
451        }
452    }
453    Ok((user_and_time, mode_mask))
454}
455
456pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
457    let mut preserve_settings = preserve::Settings::default();
458    for type_settings in settings.split(' ') {
459        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
460            let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
461                format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
462            )?;
463            match obj_type {
464                "f" | "file" => {
465                    preserve_settings.file = preserve::FileSettings::default();
466                    preserve_settings.file.user_and_time = user_and_time_settings;
467                    if let Some(mode) = mode_opt {
468                        preserve_settings.file.mode_mask = mode;
469                    }
470                }
471                "d" | "dir" | "directory" => {
472                    preserve_settings.dir = preserve::DirSettings::default();
473                    preserve_settings.dir.user_and_time = user_and_time_settings;
474                    if let Some(mode) = mode_opt {
475                        preserve_settings.dir.mode_mask = mode;
476                    }
477                }
478                "l" | "link" | "symlink" => {
479                    preserve_settings.symlink = preserve::SymlinkSettings::default();
480                    preserve_settings.symlink.user_and_time = user_and_time_settings;
481                }
482                _ => {
483                    return Err(anyhow!("Unknown object type: {}", obj_type));
484                }
485            }
486        } else {
487            return Err(anyhow!("Invalid preserve settings: {}", settings));
488        }
489    }
490    Ok(preserve_settings)
491}
492
493pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
494    let mut cmp_settings = cmp::ObjSettings::default();
495    for type_settings in settings.split(' ') {
496        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
497            let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
498                "parsing preserve settings: {obj_settings}, type: {obj_type}"
499            ))?;
500            let obj_type = match obj_type {
501                "f" | "file" => ObjType::File,
502                "d" | "dir" | "directory" => ObjType::Dir,
503                "l" | "link" | "symlink" => ObjType::Symlink,
504                "o" | "other" => ObjType::Other,
505                _ => {
506                    return Err(anyhow!("Unknown obj type: {}", obj_type));
507                }
508            };
509            cmp_settings[obj_type] = obj_cmp_settings;
510        } else {
511            return Err(anyhow!("Invalid preserve settings: {}", settings));
512        }
513    }
514    Ok(cmp_settings)
515}
516
517pub async fn cmp(
518    src: &std::path::Path,
519    dst: &std::path::Path,
520    log: &cmp::LogWriter,
521    settings: &cmp::Settings,
522) -> Result<cmp::Summary, anyhow::Error> {
523    cmp::cmp(&PROGRESS, src, dst, log, settings).await
524}
525
526pub async fn copy(
527    src: &std::path::Path,
528    dst: &std::path::Path,
529    settings: &copy::Settings,
530    preserve: &preserve::Settings,
531) -> Result<copy::Summary, copy::Error> {
532    copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
533}
534
535pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
536    rm::rm(&PROGRESS, path, settings).await
537}
538
539pub async fn link(
540    src: &std::path::Path,
541    dst: &std::path::Path,
542    update: &Option<std::path::PathBuf>,
543    settings: &link::Settings,
544) -> Result<link::Summary, link::Error> {
545    let cwd = std::env::current_dir()
546        .map_err(|err| link::Error::new(anyhow::Error::msg(err), link::Summary::default()))?;
547    link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
548}
549
550fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
551    match std::env::var(name) {
552        Ok(val) => match val.parse() {
553            Ok(val) => val,
554            Err(_) => default,
555        },
556        Err(_) => default,
557    }
558}
559
560#[rustfmt::skip]
561fn print_runtime_stats() -> Result<(), anyhow::Error> {
562    let process = procfs::process::Process::myself()?;
563    let stat = process.stat()?;
564    // The time is in clock ticks, so we need to convert it to seconds
565    let clock_ticks_per_second = procfs::ticks_per_second();
566    let ticks_to_duration = |ticks: u64| {
567        std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
568    };
569    let vmhwm = process.status()?.vmhwm.unwrap_or(0);
570    println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
571    println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
572    println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm));
573    Ok(())
574}
575
576fn get_max_open_files() -> Result<u64, std::io::Error> {
577    let mut rlim = libc::rlimit {
578        rlim_cur: 0,
579        rlim_max: 0,
580    };
581    // Safety: we pass a valid "rlim" pointer and the result is checked
582    let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
583    if result == 0 {
584        Ok(rlim.rlim_cur)
585    } else {
586        Err(std::io::Error::last_os_error())
587    }
588}
589
590struct ProgWriter {}
591
592impl ProgWriter {
593    fn new() -> Self {
594        Self {}
595    }
596}
597
598impl std::io::Write for ProgWriter {
599    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
600        PBAR.suspend(|| std::io::stdout().write(buf))
601    }
602    fn flush(&mut self) -> std::io::Result<()> {
603        std::io::stdout().flush()
604    }
605}
606
607#[must_use]
608pub fn generate_debug_log_filename(prefix: &str) -> String {
609    let now = chrono::Utc::now();
610    let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
611    let process_id = std::process::id();
612    format!("{prefix}-{timestamp}-{process_id}")
613}
614
615#[instrument(skip(func))] // "func" is not Debug printable
616#[allow(clippy::too_many_arguments)]
617pub fn run<Fut, Summary, Error>(
618    progress: Option<ProgressSettings>,
619    quiet: bool,
620    verbose: u8,
621    print_summary: bool,
622    max_workers: usize,
623    max_blocking_threads: usize,
624    max_open_files: Option<usize>,
625    ops_throttle: usize,
626    iops_throttle: usize,
627    chunk_size: u64,
628    remote_tracing_layer: Option<remote_tracing::RemoteTracingLayer>,
629    debug_log_file: Option<String>,
630    func: impl FnOnce() -> Fut,
631) -> Option<Summary>
632// we return an Option rather than a Result to indicate that callers of this function should NOT print the error
633where
634    Summary: std::fmt::Display,
635    Error: std::fmt::Display + std::fmt::Debug,
636    Fut: std::future::Future<Output = Result<Summary, Error>>,
637{
638    if quiet {
639        assert!(
640            verbose == 0,
641            "Quiet mode and verbose mode are mutually exclusive"
642        );
643    } else {
644        let env_filter =
645            tracing_subscriber::EnvFilter::from_default_env().add_directive(match verbose {
646                0 => "error".parse().unwrap(),
647                1 => "info".parse().unwrap(),
648                2 => "debug".parse().unwrap(),
649                _ => "trace".parse().unwrap(),
650            });
651        let file_layer = if let Some(ref log_file_path) = debug_log_file {
652            let file = std::fs::OpenOptions::new()
653                .create(true)
654                .append(true)
655                .open(log_file_path)
656                .unwrap_or_else(|e| {
657                    panic!("Failed to create debug log file at '{log_file_path}': {e}")
658                });
659            let file_layer = tracing_subscriber::fmt::layer()
660                .with_target(true)
661                .with_line_number(true)
662                .with_thread_ids(true)
663                .with_timer(LocalTimeFormatter)
664                .with_ansi(false)
665                .with_writer(file)
666                .with_filter(
667                    tracing_subscriber::EnvFilter::from_default_env().add_directive(
668                        match verbose {
669                            0 => "error".parse().unwrap(),
670                            1 => "info".parse().unwrap(),
671                            2 => "debug".parse().unwrap(),
672                            _ => "trace".parse().unwrap(),
673                        },
674                    ),
675                );
676            Some(file_layer)
677        } else {
678            None
679        };
680        let fmt_layer = if remote_tracing_layer.is_some() {
681            None
682        } else {
683            let fmt_layer = tracing_subscriber::fmt::layer()
684                .with_target(true)
685                .with_line_number(true)
686                .with_span_events(if verbose > 2 {
687                    FmtSpan::NEW | FmtSpan::CLOSE
688                } else {
689                    FmtSpan::NONE
690                })
691                .with_timer(LocalTimeFormatter)
692                .pretty()
693                .with_writer(ProgWriter::new);
694            Some(fmt_layer)
695        };
696        let is_console_enabled = match std::env::var("RCP_TOKIO_TRACING_CONSOLE_ENABLED") {
697            Ok(val) => matches!(val.to_lowercase().as_str(), "true" | "1"),
698            Err(_) => false,
699        };
700        let console_layer = if is_console_enabled {
701            let console_port: u16 =
702                read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_SERVER_PORT", 6669);
703            let retention_seconds: u64 =
704                read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
705            let console_layer = console_subscriber::ConsoleLayer::builder()
706                .retention(std::time::Duration::from_secs(retention_seconds))
707                .server_addr(([127, 0, 0, 1], console_port))
708                .spawn();
709            Some(console_layer)
710        } else {
711            None
712        };
713        tracing_subscriber::registry()
714            .with(env_filter)
715            .with(file_layer)
716            .with(fmt_layer)
717            .with(remote_tracing_layer)
718            .with(console_layer)
719            .init();
720    }
721    let mut builder = tokio::runtime::Builder::new_multi_thread();
722    builder.enable_all();
723    if max_workers > 0 {
724        builder.worker_threads(max_workers);
725    }
726    if max_blocking_threads > 0 {
727        builder.max_blocking_threads(max_blocking_threads);
728    }
729    if !sysinfo::set_open_files_limit(isize::MAX) {
730        tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
731    }
732    let set_max_open_files = max_open_files.unwrap_or_else(|| {
733        let limit = get_max_open_files().expect(
734            "We failed to query rlimit, if this is expected try specifying --max-open-files",
735        ) as usize;
736        80 * limit / 100 // ~80% of the max open files limit
737    });
738    if set_max_open_files > 0 {
739        tracing::info!("Setting max open files to: {}", set_max_open_files);
740        throttle::set_max_open_files(set_max_open_files);
741    } else {
742        tracing::info!("Not applying any limit to max open files!");
743    }
744    let runtime = builder.build().expect("Failed to create runtime");
745    fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
746        let mut replenish = replenish;
747        let mut interval = std::time::Duration::from_secs(1);
748        while replenish > 100 && interval > std::time::Duration::from_millis(1) {
749            replenish /= 10;
750            interval /= 10;
751        }
752        (replenish, interval)
753    }
754    if ops_throttle > 0 {
755        let (replenish, interval) = get_replenish_interval(ops_throttle);
756        throttle::init_ops_tokens(replenish);
757        runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
758    }
759    if iops_throttle > 0 {
760        if chunk_size == 0 {
761            tracing::error!("Chunk size must be specified when using --iops-throttle");
762            return None;
763        }
764        let (replenish, interval) = get_replenish_interval(iops_throttle);
765        throttle::init_iops_tokens(replenish);
766        runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
767    } else if chunk_size > 0 {
768        tracing::error!(
769            "--chunk-size > 0 but --iops-throttle is 0 -- did you intend to use --iops-throttle?"
770        );
771        return None;
772    }
773    let res = {
774        let _progress = progress.map(|settings| {
775            tracing::debug!("Requesting progress updates {settings:?}");
776            let delay = settings.progress_delay.map(|delay_str| {
777                humantime::parse_duration(&delay_str)
778                    .expect("Couldn't parse duration out of --progress-delay")
779            });
780            ProgressTracker::new(settings.progress_type, delay)
781        });
782        runtime.block_on(func())
783    };
784    match &res {
785        Ok(summary) => {
786            if print_summary || verbose > 0 {
787                println!("{summary}");
788            }
789        }
790        Err(err) => {
791            if !quiet {
792                println!("{err:?}");
793            }
794        }
795    }
796    if print_summary || verbose > 0 {
797        if let Err(err) = print_runtime_stats() {
798            println!("Failed to print runtime stats: {err:?}");
799        }
800    }
801    res.ok()
802}