1use crate::cmp::ObjType;
103use anyhow::anyhow;
104use anyhow::Context;
105use std::io::IsTerminal;
106use tracing::instrument;
107use tracing_subscriber::fmt::format::FmtSpan;
108use tracing_subscriber::prelude::*;
109
110pub mod cmp;
111pub mod config;
112pub mod copy;
113pub mod error_collector;
114pub mod filegen;
115pub mod filter;
116pub mod link;
117pub mod preserve;
118pub mod remote_tracing;
119pub mod rm;
120pub mod version;
121
122pub mod filecmp;
123pub mod progress;
124mod testutils;
125
126pub use config::{
127 DryRunMode, DryRunWarnings, OutputConfig, RuntimeConfig, ThrottleConfig, TracingConfig,
128};
129pub use progress::{RcpdProgressPrinter, SerializableProgress};
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
133pub enum RcpdType {
134 Source,
135 Destination,
136}
137
138impl std::fmt::Display for RcpdType {
139 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140 match self {
141 RcpdType::Source => write!(f, "source"),
142 RcpdType::Destination => write!(f, "destination"),
143 }
144 }
145}
146
147pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
149
150#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
152pub struct RuntimeStats {
153 pub cpu_time_user_ms: u64,
155 pub cpu_time_kernel_ms: u64,
157 pub peak_rss_bytes: u64,
159}
160
161#[derive(Debug, Default)]
163pub struct RemoteRuntimeStats {
164 pub source_host: String,
165 pub source_stats: RuntimeStats,
166 pub dest_host: String,
167 pub dest_stats: RuntimeStats,
168}
169
170#[must_use]
173pub fn is_localhost(host: &str) -> bool {
174 if host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]" {
175 return true;
176 }
177 let mut buf = [0u8; 256];
179 let result = unsafe { libc::gethostname(buf.as_mut_ptr() as *mut libc::c_char, buf.len()) };
181 if result == 0 {
182 if let Ok(hostname_cstr) = std::ffi::CStr::from_bytes_until_nul(&buf) {
183 if let Ok(hostname) = hostname_cstr.to_str() {
184 if host == hostname {
185 return true;
186 }
187 }
188 }
189 }
190 false
191}
192
193static PROGRESS: std::sync::LazyLock<progress::Progress> =
194 std::sync::LazyLock::new(progress::Progress::new);
195static PBAR: std::sync::LazyLock<indicatif::ProgressBar> =
196 std::sync::LazyLock::new(indicatif::ProgressBar::new_spinner);
197static REMOTE_RUNTIME_STATS: std::sync::LazyLock<std::sync::Mutex<Option<RemoteRuntimeStats>>> =
198 std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
199
200#[must_use]
201pub fn get_progress() -> &'static progress::Progress {
202 &PROGRESS
203}
204
205pub fn set_remote_runtime_stats(stats: RemoteRuntimeStats) {
207 *REMOTE_RUNTIME_STATS.lock().unwrap() = Some(stats);
208}
209
210struct LocalTimeFormatter;
211
212impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFormatter {
213 fn format_time(
214 &self,
215 writer: &mut tracing_subscriber::fmt::format::Writer<'_>,
216 ) -> std::fmt::Result {
217 let now = chrono::Local::now();
218 writer.write_str(&now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
219 }
220}
221
222struct ProgressTracker {
223 lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
224 pbar_thread: Option<std::thread::JoinHandle<()>>,
225}
226
227#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
228pub enum ProgressType {
229 #[default]
230 #[value(name = "auto", alias = "Auto")]
231 Auto,
232 #[value(name = "ProgressBar", alias = "progress-bar")]
233 ProgressBar,
234 #[value(name = "TextUpdates", alias = "text-updates")]
235 TextUpdates,
236}
237
238pub enum GeneralProgressType {
239 User(ProgressType),
240 Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
241 RemoteMaster {
242 progress_type: ProgressType,
243 get_progress_snapshot:
244 Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>,
245 },
246}
247
248impl std::fmt::Debug for GeneralProgressType {
249 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
250 match self {
251 GeneralProgressType::User(pt) => write!(f, "User({pt:?})"),
252 GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
253 GeneralProgressType::RemoteMaster { progress_type, .. } => {
254 write!(
255 f,
256 "RemoteMaster(progress_type: {progress_type:?}, <function>)"
257 )
258 }
259 }
260 }
261}
262
263#[derive(Debug)]
264pub struct ProgressSettings {
265 pub progress_type: GeneralProgressType,
266 pub progress_delay: Option<String>,
267}
268
269fn progress_bar(
270 lock: &std::sync::Mutex<bool>,
271 cvar: &std::sync::Condvar,
272 delay_opt: &Option<std::time::Duration>,
273) {
274 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
275 PBAR.set_style(
276 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
277 .unwrap()
278 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
279 );
280 let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
281 let mut is_done = lock.lock().unwrap();
282 loop {
283 PBAR.set_position(PBAR.position() + 1); PBAR.set_message(prog_printer.print().unwrap());
285 let result = cvar.wait_timeout(is_done, delay).unwrap();
286 is_done = result.0;
287 if *is_done {
288 break;
289 }
290 }
291 PBAR.finish_and_clear();
292}
293
294fn get_datetime_prefix() -> String {
295 chrono::Local::now()
296 .format("%Y-%m-%dT%H:%M:%S%.3f%:z")
297 .to_string()
298}
299
300fn text_updates(
301 lock: &std::sync::Mutex<bool>,
302 cvar: &std::sync::Condvar,
303 delay_opt: &Option<std::time::Duration>,
304) {
305 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
306 let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
307 let mut is_done = lock.lock().unwrap();
308 loop {
309 eprintln!("=======================");
310 eprintln!(
311 "{}\n--{}",
312 get_datetime_prefix(),
313 prog_printer.print().unwrap()
314 );
315 let result = cvar.wait_timeout(is_done, delay).unwrap();
316 is_done = result.0;
317 if *is_done {
318 break;
319 }
320 }
321}
322
323fn rcpd_updates(
324 lock: &std::sync::Mutex<bool>,
325 cvar: &std::sync::Condvar,
326 delay_opt: &Option<std::time::Duration>,
327 sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
328) {
329 tracing::debug!("Starting rcpd progress updates");
330 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
331 let mut is_done = lock.lock().unwrap();
332 loop {
333 if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
334 tracing::debug!("Progress update channel closed, stopping progress updates");
336 break;
337 }
338 let result = cvar.wait_timeout(is_done, delay).unwrap();
339 is_done = result.0;
340 if *is_done {
341 break;
342 }
343 }
344}
345
346fn remote_master_updates<F>(
347 lock: &std::sync::Mutex<bool>,
348 cvar: &std::sync::Condvar,
349 delay_opt: &Option<std::time::Duration>,
350 get_progress_snapshot: F,
351 progress_type: ProgressType,
352) where
353 F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
354{
355 let interactive = match progress_type {
356 ProgressType::Auto => std::io::stderr().is_terminal(),
357 ProgressType::ProgressBar => true,
358 ProgressType::TextUpdates => false,
359 };
360 if interactive {
361 PBAR.set_style(
362 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
363 .unwrap()
364 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
365 );
366 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
367 let mut printer = RcpdProgressPrinter::new();
368 let mut is_done = lock.lock().unwrap();
369 loop {
370 let progress_map = get_progress_snapshot();
371 let source_progress = &progress_map[RcpdType::Source];
372 let destination_progress = &progress_map[RcpdType::Destination];
373 PBAR.set_position(PBAR.position() + 1); PBAR.set_message(
375 printer
376 .print(source_progress, destination_progress)
377 .unwrap(),
378 );
379 let result = cvar.wait_timeout(is_done, delay).unwrap();
380 is_done = result.0;
381 if *is_done {
382 break;
383 }
384 }
385 PBAR.finish_and_clear();
386 } else {
387 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
388 let mut printer = RcpdProgressPrinter::new();
389 let mut is_done = lock.lock().unwrap();
390 loop {
391 let progress_map = get_progress_snapshot();
392 let source_progress = &progress_map[RcpdType::Source];
393 let destination_progress = &progress_map[RcpdType::Destination];
394 eprintln!("=======================");
395 eprintln!(
396 "{}\n--{}",
397 get_datetime_prefix(),
398 printer
399 .print(source_progress, destination_progress)
400 .unwrap()
401 );
402 let result = cvar.wait_timeout(is_done, delay).unwrap();
403 is_done = result.0;
404 if *is_done {
405 break;
406 }
407 }
408 }
409}
410
411impl ProgressTracker {
412 pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
413 let lock_cvar =
414 std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
415 let lock_cvar_clone = lock_cvar.clone();
416 let pbar_thread = std::thread::spawn(move || {
417 let (lock, cvar) = &*lock_cvar_clone;
418 match progress_type {
419 GeneralProgressType::Remote(sender) => {
420 rcpd_updates(lock, cvar, &delay_opt, sender);
421 }
422 GeneralProgressType::RemoteMaster {
423 progress_type,
424 get_progress_snapshot,
425 } => {
426 remote_master_updates(
427 lock,
428 cvar,
429 &delay_opt,
430 get_progress_snapshot,
431 progress_type,
432 );
433 }
434 _ => {
435 let interactive = match progress_type {
436 GeneralProgressType::User(ProgressType::Auto) => {
437 std::io::stderr().is_terminal()
438 }
439 GeneralProgressType::User(ProgressType::ProgressBar) => true,
440 GeneralProgressType::User(ProgressType::TextUpdates) => false,
441 GeneralProgressType::Remote(_)
442 | GeneralProgressType::RemoteMaster { .. } => {
443 unreachable!("Invalid progress type: {progress_type:?}")
444 }
445 };
446 if interactive {
447 progress_bar(lock, cvar, &delay_opt);
448 } else {
449 text_updates(lock, cvar, &delay_opt);
450 }
451 }
452 }
453 });
454 Self {
455 lock_cvar,
456 pbar_thread: Some(pbar_thread),
457 }
458 }
459}
460
461impl Drop for ProgressTracker {
462 fn drop(&mut self) {
463 let (lock, cvar) = &*self.lock_cvar;
464 let mut is_done = lock.lock().unwrap();
465 *is_done = true;
466 cvar.notify_one();
467 drop(is_done);
468 if let Some(pbar_thread) = self.pbar_thread.take() {
469 pbar_thread.join().unwrap();
470 }
471 }
472}
473
474pub fn parse_metadata_cmp_settings(
475 settings: &str,
476) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
477 let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
478 for setting in settings.split(',') {
479 match setting {
480 "uid" => metadata_cmp_settings.uid = true,
481 "gid" => metadata_cmp_settings.gid = true,
482 "mode" => metadata_cmp_settings.mode = true,
483 "size" => metadata_cmp_settings.size = true,
484 "mtime" => metadata_cmp_settings.mtime = true,
485 "ctime" => metadata_cmp_settings.ctime = true,
486 _ => {
487 return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
488 }
489 }
490 }
491 Ok(metadata_cmp_settings)
492}
493
494fn parse_type_settings(
495 settings: &str,
496) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
497 let mut user_and_time = preserve::UserAndTimeSettings::default();
498 let mut mode_mask = None;
499 for setting in settings.split(',') {
500 match setting {
501 "uid" => user_and_time.uid = true,
502 "gid" => user_and_time.gid = true,
503 "time" => user_and_time.time = true,
504 _ => {
505 if let Ok(mask) = u32::from_str_radix(setting, 8) {
506 mode_mask = Some(mask);
507 } else {
508 return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
509 }
510 }
511 }
512 }
513 Ok((user_and_time, mode_mask))
514}
515
516pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
517 match settings {
519 "all" => return Ok(preserve::preserve_all()),
520 "none" => return Ok(preserve::preserve_none()),
521 _ => {}
522 }
523 let mut preserve_settings = preserve::Settings::default();
524 for type_settings in settings.split_whitespace() {
525 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
526 let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
527 format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
528 )?;
529 match obj_type {
530 "f" | "file" => {
531 preserve_settings.file = preserve::FileSettings::default();
532 preserve_settings.file.user_and_time = user_and_time_settings;
533 if let Some(mode) = mode_opt {
534 preserve_settings.file.mode_mask = mode;
535 }
536 }
537 "d" | "dir" | "directory" => {
538 preserve_settings.dir = preserve::DirSettings::default();
539 preserve_settings.dir.user_and_time = user_and_time_settings;
540 if let Some(mode) = mode_opt {
541 preserve_settings.dir.mode_mask = mode;
542 }
543 }
544 "l" | "link" | "symlink" => {
545 preserve_settings.symlink = preserve::SymlinkSettings::default();
546 preserve_settings.symlink.user_and_time = user_and_time_settings;
547 }
548 _ => {
549 return Err(anyhow!("Unknown object type: {}", obj_type));
550 }
551 }
552 } else {
553 return Err(anyhow!("Invalid preserve settings: {}", settings));
554 }
555 }
556 Ok(preserve_settings)
557}
558
559pub fn validate_update_compare_vs_preserve(
562 update_compare: &filecmp::MetadataCmpSettings,
563 preserve: &preserve::Settings,
564) -> Result<(), String> {
565 let mut missing = Vec::new();
566 if update_compare.mtime && !preserve.file.user_and_time.time {
567 missing.push("mtime");
568 }
569 if update_compare.uid && !preserve.file.user_and_time.uid {
570 missing.push("uid");
571 }
572 if update_compare.gid && !preserve.file.user_and_time.gid {
573 missing.push("gid");
574 }
575 if update_compare.mode && preserve.file.mode_mask != 0o7777 {
577 missing.push("mode");
578 }
579 if missing.is_empty() {
580 Ok(())
581 } else {
582 Err(format!(
583 "--update compares [{}] but --preserve-settings does not preserve them. \
584 Use --allow-lossy-update to override or adjust --preserve-settings.",
585 missing.join(", ")
586 ))
587 }
588}
589
590pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
591 let mut cmp_settings = cmp::ObjSettings::default();
592 for type_settings in settings.split_whitespace() {
593 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
594 let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
595 "parsing preserve settings: {obj_settings}, type: {obj_type}"
596 ))?;
597 let obj_type = match obj_type {
598 "f" | "file" => ObjType::File,
599 "d" | "dir" | "directory" => ObjType::Dir,
600 "l" | "link" | "symlink" => ObjType::Symlink,
601 "o" | "other" => ObjType::Other,
602 _ => {
603 return Err(anyhow!("Unknown obj type: {}", obj_type));
604 }
605 };
606 cmp_settings[obj_type] = obj_cmp_settings;
607 } else {
608 return Err(anyhow!("Invalid preserve settings: {}", settings));
609 }
610 }
611 Ok(cmp_settings)
612}
613
614pub async fn cmp(
615 src: &std::path::Path,
616 dst: &std::path::Path,
617 log: &cmp::LogWriter,
618 settings: &cmp::Settings,
619) -> Result<cmp::Summary, anyhow::Error> {
620 cmp::cmp(&PROGRESS, src, dst, log, settings).await
621}
622
623pub async fn copy(
624 src: &std::path::Path,
625 dst: &std::path::Path,
626 settings: ©::Settings,
627 preserve: &preserve::Settings,
628) -> Result<copy::Summary, copy::Error> {
629 copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
630}
631
632pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
633 rm::rm(&PROGRESS, path, settings).await
634}
635
636pub async fn link(
637 src: &std::path::Path,
638 dst: &std::path::Path,
639 update: &Option<std::path::PathBuf>,
640 settings: &link::Settings,
641) -> Result<link::Summary, link::Error> {
642 let cwd = std::env::current_dir()
643 .with_context(|| "failed to get current working directory")
644 .map_err(|err| link::Error::new(err, link::Summary::default()))?;
645 link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
646}
647
648fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
649 match std::env::var(name) {
650 Ok(val) => match val.parse() {
651 Ok(val) => val,
652 Err(_) => default,
653 },
654 Err(_) => default,
655 }
656}
657
658#[must_use]
660pub fn collect_runtime_stats() -> RuntimeStats {
661 collect_runtime_stats_inner(procfs::process::Process::myself().ok())
662}
663
664fn collect_runtime_stats_inner(process: Option<procfs::process::Process>) -> RuntimeStats {
665 let Some(process) = process else {
666 return RuntimeStats::default();
667 };
668 collect_runtime_stats_for_process(&process).unwrap_or_default()
669}
670
671fn collect_runtime_stats_for_process(
672 process: &procfs::process::Process,
673) -> anyhow::Result<RuntimeStats> {
674 let stat = process.stat()?;
675 let clock_ticks = procfs::ticks_per_second() as f64;
676 let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
678 Ok(RuntimeStats {
679 cpu_time_user_ms: ((stat.utime as f64 / clock_ticks) * 1000.0) as u64,
680 cpu_time_kernel_ms: ((stat.stime as f64 / clock_ticks) * 1000.0) as u64,
681 peak_rss_bytes: vmhwm_kb * 1024,
682 })
683}
684
685fn print_runtime_stats_for_role(prefix: &str, stats: &RuntimeStats) {
686 let cpu_total =
687 std::time::Duration::from_millis(stats.cpu_time_user_ms + stats.cpu_time_kernel_ms);
688 let cpu_kernel = std::time::Duration::from_millis(stats.cpu_time_kernel_ms);
689 let cpu_user = std::time::Duration::from_millis(stats.cpu_time_user_ms);
690 println!(
691 "{prefix}cpu time : {:.2?} | k: {:.2?} | u: {:.2?}",
692 cpu_total, cpu_kernel, cpu_user
693 );
694 println!(
695 "{prefix}peak RSS : {}",
696 bytesize::ByteSize(stats.peak_rss_bytes)
697 );
698}
699
700#[rustfmt::skip]
701fn print_runtime_stats() -> Result<(), anyhow::Error> {
702 let remote_stats = REMOTE_RUNTIME_STATS.lock().unwrap().take();
704 if let Some(remote) = remote_stats {
705 println!("walltime : {:.2?}", &PROGRESS.get_duration());
707 println!();
708 let source_is_local = is_localhost(&remote.source_host);
709 let dest_is_local = is_localhost(&remote.dest_host);
710 let master_stats = collect_runtime_stats();
712 if !source_is_local {
714 println!("SOURCE ({}):", remote.source_host);
715 print_runtime_stats_for_role(" ", &remote.source_stats);
716 println!();
717 }
718 if !dest_is_local {
719 println!("DESTINATION ({}):", remote.dest_host);
720 print_runtime_stats_for_role(" ", &remote.dest_stats);
721 println!();
722 }
723 match (source_is_local, dest_is_local) {
725 (true, true) => {
726 println!("MASTER + SOURCE + DESTINATION (localhost):");
727 print_runtime_stats_for_role(" master ", &master_stats);
728 print_runtime_stats_for_role(" source ", &remote.source_stats);
729 print_runtime_stats_for_role(" dest ", &remote.dest_stats);
730 }
731 (true, false) => {
732 println!("MASTER + SOURCE (localhost):");
733 print_runtime_stats_for_role(" master ", &master_stats);
734 print_runtime_stats_for_role(" source ", &remote.source_stats);
735 }
736 (false, true) => {
737 println!("MASTER + DESTINATION (localhost):");
738 print_runtime_stats_for_role(" master ", &master_stats);
739 print_runtime_stats_for_role(" dest ", &remote.dest_stats);
740 }
741 (false, false) => {
742 println!("MASTER (localhost):");
743 print_runtime_stats_for_role(" ", &master_stats);
744 }
745 }
746 return Ok(());
747 }
748 let process = procfs::process::Process::myself()?;
750 let stat = process.stat()?;
751 let clock_ticks_per_second = procfs::ticks_per_second();
753 let ticks_to_duration = |ticks: u64| {
754 std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
755 };
756 let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
758 println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
759 println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
760 println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm_kb * 1024));
761 Ok(())
762}
763
764fn get_max_open_files() -> Result<u64, std::io::Error> {
765 let mut rlim = libc::rlimit {
766 rlim_cur: 0,
767 rlim_max: 0,
768 };
769 let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
771 if result == 0 {
772 Ok(rlim.rlim_cur)
773 } else {
774 Err(std::io::Error::last_os_error())
775 }
776}
777
778struct ProgWriter {}
779
780impl ProgWriter {
781 fn new() -> Self {
782 Self {}
783 }
784}
785
786impl std::io::Write for ProgWriter {
787 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
788 PBAR.suspend(|| std::io::stdout().write(buf))
789 }
790 fn flush(&mut self) -> std::io::Result<()> {
791 std::io::stdout().flush()
792 }
793}
794
795fn get_hostname() -> String {
796 nix::unistd::gethostname()
797 .ok()
798 .and_then(|os_str| os_str.into_string().ok())
799 .unwrap_or_else(|| "unknown".to_string())
800}
801
802#[must_use]
803pub fn generate_debug_log_filename(prefix: &str) -> String {
804 let now = chrono::Utc::now();
805 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
806 let process_id = std::process::id();
807 format!("{prefix}-{timestamp}-{process_id}")
808}
809
810#[must_use]
814pub fn generate_trace_filename(prefix: &str, identifier: &str, extension: &str) -> String {
815 let hostname = get_hostname();
816 let pid = std::process::id();
817 let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S");
818 format!("{prefix}-{identifier}-{hostname}-{pid}-{timestamp}.{extension}")
819}
820
821#[instrument(skip(func))] pub fn run<Fut, Summary, Error>(
823 progress: Option<ProgressSettings>,
824 output: OutputConfig,
825 runtime: RuntimeConfig,
826 throttle: ThrottleConfig,
827 tracing_config: TracingConfig,
828 func: impl FnOnce() -> Fut,
829) -> Option<Summary>
830where
832 Summary: std::fmt::Display,
833 Error: std::fmt::Display + std::fmt::Debug,
834 Fut: std::future::Future<Output = Result<Summary, Error>>,
835{
836 let _ = get_progress();
840 if let Err(e) = throttle.validate() {
842 eprintln!("Configuration error: {e}");
843 return None;
844 }
845 let OutputConfig {
847 quiet,
848 verbose,
849 print_summary,
850 suppress_runtime_stats,
851 } = output;
852 let RuntimeConfig {
853 max_workers,
854 max_blocking_threads,
855 } = runtime;
856 let ThrottleConfig {
857 max_open_files,
858 ops_throttle,
859 iops_throttle,
860 chunk_size: _,
861 } = throttle;
862 let TracingConfig {
863 remote_layer: remote_tracing_layer,
864 debug_log_file,
865 chrome_trace_prefix,
866 flamegraph_prefix,
867 trace_identifier,
868 profile_level,
869 tokio_console,
870 tokio_console_port,
871 } = tracing_config;
872 let mut _chrome_guard: Option<tracing_chrome::FlushGuard> = None;
874 let mut _flame_guard: Option<tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>> =
875 None;
876 if quiet {
877 assert!(
878 verbose == 0,
879 "Quiet mode and verbose mode are mutually exclusive"
880 );
881 } else {
882 let make_env_filter = || {
884 let level_directive = match verbose {
885 0 => "error".parse().unwrap(),
886 1 => "info".parse().unwrap(),
887 2 => "debug".parse().unwrap(),
888 _ => "trace".parse().unwrap(),
889 };
890 tracing_subscriber::EnvFilter::from_default_env()
893 .add_directive(level_directive)
894 .add_directive("tokio=info".parse().unwrap())
895 .add_directive("runtime=info".parse().unwrap())
896 .add_directive("quinn=warn".parse().unwrap())
897 .add_directive("rustls=warn".parse().unwrap())
898 .add_directive("h2=warn".parse().unwrap())
899 };
900 let file_layer = if let Some(ref log_file_path) = debug_log_file {
901 let file = std::fs::OpenOptions::new()
902 .create(true)
903 .append(true)
904 .open(log_file_path)
905 .unwrap_or_else(|e| {
906 panic!("Failed to create debug log file at '{log_file_path}': {e}")
907 });
908 let file_layer = tracing_subscriber::fmt::layer()
909 .with_target(true)
910 .with_line_number(true)
911 .with_thread_ids(true)
912 .with_timer(LocalTimeFormatter)
913 .with_ansi(false)
914 .with_writer(file)
915 .with_filter(make_env_filter());
916 Some(file_layer)
917 } else {
918 None
919 };
920 let fmt_layer = if remote_tracing_layer.is_some() {
922 None
923 } else {
924 let fmt_layer = tracing_subscriber::fmt::layer()
925 .with_target(true)
926 .with_line_number(true)
927 .with_span_events(if verbose > 2 {
928 FmtSpan::NEW | FmtSpan::CLOSE
929 } else {
930 FmtSpan::NONE
931 })
932 .with_timer(LocalTimeFormatter)
933 .pretty()
934 .with_writer(ProgWriter::new)
935 .with_filter(make_env_filter());
936 Some(fmt_layer)
937 };
938 let remote_tracing_layer =
940 remote_tracing_layer.map(|layer| layer.with_filter(make_env_filter()));
941 let console_layer = if tokio_console {
942 let console_port = tokio_console_port.unwrap_or(6669);
943 let retention_seconds: u64 =
944 read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
945 eprintln!("Tokio console server listening on 127.0.0.1:{console_port}");
946 let console_layer = console_subscriber::ConsoleLayer::builder()
947 .retention(std::time::Duration::from_secs(retention_seconds))
948 .server_addr(([127, 0, 0, 1], console_port))
949 .spawn();
950 Some(console_layer)
951 } else {
952 None
953 };
954 let profiling_enabled = chrome_trace_prefix.is_some() || flamegraph_prefix.is_some();
958 let profile_filter_str = if profiling_enabled {
959 let level_str = profile_level.as_deref().unwrap_or("trace");
960 let valid_levels = ["trace", "debug", "info", "warn", "error", "off"];
962 if !valid_levels.contains(&level_str.to_lowercase().as_str()) {
963 eprintln!(
964 "Invalid --profile-level '{}'. Valid values: trace, debug, info, warn, error, off",
965 level_str
966 );
967 std::process::exit(1);
968 }
969 Some(format!(
971 "tokio=off,quinn=off,h2=off,hyper=off,rustls=off,{}",
972 level_str
973 ))
974 } else {
975 None
976 };
977 let make_profile_filter =
979 || tracing_subscriber::EnvFilter::new(profile_filter_str.as_ref().unwrap());
980 let chrome_layer = if let Some(ref prefix) = chrome_trace_prefix {
982 let filename = generate_trace_filename(prefix, &trace_identifier, "json");
983 eprintln!("Chrome trace will be written to: {filename}");
984 let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
985 .file(&filename)
986 .include_args(true)
987 .build();
988 _chrome_guard = Some(guard);
989 Some(layer.with_filter(make_profile_filter()))
990 } else {
991 None
992 };
993 let flame_layer = if let Some(ref prefix) = flamegraph_prefix {
995 let filename = generate_trace_filename(prefix, &trace_identifier, "folded");
996 eprintln!("Flamegraph data will be written to: {filename}");
997 match tracing_flame::FlameLayer::with_file(&filename) {
998 Ok((layer, guard)) => {
999 _flame_guard = Some(guard);
1000 Some(layer.with_filter(make_profile_filter()))
1001 }
1002 Err(e) => {
1003 eprintln!("Failed to create flamegraph layer: {e}");
1004 None
1005 }
1006 }
1007 } else {
1008 None
1009 };
1010 tracing_subscriber::registry()
1011 .with(file_layer)
1012 .with(fmt_layer)
1013 .with(remote_tracing_layer)
1014 .with(console_layer)
1015 .with(chrome_layer)
1016 .with(flame_layer)
1017 .init();
1018 }
1019 let mut builder = tokio::runtime::Builder::new_multi_thread();
1020 builder.enable_all();
1021 if max_workers > 0 {
1022 builder.worker_threads(max_workers);
1023 }
1024 if max_blocking_threads > 0 {
1025 builder.max_blocking_threads(max_blocking_threads);
1026 }
1027 if !sysinfo::set_open_files_limit(usize::MAX) {
1028 tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
1029 }
1030 let set_max_open_files = max_open_files.unwrap_or_else(|| {
1031 let limit = get_max_open_files().expect(
1032 "We failed to query rlimit, if this is expected try specifying --max-open-files",
1033 ) as usize;
1034 std::cmp::min(limit / 10 * 8, 4096)
1037 });
1038 if set_max_open_files > 0 {
1039 tracing::info!("Setting max open files to: {}", set_max_open_files);
1040 throttle::set_max_open_files(set_max_open_files);
1041 } else {
1042 tracing::info!("Not applying any limit to max open files!");
1043 }
1044 let runtime = builder.build().expect("Failed to create runtime");
1045 fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
1046 let mut replenish = replenish;
1047 let mut interval = std::time::Duration::from_secs(1);
1048 while replenish > 100 && interval > std::time::Duration::from_millis(1) {
1049 replenish /= 10;
1050 interval /= 10;
1051 }
1052 (replenish, interval)
1053 }
1054 if ops_throttle > 0 {
1055 let (replenish, interval) = get_replenish_interval(ops_throttle);
1056 throttle::init_ops_tokens(replenish);
1057 runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
1058 }
1059 if iops_throttle > 0 {
1060 let (replenish, interval) = get_replenish_interval(iops_throttle);
1061 throttle::init_iops_tokens(replenish);
1062 runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
1063 }
1064 let res = {
1065 let _progress = progress.map(|settings| {
1066 tracing::debug!("Requesting progress updates {settings:?}");
1067 let delay = settings.progress_delay.map(|delay_str| {
1068 humantime::parse_duration(&delay_str)
1069 .expect("Couldn't parse duration out of --progress-delay")
1070 });
1071 ProgressTracker::new(settings.progress_type, delay)
1072 });
1073 runtime.block_on(func())
1074 };
1075 match &res {
1076 Ok(summary) => {
1077 if print_summary || verbose > 0 {
1078 println!("{summary}");
1079 }
1080 }
1081 Err(err) => {
1082 if !quiet {
1083 println!("{err:?}");
1084 }
1085 }
1086 }
1087 if (print_summary || verbose > 0) && !suppress_runtime_stats {
1088 if let Err(err) = print_runtime_stats() {
1089 println!("Failed to print runtime stats: {err:?}");
1090 }
1091 }
1092 res.ok()
1093}
1094
1095#[cfg(test)]
1096mod runtime_stats_tests {
1097 use super::*;
1098 use anyhow::Result;
1099
1100 #[test]
1101 fn collect_runtime_stats_matches_procfs_snapshot() -> Result<()> {
1102 let process = procfs::process::Process::myself()?;
1103 let expected = collect_runtime_stats_for_process(&process)?;
1104 let actual = collect_runtime_stats();
1105 let cpu_tolerance_ms = 50;
1106 let rss_tolerance_bytes = 1_000_000;
1107 assert!(
1108 expected.cpu_time_user_ms.abs_diff(actual.cpu_time_user_ms) <= cpu_tolerance_ms,
1109 "user CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
1110 expected.cpu_time_user_ms,
1111 actual.cpu_time_user_ms
1112 );
1113 assert!(
1114 expected
1115 .cpu_time_kernel_ms
1116 .abs_diff(actual.cpu_time_kernel_ms)
1117 <= cpu_tolerance_ms,
1118 "kernel CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
1119 expected.cpu_time_kernel_ms,
1120 actual.cpu_time_kernel_ms
1121 );
1122 assert!(
1123 expected.peak_rss_bytes.abs_diff(actual.peak_rss_bytes) <= rss_tolerance_bytes,
1124 "peak RSS deviated by more than {rss_tolerance_bytes} bytes: expected {}, got {}",
1125 expected.peak_rss_bytes,
1126 actual.peak_rss_bytes
1127 );
1128 Ok(())
1129 }
1130
1131 #[test]
1132 fn collect_runtime_stats_returns_default_on_error() {
1133 let stats = collect_runtime_stats_inner(None);
1134 assert_eq!(stats, RuntimeStats::default());
1135
1136 let nonexistent_process = procfs::process::Process::new(i32::MAX).ok();
1137 let stats = collect_runtime_stats_inner(nonexistent_process);
1138 assert_eq!(stats, RuntimeStats::default());
1139 }
1140}
1141
1142#[cfg(test)]
1143mod parse_preserve_settings_tests {
1144 use super::*;
1145 #[test]
1146 fn preset_all_returns_preserve_all() {
1147 let settings = parse_preserve_settings("all").unwrap();
1148 let expected = preserve::preserve_all();
1149 assert_eq!(settings.file.mode_mask, expected.file.mode_mask);
1150 assert!(settings.file.user_and_time.uid);
1151 assert!(settings.file.user_and_time.gid);
1152 assert!(settings.file.user_and_time.time);
1153 assert_eq!(settings.dir.mode_mask, expected.dir.mode_mask);
1154 assert!(settings.dir.user_and_time.uid);
1155 assert!(settings.dir.user_and_time.gid);
1156 assert!(settings.dir.user_and_time.time);
1157 assert!(settings.symlink.user_and_time.uid);
1158 assert!(settings.symlink.user_and_time.gid);
1159 assert!(settings.symlink.user_and_time.time);
1160 }
1161 #[test]
1162 fn preset_none_returns_preserve_none() {
1163 let settings = parse_preserve_settings("none").unwrap();
1164 let expected = preserve::preserve_none();
1165 assert_eq!(settings.file.mode_mask, expected.file.mode_mask);
1166 assert!(!settings.file.user_and_time.uid);
1167 assert!(!settings.file.user_and_time.gid);
1168 assert!(!settings.file.user_and_time.time);
1169 assert_eq!(settings.dir.mode_mask, expected.dir.mode_mask);
1170 assert!(!settings.dir.user_and_time.uid);
1171 assert!(!settings.dir.user_and_time.gid);
1172 assert!(!settings.dir.user_and_time.time);
1173 assert!(!settings.symlink.user_and_time.uid);
1174 assert!(!settings.symlink.user_and_time.gid);
1175 assert!(!settings.symlink.user_and_time.time);
1176 }
1177 #[test]
1178 fn per_type_settings_still_work() {
1179 let settings = parse_preserve_settings("f:uid,time,0777 d:gid").unwrap();
1180 assert!(settings.file.user_and_time.uid);
1181 assert!(settings.file.user_and_time.time);
1182 assert!(!settings.file.user_and_time.gid);
1183 assert_eq!(settings.file.mode_mask, 0o777);
1184 assert!(!settings.dir.user_and_time.uid);
1185 assert!(settings.dir.user_and_time.gid);
1186 assert!(!settings.dir.user_and_time.time);
1187 }
1188 #[test]
1189 fn invalid_settings_returns_error() {
1190 assert!(parse_preserve_settings("invalid").is_err());
1191 assert!(parse_preserve_settings("f:unknown_attr").is_err());
1192 }
1193}
1194
1195#[cfg(test)]
1196mod validate_update_compare_vs_preserve_tests {
1197 use super::*;
1198 #[test]
1199 fn detects_mtime_mismatch() {
1200 let compare = filecmp::MetadataCmpSettings {
1201 mtime: true,
1202 ..Default::default()
1203 };
1204 let preserve = preserve::preserve_none();
1205 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1206 assert!(result.is_err());
1207 assert!(result.unwrap_err().contains("mtime"));
1208 }
1209 #[test]
1210 fn detects_uid_mismatch() {
1211 let compare = filecmp::MetadataCmpSettings {
1212 uid: true,
1213 ..Default::default()
1214 };
1215 let preserve = preserve::preserve_none();
1216 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1217 assert!(result.is_err());
1218 assert!(result.unwrap_err().contains("uid"));
1219 }
1220 #[test]
1221 fn detects_gid_mismatch() {
1222 let compare = filecmp::MetadataCmpSettings {
1223 gid: true,
1224 ..Default::default()
1225 };
1226 let preserve = preserve::preserve_none();
1227 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1228 assert!(result.is_err());
1229 assert!(result.unwrap_err().contains("gid"));
1230 }
1231 #[test]
1232 fn detects_mode_mismatch() {
1233 let compare = filecmp::MetadataCmpSettings {
1234 mode: true,
1235 ..Default::default()
1236 };
1237 let mut preserve = preserve::preserve_none();
1238 preserve.file.mode_mask = 0;
1239 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1240 assert!(result.is_err());
1241 assert!(result.unwrap_err().contains("mode"));
1242 }
1243 #[test]
1244 fn detects_multiple_mismatches() {
1245 let compare = filecmp::MetadataCmpSettings {
1246 mtime: true,
1247 uid: true,
1248 gid: true,
1249 ..Default::default()
1250 };
1251 let preserve = preserve::preserve_none();
1252 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1253 assert!(result.is_err());
1254 let err = result.unwrap_err();
1255 assert!(err.contains("mtime"));
1256 assert!(err.contains("uid"));
1257 assert!(err.contains("gid"));
1258 }
1259 #[test]
1260 fn passes_when_preserve_covers_all_compared_attrs() {
1261 let compare = filecmp::MetadataCmpSettings {
1262 mtime: true,
1263 uid: true,
1264 gid: true,
1265 mode: true,
1266 size: true, ctime: true, };
1269 let preserve = preserve::preserve_all();
1270 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1271 assert!(result.is_ok());
1272 }
1273 #[test]
1274 fn fails_with_partial_mode_mask_when_mode_compared() {
1275 let compare = filecmp::MetadataCmpSettings {
1278 mode: true,
1279 ..Default::default()
1280 };
1281 let preserve = preserve::preserve_none();
1282 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1283 assert!(result.is_err());
1284 assert!(result.unwrap_err().contains("mode"));
1285 }
1286}