1use super::*;
2
3impl UnifiedStore {
4 pub(crate) fn persist_entities_to_pager(
5 &self,
6 collection: &str,
7 entities: &[UnifiedEntity],
8 ) -> Result<(), StoreError> {
9 self.persist_entities_impl(collection, entities, false)
10 }
11
12 pub(crate) fn persist_entity_refs_to_pager(
16 &self,
17 collection: &str,
18 entities: &[&UnifiedEntity],
19 ) -> Result<(), StoreError> {
20 self.persist_entity_refs_impl(collection, entities, false)
21 }
22
23 pub(crate) fn persist_entity_refs_to_pager_wal_only(
26 &self,
27 collection: &str,
28 entities: &[&UnifiedEntity],
29 ) -> Result<(), StoreError> {
30 self.persist_entity_refs_impl(collection, entities, true)
31 }
32
33 fn persist_entity_refs_impl(
34 &self,
35 collection: &str,
36 entities: &[&UnifiedEntity],
37 skip_btree: bool,
38 ) -> Result<(), StoreError> {
39 if entities.is_empty() {
40 return Ok(());
41 }
42 let Some(pager) = &self.pager else {
43 return Ok(());
44 };
45 let fv = self.format_version();
46 let manager = self
47 .get_collection(collection)
48 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
49 let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
50 .iter()
51 .map(|entity| {
52 let metadata = manager.get_metadata(entity.id);
53 (
54 entity.id.raw().to_be_bytes().to_vec(),
55 Self::serialize_entity_record(entity, metadata.as_ref(), fv),
56 )
57 })
58 .collect();
59 serialized.sort_by(|a, b| a.0.cmp(&b.0));
60
61 if !skip_btree {
62 let mut btree_indices = self.btree_indices.write();
63 let btree = btree_indices
64 .entry(collection.to_string())
65 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
66 let root_before = btree.root_page_id();
67 btree.upsert_batch_sorted(&serialized).map_err(|e| {
68 StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
69 })?;
70 let root_after = btree.root_page_id();
71 drop(btree_indices);
72 if root_before != root_after {
73 self.mark_paged_registry_dirty();
74 }
75 }
76 let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
77 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
78 collection: collection.to_string(),
79 records,
80 }])?;
81 Ok(())
82 }
83
84 pub(crate) fn persist_entities_to_pager_wal_only(
96 &self,
97 collection: &str,
98 entities: &[UnifiedEntity],
99 ) -> Result<(), StoreError> {
100 self.persist_entities_impl(collection, entities, true)
101 }
102
103 fn persist_entities_impl(
104 &self,
105 collection: &str,
106 entities: &[UnifiedEntity],
107 skip_btree: bool,
108 ) -> Result<(), StoreError> {
109 if entities.is_empty() {
110 return Ok(());
111 }
112
113 let Some(pager) = &self.pager else {
114 return Ok(());
115 };
116
117 let fv = self.format_version();
118 let manager = self
119 .get_collection(collection)
120 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
121 let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
122 .iter()
123 .map(|entity| {
124 let metadata = manager.get_metadata(entity.id);
125 (
126 entity.id.raw().to_be_bytes().to_vec(),
127 Self::serialize_entity_record(entity, metadata.as_ref(), fv),
128 )
129 })
130 .collect();
131 serialized.sort_by(|a, b| a.0.cmp(&b.0));
133
134 if !skip_btree {
135 let mut btree_indices = self.btree_indices.write();
136 let btree = btree_indices
137 .entry(collection.to_string())
138 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
139 let root_before = btree.root_page_id();
140
141 btree.upsert_batch_sorted(&serialized).map_err(|e| {
145 StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
146 })?;
147 let root_after = btree.root_page_id();
148 drop(btree_indices);
149 if root_before != root_after {
150 self.mark_paged_registry_dirty();
151 }
152 }
153 let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
163 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
164 collection: collection.to_string(),
165 records,
166 }])?;
167
168 Ok(())
169 }
170
171 pub(crate) fn update_graph_label_index(
173 &self,
174 collection: &str,
175 label: &str,
176 entity_id: EntityId,
177 ) {
178 let key = (collection.to_string(), label.to_string());
179 let mut idx = self.graph_label_index.write();
180 idx.entry(key).or_default().push(entity_id);
181 }
182
183 pub(crate) fn remove_from_graph_label_index(&self, collection: &str, entity_id: EntityId) {
185 let mut idx = self.graph_label_index.write();
186 for ((col, _), ids) in idx.iter_mut() {
187 if col == collection {
188 ids.retain(|&id| id != entity_id);
189 }
190 }
191 idx.retain(|_, ids| !ids.is_empty());
193 }
194
195 pub(crate) fn remove_from_graph_label_index_batch(
196 &self,
197 collection: &str,
198 entity_ids: &[EntityId],
199 ) {
200 if entity_ids.is_empty() {
201 return;
202 }
203 let id_set: std::collections::HashSet<EntityId> = entity_ids.iter().copied().collect();
204 let mut idx = self.graph_label_index.write();
205 for ((col, _), ids) in idx.iter_mut() {
206 if col == collection {
207 ids.retain(|id| !id_set.contains(id));
208 }
209 }
210 idx.retain(|_, ids| !ids.is_empty());
211 }
212
213 pub fn lookup_graph_nodes_by_label(&self, label: &str) -> Vec<EntityId> {
215 let idx = self.graph_label_index.read();
216 idx.iter()
217 .filter(|((_, l), _)| l == label)
218 .flat_map(|(_, ids)| ids.iter().copied())
219 .collect()
220 }
221
222 pub fn lookup_graph_nodes_by_label_in(&self, collection: &str, label: &str) -> Vec<EntityId> {
224 let idx = self.graph_label_index.read();
225 idx.get(&(collection.to_string(), label.to_string()))
226 .cloned()
227 .unwrap_or_default()
228 }
229
230 pub fn create_collection(&self, name: impl Into<String>) -> Result<(), StoreError> {
231 let name = name.into();
232 let mut collections = self.collections.write();
233
234 if collections.contains_key(&name) {
235 return Err(StoreError::CollectionExists(name));
236 }
237
238 let manager = SegmentManager::with_config(&name, self.config.manager_config.clone());
239 collections.insert(name.clone(), Arc::new(manager));
240 drop(collections);
241 self.mark_paged_registry_dirty();
242 self.finish_paged_write([StoreWalAction::CreateCollection { name }])?;
243
244 Ok(())
245 }
246
247 pub fn get_or_create_collection(&self, name: impl Into<String>) -> Arc<SegmentManager> {
249 let name = name.into();
250 {
252 let collections = self.collections.read();
253 if let Some(manager) = collections.get(&name) {
254 return Arc::clone(manager);
255 }
256 }
257 let mut collections = self.collections.write();
259 if let Some(manager) = collections.get(&name) {
261 return Arc::clone(manager);
262 }
263 let manager = Arc::new(SegmentManager::with_config(
264 &name,
265 self.config.manager_config.clone(),
266 ));
267 collections.insert(name, Arc::clone(&manager));
268 self.mark_paged_registry_dirty();
269 manager
270 }
271
272 pub fn get_collection(&self, name: &str) -> Option<Arc<SegmentManager>> {
274 self.collections.read().get(name).map(Arc::clone)
275 }
276
/// Borrows the store's context index.
pub fn context_index(&self) -> &ContextIndex {
    &self.context_index
}
281
282 pub fn take_replayed_turbo_inserts(&self, collection: &str) -> Option<Vec<(u64, Vec<f32>)>> {
288 let mut map = self.replayed_turbo_inserts.lock();
289 map.remove(collection)
290 }
291
292 pub fn set_config_tree(&self, prefix: &str, json: &crate::serde_json::Value) -> usize {
295 let _ = self.get_or_create_collection("red_config");
296 let mut pairs = Vec::new();
297 flatten_config_json(prefix, json, &mut pairs);
298 let mut saved = 0;
299 for (key, value) in pairs {
300 let entity = UnifiedEntity::new(
301 EntityId::new(0),
302 EntityKind::TableRow {
303 table: Arc::from("red_config"),
304 row_id: 0,
305 },
306 EntityData::Row(RowData {
307 columns: Vec::new(),
308 named: Some(
309 [
310 ("key".to_string(), crate::storage::schema::Value::text(key)),
311 ("value".to_string(), value),
312 ]
313 .into_iter()
314 .collect(),
315 ),
316 schema: None,
317 }),
318 );
319 if self.insert_auto("red_config", entity).is_ok() {
320 saved += 1;
321 }
322 }
323 saved
324 }
325
326 pub fn get_config(&self, key: &str) -> Option<crate::storage::schema::Value> {
328 let manager = self.get_collection("red_config")?;
329 for entity in manager.query_all(|_| true) {
330 if let EntityData::Row(row) = &entity.data {
331 if let Some(named) = &row.named {
332 let key_matches = named
333 .get("key")
334 .and_then(|v| match v {
335 crate::storage::schema::Value::Text(s) => Some(s.as_ref() == key),
336 _ => None,
337 })
338 .unwrap_or(false);
339 if key_matches {
340 return named.get("value").cloned();
341 }
342 }
343 }
344 }
345 None
346 }
347
348 pub fn list_collections(&self) -> Vec<String> {
350 self.collections.read().keys().cloned().collect()
351 }
352
353 pub fn drop_collection(&self, name: &str) -> Result<(), StoreError> {
355 let manager = {
356 let mut collections = self.collections.write();
357
358 collections
359 .remove(name)
360 .ok_or_else(|| StoreError::CollectionNotFound(name.to_string()))?
361 };
362
363 let entities = manager.query_all(|_| true);
364 let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
365
366 for entity_id in &entity_ids {
367 self.context_index.remove_entity(*entity_id);
368 let _ = self.unindex_cross_refs(*entity_id);
369 }
370
371 self.btree_indices.write().remove(name);
372
373 self.entity_cache.retain(|entity_id, (collection, _)| {
374 collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
375 });
376
377 self.cross_refs.write().retain(|source_id, refs| {
378 refs.retain(|(target_id, _, target_collection)| {
379 target_collection != name && !entity_ids.iter().any(|id| id == target_id)
380 });
381 !entity_ids.iter().any(|id| id == source_id)
382 });
383
384 self.reverse_refs.write().retain(|target_id, refs| {
385 refs.retain(|(source_id, _, source_collection)| {
386 source_collection != name && !entity_ids.iter().any(|id| id == source_id)
387 });
388 !entity_ids.iter().any(|id| id == target_id)
389 });
390
391 self.mark_paged_registry_dirty();
392 self.finish_paged_write([StoreWalAction::DropCollection {
393 name: name.to_string(),
394 }])?;
395
396 Ok(())
397 }
398
/// Inserts `entity` into an existing collection and returns its id.
///
/// An entity id of 0 is replaced with a freshly allocated id; any other id
/// is registered with the store. Table rows get the same treatment for
/// `row_id`, using the collection manager's row-id counter. When a pager is
/// configured the stored entity is also written to the collection's B-tree
/// and an upsert action is passed to `finish_paged_write`.
///
/// # Errors
/// * `StoreError::CollectionNotFound` when `collection` is not registered
///   (use `insert_auto` to create it on demand).
/// * Errors from the manager insert, from `index_cross_refs` and from
///   `finish_paged_write`.
pub fn insert(&self, collection: &str, entity: UnifiedEntity) -> Result<EntityId, StoreError> {
    let manager = self
        .get_collection(collection)
        .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;

    let mut entity = entity;
    // Id 0 means "not assigned yet".
    if entity.id.raw() == 0 {
        entity.id = self.next_entity_id();
    } else {
        self.register_entity_id(entity.id);
    }
    // Same convention for the per-collection row id of table rows.
    if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
        if *row_id == 0 {
            *row_id = manager.next_row_id();
        } else {
            manager.register_row_id(*row_id);
        }
    }
    entity.ensure_table_logical_id();
    // Capture the label now: `entity` is moved into the manager below.
    let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
    {
        Some(node.label.clone())
    } else {
        None
    };

    let id = manager.insert(entity)?;
    self.register_entity_id(id);

    if let Some(ref label) = graph_node_label {
        self.update_graph_label_index(collection, label, id);
    }

    let mut registry_dirty = false;
    if let Some(pager) = &self.pager {
        // Re-read the entity from the manager so the stored form is serialized.
        if let Some(entity) = manager.get(id) {
            let mut btree_indices = self.btree_indices.write();
            let btree = btree_indices
                .entry(collection.to_string())
                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
            let root_before = btree.root_page_id();

            let key = id.raw().to_be_bytes();
            let metadata = manager.get_metadata(id);
            let value = Self::serialize_entity_record(
                &entity,
                metadata.as_ref(),
                self.format_version(),
            );
            // NOTE(review): the B-tree insert result is discarded here, while
            // `insert_auto` propagates it — confirm this is intentional.
            let _ = btree.insert(&key, &value);
            // A changed root page id means the registry must be rewritten.
            registry_dirty = root_before != btree.root_page_id();
        }
    }

    if self.config.auto_index_refs {
        if let Some(entity) = manager.get(id) {
            self.index_cross_refs(&entity, collection)?;
        }
    }

    if self.pager.is_some() {
        // No action is emitted if the entity can no longer be read back.
        let actions = manager
            .get(id)
            .map(|entity| {
                let metadata = manager.get_metadata(id);
                vec![StoreWalAction::upsert_entity(
                    collection,
                    &entity,
                    metadata.as_ref(),
                    self.format_version(),
                )]
            })
            .unwrap_or_default();
        if registry_dirty {
            self.mark_paged_registry_dirty();
        }
        self.finish_paged_write(actions)?;
    }

    Ok(id)
}
490
/// Inserts a batch of entities into `collection`, creating the collection
/// when it does not exist, and returns the ids reported by the manager.
///
/// Entity ids and table row ids equal to 0 are filled from ranges reserved
/// in one step each; non-zero ids are registered as given. When a pager is
/// configured the entities are serialized (without metadata) before they
/// are moved into the manager, written to the collection's B-tree and
/// passed to `finish_paged_write` as one `BulkUpsertEntityRecords` action.
///
/// Environment variables read on every call:
/// * `REDDB_BULK_TIMING` (`1`/`true`/`on`) — emit a debug event with the
///   duration of each phase.
/// * `REDDB_BULK_SKIP_PERSIST_UNSAFE` (`1`/`true`/`on`) — only logs a
///   one-time warning; it is honoured solely when no pager is configured.
///
/// # Errors
/// Errors from the manager's bulk insert and from `finish_paged_write`.
///
/// # Panics
/// Panics if a reserved id range yields fewer ids than were counted as
/// missing.
pub fn bulk_insert(
    &self,
    collection: &str,
    mut entities: Vec<UnifiedEntity>,
) -> Result<Vec<EntityId>, StoreError> {
    let trace = matches!(
        std::env::var("REDDB_BULK_TIMING").ok().as_deref(),
        Some("1") | Some("true") | Some("on")
    );
    let t_start = std::time::Instant::now();
    let n = entities.len();
    let manager = self.get_or_create_collection(collection);
    let t_get_coll = t_start.elapsed();

    let t0 = std::time::Instant::now();
    // Count the unassigned ids first so each range is reserved in one call.
    let n_missing_entity_ids = entities.iter().filter(|e| e.id.raw() == 0).count() as u64;
    let n_missing_row_ids = entities
        .iter()
        .filter(|e| matches!(e.kind, EntityKind::TableRow { row_id: 0, .. }))
        .count() as u64;
    let mut entity_id_range = if n_missing_entity_ids > 0 {
        self.reserve_entity_ids(n_missing_entity_ids)
    } else {
        0..0
    };
    let mut row_id_range = if n_missing_row_ids > 0 {
        manager.reserve_row_ids(n_missing_row_ids)
    } else {
        0..0
    };
    for entity in &mut entities {
        if entity.id.raw() == 0 {
            let next = entity_id_range
                .next()
                .expect("reserved entity-id range exhausted");
            entity.id = EntityId::new(next);
        } else {
            self.register_entity_id(entity.id);
        }
        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
            if *row_id == 0 {
                *row_id = row_id_range
                    .next()
                    .expect("reserved row-id range exhausted");
            } else {
                manager.register_row_id(*row_id);
            }
        }
        entity.ensure_table_logical_id();
    }
    let t_assign_ids = t0.elapsed();

    // Capture graph labels now: `entities` is moved into the manager below.
    let graph_labels: Vec<Option<(String, EntityId)>> = entities
        .iter()
        .map(|e| {
            if let EntityKind::GraphNode(ref node) = e.kind {
                Some((node.label.clone(), e.id))
            } else {
                None
            }
        })
        .collect();

    let t0 = std::time::Instant::now();
    // Serialize before the move as well; metadata is passed as `None`.
    let serialized: Option<Vec<(Vec<u8>, Vec<u8>)>> = if self.pager.is_some() {
        let fv = self.format_version();
        let serial_map = |e: &UnifiedEntity| {
            (
                e.id.raw().to_be_bytes().to_vec(),
                Self::serialize_entity_record(e, None, fv),
            )
        };
        // Batches of 512 or more entities are serialized in parallel via rayon.
        if entities.len() >= 512 {
            use rayon::prelude::*;
            Some(entities.par_iter().map(serial_map).collect())
        } else {
            Some(entities.iter().map(serial_map).collect())
        }
    } else {
        None
    };
    let t_serialize = t0.elapsed();

    let t0 = std::time::Instant::now();
    let ids = manager.bulk_insert(entities)?;
    let t_manager = t0.elapsed();
    for id in &ids {
        self.register_entity_id(*id);
    }

    for (label, entity_id) in graph_labels.iter().flatten() {
        self.update_graph_label_index(collection, label, *entity_id);
    }

    let skip_btree_requested = matches!(
        std::env::var("REDDB_BULK_SKIP_PERSIST_UNSAFE")
            .ok()
            .as_deref(),
        Some("1") | Some("true") | Some("on")
    );
    // The skip flag only takes effect when there is no pager at all.
    let skip_btree = skip_btree_requested && self.pager.is_none();
    if skip_btree_requested && !skip_btree {
        // `OnceLock` limits each warning to one emission per process.
        static IGNORED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
        IGNORED.get_or_init(|| {
            tracing::warn!(
                "REDDB_BULK_SKIP_PERSIST_UNSAFE set but durable pager is \
                 active — flag ignored; bulk inserts will be persisted normally"
            );
        });
    } else if skip_btree {
        static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
        WARNED.get_or_init(|| {
            tracing::warn!(
                "REDDB_BULK_SKIP_PERSIST_UNSAFE set (ephemeral/no-pager mode) — \
                 bulk inserts NOT durable; data will be lost on restart"
            );
        });
    }

    let mut t_btree_lock = std::time::Duration::ZERO;
    let mut t_btree_insert = std::time::Duration::ZERO;
    let mut t_flush = std::time::Duration::ZERO;
    if !skip_btree {
        if let (Some(pager), Some(batch)) = (&self.pager, serialized.as_ref()) {
            let t0 = std::time::Instant::now();
            let mut btree_indices = self.btree_indices.write();
            let btree = btree_indices
                .entry(collection.to_string())
                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
            let root_before = btree.root_page_id();
            t_btree_lock = t0.elapsed();

            let t0 = std::time::Instant::now();
            // NOTE(review): `batch` is in input order and is not sorted in
            // this function; presumably the reserved ids are ascending —
            // confirm for callers that pass preset ids. The result of the
            // B-tree insert is also discarded.
            let _ = btree.bulk_insert_sorted(batch);
            t_btree_insert = t0.elapsed();
            let registry_dirty = root_before != btree.root_page_id();

            // `t_flush` only times the dirty-flag update below.
            let t0 = std::time::Instant::now();
            if registry_dirty {
                self.mark_paged_registry_dirty();
            }
            t_flush = t0.elapsed();
        }
    }

    // Without a pager `serialized` is `None` and the action list is empty.
    let actions = serialized
        .map(|batch| {
            let records: Vec<Vec<u8>> =
                batch.into_iter().map(|(_key, record)| record).collect();
            vec![StoreWalAction::BulkUpsertEntityRecords {
                collection: collection.to_string(),
                records,
            }]
        })
        .unwrap_or_default();
    self.finish_paged_write(actions)?;

    if trace {
        tracing::debug!(
            n,
            total = ?t_start.elapsed(),
            get_coll = ?t_get_coll,
            assign = ?t_assign_ids,
            serialize = ?t_serialize,
            manager = ?t_manager,
            btree_lock = ?t_btree_lock,
            btree = ?t_btree_insert,
            flush = ?t_flush,
            "bulk_insert timing"
        );
    }

    Ok(ids)
}
715
/// Inserts `entity` into `collection`, creating the collection when it does
/// not exist, and returns the entity's id.
///
/// An entity id of 0 is replaced with a freshly allocated id; any other id
/// is registered with the store. Table rows get the same treatment for
/// `row_id`. The entity is added to the context index and, when a pager is
/// configured, serialized (without metadata) before it is moved into the
/// manager; the record is then written to the collection's B-tree and
/// passed to `finish_paged_write`.
///
/// # Errors
/// * Errors from `index_cross_refs` and from the manager insert.
/// * `StoreError::Io` when the B-tree insert fails.
/// * Whatever `finish_paged_write` reports.
pub fn insert_auto(
    &self,
    collection: &str,
    entity: UnifiedEntity,
) -> Result<EntityId, StoreError> {
    let manager = self.get_or_create_collection(collection);
    let mut entity = entity;
    // Id 0 means "not assigned yet".
    if entity.id.raw() == 0 {
        entity.id = self.next_entity_id();
    } else {
        self.register_entity_id(entity.id);
    }
    if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
        if *row_id == 0 {
            *row_id = manager.next_row_id();
        } else {
            manager.register_row_id(*row_id);
        }
    }
    entity.ensure_table_logical_id();

    // Capture the label now: `entity` is moved into the manager below.
    let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
    {
        Some(node.label.clone())
    } else {
        None
    };
    // NOTE(review): the context index and cross refs are updated before the
    // manager insert, so they are not undone if that insert fails — confirm.
    self.context_index.index_entity(collection, &entity);

    let id_for_serialize = entity.id;
    // Serialize while the entity is still owned here; metadata is `None`.
    let serialized_record: Option<Vec<u8>> = if self.pager.is_some() {
        Some(Self::serialize_entity_record(
            &entity,
            None,
            self.format_version(),
        ))
    } else {
        None
    };
    if self.config.auto_index_refs {
        self.index_cross_refs(&entity, collection)?;
    }

    let id = manager.insert(entity)?;
    // The record was serialized with the pre-assigned id; the manager is
    // expected to hand the same id back (checked in debug builds only).
    debug_assert_eq!(id, id_for_serialize);
    if let Some(ref label) = graph_node_label {
        self.update_graph_label_index(collection, label, id);
    }

    let mut registry_dirty = false;
    if let (Some(_pager), Some(record)) = (&self.pager, serialized_record.as_ref()) {
        if let Some(btree) = self.get_or_create_btree(collection) {
            let root_before = btree.root_page_id();

            let key = id.raw().to_be_bytes();
            btree.insert(&key, record).map_err(|e| {
                StoreError::Io(std::io::Error::other(format!(
                    "B-tree insert error while inserting '{collection}'/{id}: {e}"
                )))
            })?;
            // A changed root page id means the registry must be rewritten.
            registry_dirty = root_before != btree.root_page_id();
        }
    }

    if self.pager.is_some() {
        let actions = serialized_record
            .map(|record| {
                vec![StoreWalAction::UpsertEntityRecord {
                    collection: collection.to_string(),
                    record,
                }]
            })
            .unwrap_or_default();
        if registry_dirty {
            self.mark_paged_registry_dirty();
        }
        self.finish_paged_write(actions)?;
    }
    Ok(id)
}
824
825 pub fn get(&self, collection: &str, id: EntityId) -> Option<UnifiedEntity> {
831 if let Some(entity) = self
833 .get_collection(collection)
834 .and_then(|manager| manager.get(id))
835 {
836 return Some(entity);
837 }
838
839 if self.pager.is_some() {
841 let btree_indices = self.btree_indices.read();
842 if let Some(btree) = btree_indices.get(collection) {
843 let key = id.raw().to_be_bytes();
844 if let Ok(Some(value)) = btree.get(&key) {
845 if let Ok((entity, _)) =
846 Self::deserialize_entity_record(&value, self.format_version())
847 {
848 return Some(entity);
849 }
850 }
851 }
852 }
853
854 None
855 }
856
857 pub fn get_table_row_by_logical_id(
863 &self,
864 collection: &str,
865 logical_id: EntityId,
866 ) -> Option<UnifiedEntity> {
867 if let Some(entity) = self.get(collection, logical_id) {
868 if matches!(entity.kind, EntityKind::TableRow { .. })
869 && entity.logical_id() == logical_id
870 && entity.xmax == 0
871 {
872 return Some(entity);
873 }
874 }
875
876 let manager = self.get_collection(collection)?;
877 let mut matches = manager.query_all(|entity| {
878 matches!(entity.kind, EntityKind::TableRow { .. }) && entity.logical_id() == logical_id
879 });
880 matches
881 .iter()
882 .find(|entity| entity.xmax == 0)
883 .cloned()
884 .or_else(|| matches.pop())
885 }
886
887 pub fn table_row_versions_by_logical_id(
888 &self,
889 collection: &str,
890 logical_id: EntityId,
891 ) -> Vec<UnifiedEntity> {
892 self.get_collection(collection)
893 .map(|manager| {
894 manager.query_all(|entity| {
895 matches!(entity.kind, EntityKind::TableRow { .. })
896 && entity.logical_id() == logical_id
897 })
898 })
899 .unwrap_or_default()
900 }
901
902 pub fn vacuum_mvcc_history(
903 &self,
904 collection: &str,
905 cutoff_xid: u64,
906 ) -> Result<MvccVacuumStats, StoreError> {
907 let manager = self
908 .get_collection(collection)
909 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
910 let entities =
911 manager.query_all(|entity| matches!(entity.kind, EntityKind::TableRow { .. }));
912 let mut logical = HashMap::<EntityId, (bool, u64)>::new();
913 for entity in &entities {
914 let entry = logical.entry(entity.logical_id()).or_insert((false, 0));
915 if entity.xmax == 0 {
916 entry.0 = true;
917 }
918 entry.1 = entry.1.max(entity.xmin);
919 }
920
921 let mut stats = MvccVacuumStats::default();
922 let mut reclaim_ids = Vec::new();
923 for entity in entities {
924 if entity.xmax == 0 {
925 continue;
926 }
927 stats.scanned_versions += 1;
928 let (has_live_version, max_xmin) = logical
929 .get(&entity.logical_id())
930 .copied()
931 .unwrap_or((false, entity.xmin));
932 let is_delete_tombstone = !has_live_version && entity.xmin == max_xmin;
933 if entity.xmax < cutoff_xid {
934 stats.reclaimed_versions += 1;
935 if is_delete_tombstone {
936 stats.reclaimed_tombstones += 1;
937 } else {
938 stats.reclaimed_history_versions += 1;
939 }
940 reclaim_ids.push(entity.id);
941 } else {
942 stats.retained_versions += 1;
943 if is_delete_tombstone {
944 stats.retained_tombstones += 1;
945 } else {
946 stats.retained_history_versions += 1;
947 }
948 }
949 }
950
951 if !reclaim_ids.is_empty() {
952 self.delete_batch(collection, &reclaim_ids)?;
953 }
954 Ok(stats)
955 }
956
/// Installs a new version of a table row next to its previous version.
///
/// `old_version` is written back through the manager's `update`, and
/// `new_version` is inserted as a separate entity. The new version gets
/// `metadata` when supplied, otherwise the metadata currently stored for
/// the old version. Both versions are then passed to `finish_paged_write`
/// as one `BulkUpsertEntityRecords` action.
///
/// NOTE(review): this path does not write to the collection's B-tree
/// directly — presumably `finish_paged_write` covers that; confirm.
///
/// # Errors
/// * `StoreError::CollectionNotFound` when `collection` is not registered.
/// * Errors from the manager's `update`, `insert` and `set_metadata`, from
///   `index_cross_refs` and from `finish_paged_write`.
pub(crate) fn install_versioned_table_row_update(
    &self,
    collection: &str,
    old_version: UnifiedEntity,
    mut new_version: UnifiedEntity,
    metadata: Option<&Metadata>,
) -> Result<(), StoreError> {
    let manager = self
        .get_collection(collection)
        .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;

    let old_id = old_version.id;
    let new_id = new_version.id;
    // Resolve the metadata to inherit before anything is modified.
    let inherited_metadata = metadata.cloned().or_else(|| manager.get_metadata(old_id));

    // Evict both ids from the entity cache before the stored rows change.
    self.entity_cache.remove(old_id.raw());
    self.entity_cache.remove(new_id.raw());
    manager.update(old_version.clone())?;
    self.context_index.remove_entity(old_id);

    self.register_entity_id(new_version.id);
    // A row id of 0 means "not assigned yet".
    if let EntityKind::TableRow { ref mut row_id, .. } = new_version.kind {
        if *row_id == 0 {
            *row_id = manager.next_row_id();
        } else {
            manager.register_row_id(*row_id);
        }
    }
    new_version.ensure_table_logical_id();
    manager.insert(new_version.clone())?;
    if let Some(metadata) = inherited_metadata {
        manager.set_metadata(new_id, metadata)?;
    }
    self.context_index.index_entity(collection, &new_version);
    if self.config.auto_index_refs {
        self.index_cross_refs(&new_version, collection)?;
    }

    // Re-read the metadata so the records carry what the manager now stores.
    let old_metadata = manager.get_metadata(old_id);
    let new_metadata = manager.get_metadata(new_id);
    let fv = self.format_version();
    let records = vec![
        Self::serialize_entity_record(&old_version, old_metadata.as_ref(), fv),
        Self::serialize_entity_record(&new_version, new_metadata.as_ref(), fv),
    ];
    self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
        collection: collection.to_string(),
        records,
    }])?;

    Ok(())
}
1009
1010 pub fn get_batch(&self, collection: &str, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1016 match self.get_collection(collection) {
1017 Some(manager) => manager.get_many(ids),
1018 None => vec![None; ids.len()],
1019 }
1020 }
1021
1022 pub fn get_any(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1024 if let Some(cached) = self.entity_cache.get(id.raw()) {
1028 return Some(cached);
1029 }
1030
1031 let collections = self.collections.read();
1033 for (name, manager) in collections.iter() {
1034 if let Some(entity) = manager.get(id) {
1035 let result = (name.clone(), entity);
1036 drop(collections);
1040 self.entity_cache.insert(id.raw(), result.clone());
1041 return Some(result);
1042 }
1043 }
1044 None
1045 }
1046
/// Returns the hit rate reported by the entity cache; `None` is passed
/// through unchanged from the cache.
pub fn entity_cache_hit_rate(&self) -> Option<f64> {
    self.entity_cache.hit_rate()
}
1055
/// Returns the statistics reported by the entity cache.
pub fn entity_cache_stats(&self) -> super::super::entity_cache::EntityCacheStats {
    self.entity_cache.stats()
}
1060
1061 pub fn delete(&self, collection: &str, id: EntityId) -> Result<bool, StoreError> {
1063 self.entity_cache.remove(id.raw());
1065 let manager = self
1066 .get_collection(collection)
1067 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1068
1069 let deleted = manager.delete(id)?;
1070 if !deleted {
1071 return Ok(false);
1072 }
1073
1074 let mut registry_dirty = false;
1076 if self.pager.is_some() {
1077 let btree_indices = self.btree_indices.read();
1078 if let Some(btree) = btree_indices.get(collection) {
1079 let root_before = btree.root_page_id();
1080 let key = id.raw().to_be_bytes();
1081 let _ = btree.delete(&key);
1082 registry_dirty = root_before != btree.root_page_id();
1083 }
1084 }
1085
1086 self.unindex_cross_refs(id)?;
1088
1089 self.remove_from_graph_label_index(collection, id);
1091
1092 if registry_dirty {
1093 self.mark_paged_registry_dirty();
1094 }
1095 self.finish_paged_write([StoreWalAction::DeleteEntityRecord {
1096 collection: collection.to_string(),
1097 entity_id: id.raw(),
1098 }])?;
1099
1100 Ok(true)
1101 }
1102
1103 pub fn delete_batch(
1104 &self,
1105 collection: &str,
1106 ids: &[EntityId],
1107 ) -> Result<Vec<EntityId>, StoreError> {
1108 if ids.is_empty() {
1109 return Ok(Vec::new());
1110 }
1111
1112 self.entity_cache.remove_many(ids.iter().map(|id| id.raw()));
1117
1118 let manager = self
1119 .get_collection(collection)
1120 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1121
1122 let deleted_ids = manager.delete_batch(ids)?;
1123 if deleted_ids.is_empty() {
1124 return Ok(deleted_ids);
1125 }
1126
1127 let mut registry_dirty = false;
1128 if self.pager.is_some() {
1129 let btree_indices = self.btree_indices.read();
1130 if let Some(btree) = btree_indices.get(collection) {
1131 let root_before = btree.root_page_id();
1132 for id in &deleted_ids {
1133 let key = id.raw().to_be_bytes();
1134 let _ = btree.delete(&key);
1135 }
1136 registry_dirty = root_before != btree.root_page_id();
1137 }
1138 }
1139
1140 self.unindex_cross_refs_batch(&deleted_ids)?;
1141 self.remove_from_graph_label_index_batch(collection, &deleted_ids);
1142 if registry_dirty {
1143 self.mark_paged_registry_dirty();
1144 }
1145 let actions = deleted_ids
1146 .iter()
1147 .map(|id| StoreWalAction::DeleteEntityRecord {
1148 collection: collection.to_string(),
1149 entity_id: id.raw(),
1150 })
1151 .collect::<Vec<_>>();
1152 self.finish_paged_write(actions)?;
1153
1154 Ok(deleted_ids)
1155 }
1156
1157 pub fn set_metadata(
1159 &self,
1160 collection: &str,
1161 id: EntityId,
1162 metadata: Metadata,
1163 ) -> Result<(), StoreError> {
1164 let manager = self
1165 .get_collection(collection)
1166 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1167
1168 manager.set_metadata(id, metadata)?;
1169 if let Some(entity) = manager.get(id) {
1170 self.persist_entities_to_pager(collection, std::slice::from_ref(&entity))?;
1171 }
1172 Ok(())
1173 }
1174
1175 pub fn get_metadata(&self, collection: &str, id: EntityId) -> Option<Metadata> {
1177 self.get_collection(collection)?.get_metadata(id)
1178 }
1179
/// Adds a cross reference from `source_id` to `target_id`.
///
/// Both entities must exist in their collections. The reference is
/// recorded in the forward map, in the reverse map and on the source
/// entity itself, each only when an identical reference (same id, type and
/// collection) is not already present. When the source entity gained the
/// reference it is updated in its manager and persisted. If anything
/// changed, the paged registry is marked dirty and, under
/// `DurabilityMode::Strict`, the paged state is flushed.
///
/// `weight` is stored only on the reference attached to the source entity.
///
/// # Errors
/// * `StoreError::CollectionNotFound` for an unknown source or target
///   collection.
/// * `StoreError::EntityNotFound` for a missing source or target entity.
/// * `StoreError::TooManyRefs` when the source already has
///   `config.max_cross_refs` forward references.
/// * Errors from `persist_entities_to_pager` and `flush_paged_state`.
pub fn add_cross_ref(
    &self,
    source_collection: &str,
    source_id: EntityId,
    target_collection: &str,
    target_id: EntityId,
    ref_type: RefType,
    weight: f32,
) -> Result<(), StoreError> {
    let source_manager = self
        .get_collection(source_collection)
        .ok_or_else(|| StoreError::CollectionNotFound(source_collection.to_string()))?;

    if source_manager.get(source_id).is_none() {
        return Err(StoreError::EntityNotFound(source_id));
    }

    let target_manager = self
        .get_collection(target_collection)
        .ok_or_else(|| StoreError::CollectionNotFound(target_collection.to_string()))?;

    if target_manager.get(target_id).is_none() {
        return Err(StoreError::EntityNotFound(target_id));
    }

    // NOTE(review): the limit is checked under a read lock that is released
    // before the write below, and it also rejects re-adding a reference
    // that already exists once the limit is reached — confirm both are
    // acceptable.
    let current_refs = self
        .cross_refs
        .read()
        .get(&source_id)
        .map_or(0, |v| v.len());

    if current_refs >= self.config.max_cross_refs {
        return Err(StoreError::TooManyRefs(source_id));
    }

    let mut registry_dirty = false;
    // Forward map: source -> (target, type, target collection).
    {
        let mut forward = self.cross_refs.write();
        let refs = forward.entry(source_id).or_default();
        let inserted = !refs.iter().any(|(id, kind, coll)| {
            *id == target_id && *kind == ref_type && coll == target_collection
        });
        if inserted {
            refs.push((target_id, ref_type, target_collection.to_string()));
            registry_dirty = true;
        }
    }

    // Reverse map: target -> (source, type, source collection).
    {
        let mut reverse = self.reverse_refs.write();
        let refs = reverse.entry(target_id).or_default();
        let inserted = !refs.iter().any(|(id, kind, coll)| {
            *id == source_id && *kind == ref_type && coll == source_collection
        });
        if inserted {
            refs.push((source_id, ref_type, source_collection.to_string()));
            registry_dirty = true;
        }
    }

    // Attach the reference to the source entity unless it already has it.
    if let Some(mut entity) = source_manager.get(source_id) {
        if !entity.cross_refs().iter().any(|xref| {
            xref.target == target_id
                && xref.ref_type == ref_type
                && xref.target_collection == target_collection
        }) {
            let cross_ref = CrossRef::with_weight(
                source_id,
                target_id,
                target_collection,
                ref_type,
                weight,
            );
            entity.add_cross_ref(cross_ref);
            // NOTE(review): the result of the manager update is ignored, yet
            // the entity is persisted below regardless — confirm.
            let _ = source_manager.update(entity.clone());
            registry_dirty = true;
            self.persist_entities_to_pager(source_collection, std::slice::from_ref(&entity))?;
        }
    }

    if registry_dirty {
        self.mark_paged_registry_dirty();
        // Only strict durability forces an immediate flush.
        if matches!(
            self.config.durability_mode,
            crate::api::DurabilityMode::Strict
        ) {
            self.flush_paged_state()?;
        }
    }

    Ok(())
}
1276
1277 pub fn get_refs_from(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1279 self.cross_refs.read().get(&id).cloned().unwrap_or_default()
1280 }
1281
1282 pub fn get_refs_to(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1284 self.reverse_refs
1285 .read()
1286 .get(&id)
1287 .cloned()
1288 .unwrap_or_default()
1289 }
1290
1291 pub fn expand_refs(
1293 &self,
1294 id: EntityId,
1295 depth: u32,
1296 ref_types: Option<&[RefType]>,
1297 ) -> Vec<(UnifiedEntity, u32, RefType)> {
1298 let mut results = Vec::new();
1299 let mut visited = std::collections::HashSet::new();
1300 visited.insert(id);
1301
1302 self.expand_refs_recursive(id, depth, ref_types, &mut visited, &mut results, 1);
1303
1304 results
1305 }
1306
1307 fn expand_refs_recursive(
1308 &self,
1309 id: EntityId,
1310 max_depth: u32,
1311 ref_types: Option<&[RefType]>,
1312 visited: &mut std::collections::HashSet<EntityId>,
1313 results: &mut Vec<(UnifiedEntity, u32, RefType)>,
1314 current_depth: u32,
1315 ) {
1316 if current_depth > max_depth {
1317 return;
1318 }
1319
1320 for (target_id, ref_type, target_collection) in self.get_refs_from(id) {
1321 if visited.contains(&target_id) {
1322 continue;
1323 }
1324
1325 if let Some(types) = ref_types {
1326 if !types.contains(&ref_type) {
1327 continue;
1328 }
1329 }
1330
1331 visited.insert(target_id);
1332
1333 if let Some(entity) = self.get(&target_collection, target_id) {
1334 results.push((entity, current_depth, ref_type));
1335
1336 self.expand_refs_recursive(
1338 target_id,
1339 max_depth,
1340 ref_types,
1341 visited,
1342 results,
1343 current_depth + 1,
1344 );
1345 }
1346 }
1347 }
1348
1349 pub(crate) fn index_cross_refs(
1351 &self,
1352 entity: &UnifiedEntity,
1353 collection: &str,
1354 ) -> Result<(), StoreError> {
1355 let mut registry_dirty = false;
1356 for cross_ref in entity.cross_refs() {
1357 if cross_ref.target_collection.is_empty() {
1358 continue;
1359 }
1360 {
1361 let mut forward = self.cross_refs.write();
1362 let refs = forward.entry(cross_ref.source).or_default();
1363 let inserted = !refs.iter().any(|(id, kind, coll)| {
1364 *id == cross_ref.target
1365 && *kind == cross_ref.ref_type
1366 && coll == &cross_ref.target_collection
1367 });
1368 if inserted {
1369 refs.push((
1370 cross_ref.target,
1371 cross_ref.ref_type,
1372 cross_ref.target_collection.clone(),
1373 ));
1374 registry_dirty = true;
1375 }
1376 }
1377
1378 {
1379 let mut reverse = self.reverse_refs.write();
1380 let refs = reverse.entry(cross_ref.target).or_default();
1381 let inserted = !refs.iter().any(|(id, kind, coll)| {
1382 *id == cross_ref.source && *kind == cross_ref.ref_type && coll == collection
1383 });
1384 if inserted {
1385 refs.push((cross_ref.source, cross_ref.ref_type, collection.to_string()));
1386 registry_dirty = true;
1387 }
1388 }
1389 }
1390
1391 if registry_dirty {
1392 self.mark_paged_registry_dirty();
1393 }
1394
1395 Ok(())
1396 }
1397
1398 pub(crate) fn unindex_cross_refs(&self, id: EntityId) -> Result<(), StoreError> {
1400 self.cross_refs.write().remove(&id);
1402
1403 let mut reverse = self.reverse_refs.write();
1405 for refs in reverse.values_mut() {
1406 refs.retain(|(source, _, _)| *source != id);
1407 }
1408 reverse.remove(&id);
1409 self.mark_paged_registry_dirty();
1410
1411 Ok(())
1412 }
1413
1414 pub(crate) fn unindex_cross_refs_batch(&self, ids: &[EntityId]) -> Result<(), StoreError> {
1415 if ids.is_empty() {
1416 return Ok(());
1417 }
1418
1419 let id_set: std::collections::HashSet<EntityId> = ids.iter().copied().collect();
1420
1421 let needs_forward_cleanup = {
1435 let forward = self.cross_refs.read();
1436 id_set.iter().any(|id| forward.contains_key(id))
1437 };
1438 let needs_reverse_cleanup = {
1439 let reverse = self.reverse_refs.read();
1440 id_set.iter().any(|id| reverse.contains_key(id))
1441 };
1442
1443 if !needs_forward_cleanup && !needs_reverse_cleanup {
1444 self.unindex_cross_refs_fast_path
1445 .fetch_add(1, Ordering::Relaxed);
1446 return Ok(());
1447 }
1448
1449 if needs_forward_cleanup {
1450 let mut forward = self.cross_refs.write();
1451 for id in &id_set {
1452 forward.remove(id);
1453 }
1454 }
1455
1456 if needs_reverse_cleanup || needs_forward_cleanup {
1457 let mut reverse = self.reverse_refs.write();
1461 if needs_forward_cleanup {
1462 for refs in reverse.values_mut() {
1463 refs.retain(|(source, _, _)| !id_set.contains(source));
1464 }
1465 }
1466 reverse.retain(|target, refs| !id_set.contains(target) && !refs.is_empty());
1467 }
1468 self.mark_paged_registry_dirty();
1469
1470 Ok(())
1471 }
1472
1473 pub fn unindex_cross_refs_fast_path_hits(&self) -> u64 {
1477 self.unindex_cross_refs_fast_path.load(Ordering::Relaxed)
1478 }
1479
1480 pub fn query_all<F>(&self, filter: F) -> Vec<(String, UnifiedEntity)>
1482 where
1483 F: Fn(&UnifiedEntity) -> bool + Clone + Send + Sync,
1484 {
1485 let pairs: Vec<(String, Arc<SegmentManager>)> = {
1496 let collections = self.collections.read();
1497 collections
1498 .iter()
1499 .map(|(name, mgr)| (name.clone(), Arc::clone(mgr)))
1500 .collect()
1501 };
1502
1503 let use_parallel = pairs.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1504 if !use_parallel {
1505 return pairs
1507 .into_iter()
1508 .flat_map(|(name, mgr)| {
1509 mgr.query_all(filter.clone())
1510 .into_iter()
1511 .map(move |e| (name.clone(), e))
1512 })
1513 .collect();
1514 }
1515
1516 let filter_ref = &filter;
1518 let collection_results: Vec<Vec<(String, UnifiedEntity)>> = std::thread::scope(|s| {
1519 pairs
1520 .iter()
1521 .map(|(name, manager)| {
1522 let name = (*name).clone();
1523 s.spawn(move || {
1524 manager
1525 .query_all(|e| filter_ref(e))
1526 .into_iter()
1527 .map(|e| (name.clone(), e))
1528 .collect::<Vec<_>>()
1529 })
1530 })
1531 .collect::<Vec<_>>()
1532 .into_iter()
1533 .map(|h| h.join().unwrap_or_default())
1534 .collect()
1535 });
1536
1537 collection_results.into_iter().flatten().collect()
1538 }
1539
1540 pub fn filter_metadata_all(
1542 &self,
1543 filters: &[(String, MetadataFilter)],
1544 ) -> Vec<(String, EntityId)> {
1545 let mut results = Vec::new();
1546 let collections = self.collections.read();
1547
1548 for (name, manager) in collections.iter() {
1549 for id in manager.filter_metadata(filters) {
1550 results.push((name.clone(), id));
1551 }
1552 }
1553
1554 results
1555 }
1556
1557 pub fn stats(&self) -> StoreStats {
1559 let pairs: Vec<(String, Arc<SegmentManager>)> = {
1565 let collections = self.collections.read();
1566 collections
1567 .iter()
1568 .map(|(name, mgr)| (name.clone(), Arc::clone(mgr)))
1569 .collect()
1570 };
1571
1572 let mut stats = StoreStats {
1573 collection_count: pairs.len(),
1574 ..Default::default()
1575 };
1576
1577 for (name, manager) in &pairs {
1578 let manager_stats = manager.stats();
1579 stats.total_entities += manager_stats.total_entities;
1580 stats.total_memory_bytes += manager_stats.total_memory_bytes;
1581 stats.collections.insert(name.clone(), manager_stats);
1582 }
1583
1584 stats
1585 }
1586
1587 pub fn run_maintenance(&self) -> Result<(), StoreError> {
1589 let collections = self.collections.read();
1590 for manager in collections.values() {
1591 manager.run_maintenance()?;
1592 }
1593 Ok(())
1594 }
1595}
1596
1597fn flatten_config_json(
1599 prefix: &str,
1600 value: &crate::serde_json::Value,
1601 out: &mut Vec<(String, crate::storage::schema::Value)>,
1602) {
1603 use crate::storage::schema::Value;
1604 match value {
1605 crate::serde_json::Value::Object(map) => {
1606 for (k, v) in map {
1607 let key = if prefix.is_empty() {
1608 k.clone()
1609 } else {
1610 format!("{prefix}.{k}")
1611 };
1612 flatten_config_json(&key, v, out);
1613 }
1614 }
1615 crate::serde_json::Value::String(s) => {
1616 out.push((prefix.to_string(), Value::text(s.clone())));
1617 }
1618 crate::serde_json::Value::Number(n) => {
1619 if n.fract().abs() < f64::EPSILON {
1620 out.push((prefix.to_string(), Value::UnsignedInteger(*n as u64)));
1621 } else {
1622 out.push((prefix.to_string(), Value::Float(*n)));
1623 }
1624 }
1625 crate::serde_json::Value::Bool(b) => {
1626 out.push((prefix.to_string(), Value::Boolean(*b)));
1627 }
1628 crate::serde_json::Value::Null => {
1629 out.push((prefix.to_string(), Value::Null));
1630 }
1631 crate::serde_json::Value::Array(arr) => {
1632 let json_str = crate::serde_json::to_string(value).unwrap_or_default();
1633 out.push((prefix.to_string(), Value::text(json_str)));
1634 }
1635 }
1636}