1use std::collections::HashSet;
14use std::sync::atomic::{AtomicUsize, Ordering};
15
16use crate::codec::{decode_tree_node, is_tree_node};
17use crate::crypto::decrypt_chk;
18use crate::hashtree::{HashTree, HashTreeError};
19use crate::store::Store;
20use crate::types::{Cid, Hash};
21
/// Outcome of diffing a new tree against a set of previously known hashes.
#[derive(Debug, Clone)]
pub struct TreeDiff {
    /// Hashes visited from the new root that were not present in the old hash set.
    pub added: Vec<Hash>,
    /// Counters recorded while the diff was computed.
    pub stats: DiffStats,
}
30
31impl TreeDiff {
32 pub fn empty() -> Self {
34 Self {
35 added: Vec::new(),
36 stats: DiffStats::default(),
37 }
38 }
39
40 pub fn is_empty(&self) -> bool {
42 self.added.is_empty()
43 }
44
45 pub fn added_count(&self) -> usize {
47 self.added.len()
48 }
49}
50
/// Counters describing what a diff traversal encountered.
#[derive(Debug, Clone, Default)]
pub struct DiffStats {
    /// Size of the old hash set that was supplied to the diff.
    pub old_tree_nodes: usize,
    /// Number of hashes visited in the new tree that were not in the old hash set.
    pub new_tree_nodes: usize,
    /// Number of visited hashes that were already in the old hash set; the
    /// traversal does not fetch or descend into these.
    pub unchanged_subtrees: usize,
}
61
62pub async fn collect_hashes<S: Store>(
75 tree: &HashTree<S>,
76 root: &Cid,
77 concurrency: usize,
78) -> Result<HashSet<Hash>, HashTreeError> {
79 collect_hashes_with_progress(tree, root, concurrency, None).await
80}
81
82pub async fn collect_hashes_with_progress<S: Store>(
84 tree: &HashTree<S>,
85 root: &Cid,
86 concurrency: usize,
87 progress: Option<&AtomicUsize>,
88) -> Result<HashSet<Hash>, HashTreeError> {
89 use futures::stream::{FuturesUnordered, StreamExt};
90 use std::collections::VecDeque;
91
92 let store = tree.get_store();
93 let mut hashes = HashSet::new();
94 let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
95 let mut active = FuturesUnordered::new();
96
97 pending.push_back((root.hash, root.key));
99
100 loop {
101 while active.len() < concurrency {
103 if let Some((hash, key)) = pending.pop_front() {
104 if hashes.contains(&hash) {
106 continue;
107 }
108 hashes.insert(hash);
109
110 let store = store.clone();
111 let fut = async move {
112 let data = store
113 .get(&hash)
114 .await
115 .map_err(|e| HashTreeError::Store(e.to_string()))?;
116 Ok::<_, HashTreeError>((hash, key, data))
117 };
118 active.push(fut);
119 } else {
120 break;
121 }
122 }
123
124 if active.is_empty() {
126 break;
127 }
128
129 if let Some(result) = active.next().await {
131 let (_hash, key, data) = result?;
132
133 if let Some(counter) = progress {
134 counter.fetch_add(1, Ordering::Relaxed);
135 }
136
137 let data = match data {
138 Some(d) => d,
139 None => continue,
140 };
141
142 let plaintext = if let Some(k) = key {
144 decrypt_chk(&data, &k).unwrap_or(data)
145 } else {
146 data
147 };
148
149 if is_tree_node(&plaintext) {
151 if let Ok(node) = decode_tree_node(&plaintext) {
152 for link in node.links {
153 if !hashes.contains(&link.hash) {
154 pending.push_back((link.hash, link.key));
155 }
156 }
157 }
158 }
159 }
160 }
161
162 Ok(hashes)
163}
164
165pub async fn tree_diff<S: Store>(
185 tree: &HashTree<S>,
186 old_root: Option<&Cid>,
187 new_root: &Cid,
188 concurrency: usize,
189) -> Result<TreeDiff, HashTreeError> {
190 let old_hashes = if let Some(old) = old_root {
192 collect_hashes(tree, old, concurrency).await?
193 } else {
194 HashSet::new()
195 };
196
197 tree_diff_with_old_hashes(tree, &old_hashes, new_root, concurrency).await
198}
199
200pub async fn tree_diff_with_old_hashes<S: Store>(
204 tree: &HashTree<S>,
205 old_hashes: &HashSet<Hash>,
206 new_root: &Cid,
207 concurrency: usize,
208) -> Result<TreeDiff, HashTreeError> {
209 use futures::stream::{FuturesUnordered, StreamExt};
210 use std::collections::VecDeque;
211
212 let store = tree.get_store();
213 let mut added: Vec<Hash> = Vec::new();
214 let mut visited: HashSet<Hash> = HashSet::new();
215 let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
216 let mut active = FuturesUnordered::new();
217
218 let mut stats = DiffStats {
219 old_tree_nodes: old_hashes.len(),
220 new_tree_nodes: 0,
221 unchanged_subtrees: 0,
222 };
223
224 pending.push_back((new_root.hash, new_root.key));
226
227 loop {
228 while active.len() < concurrency {
230 if let Some((hash, key)) = pending.pop_front() {
231 if visited.contains(&hash) {
233 continue;
234 }
235 visited.insert(hash);
236
237 if old_hashes.contains(&hash) {
239 stats.unchanged_subtrees += 1;
240 continue;
241 }
242
243 added.push(hash);
245 stats.new_tree_nodes += 1;
246
247 let store = store.clone();
249 let fut = async move {
250 let data = store
251 .get(&hash)
252 .await
253 .map_err(|e| HashTreeError::Store(e.to_string()))?;
254 Ok::<_, HashTreeError>((hash, key, data))
255 };
256 active.push(fut);
257 } else {
258 break;
259 }
260 }
261
262 if active.is_empty() {
264 break;
265 }
266
267 if let Some(result) = active.next().await {
269 let (_hash, key, data) = result?;
270
271 let data = match data {
272 Some(d) => d,
273 None => continue,
274 };
275
276 let plaintext = if let Some(k) = key {
278 decrypt_chk(&data, &k).unwrap_or(data)
279 } else {
280 data
281 };
282
283 if is_tree_node(&plaintext) {
285 if let Ok(node) = decode_tree_node(&plaintext) {
286 for link in node.links {
287 if !visited.contains(&link.hash) {
288 pending.push_back((link.hash, link.key));
289 }
290 }
291 }
292 }
293 }
294 }
295
296 Ok(TreeDiff { added, stats })
297}
298
299pub async fn tree_diff_streaming<S, F>(
304 tree: &HashTree<S>,
305 old_hashes: &HashSet<Hash>,
306 new_root: &Cid,
307 concurrency: usize,
308 mut callback: F,
309) -> Result<DiffStats, HashTreeError>
310where
311 S: Store,
312 F: FnMut(Hash) -> bool, {
314 use futures::stream::{FuturesUnordered, StreamExt};
315 use std::collections::VecDeque;
316
317 let store = tree.get_store();
318 let mut visited: HashSet<Hash> = HashSet::new();
319 let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
320 let mut active = FuturesUnordered::new();
321
322 let mut stats = DiffStats {
323 old_tree_nodes: old_hashes.len(),
324 new_tree_nodes: 0,
325 unchanged_subtrees: 0,
326 };
327
328 pending.push_back((new_root.hash, new_root.key));
329
330 loop {
331 while active.len() < concurrency {
332 if let Some((hash, key)) = pending.pop_front() {
333 if visited.contains(&hash) {
334 continue;
335 }
336 visited.insert(hash);
337
338 if old_hashes.contains(&hash) {
339 stats.unchanged_subtrees += 1;
340 continue;
341 }
342
343 stats.new_tree_nodes += 1;
344
345 if !callback(hash) {
347 return Ok(stats);
349 }
350
351 let store = store.clone();
352 let fut = async move {
353 let data = store
354 .get(&hash)
355 .await
356 .map_err(|e| HashTreeError::Store(e.to_string()))?;
357 Ok::<_, HashTreeError>((hash, key, data))
358 };
359 active.push(fut);
360 } else {
361 break;
362 }
363 }
364
365 if active.is_empty() {
366 break;
367 }
368
369 if let Some(result) = active.next().await {
370 let (_hash, key, data) = result?;
371
372 let data = match data {
373 Some(d) => d,
374 None => continue,
375 };
376
377 let plaintext = if let Some(k) = key {
378 decrypt_chk(&data, &k).unwrap_or(data)
379 } else {
380 data
381 };
382
383 if is_tree_node(&plaintext) {
384 if let Ok(node) = decode_tree_node(&plaintext) {
385 for link in node.links {
386 if !visited.contains(&link.hash) {
387 pending.push_back((link.hash, link.key));
388 }
389 }
390 }
391 }
392 }
393 }
394
395 Ok(stats)
396}
397
// Tests for hash collection and tree diffing, run against an in-memory store.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;
    use crate::types::{DirEntry, LinkType};
    use crate::HashTreeConfig;
    use std::sync::Arc;

    /// Builds a tree over a fresh `MemoryStore` using the `.public()` config
    /// (presumably unencrypted — confirm against `HashTreeConfig`).
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    /// Builds a tree over a fresh `MemoryStore` using the default config,
    /// i.e. without `.public()` (presumably encrypted — confirm against
    /// `HashTreeConfig`).
    fn make_encrypted_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()));
        (store, tree)
    }

    /// Diffing a tree against itself must report nothing as added.
    #[tokio::test]
    async fn test_diff_identical_trees() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        // Same root on both sides.
        let diff = tree_diff(&tree, Some(&dir_cid), &dir_cid, 4).await.unwrap();

        assert!(diff.is_empty(), "identical trees should have empty diff");
        assert_eq!(diff.added_count(), 0);
    }

    /// Replacing one file must add only that file and the new directory node.
    #[tokio::test]
    async fn test_diff_single_file_change() {
        let (_store, tree) = make_tree();

        // Old tree: two files in one directory.
        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let old_dir = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        // New tree: `a.txt` has new content, `b.txt` reuses the old blob.
        let file1_new = tree.put_blob(b"content1-modified").await.unwrap();
        let new_dir = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1_new).with_size(17),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_dir), &new_dir, 4).await.unwrap();

        assert!(!diff.is_empty());
        // Expected additions: the new directory node and the modified file.
        assert_eq!(diff.added_count(), 2);
        assert!(diff.added.contains(&file1_new));
        assert!(diff.added.contains(&new_dir.hash));
        // The reused blob must not be reported.
        assert!(!diff.added.contains(&file2));
    }

    /// A subdirectory shared by both trees must be skipped entirely.
    #[tokio::test]
    async fn test_diff_subtree_unchanged() {
        let (_store, tree) = make_tree();

        // Subdirectory that both roots link to.
        let sub_file = tree.put_blob(b"sub content").await.unwrap();
        let subdir = tree
            .put_directory(vec![DirEntry::new("sub.txt", sub_file).with_size(11)])
            .await
            .unwrap();

        // Old root: one file plus the shared subdirectory.
        let file1 = tree.put_blob(b"root file").await.unwrap();
        let old_root = tree
            .put_directory(vec![
                DirEntry::new("file.txt", file1).with_size(9),
                DirEntry::new("subdir", subdir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        // New root: the file changes, the subdirectory link stays the same.
        let file1_new = tree.put_blob(b"root file changed").await.unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("file.txt", file1_new).with_size(17),
                DirEntry::new("subdir", subdir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 4)
            .await
            .unwrap();

        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&file1_new));
        // Neither the shared subdirectory nor its content may be reported.
        assert!(!diff.added.contains(&subdir.hash));
        assert!(!diff.added.contains(&sub_file));
        assert!(diff.stats.unchanged_subtrees > 0);
    }

    /// Adding a directory must report the directory, its content and the new root.
    #[tokio::test]
    async fn test_diff_new_directory() {
        let (_store, tree) = make_tree();

        // Old root: a single file.
        let file1 = tree.put_blob(b"file1").await.unwrap();
        let old_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file1).with_size(5)])
            .await
            .unwrap();

        // New root: the same file plus a brand-new directory with one file.
        let new_file = tree.put_blob(b"new file").await.unwrap();
        let new_dir = tree
            .put_directory(vec![DirEntry::new("inner.txt", new_file).with_size(8)])
            .await
            .unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(5),
                DirEntry::new("newdir", new_dir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 4)
            .await
            .unwrap();

        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&new_dir.hash));
        assert!(diff.added.contains(&new_file));
        // The file carried over from the old root must not be reported.
        assert!(!diff.added.contains(&file1));
    }

    /// With no old root, everything reached from the new root is added.
    #[tokio::test]
    async fn test_diff_empty_old_tree() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content").await.unwrap();
        let new_root = tree
            .put_directory(vec![DirEntry::new("file.txt", file1).with_size(7)])
            .await
            .unwrap();

        // `None` as the old root means the old hash set is empty.
        let diff = tree_diff(&tree, None, &new_root, 4).await.unwrap();

        // Expected additions: the root directory and its single file.
        assert_eq!(diff.added_count(), 2);
        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&file1));
    }

    /// Same scenario as the single-file change, on a tree built without `.public()`.
    #[tokio::test]
    async fn test_diff_encrypted_trees() {
        let (_store, tree) = make_encrypted_tree();

        // `put` returns a CID; entries are built from it with `from_cid`.
        let (file1_cid, _) = tree.put(b"content1").await.unwrap();
        let (file2_cid, _) = tree.put(b"content2").await.unwrap();
        let old_dir = tree
            .put_directory(vec![
                DirEntry::from_cid("a.txt", &file1_cid).with_size(8),
                DirEntry::from_cid("b.txt", &file2_cid).with_size(8),
            ])
            .await
            .unwrap();

        // Only `a.txt` changes in the new directory.
        let (file1_new_cid, _) = tree.put(b"content1-modified").await.unwrap();
        let new_dir = tree
            .put_directory(vec![
                DirEntry::from_cid("a.txt", &file1_new_cid).with_size(17),
                DirEntry::from_cid("b.txt", &file2_cid).with_size(8),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_dir), &new_dir, 4).await.unwrap();

        assert!(!diff.is_empty());
        assert!(diff.added.contains(&file1_new_cid.hash));
        // The unchanged file must not be reported.
        assert!(!diff.added.contains(&file2_cid.hash));
    }

    /// `collect_hashes` must return the directory hash and both file hashes.
    #[tokio::test]
    async fn test_collect_hashes() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let hashes = collect_hashes(&tree, &dir_cid, 4).await.unwrap();

        // One directory node plus two files.
        assert_eq!(hashes.len(), 3);
        assert!(hashes.contains(&dir_cid.hash));
        assert!(hashes.contains(&file1));
        assert!(hashes.contains(&file2));
    }

    /// The streaming diff must hand every added hash to the callback.
    #[tokio::test]
    async fn test_diff_streaming() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"old").await.unwrap();
        let old_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file1).with_size(3)])
            .await
            .unwrap();

        let file2 = tree.put_blob(b"new").await.unwrap();
        let new_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file2).with_size(3)])
            .await
            .unwrap();

        let old_hashes = collect_hashes(&tree, &old_root, 4).await.unwrap();

        // Record every hash the callback receives; always ask to continue.
        let mut streamed: Vec<Hash> = Vec::new();
        let stats = tree_diff_streaming(&tree, &old_hashes, &new_root, 4, |hash| {
            streamed.push(hash);
            true
        })
        .await
        .unwrap();

        // Expected: the new root and the new file.
        assert_eq!(streamed.len(), 2);
        assert!(streamed.contains(&new_root.hash));
        assert!(streamed.contains(&file2));
        assert_eq!(stats.new_tree_nodes, 2);
    }

    /// Returning `false` from the callback must stop the streaming diff.
    #[tokio::test]
    async fn test_diff_streaming_early_stop() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"f1").await.unwrap();
        let file2 = tree.put_blob(b"f2").await.unwrap();
        let file3 = tree.put_blob(b"f3").await.unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(2),
                DirEntry::new("b.txt", file2).with_size(2),
                DirEntry::new("c.txt", file3).with_size(2),
            ])
            .await
            .unwrap();

        // Empty old set: all four hashes (root + 3 files) would be new.
        let old_hashes = HashSet::new();
        let mut count = 0;
        // The callback returns `false` on its second invocation.
        let _stats = tree_diff_streaming(&tree, &old_hashes, &new_root, 1, |_hash| {
            count += 1;
            count < 2
        })
        .await
        .unwrap();

        assert!(count <= 2, "should have stopped early");
    }

    /// In a 100-file directory with 5 files changed, only the changes are reported.
    #[tokio::test]
    async fn test_diff_large_tree_structure() {
        let (_store, tree) = make_tree();

        let mut entries = Vec::new();
        // NOTE(review): `old_hashes_vec` is populated below but never read by
        // any assertion in this test.
        let mut old_hashes_vec = Vec::new();

        // Old tree: 100 distinct files.
        for i in 0..100 {
            let data = format!("content {}", i);
            let hash = tree.put_blob(data.as_bytes()).await.unwrap();
            entries
                .push(DirEntry::new(format!("file{}.txt", i), hash).with_size(data.len() as u64));
            old_hashes_vec.push(hash);
        }

        let old_root = tree.put_directory(entries.clone()).await.unwrap();
        old_hashes_vec.push(old_root.hash);

        // New tree: the first five entries get new content.
        for i in 0..5 {
            let data = format!("modified content {}", i);
            let hash = tree.put_blob(data.as_bytes()).await.unwrap();
            entries[i] = DirEntry::new(format!("file{}.txt", i), hash).with_size(data.len() as u64);
        }

        let new_root = tree.put_directory(entries).await.unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 8)
            .await
            .unwrap();

        // Expects the five modified files plus the new root (assumes the
        // directory is stored as a single node — TODO confirm).
        assert_eq!(diff.added_count(), 6);
        assert!(diff.added.contains(&new_root.hash));

        // The 95 untouched files should each be counted as unchanged.
        assert!(diff.stats.unchanged_subtrees >= 95);
    }
}