1use super::*;
2
3impl UnifiedStore {
4 pub(crate) fn persist_entities_to_pager(
5 &self,
6 collection: &str,
7 entities: &[UnifiedEntity],
8 ) -> Result<(), StoreError> {
9 self.persist_entities_impl(collection, entities, false)
10 }
11
12 pub(crate) fn persist_entity_refs_to_pager(
16 &self,
17 collection: &str,
18 entities: &[&UnifiedEntity],
19 ) -> Result<(), StoreError> {
20 self.persist_entity_refs_impl(collection, entities, false)
21 }
22
23 pub(crate) fn persist_entity_refs_to_pager_wal_only(
26 &self,
27 collection: &str,
28 entities: &[&UnifiedEntity],
29 ) -> Result<(), StoreError> {
30 self.persist_entity_refs_impl(collection, entities, true)
31 }
32
33 fn persist_entity_refs_impl(
34 &self,
35 collection: &str,
36 entities: &[&UnifiedEntity],
37 skip_btree: bool,
38 ) -> Result<(), StoreError> {
39 if entities.is_empty() {
40 return Ok(());
41 }
42 if self.pager.is_none() && self.config.embedded_wal_path.is_none() {
43 return Ok(());
44 }
45 let fv = self.format_version();
46 let manager = self
47 .get_collection(collection)
48 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
49 let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
50 .iter()
51 .map(|entity| {
52 let metadata = manager.get_metadata(entity.id);
53 (
54 entity.id.raw().to_be_bytes().to_vec(),
55 Self::serialize_entity_record(entity, metadata.as_ref(), fv),
56 )
57 })
58 .collect();
59 serialized.sort_by(|a, b| a.0.cmp(&b.0));
60
61 if !skip_btree {
62 let Some(pager) = &self.pager else {
63 let records: Vec<Vec<u8>> =
64 serialized.into_iter().map(|(_id, record)| record).collect();
65 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
66 collection: collection.to_string(),
67 records,
68 }])?;
69 return Ok(());
70 };
71 let mut btree_indices = self.btree_indices.write();
72 let btree = btree_indices
73 .entry(collection.to_string())
74 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
75 let root_before = btree.root_page_id();
76 btree.upsert_batch_sorted(&serialized).map_err(|e| {
77 StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
78 })?;
79 let root_after = btree.root_page_id();
80 drop(btree_indices);
81 if root_before != root_after {
82 self.mark_paged_registry_dirty();
83 }
84 }
85 let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
86 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
87 collection: collection.to_string(),
88 records,
89 }])?;
90 Ok(())
91 }
92
93 pub(crate) fn persist_entities_to_pager_wal_only(
105 &self,
106 collection: &str,
107 entities: &[UnifiedEntity],
108 ) -> Result<(), StoreError> {
109 self.persist_entities_impl(collection, entities, true)
110 }
111
112 fn persist_entities_impl(
113 &self,
114 collection: &str,
115 entities: &[UnifiedEntity],
116 skip_btree: bool,
117 ) -> Result<(), StoreError> {
118 if entities.is_empty() {
119 return Ok(());
120 }
121
122 if self.pager.is_none() && self.config.embedded_wal_path.is_none() {
123 return Ok(());
124 }
125
126 let fv = self.format_version();
127 let manager = self
128 .get_collection(collection)
129 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
130 let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
131 .iter()
132 .map(|entity| {
133 let metadata = manager.get_metadata(entity.id);
134 (
135 entity.id.raw().to_be_bytes().to_vec(),
136 Self::serialize_entity_record(entity, metadata.as_ref(), fv),
137 )
138 })
139 .collect();
140 serialized.sort_by(|a, b| a.0.cmp(&b.0));
142
143 if !skip_btree {
144 let Some(pager) = &self.pager else {
145 let records: Vec<Vec<u8>> =
146 serialized.into_iter().map(|(_id, record)| record).collect();
147 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
148 collection: collection.to_string(),
149 records,
150 }])?;
151 return Ok(());
152 };
153 let mut btree_indices = self.btree_indices.write();
154 let btree = btree_indices
155 .entry(collection.to_string())
156 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
157 let root_before = btree.root_page_id();
158
159 btree.upsert_batch_sorted(&serialized).map_err(|e| {
163 StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
164 })?;
165 let root_after = btree.root_page_id();
166 drop(btree_indices);
167 if root_before != root_after {
168 self.mark_paged_registry_dirty();
169 }
170 }
171 let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
181 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
182 collection: collection.to_string(),
183 records,
184 }])?;
185
186 Ok(())
187 }
188
189 pub(crate) fn update_graph_label_index(
191 &self,
192 collection: &str,
193 label: &str,
194 entity_id: EntityId,
195 ) {
196 let key = (collection.to_string(), label.to_string());
197 let mut idx = self.graph_label_index.write();
198 idx.entry(key).or_default().push(entity_id);
199 }
200
201 pub(crate) fn remove_from_graph_label_index(&self, collection: &str, entity_id: EntityId) {
203 let mut idx = self.graph_label_index.write();
204 for ((col, _), ids) in idx.iter_mut() {
205 if col == collection {
206 ids.retain(|&id| id != entity_id);
207 }
208 }
209 idx.retain(|_, ids| !ids.is_empty());
211 }
212
213 pub(crate) fn remove_from_graph_label_index_batch(
214 &self,
215 collection: &str,
216 entity_ids: &[EntityId],
217 ) {
218 if entity_ids.is_empty() {
219 return;
220 }
221 let id_set: std::collections::HashSet<EntityId> = entity_ids.iter().copied().collect();
222 let mut idx = self.graph_label_index.write();
223 for ((col, _), ids) in idx.iter_mut() {
224 if col == collection {
225 ids.retain(|id| !id_set.contains(id));
226 }
227 }
228 idx.retain(|_, ids| !ids.is_empty());
229 }
230
231 pub fn lookup_graph_nodes_by_label(&self, label: &str) -> Vec<EntityId> {
233 let idx = self.graph_label_index.read();
234 idx.iter()
235 .filter(|((_, l), _)| l == label)
236 .flat_map(|(_, ids)| ids.iter().copied())
237 .collect()
238 }
239
240 pub fn lookup_graph_nodes_by_label_in(&self, collection: &str, label: &str) -> Vec<EntityId> {
242 let idx = self.graph_label_index.read();
243 idx.get(&(collection.to_string(), label.to_string()))
244 .cloned()
245 .unwrap_or_default()
246 }
247
248 pub fn create_collection(&self, name: impl Into<String>) -> Result<(), StoreError> {
249 let name = name.into();
250 let mut collections = self.collections.write();
251
252 if collections.contains_key(&name) {
253 return Err(StoreError::CollectionExists(name));
254 }
255
256 let manager = SegmentManager::with_config(&name, self.config.manager_config.clone());
257 collections.insert(name.clone(), Arc::new(manager));
258 drop(collections);
259
260 if let Err(err) = self.publish_operational_collection_create(&name) {
261 let mut collections = self.collections.write();
262 collections.remove(&name);
263 return Err(err);
264 }
265
266 self.mark_paged_registry_dirty();
267 self.finish_paged_write([StoreWalAction::CreateCollection { name }])?;
268
269 Ok(())
270 }
271
272 pub fn get_or_create_collection(&self, name: impl Into<String>) -> Arc<SegmentManager> {
274 let name = name.into();
275 {
277 let collections = self.collections.read();
278 if let Some(manager) = collections.get(&name) {
279 return Arc::clone(manager);
280 }
281 }
282 let mut collections = self.collections.write();
284 if let Some(manager) = collections.get(&name) {
286 return Arc::clone(manager);
287 }
288 let manager = Arc::new(SegmentManager::with_config(
289 &name,
290 self.config.manager_config.clone(),
291 ));
292 collections.insert(name, Arc::clone(&manager));
293 self.mark_paged_registry_dirty();
294 manager
295 }
296
297 pub fn get_collection(&self, name: &str) -> Option<Arc<SegmentManager>> {
299 self.collections.read().get(name).map(Arc::clone)
300 }
301
302 pub fn context_index(&self) -> &ContextIndex {
304 &self.context_index
305 }
306
307 pub fn take_replayed_turbo_inserts(&self, collection: &str) -> Option<Vec<(u64, Vec<f32>)>> {
313 let mut map = self.replayed_turbo_inserts.lock();
314 map.remove(collection)
315 }
316
317 pub fn set_config_tree(&self, prefix: &str, json: &crate::serde_json::Value) -> usize {
320 let _ = self.get_or_create_collection("red_config");
321 let mut pairs = Vec::new();
322 flatten_config_json(prefix, json, &mut pairs);
323 let mut saved = 0;
324 for (key, value) in pairs {
325 let entity = UnifiedEntity::new(
326 EntityId::new(0),
327 EntityKind::TableRow {
328 table: Arc::from("red_config"),
329 row_id: 0,
330 },
331 EntityData::Row(RowData {
332 columns: Vec::new(),
333 named: Some(
334 [
335 ("key".to_string(), crate::storage::schema::Value::text(key)),
336 ("value".to_string(), value),
337 ]
338 .into_iter()
339 .collect(),
340 ),
341 schema: None,
342 }),
343 );
344 if self.insert_auto("red_config", entity).is_ok() {
345 saved += 1;
346 }
347 }
348 saved
349 }
350
351 pub fn get_config(&self, key: &str) -> Option<crate::storage::schema::Value> {
353 let manager = self.get_collection("red_config")?;
354 for entity in manager.query_all(|_| true) {
355 if let EntityData::Row(row) = &entity.data {
356 if let Some(named) = &row.named {
357 let key_matches = named
358 .get("key")
359 .and_then(|v| match v {
360 crate::storage::schema::Value::Text(s) => Some(s.as_ref() == key),
361 _ => None,
362 })
363 .unwrap_or(false);
364 if key_matches {
365 return named.get("value").cloned();
366 }
367 }
368 }
369 }
370 None
371 }
372
373 pub fn list_collections(&self) -> Vec<String> {
375 self.collections.read().keys().cloned().collect()
376 }
377
378 pub fn drop_collection(&self, name: &str) -> Result<(), StoreError> {
380 self.publish_operational_collection_pending_drop(name)?;
381 let manager = {
382 let mut collections = self.collections.write();
383
384 collections
385 .remove(name)
386 .ok_or_else(|| StoreError::CollectionNotFound(name.to_string()))?
387 };
388
389 let entities = manager.query_all(|_| true);
390 let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
391
392 for entity_id in &entity_ids {
393 self.context_index.remove_entity(*entity_id);
394 let _ = self.unindex_cross_refs(*entity_id);
395 }
396
397 self.btree_indices.write().remove(name);
398
399 self.entity_cache.retain(|entity_id, (collection, _)| {
400 collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
401 });
402
403 self.cross_refs.write().retain(|source_id, refs| {
404 refs.retain(|(target_id, _, target_collection)| {
405 target_collection != name && !entity_ids.iter().any(|id| id == target_id)
406 });
407 !entity_ids.iter().any(|id| id == source_id)
408 });
409
410 self.reverse_refs.write().retain(|target_id, refs| {
411 refs.retain(|(source_id, _, source_collection)| {
412 source_collection != name && !entity_ids.iter().any(|id| id == source_id)
413 });
414 !entity_ids.iter().any(|id| id == target_id)
415 });
416
417 self.mark_paged_registry_dirty();
418 self.finish_paged_write([StoreWalAction::DropCollection {
419 name: name.to_string(),
420 }])?;
421 self.publish_operational_collection_drop_finished(name)?;
422
423 Ok(())
424 }
425
426 pub fn insert(&self, collection: &str, entity: UnifiedEntity) -> Result<EntityId, StoreError> {
428 let manager = self
429 .get_collection(collection)
430 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
431
432 let mut entity = entity;
433 if entity.id.raw() == 0 {
434 entity.id = self.next_entity_id();
435 } else {
436 self.register_entity_id(entity.id);
437 }
438 if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
440 if *row_id == 0 {
441 *row_id = manager.next_row_id();
442 } else {
443 manager.register_row_id(*row_id);
444 }
445 }
446 entity.ensure_table_logical_id();
447 let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
449 {
450 Some(node.label.clone())
451 } else {
452 None
453 };
454
455 let id = manager.insert(entity)?;
456 self.register_entity_id(id);
457
458 if let Some(ref label) = graph_node_label {
460 self.update_graph_label_index(collection, label, id);
461 }
462
463 let mut registry_dirty = false;
465 if let Some(pager) = &self.pager {
466 if let Some(entity) = manager.get(id) {
467 let mut btree_indices = self.btree_indices.write();
468 let btree = btree_indices
469 .entry(collection.to_string())
470 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
471 let root_before = btree.root_page_id();
472
473 let key = id.raw().to_be_bytes();
474 let metadata = manager.get_metadata(id);
475 let value = Self::serialize_entity_record(
476 &entity,
477 metadata.as_ref(),
478 self.format_version(),
479 );
480 let _ = btree.insert(&key, &value);
482 registry_dirty = root_before != btree.root_page_id();
483 }
484 }
485
486 if self.config.auto_index_refs {
488 if let Some(entity) = manager.get(id) {
489 self.index_cross_refs(&entity, collection)?;
490 }
491 }
492
493 if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
497 let actions = manager
498 .get(id)
499 .map(|entity| {
500 let metadata = manager.get_metadata(id);
501 vec![StoreWalAction::upsert_entity(
502 collection,
503 &entity,
504 metadata.as_ref(),
505 self.format_version(),
506 )]
507 })
508 .unwrap_or_default();
509 if registry_dirty {
510 self.mark_paged_registry_dirty();
511 }
512 self.finish_paged_write(actions)?;
513 }
514
515 Ok(id)
516 }
517
    /// Inserts a batch of entities into `collection`, creating the collection
    /// on demand, and returns the ids reported by the collection manager.
    ///
    /// Steps, in order:
    /// 1. Entities whose id is `0` receive ids from one reserved range;
    ///    non-zero ids are registered. Table rows get the same treatment for
    ///    `row_id` through the manager.
    /// 2. When a pager or an embedded WAL path is configured, every entity is
    ///    serialized up front (with no metadata), using rayon for batches of
    ///    512 or more.
    /// 3. The batch is handed to the manager, graph-node labels are indexed,
    ///    the B-tree is bulk-loaded when a pager is present, and one
    ///    `BulkUpsertEntityRecords` action is passed to `finish_paged_write`.
    ///
    /// Setting `REDDB_BULK_TIMING` to `1`, `true` or `on` emits a debug event
    /// with per-phase timings. `REDDB_BULK_SKIP_PERSIST_UNSAFE` (same values)
    /// is only honoured when no pager is configured; a warning is logged once
    /// in either case.
    ///
    /// # Errors
    /// Propagates failures of the manager's `bulk_insert` and of
    /// `finish_paged_write`. The result of the B-tree bulk insert is discarded.
    ///
    /// # Panics
    /// Panics if a reserved id range yields fewer ids than were counted as
    /// missing.
    pub fn bulk_insert(
        &self,
        collection: &str,
        mut entities: Vec<UnifiedEntity>,
    ) -> Result<Vec<EntityId>, StoreError> {
        // Opt-in phase timing, reported through `tracing::debug!` at the end.
        let trace = matches!(
            std::env::var("REDDB_BULK_TIMING").ok().as_deref(),
            Some("1") | Some("true") | Some("on")
        );
        let t_start = std::time::Instant::now();
        let n = entities.len();
        let manager = self.get_or_create_collection(collection);
        let t_get_coll = t_start.elapsed();

        let t0 = std::time::Instant::now();
        // Count the missing ids first so each range is reserved in one call.
        let n_missing_entity_ids = entities.iter().filter(|e| e.id.raw() == 0).count() as u64;
        let n_missing_row_ids = entities
            .iter()
            .filter(|e| matches!(e.kind, EntityKind::TableRow { row_id: 0, .. }))
            .count() as u64;
        let mut entity_id_range = if n_missing_entity_ids > 0 {
            self.reserve_entity_ids(n_missing_entity_ids)
        } else {
            0..0
        };
        let mut row_id_range = if n_missing_row_ids > 0 {
            manager.reserve_row_ids(n_missing_row_ids)
        } else {
            0..0
        };
        for entity in &mut entities {
            if entity.id.raw() == 0 {
                let next = entity_id_range
                    .next()
                    .expect("reserved entity-id range exhausted");
                entity.id = EntityId::new(next);
            } else {
                self.register_entity_id(entity.id);
            }
            if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
                if *row_id == 0 {
                    *row_id = row_id_range
                        .next()
                        .expect("reserved row-id range exhausted");
                } else {
                    manager.register_row_id(*row_id);
                }
            }
            entity.ensure_table_logical_id();
        }
        let t_assign_ids = t0.elapsed();

        // Labels are captured here because `entities` is moved into the
        // manager further down.
        let graph_labels: Vec<Option<(String, EntityId)>> = entities
            .iter()
            .map(|e| {
                if let EntityKind::GraphNode(ref node) = e.kind {
                    Some((node.label.clone(), e.id))
                } else {
                    None
                }
            })
            .collect();

        let t0 = std::time::Instant::now();
        // `None` when nothing durable is configured; otherwise
        // (big-endian id key, serialized record) pairs in input order.
        let serialized: Option<Vec<(Vec<u8>, Vec<u8>)>> =
            if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
                let fv = self.format_version();
                let serial_map = |e: &UnifiedEntity| {
                    (
                        e.id.raw().to_be_bytes().to_vec(),
                        Self::serialize_entity_record(e, None, fv),
                    )
                };
                if entities.len() >= 512 {
                    use rayon::prelude::*;
                    Some(entities.par_iter().map(serial_map).collect())
                } else {
                    Some(entities.iter().map(serial_map).collect())
                }
            } else {
                None
            };
        let t_serialize = t0.elapsed();

        let t0 = std::time::Instant::now();
        let ids = manager.bulk_insert(entities)?;
        let t_manager = t0.elapsed();
        for id in &ids {
            self.register_entity_id(*id);
        }

        for (label, entity_id) in graph_labels.iter().flatten() {
            self.update_graph_label_index(collection, label, *entity_id);
        }

        let skip_btree_requested = matches!(
            std::env::var("REDDB_BULK_SKIP_PERSIST_UNSAFE")
                .ok()
                .as_deref(),
            Some("1") | Some("true") | Some("on")
        );
        // The flag can only take effect when there is no pager.
        let skip_btree = skip_btree_requested && self.pager.is_none();
        if skip_btree_requested && !skip_btree {
            // `OnceLock` keeps the warning to a single emission per process.
            static IGNORED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
            IGNORED.get_or_init(|| {
                tracing::warn!(
                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set but durable pager is \
                     active — flag ignored; bulk inserts will be persisted normally"
                );
            });
        } else if skip_btree {
            static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
            WARNED.get_or_init(|| {
                tracing::warn!(
                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set (ephemeral/no-pager mode) — \
                     bulk inserts NOT durable; data will be lost on restart"
                );
            });
        }

        let mut t_btree_lock = std::time::Duration::ZERO;
        let mut t_btree_insert = std::time::Duration::ZERO;
        let mut t_flush = std::time::Duration::ZERO;
        if !skip_btree {
            if let (Some(pager), Some(batch)) = (&self.pager, serialized.as_ref()) {
                let t0 = std::time::Instant::now();
                let mut btree_indices = self.btree_indices.write();
                let btree = btree_indices
                    .entry(collection.to_string())
                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
                let root_before = btree.root_page_id();
                t_btree_lock = t0.elapsed();

                let t0 = std::time::Instant::now();
                // NOTE(review): `batch` is passed in input order without an
                // explicit sort — presumably ids are ascending; confirm for
                // caller-supplied ids. The result is discarded.
                let _ = btree.bulk_insert_sorted(batch);
                t_btree_insert = t0.elapsed();
                let registry_dirty = root_before != btree.root_page_id();

                // `t_flush` covers only the registry-dirty marking below.
                let t0 = std::time::Instant::now();
                if registry_dirty {
                    self.mark_paged_registry_dirty();
                }
                t_flush = t0.elapsed();
            }
        }

        // Empty action list when nothing was serialized.
        let actions = serialized
            .map(|batch| {
                let records: Vec<Vec<u8>> =
                    batch.into_iter().map(|(_key, record)| record).collect();
                vec![StoreWalAction::BulkUpsertEntityRecords {
                    collection: collection.to_string(),
                    records,
                }]
            })
            .unwrap_or_default();
        self.finish_paged_write(actions)?;

        if trace {
            tracing::debug!(
                n,
                total = ?t_start.elapsed(),
                get_coll = ?t_get_coll,
                assign = ?t_assign_ids,
                serialize = ?t_serialize,
                manager = ?t_manager,
                btree_lock = ?t_btree_lock,
                btree = ?t_btree_insert,
                flush = ?t_flush,
                "bulk_insert timing"
            );
        }

        Ok(ids)
    }
743
744 pub fn insert_auto(
746 &self,
747 collection: &str,
748 entity: UnifiedEntity,
749 ) -> Result<EntityId, StoreError> {
750 let manager = self.get_or_create_collection(collection);
751 let mut entity = entity;
752 if entity.id.raw() == 0 {
753 entity.id = self.next_entity_id();
754 } else {
755 self.register_entity_id(entity.id);
756 }
757 if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
759 if *row_id == 0 {
760 *row_id = manager.next_row_id();
761 } else {
762 manager.register_row_id(*row_id);
763 }
764 }
765 entity.ensure_table_logical_id();
766
767 let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
769 {
770 Some(node.label.clone())
771 } else {
772 None
773 };
774 self.context_index.index_entity(collection, &entity);
776
777 let id_for_serialize = entity.id;
792 let serialized_record: Option<Vec<u8>> =
793 if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
794 Some(Self::serialize_entity_record(
795 &entity,
796 None,
797 self.format_version(),
798 ))
799 } else {
800 None
801 };
802 if self.config.auto_index_refs {
803 self.index_cross_refs(&entity, collection)?;
804 }
805
806 let id = manager.insert(entity)?;
807 debug_assert_eq!(id, id_for_serialize);
808 if let Some(ref label) = graph_node_label {
816 self.update_graph_label_index(collection, label, id);
817 }
818
819 let mut registry_dirty = false;
820 if let (Some(_pager), Some(record)) = (&self.pager, serialized_record.as_ref()) {
821 if let Some(btree) = self.get_or_create_btree(collection) {
822 let root_before = btree.root_page_id();
823
824 let key = id.raw().to_be_bytes();
825 btree.insert(&key, record).map_err(|e| {
826 StoreError::Io(std::io::Error::other(format!(
827 "B-tree insert error while inserting '{collection}'/{id}: {e}"
828 )))
829 })?;
830 registry_dirty = root_before != btree.root_page_id();
831 }
832 }
833
834 if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
838 let actions = serialized_record
839 .map(|record| {
840 vec![StoreWalAction::UpsertEntityRecord {
841 collection: collection.to_string(),
842 record,
843 }]
844 })
845 .unwrap_or_default();
846 if registry_dirty {
847 self.mark_paged_registry_dirty();
848 }
849 self.finish_paged_write(actions)?;
850 }
851 Ok(id)
852 }
853
854 pub fn get(&self, collection: &str, id: EntityId) -> Option<UnifiedEntity> {
860 if let Some(entity) = self
862 .get_collection(collection)
863 .and_then(|manager| manager.get(id))
864 {
865 return Some(entity);
866 }
867
868 if self.pager.is_some() {
870 let btree_indices = self.btree_indices.read();
871 if let Some(btree) = btree_indices.get(collection) {
872 let key = id.raw().to_be_bytes();
873 if let Ok(Some(value)) = btree.get(&key) {
874 if let Ok((entity, _)) =
875 Self::deserialize_entity_record(&value, self.format_version())
876 {
877 return Some(entity);
878 }
879 }
880 }
881 }
882
883 None
884 }
885
886 pub fn get_table_row_by_logical_id(
892 &self,
893 collection: &str,
894 logical_id: EntityId,
895 ) -> Option<UnifiedEntity> {
896 if let Some(entity) = self.get(collection, logical_id) {
897 if matches!(entity.kind, EntityKind::TableRow { .. })
898 && entity.logical_id() == logical_id
899 && entity.xmax == 0
900 {
901 return Some(entity);
902 }
903 }
904
905 let manager = self.get_collection(collection)?;
906 let mut matches = manager.query_all(|entity| {
907 matches!(entity.kind, EntityKind::TableRow { .. }) && entity.logical_id() == logical_id
908 });
909 matches
910 .iter()
911 .find(|entity| entity.xmax == 0)
912 .cloned()
913 .or_else(|| matches.pop())
914 }
915
916 pub fn table_row_versions_by_logical_id(
917 &self,
918 collection: &str,
919 logical_id: EntityId,
920 ) -> Vec<UnifiedEntity> {
921 self.get_collection(collection)
922 .map(|manager| {
923 manager.query_all(|entity| {
924 matches!(entity.kind, EntityKind::TableRow { .. })
925 && entity.logical_id() == logical_id
926 })
927 })
928 .unwrap_or_default()
929 }
930
931 pub fn vacuum_mvcc_history(
932 &self,
933 collection: &str,
934 cutoff_xid: u64,
935 ) -> Result<MvccVacuumStats, StoreError> {
936 let manager = self
937 .get_collection(collection)
938 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
939 let entities =
940 manager.query_all(|entity| matches!(entity.kind, EntityKind::TableRow { .. }));
941 let mut logical = HashMap::<EntityId, (bool, u64)>::new();
942 for entity in &entities {
943 let entry = logical.entry(entity.logical_id()).or_insert((false, 0));
944 if entity.xmax == 0 {
945 entry.0 = true;
946 }
947 entry.1 = entry.1.max(entity.xmin);
948 }
949
950 let mut stats = MvccVacuumStats::default();
951 let mut reclaim_ids = Vec::new();
952 for entity in entities {
953 if entity.xmax == 0 {
954 continue;
955 }
956 stats.scanned_versions += 1;
957 let (has_live_version, max_xmin) = logical
958 .get(&entity.logical_id())
959 .copied()
960 .unwrap_or((false, entity.xmin));
961 let is_delete_tombstone = !has_live_version && entity.xmin == max_xmin;
962 if entity.xmax < cutoff_xid {
963 stats.reclaimed_versions += 1;
964 if is_delete_tombstone {
965 stats.reclaimed_tombstones += 1;
966 } else {
967 stats.reclaimed_history_versions += 1;
968 }
969 reclaim_ids.push(entity.id);
970 } else {
971 stats.retained_versions += 1;
972 if is_delete_tombstone {
973 stats.retained_tombstones += 1;
974 } else {
975 stats.retained_history_versions += 1;
976 }
977 }
978 }
979
980 if !reclaim_ids.is_empty() {
981 self.delete_batch(collection, &reclaim_ids)?;
982 }
983 Ok(stats)
984 }
985
986 pub(crate) fn install_versioned_table_row_update(
987 &self,
988 collection: &str,
989 old_version: UnifiedEntity,
990 mut new_version: UnifiedEntity,
991 metadata: Option<&Metadata>,
992 ) -> Result<(), StoreError> {
993 let manager = self
994 .get_collection(collection)
995 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
996
997 let old_id = old_version.id;
998 let new_id = new_version.id;
999 let inherited_metadata = metadata.cloned().or_else(|| manager.get_metadata(old_id));
1000
1001 self.entity_cache.remove(old_id.raw());
1002 self.entity_cache.remove(new_id.raw());
1003 manager.update(old_version.clone())?;
1004 self.context_index.remove_entity(old_id);
1005
1006 self.register_entity_id(new_version.id);
1007 if let EntityKind::TableRow { ref mut row_id, .. } = new_version.kind {
1008 if *row_id == 0 {
1009 *row_id = manager.next_row_id();
1010 } else {
1011 manager.register_row_id(*row_id);
1012 }
1013 }
1014 new_version.ensure_table_logical_id();
1015 manager.insert(new_version.clone())?;
1016 if let Some(metadata) = inherited_metadata {
1017 manager.set_metadata(new_id, metadata)?;
1018 }
1019 self.context_index.index_entity(collection, &new_version);
1020 if self.config.auto_index_refs {
1021 self.index_cross_refs(&new_version, collection)?;
1022 }
1023
1024 let old_metadata = manager.get_metadata(old_id);
1025 let new_metadata = manager.get_metadata(new_id);
1026 let fv = self.format_version();
1027 let records = vec![
1028 Self::serialize_entity_record(&old_version, old_metadata.as_ref(), fv),
1029 Self::serialize_entity_record(&new_version, new_metadata.as_ref(), fv),
1030 ];
1031 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
1032 collection: collection.to_string(),
1033 records,
1034 }])?;
1035
1036 Ok(())
1037 }
1038
1039 pub fn get_batch(&self, collection: &str, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1045 match self.get_collection(collection) {
1046 Some(manager) => manager.get_many(ids),
1047 None => vec![None; ids.len()],
1048 }
1049 }
1050
1051 pub fn get_any(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1053 if let Some(cached) = self.entity_cache.get(id.raw()) {
1057 return Some(cached);
1058 }
1059
1060 let collections = self.collections.read();
1062 for (name, manager) in collections.iter() {
1063 if let Some(entity) = manager.get(id) {
1064 let result = (name.clone(), entity);
1065 drop(collections);
1069 self.entity_cache.insert(id.raw(), result.clone());
1070 return Some(result);
1071 }
1072 }
1073 None
1074 }
1075
1076 pub fn entity_cache_hit_rate(&self) -> Option<f64> {
1082 self.entity_cache.hit_rate()
1083 }
1084
1085 pub fn entity_cache_stats(&self) -> super::super::entity_cache::EntityCacheStats {
1087 self.entity_cache.stats()
1088 }
1089
1090 pub fn delete(&self, collection: &str, id: EntityId) -> Result<bool, StoreError> {
1092 self.entity_cache.remove(id.raw());
1094 let manager = self
1095 .get_collection(collection)
1096 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1097
1098 let deleted = manager.delete(id)?;
1099 if !deleted {
1100 return Ok(false);
1101 }
1102
1103 let mut registry_dirty = false;
1105 if self.pager.is_some() {
1106 let btree_indices = self.btree_indices.read();
1107 if let Some(btree) = btree_indices.get(collection) {
1108 let root_before = btree.root_page_id();
1109 let key = id.raw().to_be_bytes();
1110 let _ = btree.delete(&key);
1111 registry_dirty = root_before != btree.root_page_id();
1112 }
1113 }
1114
1115 self.unindex_cross_refs(id)?;
1117
1118 self.remove_from_graph_label_index(collection, id);
1120
1121 if registry_dirty {
1122 self.mark_paged_registry_dirty();
1123 }
1124 self.finish_paged_write([StoreWalAction::DeleteEntityRecord {
1125 collection: collection.to_string(),
1126 entity_id: id.raw(),
1127 }])?;
1128
1129 Ok(true)
1130 }
1131
1132 pub fn delete_batch(
1133 &self,
1134 collection: &str,
1135 ids: &[EntityId],
1136 ) -> Result<Vec<EntityId>, StoreError> {
1137 if ids.is_empty() {
1138 return Ok(Vec::new());
1139 }
1140
1141 self.entity_cache.remove_many(ids.iter().map(|id| id.raw()));
1146
1147 let manager = self
1148 .get_collection(collection)
1149 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1150
1151 let deleted_ids = manager.delete_batch(ids)?;
1152 if deleted_ids.is_empty() {
1153 return Ok(deleted_ids);
1154 }
1155
1156 let mut registry_dirty = false;
1157 if self.pager.is_some() {
1158 let btree_indices = self.btree_indices.read();
1159 if let Some(btree) = btree_indices.get(collection) {
1160 let root_before = btree.root_page_id();
1161 for id in &deleted_ids {
1162 let key = id.raw().to_be_bytes();
1163 let _ = btree.delete(&key);
1164 }
1165 registry_dirty = root_before != btree.root_page_id();
1166 }
1167 }
1168
1169 self.unindex_cross_refs_batch(&deleted_ids)?;
1170 self.remove_from_graph_label_index_batch(collection, &deleted_ids);
1171 if registry_dirty {
1172 self.mark_paged_registry_dirty();
1173 }
1174 let actions = deleted_ids
1175 .iter()
1176 .map(|id| StoreWalAction::DeleteEntityRecord {
1177 collection: collection.to_string(),
1178 entity_id: id.raw(),
1179 })
1180 .collect::<Vec<_>>();
1181 self.finish_paged_write(actions)?;
1182
1183 Ok(deleted_ids)
1184 }
1185
1186 pub fn set_metadata(
1188 &self,
1189 collection: &str,
1190 id: EntityId,
1191 metadata: Metadata,
1192 ) -> Result<(), StoreError> {
1193 let manager = self
1194 .get_collection(collection)
1195 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1196
1197 manager.set_metadata(id, metadata)?;
1198 if let Some(entity) = manager.get(id) {
1199 self.persist_entities_to_pager(collection, std::slice::from_ref(&entity))?;
1200 }
1201 Ok(())
1202 }
1203
1204 pub fn get_metadata(&self, collection: &str, id: EntityId) -> Option<Metadata> {
1206 self.get_collection(collection)?.get_metadata(id)
1207 }
1208
1209 pub fn add_cross_ref(
1211 &self,
1212 source_collection: &str,
1213 source_id: EntityId,
1214 target_collection: &str,
1215 target_id: EntityId,
1216 ref_type: RefType,
1217 weight: f32,
1218 ) -> Result<(), StoreError> {
1219 let source_manager = self
1221 .get_collection(source_collection)
1222 .ok_or_else(|| StoreError::CollectionNotFound(source_collection.to_string()))?;
1223
1224 if source_manager.get(source_id).is_none() {
1225 return Err(StoreError::EntityNotFound(source_id));
1226 }
1227
1228 let target_manager = self
1230 .get_collection(target_collection)
1231 .ok_or_else(|| StoreError::CollectionNotFound(target_collection.to_string()))?;
1232
1233 if target_manager.get(target_id).is_none() {
1234 return Err(StoreError::EntityNotFound(target_id));
1235 }
1236
1237 let current_refs = self
1239 .cross_refs
1240 .read()
1241 .get(&source_id)
1242 .map_or(0, |v| v.len());
1243
1244 if current_refs >= self.config.max_cross_refs {
1245 return Err(StoreError::TooManyRefs(source_id));
1246 }
1247
1248 let mut registry_dirty = false;
1249 {
1250 let mut forward = self.cross_refs.write();
1251 let refs = forward.entry(source_id).or_default();
1252 let inserted = !refs.iter().any(|(id, kind, coll)| {
1253 *id == target_id && *kind == ref_type && coll == target_collection
1254 });
1255 if inserted {
1256 refs.push((target_id, ref_type, target_collection.to_string()));
1257 registry_dirty = true;
1258 }
1259 }
1260
1261 {
1262 let mut reverse = self.reverse_refs.write();
1263 let refs = reverse.entry(target_id).or_default();
1264 let inserted = !refs.iter().any(|(id, kind, coll)| {
1265 *id == source_id && *kind == ref_type && coll == source_collection
1266 });
1267 if inserted {
1268 refs.push((source_id, ref_type, source_collection.to_string()));
1269 registry_dirty = true;
1270 }
1271 }
1272
1273 if let Some(mut entity) = source_manager.get(source_id) {
1274 if !entity.cross_refs().iter().any(|xref| {
1275 xref.target == target_id
1276 && xref.ref_type == ref_type
1277 && xref.target_collection == target_collection
1278 }) {
1279 let cross_ref = CrossRef::with_weight(
1280 source_id,
1281 target_id,
1282 target_collection,
1283 ref_type,
1284 weight,
1285 );
1286 entity.add_cross_ref(cross_ref);
1287 let _ = source_manager.update(entity.clone());
1288 registry_dirty = true;
1289 self.persist_entities_to_pager(source_collection, std::slice::from_ref(&entity))?;
1290 }
1291 }
1292
1293 if registry_dirty {
1294 self.mark_paged_registry_dirty();
1295 if matches!(
1296 self.config.durability_mode,
1297 crate::api::DurabilityMode::Strict
1298 ) {
1299 self.flush_paged_state()?;
1300 }
1301 }
1302
1303 Ok(())
1304 }
1305
1306 pub fn get_refs_from(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1308 self.cross_refs.read().get(&id).cloned().unwrap_or_default()
1309 }
1310
1311 pub fn get_refs_to(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1313 self.reverse_refs
1314 .read()
1315 .get(&id)
1316 .cloned()
1317 .unwrap_or_default()
1318 }
1319
1320 pub fn expand_refs(
1322 &self,
1323 id: EntityId,
1324 depth: u32,
1325 ref_types: Option<&[RefType]>,
1326 ) -> Vec<(UnifiedEntity, u32, RefType)> {
1327 let mut results = Vec::new();
1328 let mut visited = std::collections::HashSet::new();
1329 visited.insert(id);
1330
1331 self.expand_refs_recursive(id, depth, ref_types, &mut visited, &mut results, 1);
1332
1333 results
1334 }
1335
1336 fn expand_refs_recursive(
1337 &self,
1338 id: EntityId,
1339 max_depth: u32,
1340 ref_types: Option<&[RefType]>,
1341 visited: &mut std::collections::HashSet<EntityId>,
1342 results: &mut Vec<(UnifiedEntity, u32, RefType)>,
1343 current_depth: u32,
1344 ) {
1345 if current_depth > max_depth {
1346 return;
1347 }
1348
1349 for (target_id, ref_type, target_collection) in self.get_refs_from(id) {
1350 if visited.contains(&target_id) {
1351 continue;
1352 }
1353
1354 if let Some(types) = ref_types {
1355 if !types.contains(&ref_type) {
1356 continue;
1357 }
1358 }
1359
1360 visited.insert(target_id);
1361
1362 if let Some(entity) = self.get(&target_collection, target_id) {
1363 results.push((entity, current_depth, ref_type));
1364
1365 self.expand_refs_recursive(
1367 target_id,
1368 max_depth,
1369 ref_types,
1370 visited,
1371 results,
1372 current_depth + 1,
1373 );
1374 }
1375 }
1376 }
1377
1378 pub(crate) fn index_cross_refs(
1380 &self,
1381 entity: &UnifiedEntity,
1382 collection: &str,
1383 ) -> Result<(), StoreError> {
1384 let mut registry_dirty = false;
1385 for cross_ref in entity.cross_refs() {
1386 if cross_ref.target_collection.is_empty() {
1387 continue;
1388 }
1389 {
1390 let mut forward = self.cross_refs.write();
1391 let refs = forward.entry(cross_ref.source).or_default();
1392 let inserted = !refs.iter().any(|(id, kind, coll)| {
1393 *id == cross_ref.target
1394 && *kind == cross_ref.ref_type
1395 && coll == &cross_ref.target_collection
1396 });
1397 if inserted {
1398 refs.push((
1399 cross_ref.target,
1400 cross_ref.ref_type,
1401 cross_ref.target_collection.clone(),
1402 ));
1403 registry_dirty = true;
1404 }
1405 }
1406
1407 {
1408 let mut reverse = self.reverse_refs.write();
1409 let refs = reverse.entry(cross_ref.target).or_default();
1410 let inserted = !refs.iter().any(|(id, kind, coll)| {
1411 *id == cross_ref.source && *kind == cross_ref.ref_type && coll == collection
1412 });
1413 if inserted {
1414 refs.push((cross_ref.source, cross_ref.ref_type, collection.to_string()));
1415 registry_dirty = true;
1416 }
1417 }
1418 }
1419
1420 if registry_dirty {
1421 self.mark_paged_registry_dirty();
1422 }
1423
1424 Ok(())
1425 }
1426
1427 pub(crate) fn unindex_cross_refs(&self, id: EntityId) -> Result<(), StoreError> {
1429 self.cross_refs.write().remove(&id);
1431
1432 let mut reverse = self.reverse_refs.write();
1434 for refs in reverse.values_mut() {
1435 refs.retain(|(source, _, _)| *source != id);
1436 }
1437 reverse.remove(&id);
1438 self.mark_paged_registry_dirty();
1439
1440 Ok(())
1441 }
1442
1443 pub(crate) fn unindex_cross_refs_batch(&self, ids: &[EntityId]) -> Result<(), StoreError> {
1444 if ids.is_empty() {
1445 return Ok(());
1446 }
1447
1448 let id_set: std::collections::HashSet<EntityId> = ids.iter().copied().collect();
1449
1450 let needs_forward_cleanup = {
1464 let forward = self.cross_refs.read();
1465 id_set.iter().any(|id| forward.contains_key(id))
1466 };
1467 let needs_reverse_cleanup = {
1468 let reverse = self.reverse_refs.read();
1469 id_set.iter().any(|id| reverse.contains_key(id))
1470 };
1471
1472 if !needs_forward_cleanup && !needs_reverse_cleanup {
1473 self.unindex_cross_refs_fast_path
1474 .fetch_add(1, Ordering::Relaxed);
1475 return Ok(());
1476 }
1477
1478 if needs_forward_cleanup {
1479 let mut forward = self.cross_refs.write();
1480 for id in &id_set {
1481 forward.remove(id);
1482 }
1483 }
1484
1485 if needs_reverse_cleanup || needs_forward_cleanup {
1486 let mut reverse = self.reverse_refs.write();
1490 if needs_forward_cleanup {
1491 for refs in reverse.values_mut() {
1492 refs.retain(|(source, _, _)| !id_set.contains(source));
1493 }
1494 }
1495 reverse.retain(|target, refs| !id_set.contains(target) && !refs.is_empty());
1496 }
1497 self.mark_paged_registry_dirty();
1498
1499 Ok(())
1500 }
1501
1502 pub fn unindex_cross_refs_fast_path_hits(&self) -> u64 {
1506 self.unindex_cross_refs_fast_path.load(Ordering::Relaxed)
1507 }
1508
1509 pub fn query_all<F>(&self, filter: F) -> Vec<(String, UnifiedEntity)>
1511 where
1512 F: Fn(&UnifiedEntity) -> bool + Clone + Send + Sync,
1513 {
1514 let pairs: Vec<(String, Arc<SegmentManager>)> = {
1525 let collections = self.collections.read();
1526 collections
1527 .iter()
1528 .map(|(name, mgr)| (name.clone(), Arc::clone(mgr)))
1529 .collect()
1530 };
1531
1532 let use_parallel = pairs.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1533 if !use_parallel {
1534 return pairs
1536 .into_iter()
1537 .flat_map(|(name, mgr)| {
1538 mgr.query_all(filter.clone())
1539 .into_iter()
1540 .map(move |e| (name.clone(), e))
1541 })
1542 .collect();
1543 }
1544
1545 let filter_ref = &filter;
1547 let collection_results: Vec<Vec<(String, UnifiedEntity)>> = std::thread::scope(|s| {
1548 pairs
1549 .iter()
1550 .map(|(name, manager)| {
1551 let name = (*name).clone();
1552 s.spawn(move || {
1553 manager
1554 .query_all(|e| filter_ref(e))
1555 .into_iter()
1556 .map(|e| (name.clone(), e))
1557 .collect::<Vec<_>>()
1558 })
1559 })
1560 .collect::<Vec<_>>()
1561 .into_iter()
1562 .map(|h| h.join().unwrap_or_default())
1563 .collect()
1564 });
1565
1566 collection_results.into_iter().flatten().collect()
1567 }
1568
1569 pub fn filter_metadata_all(
1571 &self,
1572 filters: &[(String, MetadataFilter)],
1573 ) -> Vec<(String, EntityId)> {
1574 let mut results = Vec::new();
1575 let collections = self.collections.read();
1576
1577 for (name, manager) in collections.iter() {
1578 for id in manager.filter_metadata(filters) {
1579 results.push((name.clone(), id));
1580 }
1581 }
1582
1583 results
1584 }
1585
1586 pub fn stats(&self) -> StoreStats {
1588 let pairs: Vec<(String, Arc<SegmentManager>)> = {
1594 let collections = self.collections.read();
1595 collections
1596 .iter()
1597 .map(|(name, mgr)| (name.clone(), Arc::clone(mgr)))
1598 .collect()
1599 };
1600
1601 let mut stats = StoreStats {
1602 collection_count: pairs.len(),
1603 ..Default::default()
1604 };
1605
1606 for (name, manager) in &pairs {
1607 let manager_stats = manager.stats();
1608 stats.total_entities += manager_stats.total_entities;
1609 stats.total_memory_bytes += manager_stats.total_memory_bytes;
1610 stats.collections.insert(name.clone(), manager_stats);
1611 }
1612
1613 stats
1614 }
1615
1616 pub fn run_maintenance(&self) -> Result<(), StoreError> {
1618 let collections = self.collections.read();
1619 for manager in collections.values() {
1620 manager.run_maintenance()?;
1621 }
1622 Ok(())
1623 }
1624}
1625
1626fn flatten_config_json(
1628 prefix: &str,
1629 value: &crate::serde_json::Value,
1630 out: &mut Vec<(String, crate::storage::schema::Value)>,
1631) {
1632 use crate::storage::schema::Value;
1633 match value {
1634 crate::serde_json::Value::Object(map) => {
1635 for (k, v) in map {
1636 let key = if prefix.is_empty() {
1637 k.clone()
1638 } else {
1639 format!("{prefix}.{k}")
1640 };
1641 flatten_config_json(&key, v, out);
1642 }
1643 }
1644 crate::serde_json::Value::String(s) => {
1645 out.push((prefix.to_string(), Value::text(s.clone())));
1646 }
1647 crate::serde_json::Value::Number(n) => {
1648 if n.fract().abs() < f64::EPSILON {
1649 out.push((prefix.to_string(), Value::UnsignedInteger(*n as u64)));
1650 } else {
1651 out.push((prefix.to_string(), Value::Float(*n)));
1652 }
1653 }
1654 crate::serde_json::Value::Bool(b) => {
1655 out.push((prefix.to_string(), Value::Boolean(*b)));
1656 }
1657 crate::serde_json::Value::Null => {
1658 out.push((prefix.to_string(), Value::Null));
1659 }
1660 crate::serde_json::Value::Array(arr) => {
1661 let json_str = crate::serde_json::to_string(value).unwrap_or_default();
1662 out.push((prefix.to_string(), Value::text(json_str)));
1663 }
1664 }
1665}