1use super::*;
2
3impl UnifiedStore {
    /// Persist a batch of owned `entities` for `collection` through the
    /// pager path (B-tree index + WAL). No-op when the store has no pager.
    pub(crate) fn persist_entities_to_pager(
        &self,
        collection: &str,
        entities: &[UnifiedEntity],
    ) -> Result<(), StoreError> {
        // `false` = also upsert into the on-disk B-tree, not just the WAL.
        self.persist_entities_impl(collection, entities, false)
    }
11
    /// Persist a batch of borrowed `entities` for `collection` through the
    /// pager path (B-tree index + WAL). No-op when the store has no pager.
    pub(crate) fn persist_entity_refs_to_pager(
        &self,
        collection: &str,
        entities: &[&UnifiedEntity],
    ) -> Result<(), StoreError> {
        // `false` = also upsert into the on-disk B-tree, not just the WAL.
        self.persist_entity_refs_impl(collection, entities, false)
    }
22
    /// WAL-only variant of [`Self::persist_entity_refs_to_pager`]: appends
    /// the upsert records to the WAL but leaves the B-tree untouched (the
    /// caller is responsible for syncing the tree separately).
    pub(crate) fn persist_entity_refs_to_pager_wal_only(
        &self,
        collection: &str,
        entities: &[&UnifiedEntity],
    ) -> Result<(), StoreError> {
        self.persist_entity_refs_impl(collection, entities, true)
    }
32
    /// Shared persistence path for a batch of borrowed entities.
    ///
    /// Serializes each entity together with its current metadata, sorts the
    /// records by big-endian id key, optionally upserts them into the
    /// collection's B-tree, and always appends one bulk-upsert WAL action.
    ///
    /// `skip_btree == true` records to the WAL only (the `*_wal_only`
    /// wrappers). Returns `Ok(())` immediately when the batch is empty or
    /// the store runs without a pager.
    ///
    /// # Errors
    /// `CollectionNotFound` for an unknown collection; I/O-wrapped B-tree
    /// upsert errors; WAL errors from `finish_paged_write`.
    fn persist_entity_refs_impl(
        &self,
        collection: &str,
        entities: &[&UnifiedEntity],
        skip_btree: bool,
    ) -> Result<(), StoreError> {
        if entities.is_empty() {
            return Ok(());
        }
        // Ephemeral store (no pager): nothing to persist.
        let Some(pager) = &self.pager else {
            return Ok(());
        };
        let fv = self.format_version();
        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
        // Big-endian id keys keep byte order identical to numeric id order.
        let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
            .iter()
            .map(|entity| {
                let metadata = manager.get_metadata(entity.id);
                (
                    entity.id.raw().to_be_bytes().to_vec(),
                    Self::serialize_entity_record(entity, metadata.as_ref(), fv),
                )
            })
            .collect();
        // Sorted input is required by the B-tree's batched upsert fast path.
        serialized.sort_by(|a, b| a.0.cmp(&b.0));

        if !skip_btree {
            let mut btree_indices = self.btree_indices.write();
            let btree = btree_indices
                .entry(collection.to_string())
                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
            let root_before = btree.root_page_id();
            btree.upsert_batch_sorted(&serialized).map_err(|e| {
                StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
            })?;
            let root_after = btree.root_page_id();
            // Release the index lock before touching registry state.
            drop(btree_indices);
            // A root change means the paged registry's pointer to this tree
            // is stale and must be rewritten.
            if root_before != root_after {
                self.mark_paged_registry_dirty();
            }
        }
        let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
            collection: collection.to_string(),
            records,
        }])?;
        Ok(())
    }
83
    /// WAL-only variant of [`Self::persist_entities_to_pager`]: appends the
    /// upsert records to the WAL but leaves the B-tree untouched.
    pub(crate) fn persist_entities_to_pager_wal_only(
        &self,
        collection: &str,
        entities: &[UnifiedEntity],
    ) -> Result<(), StoreError> {
        self.persist_entities_impl(collection, entities, true)
    }
102
103 fn persist_entities_impl(
104 &self,
105 collection: &str,
106 entities: &[UnifiedEntity],
107 skip_btree: bool,
108 ) -> Result<(), StoreError> {
109 if entities.is_empty() {
110 return Ok(());
111 }
112
113 let Some(pager) = &self.pager else {
114 return Ok(());
115 };
116
117 let fv = self.format_version();
118 let manager = self
119 .get_collection(collection)
120 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
121 let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
122 .iter()
123 .map(|entity| {
124 let metadata = manager.get_metadata(entity.id);
125 (
126 entity.id.raw().to_be_bytes().to_vec(),
127 Self::serialize_entity_record(entity, metadata.as_ref(), fv),
128 )
129 })
130 .collect();
131 serialized.sort_by(|a, b| a.0.cmp(&b.0));
133
134 if !skip_btree {
135 let mut btree_indices = self.btree_indices.write();
136 let btree = btree_indices
137 .entry(collection.to_string())
138 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
139 let root_before = btree.root_page_id();
140
141 btree.upsert_batch_sorted(&serialized).map_err(|e| {
145 StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
146 })?;
147 let root_after = btree.root_page_id();
148 drop(btree_indices);
149 if root_before != root_after {
150 self.mark_paged_registry_dirty();
151 }
152 }
153 let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
163 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
164 collection: collection.to_string(),
165 records,
166 }])?;
167
168 Ok(())
169 }
170
171 pub(crate) fn update_graph_label_index(
173 &self,
174 collection: &str,
175 label: &str,
176 entity_id: EntityId,
177 ) {
178 let key = (collection.to_string(), label.to_string());
179 let mut idx = self.graph_label_index.write();
180 idx.entry(key).or_default().push(entity_id);
181 }
182
183 pub(crate) fn remove_from_graph_label_index(&self, collection: &str, entity_id: EntityId) {
185 let mut idx = self.graph_label_index.write();
186 for ((col, _), ids) in idx.iter_mut() {
187 if col == collection {
188 ids.retain(|&id| id != entity_id);
189 }
190 }
191 idx.retain(|_, ids| !ids.is_empty());
193 }
194
195 pub(crate) fn remove_from_graph_label_index_batch(
196 &self,
197 collection: &str,
198 entity_ids: &[EntityId],
199 ) {
200 if entity_ids.is_empty() {
201 return;
202 }
203 let id_set: std::collections::HashSet<EntityId> = entity_ids.iter().copied().collect();
204 let mut idx = self.graph_label_index.write();
205 for ((col, _), ids) in idx.iter_mut() {
206 if col == collection {
207 ids.retain(|id| !id_set.contains(id));
208 }
209 }
210 idx.retain(|_, ids| !ids.is_empty());
211 }
212
213 pub fn lookup_graph_nodes_by_label(&self, label: &str) -> Vec<EntityId> {
215 let idx = self.graph_label_index.read();
216 idx.iter()
217 .filter(|((_, l), _)| l == label)
218 .flat_map(|(_, ids)| ids.iter().copied())
219 .collect()
220 }
221
222 pub fn lookup_graph_nodes_by_label_in(&self, collection: &str, label: &str) -> Vec<EntityId> {
224 let idx = self.graph_label_index.read();
225 idx.get(&(collection.to_string(), label.to_string()))
226 .cloned()
227 .unwrap_or_default()
228 }
229
230 pub fn create_collection(&self, name: impl Into<String>) -> Result<(), StoreError> {
231 let name = name.into();
232 let mut collections = self.collections.write();
233
234 if collections.contains_key(&name) {
235 return Err(StoreError::CollectionExists(name));
236 }
237
238 let manager = SegmentManager::with_config(&name, self.config.manager_config.clone());
239 collections.insert(name.clone(), Arc::new(manager));
240 drop(collections);
241 self.mark_paged_registry_dirty();
242 self.finish_paged_write([StoreWalAction::CreateCollection { name }])?;
243
244 Ok(())
245 }
246
247 pub fn get_or_create_collection(&self, name: impl Into<String>) -> Arc<SegmentManager> {
249 let name = name.into();
250 {
252 let collections = self.collections.read();
253 if let Some(manager) = collections.get(&name) {
254 return Arc::clone(manager);
255 }
256 }
257 let mut collections = self.collections.write();
259 if let Some(manager) = collections.get(&name) {
261 return Arc::clone(manager);
262 }
263 let manager = Arc::new(SegmentManager::with_config(
264 &name,
265 self.config.manager_config.clone(),
266 ));
267 collections.insert(name, Arc::clone(&manager));
268 self.mark_paged_registry_dirty();
269 manager
270 }
271
272 pub fn get_collection(&self, name: &str) -> Option<Arc<SegmentManager>> {
274 self.collections.read().get(name).map(Arc::clone)
275 }
276
    /// Borrow the store-wide context index.
    pub fn context_index(&self) -> &ContextIndex {
        &self.context_index
    }
281
282 pub fn set_config_tree(&self, prefix: &str, json: &crate::serde_json::Value) -> usize {
285 let _ = self.get_or_create_collection("red_config");
286 let mut pairs = Vec::new();
287 flatten_config_json(prefix, json, &mut pairs);
288 let mut saved = 0;
289 for (key, value) in pairs {
290 let entity = UnifiedEntity::new(
291 EntityId::new(0),
292 EntityKind::TableRow {
293 table: Arc::from("red_config"),
294 row_id: 0,
295 },
296 EntityData::Row(RowData {
297 columns: Vec::new(),
298 named: Some(
299 [
300 ("key".to_string(), crate::storage::schema::Value::text(key)),
301 ("value".to_string(), value),
302 ]
303 .into_iter()
304 .collect(),
305 ),
306 schema: None,
307 }),
308 );
309 if self.insert_auto("red_config", entity).is_ok() {
310 saved += 1;
311 }
312 }
313 saved
314 }
315
316 pub fn get_config(&self, key: &str) -> Option<crate::storage::schema::Value> {
318 let manager = self.get_collection("red_config")?;
319 for entity in manager.query_all(|_| true) {
320 if let EntityData::Row(row) = &entity.data {
321 if let Some(named) = &row.named {
322 let key_matches = named
323 .get("key")
324 .and_then(|v| match v {
325 crate::storage::schema::Value::Text(s) => Some(s.as_ref() == key),
326 _ => None,
327 })
328 .unwrap_or(false);
329 if key_matches {
330 return named.get("value").cloned();
331 }
332 }
333 }
334 }
335 None
336 }
337
    /// Names of all currently-registered collections (unordered).
    pub fn list_collections(&self) -> Vec<String> {
        self.collections.read().keys().cloned().collect()
    }
342
343 pub fn drop_collection(&self, name: &str) -> Result<(), StoreError> {
345 let manager = {
346 let mut collections = self.collections.write();
347
348 collections
349 .remove(name)
350 .ok_or_else(|| StoreError::CollectionNotFound(name.to_string()))?
351 };
352
353 let entities = manager.query_all(|_| true);
354 let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
355
356 for entity_id in &entity_ids {
357 self.context_index.remove_entity(*entity_id);
358 let _ = self.unindex_cross_refs(*entity_id);
359 }
360
361 self.btree_indices.write().remove(name);
362
363 self.entity_cache.retain(|entity_id, (collection, _)| {
364 collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
365 });
366
367 self.cross_refs.write().retain(|source_id, refs| {
368 refs.retain(|(target_id, _, target_collection)| {
369 target_collection != name && !entity_ids.iter().any(|id| id == target_id)
370 });
371 !entity_ids.iter().any(|id| id == source_id)
372 });
373
374 self.reverse_refs.write().retain(|target_id, refs| {
375 refs.retain(|(source_id, _, source_collection)| {
376 source_collection != name && !entity_ids.iter().any(|id| id == source_id)
377 });
378 !entity_ids.iter().any(|id| id == target_id)
379 });
380
381 self.mark_paged_registry_dirty();
382 self.finish_paged_write([StoreWalAction::DropCollection {
383 name: name.to_string(),
384 }])?;
385
386 Ok(())
387 }
388
    /// Insert `entity` into an existing `collection`.
    ///
    /// An entity id of 0 (and, for table rows, a row id of 0) means
    /// "allocate one"; non-zero ids are registered with the allocators so
    /// they are never reused. On success the graph-label index, the durable
    /// B-tree (pager mode), and — when `auto_index_refs` is set — the
    /// cross-ref indexes are updated, and an upsert is logged to the WAL.
    ///
    /// # Errors
    /// `CollectionNotFound` when the collection does not exist; any error
    /// from the manager insert, cross-ref indexing, or the paged write.
    pub fn insert(&self, collection: &str, entity: UnifiedEntity) -> Result<EntityId, StoreError> {
        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;

        let mut entity = entity;
        // Id 0 is the "unassigned" sentinel.
        if entity.id.raw() == 0 {
            entity.id = self.next_entity_id();
        } else {
            self.register_entity_id(entity.id);
        }
        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
            if *row_id == 0 {
                *row_id = manager.next_row_id();
            } else {
                manager.register_row_id(*row_id);
            }
        }
        entity.ensure_table_logical_id();
        // Capture the label before `entity` is moved into the manager.
        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
        {
            Some(node.label.clone())
        } else {
            None
        };

        let id = manager.insert(entity)?;
        self.register_entity_id(id);

        if let Some(ref label) = graph_node_label {
            self.update_graph_label_index(collection, label, id);
        }

        let mut registry_dirty = false;
        if let Some(pager) = &self.pager {
            // Re-read the stored entity so the serialized record reflects
            // any normalization the manager applied during insert.
            if let Some(entity) = manager.get(id) {
                let mut btree_indices = self.btree_indices.write();
                let btree = btree_indices
                    .entry(collection.to_string())
                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
                let root_before = btree.root_page_id();

                let key = id.raw().to_be_bytes();
                let metadata = manager.get_metadata(id);
                let value = Self::serialize_entity_record(
                    &entity,
                    metadata.as_ref(),
                    self.format_version(),
                );
                // NOTE(review): the B-tree error is swallowed here while
                // `insert_auto` propagates the equivalent error — confirm
                // the asymmetry is intentional (the WAL record below still
                // provides durability).
                let _ = btree.insert(&key, &value);
                registry_dirty = root_before != btree.root_page_id();
            }
        }

        if self.config.auto_index_refs {
            if let Some(entity) = manager.get(id) {
                self.index_cross_refs(&entity, collection)?;
            }
        }

        if self.pager.is_some() {
            // Serialize the final stored state (including metadata) for the
            // WAL; an empty action list still drives the flush policy.
            let actions = manager
                .get(id)
                .map(|entity| {
                    let metadata = manager.get_metadata(id);
                    vec![StoreWalAction::upsert_entity(
                        collection,
                        &entity,
                        metadata.as_ref(),
                        self.format_version(),
                    )]
                })
                .unwrap_or_default();
            if registry_dirty {
                self.mark_paged_registry_dirty();
            }
            self.finish_paged_write(actions)?;
        }

        Ok(id)
    }
480
    /// Insert many entities into `collection` at once (the collection is
    /// created on demand). This is the high-throughput path: ids are
    /// reserved in contiguous ranges, serialization is parallelized for
    /// large batches, and the B-tree/WAL receive one sorted batch instead of
    /// per-entity writes.
    ///
    /// Set `REDDB_BULK_TIMING=1` to log a per-phase timing breakdown.
    /// `REDDB_BULK_SKIP_PERSIST_UNSAFE=1` skips persistence, but only in
    /// ephemeral (no-pager) mode; with a pager active it is ignored.
    ///
    /// # Errors
    /// Propagates manager bulk-insert errors and WAL errors.
    pub fn bulk_insert(
        &self,
        collection: &str,
        mut entities: Vec<UnifiedEntity>,
    ) -> Result<Vec<EntityId>, StoreError> {
        let trace = matches!(
            std::env::var("REDDB_BULK_TIMING").ok().as_deref(),
            Some("1") | Some("true") | Some("on")
        );
        let t_start = std::time::Instant::now();
        let n = entities.len();
        let manager = self.get_or_create_collection(collection);
        let t_get_coll = t_start.elapsed();

        let t0 = std::time::Instant::now();
        // Reserve contiguous id ranges up front (one allocator call per
        // range) instead of hitting the allocator once per entity.
        let n_missing_entity_ids = entities.iter().filter(|e| e.id.raw() == 0).count() as u64;
        let n_missing_row_ids = entities
            .iter()
            .filter(|e| matches!(e.kind, EntityKind::TableRow { row_id: 0, .. }))
            .count() as u64;
        let mut entity_id_range = if n_missing_entity_ids > 0 {
            self.reserve_entity_ids(n_missing_entity_ids)
        } else {
            0..0
        };
        let mut row_id_range = if n_missing_row_ids > 0 {
            manager.reserve_row_ids(n_missing_row_ids)
        } else {
            0..0
        };
        for entity in &mut entities {
            if entity.id.raw() == 0 {
                // Range length was computed from this same predicate above,
                // so exhaustion here would be a counting bug.
                let next = entity_id_range
                    .next()
                    .expect("reserved entity-id range exhausted");
                entity.id = EntityId::new(next);
            } else {
                self.register_entity_id(entity.id);
            }
            if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
                if *row_id == 0 {
                    *row_id = row_id_range
                        .next()
                        .expect("reserved row-id range exhausted");
                } else {
                    manager.register_row_id(*row_id);
                }
            }
            entity.ensure_table_logical_id();
        }
        let t_assign_ids = t0.elapsed();

        // Capture graph-node labels before `entities` is moved into the
        // manager below.
        let graph_labels: Vec<Option<(String, EntityId)>> = entities
            .iter()
            .map(|e| {
                if let EntityKind::GraphNode(ref node) = e.kind {
                    Some((node.label.clone(), e.id))
                } else {
                    None
                }
            })
            .collect();

        let t0 = std::time::Instant::now();
        // Pre-serialize (big-endian key, record) pairs in pager mode only.
        // NOTE(review): records are serialized with `None` metadata here,
        // unlike the single-insert path which reads per-entity metadata —
        // confirm bulk-inserted entities never carry metadata at this point.
        let serialized: Option<Vec<(Vec<u8>, Vec<u8>)>> = if self.pager.is_some() {
            let fv = self.format_version();
            let serial_map = |e: &UnifiedEntity| {
                (
                    e.id.raw().to_be_bytes().to_vec(),
                    Self::serialize_entity_record(e, None, fv),
                )
            };
            if entities.len() >= 512 {
                // Large batch: fan the serialization out over rayon.
                use rayon::prelude::*;
                Some(entities.par_iter().map(serial_map).collect())
            } else {
                Some(entities.iter().map(serial_map).collect())
            }
        } else {
            None
        };
        let t_serialize = t0.elapsed();

        let t0 = std::time::Instant::now();
        let ids = manager.bulk_insert(entities)?;
        let t_manager = t0.elapsed();
        for id in &ids {
            self.register_entity_id(*id);
        }

        for (label, entity_id) in graph_labels.iter().flatten() {
            self.update_graph_label_index(collection, label, *entity_id);
        }

        // Benchmark escape hatch: only honored when there is no pager, i.e.
        // the store is already non-durable anyway.
        let skip_btree_requested = matches!(
            std::env::var("REDDB_BULK_SKIP_PERSIST_UNSAFE")
                .ok()
                .as_deref(),
            Some("1") | Some("true") | Some("on")
        );
        let skip_btree = skip_btree_requested && self.pager.is_none();
        if skip_btree_requested && !skip_btree {
            // Warn exactly once per process in either branch.
            static IGNORED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
            IGNORED.get_or_init(|| {
                tracing::warn!(
                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set but durable pager is \
                     active — flag ignored; bulk inserts will be persisted normally"
                );
            });
        } else if skip_btree {
            static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
            WARNED.get_or_init(|| {
                tracing::warn!(
                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set (ephemeral/no-pager mode) — \
                     bulk inserts NOT durable; data will be lost on restart"
                );
            });
        }

        let mut t_btree_lock = std::time::Duration::ZERO;
        let mut t_btree_insert = std::time::Duration::ZERO;
        let mut t_flush = std::time::Duration::ZERO;
        if !skip_btree {
            if let (Some(pager), Some(batch)) = (&self.pager, serialized.as_ref()) {
                let t0 = std::time::Instant::now();
                let mut btree_indices = self.btree_indices.write();
                let btree = btree_indices
                    .entry(collection.to_string())
                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
                let root_before = btree.root_page_id();
                t_btree_lock = t0.elapsed();

                let t0 = std::time::Instant::now();
                // NOTE(review): bulk B-tree errors are ignored; durability
                // relies on the WAL action appended below — confirm intended.
                let _ = btree.bulk_insert_sorted(batch);
                t_btree_insert = t0.elapsed();
                let registry_dirty = root_before != btree.root_page_id();

                let t0 = std::time::Instant::now();
                if registry_dirty {
                    self.mark_paged_registry_dirty();
                }
                t_flush = t0.elapsed();
            }
        }

        // One WAL action for the whole batch (empty when there is no pager).
        let actions = serialized
            .map(|batch| {
                let records: Vec<Vec<u8>> =
                    batch.into_iter().map(|(_key, record)| record).collect();
                vec![StoreWalAction::BulkUpsertEntityRecords {
                    collection: collection.to_string(),
                    records,
                }]
            })
            .unwrap_or_default();
        self.finish_paged_write(actions)?;

        if trace {
            tracing::debug!(
                n,
                total = ?t_start.elapsed(),
                get_coll = ?t_get_coll,
                assign = ?t_assign_ids,
                serialize = ?t_serialize,
                manager = ?t_manager,
                btree_lock = ?t_btree_lock,
                btree = ?t_btree_insert,
                flush = ?t_flush,
                "bulk_insert timing"
            );
        }

        Ok(ids)
    }
705
    /// Insert `entity`, creating the collection on demand (unlike
    /// [`Self::insert`], which requires it to exist). Also indexes the
    /// entity into the context index, which the plain insert path does not.
    ///
    /// # Errors
    /// Propagates manager, cross-ref-indexing, B-tree, and WAL errors.
    pub fn insert_auto(
        &self,
        collection: &str,
        entity: UnifiedEntity,
    ) -> Result<EntityId, StoreError> {
        let manager = self.get_or_create_collection(collection);
        let mut entity = entity;
        // Id 0 means "allocate one for me".
        if entity.id.raw() == 0 {
            entity.id = self.next_entity_id();
        } else {
            self.register_entity_id(entity.id);
        }
        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
            if *row_id == 0 {
                *row_id = manager.next_row_id();
            } else {
                manager.register_row_id(*row_id);
            }
        }
        entity.ensure_table_logical_id();

        // Capture the label before `entity` is moved into the manager.
        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
        {
            Some(node.label.clone())
        } else {
            None
        };
        self.context_index.index_entity(collection, &entity);

        // Serialize before the move into the manager; the debug_assert below
        // verifies the manager kept the id the record was serialized under.
        let id_for_serialize = entity.id;
        let serialized_record: Option<Vec<u8>> = if self.pager.is_some() {
            Some(Self::serialize_entity_record(
                &entity,
                None,
                self.format_version(),
            ))
        } else {
            None
        };
        if self.config.auto_index_refs {
            self.index_cross_refs(&entity, collection)?;
        }

        let id = manager.insert(entity)?;
        debug_assert_eq!(id, id_for_serialize);
        if let Some(ref label) = graph_node_label {
            self.update_graph_label_index(collection, label, id);
        }

        let mut registry_dirty = false;
        if let (Some(_pager), Some(record)) = (&self.pager, serialized_record.as_ref()) {
            if let Some(btree) = self.get_or_create_btree(collection) {
                let root_before = btree.root_page_id();

                let key = id.raw().to_be_bytes();
                btree.insert(&key, record).map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree insert error while inserting '{collection}'/{id}: {e}"
                    )))
                })?;
                // A root split/merge means the registry's pointer to this
                // tree is stale.
                registry_dirty = root_before != btree.root_page_id();
            }
        }

        if self.pager.is_some() {
            let actions = serialized_record
                .map(|record| {
                    vec![StoreWalAction::UpsertEntityRecord {
                        collection: collection.to_string(),
                        record,
                    }]
                })
                .unwrap_or_default();
            if registry_dirty {
                self.mark_paged_registry_dirty();
            }
            self.finish_paged_write(actions)?;
        }
        Ok(id)
    }
814
815 pub fn get(&self, collection: &str, id: EntityId) -> Option<UnifiedEntity> {
821 if let Some(entity) = self
823 .get_collection(collection)
824 .and_then(|manager| manager.get(id))
825 {
826 return Some(entity);
827 }
828
829 if self.pager.is_some() {
831 let btree_indices = self.btree_indices.read();
832 if let Some(btree) = btree_indices.get(collection) {
833 let key = id.raw().to_be_bytes();
834 if let Ok(Some(value)) = btree.get(&key) {
835 if let Ok((entity, _)) =
836 Self::deserialize_entity_record(&value, self.format_version())
837 {
838 return Some(entity);
839 }
840 }
841 }
842 }
843
844 None
845 }
846
847 pub fn get_table_row_by_logical_id(
853 &self,
854 collection: &str,
855 logical_id: EntityId,
856 ) -> Option<UnifiedEntity> {
857 if let Some(entity) = self.get(collection, logical_id) {
858 if matches!(entity.kind, EntityKind::TableRow { .. })
859 && entity.logical_id() == logical_id
860 && entity.xmax == 0
861 {
862 return Some(entity);
863 }
864 }
865
866 let manager = self.get_collection(collection)?;
867 let mut matches = manager.query_all(|entity| {
868 matches!(entity.kind, EntityKind::TableRow { .. }) && entity.logical_id() == logical_id
869 });
870 matches
871 .iter()
872 .find(|entity| entity.xmax == 0)
873 .cloned()
874 .or_else(|| matches.pop())
875 }
876
877 pub fn table_row_versions_by_logical_id(
878 &self,
879 collection: &str,
880 logical_id: EntityId,
881 ) -> Vec<UnifiedEntity> {
882 self.get_collection(collection)
883 .map(|manager| {
884 manager.query_all(|entity| {
885 matches!(entity.kind, EntityKind::TableRow { .. })
886 && entity.logical_id() == logical_id
887 })
888 })
889 .unwrap_or_default()
890 }
891
    /// Reclaim dead MVCC row versions in `collection`.
    ///
    /// A version is reclaimable when it is dead (`xmax != 0`) and died
    /// before `cutoff_xid`. Each dead version is classified as a delete
    /// tombstone (the logical row has no live version and this version has
    /// the newest `xmin`) or as plain history, and the outcome is tallied in
    /// the returned [`MvccVacuumStats`]. Reclaimed versions are physically
    /// removed via `delete_batch`.
    ///
    /// # Errors
    /// `CollectionNotFound`; any error from the batched delete.
    pub fn vacuum_mvcc_history(
        &self,
        collection: &str,
        cutoff_xid: u64,
    ) -> Result<MvccVacuumStats, StoreError> {
        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
        let entities =
            manager.query_all(|entity| matches!(entity.kind, EntityKind::TableRow { .. }));
        // Per logical row: (has a live version?, newest xmin seen).
        let mut logical = HashMap::<EntityId, (bool, u64)>::new();
        for entity in &entities {
            let entry = logical.entry(entity.logical_id()).or_insert((false, 0));
            if entity.xmax == 0 {
                entry.0 = true;
            }
            entry.1 = entry.1.max(entity.xmin);
        }

        let mut stats = MvccVacuumStats::default();
        let mut reclaim_ids = Vec::new();
        for entity in entities {
            // Live versions are never vacuumed.
            if entity.xmax == 0 {
                continue;
            }
            stats.scanned_versions += 1;
            let (has_live_version, max_xmin) = logical
                .get(&entity.logical_id())
                .copied()
                .unwrap_or((false, entity.xmin));
            // The newest dead version of a row with no live successor is
            // that row's delete tombstone.
            let is_delete_tombstone = !has_live_version && entity.xmin == max_xmin;
            if entity.xmax < cutoff_xid {
                stats.reclaimed_versions += 1;
                if is_delete_tombstone {
                    stats.reclaimed_tombstones += 1;
                } else {
                    stats.reclaimed_history_versions += 1;
                }
                reclaim_ids.push(entity.id);
            } else {
                // Died at or after the cutoff: still visible to some reader.
                stats.retained_versions += 1;
                if is_delete_tombstone {
                    stats.retained_tombstones += 1;
                } else {
                    stats.retained_history_versions += 1;
                }
            }
        }

        if !reclaim_ids.is_empty() {
            self.delete_batch(collection, &reclaim_ids)?;
        }
        Ok(stats)
    }
946
    /// Install an MVCC row update: write back `old_version` (which the
    /// caller has already marked dead) and insert `new_version` as its
    /// successor, carrying metadata over, then persist both versions in one
    /// WAL batch so a crash never leaves only one side applied.
    ///
    /// The new version inherits explicit `metadata` when given, otherwise
    /// whatever metadata the old version carried.
    ///
    /// # Errors
    /// `CollectionNotFound`; manager update/insert/metadata errors;
    /// cross-ref indexing errors; WAL errors.
    pub(crate) fn install_versioned_table_row_update(
        &self,
        collection: &str,
        old_version: UnifiedEntity,
        mut new_version: UnifiedEntity,
        metadata: Option<&Metadata>,
    ) -> Result<(), StoreError> {
        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;

        let old_id = old_version.id;
        let new_id = new_version.id;
        let inherited_metadata = metadata.cloned().or_else(|| manager.get_metadata(old_id));

        // Evict both ids up front so no stale cache entry outlives the swap.
        self.entity_cache.remove(old_id.raw());
        self.entity_cache.remove(new_id.raw());
        // Write back the (now dead) old version and drop its context entry.
        manager.update(old_version.clone())?;
        self.context_index.remove_entity(old_id);

        self.register_entity_id(new_version.id);
        if let EntityKind::TableRow { ref mut row_id, .. } = new_version.kind {
            if *row_id == 0 {
                *row_id = manager.next_row_id();
            } else {
                manager.register_row_id(*row_id);
            }
        }
        new_version.ensure_table_logical_id();
        manager.insert(new_version.clone())?;
        if let Some(metadata) = inherited_metadata {
            manager.set_metadata(new_id, metadata)?;
        }
        self.context_index.index_entity(collection, &new_version);
        if self.config.auto_index_refs {
            self.index_cross_refs(&new_version, collection)?;
        }

        // Re-read both metadata sets so the serialized records match what
        // the manager now holds, and persist old + new atomically.
        let old_metadata = manager.get_metadata(old_id);
        let new_metadata = manager.get_metadata(new_id);
        let fv = self.format_version();
        let records = vec![
            Self::serialize_entity_record(&old_version, old_metadata.as_ref(), fv),
            Self::serialize_entity_record(&new_version, new_metadata.as_ref(), fv),
        ];
        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
            collection: collection.to_string(),
            records,
        }])?;

        Ok(())
    }
999
1000 pub fn get_batch(&self, collection: &str, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1006 match self.get_collection(collection) {
1007 Some(manager) => manager.get_many(ids),
1008 None => vec![None; ids.len()],
1009 }
1010 }
1011
1012 pub fn get_any(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1014 if let Some(cached) = self.entity_cache.get(id.raw()) {
1018 return Some(cached);
1019 }
1020
1021 let collections = self.collections.read();
1023 for (name, manager) in collections.iter() {
1024 if let Some(entity) = manager.get(id) {
1025 let result = (name.clone(), entity);
1026 drop(collections);
1030 self.entity_cache.insert(id.raw(), result.clone());
1031 return Some(result);
1032 }
1033 }
1034 None
1035 }
1036
    /// Hit rate reported by the entity cache, when it tracks one.
    pub fn entity_cache_hit_rate(&self) -> Option<f64> {
        self.entity_cache.hit_rate()
    }
1045
    /// Snapshot of the entity cache's counters.
    pub fn entity_cache_stats(&self) -> super::super::entity_cache::EntityCacheStats {
        self.entity_cache.stats()
    }
1050
1051 pub fn delete(&self, collection: &str, id: EntityId) -> Result<bool, StoreError> {
1053 self.entity_cache.remove(id.raw());
1055 let manager = self
1056 .get_collection(collection)
1057 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1058
1059 let deleted = manager.delete(id)?;
1060 if !deleted {
1061 return Ok(false);
1062 }
1063
1064 let mut registry_dirty = false;
1066 if self.pager.is_some() {
1067 let btree_indices = self.btree_indices.read();
1068 if let Some(btree) = btree_indices.get(collection) {
1069 let root_before = btree.root_page_id();
1070 let key = id.raw().to_be_bytes();
1071 let _ = btree.delete(&key);
1072 registry_dirty = root_before != btree.root_page_id();
1073 }
1074 }
1075
1076 self.unindex_cross_refs(id)?;
1078
1079 self.remove_from_graph_label_index(collection, id);
1081
1082 if registry_dirty {
1083 self.mark_paged_registry_dirty();
1084 }
1085 self.finish_paged_write([StoreWalAction::DeleteEntityRecord {
1086 collection: collection.to_string(),
1087 entity_id: id.raw(),
1088 }])?;
1089
1090 Ok(true)
1091 }
1092
1093 pub fn delete_batch(
1094 &self,
1095 collection: &str,
1096 ids: &[EntityId],
1097 ) -> Result<Vec<EntityId>, StoreError> {
1098 if ids.is_empty() {
1099 return Ok(Vec::new());
1100 }
1101
1102 self.entity_cache.remove_many(ids.iter().map(|id| id.raw()));
1107
1108 let manager = self
1109 .get_collection(collection)
1110 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1111
1112 let deleted_ids = manager.delete_batch(ids)?;
1113 if deleted_ids.is_empty() {
1114 return Ok(deleted_ids);
1115 }
1116
1117 let mut registry_dirty = false;
1118 if self.pager.is_some() {
1119 let btree_indices = self.btree_indices.read();
1120 if let Some(btree) = btree_indices.get(collection) {
1121 let root_before = btree.root_page_id();
1122 for id in &deleted_ids {
1123 let key = id.raw().to_be_bytes();
1124 let _ = btree.delete(&key);
1125 }
1126 registry_dirty = root_before != btree.root_page_id();
1127 }
1128 }
1129
1130 self.unindex_cross_refs_batch(&deleted_ids)?;
1131 self.remove_from_graph_label_index_batch(collection, &deleted_ids);
1132 if registry_dirty {
1133 self.mark_paged_registry_dirty();
1134 }
1135 let actions = deleted_ids
1136 .iter()
1137 .map(|id| StoreWalAction::DeleteEntityRecord {
1138 collection: collection.to_string(),
1139 entity_id: id.raw(),
1140 })
1141 .collect::<Vec<_>>();
1142 self.finish_paged_write(actions)?;
1143
1144 Ok(deleted_ids)
1145 }
1146
1147 pub fn set_metadata(
1149 &self,
1150 collection: &str,
1151 id: EntityId,
1152 metadata: Metadata,
1153 ) -> Result<(), StoreError> {
1154 let manager = self
1155 .get_collection(collection)
1156 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1157
1158 manager.set_metadata(id, metadata)?;
1159 if let Some(entity) = manager.get(id) {
1160 self.persist_entities_to_pager(collection, std::slice::from_ref(&entity))?;
1161 }
1162 Ok(())
1163 }
1164
1165 pub fn get_metadata(&self, collection: &str, id: EntityId) -> Option<Metadata> {
1167 self.get_collection(collection)?.get_metadata(id)
1168 }
1169
    /// Create a typed, weighted reference from one entity to another,
    /// updating the forward and reverse in-memory ref maps and embedding the
    /// ref into the source entity itself (which is then re-persisted).
    ///
    /// Deduplicates on (target, type, collection): re-adding an existing ref
    /// is a no-op. In `Strict` durability mode a dirtied registry is flushed
    /// before returning.
    ///
    /// # Errors
    /// `CollectionNotFound` / `EntityNotFound` when either endpoint is
    /// missing; `TooManyRefs` when the source already holds
    /// `config.max_cross_refs` outgoing refs; persistence/flush errors.
    pub fn add_cross_ref(
        &self,
        source_collection: &str,
        source_id: EntityId,
        target_collection: &str,
        target_id: EntityId,
        ref_type: RefType,
        weight: f32,
    ) -> Result<(), StoreError> {
        // Validate both endpoints before touching any index.
        let source_manager = self
            .get_collection(source_collection)
            .ok_or_else(|| StoreError::CollectionNotFound(source_collection.to_string()))?;

        if source_manager.get(source_id).is_none() {
            return Err(StoreError::EntityNotFound(source_id));
        }

        let target_manager = self
            .get_collection(target_collection)
            .ok_or_else(|| StoreError::CollectionNotFound(target_collection.to_string()))?;

        if target_manager.get(target_id).is_none() {
            return Err(StoreError::EntityNotFound(target_id));
        }

        // Fan-out cap on outgoing refs per source entity.
        // NOTE(review): this check and the insert below are not atomic —
        // concurrent callers can overshoot `max_cross_refs` slightly;
        // confirm that is acceptable.
        let current_refs = self
            .cross_refs
            .read()
            .get(&source_id)
            .map_or(0, |v| v.len());

        if current_refs >= self.config.max_cross_refs {
            return Err(StoreError::TooManyRefs(source_id));
        }

        let mut registry_dirty = false;
        // Forward map: source -> (target, type, target collection).
        {
            let mut forward = self.cross_refs.write();
            let refs = forward.entry(source_id).or_default();
            let inserted = !refs.iter().any(|(id, kind, coll)| {
                *id == target_id && *kind == ref_type && coll == target_collection
            });
            if inserted {
                refs.push((target_id, ref_type, target_collection.to_string()));
                registry_dirty = true;
            }
        }

        // Reverse map: target -> (source, type, source collection).
        {
            let mut reverse = self.reverse_refs.write();
            let refs = reverse.entry(target_id).or_default();
            let inserted = !refs.iter().any(|(id, kind, coll)| {
                *id == source_id && *kind == ref_type && coll == source_collection
            });
            if inserted {
                refs.push((source_id, ref_type, source_collection.to_string()));
                registry_dirty = true;
            }
        }

        // Embed the ref on the source entity so it survives persistence.
        if let Some(mut entity) = source_manager.get(source_id) {
            if !entity.cross_refs().iter().any(|xref| {
                xref.target == target_id
                    && xref.ref_type == ref_type
                    && xref.target_collection == target_collection
            }) {
                let cross_ref = CrossRef::with_weight(
                    source_id,
                    target_id,
                    target_collection,
                    ref_type,
                    weight,
                );
                entity.add_cross_ref(cross_ref);
                // NOTE(review): the manager update error is swallowed while
                // the persist below propagates — confirm intended.
                let _ = source_manager.update(entity.clone());
                registry_dirty = true;
                self.persist_entities_to_pager(source_collection, std::slice::from_ref(&entity))?;
            }
        }

        if registry_dirty {
            self.mark_paged_registry_dirty();
            // Strict durability: flush the dirty registry immediately.
            if matches!(
                self.config.durability_mode,
                crate::api::DurabilityMode::Strict
            ) {
                self.flush_paged_state()?;
            }
        }

        Ok(())
    }
1266
1267 pub fn get_refs_from(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1269 self.cross_refs.read().get(&id).cloned().unwrap_or_default()
1270 }
1271
1272 pub fn get_refs_to(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1274 self.reverse_refs
1275 .read()
1276 .get(&id)
1277 .cloned()
1278 .unwrap_or_default()
1279 }
1280
1281 pub fn expand_refs(
1283 &self,
1284 id: EntityId,
1285 depth: u32,
1286 ref_types: Option<&[RefType]>,
1287 ) -> Vec<(UnifiedEntity, u32, RefType)> {
1288 let mut results = Vec::new();
1289 let mut visited = std::collections::HashSet::new();
1290 visited.insert(id);
1291
1292 self.expand_refs_recursive(id, depth, ref_types, &mut visited, &mut results, 1);
1293
1294 results
1295 }
1296
1297 fn expand_refs_recursive(
1298 &self,
1299 id: EntityId,
1300 max_depth: u32,
1301 ref_types: Option<&[RefType]>,
1302 visited: &mut std::collections::HashSet<EntityId>,
1303 results: &mut Vec<(UnifiedEntity, u32, RefType)>,
1304 current_depth: u32,
1305 ) {
1306 if current_depth > max_depth {
1307 return;
1308 }
1309
1310 for (target_id, ref_type, target_collection) in self.get_refs_from(id) {
1311 if visited.contains(&target_id) {
1312 continue;
1313 }
1314
1315 if let Some(types) = ref_types {
1316 if !types.contains(&ref_type) {
1317 continue;
1318 }
1319 }
1320
1321 visited.insert(target_id);
1322
1323 if let Some(entity) = self.get(&target_collection, target_id) {
1324 results.push((entity, current_depth, ref_type));
1325
1326 self.expand_refs_recursive(
1328 target_id,
1329 max_depth,
1330 ref_types,
1331 visited,
1332 results,
1333 current_depth + 1,
1334 );
1335 }
1336 }
1337 }
1338
1339 pub(crate) fn index_cross_refs(
1341 &self,
1342 entity: &UnifiedEntity,
1343 collection: &str,
1344 ) -> Result<(), StoreError> {
1345 let mut registry_dirty = false;
1346 for cross_ref in entity.cross_refs() {
1347 if cross_ref.target_collection.is_empty() {
1348 continue;
1349 }
1350 {
1351 let mut forward = self.cross_refs.write();
1352 let refs = forward.entry(cross_ref.source).or_default();
1353 let inserted = !refs.iter().any(|(id, kind, coll)| {
1354 *id == cross_ref.target
1355 && *kind == cross_ref.ref_type
1356 && coll == &cross_ref.target_collection
1357 });
1358 if inserted {
1359 refs.push((
1360 cross_ref.target,
1361 cross_ref.ref_type,
1362 cross_ref.target_collection.clone(),
1363 ));
1364 registry_dirty = true;
1365 }
1366 }
1367
1368 {
1369 let mut reverse = self.reverse_refs.write();
1370 let refs = reverse.entry(cross_ref.target).or_default();
1371 let inserted = !refs.iter().any(|(id, kind, coll)| {
1372 *id == cross_ref.source && *kind == cross_ref.ref_type && coll == collection
1373 });
1374 if inserted {
1375 refs.push((cross_ref.source, cross_ref.ref_type, collection.to_string()));
1376 registry_dirty = true;
1377 }
1378 }
1379 }
1380
1381 if registry_dirty {
1382 self.mark_paged_registry_dirty();
1383 }
1384
1385 Ok(())
1386 }
1387
1388 pub(crate) fn unindex_cross_refs(&self, id: EntityId) -> Result<(), StoreError> {
1390 self.cross_refs.write().remove(&id);
1392
1393 let mut reverse = self.reverse_refs.write();
1395 for refs in reverse.values_mut() {
1396 refs.retain(|(source, _, _)| *source != id);
1397 }
1398 reverse.remove(&id);
1399 self.mark_paged_registry_dirty();
1400
1401 Ok(())
1402 }
1403
1404 pub(crate) fn unindex_cross_refs_batch(&self, ids: &[EntityId]) -> Result<(), StoreError> {
1405 if ids.is_empty() {
1406 return Ok(());
1407 }
1408
1409 let id_set: std::collections::HashSet<EntityId> = ids.iter().copied().collect();
1410
1411 let needs_forward_cleanup = {
1425 let forward = self.cross_refs.read();
1426 id_set.iter().any(|id| forward.contains_key(id))
1427 };
1428 let needs_reverse_cleanup = {
1429 let reverse = self.reverse_refs.read();
1430 id_set.iter().any(|id| reverse.contains_key(id))
1431 };
1432
1433 if !needs_forward_cleanup && !needs_reverse_cleanup {
1434 self.unindex_cross_refs_fast_path
1435 .fetch_add(1, Ordering::Relaxed);
1436 return Ok(());
1437 }
1438
1439 if needs_forward_cleanup {
1440 let mut forward = self.cross_refs.write();
1441 for id in &id_set {
1442 forward.remove(id);
1443 }
1444 }
1445
1446 if needs_reverse_cleanup || needs_forward_cleanup {
1447 let mut reverse = self.reverse_refs.write();
1451 if needs_forward_cleanup {
1452 for refs in reverse.values_mut() {
1453 refs.retain(|(source, _, _)| !id_set.contains(source));
1454 }
1455 }
1456 reverse.retain(|target, refs| !id_set.contains(target) && !refs.is_empty());
1457 }
1458 self.mark_paged_registry_dirty();
1459
1460 Ok(())
1461 }
1462
    /// Number of times `unindex_cross_refs_batch` returned early because none
    /// of the requested ids appeared in either cross-reference index.
    ///
    /// Relaxed load: the counter is a diagnostic, not a synchronization point.
    pub fn unindex_cross_refs_fast_path_hits(&self) -> u64 {
        self.unindex_cross_refs_fast_path.load(Ordering::Relaxed)
    }
1469
1470 pub fn query_all<F>(&self, filter: F) -> Vec<(String, UnifiedEntity)>
1472 where
1473 F: Fn(&UnifiedEntity) -> bool + Clone + Send + Sync,
1474 {
1475 let collections = self.collections.read();
1476 let pairs: Vec<_> = collections.iter().collect();
1477
1478 let use_parallel = pairs.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1479 if !use_parallel {
1480 return pairs
1482 .into_iter()
1483 .flat_map(|(name, mgr)| {
1484 mgr.query_all(filter.clone())
1485 .into_iter()
1486 .map(move |e| (name.clone(), e))
1487 })
1488 .collect();
1489 }
1490
1491 let filter_ref = &filter;
1493 let collection_results: Vec<Vec<(String, UnifiedEntity)>> = std::thread::scope(|s| {
1494 pairs
1495 .iter()
1496 .map(|(name, manager)| {
1497 let name = (*name).clone();
1498 s.spawn(move || {
1499 manager
1500 .query_all(|e| filter_ref(e))
1501 .into_iter()
1502 .map(|e| (name.clone(), e))
1503 .collect::<Vec<_>>()
1504 })
1505 })
1506 .collect::<Vec<_>>()
1507 .into_iter()
1508 .map(|h| h.join().unwrap_or_default())
1509 .collect()
1510 });
1511
1512 collection_results.into_iter().flatten().collect()
1513 }
1514
1515 pub fn filter_metadata_all(
1517 &self,
1518 filters: &[(String, MetadataFilter)],
1519 ) -> Vec<(String, EntityId)> {
1520 let mut results = Vec::new();
1521 let collections = self.collections.read();
1522
1523 for (name, manager) in collections.iter() {
1524 for id in manager.filter_metadata(filters) {
1525 results.push((name.clone(), id));
1526 }
1527 }
1528
1529 results
1530 }
1531
1532 pub fn stats(&self) -> StoreStats {
1534 let collections = self.collections.read();
1535
1536 let mut stats = StoreStats {
1537 collection_count: collections.len(),
1538 ..Default::default()
1539 };
1540
1541 for (name, manager) in collections.iter() {
1542 let manager_stats = manager.stats();
1543 stats.total_entities += manager_stats.total_entities;
1544 stats.total_memory_bytes += manager_stats.total_memory_bytes;
1545 stats.collections.insert(name.clone(), manager_stats);
1546 }
1547
1548 stats
1549 }
1550
1551 pub fn run_maintenance(&self) -> Result<(), StoreError> {
1553 let collections = self.collections.read();
1554 for manager in collections.values() {
1555 manager.run_maintenance()?;
1556 }
1557 Ok(())
1558 }
1559}
1560
1561fn flatten_config_json(
1563 prefix: &str,
1564 value: &crate::serde_json::Value,
1565 out: &mut Vec<(String, crate::storage::schema::Value)>,
1566) {
1567 use crate::storage::schema::Value;
1568 match value {
1569 crate::serde_json::Value::Object(map) => {
1570 for (k, v) in map {
1571 let key = if prefix.is_empty() {
1572 k.clone()
1573 } else {
1574 format!("{prefix}.{k}")
1575 };
1576 flatten_config_json(&key, v, out);
1577 }
1578 }
1579 crate::serde_json::Value::String(s) => {
1580 out.push((prefix.to_string(), Value::text(s.clone())));
1581 }
1582 crate::serde_json::Value::Number(n) => {
1583 if n.fract().abs() < f64::EPSILON {
1584 out.push((prefix.to_string(), Value::UnsignedInteger(*n as u64)));
1585 } else {
1586 out.push((prefix.to_string(), Value::Float(*n)));
1587 }
1588 }
1589 crate::serde_json::Value::Bool(b) => {
1590 out.push((prefix.to_string(), Value::Boolean(*b)));
1591 }
1592 crate::serde_json::Value::Null => {
1593 out.push((prefix.to_string(), Value::Null));
1594 }
1595 crate::serde_json::Value::Array(arr) => {
1596 let json_str = crate::serde_json::to_string(value).unwrap_or_default();
1597 out.push((prefix.to_string(), Value::text(json_str)));
1598 }
1599 }
1600}