1use super::*;
2
3impl UnifiedStore {
4 pub(crate) fn persist_entities_to_pager(
5 &self,
6 collection: &str,
7 entities: &[UnifiedEntity],
8 ) -> Result<(), StoreError> {
9 self.persist_entities_impl(collection, entities, false)
10 }
11
12 pub(crate) fn persist_entity_refs_to_pager(
16 &self,
17 collection: &str,
18 entities: &[&UnifiedEntity],
19 ) -> Result<(), StoreError> {
20 self.persist_entity_refs_impl(collection, entities, false)
21 }
22
23 pub(crate) fn persist_entity_refs_to_pager_wal_only(
26 &self,
27 collection: &str,
28 entities: &[&UnifiedEntity],
29 ) -> Result<(), StoreError> {
30 self.persist_entity_refs_impl(collection, entities, true)
31 }
32
33 fn persist_entity_refs_impl(
34 &self,
35 collection: &str,
36 entities: &[&UnifiedEntity],
37 skip_btree: bool,
38 ) -> Result<(), StoreError> {
39 if entities.is_empty() {
40 return Ok(());
41 }
42 if self.pager.is_none() && self.config.embedded_wal_path.is_none() {
43 return Ok(());
44 }
45 let fv = self.format_version();
46 let manager = self
47 .get_collection(collection)
48 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
49 let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
50 .iter()
51 .map(|entity| {
52 let metadata = manager.get_metadata(entity.id);
53 (
54 entity.id.raw().to_be_bytes().to_vec(),
55 Self::serialize_entity_record(entity, metadata.as_ref(), fv),
56 )
57 })
58 .collect();
59 serialized.sort_by(|a, b| a.0.cmp(&b.0));
60
61 if !skip_btree {
62 let Some(pager) = &self.pager else {
63 let records: Vec<Vec<u8>> =
64 serialized.into_iter().map(|(_id, record)| record).collect();
65 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
66 collection: collection.to_string(),
67 records,
68 }])?;
69 return Ok(());
70 };
71 let mut btree_indices = self.btree_indices.write();
72 let btree = btree_indices
73 .entry(collection.to_string())
74 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
75 let root_before = btree.root_page_id();
76 btree.upsert_batch_sorted(&serialized).map_err(|e| {
77 StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
78 })?;
79 let root_after = btree.root_page_id();
80 drop(btree_indices);
81 if root_before != root_after {
82 self.mark_paged_registry_dirty();
83 }
84 }
85 let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
86 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
87 collection: collection.to_string(),
88 records,
89 }])?;
90 Ok(())
91 }
92
93 pub(crate) fn persist_entities_to_pager_wal_only(
105 &self,
106 collection: &str,
107 entities: &[UnifiedEntity],
108 ) -> Result<(), StoreError> {
109 self.persist_entities_impl(collection, entities, true)
110 }
111
112 fn persist_entities_impl(
113 &self,
114 collection: &str,
115 entities: &[UnifiedEntity],
116 skip_btree: bool,
117 ) -> Result<(), StoreError> {
118 if entities.is_empty() {
119 return Ok(());
120 }
121
122 if self.pager.is_none() && self.config.embedded_wal_path.is_none() {
123 return Ok(());
124 }
125
126 let fv = self.format_version();
127 let manager = self
128 .get_collection(collection)
129 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
130 let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
131 .iter()
132 .map(|entity| {
133 let metadata = manager.get_metadata(entity.id);
134 (
135 entity.id.raw().to_be_bytes().to_vec(),
136 Self::serialize_entity_record(entity, metadata.as_ref(), fv),
137 )
138 })
139 .collect();
140 serialized.sort_by(|a, b| a.0.cmp(&b.0));
142
143 if !skip_btree {
144 let Some(pager) = &self.pager else {
145 let records: Vec<Vec<u8>> =
146 serialized.into_iter().map(|(_id, record)| record).collect();
147 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
148 collection: collection.to_string(),
149 records,
150 }])?;
151 return Ok(());
152 };
153 let mut btree_indices = self.btree_indices.write();
154 let btree = btree_indices
155 .entry(collection.to_string())
156 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
157 let root_before = btree.root_page_id();
158
159 btree.upsert_batch_sorted(&serialized).map_err(|e| {
163 StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
164 })?;
165 let root_after = btree.root_page_id();
166 drop(btree_indices);
167 if root_before != root_after {
168 self.mark_paged_registry_dirty();
169 }
170 }
171 let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
181 self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
182 collection: collection.to_string(),
183 records,
184 }])?;
185
186 Ok(())
187 }
188
189 pub(crate) fn update_graph_label_index(
191 &self,
192 collection: &str,
193 label: &str,
194 entity_id: EntityId,
195 ) {
196 let key = (collection.to_string(), label.to_string());
197 let mut idx = self.graph_label_index.write();
198 idx.entry(key).or_default().push(entity_id);
199 }
200
201 pub(crate) fn remove_from_graph_label_index(&self, collection: &str, entity_id: EntityId) {
203 let mut idx = self.graph_label_index.write();
204 for ((col, _), ids) in idx.iter_mut() {
205 if col == collection {
206 ids.retain(|&id| id != entity_id);
207 }
208 }
209 idx.retain(|_, ids| !ids.is_empty());
211 }
212
213 pub(crate) fn remove_from_graph_label_index_batch(
214 &self,
215 collection: &str,
216 entity_ids: &[EntityId],
217 ) {
218 if entity_ids.is_empty() {
219 return;
220 }
221 let id_set: std::collections::HashSet<EntityId> = entity_ids.iter().copied().collect();
222 let mut idx = self.graph_label_index.write();
223 for ((col, _), ids) in idx.iter_mut() {
224 if col == collection {
225 ids.retain(|id| !id_set.contains(id));
226 }
227 }
228 idx.retain(|_, ids| !ids.is_empty());
229 }
230
231 pub fn lookup_graph_nodes_by_label(&self, label: &str) -> Vec<EntityId> {
233 let idx = self.graph_label_index.read();
234 idx.iter()
235 .filter(|((_, l), _)| l == label)
236 .flat_map(|(_, ids)| ids.iter().copied())
237 .collect()
238 }
239
240 pub fn lookup_graph_nodes_by_label_in(&self, collection: &str, label: &str) -> Vec<EntityId> {
242 let idx = self.graph_label_index.read();
243 idx.get(&(collection.to_string(), label.to_string()))
244 .cloned()
245 .unwrap_or_default()
246 }
247
248 pub fn create_collection(&self, name: impl Into<String>) -> Result<(), StoreError> {
249 let name = name.into();
250 let mut collections = self.collections.write();
251
252 if collections.contains_key(&name) {
253 return Err(StoreError::CollectionExists(name));
254 }
255
256 let manager = SegmentManager::with_config(&name, self.config.manager_config.clone());
257 collections.insert(name.clone(), Arc::new(manager));
258 drop(collections);
259
260 if let Err(err) = self.publish_operational_collection_create(&name) {
261 let mut collections = self.collections.write();
262 collections.remove(&name);
263 return Err(err);
264 }
265
266 self.mark_paged_registry_dirty();
267 self.finish_paged_write([StoreWalAction::CreateCollection { name }])?;
268
269 Ok(())
270 }
271
272 pub fn get_or_create_collection(&self, name: impl Into<String>) -> Arc<SegmentManager> {
274 let name = name.into();
275 {
277 let collections = self.collections.read();
278 if let Some(manager) = collections.get(&name) {
279 return Arc::clone(manager);
280 }
281 }
282 let mut collections = self.collections.write();
284 if let Some(manager) = collections.get(&name) {
286 return Arc::clone(manager);
287 }
288 let manager = Arc::new(SegmentManager::with_config(
289 &name,
290 self.config.manager_config.clone(),
291 ));
292 collections.insert(name, Arc::clone(&manager));
293 self.mark_paged_registry_dirty();
294 manager
295 }
296
297 pub fn get_collection(&self, name: &str) -> Option<Arc<SegmentManager>> {
299 self.collections.read().get(name).map(Arc::clone)
300 }
301
302 pub fn context_index(&self) -> &ContextIndex {
304 &self.context_index
305 }
306
307 pub fn take_replayed_turbo_inserts(&self, collection: &str) -> Option<Vec<(u64, Vec<f32>)>> {
313 let mut map = self.replayed_turbo_inserts.lock();
314 map.remove(collection)
315 }
316
317 pub fn set_config_tree(&self, prefix: &str, json: &crate::serde_json::Value) -> usize {
320 let _ = self.get_or_create_collection("red_config");
321 let mut pairs = Vec::new();
322 flatten_config_json(prefix, json, &mut pairs);
323 let mut saved = 0;
324 for (key, value) in pairs {
325 let entity = UnifiedEntity::new(
326 EntityId::new(0),
327 EntityKind::TableRow {
328 table: Arc::from("red_config"),
329 row_id: 0,
330 },
331 EntityData::Row(RowData {
332 columns: Vec::new(),
333 named: Some(
334 [
335 ("key".to_string(), crate::storage::schema::Value::text(key)),
336 ("value".to_string(), value),
337 ]
338 .into_iter()
339 .collect(),
340 ),
341 schema: None,
342 }),
343 );
344 if self.insert_auto("red_config", entity).is_ok() {
345 saved += 1;
346 }
347 }
348 saved
349 }
350
351 pub fn get_config(&self, key: &str) -> Option<crate::storage::schema::Value> {
353 let manager = self.get_collection("red_config")?;
354 for entity in manager.query_all(|_| true) {
355 if let EntityData::Row(row) = &entity.data {
356 if let Some(named) = &row.named {
357 let key_matches = named
358 .get("key")
359 .and_then(|v| match v {
360 crate::storage::schema::Value::Text(s) => Some(s.as_ref() == key),
361 _ => None,
362 })
363 .unwrap_or(false);
364 if key_matches {
365 return named.get("value").cloned();
366 }
367 }
368 }
369 }
370 None
371 }
372
373 pub fn set_aux_metadata(&self, bytes: Vec<u8>) {
378 *self.aux_metadata.write() = bytes;
379 }
380
381 pub fn aux_metadata(&self) -> Vec<u8> {
383 self.aux_metadata.read().clone()
384 }
385
386 pub fn list_collections(&self) -> Vec<String> {
388 self.collections.read().keys().cloned().collect()
389 }
390
391 pub fn drop_collection(&self, name: &str) -> Result<(), StoreError> {
393 self.publish_operational_collection_pending_drop(name)?;
394 let manager = {
395 let mut collections = self.collections.write();
396
397 collections
398 .remove(name)
399 .ok_or_else(|| StoreError::CollectionNotFound(name.to_string()))?
400 };
401
402 let entities = manager.query_all(|_| true);
403 let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
404
405 for entity_id in &entity_ids {
406 self.context_index.remove_entity(*entity_id);
407 let _ = self.unindex_cross_refs(*entity_id);
408 }
409
410 self.btree_indices.write().remove(name);
411
412 self.entity_cache.retain(|entity_id, (collection, _)| {
413 collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
414 });
415
416 self.cross_refs.write().retain(|source_id, refs| {
417 refs.retain(|(target_id, _, target_collection)| {
418 target_collection != name && !entity_ids.iter().any(|id| id == target_id)
419 });
420 !entity_ids.iter().any(|id| id == source_id)
421 });
422
423 self.reverse_refs.write().retain(|target_id, refs| {
424 refs.retain(|(source_id, _, source_collection)| {
425 source_collection != name && !entity_ids.iter().any(|id| id == source_id)
426 });
427 !entity_ids.iter().any(|id| id == target_id)
428 });
429
430 self.mark_paged_registry_dirty();
431 self.finish_paged_write([StoreWalAction::DropCollection {
432 name: name.to_string(),
433 }])?;
434 self.publish_operational_collection_drop_finished(name)?;
435
436 Ok(())
437 }
438
    /// Inserts one entity into an existing `collection` and returns its id.
    ///
    /// An entity id of 0 is replaced by a freshly allocated id; a non-zero id
    /// is registered with the allocator. For `EntityKind::TableRow` entities
    /// the row id is handled the same way through the collection's manager.
    ///
    /// After the in-memory insert this:
    /// - updates the graph-label index for graph nodes;
    /// - writes the serialized record into the collection's B-tree when a pager
    ///   is attached;
    /// - indexes cross references when `auto_index_refs` is enabled;
    /// - emits an upsert WAL action when a pager or embedded WAL is configured.
    ///
    /// # Errors
    /// `CollectionNotFound` if `collection` does not exist (unlike
    /// `insert_auto`, this never creates it), plus errors from the manager,
    /// cross-ref indexing and `finish_paged_write`.
    pub fn insert(&self, collection: &str, entity: UnifiedEntity) -> Result<EntityId, StoreError> {
        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;

        let mut entity = entity;
        if entity.id.raw() == 0 {
            entity.id = self.next_entity_id();
        } else {
            self.register_entity_id(entity.id);
        }
        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
            if *row_id == 0 {
                *row_id = manager.next_row_id();
            } else {
                manager.register_row_id(*row_id);
            }
        }
        entity.ensure_table_logical_id();
        // Capture the label now: `entity` is moved into the manager below.
        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
        {
            Some(node.label.clone())
        } else {
            None
        };

        let id = manager.insert(entity)?;
        self.register_entity_id(id);

        if let Some(ref label) = graph_node_label {
            self.update_graph_label_index(collection, label, id);
        }

        // NOTE(review): unlike `insert_auto`, this path never calls
        // `context_index.index_entity` — confirm that is intentional.
        let mut registry_dirty = false;
        if let Some(pager) = &self.pager {
            // Re-read the stored entity so the B-tree record matches what the
            // manager actually holds.
            if let Some(entity) = manager.get(id) {
                let mut btree_indices = self.btree_indices.write();
                let btree = btree_indices
                    .entry(collection.to_string())
                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
                let root_before = btree.root_page_id();

                let key = id.raw().to_be_bytes();
                let metadata = manager.get_metadata(id);
                let value = Self::serialize_entity_record(
                    &entity,
                    metadata.as_ref(),
                    self.format_version(),
                );
                // NOTE(review): B-tree insert errors are discarded here, while
                // `insert_auto` propagates them — confirm this is deliberate.
                let _ = btree.insert(&key, &value);
                // A changed root page id must be recorded in the paged registry.
                registry_dirty = root_before != btree.root_page_id();
            }
        }

        if self.config.auto_index_refs {
            if let Some(entity) = manager.get(id) {
                self.index_cross_refs(&entity, collection)?;
            }
        }

        if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
            let actions = manager
                .get(id)
                .map(|entity| {
                    let metadata = manager.get_metadata(id);
                    vec![StoreWalAction::upsert_entity(
                        collection,
                        &entity,
                        metadata.as_ref(),
                        self.format_version(),
                    )]
                })
                .unwrap_or_default();
            if registry_dirty {
                self.mark_paged_registry_dirty();
            }
            self.finish_paged_write(actions)?;
        }

        Ok(id)
    }
530
531 pub fn bulk_insert(
536 &self,
537 collection: &str,
538 mut entities: Vec<UnifiedEntity>,
539 ) -> Result<Vec<EntityId>, StoreError> {
540 let trace = matches!(
544 std::env::var("REDDB_BULK_TIMING").ok().as_deref(),
545 Some("1") | Some("true") | Some("on")
546 );
547 let t_start = std::time::Instant::now();
548 let n = entities.len();
549 let manager = self.get_or_create_collection(collection);
550 let t_get_coll = t_start.elapsed();
551
552 let t0 = std::time::Instant::now();
561 let n_missing_entity_ids = entities.iter().filter(|e| e.id.raw() == 0).count() as u64;
562 let n_missing_row_ids = entities
563 .iter()
564 .filter(|e| matches!(e.kind, EntityKind::TableRow { row_id: 0, .. }))
565 .count() as u64;
566 let mut entity_id_range = if n_missing_entity_ids > 0 {
567 self.reserve_entity_ids(n_missing_entity_ids)
568 } else {
569 0..0
570 };
571 let mut row_id_range = if n_missing_row_ids > 0 {
572 manager.reserve_row_ids(n_missing_row_ids)
573 } else {
574 0..0
575 };
576 for entity in &mut entities {
577 if entity.id.raw() == 0 {
578 let next = entity_id_range
579 .next()
580 .expect("reserved entity-id range exhausted");
581 entity.id = EntityId::new(next);
582 } else {
583 self.register_entity_id(entity.id);
584 }
585 if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
586 if *row_id == 0 {
587 *row_id = row_id_range
588 .next()
589 .expect("reserved row-id range exhausted");
590 } else {
591 manager.register_row_id(*row_id);
592 }
593 }
594 entity.ensure_table_logical_id();
595 }
596 let t_assign_ids = t0.elapsed();
597
598 let graph_labels: Vec<Option<(String, EntityId)>> = entities
600 .iter()
601 .map(|e| {
602 if let EntityKind::GraphNode(ref node) = e.kind {
603 Some((node.label.clone(), e.id))
604 } else {
605 None
606 }
607 })
608 .collect();
609
610 let t0 = std::time::Instant::now();
617 let serialized: Option<Vec<(Vec<u8>, Vec<u8>)>> =
618 if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
619 let fv = self.format_version();
620 let serial_map = |e: &UnifiedEntity| {
621 (
622 e.id.raw().to_be_bytes().to_vec(),
623 Self::serialize_entity_record(e, None, fv),
624 )
625 };
626 if entities.len() >= 512 {
632 use rayon::prelude::*;
633 Some(entities.par_iter().map(serial_map).collect())
634 } else {
635 Some(entities.iter().map(serial_map).collect())
636 }
637 } else {
638 None
639 };
640 let t_serialize = t0.elapsed();
641
642 let t0 = std::time::Instant::now();
644 let ids = manager.bulk_insert(entities)?;
645 let t_manager = t0.elapsed();
646 for id in &ids {
647 self.register_entity_id(*id);
648 }
649
650 for (label, entity_id) in graph_labels.iter().flatten() {
652 self.update_graph_label_index(collection, label, *entity_id);
653 }
654
655 let skip_btree_requested = matches!(
665 std::env::var("REDDB_BULK_SKIP_PERSIST_UNSAFE")
666 .ok()
667 .as_deref(),
668 Some("1") | Some("true") | Some("on")
669 );
670 let skip_btree = skip_btree_requested && self.pager.is_none();
673 if skip_btree_requested && !skip_btree {
674 static IGNORED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
676 IGNORED.get_or_init(|| {
677 tracing::warn!(
678 "REDDB_BULK_SKIP_PERSIST_UNSAFE set but durable pager is \
679 active — flag ignored; bulk inserts will be persisted normally"
680 );
681 });
682 } else if skip_btree {
683 static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
685 WARNED.get_or_init(|| {
686 tracing::warn!(
687 "REDDB_BULK_SKIP_PERSIST_UNSAFE set (ephemeral/no-pager mode) — \
688 bulk inserts NOT durable; data will be lost on restart"
689 );
690 });
691 }
692
693 let mut t_btree_lock = std::time::Duration::ZERO;
697 let mut t_btree_insert = std::time::Duration::ZERO;
698 let mut t_flush = std::time::Duration::ZERO;
699 if !skip_btree {
700 if let (Some(pager), Some(batch)) = (&self.pager, serialized.as_ref()) {
701 let t0 = std::time::Instant::now();
702 let mut btree_indices = self.btree_indices.write();
703 let btree = btree_indices
704 .entry(collection.to_string())
705 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
706 let root_before = btree.root_page_id();
707 t_btree_lock = t0.elapsed();
708
709 let t0 = std::time::Instant::now();
710 let _ = btree.bulk_insert_sorted(batch);
711 t_btree_insert = t0.elapsed();
712 let registry_dirty = root_before != btree.root_page_id();
713
714 let t0 = std::time::Instant::now();
715 if registry_dirty {
716 self.mark_paged_registry_dirty();
717 }
718 t_flush = t0.elapsed();
719 }
720 }
721
722 let actions = serialized
728 .map(|batch| {
729 let records: Vec<Vec<u8>> =
730 batch.into_iter().map(|(_key, record)| record).collect();
731 vec![StoreWalAction::BulkUpsertEntityRecords {
732 collection: collection.to_string(),
733 records,
734 }]
735 })
736 .unwrap_or_default();
737 self.finish_paged_write(actions)?;
738
739 if trace {
740 tracing::debug!(
741 n,
742 total = ?t_start.elapsed(),
743 get_coll = ?t_get_coll,
744 assign = ?t_assign_ids,
745 serialize = ?t_serialize,
746 manager = ?t_manager,
747 btree_lock = ?t_btree_lock,
748 btree = ?t_btree_insert,
749 flush = ?t_flush,
750 "bulk_insert timing"
751 );
752 }
753
754 Ok(ids)
755 }
756
    /// Inserts one entity into `collection`, creating the collection if it does
    /// not exist, and returns the entity's id.
    ///
    /// Id and row-id assignment mirror `insert`. Unlike `insert`, this:
    /// - indexes the entity in the context index;
    /// - serializes the record (without metadata) before the in-memory insert;
    /// - propagates B-tree insert errors.
    ///
    /// # Errors
    /// Errors from cross-ref indexing, the segment manager, the B-tree insert
    /// (wrapped as `StoreError::Io`), or `finish_paged_write`.
    pub fn insert_auto(
        &self,
        collection: &str,
        entity: UnifiedEntity,
    ) -> Result<EntityId, StoreError> {
        let manager = self.get_or_create_collection(collection);
        let mut entity = entity;
        if entity.id.raw() == 0 {
            entity.id = self.next_entity_id();
        } else {
            self.register_entity_id(entity.id);
        }
        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
            if *row_id == 0 {
                *row_id = manager.next_row_id();
            } else {
                manager.register_row_id(*row_id);
            }
        }
        entity.ensure_table_logical_id();

        // Capture the label now: `entity` is moved into the manager below.
        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
        {
            Some(node.label.clone())
        } else {
            None
        };
        // NOTE(review): context indexing and cross-ref indexing happen BEFORE
        // `manager.insert`; if that insert fails, those index entries remain.
        self.context_index.index_entity(collection, &entity);

        let id_for_serialize = entity.id;
        // Serialize before the move into the manager, to avoid re-reading the
        // entity afterwards. Metadata is passed as `None` here.
        let serialized_record: Option<Vec<u8>> =
            if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
                Some(Self::serialize_entity_record(
                    &entity,
                    None,
                    self.format_version(),
                ))
            } else {
                None
            };
        if self.config.auto_index_refs {
            self.index_cross_refs(&entity, collection)?;
        }

        let id = manager.insert(entity)?;
        // The pre-serialized record is keyed by the id assigned above.
        debug_assert_eq!(id, id_for_serialize);
        if let Some(ref label) = graph_node_label {
            self.update_graph_label_index(collection, label, id);
        }

        let mut registry_dirty = false;
        if let (Some(_pager), Some(record)) = (&self.pager, serialized_record.as_ref()) {
            if let Some(btree) = self.get_or_create_btree(collection) {
                let root_before = btree.root_page_id();

                let key = id.raw().to_be_bytes();
                btree.insert(&key, record).map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree insert error while inserting '{collection}'/{id}: {e}"
                    )))
                })?;
                // A changed root page id must be recorded in the paged registry.
                registry_dirty = root_before != btree.root_page_id();
            }
        }

        if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
            let actions = serialized_record
                .map(|record| {
                    vec![StoreWalAction::UpsertEntityRecord {
                        collection: collection.to_string(),
                        record,
                    }]
                })
                .unwrap_or_default();
            if registry_dirty {
                self.mark_paged_registry_dirty();
            }
            self.finish_paged_write(actions)?;
        }
        Ok(id)
    }
866
867 pub fn get(&self, collection: &str, id: EntityId) -> Option<UnifiedEntity> {
873 if let Some(entity) = self
875 .get_collection(collection)
876 .and_then(|manager| manager.get(id))
877 {
878 return Some(entity);
879 }
880
881 if self.pager.is_some() {
883 let btree_indices = self.btree_indices.read();
884 if let Some(btree) = btree_indices.get(collection) {
885 let key = id.raw().to_be_bytes();
886 if let Ok(Some(value)) = btree.get(&key) {
887 if let Ok((entity, _)) =
888 Self::deserialize_entity_record(&value, self.format_version())
889 {
890 return Some(entity);
891 }
892 }
893 }
894 }
895
896 None
897 }
898
899 pub fn get_table_row_by_logical_id(
905 &self,
906 collection: &str,
907 logical_id: EntityId,
908 ) -> Option<UnifiedEntity> {
909 if let Some(entity) = self.get(collection, logical_id) {
910 if matches!(entity.kind, EntityKind::TableRow { .. })
911 && entity.logical_id() == logical_id
912 && entity.xmax == 0
913 {
914 return Some(entity);
915 }
916 }
917
918 let manager = self.get_collection(collection)?;
919 let mut matches = manager.query_all(|entity| {
920 matches!(entity.kind, EntityKind::TableRow { .. }) && entity.logical_id() == logical_id
921 });
922 matches
923 .iter()
924 .find(|entity| entity.xmax == 0)
925 .cloned()
926 .or_else(|| matches.pop())
927 }
928
929 pub fn table_row_versions_by_logical_id(
930 &self,
931 collection: &str,
932 logical_id: EntityId,
933 ) -> Vec<UnifiedEntity> {
934 self.get_collection(collection)
935 .map(|manager| {
936 manager.query_all(|entity| {
937 matches!(entity.kind, EntityKind::TableRow { .. })
938 && entity.logical_id() == logical_id
939 })
940 })
941 .unwrap_or_default()
942 }
943
    /// Reclaims superseded MVCC versions of table rows in `collection`.
    ///
    /// Every table-row version with `xmax != 0` is scanned:
    /// - it is deleted (via `delete_batch`) when `xmax < cutoff_xid`;
    /// - otherwise it is retained.
    ///
    /// A scanned version counts as a delete tombstone when its logical row has
    /// no live (`xmax == 0`) version and it carries the highest `xmin` of that
    /// logical row. Every other scanned version counts as history. The returned
    /// stats break down scanned/reclaimed/retained versions accordingly.
    ///
    /// # Errors
    /// `CollectionNotFound` if the collection is missing, or any error from
    /// `delete_batch`.
    pub fn vacuum_mvcc_history(
        &self,
        collection: &str,
        cutoff_xid: u64,
    ) -> Result<MvccVacuumStats, StoreError> {
        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
        let entities =
            manager.query_all(|entity| matches!(entity.kind, EntityKind::TableRow { .. }));
        // Per logical row: (has a live version, highest xmin seen).
        let mut logical = HashMap::<EntityId, (bool, u64)>::new();
        for entity in &entities {
            let entry = logical.entry(entity.logical_id()).or_insert((false, 0));
            if entity.xmax == 0 {
                entry.0 = true;
            }
            entry.1 = entry.1.max(entity.xmin);
        }

        let mut stats = MvccVacuumStats::default();
        let mut reclaim_ids = Vec::new();
        for entity in entities {
            // Live versions are never candidates.
            if entity.xmax == 0 {
                continue;
            }
            stats.scanned_versions += 1;
            let (has_live_version, max_xmin) = logical
                .get(&entity.logical_id())
                .copied()
                .unwrap_or((false, entity.xmin));
            // The newest version of a row with no live version is the record
            // of its deletion.
            let is_delete_tombstone = !has_live_version && entity.xmin == max_xmin;
            if entity.xmax < cutoff_xid {
                stats.reclaimed_versions += 1;
                if is_delete_tombstone {
                    stats.reclaimed_tombstones += 1;
                } else {
                    stats.reclaimed_history_versions += 1;
                }
                reclaim_ids.push(entity.id);
            } else {
                stats.retained_versions += 1;
                if is_delete_tombstone {
                    stats.retained_tombstones += 1;
                } else {
                    stats.retained_history_versions += 1;
                }
            }
        }

        if !reclaim_ids.is_empty() {
            self.delete_batch(collection, &reclaim_ids)?;
        }
        Ok(stats)
    }
998
    /// Installs an MVCC update of a table row.
    ///
    /// `old_version` (presumably already stamped with its `xmax` by the caller —
    /// TODO confirm) replaces the stored copy. `new_version` is then inserted
    /// as a separate entity. It inherits `metadata` if given, otherwise the old
    /// version's metadata. Cache entries for both ids are evicted and the
    /// context index is moved from the old version to the new one. Both
    /// records are logged in one `BulkUpsertEntityRecords` WAL action.
    ///
    /// # Errors
    /// `CollectionNotFound`, plus errors from the manager, cross-ref indexing
    /// and `finish_paged_write`.
    pub(crate) fn install_versioned_table_row_update(
        &self,
        collection: &str,
        old_version: UnifiedEntity,
        mut new_version: UnifiedEntity,
        metadata: Option<&Metadata>,
    ) -> Result<(), StoreError> {
        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;

        let old_id = old_version.id;
        let new_id = new_version.id;
        // Explicit metadata wins; otherwise carry over the old version's.
        let inherited_metadata = metadata.cloned().or_else(|| manager.get_metadata(old_id));

        self.entity_cache.remove(old_id.raw());
        self.entity_cache.remove(new_id.raw());
        manager.update(old_version.clone())?;
        self.context_index.remove_entity(old_id);

        // NOTE(review): the new version's id is registered as-is (no 0 check,
        // unlike `insert`); callers presumably always pass an allocated id.
        self.register_entity_id(new_version.id);
        if let EntityKind::TableRow { ref mut row_id, .. } = new_version.kind {
            if *row_id == 0 {
                *row_id = manager.next_row_id();
            } else {
                manager.register_row_id(*row_id);
            }
        }
        new_version.ensure_table_logical_id();
        manager.insert(new_version.clone())?;
        if let Some(metadata) = inherited_metadata {
            manager.set_metadata(new_id, metadata)?;
        }
        self.context_index.index_entity(collection, &new_version);
        if self.config.auto_index_refs {
            self.index_cross_refs(&new_version, collection)?;
        }

        let old_metadata = manager.get_metadata(old_id);
        let new_metadata = manager.get_metadata(new_id);
        let fv = self.format_version();
        let records = vec![
            Self::serialize_entity_record(&old_version, old_metadata.as_ref(), fv),
            Self::serialize_entity_record(&new_version, new_metadata.as_ref(), fv),
        ];
        // NOTE(review): only the WAL is written here; the collection B-tree is
        // not updated for either version (compare `persist_entities_to_pager`).
        // Confirm recovery/checkpointing covers this.
        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
            collection: collection.to_string(),
            records,
        }])?;

        Ok(())
    }
1051
1052 pub fn get_batch(&self, collection: &str, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1058 match self.get_collection(collection) {
1059 Some(manager) => manager.get_many(ids),
1060 None => vec![None; ids.len()],
1061 }
1062 }
1063
1064 pub fn get_any(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1066 if let Some(cached) = self.entity_cache.get(id.raw()) {
1070 return Some(cached);
1071 }
1072
1073 let collections = self.collections.read();
1075 for (name, manager) in collections.iter() {
1076 if let Some(entity) = manager.get(id) {
1077 let result = (name.clone(), entity);
1078 drop(collections);
1082 self.entity_cache.insert(id.raw(), result.clone());
1083 return Some(result);
1084 }
1085 }
1086 None
1087 }
1088
1089 pub fn entity_cache_hit_rate(&self) -> Option<f64> {
1095 self.entity_cache.hit_rate()
1096 }
1097
1098 pub fn entity_cache_stats(&self) -> super::super::entity_cache::EntityCacheStats {
1100 self.entity_cache.stats()
1101 }
1102
1103 pub fn delete(&self, collection: &str, id: EntityId) -> Result<bool, StoreError> {
1105 self.entity_cache.remove(id.raw());
1107 let manager = self
1108 .get_collection(collection)
1109 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1110
1111 let deleted = manager.delete(id)?;
1112 if !deleted {
1113 return Ok(false);
1114 }
1115
1116 let mut registry_dirty = false;
1118 if self.pager.is_some() {
1119 let btree_indices = self.btree_indices.read();
1120 if let Some(btree) = btree_indices.get(collection) {
1121 let root_before = btree.root_page_id();
1122 let key = id.raw().to_be_bytes();
1123 let _ = btree.delete(&key);
1124 registry_dirty = root_before != btree.root_page_id();
1125 }
1126 }
1127
1128 self.unindex_cross_refs(id)?;
1130
1131 self.remove_from_graph_label_index(collection, id);
1133
1134 if registry_dirty {
1135 self.mark_paged_registry_dirty();
1136 }
1137 self.finish_paged_write([StoreWalAction::DeleteEntityRecord {
1138 collection: collection.to_string(),
1139 entity_id: id.raw(),
1140 }])?;
1141
1142 Ok(true)
1143 }
1144
1145 pub fn delete_batch(
1146 &self,
1147 collection: &str,
1148 ids: &[EntityId],
1149 ) -> Result<Vec<EntityId>, StoreError> {
1150 if ids.is_empty() {
1151 return Ok(Vec::new());
1152 }
1153
1154 self.entity_cache.remove_many(ids.iter().map(|id| id.raw()));
1159
1160 let manager = self
1161 .get_collection(collection)
1162 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1163
1164 let deleted_ids = manager.delete_batch(ids)?;
1165 if deleted_ids.is_empty() {
1166 return Ok(deleted_ids);
1167 }
1168
1169 let mut registry_dirty = false;
1170 if self.pager.is_some() {
1171 let btree_indices = self.btree_indices.read();
1172 if let Some(btree) = btree_indices.get(collection) {
1173 let root_before = btree.root_page_id();
1174 for id in &deleted_ids {
1175 let key = id.raw().to_be_bytes();
1176 let _ = btree.delete(&key);
1177 }
1178 registry_dirty = root_before != btree.root_page_id();
1179 }
1180 }
1181
1182 self.unindex_cross_refs_batch(&deleted_ids)?;
1183 self.remove_from_graph_label_index_batch(collection, &deleted_ids);
1184 if registry_dirty {
1185 self.mark_paged_registry_dirty();
1186 }
1187 let actions = deleted_ids
1188 .iter()
1189 .map(|id| StoreWalAction::DeleteEntityRecord {
1190 collection: collection.to_string(),
1191 entity_id: id.raw(),
1192 })
1193 .collect::<Vec<_>>();
1194 self.finish_paged_write(actions)?;
1195
1196 Ok(deleted_ids)
1197 }
1198
1199 pub fn set_metadata(
1201 &self,
1202 collection: &str,
1203 id: EntityId,
1204 metadata: Metadata,
1205 ) -> Result<(), StoreError> {
1206 let manager = self
1207 .get_collection(collection)
1208 .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1209
1210 manager.set_metadata(id, metadata)?;
1211 if let Some(entity) = manager.get(id) {
1212 self.persist_entities_to_pager(collection, std::slice::from_ref(&entity))?;
1213 }
1214 Ok(())
1215 }
1216
1217 pub fn get_metadata(&self, collection: &str, id: EntityId) -> Option<Metadata> {
1219 self.get_collection(collection)?.get_metadata(id)
1220 }
1221
1222 pub fn add_cross_ref(
1224 &self,
1225 source_collection: &str,
1226 source_id: EntityId,
1227 target_collection: &str,
1228 target_id: EntityId,
1229 ref_type: RefType,
1230 weight: f32,
1231 ) -> Result<(), StoreError> {
1232 let source_manager = self
1234 .get_collection(source_collection)
1235 .ok_or_else(|| StoreError::CollectionNotFound(source_collection.to_string()))?;
1236
1237 if source_manager.get(source_id).is_none() {
1238 return Err(StoreError::EntityNotFound(source_id));
1239 }
1240
1241 let target_manager = self
1243 .get_collection(target_collection)
1244 .ok_or_else(|| StoreError::CollectionNotFound(target_collection.to_string()))?;
1245
1246 if target_manager.get(target_id).is_none() {
1247 return Err(StoreError::EntityNotFound(target_id));
1248 }
1249
1250 let current_refs = self
1252 .cross_refs
1253 .read()
1254 .get(&source_id)
1255 .map_or(0, |v| v.len());
1256
1257 if current_refs >= self.config.max_cross_refs {
1258 return Err(StoreError::TooManyRefs(source_id));
1259 }
1260
1261 let mut registry_dirty = false;
1262 {
1263 let mut forward = self.cross_refs.write();
1264 let refs = forward.entry(source_id).or_default();
1265 let inserted = !refs.iter().any(|(id, kind, coll)| {
1266 *id == target_id && *kind == ref_type && coll == target_collection
1267 });
1268 if inserted {
1269 refs.push((target_id, ref_type, target_collection.to_string()));
1270 registry_dirty = true;
1271 }
1272 }
1273
1274 {
1275 let mut reverse = self.reverse_refs.write();
1276 let refs = reverse.entry(target_id).or_default();
1277 let inserted = !refs.iter().any(|(id, kind, coll)| {
1278 *id == source_id && *kind == ref_type && coll == source_collection
1279 });
1280 if inserted {
1281 refs.push((source_id, ref_type, source_collection.to_string()));
1282 registry_dirty = true;
1283 }
1284 }
1285
1286 if let Some(mut entity) = source_manager.get(source_id) {
1287 if !entity.cross_refs().iter().any(|xref| {
1288 xref.target == target_id
1289 && xref.ref_type == ref_type
1290 && xref.target_collection == target_collection
1291 }) {
1292 let cross_ref = CrossRef::with_weight(
1293 source_id,
1294 target_id,
1295 target_collection,
1296 ref_type,
1297 weight,
1298 );
1299 entity.add_cross_ref(cross_ref);
1300 let _ = source_manager.update(entity.clone());
1301 registry_dirty = true;
1302 self.persist_entities_to_pager(source_collection, std::slice::from_ref(&entity))?;
1303 }
1304 }
1305
1306 if registry_dirty {
1307 self.mark_paged_registry_dirty();
1308 if matches!(
1309 self.config.durability_mode,
1310 crate::api::DurabilityMode::Strict
1311 ) {
1312 self.flush_paged_state()?;
1313 }
1314 }
1315
1316 Ok(())
1317 }
1318
1319 pub fn get_refs_from(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1321 self.cross_refs.read().get(&id).cloned().unwrap_or_default()
1322 }
1323
1324 pub fn get_refs_to(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1326 self.reverse_refs
1327 .read()
1328 .get(&id)
1329 .cloned()
1330 .unwrap_or_default()
1331 }
1332
1333 pub fn expand_refs(
1335 &self,
1336 id: EntityId,
1337 depth: u32,
1338 ref_types: Option<&[RefType]>,
1339 ) -> Vec<(UnifiedEntity, u32, RefType)> {
1340 let mut results = Vec::new();
1341 let mut visited = std::collections::HashSet::new();
1342 visited.insert(id);
1343
1344 self.expand_refs_recursive(id, depth, ref_types, &mut visited, &mut results, 1);
1345
1346 results
1347 }
1348
1349 fn expand_refs_recursive(
1350 &self,
1351 id: EntityId,
1352 max_depth: u32,
1353 ref_types: Option<&[RefType]>,
1354 visited: &mut std::collections::HashSet<EntityId>,
1355 results: &mut Vec<(UnifiedEntity, u32, RefType)>,
1356 current_depth: u32,
1357 ) {
1358 if current_depth > max_depth {
1359 return;
1360 }
1361
1362 for (target_id, ref_type, target_collection) in self.get_refs_from(id) {
1363 if visited.contains(&target_id) {
1364 continue;
1365 }
1366
1367 if let Some(types) = ref_types {
1368 if !types.contains(&ref_type) {
1369 continue;
1370 }
1371 }
1372
1373 visited.insert(target_id);
1374
1375 if let Some(entity) = self.get(&target_collection, target_id) {
1376 results.push((entity, current_depth, ref_type));
1377
1378 self.expand_refs_recursive(
1380 target_id,
1381 max_depth,
1382 ref_types,
1383 visited,
1384 results,
1385 current_depth + 1,
1386 );
1387 }
1388 }
1389 }
1390
1391 pub(crate) fn index_cross_refs(
1393 &self,
1394 entity: &UnifiedEntity,
1395 collection: &str,
1396 ) -> Result<(), StoreError> {
1397 let mut registry_dirty = false;
1398 for cross_ref in entity.cross_refs() {
1399 if cross_ref.target_collection.is_empty() {
1400 continue;
1401 }
1402 {
1403 let mut forward = self.cross_refs.write();
1404 let refs = forward.entry(cross_ref.source).or_default();
1405 let inserted = !refs.iter().any(|(id, kind, coll)| {
1406 *id == cross_ref.target
1407 && *kind == cross_ref.ref_type
1408 && coll == &cross_ref.target_collection
1409 });
1410 if inserted {
1411 refs.push((
1412 cross_ref.target,
1413 cross_ref.ref_type,
1414 cross_ref.target_collection.clone(),
1415 ));
1416 registry_dirty = true;
1417 }
1418 }
1419
1420 {
1421 let mut reverse = self.reverse_refs.write();
1422 let refs = reverse.entry(cross_ref.target).or_default();
1423 let inserted = !refs.iter().any(|(id, kind, coll)| {
1424 *id == cross_ref.source && *kind == cross_ref.ref_type && coll == collection
1425 });
1426 if inserted {
1427 refs.push((cross_ref.source, cross_ref.ref_type, collection.to_string()));
1428 registry_dirty = true;
1429 }
1430 }
1431 }
1432
1433 if registry_dirty {
1434 self.mark_paged_registry_dirty();
1435 }
1436
1437 Ok(())
1438 }
1439
1440 pub(crate) fn unindex_cross_refs(&self, id: EntityId) -> Result<(), StoreError> {
1442 self.cross_refs.write().remove(&id);
1444
1445 let mut reverse = self.reverse_refs.write();
1447 for refs in reverse.values_mut() {
1448 refs.retain(|(source, _, _)| *source != id);
1449 }
1450 reverse.remove(&id);
1451 self.mark_paged_registry_dirty();
1452
1453 Ok(())
1454 }
1455
1456 pub(crate) fn unindex_cross_refs_batch(&self, ids: &[EntityId]) -> Result<(), StoreError> {
1457 if ids.is_empty() {
1458 return Ok(());
1459 }
1460
1461 let id_set: std::collections::HashSet<EntityId> = ids.iter().copied().collect();
1462
1463 let needs_forward_cleanup = {
1477 let forward = self.cross_refs.read();
1478 id_set.iter().any(|id| forward.contains_key(id))
1479 };
1480 let needs_reverse_cleanup = {
1481 let reverse = self.reverse_refs.read();
1482 id_set.iter().any(|id| reverse.contains_key(id))
1483 };
1484
1485 if !needs_forward_cleanup && !needs_reverse_cleanup {
1486 self.unindex_cross_refs_fast_path
1487 .fetch_add(1, Ordering::Relaxed);
1488 return Ok(());
1489 }
1490
1491 if needs_forward_cleanup {
1492 let mut forward = self.cross_refs.write();
1493 for id in &id_set {
1494 forward.remove(id);
1495 }
1496 }
1497
1498 if needs_reverse_cleanup || needs_forward_cleanup {
1499 let mut reverse = self.reverse_refs.write();
1503 if needs_forward_cleanup {
1504 for refs in reverse.values_mut() {
1505 refs.retain(|(source, _, _)| !id_set.contains(source));
1506 }
1507 }
1508 reverse.retain(|target, refs| !id_set.contains(target) && !refs.is_empty());
1509 }
1510 self.mark_paged_registry_dirty();
1511
1512 Ok(())
1513 }
1514
1515 pub fn unindex_cross_refs_fast_path_hits(&self) -> u64 {
1519 self.unindex_cross_refs_fast_path.load(Ordering::Relaxed)
1520 }
1521
1522 pub fn query_all<F>(&self, filter: F) -> Vec<(String, UnifiedEntity)>
1524 where
1525 F: Fn(&UnifiedEntity) -> bool + Clone + Send + Sync,
1526 {
1527 let pairs: Vec<(String, Arc<SegmentManager>)> = {
1538 let collections = self.collections.read();
1539 collections
1540 .iter()
1541 .map(|(name, mgr)| (name.clone(), Arc::clone(mgr)))
1542 .collect()
1543 };
1544
1545 let use_parallel = pairs.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1546 if !use_parallel {
1547 return pairs
1549 .into_iter()
1550 .flat_map(|(name, mgr)| {
1551 mgr.query_all(filter.clone())
1552 .into_iter()
1553 .map(move |e| (name.clone(), e))
1554 })
1555 .collect();
1556 }
1557
1558 let filter_ref = &filter;
1560 let collection_results: Vec<Vec<(String, UnifiedEntity)>> = std::thread::scope(|s| {
1561 pairs
1562 .iter()
1563 .map(|(name, manager)| {
1564 let name = (*name).clone();
1565 s.spawn(move || {
1566 manager
1567 .query_all(|e| filter_ref(e))
1568 .into_iter()
1569 .map(|e| (name.clone(), e))
1570 .collect::<Vec<_>>()
1571 })
1572 })
1573 .collect::<Vec<_>>()
1574 .into_iter()
1575 .map(|h| h.join().unwrap_or_default())
1576 .collect()
1577 });
1578
1579 collection_results.into_iter().flatten().collect()
1580 }
1581
1582 pub fn filter_metadata_all(
1584 &self,
1585 filters: &[(String, MetadataFilter)],
1586 ) -> Vec<(String, EntityId)> {
1587 let mut results = Vec::new();
1588 let collections = self.collections.read();
1589
1590 for (name, manager) in collections.iter() {
1591 for id in manager.filter_metadata(filters) {
1592 results.push((name.clone(), id));
1593 }
1594 }
1595
1596 results
1597 }
1598
1599 pub fn stats(&self) -> StoreStats {
1601 let pairs: Vec<(String, Arc<SegmentManager>)> = {
1607 let collections = self.collections.read();
1608 collections
1609 .iter()
1610 .map(|(name, mgr)| (name.clone(), Arc::clone(mgr)))
1611 .collect()
1612 };
1613
1614 let mut stats = StoreStats {
1615 collection_count: pairs.len(),
1616 ..Default::default()
1617 };
1618
1619 for (name, manager) in &pairs {
1620 let manager_stats = manager.stats();
1621 stats.total_entities += manager_stats.total_entities;
1622 stats.total_memory_bytes += manager_stats.total_memory_bytes;
1623 stats.collections.insert(name.clone(), manager_stats);
1624 }
1625
1626 stats
1627 }
1628
1629 pub fn run_maintenance(&self) -> Result<(), StoreError> {
1631 let collections = self.collections.read();
1632 for manager in collections.values() {
1633 manager.run_maintenance()?;
1634 }
1635 Ok(())
1636 }
1637}
1638
1639fn flatten_config_json(
1641 prefix: &str,
1642 value: &crate::serde_json::Value,
1643 out: &mut Vec<(String, crate::storage::schema::Value)>,
1644) {
1645 use crate::storage::schema::Value;
1646 match value {
1647 crate::serde_json::Value::Object(map) => {
1648 for (k, v) in map {
1649 let key = if prefix.is_empty() {
1650 k.clone()
1651 } else {
1652 format!("{prefix}.{k}")
1653 };
1654 flatten_config_json(&key, v, out);
1655 }
1656 }
1657 crate::serde_json::Value::String(s) => {
1658 out.push((prefix.to_string(), Value::text(s.clone())));
1659 }
1660 crate::serde_json::Value::Number(n) => {
1661 if n.fract().abs() < f64::EPSILON {
1662 out.push((prefix.to_string(), Value::UnsignedInteger(*n as u64)));
1663 } else {
1664 out.push((prefix.to_string(), Value::Float(*n)));
1665 }
1666 }
1667 crate::serde_json::Value::Bool(b) => {
1668 out.push((prefix.to_string(), Value::Boolean(*b)));
1669 }
1670 crate::serde_json::Value::Null => {
1671 out.push((prefix.to_string(), Value::Null));
1672 }
1673 crate::serde_json::Value::Array(arr) => {
1674 let json_str = crate::serde_json::to_string(value).unwrap_or_default();
1675 out.push((prefix.to_string(), Value::text(json_str)));
1676 }
1677 }
1678}