1use crate::cmp::ObjType;
105use anyhow::Context;
106use anyhow::anyhow;
107use std::io::IsTerminal;
108use tracing::instrument;
109use tracing_subscriber::fmt::format::FmtSpan;
110use tracing_subscriber::prelude::*;
111
112mod auto_meta;
113pub mod chmod;
114pub mod cli;
115pub mod cmp;
116pub mod config;
117pub mod copy;
118pub mod delete;
119pub mod dry_run;
120pub mod error;
121pub mod error_collector;
122pub mod filegen;
123pub mod filter;
124pub mod histogram_logger;
125pub mod histogram_panel;
126pub mod link;
127pub mod observability;
128pub mod preserve;
129pub mod remote_tracing;
130pub mod rm;
131pub mod version;
132
133pub mod filecmp;
134pub mod progress;
135mod testutils;
136pub mod walk;
137
138pub use config::{
139 AutoMetaThrottleConfig, DryRunMode, DryRunWarnings, OutputConfig, RuntimeConfig,
140 ThrottleConfig, TracingConfig,
141};
142pub use congestion::{MetadataOp, Side};
147pub use progress::{RcpdProgressPrinter, SerializableProgress};
148
149#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
151pub enum RcpdType {
152 Source,
153 Destination,
154}
155
156impl std::fmt::Display for RcpdType {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 match self {
159 RcpdType::Source => write!(f, "source"),
160 RcpdType::Destination => write!(f, "destination"),
161 }
162 }
163}
164
165pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
167
168#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
170pub struct RuntimeStats {
171 pub cpu_time_user_ms: u64,
173 pub cpu_time_kernel_ms: u64,
175 pub peak_rss_bytes: u64,
177}
178
179#[derive(Debug, Default)]
181pub struct RemoteRuntimeStats {
182 pub source_host: String,
183 pub source_stats: RuntimeStats,
184 pub dest_host: String,
185 pub dest_stats: RuntimeStats,
186}
187
188#[must_use]
191pub fn is_localhost(host: &str) -> bool {
192 if host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]" {
193 return true;
194 }
195 let mut buf = [0u8; 256];
197 let result = unsafe { libc::gethostname(buf.as_mut_ptr() as *mut libc::c_char, buf.len()) };
199 if result == 0
200 && let Ok(hostname_cstr) = std::ffi::CStr::from_bytes_until_nul(&buf)
201 && let Ok(hostname) = hostname_cstr.to_str()
202 && host == hostname
203 {
204 return true;
205 }
206 false
207}
208
209static PROGRESS: std::sync::LazyLock<progress::Progress> =
210 std::sync::LazyLock::new(progress::Progress::new);
211static PBAR: std::sync::LazyLock<indicatif::ProgressBar> =
212 std::sync::LazyLock::new(indicatif::ProgressBar::new_spinner);
213static REMOTE_RUNTIME_STATS: std::sync::LazyLock<std::sync::Mutex<Option<RemoteRuntimeStats>>> =
214 std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
215static HISTOGRAM_LOGGER_CANCEL: std::sync::Mutex<Option<tokio::sync::watch::Sender<bool>>> =
216 std::sync::Mutex::new(None);
217static HISTOGRAM_LOGGER_HANDLE: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>> =
218 std::sync::Mutex::new(None);
219
220fn store_logger_cancel(tx: tokio::sync::watch::Sender<bool>) {
221 *HISTOGRAM_LOGGER_CANCEL
222 .lock()
223 .expect("histogram logger cancel mutex poisoned") = Some(tx);
224}
225
226fn store_logger_handle(handle: tokio::task::JoinHandle<()>) {
227 *HISTOGRAM_LOGGER_HANDLE
228 .lock()
229 .expect("histogram logger handle mutex poisoned") = Some(handle);
230}
231
232fn take_logger_handle() -> Option<tokio::task::JoinHandle<()>> {
233 HISTOGRAM_LOGGER_HANDLE
234 .lock()
235 .expect("histogram logger handle mutex poisoned")
236 .take()
237}
238
239fn signal_logger_cancel() {
240 if let Some(tx) = HISTOGRAM_LOGGER_CANCEL
241 .lock()
242 .expect("histogram logger cancel mutex poisoned")
243 .take()
244 && let Err(err) = tx.send(true)
245 {
246 tracing::debug!("histogram-logger cancel send failed (already gone): {err:#}");
247 }
248}
249
250#[must_use]
251pub fn get_progress() -> &'static progress::Progress {
252 &PROGRESS
253}
254
255pub fn set_remote_runtime_stats(stats: RemoteRuntimeStats) {
257 *REMOTE_RUNTIME_STATS.lock().unwrap() = Some(stats);
258}
259
260struct LocalTimeFormatter;
261
262impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFormatter {
263 fn format_time(
264 &self,
265 writer: &mut tracing_subscriber::fmt::format::Writer<'_>,
266 ) -> std::fmt::Result {
267 let now = chrono::Local::now();
268 writer.write_str(&now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
269 }
270}
271
272struct ProgressTracker {
273 lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
274 pbar_thread: Option<std::thread::JoinHandle<()>>,
275}
276
277#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
278pub enum ProgressType {
279 #[default]
280 #[value(name = "auto", alias = "Auto")]
281 Auto,
282 #[value(name = "ProgressBar", alias = "progress-bar")]
283 ProgressBar,
284 #[value(name = "TextUpdates", alias = "text-updates")]
285 TextUpdates,
286}
287
288pub enum GeneralProgressType {
289 User {
290 progress_type: ProgressType,
291 kind: progress::LocalProgressKind,
292 },
293 Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
294 RemoteMaster {
295 progress_type: ProgressType,
296 get_progress_snapshot:
297 Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>,
298 },
299}
300
301impl std::fmt::Debug for GeneralProgressType {
302 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303 match self {
304 GeneralProgressType::User {
305 progress_type,
306 kind,
307 } => write!(f, "User(progress_type: {progress_type:?}, kind: {kind:?})"),
308 GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
309 GeneralProgressType::RemoteMaster { progress_type, .. } => {
310 write!(
311 f,
312 "RemoteMaster(progress_type: {progress_type:?}, <function>)"
313 )
314 }
315 }
316 }
317}
318
319#[derive(Debug)]
320pub struct ProgressSettings {
321 pub progress_type: GeneralProgressType,
322 pub progress_delay: Option<String>,
323}
324
325fn progress_bar(
326 lock: &std::sync::Mutex<bool>,
327 cvar: &std::sync::Condvar,
328 delay_opt: &Option<std::time::Duration>,
329 kind: progress::LocalProgressKind,
330) {
331 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
332 PBAR.set_style(
333 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
334 .unwrap()
335 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
336 );
337 let mut prog_printer = progress::make_local_printer(kind, &PROGRESS);
338 let mut is_done = lock.lock().unwrap();
339 loop {
340 PBAR.set_position(PBAR.position() + 1); let mut msg = prog_printer.print().unwrap();
342 msg.push_str(&observability::render_lines());
343 msg.push_str(&render_panel_from_registry());
344 PBAR.set_message(msg);
345 let result = cvar.wait_timeout(is_done, delay).unwrap();
346 is_done = result.0;
347 if *is_done {
348 break;
349 }
350 }
351 PBAR.finish_and_clear();
352}
353
354fn get_datetime_prefix() -> String {
355 chrono::Local::now()
356 .format("%Y-%m-%dT%H:%M:%S%.3f%:z")
357 .to_string()
358}
359
360fn text_updates(
361 lock: &std::sync::Mutex<bool>,
362 cvar: &std::sync::Condvar,
363 delay_opt: &Option<std::time::Duration>,
364 kind: progress::LocalProgressKind,
365) {
366 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
367 let mut prog_printer = progress::make_local_printer(kind, &PROGRESS);
368 let mut is_done = lock.lock().unwrap();
369 loop {
370 eprintln!("=======================");
371 eprintln!(
372 "{}\n--{}{}{}",
373 get_datetime_prefix(),
374 prog_printer.print().unwrap(),
375 observability::render_lines(),
376 render_panel_from_registry(),
377 );
378 let result = cvar.wait_timeout(is_done, delay).unwrap();
379 is_done = result.0;
380 if *is_done {
381 break;
382 }
383 }
384}
385
386fn rcpd_updates(
387 lock: &std::sync::Mutex<bool>,
388 cvar: &std::sync::Condvar,
389 delay_opt: &Option<std::time::Duration>,
390 sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
391) {
392 tracing::debug!("Starting rcpd progress updates");
393 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
394 let mut is_done = lock.lock().unwrap();
395 loop {
396 if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
397 tracing::debug!("Progress update channel closed, stopping progress updates");
399 break;
400 }
401 let result = cvar.wait_timeout(is_done, delay).unwrap();
402 is_done = result.0;
403 if *is_done {
404 break;
405 }
406 }
407}
408
409fn remote_master_updates<F>(
410 lock: &std::sync::Mutex<bool>,
411 cvar: &std::sync::Condvar,
412 delay_opt: &Option<std::time::Duration>,
413 get_progress_snapshot: F,
414 progress_type: ProgressType,
415) where
416 F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
417{
418 let interactive = match progress_type {
419 ProgressType::Auto => std::io::stderr().is_terminal(),
420 ProgressType::ProgressBar => true,
421 ProgressType::TextUpdates => false,
422 };
423 if interactive {
424 PBAR.set_style(
425 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
426 .unwrap()
427 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
428 );
429 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
430 let mut printer = RcpdProgressPrinter::new();
431 let mut is_done = lock.lock().unwrap();
432 loop {
433 let progress_map = get_progress_snapshot();
434 let source_progress = &progress_map[RcpdType::Source];
435 let destination_progress = &progress_map[RcpdType::Destination];
436 PBAR.set_position(PBAR.position() + 1); let mut msg = printer
438 .print(source_progress, destination_progress)
439 .unwrap();
440 msg.push_str(&render_panel_from_registry());
441 PBAR.set_message(msg);
442 let result = cvar.wait_timeout(is_done, delay).unwrap();
443 is_done = result.0;
444 if *is_done {
445 break;
446 }
447 }
448 PBAR.finish_and_clear();
449 } else {
450 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
451 let mut printer = RcpdProgressPrinter::new();
452 let mut is_done = lock.lock().unwrap();
453 loop {
454 let progress_map = get_progress_snapshot();
455 let source_progress = &progress_map[RcpdType::Source];
456 let destination_progress = &progress_map[RcpdType::Destination];
457 eprintln!("=======================");
458 eprintln!(
459 "{}\n--{}{}",
460 get_datetime_prefix(),
461 printer
462 .print(source_progress, destination_progress)
463 .unwrap(),
464 render_panel_from_registry(),
465 );
466 let result = cvar.wait_timeout(is_done, delay).unwrap();
467 is_done = result.0;
468 if *is_done {
469 break;
470 }
471 }
472 }
473}
474
475impl ProgressTracker {
476 pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
477 let lock_cvar =
478 std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
479 let lock_cvar_clone = lock_cvar.clone();
480 let pbar_thread = std::thread::spawn(move || {
481 let (lock, cvar) = &*lock_cvar_clone;
482 match progress_type {
483 GeneralProgressType::Remote(sender) => {
484 rcpd_updates(lock, cvar, &delay_opt, sender);
485 }
486 GeneralProgressType::RemoteMaster {
487 progress_type,
488 get_progress_snapshot,
489 } => {
490 remote_master_updates(
491 lock,
492 cvar,
493 &delay_opt,
494 get_progress_snapshot,
495 progress_type,
496 );
497 }
498 GeneralProgressType::User {
499 progress_type,
500 kind,
501 } => {
502 let interactive = match progress_type {
503 ProgressType::Auto => std::io::stderr().is_terminal(),
504 ProgressType::ProgressBar => true,
505 ProgressType::TextUpdates => false,
506 };
507 if interactive {
508 progress_bar(lock, cvar, &delay_opt, kind);
509 } else {
510 text_updates(lock, cvar, &delay_opt, kind);
511 }
512 }
513 }
514 });
515 Self {
516 lock_cvar,
517 pbar_thread: Some(pbar_thread),
518 }
519 }
520}
521
522impl Drop for ProgressTracker {
523 fn drop(&mut self) {
524 let (lock, cvar) = &*self.lock_cvar;
525 let mut is_done = lock.lock().unwrap();
526 *is_done = true;
527 cvar.notify_one();
528 drop(is_done);
529 if let Some(pbar_thread) = self.pbar_thread.take() {
530 pbar_thread.join().unwrap();
531 }
532 }
533}
534
535pub fn parse_metadata_cmp_settings(
536 settings: &str,
537) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
538 let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
539 for setting in settings.split(',') {
540 match setting {
541 "uid" => metadata_cmp_settings.uid = true,
542 "gid" => metadata_cmp_settings.gid = true,
543 "mode" => metadata_cmp_settings.mode = true,
544 "size" => metadata_cmp_settings.size = true,
545 "mtime" => metadata_cmp_settings.mtime = true,
546 "ctime" => metadata_cmp_settings.ctime = true,
547 _ => {
548 return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
549 }
550 }
551 }
552 Ok(metadata_cmp_settings)
553}
554
555fn parse_type_settings(
556 settings: &str,
557) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
558 let mut user_and_time = preserve::UserAndTimeSettings::default();
559 let mut mode_mask = None;
560 for setting in settings.split(',') {
561 match setting {
562 "uid" => user_and_time.uid = true,
563 "gid" => user_and_time.gid = true,
564 "time" => user_and_time.time = true,
565 _ => {
566 if let Ok(mask) = u32::from_str_radix(setting, 8) {
567 mode_mask = Some(mask);
568 } else {
569 return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
570 }
571 }
572 }
573 }
574 Ok((user_and_time, mode_mask))
575}
576
577pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
578 match settings {
580 "all" => return Ok(preserve::preserve_all()),
581 "none" => return Ok(preserve::preserve_none()),
582 _ => {}
583 }
584 let mut preserve_settings = preserve::Settings::default();
585 for type_settings in settings.split_whitespace() {
586 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
587 let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
588 format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
589 )?;
590 match obj_type {
591 "f" | "file" => {
592 preserve_settings.file = preserve::FileSettings::default();
593 preserve_settings.file.user_and_time = user_and_time_settings;
594 if let Some(mode) = mode_opt {
595 preserve_settings.file.mode_mask = mode;
596 }
597 }
598 "d" | "dir" | "directory" => {
599 preserve_settings.dir = preserve::DirSettings::default();
600 preserve_settings.dir.user_and_time = user_and_time_settings;
601 if let Some(mode) = mode_opt {
602 preserve_settings.dir.mode_mask = mode;
603 }
604 }
605 "l" | "link" | "symlink" => {
606 preserve_settings.symlink = preserve::SymlinkSettings::default();
607 preserve_settings.symlink.user_and_time = user_and_time_settings;
608 }
609 _ => {
610 return Err(anyhow!("Unknown object type: {}", obj_type));
611 }
612 }
613 } else {
614 return Err(anyhow!("Invalid preserve settings: {}", settings));
615 }
616 }
617 Ok(preserve_settings)
618}
619
620pub fn validate_update_compare_vs_preserve(
623 update_compare: &filecmp::MetadataCmpSettings,
624 preserve: &preserve::Settings,
625) -> Result<(), String> {
626 let mut missing = Vec::new();
627 if update_compare.mtime && !preserve.file.user_and_time.time {
628 missing.push("mtime");
629 }
630 if update_compare.uid && !preserve.file.user_and_time.uid {
631 missing.push("uid");
632 }
633 if update_compare.gid && !preserve.file.user_and_time.gid {
634 missing.push("gid");
635 }
636 if update_compare.mode && preserve.file.mode_mask != 0o7777 {
638 missing.push("mode");
639 }
640 if missing.is_empty() {
641 Ok(())
642 } else {
643 Err(format!(
644 "--update compares [{}] but --preserve-settings does not preserve them. \
645 Use --allow-lossy-update to override or adjust --preserve-settings.",
646 missing.join(", ")
647 ))
648 }
649}
650
651pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
652 let mut cmp_settings = cmp::ObjSettings::default();
653 for type_settings in settings.split_whitespace() {
654 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
655 let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
656 "parsing preserve settings: {obj_settings}, type: {obj_type}"
657 ))?;
658 let obj_type = match obj_type {
659 "f" | "file" => ObjType::File,
660 "d" | "dir" | "directory" => ObjType::Dir,
661 "l" | "link" | "symlink" => ObjType::Symlink,
662 "o" | "other" => ObjType::Other,
663 _ => {
664 return Err(anyhow!("Unknown obj type: {}", obj_type));
665 }
666 };
667 cmp_settings[obj_type] = obj_cmp_settings;
668 } else {
669 return Err(anyhow!("Invalid preserve settings: {}", settings));
670 }
671 }
672 Ok(cmp_settings)
673}
674
675pub async fn cmp(
676 src: &std::path::Path,
677 dst: &std::path::Path,
678 log: &cmp::LogWriter,
679 settings: &cmp::Settings,
680) -> Result<cmp::Summary, anyhow::Error> {
681 cmp::cmp(&PROGRESS, src, dst, log, settings).await
682}
683
684pub async fn copy(
685 src: &std::path::Path,
686 dst: &std::path::Path,
687 settings: ©::Settings,
688 preserve: &preserve::Settings,
689) -> Result<copy::Summary, copy::Error> {
690 copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
691}
692
693pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
694 rm::rm(&PROGRESS, path, settings).await
695}
696
697pub async fn chmod(
698 path: &std::path::Path,
699 settings: &chmod::Settings,
700) -> Result<chmod::Summary, chmod::Error> {
701 chmod::chmod(&PROGRESS, path, settings).await
702}
703
704pub async fn link(
705 src: &std::path::Path,
706 dst: &std::path::Path,
707 update: &Option<std::path::PathBuf>,
708 settings: &link::Settings,
709) -> Result<link::Summary, link::Error> {
710 let cwd = std::env::current_dir()
711 .with_context(|| "failed to get current working directory")
712 .map_err(|err| link::Error::new(err, link::Summary::default()))?;
713 link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
714}
715
716fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
717 match std::env::var(name) {
718 Ok(val) => match val.parse() {
719 Ok(val) => val,
720 Err(_) => default,
721 },
722 Err(_) => default,
723 }
724}
725
726#[must_use]
728pub fn collect_runtime_stats() -> RuntimeStats {
729 collect_runtime_stats_inner(procfs::process::Process::myself().ok())
730}
731
732fn collect_runtime_stats_inner(process: Option<procfs::process::Process>) -> RuntimeStats {
733 let Some(process) = process else {
734 return RuntimeStats::default();
735 };
736 collect_runtime_stats_for_process(&process).unwrap_or_default()
737}
738
739fn collect_runtime_stats_for_process(
740 process: &procfs::process::Process,
741) -> anyhow::Result<RuntimeStats> {
742 let stat = process.stat()?;
743 let clock_ticks = procfs::ticks_per_second() as f64;
744 let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
746 Ok(RuntimeStats {
747 cpu_time_user_ms: ((stat.utime as f64 / clock_ticks) * 1000.0) as u64,
748 cpu_time_kernel_ms: ((stat.stime as f64 / clock_ticks) * 1000.0) as u64,
749 peak_rss_bytes: vmhwm_kb * 1024,
750 })
751}
752
753fn print_runtime_stats_for_role(prefix: &str, stats: &RuntimeStats) {
754 let cpu_total =
755 std::time::Duration::from_millis(stats.cpu_time_user_ms + stats.cpu_time_kernel_ms);
756 let cpu_kernel = std::time::Duration::from_millis(stats.cpu_time_kernel_ms);
757 let cpu_user = std::time::Duration::from_millis(stats.cpu_time_user_ms);
758 println!(
759 "{prefix}cpu time : {:.2?} | k: {:.2?} | u: {:.2?}",
760 cpu_total, cpu_kernel, cpu_user
761 );
762 println!(
763 "{prefix}peak RSS : {}",
764 bytesize::ByteSize(stats.peak_rss_bytes)
765 );
766}
767
768#[rustfmt::skip]
769fn print_runtime_stats() -> Result<(), anyhow::Error> {
770 let remote_stats = REMOTE_RUNTIME_STATS.lock().unwrap().take();
772 if let Some(remote) = remote_stats {
773 println!("walltime : {:.2?}", &PROGRESS.get_duration());
775 println!();
776 let source_is_local = is_localhost(&remote.source_host);
777 let dest_is_local = is_localhost(&remote.dest_host);
778 let master_stats = collect_runtime_stats();
780 if !source_is_local {
782 println!("SOURCE ({}):", remote.source_host);
783 print_runtime_stats_for_role(" ", &remote.source_stats);
784 println!();
785 }
786 if !dest_is_local {
787 println!("DESTINATION ({}):", remote.dest_host);
788 print_runtime_stats_for_role(" ", &remote.dest_stats);
789 println!();
790 }
791 match (source_is_local, dest_is_local) {
793 (true, true) => {
794 println!("MASTER + SOURCE + DESTINATION (localhost):");
795 print_runtime_stats_for_role(" master ", &master_stats);
796 print_runtime_stats_for_role(" source ", &remote.source_stats);
797 print_runtime_stats_for_role(" dest ", &remote.dest_stats);
798 }
799 (true, false) => {
800 println!("MASTER + SOURCE (localhost):");
801 print_runtime_stats_for_role(" master ", &master_stats);
802 print_runtime_stats_for_role(" source ", &remote.source_stats);
803 }
804 (false, true) => {
805 println!("MASTER + DESTINATION (localhost):");
806 print_runtime_stats_for_role(" master ", &master_stats);
807 print_runtime_stats_for_role(" dest ", &remote.dest_stats);
808 }
809 (false, false) => {
810 println!("MASTER (localhost):");
811 print_runtime_stats_for_role(" ", &master_stats);
812 }
813 }
814 return Ok(());
815 }
816 let process = procfs::process::Process::myself()?;
818 let stat = process.stat()?;
819 let clock_ticks_per_second = procfs::ticks_per_second();
821 let ticks_to_duration = |ticks: u64| {
822 std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
823 };
824 let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
826 println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
827 println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
828 println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm_kb * 1024));
829 Ok(())
830}
831
832fn get_max_open_files() -> Result<u64, std::io::Error> {
833 let mut rlim = libc::rlimit {
834 rlim_cur: 0,
835 rlim_max: 0,
836 };
837 let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
839 if result == 0 {
840 Ok(rlim.rlim_cur)
841 } else {
842 Err(std::io::Error::last_os_error())
843 }
844}
845
846struct ProgWriter {}
847
848impl ProgWriter {
849 fn new() -> Self {
850 Self {}
851 }
852}
853
854impl std::io::Write for ProgWriter {
855 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
856 PBAR.suspend(|| std::io::stdout().write(buf))
857 }
858 fn flush(&mut self) -> std::io::Result<()> {
859 std::io::stdout().flush()
860 }
861}
862
863fn get_hostname() -> String {
864 nix::unistd::gethostname()
865 .ok()
866 .and_then(|os_str| os_str.into_string().ok())
867 .unwrap_or_else(|| "unknown".to_string())
868}
869
870#[must_use]
871pub fn generate_debug_log_filename(prefix: &str) -> String {
872 let now = chrono::Utc::now();
873 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
874 let process_id = std::process::id();
875 format!("{prefix}-{timestamp}-{process_id}")
876}
877
878#[must_use]
882pub fn generate_trace_filename(prefix: &str, identifier: &str, extension: &str) -> String {
883 let hostname = get_hostname();
884 let pid = std::process::id();
885 let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S");
886 format!("{prefix}-{identifier}-{hostname}-{pid}-{timestamp}.{extension}")
887}
888
889fn build_verbose_env_filter(verbose: u8) -> tracing_subscriber::EnvFilter {
893 let level_directive = match verbose {
894 0 => "error".parse().unwrap(),
895 1 => "info".parse().unwrap(),
896 2 => "debug".parse().unwrap(),
897 _ => "trace".parse().unwrap(),
898 };
899 tracing_subscriber::EnvFilter::from_default_env()
900 .add_directive(level_directive)
901 .add_directive("tokio=info".parse().unwrap())
902 .add_directive("runtime=info".parse().unwrap())
903 .add_directive("quinn=warn".parse().unwrap())
904 .add_directive("rustls=warn".parse().unwrap())
905 .add_directive("h2=warn".parse().unwrap())
906}
907
908fn build_profile_filter_str(profile_level: Option<&str>) -> String {
913 let level_str = profile_level.unwrap_or("trace");
914 let valid_levels = ["trace", "debug", "info", "warn", "error", "off"];
915 if !valid_levels.contains(&level_str.to_lowercase().as_str()) {
916 eprintln!(
917 "Invalid --profile-level '{level_str}'. Valid values: trace, debug, info, warn, error, off"
918 );
919 std::process::exit(1);
920 }
921 format!("tokio=off,quinn=off,h2=off,hyper=off,rustls=off,{level_str}")
922}
923
924#[allow(dead_code)] struct TracingGuards {
929 chrome: Option<tracing_chrome::FlushGuard>,
930 flame: Option<tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>>,
931}
932
933fn install_tracing_subscriber(
939 quiet: bool,
940 verbose: u8,
941 tracing_config: TracingConfig,
942) -> TracingGuards {
943 if quiet {
944 assert!(
945 verbose == 0,
946 "Quiet mode and verbose mode are mutually exclusive"
947 );
948 return TracingGuards {
949 chrome: None,
950 flame: None,
951 };
952 }
953 let TracingConfig {
954 remote_layer: remote_tracing_layer,
955 debug_log_file,
956 chrome_trace_prefix,
957 flamegraph_prefix,
958 trace_identifier,
959 profile_level,
960 tokio_console,
961 tokio_console_port,
962 } = tracing_config;
963 let file_layer = debug_log_file.map(|log_file_path| {
964 let file = std::fs::OpenOptions::new()
965 .create(true)
966 .append(true)
967 .open(&log_file_path)
968 .unwrap_or_else(|e| {
969 panic!("Failed to create debug log file at '{log_file_path}': {e}")
970 });
971 tracing_subscriber::fmt::layer()
972 .with_target(true)
973 .with_line_number(true)
974 .with_thread_ids(true)
975 .with_timer(LocalTimeFormatter)
976 .with_ansi(false)
977 .with_writer(file)
978 .with_filter(build_verbose_env_filter(verbose))
979 });
980 let fmt_layer = if remote_tracing_layer.is_some() {
982 None
983 } else {
984 Some(
985 tracing_subscriber::fmt::layer()
986 .with_target(true)
987 .with_line_number(true)
988 .with_span_events(if verbose > 2 {
989 FmtSpan::NEW | FmtSpan::CLOSE
990 } else {
991 FmtSpan::NONE
992 })
993 .with_timer(LocalTimeFormatter)
994 .pretty()
995 .with_writer(ProgWriter::new)
996 .with_filter(build_verbose_env_filter(verbose)),
997 )
998 };
999 let remote_tracing_layer =
1001 remote_tracing_layer.map(|layer| layer.with_filter(build_verbose_env_filter(verbose)));
1002 let console_layer = tokio_console.then(|| {
1003 let console_port = tokio_console_port.unwrap_or(6669);
1004 let retention_seconds: u64 =
1005 read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
1006 eprintln!("Tokio console server listening on 127.0.0.1:{console_port}");
1007 console_subscriber::ConsoleLayer::builder()
1008 .retention(std::time::Duration::from_secs(retention_seconds))
1009 .server_addr(([127, 0, 0, 1], console_port))
1010 .spawn()
1011 });
1012 let profile_filter_str = (chrome_trace_prefix.is_some() || flamegraph_prefix.is_some())
1015 .then(|| build_profile_filter_str(profile_level.as_deref()));
1016 let make_profile_filter =
1017 || tracing_subscriber::EnvFilter::new(profile_filter_str.as_ref().unwrap());
1018 let mut chrome_guard = None;
1019 let chrome_layer = chrome_trace_prefix.as_ref().map(|prefix| {
1020 let filename = generate_trace_filename(prefix, &trace_identifier, "json");
1021 eprintln!("Chrome trace will be written to: {filename}");
1022 let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
1023 .file(&filename)
1024 .include_args(true)
1025 .build();
1026 chrome_guard = Some(guard);
1027 layer.with_filter(make_profile_filter())
1028 });
1029 let mut flame_guard = None;
1030 let flame_layer = flamegraph_prefix.as_ref().and_then(|prefix| {
1031 let filename = generate_trace_filename(prefix, &trace_identifier, "folded");
1032 eprintln!("Flamegraph data will be written to: {filename}");
1033 match tracing_flame::FlameLayer::with_file(&filename) {
1034 Ok((layer, guard)) => {
1035 flame_guard = Some(guard);
1036 Some(layer.with_filter(make_profile_filter()))
1037 }
1038 Err(e) => {
1039 eprintln!("Failed to create flamegraph layer: {e}");
1040 None
1041 }
1042 }
1043 });
1044 tracing_subscriber::registry()
1045 .with(file_layer)
1046 .with(fmt_layer)
1047 .with(remote_tracing_layer)
1048 .with(console_layer)
1049 .with(chrome_layer)
1050 .with(flame_layer)
1051 .init();
1052 TracingGuards {
1053 chrome: chrome_guard,
1054 flame: flame_guard,
1055 }
1056}
1057
1058fn build_tokio_runtime(
1062 runtime: &RuntimeConfig,
1063 throttle: &ThrottleConfig,
1064) -> tokio::runtime::Runtime {
1065 let mut builder = tokio::runtime::Builder::new_multi_thread();
1066 builder.enable_all();
1067 if runtime.max_workers > 0 {
1068 builder.worker_threads(runtime.max_workers);
1069 }
1070 if runtime.max_blocking_threads > 0 {
1071 builder.max_blocking_threads(runtime.max_blocking_threads);
1072 }
1073 if !sysinfo::set_open_files_limit(usize::MAX) {
1074 tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
1075 }
1076 let set_max_open_files = throttle.max_open_files.unwrap_or_else(|| {
1077 let limit = get_max_open_files().expect(
1078 "We failed to query rlimit, if this is expected try specifying --max-open-files",
1079 ) as usize;
1080 std::cmp::min(limit / 10 * 8, 4096)
1083 });
1084 if set_max_open_files > 0 {
1085 tracing::info!("Setting max open files to: {}", set_max_open_files);
1086 throttle::set_max_open_files(set_max_open_files);
1087 } else {
1088 tracing::info!("Not applying any limit to max open files!");
1089 }
1090 builder.build().expect("Failed to create runtime")
1091}
1092
1093fn spawn_throttle_replenishers(
1107 runtime: &tokio::runtime::Runtime,
1108 throttle: &ThrottleConfig,
1109 trace_identifier: &str,
1110) {
1111 fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
1112 let mut replenish = replenish;
1113 let mut interval = std::time::Duration::from_secs(1);
1114 while replenish > 100 && interval > std::time::Duration::from_millis(1) {
1115 replenish /= 10;
1116 interval /= 10;
1117 }
1118 (replenish, interval)
1119 }
1120 let auto_meta_on = throttle.auto_meta.is_some();
1121 if auto_meta_on {
1122 let interval = std::time::Duration::from_millis(100);
1127 let initial_replenish = (throttle.ops_throttle as f64 * 0.1) as usize;
1128 throttle::init_ops_tokens(initial_replenish.max(1));
1129 if throttle.ops_throttle == 0 {
1130 throttle::disable_ops_throttle();
1131 }
1132 runtime.spawn(throttle::run_ops_replenish_thread(
1133 initial_replenish,
1134 interval,
1135 ));
1136 } else if throttle.ops_throttle > 0 {
1137 let (replenish, interval) = get_replenish_interval(throttle.ops_throttle);
1138 throttle::init_ops_tokens(replenish);
1139 runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
1140 }
1141 if throttle.iops_throttle > 0 {
1142 let (replenish, interval) = get_replenish_interval(throttle.iops_throttle);
1143 throttle::init_iops_tokens(replenish);
1144 runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
1145 }
1146 if let Some(auto) = throttle.auto_meta {
1147 spawn_auto_meta_throttle(
1148 runtime,
1149 auto,
1150 throttle.histogram_enabled,
1151 throttle.histogram_log_path.clone(),
1152 throttle.histogram_interval,
1153 trace_identifier,
1154 );
1155 }
1156}
1157
1158fn resolve_log_path(path: &std::path::Path, trace_identifier: &str) -> std::path::PathBuf {
1171 let parent = match path.parent() {
1172 Some(p) if p.as_os_str().is_empty() => std::path::Path::new("."),
1173 Some(p) => p,
1174 None => std::path::Path::new("."),
1175 };
1176 let mut name: std::ffi::OsString = path
1177 .file_stem()
1178 .map(|s| s.to_os_string())
1179 .unwrap_or_else(|| std::ffi::OsString::from("auto-meta"));
1180 name.push(".");
1181 name.push(trace_identifier);
1182 name.push(".");
1183 match path.extension() {
1184 Some(e) => name.push(e),
1185 None => name.push("hdr"),
1186 }
1187 parent.join(name)
1188}
1189
1190fn validate_histogram_log_target(
1201 throttle: &ThrottleConfig,
1202 trace_identifier: &str,
1203) -> Result<(), String> {
1204 let Some(path) = &throttle.histogram_log_path else {
1205 return Ok(());
1206 };
1207 if path.is_dir() {
1208 return Err(format!(
1209 "--auto-meta-histogram-log {path:?} is a directory; expected a file path",
1210 ));
1211 }
1212 if path.file_name().is_none() {
1213 return Err(format!(
1214 "--auto-meta-histogram-log {path:?} has no filename component",
1215 ));
1216 }
1217 let resolved = resolve_log_path(path, trace_identifier);
1218 let mut open_options = std::fs::OpenOptions::new();
1219 open_options.create(true).write(true).truncate(true);
1220 #[cfg(unix)]
1221 {
1222 use std::os::unix::fs::OpenOptionsExt;
1223 open_options.custom_flags(libc::O_NOFOLLOW);
1224 }
1225 match open_options.open(&resolved) {
1226 Ok(_) => Ok(()),
1227 Err(err) => {
1228 #[cfg(unix)]
1231 let context = if err.raw_os_error() == Some(libc::ELOOP) {
1232 " (resolved path is a symlink, which would let a local attacker hijack the write)"
1233 } else {
1234 ""
1235 };
1236 #[cfg(not(unix))]
1237 let context = "";
1238 Err(format!(
1239 "--auto-meta-histogram-log cannot create resolved path {resolved:?}: {err:#}{context}",
1240 ))
1241 }
1242 }
1243}
1244
1245const fn unit_label(side: congestion::Side, op: congestion::MetadataOp) -> &'static str {
1267 use congestion::MetadataOp::*;
1268 use congestion::Side::*;
1269 match (side, op) {
1270 (Source, Stat) => "src-stat",
1272 (Destination, Stat) => "dst-stat",
1273 (Source, ReadLink) => "src-read-link",
1274 (Destination, ReadLink) => "dst-read-link",
1275 (Destination, MkDir) => "mkdir",
1281 (Source, MkDir) => "src-mkdir",
1282 (Destination, RmDir) => "rmdir",
1283 (Source, RmDir) => "src-rmdir",
1284 (Destination, Unlink) => "unlink",
1285 (Source, Unlink) => "src-unlink",
1286 (Destination, HardLink) => "hard-link",
1287 (Source, HardLink) => "src-hard-link",
1288 (Destination, Symlink) => "symlink",
1289 (Source, Symlink) => "src-symlink",
1290 (Destination, Chmod) => "chmod",
1291 (Source, Chmod) => "src-chmod",
1292 (Destination, OpenCreate) => "open-create",
1293 (Source, OpenCreate) => "src-open-create",
1294 }
1295}
1296
1297fn build_histogram_header(
1298 auto: &AutoMetaThrottleConfig,
1299 tool_name: &str,
1300 snapshot_interval: std::time::Duration,
1301) -> congestion::format::LogHeader {
1302 use congestion::format::{AutoMetaSnapshot, HdrSnapshot, LogHeader, UnitLabel};
1303 let hostname = nix::unistd::gethostname()
1304 .ok()
1305 .and_then(|h| h.into_string().ok())
1306 .unwrap_or_else(|| "unknown".into());
1307 let mut unit_labels = Vec::with_capacity(congestion::N_META_RESOURCES);
1308 for &side in &congestion::Side::ALL {
1309 for &op in &congestion::MetadataOp::ALL {
1310 unit_labels.push(UnitLabel {
1311 side: side as u8,
1312 op: op as u8,
1313 label: unit_label(side, op).to_string(),
1314 });
1315 }
1316 }
1317 LogHeader {
1318 format_version: congestion::format::FORMAT_VERSION,
1319 tool: tool_name.to_string(),
1320 tool_version: env!("CARGO_PKG_VERSION").to_string(),
1321 hostname,
1322 pid: std::process::id(),
1323 start_unix_micros: std::time::SystemTime::now()
1324 .duration_since(std::time::UNIX_EPOCH)
1325 .map(|d| u64::try_from(d.as_micros()).unwrap_or(u64::MAX))
1326 .unwrap_or(0),
1327 snapshot_interval_micros: u64::try_from(snapshot_interval.as_micros()).unwrap_or(u64::MAX),
1328 auto_meta: AutoMetaSnapshot {
1329 initial_cwnd: auto.initial_cwnd,
1330 min_cwnd: auto.min_cwnd,
1331 max_cwnd: auto.max_cwnd,
1332 alpha: auto.alpha,
1333 beta: auto.beta,
1334 increase_step: auto.increase_step,
1335 decrease_step: auto.decrease_step,
1336 baseline_percentile: auto.baseline_percentile,
1337 current_percentile: auto.current_percentile,
1338 long_window_micros: u64::try_from(auto.long_window.as_micros()).unwrap_or(u64::MAX),
1339 short_window_micros: u64::try_from(auto.short_window.as_micros()).unwrap_or(u64::MAX),
1340 tick_interval_micros: u64::try_from(auto.tick_interval.as_micros()).unwrap_or(u64::MAX),
1341 },
1342 hdr: HdrSnapshot {
1343 lowest_discernible_micros: congestion::HDR_LOWEST_DISCERNIBLE_MICROS,
1344 highest_trackable_micros: congestion::HDR_HIGHEST_TRACKABLE_MICROS,
1345 significant_figures: congestion::HDR_SIGNIFICANT_FIGURES,
1346 unit: "microseconds".into(),
1347 },
1348 unit_labels,
1349 }
1350}
1351
1352fn render_panel_from_registry() -> String {
1353 let entries = observability::registered_histograms();
1354 if entries.is_empty() {
1355 return String::new();
1356 }
1357 let snapshots: Vec<hdrhistogram::Histogram<u64>> = entries
1358 .iter()
1359 .map(|e| (*e.snapshot_rx.borrow()).clone())
1360 .collect();
1361 let units: Vec<histogram_panel::PanelUnit> = entries
1362 .iter()
1363 .zip(snapshots.iter())
1364 .map(|(e, snap)| histogram_panel::PanelUnit {
1365 label: e.label,
1366 histogram: snap,
1367 interval: e.interval,
1368 })
1369 .collect();
1370 histogram_panel::render_histogram_panel(&units)
1371}
1372
1373fn spawn_auto_meta_throttle(
1397 runtime: &tokio::runtime::Runtime,
1398 auto: AutoMetaThrottleConfig,
1399 histogram_enabled: bool,
1400 histogram_log_path: Option<std::path::PathBuf>,
1401 histogram_interval: std::time::Duration,
1402 trace_identifier: &str,
1403) {
1404 let initial_cwnd = auto
1405 .initial_cwnd
1406 .clamp(auto.min_cwnd.max(1), auto.max_cwnd.max(1));
1407 let histogram_active = histogram_enabled || histogram_log_path.is_some();
1408 let resolved_log_path = histogram_log_path
1412 .as_ref()
1413 .map(|p| resolve_log_path(p, trace_identifier));
1414
1415 let mut builder = congestion::RoutingSinkBuilder::new();
1418 struct Slot {
1419 label: &'static str,
1420 side: congestion::Side,
1421 op: congestion::MetadataOp,
1422 sample_rx: tokio::sync::mpsc::Receiver<congestion::Sample>,
1423 apply_rate: bool,
1424 accumulator: Option<std::sync::Arc<std::sync::Mutex<congestion::HistogramAccumulator>>>,
1425 }
1426 let mut slots: Vec<Slot> = Vec::with_capacity(congestion::N_META_RESOURCES);
1427 for &side in &congestion::Side::ALL {
1428 for &op in &congestion::MetadataOp::ALL {
1429 let resource = walk::meta_resource(side, op);
1430 throttle::set_max_ops_in_flight(resource, initial_cwnd as usize);
1431 let rx = builder.metadata_receiver(side, op);
1432 let apply_rate = matches!(
1433 (side, op),
1434 (congestion::Side::Destination, congestion::MetadataOp::Stat),
1435 );
1436 let accumulator = if histogram_active {
1437 let acc = std::sync::Arc::new(std::sync::Mutex::new(
1438 congestion::HistogramAccumulator::new(),
1439 ));
1440 builder.metadata_histogram(side, op, acc.clone());
1441 Some(acc)
1442 } else {
1443 None
1444 };
1445 slots.push(Slot {
1446 label: unit_label(side, op),
1447 side,
1448 op,
1449 sample_rx: rx,
1450 apply_rate,
1451 accumulator,
1452 });
1453 }
1454 }
1455 let sink = std::sync::Arc::new(builder.build());
1456 congestion::install_sample_sink(sink.clone());
1457
1458 let mut logger_units: Vec<histogram_logger::LoggerUnit> = Vec::new();
1461 for slot in slots {
1462 let controller = congestion::RatioController::new(congestion::RatioConfig {
1463 initial_cwnd: auto.initial_cwnd,
1464 min_cwnd: auto.min_cwnd,
1465 max_cwnd: auto.max_cwnd,
1466 alpha: auto.alpha,
1467 beta: auto.beta,
1468 increase_step: auto.increase_step,
1469 decrease_step: auto.decrease_step,
1470 baseline_percentile: auto.baseline_percentile,
1471 current_percentile: auto.current_percentile,
1472 long_window: auto.long_window,
1473 short_window: auto.short_window,
1474 });
1475 let (unit, decision_rx, snapshot_rx) = congestion::ControlUnit::new(
1476 slot.label,
1477 controller,
1478 slot.sample_rx,
1479 auto.tick_interval,
1480 );
1481 observability::register_unit(slot.label, snapshot_rx);
1482 if let Some(acc) = slot.accumulator.as_ref() {
1483 let (snap_tx, snap_rx) = tokio::sync::watch::channel(
1484 hdrhistogram::Histogram::<u64>::new_with_bounds(
1485 congestion::HDR_LOWEST_DISCERNIBLE_MICROS,
1486 congestion::HDR_HIGHEST_TRACKABLE_MICROS,
1487 congestion::HDR_SIGNIFICANT_FIGURES,
1488 )
1489 .expect("histogram bounds valid"),
1490 );
1491 observability::register_histogram(slot.label, snap_rx, histogram_interval);
1492 logger_units.push(histogram_logger::LoggerUnit {
1493 label: slot.label,
1494 side: slot.side,
1495 op: slot.op,
1496 accumulator: acc.clone(),
1497 snapshot_tx: snap_tx,
1498 });
1499 }
1500 runtime.spawn(unit.run());
1501 runtime.spawn(auto_meta::run_adapter(
1502 walk::meta_resource(slot.side, slot.op),
1503 slot.apply_rate,
1504 decision_rx,
1505 sink.clone(),
1506 ));
1507 }
1508
1509 if histogram_active {
1510 let header = build_histogram_header(&auto, trace_identifier, histogram_interval);
1511 let (cancel_tx, cancel_rx) = tokio::sync::watch::channel(false);
1512 store_logger_cancel(cancel_tx);
1513 let progress_source: histogram_logger::ProgressSource = Box::new(|| {
1521 let snapshot = progress::SerializableProgress::from(&*PROGRESS);
1522 serde_json::to_vec(&snapshot).unwrap_or_else(|err| {
1523 tracing::warn!(
1524 "histogram-logger: SerializableProgress JSON encode failed: {err:#}; \
1525 dropping this tick's progress record"
1526 );
1527 Vec::new()
1528 })
1529 });
1530 let handle = runtime.spawn(histogram_logger::run_logger(
1531 histogram_logger::LoggerConfig {
1532 interval: histogram_interval,
1533 log_path: resolved_log_path,
1534 header,
1535 progress_source: Some(progress_source),
1536 },
1537 logger_units,
1538 cancel_rx,
1539 ));
1540 store_logger_handle(handle);
1541 }
1542
1543 tracing::info!(
1544 "auto-meta-throttle enabled (per-(side, op) controllers, {} total): \
1545 initial_cwnd={}, max_cwnd={}, alpha={}, beta={}, \
1546 baseline_percentile={}, current_percentile={}, \
1547 long_window={:?}, short_window={:?}, tick={:?}, \
1548 histograms={}",
1549 congestion::N_META_RESOURCES,
1550 auto.initial_cwnd,
1551 auto.max_cwnd,
1552 auto.alpha,
1553 auto.beta,
1554 auto.baseline_percentile,
1555 auto.current_percentile,
1556 auto.long_window,
1557 auto.short_window,
1558 auto.tick_interval,
1559 histogram_active,
1560 );
1561}
1562
1563#[instrument(skip(func))] pub fn run<Fut, Summary, Error>(
1565 progress: Option<ProgressSettings>,
1566 output: OutputConfig,
1567 runtime_config: RuntimeConfig,
1568 throttle_config: ThrottleConfig,
1569 tracing_config: TracingConfig,
1570 func: impl FnOnce() -> Fut,
1571) -> Option<Summary>
1572where
1574 Summary: std::fmt::Display,
1575 Error: std::fmt::Display + std::fmt::Debug,
1576 Fut: std::future::Future<Output = Result<Summary, Error>>,
1577{
1578 let _ = get_progress();
1582 if let Err(e) = throttle_config.validate() {
1583 eprintln!("Configuration error: {e}");
1584 return None;
1585 }
1586 let OutputConfig {
1587 quiet,
1588 verbose,
1589 print_summary,
1590 suppress_runtime_stats,
1591 } = output;
1592 let trace_identifier = tracing_config.trace_identifier.clone();
1595 if let Err(e) = validate_histogram_log_target(&throttle_config, &trace_identifier) {
1596 eprintln!("Configuration error: {e}");
1597 return None;
1598 }
1599 let _tracing_guards = install_tracing_subscriber(quiet, verbose, tracing_config);
1600 let res = {
1601 let runtime = build_tokio_runtime(&runtime_config, &throttle_config);
1602 spawn_throttle_replenishers(&runtime, &throttle_config, &trace_identifier);
1603 let res = {
1604 let _progress_tracker = progress.map(|settings| {
1605 tracing::debug!("Requesting progress updates {settings:?}");
1606 let delay = settings.progress_delay.map(|delay_str| {
1607 humantime::parse_duration(&delay_str)
1608 .expect("Couldn't parse duration out of --progress-delay")
1609 });
1610 ProgressTracker::new(settings.progress_type, delay)
1611 });
1612 runtime.block_on(func())
1613 };
1614 match &res {
1615 Ok(summary) => {
1616 if print_summary || verbose > 0 {
1617 println!("{summary}");
1618 }
1619 }
1620 Err(err) => {
1621 if !quiet {
1622 println!("{err:?}");
1623 }
1624 }
1625 }
1626 if (print_summary || verbose > 0)
1627 && !suppress_runtime_stats
1628 && let Err(err) = print_runtime_stats()
1629 {
1630 println!("Failed to print runtime stats: {err:?}");
1631 }
1632 signal_logger_cancel();
1636 if let Some(handle) = take_logger_handle() {
1637 let _ = runtime.block_on(async {
1640 tokio::time::timeout(std::time::Duration::from_secs(1), handle).await
1641 });
1642 }
1643 res
1644 };
1647 reset_process_throttle_state();
1652 res.ok()
1653}
1654
1655fn reset_process_throttle_state() {
1661 congestion::clear_sample_sink();
1662 observability::clear();
1663 for &side in &throttle::Side::ALL {
1664 for &op in &throttle::MetadataOp::ALL {
1665 throttle::set_max_ops_in_flight(throttle::Resource::meta(side, op), 0);
1666 }
1667 }
1668 throttle::disable_ops_throttle();
1669 throttle::set_max_open_files(0);
1676 throttle::init_iops_tokens(0);
1677}
1678
1679#[cfg(test)]
1680mod unit_label_tests {
1681 use super::unit_label;
1682 use congestion::{MetadataOp, Side};
1683
1684 #[test]
1685 fn lookup_ops_carry_side_prefix() {
1686 assert_eq!(unit_label(Side::Source, MetadataOp::Stat), "src-stat");
1688 assert_eq!(unit_label(Side::Destination, MetadataOp::Stat), "dst-stat");
1689 assert_eq!(
1690 unit_label(Side::Source, MetadataOp::ReadLink),
1691 "src-read-link",
1692 );
1693 assert_eq!(
1694 unit_label(Side::Destination, MetadataOp::ReadLink),
1695 "dst-read-link",
1696 );
1697 }
1698
1699 #[test]
1700 fn destination_only_ops_drop_prefix() {
1701 assert_eq!(unit_label(Side::Destination, MetadataOp::MkDir), "mkdir");
1705 assert_eq!(unit_label(Side::Destination, MetadataOp::RmDir), "rmdir");
1706 assert_eq!(unit_label(Side::Destination, MetadataOp::Unlink), "unlink");
1707 assert_eq!(
1708 unit_label(Side::Destination, MetadataOp::HardLink),
1709 "hard-link",
1710 );
1711 assert_eq!(
1712 unit_label(Side::Destination, MetadataOp::Symlink),
1713 "symlink"
1714 );
1715 assert_eq!(unit_label(Side::Destination, MetadataOp::Chmod), "chmod");
1716 assert_eq!(
1717 unit_label(Side::Destination, MetadataOp::OpenCreate),
1718 "open-create",
1719 );
1720 }
1721
1722 #[test]
1723 fn unused_source_side_mutation_slots_keep_src_prefix() {
1724 assert_eq!(unit_label(Side::Source, MetadataOp::Unlink), "src-unlink");
1730 assert_eq!(unit_label(Side::Source, MetadataOp::MkDir), "src-mkdir");
1731 }
1732
1733 #[test]
1734 fn labels_are_unique_across_all_resources() {
1735 let mut seen = std::collections::HashSet::new();
1739 for &side in &Side::ALL {
1740 for &op in &MetadataOp::ALL {
1741 let label = unit_label(side, op);
1742 assert!(seen.insert(label), "duplicate label: {label}");
1743 }
1744 }
1745 assert_eq!(seen.len(), congestion::N_META_RESOURCES);
1746 }
1747}
1748
1749#[cfg(test)]
1750mod runtime_stats_tests {
1751 use super::*;
1752 use anyhow::Result;
1753
1754 #[test]
1755 fn collect_runtime_stats_matches_procfs_snapshot() -> Result<()> {
1756 let process = procfs::process::Process::myself()?;
1757 let expected = collect_runtime_stats_for_process(&process)?;
1758 let actual = collect_runtime_stats();
1759 let cpu_tolerance_ms = 50;
1760 let rss_tolerance_bytes = 1_000_000;
1761 assert!(
1762 expected.cpu_time_user_ms.abs_diff(actual.cpu_time_user_ms) <= cpu_tolerance_ms,
1763 "user CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
1764 expected.cpu_time_user_ms,
1765 actual.cpu_time_user_ms
1766 );
1767 assert!(
1768 expected
1769 .cpu_time_kernel_ms
1770 .abs_diff(actual.cpu_time_kernel_ms)
1771 <= cpu_tolerance_ms,
1772 "kernel CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
1773 expected.cpu_time_kernel_ms,
1774 actual.cpu_time_kernel_ms
1775 );
1776 assert!(
1777 expected.peak_rss_bytes.abs_diff(actual.peak_rss_bytes) <= rss_tolerance_bytes,
1778 "peak RSS deviated by more than {rss_tolerance_bytes} bytes: expected {}, got {}",
1779 expected.peak_rss_bytes,
1780 actual.peak_rss_bytes
1781 );
1782 Ok(())
1783 }
1784
1785 #[test]
1786 fn collect_runtime_stats_returns_default_on_error() {
1787 let stats = collect_runtime_stats_inner(None);
1788 assert_eq!(stats, RuntimeStats::default());
1789
1790 let nonexistent_process = procfs::process::Process::new(i32::MAX).ok();
1791 let stats = collect_runtime_stats_inner(nonexistent_process);
1792 assert_eq!(stats, RuntimeStats::default());
1793 }
1794}
1795
1796#[cfg(test)]
1797mod parse_preserve_settings_tests {
1798 use super::*;
1799 #[test]
1800 fn preset_all_returns_preserve_all() {
1801 let settings = parse_preserve_settings("all").unwrap();
1802 let expected = preserve::preserve_all();
1803 assert_eq!(settings.file.mode_mask, expected.file.mode_mask);
1804 assert!(settings.file.user_and_time.uid);
1805 assert!(settings.file.user_and_time.gid);
1806 assert!(settings.file.user_and_time.time);
1807 assert_eq!(settings.dir.mode_mask, expected.dir.mode_mask);
1808 assert!(settings.dir.user_and_time.uid);
1809 assert!(settings.dir.user_and_time.gid);
1810 assert!(settings.dir.user_and_time.time);
1811 assert!(settings.symlink.user_and_time.uid);
1812 assert!(settings.symlink.user_and_time.gid);
1813 assert!(settings.symlink.user_and_time.time);
1814 }
1815 #[test]
1816 fn preset_none_returns_preserve_none() {
1817 let settings = parse_preserve_settings("none").unwrap();
1818 let expected = preserve::preserve_none();
1819 assert_eq!(settings.file.mode_mask, expected.file.mode_mask);
1820 assert!(!settings.file.user_and_time.uid);
1821 assert!(!settings.file.user_and_time.gid);
1822 assert!(!settings.file.user_and_time.time);
1823 assert_eq!(settings.dir.mode_mask, expected.dir.mode_mask);
1824 assert!(!settings.dir.user_and_time.uid);
1825 assert!(!settings.dir.user_and_time.gid);
1826 assert!(!settings.dir.user_and_time.time);
1827 assert!(!settings.symlink.user_and_time.uid);
1828 assert!(!settings.symlink.user_and_time.gid);
1829 assert!(!settings.symlink.user_and_time.time);
1830 }
1831 #[test]
1832 fn per_type_settings_still_work() {
1833 let settings = parse_preserve_settings("f:uid,time,0777 d:gid").unwrap();
1834 assert!(settings.file.user_and_time.uid);
1835 assert!(settings.file.user_and_time.time);
1836 assert!(!settings.file.user_and_time.gid);
1837 assert_eq!(settings.file.mode_mask, 0o777);
1838 assert!(!settings.dir.user_and_time.uid);
1839 assert!(settings.dir.user_and_time.gid);
1840 assert!(!settings.dir.user_and_time.time);
1841 }
1842 #[test]
1843 fn invalid_settings_returns_error() {
1844 assert!(parse_preserve_settings("invalid").is_err());
1845 assert!(parse_preserve_settings("f:unknown_attr").is_err());
1846 }
1847}
1848
1849#[cfg(test)]
1850mod validate_update_compare_vs_preserve_tests {
1851 use super::*;
1852 #[test]
1853 fn detects_mtime_mismatch() {
1854 let compare = filecmp::MetadataCmpSettings {
1855 mtime: true,
1856 ..Default::default()
1857 };
1858 let preserve = preserve::preserve_none();
1859 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1860 assert!(result.is_err());
1861 assert!(result.unwrap_err().contains("mtime"));
1862 }
1863 #[test]
1864 fn detects_uid_mismatch() {
1865 let compare = filecmp::MetadataCmpSettings {
1866 uid: true,
1867 ..Default::default()
1868 };
1869 let preserve = preserve::preserve_none();
1870 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1871 assert!(result.is_err());
1872 assert!(result.unwrap_err().contains("uid"));
1873 }
1874 #[test]
1875 fn detects_gid_mismatch() {
1876 let compare = filecmp::MetadataCmpSettings {
1877 gid: true,
1878 ..Default::default()
1879 };
1880 let preserve = preserve::preserve_none();
1881 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1882 assert!(result.is_err());
1883 assert!(result.unwrap_err().contains("gid"));
1884 }
1885 #[test]
1886 fn detects_mode_mismatch() {
1887 let compare = filecmp::MetadataCmpSettings {
1888 mode: true,
1889 ..Default::default()
1890 };
1891 let mut preserve = preserve::preserve_none();
1892 preserve.file.mode_mask = 0;
1893 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1894 assert!(result.is_err());
1895 assert!(result.unwrap_err().contains("mode"));
1896 }
1897 #[test]
1898 fn detects_multiple_mismatches() {
1899 let compare = filecmp::MetadataCmpSettings {
1900 mtime: true,
1901 uid: true,
1902 gid: true,
1903 ..Default::default()
1904 };
1905 let preserve = preserve::preserve_none();
1906 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1907 assert!(result.is_err());
1908 let err = result.unwrap_err();
1909 assert!(err.contains("mtime"));
1910 assert!(err.contains("uid"));
1911 assert!(err.contains("gid"));
1912 }
1913 #[test]
1914 fn passes_when_preserve_covers_all_compared_attrs() {
1915 let compare = filecmp::MetadataCmpSettings {
1916 mtime: true,
1917 uid: true,
1918 gid: true,
1919 mode: true,
1920 size: true, ctime: true, };
1923 let preserve = preserve::preserve_all();
1924 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1925 assert!(result.is_ok());
1926 }
1927 #[test]
1928 fn fails_with_partial_mode_mask_when_mode_compared() {
1929 let compare = filecmp::MetadataCmpSettings {
1932 mode: true,
1933 ..Default::default()
1934 };
1935 let preserve = preserve::preserve_none();
1936 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1937 assert!(result.is_err());
1938 assert!(result.unwrap_err().contains("mode"));
1939 }
1940}
1941
1942#[cfg(test)]
1943mod resolve_log_path_tests {
1944 use super::*;
1945
1946 #[test]
1947 fn full_path_with_extension() {
1948 let p = std::path::Path::new("/tmp/foo.hdr");
1949 assert_eq!(
1950 resolve_log_path(p, "rcp"),
1951 std::path::PathBuf::from("/tmp/foo.rcp.hdr"),
1952 );
1953 }
1954
1955 #[test]
1956 fn bare_filename_resolves_to_current_dir() {
1957 let p = std::path::Path::new("foo.hdr");
1958 assert_eq!(
1959 resolve_log_path(p, "rcp"),
1960 std::path::PathBuf::from("./foo.rcp.hdr"),
1961 );
1962 }
1963
1964 #[test]
1965 fn no_extension_defaults_to_hdr() {
1966 let p = std::path::Path::new("/tmp/foo");
1967 assert_eq!(
1968 resolve_log_path(p, "rcp"),
1969 std::path::PathBuf::from("/tmp/foo.rcp.hdr"),
1970 );
1971 }
1972
1973 #[test]
1974 #[cfg(unix)]
1975 fn preserves_non_utf8_stem() {
1976 use std::os::unix::ffi::{OsStrExt, OsStringExt};
1977 let mut raw_name = vec![b'/', b't', b'm', b'p', b'/'];
1979 raw_name.extend_from_slice(&[0xFF, 0xFE]);
1980 raw_name.extend_from_slice(b".hdr");
1981 let p = std::path::PathBuf::from(std::ffi::OsString::from_vec(raw_name));
1982 let resolved = resolve_log_path(&p, "rcp");
1983 let bytes = resolved.as_os_str().as_bytes();
1986 assert!(
1987 bytes.windows(2).any(|w| w == [0xFF, 0xFE]),
1988 "non-UTF-8 bytes must survive resolution; got bytes: {bytes:?}",
1989 );
1990 assert!(
1991 bytes.ends_with(b".rcp.hdr"),
1992 "expected .rcp.hdr suffix; got bytes: {bytes:?}",
1993 );
1994 }
1995}
1996
1997#[cfg(test)]
1998mod validate_histogram_log_target_tests {
1999 use super::*;
2000
2001 fn throttle_with_log_path(path: Option<std::path::PathBuf>) -> ThrottleConfig {
2002 ThrottleConfig {
2003 histogram_enabled: path.is_some(),
2004 histogram_log_path: path,
2005 ..Default::default()
2006 }
2007 }
2008
2009 #[test]
2010 fn no_log_path_is_ok() {
2011 let throttle = throttle_with_log_path(None);
2012 assert!(validate_histogram_log_target(&throttle, "rcp").is_ok());
2013 }
2014
2015 #[test]
2016 fn writable_resolved_path_is_ok() {
2017 let dir = tempfile::tempdir().unwrap();
2018 let throttle = throttle_with_log_path(Some(dir.path().join("foo.hdr")));
2019 assert!(validate_histogram_log_target(&throttle, "rcp").is_ok());
2020 }
2021
2022 #[test]
2023 fn resolved_path_existing_as_directory_is_rejected() {
2024 let dir = tempfile::tempdir().unwrap();
2027 let blocker = dir.path().join("foo.rcp.hdr");
2028 std::fs::create_dir(&blocker).unwrap();
2029 let throttle = throttle_with_log_path(Some(dir.path().join("foo.hdr")));
2030 let err = validate_histogram_log_target(&throttle, "rcp").unwrap_err();
2031 assert!(
2032 err.contains("histogram-log") && err.contains("foo.rcp.hdr"),
2033 "got: {err}",
2034 );
2035 }
2036
2037 #[test]
2038 fn resolved_path_in_missing_parent_is_rejected() {
2039 let throttle = throttle_with_log_path(Some("/nonexistent-dir-67890/foo.hdr".into()));
2040 let err = validate_histogram_log_target(&throttle, "rcp").unwrap_err();
2041 assert!(err.contains("histogram-log"), "got: {err}");
2042 }
2043
2044 #[test]
2045 fn log_path_pointing_at_existing_directory_is_rejected() {
2046 let dir = tempfile::tempdir().unwrap();
2047 let throttle = throttle_with_log_path(Some(dir.path().to_path_buf()));
2048 let err = validate_histogram_log_target(&throttle, "rcp").unwrap_err();
2049 assert!(err.contains("directory"), "got: {err}");
2050 }
2051
2052 #[test]
2053 fn log_path_with_no_filename_is_rejected() {
2054 let throttle = throttle_with_log_path(Some(std::path::PathBuf::from("/")));
2056 let err = validate_histogram_log_target(&throttle, "rcp").unwrap_err();
2057 assert!(
2058 err.contains("filename") || err.contains("directory"),
2059 "got: {err}",
2060 );
2061 }
2062
2063 #[test]
2064 #[cfg(unix)]
2065 fn resolved_path_existing_as_symlink_is_rejected() {
2066 let dir = tempfile::tempdir().unwrap();
2070 let target = dir.path().join("victim.txt");
2074 std::fs::write(&target, b"do not clobber").unwrap();
2075 let resolved_path = dir.path().join("foo.rcp.hdr");
2076 std::os::unix::fs::symlink(&target, &resolved_path).unwrap();
2077 let throttle = throttle_with_log_path(Some(dir.path().join("foo.hdr")));
2078 let err = validate_histogram_log_target(&throttle, "rcp").unwrap_err();
2079 assert!(
2080 err.contains("symlink") || err.contains("ELOOP") || err.contains("Too many levels"),
2081 "got: {err}",
2082 );
2083 let preserved = std::fs::read(&target).unwrap();
2085 assert_eq!(preserved, b"do not clobber");
2086 }
2087}