Skip to main content

hashtree_collection/
source.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use hashtree_core::{Cid, Store};
5use hashtree_index::{BTree, BTreeOptions, SearchIndex};
6
7use crate::{
8    default_search_prefix, load_collection_state, CollectionDefinition, CollectionError,
9    CollectionState, SearchLinkResult, SearchOptions,
10};
11
12#[derive(Debug, Clone, PartialEq)]
13pub struct CollectionIndexLinkResult {
14    pub key: String,
15    pub cid: Cid,
16}
17
18struct CollectionSearchSource<S: Store> {
19    prefix: String,
20    index: SearchIndex<S>,
21}
22
23pub struct CollectionSource<S: Store> {
24    index: BTree<S>,
25    search_indexes: BTreeMap<String, CollectionSearchSource<S>>,
26    state: CollectionState,
27}
28
29impl<S: Store> CollectionSource<S> {
30    pub fn new(store: Arc<S>, state: CollectionState) -> Self {
31        Self {
32            index: BTree::new(store, BTreeOptions::default()),
33            search_indexes: BTreeMap::new(),
34            state,
35        }
36    }
37
38    pub fn with_definition<T>(
39        store: Arc<S>,
40        state: CollectionState,
41        definition: &CollectionDefinition<T>,
42    ) -> Self {
43        let search_indexes = definition
44            .search_indexes()
45            .iter()
46            .map(|index| {
47                (
48                    index.name().to_string(),
49                    CollectionSearchSource {
50                        prefix: index
51                            .prefix()
52                            .map(ToOwned::to_owned)
53                            .unwrap_or_else(|| default_search_prefix(index.name())),
54                        index: SearchIndex::new(Arc::clone(&store), index.options().clone()),
55                    },
56                )
57            })
58            .collect();
59
60        Self {
61            index: BTree::new(store, BTreeOptions::default()),
62            search_indexes,
63            state,
64        }
65    }
66
67    pub async fn from_root<T>(
68        store: Arc<S>,
69        definition: &CollectionDefinition<T>,
70        root: Option<&Cid>,
71    ) -> Result<Self, CollectionError> {
72        let state = load_collection_state(Arc::clone(&store), definition, root).await?;
73        Ok(Self::with_definition(store, state, definition))
74    }
75
76    pub fn state(&self) -> &CollectionState {
77        &self.state
78    }
79
80    pub async fn get(&self, id: &str) -> Result<Option<Cid>, CollectionError> {
81        Ok(self
82            .index
83            .get_link(self.state.by_id_root.as_ref(), id)
84            .await?)
85    }
86
87    pub async fn search(
88        &self,
89        index_name: &str,
90        query: &str,
91        options: SearchOptions,
92    ) -> Result<Vec<SearchLinkResult>, CollectionError> {
93        let Some(runtime) = self.search_indexes.get(index_name) else {
94            return Ok(Vec::new());
95        };
96        Ok(runtime
97            .index
98            .search_links(
99                self.state.search_root(index_name),
100                &runtime.prefix,
101                query,
102                options,
103            )
104            .await?)
105    }
106
107    pub async fn get_index_link(
108        &self,
109        index_name: &str,
110        key: &str,
111    ) -> Result<Option<Cid>, CollectionError> {
112        let root = self
113            .state
114            .key_root(index_name)
115            .or_else(|| self.state.search_root(index_name));
116        let Some(root) = root else {
117            return Ok(None);
118        };
119        Ok(self.index.get_link(Some(root), key).await?)
120    }
121
122    pub async fn query_index(
123        &self,
124        index_name: &str,
125        prefix: Option<&str>,
126        limit: Option<usize>,
127    ) -> Result<Vec<CollectionIndexLinkResult>, CollectionError> {
128        let root = self
129            .state
130            .key_root(index_name)
131            .or_else(|| self.state.search_root(index_name));
132        let Some(root) = root else {
133            return Ok(Vec::new());
134        };
135
136        let entries = if let Some(prefix) = prefix {
137            self.index.prefix_links(root, prefix).await?
138        } else {
139            self.index.links_entries(Some(root)).await?
140        };
141
142        let limit = limit.unwrap_or(usize::MAX);
143        Ok(entries
144            .into_iter()
145            .take(limit)
146            .map(|(key, cid)| CollectionIndexLinkResult { key, cid })
147            .collect())
148    }
149}