1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig, LinkType, Store};
5use hashtree_index::{BTree, BTreeOptions, SearchIndex};
6
7use crate::helpers::merge_search_index_options;
8use crate::manifest::write_collection_manifest_metadata;
9use crate::schema::normalize_typed_collection_item;
10use crate::{
11 create_empty_collection_state, default_search_prefix, load_collection_state,
12 CollectionDefinition, CollectionEntryContext, CollectionError, CollectionOptions,
13 CollectionState, CollectionWriteContext, COLLECTION_MANIFEST_METADATA_FILE, MANIFEST_BY_ID,
14};
15
16pub struct CollectionWriter<S: Store, T> {
17 tree: HashTree<S>,
18 index: BTree<S>,
19 search_indexes: BTreeMap<String, SearchIndex<S>>,
20 definition: CollectionDefinition<T>,
21 state: CollectionState,
22}
23
24impl<S: Store, T> CollectionWriter<S, T> {
25 pub fn new(store: Arc<S>, definition: CollectionDefinition<T>) -> Self {
26 Self::with_options(store, definition, CollectionOptions::default())
27 }
28
29 pub fn with_options(
30 store: Arc<S>,
31 definition: CollectionDefinition<T>,
32 options: CollectionOptions,
33 ) -> Self {
34 let state = create_empty_collection_state(&definition);
35 Self::with_state_and_options(store, definition, state, options)
36 }
37
38 pub fn with_state(
39 store: Arc<S>,
40 definition: CollectionDefinition<T>,
41 state: CollectionState,
42 ) -> Self {
43 Self::with_state_and_options(store, definition, state, CollectionOptions::default())
44 }
45
46 pub fn with_state_and_options(
47 store: Arc<S>,
48 definition: CollectionDefinition<T>,
49 state: CollectionState,
50 options: CollectionOptions,
51 ) -> Self {
52 let search_indexes = definition
53 .search_indexes()
54 .iter()
55 .map(|index| {
56 (
57 index.name().to_string(),
58 SearchIndex::new(
59 Arc::clone(&store),
60 merge_search_index_options(index.options().clone(), &options),
61 ),
62 )
63 })
64 .collect();
65
66 Self {
67 tree: HashTree::new(HashTreeConfig::new(Arc::clone(&store))),
68 index: BTree::new(
69 store,
70 BTreeOptions {
71 order: options.btree_order,
72 },
73 ),
74 search_indexes,
75 definition,
76 state,
77 }
78 }
79
80 pub async fn from_root(
81 store: Arc<S>,
82 definition: CollectionDefinition<T>,
83 root: Option<&Cid>,
84 options: CollectionOptions,
85 ) -> Result<Self, CollectionError> {
86 let state = load_collection_state(Arc::clone(&store), &definition, root).await?;
87 Ok(Self::with_state_and_options(
88 store, definition, state, options,
89 ))
90 }
91
92 pub fn snapshot(&self) -> CollectionState {
93 self.state.clone()
94 }
95
96 pub fn state(&self) -> &CollectionState {
97 &self.state
98 }
99
100 pub async fn write_root(&self) -> Result<Option<Cid>, CollectionError> {
101 let mut entries = Vec::new();
102 if let Some(cid) = self.state.by_id_root.as_ref() {
103 entries.push(DirEntry::from_cid(MANIFEST_BY_ID, cid).with_link_type(LinkType::Dir));
104 }
105 for index in self.definition.key_indexes() {
106 if let Some(cid) = self.state.key_root(index.name()) {
107 entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
108 }
109 }
110 for index in self.definition.search_indexes() {
111 if let Some(cid) = self.state.search_root(index.name()) {
112 entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
113 }
114 }
115 if let Some((cid, size)) =
116 write_collection_manifest_metadata(&self.tree, &self.definition).await?
117 {
118 entries.push(
119 DirEntry::from_cid(COLLECTION_MANIFEST_METADATA_FILE, &cid)
120 .with_size(size)
121 .with_link_type(LinkType::File),
122 );
123 }
124
125 if entries.is_empty() {
126 return Ok(None);
127 }
128
129 Ok(Some(self.tree.put_directory(entries).await?))
130 }
131
132 fn read_search_root_group(&self, root_name: &str) -> Option<Cid> {
133 for index in self.definition.search_indexes() {
134 if index.root_name().unwrap_or(index.name()) == root_name {
135 return self.state.search_root(index.name()).cloned();
136 }
137 }
138 None
139 }
140
141 fn assign_search_root_groups(&mut self, groups: &BTreeMap<String, Option<Cid>>) {
142 for index in self.definition.search_indexes() {
143 let root_name = index.root_name().unwrap_or(index.name());
144 if let Some(root) = groups.get(root_name) {
145 self.state
146 .search_roots
147 .insert(index.name().to_string(), root.clone());
148 }
149 }
150 }
151}
152
153impl<S: Store, T: Clone> CollectionWriter<S, T> {
154 pub fn normalize(&self, item: &T) -> Result<T, CollectionError> {
155 normalize_typed_collection_item(&self.definition, item)
156 }
157
158 pub async fn put(
159 &mut self,
160 item: &T,
161 cid: &Cid,
162 previous: Option<&T>,
163 ) -> Result<CollectionState, CollectionError> {
164 self.put_with_context(item, cid, previous, None, None).await
165 }
166
167 pub async fn put_with_context(
168 &mut self,
169 item: &T,
170 cid: &Cid,
171 previous: Option<&T>,
172 context: Option<&CollectionWriteContext>,
173 previous_context: Option<&CollectionWriteContext>,
174 ) -> Result<CollectionState, CollectionError> {
175 let next_item = self.normalize(item)?;
176 let previous_item = previous
177 .map(|previous| self.normalize(previous))
178 .transpose()?;
179
180 if let Some(previous_item) = previous_item.as_ref() {
181 self.delete_normalized_with_context(previous_item, previous_context.or(context))
182 .await?;
183 }
184
185 self.put_normalized_with_context(&next_item, cid, context)
186 .await
187 }
188
189 pub async fn delete(&mut self, item: &T) -> Result<CollectionState, CollectionError> {
190 self.delete_with_context(item, None).await
191 }
192
193 pub async fn delete_with_context(
194 &mut self,
195 item: &T,
196 context: Option<&CollectionWriteContext>,
197 ) -> Result<CollectionState, CollectionError> {
198 let next_item = self.normalize(item)?;
199 self.delete_normalized_with_context(&next_item, context)
200 .await
201 }
202
203 pub async fn rebuild<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
204 where
205 I: IntoIterator<Item = (T, Cid)>,
206 {
207 let mut final_entries = BTreeMap::<String, (T, Cid)>::new();
208 for (item, cid) in entries {
209 let next_item = self.normalize(&item)?;
210 let id = self.definition.item_id(&next_item)?;
211 final_entries.insert(id, (next_item, cid));
212 }
213
214 if self.definition.search_indexes().is_empty() {
215 return self.rebuild_without_search(final_entries).await;
216 }
217
218 self.state = create_empty_collection_state(&self.definition);
219 for (_id, (item, cid)) in final_entries {
220 self.put_normalized_with_context(&item, &cid, None).await?;
221 }
222 Ok(self.snapshot())
223 }
224
225 pub async fn reindex<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
226 where
227 I: IntoIterator<Item = (T, Cid)>,
228 {
229 self.rebuild(entries).await
230 }
231
232 pub async fn rebuild_with_context<I>(
233 &mut self,
234 entries: I,
235 ) -> Result<CollectionState, CollectionError>
236 where
237 I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
238 {
239 let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
240 for (item, cid, context) in entries {
241 let next_item = self.normalize(&item)?;
242 let id = self.definition.item_id(&next_item)?;
243 final_entries.insert(id, (next_item, cid, context));
244 }
245
246 self.state = create_empty_collection_state(&self.definition);
247 for (_id, (item, cid, context)) in final_entries {
248 self.put_normalized_with_context(&item, &cid, context.as_ref())
249 .await?;
250 }
251 Ok(self.snapshot())
252 }
253
254 pub async fn reindex_with_context<I>(
255 &mut self,
256 entries: I,
257 ) -> Result<CollectionState, CollectionError>
258 where
259 I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
260 {
261 self.rebuild_with_context(entries).await
262 }
263
264 async fn put_normalized_with_context(
265 &mut self,
266 item: &T,
267 cid: &Cid,
268 context: Option<&CollectionWriteContext>,
269 ) -> Result<CollectionState, CollectionError> {
270 let id = self.definition.item_id(item)?;
271 self.state.by_id_root = Some(
272 self.index
273 .insert_link(self.state.by_id_root.as_ref(), &id, cid)
274 .await?,
275 );
276
277 for index in self.definition.key_indexes() {
278 let mut root = self.state.key_root(index.name()).cloned();
279 for key in index.materialize_keys(item) {
280 root = Some(self.index.insert_link(root.as_ref(), &key, cid).await?);
281 }
282 self.state.key_roots.insert(index.name().to_string(), root);
283 }
284
285 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
286 let entry_context = CollectionEntryContext {
287 id: &id,
288 cid: Some(cid),
289 write_context: context,
290 };
291 for index in self.definition.search_indexes() {
292 let Some(search_index) = self.search_indexes.get(index.name()) else {
293 continue;
294 };
295
296 let root_name = index.root_name().unwrap_or(index.name()).to_string();
297 let mut root = search_root_groups
298 .get(&root_name)
299 .cloned()
300 .unwrap_or_else(|| self.read_search_root_group(&root_name));
301
302 for entry in index.materialize_entries(item, &entry_context) {
303 let terms = search_index.parse_keywords(&entry.text);
304 if terms.is_empty() {
305 continue;
306 }
307 let entry_id = entry.id.as_deref().unwrap_or(&id);
308 let Some(entry_cid) = entry.cid.as_ref().or(Some(cid)) else {
309 continue;
310 };
311 let default_prefix = default_search_prefix(index.name());
312 root = Some(
313 search_index
314 .index_link(
315 root.as_ref(),
316 entry
317 .prefix
318 .as_deref()
319 .or_else(|| index.prefix())
320 .unwrap_or(&default_prefix),
321 &terms,
322 entry_id,
323 entry_cid,
324 )
325 .await?,
326 );
327 }
328
329 search_root_groups.insert(root_name, root);
330 }
331
332 if !search_root_groups.is_empty() {
333 self.assign_search_root_groups(&search_root_groups);
334 }
335
336 Ok(self.snapshot())
337 }
338
339 async fn delete_normalized_with_context(
340 &mut self,
341 item: &T,
342 context: Option<&CollectionWriteContext>,
343 ) -> Result<CollectionState, CollectionError> {
344 let id = self.definition.item_id(item)?;
345 if let Some(root) = self.state.by_id_root.as_ref() {
346 self.state.by_id_root = self.index.delete(root, &id).await?;
347 }
348
349 for index in self.definition.key_indexes() {
350 let mut root = self.state.key_root(index.name()).cloned();
351 for key in index.materialize_keys(item) {
352 let Some(active_root) = root.as_ref() else {
353 break;
354 };
355 root = self.index.delete(active_root, &key).await?;
356 }
357 self.state.key_roots.insert(index.name().to_string(), root);
358 }
359
360 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
361 let entry_context = CollectionEntryContext {
362 id: &id,
363 cid: None,
364 write_context: context,
365 };
366 for index in self.definition.search_indexes() {
367 let Some(search_index) = self.search_indexes.get(index.name()) else {
368 continue;
369 };
370
371 let root_name = index.root_name().unwrap_or(index.name()).to_string();
372 let mut root = search_root_groups
373 .get(&root_name)
374 .cloned()
375 .unwrap_or_else(|| self.read_search_root_group(&root_name));
376 let Some(existing_root) = root.clone() else {
377 continue;
378 };
379 root = Some(existing_root);
380
381 for entry in index.materialize_entries(item, &entry_context) {
382 let terms = search_index.parse_keywords(&entry.text);
383 if terms.is_empty() {
384 continue;
385 }
386 let entry_id = entry.id.as_deref().unwrap_or(&id);
387 let Some(active_root) = root.as_ref() else {
388 break;
389 };
390 let default_prefix = default_search_prefix(index.name());
391 root = search_index
392 .remove_link(
393 active_root,
394 entry
395 .prefix
396 .as_deref()
397 .or_else(|| index.prefix())
398 .unwrap_or(&default_prefix),
399 &terms,
400 entry_id,
401 )
402 .await?;
403 }
404
405 search_root_groups.insert(root_name, root);
406 }
407
408 if !search_root_groups.is_empty() {
409 self.assign_search_root_groups(&search_root_groups);
410 } else {
411 for index in self.definition.search_indexes() {
412 let root_name = index.root_name().unwrap_or(index.name()).to_string();
413 self.state.search_roots.insert(
414 index.name().to_string(),
415 self.read_search_root_group(&root_name),
416 );
417 }
418 }
419
420 Ok(self.snapshot())
421 }
422
423 async fn rebuild_without_search(
424 &mut self,
425 final_entries: BTreeMap<String, (T, Cid)>,
426 ) -> Result<CollectionState, CollectionError> {
427 let mut by_id = BTreeMap::<String, Cid>::new();
428 let mut key_roots = self
429 .definition
430 .key_indexes()
431 .iter()
432 .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
433 .collect::<BTreeMap<_, _>>();
434
435 for (id, (item, cid)) in final_entries {
436 by_id.insert(id, cid.clone());
437 for index in self.definition.key_indexes() {
438 let root = key_roots
439 .get_mut(index.name())
440 .expect("collection key root must exist");
441 for key in index.materialize_keys(&item) {
442 root.insert(key, cid.clone());
443 }
444 }
445 }
446
447 self.state = create_empty_collection_state(&self.definition);
448 self.state.by_id_root = self.index.build_links(by_id).await?;
449 for index in self.definition.key_indexes() {
450 let root = self
451 .index
452 .build_links(key_roots.remove(index.name()).unwrap_or_default())
453 .await?;
454 self.state.key_roots.insert(index.name().to_string(), root);
455 }
456
457 Ok(self.snapshot())
458 }
459}