1use super::*;
2
3impl UnifiedStore {
4 pub(crate) fn persist_entities_to_pager(
5 &self,
6 collection: &str,
7 entities: &[UnifiedEntity],
8 ) -> Result<(), StoreError> {
9 self.persist_entities_impl(collection, entities, false)
10 }
11
12 pub(crate) fn persist_entity_refs_to_pager(
16 &self,
17 collection: &str,
18 entities: &[&UnifiedEntity],
19 ) -> Result<(), StoreError> {
20 self.persist_entity_refs_impl(collection, entities, false)
21 }
22
23 pub(crate) fn persist_entity_refs_to_pager_wal_only(
26 &self,
27 collection: &str,
28 entities: &[&UnifiedEntity],
29 ) -> Result<(), StoreError> {
30 self.persist_entity_refs_impl(collection, entities, true)
31 }
32
33 fn persist_entity_refs_impl(
34 &self,
35 collection: &str,
36 entities: &[&UnifiedEntity],
37 skip_btree: bool,
38 ) -> Result<(), StoreError> {
39 if entities.is_empty() {
40 return Ok(());
41 }
42 let Some(pager) = &self.pager else {
43 return Ok(());
44 };
45 let fv = self.format_version();
46 let manager = self
47 .get_collection(collection)
48 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
49 let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
50 .iter()
51 .map(|entity| {
52 let metadata = manager.get_metadata(entity.id);
53 (
54 entity.id.raw().to_be_bytes().to_vec(),
55 Self::serialize_entity_record(entity, metadata.as_ref(), fv),
56 )
57 })
58 .collect();
59 serialized.sort_by(|a, b| a.0.cmp(&b.0));
60
61 if !skip_btree {
62 let mut btree_indices = self.btree_indices.write();
63 let btree = btree_indices
64 .entry(collection.to_string())
65 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
66 let root_before = btree.root_page_id();
67 btree.upsert_batch_sorted(&serialized).map_err(|e| {
68 StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
69 })?;
70 let root_after = btree.root_page_id();
71 drop(btree_indices);
72 if root_before != root_after {
73 self.mark_paged_registry_dirty();
74 }
75 }
76 let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
77 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
78 collection: collection.to_string(),
79 records,
80 }])?;
81 Ok(())
82 }
83
84 pub(crate) fn persist_entities_to_pager_wal_only(
96 &self,
97 collection: &str,
98 entities: &[UnifiedEntity],
99 ) -> Result<(), StoreError> {
100 self.persist_entities_impl(collection, entities, true)
101 }
102
103 fn persist_entities_impl(
104 &self,
105 collection: &str,
106 entities: &[UnifiedEntity],
107 skip_btree: bool,
108 ) -> Result<(), StoreError> {
109 if entities.is_empty() {
110 return Ok(());
111 }
112
113 let Some(pager) = &self.pager else {
114 return Ok(());
115 };
116
117 let fv = self.format_version();
118 let manager = self
119 .get_collection(collection)
120 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
121 let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
122 .iter()
123 .map(|entity| {
124 let metadata = manager.get_metadata(entity.id);
125 (
126 entity.id.raw().to_be_bytes().to_vec(),
127 Self::serialize_entity_record(entity, metadata.as_ref(), fv),
128 )
129 })
130 .collect();
131 serialized.sort_by(|a, b| a.0.cmp(&b.0));
133
134 if !skip_btree {
135 let mut btree_indices = self.btree_indices.write();
136 let btree = btree_indices
137 .entry(collection.to_string())
138 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
139 let root_before = btree.root_page_id();
140
141 btree.upsert_batch_sorted(&serialized).map_err(|e| {
145 StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
146 })?;
147 let root_after = btree.root_page_id();
148 drop(btree_indices);
149 if root_before != root_after {
150 self.mark_paged_registry_dirty();
151 }
152 }
153 let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
163 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
164 collection: collection.to_string(),
165 records,
166 }])?;
167
168 Ok(())
169 }
170
171 pub(crate) fn update_graph_label_index(
173 &self,
174 collection: &str,
175 label: &str,
176 entity_id: EntityId,
177 ) {
178 let key = (collection.to_string(), label.to_string());
179 let mut idx = self.graph_label_index.write();
180 idx.entry(key).or_default().push(entity_id);
181 }
182
183 pub(crate) fn remove_from_graph_label_index(&self, collection: &str, entity_id: EntityId) {
185 let mut idx = self.graph_label_index.write();
186 for ((col, _), ids) in idx.iter_mut() {
187 if col == collection {
188 ids.retain(|&id| id != entity_id);
189 }
190 }
191 idx.retain(|_, ids| !ids.is_empty());
193 }
194
195 pub(crate) fn remove_from_graph_label_index_batch(
196 &self,
197 collection: &str,
198 entity_ids: &[EntityId],
199 ) {
200 if entity_ids.is_empty() {
201 return;
202 }
203 let id_set: std::collections::HashSet<EntityId> = entity_ids.iter().copied().collect();
204 let mut idx = self.graph_label_index.write();
205 for ((col, _), ids) in idx.iter_mut() {
206 if col == collection {
207 ids.retain(|id| !id_set.contains(id));
208 }
209 }
210 idx.retain(|_, ids| !ids.is_empty());
211 }
212
213 pub fn lookup_graph_nodes_by_label(&self, label: &str) -> Vec<EntityId> {
215 let idx = self.graph_label_index.read();
216 idx.iter()
217 .filter(|((_, l), _)| l == label)
218 .flat_map(|(_, ids)| ids.iter().copied())
219 .collect()
220 }
221
222 pub fn create_collection(&self, name: impl Into<String>) -> Result<(), StoreError> {
223 let name = name.into();
224 let mut collections = self.collections.write();
225
226 if collections.contains_key(&name) {
227 return Err(StoreError::CollectionExists(name));
228 }
229
230 let manager = SegmentManager::with_config(&name, self.config.manager_config.clone());
231 collections.insert(name.clone(), Arc::new(manager));
232 drop(collections);
233 self.mark_paged_registry_dirty();
234 self.finish_paged_write([StoreWalAction::CreateCollection { name }])?;
235
236 Ok(())
237 }
238
239 pub fn get_or_create_collection(&self, name: impl Into<String>) -> Arc<SegmentManager> {
241 let name = name.into();
242 {
244 let collections = self.collections.read();
245 if let Some(manager) = collections.get(&name) {
246 return Arc::clone(manager);
247 }
248 }
249 let mut collections = self.collections.write();
251 if let Some(manager) = collections.get(&name) {
253 return Arc::clone(manager);
254 }
255 let manager = Arc::new(SegmentManager::with_config(
256 &name,
257 self.config.manager_config.clone(),
258 ));
259 collections.insert(name, Arc::clone(&manager));
260 self.mark_paged_registry_dirty();
261 manager
262 }
263
264 pub fn get_collection(&self, name: &str) -> Option<Arc<SegmentManager>> {
266 self.collections.read().get(name).map(Arc::clone)
267 }
268
269 pub fn context_index(&self) -> &ContextIndex {
271 &self.context_index
272 }
273
274 pub fn set_config_tree(&self, prefix: &str, json: &crate::serde_json::Value) -> usize {
277 let _ = self.get_or_create_collection("red_config");
278 let mut pairs = Vec::new();
279 flatten_config_json(prefix, json, &mut pairs);
280 let mut saved = 0;
281 for (key, value) in pairs {
282 let entity = UnifiedEntity::new(
283 EntityId::new(0),
284 EntityKind::TableRow {
285 table: Arc::from("red_config"),
286 row_id: 0,
287 },
288 EntityData::Row(RowData {
289 columns: Vec::new(),
290 named: Some(
291 [
292 ("key".to_string(), crate::storage::schema::Value::text(key)),
293 ("value".to_string(), value),
294 ]
295 .into_iter()
296 .collect(),
297 ),
298 schema: None,
299 }),
300 );
301 if self.insert_auto("red_config", entity).is_ok() {
302 saved += 1;
303 }
304 }
305 saved
306 }
307
308 pub fn get_config(&self, key: &str) -> Option<crate::storage::schema::Value> {
310 let manager = self.get_collection("red_config")?;
311 for entity in manager.query_all(|_| true) {
312 if let EntityData::Row(row) = &entity.data {
313 if let Some(named) = &row.named {
314 let key_matches = named
315 .get("key")
316 .and_then(|v| match v {
317 crate::storage::schema::Value::Text(s) => Some(s.as_ref() == key),
318 _ => None,
319 })
320 .unwrap_or(false);
321 if key_matches {
322 return named.get("value").cloned();
323 }
324 }
325 }
326 }
327 None
328 }
329
330 pub fn list_collections(&self) -> Vec<String> {
332 self.collections.read().keys().cloned().collect()
333 }
334
335 pub fn drop_collection(&self, name: &str) -> Result<(), StoreError> {
337 let manager = {
338 let mut collections = self.collections.write();
339
340 collections
341 .remove(name)
342 .ok_or_else(|| StoreError::CollectionNotFound(name.to_string()))?
343 };
344
345 let entities = manager.query_all(|_| true);
346 let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
347
348 for entity_id in &entity_ids {
349 self.context_index.remove_entity(*entity_id);
350 let _ = self.unindex_cross_refs(*entity_id);
351 }
352
353 self.btree_indices.write().remove(name);
354
355 self.entity_cache.retain(|entity_id, (collection, _)| {
356 collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
357 });
358
359 self.cross_refs.write().retain(|source_id, refs| {
360 refs.retain(|(target_id, _, target_collection)| {
361 target_collection != name && !entity_ids.iter().any(|id| id == target_id)
362 });
363 !entity_ids.iter().any(|id| id == source_id)
364 });
365
366 self.reverse_refs.write().retain(|target_id, refs| {
367 refs.retain(|(source_id, _, source_collection)| {
368 source_collection != name && !entity_ids.iter().any(|id| id == source_id)
369 });
370 !entity_ids.iter().any(|id| id == target_id)
371 });
372
373 self.mark_paged_registry_dirty();
374 self.finish_paged_write([StoreWalAction::DropCollection {
375 name: name.to_string(),
376 }])?;
377
378 Ok(())
379 }
380
381 pub fn insert(&self, collection: &str, entity: UnifiedEntity) -> Result<EntityId, StoreError> {
383 let manager = self
384 .get_collection(collection)
385 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
386
387 let mut entity = entity;
388 if entity.id.raw() == 0 {
389 entity.id = self.next_entity_id();
390 } else {
391 self.register_entity_id(entity.id);
392 }
393 if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
395 if *row_id == 0 {
396 *row_id = manager.next_row_id();
397 } else {
398 manager.register_row_id(*row_id);
399 }
400 }
401 let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
403 {
404 Some(node.label.clone())
405 } else {
406 None
407 };
408
409 let id = manager.insert(entity)?;
410 self.register_entity_id(id);
411
412 if let Some(ref label) = graph_node_label {
414 self.update_graph_label_index(collection, label, id);
415 }
416
417 let mut registry_dirty = false;
419 if let Some(pager) = &self.pager {
420 if let Some(entity) = manager.get(id) {
421 let mut btree_indices = self.btree_indices.write();
422 let btree = btree_indices
423 .entry(collection.to_string())
424 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
425 let root_before = btree.root_page_id();
426
427 let key = id.raw().to_be_bytes();
428 let metadata = manager.get_metadata(id);
429 let value = Self::serialize_entity_record(
430 &entity,
431 metadata.as_ref(),
432 self.format_version(),
433 );
434 let _ = btree.insert(&key, &value);
436 registry_dirty = root_before != btree.root_page_id();
437 }
438 }
439
440 if self.config.auto_index_refs {
442 if let Some(entity) = manager.get(id) {
443 self.index_cross_refs(&entity, collection)?;
444 }
445 }
446
447 if self.pager.is_some() {
451 let actions = manager
452 .get(id)
453 .map(|entity| {
454 let metadata = manager.get_metadata(id);
455 vec![StoreWalAction::upsert_entity(
456 collection,
457 &entity,
458 metadata.as_ref(),
459 self.format_version(),
460 )]
461 })
462 .unwrap_or_default();
463 if registry_dirty {
464 self.mark_paged_registry_dirty();
465 }
466 self.finish_paged_write(actions)?;
467 }
468
469 Ok(id)
470 }
471
/// Inserts a batch of entities into `collection`, creating the collection
/// when it does not exist, and returns the ids reported by the manager.
///
/// Phases, in order:
/// 1. Assign ids: entities with id `0` take ids from one reserved range,
///    table rows with `row_id` `0` take row ids from one reserved range;
///    pre-set ids are registered instead.
/// 2. Capture graph-node labels and, when a pager is configured, serialize
///    every entity (metadata is passed as `None`).
/// 3. Hand the entities to the manager's `bulk_insert`.
/// 4. Update the graph label index.
/// 5. With a pager: bulk-insert the serialized batch into the collection's
///    B-tree and emit one `BulkUpsertEntityRecords` WAL action.
///
/// Setting the `REDDB_BULK_TIMING` environment variable to `1`, `true` or
/// `on` emits a debug event with per-phase timings.
///
/// # Errors
/// Propagates errors from the manager's `bulk_insert` and from
/// `finish_paged_write`.
///
/// # Panics
/// Panics if a reserved id range yields fewer ids than were requested.
pub fn bulk_insert(
    &self,
    collection: &str,
    mut entities: Vec<UnifiedEntity>,
) -> Result<Vec<EntityId>, StoreError> {
    // Opt-in timing trace, controlled by an environment variable.
    let trace = matches!(
        std::env::var("REDDB_BULK_TIMING").ok().as_deref(),
        Some("1") | Some("true") | Some("on")
    );
    let t_start = std::time::Instant::now();
    let n = entities.len();
    let manager = self.get_or_create_collection(collection);
    let t_get_coll = t_start.elapsed();

    let t0 = std::time::Instant::now();
    // Count the missing ids first so each kind is reserved with one call.
    let n_missing_entity_ids = entities.iter().filter(|e| e.id.raw() == 0).count() as u64;
    let n_missing_row_ids = entities
        .iter()
        .filter(|e| matches!(e.kind, EntityKind::TableRow { row_id: 0, .. }))
        .count() as u64;
    let mut entity_id_range = if n_missing_entity_ids > 0 {
        self.reserve_entity_ids(n_missing_entity_ids)
    } else {
        0..0
    };
    let mut row_id_range = if n_missing_row_ids > 0 {
        manager.reserve_row_ids(n_missing_row_ids)
    } else {
        0..0
    };
    for entity in &mut entities {
        if entity.id.raw() == 0 {
            let next = entity_id_range
                .next()
                .expect("reserved entity-id range exhausted");
            entity.id = EntityId::new(next);
        } else {
            self.register_entity_id(entity.id);
        }
        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
            if *row_id == 0 {
                *row_id = row_id_range
                    .next()
                    .expect("reserved row-id range exhausted");
            } else {
                manager.register_row_id(*row_id);
            }
        }
    }
    let t_assign_ids = t0.elapsed();

    // Labels are captured here because `entities` is moved into the
    // manager further down.
    let graph_labels: Vec<Option<(String, EntityId)>> = entities
        .iter()
        .map(|e| {
            if let EntityKind::GraphNode(ref node) = e.kind {
                Some((node.label.clone(), e.id))
            } else {
                None
            }
        })
        .collect();

    let t0 = std::time::Instant::now();
    // Serialize before the entities are moved; only needed with a pager.
    let serialized: Option<Vec<(Vec<u8>, Vec<u8>)>> = if self.pager.is_some() {
        let fv = self.format_version();
        let serial_map = |e: &UnifiedEntity| {
            (
                e.id.raw().to_be_bytes().to_vec(),
                Self::serialize_entity_record(e, None, fv),
            )
        };
        // Batches of 512 or more entities are serialized in parallel.
        if entities.len() >= 512 {
            use rayon::prelude::*;
            Some(entities.par_iter().map(serial_map).collect())
        } else {
            Some(entities.iter().map(serial_map).collect())
        }
    } else {
        None
    };
    let t_serialize = t0.elapsed();

    let t0 = std::time::Instant::now();
    let ids = manager.bulk_insert(entities)?;
    let t_manager = t0.elapsed();
    for id in &ids {
        self.register_entity_id(*id);
    }

    for (label, entity_id) in graph_labels.iter().flatten() {
        self.update_graph_label_index(collection, label, *entity_id);
    }

    let skip_btree_requested = matches!(
        std::env::var("REDDB_BULK_SKIP_PERSIST_UNSAFE")
            .ok()
            .as_deref(),
        Some("1") | Some("true") | Some("on")
    );
    // The skip flag is only honoured when no pager is configured.
    let skip_btree = skip_btree_requested && self.pager.is_none();
    if skip_btree_requested && !skip_btree {
        // `OnceLock` makes the warning fire at most once per process.
        static IGNORED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
        IGNORED.get_or_init(|| {
            tracing::warn!(
                "REDDB_BULK_SKIP_PERSIST_UNSAFE set but durable pager is \
                 active — flag ignored; bulk inserts will be persisted normally"
            );
        });
    } else if skip_btree {
        static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
        WARNED.get_or_init(|| {
            tracing::warn!(
                "REDDB_BULK_SKIP_PERSIST_UNSAFE set (ephemeral/no-pager mode) — \
                 bulk inserts NOT durable; data will be lost on restart"
            );
        });
    }

    let mut t_btree_lock = std::time::Duration::ZERO;
    let mut t_btree_insert = std::time::Duration::ZERO;
    let mut t_flush = std::time::Duration::ZERO;
    if !skip_btree {
        if let (Some(pager), Some(batch)) = (&self.pager, serialized.as_ref()) {
            let t0 = std::time::Instant::now();
            let mut btree_indices = self.btree_indices.write();
            let btree = btree_indices
                .entry(collection.to_string())
                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
            let root_before = btree.root_page_id();
            t_btree_lock = t0.elapsed();

            let t0 = std::time::Instant::now();
            // NOTE(review): the result of the B-tree bulk insert is
            // discarded here; confirm that ignoring a failure is intended.
            let _ = btree.bulk_insert_sorted(batch);
            t_btree_insert = t0.elapsed();
            let registry_dirty = root_before != btree.root_page_id();

            // `t_flush` only covers marking the registry dirty.
            let t0 = std::time::Instant::now();
            if registry_dirty {
                self.mark_paged_registry_dirty();
            }
            t_flush = t0.elapsed();
        }
    }

    // One WAL action carrying all records; empty when nothing was serialized.
    let actions = serialized
        .map(|batch| {
            let records: Vec<Vec<u8>> =
                batch.into_iter().map(|(_key, record)| record).collect();
            vec![StoreWalAction::BulkUpsertEntityRecords {
                collection: collection.to_string(),
                records,
            }]
        })
        .unwrap_or_default();
    self.finish_paged_write(actions)?;

    if trace {
        tracing::debug!(
            n,
            total = ?t_start.elapsed(),
            get_coll = ?t_get_coll,
            assign = ?t_assign_ids,
            serialize = ?t_serialize,
            manager = ?t_manager,
            btree_lock = ?t_btree_lock,
            btree = ?t_btree_insert,
            flush = ?t_flush,
            "bulk_insert timing"
        );
    }

    Ok(ids)
}
695
/// Inserts `entity` into `collection`, creating the collection when it
/// does not exist, and returns the entity's id.
///
/// * An entity id of `0` is replaced with a freshly allocated id; any other
///   id is registered with the store's id allocator.
/// * For table rows, a `row_id` of `0` is replaced with the manager's next
///   row id; any other row id is registered with the manager.
/// * The entity is added to the context index and, with `auto_index_refs`
///   enabled, to the cross-reference indexes before it is handed to the
///   manager.
/// * With a pager configured, the entity is serialized (metadata passed as
///   `None`), written to the collection's B-tree, and an
///   `UpsertEntityRecord` WAL action is emitted.
///
/// # Errors
/// * Errors from `index_cross_refs`, from the manager's `insert`, and from
///   `finish_paged_write`.
/// * `StoreError::Io` wrapping a B-tree insert failure.
pub fn insert_auto(
    &self,
    collection: &str,
    entity: UnifiedEntity,
) -> Result<EntityId, StoreError> {
    let manager = self.get_or_create_collection(collection);
    let mut entity = entity;
    if entity.id.raw() == 0 {
        entity.id = self.next_entity_id();
    } else {
        self.register_entity_id(entity.id);
    }
    if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
        if *row_id == 0 {
            *row_id = manager.next_row_id();
        } else {
            manager.register_row_id(*row_id);
        }
    }

    // Capture the label now; `entity` is moved into the manager below.
    let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
    {
        Some(node.label.clone())
    } else {
        None
    };
    self.context_index.index_entity(collection, &entity);

    // Serialize while the entity is still owned here; the record is reused
    // for both the B-tree write and the WAL action.
    let id_for_serialize = entity.id;
    let serialized_record: Option<Vec<u8>> = if self.pager.is_some() {
        Some(Self::serialize_entity_record(
            &entity,
            None,
            self.format_version(),
        ))
    } else {
        None
    };
    if self.config.auto_index_refs {
        self.index_cross_refs(&entity, collection)?;
    }

    let id = manager.insert(entity)?;
    // The record above was keyed by the pre-insert id; the manager is
    // expected to return the same one.
    debug_assert_eq!(id, id_for_serialize);
    if let Some(ref label) = graph_node_label {
        self.update_graph_label_index(collection, label, id);
    }

    let mut registry_dirty = false;
    if let (Some(_pager), Some(record)) = (&self.pager, serialized_record.as_ref()) {
        if let Some(btree) = self.get_or_create_btree(collection) {
            let root_before = btree.root_page_id();

            let key = id.raw().to_be_bytes();
            btree.insert(&key, record).map_err(|e| {
                StoreError::Io(std::io::Error::other(format!(
                    "B-tree insert error while inserting '{collection}'/{id}: {e}"
                )))
            })?;
            registry_dirty = root_before != btree.root_page_id();
        }
    }

    if self.pager.is_some() {
        let actions = serialized_record
            .map(|record| {
                vec![StoreWalAction::UpsertEntityRecord {
                    collection: collection.to_string(),
                    record,
                }]
            })
            .unwrap_or_default();
        if registry_dirty {
            self.mark_paged_registry_dirty();
        }
        self.finish_paged_write(actions)?;
    }
    Ok(id)
}
803
804 pub fn get(&self, collection: &str, id: EntityId) -> Option<UnifiedEntity> {
810 if let Some(entity) = self
812 .get_collection(collection)
813 .and_then(|manager| manager.get(id))
814 {
815 return Some(entity);
816 }
817
818 if self.pager.is_some() {
820 let btree_indices = self.btree_indices.read();
821 if let Some(btree) = btree_indices.get(collection) {
822 let key = id.raw().to_be_bytes();
823 if let Ok(Some(value)) = btree.get(&key) {
824 if let Ok((entity, _)) =
825 Self::deserialize_entity_record(&value, self.format_version())
826 {
827 return Some(entity);
828 }
829 }
830 }
831 }
832
833 None
834 }
835
836 pub fn get_batch(&self, collection: &str, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
842 match self.get_collection(collection) {
843 Some(manager) => manager.get_many(ids),
844 None => vec![None; ids.len()],
845 }
846 }
847
848 pub fn get_any(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
850 if let Some(cached) = self.entity_cache.get(id.raw()) {
854 return Some(cached);
855 }
856
857 let collections = self.collections.read();
859 for (name, manager) in collections.iter() {
860 if let Some(entity) = manager.get(id) {
861 let result = (name.clone(), entity);
862 drop(collections);
866 self.entity_cache.insert(id.raw(), result.clone());
867 return Some(result);
868 }
869 }
870 None
871 }
872
873 pub fn entity_cache_hit_rate(&self) -> Option<f64> {
879 self.entity_cache.hit_rate()
880 }
881
882 pub fn entity_cache_stats(&self) -> super::super::entity_cache::EntityCacheStats {
884 self.entity_cache.stats()
885 }
886
887 pub fn delete(&self, collection: &str, id: EntityId) -> Result<bool, StoreError> {
889 self.entity_cache.remove(id.raw());
891 let manager = self
892 .get_collection(collection)
893 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
894
895 let deleted = manager.delete(id)?;
896 if !deleted {
897 return Ok(false);
898 }
899
900 let mut registry_dirty = false;
902 if self.pager.is_some() {
903 let btree_indices = self.btree_indices.read();
904 if let Some(btree) = btree_indices.get(collection) {
905 let root_before = btree.root_page_id();
906 let key = id.raw().to_be_bytes();
907 let _ = btree.delete(&key);
908 registry_dirty = root_before != btree.root_page_id();
909 }
910 }
911
912 self.unindex_cross_refs(id)?;
914
915 self.remove_from_graph_label_index(collection, id);
917
918 if registry_dirty {
919 self.mark_paged_registry_dirty();
920 }
921 self.finish_paged_write([StoreWalAction::DeleteEntityRecord {
922 collection: collection.to_string(),
923 entity_id: id.raw(),
924 }])?;
925
926 Ok(true)
927 }
928
929 pub fn delete_batch(
930 &self,
931 collection: &str,
932 ids: &[EntityId],
933 ) -> Result<Vec<EntityId>, StoreError> {
934 if ids.is_empty() {
935 return Ok(Vec::new());
936 }
937
938 self.entity_cache.remove_many(ids.iter().map(|id| id.raw()));
943
944 let manager = self
945 .get_collection(collection)
946 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
947
948 let deleted_ids = manager.delete_batch(ids)?;
949 if deleted_ids.is_empty() {
950 return Ok(deleted_ids);
951 }
952
953 let mut registry_dirty = false;
954 if self.pager.is_some() {
955 let btree_indices = self.btree_indices.read();
956 if let Some(btree) = btree_indices.get(collection) {
957 let root_before = btree.root_page_id();
958 for id in &deleted_ids {
959 let key = id.raw().to_be_bytes();
960 let _ = btree.delete(&key);
961 }
962 registry_dirty = root_before != btree.root_page_id();
963 }
964 }
965
966 self.unindex_cross_refs_batch(&deleted_ids)?;
967 self.remove_from_graph_label_index_batch(collection, &deleted_ids);
968 if registry_dirty {
969 self.mark_paged_registry_dirty();
970 }
971 let actions = deleted_ids
972 .iter()
973 .map(|id| StoreWalAction::DeleteEntityRecord {
974 collection: collection.to_string(),
975 entity_id: id.raw(),
976 })
977 .collect::<Vec<_>>();
978 self.finish_paged_write(actions)?;
979
980 Ok(deleted_ids)
981 }
982
983 pub fn set_metadata(
985 &self,
986 collection: &str,
987 id: EntityId,
988 metadata: Metadata,
989 ) -> Result<(), StoreError> {
990 let manager = self
991 .get_collection(collection)
992 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
993
994 manager.set_metadata(id, metadata)?;
995 if let Some(entity) = manager.get(id) {
996 self.persist_entities_to_pager(collection, std::slice::from_ref(&entity))?;
997 }
998 Ok(())
999 }
1000
1001 pub fn get_metadata(&self, collection: &str, id: EntityId) -> Option<Metadata> {
1003 self.get_collection(collection)?.get_metadata(id)
1004 }
1005
/// Records a cross-reference from `source_id` (in `source_collection`) to
/// `target_id` (in `target_collection`).
///
/// The reference is added to the forward index, the reverse index and the
/// source entity itself; each of the three is skipped when an equal
/// reference is already present there. When anything changed, the paged
/// registry is marked dirty and, in `Strict` durability mode, the paged
/// state is flushed.
///
/// # Errors
/// * `StoreError::CollectionNotFound` when either collection is missing.
/// * `StoreError::EntityNotFound` when either entity is missing.
/// * `StoreError::TooManyRefs` when the source already has
///   `max_cross_refs` or more forward references.
/// * Errors from `persist_entities_to_pager` and `flush_paged_state`.
pub fn add_cross_ref(
    &self,
    source_collection: &str,
    source_id: EntityId,
    target_collection: &str,
    target_id: EntityId,
    ref_type: RefType,
    weight: f32,
) -> Result<(), StoreError> {
    // Both endpoints must exist before anything is recorded.
    let source_manager = self
        .get_collection(source_collection)
        .ok_or_else(|| StoreError::CollectionNotFound(source_collection.to_string()))?;

    if source_manager.get(source_id).is_none() {
        return Err(StoreError::EntityNotFound(source_id));
    }

    let target_manager = self
        .get_collection(target_collection)
        .ok_or_else(|| StoreError::CollectionNotFound(target_collection.to_string()))?;

    if target_manager.get(target_id).is_none() {
        return Err(StoreError::EntityNotFound(target_id));
    }

    // The limit is checked before the duplicate check below, so re-adding
    // an existing reference at the limit is rejected as well.
    let current_refs = self
        .cross_refs
        .read()
        .get(&source_id)
        .map_or(0, |v| v.len());

    if current_refs >= self.config.max_cross_refs {
        return Err(StoreError::TooManyRefs(source_id));
    }

    let mut registry_dirty = false;
    {
        // Forward index: source -> (target, type, target collection).
        let mut forward = self.cross_refs.write();
        let refs = forward.entry(source_id).or_default();
        let inserted = !refs.iter().any(|(id, kind, coll)| {
            *id == target_id && *kind == ref_type && coll == target_collection
        });
        if inserted {
            refs.push((target_id, ref_type, target_collection.to_string()));
            registry_dirty = true;
        }
    }

    {
        // Reverse index: target -> (source, type, source collection).
        let mut reverse = self.reverse_refs.write();
        let refs = reverse.entry(target_id).or_default();
        let inserted = !refs.iter().any(|(id, kind, coll)| {
            *id == source_id && *kind == ref_type && coll == source_collection
        });
        if inserted {
            refs.push((source_id, ref_type, source_collection.to_string()));
            registry_dirty = true;
        }
    }

    // Mirror the reference onto the source entity and persist it.
    if let Some(mut entity) = source_manager.get(source_id) {
        if !entity.cross_refs().iter().any(|xref| {
            xref.target == target_id
                && xref.ref_type == ref_type
                && xref.target_collection == target_collection
        }) {
            let cross_ref = CrossRef::with_weight(
                source_id,
                target_id,
                target_collection,
                ref_type,
                weight,
            );
            entity.add_cross_ref(cross_ref);
            // NOTE(review): the result of the manager update is discarded;
            // confirm that ignoring a failed update is intended.
            let _ = source_manager.update(entity.clone());
            registry_dirty = true;
            self.persist_entities_to_pager(source_collection, std::slice::from_ref(&entity))?;
        }
    }

    if registry_dirty {
        self.mark_paged_registry_dirty();
        if matches!(
            self.config.durability_mode,
            crate::api::DurabilityMode::Strict
        ) {
            self.flush_paged_state()?;
        }
    }

    Ok(())
}
1102
1103 pub fn get_refs_from(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1105 self.cross_refs.read().get(&id).cloned().unwrap_or_default()
1106 }
1107
1108 pub fn get_refs_to(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1110 self.reverse_refs
1111 .read()
1112 .get(&id)
1113 .cloned()
1114 .unwrap_or_default()
1115 }
1116
1117 pub fn expand_refs(
1119 &self,
1120 id: EntityId,
1121 depth: u32,
1122 ref_types: Option<&[RefType]>,
1123 ) -> Vec<(UnifiedEntity, u32, RefType)> {
1124 let mut results = Vec::new();
1125 let mut visited = std::collections::HashSet::new();
1126 visited.insert(id);
1127
1128 self.expand_refs_recursive(id, depth, ref_types, &mut visited, &mut results, 1);
1129
1130 results
1131 }
1132
1133 fn expand_refs_recursive(
1134 &self,
1135 id: EntityId,
1136 max_depth: u32,
1137 ref_types: Option<&[RefType]>,
1138 visited: &mut std::collections::HashSet<EntityId>,
1139 results: &mut Vec<(UnifiedEntity, u32, RefType)>,
1140 current_depth: u32,
1141 ) {
1142 if current_depth > max_depth {
1143 return;
1144 }
1145
1146 for (target_id, ref_type, target_collection) in self.get_refs_from(id) {
1147 if visited.contains(&target_id) {
1148 continue;
1149 }
1150
1151 if let Some(types) = ref_types {
1152 if !types.contains(&ref_type) {
1153 continue;
1154 }
1155 }
1156
1157 visited.insert(target_id);
1158
1159 if let Some(entity) = self.get(&target_collection, target_id) {
1160 results.push((entity, current_depth, ref_type));
1161
1162 self.expand_refs_recursive(
1164 target_id,
1165 max_depth,
1166 ref_types,
1167 visited,
1168 results,
1169 current_depth + 1,
1170 );
1171 }
1172 }
1173 }
1174
1175 pub(crate) fn index_cross_refs(
1177 &self,
1178 entity: &UnifiedEntity,
1179 collection: &str,
1180 ) -> Result<(), StoreError> {
1181 let mut registry_dirty = false;
1182 for cross_ref in entity.cross_refs() {
1183 if cross_ref.target_collection.is_empty() {
1184 continue;
1185 }
1186 {
1187 let mut forward = self.cross_refs.write();
1188 let refs = forward.entry(cross_ref.source).or_default();
1189 let inserted = !refs.iter().any(|(id, kind, coll)| {
1190 *id == cross_ref.target
1191 && *kind == cross_ref.ref_type
1192 && coll == &cross_ref.target_collection
1193 });
1194 if inserted {
1195 refs.push((
1196 cross_ref.target,
1197 cross_ref.ref_type,
1198 cross_ref.target_collection.clone(),
1199 ));
1200 registry_dirty = true;
1201 }
1202 }
1203
1204 {
1205 let mut reverse = self.reverse_refs.write();
1206 let refs = reverse.entry(cross_ref.target).or_default();
1207 let inserted = !refs.iter().any(|(id, kind, coll)| {
1208 *id == cross_ref.source && *kind == cross_ref.ref_type && coll == collection
1209 });
1210 if inserted {
1211 refs.push((cross_ref.source, cross_ref.ref_type, collection.to_string()));
1212 registry_dirty = true;
1213 }
1214 }
1215 }
1216
1217 if registry_dirty {
1218 self.mark_paged_registry_dirty();
1219 }
1220
1221 Ok(())
1222 }
1223
1224 pub(crate) fn unindex_cross_refs(&self, id: EntityId) -> Result<(), StoreError> {
1226 self.cross_refs.write().remove(&id);
1228
1229 let mut reverse = self.reverse_refs.write();
1231 for refs in reverse.values_mut() {
1232 refs.retain(|(source, _, _)| *source != id);
1233 }
1234 reverse.remove(&id);
1235 self.mark_paged_registry_dirty();
1236
1237 Ok(())
1238 }
1239
1240 pub(crate) fn unindex_cross_refs_batch(&self, ids: &[EntityId]) -> Result<(), StoreError> {
1241 if ids.is_empty() {
1242 return Ok(());
1243 }
1244
1245 let id_set: std::collections::HashSet<EntityId> = ids.iter().copied().collect();
1246
1247 let needs_forward_cleanup = {
1261 let forward = self.cross_refs.read();
1262 id_set.iter().any(|id| forward.contains_key(id))
1263 };
1264 let needs_reverse_cleanup = {
1265 let reverse = self.reverse_refs.read();
1266 id_set.iter().any(|id| reverse.contains_key(id))
1267 };
1268
1269 if !needs_forward_cleanup && !needs_reverse_cleanup {
1270 self.unindex_cross_refs_fast_path
1271 .fetch_add(1, Ordering::Relaxed);
1272 return Ok(());
1273 }
1274
1275 if needs_forward_cleanup {
1276 let mut forward = self.cross_refs.write();
1277 for id in &id_set {
1278 forward.remove(id);
1279 }
1280 }
1281
1282 if needs_reverse_cleanup || needs_forward_cleanup {
1283 let mut reverse = self.reverse_refs.write();
1287 if needs_forward_cleanup {
1288 for refs in reverse.values_mut() {
1289 refs.retain(|(source, _, _)| !id_set.contains(source));
1290 }
1291 }
1292 reverse.retain(|target, refs| !id_set.contains(target) && !refs.is_empty());
1293 }
1294 self.mark_paged_registry_dirty();
1295
1296 Ok(())
1297 }
1298
1299 pub fn unindex_cross_refs_fast_path_hits(&self) -> u64 {
1303 self.unindex_cross_refs_fast_path.load(Ordering::Relaxed)
1304 }
1305
1306 pub fn query_all<F>(&self, filter: F) -> Vec<(String, UnifiedEntity)>
1308 where
1309 F: Fn(&UnifiedEntity) -> bool + Clone + Send + Sync,
1310 {
1311 let collections = self.collections.read();
1312 let pairs: Vec<_> = collections.iter().collect();
1313
1314 let use_parallel = pairs.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1315 if !use_parallel {
1316 return pairs
1318 .into_iter()
1319 .flat_map(|(name, mgr)| {
1320 mgr.query_all(filter.clone())
1321 .into_iter()
1322 .map(move |e| (name.clone(), e))
1323 })
1324 .collect();
1325 }
1326
1327 let filter_ref = &filter;
1329 let collection_results: Vec<Vec<(String, UnifiedEntity)>> = std::thread::scope(|s| {
1330 pairs
1331 .iter()
1332 .map(|(name, manager)| {
1333 let name = (*name).clone();
1334 s.spawn(move || {
1335 manager
1336 .query_all(|e| filter_ref(e))
1337 .into_iter()
1338 .map(|e| (name.clone(), e))
1339 .collect::<Vec<_>>()
1340 })
1341 })
1342 .collect::<Vec<_>>()
1343 .into_iter()
1344 .map(|h| h.join().unwrap_or_default())
1345 .collect()
1346 });
1347
1348 collection_results.into_iter().flatten().collect()
1349 }
1350
1351 pub fn filter_metadata_all(
1353 &self,
1354 filters: &[(String, MetadataFilter)],
1355 ) -> Vec<(String, EntityId)> {
1356 let mut results = Vec::new();
1357 let collections = self.collections.read();
1358
1359 for (name, manager) in collections.iter() {
1360 for id in manager.filter_metadata(filters) {
1361 results.push((name.clone(), id));
1362 }
1363 }
1364
1365 results
1366 }
1367
1368 pub fn stats(&self) -> StoreStats {
1370 let collections = self.collections.read();
1371
1372 let mut stats = StoreStats {
1373 collection_count: collections.len(),
1374 ..Default::default()
1375 };
1376
1377 for (name, manager) in collections.iter() {
1378 let manager_stats = manager.stats();
1379 stats.total_entities += manager_stats.total_entities;
1380 stats.total_memory_bytes += manager_stats.total_memory_bytes;
1381 stats.collections.insert(name.clone(), manager_stats);
1382 }
1383
1384 stats
1385 }
1386
1387 pub fn run_maintenance(&self) -> Result<(), StoreError> {
1389 let collections = self.collections.read();
1390 for manager in collections.values() {
1391 manager.run_maintenance()?;
1392 }
1393 Ok(())
1394 }
1395}
1396
1397fn flatten_config_json(
1399 prefix: &str,
1400 value: &crate::serde_json::Value,
1401 out: &mut Vec<(String, crate::storage::schema::Value)>,
1402) {
1403 use crate::storage::schema::Value;
1404 match value {
1405 crate::serde_json::Value::Object(map) => {
1406 for (k, v) in map {
1407 let key = if prefix.is_empty() {
1408 k.clone()
1409 } else {
1410 format!("{prefix}.{k}")
1411 };
1412 flatten_config_json(&key, v, out);
1413 }
1414 }
1415 crate::serde_json::Value::String(s) => {
1416 out.push((prefix.to_string(), Value::text(s.clone())));
1417 }
1418 crate::serde_json::Value::Number(n) => {
1419 if n.fract().abs() < f64::EPSILON {
1420 out.push((prefix.to_string(), Value::UnsignedInteger(*n as u64)));
1421 } else {
1422 out.push((prefix.to_string(), Value::Float(*n)));
1423 }
1424 }
1425 crate::serde_json::Value::Bool(b) => {
1426 out.push((prefix.to_string(), Value::Boolean(*b)));
1427 }
1428 crate::serde_json::Value::Null => {
1429 out.push((prefix.to_string(), Value::Null));
1430 }
1431 crate::serde_json::Value::Array(arr) => {
1432 let json_str = crate::serde_json::to_string(value).unwrap_or_default();
1433 out.push((prefix.to_string(), Value::text(json_str)));
1434 }
1435 }
1436}