Skip to main content

hashtree_collection/
writer.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig, LinkType, Store};
5use hashtree_index::{BTree, BTreeOptions, SearchIndex};
6
7use crate::helpers::merge_search_index_options;
8use crate::manifest::write_collection_manifest_metadata;
9use crate::schema::normalize_typed_collection_item;
10use crate::{
11    create_empty_collection_state, default_search_prefix, load_collection_state,
12    CollectionDefinition, CollectionEntryContext, CollectionError, CollectionOptions,
13    CollectionState, CollectionWriteContext, COLLECTION_MANIFEST_METADATA_FILE, MANIFEST_BY_ID,
14};
15
16pub struct CollectionWriter<S: Store, T> {
17    tree: HashTree<S>,
18    index: BTree<S>,
19    search_indexes: BTreeMap<String, SearchIndex<S>>,
20    definition: CollectionDefinition<T>,
21    state: CollectionState,
22}
23
24impl<S: Store, T> CollectionWriter<S, T> {
25    pub fn new(store: Arc<S>, definition: CollectionDefinition<T>) -> Self {
26        Self::with_options(store, definition, CollectionOptions::default())
27    }
28
29    pub fn with_options(
30        store: Arc<S>,
31        definition: CollectionDefinition<T>,
32        options: CollectionOptions,
33    ) -> Self {
34        let state = create_empty_collection_state(&definition);
35        Self::with_state_and_options(store, definition, state, options)
36    }
37
38    pub fn with_state(
39        store: Arc<S>,
40        definition: CollectionDefinition<T>,
41        state: CollectionState,
42    ) -> Self {
43        Self::with_state_and_options(store, definition, state, CollectionOptions::default())
44    }
45
46    pub fn with_state_and_options(
47        store: Arc<S>,
48        definition: CollectionDefinition<T>,
49        state: CollectionState,
50        options: CollectionOptions,
51    ) -> Self {
52        let search_indexes = definition
53            .search_indexes()
54            .iter()
55            .map(|index| {
56                (
57                    index.name().to_string(),
58                    SearchIndex::new(
59                        Arc::clone(&store),
60                        merge_search_index_options(index.options().clone(), &options),
61                    ),
62                )
63            })
64            .collect();
65
66        Self {
67            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&store))),
68            index: BTree::new(
69                store,
70                BTreeOptions {
71                    order: options.btree_order,
72                },
73            ),
74            search_indexes,
75            definition,
76            state,
77        }
78    }
79
80    pub async fn from_root(
81        store: Arc<S>,
82        definition: CollectionDefinition<T>,
83        root: Option<&Cid>,
84        options: CollectionOptions,
85    ) -> Result<Self, CollectionError> {
86        let state = load_collection_state(Arc::clone(&store), &definition, root).await?;
87        Ok(Self::with_state_and_options(
88            store, definition, state, options,
89        ))
90    }
91
92    pub fn snapshot(&self) -> CollectionState {
93        self.state.clone()
94    }
95
96    pub fn state(&self) -> &CollectionState {
97        &self.state
98    }
99
100    pub async fn write_root(&self) -> Result<Option<Cid>, CollectionError> {
101        let mut entries = Vec::new();
102        if let Some(cid) = self.state.by_id_root.as_ref() {
103            entries.push(DirEntry::from_cid(MANIFEST_BY_ID, cid).with_link_type(LinkType::Dir));
104        }
105        for index in self.definition.key_indexes() {
106            if let Some(cid) = self.state.key_root(index.name()) {
107                entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
108            }
109        }
110        for index in self.definition.search_indexes() {
111            if let Some(cid) = self.state.search_root(index.name()) {
112                entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
113            }
114        }
115        if let Some((cid, size)) =
116            write_collection_manifest_metadata(&self.tree, &self.definition).await?
117        {
118            entries.push(
119                DirEntry::from_cid(COLLECTION_MANIFEST_METADATA_FILE, &cid)
120                    .with_size(size)
121                    .with_link_type(LinkType::File),
122            );
123        }
124
125        if entries.is_empty() {
126            return Ok(None);
127        }
128
129        Ok(Some(self.tree.put_directory(entries).await?))
130    }
131
132    fn read_search_root_group(&self, root_name: &str) -> Option<Cid> {
133        for index in self.definition.search_indexes() {
134            if index.root_name().unwrap_or(index.name()) == root_name {
135                return self.state.search_root(index.name()).cloned();
136            }
137        }
138        None
139    }
140
141    fn assign_search_root_groups(&mut self, groups: &BTreeMap<String, Option<Cid>>) {
142        for index in self.definition.search_indexes() {
143            let root_name = index.root_name().unwrap_or(index.name());
144            if let Some(root) = groups.get(root_name) {
145                self.state
146                    .search_roots
147                    .insert(index.name().to_string(), root.clone());
148            }
149        }
150    }
151
152    fn has_derived_indexes(&self) -> bool {
153        !self.definition.key_indexes().is_empty() || !self.definition.search_indexes().is_empty()
154    }
155}
156
157impl<S: Store, T: Clone> CollectionWriter<S, T> {
158    pub fn normalize(&self, item: &T) -> Result<T, CollectionError> {
159        normalize_typed_collection_item(&self.definition, item)
160    }
161
162    pub async fn put(
163        &mut self,
164        item: &T,
165        cid: &Cid,
166        previous: Option<&T>,
167    ) -> Result<CollectionState, CollectionError> {
168        self.put_with_context(item, cid, previous, None, None).await
169    }
170
171    pub async fn replace(
172        &mut self,
173        item: &T,
174        cid: &Cid,
175        previous: &T,
176    ) -> Result<CollectionState, CollectionError> {
177        self.replace_with_context(item, cid, previous, None, None).await
178    }
179
180    pub async fn put_with_context(
181        &mut self,
182        item: &T,
183        cid: &Cid,
184        previous: Option<&T>,
185        context: Option<&CollectionWriteContext>,
186        previous_context: Option<&CollectionWriteContext>,
187    ) -> Result<CollectionState, CollectionError> {
188        let next_item = self.normalize(item)?;
189        let id = self.definition.item_id(&next_item)?;
190        let previous_item = previous
191            .map(|previous| self.normalize(previous))
192            .transpose()?;
193
194        if previous_item.is_none() && self.has_derived_indexes() {
195            let existing = if let Some(root) = self.state.by_id_root.as_ref() {
196                self.index.get_link(Some(root), &id).await?
197            } else {
198                None
199            };
200            if existing.is_some() {
201                return Err(CollectionError::MissingPreviousForOverwrite { id });
202            }
203        }
204
205        if let Some(previous_item) = previous_item.as_ref() {
206            self.delete_normalized_with_context(previous_item, previous_context.or(context))
207                .await?;
208        }
209
210        self.put_normalized_with_context(&next_item, cid, context)
211            .await
212    }
213
214    pub async fn replace_with_context(
215        &mut self,
216        item: &T,
217        cid: &Cid,
218        previous: &T,
219        context: Option<&CollectionWriteContext>,
220        previous_context: Option<&CollectionWriteContext>,
221    ) -> Result<CollectionState, CollectionError> {
222        self.put_with_context(item, cid, Some(previous), context, previous_context)
223            .await
224    }
225
226    pub async fn delete(&mut self, item: &T) -> Result<CollectionState, CollectionError> {
227        self.delete_with_context(item, None).await
228    }
229
230    pub async fn delete_with_context(
231        &mut self,
232        item: &T,
233        context: Option<&CollectionWriteContext>,
234    ) -> Result<CollectionState, CollectionError> {
235        let next_item = self.normalize(item)?;
236        self.delete_normalized_with_context(&next_item, context)
237            .await
238    }
239
240    pub async fn rebuild<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
241    where
242        I: IntoIterator<Item = (T, Cid)>,
243    {
244        let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
245        for (item, cid) in entries {
246            let next_item = self.normalize(&item)?;
247            let id = self.definition.item_id(&next_item)?;
248            final_entries.insert(id, (next_item, cid, None));
249        }
250
251        self.rebuild_from_final_entries(final_entries).await
252    }
253
254    pub async fn reindex<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
255    where
256        I: IntoIterator<Item = (T, Cid)>,
257    {
258        self.rebuild(entries).await
259    }
260
261    pub async fn rebuild_with_context<I>(
262        &mut self,
263        entries: I,
264    ) -> Result<CollectionState, CollectionError>
265    where
266        I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
267    {
268        let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
269        for (item, cid, context) in entries {
270            let next_item = self.normalize(&item)?;
271            let id = self.definition.item_id(&next_item)?;
272            final_entries.insert(id, (next_item, cid, context));
273        }
274
275        self.rebuild_from_final_entries(final_entries).await
276    }
277
278    pub async fn reindex_with_context<I>(
279        &mut self,
280        entries: I,
281    ) -> Result<CollectionState, CollectionError>
282    where
283        I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
284    {
285        self.rebuild_with_context(entries).await
286    }
287
288    async fn put_normalized_with_context(
289        &mut self,
290        item: &T,
291        cid: &Cid,
292        context: Option<&CollectionWriteContext>,
293    ) -> Result<CollectionState, CollectionError> {
294        let id = self.definition.item_id(item)?;
295        self.state.by_id_root = Some(
296            self.index
297                .insert_link(self.state.by_id_root.as_ref(), &id, cid)
298                .await?,
299        );
300
301        for index in self.definition.key_indexes() {
302            let mut root = self.state.key_root(index.name()).cloned();
303            for key in index.materialize_keys(item) {
304                root = Some(self.index.insert_link(root.as_ref(), &key, cid).await?);
305            }
306            self.state.key_roots.insert(index.name().to_string(), root);
307        }
308
309        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
310        let entry_context = CollectionEntryContext {
311            id: &id,
312            cid: Some(cid),
313            write_context: context,
314        };
315        for index in self.definition.search_indexes() {
316            let Some(search_index) = self.search_indexes.get(index.name()) else {
317                continue;
318            };
319
320            let root_name = index.root_name().unwrap_or(index.name()).to_string();
321            let mut root = search_root_groups
322                .get(&root_name)
323                .cloned()
324                .unwrap_or_else(|| self.read_search_root_group(&root_name));
325
326            for entry in index.materialize_entries(item, &entry_context) {
327                let terms = search_index.parse_keywords(&entry.text);
328                if terms.is_empty() {
329                    continue;
330                }
331                let entry_id = entry.id.as_deref().unwrap_or(&id);
332                let Some(entry_cid) = entry.cid.as_ref().or(Some(cid)) else {
333                    continue;
334                };
335                let default_prefix = default_search_prefix(index.name());
336                root = Some(
337                    search_index
338                        .index_link(
339                            root.as_ref(),
340                            entry
341                                .prefix
342                                .as_deref()
343                                .or_else(|| index.prefix())
344                                .unwrap_or(&default_prefix),
345                            &terms,
346                            entry_id,
347                            entry_cid,
348                        )
349                        .await?,
350                );
351            }
352
353            search_root_groups.insert(root_name, root);
354        }
355
356        if !search_root_groups.is_empty() {
357            self.assign_search_root_groups(&search_root_groups);
358        }
359
360        Ok(self.snapshot())
361    }
362
363    async fn delete_normalized_with_context(
364        &mut self,
365        item: &T,
366        context: Option<&CollectionWriteContext>,
367    ) -> Result<CollectionState, CollectionError> {
368        let id = self.definition.item_id(item)?;
369        if let Some(root) = self.state.by_id_root.as_ref() {
370            self.state.by_id_root = self.index.delete(root, &id).await?;
371        }
372
373        for index in self.definition.key_indexes() {
374            let mut root = self.state.key_root(index.name()).cloned();
375            for key in index.materialize_keys(item) {
376                let Some(active_root) = root.as_ref() else {
377                    break;
378                };
379                root = self.index.delete(active_root, &key).await?;
380            }
381            self.state.key_roots.insert(index.name().to_string(), root);
382        }
383
384        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
385        let entry_context = CollectionEntryContext {
386            id: &id,
387            cid: None,
388            write_context: context,
389        };
390        for index in self.definition.search_indexes() {
391            let Some(search_index) = self.search_indexes.get(index.name()) else {
392                continue;
393            };
394
395            let root_name = index.root_name().unwrap_or(index.name()).to_string();
396            let mut root = search_root_groups
397                .get(&root_name)
398                .cloned()
399                .unwrap_or_else(|| self.read_search_root_group(&root_name));
400            let Some(existing_root) = root.clone() else {
401                continue;
402            };
403            root = Some(existing_root);
404
405            for entry in index.materialize_entries(item, &entry_context) {
406                let terms = search_index.parse_keywords(&entry.text);
407                if terms.is_empty() {
408                    continue;
409                }
410                let entry_id = entry.id.as_deref().unwrap_or(&id);
411                let Some(active_root) = root.as_ref() else {
412                    break;
413                };
414                let default_prefix = default_search_prefix(index.name());
415                root = search_index
416                    .remove_link(
417                        active_root,
418                        entry
419                            .prefix
420                            .as_deref()
421                            .or_else(|| index.prefix())
422                            .unwrap_or(&default_prefix),
423                        &terms,
424                        entry_id,
425                    )
426                    .await?;
427            }
428
429            search_root_groups.insert(root_name, root);
430        }
431
432        if !search_root_groups.is_empty() {
433            self.assign_search_root_groups(&search_root_groups);
434        } else {
435            for index in self.definition.search_indexes() {
436                let root_name = index.root_name().unwrap_or(index.name()).to_string();
437                self.state.search_roots.insert(
438                    index.name().to_string(),
439                    self.read_search_root_group(&root_name),
440                );
441            }
442        }
443
444        Ok(self.snapshot())
445    }
446
447    async fn rebuild_without_search(
448        &mut self,
449        final_entries: BTreeMap<String, (T, Cid, Option<CollectionWriteContext>)>,
450    ) -> Result<CollectionState, CollectionError> {
451        let mut by_id = BTreeMap::<String, Cid>::new();
452        let mut key_roots = self
453            .definition
454            .key_indexes()
455            .iter()
456            .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
457            .collect::<BTreeMap<_, _>>();
458
459        for (id, (item, cid, _context)) in final_entries {
460            by_id.insert(id, cid.clone());
461            for index in self.definition.key_indexes() {
462                let root = key_roots
463                    .get_mut(index.name())
464                    .expect("collection key root must exist");
465                for key in index.materialize_keys(&item) {
466                    root.insert(key, cid.clone());
467                }
468            }
469        }
470
471        self.state = create_empty_collection_state(&self.definition);
472        self.state.by_id_root = self.index.build_links(by_id).await?;
473        for index in self.definition.key_indexes() {
474            let root = self
475                .index
476                .build_links(key_roots.remove(index.name()).unwrap_or_default())
477                .await?;
478            self.state.key_roots.insert(index.name().to_string(), root);
479        }
480
481        Ok(self.snapshot())
482    }
483
484    async fn rebuild_from_final_entries(
485        &mut self,
486        final_entries: BTreeMap<String, (T, Cid, Option<CollectionWriteContext>)>,
487    ) -> Result<CollectionState, CollectionError> {
488        if self.definition.search_indexes().is_empty() {
489            return self.rebuild_without_search(final_entries).await;
490        }
491
492        let mut by_id = BTreeMap::<String, Cid>::new();
493        let mut key_roots = self
494            .definition
495            .key_indexes()
496            .iter()
497            .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
498            .collect::<BTreeMap<_, _>>();
499        let mut search_root_entries = BTreeMap::<String, BTreeMap<String, Cid>>::new();
500
501        for (id, (item, cid, context)) in final_entries {
502            by_id.insert(id.clone(), cid.clone());
503
504            for index in self.definition.key_indexes() {
505                let root = key_roots
506                    .get_mut(index.name())
507                    .expect("collection key root must exist");
508                for key in index.materialize_keys(&item) {
509                    root.insert(key, cid.clone());
510                }
511            }
512
513            let entry_context = CollectionEntryContext {
514                id: &id,
515                cid: Some(&cid),
516                write_context: context.as_ref(),
517            };
518
519            for index in self.definition.search_indexes() {
520                let Some(search_index) = self.search_indexes.get(index.name()) else {
521                    continue;
522                };
523
524                let root_name = index.root_name().unwrap_or(index.name()).to_string();
525                let root_entries = search_root_entries.entry(root_name).or_default();
526
527                for entry in index.materialize_entries(&item, &entry_context) {
528                    let terms = search_index.parse_keywords(&entry.text);
529                    if terms.is_empty() {
530                        continue;
531                    }
532
533                    let entry_id = entry.id.as_deref().unwrap_or(&id);
534                    let Some(entry_cid) = entry.cid.as_ref().or(Some(&cid)) else {
535                        continue;
536                    };
537                    let default_prefix = default_search_prefix(index.name());
538                    let prefix = entry
539                        .prefix
540                        .as_deref()
541                        .or_else(|| index.prefix())
542                        .unwrap_or(&default_prefix);
543
544                    for term in terms {
545                        root_entries
546                            .insert(format!("{prefix}{term}:{entry_id}"), entry_cid.clone());
547                    }
548                }
549            }
550        }
551
552        self.state = create_empty_collection_state(&self.definition);
553        self.state.by_id_root = self.index.build_links(by_id).await?;
554        for index in self.definition.key_indexes() {
555            let root = self
556                .index
557                .build_links(key_roots.remove(index.name()).unwrap_or_default())
558                .await?;
559            self.state.key_roots.insert(index.name().to_string(), root);
560        }
561
562        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
563        for (root_name, root_entries) in search_root_entries {
564            let Some(search_index) = self.definition.search_indexes().iter().find_map(|index| {
565                ((index.root_name().unwrap_or(index.name())) == root_name)
566                    .then(|| self.search_indexes.get(index.name()))
567                    .flatten()
568            }) else {
569                continue;
570            };
571
572            search_root_groups.insert(root_name, search_index.build_links(root_entries).await?);
573        }
574
575        if !search_root_groups.is_empty() {
576            self.assign_search_root_groups(&search_root_groups);
577        }
578
579        Ok(self.snapshot())
580    }
581}