Skip to main content

hashtree_index/
lib.rs

1//! Content-addressed B-tree indexes backed by hashtree.
2
3use std::cmp::Ordering;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use hashtree_core::{
9    Cid, DirEntry, HashTree, HashTreeConfig, HashTreeError, LinkType, Store, TreeEntry,
10};
11
12const DEFAULT_ORDER: usize = 32;
13
14#[derive(Debug, Clone, Default)]
15pub struct BTreeOptions {
16    pub order: Option<usize>,
17}
18
19#[derive(Debug, thiserror::Error)]
20pub enum BTreeError {
21    #[error("hash tree error: {0}")]
22    HashTree(#[from] HashTreeError),
23    #[error("value was not valid utf-8: {0}")]
24    Utf8(#[from] std::string::FromUtf8Error),
25}
26
27#[derive(Debug, Clone)]
28struct SplitResult {
29    left: Cid,
30    right: Cid,
31    left_first_key: String,
32    right_first_key: String,
33}
34
35#[derive(Debug, Clone)]
36enum InsertValue {
37    String(String),
38    Link(Cid),
39}
40
41type BTreeFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, BTreeError>> + 'a>>;
42
43pub struct BTree<S: Store> {
44    tree: HashTree<S>,
45    max_keys: usize,
46}
47
48impl<S: Store> BTree<S> {
49    pub fn new(store: Arc<S>, options: BTreeOptions) -> Self {
50        let order = options.order.unwrap_or(DEFAULT_ORDER).max(2);
51        Self {
52            tree: HashTree::new(HashTreeConfig::new(store)),
53            max_keys: order - 1,
54        }
55    }
56
57    pub async fn insert(
58        &self,
59        root: Option<&Cid>,
60        key: &str,
61        value: &str,
62    ) -> Result<Cid, BTreeError> {
63        if let Some(root) = root {
64            if self.get(Some(root), key).await?.as_deref() == Some(value) {
65                return Ok(root.clone());
66            }
67
68            let result = self
69                .insert_recursive(
70                    root.clone(),
71                    key.to_string(),
72                    InsertValue::String(value.to_string()),
73                )
74                .await?;
75            return self.finish_insert(result).await;
76        }
77
78        self.create_leaf(&[(key.to_string(), value.to_string())])
79            .await
80    }
81
82    pub async fn get(&self, root: Option<&Cid>, key: &str) -> Result<Option<String>, BTreeError> {
83        let Some(root) = root else {
84            return Ok(None);
85        };
86        self.get_recursive(root.clone(), key.to_string()).await
87    }
88
89    pub async fn insert_link(
90        &self,
91        root: Option<&Cid>,
92        key: &str,
93        target_cid: &Cid,
94    ) -> Result<Cid, BTreeError> {
95        if let Some(root) = root {
96            if self
97                .get_link(Some(root), key)
98                .await?
99                .is_some_and(|existing| cid_equals(&existing, target_cid))
100            {
101                return Ok(root.clone());
102            }
103
104            let result = self
105                .insert_recursive(
106                    root.clone(),
107                    key.to_string(),
108                    InsertValue::Link(target_cid.clone()),
109                )
110                .await?;
111            return self.finish_insert(result).await;
112        }
113
114        self.create_leaf_with_links(&[(key.to_string(), target_cid.clone())])
115            .await
116    }
117
118    pub async fn insert_link_unchecked(
119        &self,
120        root: Option<&Cid>,
121        key: &str,
122        target_cid: &Cid,
123    ) -> Result<Cid, BTreeError> {
124        if let Some(root) = root {
125            let result = self
126                .insert_recursive(
127                    root.clone(),
128                    key.to_string(),
129                    InsertValue::Link(target_cid.clone()),
130                )
131                .await?;
132            return self.finish_insert(result).await;
133        }
134
135        self.create_leaf_with_links(&[(key.to_string(), target_cid.clone())])
136            .await
137    }
138
139    pub async fn get_link(&self, root: Option<&Cid>, key: &str) -> Result<Option<Cid>, BTreeError> {
140        let Some(root) = root else {
141            return Ok(None);
142        };
143        self.get_link_recursive(root.clone(), key.to_string()).await
144    }
145
146    pub async fn entries(&self, root: Option<&Cid>) -> Result<Vec<(String, String)>, BTreeError> {
147        let Some(root) = root else {
148            return Ok(Vec::new());
149        };
150        self.traverse_in_order(root.clone()).await
151    }
152
153    pub async fn links_entries(
154        &self,
155        root: Option<&Cid>,
156    ) -> Result<Vec<(String, Cid)>, BTreeError> {
157        let Some(root) = root else {
158            return Ok(Vec::new());
159        };
160        self.traverse_links_in_order(root.clone()).await
161    }
162
163    pub async fn range(
164        &self,
165        root: &Cid,
166        start: Option<&str>,
167        end: Option<&str>,
168    ) -> Result<Vec<(String, String)>, BTreeError> {
169        self.range_traverse(
170            root.clone(),
171            start.map(ToOwned::to_owned),
172            end.map(ToOwned::to_owned),
173        )
174        .await
175    }
176
177    pub async fn prefix(
178        &self,
179        root: &Cid,
180        prefix: &str,
181    ) -> Result<Vec<(String, String)>, BTreeError> {
182        let end = increment_prefix(prefix);
183        self.range(root, Some(prefix), end.as_deref()).await
184    }
185
186    pub async fn prefix_links(
187        &self,
188        root: &Cid,
189        prefix: &str,
190    ) -> Result<Vec<(String, Cid)>, BTreeError> {
191        let end = increment_prefix(prefix);
192        self.range_link_traverse(root.clone(), Some(prefix.to_string()), end)
193            .await
194    }
195
196    pub async fn delete(&self, root: &Cid, key: &str) -> Result<Option<Cid>, BTreeError> {
197        self.delete_recursive(root.clone(), key.to_string()).await
198    }
199
200    pub async fn merge(
201        &self,
202        base: Option<&Cid>,
203        other: Option<&Cid>,
204        prefer_other: bool,
205    ) -> Result<Option<Cid>, BTreeError> {
206        let Some(other) = other else {
207            return Ok(base.cloned());
208        };
209        let Some(mut result) = base.cloned().or_else(|| Some(other.clone())) else {
210            return Ok(None);
211        };
212        if base.is_none() {
213            return Ok(Some(result));
214        }
215
216        for (key, value) in self.entries(Some(other)).await? {
217            let existing = self.get(Some(&result), &key).await?;
218            if existing.is_none() || prefer_other {
219                result = self.insert(Some(&result), &key, &value).await?;
220            }
221        }
222
223        Ok(Some(result))
224    }
225
226    pub async fn merge_links(
227        &self,
228        base: Option<&Cid>,
229        other: Option<&Cid>,
230        prefer_other: bool,
231    ) -> Result<Option<Cid>, BTreeError> {
232        let Some(other) = other else {
233            return Ok(base.cloned());
234        };
235        let Some(mut result) = base.cloned().or_else(|| Some(other.clone())) else {
236            return Ok(None);
237        };
238        if base.is_none() {
239            return Ok(Some(result));
240        }
241
242        for (key, value) in self.links_entries(Some(other)).await? {
243            let existing = self.get_link(Some(&result), &key).await?;
244            if existing.is_none() || prefer_other {
245                result = self.insert_link(Some(&result), &key, &value).await?;
246            }
247        }
248
249        Ok(Some(result))
250    }
251
252    async fn finish_insert(&self, result: InsertResult) -> Result<Cid, BTreeError> {
253        if let Some(split) = result.split {
254            return self
255                .create_internal_root(
256                    &split.left_first_key,
257                    &split.left,
258                    &split.right_first_key,
259                    &split.right,
260                )
261                .await;
262        }
263        Ok(result.cid)
264    }
265
266    fn get_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<String>> {
267        Box::pin(async move {
268            let entries = self.tree.list_directory(&root).await?;
269            if is_leaf_node(&entries) {
270                let escaped = escape_key(&key);
271                let Some(entry) = entries.iter().find(|entry| entry.name == escaped) else {
272                    return Ok(None);
273                };
274                if entry.link_type != LinkType::Blob {
275                    return Ok(None);
276                }
277
278                let cid = entry_cid(entry);
279                let Some(data) = self.tree.get(&cid, None).await? else {
280                    return Ok(None);
281                };
282                return Ok(Some(String::from_utf8(data)?));
283            }
284
285            let child = find_child(&entries, &key);
286            self.get_recursive(entry_cid(&child), key).await
287        })
288    }
289
290    fn get_link_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<Cid>> {
291        Box::pin(async move {
292            let entries = self.tree.list_directory(&root).await?;
293            if is_leaf_node(&entries) {
294                let escaped = escape_key(&key);
295                let Some(entry) = entries.iter().find(|entry| entry.name == escaped) else {
296                    return Ok(None);
297                };
298                if entry.link_type != LinkType::File {
299                    return Ok(None);
300                }
301                return Ok(Some(entry_cid(entry)));
302            }
303
304            let child = find_child(&entries, &key);
305            self.get_link_recursive(entry_cid(&child), key).await
306        })
307    }
308
309    fn insert_recursive<'a>(
310        &'a self,
311        node: Cid,
312        key: String,
313        value: InsertValue,
314    ) -> BTreeFuture<'a, InsertResult> {
315        Box::pin(async move {
316            let entries = self.tree.list_directory(&node).await?;
317            if is_leaf_node(&entries) {
318                return self.insert_into_leaf(node, entries, key, value).await;
319            }
320            self.insert_into_internal(node, entries, key, value).await
321        })
322    }
323
324    fn insert_into_leaf<'a>(
325        &'a self,
326        node: Cid,
327        _entries: Vec<TreeEntry>,
328        key: String,
329        value: InsertValue,
330    ) -> BTreeFuture<'a, InsertResult> {
331        Box::pin(async move {
332            let escaped_key = escape_key(&key);
333            let (entry_cid, size, link_type) = match value {
334                InsertValue::String(value) => {
335                    let (cid, size) = self.tree.put_file(value.as_bytes()).await?;
336                    (cid, size, LinkType::Blob)
337                }
338                InsertValue::Link(cid) => (cid, 0, LinkType::File),
339            };
340
341            let new_node = self
342                .tree
343                .set_entry(&node, &[], &escaped_key, &entry_cid, size, link_type)
344                .await?;
345
346            let new_entries = self.tree.list_directory(&new_node).await?;
347            if new_entries.len() > self.max_keys {
348                return Ok(InsertResult {
349                    cid: new_node,
350                    split: Some(self.split_leaf(new_entries).await?),
351                });
352            }
353
354            Ok(InsertResult {
355                cid: new_node,
356                split: None,
357            })
358        })
359    }
360
361    fn insert_into_internal<'a>(
362        &'a self,
363        node: Cid,
364        entries: Vec<TreeEntry>,
365        key: String,
366        value: InsertValue,
367    ) -> BTreeFuture<'a, InsertResult> {
368        Box::pin(async move {
369            let child = find_child(&entries, &key);
370            let child_name = child.name.clone();
371            let child_cid = entry_cid(&child);
372            let result = self.insert_recursive(child_cid, key, value).await?;
373
374            let mut new_node = self
375                .tree
376                .set_entry(&node, &[], &child_name, &result.cid, 0, LinkType::Dir)
377                .await?;
378
379            if let Some(split) = result.split {
380                new_node = self.tree.remove_entry(&new_node, &[], &child_name).await?;
381                new_node = self
382                    .tree
383                    .set_entry(
384                        &new_node,
385                        &[],
386                        &escape_key(&split.left_first_key),
387                        &split.left,
388                        0,
389                        LinkType::Dir,
390                    )
391                    .await?;
392                new_node = self
393                    .tree
394                    .set_entry(
395                        &new_node,
396                        &[],
397                        &escape_key(&split.right_first_key),
398                        &split.right,
399                        0,
400                        LinkType::Dir,
401                    )
402                    .await?;
403            }
404
405            let new_entries = self.tree.list_directory(&new_node).await?;
406            if new_entries.len() > self.max_keys {
407                return Ok(InsertResult {
408                    cid: new_node,
409                    split: Some(self.split_internal(new_entries).await?),
410                });
411            }
412
413            Ok(InsertResult {
414                cid: new_node,
415                split: None,
416            })
417        })
418    }
419
420    async fn split_leaf(&self, entries: Vec<TreeEntry>) -> Result<SplitResult, BTreeError> {
421        let sorted = sort_entries(entries);
422        let mid = sorted.len() / 2;
423        let left_entries = &sorted[..mid];
424        let right_entries = &sorted[mid..];
425
426        let left = self.create_node_from_entries(left_entries).await?;
427        let right = self.create_node_from_entries(right_entries).await?;
428
429        Ok(SplitResult {
430            left,
431            right,
432            left_first_key: unescape_key(&left_entries[0].name),
433            right_first_key: unescape_key(&right_entries[0].name),
434        })
435    }
436
437    async fn split_internal(&self, entries: Vec<TreeEntry>) -> Result<SplitResult, BTreeError> {
438        let sorted = sort_entries(entries);
439        let mid = sorted.len() / 2;
440        let left_entries = &sorted[..mid];
441        let right_entries = &sorted[mid..];
442
443        let left = self.create_node_from_entries(left_entries).await?;
444        let right = self.create_node_from_entries(right_entries).await?;
445
446        Ok(SplitResult {
447            left,
448            right,
449            left_first_key: unescape_key(&left_entries[0].name),
450            right_first_key: unescape_key(&right_entries[0].name),
451        })
452    }
453
454    async fn create_leaf(&self, items: &[(String, String)]) -> Result<Cid, BTreeError> {
455        let mut entries = Vec::with_capacity(items.len());
456        for (key, value) in items {
457            let (cid, size) = self.tree.put_file(value.as_bytes()).await?;
458            entries.push(
459                DirEntry::from_cid(escape_key(key), &cid)
460                    .with_size(size)
461                    .with_link_type(LinkType::Blob),
462            );
463        }
464        Ok(self.tree.put_directory(entries).await?)
465    }
466
467    async fn create_leaf_with_links(&self, items: &[(String, Cid)]) -> Result<Cid, BTreeError> {
468        let entries: Vec<DirEntry> = items
469            .iter()
470            .map(|(key, cid)| {
471                DirEntry::from_cid(escape_key(key), cid).with_link_type(LinkType::File)
472            })
473            .collect();
474        Ok(self.tree.put_directory(entries).await?)
475    }
476
477    async fn create_internal_root(
478        &self,
479        left_key: &str,
480        left: &Cid,
481        right_key: &str,
482        right: &Cid,
483    ) -> Result<Cid, BTreeError> {
484        let entries = vec![
485            DirEntry::from_cid(escape_key(left_key), left).with_link_type(LinkType::Dir),
486            DirEntry::from_cid(escape_key(right_key), right).with_link_type(LinkType::Dir),
487        ];
488        Ok(self.tree.put_directory(entries).await?)
489    }
490
491    async fn create_node_from_entries(&self, entries: &[TreeEntry]) -> Result<Cid, BTreeError> {
492        let dir_entries = entries
493            .iter()
494            .cloned()
495            .map(tree_entry_to_dir_entry)
496            .collect::<Vec<_>>();
497        Ok(self.tree.put_directory(dir_entries).await?)
498    }
499
500    fn delete_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<Cid>> {
501        Box::pin(async move {
502            let entries = self.tree.list_directory(&root).await?;
503            if is_leaf_node(&entries) {
504                let escaped = escape_key(&key);
505                if !entries.iter().any(|entry| entry.name == escaped) {
506                    return Ok(Some(root));
507                }
508
509                let new_root = self.tree.remove_entry(&root, &[], &escaped).await?;
510                let new_entries = self.tree.list_directory(&new_root).await?;
511                if new_entries.is_empty() {
512                    return Ok(None);
513                }
514                return Ok(Some(new_root));
515            }
516
517            let child = find_child(&entries, &key);
518            let child_name = child.name.clone();
519            let new_child = self.delete_recursive(entry_cid(&child), key).await?;
520
521            let Some(new_child) = new_child else {
522                let new_root = self.tree.remove_entry(&root, &[], &child_name).await?;
523                let new_entries = self.tree.list_directory(&new_root).await?;
524                if new_entries.is_empty() {
525                    return Ok(None);
526                }
527                if new_entries.len() == 1 && new_entries[0].link_type == LinkType::Dir {
528                    return Ok(Some(entry_cid(&new_entries[0])));
529                }
530                return Ok(Some(new_root));
531            };
532
533            if cid_equals(&new_child, &entry_cid(&child)) {
534                return Ok(Some(root));
535            }
536
537            let updated = self
538                .tree
539                .set_entry(&root, &[], &child_name, &new_child, 0, LinkType::Dir)
540                .await?;
541            Ok(Some(updated))
542        })
543    }
544
545    fn traverse_in_order<'a>(&'a self, node: Cid) -> BTreeFuture<'a, Vec<(String, String)>> {
546        Box::pin(async move {
547            let entries = self.tree.list_directory(&node).await?;
548            let sorted = sort_entries(entries);
549            let mut out = Vec::new();
550
551            if is_leaf_node(&sorted) {
552                for entry in sorted {
553                    if entry.link_type != LinkType::Blob {
554                        continue;
555                    }
556                    let cid = entry_cid(&entry);
557                    if let Some(data) = self.tree.get(&cid, None).await? {
558                        out.push((unescape_key(&entry.name), String::from_utf8(data)?));
559                    }
560                }
561                return Ok(out);
562            }
563
564            for child in sorted {
565                out.extend(self.traverse_in_order(entry_cid(&child)).await?);
566            }
567            Ok(out)
568        })
569    }
570
571    fn traverse_links_in_order<'a>(&'a self, node: Cid) -> BTreeFuture<'a, Vec<(String, Cid)>> {
572        Box::pin(async move {
573            let entries = self.tree.list_directory(&node).await?;
574            let sorted = sort_entries(entries);
575            let mut out = Vec::new();
576
577            if is_leaf_node(&sorted) {
578                for entry in sorted {
579                    if entry.link_type == LinkType::File {
580                        out.push((unescape_key(&entry.name), entry_cid(&entry)));
581                    }
582                }
583                return Ok(out);
584            }
585
586            for child in sorted {
587                out.extend(self.traverse_links_in_order(entry_cid(&child)).await?);
588            }
589            Ok(out)
590        })
591    }
592
593    fn range_traverse<'a>(
594        &'a self,
595        node: Cid,
596        start: Option<String>,
597        end: Option<String>,
598    ) -> BTreeFuture<'a, Vec<(String, String)>> {
599        Box::pin(async move {
600            let entries = self.tree.list_directory(&node).await?;
601            let sorted = sort_entries(entries);
602            let mut out = Vec::new();
603
604            if is_leaf_node(&sorted) {
605                for entry in sorted {
606                    if entry.link_type != LinkType::Blob {
607                        continue;
608                    }
609                    let key = unescape_key(&entry.name);
610                    if start.as_ref().is_some_and(|start| key < *start) {
611                        continue;
612                    }
613                    if end.as_ref().is_some_and(|end| key >= *end) {
614                        return Ok(out);
615                    }
616
617                    let cid = entry_cid(&entry);
618                    if let Some(data) = self.tree.get(&cid, None).await? {
619                        out.push((key, String::from_utf8(data)?));
620                    }
621                }
622                return Ok(out);
623            }
624
625            for (index, child) in sorted.iter().enumerate() {
626                let child_min = unescape_key(&child.name);
627                let child_max = sorted.get(index + 1).map(|entry| unescape_key(&entry.name));
628
629                if start.as_ref().is_some_and(|start| {
630                    child_max
631                        .as_ref()
632                        .is_some_and(|child_max| child_max <= start)
633                }) {
634                    continue;
635                }
636                if end.as_ref().is_some_and(|end| child_min >= *end) {
637                    return Ok(out);
638                }
639
640                out.extend(
641                    self.range_traverse(entry_cid(child), start.clone(), end.clone())
642                        .await?,
643                );
644            }
645
646            Ok(out)
647        })
648    }
649
650    fn range_link_traverse<'a>(
651        &'a self,
652        node: Cid,
653        start: Option<String>,
654        end: Option<String>,
655    ) -> BTreeFuture<'a, Vec<(String, Cid)>> {
656        Box::pin(async move {
657            let entries = self.tree.list_directory(&node).await?;
658            let sorted = sort_entries(entries);
659            let mut out = Vec::new();
660
661            if is_leaf_node(&sorted) {
662                for entry in sorted {
663                    if entry.link_type != LinkType::File {
664                        continue;
665                    }
666                    let key = unescape_key(&entry.name);
667                    if start.as_ref().is_some_and(|start| key < *start) {
668                        continue;
669                    }
670                    if end.as_ref().is_some_and(|end| key >= *end) {
671                        return Ok(out);
672                    }
673                    out.push((key, entry_cid(&entry)));
674                }
675                return Ok(out);
676            }
677
678            for (index, child) in sorted.iter().enumerate() {
679                let child_min = unescape_key(&child.name);
680                let child_max = sorted.get(index + 1).map(|entry| unescape_key(&entry.name));
681
682                if start.as_ref().is_some_and(|start| {
683                    child_max
684                        .as_ref()
685                        .is_some_and(|child_max| child_max <= start)
686                }) {
687                    continue;
688                }
689                if end.as_ref().is_some_and(|end| child_min >= *end) {
690                    return Ok(out);
691                }
692
693                out.extend(
694                    self.range_link_traverse(entry_cid(child), start.clone(), end.clone())
695                        .await?,
696                );
697            }
698
699            Ok(out)
700        })
701    }
702}
703
704#[derive(Debug, Clone)]
705struct InsertResult {
706    cid: Cid,
707    split: Option<SplitResult>,
708}
709
710pub fn escape_key(key: &str) -> String {
711    key.replace('%', "%25")
712        .replace('/', "%2F")
713        .replace('\0', "%00")
714}
715
716pub fn unescape_key(name: &str) -> String {
717    name.replace("%2F", "/")
718        .replace("%2f", "/")
719        .replace("%00", "\0")
720        .replace("%25", "%")
721}
722
723fn increment_prefix(value: &str) -> Option<String> {
724    if value.is_empty() {
725        return Some(String::new());
726    }
727
728    let mut chars: Vec<char> = value.chars().collect();
729    let last = chars.pop()?;
730    let next = char::from_u32(last as u32 + 1)?;
731    chars.push(next);
732    Some(chars.into_iter().collect())
733}
734
735fn cid_equals(left: &Cid, right: &Cid) -> bool {
736    left.hash == right.hash && left.key == right.key
737}
738
739fn is_leaf_node(entries: &[TreeEntry]) -> bool {
740    entries.is_empty() || entries.iter().any(|entry| entry.link_type != LinkType::Dir)
741}
742
743fn sort_entries(mut entries: Vec<TreeEntry>) -> Vec<TreeEntry> {
744    entries.sort_by(|left, right| compare_unescaped_names(&left.name, &right.name));
745    entries
746}
747
748fn compare_unescaped_names(left: &str, right: &str) -> Ordering {
749    unescape_key(left).cmp(&unescape_key(right))
750}
751
752fn find_child(entries: &[TreeEntry], key: &str) -> TreeEntry {
753    let sorted = sort_entries(entries.to_vec());
754    for window in sorted.windows(2) {
755        let next_name = unescape_key(&window[1].name);
756        if key < next_name.as_str() {
757            return window[0].clone();
758        }
759    }
760    sorted
761        .last()
762        .cloned()
763        .expect("internal nodes must have children")
764}
765
766fn entry_cid(entry: &TreeEntry) -> Cid {
767    Cid {
768        hash: entry.hash,
769        key: entry.key,
770    }
771}
772
773fn tree_entry_to_dir_entry(entry: TreeEntry) -> DirEntry {
774    let mut out = DirEntry::from_cid(&entry.name, &entry_cid(&entry))
775        .with_size(entry.size)
776        .with_link_type(entry.link_type);
777    if let Some(meta) = entry.meta {
778        out = out.with_meta(meta);
779    }
780    out
781}