Skip to main content

hashtree_index/
lib.rs

1//! Content-addressed B-tree indexes backed by hashtree.
2
3mod search;
4
5use std::cmp::Ordering;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9
10use hashtree_core::{
11    Cid, DirEntry, HashTree, HashTreeConfig, HashTreeError, LinkType, Store, TreeEntry,
12};
13pub use search::{
14    SearchError, SearchIndex, SearchIndexOptions, SearchLinkResult, SearchOptions, SearchResult,
15};
16
17const DEFAULT_ORDER: usize = 32;
18
19#[derive(Debug, Clone, Default)]
20pub struct BTreeOptions {
21    pub order: Option<usize>,
22}
23
24#[derive(Debug, thiserror::Error)]
25pub enum BTreeError {
26    #[error("hash tree error: {0}")]
27    HashTree(#[from] HashTreeError),
28    #[error("value was not valid utf-8: {0}")]
29    Utf8(#[from] std::string::FromUtf8Error),
30}
31
32#[derive(Debug, Clone)]
33struct SplitResult {
34    left: Cid,
35    right: Cid,
36    left_first_key: String,
37    right_first_key: String,
38    left_count: u64,
39    right_count: u64,
40}
41
42#[derive(Debug, Clone)]
43enum InsertValue {
44    String(String),
45    Link(Cid),
46}
47
48type BTreeFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, BTreeError>> + 'a>>;
49
50pub struct BTree<S: Store> {
51    tree: HashTree<S>,
52    max_keys: usize,
53}
54
55#[derive(Debug, Clone)]
56struct BuiltNode {
57    first_key: String,
58    cid: Cid,
59    count: u64,
60}
61
62impl<S: Store> BTree<S> {
63    pub fn new(store: Arc<S>, options: BTreeOptions) -> Self {
64        let order = options.order.unwrap_or(DEFAULT_ORDER).max(2);
65        Self {
66            tree: HashTree::new(HashTreeConfig::new(store)),
67            max_keys: order - 1,
68        }
69    }
70
71    pub async fn insert(
72        &self,
73        root: Option<&Cid>,
74        key: &str,
75        value: &str,
76    ) -> Result<Cid, BTreeError> {
77        if let Some(root) = root {
78            if self.get(Some(root), key).await?.as_deref() == Some(value) {
79                return Ok(root.clone());
80            }
81
82            let result = self
83                .insert_recursive(
84                    root.clone(),
85                    key.to_string(),
86                    InsertValue::String(value.to_string()),
87                )
88                .await?;
89            return self.finish_insert(result).await;
90        }
91
92        self.create_leaf(&[(key.to_string(), value.to_string())])
93            .await
94    }
95
96    pub async fn get(&self, root: Option<&Cid>, key: &str) -> Result<Option<String>, BTreeError> {
97        let Some(root) = root else {
98            return Ok(None);
99        };
100        self.get_recursive(root.clone(), key.to_string()).await
101    }
102
103    pub async fn insert_link(
104        &self,
105        root: Option<&Cid>,
106        key: &str,
107        target_cid: &Cid,
108    ) -> Result<Cid, BTreeError> {
109        if let Some(root) = root {
110            if self
111                .get_link(Some(root), key)
112                .await?
113                .is_some_and(|existing| cid_equals(&existing, target_cid))
114            {
115                return Ok(root.clone());
116            }
117
118            let result = self
119                .insert_recursive(
120                    root.clone(),
121                    key.to_string(),
122                    InsertValue::Link(target_cid.clone()),
123                )
124                .await?;
125            return self.finish_insert(result).await;
126        }
127
128        self.create_leaf_with_links(&[(key.to_string(), target_cid.clone())])
129            .await
130    }
131
132    pub async fn insert_link_unchecked(
133        &self,
134        root: Option<&Cid>,
135        key: &str,
136        target_cid: &Cid,
137    ) -> Result<Cid, BTreeError> {
138        if let Some(root) = root {
139            let result = self
140                .insert_recursive(
141                    root.clone(),
142                    key.to_string(),
143                    InsertValue::Link(target_cid.clone()),
144                )
145                .await?;
146            return self.finish_insert(result).await;
147        }
148
149        self.create_leaf_with_links(&[(key.to_string(), target_cid.clone())])
150            .await
151    }
152
153    pub async fn get_link(&self, root: Option<&Cid>, key: &str) -> Result<Option<Cid>, BTreeError> {
154        let Some(root) = root else {
155            return Ok(None);
156        };
157        self.get_link_recursive(root.clone(), key.to_string()).await
158    }
159
160    pub async fn entries(&self, root: Option<&Cid>) -> Result<Vec<(String, String)>, BTreeError> {
161        let Some(root) = root else {
162            return Ok(Vec::new());
163        };
164        self.traverse_in_order(root.clone()).await
165    }
166
167    pub async fn links_entries(
168        &self,
169        root: Option<&Cid>,
170    ) -> Result<Vec<(String, Cid)>, BTreeError> {
171        let Some(root) = root else {
172            return Ok(Vec::new());
173        };
174        self.traverse_links_in_order(root.clone()).await
175    }
176
177    pub async fn links_entries_limited(
178        &self,
179        root: Option<&Cid>,
180        limit: usize,
181    ) -> Result<Vec<(String, Cid)>, BTreeError> {
182        if limit == 0 {
183            return Ok(Vec::new());
184        }
185        let Some(root) = root else {
186            return Ok(Vec::new());
187        };
188        self.range_link_traverse_limited(root.clone(), None, None, limit)
189            .await
190    }
191
192    /// Count CID links by walking the tree.
193    ///
194    /// Uses stored subtree sizes when available, but scans descendants when
195    /// older roots do not carry complete counts.
196    pub async fn count_links(&self, root: Option<&Cid>) -> Result<u64, BTreeError> {
197        self.scan_links(root).await
198    }
199
200    /// Count CID links by explicitly walking the tree.
201    pub async fn scan_links(&self, root: Option<&Cid>) -> Result<u64, BTreeError> {
202        let Some(root) = root else {
203            return Ok(0);
204        };
205        self.count_links_recursive(root.clone()).await
206    }
207
208    /// Read the stored CID-link count from the root node without scanning.
209    ///
210    /// Returns `Ok(None)` when the root was built by older code that does not
211    /// store complete subtree sizes.
212    pub async fn count_stored_links(&self, root: Option<&Cid>) -> Result<Option<u64>, BTreeError> {
213        let Some(root) = root else {
214            return Ok(Some(0));
215        };
216
217        let entries = self.tree.list_directory(root).await?;
218        if is_leaf_node(&entries) {
219            return Ok(Some(count_link_entries(&entries)));
220        }
221
222        let mut count = 0;
223        for entry in &entries {
224            let Some(child_count) = stored_link_subtree_count(entry) else {
225                return Ok(None);
226            };
227            count += child_count;
228        }
229        Ok(Some(count))
230    }
231
232    pub async fn range(
233        &self,
234        root: &Cid,
235        start: Option<&str>,
236        end: Option<&str>,
237    ) -> Result<Vec<(String, String)>, BTreeError> {
238        self.range_traverse(
239            root.clone(),
240            start.map(ToOwned::to_owned),
241            end.map(ToOwned::to_owned),
242        )
243        .await
244    }
245
246    pub async fn prefix(
247        &self,
248        root: &Cid,
249        prefix: &str,
250    ) -> Result<Vec<(String, String)>, BTreeError> {
251        let end = increment_prefix(prefix);
252        self.range(root, Some(prefix), end.as_deref()).await
253    }
254
255    pub async fn prefix_links(
256        &self,
257        root: &Cid,
258        prefix: &str,
259    ) -> Result<Vec<(String, Cid)>, BTreeError> {
260        let end = increment_prefix(prefix);
261        self.range_link_traverse(root.clone(), Some(prefix.to_string()), end)
262            .await
263    }
264
265    pub async fn prefix_links_limited(
266        &self,
267        root: &Cid,
268        prefix: &str,
269        limit: usize,
270    ) -> Result<Vec<(String, Cid)>, BTreeError> {
271        if limit == 0 {
272            return Ok(Vec::new());
273        }
274        let end = increment_prefix(prefix);
275        self.range_link_traverse_limited(root.clone(), Some(prefix.to_string()), end, limit)
276            .await
277    }
278
279    pub async fn delete(&self, root: &Cid, key: &str) -> Result<Option<Cid>, BTreeError> {
280        self.delete_recursive(root.clone(), key.to_string()).await
281    }
282
283    pub async fn merge(
284        &self,
285        base: Option<&Cid>,
286        other: Option<&Cid>,
287        prefer_other: bool,
288    ) -> Result<Option<Cid>, BTreeError> {
289        let Some(other) = other else {
290            return Ok(base.cloned());
291        };
292        let Some(mut result) = base.cloned().or_else(|| Some(other.clone())) else {
293            return Ok(None);
294        };
295        if base.is_none() {
296            return Ok(Some(result));
297        }
298
299        for (key, value) in self.entries(Some(other)).await? {
300            let existing = self.get(Some(&result), &key).await?;
301            if existing.is_none() || prefer_other {
302                result = self.insert(Some(&result), &key, &value).await?;
303            }
304        }
305
306        Ok(Some(result))
307    }
308
309    pub async fn merge_links(
310        &self,
311        base: Option<&Cid>,
312        other: Option<&Cid>,
313        prefer_other: bool,
314    ) -> Result<Option<Cid>, BTreeError> {
315        let Some(other) = other else {
316            return Ok(base.cloned());
317        };
318        let Some(mut result) = base.cloned().or_else(|| Some(other.clone())) else {
319            return Ok(None);
320        };
321        if base.is_none() {
322            return Ok(Some(result));
323        }
324
325        for (key, value) in self.links_entries(Some(other)).await? {
326            let existing = self.get_link(Some(&result), &key).await?;
327            if existing.is_none() || prefer_other {
328                result = self.insert_link(Some(&result), &key, &value).await?;
329            }
330        }
331
332        Ok(Some(result))
333    }
334
335    pub async fn build<I>(&self, items: I) -> Result<Option<Cid>, BTreeError>
336    where
337        I: IntoIterator<Item = (String, String)>,
338    {
339        let mut sorted: Vec<(String, String)> = items.into_iter().collect();
340        if sorted.is_empty() {
341            return Ok(None);
342        }
343
344        sorted.sort_by(|left, right| left.0.cmp(&right.0));
345
346        let mut deduped = Vec::with_capacity(sorted.len());
347        for (key, value) in sorted {
348            if let Some((last_key, last_value)) = deduped.last_mut() {
349                if *last_key == key {
350                    *last_value = value;
351                    continue;
352                }
353            }
354            deduped.push((key, value));
355        }
356
357        let mut level = Vec::with_capacity(deduped.len().div_ceil(self.max_keys));
358        for chunk in deduped.chunks(self.max_keys) {
359            let cid = self.create_leaf(chunk).await?;
360            level.push(BuiltNode {
361                first_key: chunk[0].0.clone(),
362                cid,
363                count: 0,
364            });
365        }
366
367        while level.len() > 1 {
368            let mut next_level = Vec::with_capacity(level.len().div_ceil(self.max_keys));
369            for chunk in level.chunks(self.max_keys) {
370                let cid = self.create_internal_node(chunk).await?;
371                next_level.push(BuiltNode {
372                    first_key: chunk[0].first_key.clone(),
373                    cid,
374                    count: 0,
375                });
376            }
377            level = next_level;
378        }
379
380        Ok(level.pop().map(|node| node.cid))
381    }
382
383    pub async fn build_links<I>(&self, items: I) -> Result<Option<Cid>, BTreeError>
384    where
385        I: IntoIterator<Item = (String, Cid)>,
386    {
387        let mut sorted: Vec<(String, Cid)> = items.into_iter().collect();
388        if sorted.is_empty() {
389            return Ok(None);
390        }
391
392        sorted.sort_by(|left, right| left.0.cmp(&right.0));
393
394        let mut deduped = Vec::with_capacity(sorted.len());
395        for (key, cid) in sorted {
396            if let Some((last_key, last_cid)) = deduped.last_mut() {
397                if *last_key == key {
398                    *last_cid = cid;
399                    continue;
400                }
401            }
402            deduped.push((key, cid));
403        }
404
405        let mut level = Vec::with_capacity(deduped.len().div_ceil(self.max_keys));
406        for chunk in deduped.chunks(self.max_keys) {
407            let cid = self.create_leaf_with_links(chunk).await?;
408            level.push(BuiltNode {
409                first_key: chunk[0].0.clone(),
410                cid,
411                count: chunk.len() as u64,
412            });
413        }
414
415        while level.len() > 1 {
416            let mut next_level = Vec::with_capacity(level.len().div_ceil(self.max_keys));
417            for chunk in level.chunks(self.max_keys) {
418                let cid = self.create_internal_node(chunk).await?;
419                next_level.push(BuiltNode {
420                    first_key: chunk[0].first_key.clone(),
421                    cid,
422                    count: chunk.iter().map(|child| child.count).sum(),
423                });
424            }
425            level = next_level;
426        }
427
428        Ok(level.pop().map(|node| node.cid))
429    }
430
431    async fn finish_insert(&self, result: InsertResult) -> Result<Cid, BTreeError> {
432        if let Some(split) = result.split {
433            return self
434                .create_internal_root(
435                    &split.left_first_key,
436                    &split.left,
437                    split.left_count,
438                    &split.right_first_key,
439                    &split.right,
440                    split.right_count,
441                )
442                .await;
443        }
444        Ok(result.cid)
445    }
446
447    fn get_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<String>> {
448        Box::pin(async move {
449            let entries = self.tree.list_directory(&root).await?;
450            if is_leaf_node(&entries) {
451                let escaped = escape_key(&key);
452                let Some(entry) = entries.iter().find(|entry| entry.name == escaped) else {
453                    return Ok(None);
454                };
455                if entry.link_type != LinkType::Blob {
456                    return Ok(None);
457                }
458
459                let cid = entry_cid(entry);
460                let Some(data) = self.tree.get(&cid, None).await? else {
461                    return Ok(None);
462                };
463                return Ok(Some(String::from_utf8(data)?));
464            }
465
466            let child = find_child(&entries, &key);
467            self.get_recursive(entry_cid(&child), key).await
468        })
469    }
470
471    fn get_link_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<Cid>> {
472        Box::pin(async move {
473            let entries = self.tree.list_directory(&root).await?;
474            if is_leaf_node(&entries) {
475                let escaped = escape_key(&key);
476                let Some(entry) = entries.iter().find(|entry| entry.name == escaped) else {
477                    return Ok(None);
478                };
479                if entry.link_type != LinkType::File {
480                    return Ok(None);
481                }
482                return Ok(Some(entry_cid(entry)));
483            }
484
485            let child = find_child(&entries, &key);
486            self.get_link_recursive(entry_cid(&child), key).await
487        })
488    }
489
490    fn insert_recursive<'a>(
491        &'a self,
492        node: Cid,
493        key: String,
494        value: InsertValue,
495    ) -> BTreeFuture<'a, InsertResult> {
496        Box::pin(async move {
497            let entries = self.tree.list_directory(&node).await?;
498            if is_leaf_node(&entries) {
499                return self.insert_into_leaf(node, entries, key, value).await;
500            }
501            self.insert_into_internal(node, entries, key, value).await
502        })
503    }
504
505    fn insert_into_leaf<'a>(
506        &'a self,
507        node: Cid,
508        _entries: Vec<TreeEntry>,
509        key: String,
510        value: InsertValue,
511    ) -> BTreeFuture<'a, InsertResult> {
512        Box::pin(async move {
513            let escaped_key = escape_key(&key);
514            let (entry_cid, size, link_type) = match value {
515                InsertValue::String(value) => {
516                    let (cid, size) = self.tree.put_file(value.as_bytes()).await?;
517                    (cid, size, LinkType::Blob)
518                }
519                InsertValue::Link(cid) => (cid, 0, LinkType::File),
520            };
521
522            let new_node = self
523                .tree
524                .set_entry(&node, &[], &escaped_key, &entry_cid, size, link_type)
525                .await?;
526
527            let new_entries = self.tree.list_directory(&new_node).await?;
528            if new_entries.len() > self.max_keys {
529                return Ok(InsertResult {
530                    cid: new_node,
531                    count: count_link_entries_or_subtrees(self, &new_entries).await?,
532                    split: Some(self.split_leaf(new_entries).await?),
533                });
534            }
535
536            Ok(InsertResult {
537                cid: new_node,
538                count: count_link_entries_or_subtrees(self, &new_entries).await?,
539                split: None,
540            })
541        })
542    }
543
544    fn insert_into_internal<'a>(
545        &'a self,
546        node: Cid,
547        entries: Vec<TreeEntry>,
548        key: String,
549        value: InsertValue,
550    ) -> BTreeFuture<'a, InsertResult> {
551        Box::pin(async move {
552            let child = find_child(&entries, &key);
553            let child_name = child.name.clone();
554            let child_cid = entry_cid(&child);
555            let result = self.insert_recursive(child_cid, key, value).await?;
556
557            let mut new_node = self
558                .tree
559                .set_entry(
560                    &node,
561                    &[],
562                    &child_name,
563                    &result.cid,
564                    result.count,
565                    LinkType::Dir,
566                )
567                .await?;
568
569            if let Some(split) = result.split {
570                new_node = self.tree.remove_entry(&new_node, &[], &child_name).await?;
571                new_node = self
572                    .tree
573                    .set_entry(
574                        &new_node,
575                        &[],
576                        &escape_key(&split.left_first_key),
577                        &split.left,
578                        split.left_count,
579                        LinkType::Dir,
580                    )
581                    .await?;
582                new_node = self
583                    .tree
584                    .set_entry(
585                        &new_node,
586                        &[],
587                        &escape_key(&split.right_first_key),
588                        &split.right,
589                        split.right_count,
590                        LinkType::Dir,
591                    )
592                    .await?;
593            }
594
595            let new_entries = self.tree.list_directory(&new_node).await?;
596            if new_entries.len() > self.max_keys {
597                return Ok(InsertResult {
598                    cid: new_node,
599                    count: count_link_entries_or_subtrees(self, &new_entries).await?,
600                    split: Some(self.split_internal(new_entries).await?),
601                });
602            }
603
604            Ok(InsertResult {
605                cid: new_node,
606                count: count_link_entries_or_subtrees(self, &new_entries).await?,
607                split: None,
608            })
609        })
610    }
611
612    async fn split_leaf(&self, entries: Vec<TreeEntry>) -> Result<SplitResult, BTreeError> {
613        let sorted = sort_entries(entries);
614        let mid = sorted.len() / 2;
615        let left_entries = &sorted[..mid];
616        let right_entries = &sorted[mid..];
617
618        let left = self.create_node_from_entries(left_entries).await?;
619        let right = self.create_node_from_entries(right_entries).await?;
620
621        Ok(SplitResult {
622            left,
623            right,
624            left_first_key: unescape_key(&left_entries[0].name),
625            right_first_key: unescape_key(&right_entries[0].name),
626            left_count: count_link_entries(left_entries),
627            right_count: count_link_entries(right_entries),
628        })
629    }
630
631    async fn split_internal(&self, entries: Vec<TreeEntry>) -> Result<SplitResult, BTreeError> {
632        let sorted = sort_entries(entries);
633        let mid = sorted.len() / 2;
634        let left_entries = &sorted[..mid];
635        let right_entries = &sorted[mid..];
636
637        let left = self.create_node_from_entries(left_entries).await?;
638        let right = self.create_node_from_entries(right_entries).await?;
639
640        Ok(SplitResult {
641            left,
642            right,
643            left_first_key: unescape_key(&left_entries[0].name),
644            right_first_key: unescape_key(&right_entries[0].name),
645            left_count: count_link_entries_or_subtrees(self, left_entries).await?,
646            right_count: count_link_entries_or_subtrees(self, right_entries).await?,
647        })
648    }
649
650    async fn create_leaf(&self, items: &[(String, String)]) -> Result<Cid, BTreeError> {
651        let mut entries = Vec::with_capacity(items.len());
652        for (key, value) in items {
653            let (cid, size) = self.tree.put_file(value.as_bytes()).await?;
654            entries.push(
655                DirEntry::from_cid(escape_key(key), &cid)
656                    .with_size(size)
657                    .with_link_type(LinkType::Blob),
658            );
659        }
660        Ok(self.tree.put_directory(entries).await?)
661    }
662
663    async fn create_leaf_with_links(&self, items: &[(String, Cid)]) -> Result<Cid, BTreeError> {
664        let entries: Vec<DirEntry> = items
665            .iter()
666            .map(|(key, cid)| {
667                DirEntry::from_cid(escape_key(key), cid).with_link_type(LinkType::File)
668            })
669            .collect();
670        Ok(self.tree.put_directory(entries).await?)
671    }
672
673    async fn create_internal_node(&self, children: &[BuiltNode]) -> Result<Cid, BTreeError> {
674        let entries: Vec<DirEntry> = children
675            .iter()
676            .map(|child| {
677                DirEntry::from_cid(escape_key(&child.first_key), &child.cid)
678                    .with_size(child.count)
679                    .with_link_type(LinkType::Dir)
680            })
681            .collect();
682        Ok(self.tree.put_directory(entries).await?)
683    }
684
685    async fn create_internal_root(
686        &self,
687        left_key: &str,
688        left: &Cid,
689        left_count: u64,
690        right_key: &str,
691        right: &Cid,
692        right_count: u64,
693    ) -> Result<Cid, BTreeError> {
694        let entries = vec![
695            DirEntry::from_cid(escape_key(left_key), left)
696                .with_size(left_count)
697                .with_link_type(LinkType::Dir),
698            DirEntry::from_cid(escape_key(right_key), right)
699                .with_size(right_count)
700                .with_link_type(LinkType::Dir),
701        ];
702        Ok(self.tree.put_directory(entries).await?)
703    }
704
705    async fn create_node_from_entries(&self, entries: &[TreeEntry]) -> Result<Cid, BTreeError> {
706        let dir_entries = entries
707            .iter()
708            .cloned()
709            .map(tree_entry_to_dir_entry)
710            .collect::<Vec<_>>();
711        Ok(self.tree.put_directory(dir_entries).await?)
712    }
713
714    fn delete_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<Cid>> {
715        Box::pin(async move {
716            let entries = self.tree.list_directory(&root).await?;
717            if is_leaf_node(&entries) {
718                let escaped = escape_key(&key);
719                if !entries.iter().any(|entry| entry.name == escaped) {
720                    return Ok(Some(root));
721                }
722
723                let new_root = self.tree.remove_entry(&root, &[], &escaped).await?;
724                let new_entries = self.tree.list_directory(&new_root).await?;
725                if new_entries.is_empty() {
726                    return Ok(None);
727                }
728                return Ok(Some(new_root));
729            }
730
731            let child = find_child(&entries, &key);
732            let child_name = child.name.clone();
733            let new_child = self.delete_recursive(entry_cid(&child), key).await?;
734
735            let Some(new_child) = new_child else {
736                let new_root = self.tree.remove_entry(&root, &[], &child_name).await?;
737                let new_entries = self.tree.list_directory(&new_root).await?;
738                if new_entries.is_empty() {
739                    return Ok(None);
740                }
741                if new_entries.len() == 1 && new_entries[0].link_type == LinkType::Dir {
742                    return Ok(Some(entry_cid(&new_entries[0])));
743                }
744                return Ok(Some(new_root));
745            };
746
747            if cid_equals(&new_child, &entry_cid(&child)) {
748                return Ok(Some(root));
749            }
750
751            let updated = self
752                .tree
753                .set_entry(
754                    &root,
755                    &[],
756                    &child_name,
757                    &new_child,
758                    count_link_entries_or_subtrees(
759                        self,
760                        &self.tree.list_directory(&new_child).await?,
761                    )
762                    .await?,
763                    LinkType::Dir,
764                )
765                .await?;
766            Ok(Some(updated))
767        })
768    }
769
770    fn traverse_in_order<'a>(&'a self, node: Cid) -> BTreeFuture<'a, Vec<(String, String)>> {
771        Box::pin(async move {
772            let entries = self.tree.list_directory(&node).await?;
773            let sorted = sort_entries(entries);
774            let mut out = Vec::new();
775
776            if is_leaf_node(&sorted) {
777                for entry in sorted {
778                    if entry.link_type != LinkType::Blob {
779                        continue;
780                    }
781                    let cid = entry_cid(&entry);
782                    if let Some(data) = self.tree.get(&cid, None).await? {
783                        out.push((unescape_key(&entry.name), String::from_utf8(data)?));
784                    }
785                }
786                return Ok(out);
787            }
788
789            for child in sorted {
790                out.extend(self.traverse_in_order(entry_cid(&child)).await?);
791            }
792            Ok(out)
793        })
794    }
795
796    fn traverse_links_in_order<'a>(&'a self, node: Cid) -> BTreeFuture<'a, Vec<(String, Cid)>> {
797        Box::pin(async move {
798            let entries = self.tree.list_directory(&node).await?;
799            let sorted = sort_entries(entries);
800            let mut out = Vec::new();
801
802            if is_leaf_node(&sorted) {
803                for entry in sorted {
804                    if entry.link_type == LinkType::File {
805                        out.push((unescape_key(&entry.name), entry_cid(&entry)));
806                    }
807                }
808                return Ok(out);
809            }
810
811            for child in sorted {
812                out.extend(self.traverse_links_in_order(entry_cid(&child)).await?);
813            }
814            Ok(out)
815        })
816    }
817
818    fn range_traverse<'a>(
819        &'a self,
820        node: Cid,
821        start: Option<String>,
822        end: Option<String>,
823    ) -> BTreeFuture<'a, Vec<(String, String)>> {
824        Box::pin(async move {
825            let entries = self.tree.list_directory(&node).await?;
826            let sorted = sort_entries(entries);
827            let mut out = Vec::new();
828
829            if is_leaf_node(&sorted) {
830                for entry in sorted {
831                    if entry.link_type != LinkType::Blob {
832                        continue;
833                    }
834                    let key = unescape_key(&entry.name);
835                    if start.as_ref().is_some_and(|start| key < *start) {
836                        continue;
837                    }
838                    if end.as_ref().is_some_and(|end| key >= *end) {
839                        return Ok(out);
840                    }
841
842                    let cid = entry_cid(&entry);
843                    if let Some(data) = self.tree.get(&cid, None).await? {
844                        out.push((key, String::from_utf8(data)?));
845                    }
846                }
847                return Ok(out);
848            }
849
850            for (index, child) in sorted.iter().enumerate() {
851                let child_min = unescape_key(&child.name);
852                let child_max = sorted.get(index + 1).map(|entry| unescape_key(&entry.name));
853
854                if start.as_ref().is_some_and(|start| {
855                    child_max
856                        .as_ref()
857                        .is_some_and(|child_max| child_max <= start)
858                }) {
859                    continue;
860                }
861                if end.as_ref().is_some_and(|end| child_min >= *end) {
862                    return Ok(out);
863                }
864
865                out.extend(
866                    self.range_traverse(entry_cid(child), start.clone(), end.clone())
867                        .await?,
868                );
869            }
870
871            Ok(out)
872        })
873    }
874
875    fn range_link_traverse<'a>(
876        &'a self,
877        node: Cid,
878        start: Option<String>,
879        end: Option<String>,
880    ) -> BTreeFuture<'a, Vec<(String, Cid)>> {
881        Box::pin(async move {
882            let entries = self.tree.list_directory(&node).await?;
883            let sorted = sort_entries(entries);
884            let mut out = Vec::new();
885
886            if is_leaf_node(&sorted) {
887                for entry in sorted {
888                    if entry.link_type != LinkType::File {
889                        continue;
890                    }
891                    let key = unescape_key(&entry.name);
892                    if start.as_ref().is_some_and(|start| key < *start) {
893                        continue;
894                    }
895                    if end.as_ref().is_some_and(|end| key >= *end) {
896                        return Ok(out);
897                    }
898                    out.push((key, entry_cid(&entry)));
899                }
900                return Ok(out);
901            }
902
903            for (index, child) in sorted.iter().enumerate() {
904                let child_min = unescape_key(&child.name);
905                let child_max = sorted.get(index + 1).map(|entry| unescape_key(&entry.name));
906
907                if start.as_ref().is_some_and(|start| {
908                    child_max
909                        .as_ref()
910                        .is_some_and(|child_max| child_max <= start)
911                }) {
912                    continue;
913                }
914                if end.as_ref().is_some_and(|end| child_min >= *end) {
915                    return Ok(out);
916                }
917
918                out.extend(
919                    self.range_link_traverse(entry_cid(child), start.clone(), end.clone())
920                        .await?,
921                );
922            }
923
924            Ok(out)
925        })
926    }
927
928    fn range_link_traverse_limited<'a>(
929        &'a self,
930        node: Cid,
931        start: Option<String>,
932        end: Option<String>,
933        limit: usize,
934    ) -> BTreeFuture<'a, Vec<(String, Cid)>> {
935        Box::pin(async move {
936            let entries = self.tree.list_directory(&node).await?;
937            let sorted = sort_entries(entries);
938            let mut out = Vec::new();
939
940            if is_leaf_node(&sorted) {
941                for entry in sorted {
942                    if entry.link_type != LinkType::File {
943                        continue;
944                    }
945                    let key = unescape_key(&entry.name);
946                    if start.as_ref().is_some_and(|start| key < *start) {
947                        continue;
948                    }
949                    if end.as_ref().is_some_and(|end| key >= *end) {
950                        return Ok(out);
951                    }
952                    out.push((key, entry_cid(&entry)));
953                    if out.len() >= limit {
954                        return Ok(out);
955                    }
956                }
957                return Ok(out);
958            }
959
960            for (index, child) in sorted.iter().enumerate() {
961                let child_min = unescape_key(&child.name);
962                let child_max = sorted.get(index + 1).map(|entry| unescape_key(&entry.name));
963
964                if start.as_ref().is_some_and(|start| {
965                    child_max
966                        .as_ref()
967                        .is_some_and(|child_max| child_max <= start)
968                }) {
969                    continue;
970                }
971                if end.as_ref().is_some_and(|end| child_min >= *end) {
972                    return Ok(out);
973                }
974
975                let remaining = limit.saturating_sub(out.len());
976                if remaining == 0 {
977                    return Ok(out);
978                }
979                out.extend(
980                    self.range_link_traverse_limited(
981                        entry_cid(child),
982                        start.clone(),
983                        end.clone(),
984                        remaining,
985                    )
986                    .await?,
987                );
988                if out.len() >= limit {
989                    return Ok(out);
990                }
991            }
992
993            Ok(out)
994        })
995    }
996
997    fn count_links_recursive<'a>(&'a self, node: Cid) -> BTreeFuture<'a, u64> {
998        Box::pin(async move {
999            let entries = self.tree.list_directory(&node).await?;
1000            count_link_entries_or_subtrees(self, &entries).await
1001        })
1002    }
1003}
1004
1005#[derive(Debug, Clone)]
1006struct InsertResult {
1007    cid: Cid,
1008    count: u64,
1009    split: Option<SplitResult>,
1010}
1011
1012pub fn escape_key(key: &str) -> String {
1013    key.replace('%', "%25")
1014        .replace('/', "%2F")
1015        .replace('\0', "%00")
1016}
1017
1018pub fn unescape_key(name: &str) -> String {
1019    name.replace("%2F", "/")
1020        .replace("%2f", "/")
1021        .replace("%00", "\0")
1022        .replace("%25", "%")
1023}
1024
1025fn increment_prefix(value: &str) -> Option<String> {
1026    if value.is_empty() {
1027        return Some(String::new());
1028    }
1029
1030    let mut chars: Vec<char> = value.chars().collect();
1031    let last = chars.pop()?;
1032    let next = char::from_u32(last as u32 + 1)?;
1033    chars.push(next);
1034    Some(chars.into_iter().collect())
1035}
1036
1037fn cid_equals(left: &Cid, right: &Cid) -> bool {
1038    left.hash == right.hash && left.key == right.key
1039}
1040
1041fn is_leaf_node(entries: &[TreeEntry]) -> bool {
1042    entries.is_empty() || entries.iter().any(|entry| entry.link_type != LinkType::Dir)
1043}
1044
1045fn sort_entries(mut entries: Vec<TreeEntry>) -> Vec<TreeEntry> {
1046    entries.sort_by(|left, right| compare_unescaped_names(&left.name, &right.name));
1047    entries
1048}
1049
1050fn compare_unescaped_names(left: &str, right: &str) -> Ordering {
1051    unescape_key(left).cmp(&unescape_key(right))
1052}
1053
1054fn find_child(entries: &[TreeEntry], key: &str) -> TreeEntry {
1055    let sorted = sort_entries(entries.to_vec());
1056    for window in sorted.windows(2) {
1057        let next_name = unescape_key(&window[1].name);
1058        if key < next_name.as_str() {
1059            return window[0].clone();
1060        }
1061    }
1062    sorted
1063        .last()
1064        .cloned()
1065        .expect("internal nodes must have children")
1066}
1067
1068fn entry_cid(entry: &TreeEntry) -> Cid {
1069    Cid {
1070        hash: entry.hash,
1071        key: entry.key,
1072    }
1073}
1074
1075fn tree_entry_to_dir_entry(entry: TreeEntry) -> DirEntry {
1076    let mut out = DirEntry::from_cid(&entry.name, &entry_cid(&entry))
1077        .with_size(entry.size)
1078        .with_link_type(entry.link_type);
1079    if let Some(meta) = entry.meta {
1080        out = out.with_meta(meta);
1081    }
1082    out
1083}
1084
1085fn count_link_entries(entries: &[TreeEntry]) -> u64 {
1086    entries
1087        .iter()
1088        .filter(|entry| entry.link_type == LinkType::File)
1089        .count() as u64
1090}
1091
1092fn stored_link_subtree_count(entry: &TreeEntry) -> Option<u64> {
1093    if entry.link_type != LinkType::Dir || entry.size == 0 {
1094        return None;
1095    }
1096    Some(entry.size)
1097}
1098
1099async fn count_link_entries_or_subtrees<S: Store>(
1100    btree: &BTree<S>,
1101    entries: &[TreeEntry],
1102) -> Result<u64, BTreeError> {
1103    if is_leaf_node(entries) {
1104        return Ok(count_link_entries(entries));
1105    }
1106
1107    let mut count = 0;
1108    for entry in entries {
1109        count += match stored_link_subtree_count(entry) {
1110            Some(child_count) => child_count,
1111            None => btree.count_links_recursive(entry_cid(entry)).await?,
1112        };
1113    }
1114    Ok(count)
1115}