1use std::collections::HashSet;
14use std::sync::atomic::{AtomicUsize, Ordering};
15
16use crate::codec::{decode_tree_node, is_tree_node};
17use crate::crypto::decrypt_chk;
18use crate::hashtree::{HashTree, HashTreeError};
19use crate::store::Store;
20use crate::types::{Cid, Hash};
21
22#[derive(Debug, Clone)]
24pub struct TreeDiff {
25 pub added: Vec<Hash>,
27 pub stats: DiffStats,
29}
30
31impl TreeDiff {
32 pub fn empty() -> Self {
34 Self {
35 added: Vec::new(),
36 stats: DiffStats::default(),
37 }
38 }
39
40 pub fn is_empty(&self) -> bool {
42 self.added.is_empty()
43 }
44
45 pub fn added_count(&self) -> usize {
47 self.added.len()
48 }
49}
50
/// Counters collected while diffing a new tree against an old hash set.
///
/// Derives `PartialEq`/`Eq` so callers and tests can compare whole stat snapshots.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DiffStats {
    /// Size of the old hash set the diff was computed against. When that set comes from
    /// `collect_hashes` it contains every reached hash (leaf blobs as well as tree nodes).
    pub old_tree_nodes: usize,
    /// Number of new-tree hashes that were absent from the old set (and were fetched).
    pub new_tree_nodes: usize,
    /// Number of new-tree hashes found in the old set; their subtrees are not descended into.
    pub unchanged_subtrees: usize,
}
61
62pub async fn collect_hashes<S: Store>(
75 tree: &HashTree<S>,
76 root: &Cid,
77 concurrency: usize,
78) -> Result<HashSet<Hash>, HashTreeError> {
79 collect_hashes_with_progress(tree, root, concurrency, None).await
80}
81
82pub async fn collect_hashes_with_progress<S: Store>(
84 tree: &HashTree<S>,
85 root: &Cid,
86 concurrency: usize,
87 progress: Option<&AtomicUsize>,
88) -> Result<HashSet<Hash>, HashTreeError> {
89 use futures::stream::{FuturesUnordered, StreamExt};
90 use std::collections::VecDeque;
91
92 let store = tree.get_store();
93 let mut hashes = HashSet::new();
94 let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
95 let mut active = FuturesUnordered::new();
96
97 pending.push_back((root.hash, root.key));
99
100 loop {
101 while active.len() < concurrency {
103 if let Some((hash, key)) = pending.pop_front() {
104 if hashes.contains(&hash) {
106 continue;
107 }
108 hashes.insert(hash);
109
110 let store = store.clone();
111 let fut = async move {
112 let data = store
113 .get(&hash)
114 .await
115 .map_err(|e| HashTreeError::Store(e.to_string()))?;
116 Ok::<_, HashTreeError>((hash, key, data))
117 };
118 active.push(fut);
119 } else {
120 break;
121 }
122 }
123
124 if active.is_empty() {
126 break;
127 }
128
129 if let Some(result) = active.next().await {
131 let (_hash, key, data) = result?;
132
133 if let Some(counter) = progress {
134 counter.fetch_add(1, Ordering::Relaxed);
135 }
136
137 let data = match data {
138 Some(d) => d,
139 None => continue,
140 };
141
142 let plaintext = if let Some(k) = key {
144 decrypt_chk(&data, &k).unwrap_or(data)
145 } else {
146 data
147 };
148
149 if is_tree_node(&plaintext) {
151 if let Ok(node) = decode_tree_node(&plaintext) {
152 for link in node.links {
153 if !hashes.contains(&link.hash) {
154 pending.push_back((link.hash, link.key));
155 }
156 }
157 }
158 }
159 }
160 }
161
162 Ok(hashes)
163}
164
165pub async fn tree_diff<S: Store>(
185 tree: &HashTree<S>,
186 old_root: Option<&Cid>,
187 new_root: &Cid,
188 concurrency: usize,
189) -> Result<TreeDiff, HashTreeError> {
190 let old_hashes = if let Some(old) = old_root {
192 collect_hashes(tree, old, concurrency).await?
193 } else {
194 HashSet::new()
195 };
196
197 tree_diff_with_old_hashes(tree, &old_hashes, new_root, concurrency).await
198}
199
200pub async fn tree_diff_with_old_hashes<S: Store>(
204 tree: &HashTree<S>,
205 old_hashes: &HashSet<Hash>,
206 new_root: &Cid,
207 concurrency: usize,
208) -> Result<TreeDiff, HashTreeError> {
209 use futures::stream::{FuturesUnordered, StreamExt};
210 use std::collections::VecDeque;
211
212 let store = tree.get_store();
213 let mut added: Vec<Hash> = Vec::new();
214 let mut visited: HashSet<Hash> = HashSet::new();
215 let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
216 let mut active = FuturesUnordered::new();
217
218 let mut stats = DiffStats {
219 old_tree_nodes: old_hashes.len(),
220 new_tree_nodes: 0,
221 unchanged_subtrees: 0,
222 };
223
224 pending.push_back((new_root.hash, new_root.key));
226
227 loop {
228 while active.len() < concurrency {
230 if let Some((hash, key)) = pending.pop_front() {
231 if visited.contains(&hash) {
233 continue;
234 }
235 visited.insert(hash);
236
237 if old_hashes.contains(&hash) {
239 stats.unchanged_subtrees += 1;
240 continue;
241 }
242
243 added.push(hash);
245 stats.new_tree_nodes += 1;
246
247 let store = store.clone();
249 let fut = async move {
250 let data = store
251 .get(&hash)
252 .await
253 .map_err(|e| HashTreeError::Store(e.to_string()))?;
254 Ok::<_, HashTreeError>((hash, key, data))
255 };
256 active.push(fut);
257 } else {
258 break;
259 }
260 }
261
262 if active.is_empty() {
264 break;
265 }
266
267 if let Some(result) = active.next().await {
269 let (_hash, key, data) = result?;
270
271 let data = match data {
272 Some(d) => d,
273 None => continue,
274 };
275
276 let plaintext = if let Some(k) = key {
278 decrypt_chk(&data, &k).unwrap_or(data)
279 } else {
280 data
281 };
282
283 if is_tree_node(&plaintext) {
285 if let Ok(node) = decode_tree_node(&plaintext) {
286 for link in node.links {
287 if !visited.contains(&link.hash) {
288 pending.push_back((link.hash, link.key));
289 }
290 }
291 }
292 }
293 }
294 }
295
296 Ok(TreeDiff { added, stats })
297}
298
299pub async fn tree_diff_streaming<S, F>(
304 tree: &HashTree<S>,
305 old_hashes: &HashSet<Hash>,
306 new_root: &Cid,
307 concurrency: usize,
308 mut callback: F,
309) -> Result<DiffStats, HashTreeError>
310where
311 S: Store,
312 F: FnMut(Hash) -> bool, {
314 use futures::stream::{FuturesUnordered, StreamExt};
315 use std::collections::VecDeque;
316
317 let store = tree.get_store();
318 let mut visited: HashSet<Hash> = HashSet::new();
319 let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
320 let mut active = FuturesUnordered::new();
321
322 let mut stats = DiffStats {
323 old_tree_nodes: old_hashes.len(),
324 new_tree_nodes: 0,
325 unchanged_subtrees: 0,
326 };
327
328 pending.push_back((new_root.hash, new_root.key));
329
330 loop {
331 while active.len() < concurrency {
332 if let Some((hash, key)) = pending.pop_front() {
333 if visited.contains(&hash) {
334 continue;
335 }
336 visited.insert(hash);
337
338 if old_hashes.contains(&hash) {
339 stats.unchanged_subtrees += 1;
340 continue;
341 }
342
343 stats.new_tree_nodes += 1;
344
345 if !callback(hash) {
347 return Ok(stats);
349 }
350
351 let store = store.clone();
352 let fut = async move {
353 let data = store
354 .get(&hash)
355 .await
356 .map_err(|e| HashTreeError::Store(e.to_string()))?;
357 Ok::<_, HashTreeError>((hash, key, data))
358 };
359 active.push(fut);
360 } else {
361 break;
362 }
363 }
364
365 if active.is_empty() {
366 break;
367 }
368
369 if let Some(result) = active.next().await {
370 let (_hash, key, data) = result?;
371
372 let data = match data {
373 Some(d) => d,
374 None => continue,
375 };
376
377 let plaintext = if let Some(k) = key {
378 decrypt_chk(&data, &k).unwrap_or(data)
379 } else {
380 data
381 };
382
383 if is_tree_node(&plaintext) {
384 if let Ok(node) = decode_tree_node(&plaintext) {
385 for link in node.links {
386 if !visited.contains(&link.hash) {
387 pending.push_back((link.hash, link.key));
388 }
389 }
390 }
391 }
392 }
393 }
394
395 Ok(stats)
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;
    use crate::types::{DirEntry, LinkType};
    use crate::HashTreeConfig;
    use std::sync::Arc;

    /// Builds a tree configured with `.public()` over a fresh in-memory store.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    /// Builds a tree with the default config (presumably encrypted, since `.public()` is
    /// not applied) over a fresh in-memory store.
    fn make_encrypted_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()));
        (store, tree)
    }

    #[tokio::test]
    async fn test_diff_identical_trees() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&dir_cid), &dir_cid, 4).await.unwrap();

        assert!(diff.is_empty(), "identical trees should have empty diff");
        assert_eq!(diff.added_count(), 0);
        // The shared root is found in the old set at once, so nothing below it is visited.
        assert_eq!(diff.stats.unchanged_subtrees, 1);
        assert_eq!(diff.stats.new_tree_nodes, 0);
    }

    #[tokio::test]
    async fn test_diff_single_file_change() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let old_dir = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let file1_new = tree.put_blob(b"content1-modified").await.unwrap();
        let new_dir = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1_new).with_size(17),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_dir), &new_dir, 4).await.unwrap();

        assert!(!diff.is_empty());
        // The new root plus the modified file.
        assert_eq!(diff.added_count(), 2);
        assert!(diff.added.contains(&file1_new));
        assert!(diff.added.contains(&new_dir.hash));
        assert!(!diff.added.contains(&file2));
    }

    #[tokio::test]
    async fn test_diff_subtree_unchanged() {
        let (_store, tree) = make_tree();

        let sub_file = tree.put_blob(b"sub content").await.unwrap();
        let subdir = tree
            .put_directory(vec![DirEntry::new("sub.txt", sub_file).with_size(11)])
            .await
            .unwrap();

        let file1 = tree.put_blob(b"root file").await.unwrap();
        let old_root = tree
            .put_directory(vec![
                DirEntry::new("file.txt", file1).with_size(9),
                DirEntry::new("subdir", subdir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let file1_new = tree.put_blob(b"root file changed").await.unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("file.txt", file1_new).with_size(17),
                DirEntry::new("subdir", subdir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 4).await.unwrap();

        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&file1_new));
        // The shared subdirectory and its contents must not be reported.
        assert!(!diff.added.contains(&subdir.hash));
        assert!(!diff.added.contains(&sub_file));
        assert!(diff.stats.unchanged_subtrees > 0);
    }

    #[tokio::test]
    async fn test_diff_new_directory() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"file1").await.unwrap();
        let old_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file1).with_size(5)])
            .await
            .unwrap();

        let new_file = tree.put_blob(b"new file").await.unwrap();
        let new_dir = tree
            .put_directory(vec![DirEntry::new("inner.txt", new_file).with_size(8)])
            .await
            .unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(5),
                DirEntry::new("newdir", new_dir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 4).await.unwrap();

        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&new_dir.hash));
        assert!(diff.added.contains(&new_file));
        assert!(!diff.added.contains(&file1));
    }

    #[tokio::test]
    async fn test_diff_empty_old_tree() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content").await.unwrap();
        let new_root = tree
            .put_directory(vec![DirEntry::new("file.txt", file1).with_size(7)])
            .await
            .unwrap();

        let diff = tree_diff(&tree, None, &new_root, 4).await.unwrap();

        // With no old tree, the root and its single file are both new.
        assert_eq!(diff.added_count(), 2);
        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&file1));
    }

    #[tokio::test]
    async fn test_diff_encrypted_trees() {
        let (_store, tree) = make_encrypted_tree();

        let (file1_cid, _) = tree.put(b"content1").await.unwrap();
        let (file2_cid, _) = tree.put(b"content2").await.unwrap();
        let old_dir = tree
            .put_directory(vec![
                DirEntry::from_cid("a.txt", &file1_cid).with_size(8),
                DirEntry::from_cid("b.txt", &file2_cid).with_size(8),
            ])
            .await
            .unwrap();

        let (file1_new_cid, _) = tree.put(b"content1-modified").await.unwrap();
        let new_dir = tree
            .put_directory(vec![
                DirEntry::from_cid("a.txt", &file1_new_cid).with_size(17),
                DirEntry::from_cid("b.txt", &file2_cid).with_size(8),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_dir), &new_dir, 4).await.unwrap();

        assert!(!diff.is_empty());
        assert!(diff.added.contains(&file1_new_cid.hash));
        assert!(!diff.added.contains(&file2_cid.hash));
    }

    #[tokio::test]
    async fn test_collect_hashes() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let hashes = collect_hashes(&tree, &dir_cid, 4).await.unwrap();

        // The directory node plus its two files.
        assert_eq!(hashes.len(), 3);
        assert!(hashes.contains(&dir_cid.hash));
        assert!(hashes.contains(&file1));
        assert!(hashes.contains(&file2));
    }

    #[tokio::test]
    async fn test_diff_streaming() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"old").await.unwrap();
        let old_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file1).with_size(3)])
            .await
            .unwrap();

        let file2 = tree.put_blob(b"new").await.unwrap();
        let new_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file2).with_size(3)])
            .await
            .unwrap();

        let old_hashes = collect_hashes(&tree, &old_root, 4).await.unwrap();

        let mut streamed: Vec<Hash> = Vec::new();
        let stats = tree_diff_streaming(&tree, &old_hashes, &new_root, 4, |hash| {
            streamed.push(hash);
            true
        })
        .await
        .unwrap();

        // The new root and the new file.
        assert_eq!(streamed.len(), 2);
        assert!(streamed.contains(&new_root.hash));
        assert!(streamed.contains(&file2));
        assert_eq!(stats.new_tree_nodes, 2);
    }

    #[tokio::test]
    async fn test_diff_streaming_early_stop() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"f1").await.unwrap();
        let file2 = tree.put_blob(b"f2").await.unwrap();
        let file3 = tree.put_blob(b"f3").await.unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(2),
                DirEntry::new("b.txt", file2).with_size(2),
                DirEntry::new("c.txt", file3).with_size(2),
            ])
            .await
            .unwrap();

        let old_hashes = HashSet::new();
        let mut count = 0;
        // With concurrency 1 the order is fixed: the root is reported first, then the first
        // child, at which point the callback returns false.
        let stats = tree_diff_streaming(&tree, &old_hashes, &new_root, 1, |_hash| {
            count += 1;
            count < 2
        })
        .await
        .unwrap();

        assert_eq!(count, 2, "should stop as soon as the callback returns false");
        assert_eq!(stats.new_tree_nodes, 2);
    }

    #[tokio::test]
    async fn test_diff_large_tree_structure() {
        let (_store, tree) = make_tree();

        let mut entries = Vec::new();
        for i in 0..100 {
            let data = format!("content {}", i);
            let hash = tree.put_blob(data.as_bytes()).await.unwrap();
            entries.push(DirEntry::new(format!("file{}.txt", i), hash).with_size(data.len() as u64));
        }

        let old_root = tree.put_directory(entries.clone()).await.unwrap();

        // Replace the first five files with new content.
        for i in 0..5 {
            let data = format!("modified content {}", i);
            let hash = tree.put_blob(data.as_bytes()).await.unwrap();
            entries[i] = DirEntry::new(format!("file{}.txt", i), hash).with_size(data.len() as u64);
        }

        let new_root = tree.put_directory(entries).await.unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 8).await.unwrap();

        // Five modified files plus the new root.
        assert_eq!(diff.added_count(), 6);
        assert!(diff.added.contains(&new_root.hash));

        // The 95 untouched files are found in the old set.
        assert!(diff.stats.unchanged_subtrees >= 95);
    }
}