1#[macro_use]
96extern crate lazy_static;
97
98use crate::cmp::ObjType;
99use anyhow::anyhow;
100use anyhow::Context;
101use std::io::IsTerminal;
102use tracing::instrument;
103use tracing_subscriber::fmt::format::FmtSpan;
104use tracing_subscriber::prelude::*;
105
106pub mod cmp;
107pub mod config;
108pub mod copy;
109pub mod filegen;
110pub mod link;
111pub mod preserve;
112pub mod remote_tracing;
113pub mod rm;
114pub mod version;
115
116pub mod filecmp;
117pub mod progress;
118mod testutils;
119
120pub use config::{OutputConfig, RuntimeConfig, ThrottleConfig, TracingConfig};
121pub use progress::{RcpdProgressPrinter, SerializableProgress};
122
123#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
125pub enum RcpdType {
126 Source,
127 Destination,
128}
129
130impl std::fmt::Display for RcpdType {
131 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
132 match self {
133 RcpdType::Source => write!(f, "source"),
134 RcpdType::Destination => write!(f, "destination"),
135 }
136 }
137}
138
139pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
141
142#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
144pub struct RuntimeStats {
145 pub cpu_time_user_ms: u64,
147 pub cpu_time_kernel_ms: u64,
149 pub peak_rss_bytes: u64,
151}
152
153#[derive(Debug, Default)]
155pub struct RemoteRuntimeStats {
156 pub source_host: String,
157 pub source_stats: RuntimeStats,
158 pub dest_host: String,
159 pub dest_stats: RuntimeStats,
160}
161
162#[must_use]
165pub fn is_localhost(host: &str) -> bool {
166 if host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]" {
167 return true;
168 }
169 let mut buf = [0u8; 256];
171 let result = unsafe { libc::gethostname(buf.as_mut_ptr() as *mut libc::c_char, buf.len()) };
173 if result == 0 {
174 if let Ok(hostname_cstr) = std::ffi::CStr::from_bytes_until_nul(&buf) {
175 if let Ok(hostname) = hostname_cstr.to_str() {
176 if host == hostname {
177 return true;
178 }
179 }
180 }
181 }
182 false
183}
184
185lazy_static! {
186 static ref PROGRESS: progress::Progress = progress::Progress::new();
187 static ref PBAR: indicatif::ProgressBar = indicatif::ProgressBar::new_spinner();
188 static ref REMOTE_RUNTIME_STATS: std::sync::Mutex<Option<RemoteRuntimeStats>> =
189 std::sync::Mutex::new(None);
190}
191
192#[must_use]
193pub fn get_progress() -> &'static progress::Progress {
194 &PROGRESS
195}
196
197pub fn set_remote_runtime_stats(stats: RemoteRuntimeStats) {
199 *REMOTE_RUNTIME_STATS.lock().unwrap() = Some(stats);
200}
201
202struct LocalTimeFormatter;
203
204impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFormatter {
205 fn format_time(
206 &self,
207 writer: &mut tracing_subscriber::fmt::format::Writer<'_>,
208 ) -> std::fmt::Result {
209 let now = chrono::Local::now();
210 writer.write_str(&now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
211 }
212}
213
214struct ProgressTracker {
215 lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
216 pbar_thread: Option<std::thread::JoinHandle<()>>,
217}
218
219#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
220pub enum ProgressType {
221 #[default]
222 #[value(name = "auto", alias = "Auto")]
223 Auto,
224 #[value(name = "ProgressBar", alias = "progress-bar")]
225 ProgressBar,
226 #[value(name = "TextUpdates", alias = "text-updates")]
227 TextUpdates,
228}
229
230pub enum GeneralProgressType {
231 User(ProgressType),
232 Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
233 RemoteMaster {
234 progress_type: ProgressType,
235 get_progress_snapshot:
236 Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>,
237 },
238}
239
240impl std::fmt::Debug for GeneralProgressType {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 match self {
243 GeneralProgressType::User(pt) => write!(f, "User({pt:?})"),
244 GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
245 GeneralProgressType::RemoteMaster { progress_type, .. } => {
246 write!(
247 f,
248 "RemoteMaster(progress_type: {progress_type:?}, <function>)"
249 )
250 }
251 }
252 }
253}
254
255#[derive(Debug)]
256pub struct ProgressSettings {
257 pub progress_type: GeneralProgressType,
258 pub progress_delay: Option<String>,
259}
260
261fn progress_bar(
262 lock: &std::sync::Mutex<bool>,
263 cvar: &std::sync::Condvar,
264 delay_opt: &Option<std::time::Duration>,
265) {
266 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
267 PBAR.set_style(
268 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
269 .unwrap()
270 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
271 );
272 let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
273 let mut is_done = lock.lock().unwrap();
274 loop {
275 PBAR.set_position(PBAR.position() + 1); PBAR.set_message(prog_printer.print().unwrap());
277 let result = cvar.wait_timeout(is_done, delay).unwrap();
278 is_done = result.0;
279 if *is_done {
280 break;
281 }
282 }
283 PBAR.finish_and_clear();
284}
285
286fn get_datetime_prefix() -> String {
287 chrono::Local::now()
288 .format("%Y-%m-%dT%H:%M:%S%.3f%:z")
289 .to_string()
290}
291
292fn text_updates(
293 lock: &std::sync::Mutex<bool>,
294 cvar: &std::sync::Condvar,
295 delay_opt: &Option<std::time::Duration>,
296) {
297 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
298 let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
299 let mut is_done = lock.lock().unwrap();
300 loop {
301 eprintln!("=======================");
302 eprintln!(
303 "{}\n--{}",
304 get_datetime_prefix(),
305 prog_printer.print().unwrap()
306 );
307 let result = cvar.wait_timeout(is_done, delay).unwrap();
308 is_done = result.0;
309 if *is_done {
310 break;
311 }
312 }
313}
314
315fn rcpd_updates(
316 lock: &std::sync::Mutex<bool>,
317 cvar: &std::sync::Condvar,
318 delay_opt: &Option<std::time::Duration>,
319 sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
320) {
321 tracing::debug!("Starting rcpd progress updates");
322 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
323 let mut is_done = lock.lock().unwrap();
324 loop {
325 if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
326 tracing::debug!("Progress update channel closed, stopping progress updates");
328 break;
329 }
330 let result = cvar.wait_timeout(is_done, delay).unwrap();
331 is_done = result.0;
332 if *is_done {
333 break;
334 }
335 }
336}
337
338fn remote_master_updates<F>(
339 lock: &std::sync::Mutex<bool>,
340 cvar: &std::sync::Condvar,
341 delay_opt: &Option<std::time::Duration>,
342 get_progress_snapshot: F,
343 progress_type: ProgressType,
344) where
345 F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
346{
347 let interactive = match progress_type {
348 ProgressType::Auto => std::io::stderr().is_terminal(),
349 ProgressType::ProgressBar => true,
350 ProgressType::TextUpdates => false,
351 };
352 if interactive {
353 PBAR.set_style(
354 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
355 .unwrap()
356 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
357 );
358 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
359 let mut printer = RcpdProgressPrinter::new();
360 let mut is_done = lock.lock().unwrap();
361 loop {
362 let progress_map = get_progress_snapshot();
363 let source_progress = &progress_map[RcpdType::Source];
364 let destination_progress = &progress_map[RcpdType::Destination];
365 PBAR.set_position(PBAR.position() + 1); PBAR.set_message(
367 printer
368 .print(source_progress, destination_progress)
369 .unwrap(),
370 );
371 let result = cvar.wait_timeout(is_done, delay).unwrap();
372 is_done = result.0;
373 if *is_done {
374 break;
375 }
376 }
377 PBAR.finish_and_clear();
378 } else {
379 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
380 let mut printer = RcpdProgressPrinter::new();
381 let mut is_done = lock.lock().unwrap();
382 loop {
383 let progress_map = get_progress_snapshot();
384 let source_progress = &progress_map[RcpdType::Source];
385 let destination_progress = &progress_map[RcpdType::Destination];
386 eprintln!("=======================");
387 eprintln!(
388 "{}\n--{}",
389 get_datetime_prefix(),
390 printer
391 .print(source_progress, destination_progress)
392 .unwrap()
393 );
394 let result = cvar.wait_timeout(is_done, delay).unwrap();
395 is_done = result.0;
396 if *is_done {
397 break;
398 }
399 }
400 }
401}
402
403impl ProgressTracker {
404 pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
405 let lock_cvar =
406 std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
407 let lock_cvar_clone = lock_cvar.clone();
408 let pbar_thread = std::thread::spawn(move || {
409 let (lock, cvar) = &*lock_cvar_clone;
410 match progress_type {
411 GeneralProgressType::Remote(sender) => {
412 rcpd_updates(lock, cvar, &delay_opt, sender);
413 }
414 GeneralProgressType::RemoteMaster {
415 progress_type,
416 get_progress_snapshot,
417 } => {
418 remote_master_updates(
419 lock,
420 cvar,
421 &delay_opt,
422 get_progress_snapshot,
423 progress_type,
424 );
425 }
426 _ => {
427 let interactive = match progress_type {
428 GeneralProgressType::User(ProgressType::Auto) => {
429 std::io::stderr().is_terminal()
430 }
431 GeneralProgressType::User(ProgressType::ProgressBar) => true,
432 GeneralProgressType::User(ProgressType::TextUpdates) => false,
433 GeneralProgressType::Remote(_)
434 | GeneralProgressType::RemoteMaster { .. } => {
435 unreachable!("Invalid progress type: {progress_type:?}")
436 }
437 };
438 if interactive {
439 progress_bar(lock, cvar, &delay_opt);
440 } else {
441 text_updates(lock, cvar, &delay_opt);
442 }
443 }
444 }
445 });
446 Self {
447 lock_cvar,
448 pbar_thread: Some(pbar_thread),
449 }
450 }
451}
452
453impl Drop for ProgressTracker {
454 fn drop(&mut self) {
455 let (lock, cvar) = &*self.lock_cvar;
456 let mut is_done = lock.lock().unwrap();
457 *is_done = true;
458 cvar.notify_one();
459 drop(is_done);
460 if let Some(pbar_thread) = self.pbar_thread.take() {
461 pbar_thread.join().unwrap();
462 }
463 }
464}
465
466pub fn parse_metadata_cmp_settings(
467 settings: &str,
468) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
469 let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
470 for setting in settings.split(',') {
471 match setting {
472 "uid" => metadata_cmp_settings.uid = true,
473 "gid" => metadata_cmp_settings.gid = true,
474 "mode" => metadata_cmp_settings.mode = true,
475 "size" => metadata_cmp_settings.size = true,
476 "mtime" => metadata_cmp_settings.mtime = true,
477 "ctime" => metadata_cmp_settings.ctime = true,
478 _ => {
479 return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
480 }
481 }
482 }
483 Ok(metadata_cmp_settings)
484}
485
486fn parse_type_settings(
487 settings: &str,
488) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
489 let mut user_and_time = preserve::UserAndTimeSettings::default();
490 let mut mode_mask = None;
491 for setting in settings.split(',') {
492 match setting {
493 "uid" => user_and_time.uid = true,
494 "gid" => user_and_time.gid = true,
495 "time" => user_and_time.time = true,
496 _ => {
497 if let Ok(mask) = u32::from_str_radix(setting, 8) {
498 mode_mask = Some(mask);
499 } else {
500 return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
501 }
502 }
503 }
504 }
505 Ok((user_and_time, mode_mask))
506}
507
508pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
509 let mut preserve_settings = preserve::Settings::default();
510 for type_settings in settings.split(' ') {
511 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
512 let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
513 format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
514 )?;
515 match obj_type {
516 "f" | "file" => {
517 preserve_settings.file = preserve::FileSettings::default();
518 preserve_settings.file.user_and_time = user_and_time_settings;
519 if let Some(mode) = mode_opt {
520 preserve_settings.file.mode_mask = mode;
521 }
522 }
523 "d" | "dir" | "directory" => {
524 preserve_settings.dir = preserve::DirSettings::default();
525 preserve_settings.dir.user_and_time = user_and_time_settings;
526 if let Some(mode) = mode_opt {
527 preserve_settings.dir.mode_mask = mode;
528 }
529 }
530 "l" | "link" | "symlink" => {
531 preserve_settings.symlink = preserve::SymlinkSettings::default();
532 preserve_settings.symlink.user_and_time = user_and_time_settings;
533 }
534 _ => {
535 return Err(anyhow!("Unknown object type: {}", obj_type));
536 }
537 }
538 } else {
539 return Err(anyhow!("Invalid preserve settings: {}", settings));
540 }
541 }
542 Ok(preserve_settings)
543}
544
545pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
546 let mut cmp_settings = cmp::ObjSettings::default();
547 for type_settings in settings.split(' ') {
548 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
549 let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
550 "parsing preserve settings: {obj_settings}, type: {obj_type}"
551 ))?;
552 let obj_type = match obj_type {
553 "f" | "file" => ObjType::File,
554 "d" | "dir" | "directory" => ObjType::Dir,
555 "l" | "link" | "symlink" => ObjType::Symlink,
556 "o" | "other" => ObjType::Other,
557 _ => {
558 return Err(anyhow!("Unknown obj type: {}", obj_type));
559 }
560 };
561 cmp_settings[obj_type] = obj_cmp_settings;
562 } else {
563 return Err(anyhow!("Invalid preserve settings: {}", settings));
564 }
565 }
566 Ok(cmp_settings)
567}
568
569pub async fn cmp(
570 src: &std::path::Path,
571 dst: &std::path::Path,
572 log: &cmp::LogWriter,
573 settings: &cmp::Settings,
574) -> Result<cmp::Summary, anyhow::Error> {
575 cmp::cmp(&PROGRESS, src, dst, log, settings).await
576}
577
578pub async fn copy(
579 src: &std::path::Path,
580 dst: &std::path::Path,
581 settings: ©::Settings,
582 preserve: &preserve::Settings,
583) -> Result<copy::Summary, copy::Error> {
584 copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
585}
586
587pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
588 rm::rm(&PROGRESS, path, settings).await
589}
590
591pub async fn link(
592 src: &std::path::Path,
593 dst: &std::path::Path,
594 update: &Option<std::path::PathBuf>,
595 settings: &link::Settings,
596) -> Result<link::Summary, link::Error> {
597 let cwd = std::env::current_dir()
598 .with_context(|| "failed to get current working directory")
599 .map_err(|err| link::Error::new(err, link::Summary::default()))?;
600 link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
601}
602
603fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
604 match std::env::var(name) {
605 Ok(val) => match val.parse() {
606 Ok(val) => val,
607 Err(_) => default,
608 },
609 Err(_) => default,
610 }
611}
612
613#[must_use]
615pub fn collect_runtime_stats() -> RuntimeStats {
616 collect_runtime_stats_inner(procfs::process::Process::myself().ok())
617}
618
619fn collect_runtime_stats_inner(process: Option<procfs::process::Process>) -> RuntimeStats {
620 let Some(process) = process else {
621 return RuntimeStats::default();
622 };
623 collect_runtime_stats_for_process(&process).unwrap_or_default()
624}
625
626fn collect_runtime_stats_for_process(
627 process: &procfs::process::Process,
628) -> anyhow::Result<RuntimeStats> {
629 let stat = process.stat()?;
630 let clock_ticks = procfs::ticks_per_second() as f64;
631 let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
633 Ok(RuntimeStats {
634 cpu_time_user_ms: ((stat.utime as f64 / clock_ticks) * 1000.0) as u64,
635 cpu_time_kernel_ms: ((stat.stime as f64 / clock_ticks) * 1000.0) as u64,
636 peak_rss_bytes: vmhwm_kb * 1024,
637 })
638}
639
640fn print_runtime_stats_for_role(prefix: &str, stats: &RuntimeStats) {
641 let cpu_total =
642 std::time::Duration::from_millis(stats.cpu_time_user_ms + stats.cpu_time_kernel_ms);
643 let cpu_kernel = std::time::Duration::from_millis(stats.cpu_time_kernel_ms);
644 let cpu_user = std::time::Duration::from_millis(stats.cpu_time_user_ms);
645 println!(
646 "{prefix}cpu time : {:.2?} | k: {:.2?} | u: {:.2?}",
647 cpu_total, cpu_kernel, cpu_user
648 );
649 println!(
650 "{prefix}peak RSS : {}",
651 bytesize::ByteSize(stats.peak_rss_bytes)
652 );
653}
654
655#[rustfmt::skip]
656fn print_runtime_stats() -> Result<(), anyhow::Error> {
657 let remote_stats = REMOTE_RUNTIME_STATS.lock().unwrap().take();
659 if let Some(remote) = remote_stats {
660 println!("walltime : {:.2?}", &PROGRESS.get_duration());
662 println!();
663 let source_is_local = is_localhost(&remote.source_host);
664 let dest_is_local = is_localhost(&remote.dest_host);
665 let master_stats = collect_runtime_stats();
667 if !source_is_local {
669 println!("SOURCE ({}):", remote.source_host);
670 print_runtime_stats_for_role(" ", &remote.source_stats);
671 println!();
672 }
673 if !dest_is_local {
674 println!("DESTINATION ({}):", remote.dest_host);
675 print_runtime_stats_for_role(" ", &remote.dest_stats);
676 println!();
677 }
678 match (source_is_local, dest_is_local) {
680 (true, true) => {
681 println!("MASTER + SOURCE + DESTINATION (localhost):");
682 print_runtime_stats_for_role(" master ", &master_stats);
683 print_runtime_stats_for_role(" source ", &remote.source_stats);
684 print_runtime_stats_for_role(" dest ", &remote.dest_stats);
685 }
686 (true, false) => {
687 println!("MASTER + SOURCE (localhost):");
688 print_runtime_stats_for_role(" master ", &master_stats);
689 print_runtime_stats_for_role(" source ", &remote.source_stats);
690 }
691 (false, true) => {
692 println!("MASTER + DESTINATION (localhost):");
693 print_runtime_stats_for_role(" master ", &master_stats);
694 print_runtime_stats_for_role(" dest ", &remote.dest_stats);
695 }
696 (false, false) => {
697 println!("MASTER (localhost):");
698 print_runtime_stats_for_role(" ", &master_stats);
699 }
700 }
701 return Ok(());
702 }
703 let process = procfs::process::Process::myself()?;
705 let stat = process.stat()?;
706 let clock_ticks_per_second = procfs::ticks_per_second();
708 let ticks_to_duration = |ticks: u64| {
709 std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
710 };
711 let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
713 println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
714 println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
715 println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm_kb * 1024));
716 Ok(())
717}
718
719fn get_max_open_files() -> Result<u64, std::io::Error> {
720 let mut rlim = libc::rlimit {
721 rlim_cur: 0,
722 rlim_max: 0,
723 };
724 let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
726 if result == 0 {
727 Ok(rlim.rlim_cur)
728 } else {
729 Err(std::io::Error::last_os_error())
730 }
731}
732
733struct ProgWriter {}
734
735impl ProgWriter {
736 fn new() -> Self {
737 Self {}
738 }
739}
740
741impl std::io::Write for ProgWriter {
742 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
743 PBAR.suspend(|| std::io::stdout().write(buf))
744 }
745 fn flush(&mut self) -> std::io::Result<()> {
746 std::io::stdout().flush()
747 }
748}
749
750fn get_hostname() -> String {
751 nix::unistd::gethostname()
752 .ok()
753 .and_then(|os_str| os_str.into_string().ok())
754 .unwrap_or_else(|| "unknown".to_string())
755}
756
757#[must_use]
758pub fn generate_debug_log_filename(prefix: &str) -> String {
759 let now = chrono::Utc::now();
760 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
761 let process_id = std::process::id();
762 format!("{prefix}-{timestamp}-{process_id}")
763}
764
765#[must_use]
769pub fn generate_trace_filename(prefix: &str, identifier: &str, extension: &str) -> String {
770 let hostname = get_hostname();
771 let pid = std::process::id();
772 let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S");
773 format!("{prefix}-{identifier}-{hostname}-{pid}-{timestamp}.{extension}")
774}
775
776#[instrument(skip(func))] pub fn run<Fut, Summary, Error>(
778 progress: Option<ProgressSettings>,
779 output: OutputConfig,
780 runtime: RuntimeConfig,
781 throttle: ThrottleConfig,
782 tracing_config: TracingConfig,
783 func: impl FnOnce() -> Fut,
784) -> Option<Summary>
785where
787 Summary: std::fmt::Display,
788 Error: std::fmt::Display + std::fmt::Debug,
789 Fut: std::future::Future<Output = Result<Summary, Error>>,
790{
791 let _ = get_progress();
795 if let Err(e) = throttle.validate() {
797 eprintln!("Configuration error: {e}");
798 return None;
799 }
800 let OutputConfig {
802 quiet,
803 verbose,
804 print_summary,
805 } = output;
806 let RuntimeConfig {
807 max_workers,
808 max_blocking_threads,
809 } = runtime;
810 let ThrottleConfig {
811 max_open_files,
812 ops_throttle,
813 iops_throttle,
814 chunk_size: _,
815 } = throttle;
816 let TracingConfig {
817 remote_layer: remote_tracing_layer,
818 debug_log_file,
819 chrome_trace_prefix,
820 flamegraph_prefix,
821 trace_identifier,
822 profile_level,
823 tokio_console,
824 tokio_console_port,
825 } = tracing_config;
826 let mut _chrome_guard: Option<tracing_chrome::FlushGuard> = None;
828 let mut _flame_guard: Option<tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>> =
829 None;
830 if quiet {
831 assert!(
832 verbose == 0,
833 "Quiet mode and verbose mode are mutually exclusive"
834 );
835 } else {
836 let make_env_filter = || {
838 let level_directive = match verbose {
839 0 => "error".parse().unwrap(),
840 1 => "info".parse().unwrap(),
841 2 => "debug".parse().unwrap(),
842 _ => "trace".parse().unwrap(),
843 };
844 tracing_subscriber::EnvFilter::from_default_env()
847 .add_directive(level_directive)
848 .add_directive("tokio=info".parse().unwrap())
849 .add_directive("runtime=info".parse().unwrap())
850 .add_directive("quinn=warn".parse().unwrap())
851 .add_directive("rustls=warn".parse().unwrap())
852 .add_directive("h2=warn".parse().unwrap())
853 };
854 let file_layer = if let Some(ref log_file_path) = debug_log_file {
855 let file = std::fs::OpenOptions::new()
856 .create(true)
857 .append(true)
858 .open(log_file_path)
859 .unwrap_or_else(|e| {
860 panic!("Failed to create debug log file at '{log_file_path}': {e}")
861 });
862 let file_layer = tracing_subscriber::fmt::layer()
863 .with_target(true)
864 .with_line_number(true)
865 .with_thread_ids(true)
866 .with_timer(LocalTimeFormatter)
867 .with_ansi(false)
868 .with_writer(file)
869 .with_filter(make_env_filter());
870 Some(file_layer)
871 } else {
872 None
873 };
874 let fmt_layer = if remote_tracing_layer.is_some() {
876 None
877 } else {
878 let fmt_layer = tracing_subscriber::fmt::layer()
879 .with_target(true)
880 .with_line_number(true)
881 .with_span_events(if verbose > 2 {
882 FmtSpan::NEW | FmtSpan::CLOSE
883 } else {
884 FmtSpan::NONE
885 })
886 .with_timer(LocalTimeFormatter)
887 .pretty()
888 .with_writer(ProgWriter::new)
889 .with_filter(make_env_filter());
890 Some(fmt_layer)
891 };
892 let remote_tracing_layer =
894 remote_tracing_layer.map(|layer| layer.with_filter(make_env_filter()));
895 let console_layer = if tokio_console {
896 let console_port = tokio_console_port.unwrap_or(6669);
897 let retention_seconds: u64 =
898 read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
899 eprintln!("Tokio console server listening on 127.0.0.1:{console_port}");
900 let console_layer = console_subscriber::ConsoleLayer::builder()
901 .retention(std::time::Duration::from_secs(retention_seconds))
902 .server_addr(([127, 0, 0, 1], console_port))
903 .spawn();
904 Some(console_layer)
905 } else {
906 None
907 };
908 let profiling_enabled = chrome_trace_prefix.is_some() || flamegraph_prefix.is_some();
912 let profile_filter_str = if profiling_enabled {
913 let level_str = profile_level.as_deref().unwrap_or("trace");
914 let valid_levels = ["trace", "debug", "info", "warn", "error", "off"];
916 if !valid_levels.contains(&level_str.to_lowercase().as_str()) {
917 eprintln!(
918 "Invalid --profile-level '{}'. Valid values: trace, debug, info, warn, error, off",
919 level_str
920 );
921 std::process::exit(1);
922 }
923 Some(format!(
925 "tokio=off,quinn=off,h2=off,hyper=off,rustls=off,{}",
926 level_str
927 ))
928 } else {
929 None
930 };
931 let make_profile_filter =
933 || tracing_subscriber::EnvFilter::new(profile_filter_str.as_ref().unwrap());
934 let chrome_layer = if let Some(ref prefix) = chrome_trace_prefix {
936 let filename = generate_trace_filename(prefix, &trace_identifier, "json");
937 eprintln!("Chrome trace will be written to: {filename}");
938 let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
939 .file(&filename)
940 .include_args(true)
941 .build();
942 _chrome_guard = Some(guard);
943 Some(layer.with_filter(make_profile_filter()))
944 } else {
945 None
946 };
947 let flame_layer = if let Some(ref prefix) = flamegraph_prefix {
949 let filename = generate_trace_filename(prefix, &trace_identifier, "folded");
950 eprintln!("Flamegraph data will be written to: {filename}");
951 match tracing_flame::FlameLayer::with_file(&filename) {
952 Ok((layer, guard)) => {
953 _flame_guard = Some(guard);
954 Some(layer.with_filter(make_profile_filter()))
955 }
956 Err(e) => {
957 eprintln!("Failed to create flamegraph layer: {e}");
958 None
959 }
960 }
961 } else {
962 None
963 };
964 tracing_subscriber::registry()
965 .with(file_layer)
966 .with(fmt_layer)
967 .with(remote_tracing_layer)
968 .with(console_layer)
969 .with(chrome_layer)
970 .with(flame_layer)
971 .init();
972 }
973 let mut builder = tokio::runtime::Builder::new_multi_thread();
974 builder.enable_all();
975 if max_workers > 0 {
976 builder.worker_threads(max_workers);
977 }
978 if max_blocking_threads > 0 {
979 builder.max_blocking_threads(max_blocking_threads);
980 }
981 if !sysinfo::set_open_files_limit(isize::MAX) {
982 tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
983 }
984 let set_max_open_files = max_open_files.unwrap_or_else(|| {
985 let limit = get_max_open_files().expect(
986 "We failed to query rlimit, if this is expected try specifying --max-open-files",
987 ) as usize;
988 80 * limit / 100 });
990 if set_max_open_files > 0 {
991 tracing::info!("Setting max open files to: {}", set_max_open_files);
992 throttle::set_max_open_files(set_max_open_files);
993 } else {
994 tracing::info!("Not applying any limit to max open files!");
995 }
996 let runtime = builder.build().expect("Failed to create runtime");
997 fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
998 let mut replenish = replenish;
999 let mut interval = std::time::Duration::from_secs(1);
1000 while replenish > 100 && interval > std::time::Duration::from_millis(1) {
1001 replenish /= 10;
1002 interval /= 10;
1003 }
1004 (replenish, interval)
1005 }
1006 if ops_throttle > 0 {
1007 let (replenish, interval) = get_replenish_interval(ops_throttle);
1008 throttle::init_ops_tokens(replenish);
1009 runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
1010 }
1011 if iops_throttle > 0 {
1012 let (replenish, interval) = get_replenish_interval(iops_throttle);
1013 throttle::init_iops_tokens(replenish);
1014 runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
1015 }
1016 let res = {
1017 let _progress = progress.map(|settings| {
1018 tracing::debug!("Requesting progress updates {settings:?}");
1019 let delay = settings.progress_delay.map(|delay_str| {
1020 humantime::parse_duration(&delay_str)
1021 .expect("Couldn't parse duration out of --progress-delay")
1022 });
1023 ProgressTracker::new(settings.progress_type, delay)
1024 });
1025 runtime.block_on(func())
1026 };
1027 match &res {
1028 Ok(summary) => {
1029 if print_summary || verbose > 0 {
1030 println!("{summary}");
1031 }
1032 }
1033 Err(err) => {
1034 if !quiet {
1035 println!("{err:?}");
1036 }
1037 }
1038 }
1039 if print_summary || verbose > 0 {
1040 if let Err(err) = print_runtime_stats() {
1041 println!("Failed to print runtime stats: {err:?}");
1042 }
1043 }
1044 res.ok()
1045}
1046
1047#[cfg(test)]
1048mod runtime_stats_tests {
1049 use super::*;
1050 use anyhow::Result;
1051
1052 #[test]
1053 fn collect_runtime_stats_matches_procfs_snapshot() -> Result<()> {
1054 let process = procfs::process::Process::myself()?;
1055 let expected = collect_runtime_stats_for_process(&process)?;
1056 let actual = collect_runtime_stats();
1057 let cpu_tolerance_ms = 50;
1058 let rss_tolerance_bytes = 1_000_000;
1059 assert!(
1060 expected.cpu_time_user_ms.abs_diff(actual.cpu_time_user_ms) <= cpu_tolerance_ms,
1061 "user CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
1062 expected.cpu_time_user_ms,
1063 actual.cpu_time_user_ms
1064 );
1065 assert!(
1066 expected
1067 .cpu_time_kernel_ms
1068 .abs_diff(actual.cpu_time_kernel_ms)
1069 <= cpu_tolerance_ms,
1070 "kernel CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
1071 expected.cpu_time_kernel_ms,
1072 actual.cpu_time_kernel_ms
1073 );
1074 assert!(
1075 expected.peak_rss_bytes.abs_diff(actual.peak_rss_bytes) <= rss_tolerance_bytes,
1076 "peak RSS deviated by more than {rss_tolerance_bytes} bytes: expected {}, got {}",
1077 expected.peak_rss_bytes,
1078 actual.peak_rss_bytes
1079 );
1080 Ok(())
1081 }
1082
1083 #[test]
1084 fn collect_runtime_stats_returns_default_on_error() {
1085 let stats = collect_runtime_stats_inner(None);
1086 assert_eq!(stats, RuntimeStats::default());
1087
1088 let nonexistent_process = procfs::process::Process::new(i32::MAX).ok();
1089 let stats = collect_runtime_stats_inner(nonexistent_process);
1090 assert_eq!(stats, RuntimeStats::default());
1091 }
1092}