Skip to main content

hashtree_index/
lib.rs

1//! Content-addressed B-tree indexes backed by hashtree.
2
3use std::cmp::Ordering;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use hashtree_core::{
9    Cid, DirEntry, HashTree, HashTreeConfig, HashTreeError, LinkType, Store, TreeEntry,
10};
11
12const DEFAULT_ORDER: usize = 32;
13
14#[derive(Debug, Clone, Default)]
15pub struct BTreeOptions {
16    pub order: Option<usize>,
17}
18
19#[derive(Debug, thiserror::Error)]
20pub enum BTreeError {
21    #[error("hash tree error: {0}")]
22    HashTree(#[from] HashTreeError),
23    #[error("value was not valid utf-8: {0}")]
24    Utf8(#[from] std::string::FromUtf8Error),
25}
26
27#[derive(Debug, Clone)]
28struct SplitResult {
29    left: Cid,
30    right: Cid,
31    left_first_key: String,
32    right_first_key: String,
33}
34
35#[derive(Debug, Clone)]
36enum InsertValue {
37    String(String),
38    Link(Cid),
39}
40
41type BTreeFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, BTreeError>> + 'a>>;
42
43pub struct BTree<S: Store> {
44    tree: HashTree<S>,
45    max_keys: usize,
46}
47
48impl<S: Store> BTree<S> {
49    pub fn new(store: Arc<S>, options: BTreeOptions) -> Self {
50        let order = options.order.unwrap_or(DEFAULT_ORDER).max(2);
51        Self {
52            tree: HashTree::new(HashTreeConfig::new(store)),
53            max_keys: order - 1,
54        }
55    }
56
57    pub async fn insert(
58        &self,
59        root: Option<&Cid>,
60        key: &str,
61        value: &str,
62    ) -> Result<Cid, BTreeError> {
63        if let Some(root) = root {
64            if self.get(Some(root), key).await?.as_deref() == Some(value) {
65                return Ok(root.clone());
66            }
67
68            let result = self
69                .insert_recursive(
70                    root.clone(),
71                    key.to_string(),
72                    InsertValue::String(value.to_string()),
73                )
74                .await?;
75            return self.finish_insert(result).await;
76        }
77
78        self.create_leaf(&[(key.to_string(), value.to_string())])
79            .await
80    }
81
82    pub async fn get(&self, root: Option<&Cid>, key: &str) -> Result<Option<String>, BTreeError> {
83        let Some(root) = root else {
84            return Ok(None);
85        };
86        self.get_recursive(root.clone(), key.to_string()).await
87    }
88
89    pub async fn insert_link(
90        &self,
91        root: Option<&Cid>,
92        key: &str,
93        target_cid: &Cid,
94    ) -> Result<Cid, BTreeError> {
95        if let Some(root) = root {
96            if self
97                .get_link(Some(root), key)
98                .await?
99                .is_some_and(|existing| cid_equals(&existing, target_cid))
100            {
101                return Ok(root.clone());
102            }
103
104            let result = self
105                .insert_recursive(
106                    root.clone(),
107                    key.to_string(),
108                    InsertValue::Link(target_cid.clone()),
109                )
110                .await?;
111            return self.finish_insert(result).await;
112        }
113
114        self.create_leaf_with_links(&[(key.to_string(), target_cid.clone())])
115            .await
116    }
117
118    pub async fn get_link(&self, root: Option<&Cid>, key: &str) -> Result<Option<Cid>, BTreeError> {
119        let Some(root) = root else {
120            return Ok(None);
121        };
122        self.get_link_recursive(root.clone(), key.to_string()).await
123    }
124
125    pub async fn entries(&self, root: Option<&Cid>) -> Result<Vec<(String, String)>, BTreeError> {
126        let Some(root) = root else {
127            return Ok(Vec::new());
128        };
129        self.traverse_in_order(root.clone()).await
130    }
131
132    pub async fn links_entries(
133        &self,
134        root: Option<&Cid>,
135    ) -> Result<Vec<(String, Cid)>, BTreeError> {
136        let Some(root) = root else {
137            return Ok(Vec::new());
138        };
139        self.traverse_links_in_order(root.clone()).await
140    }
141
142    pub async fn range(
143        &self,
144        root: &Cid,
145        start: Option<&str>,
146        end: Option<&str>,
147    ) -> Result<Vec<(String, String)>, BTreeError> {
148        self.range_traverse(
149            root.clone(),
150            start.map(ToOwned::to_owned),
151            end.map(ToOwned::to_owned),
152        )
153        .await
154    }
155
156    pub async fn prefix(
157        &self,
158        root: &Cid,
159        prefix: &str,
160    ) -> Result<Vec<(String, String)>, BTreeError> {
161        let end = increment_prefix(prefix);
162        self.range(root, Some(prefix), end.as_deref()).await
163    }
164
165    pub async fn prefix_links(
166        &self,
167        root: &Cid,
168        prefix: &str,
169    ) -> Result<Vec<(String, Cid)>, BTreeError> {
170        let end = increment_prefix(prefix);
171        self.range_link_traverse(root.clone(), Some(prefix.to_string()), end)
172            .await
173    }
174
175    pub async fn delete(&self, root: &Cid, key: &str) -> Result<Option<Cid>, BTreeError> {
176        self.delete_recursive(root.clone(), key.to_string()).await
177    }
178
179    pub async fn merge(
180        &self,
181        base: Option<&Cid>,
182        other: Option<&Cid>,
183        prefer_other: bool,
184    ) -> Result<Option<Cid>, BTreeError> {
185        let Some(other) = other else {
186            return Ok(base.cloned());
187        };
188        let Some(mut result) = base.cloned().or_else(|| Some(other.clone())) else {
189            return Ok(None);
190        };
191        if base.is_none() {
192            return Ok(Some(result));
193        }
194
195        for (key, value) in self.entries(Some(other)).await? {
196            let existing = self.get(Some(&result), &key).await?;
197            if existing.is_none() || prefer_other {
198                result = self.insert(Some(&result), &key, &value).await?;
199            }
200        }
201
202        Ok(Some(result))
203    }
204
205    pub async fn merge_links(
206        &self,
207        base: Option<&Cid>,
208        other: Option<&Cid>,
209        prefer_other: bool,
210    ) -> Result<Option<Cid>, BTreeError> {
211        let Some(other) = other else {
212            return Ok(base.cloned());
213        };
214        let Some(mut result) = base.cloned().or_else(|| Some(other.clone())) else {
215            return Ok(None);
216        };
217        if base.is_none() {
218            return Ok(Some(result));
219        }
220
221        for (key, value) in self.links_entries(Some(other)).await? {
222            let existing = self.get_link(Some(&result), &key).await?;
223            if existing.is_none() || prefer_other {
224                result = self.insert_link(Some(&result), &key, &value).await?;
225            }
226        }
227
228        Ok(Some(result))
229    }
230
231    async fn finish_insert(&self, result: InsertResult) -> Result<Cid, BTreeError> {
232        if let Some(split) = result.split {
233            return self
234                .create_internal_root(
235                    &split.left_first_key,
236                    &split.left,
237                    &split.right_first_key,
238                    &split.right,
239                )
240                .await;
241        }
242        Ok(result.cid)
243    }
244
245    fn get_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<String>> {
246        Box::pin(async move {
247            let entries = self.tree.list_directory(&root).await?;
248            if is_leaf_node(&entries) {
249                let escaped = escape_key(&key);
250                let Some(entry) = entries.iter().find(|entry| entry.name == escaped) else {
251                    return Ok(None);
252                };
253                if entry.link_type != LinkType::Blob {
254                    return Ok(None);
255                }
256
257                let cid = entry_cid(entry);
258                let Some(data) = self.tree.get(&cid, None).await? else {
259                    return Ok(None);
260                };
261                return Ok(Some(String::from_utf8(data)?));
262            }
263
264            let child = find_child(&entries, &key);
265            self.get_recursive(entry_cid(&child), key).await
266        })
267    }
268
269    fn get_link_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<Cid>> {
270        Box::pin(async move {
271            let entries = self.tree.list_directory(&root).await?;
272            if is_leaf_node(&entries) {
273                let escaped = escape_key(&key);
274                let Some(entry) = entries.iter().find(|entry| entry.name == escaped) else {
275                    return Ok(None);
276                };
277                if entry.link_type != LinkType::File {
278                    return Ok(None);
279                }
280                return Ok(Some(entry_cid(entry)));
281            }
282
283            let child = find_child(&entries, &key);
284            self.get_link_recursive(entry_cid(&child), key).await
285        })
286    }
287
288    fn insert_recursive<'a>(
289        &'a self,
290        node: Cid,
291        key: String,
292        value: InsertValue,
293    ) -> BTreeFuture<'a, InsertResult> {
294        Box::pin(async move {
295            let entries = self.tree.list_directory(&node).await?;
296            if is_leaf_node(&entries) {
297                return self.insert_into_leaf(node, entries, key, value).await;
298            }
299            self.insert_into_internal(node, entries, key, value).await
300        })
301    }
302
303    fn insert_into_leaf<'a>(
304        &'a self,
305        node: Cid,
306        _entries: Vec<TreeEntry>,
307        key: String,
308        value: InsertValue,
309    ) -> BTreeFuture<'a, InsertResult> {
310        Box::pin(async move {
311            let escaped_key = escape_key(&key);
312            let (entry_cid, size, link_type) = match value {
313                InsertValue::String(value) => {
314                    let (cid, size) = self.tree.put_file(value.as_bytes()).await?;
315                    (cid, size, LinkType::Blob)
316                }
317                InsertValue::Link(cid) => (cid, 0, LinkType::File),
318            };
319
320            let new_node = self
321                .tree
322                .set_entry(&node, &[], &escaped_key, &entry_cid, size, link_type)
323                .await?;
324
325            let new_entries = self.tree.list_directory(&new_node).await?;
326            if new_entries.len() > self.max_keys {
327                return Ok(InsertResult {
328                    cid: new_node,
329                    split: Some(self.split_leaf(new_entries).await?),
330                });
331            }
332
333            Ok(InsertResult {
334                cid: new_node,
335                split: None,
336            })
337        })
338    }
339
340    fn insert_into_internal<'a>(
341        &'a self,
342        node: Cid,
343        entries: Vec<TreeEntry>,
344        key: String,
345        value: InsertValue,
346    ) -> BTreeFuture<'a, InsertResult> {
347        Box::pin(async move {
348            let child = find_child(&entries, &key);
349            let child_name = child.name.clone();
350            let child_cid = entry_cid(&child);
351            let result = self.insert_recursive(child_cid, key, value).await?;
352
353            let mut new_node = self
354                .tree
355                .set_entry(&node, &[], &child_name, &result.cid, 0, LinkType::Dir)
356                .await?;
357
358            if let Some(split) = result.split {
359                new_node = self.tree.remove_entry(&new_node, &[], &child_name).await?;
360                new_node = self
361                    .tree
362                    .set_entry(
363                        &new_node,
364                        &[],
365                        &escape_key(&split.left_first_key),
366                        &split.left,
367                        0,
368                        LinkType::Dir,
369                    )
370                    .await?;
371                new_node = self
372                    .tree
373                    .set_entry(
374                        &new_node,
375                        &[],
376                        &escape_key(&split.right_first_key),
377                        &split.right,
378                        0,
379                        LinkType::Dir,
380                    )
381                    .await?;
382            }
383
384            let new_entries = self.tree.list_directory(&new_node).await?;
385            if new_entries.len() > self.max_keys {
386                return Ok(InsertResult {
387                    cid: new_node,
388                    split: Some(self.split_internal(new_entries).await?),
389                });
390            }
391
392            Ok(InsertResult {
393                cid: new_node,
394                split: None,
395            })
396        })
397    }
398
399    async fn split_leaf(&self, entries: Vec<TreeEntry>) -> Result<SplitResult, BTreeError> {
400        let sorted = sort_entries(entries);
401        let mid = sorted.len() / 2;
402        let left_entries = &sorted[..mid];
403        let right_entries = &sorted[mid..];
404
405        let left = self.create_node_from_entries(left_entries).await?;
406        let right = self.create_node_from_entries(right_entries).await?;
407
408        Ok(SplitResult {
409            left,
410            right,
411            left_first_key: unescape_key(&left_entries[0].name),
412            right_first_key: unescape_key(&right_entries[0].name),
413        })
414    }
415
416    async fn split_internal(&self, entries: Vec<TreeEntry>) -> Result<SplitResult, BTreeError> {
417        let sorted = sort_entries(entries);
418        let mid = sorted.len() / 2;
419        let left_entries = &sorted[..mid];
420        let right_entries = &sorted[mid..];
421
422        let left = self.create_node_from_entries(left_entries).await?;
423        let right = self.create_node_from_entries(right_entries).await?;
424
425        Ok(SplitResult {
426            left,
427            right,
428            left_first_key: unescape_key(&left_entries[0].name),
429            right_first_key: unescape_key(&right_entries[0].name),
430        })
431    }
432
433    async fn create_leaf(&self, items: &[(String, String)]) -> Result<Cid, BTreeError> {
434        let mut entries = Vec::with_capacity(items.len());
435        for (key, value) in items {
436            let (cid, size) = self.tree.put_file(value.as_bytes()).await?;
437            entries.push(
438                DirEntry::from_cid(escape_key(key), &cid)
439                    .with_size(size)
440                    .with_link_type(LinkType::Blob),
441            );
442        }
443        Ok(self.tree.put_directory(entries).await?)
444    }
445
446    async fn create_leaf_with_links(&self, items: &[(String, Cid)]) -> Result<Cid, BTreeError> {
447        let entries: Vec<DirEntry> = items
448            .iter()
449            .map(|(key, cid)| {
450                DirEntry::from_cid(escape_key(key), cid).with_link_type(LinkType::File)
451            })
452            .collect();
453        Ok(self.tree.put_directory(entries).await?)
454    }
455
456    async fn create_internal_root(
457        &self,
458        left_key: &str,
459        left: &Cid,
460        right_key: &str,
461        right: &Cid,
462    ) -> Result<Cid, BTreeError> {
463        let entries = vec![
464            DirEntry::from_cid(escape_key(left_key), left).with_link_type(LinkType::Dir),
465            DirEntry::from_cid(escape_key(right_key), right).with_link_type(LinkType::Dir),
466        ];
467        Ok(self.tree.put_directory(entries).await?)
468    }
469
470    async fn create_node_from_entries(&self, entries: &[TreeEntry]) -> Result<Cid, BTreeError> {
471        let dir_entries = entries
472            .iter()
473            .cloned()
474            .map(tree_entry_to_dir_entry)
475            .collect::<Vec<_>>();
476        Ok(self.tree.put_directory(dir_entries).await?)
477    }
478
479    fn delete_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<Cid>> {
480        Box::pin(async move {
481            let entries = self.tree.list_directory(&root).await?;
482            if is_leaf_node(&entries) {
483                let escaped = escape_key(&key);
484                if !entries.iter().any(|entry| entry.name == escaped) {
485                    return Ok(Some(root));
486                }
487
488                let new_root = self.tree.remove_entry(&root, &[], &escaped).await?;
489                let new_entries = self.tree.list_directory(&new_root).await?;
490                if new_entries.is_empty() {
491                    return Ok(None);
492                }
493                return Ok(Some(new_root));
494            }
495
496            let child = find_child(&entries, &key);
497            let child_name = child.name.clone();
498            let new_child = self.delete_recursive(entry_cid(&child), key).await?;
499
500            let Some(new_child) = new_child else {
501                let new_root = self.tree.remove_entry(&root, &[], &child_name).await?;
502                let new_entries = self.tree.list_directory(&new_root).await?;
503                if new_entries.is_empty() {
504                    return Ok(None);
505                }
506                if new_entries.len() == 1 && new_entries[0].link_type == LinkType::Dir {
507                    return Ok(Some(entry_cid(&new_entries[0])));
508                }
509                return Ok(Some(new_root));
510            };
511
512            if cid_equals(&new_child, &entry_cid(&child)) {
513                return Ok(Some(root));
514            }
515
516            let updated = self
517                .tree
518                .set_entry(&root, &[], &child_name, &new_child, 0, LinkType::Dir)
519                .await?;
520            Ok(Some(updated))
521        })
522    }
523
524    fn traverse_in_order<'a>(&'a self, node: Cid) -> BTreeFuture<'a, Vec<(String, String)>> {
525        Box::pin(async move {
526            let entries = self.tree.list_directory(&node).await?;
527            let sorted = sort_entries(entries);
528            let mut out = Vec::new();
529
530            if is_leaf_node(&sorted) {
531                for entry in sorted {
532                    if entry.link_type != LinkType::Blob {
533                        continue;
534                    }
535                    let cid = entry_cid(&entry);
536                    if let Some(data) = self.tree.get(&cid, None).await? {
537                        out.push((unescape_key(&entry.name), String::from_utf8(data)?));
538                    }
539                }
540                return Ok(out);
541            }
542
543            for child in sorted {
544                out.extend(self.traverse_in_order(entry_cid(&child)).await?);
545            }
546            Ok(out)
547        })
548    }
549
550    fn traverse_links_in_order<'a>(&'a self, node: Cid) -> BTreeFuture<'a, Vec<(String, Cid)>> {
551        Box::pin(async move {
552            let entries = self.tree.list_directory(&node).await?;
553            let sorted = sort_entries(entries);
554            let mut out = Vec::new();
555
556            if is_leaf_node(&sorted) {
557                for entry in sorted {
558                    if entry.link_type == LinkType::File {
559                        out.push((unescape_key(&entry.name), entry_cid(&entry)));
560                    }
561                }
562                return Ok(out);
563            }
564
565            for child in sorted {
566                out.extend(self.traverse_links_in_order(entry_cid(&child)).await?);
567            }
568            Ok(out)
569        })
570    }
571
572    fn range_traverse<'a>(
573        &'a self,
574        node: Cid,
575        start: Option<String>,
576        end: Option<String>,
577    ) -> BTreeFuture<'a, Vec<(String, String)>> {
578        Box::pin(async move {
579            let entries = self.tree.list_directory(&node).await?;
580            let sorted = sort_entries(entries);
581            let mut out = Vec::new();
582
583            if is_leaf_node(&sorted) {
584                for entry in sorted {
585                    if entry.link_type != LinkType::Blob {
586                        continue;
587                    }
588                    let key = unescape_key(&entry.name);
589                    if start.as_ref().is_some_and(|start| key < *start) {
590                        continue;
591                    }
592                    if end.as_ref().is_some_and(|end| key >= *end) {
593                        return Ok(out);
594                    }
595
596                    let cid = entry_cid(&entry);
597                    if let Some(data) = self.tree.get(&cid, None).await? {
598                        out.push((key, String::from_utf8(data)?));
599                    }
600                }
601                return Ok(out);
602            }
603
604            for (index, child) in sorted.iter().enumerate() {
605                let child_min = unescape_key(&child.name);
606                let child_max = sorted.get(index + 1).map(|entry| unescape_key(&entry.name));
607
608                if start.as_ref().is_some_and(|start| {
609                    child_max
610                        .as_ref()
611                        .is_some_and(|child_max| child_max <= start)
612                }) {
613                    continue;
614                }
615                if end.as_ref().is_some_and(|end| child_min >= *end) {
616                    return Ok(out);
617                }
618
619                out.extend(
620                    self.range_traverse(entry_cid(child), start.clone(), end.clone())
621                        .await?,
622                );
623            }
624
625            Ok(out)
626        })
627    }
628
629    fn range_link_traverse<'a>(
630        &'a self,
631        node: Cid,
632        start: Option<String>,
633        end: Option<String>,
634    ) -> BTreeFuture<'a, Vec<(String, Cid)>> {
635        Box::pin(async move {
636            let entries = self.tree.list_directory(&node).await?;
637            let sorted = sort_entries(entries);
638            let mut out = Vec::new();
639
640            if is_leaf_node(&sorted) {
641                for entry in sorted {
642                    if entry.link_type != LinkType::File {
643                        continue;
644                    }
645                    let key = unescape_key(&entry.name);
646                    if start.as_ref().is_some_and(|start| key < *start) {
647                        continue;
648                    }
649                    if end.as_ref().is_some_and(|end| key >= *end) {
650                        return Ok(out);
651                    }
652                    out.push((key, entry_cid(&entry)));
653                }
654                return Ok(out);
655            }
656
657            for (index, child) in sorted.iter().enumerate() {
658                let child_min = unescape_key(&child.name);
659                let child_max = sorted.get(index + 1).map(|entry| unescape_key(&entry.name));
660
661                if start.as_ref().is_some_and(|start| {
662                    child_max
663                        .as_ref()
664                        .is_some_and(|child_max| child_max <= start)
665                }) {
666                    continue;
667                }
668                if end.as_ref().is_some_and(|end| child_min >= *end) {
669                    return Ok(out);
670                }
671
672                out.extend(
673                    self.range_link_traverse(entry_cid(child), start.clone(), end.clone())
674                        .await?,
675                );
676            }
677
678            Ok(out)
679        })
680    }
681}
682
683#[derive(Debug, Clone)]
684struct InsertResult {
685    cid: Cid,
686    split: Option<SplitResult>,
687}
688
689pub fn escape_key(key: &str) -> String {
690    key.replace('%', "%25")
691        .replace('/', "%2F")
692        .replace('\0', "%00")
693}
694
695pub fn unescape_key(name: &str) -> String {
696    name.replace("%2F", "/")
697        .replace("%2f", "/")
698        .replace("%00", "\0")
699        .replace("%25", "%")
700}
701
702fn increment_prefix(value: &str) -> Option<String> {
703    if value.is_empty() {
704        return Some(String::new());
705    }
706
707    let mut chars: Vec<char> = value.chars().collect();
708    let last = chars.pop()?;
709    let next = char::from_u32(last as u32 + 1)?;
710    chars.push(next);
711    Some(chars.into_iter().collect())
712}
713
714fn cid_equals(left: &Cid, right: &Cid) -> bool {
715    left.hash == right.hash && left.key == right.key
716}
717
718fn is_leaf_node(entries: &[TreeEntry]) -> bool {
719    entries.is_empty() || entries.iter().any(|entry| entry.link_type != LinkType::Dir)
720}
721
722fn sort_entries(mut entries: Vec<TreeEntry>) -> Vec<TreeEntry> {
723    entries.sort_by(|left, right| compare_unescaped_names(&left.name, &right.name));
724    entries
725}
726
727fn compare_unescaped_names(left: &str, right: &str) -> Ordering {
728    unescape_key(left).cmp(&unescape_key(right))
729}
730
731fn find_child(entries: &[TreeEntry], key: &str) -> TreeEntry {
732    let sorted = sort_entries(entries.to_vec());
733    for window in sorted.windows(2) {
734        let next_name = unescape_key(&window[1].name);
735        if key < next_name.as_str() {
736            return window[0].clone();
737        }
738    }
739    sorted
740        .last()
741        .cloned()
742        .expect("internal nodes must have children")
743}
744
745fn entry_cid(entry: &TreeEntry) -> Cid {
746    Cid {
747        hash: entry.hash,
748        key: entry.key,
749    }
750}
751
752fn tree_entry_to_dir_entry(entry: TreeEntry) -> DirEntry {
753    let mut out = DirEntry::from_cid(&entry.name, &entry_cid(&entry))
754        .with_size(entry.size)
755        .with_link_type(entry.link_type);
756    if let Some(meta) = entry.meta {
757        out = out.with_meta(meta);
758    }
759    out
760}