// common/cmp.rs

1use anyhow::{anyhow, Context, Result};
2use async_recursion::async_recursion;
3use enum_map::{Enum, EnumMap};
4use tokio::io::AsyncWriteExt;
5use tracing::instrument;
6
7use crate::copy::is_file_type_same;
8use crate::filecmp;
9use crate::progress;
10
11#[derive(Copy, Clone, Debug, Enum)]
12pub enum CompareResult {
13    Same,
14    Different,
15    SrcMissing, // object missing in src but present in dst
16    DstMissing, // same as above but flipped
17}
18
19#[derive(Copy, Clone, Debug, Enum)]
20pub enum ObjType {
21    File,
22    Dir,
23    Symlink,
24    Other, // sockets, block devices, character devices, FIFOs, etc.
25}
26
27pub type ObjSettings = EnumMap<ObjType, filecmp::MetadataCmpSettings>;
28
29#[derive(Debug, Copy, Clone)]
30pub struct Settings {
31    pub compare: ObjSettings,
32    pub fail_early: bool,
33    pub exit_early: bool,
34}
35
36pub type Mismatch = EnumMap<ObjType, EnumMap<CompareResult, u64>>;
37
38#[derive(Default)]
39pub struct Summary {
40    pub mismatch: Mismatch,
41}
42
43impl std::ops::Add for Summary {
44    type Output = Self;
45    fn add(self, other: Self) -> Self {
46        let mut mismatch = self.mismatch;
47        for (obj_type, &cmp_res_map) in &other.mismatch {
48            for (cmp_res, &count) in &cmp_res_map {
49                mismatch[obj_type][cmp_res] += count;
50            }
51        }
52        Self { mismatch }
53    }
54}
55
56impl std::fmt::Display for Summary {
57    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
58        for (obj_type, &cmp_res_map) in &self.mismatch {
59            for (cmp_res, &count) in &cmp_res_map {
60                writeln!(f, "{obj_type:?} {cmp_res:?}: {count}")?;
61            }
62        }
63        Ok(())
64    }
65}
66
67#[derive(Debug, Clone)]
68pub struct LogWriter {
69    log_opt: Option<std::sync::Arc<tokio::sync::Mutex<tokio::io::BufWriter<tokio::fs::File>>>>,
70}
71
72impl LogWriter {
73    pub async fn new(log_path_opt: Option<&std::path::Path>) -> Result<Self> {
74        if let Some(log_path) = log_path_opt {
75            let log_file = tokio::fs::OpenOptions::new()
76                .write(true)
77                .create_new(true)
78                .open(log_path)
79                .await
80                .with_context(|| format!("Failed to open log file: {log_path:?}"))?;
81            let log =
82                std::sync::Arc::new(tokio::sync::Mutex::new(tokio::io::BufWriter::new(log_file)));
83            Ok(Self { log_opt: Some(log) })
84        } else {
85            Ok(Self { log_opt: None })
86        }
87    }
88
89    pub async fn log_mismatch(
90        &self,
91        cmp_result: CompareResult,
92        src_obj_type: Option<ObjType>,
93        src: &std::path::Path,
94        dst_obj_type: Option<ObjType>,
95        dst: &std::path::Path,
96    ) -> Result<()> {
97        self.write(&format!(
98            "[{cmp_result:?}]\n\t[{src_obj_type:?}]\t{src:?}\n\t[{dst_obj_type:?}]\t{dst:?}\n"
99        ))
100        .await
101    }
102
103    async fn write(&self, msg: &str) -> Result<()> {
104        if let Some(log) = &self.log_opt {
105            let mut log = log.lock().await;
106            log.write_all(msg.as_bytes())
107                .await
108                .context("Failed to write to log file")?;
109        }
110        Ok(())
111    }
112
113    pub async fn flush(&self) -> Result<()> {
114        if let Some(log) = &self.log_opt {
115            let mut log = log.lock().await;
116            log.flush().await.context("Failed to flush log file")?;
117        }
118        Ok(())
119    }
120}
121
122fn obj_type(metadata: &std::fs::Metadata) -> ObjType {
123    if metadata.is_file() {
124        ObjType::File
125    } else if metadata.is_dir() {
126        ObjType::Dir
127    } else if metadata.is_symlink() {
128        ObjType::Symlink
129    } else {
130        // sockets, block devices, character devices, FIFOs, etc.
131        ObjType::Other
132    }
133}
134
135#[instrument(skip(prog_track))]
136#[async_recursion]
137pub async fn cmp(
138    prog_track: &'static progress::Progress,
139    src: &std::path::Path,
140    dst: &std::path::Path,
141    log: &LogWriter,
142    settings: &Settings,
143) -> Result<Summary> {
144    let _prog_guard = prog_track.ops.guard();
145    tracing::debug!("reading source metadata");
146    // it is impossible for src not exist other than user passing invalid path (which is an error)
147    let src_metadata = tokio::fs::symlink_metadata(src)
148        .await
149        .with_context(|| format!("failed reading metadata from {:?}", &src))?;
150    let mut cmp_summary = Summary::default();
151    let src_obj_type = obj_type(&src_metadata);
152    let dst_metadata = {
153        match tokio::fs::symlink_metadata(dst).await {
154            Ok(metadata) => metadata,
155            Err(err) => {
156                if err.kind() == std::io::ErrorKind::NotFound {
157                    cmp_summary.mismatch[src_obj_type][CompareResult::DstMissing] += 1;
158                    log.log_mismatch(
159                        CompareResult::DstMissing,
160                        Some(src_obj_type),
161                        src,
162                        None,
163                        dst,
164                    )
165                    .await?;
166                    return Ok(cmp_summary);
167                }
168                return Err(err).context(format!("failed reading metadata from {:?}", &dst));
169            }
170        }
171    };
172    if !is_file_type_same(&src_metadata, &dst_metadata)
173        || !filecmp::metadata_equal(
174            &settings.compare[src_obj_type],
175            &src_metadata,
176            &dst_metadata,
177        )
178    {
179        // we use the src type for the summary attribution
180        cmp_summary.mismatch[src_obj_type][CompareResult::Different] += 1;
181        let dst_obj_type = obj_type(&dst_metadata);
182        log.log_mismatch(
183            CompareResult::Different,
184            Some(src_obj_type),
185            src,
186            Some(dst_obj_type),
187            dst,
188        )
189        .await?;
190        if settings.exit_early {
191            return Ok(cmp_summary);
192        }
193    } else {
194        cmp_summary.mismatch[src_obj_type][CompareResult::Same] += 1;
195    }
196    if !src_metadata.is_dir() || !dst_metadata.is_dir() {
197        // nothing more to do
198        return Ok(cmp_summary);
199    }
200    tracing::debug!("process contents of 'src' directory");
201    let mut src_entries = tokio::fs::read_dir(src)
202        .await
203        .with_context(|| format!("cannot open directory {src:?} for reading"))?;
204    let mut join_set = tokio::task::JoinSet::new();
205    let mut success = true;
206    // create a set of all the files we already processed
207    let mut processed_files = std::collections::HashSet::new();
208    // iterate through src entries and recursively call "cmp" on each one
209    while let Some(src_entry) = src_entries
210        .next_entry()
211        .await
212        .with_context(|| format!("failed traversing directory {:?}", &src))?
213    {
214        // it's better to await the token here so that we throttle the syscalls generated by the
215        // DirEntry call. the ops-throttle will never cause a deadlock (unlike max-open-files limit)
216        // so it's safe to do here.
217        throttle::get_ops_token().await;
218        let entry_path = src_entry.path();
219        let entry_name = entry_path.file_name().unwrap();
220        processed_files.insert(entry_name.to_owned());
221        let dst_path = dst.join(entry_name);
222        let log = log.clone();
223        let settings = *settings;
224        let do_cmp =
225            || async move { cmp(prog_track, &entry_path, &dst_path, &log, &settings).await };
226        join_set.spawn(do_cmp());
227    }
228    // unfortunately ReadDir is opening file-descriptors and there's not a good way to limit this,
229    // one thing we CAN do however is to drop it as soon as we're done with it
230    drop(src_entries);
231    tracing::debug!("process contents of 'dst' directory");
232    let mut dst_entries = tokio::fs::read_dir(dst)
233        .await
234        .with_context(|| format!("cannot open directory {:?} for reading", &dst))?;
235    // iterate through update entries and log each one that's not present in src
236    while let Some(dst_entry) = dst_entries
237        .next_entry()
238        .await
239        .with_context(|| format!("failed traversing directory {:?}", &dst))?
240    {
241        let entry_path = dst_entry.path();
242        let entry_name = entry_path.file_name().unwrap();
243        if processed_files.contains(entry_name) {
244            // we already must have considered this file, skip it
245            continue;
246        }
247        tracing::debug!("found a new entry in the 'dst' directory");
248        let dst_path = dst.join(entry_name);
249        let dst_entry_metadata = tokio::fs::symlink_metadata(&dst_path)
250            .await
251            .with_context(|| format!("failed reading metadata from {:?}", &dst_path))?;
252        let dst_obj_type = obj_type(&dst_entry_metadata);
253        cmp_summary.mismatch[dst_obj_type][CompareResult::SrcMissing] += 1;
254        log.log_mismatch(
255            CompareResult::SrcMissing,
256            None,
257            &src.join(entry_name),
258            Some(dst_obj_type),
259            &dst_path,
260        )
261        .await?;
262    }
263    // unfortunately ReadDir is opening file-descriptors and there's not a good way to limit this,
264    // one thing we CAN do however is to drop it as soon as we're done with it
265    drop(dst_entries);
266    while let Some(res) = join_set.join_next().await {
267        match res? {
268            Ok(summary) => cmp_summary = cmp_summary + summary,
269            Err(error) => {
270                tracing::error!("cmp: {:?} vs {:?} failed with: {}", src, dst, &error);
271                if settings.fail_early {
272                    return Err(error);
273                }
274                success = false;
275            }
276        }
277    }
278    if !success {
279        return Err(anyhow!("cmp: {:?} vs {:?} failed!", src, dst));
280    }
281    Ok(cmp_summary)
282}
283
#[cfg(test)]
mod cmp_tests {
    use crate::copy;
    use crate::preserve;
    use crate::testutils;
    use enum_map::enum_map;
    use tracing_test::traced_test;

    use super::*;

    lazy_static! {
        static ref PROGRESS: progress::Progress = progress::Progress::new();
        static ref NO_PRESERVE_SETTINGS: preserve::Settings = preserve::preserve_default();
        static ref DO_PRESERVE_SETTINGS: preserve::Settings = preserve::preserve_all();
    }

    /// Creates a fresh test directory and copies its `foo` tree to `bar`.
    ///
    /// Uses the "preserve all" settings when `preserve` is set, otherwise the default preserve
    /// settings. Returns the path of the test directory containing both trees.
    async fn setup_test_dirs(preserve: bool) -> Result<std::path::PathBuf> {
        let tmp_dir = testutils::setup_test_dir().await?;
        let test_path = tmp_dir.as_path();
        copy::copy(
            &PROGRESS,
            &test_path.join("foo"),
            &test_path.join("bar"),
            &copy::Settings {
                dereference: false,
                fail_early: false,
                overwrite: false,
                overwrite_compare: filecmp::MetadataCmpSettings {
                    size: true,
                    mtime: true,
                    ..Default::default()
                },
                chunk_size: 0,
            },
            if preserve {
                &DO_PRESERVE_SETTINGS
            } else {
                &NO_PRESERVE_SETTINGS
            },
            false,
        )
        .await?;
        Ok(tmp_dir)
    }

    /// Truncates the file at `path` to zero length, creating it if it does not exist.
    async fn truncate_file(path: impl AsRef<std::path::Path>) -> Result<()> {
        let file = tokio::fs::File::create(path).await?;
        file.set_len(0).await?;
        Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn check_basic_cmp() -> Result<()> {
        let tmp_dir = setup_test_dirs(true).await?;
        // drop 1 file from src
        tokio::fs::remove_file(&tmp_dir.join("foo").join("bar").join("1.txt")).await?;
        // sleep to ensure mtime is different, this acts as a poor man's barrier
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
        // modify 1 file in dst
        truncate_file(tmp_dir.join("bar").join("baz").join("4.txt")).await?;
        // drop 1 (other) file from dst
        tokio::fs::remove_file(&tmp_dir.join("bar").join("bar").join("2.txt")).await?;
        // create one more file in dst -- this will also modify the mtime of the directory
        tokio::fs::File::create(&tmp_dir.join("bar").join("baz").join("7.txt")).await?;
        let compare_settings = Settings {
            fail_early: false,
            exit_early: false,
            compare: enum_map! {
                ObjType::File => filecmp::MetadataCmpSettings {
                    size: true,
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Dir => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Symlink => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Other => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
            },
        };
        let log_path = tmp_dir.join("cmp.log");
        let log = LogWriter::new(Some(log_path.as_path())).await?;
        let summary = cmp(
            &PROGRESS,
            &tmp_dir.join("foo"),
            &tmp_dir.join("bar"),
            &log,
            &compare_settings,
        )
        .await?;
        // tokio's BufWriter discards its buffer on drop, so flush explicitly to persist the log
        log.flush().await?;
        let mismatch: Mismatch = enum_map! {
            ObjType::File => enum_map! {
                CompareResult::Different => 1,
                CompareResult::Same => 2,
                CompareResult::SrcMissing => 2,
                CompareResult::DstMissing => 1,
            },
            ObjType::Dir => enum_map! {
                CompareResult::Different => 2,
                CompareResult::Same => 1,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
            ObjType::Symlink => enum_map! {
                CompareResult::Different => 0,
                CompareResult::Same => 2,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
            ObjType::Other => enum_map! {
                CompareResult::Different => 0,
                CompareResult::Same => 0,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
        };
        assert_eq!(summary.mismatch, mismatch);
        // Every non-`Same` outcome above is also logged, giving 1 + 2 + 1 (files) + 2 (dirs) = 6
        // records. Each record starts with a `[<CompareResult>]` header line; the lines that
        // follow it are tab-indented.
        let log_contents = tokio::fs::read_to_string(&log_path).await?;
        for header in ["[Different]", "[SrcMissing]", "[DstMissing]"] {
            assert!(
                log_contents.contains(header),
                "missing {header} in log:\n{log_contents}"
            );
        }
        assert_eq!(
            log_contents
                .lines()
                .filter(|line| line.starts_with('['))
                .count(),
            6
        );
        Ok(())
    }
}