Skip to main content

hashtree_collection/
writer.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig, LinkType, Store};
5use hashtree_index::{BTree, BTreeOptions, SearchIndex};
6
7use crate::helpers::merge_search_index_options;
8use crate::manifest::write_collection_manifest_metadata;
9use crate::schema::normalize_typed_collection_item;
10use crate::{
11    create_empty_collection_state, default_search_prefix, load_collection_state,
12    CollectionDefinition, CollectionEntryContext, CollectionError, CollectionOptions,
13    CollectionState, CollectionWriteContext, COLLECTION_MANIFEST_METADATA_FILE, MANIFEST_BY_ID,
14};
15
16pub struct CollectionWriter<S: Store, T> {
17    tree: HashTree<S>,
18    index: BTree<S>,
19    search_indexes: BTreeMap<String, SearchIndex<S>>,
20    definition: CollectionDefinition<T>,
21    state: CollectionState,
22}
23
24impl<S: Store, T> CollectionWriter<S, T> {
25    pub fn new(store: Arc<S>, definition: CollectionDefinition<T>) -> Self {
26        Self::with_options(store, definition, CollectionOptions::default())
27    }
28
29    pub fn with_options(
30        store: Arc<S>,
31        definition: CollectionDefinition<T>,
32        options: CollectionOptions,
33    ) -> Self {
34        let state = create_empty_collection_state(&definition);
35        Self::with_state_and_options(store, definition, state, options)
36    }
37
38    pub fn with_state(
39        store: Arc<S>,
40        definition: CollectionDefinition<T>,
41        state: CollectionState,
42    ) -> Self {
43        Self::with_state_and_options(store, definition, state, CollectionOptions::default())
44    }
45
46    pub fn with_state_and_options(
47        store: Arc<S>,
48        definition: CollectionDefinition<T>,
49        state: CollectionState,
50        options: CollectionOptions,
51    ) -> Self {
52        let search_indexes = definition
53            .search_indexes()
54            .iter()
55            .map(|index| {
56                (
57                    index.name().to_string(),
58                    SearchIndex::new(
59                        Arc::clone(&store),
60                        merge_search_index_options(index.options().clone(), &options),
61                    ),
62                )
63            })
64            .collect();
65
66        Self {
67            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&store))),
68            index: BTree::new(
69                store,
70                BTreeOptions {
71                    order: options.btree_order,
72                },
73            ),
74            search_indexes,
75            definition,
76            state,
77        }
78    }
79
80    pub async fn from_root(
81        store: Arc<S>,
82        definition: CollectionDefinition<T>,
83        root: Option<&Cid>,
84        options: CollectionOptions,
85    ) -> Result<Self, CollectionError> {
86        let state = load_collection_state(Arc::clone(&store), &definition, root).await?;
87        Ok(Self::with_state_and_options(
88            store, definition, state, options,
89        ))
90    }
91
92    pub fn snapshot(&self) -> CollectionState {
93        self.state.clone()
94    }
95
96    pub fn state(&self) -> &CollectionState {
97        &self.state
98    }
99
100    pub async fn write_root(&self) -> Result<Option<Cid>, CollectionError> {
101        let mut entries = Vec::new();
102        if let Some(cid) = self.state.by_id_root.as_ref() {
103            entries.push(DirEntry::from_cid(MANIFEST_BY_ID, cid).with_link_type(LinkType::Dir));
104        }
105        for index in self.definition.key_indexes() {
106            if let Some(cid) = self.state.key_root(index.name()) {
107                entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
108            }
109        }
110        for index in self.definition.search_indexes() {
111            if let Some(cid) = self.state.search_root(index.name()) {
112                entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
113            }
114        }
115        if let Some((cid, size)) =
116            write_collection_manifest_metadata(&self.tree, &self.definition).await?
117        {
118            entries.push(
119                DirEntry::from_cid(COLLECTION_MANIFEST_METADATA_FILE, &cid)
120                    .with_size(size)
121                    .with_link_type(LinkType::File),
122            );
123        }
124
125        if entries.is_empty() {
126            return Ok(None);
127        }
128
129        Ok(Some(self.tree.put_directory(entries).await?))
130    }
131
132    fn read_search_root_group(&self, root_name: &str) -> Option<Cid> {
133        for index in self.definition.search_indexes() {
134            if index.root_name().unwrap_or(index.name()) == root_name {
135                return self.state.search_root(index.name()).cloned();
136            }
137        }
138        None
139    }
140
141    fn assign_search_root_groups(&mut self, groups: &BTreeMap<String, Option<Cid>>) {
142        for index in self.definition.search_indexes() {
143            let root_name = index.root_name().unwrap_or(index.name());
144            if let Some(root) = groups.get(root_name) {
145                self.state
146                    .search_roots
147                    .insert(index.name().to_string(), root.clone());
148            }
149        }
150    }
151}
152
153impl<S: Store, T: Clone> CollectionWriter<S, T> {
154    pub fn normalize(&self, item: &T) -> Result<T, CollectionError> {
155        normalize_typed_collection_item(&self.definition, item)
156    }
157
158    pub async fn put(
159        &mut self,
160        item: &T,
161        cid: &Cid,
162        previous: Option<&T>,
163    ) -> Result<CollectionState, CollectionError> {
164        self.put_with_context(item, cid, previous, None, None).await
165    }
166
167    pub async fn put_with_context(
168        &mut self,
169        item: &T,
170        cid: &Cid,
171        previous: Option<&T>,
172        context: Option<&CollectionWriteContext>,
173        previous_context: Option<&CollectionWriteContext>,
174    ) -> Result<CollectionState, CollectionError> {
175        let next_item = self.normalize(item)?;
176        let previous_item = previous
177            .map(|previous| self.normalize(previous))
178            .transpose()?;
179
180        if let Some(previous_item) = previous_item.as_ref() {
181            self.delete_normalized_with_context(previous_item, previous_context.or(context))
182                .await?;
183        }
184
185        self.put_normalized_with_context(&next_item, cid, context)
186            .await
187    }
188
189    pub async fn delete(&mut self, item: &T) -> Result<CollectionState, CollectionError> {
190        self.delete_with_context(item, None).await
191    }
192
193    pub async fn delete_with_context(
194        &mut self,
195        item: &T,
196        context: Option<&CollectionWriteContext>,
197    ) -> Result<CollectionState, CollectionError> {
198        let next_item = self.normalize(item)?;
199        self.delete_normalized_with_context(&next_item, context)
200            .await
201    }
202
203    pub async fn rebuild<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
204    where
205        I: IntoIterator<Item = (T, Cid)>,
206    {
207        let mut final_entries = BTreeMap::<String, (T, Cid)>::new();
208        for (item, cid) in entries {
209            let next_item = self.normalize(&item)?;
210            let id = self.definition.item_id(&next_item)?;
211            final_entries.insert(id, (next_item, cid));
212        }
213
214        if self.definition.search_indexes().is_empty() {
215            return self.rebuild_without_search(final_entries).await;
216        }
217
218        self.state = create_empty_collection_state(&self.definition);
219        for (_id, (item, cid)) in final_entries {
220            self.put_normalized_with_context(&item, &cid, None).await?;
221        }
222        Ok(self.snapshot())
223    }
224
225    pub async fn reindex<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
226    where
227        I: IntoIterator<Item = (T, Cid)>,
228    {
229        self.rebuild(entries).await
230    }
231
232    pub async fn rebuild_with_context<I>(
233        &mut self,
234        entries: I,
235    ) -> Result<CollectionState, CollectionError>
236    where
237        I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
238    {
239        let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
240        for (item, cid, context) in entries {
241            let next_item = self.normalize(&item)?;
242            let id = self.definition.item_id(&next_item)?;
243            final_entries.insert(id, (next_item, cid, context));
244        }
245
246        self.state = create_empty_collection_state(&self.definition);
247        for (_id, (item, cid, context)) in final_entries {
248            self.put_normalized_with_context(&item, &cid, context.as_ref())
249                .await?;
250        }
251        Ok(self.snapshot())
252    }
253
254    pub async fn reindex_with_context<I>(
255        &mut self,
256        entries: I,
257    ) -> Result<CollectionState, CollectionError>
258    where
259        I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
260    {
261        self.rebuild_with_context(entries).await
262    }
263
264    async fn put_normalized_with_context(
265        &mut self,
266        item: &T,
267        cid: &Cid,
268        context: Option<&CollectionWriteContext>,
269    ) -> Result<CollectionState, CollectionError> {
270        let id = self.definition.item_id(item)?;
271        self.state.by_id_root = Some(
272            self.index
273                .insert_link(self.state.by_id_root.as_ref(), &id, cid)
274                .await?,
275        );
276
277        for index in self.definition.key_indexes() {
278            let mut root = self.state.key_root(index.name()).cloned();
279            for key in index.materialize_keys(item) {
280                root = Some(self.index.insert_link(root.as_ref(), &key, cid).await?);
281            }
282            self.state.key_roots.insert(index.name().to_string(), root);
283        }
284
285        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
286        let entry_context = CollectionEntryContext {
287            id: &id,
288            cid: Some(cid),
289            write_context: context,
290        };
291        for index in self.definition.search_indexes() {
292            let Some(search_index) = self.search_indexes.get(index.name()) else {
293                continue;
294            };
295
296            let root_name = index.root_name().unwrap_or(index.name()).to_string();
297            let mut root = search_root_groups
298                .get(&root_name)
299                .cloned()
300                .unwrap_or_else(|| self.read_search_root_group(&root_name));
301
302            for entry in index.materialize_entries(item, &entry_context) {
303                let terms = search_index.parse_keywords(&entry.text);
304                if terms.is_empty() {
305                    continue;
306                }
307                let entry_id = entry.id.as_deref().unwrap_or(&id);
308                let Some(entry_cid) = entry.cid.as_ref().or(Some(cid)) else {
309                    continue;
310                };
311                let default_prefix = default_search_prefix(index.name());
312                root = Some(
313                    search_index
314                        .index_link(
315                            root.as_ref(),
316                            entry
317                                .prefix
318                                .as_deref()
319                                .or_else(|| index.prefix())
320                                .unwrap_or(&default_prefix),
321                            &terms,
322                            entry_id,
323                            entry_cid,
324                        )
325                        .await?,
326                );
327            }
328
329            search_root_groups.insert(root_name, root);
330        }
331
332        if !search_root_groups.is_empty() {
333            self.assign_search_root_groups(&search_root_groups);
334        }
335
336        Ok(self.snapshot())
337    }
338
339    async fn delete_normalized_with_context(
340        &mut self,
341        item: &T,
342        context: Option<&CollectionWriteContext>,
343    ) -> Result<CollectionState, CollectionError> {
344        let id = self.definition.item_id(item)?;
345        if let Some(root) = self.state.by_id_root.as_ref() {
346            self.state.by_id_root = self.index.delete(root, &id).await?;
347        }
348
349        for index in self.definition.key_indexes() {
350            let mut root = self.state.key_root(index.name()).cloned();
351            for key in index.materialize_keys(item) {
352                let Some(active_root) = root.as_ref() else {
353                    break;
354                };
355                root = self.index.delete(active_root, &key).await?;
356            }
357            self.state.key_roots.insert(index.name().to_string(), root);
358        }
359
360        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
361        let entry_context = CollectionEntryContext {
362            id: &id,
363            cid: None,
364            write_context: context,
365        };
366        for index in self.definition.search_indexes() {
367            let Some(search_index) = self.search_indexes.get(index.name()) else {
368                continue;
369            };
370
371            let root_name = index.root_name().unwrap_or(index.name()).to_string();
372            let mut root = search_root_groups
373                .get(&root_name)
374                .cloned()
375                .unwrap_or_else(|| self.read_search_root_group(&root_name));
376            let Some(existing_root) = root.clone() else {
377                continue;
378            };
379            root = Some(existing_root);
380
381            for entry in index.materialize_entries(item, &entry_context) {
382                let terms = search_index.parse_keywords(&entry.text);
383                if terms.is_empty() {
384                    continue;
385                }
386                let entry_id = entry.id.as_deref().unwrap_or(&id);
387                let Some(active_root) = root.as_ref() else {
388                    break;
389                };
390                let default_prefix = default_search_prefix(index.name());
391                root = search_index
392                    .remove_link(
393                        active_root,
394                        entry
395                            .prefix
396                            .as_deref()
397                            .or_else(|| index.prefix())
398                            .unwrap_or(&default_prefix),
399                        &terms,
400                        entry_id,
401                    )
402                    .await?;
403            }
404
405            search_root_groups.insert(root_name, root);
406        }
407
408        if !search_root_groups.is_empty() {
409            self.assign_search_root_groups(&search_root_groups);
410        } else {
411            for index in self.definition.search_indexes() {
412                let root_name = index.root_name().unwrap_or(index.name()).to_string();
413                self.state.search_roots.insert(
414                    index.name().to_string(),
415                    self.read_search_root_group(&root_name),
416                );
417            }
418        }
419
420        Ok(self.snapshot())
421    }
422
423    async fn rebuild_without_search(
424        &mut self,
425        final_entries: BTreeMap<String, (T, Cid)>,
426    ) -> Result<CollectionState, CollectionError> {
427        let mut by_id = BTreeMap::<String, Cid>::new();
428        let mut key_roots = self
429            .definition
430            .key_indexes()
431            .iter()
432            .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
433            .collect::<BTreeMap<_, _>>();
434
435        for (id, (item, cid)) in final_entries {
436            by_id.insert(id, cid.clone());
437            for index in self.definition.key_indexes() {
438                let root = key_roots
439                    .get_mut(index.name())
440                    .expect("collection key root must exist");
441                for key in index.materialize_keys(&item) {
442                    root.insert(key, cid.clone());
443                }
444            }
445        }
446
447        self.state = create_empty_collection_state(&self.definition);
448        self.state.by_id_root = self.index.build_links(by_id).await?;
449        for index in self.definition.key_indexes() {
450            let root = self
451                .index
452                .build_links(key_roots.remove(index.name()).unwrap_or_default())
453                .await?;
454            self.state.key_roots.insert(index.name().to_string(), root);
455        }
456
457        Ok(self.snapshot())
458    }
459}