Skip to main content

hashtree_core/
diff.rs

//! Tree diff operations for efficient incremental updates
//!
//! Computes the difference between two merkle trees, identifying which
//! hashes exist in the new tree but not the old tree. This enables
//! efficient push operations where only changed content is uploaded.
//!
//! # Key Optimization: Subtree Pruning
//!
//! When a subtree's root hash matches between the old and new trees, the entire
//! subtree is skipped. There is no need to traverse it, since an identical hash
//! means identical content.

13use std::collections::HashSet;
14use std::sync::atomic::{AtomicUsize, Ordering};
15
16use crate::codec::{decode_tree_node, is_tree_node};
17use crate::crypto::decrypt_chk;
18use crate::hashtree::{HashTree, HashTreeError};
19use crate::store::Store;
20use crate::types::{Cid, Hash};
21
22/// Result of a tree diff operation
23#[derive(Debug, Clone)]
24pub struct TreeDiff {
25    /// Hashes present in new tree but not in old tree (need upload)
26    pub added: Vec<Hash>,
27    /// Statistics about the diff operation
28    pub stats: DiffStats,
29}
30
31impl TreeDiff {
32    /// Create an empty diff (identical trees)
33    pub fn empty() -> Self {
34        Self {
35            added: Vec::new(),
36            stats: DiffStats::default(),
37        }
38    }
39
40    /// Check if there are any changes
41    pub fn is_empty(&self) -> bool {
42        self.added.is_empty()
43    }
44
45    /// Number of hashes to upload
46    pub fn added_count(&self) -> usize {
47        self.added.len()
48    }
49}
50
/// Statistics from the diff operation
///
/// All counters are hash-level: tree nodes and blobs are counted alike.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DiffStats {
    /// Number of distinct hashes in the old tree's hash set (the baseline the
    /// new tree is compared against).
    pub old_tree_nodes: usize,
    /// Number of hashes reached in the new tree that were NOT in the old set,
    /// i.e. the hashes reported as needing upload. Unchanged hashes are not
    /// counted here; see `unchanged_subtrees`.
    pub new_tree_nodes: usize,
    /// Number of hashes reached in the new tree that WERE in the old set. Each
    /// one is a subtree (or single blob) whose traversal was skipped.
    pub unchanged_subtrees: usize,
}
61
62/// Collect all hashes in a tree using parallel traversal
63///
64/// This walks the tree and collects all unique hashes (both tree nodes and blobs).
65/// Uses parallel fetching for efficiency.
66///
67/// # Arguments
68/// * `tree` - The HashTree instance with store access
69/// * `root` - Root CID to start from
70/// * `concurrency` - Number of concurrent fetches
71///
72/// # Returns
73/// Set of all hashes in the tree
74pub async fn collect_hashes<S: Store>(
75    tree: &HashTree<S>,
76    root: &Cid,
77    concurrency: usize,
78) -> Result<HashSet<Hash>, HashTreeError> {
79    collect_hashes_with_progress(tree, root, concurrency, None).await
80}
81
82/// Collect hashes with optional progress tracking
83pub async fn collect_hashes_with_progress<S: Store>(
84    tree: &HashTree<S>,
85    root: &Cid,
86    concurrency: usize,
87    progress: Option<&AtomicUsize>,
88) -> Result<HashSet<Hash>, HashTreeError> {
89    use futures::stream::{FuturesUnordered, StreamExt};
90    use std::collections::VecDeque;
91
92    let store = tree.get_store();
93    let mut hashes = HashSet::new();
94    let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
95    let mut active = FuturesUnordered::new();
96
97    // Seed with root
98    pending.push_back((root.hash, root.key));
99
100    loop {
101        // Fill up to concurrency limit
102        while active.len() < concurrency {
103            if let Some((hash, key)) = pending.pop_front() {
104                // Skip if already visited
105                if hashes.contains(&hash) {
106                    continue;
107                }
108                hashes.insert(hash);
109
110                let store = store.clone();
111                let fut = async move {
112                    let data = store
113                        .get(&hash)
114                        .await
115                        .map_err(|e| HashTreeError::Store(e.to_string()))?;
116                    Ok::<_, HashTreeError>((hash, key, data))
117                };
118                active.push(fut);
119            } else {
120                break;
121            }
122        }
123
124        // If nothing active, we're done
125        if active.is_empty() {
126            break;
127        }
128
129        // Wait for any future to complete
130        if let Some(result) = active.next().await {
131            let (_hash, key, data) = result?;
132
133            if let Some(counter) = progress {
134                counter.fetch_add(1, Ordering::Relaxed);
135            }
136
137            let data = match data {
138                Some(d) => d,
139                None => continue,
140            };
141
142            // Decrypt if key present
143            let plaintext = if let Some(k) = key {
144                decrypt_chk(&data, &k).unwrap_or(data)
145            } else {
146                data
147            };
148
149            // If it's a tree node, queue children
150            if is_tree_node(&plaintext) {
151                if let Ok(node) = decode_tree_node(&plaintext) {
152                    for link in node.links {
153                        if !hashes.contains(&link.hash) {
154                            pending.push_back((link.hash, link.key));
155                        }
156                    }
157                }
158            }
159        }
160    }
161
162    Ok(hashes)
163}
164
165/// Compute diff between two trees
166///
167/// Returns hashes present in `new_root` but not in `old_root`.
168/// Uses subtree pruning: when a hash exists in old tree, skips entire subtree.
169///
170/// # Algorithm
171/// 1. Collect all hashes from old tree into a set
172/// 2. Walk new tree:
173///    - If hash in old set: skip (subtree unchanged)
174///    - If hash not in old set: add to result, traverse children
175///
176/// # Arguments
177/// * `tree` - HashTree instance
178/// * `old_root` - Root of the old tree (may be None for first push)
179/// * `new_root` - Root of the new tree
180/// * `concurrency` - Number of concurrent fetches
181///
182/// # Returns
183/// TreeDiff with added hashes and statistics
184pub async fn tree_diff<S: Store>(
185    tree: &HashTree<S>,
186    old_root: Option<&Cid>,
187    new_root: &Cid,
188    concurrency: usize,
189) -> Result<TreeDiff, HashTreeError> {
190    // No old tree = everything is new
191    let old_hashes = if let Some(old) = old_root {
192        collect_hashes(tree, old, concurrency).await?
193    } else {
194        HashSet::new()
195    };
196
197    tree_diff_with_old_hashes(tree, &old_hashes, new_root, concurrency).await
198}
199
200/// Compute diff given pre-computed old hashes
201///
202/// Use this when you already have the old tree's hash set (e.g., from a previous operation)
203pub async fn tree_diff_with_old_hashes<S: Store>(
204    tree: &HashTree<S>,
205    old_hashes: &HashSet<Hash>,
206    new_root: &Cid,
207    concurrency: usize,
208) -> Result<TreeDiff, HashTreeError> {
209    use futures::stream::{FuturesUnordered, StreamExt};
210    use std::collections::VecDeque;
211
212    let store = tree.get_store();
213    let mut added: Vec<Hash> = Vec::new();
214    let mut visited: HashSet<Hash> = HashSet::new();
215    let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
216    let mut active = FuturesUnordered::new();
217
218    let mut stats = DiffStats {
219        old_tree_nodes: old_hashes.len(),
220        new_tree_nodes: 0,
221        unchanged_subtrees: 0,
222    };
223
224    // Seed with new root
225    pending.push_back((new_root.hash, new_root.key));
226
227    loop {
228        // Fill up to concurrency limit
229        while active.len() < concurrency {
230            if let Some((hash, key)) = pending.pop_front() {
231                // Skip if already visited
232                if visited.contains(&hash) {
233                    continue;
234                }
235                visited.insert(hash);
236
237                // KEY OPTIMIZATION: If hash exists in old tree, skip entire subtree
238                if old_hashes.contains(&hash) {
239                    stats.unchanged_subtrees += 1;
240                    continue;
241                }
242
243                // Hash is new - will need to upload
244                added.push(hash);
245                stats.new_tree_nodes += 1;
246
247                // Fetch to check for children
248                let store = store.clone();
249                let fut = async move {
250                    let data = store
251                        .get(&hash)
252                        .await
253                        .map_err(|e| HashTreeError::Store(e.to_string()))?;
254                    Ok::<_, HashTreeError>((hash, key, data))
255                };
256                active.push(fut);
257            } else {
258                break;
259            }
260        }
261
262        // If nothing active, we're done
263        if active.is_empty() {
264            break;
265        }
266
267        // Wait for any future to complete
268        if let Some(result) = active.next().await {
269            let (_hash, key, data) = result?;
270
271            let data = match data {
272                Some(d) => d,
273                None => continue,
274            };
275
276            // Decrypt if key present
277            let plaintext = if let Some(k) = key {
278                decrypt_chk(&data, &k).unwrap_or(data)
279            } else {
280                data
281            };
282
283            // If it's a tree node, queue children
284            if is_tree_node(&plaintext) {
285                if let Ok(node) = decode_tree_node(&plaintext) {
286                    for link in node.links {
287                        if !visited.contains(&link.hash) {
288                            pending.push_back((link.hash, link.key));
289                        }
290                    }
291                }
292            }
293        }
294    }
295
296    Ok(TreeDiff { added, stats })
297}
298
299/// Streaming diff - yields hashes as they're discovered
300///
301/// Memory efficient for very large trees. Yields hashes that need upload
302/// one at a time instead of collecting into a Vec.
303pub async fn tree_diff_streaming<S, F>(
304    tree: &HashTree<S>,
305    old_hashes: &HashSet<Hash>,
306    new_root: &Cid,
307    concurrency: usize,
308    mut callback: F,
309) -> Result<DiffStats, HashTreeError>
310where
311    S: Store,
312    F: FnMut(Hash) -> bool, // return false to stop early
313{
314    use futures::stream::{FuturesUnordered, StreamExt};
315    use std::collections::VecDeque;
316
317    let store = tree.get_store();
318    let mut visited: HashSet<Hash> = HashSet::new();
319    let mut pending: VecDeque<(Hash, Option<[u8; 32]>)> = VecDeque::new();
320    let mut active = FuturesUnordered::new();
321
322    let mut stats = DiffStats {
323        old_tree_nodes: old_hashes.len(),
324        new_tree_nodes: 0,
325        unchanged_subtrees: 0,
326    };
327
328    pending.push_back((new_root.hash, new_root.key));
329
330    loop {
331        while active.len() < concurrency {
332            if let Some((hash, key)) = pending.pop_front() {
333                if visited.contains(&hash) {
334                    continue;
335                }
336                visited.insert(hash);
337
338                if old_hashes.contains(&hash) {
339                    stats.unchanged_subtrees += 1;
340                    continue;
341                }
342
343                stats.new_tree_nodes += 1;
344
345                // Yield this hash via callback
346                if !callback(hash) {
347                    // Early termination requested
348                    return Ok(stats);
349                }
350
351                let store = store.clone();
352                let fut = async move {
353                    let data = store
354                        .get(&hash)
355                        .await
356                        .map_err(|e| HashTreeError::Store(e.to_string()))?;
357                    Ok::<_, HashTreeError>((hash, key, data))
358                };
359                active.push(fut);
360            } else {
361                break;
362            }
363        }
364
365        if active.is_empty() {
366            break;
367        }
368
369        if let Some(result) = active.next().await {
370            let (_hash, key, data) = result?;
371
372            let data = match data {
373                Some(d) => d,
374                None => continue,
375            };
376
377            let plaintext = if let Some(k) = key {
378                decrypt_chk(&data, &k).unwrap_or(data)
379            } else {
380                data
381            };
382
383            if is_tree_node(&plaintext) {
384                if let Ok(node) = decode_tree_node(&plaintext) {
385                    for link in node.links {
386                        if !visited.contains(&link.hash) {
387                            pending.push_back((link.hash, link.key));
388                        }
389                    }
390                }
391            }
392        }
393    }
394
395    Ok(stats)
396}
397
// Unit tests for the diff operations, run against an in-memory store.
// `make_tree` builds a public (unencrypted) tree; `make_encrypted_tree`
// uses the default configuration.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::MemoryStore;
    use crate::types::{DirEntry, LinkType};
    use crate::HashTreeConfig;
    use std::sync::Arc;

    fn make_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()).public());
        (store, tree)
    }

    fn make_encrypted_tree() -> (Arc<MemoryStore>, HashTree<MemoryStore>) {
        let store = Arc::new(MemoryStore::new());
        let tree = HashTree::new(HashTreeConfig::new(store.clone()));
        (store, tree)
    }

    #[tokio::test]
    async fn test_diff_identical_trees() {
        let (_store, tree) = make_tree();

        // Create a simple tree
        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        // Diff tree against itself should be empty
        let diff = tree_diff(&tree, Some(&dir_cid), &dir_cid, 4).await.unwrap();

        assert!(diff.is_empty(), "identical trees should have empty diff");
        assert_eq!(diff.added_count(), 0);
        // The root itself is found in the old set, so the whole tree is pruned at once.
        assert_eq!(diff.stats.unchanged_subtrees, 1);
    }

    #[tokio::test]
    async fn test_diff_single_file_change() {
        let (_store, tree) = make_tree();

        // Create old tree
        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let old_dir = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        // Create new tree with one file changed
        let file1_new = tree.put_blob(b"content1-modified").await.unwrap();
        let new_dir = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1_new).with_size(17),
                DirEntry::new("b.txt", file2).with_size(8), // unchanged
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_dir), &new_dir, 4).await.unwrap();

        // Should have: new root dir + new file blob
        assert!(!diff.is_empty());
        assert_eq!(diff.added_count(), 2); // new dir node + new file
        assert!(diff.added.contains(&file1_new));
        assert!(diff.added.contains(&new_dir.hash));
        assert!(!diff.added.contains(&file2)); // file2 unchanged
    }

    #[tokio::test]
    async fn test_diff_subtree_unchanged() {
        let (_store, tree) = make_tree();

        // Create subdirectory
        let sub_file = tree.put_blob(b"sub content").await.unwrap();
        let subdir = tree
            .put_directory(vec![DirEntry::new("sub.txt", sub_file).with_size(11)])
            .await
            .unwrap();

        // Old tree
        let file1 = tree.put_blob(b"root file").await.unwrap();
        let old_root = tree
            .put_directory(vec![
                DirEntry::new("file.txt", file1).with_size(9),
                DirEntry::new("subdir", subdir.hash).with_size(0).with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        // New tree - change root file, subdir unchanged
        let file1_new = tree.put_blob(b"root file changed").await.unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("file.txt", file1_new).with_size(17),
                DirEntry::new("subdir", subdir.hash).with_size(0).with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 4).await.unwrap();

        // Should NOT include subdir or its contents
        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&file1_new));
        assert!(!diff.added.contains(&subdir.hash)); // subdir unchanged
        assert!(!diff.added.contains(&sub_file)); // subdir content unchanged

        // Stats should show subtree was skipped
        assert!(diff.stats.unchanged_subtrees > 0);
    }

    #[tokio::test]
    async fn test_diff_new_directory() {
        let (_store, tree) = make_tree();

        // Old tree - simple
        let file1 = tree.put_blob(b"file1").await.unwrap();
        let old_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file1).with_size(5)])
            .await
            .unwrap();

        // New tree - add directory
        let new_file = tree.put_blob(b"new file").await.unwrap();
        let new_dir = tree
            .put_directory(vec![DirEntry::new("inner.txt", new_file).with_size(8)])
            .await
            .unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(5),
                DirEntry::new("newdir", new_dir.hash).with_size(0).with_link_type(LinkType::Dir),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 4).await.unwrap();

        // Should include new root, new dir, and new file
        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&new_dir.hash));
        assert!(diff.added.contains(&new_file));
        // Original file should NOT be in diff
        assert!(!diff.added.contains(&file1));
    }

    #[tokio::test]
    async fn test_diff_empty_old_tree() {
        let (_store, tree) = make_tree();

        // New tree
        let file1 = tree.put_blob(b"content").await.unwrap();
        let new_root = tree
            .put_directory(vec![DirEntry::new("file.txt", file1).with_size(7)])
            .await
            .unwrap();

        // No old tree (first push)
        let diff = tree_diff(&tree, None, &new_root, 4).await.unwrap();

        // Everything should be new
        assert_eq!(diff.added_count(), 2); // root + file
        assert!(diff.added.contains(&new_root.hash));
        assert!(diff.added.contains(&file1));
    }

    #[tokio::test]
    async fn test_diff_encrypted_trees() {
        let (_store, tree) = make_encrypted_tree();

        // Create old tree (encrypted)
        let (file1_cid, _) = tree.put(b"content1").await.unwrap();
        let (file2_cid, _) = tree.put(b"content2").await.unwrap();
        let old_dir = tree
            .put_directory(vec![
                DirEntry::from_cid("a.txt", &file1_cid).with_size(8),
                DirEntry::from_cid("b.txt", &file2_cid).with_size(8),
            ])
            .await
            .unwrap();

        // Create new tree with one file changed
        let (file1_new_cid, _) = tree.put(b"content1-modified").await.unwrap();
        let new_dir = tree
            .put_directory(vec![
                DirEntry::from_cid("a.txt", &file1_new_cid).with_size(17),
                DirEntry::from_cid("b.txt", &file2_cid).with_size(8),
            ])
            .await
            .unwrap();

        let diff = tree_diff(&tree, Some(&old_dir), &new_dir, 4).await.unwrap();

        // Should work with encrypted content
        assert!(!diff.is_empty());
        assert!(diff.added.contains(&file1_new_cid.hash));
        assert!(!diff.added.contains(&file2_cid.hash)); // unchanged
    }

    #[tokio::test]
    async fn test_collect_hashes() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"content1").await.unwrap();
        let file2 = tree.put_blob(b"content2").await.unwrap();
        let dir_cid = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(8),
                DirEntry::new("b.txt", file2).with_size(8),
            ])
            .await
            .unwrap();

        let hashes = collect_hashes(&tree, &dir_cid, 4).await.unwrap();

        assert_eq!(hashes.len(), 3); // dir + 2 files
        assert!(hashes.contains(&dir_cid.hash));
        assert!(hashes.contains(&file1));
        assert!(hashes.contains(&file2));
    }

    #[tokio::test]
    async fn test_diff_streaming() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"old").await.unwrap();
        let old_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file1).with_size(3)])
            .await
            .unwrap();

        let file2 = tree.put_blob(b"new").await.unwrap();
        let new_root = tree
            .put_directory(vec![DirEntry::new("a.txt", file2).with_size(3)])
            .await
            .unwrap();

        let old_hashes = collect_hashes(&tree, &old_root, 4).await.unwrap();

        let mut streamed: Vec<Hash> = Vec::new();
        let stats = tree_diff_streaming(&tree, &old_hashes, &new_root, 4, |hash| {
            streamed.push(hash);
            true // continue
        })
        .await
        .unwrap();

        assert_eq!(streamed.len(), 2); // new root + new file
        assert!(streamed.contains(&new_root.hash));
        assert!(streamed.contains(&file2));
        assert_eq!(stats.new_tree_nodes, 2);
    }

    #[tokio::test]
    async fn test_diff_streaming_early_stop() {
        let (_store, tree) = make_tree();

        let file1 = tree.put_blob(b"f1").await.unwrap();
        let file2 = tree.put_blob(b"f2").await.unwrap();
        let file3 = tree.put_blob(b"f3").await.unwrap();
        let new_root = tree
            .put_directory(vec![
                DirEntry::new("a.txt", file1).with_size(2),
                DirEntry::new("b.txt", file2).with_size(2),
                DirEntry::new("c.txt", file3).with_size(2),
            ])
            .await
            .unwrap();

        let old_hashes = HashSet::new(); // empty = all new

        let mut count = 0;
        let stats = tree_diff_streaming(&tree, &old_hashes, &new_root, 1, |_hash| {
            count += 1;
            count < 2 // stop after 2
        })
        .await
        .unwrap();

        // Root is reported (continue), then the first child is reported (stop):
        // exactly two callbacks, and the stopping hash is counted in the stats.
        assert_eq!(count, 2, "should have stopped after the second hash");
        assert_eq!(stats.new_tree_nodes, 2);
    }

    #[tokio::test]
    async fn test_diff_large_tree_structure() {
        let (_store, tree) = make_tree();

        // Create a tree with many files
        let mut entries = Vec::new();

        for i in 0..100 {
            let data = format!("content {}", i);
            let hash = tree.put_blob(data.as_bytes()).await.unwrap();
            entries.push(DirEntry::new(format!("file{}.txt", i), hash).with_size(data.len() as u64));
        }

        let old_root = tree.put_directory(entries.clone()).await.unwrap();

        // Modify 5 files
        for i in 0..5 {
            let data = format!("modified content {}", i);
            let hash = tree.put_blob(data.as_bytes()).await.unwrap();
            entries[i] = DirEntry::new(format!("file{}.txt", i), hash).with_size(data.len() as u64);
        }

        let new_root = tree.put_directory(entries).await.unwrap();

        let diff = tree_diff(&tree, Some(&old_root), &new_root, 8).await.unwrap();

        // Should have 6 new items: new root + 5 modified files
        assert_eq!(diff.added_count(), 6);
        assert_eq!(diff.stats.new_tree_nodes, 6);
        assert!(diff.added.contains(&new_root.hash));

        // Should have skipped ~95 unchanged files
        assert!(diff.stats.unchanged_subtrees >= 95);
    }
}