// common/cmp.rs

1use anyhow::{Context, Result};
2use async_recursion::async_recursion;
3use enum_map::{Enum, EnumMap};
4use tokio::io::AsyncWriteExt;
5use tracing::instrument;
6
7use crate::copy::is_file_type_same;
8use crate::filecmp;
9use crate::progress;
10
/// Outcome of comparing one filesystem object between the source and destination trees.
///
/// Besides being reported in log records, this is used as the inner key of the
/// per-object-type tally (`EnumMap<ObjType, EnumMap<CompareResult, u64>>`).
#[derive(Copy, Clone, Debug, Enum)]
pub enum CompareResult {
    // object exists on both sides with the same file type and equal compared metadata
    Same,
    // object exists on both sides but the file type or the compared metadata differs
    Different,
    SrcMissing, // object missing in src but present in dst
    DstMissing, // same as above but flipped
}
18
/// Coarse classification of a filesystem object, derived from its metadata.
///
/// Used as the key for per-type comparison settings and for the per-type counters
/// in the comparison summary.
#[derive(Copy, Clone, Debug, Enum)]
pub enum ObjType {
    File,
    Dir,
    Symlink,
    Other, // sockets, block devices, character devices, FIFOs, etc.
}
26
/// Metadata comparison settings, one entry per [`ObjType`].
pub type ObjSettings = EnumMap<ObjType, filecmp::MetadataCmpSettings>;
28
/// Options controlling a compare run.
#[derive(Debug, Clone)]
pub struct Settings {
    /// Which metadata attributes are compared, selected by the source object's type.
    pub compare: ObjSettings,
    /// When set, stop processing further child results as soon as one child reports an error.
    pub fail_early: bool,
    /// When set, return right after recording a difference (or a missing entry during
    /// expansion) instead of descending further into that object.
    pub exit_early: bool,
    /// When set, a directory that exists on only one side is walked recursively and every
    /// entry below it is recorded as missing, rather than recording just the directory.
    pub expand_missing: bool,
    /// Optional include/exclude filter; entries it does not report as included are counted
    /// as skipped and not compared.
    pub filter: Option<crate::filter::FilterSettings>,
}
37
/// Count of comparison outcomes per object type: `mismatch[obj_type][result]`.
///
/// Despite the name, this also tallies [`CompareResult::Same`].
pub type Mismatch = EnumMap<ObjType, EnumMap<CompareResult, u64>>;

/// Count of skipped items per object type (entries excluded by the filter).
pub type Skipped = EnumMap<ObjType, u64>;
42
/// Output format for comparison results and summary.
// NOTE(review): this enum derives `clap::ValueEnum`; clap presumably surfaces the variant
// doc comments below as CLI help text, so treat them as user-facing strings — confirm
// before rewording.
#[derive(Copy, Clone, Debug, Default, clap::ValueEnum)]
pub enum OutputFormat {
    /// JSON output (NDJSON for differences, JSON object for summary)
    // `#[default]` makes this the value returned by `OutputFormat::default()`.
    #[default]
    Json,
    /// Human-readable text output (legacy format)
    Text,
}
52
53fn compare_result_name(cr: CompareResult) -> &'static str {
54    match cr {
55        CompareResult::Same => "same",
56        CompareResult::Different => "different",
57        CompareResult::SrcMissing => "src_missing",
58        CompareResult::DstMissing => "dst_missing",
59    }
60}
61
62fn obj_type_name(ot: ObjType) -> &'static str {
63    match ot {
64        ObjType::File => "file",
65        ObjType::Dir => "dir",
66        ObjType::Symlink => "symlink",
67        ObjType::Other => "other",
68    }
69}
70
/// Renders `path` as a string that survives JSON encoding and can be decoded back to the
/// exact original bytes of any Unix path.
///
/// Encoding rules:
/// - a literal backslash becomes `\\`;
/// - every byte that is not part of valid UTF-8 becomes `\xHH` (two lowercase hex digits);
/// - all other characters are copied through unchanged.
///
/// Decoding (after JSON-parsing the string) is a single left-to-right scan: `\\` yields a
/// literal `\`, `\xHH` yields the raw byte, anything else is literal UTF-8.
fn path_to_json_string(path: &std::path::Path) -> String {
    use std::fmt::Write as _;
    use std::os::unix::ffi::OsStrExt as _;

    let raw = path.as_os_str().as_bytes();
    let mut encoded = String::with_capacity(raw.len());
    for piece in raw.utf8_chunks() {
        // Copy the valid text, doubling each backslash: splitting on `\` and re-joining
        // the segments with `\\` is equivalent to escaping character by character.
        let mut segments = piece.valid().split('\\');
        if let Some(head) = segments.next() {
            encoded.push_str(head);
        }
        for segment in segments {
            encoded.push_str("\\\\");
            encoded.push_str(segment);
        }
        // Bytes that could not be decoded as UTF-8 are emitted as hex escapes.
        for &byte in piece.invalid() {
            write!(encoded, "\\x{byte:02x}").unwrap();
        }
    }
    encoded
}
95
/// Aggregated result of a compare run (or of one subtree of it).
///
/// Summaries from subtrees are combined with `+`.
#[derive(Default)]
pub struct Summary {
    /// Count of comparison outcomes, indexed by object type and then by result.
    pub mismatch: Mismatch,
    /// Count of entries excluded by the filter, indexed by object type.
    pub skipped: Skipped,
    /// Total size of regular files compared on the source side, in bytes.
    pub src_bytes: u64,
    /// Total size of regular files compared on the destination side, in bytes.
    pub dst_bytes: u64,
}
105
106impl std::ops::Add for Summary {
107    type Output = Self;
108    fn add(self, other: Self) -> Self {
109        let mut mismatch = self.mismatch;
110        for (obj_type, &cmp_res_map) in &other.mismatch {
111            for (cmp_res, &count) in &cmp_res_map {
112                mismatch[obj_type][cmp_res] += count;
113            }
114        }
115        let mut skipped = self.skipped;
116        for (obj_type, &count) in &other.skipped {
117            skipped[obj_type] += count;
118        }
119        Self {
120            mismatch,
121            skipped,
122            src_bytes: self.src_bytes + other.src_bytes,
123            dst_bytes: self.dst_bytes + other.dst_bytes,
124        }
125    }
126}
127
128impl std::fmt::Display for Summary {
129    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
130        writeln!(
131            f,
132            "src size (compared): {}",
133            bytesize::ByteSize(self.src_bytes)
134        )?;
135        writeln!(
136            f,
137            "dst size (compared): {}",
138            bytesize::ByteSize(self.dst_bytes)
139        )?;
140        for (obj_type, &cmp_res_map) in &self.mismatch {
141            for (cmp_res, &count) in &cmp_res_map {
142                writeln!(f, "{obj_type:?} {cmp_res:?}: {count}")?;
143            }
144        }
145        for (obj_type, &count) in &self.skipped {
146            if count > 0 {
147                writeln!(f, "{obj_type:?} Skipped: {count}")?;
148            }
149        }
150        Ok(())
151    }
152}
153
/// Wraps a [`Summary`] with an [`OutputFormat`] so that [`Display`](std::fmt::Display)
/// renders either human-readable text or JSON.
pub struct FormattedSummary {
    /// The totals to render.
    pub summary: Summary,
    /// Selects between the text rendering of `summary` and a single JSON object.
    pub format: OutputFormat,
}
160
161impl std::fmt::Display for FormattedSummary {
162    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
163        match self.format {
164            OutputFormat::Text => write!(f, "{}", self.summary),
165            OutputFormat::Json => {
166                let mut mismatch = serde_json::Map::new();
167                for (obj_type, &cmp_res_map) in &self.summary.mismatch {
168                    let mut counts = serde_json::Map::new();
169                    for (cmp_res, &count) in &cmp_res_map {
170                        counts.insert(
171                            compare_result_name(cmp_res).to_string(),
172                            serde_json::Value::Number(count.into()),
173                        );
174                    }
175                    mismatch.insert(
176                        obj_type_name(obj_type).to_string(),
177                        serde_json::Value::Object(counts),
178                    );
179                }
180                let mut skipped = serde_json::Map::new();
181                for (obj_type, &count) in &self.summary.skipped {
182                    if count > 0 {
183                        skipped.insert(
184                            obj_type_name(obj_type).to_string(),
185                            serde_json::Value::Number(count.into()),
186                        );
187                    }
188                }
189                let stats = crate::collect_runtime_stats();
190                let walltime = crate::get_progress().get_duration();
191                let obj = serde_json::json!({
192                    "src_bytes": self.summary.src_bytes,
193                    "dst_bytes": self.summary.dst_bytes,
194                    "mismatch": serde_json::Value::Object(mismatch),
195                    "skipped": serde_json::Value::Object(skipped),
196                    "walltime_ms": walltime.as_millis() as u64,
197                    "cpu_time_user_ms": stats.cpu_time_user_ms,
198                    "cpu_time_kernel_ms": stats.cpu_time_kernel_ms,
199                    "peak_rss_bytes": stats.peak_rss_bytes,
200                });
201                write!(f, "{obj}")
202            }
203        }
204    }
205}
206
/// Sink for per-object comparison records.
///
/// Cloning is cheap: clones share the same underlying buffered writer through an `Arc`,
/// and writes are serialized by the async mutex around it.
#[derive(Clone)]
pub struct LogWriter {
    // buffered log file; set only when a log path was given to `new`
    file: Option<std::sync::Arc<tokio::sync::Mutex<tokio::io::BufWriter<tokio::fs::File>>>>,
    // buffered stdout; set only when no log path was given and stdout output was requested
    stdout: Option<std::sync::Arc<tokio::sync::Mutex<tokio::io::BufWriter<tokio::io::Stdout>>>>,
    // record format used by `log_mismatch`
    format: OutputFormat,
}
213
214impl std::fmt::Debug for LogWriter {
215    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
216        f.debug_struct("LogWriter")
217            .field("file", &self.file.is_some())
218            .field("stdout", &self.stdout.is_some())
219            .field("format", &self.format)
220            .finish()
221    }
222}
223
224impl LogWriter {
225    /// Creates a new LogWriter.
226    ///
227    /// If `log_path_opt` is provided, output goes to that file.
228    /// Otherwise, if `use_stdout` is true, output goes to stdout.
229    /// If both are false/None, no output is produced.
230    pub async fn new(
231        log_path_opt: Option<&std::path::Path>,
232        use_stdout: bool,
233        format: OutputFormat,
234    ) -> Result<Self> {
235        if let Some(log_path) = log_path_opt {
236            let log_file = tokio::fs::OpenOptions::new()
237                .write(true)
238                .create_new(true)
239                .open(log_path)
240                .await
241                .with_context(|| format!("Failed to open log file: {log_path:?}"))?;
242            let log =
243                std::sync::Arc::new(tokio::sync::Mutex::new(tokio::io::BufWriter::new(log_file)));
244            Ok(Self {
245                file: Some(log),
246                stdout: None,
247                format,
248            })
249        } else if use_stdout {
250            Ok(Self {
251                file: None,
252                stdout: Some(std::sync::Arc::new(tokio::sync::Mutex::new(
253                    tokio::io::BufWriter::new(tokio::io::stdout()),
254                ))),
255                format,
256            })
257        } else {
258            Ok(Self {
259                file: None,
260                stdout: None,
261                format,
262            })
263        }
264    }
265    /// Creates a silent LogWriter that produces no output, using the default format.
266    /// Convenience constructor primarily for tests.
267    pub async fn silent() -> Result<Self> {
268        Self::new(None, false, OutputFormat::default()).await
269    }
270
271    pub async fn log_mismatch(
272        &self,
273        cmp_result: CompareResult,
274        src_obj_type: Option<ObjType>,
275        src: &std::path::Path,
276        dst_obj_type: Option<ObjType>,
277        dst: &std::path::Path,
278    ) -> Result<()> {
279        let msg = match self.format {
280            OutputFormat::Text => {
281                format!(
282                    "[{cmp_result:?}]\n\t[{src_obj_type:?}]\t{src:?}\n\t[{dst_obj_type:?}]\t{dst:?}\n"
283                )
284            }
285            OutputFormat::Json => {
286                let src_type_val = match src_obj_type {
287                    Some(ot) => serde_json::Value::String(obj_type_name(ot).to_string()),
288                    None => serde_json::Value::Null,
289                };
290                let dst_type_val = match dst_obj_type {
291                    Some(ot) => serde_json::Value::String(obj_type_name(ot).to_string()),
292                    None => serde_json::Value::Null,
293                };
294                let obj = serde_json::json!({
295                    "result": compare_result_name(cmp_result),
296                    "src_type": src_type_val,
297                    "src": path_to_json_string(src),
298                    "dst_type": dst_type_val,
299                    "dst": path_to_json_string(dst),
300                });
301                format!("{obj}\n")
302            }
303        };
304        self.write(&msg).await
305    }
306
307    async fn write(&self, msg: &str) -> Result<()> {
308        if let Some(log) = &self.file {
309            let mut log = log.lock().await;
310            log.write_all(msg.as_bytes())
311                .await
312                .context("Failed to write to log file")?;
313        }
314        if let Some(stdout) = &self.stdout {
315            let mut stdout = stdout.lock().await;
316            stdout
317                .write_all(msg.as_bytes())
318                .await
319                .context("Failed to write to stdout")?;
320        }
321        Ok(())
322    }
323
324    pub async fn flush(&self) -> Result<()> {
325        if let Some(log) = &self.file {
326            let mut log = log.lock().await;
327            log.flush().await.context("Failed to flush log file")?;
328        }
329        if let Some(stdout) = &self.stdout {
330            let mut stdout = stdout.lock().await;
331            stdout.flush().await.context("Failed to flush stdout")?;
332        }
333        Ok(())
334    }
335}
336
337fn obj_type(metadata: &std::fs::Metadata) -> ObjType {
338    if metadata.is_file() {
339        ObjType::File
340    } else if metadata.is_dir() {
341        ObjType::Dir
342    } else if metadata.is_symlink() {
343        ObjType::Symlink
344    } else {
345        // sockets, block devices, character devices, FIFOs, etc.
346        ObjType::Other
347    }
348}
349
/// Public entry point for compare operations.
/// Internally delegates to cmp_internal with source_root/dest_root tracking for proper filter matching.
///
/// `src` and `dst` are passed both as the paths to compare and as the roots, so this call
/// is treated by `cmp_internal` as the initial (root-level) invocation.
///
/// # Errors
/// Returns whatever error `cmp_internal` reports.
#[instrument(skip(prog_track))]
pub async fn cmp(
    prog_track: &'static progress::Progress,
    src: &std::path::Path,
    dst: &std::path::Path,
    log: &LogWriter,
    settings: &Settings,
) -> Result<Summary> {
    cmp_internal(prog_track, src, dst, src, dst, log, settings).await
}
362
/// Recursively walks a directory tree on the existing side and records every entry as missing
/// on the other side.
///
/// - `existing_path`: the object that exists; it is stat'ed without following symlinks.
/// - `mirror_path`: where the object would be on the other side; used for log records and
///   as the base for child mirror paths.
/// - `existing_root`: prefix stripped from entry paths to get the path handed to the filter.
/// - `result`: `DstMissing` when enumerating the source side, `SrcMissing` when enumerating
///   the destination side. It selects which byte counter is updated and the argument order
///   of the log record.
///
/// Returns a summary covering `existing_path` and, unless `settings.exit_early` is set or
/// the object is not a directory, everything below it that passes the filter.
///
/// # Errors
/// Fails if metadata cannot be read, the directory cannot be opened or traversed, a log
/// write fails, or any child task fails (child errors are collected and returned together).
#[instrument(skip(prog_track))]
#[async_recursion]
async fn expand_missing_tree(
    prog_track: &'static progress::Progress,
    existing_path: &std::path::Path,
    mirror_path: &std::path::Path,
    existing_root: &std::path::Path,
    result: CompareResult,
    log: &LogWriter,
    settings: &Settings,
) -> Result<Summary> {
    let _prog_guard = prog_track.ops.guard();
    // The side we probe against is fully determined by which tree is
    // missing: `DstMissing` means we're enumerating src, `SrcMissing`
    // means we're enumerating dst. `Same` / `Different` are never
    // passed in (the only two call sites pass the `*Missing` variants),
    // but be defensive and default to source.
    let side = match result {
        CompareResult::DstMissing => congestion::Side::Source,
        CompareResult::SrcMissing => congestion::Side::Destination,
        CompareResult::Same | CompareResult::Different => congestion::Side::Source,
    };
    let metadata = crate::walk::run_metadata_probed(
        side,
        congestion::MetadataOp::Stat,
        tokio::fs::symlink_metadata(existing_path),
    )
    .await
    .with_context(|| format!("failed reading metadata from {:?}", &existing_path))?;
    let existing_obj_type = obj_type(&metadata);
    let mut summary = Summary::default();
    // count this object itself as missing on the other side
    summary.mismatch[existing_obj_type][result] += 1;
    // track file sizes on the appropriate side
    if metadata.is_file() {
        match result {
            CompareResult::DstMissing => summary.src_bytes += metadata.len(),
            CompareResult::SrcMissing => summary.dst_bytes += metadata.len(),
            _ => {}
        }
    }
    // log records always list the source side first, so the argument order depends on
    // which side the existing object lives on
    match result {
        CompareResult::DstMissing => {
            log.log_mismatch(
                result,
                Some(existing_obj_type),
                existing_path,
                None,
                mirror_path,
            )
            .await?;
        }
        CompareResult::SrcMissing => {
            log.log_mismatch(
                result,
                None,
                mirror_path,
                Some(existing_obj_type),
                existing_path,
            )
            .await?;
        }
        _ => {}
    }
    // with exit_early, one recorded mismatch is enough: do not descend
    if settings.exit_early {
        return Ok(summary);
    }
    if !metadata.is_dir() {
        return Ok(summary);
    }
    let mut entries = tokio::fs::read_dir(existing_path)
        .await
        .with_context(|| format!("cannot open directory {:?} for reading", &existing_path))?;
    let mut join_set = tokio::task::JoinSet::new();
    let errors = crate::error_collector::ErrorCollector::default();
    loop {
        let Some((entry, entry_file_type)) =
            crate::walk::next_entry_probed(&mut entries, side, || {
                format!("failed traversing directory {:?}", &existing_path)
            })
            .await?
        else {
            break;
        };
        let entry_path = entry.path();
        let entry_name = entry_path.file_name().unwrap();
        // apply filter if configured
        if let Some(ref filter) = settings.filter {
            // fall back to the full entry path if it is not under `existing_root`
            let relative_path = entry_path
                .strip_prefix(existing_root)
                .unwrap_or(&entry_path);
            // an unknown file type is treated as "not a directory"
            let is_dir = entry_file_type.map(|ft| ft.is_dir()).unwrap_or(false);
            if !matches!(
                filter.should_include(relative_path, is_dir),
                crate::filter::FilterResult::Included
            ) {
                // increment skipped counter based on entry type
                // (anything that is neither a known directory nor a known symlink is
                // counted as a file here)
                let entry_obj_type = if is_dir {
                    ObjType::Dir
                } else if entry_file_type.map(|ft| ft.is_symlink()).unwrap_or(false) {
                    ObjType::Symlink
                } else {
                    ObjType::File
                };
                summary.skipped[entry_obj_type] += 1;
                continue;
            }
        }
        let child_mirror = mirror_path.join(entry_name);
        // owned copies so the spawned task does not borrow from this frame
        let log = log.clone();
        let settings = settings.clone();
        let existing_root = existing_root.to_owned();
        // for positively-known leaf entries (file/symlink/special), acquire
        // the pending-meta permit BEFORE spawning so we don't create
        // unbounded tasks. We deliberately skip pre-acquire when
        // `entry_file_type` is None: the entry could actually be a directory,
        // and chained unknown-typed directories holding permits while
        // recursing would deadlock the pending-meta pool. Known directories
        // also skip pre-acquire. We use the pending-meta semaphore (not
        // open-files) because cmp doesn't hold fds; decoupling avoids
        // contention with concurrent copy paths that hold open-files permits.
        let known_leaf = entry_file_type.is_some_and(|ft| !ft.is_dir());
        let pending_guard = if known_leaf {
            Some(throttle::pending_meta_permit().await)
        } else {
            None
        };
        join_set.spawn(async move {
            // keep the permit (if any) alive for the duration of the child task
            let _pending_guard = pending_guard;
            expand_missing_tree(
                prog_track,
                &entry_path,
                &child_mirror,
                &existing_root,
                result,
                &log,
                &settings,
            )
            .await
        });
    }
    // release the directory handle before waiting on the children
    drop(entries);
    while let Some(res) = join_set.join_next().await {
        // `res?` propagates a task join failure immediately; the inner result is the
        // child's own outcome
        match res? {
            Ok(child_summary) => summary = summary + child_summary,
            Err(error) => {
                tracing::error!(
                    "expand_missing_tree: {:?} failed with: {:#}",
                    existing_path,
                    &error
                );
                errors.push(error);
                if settings.fail_early {
                    // stop waiting for the remaining children
                    break;
                }
            }
        }
    }
    if let Some(err) = errors.into_error() {
        return Err(err);
    }
    Ok(summary)
}
527
/// Recursive worker behind `cmp`: compares `src` against `dst` and, when both are
/// directories, every entry beneath them.
///
/// `source_root` / `dest_root` are the paths of the original top-level call; they are used
/// to detect the initial invocation and to derive root-relative paths for filter matching.
///
/// Steps, in order:
/// 1. Stat `src` without following symlinks; failure is an error.
/// 2. On the initial call (`src == source_root`), apply the filter to the root item itself;
///    a filtered-out root yields a summary with a single skipped count.
/// 3. Stat `dst`. If it does not exist, record `DstMissing` (expanding the whole `src`
///    subtree when `settings.expand_missing` is set and `src` is a directory) and return.
/// 4. Record `Same` or `Different` based on file type and the per-type metadata settings.
///    With `settings.exit_early`, a `Different` result returns immediately.
/// 5. If both sides are directories, spawn a comparison task per included `src` entry,
///    then scan `dst` for entries not seen in `src` and record them as `SrcMissing`.
/// 6. Join the spawned tasks, merging their summaries and collecting their errors.
///
/// # Errors
/// Fails if metadata cannot be read (other than `dst` not existing), a directory cannot be
/// opened or traversed, a log write fails, or any child comparison fails (child errors are
/// collected and returned together).
#[instrument(skip(prog_track))]
#[async_recursion]
async fn cmp_internal(
    prog_track: &'static progress::Progress,
    src: &std::path::Path,
    dst: &std::path::Path,
    source_root: &std::path::Path,
    dest_root: &std::path::Path,
    log: &LogWriter,
    settings: &Settings,
) -> Result<Summary> {
    let _prog_guard = prog_track.ops.guard();
    tracing::debug!("reading source metadata");
    // it is impossible for src not exist other than user passing invalid path (which is an error)
    let src_metadata = crate::walk::run_metadata_probed(
        congestion::Side::Source,
        congestion::MetadataOp::Stat,
        tokio::fs::symlink_metadata(src),
    )
    .await
    .with_context(|| format!("failed reading metadata from {:?}", &src))?;
    // apply filter to root item (when src == source_root, this is the initial call)
    if src == source_root
        && let Some(filter) = &settings.filter
        && let Some(name) = src.file_name()
    {
        let is_dir = src_metadata.is_dir();
        if !matches!(
            filter.should_include_root_item(name.as_ref(), is_dir),
            crate::filter::FilterResult::Included
        ) {
            // root item filtered out, return summary with skipped count
            let src_obj_type = obj_type(&src_metadata);
            let mut summary = Summary::default();
            summary.skipped[src_obj_type] += 1;
            return Ok(summary);
        }
    }
    let mut cmp_summary = Summary::default();
    let src_obj_type = obj_type(&src_metadata);
    // track file sizes for the summary
    if src_metadata.is_file() {
        cmp_summary.src_bytes += src_metadata.len();
    }
    let dst_metadata = {
        let probed = crate::walk::run_metadata_probed(
            congestion::Side::Destination,
            congestion::MetadataOp::Stat,
            tokio::fs::symlink_metadata(dst),
        )
        .await;
        match probed {
            Ok(metadata) => metadata,
            Err(err) => {
                // a missing destination is a comparison result, not an error
                if err.kind() == std::io::ErrorKind::NotFound {
                    if settings.expand_missing && src_metadata.is_dir() {
                        // record the directory and everything below it as DstMissing
                        let expanded = expand_missing_tree(
                            prog_track,
                            src,
                            dst,
                            source_root,
                            CompareResult::DstMissing,
                            log,
                            settings,
                        )
                        .await?;
                        cmp_summary = cmp_summary + expanded;
                    } else {
                        cmp_summary.mismatch[src_obj_type][CompareResult::DstMissing] += 1;
                        log.log_mismatch(
                            CompareResult::DstMissing,
                            Some(src_obj_type),
                            src,
                            None,
                            dst,
                        )
                        .await?;
                    }
                    return Ok(cmp_summary);
                }
                // any other stat failure on dst is a real error
                return Err(err).context(format!("failed reading metadata from {:?}", &dst));
            }
        }
    };
    if dst_metadata.is_file() {
        cmp_summary.dst_bytes += dst_metadata.len();
    }
    if !is_file_type_same(&src_metadata, &dst_metadata)
        || !filecmp::metadata_equal(
            &settings.compare[src_obj_type],
            &src_metadata,
            &dst_metadata,
        )
    {
        // we use the src type for the summary attribution
        cmp_summary.mismatch[src_obj_type][CompareResult::Different] += 1;
        let dst_obj_type = obj_type(&dst_metadata);
        log.log_mismatch(
            CompareResult::Different,
            Some(src_obj_type),
            src,
            Some(dst_obj_type),
            dst,
        )
        .await?;
        // with exit_early, one recorded difference is enough: do not descend
        if settings.exit_early {
            return Ok(cmp_summary);
        }
    } else {
        cmp_summary.mismatch[src_obj_type][CompareResult::Same] += 1;
    }
    if !src_metadata.is_dir() || !dst_metadata.is_dir() {
        // nothing more to do
        return Ok(cmp_summary);
    }
    tracing::debug!("process contents of 'src' directory");
    let mut src_entries = tokio::fs::read_dir(src)
        .await
        .with_context(|| format!("cannot open directory {src:?} for reading"))?;
    let mut join_set = tokio::task::JoinSet::new();
    let errors = crate::error_collector::ErrorCollector::default();
    // create a set of all the files we already processed
    let mut processed_files = std::collections::HashSet::new();
    // iterate through src entries and recursively call "cmp" on each one
    loop {
        let Some((src_entry, entry_file_type)) =
            crate::walk::next_entry_probed(&mut src_entries, congestion::Side::Source, || {
                format!("failed traversing directory {:?}", &src)
            })
            .await?
        else {
            break;
        };
        let entry_path = src_entry.path();
        let entry_name = entry_path.file_name().unwrap();
        // apply filter if configured
        if let Some(ref filter) = settings.filter {
            // compute relative path from source_root for filter matching
            let relative_path = entry_path.strip_prefix(source_root).unwrap_or(&entry_path);
            // an unknown file type is treated as "not a directory"
            let is_dir = entry_file_type.map(|ft| ft.is_dir()).unwrap_or(false);
            if !matches!(
                filter.should_include(relative_path, is_dir),
                crate::filter::FilterResult::Included
            ) {
                // increment skipped counter based on entry type
                // (anything that is neither a known directory nor a known symlink is
                // counted as a file here)
                let entry_obj_type = if is_dir {
                    ObjType::Dir
                } else if entry_file_type.map(|ft| ft.is_symlink()).unwrap_or(false) {
                    ObjType::Symlink
                } else {
                    ObjType::File
                };
                cmp_summary.skipped[entry_obj_type] += 1;
                continue;
            }
        }
        // only entries that pass the filter are remembered; filtered-out names are
        // re-evaluated against the filter when they show up in the dst scan below
        processed_files.insert(entry_name.to_owned());
        let dst_path = dst.join(entry_name);
        // owned copies so the spawned task does not borrow from this frame
        let log = log.clone();
        let settings = settings.clone();
        let source_root = source_root.to_owned();
        let dest_root = dest_root.to_owned();
        // for positively-known leaf entries (file/symlink/special), acquire
        // the pending-meta permit BEFORE spawning so we don't create
        // unbounded tasks. We deliberately skip pre-acquire when
        // `entry_file_type` is None: the entry could actually be a directory,
        // and chained unknown-typed directories holding permits while
        // recursing would deadlock the pending-meta pool. Known directories
        // also skip pre-acquire. We use the pending-meta semaphore (not
        // open-files) because cmp doesn't hold fds; decoupling avoids
        // contention with concurrent copy paths that hold open-files permits.
        let known_leaf = entry_file_type.is_some_and(|ft| !ft.is_dir());
        let pending_guard = if known_leaf {
            Some(throttle::pending_meta_permit().await)
        } else {
            None
        };
        let do_cmp = || async move {
            // keep the permit (if any) alive for the duration of the child task
            let _pending_guard = pending_guard;
            cmp_internal(
                prog_track,
                &entry_path,
                &dst_path,
                &source_root,
                &dest_root,
                &log,
                &settings,
            )
            .await
        };
        join_set.spawn(do_cmp());
    }
    // unfortunately ReadDir is opening file-descriptors and there's not a good way to limit this,
    // one thing we CAN do however is to drop it as soon as we're done with it
    drop(src_entries);
    tracing::debug!("process contents of 'dst' directory");
    let mut dst_entries = tokio::fs::read_dir(dst)
        .await
        .with_context(|| format!("cannot open directory {:?} for reading", &dst))?;
    // iterate through update entries and log each one that's not present in src
    loop {
        let Some((dst_entry, entry_file_type)) =
            crate::walk::next_entry_probed(&mut dst_entries, congestion::Side::Destination, || {
                format!("failed traversing directory {:?}", &dst)
            })
            .await?
        else {
            break;
        };
        let entry_path = dst_entry.path();
        let entry_name = entry_path.file_name().unwrap();
        if processed_files.contains(entry_name) {
            // we already must have considered this file, skip it
            continue;
        }
        // apply filter if configured - if this entry would be filtered, don't report as missing
        if let Some(ref filter) = settings.filter {
            // compute relative path from dest_root for filter matching
            let relative_path = entry_path.strip_prefix(dest_root).unwrap_or(&entry_path);
            // an unknown file type is treated as "not a directory"
            let is_dir = entry_file_type.map(|ft| ft.is_dir()).unwrap_or(false);
            if !matches!(
                filter.should_include(relative_path, is_dir),
                crate::filter::FilterResult::Included
            ) {
                // increment skipped counter based on entry type
                // (anything that is neither a known directory nor a known symlink is
                // counted as a file here)
                let entry_obj_type = if is_dir {
                    ObjType::Dir
                } else if entry_file_type.map(|ft| ft.is_symlink()).unwrap_or(false) {
                    ObjType::Symlink
                } else {
                    ObjType::File
                };
                cmp_summary.skipped[entry_obj_type] += 1;
                continue;
            }
        }
        tracing::debug!("found a new entry in the 'dst' directory");
        let dst_path = dst.join(entry_name);
        let dst_entry_metadata = crate::walk::run_metadata_probed(
            congestion::Side::Destination,
            congestion::MetadataOp::Stat,
            tokio::fs::symlink_metadata(&dst_path),
        )
        .await
        .with_context(|| format!("failed reading metadata from {:?}", &dst_path))?;
        let dst_obj_type = obj_type(&dst_entry_metadata);
        if settings.expand_missing && dst_entry_metadata.is_dir() {
            // record the directory and everything below it as SrcMissing; this runs
            // inline (awaited here), not as a spawned task
            match expand_missing_tree(
                prog_track,
                &dst_path,
                &src.join(entry_name),
                dest_root,
                CompareResult::SrcMissing,
                log,
                settings,
            )
            .await
            {
                Ok(expanded) => cmp_summary = cmp_summary + expanded,
                Err(error) => {
                    tracing::error!(
                        "expand_missing_tree: {:?} failed with: {:#}",
                        &dst_path,
                        &error
                    );
                    errors.push(error);
                    if settings.fail_early {
                        // unwrap is safe: we just pushed an error
                        return Err(errors.into_error().unwrap());
                    }
                }
            }
        } else {
            if dst_entry_metadata.is_file() {
                cmp_summary.dst_bytes += dst_entry_metadata.len();
            }
            cmp_summary.mismatch[dst_obj_type][CompareResult::SrcMissing] += 1;
            log.log_mismatch(
                CompareResult::SrcMissing,
                None,
                &src.join(entry_name),
                Some(dst_obj_type),
                &dst_path,
            )
            .await?;
        }
    }
    // unfortunately ReadDir is opening file-descriptors and there's not a good way to limit this,
    // one thing we CAN do however is to drop it as soon as we're done with it
    drop(dst_entries);
    while let Some(res) = join_set.join_next().await {
        // `res?` propagates a task join failure immediately; the inner result is the
        // child's own outcome
        match res? {
            Ok(summary) => cmp_summary = cmp_summary + summary,
            Err(error) => {
                tracing::error!("cmp: {:?} vs {:?} failed with: {:#}", src, dst, &error);
                errors.push(error);
                if settings.fail_early {
                    // stop waiting for the remaining children
                    break;
                }
            }
        }
    }
    if let Some(err) = errors.into_error() {
        return Err(err);
    }
    Ok(cmp_summary)
}
835
836#[cfg(test)]
837mod cmp_tests {
838    use crate::copy;
839    use crate::preserve;
840    use crate::testutils;
841    use enum_map::enum_map;
842    use tracing_test::traced_test;
843
844    use super::*;
845
846    static PROGRESS: std::sync::LazyLock<progress::Progress> =
847        std::sync::LazyLock::new(progress::Progress::new);
848    static NO_PRESERVE_SETTINGS: std::sync::LazyLock<preserve::Settings> =
849        std::sync::LazyLock::new(preserve::preserve_none);
850    static DO_PRESERVE_SETTINGS: std::sync::LazyLock<preserve::Settings> =
851        std::sync::LazyLock::new(preserve::preserve_all);
852
853    async fn setup_test_dirs(preserve: bool) -> Result<std::path::PathBuf> {
854        let tmp_dir = testutils::setup_test_dir().await?;
855        let test_path = tmp_dir.as_path();
856        copy::copy(
857            &PROGRESS,
858            &test_path.join("foo"),
859            &test_path.join("bar"),
860            &copy::Settings {
861                dereference: false,
862                fail_early: false,
863                overwrite: false,
864                overwrite_compare: filecmp::MetadataCmpSettings {
865                    size: true,
866                    mtime: true,
867                    ..Default::default()
868                },
869                overwrite_filter: None,
870                ignore_existing: false,
871                chunk_size: 0,
872                skip_specials: false,
873                remote_copy_buffer_size: 0,
874                filter: None,
875                dry_run: None,
876            },
877            if preserve {
878                &DO_PRESERVE_SETTINGS
879            } else {
880                &NO_PRESERVE_SETTINGS
881            },
882            false,
883        )
884        .await?;
885        Ok(tmp_dir)
886    }
887
888    async fn truncate_file(path: &str) -> Result<()> {
889        let file = tokio::fs::File::create(path).await?;
890        file.set_len(0).await?;
891        Ok(())
892    }
893
894    #[tokio::test]
895    #[traced_test]
896    async fn check_basic_cmp() -> Result<()> {
897        let tmp_dir = setup_test_dirs(true).await?;
898        // drop 1 file from src
899        tokio::fs::remove_file(&tmp_dir.join("foo").join("bar").join("1.txt")).await?;
900        // sleep to ensure mtime is different, this acts as a poor-mans barrier
901        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
902        // modify 1 file in dst
903        truncate_file(
904            tmp_dir
905                .join("bar")
906                .join("baz")
907                .join("4.txt")
908                .to_str()
909                .unwrap(),
910        )
911        .await?;
912        // drop 1 (other) file from dst
913        tokio::fs::remove_file(&tmp_dir.join("bar").join("bar").join("2.txt")).await?;
914        // create one more file in dst -- this will also modify the mtime of the directory
915        tokio::fs::File::create(&tmp_dir.join("bar").join("baz").join("7.txt")).await?;
916        let compare_settings = Settings {
917            fail_early: false,
918            exit_early: false,
919            expand_missing: false,
920            compare: enum_map! {
921                ObjType::File => filecmp::MetadataCmpSettings {
922                    size: true,
923                    mtime: true,
924                    ..Default::default()
925                },
926                ObjType::Dir => filecmp::MetadataCmpSettings {
927                    mtime: true,
928                    ..Default::default()
929                },
930                ObjType::Symlink => filecmp::MetadataCmpSettings {
931                    mtime: true,
932                    ..Default::default()
933                },
934                ObjType::Other => filecmp::MetadataCmpSettings {
935                    mtime: true,
936                    ..Default::default()
937                },
938            },
939            filter: None,
940        };
941        let summary = cmp(
942            &PROGRESS,
943            &tmp_dir.join("foo"),
944            &tmp_dir.join("bar"),
945            &LogWriter::new(
946                Some(tmp_dir.join("cmp.log").as_path()),
947                false,
948                OutputFormat::Text,
949            )
950            .await?,
951            &compare_settings,
952        )
953        .await?;
954        let mismatch: Mismatch = enum_map! {
955            ObjType::File => enum_map! {
956                CompareResult::Different => 1,
957                CompareResult::Same => 2,
958                CompareResult::SrcMissing => 2,
959                CompareResult::DstMissing => 1,
960            },
961            ObjType::Dir => enum_map! {
962                CompareResult::Different => 2,
963                CompareResult::Same => 1,
964                CompareResult::SrcMissing => 0,
965                CompareResult::DstMissing => 0,
966            },
967            ObjType::Symlink => enum_map! {
968                CompareResult::Different => 0,
969                CompareResult::Same => 2,
970                CompareResult::SrcMissing => 0,
971                CompareResult::DstMissing => 0,
972            },
973            ObjType::Other => enum_map! {
974                CompareResult::Different => 0,
975                CompareResult::Same => 0,
976                CompareResult::SrcMissing => 0,
977                CompareResult::DstMissing => 0,
978            },
979        };
980        assert_eq!(summary.mismatch, mismatch);
981        // src has 4 regular files of 1 byte each (0.txt, bar/2.txt, bar/3.txt, baz/4.txt)
982        assert_eq!(summary.src_bytes, 4);
983        // dst has: 0.txt(1B), bar/1.txt(1B, SrcMissing), bar/3.txt(1B), baz/4.txt(0B, truncated), baz/7.txt(0B, SrcMissing)
984        assert_eq!(summary.dst_bytes, 3);
985        Ok(())
986    }
987
988    #[tokio::test]
989    #[traced_test]
990    async fn cmp_with_filter_excludes_files() -> Result<()> {
991        let tmp_dir = setup_test_dirs(true).await?;
992        // setup: src=foo, dst=bar (identical at this point)
993        // add a file to dst that would be reported as SrcMissing
994        tokio::fs::write(&tmp_dir.join("bar").join("extra.txt"), "extra").await?;
995        // without filter, should report extra.txt as SrcMissing
996        let compare_settings_no_filter = Settings {
997            fail_early: false,
998            exit_early: false,
999            expand_missing: false,
1000            compare: enum_map! {
1001                ObjType::File => filecmp::MetadataCmpSettings {
1002                    size: true,
1003                    mtime: true,
1004                    ..Default::default()
1005                },
1006                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1007                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1008                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1009            },
1010            filter: None,
1011        };
1012        let summary = cmp(
1013            &PROGRESS,
1014            &tmp_dir.join("foo"),
1015            &tmp_dir.join("bar"),
1016            &LogWriter::silent().await?,
1017            &compare_settings_no_filter,
1018        )
1019        .await?;
1020        assert_eq!(
1021            summary.mismatch[ObjType::File][CompareResult::SrcMissing],
1022            1
1023        );
1024        // with filter excluding extra.txt, should not report it
1025        let mut filter = crate::filter::FilterSettings::new();
1026        filter.add_exclude("extra.txt")?;
1027        let compare_settings_with_filter = Settings {
1028            fail_early: false,
1029            exit_early: false,
1030            expand_missing: false,
1031            compare: enum_map! {
1032                ObjType::File => filecmp::MetadataCmpSettings {
1033                    size: true,
1034                    mtime: true,
1035                    ..Default::default()
1036                },
1037                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1038                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1039                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1040            },
1041            filter: Some(filter),
1042        };
1043        let summary = cmp(
1044            &PROGRESS,
1045            &tmp_dir.join("foo"),
1046            &tmp_dir.join("bar"),
1047            &LogWriter::silent().await?,
1048            &compare_settings_with_filter,
1049        )
1050        .await?;
1051        assert_eq!(
1052            summary.mismatch[ObjType::File][CompareResult::SrcMissing],
1053            0
1054        );
1055        Ok(())
1056    }
1057
1058    #[tokio::test]
1059    #[traced_test]
1060    async fn cmp_with_include_only_compares_matching() -> Result<()> {
1061        let tmp_dir = setup_test_dirs(true).await?;
1062        // setup: src=foo, dst=bar (identical at this point)
1063        // modify a file that won't be included
1064        tokio::fs::write(&tmp_dir.join("bar").join("bar").join("1.txt"), "modified").await?;
1065        // with include pattern for only *.rs files, the .txt modification shouldn't appear
1066        let mut filter = crate::filter::FilterSettings::new();
1067        filter.add_include("*.rs")?;
1068        let compare_settings = Settings {
1069            fail_early: false,
1070            exit_early: false,
1071            expand_missing: false,
1072            compare: enum_map! {
1073                ObjType::File => filecmp::MetadataCmpSettings {
1074                    size: true,
1075                    mtime: true,
1076                    ..Default::default()
1077                },
1078                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1079                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1080                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1081            },
1082            filter: Some(filter),
1083        };
1084        let summary = cmp(
1085            &PROGRESS,
1086            &tmp_dir.join("foo"),
1087            &tmp_dir.join("bar"),
1088            &LogWriter::silent().await?,
1089            &compare_settings,
1090        )
1091        .await?;
1092        // no differences should be reported since all .txt files are excluded
1093        assert_eq!(summary.mismatch[ObjType::File][CompareResult::Different], 0);
1094        assert_eq!(summary.mismatch[ObjType::File][CompareResult::Same], 0);
1095        assert_eq!(
1096            summary.mismatch[ObjType::File][CompareResult::SrcMissing],
1097            0
1098        );
1099        assert_eq!(
1100            summary.mismatch[ObjType::File][CompareResult::DstMissing],
1101            0
1102        );
1103        Ok(())
1104    }
1105
1106    #[tokio::test]
1107    #[traced_test]
1108    async fn cmp_with_path_pattern_filters_nested() -> Result<()> {
1109        // test that path-based patterns like "bar/*.txt" work correctly when recursing
1110        // this verifies source_root tracking is working properly
1111        let tmp_dir = setup_test_dirs(true).await?;
1112        // test structure:
1113        // foo/bar/1.txt, foo/bar/2.txt, foo/bar/3.txt
1114        // foo/baz/4.txt, foo/baz/5.txt (symlink), foo/baz/6.txt (symlink)
1115        // filter: only include bar/*.txt
1116        let mut filter = crate::filter::FilterSettings::new();
1117        filter.add_include("bar/*.txt")?;
1118        let compare_settings = Settings {
1119            fail_early: false,
1120            exit_early: false,
1121            expand_missing: false,
1122            compare: enum_map! {
1123                ObjType::File => filecmp::MetadataCmpSettings {
1124                    size: true,
1125                    ..Default::default()
1126                },
1127                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1128                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1129                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1130            },
1131            filter: Some(filter),
1132        };
1133        let summary = cmp(
1134            &PROGRESS,
1135            &tmp_dir.join("foo"),
1136            &tmp_dir.join("bar"),
1137            &LogWriter::silent().await?,
1138            &compare_settings,
1139        )
1140        .await?;
1141        // should only compare files in bar/ subdirectory (3 files: 1.txt, 2.txt, 3.txt)
1142        // all should be "Same" since we copied foo to bar earlier
1143        assert_eq!(
1144            summary.mismatch[ObjType::File][CompareResult::Same],
1145            3,
1146            "should have 3 same files from bar/*.txt pattern"
1147        );
1148        // files in baz/ should not be compared (filtered out)
1149        // 0.txt at root should not be compared
1150        assert_eq!(summary.mismatch[ObjType::File][CompareResult::Different], 0);
1151        Ok(())
1152    }
1153
1154    #[tokio::test]
1155    #[traced_test]
1156    async fn cmp_filter_applies_to_root_file() -> Result<()> {
1157        // test that filters apply to the root item itself
1158        let tmp_dir = testutils::create_temp_dir().await?;
1159        // create two different files
1160        tokio::fs::write(tmp_dir.join("test.txt"), "content1").await?;
1161        tokio::fs::write(tmp_dir.join("test2.txt"), "content2").await?;
1162        // filter: only include *.rs files
1163        let mut filter = crate::filter::FilterSettings::new();
1164        filter.add_include("*.rs")?;
1165        let compare_settings = Settings {
1166            fail_early: false,
1167            exit_early: false,
1168            expand_missing: false,
1169            compare: enum_map! {
1170                ObjType::File => filecmp::MetadataCmpSettings {
1171                    size: true,
1172                    ..Default::default()
1173                },
1174                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1175                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1176                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1177            },
1178            filter: Some(filter),
1179        };
1180        // compare test.txt vs test2.txt - should be filtered out (not *.rs)
1181        let summary = cmp(
1182            &PROGRESS,
1183            &tmp_dir.join("test.txt"),
1184            &tmp_dir.join("test2.txt"),
1185            &LogWriter::silent().await?,
1186            &compare_settings,
1187        )
1188        .await?;
1189        // should return empty summary since root file is filtered
1190        assert_eq!(summary.mismatch[ObjType::File][CompareResult::Same], 0);
1191        assert_eq!(summary.mismatch[ObjType::File][CompareResult::Different], 0);
1192        Ok(())
1193    }
1194
1195    #[tokio::test]
1196    #[traced_test]
1197    async fn cmp_filter_excludes_root_directory() -> Result<()> {
1198        // test that filters apply to root directories
1199        let tmp_dir = testutils::setup_test_dir().await?;
1200        // filter: exclude directories named "foo"
1201        let mut filter = crate::filter::FilterSettings::new();
1202        filter.add_exclude("foo")?;
1203        let compare_settings = Settings {
1204            fail_early: false,
1205            exit_early: false,
1206            expand_missing: false,
1207            compare: enum_map! {
1208                ObjType::File => filecmp::MetadataCmpSettings::default(),
1209                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1210                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1211                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1212            },
1213            filter: Some(filter),
1214        };
1215        // compare foo vs bar - foo should be filtered out
1216        let summary = cmp(
1217            &PROGRESS,
1218            &tmp_dir.join("foo"),
1219            &tmp_dir.join("bar"),
1220            &LogWriter::silent().await?,
1221            &compare_settings,
1222        )
1223        .await?;
1224        // should return empty summary since root dir is excluded
1225        assert_eq!(summary.mismatch[ObjType::Dir][CompareResult::Same], 0);
1226        assert_eq!(summary.mismatch[ObjType::Dir][CompareResult::Different], 0);
1227        assert_eq!(summary.mismatch[ObjType::File][CompareResult::Same], 0);
1228        Ok(())
1229    }
1230
1231    #[tokio::test]
1232    #[traced_test]
1233    async fn cmp_combined_include_exclude_patterns() -> Result<()> {
1234        let tmp_dir = setup_test_dirs(true).await?;
1235        // include all .txt files, but exclude bar/2.txt specifically
1236        let mut filter = crate::filter::FilterSettings::new();
1237        filter.add_include("**/*.txt")?;
1238        filter.add_exclude("bar/2.txt")?;
1239        let compare_settings = Settings {
1240            fail_early: false,
1241            exit_early: false,
1242            expand_missing: false,
1243            compare: enum_map! {
1244                ObjType::File => filecmp::MetadataCmpSettings {
1245                    size: true,
1246                    ..Default::default()
1247                },
1248                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1249                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1250                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1251            },
1252            filter: Some(filter),
1253        };
1254        let summary = cmp(
1255            &PROGRESS,
1256            &tmp_dir.join("foo"),
1257            &tmp_dir.join("bar"),
1258            &LogWriter::silent().await?,
1259            &compare_settings,
1260        )
1261        .await?;
1262        // should compare: 0.txt, bar/1.txt, bar/3.txt, baz/4.txt = 4 files (same)
1263        // should skip: bar/2.txt (excluded by pattern), 5.txt and 6.txt (symlinks, no match for *.txt in src dir) = 1 file + 2 symlinks
1264        // note: the pattern **/*.txt only matches files with .txt extension, but 5.txt and 6.txt in baz are symlinks
1265        assert_eq!(
1266            summary.mismatch[ObjType::File][CompareResult::Same],
1267            4,
1268            "should compare 4 .txt files as same"
1269        );
1270        // bar/2.txt is skipped for both src and dst traversal = 2 skipped
1271        assert_eq!(
1272            summary.skipped[ObjType::File],
1273            2,
1274            "should skip 2 files (bar/2.txt on src and dst)"
1275        );
1276        Ok(())
1277    }
1278
1279    #[tokio::test]
1280    #[traced_test]
1281    async fn cmp_skipped_counts_comprehensive() -> Result<()> {
1282        let tmp_dir = setup_test_dirs(true).await?;
1283        // exclude bar/ directory entirely
1284        let mut filter = crate::filter::FilterSettings::new();
1285        filter.add_exclude("bar/")?;
1286        let compare_settings = Settings {
1287            fail_early: false,
1288            exit_early: false,
1289            expand_missing: false,
1290            compare: enum_map! {
1291                ObjType::File => filecmp::MetadataCmpSettings {
1292                    size: true,
1293                    ..Default::default()
1294                },
1295                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1296                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1297                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1298            },
1299            filter: Some(filter),
1300        };
1301        let summary = cmp(
1302            &PROGRESS,
1303            &tmp_dir.join("foo"),
1304            &tmp_dir.join("bar"),
1305            &LogWriter::silent().await?,
1306            &compare_settings,
1307        )
1308        .await?;
1309        // compared: 0.txt (same), baz/4.txt (same) = 2 files
1310        // compared: baz/5.txt symlink (same), baz/6.txt symlink (same) = 2 symlinks
1311        // skipped: bar directory in src and dst = 2 dirs (cmp traverses both)
1312        assert_eq!(
1313            summary.mismatch[ObjType::File][CompareResult::Same],
1314            2,
1315            "should compare 2 files as same"
1316        );
1317        assert_eq!(
1318            summary.mismatch[ObjType::Symlink][CompareResult::Same],
1319            2,
1320            "should compare 2 symlinks as same"
1321        );
1322        assert_eq!(
1323            summary.skipped[ObjType::Dir],
1324            2,
1325            "should skip 2 directories (bar in src + bar in dst)"
1326        );
1327        Ok(())
1328    }
1329
1330    #[tokio::test]
1331    #[traced_test]
1332    async fn expand_missing_dst_reports_all_entries() -> Result<()> {
1333        let tmp_dir = setup_test_dirs(true).await?;
1334        // remove bar/bar directory entirely from dst
1335        tokio::fs::remove_dir_all(&tmp_dir.join("bar").join("bar")).await?;
1336        let compare_settings = Settings {
1337            fail_early: false,
1338            exit_early: false,
1339            expand_missing: true,
1340            compare: enum_map! {
1341                ObjType::File => filecmp::MetadataCmpSettings::default(),
1342                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1343                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1344                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1345            },
1346            filter: None,
1347        };
1348        let summary = cmp(
1349            &PROGRESS,
1350            &tmp_dir.join("foo"),
1351            &tmp_dir.join("bar"),
1352            &LogWriter::silent().await?,
1353            &compare_settings,
1354        )
1355        .await?;
1356        // bar/bar dir has: bar/ (1 dir) + 1.txt, 2.txt, 3.txt (3 files)
1357        assert_eq!(
1358            summary.mismatch[ObjType::Dir][CompareResult::DstMissing],
1359            1,
1360            "should report 1 directory as DstMissing"
1361        );
1362        assert_eq!(
1363            summary.mismatch[ObjType::File][CompareResult::DstMissing],
1364            3,
1365            "should report 3 files as DstMissing"
1366        );
1367        Ok(())
1368    }
1369
1370    #[tokio::test]
1371    #[traced_test]
1372    async fn expand_missing_src_reports_all_entries() -> Result<()> {
1373        let tmp_dir = setup_test_dirs(true).await?;
1374        // create a new subdir in dst with files
1375        let newdir = tmp_dir.join("bar").join("newdir");
1376        tokio::fs::create_dir(&newdir).await?;
1377        tokio::fs::write(newdir.join("a.txt"), "a").await?;
1378        tokio::fs::write(newdir.join("b.txt"), "b").await?;
1379        let compare_settings = Settings {
1380            fail_early: false,
1381            exit_early: false,
1382            expand_missing: true,
1383            compare: enum_map! {
1384                ObjType::File => filecmp::MetadataCmpSettings::default(),
1385                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1386                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1387                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1388            },
1389            filter: None,
1390        };
1391        let summary = cmp(
1392            &PROGRESS,
1393            &tmp_dir.join("foo"),
1394            &tmp_dir.join("bar"),
1395            &LogWriter::silent().await?,
1396            &compare_settings,
1397        )
1398        .await?;
1399        assert_eq!(
1400            summary.mismatch[ObjType::Dir][CompareResult::SrcMissing],
1401            1,
1402            "should report 1 directory as SrcMissing"
1403        );
1404        assert_eq!(
1405            summary.mismatch[ObjType::File][CompareResult::SrcMissing],
1406            2,
1407            "should report 2 files as SrcMissing"
1408        );
1409        Ok(())
1410    }
1411
1412    #[tokio::test]
1413    #[traced_test]
1414    async fn expand_missing_dst_deeply_nested() -> Result<()> {
1415        // verify expansion recurses through multiple directory levels
1416        let tmp_dir = testutils::create_temp_dir().await?;
1417        let src = tmp_dir.join("src");
1418        let dst = tmp_dir.join("dst");
1419        tokio::fs::create_dir(&src).await?;
1420        tokio::fs::create_dir(&dst).await?;
1421        // create src/a/b/c/d.txt -- 3 dirs deep
1422        let deep = src.join("a").join("b").join("c");
1423        tokio::fs::create_dir_all(&deep).await?;
1424        tokio::fs::write(deep.join("d.txt"), "d").await?;
1425        // also add a sibling file at an intermediate level
1426        tokio::fs::write(src.join("a").join("b").join("mid.txt"), "m").await?;
1427        // dst exists but is empty -- everything in src is DstMissing
1428        let compare_settings = Settings {
1429            fail_early: false,
1430            exit_early: false,
1431            expand_missing: true,
1432            compare: enum_map! {
1433                ObjType::File => filecmp::MetadataCmpSettings::default(),
1434                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1435                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1436                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1437            },
1438            filter: None,
1439        };
1440        let summary = cmp(
1441            &PROGRESS,
1442            &src,
1443            &dst,
1444            &LogWriter::silent().await?,
1445            &compare_settings,
1446        )
1447        .await?;
1448        // dirs: a, a/b, a/b/c = 3 DstMissing dirs
1449        assert_eq!(
1450            summary.mismatch[ObjType::Dir][CompareResult::DstMissing],
1451            3,
1452            "should report 3 nested directories as DstMissing"
1453        );
1454        // files: a/b/c/d.txt, a/b/mid.txt = 2 DstMissing files
1455        assert_eq!(
1456            summary.mismatch[ObjType::File][CompareResult::DstMissing],
1457            2,
1458            "should report 2 files as DstMissing"
1459        );
1460        // src_bytes: d.txt(1) + mid.txt(1) = 2
1461        assert_eq!(
1462            summary.src_bytes, 2,
1463            "should track bytes for expanded files"
1464        );
1465        Ok(())
1466    }
1467
1468    #[tokio::test]
1469    #[traced_test]
1470    async fn expand_missing_src_deeply_nested() -> Result<()> {
1471        // verify expansion recurses for SrcMissing through multiple levels
1472        let tmp_dir = testutils::create_temp_dir().await?;
1473        let src = tmp_dir.join("src");
1474        let dst = tmp_dir.join("dst");
1475        tokio::fs::create_dir(&src).await?;
1476        tokio::fs::create_dir(&dst).await?;
1477        // create dst/x/y/z.txt -- dirs only in dst
1478        let deep = dst.join("x").join("y");
1479        tokio::fs::create_dir_all(&deep).await?;
1480        tokio::fs::write(deep.join("z.txt"), "zz").await?;
1481        let compare_settings = Settings {
1482            fail_early: false,
1483            exit_early: false,
1484            expand_missing: true,
1485            compare: enum_map! {
1486                ObjType::File => filecmp::MetadataCmpSettings::default(),
1487                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1488                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1489                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1490            },
1491            filter: None,
1492        };
1493        let summary = cmp(
1494            &PROGRESS,
1495            &src,
1496            &dst,
1497            &LogWriter::silent().await?,
1498            &compare_settings,
1499        )
1500        .await?;
1501        // dirs: x, x/y = 2 SrcMissing dirs
1502        assert_eq!(
1503            summary.mismatch[ObjType::Dir][CompareResult::SrcMissing],
1504            2,
1505            "should report 2 nested directories as SrcMissing"
1506        );
1507        // files: x/y/z.txt = 1 SrcMissing file
1508        assert_eq!(
1509            summary.mismatch[ObjType::File][CompareResult::SrcMissing],
1510            1,
1511            "should report 1 file as SrcMissing"
1512        );
1513        // dst_bytes: z.txt(2)
1514        assert_eq!(
1515            summary.dst_bytes, 2,
1516            "should track bytes for expanded files"
1517        );
1518        Ok(())
1519    }
1520
1521    #[tokio::test]
1522    #[traced_test]
1523    async fn expand_missing_with_exclude_filter() -> Result<()> {
1524        // verify that filters are applied during expansion. exclude *.log files
1525        // from the missing subtree
1526        let tmp_dir = testutils::create_temp_dir().await?;
1527        let src = tmp_dir.join("src");
1528        let dst = tmp_dir.join("dst");
1529        tokio::fs::create_dir(&src).await?;
1530        tokio::fs::create_dir(&dst).await?;
1531        // src/missing_dir/ has mixed files
1532        let missing = src.join("missing_dir");
1533        tokio::fs::create_dir(&missing).await?;
1534        tokio::fs::write(missing.join("keep.txt"), "k").await?;
1535        tokio::fs::write(missing.join("skip.log"), "s").await?;
1536        tokio::fs::write(missing.join("also_keep.txt"), "a").await?;
1537        let mut filter = crate::filter::FilterSettings::new();
1538        filter.add_exclude("*.log")?;
1539        let compare_settings = Settings {
1540            fail_early: false,
1541            exit_early: false,
1542            expand_missing: true,
1543            compare: enum_map! {
1544                ObjType::File => filecmp::MetadataCmpSettings::default(),
1545                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1546                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1547                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1548            },
1549            filter: Some(filter),
1550        };
1551        let summary = cmp(
1552            &PROGRESS,
1553            &src,
1554            &dst,
1555            &LogWriter::silent().await?,
1556            &compare_settings,
1557        )
1558        .await?;
1559        // missing_dir itself = 1 DstMissing dir
1560        assert_eq!(summary.mismatch[ObjType::Dir][CompareResult::DstMissing], 1,);
1561        // only keep.txt and also_keep.txt should be reported. skip.log is filtered
1562        assert_eq!(
1563            summary.mismatch[ObjType::File][CompareResult::DstMissing],
1564            2,
1565            "should report only non-excluded files as DstMissing"
1566        );
1567        // skip.log should be counted as skipped
1568        assert_eq!(
1569            summary.skipped[ObjType::File],
1570            1,
1571            "should count excluded file as skipped"
1572        );
1573        Ok(())
1574    }
1575
1576    #[tokio::test]
1577    #[traced_test]
1578    async fn expand_missing_with_include_filter() -> Result<()> {
1579        // verify that include filters restrict which children are reported during expansion
1580        let tmp_dir = testutils::create_temp_dir().await?;
1581        let src = tmp_dir.join("src");
1582        let dst = tmp_dir.join("dst");
1583        tokio::fs::create_dir(&src).await?;
1584        tokio::fs::create_dir(&dst).await?;
1585        // src/data/ has a mix of file types
1586        let data = src.join("data");
1587        tokio::fs::create_dir(&data).await?;
1588        tokio::fs::write(data.join("a.rs"), "fn main() {}").await?;
1589        tokio::fs::write(data.join("b.txt"), "hello").await?;
1590        tokio::fs::write(data.join("c.rs"), "fn test() {}").await?;
1591        let mut filter = crate::filter::FilterSettings::new();
1592        filter.add_include("**/*.rs")?;
1593        let compare_settings = Settings {
1594            fail_early: false,
1595            exit_early: false,
1596            expand_missing: true,
1597            compare: enum_map! {
1598                ObjType::File => filecmp::MetadataCmpSettings::default(),
1599                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1600                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1601                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1602            },
1603            filter: Some(filter),
1604        };
1605        let summary = cmp(
1606            &PROGRESS,
1607            &src,
1608            &dst,
1609            &LogWriter::silent().await?,
1610            &compare_settings,
1611        )
1612        .await?;
1613        // data dir = 1 DstMissing dir
1614        assert_eq!(summary.mismatch[ObjType::Dir][CompareResult::DstMissing], 1,);
1615        // only a.rs and c.rs should be reported. b.txt is filtered out
1616        assert_eq!(
1617            summary.mismatch[ObjType::File][CompareResult::DstMissing],
1618            2,
1619            "should report only included files as DstMissing"
1620        );
1621        Ok(())
1622    }
1623
1624    #[tokio::test]
1625    #[traced_test]
1626    async fn expand_missing_with_nested_path_filter() -> Result<()> {
1627        // verify path-based patterns work correctly during expansion.
1628        // only include files under a specific nested path
1629        let tmp_dir = testutils::create_temp_dir().await?;
1630        let src = tmp_dir.join("src");
1631        let dst = tmp_dir.join("dst");
1632        tokio::fs::create_dir(&src).await?;
1633        tokio::fs::create_dir(&dst).await?;
1634        // src/top/ has two subdirs: keep/ and skip/
1635        let top = src.join("top");
1636        let keep = top.join("keep");
1637        let skip = top.join("skip");
1638        tokio::fs::create_dir_all(&keep).await?;
1639        tokio::fs::create_dir_all(&skip).await?;
1640        tokio::fs::write(keep.join("1.txt"), "1").await?;
1641        tokio::fs::write(keep.join("2.txt"), "2").await?;
1642        tokio::fs::write(skip.join("3.txt"), "3").await?;
1643        let mut filter = crate::filter::FilterSettings::new();
1644        filter.add_include("top/keep/**")?;
1645        let compare_settings = Settings {
1646            fail_early: false,
1647            exit_early: false,
1648            expand_missing: true,
1649            compare: enum_map! {
1650                ObjType::File => filecmp::MetadataCmpSettings::default(),
1651                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1652                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1653                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1654            },
1655            filter: Some(filter),
1656        };
1657        let summary = cmp(
1658            &PROGRESS,
1659            &src,
1660            &dst,
1661            &LogWriter::silent().await?,
1662            &compare_settings,
1663        )
1664        .await?;
1665        // only keep/ subtree: keep dir(1) + top dir(1) = 2 dirs. skip dir is filtered
1666        assert_eq!(
1667            summary.mismatch[ObjType::Dir][CompareResult::DstMissing],
1668            2,
1669            "should report top and keep dirs as DstMissing"
1670        );
1671        // only 1.txt and 2.txt from keep/
1672        assert_eq!(
1673            summary.mismatch[ObjType::File][CompareResult::DstMissing],
1674            2,
1675            "should report only files under keep/ as DstMissing"
1676        );
1677        Ok(())
1678    }
1679
1680    #[tokio::test]
1681    #[traced_test]
1682    async fn expand_missing_false_preserves_original_behavior() -> Result<()> {
1683        let tmp_dir = setup_test_dirs(true).await?;
1684        // remove bar/bar directory entirely from dst
1685        tokio::fs::remove_dir_all(&tmp_dir.join("bar").join("bar")).await?;
1686        let compare_settings = Settings {
1687            fail_early: false,
1688            exit_early: false,
1689            expand_missing: false,
1690            compare: enum_map! {
1691                ObjType::File => filecmp::MetadataCmpSettings::default(),
1692                ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1693                ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1694                ObjType::Other => filecmp::MetadataCmpSettings::default(),
1695            },
1696            filter: None,
1697        };
1698        let summary = cmp(
1699            &PROGRESS,
1700            &tmp_dir.join("foo"),
1701            &tmp_dir.join("bar"),
1702            &LogWriter::silent().await?,
1703            &compare_settings,
1704        )
1705        .await?;
1706        // without expand_missing, only the top-level dir is reported
1707        assert_eq!(
1708            summary.mismatch[ObjType::Dir][CompareResult::DstMissing],
1709            1,
1710            "should report only 1 directory as DstMissing"
1711        );
1712        assert_eq!(
1713            summary.mismatch[ObjType::File][CompareResult::DstMissing],
1714            0,
1715            "should not report individual files as DstMissing"
1716        );
1717        Ok(())
1718    }
1719
1720    #[test]
1721    fn path_to_json_string_utf8() {
1722        let path = std::path::Path::new("/foo/bar/baz.txt");
1723        assert_eq!(path_to_json_string(path), "/foo/bar/baz.txt");
1724    }
1725
1726    #[test]
1727    fn path_to_json_string_non_utf8() {
1728        use std::ffi::OsStr;
1729        use std::os::unix::ffi::OsStrExt;
1730        // embed 0xFF byte in the middle
1731        let os_str = OsStr::from_bytes(b"/tmp/bad\xffname.txt");
1732        let path = std::path::Path::new(os_str);
1733        assert_eq!(path_to_json_string(path), "/tmp/bad\\xffname.txt");
1734    }
1735
1736    #[test]
1737    fn path_to_json_string_multiple_bad_bytes() {
1738        use std::ffi::OsStr;
1739        use std::os::unix::ffi::OsStrExt;
1740        let os_str = OsStr::from_bytes(b"\x80/ok/\xfe\xff/end");
1741        let path = std::path::Path::new(os_str);
1742        assert_eq!(path_to_json_string(path), "\\x80/ok/\\xfe\\xff/end");
1743    }
1744
1745    #[test]
1746    fn path_to_json_string_escapes_backslashes() {
1747        // a path with a literal backslash must be escaped so it doesn't
1748        // collide with \xHH byte escapes
1749        let path = std::path::Path::new("/tmp/bad\\xffname.txt");
1750        assert_eq!(path_to_json_string(path), "/tmp/bad\\\\xffname.txt");
1751    }
1752
1753    #[test]
1754    fn path_to_json_string_no_collision() {
1755        use std::ffi::OsStr;
1756        use std::os::unix::ffi::OsStrExt;
1757        // literal backslash-x-f-f in the filename
1758        let literal = std::path::Path::new("/tmp/bad\\xffname.txt");
1759        // actual 0xFF byte in the filename
1760        let raw = std::path::Path::new(OsStr::from_bytes(b"/tmp/bad\xffname.txt"));
1761        // these must produce different output
1762        assert_ne!(path_to_json_string(literal), path_to_json_string(raw));
1763    }
1764
1765    /// Stress tests exercising max-open-files saturation during cmp.
1766    mod max_open_files_tests {
1767        use super::*;
1768        use anyhow::Context;
1769
1770        /// deep + wide cmp: directory tree deeper than the open-files limit, with files
1771        /// at every level. verifies no deadlock occurs (directories don't consume permits).
1772        #[tokio::test]
1773        #[traced_test]
1774        async fn deep_tree_no_deadlock_under_open_files_saturation() -> Result<()> {
1775            let tmp_dir = testutils::create_temp_dir().await?;
1776            let src = tmp_dir.join("src");
1777            let dst = tmp_dir.join("dst");
1778            let depth = 20;
1779            let files_per_level = 5;
1780            let limit = 4;
1781            // create matching directory chains in src and dst, deeper than the permit limit
1782            let mut src_dir = src.clone();
1783            let mut dst_dir = dst.clone();
1784            for level in 0..depth {
1785                tokio::fs::create_dir_all(&src_dir).await?;
1786                tokio::fs::create_dir_all(&dst_dir).await?;
1787                for f in 0..files_per_level {
1788                    let name = format!("f{}_{}.txt", level, f);
1789                    let content = format!("L{}F{}", level, f);
1790                    tokio::fs::write(src_dir.join(&name), &content).await?;
1791                    tokio::fs::write(dst_dir.join(&name), &content).await?;
1792                }
1793                src_dir = src_dir.join(format!("d{}", level));
1794                dst_dir = dst_dir.join(format!("d{}", level));
1795            }
1796            throttle::set_max_open_files(limit);
1797            let compare_settings = Settings {
1798                fail_early: false,
1799                exit_early: false,
1800                expand_missing: false,
1801                compare: enum_map::enum_map! {
1802                    ObjType::File => filecmp::MetadataCmpSettings { size: true, ..Default::default() },
1803                    ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1804                    ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1805                    ObjType::Other => filecmp::MetadataCmpSettings::default(),
1806                },
1807                filter: None,
1808            };
1809            let summary = tokio::time::timeout(
1810                std::time::Duration::from_secs(30),
1811                cmp(
1812                    &PROGRESS,
1813                    &src,
1814                    &dst,
1815                    &LogWriter::silent().await?,
1816                    &compare_settings,
1817                ),
1818            )
1819            .await
1820            .context("cmp timed out — possible deadlock")?
1821            .context("cmp failed")?;
1822            assert_eq!(
1823                summary.mismatch[ObjType::File][CompareResult::Same],
1824                depth * files_per_level
1825            );
1826            Ok(())
1827        }
1828
1829        /// expand_missing under saturation: dst is empty, src is a deep tree.
1830        /// verifies expand_missing_tree's recursion bounds tasks under the permit cap.
1831        #[tokio::test]
1832        #[traced_test]
1833        async fn expand_missing_under_open_files_saturation() -> Result<()> {
1834            let tmp_dir = testutils::create_temp_dir().await?;
1835            let src = tmp_dir.join("src");
1836            let dst = tmp_dir.join("dst");
1837            let depth = 10;
1838            let files_per_level = 5;
1839            let limit = 4;
1840            // create a deep tree in src; dst stays empty
1841            let mut dir = src.clone();
1842            for level in 0..depth {
1843                tokio::fs::create_dir_all(&dir).await?;
1844                for f in 0..files_per_level {
1845                    tokio::fs::write(dir.join(format!("f{}_{}.txt", level, f)), "x").await?;
1846                }
1847                dir = dir.join(format!("d{}", level));
1848            }
1849            tokio::fs::create_dir(&dst).await?;
1850            throttle::set_max_open_files(limit);
1851            let compare_settings = Settings {
1852                fail_early: false,
1853                exit_early: false,
1854                expand_missing: true,
1855                compare: enum_map::enum_map! {
1856                    ObjType::File => filecmp::MetadataCmpSettings::default(),
1857                    ObjType::Dir => filecmp::MetadataCmpSettings::default(),
1858                    ObjType::Symlink => filecmp::MetadataCmpSettings::default(),
1859                    ObjType::Other => filecmp::MetadataCmpSettings::default(),
1860                },
1861                filter: None,
1862            };
1863            let summary = tokio::time::timeout(
1864                std::time::Duration::from_secs(30),
1865                cmp(
1866                    &PROGRESS,
1867                    &src,
1868                    &dst,
1869                    &LogWriter::silent().await?,
1870                    &compare_settings,
1871                ),
1872            )
1873            .await
1874            .context("cmp timed out — possible deadlock")?
1875            .context("cmp failed")?;
1876            // every file under src is missing on dst
1877            assert_eq!(
1878                summary.mismatch[ObjType::File][CompareResult::DstMissing],
1879                depth * files_per_level
1880            );
1881            Ok(())
1882        }
1883    }
1884}