1#[macro_use]
97extern crate lazy_static;
98
99use crate::cmp::ObjType;
100use anyhow::anyhow;
101use anyhow::Context;
102use std::io::IsTerminal;
103use tracing::instrument;
104use tracing_subscriber::fmt::format::FmtSpan;
105use tracing_subscriber::prelude::*;
106
107pub mod cmp;
108pub mod config;
109pub mod copy;
110pub mod filegen;
111pub mod link;
112pub mod preserve;
113pub mod remote_tracing;
114pub mod rm;
115
116pub mod filecmp;
117pub mod progress;
118mod testutils;
119
120pub use config::{OutputConfig, RuntimeConfig, ThrottleConfig, TracingConfig};
121pub use progress::{RcpdProgressPrinter, SerializableProgress};
122
/// Distinguishes the source side from the destination side.
///
/// Used as the key type of [`ProgressSnapshot`]; its `Display` form is the
/// lowercase variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
pub enum RcpdType {
    /// The source side (displayed as `source`).
    Source,
    /// The destination side (displayed as `destination`).
    Destination,
}
129
130impl std::fmt::Display for RcpdType {
131    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132        match self {
133            RcpdType::Source => write!(f, "source"),
134            RcpdType::Destination => write!(f, "destination"),
135        }
136    }
137}
138
/// Map holding one value of type `T` for each [`RcpdType`] variant.
pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
141
// Process-wide singletons created through `lazy_static!`.
lazy_static! {
    // Global progress state; passed to the cmp/copy/rm/link entry points below and read by
    // the progress reporting loops.
    static ref PROGRESS: progress::Progress = progress::Progress::new();
    // Spinner used by the interactive progress loops; `ProgWriter` suspends it while writing
    // log output to stdout.
    static ref PBAR: indicatif::ProgressBar = indicatif::ProgressBar::new_spinner();
}
146
147#[must_use]
148pub fn get_progress() -> &'static progress::Progress {
149    &PROGRESS
150}
151
/// Timer for tracing output; its `FormatTime` implementation prints the current time taken
/// from `chrono::Local`.
struct LocalTimeFormatter;
153
154impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFormatter {
155    fn format_time(
156        &self,
157        writer: &mut tracing_subscriber::fmt::format::Writer<'_>,
158    ) -> std::fmt::Result {
159        let now = chrono::Local::now();
160        writer.write_str(&now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
161    }
162}
163
/// Handle to the background progress-reporting thread.
///
/// Dropping the tracker sets the shared flag, notifies the thread and joins it.
struct ProgressTracker {
    /// Shared "done" flag together with the condition variable used to wake the thread.
    lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
    /// Join handle of the reporting thread; taken out (left as `None`) on drop.
    pbar_thread: Option<std::thread::JoinHandle<()>>,
}
168
// How progress is presented locally.
#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
pub enum ProgressType {
    // Default: the reporting code picks the progress bar when stderr is a terminal and
    // text updates otherwise.
    #[default]
    #[value(name = "auto", alias = "Auto")]
    Auto,
    // Always use the interactive spinner.
    #[value(name = "ProgressBar", alias = "progress-bar")]
    ProgressBar,
    // Always print periodic text blocks to stderr.
    #[value(name = "TextUpdates", alias = "text-updates")]
    TextUpdates,
}
179
/// Selects which reporting loop the progress thread runs.
pub enum GeneralProgressType {
    /// Report the global progress state locally, using the given presentation.
    User(ProgressType),
    /// Send progress updates through the given channel instead of displaying them.
    Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
    /// Display source/destination progress obtained from a snapshot callback.
    RemoteMaster {
        /// Presentation used for the combined display.
        progress_type: ProgressType,
        /// Called on every refresh to obtain the latest per-side progress.
        get_progress_snapshot:
            Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>,
    },
}
189
190impl std::fmt::Debug for GeneralProgressType {
191    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192        match self {
193            GeneralProgressType::User(pt) => write!(f, "User({pt:?})"),
194            GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
195            GeneralProgressType::RemoteMaster { progress_type, .. } => {
196                write!(
197                    f,
198                    "RemoteMaster(progress_type: {progress_type:?}, <function>)"
199                )
200            }
201        }
202    }
203}
204
/// Progress reporting options passed to [`run`].
#[derive(Debug)]
pub struct ProgressSettings {
    /// Which reporting loop to run and how to present it.
    pub progress_type: GeneralProgressType,
    /// Optional refresh interval as text; `run` parses it with `humantime::parse_duration`.
    pub progress_delay: Option<String>,
}
210
211fn progress_bar(
212    lock: &std::sync::Mutex<bool>,
213    cvar: &std::sync::Condvar,
214    delay_opt: &Option<std::time::Duration>,
215) {
216    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
217    PBAR.set_style(
218        indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
219            .unwrap()
220            .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
221    );
222    let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
223    let mut is_done = lock.lock().unwrap();
224    loop {
225        PBAR.set_position(PBAR.position() + 1); PBAR.set_message(prog_printer.print().unwrap());
227        let result = cvar.wait_timeout(is_done, delay).unwrap();
228        is_done = result.0;
229        if *is_done {
230            break;
231        }
232    }
233    PBAR.finish_and_clear();
234}
235
236fn get_datetime_prefix() -> String {
237    chrono::Local::now()
238        .format("%Y-%m-%dT%H:%M:%S%.3f%:z")
239        .to_string()
240}
241
242fn text_updates(
243    lock: &std::sync::Mutex<bool>,
244    cvar: &std::sync::Condvar,
245    delay_opt: &Option<std::time::Duration>,
246) {
247    let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
248    let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
249    let mut is_done = lock.lock().unwrap();
250    loop {
251        eprintln!("=======================");
252        eprintln!(
253            "{}\n--{}",
254            get_datetime_prefix(),
255            prog_printer.print().unwrap()
256        );
257        let result = cvar.wait_timeout(is_done, delay).unwrap();
258        is_done = result.0;
259        if *is_done {
260            break;
261        }
262    }
263}
264
265fn rcpd_updates(
266    lock: &std::sync::Mutex<bool>,
267    cvar: &std::sync::Condvar,
268    delay_opt: &Option<std::time::Duration>,
269    sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
270) {
271    tracing::debug!("Starting rcpd progress updates");
272    let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
273    let mut is_done = lock.lock().unwrap();
274    loop {
275        if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
276            tracing::debug!("Progress update channel closed, stopping progress updates");
278            break;
279        }
280        let result = cvar.wait_timeout(is_done, delay).unwrap();
281        is_done = result.0;
282        if *is_done {
283            break;
284        }
285    }
286}
287
288fn remote_master_updates<F>(
289    lock: &std::sync::Mutex<bool>,
290    cvar: &std::sync::Condvar,
291    delay_opt: &Option<std::time::Duration>,
292    get_progress_snapshot: F,
293    progress_type: ProgressType,
294) where
295    F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
296{
297    let interactive = match progress_type {
298        ProgressType::Auto => std::io::stderr().is_terminal(),
299        ProgressType::ProgressBar => true,
300        ProgressType::TextUpdates => false,
301    };
302    if interactive {
303        PBAR.set_style(
304            indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
305                .unwrap()
306                .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
307        );
308        let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
309        let mut printer = RcpdProgressPrinter::new();
310        let mut is_done = lock.lock().unwrap();
311        loop {
312            let progress_map = get_progress_snapshot();
313            let source_progress = &progress_map[RcpdType::Source];
314            let destination_progress = &progress_map[RcpdType::Destination];
315            PBAR.set_position(PBAR.position() + 1); PBAR.set_message(
317                printer
318                    .print(source_progress, destination_progress)
319                    .unwrap(),
320            );
321            let result = cvar.wait_timeout(is_done, delay).unwrap();
322            is_done = result.0;
323            if *is_done {
324                break;
325            }
326        }
327        PBAR.finish_and_clear();
328    } else {
329        let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
330        let mut printer = RcpdProgressPrinter::new();
331        let mut is_done = lock.lock().unwrap();
332        loop {
333            let progress_map = get_progress_snapshot();
334            let source_progress = &progress_map[RcpdType::Source];
335            let destination_progress = &progress_map[RcpdType::Destination];
336            eprintln!("=======================");
337            eprintln!(
338                "{}\n--{}",
339                get_datetime_prefix(),
340                printer
341                    .print(source_progress, destination_progress)
342                    .unwrap()
343            );
344            let result = cvar.wait_timeout(is_done, delay).unwrap();
345            is_done = result.0;
346            if *is_done {
347                break;
348            }
349        }
350    }
351}
352
353impl ProgressTracker {
354    pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
355        let lock_cvar =
356            std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
357        let lock_cvar_clone = lock_cvar.clone();
358        let pbar_thread = std::thread::spawn(move || {
359            let (lock, cvar) = &*lock_cvar_clone;
360            match progress_type {
361                GeneralProgressType::Remote(sender) => {
362                    rcpd_updates(lock, cvar, &delay_opt, sender);
363                }
364                GeneralProgressType::RemoteMaster {
365                    progress_type,
366                    get_progress_snapshot,
367                } => {
368                    remote_master_updates(
369                        lock,
370                        cvar,
371                        &delay_opt,
372                        get_progress_snapshot,
373                        progress_type,
374                    );
375                }
376                _ => {
377                    let interactive = match progress_type {
378                        GeneralProgressType::User(ProgressType::Auto) => {
379                            std::io::stderr().is_terminal()
380                        }
381                        GeneralProgressType::User(ProgressType::ProgressBar) => true,
382                        GeneralProgressType::User(ProgressType::TextUpdates) => false,
383                        GeneralProgressType::Remote(_)
384                        | GeneralProgressType::RemoteMaster { .. } => {
385                            unreachable!("Invalid progress type: {progress_type:?}")
386                        }
387                    };
388                    if interactive {
389                        progress_bar(lock, cvar, &delay_opt);
390                    } else {
391                        text_updates(lock, cvar, &delay_opt);
392                    }
393                }
394            }
395        });
396        Self {
397            lock_cvar,
398            pbar_thread: Some(pbar_thread),
399        }
400    }
401}
402
403impl Drop for ProgressTracker {
404    fn drop(&mut self) {
405        let (lock, cvar) = &*self.lock_cvar;
406        let mut is_done = lock.lock().unwrap();
407        *is_done = true;
408        cvar.notify_one();
409        drop(is_done);
410        if let Some(pbar_thread) = self.pbar_thread.take() {
411            pbar_thread.join().unwrap();
412        }
413    }
414}
415
416pub fn parse_metadata_cmp_settings(
417    settings: &str,
418) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
419    let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
420    for setting in settings.split(',') {
421        match setting {
422            "uid" => metadata_cmp_settings.uid = true,
423            "gid" => metadata_cmp_settings.gid = true,
424            "mode" => metadata_cmp_settings.mode = true,
425            "size" => metadata_cmp_settings.size = true,
426            "mtime" => metadata_cmp_settings.mtime = true,
427            "ctime" => metadata_cmp_settings.ctime = true,
428            _ => {
429                return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
430            }
431        }
432    }
433    Ok(metadata_cmp_settings)
434}
435
436fn parse_type_settings(
437    settings: &str,
438) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
439    let mut user_and_time = preserve::UserAndTimeSettings::default();
440    let mut mode_mask = None;
441    for setting in settings.split(',') {
442        match setting {
443            "uid" => user_and_time.uid = true,
444            "gid" => user_and_time.gid = true,
445            "time" => user_and_time.time = true,
446            _ => {
447                if let Ok(mask) = u32::from_str_radix(setting, 8) {
448                    mode_mask = Some(mask);
449                } else {
450                    return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
451                }
452            }
453        }
454    }
455    Ok((user_and_time, mode_mask))
456}
457
458pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
459    let mut preserve_settings = preserve::Settings::default();
460    for type_settings in settings.split(' ') {
461        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
462            let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
463                format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
464            )?;
465            match obj_type {
466                "f" | "file" => {
467                    preserve_settings.file = preserve::FileSettings::default();
468                    preserve_settings.file.user_and_time = user_and_time_settings;
469                    if let Some(mode) = mode_opt {
470                        preserve_settings.file.mode_mask = mode;
471                    }
472                }
473                "d" | "dir" | "directory" => {
474                    preserve_settings.dir = preserve::DirSettings::default();
475                    preserve_settings.dir.user_and_time = user_and_time_settings;
476                    if let Some(mode) = mode_opt {
477                        preserve_settings.dir.mode_mask = mode;
478                    }
479                }
480                "l" | "link" | "symlink" => {
481                    preserve_settings.symlink = preserve::SymlinkSettings::default();
482                    preserve_settings.symlink.user_and_time = user_and_time_settings;
483                }
484                _ => {
485                    return Err(anyhow!("Unknown object type: {}", obj_type));
486                }
487            }
488        } else {
489            return Err(anyhow!("Invalid preserve settings: {}", settings));
490        }
491    }
492    Ok(preserve_settings)
493}
494
495pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
496    let mut cmp_settings = cmp::ObjSettings::default();
497    for type_settings in settings.split(' ') {
498        if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
499            let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
500                "parsing preserve settings: {obj_settings}, type: {obj_type}"
501            ))?;
502            let obj_type = match obj_type {
503                "f" | "file" => ObjType::File,
504                "d" | "dir" | "directory" => ObjType::Dir,
505                "l" | "link" | "symlink" => ObjType::Symlink,
506                "o" | "other" => ObjType::Other,
507                _ => {
508                    return Err(anyhow!("Unknown obj type: {}", obj_type));
509                }
510            };
511            cmp_settings[obj_type] = obj_cmp_settings;
512        } else {
513            return Err(anyhow!("Invalid preserve settings: {}", settings));
514        }
515    }
516    Ok(cmp_settings)
517}
518
519pub async fn cmp(
520    src: &std::path::Path,
521    dst: &std::path::Path,
522    log: &cmp::LogWriter,
523    settings: &cmp::Settings,
524) -> Result<cmp::Summary, anyhow::Error> {
525    cmp::cmp(&PROGRESS, src, dst, log, settings).await
526}
527
528pub async fn copy(
529    src: &std::path::Path,
530    dst: &std::path::Path,
531    settings: ©::Settings,
532    preserve: &preserve::Settings,
533) -> Result<copy::Summary, copy::Error> {
534    copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
535}
536
537pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
538    rm::rm(&PROGRESS, path, settings).await
539}
540
541pub async fn link(
542    src: &std::path::Path,
543    dst: &std::path::Path,
544    update: &Option<std::path::PathBuf>,
545    settings: &link::Settings,
546) -> Result<link::Summary, link::Error> {
547    let cwd = std::env::current_dir()
548        .with_context(|| "failed to get current working directory")
549        .map_err(|err| link::Error::new(err, link::Summary::default()))?;
550    link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
551}
552
/// Reads environment variable `name` and parses it as `T`.
///
/// Returns `default` when the variable cannot be read or its value does not parse.
fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(default)
}
562
563#[rustfmt::skip]
564fn print_runtime_stats() -> Result<(), anyhow::Error> {
565    let process = procfs::process::Process::myself()?;
566    let stat = process.stat()?;
567    let clock_ticks_per_second = procfs::ticks_per_second();
569    let ticks_to_duration = |ticks: u64| {
570        std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
571    };
572    let vmhwm = process.status()?.vmhwm.unwrap_or(0);
573    println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
574    println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
575    println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm));
576    Ok(())
577}
578
579fn get_max_open_files() -> Result<u64, std::io::Error> {
580    let mut rlim = libc::rlimit {
581        rlim_cur: 0,
582        rlim_max: 0,
583    };
584    let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
586    if result == 0 {
587        Ok(rlim.rlim_cur)
588    } else {
589        Err(std::io::Error::last_os_error())
590    }
591}
592
/// Stateless writer whose `std::io::Write` implementation forwards to stdout while the
/// global spinner is suspended.
struct ProgWriter {}
594
595impl ProgWriter {
596    fn new() -> Self {
597        Self {}
598    }
599}
600
601impl std::io::Write for ProgWriter {
602    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
603        PBAR.suspend(|| std::io::stdout().write(buf))
604    }
605    fn flush(&mut self) -> std::io::Result<()> {
606        std::io::stdout().flush()
607    }
608}
609
610#[must_use]
611pub fn generate_debug_log_filename(prefix: &str) -> String {
612    let now = chrono::Utc::now();
613    let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
614    let process_id = std::process::id();
615    format!("{prefix}-{timestamp}-{process_id}")
616}
617
618#[instrument(skip(func))] pub fn run<Fut, Summary, Error>(
620    progress: Option<ProgressSettings>,
621    output: OutputConfig,
622    runtime: RuntimeConfig,
623    throttle: ThrottleConfig,
624    tracing_config: TracingConfig,
625    func: impl FnOnce() -> Fut,
626) -> Option<Summary>
627where
629    Summary: std::fmt::Display,
630    Error: std::fmt::Display + std::fmt::Debug,
631    Fut: std::future::Future<Output = Result<Summary, Error>>,
632{
633    if let Err(e) = throttle.validate() {
635        eprintln!("Configuration error: {e}");
636        return None;
637    }
638    let OutputConfig {
640        quiet,
641        verbose,
642        print_summary,
643    } = output;
644    let RuntimeConfig {
645        max_workers,
646        max_blocking_threads,
647    } = runtime;
648    let ThrottleConfig {
649        max_open_files,
650        ops_throttle,
651        iops_throttle,
652        chunk_size: _,
653    } = throttle;
654    let TracingConfig {
655        remote_layer: remote_tracing_layer,
656        debug_log_file,
657    } = tracing_config;
658    if quiet {
659        assert!(
660            verbose == 0,
661            "Quiet mode and verbose mode are mutually exclusive"
662        );
663    } else {
664        let env_filter =
665            tracing_subscriber::EnvFilter::from_default_env().add_directive(match verbose {
666                0 => "error".parse().unwrap(),
667                1 => "info".parse().unwrap(),
668                2 => "debug".parse().unwrap(),
669                _ => "trace".parse().unwrap(),
670            });
671        let file_layer = if let Some(ref log_file_path) = debug_log_file {
672            let file = std::fs::OpenOptions::new()
673                .create(true)
674                .append(true)
675                .open(log_file_path)
676                .unwrap_or_else(|e| {
677                    panic!("Failed to create debug log file at '{log_file_path}': {e}")
678                });
679            let file_layer = tracing_subscriber::fmt::layer()
680                .with_target(true)
681                .with_line_number(true)
682                .with_thread_ids(true)
683                .with_timer(LocalTimeFormatter)
684                .with_ansi(false)
685                .with_writer(file)
686                .with_filter(
687                    tracing_subscriber::EnvFilter::from_default_env().add_directive(
688                        match verbose {
689                            0 => "error".parse().unwrap(),
690                            1 => "info".parse().unwrap(),
691                            2 => "debug".parse().unwrap(),
692                            _ => "trace".parse().unwrap(),
693                        },
694                    ),
695                );
696            Some(file_layer)
697        } else {
698            None
699        };
700        let fmt_layer = if remote_tracing_layer.is_some() {
701            None
702        } else {
703            let fmt_layer = tracing_subscriber::fmt::layer()
704                .with_target(true)
705                .with_line_number(true)
706                .with_span_events(if verbose > 2 {
707                    FmtSpan::NEW | FmtSpan::CLOSE
708                } else {
709                    FmtSpan::NONE
710                })
711                .with_timer(LocalTimeFormatter)
712                .pretty()
713                .with_writer(ProgWriter::new);
714            Some(fmt_layer)
715        };
716        let is_console_enabled = match std::env::var("RCP_TOKIO_TRACING_CONSOLE_ENABLED") {
717            Ok(val) => matches!(val.to_lowercase().as_str(), "true" | "1"),
718            Err(_) => false,
719        };
720        let console_layer = if is_console_enabled {
721            let console_port: u16 =
722                read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_SERVER_PORT", 6669);
723            let retention_seconds: u64 =
724                read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
725            let console_layer = console_subscriber::ConsoleLayer::builder()
726                .retention(std::time::Duration::from_secs(retention_seconds))
727                .server_addr(([127, 0, 0, 1], console_port))
728                .spawn();
729            Some(console_layer)
730        } else {
731            None
732        };
733        tracing_subscriber::registry()
734            .with(env_filter)
735            .with(file_layer)
736            .with(fmt_layer)
737            .with(remote_tracing_layer)
738            .with(console_layer)
739            .init();
740    }
741    let mut builder = tokio::runtime::Builder::new_multi_thread();
742    builder.enable_all();
743    if max_workers > 0 {
744        builder.worker_threads(max_workers);
745    }
746    if max_blocking_threads > 0 {
747        builder.max_blocking_threads(max_blocking_threads);
748    }
749    if !sysinfo::set_open_files_limit(isize::MAX) {
750        tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
751    }
752    let set_max_open_files = max_open_files.unwrap_or_else(|| {
753        let limit = get_max_open_files().expect(
754            "We failed to query rlimit, if this is expected try specifying --max-open-files",
755        ) as usize;
756        80 * limit / 100 });
758    if set_max_open_files > 0 {
759        tracing::info!("Setting max open files to: {}", set_max_open_files);
760        throttle::set_max_open_files(set_max_open_files);
761    } else {
762        tracing::info!("Not applying any limit to max open files!");
763    }
764    let runtime = builder.build().expect("Failed to create runtime");
765    fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
766        let mut replenish = replenish;
767        let mut interval = std::time::Duration::from_secs(1);
768        while replenish > 100 && interval > std::time::Duration::from_millis(1) {
769            replenish /= 10;
770            interval /= 10;
771        }
772        (replenish, interval)
773    }
774    if ops_throttle > 0 {
775        let (replenish, interval) = get_replenish_interval(ops_throttle);
776        throttle::init_ops_tokens(replenish);
777        runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
778    }
779    if iops_throttle > 0 {
780        let (replenish, interval) = get_replenish_interval(iops_throttle);
781        throttle::init_iops_tokens(replenish);
782        runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
783    }
784    let res = {
785        let _progress = progress.map(|settings| {
786            tracing::debug!("Requesting progress updates {settings:?}");
787            let delay = settings.progress_delay.map(|delay_str| {
788                humantime::parse_duration(&delay_str)
789                    .expect("Couldn't parse duration out of --progress-delay")
790            });
791            ProgressTracker::new(settings.progress_type, delay)
792        });
793        runtime.block_on(func())
794    };
795    match &res {
796        Ok(summary) => {
797            if print_summary || verbose > 0 {
798                println!("{summary}");
799            }
800        }
801        Err(err) => {
802            if !quiet {
803                println!("{err:?}");
804            }
805        }
806    }
807    if print_summary || verbose > 0 {
808        if let Err(err) = print_runtime_stats() {
809            println!("Failed to print runtime stats: {err:?}");
810        }
811    }
812    res.ok()
813}