1use anyhow::{anyhow, Context, Result};
2use async_recursion::async_recursion;
3use enum_map::{Enum, EnumMap};
4use tokio::io::AsyncWriteExt;
5use tracing::instrument;
6
7use crate::copy::is_file_type_same;
8use crate::filecmp;
9use crate::progress;
10
11#[derive(Copy, Clone, Debug, Enum)]
12pub enum CompareResult {
13 Same,
14 Different,
15 SrcMissing, DstMissing, }
18
19#[derive(Copy, Clone, Debug, Enum)]
20pub enum ObjType {
21 File,
22 Dir,
23 Symlink,
24 Other, }
26
27pub type ObjSettings = EnumMap<ObjType, filecmp::MetadataCmpSettings>;
28
29#[derive(Debug, Copy, Clone)]
30pub struct Settings {
31 pub compare: ObjSettings,
32 pub fail_early: bool,
33 pub exit_early: bool,
34}
35
36pub type Mismatch = EnumMap<ObjType, EnumMap<CompareResult, u64>>;
37
38#[derive(Default)]
39pub struct Summary {
40 pub mismatch: Mismatch,
41}
42
43impl std::ops::Add for Summary {
44 type Output = Self;
45 fn add(self, other: Self) -> Self {
46 let mut mismatch = self.mismatch;
47 for (obj_type, &cmp_res_map) in &other.mismatch {
48 for (cmp_res, &count) in &cmp_res_map {
49 mismatch[obj_type][cmp_res] += count;
50 }
51 }
52 Self { mismatch }
53 }
54}
55
56impl std::fmt::Display for Summary {
57 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
58 for (obj_type, &cmp_res_map) in &self.mismatch {
59 for (cmp_res, &count) in &cmp_res_map {
60 writeln!(f, "{obj_type:?} {cmp_res:?}: {count}")?;
61 }
62 }
63 Ok(())
64 }
65}
66
67#[derive(Debug, Clone)]
68pub struct LogWriter {
69 log_opt: Option<std::sync::Arc<tokio::sync::Mutex<tokio::io::BufWriter<tokio::fs::File>>>>,
70}
71
72impl LogWriter {
73 pub async fn new(log_path_opt: Option<&std::path::Path>) -> Result<Self> {
74 if let Some(log_path) = log_path_opt {
75 let log_file = tokio::fs::OpenOptions::new()
76 .write(true)
77 .create_new(true)
78 .open(log_path)
79 .await
80 .with_context(|| format!("Failed to open log file: {log_path:?}"))?;
81 let log =
82 std::sync::Arc::new(tokio::sync::Mutex::new(tokio::io::BufWriter::new(log_file)));
83 Ok(Self { log_opt: Some(log) })
84 } else {
85 Ok(Self { log_opt: None })
86 }
87 }
88
89 pub async fn log_mismatch(
90 &self,
91 cmp_result: CompareResult,
92 src_obj_type: Option<ObjType>,
93 src: &std::path::Path,
94 dst_obj_type: Option<ObjType>,
95 dst: &std::path::Path,
96 ) -> Result<()> {
97 self.write(&format!(
98 "[{cmp_result:?}]\n\t[{src_obj_type:?}]\t{src:?}\n\t[{dst_obj_type:?}]\t{dst:?}\n"
99 ))
100 .await
101 }
102
103 async fn write(&self, msg: &str) -> Result<()> {
104 if let Some(log) = &self.log_opt {
105 let mut log = log.lock().await;
106 log.write_all(msg.as_bytes())
107 .await
108 .context("Failed to write to log file")?;
109 }
110 Ok(())
111 }
112
113 pub async fn flush(&self) -> Result<()> {
114 if let Some(log) = &self.log_opt {
115 let mut log = log.lock().await;
116 log.flush().await.context("Failed to flush log file")?;
117 }
118 Ok(())
119 }
120}
121
122fn obj_type(metadata: &std::fs::Metadata) -> ObjType {
123 if metadata.is_file() {
124 ObjType::File
125 } else if metadata.is_dir() {
126 ObjType::Dir
127 } else if metadata.is_symlink() {
128 ObjType::Symlink
129 } else {
130 ObjType::Other
132 }
133}
134
135#[instrument(skip(prog_track))]
136#[async_recursion]
137pub async fn cmp(
138 prog_track: &'static progress::Progress,
139 src: &std::path::Path,
140 dst: &std::path::Path,
141 log: &LogWriter,
142 settings: &Settings,
143) -> Result<Summary> {
144 let _prog_guard = prog_track.ops.guard();
145 tracing::debug!("reading source metadata");
146 let src_metadata = tokio::fs::symlink_metadata(src)
148 .await
149 .with_context(|| format!("failed reading metadata from {:?}", &src))?;
150 let mut cmp_summary = Summary::default();
151 let src_obj_type = obj_type(&src_metadata);
152 let dst_metadata = {
153 match tokio::fs::symlink_metadata(dst).await {
154 Ok(metadata) => metadata,
155 Err(err) => {
156 if err.kind() == std::io::ErrorKind::NotFound {
157 cmp_summary.mismatch[src_obj_type][CompareResult::DstMissing] += 1;
158 log.log_mismatch(
159 CompareResult::DstMissing,
160 Some(src_obj_type),
161 src,
162 None,
163 dst,
164 )
165 .await?;
166 return Ok(cmp_summary);
167 }
168 return Err(err).context(format!("failed reading metadata from {:?}", &dst));
169 }
170 }
171 };
172 if !is_file_type_same(&src_metadata, &dst_metadata)
173 || !filecmp::metadata_equal(
174 &settings.compare[src_obj_type],
175 &src_metadata,
176 &dst_metadata,
177 )
178 {
179 cmp_summary.mismatch[src_obj_type][CompareResult::Different] += 1;
181 let dst_obj_type = obj_type(&dst_metadata);
182 log.log_mismatch(
183 CompareResult::Different,
184 Some(src_obj_type),
185 src,
186 Some(dst_obj_type),
187 dst,
188 )
189 .await?;
190 if settings.exit_early {
191 return Ok(cmp_summary);
192 }
193 } else {
194 cmp_summary.mismatch[src_obj_type][CompareResult::Same] += 1;
195 }
196 if !src_metadata.is_dir() || !dst_metadata.is_dir() {
197 return Ok(cmp_summary);
199 }
200 tracing::debug!("process contents of 'src' directory");
201 let mut src_entries = tokio::fs::read_dir(src)
202 .await
203 .with_context(|| format!("cannot open directory {src:?} for reading"))?;
204 let mut join_set = tokio::task::JoinSet::new();
205 let mut success = true;
206 let mut processed_files = std::collections::HashSet::new();
208 while let Some(src_entry) = src_entries
210 .next_entry()
211 .await
212 .with_context(|| format!("failed traversing directory {:?}", &src))?
213 {
214 throttle::get_ops_token().await;
218 let entry_path = src_entry.path();
219 let entry_name = entry_path.file_name().unwrap();
220 processed_files.insert(entry_name.to_owned());
221 let dst_path = dst.join(entry_name);
222 let log = log.clone();
223 let settings = *settings;
224 let do_cmp =
225 || async move { cmp(prog_track, &entry_path, &dst_path, &log, &settings).await };
226 join_set.spawn(do_cmp());
227 }
228 drop(src_entries);
231 tracing::debug!("process contents of 'dst' directory");
232 let mut dst_entries = tokio::fs::read_dir(dst)
233 .await
234 .with_context(|| format!("cannot open directory {:?} for reading", &dst))?;
235 while let Some(dst_entry) = dst_entries
237 .next_entry()
238 .await
239 .with_context(|| format!("failed traversing directory {:?}", &dst))?
240 {
241 let entry_path = dst_entry.path();
242 let entry_name = entry_path.file_name().unwrap();
243 if processed_files.contains(entry_name) {
244 continue;
246 }
247 tracing::debug!("found a new entry in the 'dst' directory");
248 let dst_path = dst.join(entry_name);
249 let dst_entry_metadata = tokio::fs::symlink_metadata(&dst_path)
250 .await
251 .with_context(|| format!("failed reading metadata from {:?}", &dst_path))?;
252 let dst_obj_type = obj_type(&dst_entry_metadata);
253 cmp_summary.mismatch[dst_obj_type][CompareResult::SrcMissing] += 1;
254 log.log_mismatch(
255 CompareResult::SrcMissing,
256 None,
257 &src.join(entry_name),
258 Some(dst_obj_type),
259 &dst_path,
260 )
261 .await?;
262 }
263 drop(dst_entries);
266 while let Some(res) = join_set.join_next().await {
267 match res? {
268 Ok(summary) => cmp_summary = cmp_summary + summary,
269 Err(error) => {
270 tracing::error!("cmp: {:?} vs {:?} failed with: {:#}", src, dst, &error);
271 if settings.fail_early {
272 return Err(error);
273 }
274 success = false;
275 }
276 }
277 }
278 if !success {
279 return Err(anyhow!("cmp: {:?} vs {:?} failed!", src, dst));
280 }
281 Ok(cmp_summary)
282}
283
#[cfg(test)]
mod cmp_tests {
    use crate::copy;
    use crate::preserve;
    use crate::testutils;
    use enum_map::enum_map;
    use tracing_test::traced_test;

    use super::*;

    lazy_static! {
        static ref PROGRESS: progress::Progress = progress::Progress::new();
        static ref NO_PRESERVE_SETTINGS: preserve::Settings = preserve::preserve_default();
        static ref DO_PRESERVE_SETTINGS: preserve::Settings = preserve::preserve_all();
    }

    /// Creates the shared test tree and copies its `foo` directory to `bar`,
    /// preserving all attributes when `preserve` is set.
    async fn setup_test_dirs(preserve: bool) -> Result<std::path::PathBuf> {
        let tmp_dir = testutils::setup_test_dir().await?;
        let test_path = tmp_dir.as_path();
        copy::copy(
            &PROGRESS,
            &test_path.join("foo"),
            &test_path.join("bar"),
            &copy::Settings {
                dereference: false,
                fail_early: false,
                overwrite: false,
                overwrite_compare: filecmp::MetadataCmpSettings {
                    size: true,
                    mtime: true,
                    ..Default::default()
                },
                chunk_size: 0,
                remote_copy_buffer_size: 0,
            },
            if preserve {
                &DO_PRESERVE_SETTINGS
            } else {
                &NO_PRESERVE_SETTINGS
            },
            false,
        )
        .await?;
        Ok(tmp_dir)
    }

    /// Truncates the file at `path` to zero length, creating it if needed.
    async fn truncate_file(path: impl AsRef<std::path::Path>) -> Result<()> {
        let file = tokio::fs::File::create(path).await?;
        file.set_len(0).await?;
        Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn check_basic_cmp() -> Result<()> {
        let tmp_dir = setup_test_dirs(true).await?;
        // Remove one file from the source tree.
        tokio::fs::remove_file(tmp_dir.join("foo").join("bar").join("1.txt")).await?;
        // Wait so the modifications below presumably get an mtime that differs
        // from the preserved one.
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
        // Modify one file in the destination tree.
        truncate_file(tmp_dir.join("bar").join("baz").join("4.txt")).await?;
        // Remove a different file from the destination tree.
        tokio::fs::remove_file(tmp_dir.join("bar").join("bar").join("2.txt")).await?;
        // Add a file that exists only in the destination tree.
        tokio::fs::File::create(tmp_dir.join("bar").join("baz").join("7.txt")).await?;
        let compare_settings = Settings {
            fail_early: false,
            exit_early: false,
            compare: enum_map! {
                ObjType::File => filecmp::MetadataCmpSettings {
                    size: true,
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Dir => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Symlink => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
                ObjType::Other => filecmp::MetadataCmpSettings {
                    mtime: true,
                    ..Default::default()
                },
            },
        };
        let summary = cmp(
            &PROGRESS,
            &tmp_dir.join("foo"),
            &tmp_dir.join("bar"),
            &LogWriter::new(Some(tmp_dir.join("cmp.log").as_path())).await?,
            &compare_settings,
        )
        .await?;
        let mismatch: Mismatch = enum_map! {
            ObjType::File => enum_map! {
                CompareResult::Different => 1,
                CompareResult::Same => 2,
                CompareResult::SrcMissing => 2,
                CompareResult::DstMissing => 1,
            },
            ObjType::Dir => enum_map! {
                CompareResult::Different => 2,
                CompareResult::Same => 1,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
            ObjType::Symlink => enum_map! {
                CompareResult::Different => 0,
                CompareResult::Same => 2,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
            ObjType::Other => enum_map! {
                CompareResult::Different => 0,
                CompareResult::Same => 0,
                CompareResult::SrcMissing => 0,
                CompareResult::DstMissing => 0,
            },
        };
        assert_eq!(summary.mismatch, mismatch);
        Ok(())
    }
}