1use crate::cmp::ObjType;
104use anyhow::Context;
105use anyhow::anyhow;
106use std::io::IsTerminal;
107use tracing::instrument;
108use tracing_subscriber::fmt::format::FmtSpan;
109use tracing_subscriber::prelude::*;
110
111mod auto_meta;
112pub mod cli;
113pub mod cmp;
114pub mod config;
115pub mod copy;
116pub mod dry_run;
117pub mod error;
118pub mod error_collector;
119pub mod filegen;
120pub mod filter;
121pub mod histogram_logger;
122pub mod histogram_panel;
123pub mod link;
124pub mod observability;
125pub mod preserve;
126pub mod remote_tracing;
127pub mod rm;
128pub mod version;
129
130pub mod filecmp;
131pub mod progress;
132mod testutils;
133pub mod walk;
134
135pub use config::{
136 AutoMetaThrottleConfig, DryRunMode, DryRunWarnings, OutputConfig, RuntimeConfig,
137 ThrottleConfig, TracingConfig,
138};
139pub use congestion::{MetadataOp, Side};
144pub use progress::{RcpdProgressPrinter, SerializableProgress};
145
/// Role tag with two variants, used as the key type of [`ProgressSnapshot`].
///
/// The `enum_map::Enum` derive is what allows it to index an `enum_map::EnumMap`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
pub enum RcpdType {
    /// The source side; displayed as `source`.
    Source,
    /// The destination side; displayed as `destination`.
    Destination,
}
152
153impl std::fmt::Display for RcpdType {
154 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
155 match self {
156 RcpdType::Source => write!(f, "source"),
157 RcpdType::Destination => write!(f, "destination"),
158 }
159 }
160}
161
/// An `EnumMap` holding one `T` per [`RcpdType`] variant (one for source, one for destination).
pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
164
/// Resource usage of a single process; serializable via serde.
///
/// All fields default to zero, which is also what the collectors in this file return
/// when process information cannot be read.
#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
pub struct RuntimeStats {
    /// CPU time spent in user mode, in milliseconds.
    pub cpu_time_user_ms: u64,
    /// CPU time spent in kernel mode, in milliseconds.
    pub cpu_time_kernel_ms: u64,
    /// Peak resident set size, in bytes.
    pub peak_rss_bytes: u64,
}
175
/// Runtime statistics reported for the source and destination sides of a remote operation,
/// together with the host name each side ran on.
#[derive(Debug, Default)]
pub struct RemoteRuntimeStats {
    /// Host name of the source side (compared against the local host when printing).
    pub source_host: String,
    /// Resource usage reported for the source side.
    pub source_stats: RuntimeStats,
    /// Host name of the destination side (compared against the local host when printing).
    pub dest_host: String,
    /// Resource usage reported for the destination side.
    pub dest_stats: RuntimeStats,
}
184
185#[must_use]
188pub fn is_localhost(host: &str) -> bool {
189 if host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]" {
190 return true;
191 }
192 let mut buf = [0u8; 256];
194 let result = unsafe { libc::gethostname(buf.as_mut_ptr() as *mut libc::c_char, buf.len()) };
196 if result == 0
197 && let Ok(hostname_cstr) = std::ffi::CStr::from_bytes_until_nul(&buf)
198 && let Ok(hostname) = hostname_cstr.to_str()
199 && host == hostname
200 {
201 return true;
202 }
203 false
204}
205
/// Process-wide progress state, constructed on first access.
static PROGRESS: std::sync::LazyLock<progress::Progress> =
    std::sync::LazyLock::new(progress::Progress::new);
/// Process-wide spinner used by the interactive progress display and by `ProgWriter`.
static PBAR: std::sync::LazyLock<indicatif::ProgressBar> =
    std::sync::LazyLock::new(indicatif::ProgressBar::new_spinner);
/// Remote-side runtime statistics; filled by `set_remote_runtime_stats` and taken
/// (leaving `None`) by `print_runtime_stats`.
static REMOTE_RUNTIME_STATS: std::sync::LazyLock<std::sync::Mutex<Option<RemoteRuntimeStats>>> =
    std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
/// Cancellation sender for the histogram logger; stored by `store_logger_cancel` and
/// consumed by `signal_logger_cancel`.
static HISTOGRAM_LOGGER_CANCEL: std::sync::Mutex<Option<tokio::sync::watch::Sender<bool>>> =
    std::sync::Mutex::new(None);
/// Join handle of the histogram logger task; stored by `store_logger_handle` and
/// removed by `take_logger_handle`.
static HISTOGRAM_LOGGER_HANDLE: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>> =
    std::sync::Mutex::new(None);
216
217fn store_logger_cancel(tx: tokio::sync::watch::Sender<bool>) {
218 *HISTOGRAM_LOGGER_CANCEL
219 .lock()
220 .expect("histogram logger cancel mutex poisoned") = Some(tx);
221}
222
223fn store_logger_handle(handle: tokio::task::JoinHandle<()>) {
224 *HISTOGRAM_LOGGER_HANDLE
225 .lock()
226 .expect("histogram logger handle mutex poisoned") = Some(handle);
227}
228
229fn take_logger_handle() -> Option<tokio::task::JoinHandle<()>> {
230 HISTOGRAM_LOGGER_HANDLE
231 .lock()
232 .expect("histogram logger handle mutex poisoned")
233 .take()
234}
235
236fn signal_logger_cancel() {
237 if let Some(tx) = HISTOGRAM_LOGGER_CANCEL
238 .lock()
239 .expect("histogram logger cancel mutex poisoned")
240 .take()
241 && let Err(err) = tx.send(true)
242 {
243 tracing::debug!("histogram-logger cancel send failed (already gone): {err:#}");
244 }
245}
246
/// Returns a reference to the process-wide `PROGRESS` instance, initializing it on first use.
#[must_use]
pub fn get_progress() -> &'static progress::Progress {
    &PROGRESS
}
251
252pub fn set_remote_runtime_stats(stats: RemoteRuntimeStats) {
254 *REMOTE_RUNTIME_STATS.lock().unwrap() = Some(stats);
255}
256
257struct LocalTimeFormatter;
258
259impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFormatter {
260 fn format_time(
261 &self,
262 writer: &mut tracing_subscriber::fmt::format::Writer<'_>,
263 ) -> std::fmt::Result {
264 let now = chrono::Local::now();
265 writer.write_str(&now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
266 }
267}
268
/// Owns the background thread that reports progress.
///
/// Dropping the tracker sets the shared flag, notifies the condition variable and joins
/// the thread.
struct ProgressTracker {
    /// Shared "done" flag plus the condition variable used to wake the reporting thread.
    lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
    /// Handle of the reporting thread; taken (set to `None`) when it is joined on drop.
    pbar_thread: Option<std::thread::JoinHandle<()>>,
}
273
/// User-selectable progress presentation, parsed from the command line by clap.
#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
pub enum ProgressType {
    /// Default: use the progress bar when stderr is a terminal, text updates otherwise.
    /// Accepted as `auto` or `Auto`.
    #[default]
    #[value(name = "auto", alias = "Auto")]
    Auto,
    /// Always use the interactive spinner. Accepted as `ProgressBar` or `progress-bar`.
    #[value(name = "ProgressBar", alias = "progress-bar")]
    ProgressBar,
    /// Always print periodic text blocks. Accepted as `TextUpdates` or `text-updates`.
    #[value(name = "TextUpdates", alias = "text-updates")]
    TextUpdates,
}
284
/// Selects which reporting loop the progress thread runs.
pub enum GeneralProgressType {
    /// Local reporting from the process-wide progress state, as a spinner or text updates.
    User {
        /// Presentation requested by the user.
        progress_type: ProgressType,
        /// Passed to `progress::make_local_printer` to choose the printer.
        kind: progress::LocalProgressKind,
    },
    /// Periodically sends progress updates through the given tracing-message channel.
    Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
    /// Renders progress obtained from a snapshot callback covering both remote roles.
    RemoteMaster {
        /// Presentation requested by the user.
        progress_type: ProgressType,
        /// Called on every refresh to obtain the latest per-role progress.
        get_progress_snapshot:
            Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>,
    },
}
297
298impl std::fmt::Debug for GeneralProgressType {
299 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300 match self {
301 GeneralProgressType::User {
302 progress_type,
303 kind,
304 } => write!(f, "User(progress_type: {progress_type:?}, kind: {kind:?})"),
305 GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
306 GeneralProgressType::RemoteMaster { progress_type, .. } => {
307 write!(
308 f,
309 "RemoteMaster(progress_type: {progress_type:?}, <function>)"
310 )
311 }
312 }
313 }
314}
315
/// Progress reporting configuration.
#[derive(Debug)]
pub struct ProgressSettings {
    /// Which reporting loop to run.
    pub progress_type: GeneralProgressType,
    /// Optional refresh delay, still in textual form.
    /// NOTE(review): the parsing of this string is not visible in this part of the file.
    pub progress_delay: Option<String>,
}
321
322fn progress_bar(
323 lock: &std::sync::Mutex<bool>,
324 cvar: &std::sync::Condvar,
325 delay_opt: &Option<std::time::Duration>,
326 kind: progress::LocalProgressKind,
327) {
328 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
329 PBAR.set_style(
330 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
331 .unwrap()
332 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
333 );
334 let mut prog_printer = progress::make_local_printer(kind, &PROGRESS);
335 let mut is_done = lock.lock().unwrap();
336 loop {
337 PBAR.set_position(PBAR.position() + 1); let mut msg = prog_printer.print().unwrap();
339 msg.push_str(&observability::render_lines());
340 msg.push_str(&render_panel_from_registry());
341 PBAR.set_message(msg);
342 let result = cvar.wait_timeout(is_done, delay).unwrap();
343 is_done = result.0;
344 if *is_done {
345 break;
346 }
347 }
348 PBAR.finish_and_clear();
349}
350
351fn get_datetime_prefix() -> String {
352 chrono::Local::now()
353 .format("%Y-%m-%dT%H:%M:%S%.3f%:z")
354 .to_string()
355}
356
357fn text_updates(
358 lock: &std::sync::Mutex<bool>,
359 cvar: &std::sync::Condvar,
360 delay_opt: &Option<std::time::Duration>,
361 kind: progress::LocalProgressKind,
362) {
363 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
364 let mut prog_printer = progress::make_local_printer(kind, &PROGRESS);
365 let mut is_done = lock.lock().unwrap();
366 loop {
367 eprintln!("=======================");
368 eprintln!(
369 "{}\n--{}{}{}",
370 get_datetime_prefix(),
371 prog_printer.print().unwrap(),
372 observability::render_lines(),
373 render_panel_from_registry(),
374 );
375 let result = cvar.wait_timeout(is_done, delay).unwrap();
376 is_done = result.0;
377 if *is_done {
378 break;
379 }
380 }
381}
382
383fn rcpd_updates(
384 lock: &std::sync::Mutex<bool>,
385 cvar: &std::sync::Condvar,
386 delay_opt: &Option<std::time::Duration>,
387 sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
388) {
389 tracing::debug!("Starting rcpd progress updates");
390 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
391 let mut is_done = lock.lock().unwrap();
392 loop {
393 if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
394 tracing::debug!("Progress update channel closed, stopping progress updates");
396 break;
397 }
398 let result = cvar.wait_timeout(is_done, delay).unwrap();
399 is_done = result.0;
400 if *is_done {
401 break;
402 }
403 }
404}
405
406fn remote_master_updates<F>(
407 lock: &std::sync::Mutex<bool>,
408 cvar: &std::sync::Condvar,
409 delay_opt: &Option<std::time::Duration>,
410 get_progress_snapshot: F,
411 progress_type: ProgressType,
412) where
413 F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
414{
415 let interactive = match progress_type {
416 ProgressType::Auto => std::io::stderr().is_terminal(),
417 ProgressType::ProgressBar => true,
418 ProgressType::TextUpdates => false,
419 };
420 if interactive {
421 PBAR.set_style(
422 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
423 .unwrap()
424 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
425 );
426 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
427 let mut printer = RcpdProgressPrinter::new();
428 let mut is_done = lock.lock().unwrap();
429 loop {
430 let progress_map = get_progress_snapshot();
431 let source_progress = &progress_map[RcpdType::Source];
432 let destination_progress = &progress_map[RcpdType::Destination];
433 PBAR.set_position(PBAR.position() + 1); let mut msg = printer
435 .print(source_progress, destination_progress)
436 .unwrap();
437 msg.push_str(&render_panel_from_registry());
438 PBAR.set_message(msg);
439 let result = cvar.wait_timeout(is_done, delay).unwrap();
440 is_done = result.0;
441 if *is_done {
442 break;
443 }
444 }
445 PBAR.finish_and_clear();
446 } else {
447 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
448 let mut printer = RcpdProgressPrinter::new();
449 let mut is_done = lock.lock().unwrap();
450 loop {
451 let progress_map = get_progress_snapshot();
452 let source_progress = &progress_map[RcpdType::Source];
453 let destination_progress = &progress_map[RcpdType::Destination];
454 eprintln!("=======================");
455 eprintln!(
456 "{}\n--{}{}",
457 get_datetime_prefix(),
458 printer
459 .print(source_progress, destination_progress)
460 .unwrap(),
461 render_panel_from_registry(),
462 );
463 let result = cvar.wait_timeout(is_done, delay).unwrap();
464 is_done = result.0;
465 if *is_done {
466 break;
467 }
468 }
469 }
470}
471
472impl ProgressTracker {
473 pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
474 let lock_cvar =
475 std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
476 let lock_cvar_clone = lock_cvar.clone();
477 let pbar_thread = std::thread::spawn(move || {
478 let (lock, cvar) = &*lock_cvar_clone;
479 match progress_type {
480 GeneralProgressType::Remote(sender) => {
481 rcpd_updates(lock, cvar, &delay_opt, sender);
482 }
483 GeneralProgressType::RemoteMaster {
484 progress_type,
485 get_progress_snapshot,
486 } => {
487 remote_master_updates(
488 lock,
489 cvar,
490 &delay_opt,
491 get_progress_snapshot,
492 progress_type,
493 );
494 }
495 GeneralProgressType::User {
496 progress_type,
497 kind,
498 } => {
499 let interactive = match progress_type {
500 ProgressType::Auto => std::io::stderr().is_terminal(),
501 ProgressType::ProgressBar => true,
502 ProgressType::TextUpdates => false,
503 };
504 if interactive {
505 progress_bar(lock, cvar, &delay_opt, kind);
506 } else {
507 text_updates(lock, cvar, &delay_opt, kind);
508 }
509 }
510 }
511 });
512 Self {
513 lock_cvar,
514 pbar_thread: Some(pbar_thread),
515 }
516 }
517}
518
519impl Drop for ProgressTracker {
520 fn drop(&mut self) {
521 let (lock, cvar) = &*self.lock_cvar;
522 let mut is_done = lock.lock().unwrap();
523 *is_done = true;
524 cvar.notify_one();
525 drop(is_done);
526 if let Some(pbar_thread) = self.pbar_thread.take() {
527 pbar_thread.join().unwrap();
528 }
529 }
530}
531
532pub fn parse_metadata_cmp_settings(
533 settings: &str,
534) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
535 let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
536 for setting in settings.split(',') {
537 match setting {
538 "uid" => metadata_cmp_settings.uid = true,
539 "gid" => metadata_cmp_settings.gid = true,
540 "mode" => metadata_cmp_settings.mode = true,
541 "size" => metadata_cmp_settings.size = true,
542 "mtime" => metadata_cmp_settings.mtime = true,
543 "ctime" => metadata_cmp_settings.ctime = true,
544 _ => {
545 return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
546 }
547 }
548 }
549 Ok(metadata_cmp_settings)
550}
551
552fn parse_type_settings(
553 settings: &str,
554) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
555 let mut user_and_time = preserve::UserAndTimeSettings::default();
556 let mut mode_mask = None;
557 for setting in settings.split(',') {
558 match setting {
559 "uid" => user_and_time.uid = true,
560 "gid" => user_and_time.gid = true,
561 "time" => user_and_time.time = true,
562 _ => {
563 if let Ok(mask) = u32::from_str_radix(setting, 8) {
564 mode_mask = Some(mask);
565 } else {
566 return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
567 }
568 }
569 }
570 }
571 Ok((user_and_time, mode_mask))
572}
573
574pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
575 match settings {
577 "all" => return Ok(preserve::preserve_all()),
578 "none" => return Ok(preserve::preserve_none()),
579 _ => {}
580 }
581 let mut preserve_settings = preserve::Settings::default();
582 for type_settings in settings.split_whitespace() {
583 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
584 let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
585 format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
586 )?;
587 match obj_type {
588 "f" | "file" => {
589 preserve_settings.file = preserve::FileSettings::default();
590 preserve_settings.file.user_and_time = user_and_time_settings;
591 if let Some(mode) = mode_opt {
592 preserve_settings.file.mode_mask = mode;
593 }
594 }
595 "d" | "dir" | "directory" => {
596 preserve_settings.dir = preserve::DirSettings::default();
597 preserve_settings.dir.user_and_time = user_and_time_settings;
598 if let Some(mode) = mode_opt {
599 preserve_settings.dir.mode_mask = mode;
600 }
601 }
602 "l" | "link" | "symlink" => {
603 preserve_settings.symlink = preserve::SymlinkSettings::default();
604 preserve_settings.symlink.user_and_time = user_and_time_settings;
605 }
606 _ => {
607 return Err(anyhow!("Unknown object type: {}", obj_type));
608 }
609 }
610 } else {
611 return Err(anyhow!("Invalid preserve settings: {}", settings));
612 }
613 }
614 Ok(preserve_settings)
615}
616
617pub fn validate_update_compare_vs_preserve(
620 update_compare: &filecmp::MetadataCmpSettings,
621 preserve: &preserve::Settings,
622) -> Result<(), String> {
623 let mut missing = Vec::new();
624 if update_compare.mtime && !preserve.file.user_and_time.time {
625 missing.push("mtime");
626 }
627 if update_compare.uid && !preserve.file.user_and_time.uid {
628 missing.push("uid");
629 }
630 if update_compare.gid && !preserve.file.user_and_time.gid {
631 missing.push("gid");
632 }
633 if update_compare.mode && preserve.file.mode_mask != 0o7777 {
635 missing.push("mode");
636 }
637 if missing.is_empty() {
638 Ok(())
639 } else {
640 Err(format!(
641 "--update compares [{}] but --preserve-settings does not preserve them. \
642 Use --allow-lossy-update to override or adjust --preserve-settings.",
643 missing.join(", ")
644 ))
645 }
646}
647
648pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
649 let mut cmp_settings = cmp::ObjSettings::default();
650 for type_settings in settings.split_whitespace() {
651 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
652 let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
653 "parsing preserve settings: {obj_settings}, type: {obj_type}"
654 ))?;
655 let obj_type = match obj_type {
656 "f" | "file" => ObjType::File,
657 "d" | "dir" | "directory" => ObjType::Dir,
658 "l" | "link" | "symlink" => ObjType::Symlink,
659 "o" | "other" => ObjType::Other,
660 _ => {
661 return Err(anyhow!("Unknown obj type: {}", obj_type));
662 }
663 };
664 cmp_settings[obj_type] = obj_cmp_settings;
665 } else {
666 return Err(anyhow!("Invalid preserve settings: {}", settings));
667 }
668 }
669 Ok(cmp_settings)
670}
671
/// Compares `src` against `dst` by delegating to `cmp::cmp`, using the process-wide
/// progress state.
///
/// `log` and `settings` are forwarded unchanged; the result of `cmp::cmp` is returned as is.
pub async fn cmp(
    src: &std::path::Path,
    dst: &std::path::Path,
    log: &cmp::LogWriter,
    settings: &cmp::Settings,
) -> Result<cmp::Summary, anyhow::Error> {
    cmp::cmp(&PROGRESS, src, dst, log, settings).await
}
680
681pub async fn copy(
682 src: &std::path::Path,
683 dst: &std::path::Path,
684 settings: ©::Settings,
685 preserve: &preserve::Settings,
686) -> Result<copy::Summary, copy::Error> {
687 copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
688}
689
/// Removes `path` by delegating to `rm::rm`, using the process-wide progress state.
///
/// `settings` is forwarded unchanged; the result of `rm::rm` is returned as is.
pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
    rm::rm(&PROGRESS, path, settings).await
}
693
694pub async fn link(
695 src: &std::path::Path,
696 dst: &std::path::Path,
697 update: &Option<std::path::PathBuf>,
698 settings: &link::Settings,
699) -> Result<link::Summary, link::Error> {
700 let cwd = std::env::current_dir()
701 .with_context(|| "failed to get current working directory")
702 .map_err(|err| link::Error::new(err, link::Summary::default()))?;
703 link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
704}
705
/// Reads environment variable `name` and parses it as `T`.
///
/// Returns `default` when the variable is unset, is not valid Unicode, or fails to parse.
fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
    std::env::var(name)
        .ok()
        .and_then(|raw| raw.parse().ok())
        .unwrap_or(default)
}
715
/// Collects CPU time and peak memory usage for the current process.
///
/// Never fails: if the process information cannot be opened or read, the returned
/// statistics are all zero.
#[must_use]
pub fn collect_runtime_stats() -> RuntimeStats {
    collect_runtime_stats_inner(procfs::process::Process::myself().ok())
}
721
722fn collect_runtime_stats_inner(process: Option<procfs::process::Process>) -> RuntimeStats {
723 let Some(process) = process else {
724 return RuntimeStats::default();
725 };
726 collect_runtime_stats_for_process(&process).unwrap_or_default()
727}
728
729fn collect_runtime_stats_for_process(
730 process: &procfs::process::Process,
731) -> anyhow::Result<RuntimeStats> {
732 let stat = process.stat()?;
733 let clock_ticks = procfs::ticks_per_second() as f64;
734 let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
736 Ok(RuntimeStats {
737 cpu_time_user_ms: ((stat.utime as f64 / clock_ticks) * 1000.0) as u64,
738 cpu_time_kernel_ms: ((stat.stime as f64 / clock_ticks) * 1000.0) as u64,
739 peak_rss_bytes: vmhwm_kb * 1024,
740 })
741}
742
743fn print_runtime_stats_for_role(prefix: &str, stats: &RuntimeStats) {
744 let cpu_total =
745 std::time::Duration::from_millis(stats.cpu_time_user_ms + stats.cpu_time_kernel_ms);
746 let cpu_kernel = std::time::Duration::from_millis(stats.cpu_time_kernel_ms);
747 let cpu_user = std::time::Duration::from_millis(stats.cpu_time_user_ms);
748 println!(
749 "{prefix}cpu time : {:.2?} | k: {:.2?} | u: {:.2?}",
750 cpu_total, cpu_kernel, cpu_user
751 );
752 println!(
753 "{prefix}peak RSS : {}",
754 bytesize::ByteSize(stats.peak_rss_bytes)
755 );
756}
757
758#[rustfmt::skip]
759fn print_runtime_stats() -> Result<(), anyhow::Error> {
760 let remote_stats = REMOTE_RUNTIME_STATS.lock().unwrap().take();
762 if let Some(remote) = remote_stats {
763 println!("walltime : {:.2?}", &PROGRESS.get_duration());
765 println!();
766 let source_is_local = is_localhost(&remote.source_host);
767 let dest_is_local = is_localhost(&remote.dest_host);
768 let master_stats = collect_runtime_stats();
770 if !source_is_local {
772 println!("SOURCE ({}):", remote.source_host);
773 print_runtime_stats_for_role(" ", &remote.source_stats);
774 println!();
775 }
776 if !dest_is_local {
777 println!("DESTINATION ({}):", remote.dest_host);
778 print_runtime_stats_for_role(" ", &remote.dest_stats);
779 println!();
780 }
781 match (source_is_local, dest_is_local) {
783 (true, true) => {
784 println!("MASTER + SOURCE + DESTINATION (localhost):");
785 print_runtime_stats_for_role(" master ", &master_stats);
786 print_runtime_stats_for_role(" source ", &remote.source_stats);
787 print_runtime_stats_for_role(" dest ", &remote.dest_stats);
788 }
789 (true, false) => {
790 println!("MASTER + SOURCE (localhost):");
791 print_runtime_stats_for_role(" master ", &master_stats);
792 print_runtime_stats_for_role(" source ", &remote.source_stats);
793 }
794 (false, true) => {
795 println!("MASTER + DESTINATION (localhost):");
796 print_runtime_stats_for_role(" master ", &master_stats);
797 print_runtime_stats_for_role(" dest ", &remote.dest_stats);
798 }
799 (false, false) => {
800 println!("MASTER (localhost):");
801 print_runtime_stats_for_role(" ", &master_stats);
802 }
803 }
804 return Ok(());
805 }
806 let process = procfs::process::Process::myself()?;
808 let stat = process.stat()?;
809 let clock_ticks_per_second = procfs::ticks_per_second();
811 let ticks_to_duration = |ticks: u64| {
812 std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
813 };
814 let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
816 println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
817 println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
818 println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm_kb * 1024));
819 Ok(())
820}
821
822fn get_max_open_files() -> Result<u64, std::io::Error> {
823 let mut rlim = libc::rlimit {
824 rlim_cur: 0,
825 rlim_max: 0,
826 };
827 let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
829 if result == 0 {
830 Ok(rlim.rlim_cur)
831 } else {
832 Err(std::io::Error::last_os_error())
833 }
834}
835
836struct ProgWriter {}
837
838impl ProgWriter {
839 fn new() -> Self {
840 Self {}
841 }
842}
843
844impl std::io::Write for ProgWriter {
845 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
846 PBAR.suspend(|| std::io::stdout().write(buf))
847 }
848 fn flush(&mut self) -> std::io::Result<()> {
849 std::io::stdout().flush()
850 }
851}
852
853fn get_hostname() -> String {
854 nix::unistd::gethostname()
855 .ok()
856 .and_then(|os_str| os_str.into_string().ok())
857 .unwrap_or_else(|| "unknown".to_string())
858}
859
860#[must_use]
861pub fn generate_debug_log_filename(prefix: &str) -> String {
862 let now = chrono::Utc::now();
863 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
864 let process_id = std::process::id();
865 format!("{prefix}-{timestamp}-{process_id}")
866}
867
868#[must_use]
872pub fn generate_trace_filename(prefix: &str, identifier: &str, extension: &str) -> String {
873 let hostname = get_hostname();
874 let pid = std::process::id();
875 let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S");
876 format!("{prefix}-{identifier}-{hostname}-{pid}-{timestamp}.{extension}")
877}
878
879fn build_verbose_env_filter(verbose: u8) -> tracing_subscriber::EnvFilter {
883 let level_directive = match verbose {
884 0 => "error".parse().unwrap(),
885 1 => "info".parse().unwrap(),
886 2 => "debug".parse().unwrap(),
887 _ => "trace".parse().unwrap(),
888 };
889 tracing_subscriber::EnvFilter::from_default_env()
890 .add_directive(level_directive)
891 .add_directive("tokio=info".parse().unwrap())
892 .add_directive("runtime=info".parse().unwrap())
893 .add_directive("quinn=warn".parse().unwrap())
894 .add_directive("rustls=warn".parse().unwrap())
895 .add_directive("h2=warn".parse().unwrap())
896}
897
/// Builds the filter string used by the profiling layers.
///
/// The result switches off `tokio`, `quinn`, `h2`, `hyper` and `rustls` and ends with
/// the requested global level (`trace` when `profile_level` is `None`). The level is
/// validated case-insensitively but inserted exactly as given.
///
/// An invalid level prints a message to stderr and terminates the process with exit
/// code 1.
fn build_profile_filter_str(profile_level: Option<&str>) -> String {
    const VALID_LEVELS: [&str; 6] = ["trace", "debug", "info", "warn", "error", "off"];
    let level_str = profile_level.unwrap_or("trace");
    let normalized = level_str.to_lowercase();
    let is_valid = VALID_LEVELS
        .iter()
        .any(|candidate| *candidate == normalized.as_str());
    if !is_valid {
        eprintln!(
            "Invalid --profile-level '{level_str}'. Valid values: trace, debug, info, warn, error, off"
        );
        std::process::exit(1);
    }
    format!("tokio=off,quinn=off,h2=off,hyper=off,rustls=off,{level_str}")
}
913
/// Flush guards returned by `install_tracing_subscriber`.
///
/// The fields are never read (hence `dead_code`); the struct exists only to own them.
/// NOTE(review): presumably the guards flush their trace files when dropped — confirm
/// against the `tracing_chrome` / `tracing_flame` documentation.
#[allow(dead_code)] struct TracingGuards {
    /// Guard for the Chrome trace layer, when one was installed.
    chrome: Option<tracing_chrome::FlushGuard>,
    /// Guard for the flamegraph layer, when one was installed.
    flame: Option<tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>>,
}
922
/// Installs the global tracing subscriber and returns the profiling flush guards.
///
/// With `quiet` set, nothing is installed and empty guards are returned. Otherwise the
/// registry is assembled from these optional layers, in this order:
///
/// 1. a debug-log file layer (when `debug_log_file` is set),
/// 2. a pretty console layer writing through `ProgWriter` (only when no remote layer
///    is configured),
/// 3. the remote tracing layer (when configured),
/// 4. the tokio-console layer (when `tokio_console` is set),
/// 5. a Chrome trace layer (when `chrome_trace_prefix` is set),
/// 6. a flamegraph layer (when `flamegraph_prefix` is set and its file can be created).
///
/// Layers 1-3 are filtered by `build_verbose_env_filter(verbose)`; layers 5-6 by the
/// profile filter built from `profile_level`.
///
/// # Panics
///
/// Panics when `quiet` is combined with a non-zero `verbose`, or when the debug log
/// file cannot be opened.
fn install_tracing_subscriber(
    quiet: bool,
    verbose: u8,
    tracing_config: TracingConfig,
) -> TracingGuards {
    if quiet {
        assert!(
            verbose == 0,
            "Quiet mode and verbose mode are mutually exclusive"
        );
        return TracingGuards {
            chrome: None,
            flame: None,
        };
    }
    let TracingConfig {
        remote_layer: remote_tracing_layer,
        debug_log_file,
        chrome_trace_prefix,
        flamegraph_prefix,
        trace_identifier,
        profile_level,
        tokio_console,
        tokio_console_port,
    } = tracing_config;
    // Debug log file: opened in append mode, created if missing, plain text (no ANSI).
    let file_layer = debug_log_file.map(|log_file_path| {
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&log_file_path)
            .unwrap_or_else(|e| {
                panic!("Failed to create debug log file at '{log_file_path}': {e}")
            });
        tracing_subscriber::fmt::layer()
            .with_target(true)
            .with_line_number(true)
            .with_thread_ids(true)
            .with_timer(LocalTimeFormatter)
            .with_ansi(false)
            .with_writer(file)
            .with_filter(build_verbose_env_filter(verbose))
    });
    // The console layer is skipped whenever a remote layer is configured.
    let fmt_layer = if remote_tracing_layer.is_some() {
        None
    } else {
        Some(
            tracing_subscriber::fmt::layer()
                .with_target(true)
                .with_line_number(true)
                // Span open/close events are only emitted above verbosity 2.
                .with_span_events(if verbose > 2 {
                    FmtSpan::NEW | FmtSpan::CLOSE
                } else {
                    FmtSpan::NONE
                })
                .with_timer(LocalTimeFormatter)
                .pretty()
                .with_writer(ProgWriter::new)
                .with_filter(build_verbose_env_filter(verbose)),
        )
    };
    let remote_tracing_layer =
        remote_tracing_layer.map(|layer| layer.with_filter(build_verbose_env_filter(verbose)));
    // tokio-console: served on 127.0.0.1, port 6669 unless overridden; retention comes
    // from RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS (default 60 seconds).
    let console_layer = tokio_console.then(|| {
        let console_port = tokio_console_port.unwrap_or(6669);
        let retention_seconds: u64 =
            read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
        eprintln!("Tokio console server listening on 127.0.0.1:{console_port}");
        console_subscriber::ConsoleLayer::builder()
            .retention(std::time::Duration::from_secs(retention_seconds))
            .server_addr(([127, 0, 0, 1], console_port))
            .spawn()
    });
    // The profile filter string is only built when at least one profiling layer is
    // requested, so the `unwrap` in `make_profile_filter` is only reached in that case.
    let profile_filter_str = (chrome_trace_prefix.is_some() || flamegraph_prefix.is_some())
        .then(|| build_profile_filter_str(profile_level.as_deref()));
    let make_profile_filter =
        || tracing_subscriber::EnvFilter::new(profile_filter_str.as_ref().unwrap());
    let mut chrome_guard = None;
    let chrome_layer = chrome_trace_prefix.as_ref().map(|prefix| {
        let filename = generate_trace_filename(prefix, &trace_identifier, "json");
        eprintln!("Chrome trace will be written to: {filename}");
        let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
            .file(&filename)
            .include_args(true)
            .build();
        chrome_guard = Some(guard);
        layer.with_filter(make_profile_filter())
    });
    let mut flame_guard = None;
    // A flamegraph file that cannot be created is reported on stderr and the layer is
    // skipped rather than aborting start-up.
    let flame_layer = flamegraph_prefix.as_ref().and_then(|prefix| {
        let filename = generate_trace_filename(prefix, &trace_identifier, "folded");
        eprintln!("Flamegraph data will be written to: {filename}");
        match tracing_flame::FlameLayer::with_file(&filename) {
            Ok((layer, guard)) => {
                flame_guard = Some(guard);
                Some(layer.with_filter(make_profile_filter()))
            }
            Err(e) => {
                eprintln!("Failed to create flamegraph layer: {e}");
                None
            }
        }
    });
    tracing_subscriber::registry()
        .with(file_layer)
        .with(fmt_layer)
        .with(remote_tracing_layer)
        .with(console_layer)
        .with(chrome_layer)
        .with(flame_layer)
        .init();
    TracingGuards {
        chrome: chrome_guard,
        flame: flame_guard,
    }
}
1047
1048fn build_tokio_runtime(
1052 runtime: &RuntimeConfig,
1053 throttle: &ThrottleConfig,
1054) -> tokio::runtime::Runtime {
1055 let mut builder = tokio::runtime::Builder::new_multi_thread();
1056 builder.enable_all();
1057 if runtime.max_workers > 0 {
1058 builder.worker_threads(runtime.max_workers);
1059 }
1060 if runtime.max_blocking_threads > 0 {
1061 builder.max_blocking_threads(runtime.max_blocking_threads);
1062 }
1063 if !sysinfo::set_open_files_limit(usize::MAX) {
1064 tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
1065 }
1066 let set_max_open_files = throttle.max_open_files.unwrap_or_else(|| {
1067 let limit = get_max_open_files().expect(
1068 "We failed to query rlimit, if this is expected try specifying --max-open-files",
1069 ) as usize;
1070 std::cmp::min(limit / 10 * 8, 4096)
1073 });
1074 if set_max_open_files > 0 {
1075 tracing::info!("Setting max open files to: {}", set_max_open_files);
1076 throttle::set_max_open_files(set_max_open_files);
1077 } else {
1078 tracing::info!("Not applying any limit to max open files!");
1079 }
1080 builder.build().expect("Failed to create runtime")
1081}
1082
1083fn spawn_throttle_replenishers(
1097 runtime: &tokio::runtime::Runtime,
1098 throttle: &ThrottleConfig,
1099 trace_identifier: &str,
1100) {
1101 fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
1102 let mut replenish = replenish;
1103 let mut interval = std::time::Duration::from_secs(1);
1104 while replenish > 100 && interval > std::time::Duration::from_millis(1) {
1105 replenish /= 10;
1106 interval /= 10;
1107 }
1108 (replenish, interval)
1109 }
1110 let auto_meta_on = throttle.auto_meta.is_some();
1111 if auto_meta_on {
1112 let interval = std::time::Duration::from_millis(100);
1117 let initial_replenish = (throttle.ops_throttle as f64 * 0.1) as usize;
1118 throttle::init_ops_tokens(initial_replenish.max(1));
1119 if throttle.ops_throttle == 0 {
1120 throttle::disable_ops_throttle();
1121 }
1122 runtime.spawn(throttle::run_ops_replenish_thread(
1123 initial_replenish,
1124 interval,
1125 ));
1126 } else if throttle.ops_throttle > 0 {
1127 let (replenish, interval) = get_replenish_interval(throttle.ops_throttle);
1128 throttle::init_ops_tokens(replenish);
1129 runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
1130 }
1131 if throttle.iops_throttle > 0 {
1132 let (replenish, interval) = get_replenish_interval(throttle.iops_throttle);
1133 throttle::init_iops_tokens(replenish);
1134 runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
1135 }
1136 if let Some(auto) = throttle.auto_meta {
1137 spawn_auto_meta_throttle(
1138 runtime,
1139 auto,
1140 throttle.histogram_enabled,
1141 throttle.histogram_log_path.clone(),
1142 throttle.histogram_interval,
1143 trace_identifier,
1144 );
1145 }
1146}
1147
/// Inserts `trace_identifier` into the file name of `path`.
///
/// The result is `{parent}/{stem}.{trace_identifier}.{extension}`, where a missing or
/// empty parent becomes `.`, a missing stem becomes `auto-meta`, and a missing
/// extension becomes `hdr`.
fn resolve_log_path(path: &std::path::Path, trace_identifier: &str) -> std::path::PathBuf {
    let dir = path
        .parent()
        .filter(|candidate| !candidate.as_os_str().is_empty())
        .unwrap_or_else(|| std::path::Path::new("."));
    let mut file_name = path.file_stem().map_or_else(
        || std::ffi::OsString::from("auto-meta"),
        std::ffi::OsStr::to_os_string,
    );
    file_name.push(".");
    file_name.push(trace_identifier);
    file_name.push(".");
    file_name.push(path.extension().unwrap_or(std::ffi::OsStr::new("hdr")));
    dir.join(file_name)
}
1179
1180fn validate_histogram_log_target(
1191 throttle: &ThrottleConfig,
1192 trace_identifier: &str,
1193) -> Result<(), String> {
1194 let Some(path) = &throttle.histogram_log_path else {
1195 return Ok(());
1196 };
1197 if path.is_dir() {
1198 return Err(format!(
1199 "--auto-meta-histogram-log {path:?} is a directory; expected a file path",
1200 ));
1201 }
1202 if path.file_name().is_none() {
1203 return Err(format!(
1204 "--auto-meta-histogram-log {path:?} has no filename component",
1205 ));
1206 }
1207 let resolved = resolve_log_path(path, trace_identifier);
1208 let mut open_options = std::fs::OpenOptions::new();
1209 open_options.create(true).write(true).truncate(true);
1210 #[cfg(unix)]
1211 {
1212 use std::os::unix::fs::OpenOptionsExt;
1213 open_options.custom_flags(libc::O_NOFOLLOW);
1214 }
1215 match open_options.open(&resolved) {
1216 Ok(_) => Ok(()),
1217 Err(err) => {
1218 #[cfg(unix)]
1221 let context = if err.raw_os_error() == Some(libc::ELOOP) {
1222 " (resolved path is a symlink, which would let a local attacker hijack the write)"
1223 } else {
1224 ""
1225 };
1226 #[cfg(not(unix))]
1227 let context = "";
1228 Err(format!(
1229 "--auto-meta-histogram-log cannot create resolved path {resolved:?}: {err:#}{context}",
1230 ))
1231 }
1232 }
1233}
1234
1235const fn unit_label(side: congestion::Side, op: congestion::MetadataOp) -> &'static str {
1257 use congestion::MetadataOp::*;
1258 use congestion::Side::*;
1259 match (side, op) {
1260 (Source, Stat) => "src-stat",
1262 (Destination, Stat) => "dst-stat",
1263 (Source, ReadLink) => "src-read-link",
1264 (Destination, ReadLink) => "dst-read-link",
1265 (Destination, MkDir) => "mkdir",
1271 (Source, MkDir) => "src-mkdir",
1272 (Destination, RmDir) => "rmdir",
1273 (Source, RmDir) => "src-rmdir",
1274 (Destination, Unlink) => "unlink",
1275 (Source, Unlink) => "src-unlink",
1276 (Destination, HardLink) => "hard-link",
1277 (Source, HardLink) => "src-hard-link",
1278 (Destination, Symlink) => "symlink",
1279 (Source, Symlink) => "src-symlink",
1280 (Destination, Chmod) => "chmod",
1281 (Source, Chmod) => "src-chmod",
1282 (Destination, OpenCreate) => "open-create",
1283 (Source, OpenCreate) => "src-open-create",
1284 }
1285}
1286
1287fn build_histogram_header(
1288 auto: &AutoMetaThrottleConfig,
1289 tool_name: &str,
1290 snapshot_interval: std::time::Duration,
1291) -> congestion::format::LogHeader {
1292 use congestion::format::{AutoMetaSnapshot, HdrSnapshot, LogHeader, UnitLabel};
1293 let hostname = nix::unistd::gethostname()
1294 .ok()
1295 .and_then(|h| h.into_string().ok())
1296 .unwrap_or_else(|| "unknown".into());
1297 let mut unit_labels = Vec::with_capacity(congestion::N_META_RESOURCES);
1298 for &side in &congestion::Side::ALL {
1299 for &op in &congestion::MetadataOp::ALL {
1300 unit_labels.push(UnitLabel {
1301 side: side as u8,
1302 op: op as u8,
1303 label: unit_label(side, op).to_string(),
1304 });
1305 }
1306 }
1307 LogHeader {
1308 format_version: congestion::format::FORMAT_VERSION,
1309 tool: tool_name.to_string(),
1310 tool_version: env!("CARGO_PKG_VERSION").to_string(),
1311 hostname,
1312 pid: std::process::id(),
1313 start_unix_micros: std::time::SystemTime::now()
1314 .duration_since(std::time::UNIX_EPOCH)
1315 .map(|d| u64::try_from(d.as_micros()).unwrap_or(u64::MAX))
1316 .unwrap_or(0),
1317 snapshot_interval_micros: u64::try_from(snapshot_interval.as_micros()).unwrap_or(u64::MAX),
1318 auto_meta: AutoMetaSnapshot {
1319 initial_cwnd: auto.initial_cwnd,
1320 min_cwnd: auto.min_cwnd,
1321 max_cwnd: auto.max_cwnd,
1322 alpha: auto.alpha,
1323 beta: auto.beta,
1324 increase_step: auto.increase_step,
1325 decrease_step: auto.decrease_step,
1326 baseline_percentile: auto.baseline_percentile,
1327 current_percentile: auto.current_percentile,
1328 long_window_micros: u64::try_from(auto.long_window.as_micros()).unwrap_or(u64::MAX),
1329 short_window_micros: u64::try_from(auto.short_window.as_micros()).unwrap_or(u64::MAX),
1330 tick_interval_micros: u64::try_from(auto.tick_interval.as_micros()).unwrap_or(u64::MAX),
1331 },
1332 hdr: HdrSnapshot {
1333 lowest_discernible_micros: congestion::HDR_LOWEST_DISCERNIBLE_MICROS,
1334 highest_trackable_micros: congestion::HDR_HIGHEST_TRACKABLE_MICROS,
1335 significant_figures: congestion::HDR_SIGNIFICANT_FIGURES,
1336 unit: "microseconds".into(),
1337 },
1338 unit_labels,
1339 }
1340}
1341
1342fn render_panel_from_registry() -> String {
1343 let entries = observability::registered_histograms();
1344 if entries.is_empty() {
1345 return String::new();
1346 }
1347 let snapshots: Vec<hdrhistogram::Histogram<u64>> = entries
1348 .iter()
1349 .map(|e| (*e.snapshot_rx.borrow()).clone())
1350 .collect();
1351 let units: Vec<histogram_panel::PanelUnit> = entries
1352 .iter()
1353 .zip(snapshots.iter())
1354 .map(|(e, snap)| histogram_panel::PanelUnit {
1355 label: e.label,
1356 histogram: snap,
1357 interval: e.interval,
1358 })
1359 .collect();
1360 histogram_panel::render_histogram_panel(&units)
1361}
1362
1363fn spawn_auto_meta_throttle(
1387 runtime: &tokio::runtime::Runtime,
1388 auto: AutoMetaThrottleConfig,
1389 histogram_enabled: bool,
1390 histogram_log_path: Option<std::path::PathBuf>,
1391 histogram_interval: std::time::Duration,
1392 trace_identifier: &str,
1393) {
1394 let initial_cwnd = auto
1395 .initial_cwnd
1396 .clamp(auto.min_cwnd.max(1), auto.max_cwnd.max(1));
1397 let histogram_active = histogram_enabled || histogram_log_path.is_some();
1398 let resolved_log_path = histogram_log_path
1402 .as_ref()
1403 .map(|p| resolve_log_path(p, trace_identifier));
1404
1405 let mut builder = congestion::RoutingSinkBuilder::new();
1408 struct Slot {
1409 label: &'static str,
1410 side: congestion::Side,
1411 op: congestion::MetadataOp,
1412 sample_rx: tokio::sync::mpsc::Receiver<congestion::Sample>,
1413 apply_rate: bool,
1414 accumulator: Option<std::sync::Arc<std::sync::Mutex<congestion::HistogramAccumulator>>>,
1415 }
1416 let mut slots: Vec<Slot> = Vec::with_capacity(congestion::N_META_RESOURCES);
1417 for &side in &congestion::Side::ALL {
1418 for &op in &congestion::MetadataOp::ALL {
1419 let resource = walk::meta_resource(side, op);
1420 throttle::set_max_ops_in_flight(resource, initial_cwnd as usize);
1421 let rx = builder.metadata_receiver(side, op);
1422 let apply_rate = matches!(
1423 (side, op),
1424 (congestion::Side::Destination, congestion::MetadataOp::Stat),
1425 );
1426 let accumulator = if histogram_active {
1427 let acc = std::sync::Arc::new(std::sync::Mutex::new(
1428 congestion::HistogramAccumulator::new(),
1429 ));
1430 builder.metadata_histogram(side, op, acc.clone());
1431 Some(acc)
1432 } else {
1433 None
1434 };
1435 slots.push(Slot {
1436 label: unit_label(side, op),
1437 side,
1438 op,
1439 sample_rx: rx,
1440 apply_rate,
1441 accumulator,
1442 });
1443 }
1444 }
1445 let sink = std::sync::Arc::new(builder.build());
1446 congestion::install_sample_sink(sink.clone());
1447
1448 let mut logger_units: Vec<histogram_logger::LoggerUnit> = Vec::new();
1451 for slot in slots {
1452 let controller = congestion::RatioController::new(congestion::RatioConfig {
1453 initial_cwnd: auto.initial_cwnd,
1454 min_cwnd: auto.min_cwnd,
1455 max_cwnd: auto.max_cwnd,
1456 alpha: auto.alpha,
1457 beta: auto.beta,
1458 increase_step: auto.increase_step,
1459 decrease_step: auto.decrease_step,
1460 baseline_percentile: auto.baseline_percentile,
1461 current_percentile: auto.current_percentile,
1462 long_window: auto.long_window,
1463 short_window: auto.short_window,
1464 });
1465 let (unit, decision_rx, snapshot_rx) = congestion::ControlUnit::new(
1466 slot.label,
1467 controller,
1468 slot.sample_rx,
1469 auto.tick_interval,
1470 );
1471 observability::register_unit(slot.label, snapshot_rx);
1472 if let Some(acc) = slot.accumulator.as_ref() {
1473 let (snap_tx, snap_rx) = tokio::sync::watch::channel(
1474 hdrhistogram::Histogram::<u64>::new_with_bounds(
1475 congestion::HDR_LOWEST_DISCERNIBLE_MICROS,
1476 congestion::HDR_HIGHEST_TRACKABLE_MICROS,
1477 congestion::HDR_SIGNIFICANT_FIGURES,
1478 )
1479 .expect("histogram bounds valid"),
1480 );
1481 observability::register_histogram(slot.label, snap_rx, histogram_interval);
1482 logger_units.push(histogram_logger::LoggerUnit {
1483 label: slot.label,
1484 side: slot.side,
1485 op: slot.op,
1486 accumulator: acc.clone(),
1487 snapshot_tx: snap_tx,
1488 });
1489 }
1490 runtime.spawn(unit.run());
1491 runtime.spawn(auto_meta::run_adapter(
1492 walk::meta_resource(slot.side, slot.op),
1493 slot.apply_rate,
1494 decision_rx,
1495 sink.clone(),
1496 ));
1497 }
1498
1499 if histogram_active {
1500 let header = build_histogram_header(&auto, trace_identifier, histogram_interval);
1501 let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
1502 store_logger_cancel(cancel_tx);
1503 let progress_source: histogram_logger::ProgressSource = Box::new(|| {
1511 let snapshot = progress::SerializableProgress::from(&*PROGRESS);
1512 serde_json::to_vec(&snapshot).unwrap_or_else(|err| {
1513 tracing::warn!(
1514 "histogram-logger: SerializableProgress JSON encode failed: {err:#}; \
1515 dropping this tick's progress record"
1516 );
1517 Vec::new()
1518 })
1519 });
1520 let handle = runtime.spawn(histogram_logger::run_logger(
1521 histogram_logger::LoggerConfig {
1522 interval: histogram_interval,
1523 log_path: resolved_log_path,
1524 header,
1525 progress_source: Some(progress_source),
1526 },
1527 logger_units,
1528 cancel_rx,
1529 ));
1530 store_logger_handle(handle);
1531 }
1532
1533 tracing::info!(
1534 "auto-meta-throttle enabled (per-(side, op) controllers, {} total): \
1535 initial_cwnd={}, max_cwnd={}, alpha={}, beta={}, \
1536 baseline_percentile={}, current_percentile={}, \
1537 long_window={:?}, short_window={:?}, tick={:?}, \
1538 histograms={}",
1539 congestion::N_META_RESOURCES,
1540 auto.initial_cwnd,
1541 auto.max_cwnd,
1542 auto.alpha,
1543 auto.beta,
1544 auto.baseline_percentile,
1545 auto.current_percentile,
1546 auto.long_window,
1547 auto.short_window,
1548 auto.tick_interval,
1549 histogram_active,
1550 );
1551}
1552
1553#[instrument(skip(func))] pub fn run<Fut, Summary, Error>(
1555 progress: Option<ProgressSettings>,
1556 output: OutputConfig,
1557 runtime_config: RuntimeConfig,
1558 throttle_config: ThrottleConfig,
1559 tracing_config: TracingConfig,
1560 func: impl FnOnce() -> Fut,
1561) -> Option<Summary>
1562where
1564 Summary: std::fmt::Display,
1565 Error: std::fmt::Display + std::fmt::Debug,
1566 Fut: std::future::Future<Output = Result<Summary, Error>>,
1567{
1568 let _ = get_progress();
1572 if let Err(e) = throttle_config.validate() {
1573 eprintln!("Configuration error: {e}");
1574 return None;
1575 }
1576 let OutputConfig {
1577 quiet,
1578 verbose,
1579 print_summary,
1580 suppress_runtime_stats,
1581 } = output;
1582 let trace_identifier = tracing_config.trace_identifier.clone();
1585 if let Err(e) = validate_histogram_log_target(&throttle_config, &trace_identifier) {
1586 eprintln!("Configuration error: {e}");
1587 return None;
1588 }
1589 let _tracing_guards = install_tracing_subscriber(quiet, verbose, tracing_config);
1590 let res = {
1591 let runtime = build_tokio_runtime(&runtime_config, &throttle_config);
1592 spawn_throttle_replenishers(&runtime, &throttle_config, &trace_identifier);
1593 let res = {
1594 let _progress_tracker = progress.map(|settings| {
1595 tracing::debug!("Requesting progress updates {settings:?}");
1596 let delay = settings.progress_delay.map(|delay_str| {
1597 humantime::parse_duration(&delay_str)
1598 .expect("Couldn't parse duration out of --progress-delay")
1599 });
1600 ProgressTracker::new(settings.progress_type, delay)
1601 });
1602 runtime.block_on(func())
1603 };
1604 match &res {
1605 Ok(summary) => {
1606 if print_summary || verbose > 0 {
1607 println!("{summary}");
1608 }
1609 }
1610 Err(err) => {
1611 if !quiet {
1612 println!("{err:?}");
1613 }
1614 }
1615 }
1616 if (print_summary || verbose > 0)
1617 && !suppress_runtime_stats
1618 && let Err(err) = print_runtime_stats()
1619 {
1620 println!("Failed to print runtime stats: {err:?}");
1621 }
1622 signal_logger_cancel();
1626 if let Some(handle) = take_logger_handle() {
1627 let _ = runtime.block_on(async {
1630 tokio::time::timeout(std::time::Duration::from_secs(1), handle).await
1631 });
1632 }
1633 res
1634 };
1637 reset_process_throttle_state();
1642 res.ok()
1643}
1644
1645fn reset_process_throttle_state() {
1651 congestion::clear_sample_sink();
1652 observability::clear();
1653 for &side in &throttle::Side::ALL {
1654 for &op in &throttle::MetadataOp::ALL {
1655 throttle::set_max_ops_in_flight(throttle::Resource::meta(side, op), 0);
1656 }
1657 }
1658 throttle::disable_ops_throttle();
1659 throttle::set_max_open_files(0);
1666 throttle::init_iops_tokens(0);
1667}
1668
#[cfg(test)]
mod unit_label_tests {
    use super::unit_label;
    use congestion::{MetadataOp, Side};

    /// Checks every `(side, op, expected label)` row against `unit_label`.
    fn assert_labels(rows: &[(Side, MetadataOp, &str)]) {
        for &(side, op, expected) in rows {
            assert_eq!(unit_label(side, op), expected);
        }
    }

    /// Stat and read-link labels name their side explicitly on both sides.
    #[test]
    fn lookup_ops_carry_side_prefix() {
        assert_labels(&[
            (Side::Source, MetadataOp::Stat, "src-stat"),
            (Side::Destination, MetadataOp::Stat, "dst-stat"),
            (Side::Source, MetadataOp::ReadLink, "src-read-link"),
            (Side::Destination, MetadataOp::ReadLink, "dst-read-link"),
        ]);
    }

    /// Destination-side mutation ops use the bare operation name.
    #[test]
    fn destination_only_ops_drop_prefix() {
        assert_labels(&[
            (Side::Destination, MetadataOp::MkDir, "mkdir"),
            (Side::Destination, MetadataOp::RmDir, "rmdir"),
            (Side::Destination, MetadataOp::Unlink, "unlink"),
            (Side::Destination, MetadataOp::HardLink, "hard-link"),
            (Side::Destination, MetadataOp::Symlink, "symlink"),
            (Side::Destination, MetadataOp::Chmod, "chmod"),
            (Side::Destination, MetadataOp::OpenCreate, "open-create"),
        ]);
    }

    /// Source-side mutation slots keep a distinguishing `src-` prefix.
    #[test]
    fn unused_source_side_mutation_slots_keep_src_prefix() {
        assert_labels(&[
            (Side::Source, MetadataOp::Unlink, "src-unlink"),
            (Side::Source, MetadataOp::MkDir, "src-mkdir"),
        ]);
    }

    /// No two `(side, op)` pairs may share a label, and every pair must have one.
    #[test]
    fn labels_are_unique_across_all_resources() {
        let mut seen = std::collections::HashSet::new();
        for side in Side::ALL.iter().copied() {
            for op in MetadataOp::ALL.iter().copied() {
                let label = unit_label(side, op);
                let newly_added = seen.insert(label);
                assert!(newly_added, "duplicate label: {label}");
            }
        }
        assert_eq!(seen.len(), congestion::N_META_RESOURCES);
    }
}
1738
#[cfg(test)]
mod runtime_stats_tests {
    use super::*;
    use anyhow::Result;

    /// `collect_runtime_stats` must agree, within a small tolerance, with stats computed
    /// directly from this process's procfs entry.
    #[test]
    fn collect_runtime_stats_matches_procfs_snapshot() -> Result<()> {
        let cpu_tolerance_ms = 50;
        let rss_tolerance_bytes = 1_000_000;

        let process = procfs::process::Process::myself()?;
        let expected = collect_runtime_stats_for_process(&process)?;
        let actual = collect_runtime_stats();

        let user_cpu_delta = expected.cpu_time_user_ms.abs_diff(actual.cpu_time_user_ms);
        assert!(
            user_cpu_delta <= cpu_tolerance_ms,
            "user CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
            expected.cpu_time_user_ms,
            actual.cpu_time_user_ms
        );

        let kernel_cpu_delta = expected
            .cpu_time_kernel_ms
            .abs_diff(actual.cpu_time_kernel_ms);
        assert!(
            kernel_cpu_delta <= cpu_tolerance_ms,
            "kernel CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
            expected.cpu_time_kernel_ms,
            actual.cpu_time_kernel_ms
        );

        let peak_rss_delta = expected.peak_rss_bytes.abs_diff(actual.peak_rss_bytes);
        assert!(
            peak_rss_delta <= rss_tolerance_bytes,
            "peak RSS deviated by more than {rss_tolerance_bytes} bytes: expected {}, got {}",
            expected.peak_rss_bytes,
            actual.peak_rss_bytes
        );
        Ok(())
    }

    /// Both "no process handle" and "handle for a pid that should not exist" yield defaults.
    #[test]
    fn collect_runtime_stats_returns_default_on_error() {
        let without_process = collect_runtime_stats_inner(None);
        assert_eq!(without_process, RuntimeStats::default());

        let nonexistent_process = procfs::process::Process::new(i32::MAX).ok();
        let with_missing_pid = collect_runtime_stats_inner(nonexistent_process);
        assert_eq!(with_missing_pid, RuntimeStats::default());
    }
}
1785
#[cfg(test)]
mod parse_preserve_settings_tests {
    use super::*;

    /// The `all` preset matches `preserve_all()` masks and enables uid/gid/time everywhere.
    #[test]
    fn preset_all_returns_preserve_all() {
        let parsed = parse_preserve_settings("all").unwrap();
        let reference = preserve::preserve_all();
        let (file, dir, symlink) = (&parsed.file, &parsed.dir, &parsed.symlink);

        assert_eq!(file.mode_mask, reference.file.mode_mask);
        assert!(file.user_and_time.uid);
        assert!(file.user_and_time.gid);
        assert!(file.user_and_time.time);

        assert_eq!(dir.mode_mask, reference.dir.mode_mask);
        assert!(dir.user_and_time.uid);
        assert!(dir.user_and_time.gid);
        assert!(dir.user_and_time.time);

        assert!(symlink.user_and_time.uid);
        assert!(symlink.user_and_time.gid);
        assert!(symlink.user_and_time.time);
    }

    /// The `none` preset matches `preserve_none()` masks and disables uid/gid/time everywhere.
    #[test]
    fn preset_none_returns_preserve_none() {
        let parsed = parse_preserve_settings("none").unwrap();
        let reference = preserve::preserve_none();
        let (file, dir, symlink) = (&parsed.file, &parsed.dir, &parsed.symlink);

        assert_eq!(file.mode_mask, reference.file.mode_mask);
        assert!(!file.user_and_time.uid);
        assert!(!file.user_and_time.gid);
        assert!(!file.user_and_time.time);

        assert_eq!(dir.mode_mask, reference.dir.mode_mask);
        assert!(!dir.user_and_time.uid);
        assert!(!dir.user_and_time.gid);
        assert!(!dir.user_and_time.time);

        assert!(!symlink.user_and_time.uid);
        assert!(!symlink.user_and_time.gid);
        assert!(!symlink.user_and_time.time);
    }

    /// Explicit per-type specs only switch on what they name.
    #[test]
    fn per_type_settings_still_work() {
        let parsed = parse_preserve_settings("f:uid,time,0777 d:gid").unwrap();

        let file = &parsed.file;
        assert!(file.user_and_time.uid);
        assert!(file.user_and_time.time);
        assert!(!file.user_and_time.gid);
        assert_eq!(file.mode_mask, 0o777);

        let dir = &parsed.dir;
        assert!(!dir.user_and_time.uid);
        assert!(dir.user_and_time.gid);
        assert!(!dir.user_and_time.time);
    }

    /// Unknown presets and unknown attributes are both rejected.
    #[test]
    fn invalid_settings_returns_error() {
        for spec in ["invalid", "f:unknown_attr"] {
            assert!(parse_preserve_settings(spec).is_err());
        }
    }
}
1838
#[cfg(test)]
mod validate_update_compare_vs_preserve_tests {
    use super::*;

    /// Comparing mtime while preserving nothing is reported, naming `mtime`.
    #[test]
    fn detects_mtime_mismatch() {
        let compare = filecmp::MetadataCmpSettings {
            mtime: true,
            ..Default::default()
        };
        let outcome = validate_update_compare_vs_preserve(&compare, &preserve::preserve_none());
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("mtime"));
    }

    /// Comparing uid while preserving nothing is reported, naming `uid`.
    #[test]
    fn detects_uid_mismatch() {
        let compare = filecmp::MetadataCmpSettings {
            uid: true,
            ..Default::default()
        };
        let outcome = validate_update_compare_vs_preserve(&compare, &preserve::preserve_none());
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("uid"));
    }

    /// Comparing gid while preserving nothing is reported, naming `gid`.
    #[test]
    fn detects_gid_mismatch() {
        let compare = filecmp::MetadataCmpSettings {
            gid: true,
            ..Default::default()
        };
        let outcome = validate_update_compare_vs_preserve(&compare, &preserve::preserve_none());
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("gid"));
    }

    /// Comparing mode with the file mode mask zeroed is reported, naming `mode`.
    #[test]
    fn detects_mode_mismatch() {
        let compare = filecmp::MetadataCmpSettings {
            mode: true,
            ..Default::default()
        };
        let mut preserve = preserve::preserve_none();
        preserve.file.mode_mask = 0;
        let outcome = validate_update_compare_vs_preserve(&compare, &preserve);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("mode"));
    }

    /// Several uncovered attributes are all named in a single error.
    #[test]
    fn detects_multiple_mismatches() {
        let compare = filecmp::MetadataCmpSettings {
            mtime: true,
            uid: true,
            gid: true,
            ..Default::default()
        };
        let outcome = validate_update_compare_vs_preserve(&compare, &preserve::preserve_none());
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        for attribute in ["mtime", "uid", "gid"] {
            assert!(err.contains(attribute));
        }
    }

    /// With `preserve_all()`, comparing every attribute is accepted.
    #[test]
    fn passes_when_preserve_covers_all_compared_attrs() {
        let compare = filecmp::MetadataCmpSettings {
            mtime: true,
            uid: true,
            gid: true,
            mode: true,
            size: true,
            ctime: true,
        };
        let outcome = validate_update_compare_vs_preserve(&compare, &preserve::preserve_all());
        assert!(outcome.is_ok());
    }

    /// Comparing mode against the unmodified `preserve_none()` settings is rejected.
    #[test]
    fn fails_with_partial_mode_mask_when_mode_compared() {
        let compare = filecmp::MetadataCmpSettings {
            mode: true,
            ..Default::default()
        };
        let outcome = validate_update_compare_vs_preserve(&compare, &preserve::preserve_none());
        assert!(outcome.is_err());
        let message = outcome.unwrap_err();
        assert!(message.contains("mode"));
    }
}
1931
#[cfg(test)]
mod resolve_log_path_tests {
    use super::*;

    /// Resolves `input` with the fixed trace identifier `rcp`.
    fn resolve_for_rcp(input: &str) -> std::path::PathBuf {
        resolve_log_path(std::path::Path::new(input), "rcp")
    }

    /// The identifier is inserted between stem and extension.
    #[test]
    fn full_path_with_extension() {
        let expected = std::path::PathBuf::from("/tmp/foo.rcp.hdr");
        assert_eq!(resolve_for_rcp("/tmp/foo.hdr"), expected);
    }

    /// A bare filename is anchored at `.`.
    #[test]
    fn bare_filename_resolves_to_current_dir() {
        let expected = std::path::PathBuf::from("./foo.rcp.hdr");
        assert_eq!(resolve_for_rcp("foo.hdr"), expected);
    }

    /// A missing extension falls back to `hdr`.
    #[test]
    fn no_extension_defaults_to_hdr() {
        let expected = std::path::PathBuf::from("/tmp/foo.rcp.hdr");
        assert_eq!(resolve_for_rcp("/tmp/foo"), expected);
    }

    /// Bytes that are not valid UTF-8 must pass through the stem untouched.
    #[test]
    #[cfg(unix)]
    fn preserves_non_utf8_stem() {
        use std::os::unix::ffi::{OsStrExt, OsStringExt};
        let raw_name: Vec<u8> = [&b"/tmp/"[..], &[0xFF, 0xFE][..], &b".hdr"[..]].concat();
        let input = std::path::PathBuf::from(std::ffi::OsString::from_vec(raw_name));
        let resolved = resolve_log_path(&input, "rcp");
        let bytes = resolved.as_os_str().as_bytes();
        let kept_raw_bytes = bytes.windows(2).any(|pair| pair == [0xFF, 0xFE]);
        assert!(
            kept_raw_bytes,
            "non-UTF-8 bytes must survive resolution; got bytes: {bytes:?}",
        );
        assert!(
            bytes.ends_with(b".rcp.hdr"),
            "expected .rcp.hdr suffix; got bytes: {bytes:?}",
        );
    }
}
1986
#[cfg(test)]
mod validate_histogram_log_target_tests {
    use super::*;

    /// Builds a default `ThrottleConfig` whose histogram settings reflect `path`.
    fn throttle_with_log_path(path: Option<std::path::PathBuf>) -> ThrottleConfig {
        let histogram_enabled = path.is_some();
        ThrottleConfig {
            histogram_enabled,
            histogram_log_path: path,
            ..Default::default()
        }
    }

    /// Runs the validator for `path` with the fixed trace identifier `rcp`.
    fn validate_for_rcp(path: Option<std::path::PathBuf>) -> Result<(), String> {
        validate_histogram_log_target(&throttle_with_log_path(path), "rcp")
    }

    /// Nothing to validate when no log path is configured.
    #[test]
    fn no_log_path_is_ok() {
        assert!(validate_for_rcp(None).is_ok());
    }

    /// A path inside a fresh temporary directory is accepted.
    #[test]
    fn writable_resolved_path_is_ok() {
        let dir = tempfile::tempdir().unwrap();
        let requested = dir.path().join("foo.hdr");
        assert!(validate_for_rcp(Some(requested)).is_ok());
    }

    /// A directory sitting at the *resolved* path blocks the probe open.
    #[test]
    fn resolved_path_existing_as_directory_is_rejected() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::create_dir(dir.path().join("foo.rcp.hdr")).unwrap();
        let err = validate_for_rcp(Some(dir.path().join("foo.hdr"))).unwrap_err();
        let names_flag_and_path = err.contains("histogram-log") && err.contains("foo.rcp.hdr");
        assert!(names_flag_and_path, "got: {err}");
    }

    /// A parent directory that does not exist makes the probe open fail.
    #[test]
    fn resolved_path_in_missing_parent_is_rejected() {
        let requested = std::path::PathBuf::from("/nonexistent-dir-67890/foo.hdr");
        let err = validate_for_rcp(Some(requested)).unwrap_err();
        assert!(err.contains("histogram-log"), "got: {err}");
    }

    /// Passing an existing directory as the log path itself is rejected.
    #[test]
    fn log_path_pointing_at_existing_directory_is_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let err = validate_for_rcp(Some(dir.path().to_path_buf())).unwrap_err();
        assert!(err.contains("directory"), "got: {err}");
    }

    /// `/` has no filename component; either rejection reason is acceptable.
    #[test]
    fn log_path_with_no_filename_is_rejected() {
        let err = validate_for_rcp(Some(std::path::PathBuf::from("/"))).unwrap_err();
        let mentions_reason = err.contains("filename") || err.contains("directory");
        assert!(mentions_reason, "got: {err}");
    }

    /// A symlink at the resolved path is refused and its target is left untouched.
    #[test]
    #[cfg(unix)]
    fn resolved_path_existing_as_symlink_is_rejected() {
        let dir = tempfile::tempdir().unwrap();
        let target = dir.path().join("victim.txt");
        std::fs::write(&target, b"do not clobber").unwrap();
        std::os::unix::fs::symlink(&target, dir.path().join("foo.rcp.hdr")).unwrap();

        let err = validate_for_rcp(Some(dir.path().join("foo.hdr"))).unwrap_err();
        let reports_symlink =
            err.contains("symlink") || err.contains("ELOOP") || err.contains("Too many levels");
        assert!(reports_symlink, "got: {err}");

        let preserved = std::fs::read(&target).unwrap();
        assert_eq!(preserved, b"do not clobber");
    }
}