hashtree_collection/
source.rs1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use hashtree_core::{Cid, Store};
5use hashtree_index::{BTree, BTreeOptions, SearchIndex};
6
7use crate::{
8 default_search_prefix, load_collection_state, CollectionDefinition, CollectionError,
9 CollectionState, SearchLinkResult, SearchOptions,
10};
11
12#[derive(Debug, Clone, PartialEq)]
13pub struct CollectionIndexLinkResult {
14 pub key: String,
15 pub cid: Cid,
16}
17
18struct CollectionSearchSource<S: Store> {
19 prefix: String,
20 index: SearchIndex<S>,
21}
22
23pub struct CollectionSource<S: Store> {
24 index: BTree<S>,
25 search_indexes: BTreeMap<String, CollectionSearchSource<S>>,
26 state: CollectionState,
27}
28
29impl<S: Store> CollectionSource<S> {
30 pub fn new(store: Arc<S>, state: CollectionState) -> Self {
31 Self {
32 index: BTree::new(store, BTreeOptions::default()),
33 search_indexes: BTreeMap::new(),
34 state,
35 }
36 }
37
38 pub fn with_definition<T>(
39 store: Arc<S>,
40 state: CollectionState,
41 definition: &CollectionDefinition<T>,
42 ) -> Self {
43 let search_indexes = definition
44 .search_indexes()
45 .iter()
46 .map(|index| {
47 (
48 index.name().to_string(),
49 CollectionSearchSource {
50 prefix: index
51 .prefix()
52 .map(ToOwned::to_owned)
53 .unwrap_or_else(|| default_search_prefix(index.name())),
54 index: SearchIndex::new(Arc::clone(&store), index.options().clone()),
55 },
56 )
57 })
58 .collect();
59
60 Self {
61 index: BTree::new(store, BTreeOptions::default()),
62 search_indexes,
63 state,
64 }
65 }
66
67 pub async fn from_root<T>(
68 store: Arc<S>,
69 definition: &CollectionDefinition<T>,
70 root: Option<&Cid>,
71 ) -> Result<Self, CollectionError> {
72 let state = load_collection_state(Arc::clone(&store), definition, root).await?;
73 Ok(Self::with_definition(store, state, definition))
74 }
75
76 pub fn state(&self) -> &CollectionState {
77 &self.state
78 }
79
80 pub async fn get(&self, id: &str) -> Result<Option<Cid>, CollectionError> {
81 Ok(self
82 .index
83 .get_link(self.state.by_id_root.as_ref(), id)
84 .await?)
85 }
86
87 pub async fn search(
88 &self,
89 index_name: &str,
90 query: &str,
91 options: SearchOptions,
92 ) -> Result<Vec<SearchLinkResult>, CollectionError> {
93 let Some(runtime) = self.search_indexes.get(index_name) else {
94 return Ok(Vec::new());
95 };
96 Ok(runtime
97 .index
98 .search_links(
99 self.state.search_root(index_name),
100 &runtime.prefix,
101 query,
102 options,
103 )
104 .await?)
105 }
106
107 pub async fn get_index_link(
108 &self,
109 index_name: &str,
110 key: &str,
111 ) -> Result<Option<Cid>, CollectionError> {
112 let root = self
113 .state
114 .key_root(index_name)
115 .or_else(|| self.state.search_root(index_name));
116 let Some(root) = root else {
117 return Ok(None);
118 };
119 Ok(self.index.get_link(Some(root), key).await?)
120 }
121
122 pub async fn query_index(
123 &self,
124 index_name: &str,
125 prefix: Option<&str>,
126 limit: Option<usize>,
127 ) -> Result<Vec<CollectionIndexLinkResult>, CollectionError> {
128 let root = self
129 .state
130 .key_root(index_name)
131 .or_else(|| self.state.search_root(index_name));
132 let Some(root) = root else {
133 return Ok(Vec::new());
134 };
135
136 let entries = if let Some(prefix) = prefix {
137 self.index.prefix_links(root, prefix).await?
138 } else {
139 self.index.links_entries(Some(root)).await?
140 };
141
142 let limit = limit.unwrap_or(usize::MAX);
143 Ok(entries
144 .into_iter()
145 .take(limit)
146 .map(|(key, cid)| CollectionIndexLinkResult { key, cid })
147 .collect())
148 }
149}