Skip to main content

hashtree_core/
diff.rs

1//! Tree diff operations for efficient incremental updates
2//!
3//! Computes the difference between two merkle trees, identifying which
4//! hashes exist in the new tree but not the old tree. This enables
5//! efficient push operations where only changed content is uploaded.
6//!
7//! # Key Optimization: Subtree Pruning
8//!
9//! When a subtree's root hash matches between old and new trees, the entire
10//! subtree is skipped - no need to traverse it since identical hash means
11//! identical content.
12
13use std::collections::HashSet;
14use std::sync::atomic::{AtomicUsize, Ordering};
15
16use crate::codec::{decode_tree_node, is_tree_node};
17use crate::crypto::decrypt_chk;
18use crate::hashtree::{HashTree, HashTreeError};
19use crate::store::Store;
20use crate::types::{Cid, Hash};
21
22/// Result of a tree diff operation
23#[derive(Debug, Clone)]
24pub struct TreeDiff {
25    /// Hashes present in new tree but not in old tree (need upload)
26    pub added: Vec<Hash>,
27    /// Statistics about the diff operation
28    pub stats: DiffStats,
29}
30
31impl TreeDiff {
32    /// Create an empty diff (identical trees)
33    pub fn empty() -> Self {
34        Self {
35            added: Vec::new(),
36            stats: DiffStats::default(),
37        }
38    }
39
40    /// Check if there are any changes
41    pub fn is_empty(&self) -> bool {
42        self.added.is_empty()
43    }
44
45    /// Number of hashes to upload
46    pub fn added_count(&self) -> usize {
47        self.added.len()
48    }
49}
50
/// Counters collected while computing a tree diff.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DiffStats {
    /// Size of the old tree's hash set: the unique hashes reachable from the
    /// old root (or the size of the set supplied by the caller)
    pub old_tree_nodes: usize,
    /// Number of new-tree hashes that were NOT in the old set. Each of these is
    /// reported as added and fetched to discover its children. Hashes that
    /// were found in the old set are counted in `unchanged_subtrees` instead.
    pub new_tree_nodes: usize,
    /// Number of new-tree hashes found in the old set. Each one prunes the
    /// whole subtree below it; a leaf blob counts as a one-node subtree.
    pub unchanged_subtrees: usize,
}
61
62fn decrypt_if_keyed(data: Vec<u8>, key: Option<[u8; 32]>) -> Result<Vec<u8>, HashTreeError> {
63    if let Some(k) = key {
64        decrypt_chk(&data, &k).map_err(|e| HashTreeError::Decryption(e.to_string()))
65    } else {
66        Ok(data)
67    }
68}
69
70/// Collect all hashes in a tree using parallel traversal
71///
72/// This walks the tree and collects all unique hashes (both tree nodes and blobs).
73/// Uses parallel fetching for efficiency.
74///
75/// # Arguments
76/// * `tree` - The HashTree instance with store access
77/// * `root` - Root CID to start from
78/// * `concurrency` - Number of concurrent fetches
79///
80/// # Returns
81/// Set of all hashes in the tree
82pub async fn collect_hashes<S: Store>(
83    tree: &HashTree<S>,
84    root: &Cid,
85    concurrency: usize,
86) -> Result<HashSet<Hash>, HashTreeError> {
87    collect_hashes_with_progress(tree, root, concurrency, None).await
88}
89
90/// Collect hashes with optional progress tracking
91pub async fn collect_hashes_with_progress<S: Store>(
92    tree: &HashTree<S>,
93    root: &Cid,
94    concurrency: usize,
95    progress: Option<&AtomicUsize>,
96) -> Result<HashSet<Hash>, HashTreeError> {
97    use futures::stream::{FuturesUnordered, StreamExt};
98    use std::collections::VecDeque;
99
100    let store = tree.get_store();
101    let mut hashes = HashSet::new();
102    let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
103    let mut active = FuturesUnordered::new();
104
105    // Seed with root
106    pending.push_back((root.hash, root.key));
107
108    loop {
109        // Fill up to concurrency limit
110        while active.len() < concurrency {
111            if let Some((hash, key)) = pending.pop_front() {
112                // Skip if already visited
113                if hashes.contains(&hash) {
114                    continue;
115                }
116                hashes.insert(hash);
117
118                let store = store.clone();
119                let fut = async move {
120                    let data = store
121                        .get(&hash)
122                        .await
123                        .map_err(|e| HashTreeError::Store(e.to_string()))?;
124                    Ok::<_, HashTreeError>((hash, key, data))
125                };
126                active.push(fut);
127            } else {
128                break;
129            }
130        }
131
132        // If nothing active, we're done
133        if active.is_empty() {
134            break;
135        }
136
137        // Wait for any future to complete
138        if let Some(result) = active.next().await {
139            let (_hash, key, data) = result?;
140
141            if let Some(counter) = progress {
142                counter.fetch_add(1, Ordering::Relaxed);
143            }
144
145            let data = match data {
146                Some(d) => d,
147                None => continue,
148            };
149
150            // Decrypt if key present (strict: wrong key is an error).
151            let plaintext = decrypt_if_keyed(data, key)?;
152
153            // If it's a tree node, queue children
154            if is_tree_node(&plaintext) {
155                if let Ok(node) = decode_tree_node(&plaintext) {
156                    for link in node.links {
157                        if !hashes.contains(&link.hash) {
158                            pending.push_back((link.hash, link.key));
159                        }
160                    }
161                }
162            }
163        }
164    }
165
166    Ok(hashes)
167}
168
169/// Compute diff between two trees
170///
171/// Returns hashes present in `new_root` but not in `old_root`.
172/// Uses subtree pruning: when a hash exists in old tree, skips entire subtree.
173///
174/// # Algorithm
175/// 1. Collect all hashes from old tree into a set
176/// 2. Walk new tree:
177///    - If hash in old set: skip (subtree unchanged)
178///    - If hash not in old set: add to result, traverse children
179///
180/// # Arguments
181/// * `tree` - HashTree instance
182/// * `old_root` - Root of the old tree (may be None for first push)
183/// * `new_root` - Root of the new tree
184/// * `concurrency` - Number of concurrent fetches
185///
186/// # Returns
187/// TreeDiff with added hashes and statistics
188pub async fn tree_diff<S: Store>(
189    tree: &HashTree<S>,
190    old_root: Option<&Cid>,
191    new_root: &Cid,
192    concurrency: usize,
193) -> Result<TreeDiff, HashTreeError> {
194    // No old tree = everything is new
195    let old_hashes = if let Some(old) = old_root {
196        collect_hashes(tree, old, concurrency).await?
197    } else {
198        HashSet::new()
199    };
200
201    tree_diff_with_old_hashes(tree, &old_hashes, new_root, concurrency).await
202}
203
204/// Compute diff given pre-computed old hashes
205///
206/// Use this when you already have the old tree's hash set (e.g., from a previous operation)
207pub async fn tree_diff_with_old_hashes<S: Store>(
208    tree: &HashTree<S>,
209    old_hashes: &HashSet<Hash>,
210    new_root: &Cid,
211    concurrency: usize,
212) -> Result<TreeDiff, HashTreeError> {
213    use futures::stream::{FuturesUnordered, StreamExt};
214    use std::collections::VecDeque;
215
216    let store = tree.get_store();
217    let mut added: Vec<Hash> = Vec::new();
218    let mut visited: HashSet<Hash> = HashSet::new();
219    let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
220    let mut active = FuturesUnordered::new();
221
222    let mut stats = DiffStats {
223        old_tree_nodes: old_hashes.len(),
224        new_tree_nodes: 0,
225        unchanged_subtrees: 0,
226    };
227
228    // Seed with new root
229    pending.push_back((new_root.hash, new_root.key));
230
231    loop {
232        // Fill up to concurrency limit
233        while active.len() < concurrency {
234            if let Some((hash, key)) = pending.pop_front() {
235                // Skip if already visited
236                if visited.contains(&hash) {
237                    continue;
238                }
239                visited.insert(hash);
240
241                // KEY OPTIMIZATION: If hash exists in old tree, skip entire subtree
242                if old_hashes.contains(&hash) {
243                    stats.unchanged_subtrees += 1;
244                    continue;
245                }
246
247                // Hash is new - will need to upload
248                added.push(hash);
249                stats.new_tree_nodes += 1;
250
251                // Fetch to check for children
252                let store = store.clone();
253                let fut = async move {
254                    let data = store
255                        .get(&hash)
256                        .await
257                        .map_err(|e| HashTreeError::Store(e.to_string()))?;
258                    Ok::<_, HashTreeError>((hash, key, data))
259                };
260                active.push(fut);
261            } else {
262                break;
263            }
264        }
265
266        // If nothing active, we're done
267        if active.is_empty() {
268            break;
269        }
270
271        // Wait for any future to complete
272        if let Some(result) = active.next().await {
273            let (_hash, key, data) = result?;
274
275            let data = match data {
276                Some(d) => d,
277                None => continue,
278            };
279
280            // Decrypt if key present (strict: wrong key is an error).
281            let plaintext = decrypt_if_keyed(data, key)?;
282
283            // If it's a tree node, queue children
284            if is_tree_node(&plaintext) {
285                if let Ok(node) = decode_tree_node(&plaintext) {
286                    for link in node.links {
287                        if !visited.contains(&link.hash) {
288                            pending.push_back((link.hash, link.key));
289                        }
290                    }
291                }
292            }
293        }
294    }
295
296    Ok(TreeDiff { added, stats })
297}
298
299/// Streaming diff - yields hashes as they're discovered
300///
301/// Memory efficient for very large trees. Yields hashes that need upload
302/// one at a time instead of collecting into a Vec.
303pub async fn tree_diff_streaming<S, F>(
304    tree: &HashTree<S>,
305    old_hashes: &HashSet<Hash>,
306    new_root: &Cid,
307    concurrency: usize,
308    mut callback: F,
309) -> Result<DiffStats, HashTreeError>
310where
311    S: Store,
312    F: FnMut(Hash) -> bool, // return false to stop early
313{
314    use futures::stream::{FuturesUnordered, StreamExt};
315    use std::collections::VecDeque;
316
317    let store = tree.get_store();
318    let mut visited: HashSet<Hash> = HashSet::new();
319    let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
320    let mut active = FuturesUnordered::new();
321
322    let mut stats = DiffStats {
323        old_tree_nodes: old_hashes.len(),
324        new_tree_nodes: 0,
325        unchanged_subtrees: 0,
326    };
327
328    pending.push_back((new_root.hash, new_root.key));
329
330    loop {
331        while active.len() < concurrency {
332            if let Some((hash, key)) = pending.pop_front() {
333                if visited.contains(&hash) {
334                    continue;
335                }
336                visited.insert(hash);
337
338                if old_hashes.contains(&hash) {
339                    stats.unchanged_subtrees += 1;
340                    continue;
341                }
342
343                stats.new_tree_nodes += 1;
344
345                // Yield this hash via callback
346                if !callback(hash) {
347                    // Early termination requested
348                    return Ok(stats);
349                }
350
351                let store = store.clone();
352                let fut = async move {
353                    let data = store
354                        .get(&hash)
355                        .await
356                        .map_err(|e| HashTreeError::Store(e.to_string()))?;
357                    Ok::<_, HashTreeError>((hash, key, data))
358                };
359                active.push(fut);
360            } else {
361                break;
362            }
363        }
364
365        if active.is_empty() {
366            break;
367        }
368
369        if let Some(result) = active.next().await {
370            let (_hash, key, data) = result?;
371
372            let data = match data {
373                Some(d) => d,
374                None => continue,
375            };
376
377            let plaintext = decrypt_if_keyed(data, key)?;
378
379            if is_tree_node(&plaintext) {
380                if let Ok(node) = decode_tree_node(&plaintext) {
381                    for link in node.links {
382                        if !visited.contains(&link.hash) {
383                            pending.push_back((link.hash, link.key));
384                        }
385                    }
386                }
387            }
388        }
389    }
390
391    Ok(stats)
392}
393
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;
    use crate::types::{DirEntry, LinkType};
    use crate::HashTreeConfig;
    use std::sync::Arc;

    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    fn make_encrypted_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()));
        (store, tree)
    }

    #[test]
    fn test_tree_diff_empty() {
        let diff = TreeDiff::empty();
        assert!(diff.is_empty());
        assert_eq!(diff.added_count(), 0);
        assert_eq!(diff.stats.new_tree_nodes, 0);
        assert_eq!(diff.stats.unchanged_subtrees, 0);
    }

    #[tokio::test]
    async fn test_diff_identical_trees() {
        let (_store, tree) = make_tree();

        // Create a simple tree
        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        // Diff tree against itself should be empty
        let diff = tree_diff(&tree, Some(&dir_cid), &dir_cid, 4).await.unwrap();

        assert!(diff.is_empty(), "identical trees should have empty diff");
        assert_eq!(diff.added_count(), 0);
        // The root itself is found in the old set, pruning the whole tree
        assert_eq!(diff.stats.unchanged_subtrees, 1);
        assert_eq!(diff.stats.new_tree_nodes, 0);
    }

    #[tokio::test]
    async fn test_diff_single_file_change() {
        let (_store, tree) = make_tree();

        // Create old tree
        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let old_dir = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        // Create new tree with one file changed
        let file1_new = tree.put_blob(b"content1-modified").await.unwrap();
        let new_dir = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1_new).with_size(17),
                DirEntry::new("b.txt", file2).with_size(8), // unchanged
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_dir), &new_dir, 4).await.unwrap();

        // Should have: new root dir + new file blob
        assert!(!diff.is_empty());
        assert_eq!(diff.added_count(), 2); // new dir node + new file
        assert!(diff.added.contains(&file1_new));
        assert!(diff.added.contains(&new_dir.hash));
        assert!(!diff.added.contains(&file2)); // file2 unchanged
    }

    #[tokio::test]
    async fn test_diff_subtree_unchanged() {
        let (_store, tree) = make_tree();

        // Create subdirectory
        let sub_file = tree.put_blob(b"sub content").await.unwrap();
        let subdir = tree
            .put_directory(vec![DirEntry::new("sub.txt", sub_file).with_size(11)])
            .await
            .unwrap();

        // Old tree
        let file1 = tree.put_blob(b"root file").await.unwrap();
        let old_root = tree
            .put_directory(vec![
                DirEntry::new("file.txt", file1).with_size(9),
                DirEntry::new("subdir", subdir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        // New tree - change root file, subdir unchanged
        let file1_new = tree.put_blob(b"root file changed").await.unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("file.txt", file1_new).with_size(17),
                DirEntry::new("subdir", subdir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 4)
            .await
            .unwrap();

        // Should NOT include subdir or its contents
        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&file1_new));
        assert!(!diff.added.contains(&subdir.hash)); // subdir unchanged
        assert!(!diff.added.contains(&sub_file)); // subdir content unchanged

        // Stats should show subtree was skipped
        assert!(diff.stats.unchanged_subtrees > 0);
    }

    #[tokio::test]
    async fn test_diff_new_directory() {
        let (_store, tree) = make_tree();

        // Old tree - simple
        let file1 = tree.put_blob(b"file1").await.unwrap();
        let old_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file1).with_size(5)])
            .await
            .unwrap();

        // New tree - add directory
        let new_file = tree.put_blob(b"new file").await.unwrap();
        let new_dir = tree
            .put_directory(vec![DirEntry::new("inner.txt", new_file).with_size(8)])
            .await
            .unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(5),
                DirEntry::new("newdir", new_dir.hash)
                    .with_size(0)
                    .with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 4)
            .await
            .unwrap();

        // Should include new root, new dir, and new file
        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&new_dir.hash));
        assert!(diff.added.contains(&new_file));
        // Original file should NOT be in diff
        assert!(!diff.added.contains(&file1));
    }

    #[tokio::test]
    async fn test_diff_empty_old_tree() {
        let (_store, tree) = make_tree();

        // New tree
        let file1 = tree.put_blob(b"content").await.unwrap();
        let new_root = tree
            .put_directory(vec![DirEntry::new("file.txt", file1).with_size(7)])
            .await
            .unwrap();

        // No old tree (first push)
        let diff = tree_diff(&tree, None, &new_root, 4).await.unwrap();

        // Everything should be new
        assert_eq!(diff.added_count(), 2); // root + file
        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&file1));
    }

    #[tokio::test]
    async fn test_diff_encrypted_trees() {
        let (_store, tree) = make_encrypted_tree();

        // Create old tree (encrypted)
        let (file1_cid, _) = tree.put(b"content1").await.unwrap();
        let (file2_cid, _) = tree.put(b"content2").await.unwrap();
        let old_dir = tree
            .put_directory(vec![
                DirEntry::from_cid("a.txt", &file1_cid).with_size(8),
                DirEntry::from_cid("b.txt", &file2_cid).with_size(8),
            ])
            .await
            .unwrap();

        // Create new tree with one file changed
        let (file1_new_cid, _) = tree.put(b"content1-modified").await.unwrap();
        let new_dir = tree
            .put_directory(vec![
                DirEntry::from_cid("a.txt", &file1_new_cid).with_size(17),
                DirEntry::from_cid("b.txt", &file2_cid).with_size(8),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_dir), &new_dir, 4).await.unwrap();

        // Should work with encrypted content
        assert!(!diff.is_empty());
        assert!(diff.added.contains(&file1_new_cid.hash));
        assert!(!diff.added.contains(&file2_cid.hash)); // unchanged
    }

    #[tokio::test]
    async fn test_collect_hashes() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let hashes = collect_hashes(&tree, &dir_cid, 4).await.unwrap();

        assert_eq!(hashes.len(), 3); // dir + 2 files
        assert!(hashes.contains(&dir_cid.hash));
        assert!(hashes.contains(&file1));
        assert!(hashes.contains(&file2));
    }

    #[tokio::test]
    async fn test_diff_streaming() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"old").await.unwrap();
        let old_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file1).with_size(3)])
            .await
            .unwrap();

        let file2 = tree.put_blob(b"new").await.unwrap();
        let new_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file2).with_size(3)])
            .await
            .unwrap();

        let old_hashes = collect_hashes(&tree, &old_root, 4).await.unwrap();

        let mut streamed: Vec<Hash> = Vec::new();
        let stats = tree_diff_streaming(&tree, &old_hashes, &new_root, 4, |hash| {
            streamed.push(hash);
            true // continue
        })
        .await
        .unwrap();

        assert_eq!(streamed.len(), 2); // new root + new file
        assert!(streamed.contains(&new_root.hash));
        assert!(streamed.contains(&file2));
        assert_eq!(stats.new_tree_nodes, 2);
    }

    #[tokio::test]
    async fn test_diff_streaming_early_stop() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"f1").await.unwrap();
        let file2 = tree.put_blob(b"f2").await.unwrap();
        let file3 = tree.put_blob(b"f3").await.unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(2),
                DirEntry::new("b.txt", file2).with_size(2),
                DirEntry::new("c.txt", file3).with_size(2),
            ])
            .await
            .unwrap();

        let old_hashes = HashSet::new(); // empty = all new

        let mut count = 0;
        let stats = tree_diff_streaming(&tree, &old_hashes, &new_root, 1, |_hash| {
            count += 1;
            count < 2 // stop after 2
        })
        .await
        .unwrap();

        // With concurrency 1 the root is reported first. The walk then stops on
        // the first child, so exactly two hashes are reported.
        assert_eq!(count, 2, "should have stopped after the second hash");
        assert_eq!(stats.new_tree_nodes, 2);
    }

    #[tokio::test]
    async fn test_diff_large_tree_structure() {
        let (_store, tree) = make_tree();

        // Create a tree with many files
        let mut entries = Vec::new();

        for i in 0..100 {
            let data = format!("content {}", i);
            let hash = tree.put_blob(data.as_bytes()).await.unwrap();
            entries
                .push(DirEntry::new(format!("file{}.txt", i), hash).with_size(data.len() as u64));
        }

        let old_root = tree.put_directory(entries.clone()).await.unwrap();

        // Modify 5 files
        for i in 0..5 {
            let data = format!("modified content {}", i);
            let hash = tree.put_blob(data.as_bytes()).await.unwrap();
            entries[i] = DirEntry::new(format!("file{}.txt", i), hash).with_size(data.len() as u64);
        }

        let new_root = tree.put_directory(entries).await.unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 8)
            .await
            .unwrap();

        // Should have 6 new items: new root + 5 modified files
        assert_eq!(diff.added_count(), 6);
        assert_eq!(diff.stats.new_tree_nodes, 6);
        assert!(diff.added.contains(&new_root.hash));

        // Should have skipped ~95 unchanged files
        assert!(diff.stats.unchanged_subtrees >= 95);
    }
}