1use std::collections::BTreeMap;
2use std::sync::Arc;
3
4use hashtree_core::{Cid, DirEntry, HashTree, HashTreeConfig, LinkType, Store};
5use hashtree_index::{BTree, BTreeOptions, SearchIndex};
6
7use crate::helpers::merge_search_index_options;
8use crate::schema::normalize_typed_collection_item;
9use crate::{
10 create_empty_collection_state, default_search_prefix, load_collection_state,
11 CollectionDefinition, CollectionEntryContext, CollectionError, CollectionOptions,
12 CollectionState, CollectionWriteContext, MANIFEST_BY_ID,
13};
14
15pub struct CollectionWriter<S: Store, T> {
16 tree: HashTree<S>,
17 index: BTree<S>,
18 search_indexes: BTreeMap<String, SearchIndex<S>>,
19 definition: CollectionDefinition<T>,
20 state: CollectionState,
21}
22
23impl<S: Store, T> CollectionWriter<S, T> {
24 pub fn new(store: Arc<S>, definition: CollectionDefinition<T>) -> Self {
25 Self::with_options(store, definition, CollectionOptions::default())
26 }
27
28 pub fn with_options(
29 store: Arc<S>,
30 definition: CollectionDefinition<T>,
31 options: CollectionOptions,
32 ) -> Self {
33 let state = create_empty_collection_state(&definition);
34 Self::with_state_and_options(store, definition, state, options)
35 }
36
37 pub fn with_state(
38 store: Arc<S>,
39 definition: CollectionDefinition<T>,
40 state: CollectionState,
41 ) -> Self {
42 Self::with_state_and_options(store, definition, state, CollectionOptions::default())
43 }
44
45 pub fn with_state_and_options(
46 store: Arc<S>,
47 definition: CollectionDefinition<T>,
48 state: CollectionState,
49 options: CollectionOptions,
50 ) -> Self {
51 let search_indexes = definition
52 .search_indexes()
53 .iter()
54 .map(|index| {
55 (
56 index.name().to_string(),
57 SearchIndex::new(
58 Arc::clone(&store),
59 merge_search_index_options(index.options().clone(), &options),
60 ),
61 )
62 })
63 .collect();
64
65 Self {
66 tree: HashTree::new(HashTreeConfig::new(Arc::clone(&store))),
67 index: BTree::new(
68 store,
69 BTreeOptions {
70 order: options.btree_order,
71 },
72 ),
73 search_indexes,
74 definition,
75 state,
76 }
77 }
78
79 pub async fn from_root(
80 store: Arc<S>,
81 definition: CollectionDefinition<T>,
82 root: Option<&Cid>,
83 options: CollectionOptions,
84 ) -> Result<Self, CollectionError> {
85 let state = load_collection_state(Arc::clone(&store), &definition, root).await?;
86 Ok(Self::with_state_and_options(
87 store, definition, state, options,
88 ))
89 }
90
91 pub fn snapshot(&self) -> CollectionState {
92 self.state.clone()
93 }
94
95 pub fn state(&self) -> &CollectionState {
96 &self.state
97 }
98
99 pub async fn write_root(&self) -> Result<Option<Cid>, CollectionError> {
100 let mut entries = Vec::new();
101 if let Some(cid) = self.state.by_id_root.as_ref() {
102 entries.push(DirEntry::from_cid(MANIFEST_BY_ID, cid).with_link_type(LinkType::Dir));
103 }
104 for index in self.definition.key_indexes() {
105 if let Some(cid) = self.state.key_root(index.name()) {
106 entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
107 }
108 }
109 for index in self.definition.search_indexes() {
110 if let Some(cid) = self.state.search_root(index.name()) {
111 entries.push(DirEntry::from_cid(index.name(), cid).with_link_type(LinkType::Dir));
112 }
113 }
114
115 if entries.is_empty() {
116 return Ok(None);
117 }
118
119 Ok(Some(self.tree.put_directory(entries).await?))
120 }
121
122 fn read_search_root_group(&self, root_name: &str) -> Option<Cid> {
123 for index in self.definition.search_indexes() {
124 if index.root_name().unwrap_or(index.name()) == root_name {
125 return self.state.search_root(index.name()).cloned();
126 }
127 }
128 None
129 }
130
131 fn assign_search_root_groups(&mut self, groups: &BTreeMap<String, Option<Cid>>) {
132 for index in self.definition.search_indexes() {
133 let root_name = index.root_name().unwrap_or(index.name());
134 if let Some(root) = groups.get(root_name) {
135 self.state
136 .search_roots
137 .insert(index.name().to_string(), root.clone());
138 }
139 }
140 }
141}
142
143impl<S: Store, T: Clone> CollectionWriter<S, T> {
144 pub fn normalize(&self, item: &T) -> Result<T, CollectionError> {
145 normalize_typed_collection_item(&self.definition, item)
146 }
147
148 pub async fn put(
149 &mut self,
150 item: &T,
151 cid: &Cid,
152 previous: Option<&T>,
153 ) -> Result<CollectionState, CollectionError> {
154 self.put_with_context(item, cid, previous, None, None).await
155 }
156
157 pub async fn put_with_context(
158 &mut self,
159 item: &T,
160 cid: &Cid,
161 previous: Option<&T>,
162 context: Option<&CollectionWriteContext>,
163 previous_context: Option<&CollectionWriteContext>,
164 ) -> Result<CollectionState, CollectionError> {
165 let next_item = self.normalize(item)?;
166 let previous_item = previous
167 .map(|previous| self.normalize(previous))
168 .transpose()?;
169
170 if let Some(previous_item) = previous_item.as_ref() {
171 self.delete_normalized_with_context(previous_item, previous_context.or(context))
172 .await?;
173 }
174
175 self.put_normalized_with_context(&next_item, cid, context)
176 .await
177 }
178
179 pub async fn delete(&mut self, item: &T) -> Result<CollectionState, CollectionError> {
180 self.delete_with_context(item, None).await
181 }
182
183 pub async fn delete_with_context(
184 &mut self,
185 item: &T,
186 context: Option<&CollectionWriteContext>,
187 ) -> Result<CollectionState, CollectionError> {
188 let next_item = self.normalize(item)?;
189 self.delete_normalized_with_context(&next_item, context)
190 .await
191 }
192
193 pub async fn rebuild<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
194 where
195 I: IntoIterator<Item = (T, Cid)>,
196 {
197 let mut final_entries = BTreeMap::<String, (T, Cid)>::new();
198 for (item, cid) in entries {
199 let next_item = self.normalize(&item)?;
200 let id = self.definition.item_id(&next_item)?;
201 final_entries.insert(id, (next_item, cid));
202 }
203
204 if self.definition.search_indexes().is_empty() {
205 return self.rebuild_without_search(final_entries).await;
206 }
207
208 self.state = create_empty_collection_state(&self.definition);
209 for (_id, (item, cid)) in final_entries {
210 self.put_normalized_with_context(&item, &cid, None).await?;
211 }
212 Ok(self.snapshot())
213 }
214
215 pub async fn reindex<I>(&mut self, entries: I) -> Result<CollectionState, CollectionError>
216 where
217 I: IntoIterator<Item = (T, Cid)>,
218 {
219 self.rebuild(entries).await
220 }
221
222 pub async fn rebuild_with_context<I>(
223 &mut self,
224 entries: I,
225 ) -> Result<CollectionState, CollectionError>
226 where
227 I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
228 {
229 let mut final_entries = BTreeMap::<String, (T, Cid, Option<CollectionWriteContext>)>::new();
230 for (item, cid, context) in entries {
231 let next_item = self.normalize(&item)?;
232 let id = self.definition.item_id(&next_item)?;
233 final_entries.insert(id, (next_item, cid, context));
234 }
235
236 self.state = create_empty_collection_state(&self.definition);
237 for (_id, (item, cid, context)) in final_entries {
238 self.put_normalized_with_context(&item, &cid, context.as_ref())
239 .await?;
240 }
241 Ok(self.snapshot())
242 }
243
244 pub async fn reindex_with_context<I>(
245 &mut self,
246 entries: I,
247 ) -> Result<CollectionState, CollectionError>
248 where
249 I: IntoIterator<Item = (T, Cid, Option<CollectionWriteContext>)>,
250 {
251 self.rebuild_with_context(entries).await
252 }
253
254 async fn put_normalized_with_context(
255 &mut self,
256 item: &T,
257 cid: &Cid,
258 context: Option<&CollectionWriteContext>,
259 ) -> Result<CollectionState, CollectionError> {
260 let id = self.definition.item_id(item)?;
261 self.state.by_id_root = Some(
262 self.index
263 .insert_link(self.state.by_id_root.as_ref(), &id, cid)
264 .await?,
265 );
266
267 for index in self.definition.key_indexes() {
268 let mut root = self.state.key_root(index.name()).cloned();
269 for key in index.materialize_keys(item) {
270 root = Some(self.index.insert_link(root.as_ref(), &key, cid).await?);
271 }
272 self.state.key_roots.insert(index.name().to_string(), root);
273 }
274
275 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
276 let entry_context = CollectionEntryContext {
277 id: &id,
278 cid: Some(cid),
279 write_context: context,
280 };
281 for index in self.definition.search_indexes() {
282 let Some(search_index) = self.search_indexes.get(index.name()) else {
283 continue;
284 };
285
286 let root_name = index.root_name().unwrap_or(index.name()).to_string();
287 let mut root = search_root_groups
288 .get(&root_name)
289 .cloned()
290 .unwrap_or_else(|| self.read_search_root_group(&root_name));
291
292 for entry in index.materialize_entries(item, &entry_context) {
293 let terms = search_index.parse_keywords(&entry.text);
294 if terms.is_empty() {
295 continue;
296 }
297 let entry_id = entry.id.as_deref().unwrap_or(&id);
298 let Some(entry_cid) = entry.cid.as_ref().or(Some(cid)) else {
299 continue;
300 };
301 let default_prefix = default_search_prefix(index.name());
302 root = Some(
303 search_index
304 .index_link(
305 root.as_ref(),
306 entry
307 .prefix
308 .as_deref()
309 .or_else(|| index.prefix())
310 .unwrap_or(&default_prefix),
311 &terms,
312 entry_id,
313 entry_cid,
314 )
315 .await?,
316 );
317 }
318
319 search_root_groups.insert(root_name, root);
320 }
321
322 if !search_root_groups.is_empty() {
323 self.assign_search_root_groups(&search_root_groups);
324 }
325
326 Ok(self.snapshot())
327 }
328
329 async fn delete_normalized_with_context(
330 &mut self,
331 item: &T,
332 context: Option<&CollectionWriteContext>,
333 ) -> Result<CollectionState, CollectionError> {
334 let id = self.definition.item_id(item)?;
335 if let Some(root) = self.state.by_id_root.as_ref() {
336 self.state.by_id_root = self.index.delete(root, &id).await?;
337 }
338
339 for index in self.definition.key_indexes() {
340 let mut root = self.state.key_root(index.name()).cloned();
341 for key in index.materialize_keys(item) {
342 let Some(active_root) = root.as_ref() else {
343 break;
344 };
345 root = self.index.delete(active_root, &key).await?;
346 }
347 self.state.key_roots.insert(index.name().to_string(), root);
348 }
349
350 let mut search_root_groups = BTreeMap::<String, Option<Cid>>::new();
351 let entry_context = CollectionEntryContext {
352 id: &id,
353 cid: None,
354 write_context: context,
355 };
356 for index in self.definition.search_indexes() {
357 let Some(search_index) = self.search_indexes.get(index.name()) else {
358 continue;
359 };
360
361 let root_name = index.root_name().unwrap_or(index.name()).to_string();
362 let mut root = search_root_groups
363 .get(&root_name)
364 .cloned()
365 .unwrap_or_else(|| self.read_search_root_group(&root_name));
366 let Some(existing_root) = root.clone() else {
367 continue;
368 };
369 root = Some(existing_root);
370
371 for entry in index.materialize_entries(item, &entry_context) {
372 let terms = search_index.parse_keywords(&entry.text);
373 if terms.is_empty() {
374 continue;
375 }
376 let entry_id = entry.id.as_deref().unwrap_or(&id);
377 let Some(active_root) = root.as_ref() else {
378 break;
379 };
380 let default_prefix = default_search_prefix(index.name());
381 root = search_index
382 .remove_link(
383 active_root,
384 entry
385 .prefix
386 .as_deref()
387 .or_else(|| index.prefix())
388 .unwrap_or(&default_prefix),
389 &terms,
390 entry_id,
391 )
392 .await?;
393 }
394
395 search_root_groups.insert(root_name, root);
396 }
397
398 if !search_root_groups.is_empty() {
399 self.assign_search_root_groups(&search_root_groups);
400 } else {
401 for index in self.definition.search_indexes() {
402 let root_name = index.root_name().unwrap_or(index.name()).to_string();
403 self.state.search_roots.insert(
404 index.name().to_string(),
405 self.read_search_root_group(&root_name),
406 );
407 }
408 }
409
410 Ok(self.snapshot())
411 }
412
413 async fn rebuild_without_search(
414 &mut self,
415 final_entries: BTreeMap<String, (T, Cid)>,
416 ) -> Result<CollectionState, CollectionError> {
417 let mut by_id = BTreeMap::<String, Cid>::new();
418 let mut key_roots = self
419 .definition
420 .key_indexes()
421 .iter()
422 .map(|index| (index.name().to_string(), BTreeMap::<String, Cid>::new()))
423 .collect::<BTreeMap<_, _>>();
424
425 for (id, (item, cid)) in final_entries {
426 by_id.insert(id, cid.clone());
427 for index in self.definition.key_indexes() {
428 let root = key_roots
429 .get_mut(index.name())
430 .expect("collection key root must exist");
431 for key in index.materialize_keys(&item) {
432 root.insert(key, cid.clone());
433 }
434 }
435 }
436
437 self.state = create_empty_collection_state(&self.definition);
438 self.state.by_id_root = self.index.build_links(by_id).await?;
439 for index in self.definition.key_indexes() {
440 let root = self
441 .index
442 .build_links(key_roots.remove(index.name()).unwrap_or_default())
443 .await?;
444 self.state.key_roots.insert(index.name().to_string(), root);
445 }
446
447 Ok(self.snapshot())
448 }
449}