1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig, LinkType, Store};
5use hashtree_index::{BTree, BTreeOptions, SearchIndex};
6
7use crate::helpers::merge_search_index_options;
8use crate::manifest::write_collection_manifest_metadata;
9use crate::schema::normalize_typed_collection_item;
10use crate::{
11 create_empty_collection_state, default_search_prefix, load_collection_state,
12 CollectionDefinition, CollectionEntryContext, CollectionError, CollectionOptions,
13 CollectionState, CollectionWriteContext, COLLECTION_MANIFEST_METADATA_FILE, MANIFEST_BY_ID,
14};
15
/// Incrementally maintains the index roots of one collection.
///
/// The writer keeps the current [`CollectionState`] in memory; `put`/`delete`/`rebuild`
/// style methods update it, and `write_root` publishes the roots as a directory.
pub struct CollectionWriter<S: Store, T> {
    /// Hash tree used by `write_root` to store the manifest metadata and the root directory.
    tree: HashTree<S>,
    /// B-tree used for the by-id index and for every key index.
    index: BTree<S>,
    /// One `SearchIndex` per search index declared by the definition, keyed by index name.
    search_indexes: BTreeMap<String, SearchIndex<S>>,
    /// Definition describing the item id, key indexes and search indexes of the collection.
    definition: CollectionDefinition<T>,
    /// Current roots (`by_id_root`, `key_roots`, `search_roots`) tracked by this writer.
    state: CollectionState,
}
23
24impl<S: Store, T> CollectionWriter<S, T> {
25 pub fn new(store: Arc<S>, definition: CollectionDefinition<T>) -> Self {
26 Self::with_options(store, definition, CollectionOptions::default())
27 }
28
29 pub fn with_options(
30 store: Arc<S>,
31 definition: CollectionDefinition<T>,
32 options: CollectionOptions,
33 ) -> Self {
34 let state = create_empty_collection_state(&definition);
35 Self::with_state_and_options(store, definition, state, options)
36 }
37
38 pub fn with_state(
39 store: Arc<S>,
40 definition: CollectionDefinition<T>,
41 state: CollectionState,
42 ) -> Self {
43 Self::with_state_and_options(store, definition, state, CollectionOptions::default())
44 }
45
46 pub fn with_state_and_options(
47 store: Arc<S>,
48 definition: CollectionDefinition<T>,
49 state: CollectionState,
50 options: CollectionOptions,
51 ) -> Self {
52 let search_indexes = definition
53 .search_indexes()
54 .iter()
55 .map(|index| {
56 (
57 index.name().to_string(),
58 SearchIndex::new(
59 Arc::clone(&store),
60 merge_search_index_options(index.options().clone(), &options),
61 ),
62 )
63 })
64 .collect();
65
66 Self {
67 tree: HashTree::new(HashTreeConfig::new(Arc::clone(&store))),
68 index: BTree::new(
69 store,
70 BTreeOptions {
71 order: options.btree_order,
72 },
73 ),
74 search_indexes,
75 definition,
76 state,
77 }
78 }
79
80 pub async fn from_root(
81 store: Arc<S>,
82 definition: CollectionDefinition<T>,
83 root: Option<&Cid>,
84 options: CollectionOptions,
85 ) -> Result<Self, CollectionError> {
86 let state = load_collection_state(Arc::clone(&store), &definition, root).await?;
87 Ok(Self::with_state_and_options(
88 store, definition, state, options,
89 ))
90 }
91
92 pub fn snapshot(&self) -> CollectionState {
93 self.state.clone()
94 }
95
96 pub fn state(&self) -> &CollectionState {
97 &self.state
98 }
99
100 pub async fn write_root(&self) -> Result<Option<Cid>, CollectionError> {
101 let mut entries = Vec::new();
102 if let Some(cid) = self.state.by_id_root.as_ref() {
103 entries.push(DirEntry::from_cid(MANIFEST_BY_ID, cid).with_link_type(LinkType::Dir));
104 }
105 for index in self.definition.key_indexes() {
106 if let Some(cid) = self.state.key_root(index.name()) {
107 entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
108 }
109 }
110 for index in self.definition.search_indexes() {
111 if let Some(cid) = self.state.search_root(index.name()) {
112 entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
113 }
114 }
115 if let Some((cid, size)) =
116 write_collection_manifest_metadata(&self.tree, &self.definition).await?
117 {
118 entries.push(
119 DirEntry::from_cid(COLLECTION_MANIFEST_METADATA_FILE, &cid)
120 .with_size(size)
121 .with_link_type(LinkType::File),
122 );
123 }
124
125 if entries.is_empty() {
126 return Ok(None);
127 }
128
129 Ok(Some(self.tree.put_directory(entries).await?))
130 }
131
132 fn read_search_root_group(&self, root_name: &str) -> Option<Cid> {
133 for index in self.definition.search_indexes() {
134 if index.root_name().unwrap_or(index.name()) == root_name {
135 return self.state.search_root(index.name()).cloned();
136 }
137 }
138 None
139 }
140
141 fn assign_search_root_groups(&mut self, groups: &BTreeMap<String, Option<Cid>>) {
142 for index in self.definition.search_indexes() {
143 let root_name = index.root_name().unwrap_or(index.name());
144 if let Some(root) = groups.get(root_name) {
145 self.state
146 .search_roots
147 .insert(index.name().to_string(), root.clone());
148 }
149 }
150 }
151}
152
153impl<S: Store, T: Clone> CollectionWriter<S, T> {
154 pub fn normalize(&self, item: &T) -> Result<T, CollectionError> {
155 normalize_typed_collection_item(&self.definition, item)
156 }
157
158 pub async fn put(
159 &mut self,
160 item: &T,
161 cid: &Cid,
162 previous: Option<&T>,
163 ) -> Result<CollectionState, CollectionError> {
164 self.put_with_context(item, cid, previous, None, None).await
165 }
166
167 pub async fn put_with_context(
168 &mut self,
169 item: &T,
170 cid: &Cid,
171 previous: Option<&T>,
172 context: Option<&CollectionWriteContext>,
173 previous_context: Option<&CollectionWriteContext>,
174 ) -> Result<CollectionState, CollectionError> {
175 let next_item = self.normalize(item)?;
176 let previous_item = previous
177 .map(|previous| self.normalize(previous))
178 .transpose()?;
179
180 if let Some(previous_item) = previous_item.as_ref() {
181 self.delete_normalized_with_context(previous_item, previous_context.or(context))
182 .await?;
183 }
184
185 self.put_normalized_with_context(&next_item, cid, context)
186 .await
187 }
188
189 pub async fn delete(&mut self, item: &T) -> Result<CollectionState, CollectionError> {
190 self.delete_with_context(item, None).await
191 }
192
193 pub async fn delete_with_context(
194 &mut self,
195 item: &T,
196 context: Option<&CollectionWriteContext>,
197 ) -> Result<CollectionState, CollectionError> {
198 let next_item = self.normalize(item)?;
199 self.delete_normalized_with_context(&next_item, context)
200 .await
201 }
202
203 pub async fn rebuild<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
204 where
205 I: IntoIterator<Item = (T, Cid)>,
206 {
207 let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
208 for (item, cid) in entries {
209 let next_item = self.normalize(&item)?;
210 let id = self.definition.item_id(&next_item)?;
211 final_entries.insert(id, (next_item, cid, None));
212 }
213
214 self.rebuild_from_final_entries(final_entries).await
215 }
216
217 pub async fn reindex<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
218 where
219 I: IntoIterator<Item = (T, Cid)>,
220 {
221 self.rebuild(entries).await
222 }
223
224 pub async fn rebuild_with_context<I>(
225 &mut self,
226 entries: I,
227 ) -> Result<CollectionState, CollectionError>
228 where
229 I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
230 {
231 let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
232 for (item, cid, context) in entries {
233 let next_item = self.normalize(&item)?;
234 let id = self.definition.item_id(&next_item)?;
235 final_entries.insert(id, (next_item, cid, context));
236 }
237
238 self.rebuild_from_final_entries(final_entries).await
239 }
240
241 pub async fn reindex_with_context<I>(
242 &mut self,
243 entries: I,
244 ) -> Result<CollectionState, CollectionError>
245 where
246 I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
247 {
248 self.rebuild_with_context(entries).await
249 }
250
251 async fn put_normalized_with_context(
252 &mut self,
253 item: &T,
254 cid: &Cid,
255 context: Option<&CollectionWriteContext>,
256 ) -> Result<CollectionState, CollectionError> {
257 let id = self.definition.item_id(item)?;
258 self.state.by_id_root = Some(
259 self.index
260 .insert_link(self.state.by_id_root.as_ref(), &id, cid)
261 .await?,
262 );
263
264 for index in self.definition.key_indexes() {
265 let mut root = self.state.key_root(index.name()).cloned();
266 for key in index.materialize_keys(item) {
267 root = Some(self.index.insert_link(root.as_ref(), &key, cid).await?);
268 }
269 self.state.key_roots.insert(index.name().to_string(), root);
270 }
271
272 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
273 let entry_context = CollectionEntryContext {
274 id: &id,
275 cid: Some(cid),
276 write_context: context,
277 };
278 for index in self.definition.search_indexes() {
279 let Some(search_index) = self.search_indexes.get(index.name()) else {
280 continue;
281 };
282
283 let root_name = index.root_name().unwrap_or(index.name()).to_string();
284 let mut root = search_root_groups
285 .get(&root_name)
286 .cloned()
287 .unwrap_or_else(|| self.read_search_root_group(&root_name));
288
289 for entry in index.materialize_entries(item, &entry_context) {
290 let terms = search_index.parse_keywords(&entry.text);
291 if terms.is_empty() {
292 continue;
293 }
294 let entry_id = entry.id.as_deref().unwrap_or(&id);
295 let Some(entry_cid) = entry.cid.as_ref().or(Some(cid)) else {
296 continue;
297 };
298 let default_prefix = default_search_prefix(index.name());
299 root = Some(
300 search_index
301 .index_link(
302 root.as_ref(),
303 entry
304 .prefix
305 .as_deref()
306 .or_else(|| index.prefix())
307 .unwrap_or(&default_prefix),
308 &terms,
309 entry_id,
310 entry_cid,
311 )
312 .await?,
313 );
314 }
315
316 search_root_groups.insert(root_name, root);
317 }
318
319 if !search_root_groups.is_empty() {
320 self.assign_search_root_groups(&search_root_groups);
321 }
322
323 Ok(self.snapshot())
324 }
325
326 async fn delete_normalized_with_context(
327 &mut self,
328 item: &T,
329 context: Option<&CollectionWriteContext>,
330 ) -> Result<CollectionState, CollectionError> {
331 let id = self.definition.item_id(item)?;
332 if let Some(root) = self.state.by_id_root.as_ref() {
333 self.state.by_id_root = self.index.delete(root, &id).await?;
334 }
335
336 for index in self.definition.key_indexes() {
337 let mut root = self.state.key_root(index.name()).cloned();
338 for key in index.materialize_keys(item) {
339 let Some(active_root) = root.as_ref() else {
340 break;
341 };
342 root = self.index.delete(active_root, &key).await?;
343 }
344 self.state.key_roots.insert(index.name().to_string(), root);
345 }
346
347 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
348 let entry_context = CollectionEntryContext {
349 id: &id,
350 cid: None,
351 write_context: context,
352 };
353 for index in self.definition.search_indexes() {
354 let Some(search_index) = self.search_indexes.get(index.name()) else {
355 continue;
356 };
357
358 let root_name = index.root_name().unwrap_or(index.name()).to_string();
359 let mut root = search_root_groups
360 .get(&root_name)
361 .cloned()
362 .unwrap_or_else(|| self.read_search_root_group(&root_name));
363 let Some(existing_root) = root.clone() else {
364 continue;
365 };
366 root = Some(existing_root);
367
368 for entry in index.materialize_entries(item, &entry_context) {
369 let terms = search_index.parse_keywords(&entry.text);
370 if terms.is_empty() {
371 continue;
372 }
373 let entry_id = entry.id.as_deref().unwrap_or(&id);
374 let Some(active_root) = root.as_ref() else {
375 break;
376 };
377 let default_prefix = default_search_prefix(index.name());
378 root = search_index
379 .remove_link(
380 active_root,
381 entry
382 .prefix
383 .as_deref()
384 .or_else(|| index.prefix())
385 .unwrap_or(&default_prefix),
386 &terms,
387 entry_id,
388 )
389 .await?;
390 }
391
392 search_root_groups.insert(root_name, root);
393 }
394
395 if !search_root_groups.is_empty() {
396 self.assign_search_root_groups(&search_root_groups);
397 } else {
398 for index in self.definition.search_indexes() {
399 let root_name = index.root_name().unwrap_or(index.name()).to_string();
400 self.state.search_roots.insert(
401 index.name().to_string(),
402 self.read_search_root_group(&root_name),
403 );
404 }
405 }
406
407 Ok(self.snapshot())
408 }
409
410 async fn rebuild_without_search(
411 &mut self,
412 final_entries: BTreeMap<String, (T, Cid, Option<CollectionWriteContext>)>,
413 ) -> Result<CollectionState, CollectionError> {
414 let mut by_id = BTreeMap::<String, Cid>::new();
415 let mut key_roots = self
416 .definition
417 .key_indexes()
418 .iter()
419 .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
420 .collect::<BTreeMap<_, _>>();
421
422 for (id, (item, cid, _context)) in final_entries {
423 by_id.insert(id, cid.clone());
424 for index in self.definition.key_indexes() {
425 let root = key_roots
426 .get_mut(index.name())
427 .expect("collection key root must exist");
428 for key in index.materialize_keys(&item) {
429 root.insert(key, cid.clone());
430 }
431 }
432 }
433
434 self.state = create_empty_collection_state(&self.definition);
435 self.state.by_id_root = self.index.build_links(by_id).await?;
436 for index in self.definition.key_indexes() {
437 let root = self
438 .index
439 .build_links(key_roots.remove(index.name()).unwrap_or_default())
440 .await?;
441 self.state.key_roots.insert(index.name().to_string(), root);
442 }
443
444 Ok(self.snapshot())
445 }
446
447 async fn rebuild_from_final_entries(
448 &mut self,
449 final_entries: BTreeMap<String, (T, Cid, Option<CollectionWriteContext>)>,
450 ) -> Result<CollectionState, CollectionError> {
451 if self.definition.search_indexes().is_empty() {
452 return self.rebuild_without_search(final_entries).await;
453 }
454
455 let mut by_id = BTreeMap::<String, Cid>::new();
456 let mut key_roots = self
457 .definition
458 .key_indexes()
459 .iter()
460 .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
461 .collect::<BTreeMap<_, _>>();
462 let mut search_root_entries = BTreeMap::<String, BTreeMap<String, Cid>>::new();
463
464 for (id, (item, cid, context)) in final_entries {
465 by_id.insert(id.clone(), cid.clone());
466
467 for index in self.definition.key_indexes() {
468 let root = key_roots
469 .get_mut(index.name())
470 .expect("collection key root must exist");
471 for key in index.materialize_keys(&item) {
472 root.insert(key, cid.clone());
473 }
474 }
475
476 let entry_context = CollectionEntryContext {
477 id: &id,
478 cid: Some(&cid),
479 write_context: context.as_ref(),
480 };
481
482 for index in self.definition.search_indexes() {
483 let Some(search_index) = self.search_indexes.get(index.name()) else {
484 continue;
485 };
486
487 let root_name = index.root_name().unwrap_or(index.name()).to_string();
488 let root_entries = search_root_entries.entry(root_name).or_default();
489
490 for entry in index.materialize_entries(&item, &entry_context) {
491 let terms = search_index.parse_keywords(&entry.text);
492 if terms.is_empty() {
493 continue;
494 }
495
496 let entry_id = entry.id.as_deref().unwrap_or(&id);
497 let Some(entry_cid) = entry.cid.as_ref().or(Some(&cid)) else {
498 continue;
499 };
500 let default_prefix = default_search_prefix(index.name());
501 let prefix = entry
502 .prefix
503 .as_deref()
504 .or_else(|| index.prefix())
505 .unwrap_or(&default_prefix);
506
507 for term in terms {
508 root_entries
509 .insert(format!("{prefix}{term}:{entry_id}"), entry_cid.clone());
510 }
511 }
512 }
513 }
514
515 self.state = create_empty_collection_state(&self.definition);
516 self.state.by_id_root = self.index.build_links(by_id).await?;
517 for index in self.definition.key_indexes() {
518 let root = self
519 .index
520 .build_links(key_roots.remove(index.name()).unwrap_or_default())
521 .await?;
522 self.state.key_roots.insert(index.name().to_string(), root);
523 }
524
525 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
526 for (root_name, root_entries) in search_root_entries {
527 let Some(search_index) = self.definition.search_indexes().iter().find_map(|index| {
528 ((index.root_name().unwrap_or(index.name())) == root_name)
529 .then(|| self.search_indexes.get(index.name()))
530 .flatten()
531 }) else {
532 continue;
533 };
534
535 search_root_groups.insert(root_name, search_index.build_links(root_entries).await?);
536 }
537
538 if !search_root_groups.is_empty() {
539 self.assign_search_root_groups(&search_root_groups);
540 }
541
542 Ok(self.snapshot())
543 }
544}