Skip to main content

hashtree_core/
diff.rs

1//! Tree diff operations for efficient incremental updates
2//!
3//! Computes the difference between two merkle trees, identifying which
4//! hashes exist in the new tree but not the old tree. This enables
5//! efficient push operations where only changed content is uploaded.
6//!
7//! # Key Optimization: Subtree Pruning
8//!
9//! When a subtree's root hash matches between old and new trees, the entire
10//! subtree is skipped - no need to traverse it since identical hash means
11//! identical content.
12
13use std::collections::HashSet;
14use std::sync::atomic::{AtomicUsize, Ordering};
15
16use crate::codec::{decode_tree_node, is_tree_node};
17use crate::crypto::decrypt_chk;
18use crate::hashtree::{HashTree, HashTreeError};
19use crate::store::Store;
20use crate::types::{Cid, Hash};
21
22/// Result of a tree diff operation
23#[derive(Debug, Clone)]
24pub struct TreeDiff {
25    /// Hashes present in new tree but not in old tree (need upload)
26    pub added: Vec<Hash>,
27    /// Statistics about the diff operation
28    pub stats: DiffStats,
29}
30
31impl TreeDiff {
32    /// Create an empty diff (identical trees)
33    pub fn empty() -> Self {
34        Self {
35            added: Vec::new(),
36            stats: DiffStats::default(),
37        }
38    }
39
40    /// Check if there are any changes
41    pub fn is_empty(&self) -> bool {
42        self.added.is_empty()
43    }
44
45    /// Number of hashes to upload
46    pub fn added_count(&self) -> usize {
47        self.added.len()
48    }
49}
50
51/// Statistics from the diff operation
52#[derive(Debug, Clone, Default)]
53pub struct DiffStats {
54    /// Number of nodes visited in old tree
55    pub old_tree_nodes: usize,
56    /// Number of nodes visited in new tree
57    pub new_tree_nodes: usize,
58    /// Number of subtrees skipped due to hash match
59    pub unchanged_subtrees: usize,
60}
61
62/// Collect all hashes in a tree using parallel traversal
63///
64/// This walks the tree and collects all unique hashes (both tree nodes and blobs).
65/// Uses parallel fetching for efficiency.
66///
67/// # Arguments
68/// * `tree` - The HashTree instance with store access
69/// * `root` - Root CID to start from
70/// * `concurrency` - Number of concurrent fetches
71///
72/// # Returns
73/// Set of all hashes in the tree
74pub async fn collect_hashes<S: Store>(
75    tree: &HashTree<S>,
76    root: &Cid,
77    concurrency: usize,
78) -> Result<HashSet<Hash>, HashTreeError> {
79    collect_hashes_with_progress(tree, root, concurrency, None).await
80}
81
82/// Collect hashes with optional progress tracking
83pub async fn collect_hashes_with_progress<S: Store>(
84    tree: &HashTree<S>,
85    root: &Cid,
86    concurrency: usize,
87    progress: Option<&AtomicUsize>,
88) -> Result<HashSet<Hash>, HashTreeError> {
89    use futures::stream::{FuturesUnordered, StreamExt};
90    use std::collections::VecDeque;
91
92    let store = tree.get_store();
93    let mut hashes = HashSet::new();
94    let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
95    let mut active = FuturesUnordered::new();
96
97    // Seed with root
98    pending.push_back((root.hash, root.key));
99
100    loop {
101        // Fill up to concurrency limit
102        while active.len() < concurrency {
103            if let Some((hash, key)) = pending.pop_front() {
104                // Skip if already visited
105                if hashes.contains(&hash) {
106                    continue;
107                }
108                hashes.insert(hash);
109
110                let store = store.clone();
111                let fut = async move {
112                    let data = store
113                        .get(&hash)
114                        .await
115                        .map_err(|e| HashTreeError::Store(e.to_string()))?;
116                    Ok::<_, HashTreeError>((hash, key, data))
117                };
118                active.push(fut);
119            } else {
120                break;
121            }
122        }
123
124        // If nothing active, we're done
125        if active.is_empty() {
126            break;
127        }
128
129        // Wait for any future to complete
130        if let Some(result) = active.next().await {
131            let (_hash, key, data) = result?;
132
133            if let Some(counter) = progress {
134                counter.fetch_add(1, Ordering::Relaxed);
135            }
136
137            let data = match data {
138                Some(d) => d,
139                None => continue,
140            };
141
142            // Decrypt if key present
143            let plaintext = if let Some(k) = key {
144                decrypt_chk(&data, &k).unwrap_or(data)
145            } else {
146                data
147            };
148
149            // If it's a tree node, queue children
150            if is_tree_node(&plaintext) {
151                if let Ok(node) = decode_tree_node(&plaintext) {
152                    for link in node.links {
153                        if !hashes.contains(&link.hash) {
154                            pending.push_back((link.hash, link.key));
155                        }
156                    }
157                }
158            }
159        }
160    }
161
162    Ok(hashes)
163}
164
165/// Compute diff between two trees
166///
167/// Returns hashes present in `new_root` but not in `old_root`.
168/// Uses subtree pruning: when a hash exists in old tree, skips entire subtree.
169///
170/// # Algorithm
171/// 1. Collect all hashes from old tree into a set
172/// 2. Walk new tree:
173///    - If hash in old set: skip (subtree unchanged)
174///    - If hash not in old set: add to result, traverse children
175///
176/// # Arguments
177/// * `tree` - HashTree instance
178/// * `old_root` - Root of the old tree (may be None for first push)
179/// * `new_root` - Root of the new tree
180/// * `concurrency` - Number of concurrent fetches
181///
182/// # Returns
183/// TreeDiff with added hashes and statistics
184pub async fn tree_diff<S: Store>(
185    tree: &HashTree<S>,
186    old_root: Option<&Cid>,
187    new_root: &Cid,
188    concurrency: usize,
189) -> Result<TreeDiff, HashTreeError> {
190    // No old tree = everything is new
191    let old_hashes = if let Some(old) = old_root {
192        collect_hashes(tree, old, concurrency).await?
193    } else {
194        HashSet::new()
195    };
196
197    tree_diff_with_old_hashes(tree, &old_hashes, new_root, concurrency).await
198}
199
200/// Compute diff given pre-computed old hashes
201///
202/// Use this when you already have the old tree's hash set (e.g., from a previous operation)
203pub async fn tree_diff_with_old_hashes<S: Store>(
204    tree: &HashTree<S>,
205    old_hashes: &HashSet<Hash>,
206    new_root: &Cid,
207    concurrency: usize,
208) -> Result<TreeDiff, HashTreeError> {
209    use futures::stream::{FuturesUnordered, StreamExt};
210    use std::collections::VecDeque;
211
212    let store = tree.get_store();
213    let mut added: Vec<Hash> = Vec::new();
214    let mut visited: HashSet<Hash> = HashSet::new();
215    let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
216    let mut active = FuturesUnordered::new();
217
218    let mut stats = DiffStats {
219        old_tree_nodes: old_hashes.len(),
220        new_tree_nodes: 0,
221        unchanged_subtrees: 0,
222    };
223
224    // Seed with new root
225    pending.push_back((new_root.hash, new_root.key));
226
227    loop {
228        // Fill up to concurrency limit
229        while active.len() < concurrency {
230            if let Some((hash, key)) = pending.pop_front() {
231                // Skip if already visited
232                if visited.contains(&hash) {
233                    continue;
234                }
235                visited.insert(hash);
236
237                // KEY OPTIMIZATION: If hash exists in old tree, skip entire subtree
238                if old_hashes.contains(&hash) {
239                    stats.unchanged_subtrees += 1;
240                    continue;
241                }
242
243                // Hash is new - will need to upload
244                added.push(hash);
245                stats.new_tree_nodes += 1;
246
247                // Fetch to check for children
248                let store = store.clone();
249                let fut = async move {
250                    let data = store
251                        .get(&hash)
252                        .await
253                        .map_err(|e| HashTreeError::Store(e.to_string()))?;
254                    Ok::<_, HashTreeError>((hash, key, data))
255                };
256                active.push(fut);
257            } else {
258                break;
259            }
260        }
261
262        // If nothing active, we're done
263        if active.is_empty() {
264            break;
265        }
266
267        // Wait for any future to complete
268        if let Some(result) = active.next().await {
269            let (_hash, key, data) = result?;
270
271            let data = match data {
272                Some(d) => d,
273                None => continue,
274            };
275
276            // Decrypt if key present
277            let plaintext = if let Some(k) = key {
278                decrypt_chk(&data, &k).unwrap_or(data)
279            } else {
280                data
281            };
282
283            // If it's a tree node, queue children
284            if is_tree_node(&plaintext) {
285                if let Ok(node) = decode_tree_node(&plaintext) {
286                    for link in node.links {
287                        if !visited.contains(&link.hash) {
288                            pending.push_back((link.hash, link.key));
289                        }
290                    }
291                }
292            }
293        }
294    }
295
296    Ok(TreeDiff { added, stats })
297}
298
299/// Streaming diff - yields hashes as they're discovered
300///
301/// Memory efficient for very large trees. Yields hashes that need upload
302/// one at a time instead of collecting into a Vec.
303pub async fn tree_diff_streaming<S, F>(
304    tree: &HashTree<S>,
305    old_hashes: &HashSet<Hash>,
306    new_root: &Cid,
307    concurrency: usize,
308    mut callback: F,
309) -> Result<DiffStats, HashTreeError>
310where
311    S: Store,
312    F: FnMut(Hash) -> bool, // return false to stop early
313{
314    use futures::stream::{FuturesUnordered, StreamExt};
315    use std::collections::VecDeque;
316
317    let store = tree.get_store();
318    let mut visited: HashSet<Hash> = HashSet::new();
319    let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
320    let mut active = FuturesUnordered::new();
321
322    let mut stats = DiffStats {
323        old_tree_nodes: old_hashes.len(),
324        new_tree_nodes: 0,
325        unchanged_subtrees: 0,
326    };
327
328    pending.push_back((new_root.hash, new_root.key));
329
330    loop {
331        while active.len() < concurrency {
332            if let Some((hash, key)) = pending.pop_front() {
333                if visited.contains(&hash) {
334                    continue;
335                }
336                visited.insert(hash);
337
338                if old_hashes.contains(&hash) {
339                    stats.unchanged_subtrees += 1;
340                    continue;
341                }
342
343                stats.new_tree_nodes += 1;
344
345                // Yield this hash via callback
346                if !callback(hash) {
347                    // Early termination requested
348                    return Ok(stats);
349                }
350
351                let store = store.clone();
352                let fut = async move {
353                    let data = store
354                        .get(&hash)
355                        .await
356                        .map_err(|e| HashTreeError::Store(e.to_string()))?;
357                    Ok::<_, HashTreeError>((hash, key, data))
358                };
359                active.push(fut);
360            } else {
361                break;
362            }
363        }
364
365        if active.is_empty() {
366            break;
367        }
368
369        if let Some(result) = active.next().await {
370            let (_hash, key, data) = result?;
371
372            let data = match data {
373                Some(d) => d,
374                None => continue,
375            };
376
377            let plaintext = if let Some(k) = key {
378                decrypt_chk(&data, &k).unwrap_or(data)
379            } else {
380                data
381            };
382
383            if is_tree_node(&plaintext) {
384                if let Ok(node) = decode_tree_node(&plaintext) {
385                    for link in node.links {
386                        if !visited.contains(&link.hash) {
387                            pending.push_back((link.hash, link.key));
388                        }
389                    }
390                }
391            }
392        }
393    }
394
395    Ok(stats)
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;
    use crate::types::{DirEntry, LinkType};
    use crate::HashTreeConfig;
    use std::sync::Arc;

    /// Build a tree configured with `.public()`, backed by a fresh in-memory store.
    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    /// Build a tree with the default config (no `.public()`), which is
    /// presumably the encrypted mode.
    fn make_encrypted_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()));
        (store, tree)
    }

    #[tokio::test]
    async fn test_diff_identical_trees() {
        let (_store, tree) = make_tree();

        // Create a simple tree
        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        // Diff tree against itself should be empty
        let diff = tree_diff(&tree, Some(&dir_cid), &dir_cid, 4).await.unwrap();

        assert!(diff.is_empty(), "identical trees should have empty diff");
        assert_eq!(diff.added_count(), 0);
        // The root itself matches, so the whole tree is pruned in one step.
        assert_eq!(diff.stats.unchanged_subtrees, 1);
        assert_eq!(diff.stats.new_tree_nodes, 0);
        assert_eq!(diff.stats.old_tree_nodes, 3); // dir + 2 files
    }

    #[tokio::test]
    async fn test_diff_single_file_change() {
        let (_store, tree) = make_tree();

        // Create old tree
        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let old_dir = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        // Create new tree with one file changed
        let file1_new = tree.put_blob(b"content1-modified").await.unwrap();
        let new_dir = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1_new).with_size(17),
                DirEntry::new("b.txt", file2).with_size(8), // unchanged
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_dir), &new_dir, 4).await.unwrap();

        // Should have: new root dir + new file blob
        assert!(!diff.is_empty());
        assert_eq!(diff.added_count(), 2); // new dir node + new file
        assert!(diff.added.contains(&file1_new));
        assert!(diff.added.contains(&new_dir.hash));
        assert!(!diff.added.contains(&file2)); // file2 unchanged
    }

    #[tokio::test]
    async fn test_diff_subtree_unchanged() {
        let (_store, tree) = make_tree();

        // Create subdirectory
        let sub_file = tree.put_blob(b"sub content").await.unwrap();
        let subdir = tree
            .put_directory(vec![DirEntry::new("sub.txt", sub_file).with_size(11)])
            .await
            .unwrap();

        // Old tree
        let file1 = tree.put_blob(b"root file").await.unwrap();
        let old_root = tree
            .put_directory(vec![
                DirEntry::new("file.txt", file1).with_size(9),
                DirEntry::new("subdir", subdir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        // New tree - change root file, subdir unchanged
        let file1_new = tree.put_blob(b"root file changed").await.unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("file.txt", file1_new).with_size(17),
                DirEntry::new("subdir", subdir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 4)
            .await
            .unwrap();

        // Should NOT include subdir or its contents
        assert_eq!(diff.added_count(), 2); // new root + new root file
        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&file1_new));
        assert!(!diff.added.contains(&subdir.hash)); // subdir unchanged
        assert!(!diff.added.contains(&sub_file)); // subdir content unchanged

        // Exactly one pruning event: the subdir; its child is never visited.
        assert_eq!(diff.stats.unchanged_subtrees, 1);
    }

    #[tokio::test]
    async fn test_diff_new_directory() {
        let (_store, tree) = make_tree();

        // Old tree - simple
        let file1 = tree.put_blob(b"file1").await.unwrap();
        let old_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file1).with_size(5)])
            .await
            .unwrap();

        // New tree - add directory
        let new_file = tree.put_blob(b"new file").await.unwrap();
        let new_dir = tree
            .put_directory(vec![DirEntry::new("inner.txt", new_file).with_size(8)])
            .await
            .unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(5),
                DirEntry::new("newdir", new_dir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 4)
            .await
            .unwrap();

        // Should include new root, new dir, and new file - and nothing else
        assert_eq!(diff.added_count(), 3);
        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&new_dir.hash));
        assert!(diff.added.contains(&new_file));
        // Original file should NOT be in diff
        assert!(!diff.added.contains(&file1));
    }

    #[tokio::test]
    async fn test_diff_empty_old_tree() {
        let (_store, tree) = make_tree();

        // New tree
        let file1 = tree.put_blob(b"content").await.unwrap();
        let new_root = tree
            .put_directory(vec![DirEntry::new("file.txt", file1).with_size(7)])
            .await
            .unwrap();

        // No old tree (first push)
        let diff = tree_diff(&tree, None, &new_root, 4).await.unwrap();

        // Everything should be new
        assert_eq!(diff.added_count(), 2); // root + file
        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&file1));
        assert_eq!(diff.stats.old_tree_nodes, 0);
    }

    #[tokio::test]
    async fn test_diff_encrypted_trees() {
        let (_store, tree) = make_encrypted_tree();

        // Create old tree (encrypted)
        let (file1_cid, _) = tree.put(b"content1").await.unwrap();
        let (file2_cid, _) = tree.put(b"content2").await.unwrap();
        let old_dir = tree
            .put_directory(vec![
                DirEntry::from_cid("a.txt", &file1_cid).with_size(8),
                DirEntry::from_cid("b.txt", &file2_cid).with_size(8),
            ])
            .await
            .unwrap();

        // Create new tree with one file changed
        let (file1_new_cid, _) = tree.put(b"content1-modified").await.unwrap();
        let new_dir = tree
            .put_directory(vec![
                DirEntry::from_cid("a.txt", &file1_new_cid).with_size(17),
                DirEntry::from_cid("b.txt", &file2_cid).with_size(8),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_dir), &new_dir, 4).await.unwrap();

        // Should work with encrypted content
        assert!(!diff.is_empty());
        assert!(diff.added.contains(&new_dir.hash));
        assert!(diff.added.contains(&file1_new_cid.hash));
        assert!(!diff.added.contains(&file2_cid.hash)); // unchanged
    }

    #[tokio::test]
    async fn test_collect_hashes() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let hashes = collect_hashes(&tree, &dir_cid, 4).await.unwrap();

        assert_eq!(hashes.len(), 3); // dir + 2 files
        assert!(hashes.contains(&dir_cid.hash));
        assert!(hashes.contains(&file1));
        assert!(hashes.contains(&file2));
    }

    #[tokio::test]
    async fn test_diff_streaming() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"old").await.unwrap();
        let old_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file1).with_size(3)])
            .await
            .unwrap();

        let file2 = tree.put_blob(b"new").await.unwrap();
        let new_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file2).with_size(3)])
            .await
            .unwrap();

        let old_hashes = collect_hashes(&tree, &old_root, 4).await.unwrap();

        let mut streamed: Vec<Hash> = Vec::new();
        let stats = tree_diff_streaming(&tree, &old_hashes, &new_root, 4, |hash| {
            streamed.push(hash);
            true // continue
        })
        .await
        .unwrap();

        assert_eq!(streamed.len(), 2); // new root + new file
        assert!(streamed.contains(&new_root.hash));
        assert!(streamed.contains(&file2));
        assert_eq!(stats.new_tree_nodes, 2);
    }

    #[tokio::test]
    async fn test_diff_streaming_early_stop() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"f1").await.unwrap();
        let file2 = tree.put_blob(b"f2").await.unwrap();
        let file3 = tree.put_blob(b"f3").await.unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(2),
                DirEntry::new("b.txt", file2).with_size(2),
                DirEntry::new("c.txt", file3).with_size(2),
            ])
            .await
            .unwrap();

        let old_hashes = HashSet::new(); // empty = all new

        let mut count = 0;
        let stats = tree_diff_streaming(&tree, &old_hashes, &new_root, 1, |_hash| {
            count += 1;
            count < 2 // stop after 2
        })
        .await
        .unwrap();

        // The callback returning false must end the walk right there: exactly
        // the root and one file were reported, not all four hashes.
        assert_eq!(count, 2, "should have stopped early");
        assert_eq!(stats.new_tree_nodes, 2);
    }

    #[tokio::test]
    async fn test_diff_large_tree_structure() {
        let (_store, tree) = make_tree();

        // Create a tree with many files
        let mut entries = Vec::new();

        for i in 0..100 {
            let data = format!("content {}", i);
            let hash = tree.put_blob(data.as_bytes()).await.unwrap();
            entries
                .push(DirEntry::new(format!("file{}.txt", i), hash).with_size(data.len() as u64));
        }

        let old_root = tree.put_directory(entries.clone()).await.unwrap();

        // Modify 5 files
        let mut modified = Vec::new();
        for i in 0..5 {
            let data = format!("modified content {}", i);
            let hash = tree.put_blob(data.as_bytes()).await.unwrap();
            entries[i] = DirEntry::new(format!("file{}.txt", i), hash).with_size(data.len() as u64);
            modified.push(hash);
        }

        let new_root = tree.put_directory(entries).await.unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 8)
            .await
            .unwrap();

        // Should have 6 new items: new root + 5 modified files
        assert_eq!(diff.added_count(), 6);
        assert!(diff.added.contains(&new_root.hash));
        for hash in &modified {
            assert!(diff.added.contains(hash));
        }

        // Each of the 95 unchanged files is pruned exactly once
        assert_eq!(diff.stats.unchanged_subtrees, 95);
    }
}