common/
cmp.rs

1use anyhow::{anyhow, Context, Result};
2use async_recursion::async_recursion;
3use enum_map::{Enum, EnumMap};
4use tokio::io::AsyncWriteExt;
5use tracing::instrument;
6
7use crate::copy::is_file_type_same;
8use crate::filecmp;
9use crate::progress;
10
11#[derive(Copy, Clone, Debug, Enum)]
12pub enum CompareResult {
13    Same,
14    Different,
15    SrcMissing, // object missing in src but present in dst
16    DstMissing, // same as above but flipped
17}
18
19#[derive(Copy, Clone, Debug, Enum)]
20pub enum ObjType {
21    File,
22    Dir,
23    Symlink,
24    Other, // sockets, block devices, character devices, FIFOs, etc.
25}
26
27pub type ObjSettings = EnumMap<ObjType, filecmp::MetadataCmpSettings>;
28
29#[derive(Debug, Copy, Clone)]
30pub struct Settings {
31    pub compare: ObjSettings,
32    pub fail_early: bool,
33    pub exit_early: bool,
34}
35
36pub type Mismatch = EnumMap<ObjType, EnumMap<CompareResult, u64>>;
37
38#[derive(Default)]
39pub struct Summary {
40    pub mismatch: Mismatch,
41}
42
43impl std::ops::Add for Summary {
44    type Output = Self;
45    fn add(self, other: Self) -> Self {
46        let mut mismatch = self.mismatch;
47        for (obj_type, &cmp_res_map) in &other.mismatch {
48            for (cmp_res, &count) in &cmp_res_map {
49                mismatch[obj_type][cmp_res] += count;
50            }
51        }
52        Self { mismatch }
53    }
54}
55
56impl std::fmt::Display for Summary {
57    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
58        for (obj_type, &cmp_res_map) in &self.mismatch {
59            for (cmp_res, &count) in &cmp_res_map {
60                writeln!(f, "{obj_type:?} {cmp_res:?}: {count}")?;
61            }
62        }
63        Ok(())
64    }
65}
66
67#[derive(Clone)]
68pub struct LogWriter {
69    file: Option<std::sync::Arc<tokio::sync::Mutex<tokio::io::BufWriter<tokio::fs::File>>>>,
70    stdout: Option<std::sync::Arc<tokio::sync::Mutex<tokio::io::BufWriter<tokio::io::Stdout>>>>,
71}
72
73impl std::fmt::Debug for LogWriter {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("LogWriter")
76            .field("file", &self.file.is_some())
77            .field("stdout", &self.stdout.is_some())
78            .finish()
79    }
80}
81
82impl LogWriter {
83    /// Creates a new LogWriter.
84    ///
85    /// If `log_path_opt` is provided, output goes to that file.
86    /// Otherwise, if `use_stdout` is true, output goes to stdout.
87    /// If both are false/None, no output is produced.
88    pub async fn new(log_path_opt: Option<&std::path::Path>, use_stdout: bool) -> Result<Self> {
89        if let Some(log_path) = log_path_opt {
90            let log_file = tokio::fs::OpenOptions::new()
91                .write(true)
92                .create_new(true)
93                .open(log_path)
94                .await
95                .with_context(|| format!("Failed to open log file: {log_path:?}"))?;
96            let log =
97                std::sync::Arc::new(tokio::sync::Mutex::new(tokio::io::BufWriter::new(log_file)));
98            Ok(Self {
99                file: Some(log),
100                stdout: None,
101            })
102        } else if use_stdout {
103            Ok(Self {
104                file: None,
105                stdout: Some(std::sync::Arc::new(tokio::sync::Mutex::new(
106                    tokio::io::BufWriter::new(tokio::io::stdout()),
107                ))),
108            })
109        } else {
110            Ok(Self {
111                file: None,
112                stdout: None,
113            })
114        }
115    }
116
117    pub async fn log_mismatch(
118        &self,
119        cmp_result: CompareResult,
120        src_obj_type: Option<ObjType>,
121        src: &std::path::Path,
122        dst_obj_type: Option<ObjType>,
123        dst: &std::path::Path,
124    ) -> Result<()> {
125        self.write(&format!(
126            "[{cmp_result:?}]\n\t[{src_obj_type:?}]\t{src:?}\n\t[{dst_obj_type:?}]\t{dst:?}\n"
127        ))
128        .await
129    }
130
131    async fn write(&self, msg: &str) -> Result<()> {
132        if let Some(log) = &self.file {
133            let mut log = log.lock().await;
134            log.write_all(msg.as_bytes())
135                .await
136                .context("Failed to write to log file")?;
137        }
138        if let Some(stdout) = &self.stdout {
139            let mut stdout = stdout.lock().await;
140            stdout
141                .write_all(msg.as_bytes())
142                .await
143                .context("Failed to write to stdout")?;
144        }
145        Ok(())
146    }
147
148    pub async fn flush(&self) -> Result<()> {
149        if let Some(log) = &self.file {
150            let mut log = log.lock().await;
151            log.flush().await.context("Failed to flush log file")?;
152        }
153        if let Some(stdout) = &self.stdout {
154            let mut stdout = stdout.lock().await;
155            stdout.flush().await.context("Failed to flush stdout")?;
156        }
157        Ok(())
158    }
159}
160
161fn obj_type(metadata: &std::fs::Metadata) -> ObjType {
162    if metadata.is_file() {
163        ObjType::File
164    } else if metadata.is_dir() {
165        ObjType::Dir
166    } else if metadata.is_symlink() {
167        ObjType::Symlink
168    } else {
169        // sockets, block devices, character devices, FIFOs, etc.
170        ObjType::Other
171    }
172}
173
174#[instrument(skip(prog_track))]
175#[async_recursion]
176pub async fn cmp(
177    prog_track: &'static progress::Progress,
178    src: &std::path::Path,
179    dst: &std::path::Path,
180    log: &LogWriter,
181    settings: &Settings,
182) -> Result<Summary> {
183    let _prog_guard = prog_track.ops.guard();
184    tracing::debug!("reading source metadata");
185    // it is impossible for src not exist other than user passing invalid path (which is an error)
186    let src_metadata = tokio::fs::symlink_metadata(src)
187        .await
188        .with_context(|| format!("failed reading metadata from {:?}", &src))?;
189    let mut cmp_summary = Summary::default();
190    let src_obj_type = obj_type(&src_metadata);
191    let dst_metadata = {
192        match tokio::fs::symlink_metadata(dst).await {
193            Ok(metadata) => metadata,
194            Err(err) => {
195                if err.kind() == std::io::ErrorKind::NotFound {
196                    cmp_summary.mismatch[src_obj_type][CompareResult::DstMissing] += 1;
197                    log.log_mismatch(
198                        CompareResult::DstMissing,
199                        Some(src_obj_type),
200                        src,
201                        None,
202                        dst,
203                    )
204                    .await?;
205                    return Ok(cmp_summary);
206                }
207                return Err(err).context(format!("failed reading metadata from {:?}", &dst));
208            }
209        }
210    };
211    if !is_file_type_same(&src_metadata, &dst_metadata)
212        || !filecmp::metadata_equal(
213            &settings.compare[src_obj_type],
214            &src_metadata,
215            &dst_metadata,
216        )
217    {
218        // we use the src type for the summary attribution
219        cmp_summary.mismatch[src_obj_type][CompareResult::Different] += 1;
220        let dst_obj_type = obj_type(&dst_metadata);
221        log.log_mismatch(
222            CompareResult::Different,
223            Some(src_obj_type),
224            src,
225            Some(dst_obj_type),
226            dst,
227        )
228        .await?;
229        if settings.exit_early {
230            return Ok(cmp_summary);
231        }
232    } else {
233        cmp_summary.mismatch[src_obj_type][CompareResult::Same] += 1;
234    }
235    if !src_metadata.is_dir() || !dst_metadata.is_dir() {
236        // nothing more to do
237        return Ok(cmp_summary);
238    }
239    tracing::debug!("process contents of 'src' directory");
240    let mut src_entries = tokio::fs::read_dir(src)
241        .await
242        .with_context(|| format!("cannot open directory {src:?} for reading"))?;
243    let mut join_set = tokio::task::JoinSet::new();
244    let mut success = true;
245    // create a set of all the files we already processed
246    let mut processed_files = std::collections::HashSet::new();
247    // iterate through src entries and recursively call "cmp" on each one
248    while let Some(src_entry) = src_entries
249        .next_entry()
250        .await
251        .with_context(|| format!("failed traversing directory {:?}", &src))?
252    {
253        // it's better to await the token here so that we throttle the syscalls generated by the
254        // DirEntry call. the ops-throttle will never cause a deadlock (unlike max-open-files limit)
255        // so it's safe to do here.
256        throttle::get_ops_token().await;
257        let entry_path = src_entry.path();
258        let entry_name = entry_path.file_name().unwrap();
259        processed_files.insert(entry_name.to_owned());
260        let dst_path = dst.join(entry_name);
261        let log = log.clone();
262        let settings = *settings;
263        let do_cmp =
264            || async move { cmp(prog_track, &entry_path, &dst_path, &log, &settings).await };
265        join_set.spawn(do_cmp());
266    }
267    // unfortunately ReadDir is opening file-descriptors and there's not a good way to limit this,
268    // one thing we CAN do however is to drop it as soon as we're done with it
269    drop(src_entries);
270    tracing::debug!("process contents of 'dst' directory");
271    let mut dst_entries = tokio::fs::read_dir(dst)
272        .await
273        .with_context(|| format!("cannot open directory {:?} for reading", &dst))?;
274    // iterate through update entries and log each one that's not present in src
275    while let Some(dst_entry) = dst_entries
276        .next_entry()
277        .await
278        .with_context(|| format!("failed traversing directory {:?}", &dst))?
279    {
280        let entry_path = dst_entry.path();
281        let entry_name = entry_path.file_name().unwrap();
282        if processed_files.contains(entry_name) {
283            // we already must have considered this file, skip it
284            continue;
285        }
286        tracing::debug!("found a new entry in the 'dst' directory");
287        let dst_path = dst.join(entry_name);
288        let dst_entry_metadata = tokio::fs::symlink_metadata(&dst_path)
289            .await
290            .with_context(|| format!("failed reading metadata from {:?}", &dst_path))?;
291        let dst_obj_type = obj_type(&dst_entry_metadata);
292        cmp_summary.mismatch[dst_obj_type][CompareResult::SrcMissing] += 1;
293        log.log_mismatch(
294            CompareResult::SrcMissing,
295            None,
296            &src.join(entry_name),
297            Some(dst_obj_type),
298            &dst_path,
299        )
300        .await?;
301    }
302    // unfortunately ReadDir is opening file-descriptors and there's not a good way to limit this,
303    // one thing we CAN do however is to drop it as soon as we're done with it
304    drop(dst_entries);
305    while let Some(res) = join_set.join_next().await {
306        match res? {
307            Ok(summary) => cmp_summary = cmp_summary + summary,
308            Err(error) => {
309                tracing::error!("cmp: {:?} vs {:?} failed with: {:#}", src, dst, &error);
310                if settings.fail_early {
311                    return Err(error);
312                }
313                success = false;
314            }
315        }
316    }
317    if !success {
318        return Err(anyhow!("cmp: {:?} vs {:?} failed!", src, dst));
319    }
320    Ok(cmp_summary)
321}
322
#[cfg(test)]
mod cmp_tests {
    use crate::copy;
    use crate::preserve;
    use crate::testutils;
    use enum_map::enum_map;
    use tracing_test::traced_test;

    use super::*;

    lazy_static! {
        static ref PROGRESS: progress::Progress = progress::Progress::new();
        static ref NO_PRESERVE_SETTINGS: preserve::Settings = preserve::preserve_default();
        static ref DO_PRESERVE_SETTINGS: preserve::Settings = preserve::preserve_all();
    }

    /// Creates the standard test directory and copies its `foo` tree to `bar`.
    ///
    /// `preserve` selects between the "preserve all" and the default preserve settings
    /// for the copy. Returns the path of the temporary directory holding both trees.
    async fn setup_test_dirs(preserve: bool) -> Result<std::path::PathBuf> {
        let tmp_dir = testutils::setup_test_dir().await?;
        let test_path = tmp_dir.as_path();
        copy::copy(
            &PROGRESS,
            &test_path.join("foo"),
            &test_path.join("bar"),
            &copy::Settings {
                dereference: false,
                fail_early: false,
                overwrite: false,
                overwrite_compare: filecmp::MetadataCmpSettings {
                    size: true,
                    mtime: true,
                    ..Default::default()
                },
                chunk_size: 0,
                remote_copy_buffer_size: 0,
            },
            if preserve {
                &DO_PRESERVE_SETTINGS
            } else {
                &NO_PRESERVE_SETTINGS
            },
            false,
        )
        .await?;
        Ok(tmp_dir)
    }

    /// Truncates the file at `path` to zero length, creating it if it does not exist.
    ///
    /// Takes a `Path` rather than a `&str` so callers never need a fallible
    /// path-to-string conversion.
    async fn truncate_file(path: &std::path::Path) -> Result<()> {
        let file = tokio::fs::File::create(path).await?;
        file.set_len(0).await?;
        Ok(())
    }

    /// Introduces one difference of each kind between the two trees and checks the
    /// counters reported by `cmp`.
    #[tokio::test]
    #[traced_test]
    async fn check_basic_cmp() -> Result<()> {
        let tmp_dir = setup_test_dirs(true).await?;
        let src_root = tmp_dir.join("foo");
        let dst_root = tmp_dir.join("bar");
        // drop 1 file from src
        tokio::fs::remove_file(src_root.join("bar").join("1.txt")).await?;
        // sleep to ensure mtime is different, this acts as a poor-mans barrier
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
        // modify 1 file in dst
        truncate_file(&dst_root.join("baz").join("4.txt")).await?;
        // drop 1 (other) file from dst
        tokio::fs::remove_file(dst_root.join("bar").join("2.txt")).await?;
        // create one more file in dst -- this will also modify the mtime of the directory
        tokio::fs::File::create(dst_root.join("baz").join("7.txt")).await?;
        let compare_settings = Settings {
            fail_early: false,
            exit_early: false,
            compare: enum_map! {
                ObjType::File => filecmp::MetadataCmpSettings {
                    size: true,
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Dir => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Symlink => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Other => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
            },
        };
        let summary = cmp(
            &PROGRESS,
            &src_root,
            &dst_root,
            &LogWriter::new(Some(tmp_dir.join("cmp.log").as_path()), false).await?,
            &compare_settings,
        )
        .await?;
        let mismatch: Mismatch = enum_map! {
            ObjType::File => enum_map! {
                CompareResult::Different => 1,
                CompareResult::Same => 2,
                CompareResult::SrcMissing => 2,
                CompareResult::DstMissing => 1,
            },
            ObjType::Dir => enum_map! {
                CompareResult::Different => 2,
                CompareResult::Same => 1,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
            ObjType::Symlink => enum_map! {
                CompareResult::Different => 0,
                CompareResult::Same => 2,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
            ObjType::Other => enum_map! {
                CompareResult::Different => 0,
                CompareResult::Same => 0,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
        };
        assert_eq!(summary.mismatch, mismatch);
        Ok(())
    }
}
457}