common/
cmp.rs

1use anyhow::{anyhow, Context, Result};
2use async_recursion::async_recursion;
3use enum_map::{Enum, EnumMap};
4use tokio::io::AsyncWriteExt;
5use tracing::instrument;
6
7use crate::copy::is_file_type_same;
8use crate::filecmp;
9use crate::progress;
10
11#[derive(Copy, Clone, Debug, Enum)]
12pub enum CompareResult {
13    Same,
14    Different,
15    SrcMissing, // object missing in src but present in dst
16    DstMissing, // same as above but flipped
17}
18
19#[derive(Copy, Clone, Debug, Enum)]
20pub enum ObjType {
21    File,
22    Dir,
23    Symlink,
24}
25
/// Per-object-type metadata comparison settings; `cmp` uses the entry for the *source*
/// object's type, even when the destination object has a different type.
pub type ObjSettings = EnumMap<ObjType, filecmp::MetadataCmpSettings>;
27
/// Options controlling a `cmp` run.
#[derive(Debug, Copy, Clone)]
pub struct Settings {
    /// Metadata comparison settings, selected by the source object's type.
    pub compare: ObjSettings,
    /// When a child comparison of a directory fails: if set, that error is returned
    /// immediately; otherwise all children are awaited and a generic failure is returned
    /// once they have finished.
    pub fail_early: bool,
    /// If set, an object found `Different` is not descended into, i.e. the contents of a
    /// differing directory are not compared.
    pub exit_early: bool,
}
34
/// Object counters indexed by object type, then by comparison outcome.
/// Despite the name, `CompareResult::Same` objects are counted here as well.
pub type Mismatch = EnumMap<ObjType, EnumMap<CompareResult, u64>>;
36
37#[derive(Default)]
38pub struct Summary {
39    pub mismatch: Mismatch,
40}
41
42impl std::ops::Add for Summary {
43    type Output = Self;
44    fn add(self, other: Self) -> Self {
45        let mut mismatch = self.mismatch;
46        for (obj_type, &cmp_res_map) in &other.mismatch {
47            for (cmp_res, &count) in &cmp_res_map {
48                mismatch[obj_type][cmp_res] += count;
49            }
50        }
51        Self { mismatch }
52    }
53}
54
55impl std::fmt::Display for Summary {
56    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
57        for (obj_type, &cmp_res_map) in &self.mismatch {
58            for (cmp_res, &count) in &cmp_res_map {
59                writeln!(f, "{obj_type:?} {cmp_res:?}: {count}")?;
60            }
61        }
62        Ok(())
63    }
64}
65
66#[derive(Debug, Clone)]
67pub struct LogWriter {
68    log_opt: Option<std::sync::Arc<tokio::sync::Mutex<tokio::io::BufWriter<tokio::fs::File>>>>,
69}
70
71impl LogWriter {
72    pub async fn new(log_path_opt: Option<&std::path::Path>) -> Result<Self> {
73        if let Some(log_path) = log_path_opt {
74            let log_file = tokio::fs::OpenOptions::new()
75                .write(true)
76                .create_new(true)
77                .open(log_path)
78                .await
79                .with_context(|| format!("Failed to open log file: {log_path:?}"))?;
80            let log =
81                std::sync::Arc::new(tokio::sync::Mutex::new(tokio::io::BufWriter::new(log_file)));
82            Ok(Self { log_opt: Some(log) })
83        } else {
84            Ok(Self { log_opt: None })
85        }
86    }
87
88    pub async fn log_mismatch(
89        &self,
90        cmp_result: CompareResult,
91        src_obj_type: Option<ObjType>,
92        src: &std::path::Path,
93        dst_obj_type: Option<ObjType>,
94        dst: &std::path::Path,
95    ) -> Result<()> {
96        self.write(&format!(
97            "[{cmp_result:?}]\n\t[{src_obj_type:?}]\t{src:?}\n\t[{dst_obj_type:?}]\t{dst:?}\n"
98        ))
99        .await
100    }
101
102    async fn write(&self, msg: &str) -> Result<()> {
103        if let Some(log) = &self.log_opt {
104            let mut log = log.lock().await;
105            log.write_all(msg.as_bytes())
106                .await
107                .context("Failed to write to log file")?;
108        }
109        Ok(())
110    }
111
112    pub async fn flush(&self) -> Result<()> {
113        if let Some(log) = &self.log_opt {
114            let mut log = log.lock().await;
115            log.flush().await.context("Failed to flush log file")?;
116        }
117        Ok(())
118    }
119}
120
121fn obj_type(metadata: &std::fs::Metadata) -> ObjType {
122    if metadata.is_file() {
123        ObjType::File
124    } else if metadata.is_dir() {
125        ObjType::Dir
126    } else if metadata.is_symlink() {
127        ObjType::Symlink
128    } else {
129        unreachable!("Unknown object type! {:?}", &metadata);
130    }
131}
132
133#[instrument(skip(prog_track))]
134#[async_recursion]
135pub async fn cmp(
136    prog_track: &'static progress::Progress,
137    src: &std::path::Path,
138    dst: &std::path::Path,
139    log: &LogWriter,
140    settings: &Settings,
141) -> Result<Summary> {
142    let _prog_guard = prog_track.ops.guard();
143    tracing::debug!("reading source metadata");
144    // it is impossible for src not exist other than user passing invalid path (which is an error)
145    let src_metadata = tokio::fs::symlink_metadata(src)
146        .await
147        .with_context(|| format!("failed reading metadata from {:?}", &src))?;
148    let mut cmp_summary = Summary::default();
149    let src_obj_type = obj_type(&src_metadata);
150    let dst_metadata = {
151        match tokio::fs::symlink_metadata(dst).await {
152            Ok(metadata) => metadata,
153            Err(err) => {
154                if err.kind() == std::io::ErrorKind::NotFound {
155                    cmp_summary.mismatch[src_obj_type][CompareResult::DstMissing] += 1;
156                    log.log_mismatch(
157                        CompareResult::DstMissing,
158                        Some(src_obj_type),
159                        src,
160                        None,
161                        dst,
162                    )
163                    .await?;
164                    return Ok(cmp_summary);
165                }
166                return Err(err).context(format!("failed reading metadata from {:?}", &dst));
167            }
168        }
169    };
170    if !is_file_type_same(&src_metadata, &dst_metadata)
171        || !filecmp::metadata_equal(
172            &settings.compare[src_obj_type],
173            &src_metadata,
174            &dst_metadata,
175        )
176    {
177        // we use the src type for the summary attribution
178        cmp_summary.mismatch[src_obj_type][CompareResult::Different] += 1;
179        let dst_obj_type = obj_type(&dst_metadata);
180        log.log_mismatch(
181            CompareResult::Different,
182            Some(src_obj_type),
183            src,
184            Some(dst_obj_type),
185            dst,
186        )
187        .await?;
188        if settings.exit_early {
189            return Ok(cmp_summary);
190        }
191    } else {
192        cmp_summary.mismatch[src_obj_type][CompareResult::Same] += 1;
193    }
194    if !src_metadata.is_dir() || !dst_metadata.is_dir() {
195        // nothing more to do
196        return Ok(cmp_summary);
197    }
198    tracing::debug!("process contents of 'src' directory");
199    let mut src_entries = tokio::fs::read_dir(src)
200        .await
201        .with_context(|| format!("cannot open directory {src:?} for reading"))?;
202    let mut join_set = tokio::task::JoinSet::new();
203    let mut success = true;
204    // create a set of all the files we already processed
205    let mut processed_files = std::collections::HashSet::new();
206    // iterate through src entries and recursively call "cmp" on each one
207    while let Some(src_entry) = src_entries
208        .next_entry()
209        .await
210        .with_context(|| format!("failed traversing directory {:?}", &src))?
211    {
212        // it's better to await the token here so that we throttle the syscalls generated by the
213        // DirEntry call. the ops-throttle will never cause a deadlock (unlike max-open-files limit)
214        // so it's safe to do here.
215        throttle::get_ops_token().await;
216        let entry_path = src_entry.path();
217        let entry_name = entry_path.file_name().unwrap();
218        processed_files.insert(entry_name.to_owned());
219        let dst_path = dst.join(entry_name);
220        let log = log.clone();
221        let settings = *settings;
222        let do_cmp =
223            || async move { cmp(prog_track, &entry_path, &dst_path, &log, &settings).await };
224        join_set.spawn(do_cmp());
225    }
226    // unfortunately ReadDir is opening file-descriptors and there's not a good way to limit this,
227    // one thing we CAN do however is to drop it as soon as we're done with it
228    drop(src_entries);
229    tracing::debug!("process contents of 'dst' directory");
230    let mut dst_entries = tokio::fs::read_dir(dst)
231        .await
232        .with_context(|| format!("cannot open directory {:?} for reading", &dst))?;
233    // iterate through update entries and log each one that's not present in src
234    while let Some(dst_entry) = dst_entries
235        .next_entry()
236        .await
237        .with_context(|| format!("failed traversing directory {:?}", &dst))?
238    {
239        let entry_path = dst_entry.path();
240        let entry_name = entry_path.file_name().unwrap();
241        if processed_files.contains(entry_name) {
242            // we already must have considered this file, skip it
243            continue;
244        }
245        tracing::debug!("found a new entry in the 'dst' directory");
246        let dst_path = dst.join(entry_name);
247        let dst_entry_metadata = tokio::fs::symlink_metadata(&dst_path)
248            .await
249            .with_context(|| format!("failed reading metadata from {:?}", &dst_path))?;
250        let dst_obj_type = obj_type(&dst_entry_metadata);
251        cmp_summary.mismatch[dst_obj_type][CompareResult::SrcMissing] += 1;
252        log.log_mismatch(
253            CompareResult::SrcMissing,
254            None,
255            &src.join(entry_name),
256            Some(dst_obj_type),
257            &dst_path,
258        )
259        .await?;
260    }
261    // unfortunately ReadDir is opening file-descriptors and there's not a good way to limit this,
262    // one thing we CAN do however is to drop it as soon as we're done with it
263    drop(dst_entries);
264    while let Some(res) = join_set.join_next().await {
265        match res? {
266            Ok(summary) => cmp_summary = cmp_summary + summary,
267            Err(error) => {
268                tracing::error!("cmp: {:?} vs {:?} failed with: {}", src, dst, &error);
269                if settings.fail_early {
270                    return Err(error);
271                }
272                success = false;
273            }
274        }
275    }
276    if !success {
277        return Err(anyhow!("cmp: {:?} vs {:?} failed!", src, dst));
278    }
279    Ok(cmp_summary)
280}
281
#[cfg(test)]
mod cmp_tests {
    use crate::copy;
    use crate::preserve;
    use crate::testutils;
    use enum_map::enum_map;
    use tracing_test::traced_test;

    use super::*;

    lazy_static! {
        static ref PROGRESS: progress::Progress = progress::Progress::new();
        static ref NO_PRESERVE_SETTINGS: preserve::Settings = preserve::preserve_default();
        static ref DO_PRESERVE_SETTINGS: preserve::Settings = preserve::preserve_all();
    }

    /// Creates a fresh test directory and copies its `foo` tree to `bar`, using the
    /// "preserve all" settings when `preserve` is set. Returns the test directory path.
    async fn setup_test_dirs(preserve: bool) -> Result<std::path::PathBuf> {
        let tmp_dir = testutils::setup_test_dir().await?;
        let test_path = tmp_dir.as_path();
        copy::copy(
            &PROGRESS,
            &test_path.join("foo"),
            &test_path.join("bar"),
            &copy::Settings {
                dereference: false,
                fail_early: false,
                overwrite: false,
                overwrite_compare: filecmp::MetadataCmpSettings {
                    size: true,
                    mtime: true,
                    ..Default::default()
                },
                chunk_size: 0,
            },
            if preserve {
                &DO_PRESERVE_SETTINGS
            } else {
                &NO_PRESERVE_SETTINGS
            },
            false,
        )
        .await?;
        Ok(tmp_dir)
    }

    /// Truncates the file at `path` to zero length; the test relies on this changing both
    /// the file's size and its mtime relative to the other tree.
    async fn truncate_file(path: impl AsRef<std::path::Path>) -> Result<()> {
        let file = tokio::fs::File::create(path).await?;
        file.set_len(0).await?;
        Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn check_basic_cmp() -> Result<()> {
        let tmp_dir = setup_test_dirs(true).await?;
        // drop 1 file from src
        tokio::fs::remove_file(tmp_dir.join("foo").join("bar").join("1.txt")).await?;
        // sleep to ensure mtime is different, this acts as a poor-mans barrier
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
        // modify 1 file in dst
        truncate_file(tmp_dir.join("bar").join("baz").join("4.txt")).await?;
        // drop 1 (other) file from dst
        tokio::fs::remove_file(tmp_dir.join("bar").join("bar").join("2.txt")).await?;
        // create one more file in dst -- this will also modify the mtime of the directory
        tokio::fs::File::create(tmp_dir.join("bar").join("baz").join("7.txt")).await?;
        let compare_settings = Settings {
            fail_early: false,
            exit_early: false,
            compare: enum_map! {
                ObjType::File => filecmp::MetadataCmpSettings {
                    size: true,
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Dir => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Symlink => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
            },
        };
        let summary = cmp(
            &PROGRESS,
            &tmp_dir.join("foo"),
            &tmp_dir.join("bar"),
            &LogWriter::new(Some(tmp_dir.join("cmp.log").as_path())).await?,
            &compare_settings,
        )
        .await?;
        // the non-`Same` counts follow from the mutations above:
        // - File Different: bar/baz/4.txt was truncated
        // - File SrcMissing: bar/bar/1.txt (its src copy was removed) and bar/baz/7.txt (new)
        // - File DstMissing: foo/bar/2.txt (its dst copy was removed)
        // - Dir Different: both `bar` subdirectories had entries removed, and `baz` in dst
        //   gained an entry, so their mtimes differ
        let mismatch: Mismatch = enum_map! {
            ObjType::File => enum_map! {
                CompareResult::Different => 1,
                CompareResult::Same => 2,
                CompareResult::SrcMissing => 2,
                CompareResult::DstMissing => 1,
            },
            ObjType::Dir => enum_map! {
                CompareResult::Different => 2,
                CompareResult::Same => 1,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
            ObjType::Symlink => enum_map! {
                CompareResult::Different => 0,
                CompareResult::Same => 2,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
        };
        assert_eq!(summary.mismatch, mismatch);
        Ok(())
    }
}