Skip to main content

hashtree_collection/
writer.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig, LinkType, Store};
5use hashtree_index::{BTree, BTreeOptions, SearchIndex};
6
7use crate::helpers::merge_search_index_options;
8use crate::schema::normalize_typed_collection_item;
9use crate::{
10    create_empty_collection_state, default_search_prefix, load_collection_state,
11    CollectionDefinition, CollectionEntryContext, CollectionError, CollectionOptions,
12    CollectionState, CollectionWriteContext, MANIFEST_BY_ID,
13};
14
15pub struct CollectionWriter<S: Store, T> {
16    tree: HashTree<S>,
17    index: BTree<S>,
18    search_indexes: BTreeMap<String, SearchIndex<S>>,
19    definition: CollectionDefinition<T>,
20    state: CollectionState,
21}
22
23impl<S: Store, T> CollectionWriter<S, T> {
24    pub fn new(store: Arc<S>, definition: CollectionDefinition<T>) -> Self {
25        Self::with_options(store, definition, CollectionOptions::default())
26    }
27
28    pub fn with_options(
29        store: Arc<S>,
30        definition: CollectionDefinition<T>,
31        options: CollectionOptions,
32    ) -> Self {
33        let state = create_empty_collection_state(&definition);
34        Self::with_state_and_options(store, definition, state, options)
35    }
36
37    pub fn with_state(
38        store: Arc<S>,
39        definition: CollectionDefinition<T>,
40        state: CollectionState,
41    ) -> Self {
42        Self::with_state_and_options(store, definition, state, CollectionOptions::default())
43    }
44
45    pub fn with_state_and_options(
46        store: Arc<S>,
47        definition: CollectionDefinition<T>,
48        state: CollectionState,
49        options: CollectionOptions,
50    ) -> Self {
51        let search_indexes = definition
52            .search_indexes()
53            .iter()
54            .map(|index| {
55                (
56                    index.name().to_string(),
57                    SearchIndex::new(
58                        Arc::clone(&store),
59                        merge_search_index_options(index.options().clone(), &options),
60                    ),
61                )
62            })
63            .collect();
64
65        Self {
66            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&store))),
67            index: BTree::new(
68                store,
69                BTreeOptions {
70                    order: options.btree_order,
71                },
72            ),
73            search_indexes,
74            definition,
75            state,
76        }
77    }
78
79    pub async fn from_root(
80        store: Arc<S>,
81        definition: CollectionDefinition<T>,
82        root: Option<&Cid>,
83        options: CollectionOptions,
84    ) -> Result<Self, CollectionError> {
85        let state = load_collection_state(Arc::clone(&store), &definition, root).await?;
86        Ok(Self::with_state_and_options(
87            store, definition, state, options,
88        ))
89    }
90
91    pub fn snapshot(&self) -> CollectionState {
92        self.state.clone()
93    }
94
95    pub fn state(&self) -> &CollectionState {
96        &self.state
97    }
98
99    pub async fn write_root(&self) -> Result<Option<Cid>, CollectionError> {
100        let mut entries = Vec::new();
101        if let Some(cid) = self.state.by_id_root.as_ref() {
102            entries.push(DirEntry::from_cid(MANIFEST_BY_ID, cid).with_link_type(LinkType::Dir));
103        }
104        for index in self.definition.key_indexes() {
105            if let Some(cid) = self.state.key_root(index.name()) {
106                entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
107            }
108        }
109        for index in self.definition.search_indexes() {
110            if let Some(cid) = self.state.search_root(index.name()) {
111                entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
112            }
113        }
114
115        if entries.is_empty() {
116            return Ok(None);
117        }
118
119        Ok(Some(self.tree.put_directory(entries).await?))
120    }
121
122    fn read_search_root_group(&self, root_name: &str) -> Option<Cid> {
123        for index in self.definition.search_indexes() {
124            if index.root_name().unwrap_or(index.name()) == root_name {
125                return self.state.search_root(index.name()).cloned();
126            }
127        }
128        None
129    }
130
131    fn assign_search_root_groups(&mut self, groups: &BTreeMap<String, Option<Cid>>) {
132        for index in self.definition.search_indexes() {
133            let root_name = index.root_name().unwrap_or(index.name());
134            if let Some(root) = groups.get(root_name) {
135                self.state
136                    .search_roots
137                    .insert(index.name().to_string(), root.clone());
138            }
139        }
140    }
141}
142
143impl<S: Store, T: Clone> CollectionWriter<S, T> {
144    pub fn normalize(&self, item: &T) -> Result<T, CollectionError> {
145        normalize_typed_collection_item(&self.definition, item)
146    }
147
148    pub async fn put(
149        &mut self,
150        item: &T,
151        cid: &Cid,
152        previous: Option<&T>,
153    ) -> Result<CollectionState, CollectionError> {
154        self.put_with_context(item, cid, previous, None, None).await
155    }
156
157    pub async fn put_with_context(
158        &mut self,
159        item: &T,
160        cid: &Cid,
161        previous: Option<&T>,
162        context: Option<&CollectionWriteContext>,
163        previous_context: Option<&CollectionWriteContext>,
164    ) -> Result<CollectionState, CollectionError> {
165        let next_item = self.normalize(item)?;
166        let previous_item = previous
167            .map(|previous| self.normalize(previous))
168            .transpose()?;
169
170        if let Some(previous_item) = previous_item.as_ref() {
171            self.delete_normalized_with_context(previous_item, previous_context.or(context))
172                .await?;
173        }
174
175        self.put_normalized_with_context(&next_item, cid, context)
176            .await
177    }
178
179    pub async fn delete(&mut self, item: &T) -> Result<CollectionState, CollectionError> {
180        self.delete_with_context(item, None).await
181    }
182
183    pub async fn delete_with_context(
184        &mut self,
185        item: &T,
186        context: Option<&CollectionWriteContext>,
187    ) -> Result<CollectionState, CollectionError> {
188        let next_item = self.normalize(item)?;
189        self.delete_normalized_with_context(&next_item, context)
190            .await
191    }
192
193    pub async fn rebuild<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
194    where
195        I: IntoIterator<Item = (T, Cid)>,
196    {
197        let mut final_entries = BTreeMap::<String, (T, Cid)>::new();
198        for (item, cid) in entries {
199            let next_item = self.normalize(&item)?;
200            let id = self.definition.item_id(&next_item)?;
201            final_entries.insert(id, (next_item, cid));
202        }
203
204        if self.definition.search_indexes().is_empty() {
205            return self.rebuild_without_search(final_entries).await;
206        }
207
208        self.state = create_empty_collection_state(&self.definition);
209        for (_id, (item, cid)) in final_entries {
210            self.put_normalized_with_context(&item, &cid, None).await?;
211        }
212        Ok(self.snapshot())
213    }
214
215    pub async fn reindex<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
216    where
217        I: IntoIterator<Item = (T, Cid)>,
218    {
219        self.rebuild(entries).await
220    }
221
222    pub async fn rebuild_with_context<I>(
223        &mut self,
224        entries: I,
225    ) -> Result<CollectionState, CollectionError>
226    where
227        I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
228    {
229        let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
230        for (item, cid, context) in entries {
231            let next_item = self.normalize(&item)?;
232            let id = self.definition.item_id(&next_item)?;
233            final_entries.insert(id, (next_item, cid, context));
234        }
235
236        self.state = create_empty_collection_state(&self.definition);
237        for (_id, (item, cid, context)) in final_entries {
238            self.put_normalized_with_context(&item, &cid, context.as_ref())
239                .await?;
240        }
241        Ok(self.snapshot())
242    }
243
244    pub async fn reindex_with_context<I>(
245        &mut self,
246        entries: I,
247    ) -> Result<CollectionState, CollectionError>
248    where
249        I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
250    {
251        self.rebuild_with_context(entries).await
252    }
253
254    async fn put_normalized_with_context(
255        &mut self,
256        item: &T,
257        cid: &Cid,
258        context: Option<&CollectionWriteContext>,
259    ) -> Result<CollectionState, CollectionError> {
260        let id = self.definition.item_id(item)?;
261        self.state.by_id_root = Some(
262            self.index
263                .insert_link(self.state.by_id_root.as_ref(), &id, cid)
264                .await?,
265        );
266
267        for index in self.definition.key_indexes() {
268            let mut root = self.state.key_root(index.name()).cloned();
269            for key in index.materialize_keys(item) {
270                root = Some(self.index.insert_link(root.as_ref(), &key, cid).await?);
271            }
272            self.state.key_roots.insert(index.name().to_string(), root);
273        }
274
275        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
276        let entry_context = CollectionEntryContext {
277            id: &id,
278            cid: Some(cid),
279            write_context: context,
280        };
281        for index in self.definition.search_indexes() {
282            let Some(search_index) = self.search_indexes.get(index.name()) else {
283                continue;
284            };
285
286            let root_name = index.root_name().unwrap_or(index.name()).to_string();
287            let mut root = search_root_groups
288                .get(&root_name)
289                .cloned()
290                .unwrap_or_else(|| self.read_search_root_group(&root_name));
291
292            for entry in index.materialize_entries(item, &entry_context) {
293                let terms = search_index.parse_keywords(&entry.text);
294                if terms.is_empty() {
295                    continue;
296                }
297                let entry_id = entry.id.as_deref().unwrap_or(&id);
298                let Some(entry_cid) = entry.cid.as_ref().or(Some(cid)) else {
299                    continue;
300                };
301                let default_prefix = default_search_prefix(index.name());
302                root = Some(
303                    search_index
304                        .index_link(
305                            root.as_ref(),
306                            entry
307                                .prefix
308                                .as_deref()
309                                .or_else(|| index.prefix())
310                                .unwrap_or(&default_prefix),
311                            &terms,
312                            entry_id,
313                            entry_cid,
314                        )
315                        .await?,
316                );
317            }
318
319            search_root_groups.insert(root_name, root);
320        }
321
322        if !search_root_groups.is_empty() {
323            self.assign_search_root_groups(&search_root_groups);
324        }
325
326        Ok(self.snapshot())
327    }
328
329    async fn delete_normalized_with_context(
330        &mut self,
331        item: &T,
332        context: Option<&CollectionWriteContext>,
333    ) -> Result<CollectionState, CollectionError> {
334        let id = self.definition.item_id(item)?;
335        if let Some(root) = self.state.by_id_root.as_ref() {
336            self.state.by_id_root = self.index.delete(root, &id).await?;
337        }
338
339        for index in self.definition.key_indexes() {
340            let mut root = self.state.key_root(index.name()).cloned();
341            for key in index.materialize_keys(item) {
342                let Some(active_root) = root.as_ref() else {
343                    break;
344                };
345                root = self.index.delete(active_root, &key).await?;
346            }
347            self.state.key_roots.insert(index.name().to_string(), root);
348        }
349
350        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
351        let entry_context = CollectionEntryContext {
352            id: &id,
353            cid: None,
354            write_context: context,
355        };
356        for index in self.definition.search_indexes() {
357            let Some(search_index) = self.search_indexes.get(index.name()) else {
358                continue;
359            };
360
361            let root_name = index.root_name().unwrap_or(index.name()).to_string();
362            let mut root = search_root_groups
363                .get(&root_name)
364                .cloned()
365                .unwrap_or_else(|| self.read_search_root_group(&root_name));
366            let Some(existing_root) = root.clone() else {
367                continue;
368            };
369            root = Some(existing_root);
370
371            for entry in index.materialize_entries(item, &entry_context) {
372                let terms = search_index.parse_keywords(&entry.text);
373                if terms.is_empty() {
374                    continue;
375                }
376                let entry_id = entry.id.as_deref().unwrap_or(&id);
377                let Some(active_root) = root.as_ref() else {
378                    break;
379                };
380                let default_prefix = default_search_prefix(index.name());
381                root = search_index
382                    .remove_link(
383                        active_root,
384                        entry
385                            .prefix
386                            .as_deref()
387                            .or_else(|| index.prefix())
388                            .unwrap_or(&default_prefix),
389                        &terms,
390                        entry_id,
391                    )
392                    .await?;
393            }
394
395            search_root_groups.insert(root_name, root);
396        }
397
398        if !search_root_groups.is_empty() {
399            self.assign_search_root_groups(&search_root_groups);
400        } else {
401            for index in self.definition.search_indexes() {
402                let root_name = index.root_name().unwrap_or(index.name()).to_string();
403                self.state.search_roots.insert(
404                    index.name().to_string(),
405                    self.read_search_root_group(&root_name),
406                );
407            }
408        }
409
410        Ok(self.snapshot())
411    }
412
413    async fn rebuild_without_search(
414        &mut self,
415        final_entries: BTreeMap<String, (T, Cid)>,
416    ) -> Result<CollectionState, CollectionError> {
417        let mut by_id = BTreeMap::<String, Cid>::new();
418        let mut key_roots = self
419            .definition
420            .key_indexes()
421            .iter()
422            .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
423            .collect::<BTreeMap<_, _>>();
424
425        for (id, (item, cid)) in final_entries {
426            by_id.insert(id, cid.clone());
427            for index in self.definition.key_indexes() {
428                let root = key_roots
429                    .get_mut(index.name())
430                    .expect("collection key root must exist");
431                for key in index.materialize_keys(&item) {
432                    root.insert(key, cid.clone());
433                }
434            }
435        }
436
437        self.state = create_empty_collection_state(&self.definition);
438        self.state.by_id_root = self.index.build_links(by_id).await?;
439        for index in self.definition.key_indexes() {
440            let root = self
441                .index
442                .build_links(key_roots.remove(index.name()).unwrap_or_default())
443                .await?;
444            self.state.key_roots.insert(index.name().to_string(), root);
445        }
446
447        Ok(self.snapshot())
448    }
449}