1use anyhow::{anyhow, Context, Result};
2use async_recursion::async_recursion;
3use enum_map::{Enum, EnumMap};
4use tokio::io::AsyncWriteExt;
5use tracing::instrument;
6
7use crate::copy::is_file_type_same;
8use crate::filecmp;
9use crate::progress;
10
/// Outcome of comparing one object in the source tree with its counterpart in the destination.
///
/// The variant order determines the index order inside [`Mismatch`] maps.
#[derive(Copy, Clone, Debug, Enum)]
pub enum CompareResult {
    /// Both objects exist, have the same file type and equal metadata under the active settings.
    Same,
    /// Both objects exist but their file type or compared metadata differs.
    Different,
    /// The object is present in the destination but missing from the source.
    SrcMissing,
    /// The object is present in the source but missing from the destination.
    DstMissing,
}
18
/// Coarse classification of a filesystem object, used as a key for settings and counters.
#[derive(Copy, Clone, Debug, Enum)]
pub enum ObjType {
    /// Regular file.
    File,
    /// Directory.
    Dir,
    /// Symbolic link (the link itself, not its target).
    Symlink,
    /// Anything that is not a regular file, a directory or a symlink.
    Other,
}
26
/// Metadata comparison settings, one entry per [`ObjType`].
pub type ObjSettings = EnumMap<ObjType, filecmp::MetadataCmpSettings>;
28
/// Options controlling a [`cmp`] run.
#[derive(Debug, Copy, Clone)]
pub struct Settings {
    /// Which metadata fields are compared, selected by the type of the source object.
    pub compare: ObjSettings,
    /// When set, the first error reported by a child comparison is returned immediately
    /// instead of waiting for the remaining children.
    pub fail_early: bool,
    /// When set, `cmp` returns right after recording a `Different` result for an object,
    /// without looking at the contents of that object.
    pub exit_early: bool,
}
35
/// Counters indexed first by object type and then by comparison result.
pub type Mismatch = EnumMap<ObjType, EnumMap<CompareResult, u64>>;
37
/// Aggregated result of a comparison; the default value has every counter at zero.
#[derive(Default)]
pub struct Summary {
    /// Number of objects seen for each (object type, comparison result) pair.
    pub mismatch: Mismatch,
}
42
43impl std::ops::Add for Summary {
44 type Output = Self;
45 fn add(self, other: Self) -> Self {
46 let mut mismatch = self.mismatch;
47 for (obj_type, &cmp_res_map) in &other.mismatch {
48 for (cmp_res, &count) in &cmp_res_map {
49 mismatch[obj_type][cmp_res] += count;
50 }
51 }
52 Self { mismatch }
53 }
54}
55
56impl std::fmt::Display for Summary {
57 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
58 for (obj_type, &cmp_res_map) in &self.mismatch {
59 for (cmp_res, &count) in &cmp_res_map {
60 writeln!(f, "{obj_type:?} {cmp_res:?}: {count}")?;
61 }
62 }
63 Ok(())
64 }
65}
66
/// Optional, shareable sink for mismatch records.
///
/// Clones share the same underlying writer (it sits behind an `Arc`), and access is
/// serialized by an async mutex.
#[derive(Debug, Clone)]
pub struct LogWriter {
    // `None` means logging is disabled; writes and flushes then do nothing.
    log_opt: Option<std::sync::Arc<tokio::sync::Mutex<tokio::io::BufWriter<tokio::fs::File>>>>,
}
71
72impl LogWriter {
73 pub async fn new(log_path_opt: Option<&std::path::Path>) -> Result<Self> {
74 if let Some(log_path) = log_path_opt {
75 let log_file = tokio::fs::OpenOptions::new()
76 .write(true)
77 .create_new(true)
78 .open(log_path)
79 .await
80 .with_context(|| format!("Failed to open log file: {log_path:?}"))?;
81 let log =
82 std::sync::Arc::new(tokio::sync::Mutex::new(tokio::io::BufWriter::new(log_file)));
83 Ok(Self { log_opt: Some(log) })
84 } else {
85 Ok(Self { log_opt: None })
86 }
87 }
88
89 pub async fn log_mismatch(
90 &self,
91 cmp_result: CompareResult,
92 src_obj_type: Option<ObjType>,
93 src: &std::path::Path,
94 dst_obj_type: Option<ObjType>,
95 dst: &std::path::Path,
96 ) -> Result<()> {
97 self.write(&format!(
98 "[{cmp_result:?}]\n\t[{src_obj_type:?}]\t{src:?}\n\t[{dst_obj_type:?}]\t{dst:?}\n"
99 ))
100 .await
101 }
102
103 async fn write(&self, msg: &str) -> Result<()> {
104 if let Some(log) = &self.log_opt {
105 let mut log = log.lock().await;
106 log.write_all(msg.as_bytes())
107 .await
108 .context("Failed to write to log file")?;
109 }
110 Ok(())
111 }
112
113 pub async fn flush(&self) -> Result<()> {
114 if let Some(log) = &self.log_opt {
115 let mut log = log.lock().await;
116 log.flush().await.context("Failed to flush log file")?;
117 }
118 Ok(())
119 }
120}
121
122fn obj_type(metadata: &std::fs::Metadata) -> ObjType {
123 if metadata.is_file() {
124 ObjType::File
125 } else if metadata.is_dir() {
126 ObjType::Dir
127 } else if metadata.is_symlink() {
128 ObjType::Symlink
129 } else {
130 ObjType::Other
132 }
133}
134
135#[instrument(skip(prog_track))]
136#[async_recursion]
137pub async fn cmp(
138 prog_track: &'static progress::Progress,
139 src: &std::path::Path,
140 dst: &std::path::Path,
141 log: &LogWriter,
142 settings: &Settings,
143) -> Result<Summary> {
144 let _prog_guard = prog_track.ops.guard();
145 tracing::debug!("reading source metadata");
146 let src_metadata = tokio::fs::symlink_metadata(src)
148 .await
149 .with_context(|| format!("failed reading metadata from {:?}", &src))?;
150 let mut cmp_summary = Summary::default();
151 let src_obj_type = obj_type(&src_metadata);
152 let dst_metadata = {
153 match tokio::fs::symlink_metadata(dst).await {
154 Ok(metadata) => metadata,
155 Err(err) => {
156 if err.kind() == std::io::ErrorKind::NotFound {
157 cmp_summary.mismatch[src_obj_type][CompareResult::DstMissing] += 1;
158 log.log_mismatch(
159 CompareResult::DstMissing,
160 Some(src_obj_type),
161 src,
162 None,
163 dst,
164 )
165 .await?;
166 return Ok(cmp_summary);
167 }
168 return Err(err).context(format!("failed reading metadata from {:?}", &dst));
169 }
170 }
171 };
172 if !is_file_type_same(&src_metadata, &dst_metadata)
173 || !filecmp::metadata_equal(
174 &settings.compare[src_obj_type],
175 &src_metadata,
176 &dst_metadata,
177 )
178 {
179 cmp_summary.mismatch[src_obj_type][CompareResult::Different] += 1;
181 let dst_obj_type = obj_type(&dst_metadata);
182 log.log_mismatch(
183 CompareResult::Different,
184 Some(src_obj_type),
185 src,
186 Some(dst_obj_type),
187 dst,
188 )
189 .await?;
190 if settings.exit_early {
191 return Ok(cmp_summary);
192 }
193 } else {
194 cmp_summary.mismatch[src_obj_type][CompareResult::Same] += 1;
195 }
196 if !src_metadata.is_dir() || !dst_metadata.is_dir() {
197 return Ok(cmp_summary);
199 }
200 tracing::debug!("process contents of 'src' directory");
201 let mut src_entries = tokio::fs::read_dir(src)
202 .await
203 .with_context(|| format!("cannot open directory {src:?} for reading"))?;
204 let mut join_set = tokio::task::JoinSet::new();
205 let mut success = true;
206 let mut processed_files = std::collections::HashSet::new();
208 while let Some(src_entry) = src_entries
210 .next_entry()
211 .await
212 .with_context(|| format!("failed traversing directory {:?}", &src))?
213 {
214 throttle::get_ops_token().await;
218 let entry_path = src_entry.path();
219 let entry_name = entry_path.file_name().unwrap();
220 processed_files.insert(entry_name.to_owned());
221 let dst_path = dst.join(entry_name);
222 let log = log.clone();
223 let settings = *settings;
224 let do_cmp =
225 || async move { cmp(prog_track, &entry_path, &dst_path, &log, &settings).await };
226 join_set.spawn(do_cmp());
227 }
228 drop(src_entries);
231 tracing::debug!("process contents of 'dst' directory");
232 let mut dst_entries = tokio::fs::read_dir(dst)
233 .await
234 .with_context(|| format!("cannot open directory {:?} for reading", &dst))?;
235 while let Some(dst_entry) = dst_entries
237 .next_entry()
238 .await
239 .with_context(|| format!("failed traversing directory {:?}", &dst))?
240 {
241 let entry_path = dst_entry.path();
242 let entry_name = entry_path.file_name().unwrap();
243 if processed_files.contains(entry_name) {
244 continue;
246 }
247 tracing::debug!("found a new entry in the 'dst' directory");
248 let dst_path = dst.join(entry_name);
249 let dst_entry_metadata = tokio::fs::symlink_metadata(&dst_path)
250 .await
251 .with_context(|| format!("failed reading metadata from {:?}", &dst_path))?;
252 let dst_obj_type = obj_type(&dst_entry_metadata);
253 cmp_summary.mismatch[dst_obj_type][CompareResult::SrcMissing] += 1;
254 log.log_mismatch(
255 CompareResult::SrcMissing,
256 None,
257 &src.join(entry_name),
258 Some(dst_obj_type),
259 &dst_path,
260 )
261 .await?;
262 }
263 drop(dst_entries);
266 while let Some(res) = join_set.join_next().await {
267 match res? {
268 Ok(summary) => cmp_summary = cmp_summary + summary,
269 Err(error) => {
270 tracing::error!("cmp: {:?} vs {:?} failed with: {}", src, dst, &error);
271 if settings.fail_early {
272 return Err(error);
273 }
274 success = false;
275 }
276 }
277 }
278 if !success {
279 return Err(anyhow!("cmp: {:?} vs {:?} failed!", src, dst));
280 }
281 Ok(cmp_summary)
282}
283
#[cfg(test)]
mod cmp_tests {
    use crate::copy;
    use crate::preserve;
    use crate::testutils;
    use enum_map::enum_map;
    use tracing_test::traced_test;

    use super::*;

    lazy_static! {
        static ref PROGRESS: progress::Progress = progress::Progress::new();
        static ref NO_PRESERVE_SETTINGS: preserve::Settings = preserve::preserve_default();
        static ref DO_PRESERVE_SETTINGS: preserve::Settings = preserve::preserve_all();
    }

    /// Builds the test tree with `testutils::setup_test_dir` and copies its `foo` directory
    /// to `bar`, using either the "preserve all" or the default preserve settings.
    /// Returns the path of the temporary directory holding both trees.
    async fn setup_test_dirs(preserve: bool) -> Result<std::path::PathBuf> {
        let tmp_dir = testutils::setup_test_dir().await?;
        let test_path = tmp_dir.as_path();
        copy::copy(
            &PROGRESS,
            &test_path.join("foo"),
            &test_path.join("bar"),
            &copy::Settings {
                dereference: false,
                fail_early: false,
                overwrite: false,
                overwrite_compare: filecmp::MetadataCmpSettings {
                    size: true,
                    mtime: true,
                    ..Default::default()
                },
                chunk_size: 0,
            },
            if preserve {
                &DO_PRESERVE_SETTINGS
            } else {
                &NO_PRESERVE_SETTINGS
            },
            false,
        )
        .await?;
        Ok(tmp_dir)
    }

    /// Re-creates the file at `path` and sets its length to zero.
    ///
    /// Takes a `Path` so callers never need a lossy or panicking conversion to `&str`.
    async fn truncate_file(path: &std::path::Path) -> Result<()> {
        let file = tokio::fs::File::create(path).await?;
        file.set_len(0).await?;
        Ok(())
    }

    /// Modifies both trees after the copy and checks the counters reported by `cmp`.
    #[tokio::test]
    #[traced_test]
    async fn check_basic_cmp() -> Result<()> {
        let tmp_dir = setup_test_dirs(true).await?;
        // Remove one file from the source tree.
        tokio::fs::remove_file(tmp_dir.join("foo").join("bar").join("1.txt")).await?;
        // NOTE(review): presumably the pause makes the mtimes written below differ from the
        // ones preserved by the copy -- confirm against the filesystem timestamp granularity.
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
        // Change one file in the destination tree.
        truncate_file(&tmp_dir.join("bar").join("baz").join("4.txt")).await?;
        // Remove a different file from the destination tree.
        tokio::fs::remove_file(tmp_dir.join("bar").join("bar").join("2.txt")).await?;
        // Add a file that exists only in the destination tree.
        tokio::fs::File::create(tmp_dir.join("bar").join("baz").join("7.txt")).await?;
        let compare_settings = Settings {
            fail_early: false,
            exit_early: false,
            compare: enum_map! {
                ObjType::File => filecmp::MetadataCmpSettings {
                    size: true,
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Dir => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Symlink => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Other => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
            },
        };
        let summary = cmp(
            &PROGRESS,
            &tmp_dir.join("foo"),
            &tmp_dir.join("bar"),
            &LogWriter::new(Some(tmp_dir.join("cmp.log").as_path())).await?,
            &compare_settings,
        )
        .await?;
        // Expected counters; they depend on the tree layout produced by
        // `testutils::setup_test_dir` (not visible here) plus the edits made above.
        let mismatch: Mismatch = enum_map! {
            ObjType::File => enum_map! {
                CompareResult::Different => 1,
                CompareResult::Same => 2,
                CompareResult::SrcMissing => 2,
                CompareResult::DstMissing => 1,
            },
            ObjType::Dir => enum_map! {
                CompareResult::Different => 2,
                CompareResult::Same => 1,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
            ObjType::Symlink => enum_map! {
                CompareResult::Different => 0,
                CompareResult::Same => 2,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
            ObjType::Other => enum_map! {
                CompareResult::Different => 0,
                CompareResult::Same => 0,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
        };
        assert_eq!(summary.mismatch, mismatch);
        Ok(())
    }
}