1use std::collections::HashSet;
14use std::sync::atomic::{AtomicUsize, Ordering};
15
16use crate::codec::{decode_tree_node, is_tree_node};
17use crate::crypto::decrypt_chk;
18use crate::hashtree::{HashTree, HashTreeError};
19use crate::store::Store;
20use crate::types::{Cid, Hash};
21
22#[derive(Debug, Clone)]
24pub struct TreeDiff {
25 pub added: Vec<Hash>,
27 pub stats: DiffStats,
29}
30
31impl TreeDiff {
32 pub fn empty() -> Self {
34 Self {
35 added: Vec::new(),
36 stats: DiffStats::default(),
37 }
38 }
39
40 pub fn is_empty(&self) -> bool {
42 self.added.is_empty()
43 }
44
45 pub fn added_count(&self) -> usize {
47 self.added.len()
48 }
49}
50
/// Counters collected while diffing two trees.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DiffStats {
    /// Number of hashes in the old-tree hash set supplied to the diff.
    pub old_tree_nodes: usize,
    /// Number of distinct new-tree hashes that were not in the old set.
    pub new_tree_nodes: usize,
    /// Number of distinct new-tree hashes found in the old set. The walk does
    /// not descend into these nodes.
    pub unchanged_subtrees: usize,
}
61
62fn decrypt_if_keyed(data: Vec<u8>, key: Option<[u8; 32]>) -> Result<Vec<u8>, HashTreeError> {
63 if let Some(k) = key {
64 decrypt_chk(&data, &k).map_err(|e| HashTreeError::Decryption(e.to_string()))
65 } else {
66 Ok(data)
67 }
68}
69
70pub async fn collect_hashes<S: Store>(
83 tree: &HashTree<S>,
84 root: &Cid,
85 concurrency: usize,
86) -> Result<HashSet<Hash>, HashTreeError> {
87 collect_hashes_with_progress(tree, root, concurrency, None).await
88}
89
90pub async fn collect_hashes_with_progress<S: Store>(
92 tree: &HashTree<S>,
93 root: &Cid,
94 concurrency: usize,
95 progress: Option<&AtomicUsize>,
96) -> Result<HashSet<Hash>, HashTreeError> {
97 use futures::stream::{FuturesUnordered, StreamExt};
98 use std::collections::VecDeque;
99
100 let store = tree.get_store();
101 let mut hashes = HashSet::new();
102 let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
103 let mut active = FuturesUnordered::new();
104
105 pending.push_back((root.hash, root.key));
107
108 loop {
109 while active.len() < concurrency {
111 if let Some((hash, key)) = pending.pop_front() {
112 if hashes.contains(&hash) {
114 continue;
115 }
116 hashes.insert(hash);
117
118 let store = store.clone();
119 let fut = async move {
120 let data = store
121 .get(&hash)
122 .await
123 .map_err(|e| HashTreeError::Store(e.to_string()))?;
124 Ok::<_, HashTreeError>((hash, key, data))
125 };
126 active.push(fut);
127 } else {
128 break;
129 }
130 }
131
132 if active.is_empty() {
134 break;
135 }
136
137 if let Some(result) = active.next().await {
139 let (_hash, key, data) = result?;
140
141 if let Some(counter) = progress {
142 counter.fetch_add(1, Ordering::Relaxed);
143 }
144
145 let data = match data {
146 Some(d) => d,
147 None => continue,
148 };
149
150 let plaintext = decrypt_if_keyed(data, key)?;
152
153 if is_tree_node(&plaintext) {
155 if let Ok(node) = decode_tree_node(&plaintext) {
156 for link in node.links {
157 if !hashes.contains(&link.hash) {
158 pending.push_back((link.hash, link.key));
159 }
160 }
161 }
162 }
163 }
164 }
165
166 Ok(hashes)
167}
168
169pub async fn tree_diff<S: Store>(
189 tree: &HashTree<S>,
190 old_root: Option<&Cid>,
191 new_root: &Cid,
192 concurrency: usize,
193) -> Result<TreeDiff, HashTreeError> {
194 let old_hashes = if let Some(old) = old_root {
196 collect_hashes(tree, old, concurrency).await?
197 } else {
198 HashSet::new()
199 };
200
201 tree_diff_with_old_hashes(tree, &old_hashes, new_root, concurrency).await
202}
203
204pub async fn tree_diff_with_old_hashes<S: Store>(
208 tree: &HashTree<S>,
209 old_hashes: &HashSet<Hash>,
210 new_root: &Cid,
211 concurrency: usize,
212) -> Result<TreeDiff, HashTreeError> {
213 use futures::stream::{FuturesUnordered, StreamExt};
214 use std::collections::VecDeque;
215
216 let store = tree.get_store();
217 let mut added: Vec<Hash> = Vec::new();
218 let mut visited: HashSet<Hash> = HashSet::new();
219 let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
220 let mut active = FuturesUnordered::new();
221
222 let mut stats = DiffStats {
223 old_tree_nodes: old_hashes.len(),
224 new_tree_nodes: 0,
225 unchanged_subtrees: 0,
226 };
227
228 pending.push_back((new_root.hash, new_root.key));
230
231 loop {
232 while active.len() < concurrency {
234 if let Some((hash, key)) = pending.pop_front() {
235 if visited.contains(&hash) {
237 continue;
238 }
239 visited.insert(hash);
240
241 if old_hashes.contains(&hash) {
243 stats.unchanged_subtrees += 1;
244 continue;
245 }
246
247 added.push(hash);
249 stats.new_tree_nodes += 1;
250
251 let store = store.clone();
253 let fut = async move {
254 let data = store
255 .get(&hash)
256 .await
257 .map_err(|e| HashTreeError::Store(e.to_string()))?;
258 Ok::<_, HashTreeError>((hash, key, data))
259 };
260 active.push(fut);
261 } else {
262 break;
263 }
264 }
265
266 if active.is_empty() {
268 break;
269 }
270
271 if let Some(result) = active.next().await {
273 let (_hash, key, data) = result?;
274
275 let data = match data {
276 Some(d) => d,
277 None => continue,
278 };
279
280 let plaintext = decrypt_if_keyed(data, key)?;
282
283 if is_tree_node(&plaintext) {
285 if let Ok(node) = decode_tree_node(&plaintext) {
286 for link in node.links {
287 if !visited.contains(&link.hash) {
288 pending.push_back((link.hash, link.key));
289 }
290 }
291 }
292 }
293 }
294 }
295
296 Ok(TreeDiff { added, stats })
297}
298
299pub async fn tree_diff_streaming<S, F>(
304 tree: &HashTree<S>,
305 old_hashes: &HashSet<Hash>,
306 new_root: &Cid,
307 concurrency: usize,
308 mut callback: F,
309) -> Result<DiffStats, HashTreeError>
310where
311 S: Store,
312 F: FnMut(Hash) -> bool, {
314 use futures::stream::{FuturesUnordered, StreamExt};
315 use std::collections::VecDeque;
316
317 let store = tree.get_store();
318 let mut visited: HashSet<Hash> = HashSet::new();
319 let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
320 let mut active = FuturesUnordered::new();
321
322 let mut stats = DiffStats {
323 old_tree_nodes: old_hashes.len(),
324 new_tree_nodes: 0,
325 unchanged_subtrees: 0,
326 };
327
328 pending.push_back((new_root.hash, new_root.key));
329
330 loop {
331 while active.len() < concurrency {
332 if let Some((hash, key)) = pending.pop_front() {
333 if visited.contains(&hash) {
334 continue;
335 }
336 visited.insert(hash);
337
338 if old_hashes.contains(&hash) {
339 stats.unchanged_subtrees += 1;
340 continue;
341 }
342
343 stats.new_tree_nodes += 1;
344
345 if !callback(hash) {
347 return Ok(stats);
349 }
350
351 let store = store.clone();
352 let fut = async move {
353 let data = store
354 .get(&hash)
355 .await
356 .map_err(|e| HashTreeError::Store(e.to_string()))?;
357 Ok::<_, HashTreeError>((hash, key, data))
358 };
359 active.push(fut);
360 } else {
361 break;
362 }
363 }
364
365 if active.is_empty() {
366 break;
367 }
368
369 if let Some(result) = active.next().await {
370 let (_hash, key, data) = result?;
371
372 let data = match data {
373 Some(d) => d,
374 None => continue,
375 };
376
377 let plaintext = decrypt_if_keyed(data, key)?;
378
379 if is_tree_node(&plaintext) {
380 if let Ok(node) = decode_tree_node(&plaintext) {
381 for link in node.links {
382 if !visited.contains(&link.hash) {
383 pending.push_back((link.hash, link.key));
384 }
385 }
386 }
387 }
388 }
389 }
390
391 Ok(stats)
392}
393
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;
    use crate::types::{DirEntry, LinkType};
    use crate::HashTreeConfig;
    use std::sync::Arc;

    /// Tree over a fresh in-memory store, built with the `.public()` config.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    /// Tree over a fresh in-memory store, built with the default config (no
    /// `.public()`). Assumes the default config encrypts, as the name implies.
    /// TODO confirm.
    fn make_encrypted_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()));
        (store, tree)
    }

    #[tokio::test]
    async fn test_diff_identical_trees() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&dir_cid), &dir_cid, 4).await.unwrap();

        assert!(diff.is_empty(), "identical trees should have empty diff");
        assert_eq!(diff.added_count(), 0);
        // The shared root is found in the old set, so nothing below it is visited.
        assert_eq!(diff.stats.unchanged_subtrees, 1);
        assert_eq!(diff.stats.new_tree_nodes, 0);
    }

    #[tokio::test]
    async fn test_diff_single_file_change() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let old_dir = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let file1_new = tree.put_blob(b"content1-modified").await.unwrap();
        let new_dir = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1_new).with_size(17),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_dir), &new_dir, 4).await.unwrap();

        assert!(!diff.is_empty());
        // New root directory plus the modified file.
        assert_eq!(diff.added_count(), 2);
        assert!(diff.added.contains(&file1_new));
        assert!(diff.added.contains(&new_dir.hash));
        // The untouched file is shared with the old tree.
        assert!(!diff.added.contains(&file2));
    }

    #[tokio::test]
    async fn test_diff_subtree_unchanged() {
        let (_store, tree) = make_tree();

        let sub_file = tree.put_blob(b"sub content").await.unwrap();
        let subdir = tree
            .put_directory(vec![DirEntry::new("sub.txt", sub_file).with_size(11)])
            .await
            .unwrap();

        let file1 = tree.put_blob(b"root file").await.unwrap();
        let old_root = tree
            .put_directory(vec![
                DirEntry::new("file.txt", file1).with_size(9),
                DirEntry::new("subdir", subdir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let file1_new = tree.put_blob(b"root file changed").await.unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("file.txt", file1_new).with_size(17),
                DirEntry::new("subdir", subdir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 4)
            .await
            .unwrap();

        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&file1_new));
        // The shared subdirectory and its contents are skipped entirely.
        assert!(!diff.added.contains(&subdir.hash));
        assert!(!diff.added.contains(&sub_file));
        assert!(diff.stats.unchanged_subtrees > 0);
    }

    #[tokio::test]
    async fn test_diff_new_directory() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"file1").await.unwrap();
        let old_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file1).with_size(5)])
            .await
            .unwrap();

        let new_file = tree.put_blob(b"new file").await.unwrap();
        let new_dir = tree
            .put_directory(vec![DirEntry::new("inner.txt", new_file).with_size(8)])
            .await
            .unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(5),
                DirEntry::new("newdir", new_dir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 4)
            .await
            .unwrap();

        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&new_dir.hash));
        assert!(diff.added.contains(&new_file));
        assert!(!diff.added.contains(&file1));
    }

    #[tokio::test]
    async fn test_diff_empty_old_tree() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content").await.unwrap();
        let new_root = tree
            .put_directory(vec![DirEntry::new("file.txt", file1).with_size(7)])
            .await
            .unwrap();

        let diff = tree_diff(&tree, None, &new_root, 4).await.unwrap();

        // With no old tree, every reachable node is new.
        assert_eq!(diff.added_count(), 2);
        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&file1));
    }

    #[tokio::test]
    async fn test_diff_encrypted_trees() {
        let (_store, tree) = make_encrypted_tree();

        let (file1_cid, _) = tree.put(b"content1").await.unwrap();
        let (file2_cid, _) = tree.put(b"content2").await.unwrap();
        let old_dir = tree
            .put_directory(vec![
                DirEntry::from_cid("a.txt", &file1_cid).with_size(8),
                DirEntry::from_cid("b.txt", &file2_cid).with_size(8),
            ])
            .await
            .unwrap();

        let (file1_new_cid, _) = tree.put(b"content1-modified").await.unwrap();
        let new_dir = tree
            .put_directory(vec![
                DirEntry::from_cid("a.txt", &file1_new_cid).with_size(17),
                DirEntry::from_cid("b.txt", &file2_cid).with_size(8),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_dir), &new_dir, 4).await.unwrap();

        assert!(!diff.is_empty());
        assert!(diff.added.contains(&new_dir.hash));
        assert!(diff.added.contains(&file1_new_cid.hash));
        // The unchanged file is still recognised as shared.
        assert!(!diff.added.contains(&file2_cid.hash));
    }

    #[tokio::test]
    async fn test_collect_hashes() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let hashes = collect_hashes(&tree, &dir_cid, 4).await.unwrap();

        // Root directory plus two files.
        assert_eq!(hashes.len(), 3);
        assert!(hashes.contains(&dir_cid.hash));
        assert!(hashes.contains(&file1));
        assert!(hashes.contains(&file2));
    }

    #[tokio::test]
    async fn test_collect_hashes_with_progress_counts_fetches() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let counter = AtomicUsize::new(0);
        let hashes = collect_hashes_with_progress(&tree, &dir_cid, 2, Some(&counter))
            .await
            .unwrap();

        assert_eq!(hashes.len(), 3);
        // One completed fetch per node.
        assert_eq!(counter.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn test_diff_streaming() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"old").await.unwrap();
        let old_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file1).with_size(3)])
            .await
            .unwrap();

        let file2 = tree.put_blob(b"new").await.unwrap();
        let new_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file2).with_size(3)])
            .await
            .unwrap();

        let old_hashes = collect_hashes(&tree, &old_root, 4).await.unwrap();

        let mut streamed: Vec<Hash> = Vec::new();
        let stats = tree_diff_streaming(&tree, &old_hashes, &new_root, 4, |hash| {
            streamed.push(hash);
            true
        })
        .await
        .unwrap();

        // New root plus the replaced file.
        assert_eq!(streamed.len(), 2);
        assert!(streamed.contains(&new_root.hash));
        assert!(streamed.contains(&file2));
        assert_eq!(stats.new_tree_nodes, 2);
    }

    #[tokio::test]
    async fn test_diff_streaming_early_stop() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"f1").await.unwrap();
        let file2 = tree.put_blob(b"f2").await.unwrap();
        let file3 = tree.put_blob(b"f3").await.unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(2),
                DirEntry::new("b.txt", file2).with_size(2),
                DirEntry::new("c.txt", file3).with_size(2),
            ])
            .await
            .unwrap();

        // Empty old set: every node is new.
        let old_hashes = HashSet::new();
        let mut count = 0;
        // With concurrency 1, the root is reported (count 1 -> continue), then
        // the first child is reported (count 2 -> stop).
        let stats = tree_diff_streaming(&tree, &old_hashes, &new_root, 1, |_hash| {
            count += 1;
            count < 2
        })
        .await
        .unwrap();

        assert_eq!(count, 2, "should have stopped at the second callback");
        assert_eq!(stats.new_tree_nodes, 2);
    }

    #[tokio::test]
    async fn test_diff_large_tree_structure() {
        let (_store, tree) = make_tree();

        let mut entries = Vec::new();
        for i in 0..100 {
            let data = format!("content {}", i);
            let hash = tree.put_blob(data.as_bytes()).await.unwrap();
            entries.push(DirEntry::new(format!("file{}.txt", i), hash).with_size(data.len() as u64));
        }

        let old_root = tree.put_directory(entries.clone()).await.unwrap();

        // Modify the first five files only.
        for i in 0..5 {
            let data = format!("modified content {}", i);
            let hash = tree.put_blob(data.as_bytes()).await.unwrap();
            entries[i] = DirEntry::new(format!("file{}.txt", i), hash).with_size(data.len() as u64);
        }

        let new_root = tree.put_directory(entries).await.unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 8)
            .await
            .unwrap();

        // New root plus the five modified files.
        assert_eq!(diff.added_count(), 6);
        assert!(diff.added.contains(&new_root.hash));

        // The remaining files are shared with the old tree.
        assert!(diff.stats.unchanged_subtrees >= 95);
    }
}