1#[macro_use]
97extern crate lazy_static;
98
99use crate::cmp::ObjType;
100use anyhow::anyhow;
101use anyhow::Context;
102use std::io::IsTerminal;
103use tracing::instrument;
104use tracing_subscriber::fmt::format::FmtSpan;
105use tracing_subscriber::prelude::*;
106
107pub mod cmp;
108pub mod config;
109pub mod copy;
110pub mod filegen;
111pub mod link;
112pub mod preserve;
113pub mod remote_tracing;
114pub mod rm;
115pub mod version;
116
117pub mod filecmp;
118pub mod progress;
119mod testutils;
120
121pub use config::{OutputConfig, RuntimeConfig, ThrottleConfig, TracingConfig};
122pub use progress::{RcpdProgressPrinter, SerializableProgress};
123
/// Identifies one of the two sides of a transfer: the source or the destination.
///
/// The `enum_map::Enum` derive allows this type to be used as the key of an
/// `enum_map::EnumMap`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
pub enum RcpdType {
    /// The side data is read from.
    Source,
    /// The side data is written to.
    Destination,
}
130
131impl std::fmt::Display for RcpdType {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 match self {
134 RcpdType::Source => write!(f, "source"),
135 RcpdType::Destination => write!(f, "destination"),
136 }
137 }
138}
139
/// A map holding exactly one `T` per [`RcpdType`] variant (source and destination).
pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
142
/// Resource-usage figures for a single process.
///
/// The type is serde-serializable; `Default` yields all-zero values.
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
pub struct RuntimeStats {
    /// CPU time spent in user mode, in milliseconds.
    pub cpu_time_user_ms: u64,
    /// CPU time spent in kernel mode, in milliseconds.
    pub cpu_time_kernel_ms: u64,
    /// Peak resident set size, in bytes.
    pub peak_rss_bytes: u64,
}
153
/// Runtime statistics reported for the source and destination sides of a
/// remote operation, together with the host name of each side.
#[derive(Debug, Default)]
pub struct RemoteRuntimeStats {
    /// Host name of the source side.
    pub source_host: String,
    /// Resource usage of the source-side process.
    pub source_stats: RuntimeStats,
    /// Host name of the destination side.
    pub dest_host: String,
    /// Resource usage of the destination-side process.
    pub dest_stats: RuntimeStats,
}
162
163#[must_use]
166pub fn is_localhost(host: &str) -> bool {
167 if host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]" {
168 return true;
169 }
170 let mut buf = [0u8; 256];
172 let result = unsafe { libc::gethostname(buf.as_mut_ptr() as *mut libc::c_char, buf.len()) };
174 if result == 0 {
175 if let Ok(hostname_cstr) = std::ffi::CStr::from_bytes_until_nul(&buf) {
176 if let Ok(hostname) = hostname_cstr.to_str() {
177 if host == hostname {
178 return true;
179 }
180 }
181 }
182 }
183 false
184}
185
// Process-wide singletons, created lazily on first access.
lazy_static! {
    // Global progress tracker passed to the cmp/copy/rm/link operations below.
    static ref PROGRESS: progress::Progress = progress::Progress::new();
    // Spinner used for interactive progress output; log writes are routed
    // through `PBAR.suspend` by `ProgWriter`.
    static ref PBAR: indicatif::ProgressBar = indicatif::ProgressBar::new_spinner();
    // Statistics stored by `set_remote_runtime_stats`; taken (and thereby
    // cleared) by `print_runtime_stats`.
    static ref REMOTE_RUNTIME_STATS: std::sync::Mutex<Option<RemoteRuntimeStats>> =
        std::sync::Mutex::new(None);
}
192
/// Returns the process-wide progress tracker.
///
/// The first call initializes the underlying lazy static.
#[must_use]
pub fn get_progress() -> &'static progress::Progress {
    &PROGRESS
}
197
/// Stores `stats` in the global slot, replacing any previously stored value.
///
/// # Panics
///
/// Panics if the mutex guarding the slot is poisoned.
pub fn set_remote_runtime_stats(stats: RemoteRuntimeStats) {
    *REMOTE_RUNTIME_STATS.lock().unwrap() = Some(stats);
}
202
203struct LocalTimeFormatter;
204
205impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFormatter {
206 fn format_time(
207 &self,
208 writer: &mut tracing_subscriber::fmt::format::Writer<'_>,
209 ) -> std::fmt::Result {
210 let now = chrono::Local::now();
211 writer.write_str(&now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
212 }
213}
214
/// Owns the background thread that periodically reports progress.
///
/// Dropping the tracker sets the shared "done" flag, wakes the thread and
/// joins it.
struct ProgressTracker {
    /// Shared "done" flag plus the condition variable used to wake the
    /// reporting thread before its timeout elapses.
    lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
    /// Handle of the reporting thread; taken (set to `None`) when joined.
    pbar_thread: Option<std::thread::JoinHandle<()>>,
}
219
/// How progress is presented to the user (parsed from the command line via
/// `clap::ValueEnum`).
#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
pub enum ProgressType {
    /// Use the progress bar when stderr is a terminal, text updates otherwise.
    #[default]
    #[value(name = "auto", alias = "Auto")]
    Auto,
    /// Always use the interactive spinner/progress bar.
    #[value(name = "ProgressBar", alias = "progress-bar")]
    ProgressBar,
    /// Always print periodic text updates to stderr.
    #[value(name = "TextUpdates", alias = "text-updates")]
    TextUpdates,
}
230
/// Selects where progress information comes from and where it is reported.
pub enum GeneralProgressType {
    /// Report the local global progress directly to the user.
    User(ProgressType),
    /// Send local progress updates over the given channel instead of
    /// printing them.
    Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
    /// Display progress for both the source and destination sides, obtained
    /// by calling `get_progress_snapshot` on every refresh.
    RemoteMaster {
        /// Presentation style used for the combined output.
        progress_type: ProgressType,
        /// Callback returning the latest per-side progress.
        get_progress_snapshot:
            Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>,
    },
}
240
241impl std::fmt::Debug for GeneralProgressType {
242 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243 match self {
244 GeneralProgressType::User(pt) => write!(f, "User({pt:?})"),
245 GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
246 GeneralProgressType::RemoteMaster { progress_type, .. } => {
247 write!(
248 f,
249 "RemoteMaster(progress_type: {progress_type:?}, <function>)"
250 )
251 }
252 }
253 }
254}
255
/// Progress-reporting configuration handed to [`run`].
#[derive(Debug)]
pub struct ProgressSettings {
    /// Where progress comes from and how it is shown.
    pub progress_type: GeneralProgressType,
    /// Optional refresh interval as a human-readable duration string; `run`
    /// parses it with `humantime::parse_duration`.
    pub progress_delay: Option<String>,
}
261
262fn progress_bar(
263 lock: &std::sync::Mutex<bool>,
264 cvar: &std::sync::Condvar,
265 delay_opt: &Option<std::time::Duration>,
266) {
267 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
268 PBAR.set_style(
269 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
270 .unwrap()
271 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
272 );
273 let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
274 let mut is_done = lock.lock().unwrap();
275 loop {
276 PBAR.set_position(PBAR.position() + 1); PBAR.set_message(prog_printer.print().unwrap());
278 let result = cvar.wait_timeout(is_done, delay).unwrap();
279 is_done = result.0;
280 if *is_done {
281 break;
282 }
283 }
284 PBAR.finish_and_clear();
285}
286
287fn get_datetime_prefix() -> String {
288 chrono::Local::now()
289 .format("%Y-%m-%dT%H:%M:%S%.3f%:z")
290 .to_string()
291}
292
293fn text_updates(
294 lock: &std::sync::Mutex<bool>,
295 cvar: &std::sync::Condvar,
296 delay_opt: &Option<std::time::Duration>,
297) {
298 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
299 let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
300 let mut is_done = lock.lock().unwrap();
301 loop {
302 eprintln!("=======================");
303 eprintln!(
304 "{}\n--{}",
305 get_datetime_prefix(),
306 prog_printer.print().unwrap()
307 );
308 let result = cvar.wait_timeout(is_done, delay).unwrap();
309 is_done = result.0;
310 if *is_done {
311 break;
312 }
313 }
314}
315
316fn rcpd_updates(
317 lock: &std::sync::Mutex<bool>,
318 cvar: &std::sync::Condvar,
319 delay_opt: &Option<std::time::Duration>,
320 sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
321) {
322 tracing::debug!("Starting rcpd progress updates");
323 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
324 let mut is_done = lock.lock().unwrap();
325 loop {
326 if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
327 tracing::debug!("Progress update channel closed, stopping progress updates");
329 break;
330 }
331 let result = cvar.wait_timeout(is_done, delay).unwrap();
332 is_done = result.0;
333 if *is_done {
334 break;
335 }
336 }
337}
338
339fn remote_master_updates<F>(
340 lock: &std::sync::Mutex<bool>,
341 cvar: &std::sync::Condvar,
342 delay_opt: &Option<std::time::Duration>,
343 get_progress_snapshot: F,
344 progress_type: ProgressType,
345) where
346 F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
347{
348 let interactive = match progress_type {
349 ProgressType::Auto => std::io::stderr().is_terminal(),
350 ProgressType::ProgressBar => true,
351 ProgressType::TextUpdates => false,
352 };
353 if interactive {
354 PBAR.set_style(
355 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
356 .unwrap()
357 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
358 );
359 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
360 let mut printer = RcpdProgressPrinter::new();
361 let mut is_done = lock.lock().unwrap();
362 loop {
363 let progress_map = get_progress_snapshot();
364 let source_progress = &progress_map[RcpdType::Source];
365 let destination_progress = &progress_map[RcpdType::Destination];
366 PBAR.set_position(PBAR.position() + 1); PBAR.set_message(
368 printer
369 .print(source_progress, destination_progress)
370 .unwrap(),
371 );
372 let result = cvar.wait_timeout(is_done, delay).unwrap();
373 is_done = result.0;
374 if *is_done {
375 break;
376 }
377 }
378 PBAR.finish_and_clear();
379 } else {
380 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
381 let mut printer = RcpdProgressPrinter::new();
382 let mut is_done = lock.lock().unwrap();
383 loop {
384 let progress_map = get_progress_snapshot();
385 let source_progress = &progress_map[RcpdType::Source];
386 let destination_progress = &progress_map[RcpdType::Destination];
387 eprintln!("=======================");
388 eprintln!(
389 "{}\n--{}",
390 get_datetime_prefix(),
391 printer
392 .print(source_progress, destination_progress)
393 .unwrap()
394 );
395 let result = cvar.wait_timeout(is_done, delay).unwrap();
396 is_done = result.0;
397 if *is_done {
398 break;
399 }
400 }
401 }
402}
403
404impl ProgressTracker {
405 pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
406 let lock_cvar =
407 std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
408 let lock_cvar_clone = lock_cvar.clone();
409 let pbar_thread = std::thread::spawn(move || {
410 let (lock, cvar) = &*lock_cvar_clone;
411 match progress_type {
412 GeneralProgressType::Remote(sender) => {
413 rcpd_updates(lock, cvar, &delay_opt, sender);
414 }
415 GeneralProgressType::RemoteMaster {
416 progress_type,
417 get_progress_snapshot,
418 } => {
419 remote_master_updates(
420 lock,
421 cvar,
422 &delay_opt,
423 get_progress_snapshot,
424 progress_type,
425 );
426 }
427 _ => {
428 let interactive = match progress_type {
429 GeneralProgressType::User(ProgressType::Auto) => {
430 std::io::stderr().is_terminal()
431 }
432 GeneralProgressType::User(ProgressType::ProgressBar) => true,
433 GeneralProgressType::User(ProgressType::TextUpdates) => false,
434 GeneralProgressType::Remote(_)
435 | GeneralProgressType::RemoteMaster { .. } => {
436 unreachable!("Invalid progress type: {progress_type:?}")
437 }
438 };
439 if interactive {
440 progress_bar(lock, cvar, &delay_opt);
441 } else {
442 text_updates(lock, cvar, &delay_opt);
443 }
444 }
445 }
446 });
447 Self {
448 lock_cvar,
449 pbar_thread: Some(pbar_thread),
450 }
451 }
452}
453
454impl Drop for ProgressTracker {
455 fn drop(&mut self) {
456 let (lock, cvar) = &*self.lock_cvar;
457 let mut is_done = lock.lock().unwrap();
458 *is_done = true;
459 cvar.notify_one();
460 drop(is_done);
461 if let Some(pbar_thread) = self.pbar_thread.take() {
462 pbar_thread.join().unwrap();
463 }
464 }
465}
466
467pub fn parse_metadata_cmp_settings(
468 settings: &str,
469) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
470 let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
471 for setting in settings.split(',') {
472 match setting {
473 "uid" => metadata_cmp_settings.uid = true,
474 "gid" => metadata_cmp_settings.gid = true,
475 "mode" => metadata_cmp_settings.mode = true,
476 "size" => metadata_cmp_settings.size = true,
477 "mtime" => metadata_cmp_settings.mtime = true,
478 "ctime" => metadata_cmp_settings.ctime = true,
479 _ => {
480 return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
481 }
482 }
483 }
484 Ok(metadata_cmp_settings)
485}
486
487fn parse_type_settings(
488 settings: &str,
489) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
490 let mut user_and_time = preserve::UserAndTimeSettings::default();
491 let mut mode_mask = None;
492 for setting in settings.split(',') {
493 match setting {
494 "uid" => user_and_time.uid = true,
495 "gid" => user_and_time.gid = true,
496 "time" => user_and_time.time = true,
497 _ => {
498 if let Ok(mask) = u32::from_str_radix(setting, 8) {
499 mode_mask = Some(mask);
500 } else {
501 return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
502 }
503 }
504 }
505 }
506 Ok((user_and_time, mode_mask))
507}
508
509pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
510 let mut preserve_settings = preserve::Settings::default();
511 for type_settings in settings.split(' ') {
512 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
513 let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
514 format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
515 )?;
516 match obj_type {
517 "f" | "file" => {
518 preserve_settings.file = preserve::FileSettings::default();
519 preserve_settings.file.user_and_time = user_and_time_settings;
520 if let Some(mode) = mode_opt {
521 preserve_settings.file.mode_mask = mode;
522 }
523 }
524 "d" | "dir" | "directory" => {
525 preserve_settings.dir = preserve::DirSettings::default();
526 preserve_settings.dir.user_and_time = user_and_time_settings;
527 if let Some(mode) = mode_opt {
528 preserve_settings.dir.mode_mask = mode;
529 }
530 }
531 "l" | "link" | "symlink" => {
532 preserve_settings.symlink = preserve::SymlinkSettings::default();
533 preserve_settings.symlink.user_and_time = user_and_time_settings;
534 }
535 _ => {
536 return Err(anyhow!("Unknown object type: {}", obj_type));
537 }
538 }
539 } else {
540 return Err(anyhow!("Invalid preserve settings: {}", settings));
541 }
542 }
543 Ok(preserve_settings)
544}
545
546pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
547 let mut cmp_settings = cmp::ObjSettings::default();
548 for type_settings in settings.split(' ') {
549 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
550 let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
551 "parsing preserve settings: {obj_settings}, type: {obj_type}"
552 ))?;
553 let obj_type = match obj_type {
554 "f" | "file" => ObjType::File,
555 "d" | "dir" | "directory" => ObjType::Dir,
556 "l" | "link" | "symlink" => ObjType::Symlink,
557 "o" | "other" => ObjType::Other,
558 _ => {
559 return Err(anyhow!("Unknown obj type: {}", obj_type));
560 }
561 };
562 cmp_settings[obj_type] = obj_cmp_settings;
563 } else {
564 return Err(anyhow!("Invalid preserve settings: {}", settings));
565 }
566 }
567 Ok(cmp_settings)
568}
569
/// Compares `src` against `dst` by delegating to [`cmp::cmp`] with the global
/// progress tracker.
///
/// `log` and `settings` are passed through unchanged.
pub async fn cmp(
    src: &std::path::Path,
    dst: &std::path::Path,
    log: &cmp::LogWriter,
    settings: &cmp::Settings,
) -> Result<cmp::Summary, anyhow::Error> {
    cmp::cmp(&PROGRESS, src, dst, log, settings).await
}
578
579pub async fn copy(
580 src: &std::path::Path,
581 dst: &std::path::Path,
582 settings: ©::Settings,
583 preserve: &preserve::Settings,
584) -> Result<copy::Summary, copy::Error> {
585 copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
586}
587
/// Removes `path` by delegating to [`rm::rm`] with the global progress
/// tracker.
pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
    rm::rm(&PROGRESS, path, settings).await
}
591
592pub async fn link(
593 src: &std::path::Path,
594 dst: &std::path::Path,
595 update: &Option<std::path::PathBuf>,
596 settings: &link::Settings,
597) -> Result<link::Summary, link::Error> {
598 let cwd = std::env::current_dir()
599 .with_context(|| "failed to get current working directory")
600 .map_err(|err| link::Error::new(err, link::Summary::default()))?;
601 link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
602}
603
/// Reads the environment variable `name` and parses it as `T`.
///
/// Falls back to `default` when the variable is unset, is not valid Unicode,
/// or does not parse as `T`.
fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(default)
}
613
/// Collects CPU time and peak RSS for the current process via procfs.
///
/// Never fails: if the process information cannot be read, all-zero
/// (`Default`) statistics are returned.
#[must_use]
pub fn collect_runtime_stats() -> RuntimeStats {
    collect_runtime_stats_inner(procfs::process::Process::myself().ok())
}
619
620fn collect_runtime_stats_inner(process: Option<procfs::process::Process>) -> RuntimeStats {
621 let Some(process) = process else {
622 return RuntimeStats::default();
623 };
624 collect_runtime_stats_for_process(&process).unwrap_or_default()
625}
626
627fn collect_runtime_stats_for_process(
628 process: &procfs::process::Process,
629) -> anyhow::Result<RuntimeStats> {
630 let stat = process.stat()?;
631 let clock_ticks = procfs::ticks_per_second() as f64;
632 let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
634 Ok(RuntimeStats {
635 cpu_time_user_ms: ((stat.utime as f64 / clock_ticks) * 1000.0) as u64,
636 cpu_time_kernel_ms: ((stat.stime as f64 / clock_ticks) * 1000.0) as u64,
637 peak_rss_bytes: vmhwm_kb * 1024,
638 })
639}
640
641fn print_runtime_stats_for_role(prefix: &str, stats: &RuntimeStats) {
642 let cpu_total =
643 std::time::Duration::from_millis(stats.cpu_time_user_ms + stats.cpu_time_kernel_ms);
644 let cpu_kernel = std::time::Duration::from_millis(stats.cpu_time_kernel_ms);
645 let cpu_user = std::time::Duration::from_millis(stats.cpu_time_user_ms);
646 println!(
647 "{prefix}cpu time : {:.2?} | k: {:.2?} | u: {:.2?}",
648 cpu_total, cpu_kernel, cpu_user
649 );
650 println!(
651 "{prefix}peak RSS : {}",
652 bytesize::ByteSize(stats.peak_rss_bytes)
653 );
654}
655
/// Prints walltime, CPU time and peak RSS to stdout.
///
/// If remote statistics were stored via `set_remote_runtime_stats`, they are
/// taken out of the global slot (leaving it empty) and a per-role report is
/// printed: roles running on a non-local host get their own section, while
/// roles running on this host are grouped with the master process. Otherwise
/// only the statistics of the current process are printed.
///
/// # Errors
///
/// In the local-only path, fails if the procfs data of the current process
/// cannot be read. The remote path never returns an error.
///
/// # Panics
///
/// Panics if the mutex guarding the remote statistics is poisoned.
#[rustfmt::skip]
fn print_runtime_stats() -> Result<(), anyhow::Error> {
    // `take()` empties the slot, so remote stats are reported at most once.
    let remote_stats = REMOTE_RUNTIME_STATS.lock().unwrap().take();
    if let Some(remote) = remote_stats {
        println!("walltime : {:.2?}", &PROGRESS.get_duration());
        println!();
        let source_is_local = is_localhost(&remote.source_host);
        let dest_is_local = is_localhost(&remote.dest_host);
        // Statistics of this (master) process; zeroed if they cannot be read.
        let master_stats = collect_runtime_stats();
        // Roles on other hosts get a dedicated section first.
        if !source_is_local {
            println!("SOURCE ({}):", remote.source_host);
            print_runtime_stats_for_role(" ", &remote.source_stats);
            println!();
        }
        if !dest_is_local {
            println!("DESTINATION ({}):", remote.dest_host);
            print_runtime_stats_for_role(" ", &remote.dest_stats);
            println!();
        }
        // Roles on this host are grouped together with the master process.
        match (source_is_local, dest_is_local) {
            (true, true) => {
                println!("MASTER + SOURCE + DESTINATION (localhost):");
                print_runtime_stats_for_role(" master ", &master_stats);
                print_runtime_stats_for_role(" source ", &remote.source_stats);
                print_runtime_stats_for_role(" dest ", &remote.dest_stats);
            }
            (true, false) => {
                println!("MASTER + SOURCE (localhost):");
                print_runtime_stats_for_role(" master ", &master_stats);
                print_runtime_stats_for_role(" source ", &remote.source_stats);
            }
            (false, true) => {
                println!("MASTER + DESTINATION (localhost):");
                print_runtime_stats_for_role(" master ", &master_stats);
                print_runtime_stats_for_role(" dest ", &remote.dest_stats);
            }
            (false, false) => {
                println!("MASTER (localhost):");
                print_runtime_stats_for_role(" ", &master_stats);
            }
        }
        return Ok(());
    }
    // Local-only path: read the statistics of the current process directly.
    let process = procfs::process::Process::myself()?;
    let stat = process.stat()?;
    let clock_ticks_per_second = procfs::ticks_per_second();
    // Converts a tick count into a `Duration` using the system tick rate.
    let ticks_to_duration = |ticks: u64| {
        std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
    };
    // `vmhwm` is reported in kilobytes; a missing value counts as zero.
    let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
    println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
    println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
    println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm_kb * 1024));
    Ok(())
}
719
720fn get_max_open_files() -> Result<u64, std::io::Error> {
721 let mut rlim = libc::rlimit {
722 rlim_cur: 0,
723 rlim_max: 0,
724 };
725 let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
727 if result == 0 {
728 Ok(rlim.rlim_cur)
729 } else {
730 Err(std::io::Error::last_os_error())
731 }
732}
733
/// Stateless writer used for console log output; see its `std::io::Write`
/// implementation for how writes interact with the progress bar.
struct ProgWriter {}

impl ProgWriter {
    /// Creates a writer. `run` passes this function to `with_writer` as the
    /// writer factory of the console tracing layer.
    fn new() -> Self {
        Self {}
    }
}
741
impl std::io::Write for ProgWriter {
    /// Writes `buf` to stdout inside `PBAR.suspend`, so the write happens
    /// while the progress bar is suspended.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        PBAR.suspend(|| std::io::stdout().write(buf))
    }
    /// Flushes stdout.
    fn flush(&mut self) -> std::io::Result<()> {
        std::io::stdout().flush()
    }
}
750
751fn get_hostname() -> String {
752 nix::unistd::gethostname()
753 .ok()
754 .and_then(|os_str| os_str.into_string().ok())
755 .unwrap_or_else(|| "unknown".to_string())
756}
757
758#[must_use]
759pub fn generate_debug_log_filename(prefix: &str) -> String {
760 let now = chrono::Utc::now();
761 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
762 let process_id = std::process::id();
763 format!("{prefix}-{timestamp}-{process_id}")
764}
765
766#[must_use]
770pub fn generate_trace_filename(prefix: &str, identifier: &str, extension: &str) -> String {
771 let hostname = get_hostname();
772 let pid = std::process::id();
773 let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S");
774 format!("{prefix}-{identifier}-{hostname}-{pid}-{timestamp}.{extension}")
775}
776
/// Common entry point: configures tracing, throttling, the tokio runtime and
/// progress reporting, then runs the future produced by `func` to completion.
///
/// * `progress` - optional progress reporting; when present a background
///   reporter runs for the duration of the operation.
/// * `output` - quiet/verbose/summary flags.
/// * `runtime` - worker and blocking thread counts (`0` keeps tokio's default).
/// * `throttle` - open-file limit and ops/iops throttles; validated first.
/// * `tracing_config` - log file, remote layer, profiling and tokio-console
///   options.
/// * `func` - produces the future to run on the newly built runtime.
///
/// Returns `Some(summary)` on success. Returns `None` if the throttle
/// configuration is invalid or if the operation failed; unless `quiet` is
/// set, the error is printed to stdout in its `Debug` form.
///
/// # Panics
///
/// Panics if `quiet` is combined with a non-zero `verbose`, if the debug log
/// file cannot be opened, if the open-file limit cannot be queried while
/// `max_open_files` is unset, if the runtime cannot be built, or if
/// `progress_delay` is not a valid duration. An invalid profile level
/// terminates the process with exit status 1.
#[instrument(skip(func))]
pub fn run<Fut, Summary, Error>(
    progress: Option<ProgressSettings>,
    output: OutputConfig,
    runtime: RuntimeConfig,
    throttle: ThrottleConfig,
    tracing_config: TracingConfig,
    func: impl FnOnce() -> Fut,
) -> Option<Summary>
where
    Summary: std::fmt::Display,
    Error: std::fmt::Display + std::fmt::Debug,
    Fut: std::future::Future<Output = Result<Summary, Error>>,
{
    // Touch the global progress tracker so its lazy static is initialized
    // here. NOTE(review): presumably this starts the walltime clock - confirm.
    let _ = get_progress();
    // Reject an invalid throttle configuration before doing anything else.
    if let Err(e) = throttle.validate() {
        eprintln!("Configuration error: {e}");
        return None;
    }
    // Unpack the configuration structs into locals.
    let OutputConfig {
        quiet,
        verbose,
        print_summary,
    } = output;
    let RuntimeConfig {
        max_workers,
        max_blocking_threads,
    } = runtime;
    let ThrottleConfig {
        max_open_files,
        ops_throttle,
        iops_throttle,
        chunk_size: _,
    } = throttle;
    let TracingConfig {
        remote_layer: remote_tracing_layer,
        debug_log_file,
        chrome_trace_prefix,
        flamegraph_prefix,
        trace_identifier,
        profile_level,
        tokio_console,
        tokio_console_port,
    } = tracing_config;
    // The profiling guards are declared at function scope so they stay alive
    // until `run` returns. NOTE(review): presumably they flush trace data on
    // drop - confirm against tracing_chrome / tracing_flame.
    let mut _chrome_guard: Option<tracing_chrome::FlushGuard> = None;
    let mut _flame_guard: Option<tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>> =
        None;
    if quiet {
        // In quiet mode no tracing subscriber is installed at all.
        assert!(
            verbose == 0,
            "Quiet mode and verbose mode are mutually exclusive"
        );
    } else {
        // Builds a fresh filter per layer: the verbosity count selects the
        // base level on top of the environment-provided filter, plus fixed
        // levels for some dependency targets.
        let make_env_filter = || {
            let level_directive = match verbose {
                0 => "error".parse().unwrap(),
                1 => "info".parse().unwrap(),
                2 => "debug".parse().unwrap(),
                _ => "trace".parse().unwrap(),
            };
            tracing_subscriber::EnvFilter::from_default_env()
                .add_directive(level_directive)
                .add_directive("tokio=info".parse().unwrap())
                .add_directive("runtime=info".parse().unwrap())
                .add_directive("quinn=warn".parse().unwrap())
                .add_directive("rustls=warn".parse().unwrap())
                .add_directive("h2=warn".parse().unwrap())
        };
        // Optional plain-text log file, opened in append mode (created if
        // missing).
        let file_layer = if let Some(ref log_file_path) = debug_log_file {
            let file = std::fs::OpenOptions::new()
                .create(true)
                .append(true)
                .open(log_file_path)
                .unwrap_or_else(|e| {
                    panic!("Failed to create debug log file at '{log_file_path}': {e}")
                });
            let file_layer = tracing_subscriber::fmt::layer()
                .with_target(true)
                .with_line_number(true)
                .with_thread_ids(true)
                .with_timer(LocalTimeFormatter)
                .with_ansi(false)
                .with_writer(file)
                .with_filter(make_env_filter());
            Some(file_layer)
        } else {
            None
        };
        // Console output is only enabled when no remote tracing layer is
        // configured; span events are shown for verbosity above 2.
        let fmt_layer = if remote_tracing_layer.is_some() {
            None
        } else {
            let fmt_layer = tracing_subscriber::fmt::layer()
                .with_target(true)
                .with_line_number(true)
                .with_span_events(if verbose > 2 {
                    FmtSpan::NEW | FmtSpan::CLOSE
                } else {
                    FmtSpan::NONE
                })
                .with_timer(LocalTimeFormatter)
                .pretty()
                .with_writer(ProgWriter::new)
                .with_filter(make_env_filter());
            Some(fmt_layer)
        };
        // The remote layer, if any, gets the same verbosity filter.
        let remote_tracing_layer =
            remote_tracing_layer.map(|layer| layer.with_filter(make_env_filter()));
        // Optional tokio-console server on 127.0.0.1 (port 6669 by default);
        // retention is read from the environment and defaults to 60 seconds.
        let console_layer = if tokio_console {
            let console_port = tokio_console_port.unwrap_or(6669);
            let retention_seconds: u64 =
                read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
            eprintln!("Tokio console server listening on 127.0.0.1:{console_port}");
            let console_layer = console_subscriber::ConsoleLayer::builder()
                .retention(std::time::Duration::from_secs(retention_seconds))
                .server_addr(([127, 0, 0, 1], console_port))
                .spawn();
            Some(console_layer)
        } else {
            None
        };
        let profiling_enabled = chrome_trace_prefix.is_some() || flamegraph_prefix.is_some();
        // The profiling filter string is built only when a profiling output
        // is requested; the level defaults to "trace" and is validated
        // case-insensitively.
        let profile_filter_str = if profiling_enabled {
            let level_str = profile_level.as_deref().unwrap_or("trace");
            let valid_levels = ["trace", "debug", "info", "warn", "error", "off"];
            if !valid_levels.contains(&level_str.to_lowercase().as_str()) {
                eprintln!(
                    "Invalid --profile-level '{}'. Valid values: trace, debug, info, warn, error, off",
                    level_str
                );
                std::process::exit(1);
            }
            Some(format!(
                "tokio=off,quinn=off,h2=off,hyper=off,rustls=off,{}",
                level_str
            ))
        } else {
            None
        };
        // Only called when a profiling prefix is set, in which case
        // `profile_filter_str` is `Some`, so the `unwrap` cannot fail.
        let make_profile_filter =
            || tracing_subscriber::EnvFilter::new(profile_filter_str.as_ref().unwrap());
        let chrome_layer = if let Some(ref prefix) = chrome_trace_prefix {
            let filename = generate_trace_filename(prefix, &trace_identifier, "json");
            eprintln!("Chrome trace will be written to: {filename}");
            let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
                .file(&filename)
                .include_args(true)
                .build();
            _chrome_guard = Some(guard);
            Some(layer.with_filter(make_profile_filter()))
        } else {
            None
        };
        // A flamegraph setup failure is reported but does not abort the run.
        let flame_layer = if let Some(ref prefix) = flamegraph_prefix {
            let filename = generate_trace_filename(prefix, &trace_identifier, "folded");
            eprintln!("Flamegraph data will be written to: {filename}");
            match tracing_flame::FlameLayer::with_file(&filename) {
                Ok((layer, guard)) => {
                    _flame_guard = Some(guard);
                    Some(layer.with_filter(make_profile_filter()))
                }
                Err(e) => {
                    eprintln!("Failed to create flamegraph layer: {e}");
                    None
                }
            }
        } else {
            None
        };
        // Install the global subscriber; layers that are `None` are no-ops.
        tracing_subscriber::registry()
            .with(file_layer)
            .with(fmt_layer)
            .with(remote_tracing_layer)
            .with(console_layer)
            .with(chrome_layer)
            .with(flame_layer)
            .init();
    }
    // Multi-threaded tokio runtime; a count of 0 keeps tokio's default.
    let mut builder = tokio::runtime::Builder::new_multi_thread();
    builder.enable_all();
    if max_workers > 0 {
        builder.worker_threads(max_workers);
    }
    if max_blocking_threads > 0 {
        builder.max_blocking_threads(max_blocking_threads);
    }
    if !sysinfo::set_open_files_limit(isize::MAX) {
        tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
    }
    // Without an explicit value, use 80% of the current open-files limit.
    let set_max_open_files = max_open_files.unwrap_or_else(|| {
        let limit = get_max_open_files().expect(
            "We failed to query rlimit, if this is expected try specifying --max-open-files",
        ) as usize;
        80 * limit / 100
    });
    // A value of 0 means no open-files throttling is applied.
    if set_max_open_files > 0 {
        tracing::info!("Setting max open files to: {}", set_max_open_files);
        throttle::set_max_open_files(set_max_open_files);
    } else {
        tracing::info!("Not applying any limit to max open files!");
    }
    let runtime = builder.build().expect("Failed to create runtime");
    // Scales a per-second budget down to a shorter interval: while the amount
    // exceeds 100 and the interval is longer than 1 ms, both are divided
    // by 10.
    fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
        let mut replenish = replenish;
        let mut interval = std::time::Duration::from_secs(1);
        while replenish > 100 && interval > std::time::Duration::from_millis(1) {
            replenish /= 10;
            interval /= 10;
        }
        (replenish, interval)
    }
    // Throttles are enabled only for non-zero values; each one gets an
    // initial token budget and a replenish task spawned on the runtime.
    if ops_throttle > 0 {
        let (replenish, interval) = get_replenish_interval(ops_throttle);
        throttle::init_ops_tokens(replenish);
        runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
    }
    if iops_throttle > 0 {
        let (replenish, interval) = get_replenish_interval(iops_throttle);
        throttle::init_iops_tokens(replenish);
        runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
    }
    let res = {
        // The tracker lives only for this block: it is dropped (stopping and
        // joining the reporter thread) once `block_on` returns, before the
        // summary is printed below.
        let _progress = progress.map(|settings| {
            tracing::debug!("Requesting progress updates {settings:?}");
            let delay = settings.progress_delay.map(|delay_str| {
                humantime::parse_duration(&delay_str)
                    .expect("Couldn't parse duration out of --progress-delay")
            });
            ProgressTracker::new(settings.progress_type, delay)
        });
        runtime.block_on(func())
    };
    match &res {
        Ok(summary) => {
            if print_summary || verbose > 0 {
                println!("{summary}");
            }
        }
        Err(err) => {
            if !quiet {
                println!("{err:?}");
            }
        }
    }
    // Runtime statistics are printed under the same condition as the
    // summary, regardless of whether the operation succeeded.
    if print_summary || verbose > 0 {
        if let Err(err) = print_runtime_stats() {
            println!("Failed to print runtime stats: {err:?}");
        }
    }
    res.ok()
}
1047
/// Tests for the runtime statistics collection helpers.
#[cfg(test)]
mod runtime_stats_tests {
    use super::*;
    use anyhow::Result;

    /// `collect_runtime_stats` must agree, within a small tolerance, with a
    /// snapshot read directly from procfs for the current process.
    #[test]
    fn collect_runtime_stats_matches_procfs_snapshot() -> Result<()> {
        let process = procfs::process::Process::myself()?;
        let expected = collect_runtime_stats_for_process(&process)?;
        let actual = collect_runtime_stats();
        // The two snapshots are taken at slightly different times, so allow
        // some drift instead of requiring equality.
        let cpu_tolerance_ms = 50;
        let rss_tolerance_bytes = 1_000_000;
        assert!(
            expected.cpu_time_user_ms.abs_diff(actual.cpu_time_user_ms) <= cpu_tolerance_ms,
            "user CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
            expected.cpu_time_user_ms,
            actual.cpu_time_user_ms
        );
        assert!(
            expected
                .cpu_time_kernel_ms
                .abs_diff(actual.cpu_time_kernel_ms)
                <= cpu_tolerance_ms,
            "kernel CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
            expected.cpu_time_kernel_ms,
            actual.cpu_time_kernel_ms
        );
        assert!(
            expected.peak_rss_bytes.abs_diff(actual.peak_rss_bytes) <= rss_tolerance_bytes,
            "peak RSS deviated by more than {rss_tolerance_bytes} bytes: expected {}, got {}",
            expected.peak_rss_bytes,
            actual.peak_rss_bytes
        );
        Ok(())
    }

    /// Both a missing process handle and a process whose data cannot be read
    /// must yield default (all-zero) statistics.
    #[test]
    fn collect_runtime_stats_returns_default_on_error() {
        let stats = collect_runtime_stats_inner(None);
        assert_eq!(stats, RuntimeStats::default());

        // `i32::MAX` is used as a PID that is not expected to exist.
        let nonexistent_process = procfs::process::Process::new(i32::MAX).ok();
        let stats = collect_runtime_stats_inner(nonexistent_process);
        assert_eq!(stats, RuntimeStats::default());
    }
}
1093}