1use crate::cmp::ObjType;
101use anyhow::anyhow;
102use anyhow::Context;
103use std::io::IsTerminal;
104use tracing::instrument;
105use tracing_subscriber::fmt::format::FmtSpan;
106use tracing_subscriber::prelude::*;
107
108pub mod cmp;
109pub mod config;
110pub mod copy;
111pub mod error_collector;
112pub mod filegen;
113pub mod filter;
114pub mod link;
115pub mod preserve;
116pub mod remote_tracing;
117pub mod rm;
118pub mod version;
119
120pub mod filecmp;
121pub mod progress;
122mod testutils;
123
124pub use config::{
125 DryRunMode, DryRunWarnings, OutputConfig, RuntimeConfig, ThrottleConfig, TracingConfig,
126};
127pub use progress::{RcpdProgressPrinter, SerializableProgress};
128
129#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
131pub enum RcpdType {
132 Source,
133 Destination,
134}
135
136impl std::fmt::Display for RcpdType {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 match self {
139 RcpdType::Source => write!(f, "source"),
140 RcpdType::Destination => write!(f, "destination"),
141 }
142 }
143}
144
145pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
147
148#[derive(Clone, Debug, Default, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
150pub struct RuntimeStats {
151 pub cpu_time_user_ms: u64,
153 pub cpu_time_kernel_ms: u64,
155 pub peak_rss_bytes: u64,
157}
158
159#[derive(Debug, Default)]
161pub struct RemoteRuntimeStats {
162 pub source_host: String,
163 pub source_stats: RuntimeStats,
164 pub dest_host: String,
165 pub dest_stats: RuntimeStats,
166}
167
168#[must_use]
171pub fn is_localhost(host: &str) -> bool {
172 if host == "localhost" || host == "127.0.0.1" || host == "::1" || host == "[::1]" {
173 return true;
174 }
175 let mut buf = [0u8; 256];
177 let result = unsafe { libc::gethostname(buf.as_mut_ptr() as *mut libc::c_char, buf.len()) };
179 if result == 0 {
180 if let Ok(hostname_cstr) = std::ffi::CStr::from_bytes_until_nul(&buf) {
181 if let Ok(hostname) = hostname_cstr.to_str() {
182 if host == hostname {
183 return true;
184 }
185 }
186 }
187 }
188 false
189}
190
191static PROGRESS: std::sync::LazyLock<progress::Progress> =
192 std::sync::LazyLock::new(progress::Progress::new);
193static PBAR: std::sync::LazyLock<indicatif::ProgressBar> =
194 std::sync::LazyLock::new(indicatif::ProgressBar::new_spinner);
195static REMOTE_RUNTIME_STATS: std::sync::LazyLock<std::sync::Mutex<Option<RemoteRuntimeStats>>> =
196 std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
197
198#[must_use]
199pub fn get_progress() -> &'static progress::Progress {
200 &PROGRESS
201}
202
203pub fn set_remote_runtime_stats(stats: RemoteRuntimeStats) {
205 *REMOTE_RUNTIME_STATS.lock().unwrap() = Some(stats);
206}
207
208struct LocalTimeFormatter;
209
210impl tracing_subscriber::fmt::time::FormatTime for LocalTimeFormatter {
211 fn format_time(
212 &self,
213 writer: &mut tracing_subscriber::fmt::format::Writer<'_>,
214 ) -> std::fmt::Result {
215 let now = chrono::Local::now();
216 writer.write_str(&now.to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
217 }
218}
219
220struct ProgressTracker {
221 lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
222 pbar_thread: Option<std::thread::JoinHandle<()>>,
223}
224
225#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
226pub enum ProgressType {
227 #[default]
228 #[value(name = "auto", alias = "Auto")]
229 Auto,
230 #[value(name = "ProgressBar", alias = "progress-bar")]
231 ProgressBar,
232 #[value(name = "TextUpdates", alias = "text-updates")]
233 TextUpdates,
234}
235
236pub enum GeneralProgressType {
237 User(ProgressType),
238 Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
239 RemoteMaster {
240 progress_type: ProgressType,
241 get_progress_snapshot:
242 Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>,
243 },
244}
245
246impl std::fmt::Debug for GeneralProgressType {
247 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248 match self {
249 GeneralProgressType::User(pt) => write!(f, "User({pt:?})"),
250 GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
251 GeneralProgressType::RemoteMaster { progress_type, .. } => {
252 write!(
253 f,
254 "RemoteMaster(progress_type: {progress_type:?}, <function>)"
255 )
256 }
257 }
258 }
259}
260
261#[derive(Debug)]
262pub struct ProgressSettings {
263 pub progress_type: GeneralProgressType,
264 pub progress_delay: Option<String>,
265}
266
267fn progress_bar(
268 lock: &std::sync::Mutex<bool>,
269 cvar: &std::sync::Condvar,
270 delay_opt: &Option<std::time::Duration>,
271) {
272 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
273 PBAR.set_style(
274 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
275 .unwrap()
276 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
277 );
278 let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
279 let mut is_done = lock.lock().unwrap();
280 loop {
281 PBAR.set_position(PBAR.position() + 1); PBAR.set_message(prog_printer.print().unwrap());
283 let result = cvar.wait_timeout(is_done, delay).unwrap();
284 is_done = result.0;
285 if *is_done {
286 break;
287 }
288 }
289 PBAR.finish_and_clear();
290}
291
292fn get_datetime_prefix() -> String {
293 chrono::Local::now()
294 .format("%Y-%m-%dT%H:%M:%S%.3f%:z")
295 .to_string()
296}
297
298fn text_updates(
299 lock: &std::sync::Mutex<bool>,
300 cvar: &std::sync::Condvar,
301 delay_opt: &Option<std::time::Duration>,
302) {
303 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
304 let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
305 let mut is_done = lock.lock().unwrap();
306 loop {
307 eprintln!("=======================");
308 eprintln!(
309 "{}\n--{}",
310 get_datetime_prefix(),
311 prog_printer.print().unwrap()
312 );
313 let result = cvar.wait_timeout(is_done, delay).unwrap();
314 is_done = result.0;
315 if *is_done {
316 break;
317 }
318 }
319}
320
321fn rcpd_updates(
322 lock: &std::sync::Mutex<bool>,
323 cvar: &std::sync::Condvar,
324 delay_opt: &Option<std::time::Duration>,
325 sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
326) {
327 tracing::debug!("Starting rcpd progress updates");
328 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
329 let mut is_done = lock.lock().unwrap();
330 loop {
331 if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
332 tracing::debug!("Progress update channel closed, stopping progress updates");
334 break;
335 }
336 let result = cvar.wait_timeout(is_done, delay).unwrap();
337 is_done = result.0;
338 if *is_done {
339 break;
340 }
341 }
342}
343
344fn remote_master_updates<F>(
345 lock: &std::sync::Mutex<bool>,
346 cvar: &std::sync::Condvar,
347 delay_opt: &Option<std::time::Duration>,
348 get_progress_snapshot: F,
349 progress_type: ProgressType,
350) where
351 F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
352{
353 let interactive = match progress_type {
354 ProgressType::Auto => std::io::stderr().is_terminal(),
355 ProgressType::ProgressBar => true,
356 ProgressType::TextUpdates => false,
357 };
358 if interactive {
359 PBAR.set_style(
360 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
361 .unwrap()
362 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
363 );
364 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
365 let mut printer = RcpdProgressPrinter::new();
366 let mut is_done = lock.lock().unwrap();
367 loop {
368 let progress_map = get_progress_snapshot();
369 let source_progress = &progress_map[RcpdType::Source];
370 let destination_progress = &progress_map[RcpdType::Destination];
371 PBAR.set_position(PBAR.position() + 1); PBAR.set_message(
373 printer
374 .print(source_progress, destination_progress)
375 .unwrap(),
376 );
377 let result = cvar.wait_timeout(is_done, delay).unwrap();
378 is_done = result.0;
379 if *is_done {
380 break;
381 }
382 }
383 PBAR.finish_and_clear();
384 } else {
385 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
386 let mut printer = RcpdProgressPrinter::new();
387 let mut is_done = lock.lock().unwrap();
388 loop {
389 let progress_map = get_progress_snapshot();
390 let source_progress = &progress_map[RcpdType::Source];
391 let destination_progress = &progress_map[RcpdType::Destination];
392 eprintln!("=======================");
393 eprintln!(
394 "{}\n--{}",
395 get_datetime_prefix(),
396 printer
397 .print(source_progress, destination_progress)
398 .unwrap()
399 );
400 let result = cvar.wait_timeout(is_done, delay).unwrap();
401 is_done = result.0;
402 if *is_done {
403 break;
404 }
405 }
406 }
407}
408
409impl ProgressTracker {
410 pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
411 let lock_cvar =
412 std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
413 let lock_cvar_clone = lock_cvar.clone();
414 let pbar_thread = std::thread::spawn(move || {
415 let (lock, cvar) = &*lock_cvar_clone;
416 match progress_type {
417 GeneralProgressType::Remote(sender) => {
418 rcpd_updates(lock, cvar, &delay_opt, sender);
419 }
420 GeneralProgressType::RemoteMaster {
421 progress_type,
422 get_progress_snapshot,
423 } => {
424 remote_master_updates(
425 lock,
426 cvar,
427 &delay_opt,
428 get_progress_snapshot,
429 progress_type,
430 );
431 }
432 _ => {
433 let interactive = match progress_type {
434 GeneralProgressType::User(ProgressType::Auto) => {
435 std::io::stderr().is_terminal()
436 }
437 GeneralProgressType::User(ProgressType::ProgressBar) => true,
438 GeneralProgressType::User(ProgressType::TextUpdates) => false,
439 GeneralProgressType::Remote(_)
440 | GeneralProgressType::RemoteMaster { .. } => {
441 unreachable!("Invalid progress type: {progress_type:?}")
442 }
443 };
444 if interactive {
445 progress_bar(lock, cvar, &delay_opt);
446 } else {
447 text_updates(lock, cvar, &delay_opt);
448 }
449 }
450 }
451 });
452 Self {
453 lock_cvar,
454 pbar_thread: Some(pbar_thread),
455 }
456 }
457}
458
459impl Drop for ProgressTracker {
460 fn drop(&mut self) {
461 let (lock, cvar) = &*self.lock_cvar;
462 let mut is_done = lock.lock().unwrap();
463 *is_done = true;
464 cvar.notify_one();
465 drop(is_done);
466 if let Some(pbar_thread) = self.pbar_thread.take() {
467 pbar_thread.join().unwrap();
468 }
469 }
470}
471
472pub fn parse_metadata_cmp_settings(
473 settings: &str,
474) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
475 let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
476 for setting in settings.split(',') {
477 match setting {
478 "uid" => metadata_cmp_settings.uid = true,
479 "gid" => metadata_cmp_settings.gid = true,
480 "mode" => metadata_cmp_settings.mode = true,
481 "size" => metadata_cmp_settings.size = true,
482 "mtime" => metadata_cmp_settings.mtime = true,
483 "ctime" => metadata_cmp_settings.ctime = true,
484 _ => {
485 return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
486 }
487 }
488 }
489 Ok(metadata_cmp_settings)
490}
491
492fn parse_type_settings(
493 settings: &str,
494) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
495 let mut user_and_time = preserve::UserAndTimeSettings::default();
496 let mut mode_mask = None;
497 for setting in settings.split(',') {
498 match setting {
499 "uid" => user_and_time.uid = true,
500 "gid" => user_and_time.gid = true,
501 "time" => user_and_time.time = true,
502 _ => {
503 if let Ok(mask) = u32::from_str_radix(setting, 8) {
504 mode_mask = Some(mask);
505 } else {
506 return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
507 }
508 }
509 }
510 }
511 Ok((user_and_time, mode_mask))
512}
513
514pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
515 match settings {
517 "all" => return Ok(preserve::preserve_all()),
518 "none" => return Ok(preserve::preserve_none()),
519 _ => {}
520 }
521 let mut preserve_settings = preserve::Settings::default();
522 for type_settings in settings.split_whitespace() {
523 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
524 let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
525 format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
526 )?;
527 match obj_type {
528 "f" | "file" => {
529 preserve_settings.file = preserve::FileSettings::default();
530 preserve_settings.file.user_and_time = user_and_time_settings;
531 if let Some(mode) = mode_opt {
532 preserve_settings.file.mode_mask = mode;
533 }
534 }
535 "d" | "dir" | "directory" => {
536 preserve_settings.dir = preserve::DirSettings::default();
537 preserve_settings.dir.user_and_time = user_and_time_settings;
538 if let Some(mode) = mode_opt {
539 preserve_settings.dir.mode_mask = mode;
540 }
541 }
542 "l" | "link" | "symlink" => {
543 preserve_settings.symlink = preserve::SymlinkSettings::default();
544 preserve_settings.symlink.user_and_time = user_and_time_settings;
545 }
546 _ => {
547 return Err(anyhow!("Unknown object type: {}", obj_type));
548 }
549 }
550 } else {
551 return Err(anyhow!("Invalid preserve settings: {}", settings));
552 }
553 }
554 Ok(preserve_settings)
555}
556
557pub fn validate_update_compare_vs_preserve(
560 update_compare: &filecmp::MetadataCmpSettings,
561 preserve: &preserve::Settings,
562) -> Result<(), String> {
563 let mut missing = Vec::new();
564 if update_compare.mtime && !preserve.file.user_and_time.time {
565 missing.push("mtime");
566 }
567 if update_compare.uid && !preserve.file.user_and_time.uid {
568 missing.push("uid");
569 }
570 if update_compare.gid && !preserve.file.user_and_time.gid {
571 missing.push("gid");
572 }
573 if update_compare.mode && preserve.file.mode_mask != 0o7777 {
575 missing.push("mode");
576 }
577 if missing.is_empty() {
578 Ok(())
579 } else {
580 Err(format!(
581 "--update compares [{}] but --preserve-settings does not preserve them. \
582 Use --allow-lossy-update to override or adjust --preserve-settings.",
583 missing.join(", ")
584 ))
585 }
586}
587
588pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
589 let mut cmp_settings = cmp::ObjSettings::default();
590 for type_settings in settings.split_whitespace() {
591 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
592 let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
593 "parsing preserve settings: {obj_settings}, type: {obj_type}"
594 ))?;
595 let obj_type = match obj_type {
596 "f" | "file" => ObjType::File,
597 "d" | "dir" | "directory" => ObjType::Dir,
598 "l" | "link" | "symlink" => ObjType::Symlink,
599 "o" | "other" => ObjType::Other,
600 _ => {
601 return Err(anyhow!("Unknown obj type: {}", obj_type));
602 }
603 };
604 cmp_settings[obj_type] = obj_cmp_settings;
605 } else {
606 return Err(anyhow!("Invalid preserve settings: {}", settings));
607 }
608 }
609 Ok(cmp_settings)
610}
611
612pub async fn cmp(
613 src: &std::path::Path,
614 dst: &std::path::Path,
615 log: &cmp::LogWriter,
616 settings: &cmp::Settings,
617) -> Result<cmp::Summary, anyhow::Error> {
618 cmp::cmp(&PROGRESS, src, dst, log, settings).await
619}
620
621pub async fn copy(
622 src: &std::path::Path,
623 dst: &std::path::Path,
624 settings: ©::Settings,
625 preserve: &preserve::Settings,
626) -> Result<copy::Summary, copy::Error> {
627 copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
628}
629
630pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
631 rm::rm(&PROGRESS, path, settings).await
632}
633
634pub async fn link(
635 src: &std::path::Path,
636 dst: &std::path::Path,
637 update: &Option<std::path::PathBuf>,
638 settings: &link::Settings,
639) -> Result<link::Summary, link::Error> {
640 let cwd = std::env::current_dir()
641 .with_context(|| "failed to get current working directory")
642 .map_err(|err| link::Error::new(err, link::Summary::default()))?;
643 link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
644}
645
646fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
647 match std::env::var(name) {
648 Ok(val) => match val.parse() {
649 Ok(val) => val,
650 Err(_) => default,
651 },
652 Err(_) => default,
653 }
654}
655
656#[must_use]
658pub fn collect_runtime_stats() -> RuntimeStats {
659 collect_runtime_stats_inner(procfs::process::Process::myself().ok())
660}
661
662fn collect_runtime_stats_inner(process: Option<procfs::process::Process>) -> RuntimeStats {
663 let Some(process) = process else {
664 return RuntimeStats::default();
665 };
666 collect_runtime_stats_for_process(&process).unwrap_or_default()
667}
668
669fn collect_runtime_stats_for_process(
670 process: &procfs::process::Process,
671) -> anyhow::Result<RuntimeStats> {
672 let stat = process.stat()?;
673 let clock_ticks = procfs::ticks_per_second() as f64;
674 let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
676 Ok(RuntimeStats {
677 cpu_time_user_ms: ((stat.utime as f64 / clock_ticks) * 1000.0) as u64,
678 cpu_time_kernel_ms: ((stat.stime as f64 / clock_ticks) * 1000.0) as u64,
679 peak_rss_bytes: vmhwm_kb * 1024,
680 })
681}
682
683fn print_runtime_stats_for_role(prefix: &str, stats: &RuntimeStats) {
684 let cpu_total =
685 std::time::Duration::from_millis(stats.cpu_time_user_ms + stats.cpu_time_kernel_ms);
686 let cpu_kernel = std::time::Duration::from_millis(stats.cpu_time_kernel_ms);
687 let cpu_user = std::time::Duration::from_millis(stats.cpu_time_user_ms);
688 println!(
689 "{prefix}cpu time : {:.2?} | k: {:.2?} | u: {:.2?}",
690 cpu_total, cpu_kernel, cpu_user
691 );
692 println!(
693 "{prefix}peak RSS : {}",
694 bytesize::ByteSize(stats.peak_rss_bytes)
695 );
696}
697
698#[rustfmt::skip]
699fn print_runtime_stats() -> Result<(), anyhow::Error> {
700 let remote_stats = REMOTE_RUNTIME_STATS.lock().unwrap().take();
702 if let Some(remote) = remote_stats {
703 println!("walltime : {:.2?}", &PROGRESS.get_duration());
705 println!();
706 let source_is_local = is_localhost(&remote.source_host);
707 let dest_is_local = is_localhost(&remote.dest_host);
708 let master_stats = collect_runtime_stats();
710 if !source_is_local {
712 println!("SOURCE ({}):", remote.source_host);
713 print_runtime_stats_for_role(" ", &remote.source_stats);
714 println!();
715 }
716 if !dest_is_local {
717 println!("DESTINATION ({}):", remote.dest_host);
718 print_runtime_stats_for_role(" ", &remote.dest_stats);
719 println!();
720 }
721 match (source_is_local, dest_is_local) {
723 (true, true) => {
724 println!("MASTER + SOURCE + DESTINATION (localhost):");
725 print_runtime_stats_for_role(" master ", &master_stats);
726 print_runtime_stats_for_role(" source ", &remote.source_stats);
727 print_runtime_stats_for_role(" dest ", &remote.dest_stats);
728 }
729 (true, false) => {
730 println!("MASTER + SOURCE (localhost):");
731 print_runtime_stats_for_role(" master ", &master_stats);
732 print_runtime_stats_for_role(" source ", &remote.source_stats);
733 }
734 (false, true) => {
735 println!("MASTER + DESTINATION (localhost):");
736 print_runtime_stats_for_role(" master ", &master_stats);
737 print_runtime_stats_for_role(" dest ", &remote.dest_stats);
738 }
739 (false, false) => {
740 println!("MASTER (localhost):");
741 print_runtime_stats_for_role(" ", &master_stats);
742 }
743 }
744 return Ok(());
745 }
746 let process = procfs::process::Process::myself()?;
748 let stat = process.stat()?;
749 let clock_ticks_per_second = procfs::ticks_per_second();
751 let ticks_to_duration = |ticks: u64| {
752 std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
753 };
754 let vmhwm_kb = process.status()?.vmhwm.unwrap_or(0);
756 println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
757 println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
758 println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm_kb * 1024));
759 Ok(())
760}
761
762fn get_max_open_files() -> Result<u64, std::io::Error> {
763 let mut rlim = libc::rlimit {
764 rlim_cur: 0,
765 rlim_max: 0,
766 };
767 let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
769 if result == 0 {
770 Ok(rlim.rlim_cur)
771 } else {
772 Err(std::io::Error::last_os_error())
773 }
774}
775
776struct ProgWriter {}
777
778impl ProgWriter {
779 fn new() -> Self {
780 Self {}
781 }
782}
783
784impl std::io::Write for ProgWriter {
785 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
786 PBAR.suspend(|| std::io::stdout().write(buf))
787 }
788 fn flush(&mut self) -> std::io::Result<()> {
789 std::io::stdout().flush()
790 }
791}
792
793fn get_hostname() -> String {
794 nix::unistd::gethostname()
795 .ok()
796 .and_then(|os_str| os_str.into_string().ok())
797 .unwrap_or_else(|| "unknown".to_string())
798}
799
800#[must_use]
801pub fn generate_debug_log_filename(prefix: &str) -> String {
802 let now = chrono::Utc::now();
803 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
804 let process_id = std::process::id();
805 format!("{prefix}-{timestamp}-{process_id}")
806}
807
808#[must_use]
812pub fn generate_trace_filename(prefix: &str, identifier: &str, extension: &str) -> String {
813 let hostname = get_hostname();
814 let pid = std::process::id();
815 let timestamp = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S");
816 format!("{prefix}-{identifier}-{hostname}-{pid}-{timestamp}.{extension}")
817}
818
819#[instrument(skip(func))] pub fn run<Fut, Summary, Error>(
821 progress: Option<ProgressSettings>,
822 output: OutputConfig,
823 runtime: RuntimeConfig,
824 throttle: ThrottleConfig,
825 tracing_config: TracingConfig,
826 func: impl FnOnce() -> Fut,
827) -> Option<Summary>
828where
830 Summary: std::fmt::Display,
831 Error: std::fmt::Display + std::fmt::Debug,
832 Fut: std::future::Future<Output = Result<Summary, Error>>,
833{
834 let _ = get_progress();
838 if let Err(e) = throttle.validate() {
840 eprintln!("Configuration error: {e}");
841 return None;
842 }
843 let OutputConfig {
845 quiet,
846 verbose,
847 print_summary,
848 suppress_runtime_stats,
849 } = output;
850 let RuntimeConfig {
851 max_workers,
852 max_blocking_threads,
853 } = runtime;
854 let ThrottleConfig {
855 max_open_files,
856 ops_throttle,
857 iops_throttle,
858 chunk_size: _,
859 } = throttle;
860 let TracingConfig {
861 remote_layer: remote_tracing_layer,
862 debug_log_file,
863 chrome_trace_prefix,
864 flamegraph_prefix,
865 trace_identifier,
866 profile_level,
867 tokio_console,
868 tokio_console_port,
869 } = tracing_config;
870 let mut _chrome_guard: Option<tracing_chrome::FlushGuard> = None;
872 let mut _flame_guard: Option<tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>> =
873 None;
874 if quiet {
875 assert!(
876 verbose == 0,
877 "Quiet mode and verbose mode are mutually exclusive"
878 );
879 } else {
880 let make_env_filter = || {
882 let level_directive = match verbose {
883 0 => "error".parse().unwrap(),
884 1 => "info".parse().unwrap(),
885 2 => "debug".parse().unwrap(),
886 _ => "trace".parse().unwrap(),
887 };
888 tracing_subscriber::EnvFilter::from_default_env()
891 .add_directive(level_directive)
892 .add_directive("tokio=info".parse().unwrap())
893 .add_directive("runtime=info".parse().unwrap())
894 .add_directive("quinn=warn".parse().unwrap())
895 .add_directive("rustls=warn".parse().unwrap())
896 .add_directive("h2=warn".parse().unwrap())
897 };
898 let file_layer = if let Some(ref log_file_path) = debug_log_file {
899 let file = std::fs::OpenOptions::new()
900 .create(true)
901 .append(true)
902 .open(log_file_path)
903 .unwrap_or_else(|e| {
904 panic!("Failed to create debug log file at '{log_file_path}': {e}")
905 });
906 let file_layer = tracing_subscriber::fmt::layer()
907 .with_target(true)
908 .with_line_number(true)
909 .with_thread_ids(true)
910 .with_timer(LocalTimeFormatter)
911 .with_ansi(false)
912 .with_writer(file)
913 .with_filter(make_env_filter());
914 Some(file_layer)
915 } else {
916 None
917 };
918 let fmt_layer = if remote_tracing_layer.is_some() {
920 None
921 } else {
922 let fmt_layer = tracing_subscriber::fmt::layer()
923 .with_target(true)
924 .with_line_number(true)
925 .with_span_events(if verbose > 2 {
926 FmtSpan::NEW | FmtSpan::CLOSE
927 } else {
928 FmtSpan::NONE
929 })
930 .with_timer(LocalTimeFormatter)
931 .pretty()
932 .with_writer(ProgWriter::new)
933 .with_filter(make_env_filter());
934 Some(fmt_layer)
935 };
936 let remote_tracing_layer =
938 remote_tracing_layer.map(|layer| layer.with_filter(make_env_filter()));
939 let console_layer = if tokio_console {
940 let console_port = tokio_console_port.unwrap_or(6669);
941 let retention_seconds: u64 =
942 read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
943 eprintln!("Tokio console server listening on 127.0.0.1:{console_port}");
944 let console_layer = console_subscriber::ConsoleLayer::builder()
945 .retention(std::time::Duration::from_secs(retention_seconds))
946 .server_addr(([127, 0, 0, 1], console_port))
947 .spawn();
948 Some(console_layer)
949 } else {
950 None
951 };
952 let profiling_enabled = chrome_trace_prefix.is_some() || flamegraph_prefix.is_some();
956 let profile_filter_str = if profiling_enabled {
957 let level_str = profile_level.as_deref().unwrap_or("trace");
958 let valid_levels = ["trace", "debug", "info", "warn", "error", "off"];
960 if !valid_levels.contains(&level_str.to_lowercase().as_str()) {
961 eprintln!(
962 "Invalid --profile-level '{}'. Valid values: trace, debug, info, warn, error, off",
963 level_str
964 );
965 std::process::exit(1);
966 }
967 Some(format!(
969 "tokio=off,quinn=off,h2=off,hyper=off,rustls=off,{}",
970 level_str
971 ))
972 } else {
973 None
974 };
975 let make_profile_filter =
977 || tracing_subscriber::EnvFilter::new(profile_filter_str.as_ref().unwrap());
978 let chrome_layer = if let Some(ref prefix) = chrome_trace_prefix {
980 let filename = generate_trace_filename(prefix, &trace_identifier, "json");
981 eprintln!("Chrome trace will be written to: {filename}");
982 let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
983 .file(&filename)
984 .include_args(true)
985 .build();
986 _chrome_guard = Some(guard);
987 Some(layer.with_filter(make_profile_filter()))
988 } else {
989 None
990 };
991 let flame_layer = if let Some(ref prefix) = flamegraph_prefix {
993 let filename = generate_trace_filename(prefix, &trace_identifier, "folded");
994 eprintln!("Flamegraph data will be written to: {filename}");
995 match tracing_flame::FlameLayer::with_file(&filename) {
996 Ok((layer, guard)) => {
997 _flame_guard = Some(guard);
998 Some(layer.with_filter(make_profile_filter()))
999 }
1000 Err(e) => {
1001 eprintln!("Failed to create flamegraph layer: {e}");
1002 None
1003 }
1004 }
1005 } else {
1006 None
1007 };
1008 tracing_subscriber::registry()
1009 .with(file_layer)
1010 .with(fmt_layer)
1011 .with(remote_tracing_layer)
1012 .with(console_layer)
1013 .with(chrome_layer)
1014 .with(flame_layer)
1015 .init();
1016 }
1017 let mut builder = tokio::runtime::Builder::new_multi_thread();
1018 builder.enable_all();
1019 if max_workers > 0 {
1020 builder.worker_threads(max_workers);
1021 }
1022 if max_blocking_threads > 0 {
1023 builder.max_blocking_threads(max_blocking_threads);
1024 }
1025 if !sysinfo::set_open_files_limit(usize::MAX) {
1026 tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
1027 }
1028 let set_max_open_files = max_open_files.unwrap_or_else(|| {
1029 let limit = get_max_open_files().expect(
1030 "We failed to query rlimit, if this is expected try specifying --max-open-files",
1031 ) as usize;
1032 std::cmp::min(limit / 10 * 8, 4096)
1035 });
1036 if set_max_open_files > 0 {
1037 tracing::info!("Setting max open files to: {}", set_max_open_files);
1038 throttle::set_max_open_files(set_max_open_files);
1039 } else {
1040 tracing::info!("Not applying any limit to max open files!");
1041 }
1042 let runtime = builder.build().expect("Failed to create runtime");
1043 fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
1044 let mut replenish = replenish;
1045 let mut interval = std::time::Duration::from_secs(1);
1046 while replenish > 100 && interval > std::time::Duration::from_millis(1) {
1047 replenish /= 10;
1048 interval /= 10;
1049 }
1050 (replenish, interval)
1051 }
1052 if ops_throttle > 0 {
1053 let (replenish, interval) = get_replenish_interval(ops_throttle);
1054 throttle::init_ops_tokens(replenish);
1055 runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
1056 }
1057 if iops_throttle > 0 {
1058 let (replenish, interval) = get_replenish_interval(iops_throttle);
1059 throttle::init_iops_tokens(replenish);
1060 runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
1061 }
1062 let res = {
1063 let _progress = progress.map(|settings| {
1064 tracing::debug!("Requesting progress updates {settings:?}");
1065 let delay = settings.progress_delay.map(|delay_str| {
1066 humantime::parse_duration(&delay_str)
1067 .expect("Couldn't parse duration out of --progress-delay")
1068 });
1069 ProgressTracker::new(settings.progress_type, delay)
1070 });
1071 runtime.block_on(func())
1072 };
1073 match &res {
1074 Ok(summary) => {
1075 if print_summary || verbose > 0 {
1076 println!("{summary}");
1077 }
1078 }
1079 Err(err) => {
1080 if !quiet {
1081 println!("{err:?}");
1082 }
1083 }
1084 }
1085 if (print_summary || verbose > 0) && !suppress_runtime_stats {
1086 if let Err(err) = print_runtime_stats() {
1087 println!("Failed to print runtime stats: {err:?}");
1088 }
1089 }
1090 res.ok()
1091}
1092
1093#[cfg(test)]
1094mod runtime_stats_tests {
1095 use super::*;
1096 use anyhow::Result;
1097
1098 #[test]
1099 fn collect_runtime_stats_matches_procfs_snapshot() -> Result<()> {
1100 let process = procfs::process::Process::myself()?;
1101 let expected = collect_runtime_stats_for_process(&process)?;
1102 let actual = collect_runtime_stats();
1103 let cpu_tolerance_ms = 50;
1104 let rss_tolerance_bytes = 1_000_000;
1105 assert!(
1106 expected.cpu_time_user_ms.abs_diff(actual.cpu_time_user_ms) <= cpu_tolerance_ms,
1107 "user CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
1108 expected.cpu_time_user_ms,
1109 actual.cpu_time_user_ms
1110 );
1111 assert!(
1112 expected
1113 .cpu_time_kernel_ms
1114 .abs_diff(actual.cpu_time_kernel_ms)
1115 <= cpu_tolerance_ms,
1116 "kernel CPU deviated by more than {cpu_tolerance_ms}ms: expected {}, got {}",
1117 expected.cpu_time_kernel_ms,
1118 actual.cpu_time_kernel_ms
1119 );
1120 assert!(
1121 expected.peak_rss_bytes.abs_diff(actual.peak_rss_bytes) <= rss_tolerance_bytes,
1122 "peak RSS deviated by more than {rss_tolerance_bytes} bytes: expected {}, got {}",
1123 expected.peak_rss_bytes,
1124 actual.peak_rss_bytes
1125 );
1126 Ok(())
1127 }
1128
1129 #[test]
1130 fn collect_runtime_stats_returns_default_on_error() {
1131 let stats = collect_runtime_stats_inner(None);
1132 assert_eq!(stats, RuntimeStats::default());
1133
1134 let nonexistent_process = procfs::process::Process::new(i32::MAX).ok();
1135 let stats = collect_runtime_stats_inner(nonexistent_process);
1136 assert_eq!(stats, RuntimeStats::default());
1137 }
1138}
1139
1140#[cfg(test)]
1141mod parse_preserve_settings_tests {
1142 use super::*;
1143 #[test]
1144 fn preset_all_returns_preserve_all() {
1145 let settings = parse_preserve_settings("all").unwrap();
1146 let expected = preserve::preserve_all();
1147 assert_eq!(settings.file.mode_mask, expected.file.mode_mask);
1148 assert!(settings.file.user_and_time.uid);
1149 assert!(settings.file.user_and_time.gid);
1150 assert!(settings.file.user_and_time.time);
1151 assert_eq!(settings.dir.mode_mask, expected.dir.mode_mask);
1152 assert!(settings.dir.user_and_time.uid);
1153 assert!(settings.dir.user_and_time.gid);
1154 assert!(settings.dir.user_and_time.time);
1155 assert!(settings.symlink.user_and_time.uid);
1156 assert!(settings.symlink.user_and_time.gid);
1157 assert!(settings.symlink.user_and_time.time);
1158 }
1159 #[test]
1160 fn preset_none_returns_preserve_none() {
1161 let settings = parse_preserve_settings("none").unwrap();
1162 let expected = preserve::preserve_none();
1163 assert_eq!(settings.file.mode_mask, expected.file.mode_mask);
1164 assert!(!settings.file.user_and_time.uid);
1165 assert!(!settings.file.user_and_time.gid);
1166 assert!(!settings.file.user_and_time.time);
1167 assert_eq!(settings.dir.mode_mask, expected.dir.mode_mask);
1168 assert!(!settings.dir.user_and_time.uid);
1169 assert!(!settings.dir.user_and_time.gid);
1170 assert!(!settings.dir.user_and_time.time);
1171 assert!(!settings.symlink.user_and_time.uid);
1172 assert!(!settings.symlink.user_and_time.gid);
1173 assert!(!settings.symlink.user_and_time.time);
1174 }
1175 #[test]
1176 fn per_type_settings_still_work() {
1177 let settings = parse_preserve_settings("f:uid,time,0777 d:gid").unwrap();
1178 assert!(settings.file.user_and_time.uid);
1179 assert!(settings.file.user_and_time.time);
1180 assert!(!settings.file.user_and_time.gid);
1181 assert_eq!(settings.file.mode_mask, 0o777);
1182 assert!(!settings.dir.user_and_time.uid);
1183 assert!(settings.dir.user_and_time.gid);
1184 assert!(!settings.dir.user_and_time.time);
1185 }
1186 #[test]
1187 fn invalid_settings_returns_error() {
1188 assert!(parse_preserve_settings("invalid").is_err());
1189 assert!(parse_preserve_settings("f:unknown_attr").is_err());
1190 }
1191}
1192
1193#[cfg(test)]
1194mod validate_update_compare_vs_preserve_tests {
1195 use super::*;
1196 #[test]
1197 fn detects_mtime_mismatch() {
1198 let compare = filecmp::MetadataCmpSettings {
1199 mtime: true,
1200 ..Default::default()
1201 };
1202 let preserve = preserve::preserve_none();
1203 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1204 assert!(result.is_err());
1205 assert!(result.unwrap_err().contains("mtime"));
1206 }
1207 #[test]
1208 fn detects_uid_mismatch() {
1209 let compare = filecmp::MetadataCmpSettings {
1210 uid: true,
1211 ..Default::default()
1212 };
1213 let preserve = preserve::preserve_none();
1214 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1215 assert!(result.is_err());
1216 assert!(result.unwrap_err().contains("uid"));
1217 }
1218 #[test]
1219 fn detects_gid_mismatch() {
1220 let compare = filecmp::MetadataCmpSettings {
1221 gid: true,
1222 ..Default::default()
1223 };
1224 let preserve = preserve::preserve_none();
1225 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1226 assert!(result.is_err());
1227 assert!(result.unwrap_err().contains("gid"));
1228 }
1229 #[test]
1230 fn detects_mode_mismatch() {
1231 let compare = filecmp::MetadataCmpSettings {
1232 mode: true,
1233 ..Default::default()
1234 };
1235 let mut preserve = preserve::preserve_none();
1236 preserve.file.mode_mask = 0;
1237 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1238 assert!(result.is_err());
1239 assert!(result.unwrap_err().contains("mode"));
1240 }
1241 #[test]
1242 fn detects_multiple_mismatches() {
1243 let compare = filecmp::MetadataCmpSettings {
1244 mtime: true,
1245 uid: true,
1246 gid: true,
1247 ..Default::default()
1248 };
1249 let preserve = preserve::preserve_none();
1250 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1251 assert!(result.is_err());
1252 let err = result.unwrap_err();
1253 assert!(err.contains("mtime"));
1254 assert!(err.contains("uid"));
1255 assert!(err.contains("gid"));
1256 }
1257 #[test]
1258 fn passes_when_preserve_covers_all_compared_attrs() {
1259 let compare = filecmp::MetadataCmpSettings {
1260 mtime: true,
1261 uid: true,
1262 gid: true,
1263 mode: true,
1264 size: true, ctime: true, };
1267 let preserve = preserve::preserve_all();
1268 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1269 assert!(result.is_ok());
1270 }
1271 #[test]
1272 fn fails_with_partial_mode_mask_when_mode_compared() {
1273 let compare = filecmp::MetadataCmpSettings {
1276 mode: true,
1277 ..Default::default()
1278 };
1279 let preserve = preserve::preserve_none();
1280 let result = validate_update_compare_vs_preserve(&compare, &preserve);
1281 assert!(result.is_err());
1282 assert!(result.unwrap_err().contains("mode"));
1283 }
1284}