Skip to main content

hashtree_collection/
writer.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig, LinkType, Store};
5use hashtree_index::{BTree, BTreeOptions, SearchIndex};
6
7use crate::helpers::merge_search_index_options;
8use crate::manifest::write_collection_manifest_metadata;
9use crate::schema::normalize_typed_collection_item;
10use crate::{
11    create_empty_collection_state, default_search_prefix, load_collection_state,
12    CollectionDefinition, CollectionEntryContext, CollectionError, CollectionOptions,
13    CollectionState, CollectionWriteContext, COLLECTION_MANIFEST_METADATA_FILE, MANIFEST_BY_ID,
14};
15
16pub struct CollectionWriter<S: Store, T> {
17    tree: HashTree<S>,
18    index: BTree<S>,
19    search_indexes: BTreeMap<String, SearchIndex<S>>,
20    definition: CollectionDefinition<T>,
21    state: CollectionState,
22}
23
24impl<S: Store, T> CollectionWriter<S, T> {
25    pub fn new(store: Arc<S>, definition: CollectionDefinition<T>) -> Self {
26        Self::with_options(store, definition, CollectionOptions::default())
27    }
28
29    pub fn with_options(
30        store: Arc<S>,
31        definition: CollectionDefinition<T>,
32        options: CollectionOptions,
33    ) -> Self {
34        let state = create_empty_collection_state(&definition);
35        Self::with_state_and_options(store, definition, state, options)
36    }
37
38    pub fn with_state(
39        store: Arc<S>,
40        definition: CollectionDefinition<T>,
41        state: CollectionState,
42    ) -> Self {
43        Self::with_state_and_options(store, definition, state, CollectionOptions::default())
44    }
45
46    pub fn with_state_and_options(
47        store: Arc<S>,
48        definition: CollectionDefinition<T>,
49        state: CollectionState,
50        options: CollectionOptions,
51    ) -> Self {
52        let search_indexes = definition
53            .search_indexes()
54            .iter()
55            .map(|index| {
56                (
57                    index.name().to_string(),
58                    SearchIndex::new(
59                        Arc::clone(&store),
60                        merge_search_index_options(index.options().clone(), &options),
61                    ),
62                )
63            })
64            .collect();
65
66        Self {
67            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&store))),
68            index: BTree::new(
69                store,
70                BTreeOptions {
71                    order: options.btree_order,
72                },
73            ),
74            search_indexes,
75            definition,
76            state,
77        }
78    }
79
80    pub async fn from_root(
81        store: Arc<S>,
82        definition: CollectionDefinition<T>,
83        root: Option<&Cid>,
84        options: CollectionOptions,
85    ) -> Result<Self, CollectionError> {
86        let state = load_collection_state(Arc::clone(&store), &definition, root).await?;
87        Ok(Self::with_state_and_options(
88            store, definition, state, options,
89        ))
90    }
91
92    pub fn snapshot(&self) -> CollectionState {
93        self.state.clone()
94    }
95
96    pub fn state(&self) -> &CollectionState {
97        &self.state
98    }
99
100    pub async fn write_root(&self) -> Result<Option<Cid>, CollectionError> {
101        let mut entries = Vec::new();
102        if let Some(cid) = self.state.by_id_root.as_ref() {
103            entries.push(DirEntry::from_cid(MANIFEST_BY_ID, cid).with_link_type(LinkType::Dir));
104        }
105        for index in self.definition.key_indexes() {
106            if let Some(cid) = self.state.key_root(index.name()) {
107                entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
108            }
109        }
110        for index in self.definition.search_indexes() {
111            if let Some(cid) = self.state.search_root(index.name()) {
112                entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
113            }
114        }
115        if let Some((cid, size)) =
116            write_collection_manifest_metadata(&self.tree, &self.definition).await?
117        {
118            entries.push(
119                DirEntry::from_cid(COLLECTION_MANIFEST_METADATA_FILE, &cid)
120                    .with_size(size)
121                    .with_link_type(LinkType::File),
122            );
123        }
124
125        if entries.is_empty() {
126            return Ok(None);
127        }
128
129        Ok(Some(self.tree.put_directory(entries).await?))
130    }
131
132    fn read_search_root_group(&self, root_name: &str) -> Option<Cid> {
133        for index in self.definition.search_indexes() {
134            if index.root_name().unwrap_or(index.name()) == root_name {
135                return self.state.search_root(index.name()).cloned();
136            }
137        }
138        None
139    }
140
141    fn assign_search_root_groups(&mut self, groups: &BTreeMap<String, Option<Cid>>) {
142        for index in self.definition.search_indexes() {
143            let root_name = index.root_name().unwrap_or(index.name());
144            if let Some(root) = groups.get(root_name) {
145                self.state
146                    .search_roots
147                    .insert(index.name().to_string(), root.clone());
148            }
149        }
150    }
151}
152
153impl<S: Store, T: Clone> CollectionWriter<S, T> {
154    pub fn normalize(&self, item: &T) -> Result<T, CollectionError> {
155        normalize_typed_collection_item(&self.definition, item)
156    }
157
158    pub async fn put(
159        &mut self,
160        item: &T,
161        cid: &Cid,
162        previous: Option<&T>,
163    ) -> Result<CollectionState, CollectionError> {
164        self.put_with_context(item, cid, previous, None, None).await
165    }
166
167    pub async fn put_with_context(
168        &mut self,
169        item: &T,
170        cid: &Cid,
171        previous: Option<&T>,
172        context: Option<&CollectionWriteContext>,
173        previous_context: Option<&CollectionWriteContext>,
174    ) -> Result<CollectionState, CollectionError> {
175        let next_item = self.normalize(item)?;
176        let previous_item = previous
177            .map(|previous| self.normalize(previous))
178            .transpose()?;
179
180        if let Some(previous_item) = previous_item.as_ref() {
181            self.delete_normalized_with_context(previous_item, previous_context.or(context))
182                .await?;
183        }
184
185        self.put_normalized_with_context(&next_item, cid, context)
186            .await
187    }
188
189    pub async fn delete(&mut self, item: &T) -> Result<CollectionState, CollectionError> {
190        self.delete_with_context(item, None).await
191    }
192
193    pub async fn delete_with_context(
194        &mut self,
195        item: &T,
196        context: Option<&CollectionWriteContext>,
197    ) -> Result<CollectionState, CollectionError> {
198        let next_item = self.normalize(item)?;
199        self.delete_normalized_with_context(&next_item, context)
200            .await
201    }
202
203    pub async fn rebuild<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
204    where
205        I: IntoIterator<Item = (T, Cid)>,
206    {
207        let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
208        for (item, cid) in entries {
209            let next_item = self.normalize(&item)?;
210            let id = self.definition.item_id(&next_item)?;
211            final_entries.insert(id, (next_item, cid, None));
212        }
213
214        self.rebuild_from_final_entries(final_entries).await
215    }
216
217    pub async fn reindex<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
218    where
219        I: IntoIterator<Item = (T, Cid)>,
220    {
221        self.rebuild(entries).await
222    }
223
224    pub async fn rebuild_with_context<I>(
225        &mut self,
226        entries: I,
227    ) -> Result<CollectionState, CollectionError>
228    where
229        I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
230    {
231        let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
232        for (item, cid, context) in entries {
233            let next_item = self.normalize(&item)?;
234            let id = self.definition.item_id(&next_item)?;
235            final_entries.insert(id, (next_item, cid, context));
236        }
237
238        self.rebuild_from_final_entries(final_entries).await
239    }
240
241    pub async fn reindex_with_context<I>(
242        &mut self,
243        entries: I,
244    ) -> Result<CollectionState, CollectionError>
245    where
246        I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
247    {
248        self.rebuild_with_context(entries).await
249    }
250
251    async fn put_normalized_with_context(
252        &mut self,
253        item: &T,
254        cid: &Cid,
255        context: Option<&CollectionWriteContext>,
256    ) -> Result<CollectionState, CollectionError> {
257        let id = self.definition.item_id(item)?;
258        self.state.by_id_root = Some(
259            self.index
260                .insert_link(self.state.by_id_root.as_ref(), &id, cid)
261                .await?,
262        );
263
264        for index in self.definition.key_indexes() {
265            let mut root = self.state.key_root(index.name()).cloned();
266            for key in index.materialize_keys(item) {
267                root = Some(self.index.insert_link(root.as_ref(), &key, cid).await?);
268            }
269            self.state.key_roots.insert(index.name().to_string(), root);
270        }
271
272        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
273        let entry_context = CollectionEntryContext {
274            id: &id,
275            cid: Some(cid),
276            write_context: context,
277        };
278        for index in self.definition.search_indexes() {
279            let Some(search_index) = self.search_indexes.get(index.name()) else {
280                continue;
281            };
282
283            let root_name = index.root_name().unwrap_or(index.name()).to_string();
284            let mut root = search_root_groups
285                .get(&root_name)
286                .cloned()
287                .unwrap_or_else(|| self.read_search_root_group(&root_name));
288
289            for entry in index.materialize_entries(item, &entry_context) {
290                let terms = search_index.parse_keywords(&entry.text);
291                if terms.is_empty() {
292                    continue;
293                }
294                let entry_id = entry.id.as_deref().unwrap_or(&id);
295                let Some(entry_cid) = entry.cid.as_ref().or(Some(cid)) else {
296                    continue;
297                };
298                let default_prefix = default_search_prefix(index.name());
299                root = Some(
300                    search_index
301                        .index_link(
302                            root.as_ref(),
303                            entry
304                                .prefix
305                                .as_deref()
306                                .or_else(|| index.prefix())
307                                .unwrap_or(&default_prefix),
308                            &terms,
309                            entry_id,
310                            entry_cid,
311                        )
312                        .await?,
313                );
314            }
315
316            search_root_groups.insert(root_name, root);
317        }
318
319        if !search_root_groups.is_empty() {
320            self.assign_search_root_groups(&search_root_groups);
321        }
322
323        Ok(self.snapshot())
324    }
325
326    async fn delete_normalized_with_context(
327        &mut self,
328        item: &T,
329        context: Option<&CollectionWriteContext>,
330    ) -> Result<CollectionState, CollectionError> {
331        let id = self.definition.item_id(item)?;
332        if let Some(root) = self.state.by_id_root.as_ref() {
333            self.state.by_id_root = self.index.delete(root, &id).await?;
334        }
335
336        for index in self.definition.key_indexes() {
337            let mut root = self.state.key_root(index.name()).cloned();
338            for key in index.materialize_keys(item) {
339                let Some(active_root) = root.as_ref() else {
340                    break;
341                };
342                root = self.index.delete(active_root, &key).await?;
343            }
344            self.state.key_roots.insert(index.name().to_string(), root);
345        }
346
347        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
348        let entry_context = CollectionEntryContext {
349            id: &id,
350            cid: None,
351            write_context: context,
352        };
353        for index in self.definition.search_indexes() {
354            let Some(search_index) = self.search_indexes.get(index.name()) else {
355                continue;
356            };
357
358            let root_name = index.root_name().unwrap_or(index.name()).to_string();
359            let mut root = search_root_groups
360                .get(&root_name)
361                .cloned()
362                .unwrap_or_else(|| self.read_search_root_group(&root_name));
363            let Some(existing_root) = root.clone() else {
364                continue;
365            };
366            root = Some(existing_root);
367
368            for entry in index.materialize_entries(item, &entry_context) {
369                let terms = search_index.parse_keywords(&entry.text);
370                if terms.is_empty() {
371                    continue;
372                }
373                let entry_id = entry.id.as_deref().unwrap_or(&id);
374                let Some(active_root) = root.as_ref() else {
375                    break;
376                };
377                let default_prefix = default_search_prefix(index.name());
378                root = search_index
379                    .remove_link(
380                        active_root,
381                        entry
382                            .prefix
383                            .as_deref()
384                            .or_else(|| index.prefix())
385                            .unwrap_or(&default_prefix),
386                        &terms,
387                        entry_id,
388                    )
389                    .await?;
390            }
391
392            search_root_groups.insert(root_name, root);
393        }
394
395        if !search_root_groups.is_empty() {
396            self.assign_search_root_groups(&search_root_groups);
397        } else {
398            for index in self.definition.search_indexes() {
399                let root_name = index.root_name().unwrap_or(index.name()).to_string();
400                self.state.search_roots.insert(
401                    index.name().to_string(),
402                    self.read_search_root_group(&root_name),
403                );
404            }
405        }
406
407        Ok(self.snapshot())
408    }
409
410    async fn rebuild_without_search(
411        &mut self,
412        final_entries: BTreeMap<String, (T, Cid, Option<CollectionWriteContext>)>,
413    ) -> Result<CollectionState, CollectionError> {
414        let mut by_id = BTreeMap::<String, Cid>::new();
415        let mut key_roots = self
416            .definition
417            .key_indexes()
418            .iter()
419            .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
420            .collect::<BTreeMap<_, _>>();
421
422        for (id, (item, cid, _context)) in final_entries {
423            by_id.insert(id, cid.clone());
424            for index in self.definition.key_indexes() {
425                let root = key_roots
426                    .get_mut(index.name())
427                    .expect("collection key root must exist");
428                for key in index.materialize_keys(&item) {
429                    root.insert(key, cid.clone());
430                }
431            }
432        }
433
434        self.state = create_empty_collection_state(&self.definition);
435        self.state.by_id_root = self.index.build_links(by_id).await?;
436        for index in self.definition.key_indexes() {
437            let root = self
438                .index
439                .build_links(key_roots.remove(index.name()).unwrap_or_default())
440                .await?;
441            self.state.key_roots.insert(index.name().to_string(), root);
442        }
443
444        Ok(self.snapshot())
445    }
446
447    async fn rebuild_from_final_entries(
448        &mut self,
449        final_entries: BTreeMap<String, (T, Cid, Option<CollectionWriteContext>)>,
450    ) -> Result<CollectionState, CollectionError> {
451        if self.definition.search_indexes().is_empty() {
452            return self.rebuild_without_search(final_entries).await;
453        }
454
455        let mut by_id = BTreeMap::<String, Cid>::new();
456        let mut key_roots = self
457            .definition
458            .key_indexes()
459            .iter()
460            .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
461            .collect::<BTreeMap<_, _>>();
462        let mut search_root_entries = BTreeMap::<String, BTreeMap<String, Cid>>::new();
463
464        for (id, (item, cid, context)) in final_entries {
465            by_id.insert(id.clone(), cid.clone());
466
467            for index in self.definition.key_indexes() {
468                let root = key_roots
469                    .get_mut(index.name())
470                    .expect("collection key root must exist");
471                for key in index.materialize_keys(&item) {
472                    root.insert(key, cid.clone());
473                }
474            }
475
476            let entry_context = CollectionEntryContext {
477                id: &id,
478                cid: Some(&cid),
479                write_context: context.as_ref(),
480            };
481
482            for index in self.definition.search_indexes() {
483                let Some(search_index) = self.search_indexes.get(index.name()) else {
484                    continue;
485                };
486
487                let root_name = index.root_name().unwrap_or(index.name()).to_string();
488                let root_entries = search_root_entries.entry(root_name).or_default();
489
490                for entry in index.materialize_entries(&item, &entry_context) {
491                    let terms = search_index.parse_keywords(&entry.text);
492                    if terms.is_empty() {
493                        continue;
494                    }
495
496                    let entry_id = entry.id.as_deref().unwrap_or(&id);
497                    let Some(entry_cid) = entry.cid.as_ref().or(Some(&cid)) else {
498                        continue;
499                    };
500                    let default_prefix = default_search_prefix(index.name());
501                    let prefix = entry
502                        .prefix
503                        .as_deref()
504                        .or_else(|| index.prefix())
505                        .unwrap_or(&default_prefix);
506
507                    for term in terms {
508                        root_entries.insert(format!("{prefix}{term}:{entry_id}"), entry_cid.clone());
509                    }
510                }
511            }
512        }
513
514        self.state = create_empty_collection_state(&self.definition);
515        self.state.by_id_root = self.index.build_links(by_id).await?;
516        for index in self.definition.key_indexes() {
517            let root = self
518                .index
519                .build_links(key_roots.remove(index.name()).unwrap_or_default())
520                .await?;
521            self.state.key_roots.insert(index.name().to_string(), root);
522        }
523
524        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
525        for (root_name, root_entries) in search_root_entries {
526            let Some(search_index) = self.definition.search_indexes().iter().find_map(|index| {
527                ((index.root_name().unwrap_or(index.name())) == root_name)
528                    .then(|| self.search_indexes.get(index.name()))
529                    .flatten()
530            }) else {
531                continue;
532            };
533
534            search_root_groups.insert(root_name, search_index.build_links(root_entries).await?);
535        }
536
537        if !search_root_groups.is_empty() {
538            self.assign_search_root_groups(&search_root_groups);
539        }
540
541        Ok(self.snapshot())
542    }
543}