1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig, LinkType, Store};
5use hashtree_index::{BTree, BTreeOptions, SearchIndex};
6
7use crate::helpers::merge_search_index_options;
8use crate::manifest::write_collection_manifest_metadata;
9use crate::schema::normalize_typed_collection_item;
10use crate::{
11 create_empty_collection_state, default_search_prefix, load_collection_state,
12 CollectionDefinition, CollectionEntryContext, CollectionError, CollectionOptions,
13 CollectionState, CollectionWriteContext, COLLECTION_MANIFEST_METADATA_FILE, MANIFEST_BY_ID,
14};
15
16pub struct CollectionWriter<S: Store, T> {
17 tree: HashTree<S>,
18 index: BTree<S>,
19 search_indexes: BTreeMap<String, SearchIndex<S>>,
20 definition: CollectionDefinition<T>,
21 state: CollectionState,
22}
23
24impl<S: Store, T> CollectionWriter<S, T> {
25 pub fn new(store: Arc<S>, definition: CollectionDefinition<T>) -> Self {
26 Self::with_options(store, definition, CollectionOptions::default())
27 }
28
29 pub fn with_options(
30 store: Arc<S>,
31 definition: CollectionDefinition<T>,
32 options: CollectionOptions,
33 ) -> Self {
34 let state = create_empty_collection_state(&definition);
35 Self::with_state_and_options(store, definition, state, options)
36 }
37
38 pub fn with_state(
39 store: Arc<S>,
40 definition: CollectionDefinition<T>,
41 state: CollectionState,
42 ) -> Self {
43 Self::with_state_and_options(store, definition, state, CollectionOptions::default())
44 }
45
46 pub fn with_state_and_options(
47 store: Arc<S>,
48 definition: CollectionDefinition<T>,
49 state: CollectionState,
50 options: CollectionOptions,
51 ) -> Self {
52 let search_indexes = definition
53 .search_indexes()
54 .iter()
55 .map(|index| {
56 (
57 index.name().to_string(),
58 SearchIndex::new(
59 Arc::clone(&store),
60 merge_search_index_options(index.options().clone(), &options),
61 ),
62 )
63 })
64 .collect();
65
66 Self {
67 tree: HashTree::new(HashTreeConfig::new(Arc::clone(&store))),
68 index: BTree::new(
69 store,
70 BTreeOptions {
71 order: options.btree_order,
72 },
73 ),
74 search_indexes,
75 definition,
76 state,
77 }
78 }
79
80 pub async fn from_root(
81 store: Arc<S>,
82 definition: CollectionDefinition<T>,
83 root: Option<&Cid>,
84 options: CollectionOptions,
85 ) -> Result<Self, CollectionError> {
86 let state = load_collection_state(Arc::clone(&store), &definition, root).await?;
87 Ok(Self::with_state_and_options(
88 store, definition, state, options,
89 ))
90 }
91
92 pub fn snapshot(&self) -> CollectionState {
93 self.state.clone()
94 }
95
96 pub fn state(&self) -> &CollectionState {
97 &self.state
98 }
99
100 pub async fn write_root(&self) -> Result<Option<Cid>, CollectionError> {
101 let mut entries = Vec::new();
102 if let Some(cid) = self.state.by_id_root.as_ref() {
103 entries.push(DirEntry::from_cid(MANIFEST_BY_ID, cid).with_link_type(LinkType::Dir));
104 }
105 for index in self.definition.key_indexes() {
106 if let Some(cid) = self.state.key_root(index.name()) {
107 entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
108 }
109 }
110 for index in self.definition.search_indexes() {
111 if let Some(cid) = self.state.search_root(index.name()) {
112 entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
113 }
114 }
115 if let Some((cid, size)) =
116 write_collection_manifest_metadata(&self.tree, &self.definition).await?
117 {
118 entries.push(
119 DirEntry::from_cid(COLLECTION_MANIFEST_METADATA_FILE, &cid)
120 .with_size(size)
121 .with_link_type(LinkType::File),
122 );
123 }
124
125 if entries.is_empty() {
126 return Ok(None);
127 }
128
129 Ok(Some(self.tree.put_directory(entries).await?))
130 }
131
132 fn read_search_root_group(&self, root_name: &str) -> Option<Cid> {
133 for index in self.definition.search_indexes() {
134 if index.root_name().unwrap_or(index.name()) == root_name {
135 return self.state.search_root(index.name()).cloned();
136 }
137 }
138 None
139 }
140
141 fn assign_search_root_groups(&mut self, groups: &BTreeMap<String, Option<Cid>>) {
142 for index in self.definition.search_indexes() {
143 let root_name = index.root_name().unwrap_or(index.name());
144 if let Some(root) = groups.get(root_name) {
145 self.state
146 .search_roots
147 .insert(index.name().to_string(), root.clone());
148 }
149 }
150 }
151
152 fn has_derived_indexes(&self) -> bool {
153 !self.definition.key_indexes().is_empty() || !self.definition.search_indexes().is_empty()
154 }
155}
156
157impl<S: Store, T: Clone> CollectionWriter<S, T> {
158 pub fn normalize(&self, item: &T) -> Result<T, CollectionError> {
159 normalize_typed_collection_item(&self.definition, item)
160 }
161
162 pub async fn put(
163 &mut self,
164 item: &T,
165 cid: &Cid,
166 previous: Option<&T>,
167 ) -> Result<CollectionState, CollectionError> {
168 self.put_with_context(item, cid, previous, None, None).await
169 }
170
171 pub async fn replace(
172 &mut self,
173 item: &T,
174 cid: &Cid,
175 previous: &T,
176 ) -> Result<CollectionState, CollectionError> {
177 self.replace_with_context(item, cid, previous, None, None).await
178 }
179
180 pub async fn put_with_context(
181 &mut self,
182 item: &T,
183 cid: &Cid,
184 previous: Option<&T>,
185 context: Option<&CollectionWriteContext>,
186 previous_context: Option<&CollectionWriteContext>,
187 ) -> Result<CollectionState, CollectionError> {
188 let next_item = self.normalize(item)?;
189 let id = self.definition.item_id(&next_item)?;
190 let previous_item = previous
191 .map(|previous| self.normalize(previous))
192 .transpose()?;
193
194 if previous_item.is_none() && self.has_derived_indexes() {
195 let existing = if let Some(root) = self.state.by_id_root.as_ref() {
196 self.index.get_link(Some(root), &id).await?
197 } else {
198 None
199 };
200 if existing.is_some() {
201 return Err(CollectionError::MissingPreviousForOverwrite { id });
202 }
203 }
204
205 if let Some(previous_item) = previous_item.as_ref() {
206 self.delete_normalized_with_context(previous_item, previous_context.or(context))
207 .await?;
208 }
209
210 self.put_normalized_with_context(&next_item, cid, context)
211 .await
212 }
213
214 pub async fn replace_with_context(
215 &mut self,
216 item: &T,
217 cid: &Cid,
218 previous: &T,
219 context: Option<&CollectionWriteContext>,
220 previous_context: Option<&CollectionWriteContext>,
221 ) -> Result<CollectionState, CollectionError> {
222 self.put_with_context(item, cid, Some(previous), context, previous_context)
223 .await
224 }
225
226 pub async fn delete(&mut self, item: &T) -> Result<CollectionState, CollectionError> {
227 self.delete_with_context(item, None).await
228 }
229
230 pub async fn delete_with_context(
231 &mut self,
232 item: &T,
233 context: Option<&CollectionWriteContext>,
234 ) -> Result<CollectionState, CollectionError> {
235 let next_item = self.normalize(item)?;
236 self.delete_normalized_with_context(&next_item, context)
237 .await
238 }
239
240 pub async fn rebuild<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
241 where
242 I: IntoIterator<Item = (T, Cid)>,
243 {
244 let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
245 for (item, cid) in entries {
246 let next_item = self.normalize(&item)?;
247 let id = self.definition.item_id(&next_item)?;
248 final_entries.insert(id, (next_item, cid, None));
249 }
250
251 self.rebuild_from_final_entries(final_entries).await
252 }
253
254 pub async fn reindex<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
255 where
256 I: IntoIterator<Item = (T, Cid)>,
257 {
258 self.rebuild(entries).await
259 }
260
261 pub async fn rebuild_with_context<I>(
262 &mut self,
263 entries: I,
264 ) -> Result<CollectionState, CollectionError>
265 where
266 I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
267 {
268 let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
269 for (item, cid, context) in entries {
270 let next_item = self.normalize(&item)?;
271 let id = self.definition.item_id(&next_item)?;
272 final_entries.insert(id, (next_item, cid, context));
273 }
274
275 self.rebuild_from_final_entries(final_entries).await
276 }
277
278 pub async fn reindex_with_context<I>(
279 &mut self,
280 entries: I,
281 ) -> Result<CollectionState, CollectionError>
282 where
283 I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
284 {
285 self.rebuild_with_context(entries).await
286 }
287
288 async fn put_normalized_with_context(
289 &mut self,
290 item: &T,
291 cid: &Cid,
292 context: Option<&CollectionWriteContext>,
293 ) -> Result<CollectionState, CollectionError> {
294 let id = self.definition.item_id(item)?;
295 self.state.by_id_root = Some(
296 self.index
297 .insert_link(self.state.by_id_root.as_ref(), &id, cid)
298 .await?,
299 );
300
301 for index in self.definition.key_indexes() {
302 let mut root = self.state.key_root(index.name()).cloned();
303 for key in index.materialize_keys(item) {
304 root = Some(self.index.insert_link(root.as_ref(), &key, cid).await?);
305 }
306 self.state.key_roots.insert(index.name().to_string(), root);
307 }
308
309 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
310 let entry_context = CollectionEntryContext {
311 id: &id,
312 cid: Some(cid),
313 write_context: context,
314 };
315 for index in self.definition.search_indexes() {
316 let Some(search_index) = self.search_indexes.get(index.name()) else {
317 continue;
318 };
319
320 let root_name = index.root_name().unwrap_or(index.name()).to_string();
321 let mut root = search_root_groups
322 .get(&root_name)
323 .cloned()
324 .unwrap_or_else(|| self.read_search_root_group(&root_name));
325
326 for entry in index.materialize_entries(item, &entry_context) {
327 let terms = search_index.parse_keywords(&entry.text);
328 if terms.is_empty() {
329 continue;
330 }
331 let entry_id = entry.id.as_deref().unwrap_or(&id);
332 let Some(entry_cid) = entry.cid.as_ref().or(Some(cid)) else {
333 continue;
334 };
335 let default_prefix = default_search_prefix(index.name());
336 root = Some(
337 search_index
338 .index_link(
339 root.as_ref(),
340 entry
341 .prefix
342 .as_deref()
343 .or_else(|| index.prefix())
344 .unwrap_or(&default_prefix),
345 &terms,
346 entry_id,
347 entry_cid,
348 )
349 .await?,
350 );
351 }
352
353 search_root_groups.insert(root_name, root);
354 }
355
356 if !search_root_groups.is_empty() {
357 self.assign_search_root_groups(&search_root_groups);
358 }
359
360 Ok(self.snapshot())
361 }
362
363 async fn delete_normalized_with_context(
364 &mut self,
365 item: &T,
366 context: Option<&CollectionWriteContext>,
367 ) -> Result<CollectionState, CollectionError> {
368 let id = self.definition.item_id(item)?;
369 if let Some(root) = self.state.by_id_root.as_ref() {
370 self.state.by_id_root = self.index.delete(root, &id).await?;
371 }
372
373 for index in self.definition.key_indexes() {
374 let mut root = self.state.key_root(index.name()).cloned();
375 for key in index.materialize_keys(item) {
376 let Some(active_root) = root.as_ref() else {
377 break;
378 };
379 root = self.index.delete(active_root, &key).await?;
380 }
381 self.state.key_roots.insert(index.name().to_string(), root);
382 }
383
384 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
385 let entry_context = CollectionEntryContext {
386 id: &id,
387 cid: None,
388 write_context: context,
389 };
390 for index in self.definition.search_indexes() {
391 let Some(search_index) = self.search_indexes.get(index.name()) else {
392 continue;
393 };
394
395 let root_name = index.root_name().unwrap_or(index.name()).to_string();
396 let mut root = search_root_groups
397 .get(&root_name)
398 .cloned()
399 .unwrap_or_else(|| self.read_search_root_group(&root_name));
400 let Some(existing_root) = root.clone() else {
401 continue;
402 };
403 root = Some(existing_root);
404
405 for entry in index.materialize_entries(item, &entry_context) {
406 let terms = search_index.parse_keywords(&entry.text);
407 if terms.is_empty() {
408 continue;
409 }
410 let entry_id = entry.id.as_deref().unwrap_or(&id);
411 let Some(active_root) = root.as_ref() else {
412 break;
413 };
414 let default_prefix = default_search_prefix(index.name());
415 root = search_index
416 .remove_link(
417 active_root,
418 entry
419 .prefix
420 .as_deref()
421 .or_else(|| index.prefix())
422 .unwrap_or(&default_prefix),
423 &terms,
424 entry_id,
425 )
426 .await?;
427 }
428
429 search_root_groups.insert(root_name, root);
430 }
431
432 if !search_root_groups.is_empty() {
433 self.assign_search_root_groups(&search_root_groups);
434 } else {
435 for index in self.definition.search_indexes() {
436 let root_name = index.root_name().unwrap_or(index.name()).to_string();
437 self.state.search_roots.insert(
438 index.name().to_string(),
439 self.read_search_root_group(&root_name),
440 );
441 }
442 }
443
444 Ok(self.snapshot())
445 }
446
447 async fn rebuild_without_search(
448 &mut self,
449 final_entries: BTreeMap<String, (T, Cid, Option<CollectionWriteContext>)>,
450 ) -> Result<CollectionState, CollectionError> {
451 let mut by_id = BTreeMap::<String, Cid>::new();
452 let mut key_roots = self
453 .definition
454 .key_indexes()
455 .iter()
456 .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
457 .collect::<BTreeMap<_, _>>();
458
459 for (id, (item, cid, _context)) in final_entries {
460 by_id.insert(id, cid.clone());
461 for index in self.definition.key_indexes() {
462 let root = key_roots
463 .get_mut(index.name())
464 .expect("collection key root must exist");
465 for key in index.materialize_keys(&item) {
466 root.insert(key, cid.clone());
467 }
468 }
469 }
470
471 self.state = create_empty_collection_state(&self.definition);
472 self.state.by_id_root = self.index.build_links(by_id).await?;
473 for index in self.definition.key_indexes() {
474 let root = self
475 .index
476 .build_links(key_roots.remove(index.name()).unwrap_or_default())
477 .await?;
478 self.state.key_roots.insert(index.name().to_string(), root);
479 }
480
481 Ok(self.snapshot())
482 }
483
484 async fn rebuild_from_final_entries(
485 &mut self,
486 final_entries: BTreeMap<String, (T, Cid, Option<CollectionWriteContext>)>,
487 ) -> Result<CollectionState, CollectionError> {
488 if self.definition.search_indexes().is_empty() {
489 return self.rebuild_without_search(final_entries).await;
490 }
491
492 let mut by_id = BTreeMap::<String, Cid>::new();
493 let mut key_roots = self
494 .definition
495 .key_indexes()
496 .iter()
497 .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
498 .collect::<BTreeMap<_, _>>();
499 let mut search_root_entries = BTreeMap::<String, BTreeMap<String, Cid>>::new();
500
501 for (id, (item, cid, context)) in final_entries {
502 by_id.insert(id.clone(), cid.clone());
503
504 for index in self.definition.key_indexes() {
505 let root = key_roots
506 .get_mut(index.name())
507 .expect("collection key root must exist");
508 for key in index.materialize_keys(&item) {
509 root.insert(key, cid.clone());
510 }
511 }
512
513 let entry_context = CollectionEntryContext {
514 id: &id,
515 cid: Some(&cid),
516 write_context: context.as_ref(),
517 };
518
519 for index in self.definition.search_indexes() {
520 let Some(search_index) = self.search_indexes.get(index.name()) else {
521 continue;
522 };
523
524 let root_name = index.root_name().unwrap_or(index.name()).to_string();
525 let root_entries = search_root_entries.entry(root_name).or_default();
526
527 for entry in index.materialize_entries(&item, &entry_context) {
528 let terms = search_index.parse_keywords(&entry.text);
529 if terms.is_empty() {
530 continue;
531 }
532
533 let entry_id = entry.id.as_deref().unwrap_or(&id);
534 let Some(entry_cid) = entry.cid.as_ref().or(Some(&cid)) else {
535 continue;
536 };
537 let default_prefix = default_search_prefix(index.name());
538 let prefix = entry
539 .prefix
540 .as_deref()
541 .or_else(|| index.prefix())
542 .unwrap_or(&default_prefix);
543
544 for term in terms {
545 root_entries
546 .insert(format!("{prefix}{term}:{entry_id}"), entry_cid.clone());
547 }
548 }
549 }
550 }
551
552 self.state = create_empty_collection_state(&self.definition);
553 self.state.by_id_root = self.index.build_links(by_id).await?;
554 for index in self.definition.key_indexes() {
555 let root = self
556 .index
557 .build_links(key_roots.remove(index.name()).unwrap_or_default())
558 .await?;
559 self.state.key_roots.insert(index.name().to_string(), root);
560 }
561
562 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
563 for (root_name, root_entries) in search_root_entries {
564 let Some(search_index) = self.definition.search_indexes().iter().find_map(|index| {
565 ((index.root_name().unwrap_or(index.name())) == root_name)
566 .then(|| self.search_indexes.get(index.name()))
567 .flatten()
568 }) else {
569 continue;
570 };
571
572 search_root_groups.insert(root_name, search_index.build_links(root_entries).await?);
573 }
574
575 if !search_root_groups.is_empty() {
576 self.assign_search_root_groups(&search_root_groups);
577 }
578
579 Ok(self.snapshot())
580 }
581}