1use anyhow::{anyhow, Context, Result};
2use async_recursion::async_recursion;
3use enum_map::{Enum, EnumMap};
4use tokio::io::AsyncWriteExt;
5use tracing::instrument;
6
7use crate::copy::is_file_type_same;
8use crate::filecmp;
9use crate::progress;
10
11#[derive(Copy, Clone, Debug, Enum)]
12pub enum CompareResult {
13 Same,
14 Different,
15 SrcMissing, DstMissing, }
18
19#[derive(Copy, Clone, Debug, Enum)]
20pub enum ObjType {
21 File,
22 Dir,
23 Symlink,
24 Other, }
26
27pub type ObjSettings = EnumMap<ObjType, filecmp::MetadataCmpSettings>;
28
29#[derive(Debug, Copy, Clone)]
30pub struct Settings {
31 pub compare: ObjSettings,
32 pub fail_early: bool,
33 pub exit_early: bool,
34}
35
36pub type Mismatch = EnumMap<ObjType, EnumMap<CompareResult, u64>>;
37
38#[derive(Default)]
39pub struct Summary {
40 pub mismatch: Mismatch,
41}
42
43impl std::ops::Add for Summary {
44 type Output = Self;
45 fn add(self, other: Self) -> Self {
46 let mut mismatch = self.mismatch;
47 for (obj_type, &cmp_res_map) in &other.mismatch {
48 for (cmp_res, &count) in &cmp_res_map {
49 mismatch[obj_type][cmp_res] += count;
50 }
51 }
52 Self { mismatch }
53 }
54}
55
56impl std::fmt::Display for Summary {
57 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
58 for (obj_type, &cmp_res_map) in &self.mismatch {
59 for (cmp_res, &count) in &cmp_res_map {
60 writeln!(f, "{obj_type:?} {cmp_res:?}: {count}")?;
61 }
62 }
63 Ok(())
64 }
65}
66
67#[derive(Clone)]
68pub struct LogWriter {
69 file: Option<std::sync::Arc<tokio::sync::Mutex<tokio::io::BufWriter<tokio::fs::File>>>>,
70 stdout: Option<std::sync::Arc<tokio::sync::Mutex<tokio::io::BufWriter<tokio::io::Stdout>>>>,
71}
72
73impl std::fmt::Debug for LogWriter {
74 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 f.debug_struct("LogWriter")
76 .field("file", &self.file.is_some())
77 .field("stdout", &self.stdout.is_some())
78 .finish()
79 }
80}
81
82impl LogWriter {
83 pub async fn new(log_path_opt: Option<&std::path::Path>, use_stdout: bool) -> Result<Self> {
89 if let Some(log_path) = log_path_opt {
90 let log_file = tokio::fs::OpenOptions::new()
91 .write(true)
92 .create_new(true)
93 .open(log_path)
94 .await
95 .with_context(|| format!("Failed to open log file: {log_path:?}"))?;
96 let log =
97 std::sync::Arc::new(tokio::sync::Mutex::new(tokio::io::BufWriter::new(log_file)));
98 Ok(Self {
99 file: Some(log),
100 stdout: None,
101 })
102 } else if use_stdout {
103 Ok(Self {
104 file: None,
105 stdout: Some(std::sync::Arc::new(tokio::sync::Mutex::new(
106 tokio::io::BufWriter::new(tokio::io::stdout()),
107 ))),
108 })
109 } else {
110 Ok(Self {
111 file: None,
112 stdout: None,
113 })
114 }
115 }
116
117 pub async fn log_mismatch(
118 &self,
119 cmp_result: CompareResult,
120 src_obj_type: Option<ObjType>,
121 src: &std::path::Path,
122 dst_obj_type: Option<ObjType>,
123 dst: &std::path::Path,
124 ) -> Result<()> {
125 self.write(&format!(
126 "[{cmp_result:?}]\n\t[{src_obj_type:?}]\t{src:?}\n\t[{dst_obj_type:?}]\t{dst:?}\n"
127 ))
128 .await
129 }
130
131 async fn write(&self, msg: &str) -> Result<()> {
132 if let Some(log) = &self.file {
133 let mut log = log.lock().await;
134 log.write_all(msg.as_bytes())
135 .await
136 .context("Failed to write to log file")?;
137 }
138 if let Some(stdout) = &self.stdout {
139 let mut stdout = stdout.lock().await;
140 stdout
141 .write_all(msg.as_bytes())
142 .await
143 .context("Failed to write to stdout")?;
144 }
145 Ok(())
146 }
147
148 pub async fn flush(&self) -> Result<()> {
149 if let Some(log) = &self.file {
150 let mut log = log.lock().await;
151 log.flush().await.context("Failed to flush log file")?;
152 }
153 if let Some(stdout) = &self.stdout {
154 let mut stdout = stdout.lock().await;
155 stdout.flush().await.context("Failed to flush stdout")?;
156 }
157 Ok(())
158 }
159}
160
161fn obj_type(metadata: &std::fs::Metadata) -> ObjType {
162 if metadata.is_file() {
163 ObjType::File
164 } else if metadata.is_dir() {
165 ObjType::Dir
166 } else if metadata.is_symlink() {
167 ObjType::Symlink
168 } else {
169 ObjType::Other
171 }
172}
173
174#[instrument(skip(prog_track))]
175#[async_recursion]
176pub async fn cmp(
177 prog_track: &'static progress::Progress,
178 src: &std::path::Path,
179 dst: &std::path::Path,
180 log: &LogWriter,
181 settings: &Settings,
182) -> Result<Summary> {
183 let _prog_guard = prog_track.ops.guard();
184 tracing::debug!("reading source metadata");
185 let src_metadata = tokio::fs::symlink_metadata(src)
187 .await
188 .with_context(|| format!("failed reading metadata from {:?}", &src))?;
189 let mut cmp_summary = Summary::default();
190 let src_obj_type = obj_type(&src_metadata);
191 let dst_metadata = {
192 match tokio::fs::symlink_metadata(dst).await {
193 Ok(metadata) => metadata,
194 Err(err) => {
195 if err.kind() == std::io::ErrorKind::NotFound {
196 cmp_summary.mismatch[src_obj_type][CompareResult::DstMissing] += 1;
197 log.log_mismatch(
198 CompareResult::DstMissing,
199 Some(src_obj_type),
200 src,
201 None,
202 dst,
203 )
204 .await?;
205 return Ok(cmp_summary);
206 }
207 return Err(err).context(format!("failed reading metadata from {:?}", &dst));
208 }
209 }
210 };
211 if !is_file_type_same(&src_metadata, &dst_metadata)
212 || !filecmp::metadata_equal(
213 &settings.compare[src_obj_type],
214 &src_metadata,
215 &dst_metadata,
216 )
217 {
218 cmp_summary.mismatch[src_obj_type][CompareResult::Different] += 1;
220 let dst_obj_type = obj_type(&dst_metadata);
221 log.log_mismatch(
222 CompareResult::Different,
223 Some(src_obj_type),
224 src,
225 Some(dst_obj_type),
226 dst,
227 )
228 .await?;
229 if settings.exit_early {
230 return Ok(cmp_summary);
231 }
232 } else {
233 cmp_summary.mismatch[src_obj_type][CompareResult::Same] += 1;
234 }
235 if !src_metadata.is_dir() || !dst_metadata.is_dir() {
236 return Ok(cmp_summary);
238 }
239 tracing::debug!("process contents of 'src' directory");
240 let mut src_entries = tokio::fs::read_dir(src)
241 .await
242 .with_context(|| format!("cannot open directory {src:?} for reading"))?;
243 let mut join_set = tokio::task::JoinSet::new();
244 let mut success = true;
245 let mut processed_files = std::collections::HashSet::new();
247 while let Some(src_entry) = src_entries
249 .next_entry()
250 .await
251 .with_context(|| format!("failed traversing directory {:?}", &src))?
252 {
253 throttle::get_ops_token().await;
257 let entry_path = src_entry.path();
258 let entry_name = entry_path.file_name().unwrap();
259 processed_files.insert(entry_name.to_owned());
260 let dst_path = dst.join(entry_name);
261 let log = log.clone();
262 let settings = *settings;
263 let do_cmp =
264 || async move { cmp(prog_track, &entry_path, &dst_path, &log, &settings).await };
265 join_set.spawn(do_cmp());
266 }
267 drop(src_entries);
270 tracing::debug!("process contents of 'dst' directory");
271 let mut dst_entries = tokio::fs::read_dir(dst)
272 .await
273 .with_context(|| format!("cannot open directory {:?} for reading", &dst))?;
274 while let Some(dst_entry) = dst_entries
276 .next_entry()
277 .await
278 .with_context(|| format!("failed traversing directory {:?}", &dst))?
279 {
280 let entry_path = dst_entry.path();
281 let entry_name = entry_path.file_name().unwrap();
282 if processed_files.contains(entry_name) {
283 continue;
285 }
286 tracing::debug!("found a new entry in the 'dst' directory");
287 let dst_path = dst.join(entry_name);
288 let dst_entry_metadata = tokio::fs::symlink_metadata(&dst_path)
289 .await
290 .with_context(|| format!("failed reading metadata from {:?}", &dst_path))?;
291 let dst_obj_type = obj_type(&dst_entry_metadata);
292 cmp_summary.mismatch[dst_obj_type][CompareResult::SrcMissing] += 1;
293 log.log_mismatch(
294 CompareResult::SrcMissing,
295 None,
296 &src.join(entry_name),
297 Some(dst_obj_type),
298 &dst_path,
299 )
300 .await?;
301 }
302 drop(dst_entries);
305 while let Some(res) = join_set.join_next().await {
306 match res? {
307 Ok(summary) => cmp_summary = cmp_summary + summary,
308 Err(error) => {
309 tracing::error!("cmp: {:?} vs {:?} failed with: {:#}", src, dst, &error);
310 if settings.fail_early {
311 return Err(error);
312 }
313 success = false;
314 }
315 }
316 }
317 if !success {
318 return Err(anyhow!("cmp: {:?} vs {:?} failed!", src, dst));
319 }
320 Ok(cmp_summary)
321}
322
#[cfg(test)]
mod cmp_tests {
    use crate::copy;
    use crate::preserve;
    use crate::testutils;
    use enum_map::enum_map;
    use tracing_test::traced_test;

    use super::*;

    lazy_static! {
        static ref PROGRESS: progress::Progress = progress::Progress::new();
        static ref NO_PRESERVE_SETTINGS: preserve::Settings = preserve::preserve_default();
        static ref DO_PRESERVE_SETTINGS: preserve::Settings = preserve::preserve_all();
    }

    /// Creates a fresh test tree and copies its `foo` subtree to `bar` (preserving metadata when
    /// `preserve` is set). Returns the temporary root directory.
    async fn setup_test_dirs(preserve: bool) -> Result<std::path::PathBuf> {
        let tmp_dir = testutils::setup_test_dir().await?;
        let test_path = tmp_dir.as_path();
        copy::copy(
            &PROGRESS,
            &test_path.join("foo"),
            &test_path.join("bar"),
            &copy::Settings {
                dereference: false,
                fail_early: false,
                overwrite: false,
                overwrite_compare: filecmp::MetadataCmpSettings {
                    size: true,
                    mtime: true,
                    ..Default::default()
                },
                chunk_size: 0,
                remote_copy_buffer_size: 0,
            },
            if preserve {
                &DO_PRESERVE_SETTINGS
            } else {
                &NO_PRESERVE_SETTINGS
            },
            false,
        )
        .await?;
        Ok(tmp_dir)
    }

    /// Truncates the file at `path` to zero length, creating it if it does not exist.
    async fn truncate_file(path: &std::path::Path) -> Result<()> {
        // `File::create` already truncates an existing file
        tokio::fs::File::create(path).await?;
        Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn check_basic_cmp() -> Result<()> {
        let tmp_dir = setup_test_dirs(true).await?;
        // only in dst from now on -> SrcMissing
        tokio::fs::remove_file(tmp_dir.join("foo").join("bar").join("1.txt")).await?;
        // presumably so the modifications below get an mtime distinct from the earlier ones
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
        // size now differs from src -> Different
        truncate_file(&tmp_dir.join("bar").join("baz").join("4.txt")).await?;
        // only in src from now on -> DstMissing
        tokio::fs::remove_file(tmp_dir.join("bar").join("bar").join("2.txt")).await?;
        // new file only in dst -> SrcMissing
        tokio::fs::File::create(tmp_dir.join("bar").join("baz").join("7.txt")).await?;
        let compare_settings = Settings {
            fail_early: false,
            exit_early: false,
            compare: enum_map! {
                ObjType::File => filecmp::MetadataCmpSettings {
                    size: true,
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Dir => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Symlink => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Other => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
            },
        };
        let log_path = tmp_dir.join("cmp.log");
        let log = LogWriter::new(Some(log_path.as_path()), false).await?;
        let summary = cmp(
            &PROGRESS,
            &tmp_dir.join("foo"),
            &tmp_dir.join("bar"),
            &log,
            &compare_settings,
        )
        .await?;
        // the log is buffered and tokio's BufWriter does not flush on drop
        log.flush().await?;
        let mismatch: Mismatch = enum_map! {
            ObjType::File => enum_map! {
                CompareResult::Different => 1,
                CompareResult::Same => 2,
                CompareResult::SrcMissing => 2,
                CompareResult::DstMissing => 1,
            },
            ObjType::Dir => enum_map! {
                CompareResult::Different => 2,
                CompareResult::Same => 1,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
            ObjType::Symlink => enum_map! {
                CompareResult::Different => 0,
                CompareResult::Same => 2,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
            ObjType::Other => enum_map! {
                CompareResult::Different => 0,
                CompareResult::Same => 0,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
        };
        assert_eq!(summary.mismatch, mismatch);
        // every non-`Same` outcome is logged once, as a record whose header line starts with '['
        let expected_entries: u64 = mismatch
            .iter()
            .map(|(_, by_result)| {
                by_result[CompareResult::Different]
                    + by_result[CompareResult::SrcMissing]
                    + by_result[CompareResult::DstMissing]
            })
            .sum();
        let log_contents = tokio::fs::read_to_string(&log_path).await?;
        let logged_entries = log_contents
            .lines()
            .filter(|line| line.starts_with('['))
            .count();
        assert_eq!(u64::try_from(logged_entries)?, expected_entries);
        Ok(())
    }
}