1use std::cmp::Ordering;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7
8use hashtree_core::{
9 Cid, DirEntry, HashTree, HashTreeConfig, HashTreeError, LinkType, Store, TreeEntry,
10};
11
/// Tree order applied by `BTree::new` when `BTreeOptions::order` is `None`.
const DEFAULT_ORDER: usize = 32;
13
/// Construction options accepted by `BTree::new`.
#[derive(Debug, Clone, Default)]
pub struct BTreeOptions {
    /// Requested tree order. `None` selects the built-in default; `BTree::new` raises values
    /// below 2 to 2 and allows `order - 1` entries per node before splitting.
    pub order: Option<usize>,
}
18
19#[derive(Debug, thiserror::Error)]
20pub enum BTreeError {
21 #[error("hash tree error: {0}")]
22 HashTree(#[from] HashTreeError),
23 #[error("value was not valid utf-8: {0}")]
24 Utf8(#[from] std::string::FromUtf8Error),
25}
26
27#[derive(Debug, Clone)]
28struct SplitResult {
29 left: Cid,
30 right: Cid,
31 left_first_key: String,
32 right_first_key: String,
33}
34
35#[derive(Debug, Clone)]
36enum InsertValue {
37 String(String),
38 Link(Cid),
39}
40
41type BTreeFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, BTreeError>> + 'a>>;
42
43pub struct BTree<S: Store> {
44 tree: HashTree<S>,
45 max_keys: usize,
46}
47
48impl<S: Store> BTree<S> {
49 pub fn new(store: Arc<S>, options: BTreeOptions) -> Self {
50 let order = options.order.unwrap_or(DEFAULT_ORDER).max(2);
51 Self {
52 tree: HashTree::new(HashTreeConfig::new(store)),
53 max_keys: order - 1,
54 }
55 }
56
57 pub async fn insert(
58 &self,
59 root: Option<&Cid>,
60 key: &str,
61 value: &str,
62 ) -> Result<Cid, BTreeError> {
63 if let Some(root) = root {
64 if self.get(Some(root), key).await?.as_deref() == Some(value) {
65 return Ok(root.clone());
66 }
67
68 let result = self
69 .insert_recursive(
70 root.clone(),
71 key.to_string(),
72 InsertValue::String(value.to_string()),
73 )
74 .await?;
75 return self.finish_insert(result).await;
76 }
77
78 self.create_leaf(&[(key.to_string(), value.to_string())])
79 .await
80 }
81
82 pub async fn get(&self, root: Option<&Cid>, key: &str) -> Result<Option<String>, BTreeError> {
83 let Some(root) = root else {
84 return Ok(None);
85 };
86 self.get_recursive(root.clone(), key.to_string()).await
87 }
88
89 pub async fn insert_link(
90 &self,
91 root: Option<&Cid>,
92 key: &str,
93 target_cid: &Cid,
94 ) -> Result<Cid, BTreeError> {
95 if let Some(root) = root {
96 if self
97 .get_link(Some(root), key)
98 .await?
99 .is_some_and(|existing| cid_equals(&existing, target_cid))
100 {
101 return Ok(root.clone());
102 }
103
104 let result = self
105 .insert_recursive(
106 root.clone(),
107 key.to_string(),
108 InsertValue::Link(target_cid.clone()),
109 )
110 .await?;
111 return self.finish_insert(result).await;
112 }
113
114 self.create_leaf_with_links(&[(key.to_string(), target_cid.clone())])
115 .await
116 }
117
118 pub async fn get_link(&self, root: Option<&Cid>, key: &str) -> Result<Option<Cid>, BTreeError> {
119 let Some(root) = root else {
120 return Ok(None);
121 };
122 self.get_link_recursive(root.clone(), key.to_string()).await
123 }
124
125 pub async fn entries(&self, root: Option<&Cid>) -> Result<Vec<(String, String)>, BTreeError> {
126 let Some(root) = root else {
127 return Ok(Vec::new());
128 };
129 self.traverse_in_order(root.clone()).await
130 }
131
132 pub async fn links_entries(
133 &self,
134 root: Option<&Cid>,
135 ) -> Result<Vec<(String, Cid)>, BTreeError> {
136 let Some(root) = root else {
137 return Ok(Vec::new());
138 };
139 self.traverse_links_in_order(root.clone()).await
140 }
141
142 pub async fn range(
143 &self,
144 root: &Cid,
145 start: Option<&str>,
146 end: Option<&str>,
147 ) -> Result<Vec<(String, String)>, BTreeError> {
148 self.range_traverse(
149 root.clone(),
150 start.map(ToOwned::to_owned),
151 end.map(ToOwned::to_owned),
152 )
153 .await
154 }
155
156 pub async fn prefix(
157 &self,
158 root: &Cid,
159 prefix: &str,
160 ) -> Result<Vec<(String, String)>, BTreeError> {
161 let end = increment_prefix(prefix);
162 self.range(root, Some(prefix), end.as_deref()).await
163 }
164
165 pub async fn prefix_links(
166 &self,
167 root: &Cid,
168 prefix: &str,
169 ) -> Result<Vec<(String, Cid)>, BTreeError> {
170 let end = increment_prefix(prefix);
171 self.range_link_traverse(root.clone(), Some(prefix.to_string()), end)
172 .await
173 }
174
175 pub async fn delete(&self, root: &Cid, key: &str) -> Result<Option<Cid>, BTreeError> {
176 self.delete_recursive(root.clone(), key.to_string()).await
177 }
178
179 pub async fn merge(
180 &self,
181 base: Option<&Cid>,
182 other: Option<&Cid>,
183 prefer_other: bool,
184 ) -> Result<Option<Cid>, BTreeError> {
185 let Some(other) = other else {
186 return Ok(base.cloned());
187 };
188 let Some(mut result) = base.cloned().or_else(|| Some(other.clone())) else {
189 return Ok(None);
190 };
191 if base.is_none() {
192 return Ok(Some(result));
193 }
194
195 for (key, value) in self.entries(Some(other)).await? {
196 let existing = self.get(Some(&result), &key).await?;
197 if existing.is_none() || prefer_other {
198 result = self.insert(Some(&result), &key, &value).await?;
199 }
200 }
201
202 Ok(Some(result))
203 }
204
205 pub async fn merge_links(
206 &self,
207 base: Option<&Cid>,
208 other: Option<&Cid>,
209 prefer_other: bool,
210 ) -> Result<Option<Cid>, BTreeError> {
211 let Some(other) = other else {
212 return Ok(base.cloned());
213 };
214 let Some(mut result) = base.cloned().or_else(|| Some(other.clone())) else {
215 return Ok(None);
216 };
217 if base.is_none() {
218 return Ok(Some(result));
219 }
220
221 for (key, value) in self.links_entries(Some(other)).await? {
222 let existing = self.get_link(Some(&result), &key).await?;
223 if existing.is_none() || prefer_other {
224 result = self.insert_link(Some(&result), &key, &value).await?;
225 }
226 }
227
228 Ok(Some(result))
229 }
230
231 async fn finish_insert(&self, result: InsertResult) -> Result<Cid, BTreeError> {
232 if let Some(split) = result.split {
233 return self
234 .create_internal_root(
235 &split.left_first_key,
236 &split.left,
237 &split.right_first_key,
238 &split.right,
239 )
240 .await;
241 }
242 Ok(result.cid)
243 }
244
245 fn get_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<String>> {
246 Box::pin(async move {
247 let entries = self.tree.list_directory(&root).await?;
248 if is_leaf_node(&entries) {
249 let escaped = escape_key(&key);
250 let Some(entry) = entries.iter().find(|entry| entry.name == escaped) else {
251 return Ok(None);
252 };
253 if entry.link_type != LinkType::Blob {
254 return Ok(None);
255 }
256
257 let cid = entry_cid(entry);
258 let Some(data) = self.tree.get(&cid, None).await? else {
259 return Ok(None);
260 };
261 return Ok(Some(String::from_utf8(data)?));
262 }
263
264 let child = find_child(&entries, &key);
265 self.get_recursive(entry_cid(&child), key).await
266 })
267 }
268
269 fn get_link_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<Cid>> {
270 Box::pin(async move {
271 let entries = self.tree.list_directory(&root).await?;
272 if is_leaf_node(&entries) {
273 let escaped = escape_key(&key);
274 let Some(entry) = entries.iter().find(|entry| entry.name == escaped) else {
275 return Ok(None);
276 };
277 if entry.link_type != LinkType::File {
278 return Ok(None);
279 }
280 return Ok(Some(entry_cid(entry)));
281 }
282
283 let child = find_child(&entries, &key);
284 self.get_link_recursive(entry_cid(&child), key).await
285 })
286 }
287
288 fn insert_recursive<'a>(
289 &'a self,
290 node: Cid,
291 key: String,
292 value: InsertValue,
293 ) -> BTreeFuture<'a, InsertResult> {
294 Box::pin(async move {
295 let entries = self.tree.list_directory(&node).await?;
296 if is_leaf_node(&entries) {
297 return self.insert_into_leaf(node, entries, key, value).await;
298 }
299 self.insert_into_internal(node, entries, key, value).await
300 })
301 }
302
303 fn insert_into_leaf<'a>(
304 &'a self,
305 node: Cid,
306 _entries: Vec<TreeEntry>,
307 key: String,
308 value: InsertValue,
309 ) -> BTreeFuture<'a, InsertResult> {
310 Box::pin(async move {
311 let escaped_key = escape_key(&key);
312 let (entry_cid, size, link_type) = match value {
313 InsertValue::String(value) => {
314 let (cid, size) = self.tree.put_file(value.as_bytes()).await?;
315 (cid, size, LinkType::Blob)
316 }
317 InsertValue::Link(cid) => (cid, 0, LinkType::File),
318 };
319
320 let new_node = self
321 .tree
322 .set_entry(&node, &[], &escaped_key, &entry_cid, size, link_type)
323 .await?;
324
325 let new_entries = self.tree.list_directory(&new_node).await?;
326 if new_entries.len() > self.max_keys {
327 return Ok(InsertResult {
328 cid: new_node,
329 split: Some(self.split_leaf(new_entries).await?),
330 });
331 }
332
333 Ok(InsertResult {
334 cid: new_node,
335 split: None,
336 })
337 })
338 }
339
340 fn insert_into_internal<'a>(
341 &'a self,
342 node: Cid,
343 entries: Vec<TreeEntry>,
344 key: String,
345 value: InsertValue,
346 ) -> BTreeFuture<'a, InsertResult> {
347 Box::pin(async move {
348 let child = find_child(&entries, &key);
349 let child_name = child.name.clone();
350 let child_cid = entry_cid(&child);
351 let result = self.insert_recursive(child_cid, key, value).await?;
352
353 let mut new_node = self
354 .tree
355 .set_entry(&node, &[], &child_name, &result.cid, 0, LinkType::Dir)
356 .await?;
357
358 if let Some(split) = result.split {
359 new_node = self.tree.remove_entry(&new_node, &[], &child_name).await?;
360 new_node = self
361 .tree
362 .set_entry(
363 &new_node,
364 &[],
365 &escape_key(&split.left_first_key),
366 &split.left,
367 0,
368 LinkType::Dir,
369 )
370 .await?;
371 new_node = self
372 .tree
373 .set_entry(
374 &new_node,
375 &[],
376 &escape_key(&split.right_first_key),
377 &split.right,
378 0,
379 LinkType::Dir,
380 )
381 .await?;
382 }
383
384 let new_entries = self.tree.list_directory(&new_node).await?;
385 if new_entries.len() > self.max_keys {
386 return Ok(InsertResult {
387 cid: new_node,
388 split: Some(self.split_internal(new_entries).await?),
389 });
390 }
391
392 Ok(InsertResult {
393 cid: new_node,
394 split: None,
395 })
396 })
397 }
398
399 async fn split_leaf(&self, entries: Vec<TreeEntry>) -> Result<SplitResult, BTreeError> {
400 let sorted = sort_entries(entries);
401 let mid = sorted.len() / 2;
402 let left_entries = &sorted[..mid];
403 let right_entries = &sorted[mid..];
404
405 let left = self.create_node_from_entries(left_entries).await?;
406 let right = self.create_node_from_entries(right_entries).await?;
407
408 Ok(SplitResult {
409 left,
410 right,
411 left_first_key: unescape_key(&left_entries[0].name),
412 right_first_key: unescape_key(&right_entries[0].name),
413 })
414 }
415
416 async fn split_internal(&self, entries: Vec<TreeEntry>) -> Result<SplitResult, BTreeError> {
417 let sorted = sort_entries(entries);
418 let mid = sorted.len() / 2;
419 let left_entries = &sorted[..mid];
420 let right_entries = &sorted[mid..];
421
422 let left = self.create_node_from_entries(left_entries).await?;
423 let right = self.create_node_from_entries(right_entries).await?;
424
425 Ok(SplitResult {
426 left,
427 right,
428 left_first_key: unescape_key(&left_entries[0].name),
429 right_first_key: unescape_key(&right_entries[0].name),
430 })
431 }
432
433 async fn create_leaf(&self, items: &[(String, String)]) -> Result<Cid, BTreeError> {
434 let mut entries = Vec::with_capacity(items.len());
435 for (key, value) in items {
436 let (cid, size) = self.tree.put_file(value.as_bytes()).await?;
437 entries.push(
438 DirEntry::from_cid(escape_key(key), &cid)
439 .with_size(size)
440 .with_link_type(LinkType::Blob),
441 );
442 }
443 Ok(self.tree.put_directory(entries).await?)
444 }
445
446 async fn create_leaf_with_links(&self, items: &[(String, Cid)]) -> Result<Cid, BTreeError> {
447 let entries: Vec<DirEntry> = items
448 .iter()
449 .map(|(key, cid)| {
450 DirEntry::from_cid(escape_key(key), cid).with_link_type(LinkType::File)
451 })
452 .collect();
453 Ok(self.tree.put_directory(entries).await?)
454 }
455
456 async fn create_internal_root(
457 &self,
458 left_key: &str,
459 left: &Cid,
460 right_key: &str,
461 right: &Cid,
462 ) -> Result<Cid, BTreeError> {
463 let entries = vec![
464 DirEntry::from_cid(escape_key(left_key), left).with_link_type(LinkType::Dir),
465 DirEntry::from_cid(escape_key(right_key), right).with_link_type(LinkType::Dir),
466 ];
467 Ok(self.tree.put_directory(entries).await?)
468 }
469
470 async fn create_node_from_entries(&self, entries: &[TreeEntry]) -> Result<Cid, BTreeError> {
471 let dir_entries = entries
472 .iter()
473 .cloned()
474 .map(tree_entry_to_dir_entry)
475 .collect::<Vec<_>>();
476 Ok(self.tree.put_directory(dir_entries).await?)
477 }
478
479 fn delete_recursive<'a>(&'a self, root: Cid, key: String) -> BTreeFuture<'a, Option<Cid>> {
480 Box::pin(async move {
481 let entries = self.tree.list_directory(&root).await?;
482 if is_leaf_node(&entries) {
483 let escaped = escape_key(&key);
484 if !entries.iter().any(|entry| entry.name == escaped) {
485 return Ok(Some(root));
486 }
487
488 let new_root = self.tree.remove_entry(&root, &[], &escaped).await?;
489 let new_entries = self.tree.list_directory(&new_root).await?;
490 if new_entries.is_empty() {
491 return Ok(None);
492 }
493 return Ok(Some(new_root));
494 }
495
496 let child = find_child(&entries, &key);
497 let child_name = child.name.clone();
498 let new_child = self.delete_recursive(entry_cid(&child), key).await?;
499
500 let Some(new_child) = new_child else {
501 let new_root = self.tree.remove_entry(&root, &[], &child_name).await?;
502 let new_entries = self.tree.list_directory(&new_root).await?;
503 if new_entries.is_empty() {
504 return Ok(None);
505 }
506 if new_entries.len() == 1 && new_entries[0].link_type == LinkType::Dir {
507 return Ok(Some(entry_cid(&new_entries[0])));
508 }
509 return Ok(Some(new_root));
510 };
511
512 if cid_equals(&new_child, &entry_cid(&child)) {
513 return Ok(Some(root));
514 }
515
516 let updated = self
517 .tree
518 .set_entry(&root, &[], &child_name, &new_child, 0, LinkType::Dir)
519 .await?;
520 Ok(Some(updated))
521 })
522 }
523
524 fn traverse_in_order<'a>(&'a self, node: Cid) -> BTreeFuture<'a, Vec<(String, String)>> {
525 Box::pin(async move {
526 let entries = self.tree.list_directory(&node).await?;
527 let sorted = sort_entries(entries);
528 let mut out = Vec::new();
529
530 if is_leaf_node(&sorted) {
531 for entry in sorted {
532 if entry.link_type != LinkType::Blob {
533 continue;
534 }
535 let cid = entry_cid(&entry);
536 if let Some(data) = self.tree.get(&cid, None).await? {
537 out.push((unescape_key(&entry.name), String::from_utf8(data)?));
538 }
539 }
540 return Ok(out);
541 }
542
543 for child in sorted {
544 out.extend(self.traverse_in_order(entry_cid(&child)).await?);
545 }
546 Ok(out)
547 })
548 }
549
550 fn traverse_links_in_order<'a>(&'a self, node: Cid) -> BTreeFuture<'a, Vec<(String, Cid)>> {
551 Box::pin(async move {
552 let entries = self.tree.list_directory(&node).await?;
553 let sorted = sort_entries(entries);
554 let mut out = Vec::new();
555
556 if is_leaf_node(&sorted) {
557 for entry in sorted {
558 if entry.link_type == LinkType::File {
559 out.push((unescape_key(&entry.name), entry_cid(&entry)));
560 }
561 }
562 return Ok(out);
563 }
564
565 for child in sorted {
566 out.extend(self.traverse_links_in_order(entry_cid(&child)).await?);
567 }
568 Ok(out)
569 })
570 }
571
572 fn range_traverse<'a>(
573 &'a self,
574 node: Cid,
575 start: Option<String>,
576 end: Option<String>,
577 ) -> BTreeFuture<'a, Vec<(String, String)>> {
578 Box::pin(async move {
579 let entries = self.tree.list_directory(&node).await?;
580 let sorted = sort_entries(entries);
581 let mut out = Vec::new();
582
583 if is_leaf_node(&sorted) {
584 for entry in sorted {
585 if entry.link_type != LinkType::Blob {
586 continue;
587 }
588 let key = unescape_key(&entry.name);
589 if start.as_ref().is_some_and(|start| key < *start) {
590 continue;
591 }
592 if end.as_ref().is_some_and(|end| key >= *end) {
593 return Ok(out);
594 }
595
596 let cid = entry_cid(&entry);
597 if let Some(data) = self.tree.get(&cid, None).await? {
598 out.push((key, String::from_utf8(data)?));
599 }
600 }
601 return Ok(out);
602 }
603
604 for (index, child) in sorted.iter().enumerate() {
605 let child_min = unescape_key(&child.name);
606 let child_max = sorted.get(index + 1).map(|entry| unescape_key(&entry.name));
607
608 if start.as_ref().is_some_and(|start| {
609 child_max
610 .as_ref()
611 .is_some_and(|child_max| child_max <= start)
612 }) {
613 continue;
614 }
615 if end.as_ref().is_some_and(|end| child_min >= *end) {
616 return Ok(out);
617 }
618
619 out.extend(
620 self.range_traverse(entry_cid(child), start.clone(), end.clone())
621 .await?,
622 );
623 }
624
625 Ok(out)
626 })
627 }
628
629 fn range_link_traverse<'a>(
630 &'a self,
631 node: Cid,
632 start: Option<String>,
633 end: Option<String>,
634 ) -> BTreeFuture<'a, Vec<(String, Cid)>> {
635 Box::pin(async move {
636 let entries = self.tree.list_directory(&node).await?;
637 let sorted = sort_entries(entries);
638 let mut out = Vec::new();
639
640 if is_leaf_node(&sorted) {
641 for entry in sorted {
642 if entry.link_type != LinkType::File {
643 continue;
644 }
645 let key = unescape_key(&entry.name);
646 if start.as_ref().is_some_and(|start| key < *start) {
647 continue;
648 }
649 if end.as_ref().is_some_and(|end| key >= *end) {
650 return Ok(out);
651 }
652 out.push((key, entry_cid(&entry)));
653 }
654 return Ok(out);
655 }
656
657 for (index, child) in sorted.iter().enumerate() {
658 let child_min = unescape_key(&child.name);
659 let child_max = sorted.get(index + 1).map(|entry| unescape_key(&entry.name));
660
661 if start.as_ref().is_some_and(|start| {
662 child_max
663 .as_ref()
664 .is_some_and(|child_max| child_max <= start)
665 }) {
666 continue;
667 }
668 if end.as_ref().is_some_and(|end| child_min >= *end) {
669 return Ok(out);
670 }
671
672 out.extend(
673 self.range_link_traverse(entry_cid(child), start.clone(), end.clone())
674 .await?,
675 );
676 }
677
678 Ok(out)
679 })
680 }
681}
682
683#[derive(Debug, Clone)]
684struct InsertResult {
685 cid: Cid,
686 split: Option<SplitResult>,
687}
688
/// Encodes a key so it can be used as a directory entry name.
///
/// `%` becomes `%25`, `/` becomes `%2F` and NUL becomes `%00`; all other characters are
/// copied unchanged.
pub fn escape_key(key: &str) -> String {
    let mut escaped = String::with_capacity(key.len());
    for ch in key.chars() {
        match ch {
            '%' => escaped.push_str("%25"),
            '/' => escaped.push_str("%2F"),
            '\0' => escaped.push_str("%00"),
            other => escaped.push(other),
        }
    }
    escaped
}
694
/// Decodes a directory entry name back into the key it represents.
///
/// `%2F` and `%2f` decode to `/`, `%00` to NUL and `%25` to `%`. A `%` that does not start
/// one of these sequences is kept literally.
pub fn unescape_key(name: &str) -> String {
    let mut decoded = String::with_capacity(name.len());
    let mut rest = name;

    while let Some(pos) = rest.find('%') {
        decoded.push_str(&rest[..pos]);
        let tail = &rest[pos..];

        let replacement = if tail.starts_with("%2F") || tail.starts_with("%2f") {
            Some('/')
        } else if tail.starts_with("%00") {
            Some('\0')
        } else if tail.starts_with("%25") {
            Some('%')
        } else {
            None
        };

        match replacement {
            Some(ch) => {
                decoded.push(ch);
                // Every recognised sequence is three ASCII bytes long.
                rest = &tail[3..];
            }
            None => {
                decoded.push('%');
                rest = &tail[1..];
            }
        }
    }

    decoded.push_str(rest);
    decoded
}
701
/// Computes the exclusive upper bound of a prefix scan: the smallest string that sorts
/// after every string starting with `value`.
///
/// Returns `None` when no such bound exists, which callers treat as "unbounded":
///
/// * the empty prefix matches every key, so nothing may be cut off;
/// * a prefix consisting only of `char::MAX` has no greater string.
///
/// Otherwise trailing `char::MAX` characters are dropped (they cannot be incremented, so
/// the carry moves to the previous character) and the last remaining character is replaced
/// by its successor. The successor of U+D7FF is U+E000, because the surrogate range in
/// between contains no valid `char`.
fn increment_prefix(value: &str) -> Option<String> {
    let mut chars: Vec<char> = value.chars().collect();

    while let Some(last) = chars.pop() {
        let successor = match last {
            // Cannot be incremented: carry into the previous character.
            char::MAX => None,
            // Skip the surrogate gap U+D800..=U+DFFF.
            '\u{D7FF}' => Some('\u{E000}'),
            other => char::from_u32(other as u32 + 1),
        };

        if let Some(next) = successor {
            chars.push(next);
            return Some(chars.into_iter().collect());
        }
    }

    None
}
713
714fn cid_equals(left: &Cid, right: &Cid) -> bool {
715 left.hash == right.hash && left.key == right.key
716}
717
718fn is_leaf_node(entries: &[TreeEntry]) -> bool {
719 entries.is_empty() || entries.iter().any(|entry| entry.link_type != LinkType::Dir)
720}
721
722fn sort_entries(mut entries: Vec<TreeEntry>) -> Vec<TreeEntry> {
723 entries.sort_by(|left, right| compare_unescaped_names(&left.name, &right.name));
724 entries
725}
726
727fn compare_unescaped_names(left: &str, right: &str) -> Ordering {
728 unescape_key(left).cmp(&unescape_key(right))
729}
730
731fn find_child(entries: &[TreeEntry], key: &str) -> TreeEntry {
732 let sorted = sort_entries(entries.to_vec());
733 for window in sorted.windows(2) {
734 let next_name = unescape_key(&window[1].name);
735 if key < next_name.as_str() {
736 return window[0].clone();
737 }
738 }
739 sorted
740 .last()
741 .cloned()
742 .expect("internal nodes must have children")
743}
744
745fn entry_cid(entry: &TreeEntry) -> Cid {
746 Cid {
747 hash: entry.hash,
748 key: entry.key,
749 }
750}
751
752fn tree_entry_to_dir_entry(entry: TreeEntry) -> DirEntry {
753 let mut out = DirEntry::from_cid(&entry.name, &entry_cid(&entry))
754 .with_size(entry.size)
755 .with_link_type(entry.link_type);
756 if let Some(meta) = entry.meta {
757 out = out.with_meta(meta);
758 }
759 out
760}