1use std::cmp::Ordering;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use hashtree_core::{
9 Cid, DirEntry, HashTree, HashTreeConfig, HashTreeError, LinkType, Store, TreeEntry,
10};
11
/// Tree order used by `BTree::new` when `BTreeOptions::order` is `None`.
const DEFAULT_ORDER: usize = 32;
13
/// Construction-time settings for a `BTree`.
#[derive(Debug, Clone, Default)]
pub struct BTreeOptions {
    /// Requested tree order; `None` selects `DEFAULT_ORDER`.
    ///
    /// A node holds at most `order - 1` entries before it is split.
    pub order: Option<usize>,
}
18
19#[derive(Debug, thiserror::Error)]
20pub enum BTreeError {
21 #[error("hash tree error: {0}")]
22 HashTree(#[from] HashTreeError),
23 #[error("value was not valid utf-8: {0}")]
24 Utf8(#[from] std::string::FromUtf8Error),
25}
26
27#[derive(Debug, Clone)]
28struct SplitResult {
29 left: Cid,
30 right: Cid,
31 left_first_key: String,
32 right_first_key: String,
33}
34
35#[derive(Debug, Clone)]
36enum InsertValue {
37 String(String),
38 Link(Cid),
39}
40
41type BTreeFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, BTreeError>> + 'a>>;
42
43pub struct BTree<S: Store> {
44 tree: HashTree<S>,
45 max_keys: usize,
46}
47
48impl<S: Store> BTree<S> {
49 pub fn new(store: Arc<S>, options: BTreeOptions) -> Self {
50 let order = options.order.unwrap_or(DEFAULT_ORDER).max(2);
51 Self {
52 tree: HashTree::new(HashTreeConfig::new(store)),
53 max_keys: order - 1,
54 }
55 }
56
57 pub async fn insert(
58 &self,
59 root: Option<&Cid>,
60 key: &str,
61 value: &str,
62 ) -> Result<Cid, BTreeError> {
63 if let Some(root) = root {
64 if self.get(Some(root), key).await?.as_deref() == Some(value) {
65 return Ok(root.clone());
66 }
67
68 let result = self
69 .insert_recursive(
70 root.clone(),
71 key.to_string(),
72 InsertValue::String(value.to_string()),
73 )
74 .await?;
75 return self.finish_insert(result).await;
76 }
77
78 self.create_leaf(&[(key.to_string(), value.to_string())])
79 .await
80 }
81
82 pub async fn get(&self, root: Option<&Cid>, key: &str) -> Result<Option<String>, BTreeError> {
83 let Some(root) = root else {
84 return Ok(None);
85 };
86 self.get_recursive(root.clone(), key.to_string()).await
87 }
88
89 pub async fn insert_link(
90 &self,
91 root: Option<&Cid>,
92 key: &str,
93 target_cid: &Cid,
94 ) -> Result<Cid, BTreeError> {
95 if let Some(root) = root {
96 if self
97 .get_link(Some(root), key)
98 .await?
99 .is_some_and(|existing| cid_equals(&existing, target_cid))
100 {
101 return Ok(root.clone());
102 }
103
104 let result = self
105 .insert_recursive(
106 root.clone(),
107 key.to_string(),
108 InsertValue::Link(target_cid.clone()),
109 )
110 .await?;
111 return self.finish_insert(result).await;
112 }
113
114 self.create_leaf_with_links(&[(key.to_string(), target_cid.clone())])
115 .await
116 }
117
118 pub async fn insert_link_unchecked(
119 &self,
120 root: Option<&Cid>,
121 key: &str,
122 target_cid: &Cid,
123 ) -> Result<Cid, BTreeError> {
124 if let Some(root) = root {
125 let result = self
126 .insert_recursive(
127 root.clone(),
128 key.to_string(),
129 InsertValue::Link(target_cid.clone()),
130 )
131 .await?;
132 return self.finish_insert(result).await;
133 }
134
135 self.create_leaf_with_links(&[(key.to_string(), target_cid.clone())])
136 .await
137 }
138
139 pub async fn get_link(&self, root: Option<&Cid>, key: &str) -> Result<Option<Cid>, BTreeError> {
140 let Some(root) = root else {
141 return Ok(None);
142 };
143 self.get_link_recursive(root.clone(), key.to_string()).await
144 }
145
146 pub async fn entries(&self, root: Option<&Cid>) -> Result<Vec<(String, String)>, BTreeError> {
147 let Some(root) = root else {
148 return Ok(Vec::new());
149 };
150 self.traverse_in_order(root.clone()).await
151 }
152
153 pub async fn links_entries(
154 &self,
155 root: Option<&Cid>,
156 ) -> Result<Vec<(String, Cid)>, BTreeError> {
157 let Some(root) = root else {
158 return Ok(Vec::new());
159 };
160 self.traverse_links_in_order(root.clone()).await
161 }
162
163 pub async fn range(
164 &self,
165 root: &Cid,
166 start: Option<&str>,
167 end: Option<&str>,
168 ) -> Result<Vec<(String, String)>, BTreeError> {
169 self.range_traverse(
170 root.clone(),
171 start.map(ToOwned::to_owned),
172 end.map(ToOwned::to_owned),
173 )
174 .await
175 }
176
177 pub async fn prefix(
178 &self,
179 root: &Cid,
180 prefix: &str,
181 ) -> Result<Vec<(String, String)>, BTreeError> {
182 let end = increment_prefix(prefix);
183 self.range(root, Some(prefix), end.as_deref()).await
184 }
185
186 pub async fn prefix_links(
187 &self,
188 root: &Cid,
189 prefix: &str,
190 ) -> Result<Vec<(String, Cid)>, BTreeError> {
191 let end = increment_prefix(prefix);
192 self.range_link_traverse(root.clone(), Some(prefix.to_string()), end)
193 .await
194 }
195
196 pub async fn delete(&self, root: &Cid, key: &str) -> Result<Option<Cid>, BTreeError> {
197 self.delete_recursive(root.clone(), key.to_string()).await
198 }
199
200 pub async fn merge(
201 &self,
202 base: Option<&Cid>,
203 other: Option<&Cid>,
204 prefer_other: bool,
205 ) -> Result<Option<Cid>, BTreeError> {
206 let Some(other) = other else {
207 return Ok(base.cloned());
208 };
209 let Some(mut result) = base.cloned().or_else(|| Some(other.clone())) else {
210 return Ok(None);
211 };
212 if base.is_none() {
213 return Ok(Some(result));
214 }
215
216 for (key, value) in self.entries(Some(other)).await? {
217 let existing = self.get(Some(&result), &key).await?;
218 if existing.is_none() || prefer_other {
219 result = self.insert(Some(&result), &key, &value).await?;
220 }
221 }
222
223 Ok(Some(result))
224 }
225
226 pub async fn merge_links(
227 &self,
228 base: Option<&Cid>,
229 other: Option<&Cid>,
230 prefer_other: bool,
231 ) -> Result<Option<Cid>, BTreeError> {
232 let Some(other) = other else {
233 return Ok(base.cloned());
234 };
235 let Some(mut result) = base.cloned().or_else(|| Some(other.clone())) else {
236 return Ok(None);
237 };
238 if base.is_none() {
239 return Ok(Some(result));
240 }
241
242 for (key, value) in self.links_entries(Some(other)).await? {
243 let existing = self.get_link(Some(&result), &key).await?;
244 if existing.is_none() || prefer_other {
245 result = self.insert_link(Some(&result), &key, &value).await?;
246 }
247 }
248
249 Ok(Some(result))
250 }
251
252 async fn finish_insert(&self, result: InsertResult) -> Result<Cid, BTreeError> {
253 if let Some(split) = result.split {
254 return self
255 .create_internal_root(
256 &split.left_first_key,
257 &split.left,
258 &split.right_first_key,
259 &split.right,
260 )
261 .await;
262 }
263 Ok(result.cid)
264 }
265
266 fn get_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<String>> {
267 Box::pin(async move {
268 let entries = self.tree.list_directory(&root).await?;
269 if is_leaf_node(&entries) {
270 let escaped = escape_key(&key);
271 let Some(entry) = entries.iter().find(|entry| entry.name == escaped) else {
272 return Ok(None);
273 };
274 if entry.link_type != LinkType::Blob {
275 return Ok(None);
276 }
277
278 let cid = entry_cid(entry);
279 let Some(data) = self.tree.get(&cid, None).await? else {
280 return Ok(None);
281 };
282 return Ok(Some(String::from_utf8(data)?));
283 }
284
285 let child = find_child(&entries, &key);
286 self.get_recursive(entry_cid(&child), key).await
287 })
288 }
289
290 fn get_link_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<Cid>> {
291 Box::pin(async move {
292 let entries = self.tree.list_directory(&root).await?;
293 if is_leaf_node(&entries) {
294 let escaped = escape_key(&key);
295 let Some(entry) = entries.iter().find(|entry| entry.name == escaped) else {
296 return Ok(None);
297 };
298 if entry.link_type != LinkType::File {
299 return Ok(None);
300 }
301 return Ok(Some(entry_cid(entry)));
302 }
303
304 let child = find_child(&entries, &key);
305 self.get_link_recursive(entry_cid(&child), key).await
306 })
307 }
308
309 fn insert_recursive<'a>(
310 &'a self,
311 node: Cid,
312 key: String,
313 value: InsertValue,
314 ) -> BTreeFuture<'a, InsertResult> {
315 Box::pin(async move {
316 let entries = self.tree.list_directory(&node).await?;
317 if is_leaf_node(&entries) {
318 return self.insert_into_leaf(node, entries, key, value).await;
319 }
320 self.insert_into_internal(node, entries, key, value).await
321 })
322 }
323
324 fn insert_into_leaf<'a>(
325 &'a self,
326 node: Cid,
327 _entries: Vec<TreeEntry>,
328 key: String,
329 value: InsertValue,
330 ) -> BTreeFuture<'a, InsertResult> {
331 Box::pin(async move {
332 let escaped_key = escape_key(&key);
333 let (entry_cid, size, link_type) = match value {
334 InsertValue::String(value) => {
335 let (cid, size) = self.tree.put_file(value.as_bytes()).await?;
336 (cid, size, LinkType::Blob)
337 }
338 InsertValue::Link(cid) => (cid, 0, LinkType::File),
339 };
340
341 let new_node = self
342 .tree
343 .set_entry(&node, &[], &escaped_key, &entry_cid, size, link_type)
344 .await?;
345
346 let new_entries = self.tree.list_directory(&new_node).await?;
347 if new_entries.len() > self.max_keys {
348 return Ok(InsertResult {
349 cid: new_node,
350 split: Some(self.split_leaf(new_entries).await?),
351 });
352 }
353
354 Ok(InsertResult {
355 cid: new_node,
356 split: None,
357 })
358 })
359 }
360
361 fn insert_into_internal<'a>(
362 &'a self,
363 node: Cid,
364 entries: Vec<TreeEntry>,
365 key: String,
366 value: InsertValue,
367 ) -> BTreeFuture<'a, InsertResult> {
368 Box::pin(async move {
369 let child = find_child(&entries, &key);
370 let child_name = child.name.clone();
371 let child_cid = entry_cid(&child);
372 let result = self.insert_recursive(child_cid, key, value).await?;
373
374 let mut new_node = self
375 .tree
376 .set_entry(&node, &[], &child_name, &result.cid, 0, LinkType::Dir)
377 .await?;
378
379 if let Some(split) = result.split {
380 new_node = self.tree.remove_entry(&new_node, &[], &child_name).await?;
381 new_node = self
382 .tree
383 .set_entry(
384 &new_node,
385 &[],
386 &escape_key(&split.left_first_key),
387 &split.left,
388 0,
389 LinkType::Dir,
390 )
391 .await?;
392 new_node = self
393 .tree
394 .set_entry(
395 &new_node,
396 &[],
397 &escape_key(&split.right_first_key),
398 &split.right,
399 0,
400 LinkType::Dir,
401 )
402 .await?;
403 }
404
405 let new_entries = self.tree.list_directory(&new_node).await?;
406 if new_entries.len() > self.max_keys {
407 return Ok(InsertResult {
408 cid: new_node,
409 split: Some(self.split_internal(new_entries).await?),
410 });
411 }
412
413 Ok(InsertResult {
414 cid: new_node,
415 split: None,
416 })
417 })
418 }
419
420 async fn split_leaf(&self, entries: Vec<TreeEntry>) -> Result<SplitResult, BTreeError> {
421 let sorted = sort_entries(entries);
422 let mid = sorted.len() / 2;
423 let left_entries = &sorted[..mid];
424 let right_entries = &sorted[mid..];
425
426 let left = self.create_node_from_entries(left_entries).await?;
427 let right = self.create_node_from_entries(right_entries).await?;
428
429 Ok(SplitResult {
430 left,
431 right,
432 left_first_key: unescape_key(&left_entries[0].name),
433 right_first_key: unescape_key(&right_entries[0].name),
434 })
435 }
436
437 async fn split_internal(&self, entries: Vec<TreeEntry>) -> Result<SplitResult, BTreeError> {
438 let sorted = sort_entries(entries);
439 let mid = sorted.len() / 2;
440 let left_entries = &sorted[..mid];
441 let right_entries = &sorted[mid..];
442
443 let left = self.create_node_from_entries(left_entries).await?;
444 let right = self.create_node_from_entries(right_entries).await?;
445
446 Ok(SplitResult {
447 left,
448 right,
449 left_first_key: unescape_key(&left_entries[0].name),
450 right_first_key: unescape_key(&right_entries[0].name),
451 })
452 }
453
454 async fn create_leaf(&self, items: &[(String, String)]) -> Result<Cid, BTreeError> {
455 let mut entries = Vec::with_capacity(items.len());
456 for (key, value) in items {
457 let (cid, size) = self.tree.put_file(value.as_bytes()).await?;
458 entries.push(
459 DirEntry::from_cid(escape_key(key), &cid)
460 .with_size(size)
461 .with_link_type(LinkType::Blob),
462 );
463 }
464 Ok(self.tree.put_directory(entries).await?)
465 }
466
467 async fn create_leaf_with_links(&self, items: &[(String, Cid)]) -> Result<Cid, BTreeError> {
468 let entries: Vec<DirEntry> = items
469 .iter()
470 .map(|(key, cid)| {
471 DirEntry::from_cid(escape_key(key), cid).with_link_type(LinkType::File)
472 })
473 .collect();
474 Ok(self.tree.put_directory(entries).await?)
475 }
476
477 async fn create_internal_root(
478 &self,
479 left_key: &str,
480 left: &Cid,
481 right_key: &str,
482 right: &Cid,
483 ) -> Result<Cid, BTreeError> {
484 let entries = vec![
485 DirEntry::from_cid(escape_key(left_key), left).with_link_type(LinkType::Dir),
486 DirEntry::from_cid(escape_key(right_key), right).with_link_type(LinkType::Dir),
487 ];
488 Ok(self.tree.put_directory(entries).await?)
489 }
490
491 async fn create_node_from_entries(&self, entries: &[TreeEntry]) -> Result<Cid, BTreeError> {
492 let dir_entries = entries
493 .iter()
494 .cloned()
495 .map(tree_entry_to_dir_entry)
496 .collect::<Vec<_>>();
497 Ok(self.tree.put_directory(dir_entries).await?)
498 }
499
500 fn delete_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<Cid>> {
501 Box::pin(async move {
502 let entries = self.tree.list_directory(&root).await?;
503 if is_leaf_node(&entries) {
504 let escaped = escape_key(&key);
505 if !entries.iter().any(|entry| entry.name == escaped) {
506 return Ok(Some(root));
507 }
508
509 let new_root = self.tree.remove_entry(&root, &[], &escaped).await?;
510 let new_entries = self.tree.list_directory(&new_root).await?;
511 if new_entries.is_empty() {
512 return Ok(None);
513 }
514 return Ok(Some(new_root));
515 }
516
517 let child = find_child(&entries, &key);
518 let child_name = child.name.clone();
519 let new_child = self.delete_recursive(entry_cid(&child), key).await?;
520
521 let Some(new_child) = new_child else {
522 let new_root = self.tree.remove_entry(&root, &[], &child_name).await?;
523 let new_entries = self.tree.list_directory(&new_root).await?;
524 if new_entries.is_empty() {
525 return Ok(None);
526 }
527 if new_entries.len() == 1 && new_entries[0].link_type == LinkType::Dir {
528 return Ok(Some(entry_cid(&new_entries[0])));
529 }
530 return Ok(Some(new_root));
531 };
532
533 if cid_equals(&new_child, &entry_cid(&child)) {
534 return Ok(Some(root));
535 }
536
537 let updated = self
538 .tree
539 .set_entry(&root, &[], &child_name, &new_child, 0, LinkType::Dir)
540 .await?;
541 Ok(Some(updated))
542 })
543 }
544
545 fn traverse_in_order<'a>(&'a self, node: Cid) -> BTreeFuture<'a, Vec<(String, String)>> {
546 Box::pin(async move {
547 let entries = self.tree.list_directory(&node).await?;
548 let sorted = sort_entries(entries);
549 let mut out = Vec::new();
550
551 if is_leaf_node(&sorted) {
552 for entry in sorted {
553 if entry.link_type != LinkType::Blob {
554 continue;
555 }
556 let cid = entry_cid(&entry);
557 if let Some(data) = self.tree.get(&cid, None).await? {
558 out.push((unescape_key(&entry.name), String::from_utf8(data)?));
559 }
560 }
561 return Ok(out);
562 }
563
564 for child in sorted {
565 out.extend(self.traverse_in_order(entry_cid(&child)).await?);
566 }
567 Ok(out)
568 })
569 }
570
571 fn traverse_links_in_order<'a>(&'a self, node: Cid) -> BTreeFuture<'a, Vec<(String, Cid)>> {
572 Box::pin(async move {
573 let entries = self.tree.list_directory(&node).await?;
574 let sorted = sort_entries(entries);
575 let mut out = Vec::new();
576
577 if is_leaf_node(&sorted) {
578 for entry in sorted {
579 if entry.link_type == LinkType::File {
580 out.push((unescape_key(&entry.name), entry_cid(&entry)));
581 }
582 }
583 return Ok(out);
584 }
585
586 for child in sorted {
587 out.extend(self.traverse_links_in_order(entry_cid(&child)).await?);
588 }
589 Ok(out)
590 })
591 }
592
593 fn range_traverse<'a>(
594 &'a self,
595 node: Cid,
596 start: Option<String>,
597 end: Option<String>,
598 ) -> BTreeFuture<'a, Vec<(String, String)>> {
599 Box::pin(async move {
600 let entries = self.tree.list_directory(&node).await?;
601 let sorted = sort_entries(entries);
602 let mut out = Vec::new();
603
604 if is_leaf_node(&sorted) {
605 for entry in sorted {
606 if entry.link_type != LinkType::Blob {
607 continue;
608 }
609 let key = unescape_key(&entry.name);
610 if start.as_ref().is_some_and(|start| key < *start) {
611 continue;
612 }
613 if end.as_ref().is_some_and(|end| key >= *end) {
614 return Ok(out);
615 }
616
617 let cid = entry_cid(&entry);
618 if let Some(data) = self.tree.get(&cid, None).await? {
619 out.push((key, String::from_utf8(data)?));
620 }
621 }
622 return Ok(out);
623 }
624
625 for (index, child) in sorted.iter().enumerate() {
626 let child_min = unescape_key(&child.name);
627 let child_max = sorted.get(index + 1).map(|entry| unescape_key(&entry.name));
628
629 if start.as_ref().is_some_and(|start| {
630 child_max
631 .as_ref()
632 .is_some_and(|child_max| child_max <= start)
633 }) {
634 continue;
635 }
636 if end.as_ref().is_some_and(|end| child_min >= *end) {
637 return Ok(out);
638 }
639
640 out.extend(
641 self.range_traverse(entry_cid(child), start.clone(), end.clone())
642 .await?,
643 );
644 }
645
646 Ok(out)
647 })
648 }
649
650 fn range_link_traverse<'a>(
651 &'a self,
652 node: Cid,
653 start: Option<String>,
654 end: Option<String>,
655 ) -> BTreeFuture<'a, Vec<(String, Cid)>> {
656 Box::pin(async move {
657 let entries = self.tree.list_directory(&node).await?;
658 let sorted = sort_entries(entries);
659 let mut out = Vec::new();
660
661 if is_leaf_node(&sorted) {
662 for entry in sorted {
663 if entry.link_type != LinkType::File {
664 continue;
665 }
666 let key = unescape_key(&entry.name);
667 if start.as_ref().is_some_and(|start| key < *start) {
668 continue;
669 }
670 if end.as_ref().is_some_and(|end| key >= *end) {
671 return Ok(out);
672 }
673 out.push((key, entry_cid(&entry)));
674 }
675 return Ok(out);
676 }
677
678 for (index, child) in sorted.iter().enumerate() {
679 let child_min = unescape_key(&child.name);
680 let child_max = sorted.get(index + 1).map(|entry| unescape_key(&entry.name));
681
682 if start.as_ref().is_some_and(|start| {
683 child_max
684 .as_ref()
685 .is_some_and(|child_max| child_max <= start)
686 }) {
687 continue;
688 }
689 if end.as_ref().is_some_and(|end| child_min >= *end) {
690 return Ok(out);
691 }
692
693 out.extend(
694 self.range_link_traverse(entry_cid(child), start.clone(), end.clone())
695 .await?,
696 );
697 }
698
699 Ok(out)
700 })
701 }
702}
703
704#[derive(Debug, Clone)]
705struct InsertResult {
706 cid: Cid,
707 split: Option<SplitResult>,
708}
709
/// Encodes a key as an entry name by percent-escaping `%` (`%25`), `/` (`%2F`) and NUL
/// (`%00`); all other characters pass through unchanged.
pub fn escape_key(key: &str) -> String {
    let mut escaped = String::with_capacity(key.len());
    for ch in key.chars() {
        match ch {
            '%' => escaped.push_str("%25"),
            '/' => escaped.push_str("%2F"),
            '\0' => escaped.push_str("%00"),
            other => escaped.push(other),
        }
    }
    escaped
}
715
/// Decodes an entry name produced by `escape_key`.
///
/// Recognised escapes are `%2F` / `%2f` (`/`), `%00` (NUL) and `%25` (`%`). A `%` that does
/// not start one of these is kept literally.
pub fn unescape_key(name: &str) -> String {
    let mut decoded = String::with_capacity(name.len());
    let mut rest = name;

    while let Some(pos) = rest.find('%') {
        decoded.push_str(&rest[..pos]);
        let tail = &rest[pos..];

        let replacement = if tail.starts_with("%2F") || tail.starts_with("%2f") {
            Some('/')
        } else if tail.starts_with("%00") {
            Some('\0')
        } else if tail.starts_with("%25") {
            Some('%')
        } else {
            None
        };

        match replacement {
            Some(ch) => {
                decoded.push(ch);
                // Every recognised escape is three ASCII bytes long.
                rest = &tail[3..];
            }
            None => {
                decoded.push('%');
                rest = &tail[1..];
            }
        }
    }

    decoded.push_str(rest);
    decoded
}
722
/// Returns the smallest string that sorts after every string starting with `value`, for use
/// as the exclusive upper bound of a prefix scan.
///
/// Returns `None` when no such bound exists: for the empty prefix (which every key matches)
/// and for a prefix made up solely of `char::MAX`. The last character is advanced by one
/// code point; a trailing `char::MAX` is dropped and the increment carries into the
/// preceding character.
fn increment_prefix(value: &str) -> Option<String> {
    /// Next Unicode scalar value after `c`, or `None` for `char::MAX`.
    fn successor(c: char) -> Option<char> {
        match c {
            // U+D800..=U+DFFF are surrogates and not valid `char`s; step over the gap.
            '\u{D7FF}' => Some('\u{E000}'),
            other => char::from_u32(other as u32 + 1),
        }
    }

    let mut chars: Vec<char> = value.chars().collect();
    while let Some(last) = chars.pop() {
        if let Some(next) = successor(last) {
            chars.push(next);
            return Some(chars.into_iter().collect());
        }
        // `last` was `char::MAX`: leave it popped and try the character before it.
    }
    None
}
734
735fn cid_equals(left: &Cid, right: &Cid) -> bool {
736 left.hash == right.hash && left.key == right.key
737}
738
739fn is_leaf_node(entries: &[TreeEntry]) -> bool {
740 entries.is_empty() || entries.iter().any(|entry| entry.link_type != LinkType::Dir)
741}
742
743fn sort_entries(mut entries: Vec<TreeEntry>) -> Vec<TreeEntry> {
744 entries.sort_by(|left, right| compare_unescaped_names(&left.name, &right.name));
745 entries
746}
747
748fn compare_unescaped_names(left: &str, right: &str) -> Ordering {
749 unescape_key(left).cmp(&unescape_key(right))
750}
751
752fn find_child(entries: &[TreeEntry], key: &str) -> TreeEntry {
753 let sorted = sort_entries(entries.to_vec());
754 for window in sorted.windows(2) {
755 let next_name = unescape_key(&window[1].name);
756 if key < next_name.as_str() {
757 return window[0].clone();
758 }
759 }
760 sorted
761 .last()
762 .cloned()
763 .expect("internal nodes must have children")
764}
765
766fn entry_cid(entry: &TreeEntry) -> Cid {
767 Cid {
768 hash: entry.hash,
769 key: entry.key,
770 }
771}
772
773fn tree_entry_to_dir_entry(entry: TreeEntry) -> DirEntry {
774 let mut out = DirEntry::from_cid(&entry.name, &entry_cid(&entry))
775 .with_size(entry.size)
776 .with_link_type(entry.link_type);
777 if let Some(meta) = entry.meta {
778 out = out.with_meta(meta);
779 }
780 out
781}