1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig, LinkType, Store};
5use hashtree_index::{BTree, BTreeOptions, SearchIndex};
6
7use crate::helpers::merge_search_index_options;
8use crate::manifest::write_collection_manifest_metadata;
9use crate::schema::normalize_typed_collection_item;
10use crate::{
11 create_empty_collection_state, default_search_prefix, load_collection_state,
12 CollectionDefinition, CollectionEntryContext, CollectionError, CollectionOptions,
13 CollectionState, CollectionWriteContext, COLLECTION_MANIFEST_METADATA_FILE, MANIFEST_BY_ID,
14};
15
/// Writes and maintains the index roots of a typed collection.
///
/// The writer keeps a by-id index and the key indexes (all written through one
/// [`BTree`]), one [`SearchIndex`] per search-index definition, and the current roots of
/// all of them in a [`CollectionState`].
pub struct CollectionWriter<S: Store, T> {
    /// Used to write the collection root directory and its manifest metadata file.
    tree: HashTree<S>,
    /// B-tree used for the by-id index and for every key index.
    index: BTree<S>,
    /// Search index instances, keyed by search-index name.
    search_indexes: BTreeMap<String, SearchIndex<S>>,
    /// Collection definition: normalization, item ids, and the key/search index layout.
    definition: CollectionDefinition<T>,
    /// Current roots of the by-id, key and search indexes.
    state: CollectionState,
}
23
24impl<S: Store, T> CollectionWriter<S, T> {
25 pub fn new(store: Arc<S>, definition: CollectionDefinition<T>) -> Self {
26 Self::with_options(store, definition, CollectionOptions::default())
27 }
28
29 pub fn with_options(
30 store: Arc<S>,
31 definition: CollectionDefinition<T>,
32 options: CollectionOptions,
33 ) -> Self {
34 let state = create_empty_collection_state(&definition);
35 Self::with_state_and_options(store, definition, state, options)
36 }
37
38 pub fn with_state(
39 store: Arc<S>,
40 definition: CollectionDefinition<T>,
41 state: CollectionState,
42 ) -> Self {
43 Self::with_state_and_options(store, definition, state, CollectionOptions::default())
44 }
45
46 pub fn with_state_and_options(
47 store: Arc<S>,
48 definition: CollectionDefinition<T>,
49 state: CollectionState,
50 options: CollectionOptions,
51 ) -> Self {
52 let search_indexes = definition
53 .search_indexes()
54 .iter()
55 .map(|index| {
56 (
57 index.name().to_string(),
58 SearchIndex::new(
59 Arc::clone(&store),
60 merge_search_index_options(index.options().clone(), &options),
61 ),
62 )
63 })
64 .collect();
65
66 Self {
67 tree: HashTree::new(HashTreeConfig::new(Arc::clone(&store))),
68 index: BTree::new(
69 store,
70 BTreeOptions {
71 order: options.btree_order,
72 },
73 ),
74 search_indexes,
75 definition,
76 state,
77 }
78 }
79
80 pub async fn from_root(
81 store: Arc<S>,
82 definition: CollectionDefinition<T>,
83 root: Option<&Cid>,
84 options: CollectionOptions,
85 ) -> Result<Self, CollectionError> {
86 let state = load_collection_state(Arc::clone(&store), &definition, root).await?;
87 Ok(Self::with_state_and_options(
88 store, definition, state, options,
89 ))
90 }
91
92 pub fn snapshot(&self) -> CollectionState {
93 self.state.clone()
94 }
95
96 pub fn state(&self) -> &CollectionState {
97 &self.state
98 }
99
100 pub async fn write_root(&self) -> Result<Option<Cid>, CollectionError> {
101 let mut entries = Vec::new();
102 if let Some(cid) = self.state.by_id_root.as_ref() {
103 entries.push(DirEntry::from_cid(MANIFEST_BY_ID, cid).with_link_type(LinkType::Dir));
104 }
105 for index in self.definition.key_indexes() {
106 if let Some(cid) = self.state.key_root(index.name()) {
107 entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
108 }
109 }
110 for index in self.definition.search_indexes() {
111 if let Some(cid) = self.state.search_root(index.name()) {
112 entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
113 }
114 }
115 if let Some((cid, size)) =
116 write_collection_manifest_metadata(&self.tree, &self.definition).await?
117 {
118 entries.push(
119 DirEntry::from_cid(COLLECTION_MANIFEST_METADATA_FILE, &cid)
120 .with_size(size)
121 .with_link_type(LinkType::File),
122 );
123 }
124
125 if entries.is_empty() {
126 return Ok(None);
127 }
128
129 Ok(Some(self.tree.put_directory(entries).await?))
130 }
131
132 fn read_search_root_group(&self, root_name: &str) -> Option<Cid> {
133 for index in self.definition.search_indexes() {
134 if index.root_name().unwrap_or(index.name()) == root_name {
135 return self.state.search_root(index.name()).cloned();
136 }
137 }
138 None
139 }
140
141 fn assign_search_root_groups(&mut self, groups: &BTreeMap<String, Option<Cid>>) {
142 for index in self.definition.search_indexes() {
143 let root_name = index.root_name().unwrap_or(index.name());
144 if let Some(root) = groups.get(root_name) {
145 self.state
146 .search_roots
147 .insert(index.name().to_string(), root.clone());
148 }
149 }
150 }
151
152 fn has_derived_indexes(&self) -> bool {
153 !self.definition.key_indexes().is_empty() || !self.definition.search_indexes().is_empty()
154 }
155}
156
157impl<S: Store, T: Clone> CollectionWriter<S, T> {
158 pub fn normalize(&self, item: &T) -> Result<T, CollectionError> {
159 normalize_typed_collection_item(&self.definition, item)
160 }
161
162 pub async fn put(
163 &mut self,
164 item: &T,
165 cid: &Cid,
166 previous: Option<&T>,
167 ) -> Result<CollectionState, CollectionError> {
168 self.put_with_context(item, cid, previous, None, None).await
169 }
170
171 pub async fn replace(
172 &mut self,
173 item: &T,
174 cid: &Cid,
175 previous: &T,
176 ) -> Result<CollectionState, CollectionError> {
177 self.replace_with_context(item, cid, previous, None, None)
178 .await
179 }
180
181 pub async fn put_with_context(
182 &mut self,
183 item: &T,
184 cid: &Cid,
185 previous: Option<&T>,
186 context: Option<&CollectionWriteContext>,
187 previous_context: Option<&CollectionWriteContext>,
188 ) -> Result<CollectionState, CollectionError> {
189 let next_item = self.normalize(item)?;
190 let id = self.definition.item_id(&next_item)?;
191 let previous_item = previous
192 .map(|previous| self.normalize(previous))
193 .transpose()?;
194
195 if previous_item.is_none() && self.has_derived_indexes() {
196 let existing = if let Some(root) = self.state.by_id_root.as_ref() {
197 self.index.get_link(Some(root), &id).await?
198 } else {
199 None
200 };
201 if existing.is_some() {
202 return Err(CollectionError::MissingPreviousForOverwrite { id });
203 }
204 }
205
206 if let Some(previous_item) = previous_item.as_ref() {
207 self.delete_normalized_with_context(previous_item, previous_context.or(context))
208 .await?;
209 }
210
211 self.put_normalized_with_context(&next_item, cid, context)
212 .await
213 }
214
215 pub async fn replace_with_context(
216 &mut self,
217 item: &T,
218 cid: &Cid,
219 previous: &T,
220 context: Option<&CollectionWriteContext>,
221 previous_context: Option<&CollectionWriteContext>,
222 ) -> Result<CollectionState, CollectionError> {
223 self.put_with_context(item, cid, Some(previous), context, previous_context)
224 .await
225 }
226
227 pub async fn delete(&mut self, item: &T) -> Result<CollectionState, CollectionError> {
228 self.delete_with_context(item, None).await
229 }
230
231 pub async fn delete_with_context(
232 &mut self,
233 item: &T,
234 context: Option<&CollectionWriteContext>,
235 ) -> Result<CollectionState, CollectionError> {
236 let next_item = self.normalize(item)?;
237 self.delete_normalized_with_context(&next_item, context)
238 .await
239 }
240
241 pub async fn rebuild<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
242 where
243 I: IntoIterator<Item = (T, Cid)>,
244 {
245 let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
246 for (item, cid) in entries {
247 let next_item = self.normalize(&item)?;
248 let id = self.definition.item_id(&next_item)?;
249 final_entries.insert(id, (next_item, cid, None));
250 }
251
252 self.rebuild_from_final_entries(final_entries).await
253 }
254
255 pub async fn reindex<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
256 where
257 I: IntoIterator<Item = (T, Cid)>,
258 {
259 self.rebuild(entries).await
260 }
261
262 pub async fn rebuild_with_context<I>(
263 &mut self,
264 entries: I,
265 ) -> Result<CollectionState, CollectionError>
266 where
267 I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
268 {
269 let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
270 for (item, cid, context) in entries {
271 let next_item = self.normalize(&item)?;
272 let id = self.definition.item_id(&next_item)?;
273 final_entries.insert(id, (next_item, cid, context));
274 }
275
276 self.rebuild_from_final_entries(final_entries).await
277 }
278
279 pub async fn reindex_with_context<I>(
280 &mut self,
281 entries: I,
282 ) -> Result<CollectionState, CollectionError>
283 where
284 I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
285 {
286 self.rebuild_with_context(entries).await
287 }
288
289 async fn put_normalized_with_context(
290 &mut self,
291 item: &T,
292 cid: &Cid,
293 context: Option<&CollectionWriteContext>,
294 ) -> Result<CollectionState, CollectionError> {
295 let id = self.definition.item_id(item)?;
296 self.state.by_id_root = Some(
297 self.index
298 .insert_link(self.state.by_id_root.as_ref(), &id, cid)
299 .await?,
300 );
301
302 for index in self.definition.key_indexes() {
303 let mut root = self.state.key_root(index.name()).cloned();
304 for key in index.materialize_keys(item) {
305 root = Some(self.index.insert_link(root.as_ref(), &key, cid).await?);
306 }
307 self.state.key_roots.insert(index.name().to_string(), root);
308 }
309
310 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
311 let entry_context = CollectionEntryContext {
312 id: &id,
313 cid: Some(cid),
314 write_context: context,
315 };
316 for index in self.definition.search_indexes() {
317 let Some(search_index) = self.search_indexes.get(index.name()) else {
318 continue;
319 };
320
321 let root_name = index.root_name().unwrap_or(index.name()).to_string();
322 let mut root = search_root_groups
323 .get(&root_name)
324 .cloned()
325 .unwrap_or_else(|| self.read_search_root_group(&root_name));
326
327 for entry in index.materialize_entries(item, &entry_context) {
328 let terms = search_index.parse_keywords(&entry.text);
329 if terms.is_empty() {
330 continue;
331 }
332 let entry_id = entry.id.as_deref().unwrap_or(&id);
333 let Some(entry_cid) = entry.cid.as_ref().or(Some(cid)) else {
334 continue;
335 };
336 let default_prefix = default_search_prefix(index.name());
337 root = Some(
338 search_index
339 .index_link(
340 root.as_ref(),
341 entry
342 .prefix
343 .as_deref()
344 .or_else(|| index.prefix())
345 .unwrap_or(&default_prefix),
346 &terms,
347 entry_id,
348 entry_cid,
349 )
350 .await?,
351 );
352 }
353
354 search_root_groups.insert(root_name, root);
355 }
356
357 if !search_root_groups.is_empty() {
358 self.assign_search_root_groups(&search_root_groups);
359 }
360
361 Ok(self.snapshot())
362 }
363
364 async fn delete_normalized_with_context(
365 &mut self,
366 item: &T,
367 context: Option<&CollectionWriteContext>,
368 ) -> Result<CollectionState, CollectionError> {
369 let id = self.definition.item_id(item)?;
370 if let Some(root) = self.state.by_id_root.as_ref() {
371 self.state.by_id_root = self.index.delete(root, &id).await?;
372 }
373
374 for index in self.definition.key_indexes() {
375 let mut root = self.state.key_root(index.name()).cloned();
376 for key in index.materialize_keys(item) {
377 let Some(active_root) = root.as_ref() else {
378 break;
379 };
380 root = self.index.delete(active_root, &key).await?;
381 }
382 self.state.key_roots.insert(index.name().to_string(), root);
383 }
384
385 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
386 let entry_context = CollectionEntryContext {
387 id: &id,
388 cid: None,
389 write_context: context,
390 };
391 for index in self.definition.search_indexes() {
392 let Some(search_index) = self.search_indexes.get(index.name()) else {
393 continue;
394 };
395
396 let root_name = index.root_name().unwrap_or(index.name()).to_string();
397 let mut root = search_root_groups
398 .get(&root_name)
399 .cloned()
400 .unwrap_or_else(|| self.read_search_root_group(&root_name));
401 let Some(existing_root) = root.clone() else {
402 continue;
403 };
404 root = Some(existing_root);
405
406 for entry in index.materialize_entries(item, &entry_context) {
407 let terms = search_index.parse_keywords(&entry.text);
408 if terms.is_empty() {
409 continue;
410 }
411 let entry_id = entry.id.as_deref().unwrap_or(&id);
412 let Some(active_root) = root.as_ref() else {
413 break;
414 };
415 let default_prefix = default_search_prefix(index.name());
416 root = search_index
417 .remove_link(
418 active_root,
419 entry
420 .prefix
421 .as_deref()
422 .or_else(|| index.prefix())
423 .unwrap_or(&default_prefix),
424 &terms,
425 entry_id,
426 )
427 .await?;
428 }
429
430 search_root_groups.insert(root_name, root);
431 }
432
433 if !search_root_groups.is_empty() {
434 self.assign_search_root_groups(&search_root_groups);
435 } else {
436 for index in self.definition.search_indexes() {
437 let root_name = index.root_name().unwrap_or(index.name()).to_string();
438 self.state.search_roots.insert(
439 index.name().to_string(),
440 self.read_search_root_group(&root_name),
441 );
442 }
443 }
444
445 Ok(self.snapshot())
446 }
447
448 async fn rebuild_without_search(
449 &mut self,
450 final_entries: BTreeMap<String, (T, Cid, Option<CollectionWriteContext>)>,
451 ) -> Result<CollectionState, CollectionError> {
452 let mut by_id = BTreeMap::<String, Cid>::new();
453 let mut key_roots = self
454 .definition
455 .key_indexes()
456 .iter()
457 .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
458 .collect::<BTreeMap<_, _>>();
459
460 for (id, (item, cid, _context)) in final_entries {
461 by_id.insert(id, cid.clone());
462 for index in self.definition.key_indexes() {
463 let root = key_roots
464 .get_mut(index.name())
465 .expect("collection key root must exist");
466 for key in index.materialize_keys(&item) {
467 root.insert(key, cid.clone());
468 }
469 }
470 }
471
472 self.state = create_empty_collection_state(&self.definition);
473 self.state.by_id_root = self.index.build_links(by_id).await?;
474 for index in self.definition.key_indexes() {
475 let root = self
476 .index
477 .build_links(key_roots.remove(index.name()).unwrap_or_default())
478 .await?;
479 self.state.key_roots.insert(index.name().to_string(), root);
480 }
481
482 Ok(self.snapshot())
483 }
484
485 async fn rebuild_from_final_entries(
486 &mut self,
487 final_entries: BTreeMap<String, (T, Cid, Option<CollectionWriteContext>)>,
488 ) -> Result<CollectionState, CollectionError> {
489 if self.definition.search_indexes().is_empty() {
490 return self.rebuild_without_search(final_entries).await;
491 }
492
493 let mut by_id = BTreeMap::<String, Cid>::new();
494 let mut key_roots = self
495 .definition
496 .key_indexes()
497 .iter()
498 .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
499 .collect::<BTreeMap<_, _>>();
500 let mut search_root_entries = BTreeMap::<String, BTreeMap<String, Cid>>::new();
501
502 for (id, (item, cid, context)) in final_entries {
503 by_id.insert(id.clone(), cid.clone());
504
505 for index in self.definition.key_indexes() {
506 let root = key_roots
507 .get_mut(index.name())
508 .expect("collection key root must exist");
509 for key in index.materialize_keys(&item) {
510 root.insert(key, cid.clone());
511 }
512 }
513
514 let entry_context = CollectionEntryContext {
515 id: &id,
516 cid: Some(&cid),
517 write_context: context.as_ref(),
518 };
519
520 for index in self.definition.search_indexes() {
521 let Some(search_index) = self.search_indexes.get(index.name()) else {
522 continue;
523 };
524
525 let root_name = index.root_name().unwrap_or(index.name()).to_string();
526 let root_entries = search_root_entries.entry(root_name).or_default();
527
528 for entry in index.materialize_entries(&item, &entry_context) {
529 let terms = search_index.parse_keywords(&entry.text);
530 if terms.is_empty() {
531 continue;
532 }
533
534 let entry_id = entry.id.as_deref().unwrap_or(&id);
535 let Some(entry_cid) = entry.cid.as_ref().or(Some(&cid)) else {
536 continue;
537 };
538 let default_prefix = default_search_prefix(index.name());
539 let prefix = entry
540 .prefix
541 .as_deref()
542 .or_else(|| index.prefix())
543 .unwrap_or(&default_prefix);
544
545 for term in terms {
546 root_entries
547 .insert(format!("{prefix}{term}:{entry_id}"), entry_cid.clone());
548 }
549 }
550 }
551 }
552
553 self.state = create_empty_collection_state(&self.definition);
554 self.state.by_id_root = self.index.build_links(by_id).await?;
555 for index in self.definition.key_indexes() {
556 let root = self
557 .index
558 .build_links(key_roots.remove(index.name()).unwrap_or_default())
559 .await?;
560 self.state.key_roots.insert(index.name().to_string(), root);
561 }
562
563 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
564 for (root_name, root_entries) in search_root_entries {
565 let Some(search_index) = self.definition.search_indexes().iter().find_map(|index| {
566 ((index.root_name().unwrap_or(index.name())) == root_name)
567 .then(|| self.search_indexes.get(index.name()))
568 .flatten()
569 }) else {
570 continue;
571 };
572
573 search_root_groups.insert(root_name, search_index.build_links(root_entries).await?);
574 }
575
576 if !search_root_groups.is_empty() {
577 self.assign_search_root_groups(&search_root_groups);
578 }
579
580 Ok(self.snapshot())
581 }
582}