Skip to main content

hashtree_collection/
writer.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig, LinkType, Store};
5use hashtree_index::{BTree, BTreeOptions, SearchIndex};
6
7use crate::helpers::merge_search_index_options;
8use crate::manifest::write_collection_manifest_metadata;
9use crate::schema::normalize_typed_collection_item;
10use crate::{
11    create_empty_collection_state, default_search_prefix, load_collection_state,
12    CollectionDefinition, CollectionEntryContext, CollectionError, CollectionOptions,
13    CollectionState, CollectionWriteContext, COLLECTION_MANIFEST_METADATA_FILE, MANIFEST_BY_ID,
14};
15
/// Maintains one collection's indexes and the set of roots that describe it.
///
/// The writer owns the current [`CollectionState`] (by-id, key-index and
/// search-index roots) and updates it through `put` / `replace` / `delete` /
/// `rebuild`; `write_root` publishes a directory linking every current root.
pub struct CollectionWriter<S: Store, T> {
    /// Hash tree used to write the collection root directory and the manifest
    /// metadata file.
    tree: HashTree<S>,
    /// B-tree used for the by-id index and for every key index.
    index: BTree<S>,
    /// One search index per search-index definition, keyed by the definition's
    /// name (built in `with_state_and_options`).
    search_indexes: BTreeMap<String, SearchIndex<S>>,
    /// Collection definition: normalization, item ids, key and search index specs.
    definition: CollectionDefinition<T>,
    /// Current roots of all indexes; `snapshot` returns a clone of it.
    state: CollectionState,
}
23
24impl<S: Store, T> CollectionWriter<S, T> {
25    pub fn new(store: Arc<S>, definition: CollectionDefinition<T>) -> Self {
26        Self::with_options(store, definition, CollectionOptions::default())
27    }
28
29    pub fn with_options(
30        store: Arc<S>,
31        definition: CollectionDefinition<T>,
32        options: CollectionOptions,
33    ) -> Self {
34        let state = create_empty_collection_state(&definition);
35        Self::with_state_and_options(store, definition, state, options)
36    }
37
38    pub fn with_state(
39        store: Arc<S>,
40        definition: CollectionDefinition<T>,
41        state: CollectionState,
42    ) -> Self {
43        Self::with_state_and_options(store, definition, state, CollectionOptions::default())
44    }
45
46    pub fn with_state_and_options(
47        store: Arc<S>,
48        definition: CollectionDefinition<T>,
49        state: CollectionState,
50        options: CollectionOptions,
51    ) -> Self {
52        let search_indexes = definition
53            .search_indexes()
54            .iter()
55            .map(|index| {
56                (
57                    index.name().to_string(),
58                    SearchIndex::new(
59                        Arc::clone(&store),
60                        merge_search_index_options(index.options().clone(), &options),
61                    ),
62                )
63            })
64            .collect();
65
66        Self {
67            tree: HashTree::new(HashTreeConfig::new(Arc::clone(&store))),
68            index: BTree::new(
69                store,
70                BTreeOptions {
71                    order: options.btree_order,
72                },
73            ),
74            search_indexes,
75            definition,
76            state,
77        }
78    }
79
80    pub async fn from_root(
81        store: Arc<S>,
82        definition: CollectionDefinition<T>,
83        root: Option<&Cid>,
84        options: CollectionOptions,
85    ) -> Result<Self, CollectionError> {
86        let state = load_collection_state(Arc::clone(&store), &definition, root).await?;
87        Ok(Self::with_state_and_options(
88            store, definition, state, options,
89        ))
90    }
91
92    pub fn snapshot(&self) -> CollectionState {
93        self.state.clone()
94    }
95
96    pub fn state(&self) -> &CollectionState {
97        &self.state
98    }
99
100    pub async fn write_root(&self) -> Result<Option<Cid>, CollectionError> {
101        let mut entries = Vec::new();
102        if let Some(cid) = self.state.by_id_root.as_ref() {
103            entries.push(DirEntry::from_cid(MANIFEST_BY_ID, cid).with_link_type(LinkType::Dir));
104        }
105        for index in self.definition.key_indexes() {
106            if let Some(cid) = self.state.key_root(index.name()) {
107                entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
108            }
109        }
110        for index in self.definition.search_indexes() {
111            if let Some(cid) = self.state.search_root(index.name()) {
112                entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
113            }
114        }
115        if let Some((cid, size)) =
116            write_collection_manifest_metadata(&self.tree, &self.definition).await?
117        {
118            entries.push(
119                DirEntry::from_cid(COLLECTION_MANIFEST_METADATA_FILE, &cid)
120                    .with_size(size)
121                    .with_link_type(LinkType::File),
122            );
123        }
124
125        if entries.is_empty() {
126            return Ok(None);
127        }
128
129        Ok(Some(self.tree.put_directory(entries).await?))
130    }
131
132    fn read_search_root_group(&self, root_name: &str) -> Option<Cid> {
133        for index in self.definition.search_indexes() {
134            if index.root_name().unwrap_or(index.name()) == root_name {
135                return self.state.search_root(index.name()).cloned();
136            }
137        }
138        None
139    }
140
141    fn assign_search_root_groups(&mut self, groups: &BTreeMap<String, Option<Cid>>) {
142        for index in self.definition.search_indexes() {
143            let root_name = index.root_name().unwrap_or(index.name());
144            if let Some(root) = groups.get(root_name) {
145                self.state
146                    .search_roots
147                    .insert(index.name().to_string(), root.clone());
148            }
149        }
150    }
151
152    fn has_derived_indexes(&self) -> bool {
153        !self.definition.key_indexes().is_empty() || !self.definition.search_indexes().is_empty()
154    }
155}
156
157impl<S: Store, T: Clone> CollectionWriter<S, T> {
158    pub fn normalize(&self, item: &T) -> Result<T, CollectionError> {
159        normalize_typed_collection_item(&self.definition, item)
160    }
161
162    pub async fn put(
163        &mut self,
164        item: &T,
165        cid: &Cid,
166        previous: Option<&T>,
167    ) -> Result<CollectionState, CollectionError> {
168        self.put_with_context(item, cid, previous, None, None).await
169    }
170
171    pub async fn replace(
172        &mut self,
173        item: &T,
174        cid: &Cid,
175        previous: &T,
176    ) -> Result<CollectionState, CollectionError> {
177        self.replace_with_context(item, cid, previous, None, None)
178            .await
179    }
180
181    pub async fn put_with_context(
182        &mut self,
183        item: &T,
184        cid: &Cid,
185        previous: Option<&T>,
186        context: Option<&CollectionWriteContext>,
187        previous_context: Option<&CollectionWriteContext>,
188    ) -> Result<CollectionState, CollectionError> {
189        let next_item = self.normalize(item)?;
190        let id = self.definition.item_id(&next_item)?;
191        let previous_item = previous
192            .map(|previous| self.normalize(previous))
193            .transpose()?;
194
195        if previous_item.is_none() && self.has_derived_indexes() {
196            let existing = if let Some(root) = self.state.by_id_root.as_ref() {
197                self.index.get_link(Some(root), &id).await?
198            } else {
199                None
200            };
201            if existing.is_some() {
202                return Err(CollectionError::MissingPreviousForOverwrite { id });
203            }
204        }
205
206        if let Some(previous_item) = previous_item.as_ref() {
207            self.delete_normalized_with_context(previous_item, previous_context.or(context))
208                .await?;
209        }
210
211        self.put_normalized_with_context(&next_item, cid, context)
212            .await
213    }
214
215    pub async fn replace_with_context(
216        &mut self,
217        item: &T,
218        cid: &Cid,
219        previous: &T,
220        context: Option<&CollectionWriteContext>,
221        previous_context: Option<&CollectionWriteContext>,
222    ) -> Result<CollectionState, CollectionError> {
223        self.put_with_context(item, cid, Some(previous), context, previous_context)
224            .await
225    }
226
227    pub async fn delete(&mut self, item: &T) -> Result<CollectionState, CollectionError> {
228        self.delete_with_context(item, None).await
229    }
230
231    pub async fn delete_with_context(
232        &mut self,
233        item: &T,
234        context: Option<&CollectionWriteContext>,
235    ) -> Result<CollectionState, CollectionError> {
236        let next_item = self.normalize(item)?;
237        self.delete_normalized_with_context(&next_item, context)
238            .await
239    }
240
241    pub async fn rebuild<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
242    where
243        I: IntoIterator<Item = (T, Cid)>,
244    {
245        let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
246        for (item, cid) in entries {
247            let next_item = self.normalize(&item)?;
248            let id = self.definition.item_id(&next_item)?;
249            final_entries.insert(id, (next_item, cid, None));
250        }
251
252        self.rebuild_from_final_entries(final_entries).await
253    }
254
255    pub async fn reindex<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
256    where
257        I: IntoIterator<Item = (T, Cid)>,
258    {
259        self.rebuild(entries).await
260    }
261
262    pub async fn rebuild_with_context<I>(
263        &mut self,
264        entries: I,
265    ) -> Result<CollectionState, CollectionError>
266    where
267        I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
268    {
269        let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
270        for (item, cid, context) in entries {
271            let next_item = self.normalize(&item)?;
272            let id = self.definition.item_id(&next_item)?;
273            final_entries.insert(id, (next_item, cid, context));
274        }
275
276        self.rebuild_from_final_entries(final_entries).await
277    }
278
279    pub async fn reindex_with_context<I>(
280        &mut self,
281        entries: I,
282    ) -> Result<CollectionState, CollectionError>
283    where
284        I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
285    {
286        self.rebuild_with_context(entries).await
287    }
288
289    async fn put_normalized_with_context(
290        &mut self,
291        item: &T,
292        cid: &Cid,
293        context: Option<&CollectionWriteContext>,
294    ) -> Result<CollectionState, CollectionError> {
295        let id = self.definition.item_id(item)?;
296        self.state.by_id_root = Some(
297            self.index
298                .insert_link(self.state.by_id_root.as_ref(), &id, cid)
299                .await?,
300        );
301
302        for index in self.definition.key_indexes() {
303            let mut root = self.state.key_root(index.name()).cloned();
304            for key in index.materialize_keys(item) {
305                root = Some(self.index.insert_link(root.as_ref(), &key, cid).await?);
306            }
307            self.state.key_roots.insert(index.name().to_string(), root);
308        }
309
310        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
311        let entry_context = CollectionEntryContext {
312            id: &id,
313            cid: Some(cid),
314            write_context: context,
315        };
316        for index in self.definition.search_indexes() {
317            let Some(search_index) = self.search_indexes.get(index.name()) else {
318                continue;
319            };
320
321            let root_name = index.root_name().unwrap_or(index.name()).to_string();
322            let mut root = search_root_groups
323                .get(&root_name)
324                .cloned()
325                .unwrap_or_else(|| self.read_search_root_group(&root_name));
326
327            for entry in index.materialize_entries(item, &entry_context) {
328                let terms = search_index.parse_keywords(&entry.text);
329                if terms.is_empty() {
330                    continue;
331                }
332                let entry_id = entry.id.as_deref().unwrap_or(&id);
333                let Some(entry_cid) = entry.cid.as_ref().or(Some(cid)) else {
334                    continue;
335                };
336                let default_prefix = default_search_prefix(index.name());
337                root = Some(
338                    search_index
339                        .index_link(
340                            root.as_ref(),
341                            entry
342                                .prefix
343                                .as_deref()
344                                .or_else(|| index.prefix())
345                                .unwrap_or(&default_prefix),
346                            &terms,
347                            entry_id,
348                            entry_cid,
349                        )
350                        .await?,
351                );
352            }
353
354            search_root_groups.insert(root_name, root);
355        }
356
357        if !search_root_groups.is_empty() {
358            self.assign_search_root_groups(&search_root_groups);
359        }
360
361        Ok(self.snapshot())
362    }
363
364    async fn delete_normalized_with_context(
365        &mut self,
366        item: &T,
367        context: Option<&CollectionWriteContext>,
368    ) -> Result<CollectionState, CollectionError> {
369        let id = self.definition.item_id(item)?;
370        if let Some(root) = self.state.by_id_root.as_ref() {
371            self.state.by_id_root = self.index.delete(root, &id).await?;
372        }
373
374        for index in self.definition.key_indexes() {
375            let mut root = self.state.key_root(index.name()).cloned();
376            for key in index.materialize_keys(item) {
377                let Some(active_root) = root.as_ref() else {
378                    break;
379                };
380                root = self.index.delete(active_root, &key).await?;
381            }
382            self.state.key_roots.insert(index.name().to_string(), root);
383        }
384
385        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
386        let entry_context = CollectionEntryContext {
387            id: &id,
388            cid: None,
389            write_context: context,
390        };
391        for index in self.definition.search_indexes() {
392            let Some(search_index) = self.search_indexes.get(index.name()) else {
393                continue;
394            };
395
396            let root_name = index.root_name().unwrap_or(index.name()).to_string();
397            let mut root = search_root_groups
398                .get(&root_name)
399                .cloned()
400                .unwrap_or_else(|| self.read_search_root_group(&root_name));
401            let Some(existing_root) = root.clone() else {
402                continue;
403            };
404            root = Some(existing_root);
405
406            for entry in index.materialize_entries(item, &entry_context) {
407                let terms = search_index.parse_keywords(&entry.text);
408                if terms.is_empty() {
409                    continue;
410                }
411                let entry_id = entry.id.as_deref().unwrap_or(&id);
412                let Some(active_root) = root.as_ref() else {
413                    break;
414                };
415                let default_prefix = default_search_prefix(index.name());
416                root = search_index
417                    .remove_link(
418                        active_root,
419                        entry
420                            .prefix
421                            .as_deref()
422                            .or_else(|| index.prefix())
423                            .unwrap_or(&default_prefix),
424                        &terms,
425                        entry_id,
426                    )
427                    .await?;
428            }
429
430            search_root_groups.insert(root_name, root);
431        }
432
433        if !search_root_groups.is_empty() {
434            self.assign_search_root_groups(&search_root_groups);
435        } else {
436            for index in self.definition.search_indexes() {
437                let root_name = index.root_name().unwrap_or(index.name()).to_string();
438                self.state.search_roots.insert(
439                    index.name().to_string(),
440                    self.read_search_root_group(&root_name),
441                );
442            }
443        }
444
445        Ok(self.snapshot())
446    }
447
448    async fn rebuild_without_search(
449        &mut self,
450        final_entries: BTreeMap<String, (T, Cid, Option<CollectionWriteContext>)>,
451    ) -> Result<CollectionState, CollectionError> {
452        let mut by_id = BTreeMap::<String, Cid>::new();
453        let mut key_roots = self
454            .definition
455            .key_indexes()
456            .iter()
457            .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
458            .collect::<BTreeMap<_, _>>();
459
460        for (id, (item, cid, _context)) in final_entries {
461            by_id.insert(id, cid.clone());
462            for index in self.definition.key_indexes() {
463                let root = key_roots
464                    .get_mut(index.name())
465                    .expect("collection key root must exist");
466                for key in index.materialize_keys(&item) {
467                    root.insert(key, cid.clone());
468                }
469            }
470        }
471
472        self.state = create_empty_collection_state(&self.definition);
473        self.state.by_id_root = self.index.build_links(by_id).await?;
474        for index in self.definition.key_indexes() {
475            let root = self
476                .index
477                .build_links(key_roots.remove(index.name()).unwrap_or_default())
478                .await?;
479            self.state.key_roots.insert(index.name().to_string(), root);
480        }
481
482        Ok(self.snapshot())
483    }
484
485    async fn rebuild_from_final_entries(
486        &mut self,
487        final_entries: BTreeMap<String, (T, Cid, Option<CollectionWriteContext>)>,
488    ) -> Result<CollectionState, CollectionError> {
489        if self.definition.search_indexes().is_empty() {
490            return self.rebuild_without_search(final_entries).await;
491        }
492
493        let mut by_id = BTreeMap::<String, Cid>::new();
494        let mut key_roots = self
495            .definition
496            .key_indexes()
497            .iter()
498            .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
499            .collect::<BTreeMap<_, _>>();
500        let mut search_root_entries = BTreeMap::<String, BTreeMap<String, Cid>>::new();
501
502        for (id, (item, cid, context)) in final_entries {
503            by_id.insert(id.clone(), cid.clone());
504
505            for index in self.definition.key_indexes() {
506                let root = key_roots
507                    .get_mut(index.name())
508                    .expect("collection key root must exist");
509                for key in index.materialize_keys(&item) {
510                    root.insert(key, cid.clone());
511                }
512            }
513
514            let entry_context = CollectionEntryContext {
515                id: &id,
516                cid: Some(&cid),
517                write_context: context.as_ref(),
518            };
519
520            for index in self.definition.search_indexes() {
521                let Some(search_index) = self.search_indexes.get(index.name()) else {
522                    continue;
523                };
524
525                let root_name = index.root_name().unwrap_or(index.name()).to_string();
526                let root_entries = search_root_entries.entry(root_name).or_default();
527
528                for entry in index.materialize_entries(&item, &entry_context) {
529                    let terms = search_index.parse_keywords(&entry.text);
530                    if terms.is_empty() {
531                        continue;
532                    }
533
534                    let entry_id = entry.id.as_deref().unwrap_or(&id);
535                    let Some(entry_cid) = entry.cid.as_ref().or(Some(&cid)) else {
536                        continue;
537                    };
538                    let default_prefix = default_search_prefix(index.name());
539                    let prefix = entry
540                        .prefix
541                        .as_deref()
542                        .or_else(|| index.prefix())
543                        .unwrap_or(&default_prefix);
544
545                    for term in terms {
546                        root_entries
547                            .insert(format!("{prefix}{term}:{entry_id}"), entry_cid.clone());
548                    }
549                }
550            }
551        }
552
553        self.state = create_empty_collection_state(&self.definition);
554        self.state.by_id_root = self.index.build_links(by_id).await?;
555        for index in self.definition.key_indexes() {
556            let root = self
557                .index
558                .build_links(key_roots.remove(index.name()).unwrap_or_default())
559                .await?;
560            self.state.key_roots.insert(index.name().to_string(), root);
561        }
562
563        let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
564        for (root_name, root_entries) in search_root_entries {
565            let Some(search_index) = self.definition.search_indexes().iter().find_map(|index| {
566                ((index.root_name().unwrap_or(index.name())) == root_name)
567                    .then(|| self.search_indexes.get(index.name()))
568                    .flatten()
569            }) else {
570                continue;
571            };
572
573            search_root_groups.insert(root_name, search_index.build_links(root_entries).await?);
574        }
575
576        if !search_root_groups.is_empty() {
577            self.assign_search_root_groups(&search_root_groups);
578        }
579
580        Ok(self.snapshot())
581    }
582}