common/
lib.rs

1//! Common utilities and types for RCP file operation tools
2//!
3//! This crate provides shared functionality used across all RCP tools (`rcp`, `rrm`, `rlink`, `rcmp`).
4//! It includes core operations (copy, remove, link, compare), progress reporting, metadata preservation,
5//! and runtime configuration.
6//!
7//! # Core Modules
8//!
9//! - [`mod@copy`] - File copying operations with metadata preservation and error handling
10//! - [`mod@rm`] - File removal operations
11//! - [`mod@link`] - Hard-linking operations
12//! - [`mod@cmp`] - File comparison operations (metadata-based)
13//! - [`mod@preserve`] - Metadata preservation settings and operations
14//! - [`mod@progress`] - Progress tracking and reporting
15//! - [`mod@filecmp`] - File metadata comparison utilities
16//! - [`mod@remote_tracing`] - Remote tracing support for distributed operations
17//!
18//! # Key Types
19//!
20//! ## `RcpdType`
21//!
22//! Identifies the role of a remote copy daemon:
23//! - `Source` - reads files from source host
24//! - `Destination` - writes files to destination host
25//!
26//! ## `ProgressType`
27//!
28//! Controls progress reporting display:
29//! - `Auto` - automatically choose based on terminal type
30//! - `ProgressBar` - animated progress bar (for interactive terminals)
31//! - `TextUpdates` - periodic text updates (for logging/non-interactive)
32//!
33//! # Progress Reporting
34//!
35//! The crate provides a global progress tracking system accessible via [`get_progress()`].
36//! Progress can be displayed in different formats depending on the execution context.
37//!
38//! Progress output goes to stderr, while logs go to stdout, allowing users to redirect
39//! logs to a file while still viewing interactive progress.
40//!
41//! # Runtime Configuration
42//!
43//! The [`run`] function provides a unified entry point for all RCP tools with support for:
44//! - Progress tracking and reporting
45//! - Logging configuration (quiet/verbose modes)
46//! - Resource limits (max workers, open files, throttling)
47//! - Tokio runtime setup
48//! - Remote tracing integration
49//!
50//! # Examples
51//!
52//! ## Basic Copy Operation
53//!
54//! ```rust,no_run
55//! use std::path::Path;
56//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
57//! let src = Path::new("/source");
58//! let dst = Path::new("/destination");
59//!
60//! let settings = common::copy::Settings {
61//!     dereference: false,
62//!     fail_early: false,
63//!     overwrite: false,
64//!     overwrite_compare: Default::default(),
65//!     chunk_size: 0,
66//! };
67//! let preserve = common::preserve::preserve_default();
68//!
69//! let summary = common::copy(src, dst, &settings, &preserve).await?;
70//! println!("Copied {} files", summary.files_copied);
71//! # Ok(())
72//! # }
73//! ```
74//!
75//! ## Metadata Comparison
76//!
77//! ```rust,no_run
78//! use std::path::Path;
79//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
80//! let src = Path::new("/path1");
81//! let dst = Path::new("/path2");
82//!
83//! let log = common::cmp::LogWriter::new(None).await?;
84//! let settings = common::cmp::Settings {
85//!     fail_early: false,
86//!     exit_early: false,
87//!     compare: Default::default(),
88//! };
89//!
90//! let summary = common::cmp(src, dst, &log, &settings).await?;
91//! println!("Comparison complete: {}", summary);
92//! # Ok(())
93//! # }
94//! ```
95
96#[macro_use]
97extern crate lazy_static;
98
99use crate::cmp::ObjType;
100use anyhow::anyhow;
101use anyhow::Context;
102use std::io::IsTerminal;
103use tracing::instrument;
104use tracing_subscriber::fmt::format::FmtSpan;
105use tracing_subscriber::prelude::*;
106
107pub mod cmp;
108pub mod copy;
109pub mod filegen;
110pub mod link;
111pub mod preserve;
112pub mod remote_tracing;
113pub mod rm;
114
115pub mod filecmp;
116pub mod progress;
117mod testutils;
118
119pub use progress::{RcpdProgressPrinter, SerializableProgress};
120
121// Define RcpdType in common since remote depends on common
122#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
123pub enum RcpdType {
124    Source,
125    Destination,
126}
127
128impl std::fmt::Display for RcpdType {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        match self {
131            RcpdType::Source => write!(f, "source"),
132            RcpdType::Destination => write!(f, "destination"),
133        }
134    }
135}
136
137// Type alias for progress snapshots
138pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
139
140lazy_static! {
141    static ref PROGRESS: progress::Progress = progress::Progress::new();
142    static ref PBAR: indicatif::ProgressBar = indicatif::ProgressBar::new_spinner();
143}
144
145#[must_use]
146pub fn get_progress() -> &'static progress::Progress {
147    &PROGRESS
148}
149
/// Owns the background thread that periodically reports progress.
///
/// Dropping the tracker sets the shared "done" flag, wakes the thread and joins it.
struct ProgressTracker {
    /// "done" flag plus the condition variable used to wake the reporting thread early.
    lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
    /// Handle of the reporting thread; taken (set to `None`) when it is joined on drop.
    pbar_thread: Option<std::thread::JoinHandle<()>>,
}
154
155#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
156pub enum ProgressType {
157    #[default]
158    Auto,
159    ProgressBar,
160    TextUpdates,
161}
162
163pub enum GeneralProgressType {
164    User(ProgressType),
165    Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
166    RemoteMaster(Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>),
167}
168
169impl std::fmt::Debug for GeneralProgressType {
170    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171        match self {
172            GeneralProgressType::User(pt) => write!(f, "User({pt:?})"),
173            GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
174            GeneralProgressType::RemoteMaster(_) => write!(f, "RemoteMaster(<function>)"),
175        }
176    }
177}
178
179// Keep FromStr for backwards compatibility, but ValueEnum will be used by clap
180impl std::str::FromStr for ProgressType {
181    type Err = anyhow::Error;
182
183    fn from_str(s: &str) -> Result<Self, Self::Err> {
184        match s {
185            "auto" | "Auto" => Ok(ProgressType::Auto),
186            "ProgressBar" => Ok(ProgressType::ProgressBar),
187            "TextUpdates" => Ok(ProgressType::TextUpdates),
188            _ => Err(anyhow!("Invalid progress type: {}", s)),
189        }
190    }
191}
192
193#[derive(Debug)]
194pub struct ProgressSettings {
195    pub progress_type: GeneralProgressType,
196    pub progress_delay: Option<String>,
197}
198
199fn progress_bar(
200    lock: &std::sync::Mutex<bool>,
201    cvar: &std::sync::Condvar,
202    delay_opt: &Option<std::time::Duration>,
203) {
204    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
205    PBAR.set_style(
206        indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
207            .unwrap()
208            .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
209    );
210    let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
211    let mut is_done = lock.lock().unwrap();
212    loop {
213        PBAR.set_position(PBAR.position() + 1); // do we need to update?
214        PBAR.set_message(prog_printer.print().unwrap());
215        let result = cvar.wait_timeout(is_done, delay).unwrap();
216        is_done = result.0;
217        if *is_done {
218            break;
219        }
220    }
221    PBAR.finish_and_clear();
222}
223
224fn text_updates(
225    lock: &std::sync::Mutex<bool>,
226    cvar: &std::sync::Condvar,
227    delay_opt: &Option<std::time::Duration>,
228) {
229    let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
230    let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
231    let mut is_done = lock.lock().unwrap();
232    loop {
233        eprintln!("--{}", prog_printer.print().unwrap());
234        let result = cvar.wait_timeout(is_done, delay).unwrap();
235        is_done = result.0;
236        if *is_done {
237            break;
238        }
239    }
240}
241
242fn rcpd_updates(
243    lock: &std::sync::Mutex<bool>,
244    cvar: &std::sync::Condvar,
245    delay_opt: &Option<std::time::Duration>,
246    sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
247) {
248    tracing::debug!("Starting rcpd progress updates");
249    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
250    let mut is_done = lock.lock().unwrap();
251    loop {
252        if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
253            // channel closed, receiver is done
254            tracing::debug!("Progress update channel closed, stopping progress updates");
255            break;
256        }
257        let result = cvar.wait_timeout(is_done, delay).unwrap();
258        is_done = result.0;
259        if *is_done {
260            break;
261        }
262    }
263}
264
265fn remote_master_updates<F>(
266    lock: &std::sync::Mutex<bool>,
267    cvar: &std::sync::Condvar,
268    delay_opt: &Option<std::time::Duration>,
269    get_progress_snapshot: F,
270) where
271    F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
272{
273    PBAR.set_style(
274        indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
275            .unwrap()
276            .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
277    );
278    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
279    let mut printer = RcpdProgressPrinter::new();
280    let mut is_done = lock.lock().unwrap();
281    loop {
282        let progress_map = get_progress_snapshot();
283        let source_progress = &progress_map[RcpdType::Source];
284        let destination_progress = &progress_map[RcpdType::Destination];
285        PBAR.set_position(PBAR.position() + 1); // do we need to update?
286        PBAR.set_message(
287            printer
288                .print(source_progress, destination_progress)
289                .unwrap(),
290        );
291        let result = cvar.wait_timeout(is_done, delay).unwrap();
292        is_done = result.0;
293        if *is_done {
294            break;
295        }
296    }
297    PBAR.finish_and_clear();
298}
299
300impl ProgressTracker {
301    pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
302        let lock_cvar =
303            std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
304        let lock_cvar_clone = lock_cvar.clone();
305        let pbar_thread = std::thread::spawn(move || {
306            let (lock, cvar) = &*lock_cvar_clone;
307            match progress_type {
308                GeneralProgressType::Remote(sender) => {
309                    rcpd_updates(lock, cvar, &delay_opt, sender);
310                }
311                GeneralProgressType::RemoteMaster(get_progress_snapshot) => {
312                    remote_master_updates(lock, cvar, &delay_opt, get_progress_snapshot);
313                }
314                _ => {
315                    let interactive = match progress_type {
316                        GeneralProgressType::User(ProgressType::Auto) => {
317                            std::io::stderr().is_terminal()
318                        }
319                        GeneralProgressType::User(ProgressType::ProgressBar) => true,
320                        GeneralProgressType::User(ProgressType::TextUpdates) => false,
321                        GeneralProgressType::Remote(_) | GeneralProgressType::RemoteMaster(_) => {
322                            unreachable!("Invalid progress type: {progress_type:?}")
323                        }
324                    };
325                    if interactive {
326                        progress_bar(lock, cvar, &delay_opt);
327                    } else {
328                        text_updates(lock, cvar, &delay_opt);
329                    }
330                }
331            }
332        });
333        Self {
334            lock_cvar,
335            pbar_thread: Some(pbar_thread),
336        }
337    }
338}
339
340impl Drop for ProgressTracker {
341    fn drop(&mut self) {
342        let (lock, cvar) = &*self.lock_cvar;
343        let mut is_done = lock.lock().unwrap();
344        *is_done = true;
345        cvar.notify_one();
346        drop(is_done);
347        if let Some(pbar_thread) = self.pbar_thread.take() {
348            pbar_thread.join().unwrap();
349        }
350    }
351}
352
353pub fn parse_metadata_cmp_settings(
354    settings: &str,
355) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
356    let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
357    for setting in settings.split(',') {
358        match setting {
359            "uid" => metadata_cmp_settings.uid = true,
360            "gid" => metadata_cmp_settings.gid = true,
361            "mode" => metadata_cmp_settings.mode = true,
362            "size" => metadata_cmp_settings.size = true,
363            "mtime" => metadata_cmp_settings.mtime = true,
364            "ctime" => metadata_cmp_settings.ctime = true,
365            _ => {
366                return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
367            }
368        }
369    }
370    Ok(metadata_cmp_settings)
371}
372
373fn parse_type_settings(
374    settings: &str,
375) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
376    let mut user_and_time = preserve::UserAndTimeSettings::default();
377    let mut mode_mask = None;
378    for setting in settings.split(',') {
379        match setting {
380            "uid" => user_and_time.uid = true,
381            "gid" => user_and_time.gid = true,
382            "time" => user_and_time.time = true,
383            _ => {
384                if let Ok(mask) = u32::from_str_radix(setting, 8) {
385                    mode_mask = Some(mask);
386                } else {
387                    return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
388                }
389            }
390        }
391    }
392    Ok((user_and_time, mode_mask))
393}
394
395pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
396    let mut preserve_settings = preserve::Settings::default();
397    for type_settings in settings.split(' ') {
398        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
399            let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
400                format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
401            )?;
402            match obj_type {
403                "f" | "file" => {
404                    preserve_settings.file = preserve::FileSettings::default();
405                    preserve_settings.file.user_and_time = user_and_time_settings;
406                    if let Some(mode) = mode_opt {
407                        preserve_settings.file.mode_mask = mode;
408                    }
409                }
410                "d" | "dir" | "directory" => {
411                    preserve_settings.dir = preserve::DirSettings::default();
412                    preserve_settings.dir.user_and_time = user_and_time_settings;
413                    if let Some(mode) = mode_opt {
414                        preserve_settings.dir.mode_mask = mode;
415                    }
416                }
417                "l" | "link" | "symlink" => {
418                    preserve_settings.symlink = preserve::SymlinkSettings::default();
419                    preserve_settings.symlink.user_and_time = user_and_time_settings;
420                }
421                _ => {
422                    return Err(anyhow!("Unknown object type: {}", obj_type));
423                }
424            }
425        } else {
426            return Err(anyhow!("Invalid preserve settings: {}", settings));
427        }
428    }
429    Ok(preserve_settings)
430}
431
432pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
433    let mut cmp_settings = cmp::ObjSettings::default();
434    for type_settings in settings.split(' ') {
435        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
436            let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
437                "parsing preserve settings: {obj_settings}, type: {obj_type}"
438            ))?;
439            let obj_type = match obj_type {
440                "f" | "file" => ObjType::File,
441                "d" | "dir" | "directory" => ObjType::Dir,
442                "l" | "link" | "symlink" => ObjType::Symlink,
443                _ => {
444                    return Err(anyhow!("Unknown obj type: {}", obj_type));
445                }
446            };
447            cmp_settings[obj_type] = obj_cmp_settings;
448        } else {
449            return Err(anyhow!("Invalid preserve settings: {}", settings));
450        }
451    }
452    Ok(cmp_settings)
453}
454
455pub async fn cmp(
456    src: &std::path::Path,
457    dst: &std::path::Path,
458    log: &cmp::LogWriter,
459    settings: &cmp::Settings,
460) -> Result<cmp::Summary, anyhow::Error> {
461    cmp::cmp(&PROGRESS, src, dst, log, settings).await
462}
463
464pub async fn copy(
465    src: &std::path::Path,
466    dst: &std::path::Path,
467    settings: &copy::Settings,
468    preserve: &preserve::Settings,
469) -> Result<copy::Summary, copy::Error> {
470    copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
471}
472
473pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
474    rm::rm(&PROGRESS, path, settings).await
475}
476
477pub async fn link(
478    src: &std::path::Path,
479    dst: &std::path::Path,
480    update: &Option<std::path::PathBuf>,
481    settings: &link::Settings,
482) -> Result<link::Summary, link::Error> {
483    let cwd = std::env::current_dir()
484        .map_err(|err| link::Error::new(anyhow::Error::msg(err), link::Summary::default()))?;
485    link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
486}
487
/// Reads the environment variable `name` and parses it as `T`.
///
/// Falls back to `default` when the variable is unset, is not valid Unicode, or
/// does not parse as `T`.
fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(default)
}
497
498#[rustfmt::skip]
499fn print_runtime_stats() -> Result<(), anyhow::Error> {
500    let process = procfs::process::Process::myself()?;
501    let stat = process.stat()?;
502    // The time is in clock ticks, so we need to convert it to seconds
503    let clock_ticks_per_second = procfs::ticks_per_second();
504    let ticks_to_duration = |ticks: u64| {
505        std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
506    };
507    let vmhwm = process.status()?.vmhwm.unwrap_or(0);
508    println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
509    println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
510    println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm));
511    Ok(())
512}
513
514fn get_max_open_files() -> Result<u64, std::io::Error> {
515    let mut rlim = libc::rlimit {
516        rlim_cur: 0,
517        rlim_max: 0,
518    };
519    // Safety: we pass a valid "rlim" pointer and the result is checked
520    let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
521    if result == 0 {
522        Ok(rlim.rlim_cur)
523    } else {
524        Err(std::io::Error::last_os_error())
525    }
526}
527
/// Writer for log output that cooperates with the global spinner: its `Write`
/// implementation suspends the spinner while bytes are written to stdout.
struct ProgWriter {}

impl ProgWriter {
    /// Creates a new writer; it holds no state of its own.
    fn new() -> Self {
        ProgWriter {}
    }
}
535
536impl std::io::Write for ProgWriter {
537    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
538        PBAR.suspend(|| std::io::stdout().write(buf))
539    }
540    fn flush(&mut self) -> std::io::Result<()> {
541        std::io::stdout().flush()
542    }
543}
544
545#[must_use]
546pub fn generate_debug_log_filename(prefix: &str) -> String {
547    let now = chrono::Utc::now();
548    let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
549    let process_id = std::process::id();
550    format!("{prefix}-{timestamp}-{process_id}")
551}
552
553#[instrument(skip(func))] // "func" is not Debug printable
554#[allow(clippy::too_many_arguments)]
555pub fn run<Fut, Summary, Error>(
556    progress: Option<ProgressSettings>,
557    quiet: bool,
558    verbose: u8,
559    print_summary: bool,
560    max_workers: usize,
561    max_blocking_threads: usize,
562    max_open_files: Option<usize>,
563    ops_throttle: usize,
564    iops_throttle: usize,
565    chunk_size: u64,
566    remote_tracing_layer: Option<remote_tracing::RemoteTracingLayer>,
567    debug_log_file: Option<String>,
568    func: impl FnOnce() -> Fut,
569) -> Option<Summary>
570// we return an Option rather than a Result to indicate that callers of this function should NOT print the error
571where
572    Summary: std::fmt::Display,
573    Error: std::fmt::Display + std::fmt::Debug,
574    Fut: std::future::Future<Output = Result<Summary, Error>>,
575{
576    if quiet {
577        assert!(
578            verbose == 0,
579            "Quiet mode and verbose mode are mutually exclusive"
580        );
581    } else {
582        let env_filter =
583            tracing_subscriber::EnvFilter::from_default_env().add_directive(match verbose {
584                0 => "error".parse().unwrap(),
585                1 => "info".parse().unwrap(),
586                2 => "debug".parse().unwrap(),
587                _ => "trace".parse().unwrap(),
588            });
589        let file_layer = if let Some(ref log_file_path) = debug_log_file {
590            let file = std::fs::OpenOptions::new()
591                .create(true)
592                .append(true)
593                .open(log_file_path)
594                .unwrap_or_else(|e| {
595                    panic!("Failed to create debug log file at '{log_file_path}': {e}")
596                });
597            let file_layer = tracing_subscriber::fmt::layer()
598                .with_target(true)
599                .with_line_number(true)
600                .with_thread_ids(true)
601                .with_ansi(false)
602                .with_writer(file)
603                .with_filter(
604                    tracing_subscriber::EnvFilter::from_default_env().add_directive(
605                        match verbose {
606                            0 => "error".parse().unwrap(),
607                            1 => "info".parse().unwrap(),
608                            2 => "debug".parse().unwrap(),
609                            _ => "trace".parse().unwrap(),
610                        },
611                    ),
612                );
613            Some(file_layer)
614        } else {
615            None
616        };
617        let fmt_layer = if remote_tracing_layer.is_some() {
618            None
619        } else {
620            let fmt_layer = tracing_subscriber::fmt::layer()
621                .with_target(true)
622                .with_line_number(true)
623                .with_span_events(if verbose > 2 {
624                    FmtSpan::NEW | FmtSpan::CLOSE
625                } else {
626                    FmtSpan::NONE
627                })
628                .pretty()
629                .with_writer(ProgWriter::new);
630            Some(fmt_layer)
631        };
632        let is_console_enabled = match std::env::var("RCP_TOKIO_TRACING_CONSOLE_ENABLED") {
633            Ok(val) => matches!(val.to_lowercase().as_str(), "true" | "1"),
634            Err(_) => false,
635        };
636        let console_layer = if is_console_enabled {
637            let console_port: u16 =
638                read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_SERVER_PORT", 6669);
639            let retention_seconds: u64 =
640                read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
641            let console_layer = console_subscriber::ConsoleLayer::builder()
642                .retention(std::time::Duration::from_secs(retention_seconds))
643                .server_addr(([127, 0, 0, 1], console_port))
644                .spawn();
645            Some(console_layer)
646        } else {
647            None
648        };
649        tracing_subscriber::registry()
650            .with(env_filter)
651            .with(file_layer)
652            .with(fmt_layer)
653            .with(remote_tracing_layer)
654            .with(console_layer)
655            .init();
656    }
657    let mut builder = tokio::runtime::Builder::new_multi_thread();
658    builder.enable_all();
659    if max_workers > 0 {
660        builder.worker_threads(max_workers);
661    }
662    if max_blocking_threads > 0 {
663        builder.max_blocking_threads(max_blocking_threads);
664    }
665    if !sysinfo::set_open_files_limit(isize::MAX) {
666        tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
667    }
668    let set_max_open_files = max_open_files.unwrap_or_else(|| {
669        let limit = get_max_open_files().expect(
670            "We failed to query rlimit, if this is expected try specifying --max-open-files",
671        ) as usize;
672        80 * limit / 100 // ~80% of the max open files limit
673    });
674    if set_max_open_files > 0 {
675        tracing::info!("Setting max open files to: {}", set_max_open_files);
676        throttle::set_max_open_files(set_max_open_files);
677    } else {
678        tracing::info!("Not applying any limit to max open files!");
679    }
680    let runtime = builder.build().expect("Failed to create runtime");
681    fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
682        let mut replenish = replenish;
683        let mut interval = std::time::Duration::from_secs(1);
684        while replenish > 100 && interval > std::time::Duration::from_millis(1) {
685            replenish /= 10;
686            interval /= 10;
687        }
688        (replenish, interval)
689    }
690    if ops_throttle > 0 {
691        let (replenish, interval) = get_replenish_interval(ops_throttle);
692        throttle::init_ops_tokens(replenish);
693        runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
694    }
695    if iops_throttle > 0 {
696        if chunk_size == 0 {
697            tracing::error!("Chunk size must be specified when using --iops-throttle");
698            return None;
699        }
700        let (replenish, interval) = get_replenish_interval(iops_throttle);
701        throttle::init_iops_tokens(replenish);
702        runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
703    } else if chunk_size > 0 {
704        tracing::error!(
705            "--chunk-size > 0 but --iops-throttle is 0 -- did you intend to use --iops-throttle?"
706        );
707        return None;
708    }
709    let res = {
710        let _progress = progress.map(|settings| {
711            tracing::debug!("Requesting progress updates {settings:?}");
712            let delay = settings.progress_delay.map(|delay_str| {
713                humantime::parse_duration(&delay_str)
714                    .expect("Couldn't parse duration out of --progress-delay")
715            });
716            ProgressTracker::new(settings.progress_type, delay)
717        });
718        runtime.block_on(func())
719    };
720    match &res {
721        Ok(summary) => {
722            if print_summary || verbose > 0 {
723                println!("{summary}");
724            }
725        }
726        Err(err) => {
727            if !quiet {
728                println!("{err:?}");
729            }
730        }
731    }
732    if print_summary || verbose > 0 {
733        if let Err(err) = print_runtime_stats() {
734            println!("Failed to print runtime stats: {err:?}");
735        }
736    }
737    res.ok()
738}