1#[macro_use]
97extern crate lazy_static;
98
99use crate::cmp::ObjType;
100use anyhow::anyhow;
101use anyhow::Context;
102use std::io::IsTerminal;
103use tracing::instrument;
104use tracing_subscriber::fmt::format::FmtSpan;
105use tracing_subscriber::prelude::*;
106
107pub mod cmp;
108pub mod copy;
109pub mod filegen;
110pub mod link;
111pub mod preserve;
112pub mod remote_tracing;
113pub mod rm;
114
115pub mod filecmp;
116pub mod progress;
117mod testutils;
118
119pub use progress::{RcpdProgressPrinter, SerializableProgress};
120
121#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, enum_map::Enum)]
123pub enum RcpdType {
124 Source,
125 Destination,
126}
127
128impl std::fmt::Display for RcpdType {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 match self {
131 RcpdType::Source => write!(f, "source"),
132 RcpdType::Destination => write!(f, "destination"),
133 }
134 }
135}
136
137pub type ProgressSnapshot<T> = enum_map::EnumMap<RcpdType, T>;
139
140lazy_static! {
141 static ref PROGRESS: progress::Progress = progress::Progress::new();
142 static ref PBAR: indicatif::ProgressBar = indicatif::ProgressBar::new_spinner();
143}
144
145#[must_use]
146pub fn get_progress() -> &'static progress::Progress {
147 &PROGRESS
148}
149
150struct ProgressTracker {
151 lock_cvar: std::sync::Arc<(std::sync::Mutex<bool>, std::sync::Condvar)>,
152 pbar_thread: Option<std::thread::JoinHandle<()>>,
153}
154
155#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
156pub enum ProgressType {
157 #[default]
158 Auto,
159 ProgressBar,
160 TextUpdates,
161}
162
163pub enum GeneralProgressType {
164 User(ProgressType),
165 Remote(tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>),
166 RemoteMaster(Box<dyn Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static>),
167}
168
169impl std::fmt::Debug for GeneralProgressType {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 match self {
172 GeneralProgressType::User(pt) => write!(f, "User({pt:?})"),
173 GeneralProgressType::Remote(_) => write!(f, "Remote(<sender>)"),
174 GeneralProgressType::RemoteMaster(_) => write!(f, "RemoteMaster(<function>)"),
175 }
176 }
177}
178
179impl std::str::FromStr for ProgressType {
181 type Err = anyhow::Error;
182
183 fn from_str(s: &str) -> Result<Self, Self::Err> {
184 match s {
185 "auto" | "Auto" => Ok(ProgressType::Auto),
186 "ProgressBar" => Ok(ProgressType::ProgressBar),
187 "TextUpdates" => Ok(ProgressType::TextUpdates),
188 _ => Err(anyhow!("Invalid progress type: {}", s)),
189 }
190 }
191}
192
193#[derive(Debug)]
194pub struct ProgressSettings {
195 pub progress_type: GeneralProgressType,
196 pub progress_delay: Option<String>,
197}
198
199fn progress_bar(
200 lock: &std::sync::Mutex<bool>,
201 cvar: &std::sync::Condvar,
202 delay_opt: &Option<std::time::Duration>,
203) {
204 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
205 PBAR.set_style(
206 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
207 .unwrap()
208 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
209 );
210 let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
211 let mut is_done = lock.lock().unwrap();
212 loop {
213 PBAR.set_position(PBAR.position() + 1); PBAR.set_message(prog_printer.print().unwrap());
215 let result = cvar.wait_timeout(is_done, delay).unwrap();
216 is_done = result.0;
217 if *is_done {
218 break;
219 }
220 }
221 PBAR.finish_and_clear();
222}
223
224fn text_updates(
225 lock: &std::sync::Mutex<bool>,
226 cvar: &std::sync::Condvar,
227 delay_opt: &Option<std::time::Duration>,
228) {
229 let delay = delay_opt.unwrap_or(std::time::Duration::from_secs(10));
230 let mut prog_printer = progress::ProgressPrinter::new(&PROGRESS);
231 let mut is_done = lock.lock().unwrap();
232 loop {
233 eprintln!("--{}", prog_printer.print().unwrap());
234 let result = cvar.wait_timeout(is_done, delay).unwrap();
235 is_done = result.0;
236 if *is_done {
237 break;
238 }
239 }
240}
241
242fn rcpd_updates(
243 lock: &std::sync::Mutex<bool>,
244 cvar: &std::sync::Condvar,
245 delay_opt: &Option<std::time::Duration>,
246 sender: tokio::sync::mpsc::UnboundedSender<remote_tracing::TracingMessage>,
247) {
248 tracing::debug!("Starting rcpd progress updates");
249 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
250 let mut is_done = lock.lock().unwrap();
251 loop {
252 if remote_tracing::send_progress_update(&sender, &PROGRESS).is_err() {
253 tracing::debug!("Progress update channel closed, stopping progress updates");
255 break;
256 }
257 let result = cvar.wait_timeout(is_done, delay).unwrap();
258 is_done = result.0;
259 if *is_done {
260 break;
261 }
262 }
263}
264
265fn remote_master_updates<F>(
266 lock: &std::sync::Mutex<bool>,
267 cvar: &std::sync::Condvar,
268 delay_opt: &Option<std::time::Duration>,
269 get_progress_snapshot: F,
270) where
271 F: Fn() -> ProgressSnapshot<SerializableProgress> + Send + 'static,
272{
273 PBAR.set_style(
274 indicatif::ProgressStyle::with_template("{spinner:.cyan} {msg}")
275 .unwrap()
276 .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]),
277 );
278 let delay = delay_opt.unwrap_or(std::time::Duration::from_millis(200));
279 let mut printer = RcpdProgressPrinter::new();
280 let mut is_done = lock.lock().unwrap();
281 loop {
282 let progress_map = get_progress_snapshot();
283 let source_progress = &progress_map[RcpdType::Source];
284 let destination_progress = &progress_map[RcpdType::Destination];
285 PBAR.set_position(PBAR.position() + 1); PBAR.set_message(
287 printer
288 .print(source_progress, destination_progress)
289 .unwrap(),
290 );
291 let result = cvar.wait_timeout(is_done, delay).unwrap();
292 is_done = result.0;
293 if *is_done {
294 break;
295 }
296 }
297 PBAR.finish_and_clear();
298}
299
300impl ProgressTracker {
301 pub fn new(progress_type: GeneralProgressType, delay_opt: Option<std::time::Duration>) -> Self {
302 let lock_cvar =
303 std::sync::Arc::new((std::sync::Mutex::new(false), std::sync::Condvar::new()));
304 let lock_cvar_clone = lock_cvar.clone();
305 let pbar_thread = std::thread::spawn(move || {
306 let (lock, cvar) = &*lock_cvar_clone;
307 match progress_type {
308 GeneralProgressType::Remote(sender) => {
309 rcpd_updates(lock, cvar, &delay_opt, sender);
310 }
311 GeneralProgressType::RemoteMaster(get_progress_snapshot) => {
312 remote_master_updates(lock, cvar, &delay_opt, get_progress_snapshot);
313 }
314 _ => {
315 let interactive = match progress_type {
316 GeneralProgressType::User(ProgressType::Auto) => {
317 std::io::stderr().is_terminal()
318 }
319 GeneralProgressType::User(ProgressType::ProgressBar) => true,
320 GeneralProgressType::User(ProgressType::TextUpdates) => false,
321 GeneralProgressType::Remote(_) | GeneralProgressType::RemoteMaster(_) => {
322 unreachable!("Invalid progress type: {progress_type:?}")
323 }
324 };
325 if interactive {
326 progress_bar(lock, cvar, &delay_opt);
327 } else {
328 text_updates(lock, cvar, &delay_opt);
329 }
330 }
331 }
332 });
333 Self {
334 lock_cvar,
335 pbar_thread: Some(pbar_thread),
336 }
337 }
338}
339
340impl Drop for ProgressTracker {
341 fn drop(&mut self) {
342 let (lock, cvar) = &*self.lock_cvar;
343 let mut is_done = lock.lock().unwrap();
344 *is_done = true;
345 cvar.notify_one();
346 drop(is_done);
347 if let Some(pbar_thread) = self.pbar_thread.take() {
348 pbar_thread.join().unwrap();
349 }
350 }
351}
352
353pub fn parse_metadata_cmp_settings(
354 settings: &str,
355) -> Result<filecmp::MetadataCmpSettings, anyhow::Error> {
356 let mut metadata_cmp_settings = filecmp::MetadataCmpSettings::default();
357 for setting in settings.split(',') {
358 match setting {
359 "uid" => metadata_cmp_settings.uid = true,
360 "gid" => metadata_cmp_settings.gid = true,
361 "mode" => metadata_cmp_settings.mode = true,
362 "size" => metadata_cmp_settings.size = true,
363 "mtime" => metadata_cmp_settings.mtime = true,
364 "ctime" => metadata_cmp_settings.ctime = true,
365 _ => {
366 return Err(anyhow!("Unknown metadata comparison setting: {}", setting));
367 }
368 }
369 }
370 Ok(metadata_cmp_settings)
371}
372
373fn parse_type_settings(
374 settings: &str,
375) -> Result<(preserve::UserAndTimeSettings, Option<preserve::ModeMask>), anyhow::Error> {
376 let mut user_and_time = preserve::UserAndTimeSettings::default();
377 let mut mode_mask = None;
378 for setting in settings.split(',') {
379 match setting {
380 "uid" => user_and_time.uid = true,
381 "gid" => user_and_time.gid = true,
382 "time" => user_and_time.time = true,
383 _ => {
384 if let Ok(mask) = u32::from_str_radix(setting, 8) {
385 mode_mask = Some(mask);
386 } else {
387 return Err(anyhow!("Unknown preserve attribute specified: {}", setting));
388 }
389 }
390 }
391 }
392 Ok((user_and_time, mode_mask))
393}
394
395pub fn parse_preserve_settings(settings: &str) -> Result<preserve::Settings, anyhow::Error> {
396 let mut preserve_settings = preserve::Settings::default();
397 for type_settings in settings.split(' ') {
398 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
399 let (user_and_time_settings, mode_opt) = parse_type_settings(obj_settings).context(
400 format!("parsing preserve settings: {obj_settings}, type: {obj_type}"),
401 )?;
402 match obj_type {
403 "f" | "file" => {
404 preserve_settings.file = preserve::FileSettings::default();
405 preserve_settings.file.user_and_time = user_and_time_settings;
406 if let Some(mode) = mode_opt {
407 preserve_settings.file.mode_mask = mode;
408 }
409 }
410 "d" | "dir" | "directory" => {
411 preserve_settings.dir = preserve::DirSettings::default();
412 preserve_settings.dir.user_and_time = user_and_time_settings;
413 if let Some(mode) = mode_opt {
414 preserve_settings.dir.mode_mask = mode;
415 }
416 }
417 "l" | "link" | "symlink" => {
418 preserve_settings.symlink = preserve::SymlinkSettings::default();
419 preserve_settings.symlink.user_and_time = user_and_time_settings;
420 }
421 _ => {
422 return Err(anyhow!("Unknown object type: {}", obj_type));
423 }
424 }
425 } else {
426 return Err(anyhow!("Invalid preserve settings: {}", settings));
427 }
428 }
429 Ok(preserve_settings)
430}
431
432pub fn parse_compare_settings(settings: &str) -> Result<cmp::ObjSettings, anyhow::Error> {
433 let mut cmp_settings = cmp::ObjSettings::default();
434 for type_settings in settings.split(' ') {
435 if let Some((obj_type, obj_settings)) = type_settings.split_once(':') {
436 let obj_cmp_settings = parse_metadata_cmp_settings(obj_settings).context(format!(
437 "parsing preserve settings: {obj_settings}, type: {obj_type}"
438 ))?;
439 let obj_type = match obj_type {
440 "f" | "file" => ObjType::File,
441 "d" | "dir" | "directory" => ObjType::Dir,
442 "l" | "link" | "symlink" => ObjType::Symlink,
443 _ => {
444 return Err(anyhow!("Unknown obj type: {}", obj_type));
445 }
446 };
447 cmp_settings[obj_type] = obj_cmp_settings;
448 } else {
449 return Err(anyhow!("Invalid preserve settings: {}", settings));
450 }
451 }
452 Ok(cmp_settings)
453}
454
455pub async fn cmp(
456 src: &std::path::Path,
457 dst: &std::path::Path,
458 log: &cmp::LogWriter,
459 settings: &cmp::Settings,
460) -> Result<cmp::Summary, anyhow::Error> {
461 cmp::cmp(&PROGRESS, src, dst, log, settings).await
462}
463
464pub async fn copy(
465 src: &std::path::Path,
466 dst: &std::path::Path,
467 settings: ©::Settings,
468 preserve: &preserve::Settings,
469) -> Result<copy::Summary, copy::Error> {
470 copy::copy(&PROGRESS, src, dst, settings, preserve, false).await
471}
472
473pub async fn rm(path: &std::path::Path, settings: &rm::Settings) -> Result<rm::Summary, rm::Error> {
474 rm::rm(&PROGRESS, path, settings).await
475}
476
477pub async fn link(
478 src: &std::path::Path,
479 dst: &std::path::Path,
480 update: &Option<std::path::PathBuf>,
481 settings: &link::Settings,
482) -> Result<link::Summary, link::Error> {
483 let cwd = std::env::current_dir()
484 .map_err(|err| link::Error::new(anyhow::Error::msg(err), link::Summary::default()))?;
485 link::link(&PROGRESS, &cwd, src, dst, update, settings, false).await
486}
487
488fn read_env_or_default<T: std::str::FromStr>(name: &str, default: T) -> T {
489 match std::env::var(name) {
490 Ok(val) => match val.parse() {
491 Ok(val) => val,
492 Err(_) => default,
493 },
494 Err(_) => default,
495 }
496}
497
498#[rustfmt::skip]
499fn print_runtime_stats() -> Result<(), anyhow::Error> {
500 let process = procfs::process::Process::myself()?;
501 let stat = process.stat()?;
502 let clock_ticks_per_second = procfs::ticks_per_second();
504 let ticks_to_duration = |ticks: u64| {
505 std::time::Duration::from_secs_f64(ticks as f64 / clock_ticks_per_second as f64)
506 };
507 let vmhwm = process.status()?.vmhwm.unwrap_or(0);
508 println!("walltime : {:.2?}", &PROGRESS.get_duration(),);
509 println!("cpu time : {:.2?} | k: {:.2?} | u: {:.2?}", ticks_to_duration(stat.utime + stat.stime), ticks_to_duration(stat.stime), ticks_to_duration(stat.utime));
510 println!("peak RSS : {:.2?}", bytesize::ByteSize(vmhwm));
511 Ok(())
512}
513
514fn get_max_open_files() -> Result<u64, std::io::Error> {
515 let mut rlim = libc::rlimit {
516 rlim_cur: 0,
517 rlim_max: 0,
518 };
519 let result = unsafe { libc::getrlimit(libc::RLIMIT_NOFILE, &raw mut rlim) };
521 if result == 0 {
522 Ok(rlim.rlim_cur)
523 } else {
524 Err(std::io::Error::last_os_error())
525 }
526}
527
528struct ProgWriter {}
529
530impl ProgWriter {
531 fn new() -> Self {
532 Self {}
533 }
534}
535
536impl std::io::Write for ProgWriter {
537 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
538 PBAR.suspend(|| std::io::stdout().write(buf))
539 }
540 fn flush(&mut self) -> std::io::Result<()> {
541 std::io::stdout().flush()
542 }
543}
544
545#[must_use]
546pub fn generate_debug_log_filename(prefix: &str) -> String {
547 let now = chrono::Utc::now();
548 let timestamp = now.format("%Y-%m-%dT%H:%M:%S").to_string();
549 let process_id = std::process::id();
550 format!("{prefix}-{timestamp}-{process_id}")
551}
552
553#[instrument(skip(func))] #[allow(clippy::too_many_arguments)]
555pub fn run<Fut, Summary, Error>(
556 progress: Option<ProgressSettings>,
557 quiet: bool,
558 verbose: u8,
559 print_summary: bool,
560 max_workers: usize,
561 max_blocking_threads: usize,
562 max_open_files: Option<usize>,
563 ops_throttle: usize,
564 iops_throttle: usize,
565 chunk_size: u64,
566 remote_tracing_layer: Option<remote_tracing::RemoteTracingLayer>,
567 debug_log_file: Option<String>,
568 func: impl FnOnce() -> Fut,
569) -> Option<Summary>
570where
572 Summary: std::fmt::Display,
573 Error: std::fmt::Display + std::fmt::Debug,
574 Fut: std::future::Future<Output = Result<Summary, Error>>,
575{
576 if quiet {
577 assert!(
578 verbose == 0,
579 "Quiet mode and verbose mode are mutually exclusive"
580 );
581 } else {
582 let env_filter =
583 tracing_subscriber::EnvFilter::from_default_env().add_directive(match verbose {
584 0 => "error".parse().unwrap(),
585 1 => "info".parse().unwrap(),
586 2 => "debug".parse().unwrap(),
587 _ => "trace".parse().unwrap(),
588 });
589 let file_layer = if let Some(ref log_file_path) = debug_log_file {
590 let file = std::fs::OpenOptions::new()
591 .create(true)
592 .append(true)
593 .open(log_file_path)
594 .unwrap_or_else(|e| {
595 panic!("Failed to create debug log file at '{log_file_path}': {e}")
596 });
597 let file_layer = tracing_subscriber::fmt::layer()
598 .with_target(true)
599 .with_line_number(true)
600 .with_thread_ids(true)
601 .with_ansi(false)
602 .with_writer(file)
603 .with_filter(
604 tracing_subscriber::EnvFilter::from_default_env().add_directive(
605 match verbose {
606 0 => "error".parse().unwrap(),
607 1 => "info".parse().unwrap(),
608 2 => "debug".parse().unwrap(),
609 _ => "trace".parse().unwrap(),
610 },
611 ),
612 );
613 Some(file_layer)
614 } else {
615 None
616 };
617 let fmt_layer = if remote_tracing_layer.is_some() {
618 None
619 } else {
620 let fmt_layer = tracing_subscriber::fmt::layer()
621 .with_target(true)
622 .with_line_number(true)
623 .with_span_events(if verbose > 2 {
624 FmtSpan::NEW | FmtSpan::CLOSE
625 } else {
626 FmtSpan::NONE
627 })
628 .pretty()
629 .with_writer(ProgWriter::new);
630 Some(fmt_layer)
631 };
632 let is_console_enabled = match std::env::var("RCP_TOKIO_TRACING_CONSOLE_ENABLED") {
633 Ok(val) => matches!(val.to_lowercase().as_str(), "true" | "1"),
634 Err(_) => false,
635 };
636 let console_layer = if is_console_enabled {
637 let console_port: u16 =
638 read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_SERVER_PORT", 6669);
639 let retention_seconds: u64 =
640 read_env_or_default("RCP_TOKIO_TRACING_CONSOLE_RETENTION_SECONDS", 60);
641 let console_layer = console_subscriber::ConsoleLayer::builder()
642 .retention(std::time::Duration::from_secs(retention_seconds))
643 .server_addr(([127, 0, 0, 1], console_port))
644 .spawn();
645 Some(console_layer)
646 } else {
647 None
648 };
649 tracing_subscriber::registry()
650 .with(env_filter)
651 .with(file_layer)
652 .with(fmt_layer)
653 .with(remote_tracing_layer)
654 .with(console_layer)
655 .init();
656 }
657 let mut builder = tokio::runtime::Builder::new_multi_thread();
658 builder.enable_all();
659 if max_workers > 0 {
660 builder.worker_threads(max_workers);
661 }
662 if max_blocking_threads > 0 {
663 builder.max_blocking_threads(max_blocking_threads);
664 }
665 if !sysinfo::set_open_files_limit(isize::MAX) {
666 tracing::info!("Failed to update the open files limit (expected on non-linux targets)");
667 }
668 let set_max_open_files = max_open_files.unwrap_or_else(|| {
669 let limit = get_max_open_files().expect(
670 "We failed to query rlimit, if this is expected try specifying --max-open-files",
671 ) as usize;
672 80 * limit / 100 });
674 if set_max_open_files > 0 {
675 tracing::info!("Setting max open files to: {}", set_max_open_files);
676 throttle::set_max_open_files(set_max_open_files);
677 } else {
678 tracing::info!("Not applying any limit to max open files!");
679 }
680 let runtime = builder.build().expect("Failed to create runtime");
681 fn get_replenish_interval(replenish: usize) -> (usize, std::time::Duration) {
682 let mut replenish = replenish;
683 let mut interval = std::time::Duration::from_secs(1);
684 while replenish > 100 && interval > std::time::Duration::from_millis(1) {
685 replenish /= 10;
686 interval /= 10;
687 }
688 (replenish, interval)
689 }
690 if ops_throttle > 0 {
691 let (replenish, interval) = get_replenish_interval(ops_throttle);
692 throttle::init_ops_tokens(replenish);
693 runtime.spawn(throttle::run_ops_replenish_thread(replenish, interval));
694 }
695 if iops_throttle > 0 {
696 if chunk_size == 0 {
697 tracing::error!("Chunk size must be specified when using --iops-throttle");
698 return None;
699 }
700 let (replenish, interval) = get_replenish_interval(iops_throttle);
701 throttle::init_iops_tokens(replenish);
702 runtime.spawn(throttle::run_iops_replenish_thread(replenish, interval));
703 } else if chunk_size > 0 {
704 tracing::error!(
705 "--chunk-size > 0 but --iops-throttle is 0 -- did you intend to use --iops-throttle?"
706 );
707 return None;
708 }
709 let res = {
710 let _progress = progress.map(|settings| {
711 tracing::debug!("Requesting progress updates {settings:?}");
712 let delay = settings.progress_delay.map(|delay_str| {
713 humantime::parse_duration(&delay_str)
714 .expect("Couldn't parse duration out of --progress-delay")
715 });
716 ProgressTracker::new(settings.progress_type, delay)
717 });
718 runtime.block_on(func())
719 };
720 match &res {
721 Ok(summary) => {
722 if print_summary || verbose > 0 {
723 println!("{summary}");
724 }
725 }
726 Err(err) => {
727 if !quiet {
728 println!("{err:?}");
729 }
730 }
731 }
732 if print_summary || verbose > 0 {
733 if let Err(err) = print_runtime_stats() {
734 println!("Failed to print runtime stats: {err:?}");
735 }
736 }
737 res.ok()
738}