Skip to main content

reddb_server/storage/unified/store/
impl_entities.rs

1use super::*;
2
3impl UnifiedStore {
    /// Persist a batch of owned entities to the durable pager: upsert the
    /// serialized records into the collection's B-tree and emit a single
    /// bulk WAL action. No-op when the store is pagerless or `entities`
    /// is empty.
    ///
    /// Errors with `CollectionNotFound` if `collection` does not exist.
    pub(crate) fn persist_entities_to_pager(
        &self,
        collection: &str,
        entities: &[UnifiedEntity],
    ) -> Result<(), StoreError> {
        self.persist_entities_impl(collection, entities, /* skip_btree= */ false)
    }
11
    /// Same as `persist_entities_to_pager` but takes `&[&UnifiedEntity]` so
    /// callers that already hold references (SQL UPDATE fast path) don't
    /// have to clone a `Vec<UnifiedEntity>` just to satisfy the signature.
    ///
    /// Errors with `CollectionNotFound` if `collection` does not exist;
    /// otherwise propagates B-tree and paged-write errors.
    pub(crate) fn persist_entity_refs_to_pager(
        &self,
        collection: &str,
        entities: &[&UnifiedEntity],
    ) -> Result<(), StoreError> {
        self.persist_entity_refs_impl(collection, entities, /* skip_btree= */ false)
    }
22
    /// Reference-taking WAL-only variant — see
    /// `persist_entities_to_pager_wal_only` for semantics.
    ///
    /// Skips the in-line B-tree upsert but still emits the WAL record;
    /// safe only when the caller has already updated the segment in place
    /// so live reads see the new value.
    pub(crate) fn persist_entity_refs_to_pager_wal_only(
        &self,
        collection: &str,
        entities: &[&UnifiedEntity],
    ) -> Result<(), StoreError> {
        self.persist_entity_refs_impl(collection, entities, /* skip_btree= */ true)
    }
32
    /// Shared implementation behind the `persist_entity_refs_*` wrappers.
    ///
    /// Serializes each entity record keyed by its big-endian id, sorts the
    /// batch, optionally upserts it into the collection's B-tree, and emits
    /// a single `BulkUpsertEntityRecords` WAL action covering the whole
    /// batch. No-op when `entities` is empty or the store has no pager.
    fn persist_entity_refs_impl(
        &self,
        collection: &str,
        entities: &[&UnifiedEntity],
        skip_btree: bool,
    ) -> Result<(), StoreError> {
        if entities.is_empty() {
            return Ok(());
        }
        // Pagerless store: nothing durable to write.
        let Some(pager) = &self.pager else {
            return Ok(());
        };
        let fv = self.format_version();
        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
        let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
            .iter()
            .map(|entity| {
                let metadata = manager.get_metadata(entity.id);
                (
                    entity.id.raw().to_be_bytes().to_vec(),
                    Self::serialize_entity_record(entity, metadata.as_ref(), fv),
                )
            })
            .collect();
        // u64 IDs encoded as big-endian — lexicographic order equals
        // numeric order, which the sorted batch upsert below relies on.
        serialized.sort_by(|a, b| a.0.cmp(&b.0));

        if !skip_btree {
            let mut btree_indices = self.btree_indices.write();
            let btree = btree_indices
                .entry(collection.to_string())
                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
            let root_before = btree.root_page_id();
            btree.upsert_batch_sorted(&serialized).map_err(|e| {
                StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
            })?;
            let root_after = btree.root_page_id();
            drop(btree_indices);
            // A changed root means the tree grew or split — the paged
            // registry must record the new root page.
            if root_before != root_after {
                self.mark_paged_registry_dirty();
            }
        }
        // One WAL action for the whole batch; emitted even when
        // `skip_btree` is set so recovery replay can re-apply the upsert
        // to the B-tree (durability is unchanged).
        let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
            collection: collection.to_string(),
            records,
        }])?;
        Ok(())
    }
83
    /// PG-HOT-like UPDATE persist: emit the WAL record so the update
    /// survives a crash, but skip the in-line B-tree upsert. Reads
    /// continue to see the new entity because `manager.get()` prefers
    /// the segment over the B-tree; WAL replay re-applies the upsert
    /// to the B-tree on recovery so the two views converge before
    /// any stale B-tree page can be surfaced.
    ///
    /// Safe to use only when the caller has already updated the
    /// segment in-place (so live reads see the new value). Cuts out
    /// the dominant cost of single-row UPDATEs: per-entity serialize
    /// + leaf descent + page decompress / compress / write_page.
    ///
    /// # Errors
    /// `CollectionNotFound` if `collection` is unknown; otherwise
    /// propagates paged-write errors.
    pub(crate) fn persist_entities_to_pager_wal_only(
        &self,
        collection: &str,
        entities: &[UnifiedEntity],
    ) -> Result<(), StoreError> {
        self.persist_entities_impl(collection, entities, /* skip_btree= */ true)
    }
102
103    fn persist_entities_impl(
104        &self,
105        collection: &str,
106        entities: &[UnifiedEntity],
107        skip_btree: bool,
108    ) -> Result<(), StoreError> {
109        if entities.is_empty() {
110            return Ok(());
111        }
112
113        let Some(pager) = &self.pager else {
114            return Ok(());
115        };
116
117        let fv = self.format_version();
118        let manager = self
119            .get_collection(collection)
120            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
121        let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
122            .iter()
123            .map(|entity| {
124                let metadata = manager.get_metadata(entity.id);
125                (
126                    entity.id.raw().to_be_bytes().to_vec(),
127                    Self::serialize_entity_record(entity, metadata.as_ref(), fv),
128                )
129            })
130            .collect();
131        // u64 IDs encoded as big-endian — lex order = numeric order.
132        serialized.sort_by(|a, b| a.0.cmp(&b.0));
133
134        if !skip_btree {
135            let mut btree_indices = self.btree_indices.write();
136            let btree = btree_indices
137                .entry(collection.to_string())
138                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
139            let root_before = btree.root_page_id();
140
141            // Walks each distinct leaf once, applies all in-place overwrites
142            // that belong there under one read+write. Keys that miss or grow
143            // fall back to the per-key `upsert` path internally.
144            btree.upsert_batch_sorted(&serialized).map_err(|e| {
145                StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
146            })?;
147            let root_after = btree.root_page_id();
148            drop(btree_indices);
149            if root_before != root_after {
150                self.mark_paged_registry_dirty();
151            }
152        }
153        // One WAL action covering the whole bulk. Previously each entity
154        // produced its own `UpsertEntityRecord` action, which then became
155        // its own `WalRecord::PageWrite` triple inside
156        // `StoreCommitCoordinator::append_actions` — N WAL records per
157        // bulk. The batched variant is one WAL record per bulk.
158        //
159        // WAL is emitted even in the `skip_btree` variant — recovery
160        // replay reapplies the upsert to the B-tree, so durability
161        // is unchanged.
162        let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
163        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
164            collection: collection.to_string(),
165            records,
166        }])?;
167
168        Ok(())
169    }
170
171    /// Insert a label→entity_id mapping into the graph label index.
172    pub(crate) fn update_graph_label_index(
173        &self,
174        collection: &str,
175        label: &str,
176        entity_id: EntityId,
177    ) {
178        let key = (collection.to_string(), label.to_string());
179        let mut idx = self.graph_label_index.write();
180        idx.entry(key).or_default().push(entity_id);
181    }
182
183    /// Remove a specific entity_id from the graph label index (called on delete).
184    pub(crate) fn remove_from_graph_label_index(&self, collection: &str, entity_id: EntityId) {
185        let mut idx = self.graph_label_index.write();
186        for ((col, _), ids) in idx.iter_mut() {
187            if col == collection {
188                ids.retain(|&id| id != entity_id);
189            }
190        }
191        // Prune empty entries to keep the index compact
192        idx.retain(|_, ids| !ids.is_empty());
193    }
194
195    pub(crate) fn remove_from_graph_label_index_batch(
196        &self,
197        collection: &str,
198        entity_ids: &[EntityId],
199    ) {
200        if entity_ids.is_empty() {
201            return;
202        }
203        let id_set: std::collections::HashSet<EntityId> = entity_ids.iter().copied().collect();
204        let mut idx = self.graph_label_index.write();
205        for ((col, _), ids) in idx.iter_mut() {
206            if col == collection {
207                ids.retain(|id| !id_set.contains(id));
208            }
209        }
210        idx.retain(|_, ids| !ids.is_empty());
211    }
212
213    /// Look up entity IDs for a graph node label across all collections.
214    pub fn lookup_graph_nodes_by_label(&self, label: &str) -> Vec<EntityId> {
215        let idx = self.graph_label_index.read();
216        idx.iter()
217            .filter(|((_, l), _)| l == label)
218            .flat_map(|(_, ids)| ids.iter().copied())
219            .collect()
220    }
221
222    /// Look up entity IDs for a graph node label scoped to a single collection.
223    pub fn lookup_graph_nodes_by_label_in(&self, collection: &str, label: &str) -> Vec<EntityId> {
224        let idx = self.graph_label_index.read();
225        idx.get(&(collection.to_string(), label.to_string()))
226            .cloned()
227            .unwrap_or_default()
228    }
229
230    pub fn create_collection(&self, name: impl Into<String>) -> Result<(), StoreError> {
231        let name = name.into();
232        let mut collections = self.collections.write();
233
234        if collections.contains_key(&name) {
235            return Err(StoreError::CollectionExists(name));
236        }
237
238        let manager = SegmentManager::with_config(&name, self.config.manager_config.clone());
239        collections.insert(name.clone(), Arc::new(manager));
240        drop(collections);
241        self.mark_paged_registry_dirty();
242        self.finish_paged_write([StoreWalAction::CreateCollection { name }])?;
243
244        Ok(())
245    }
246
247    /// Get or create a collection
248    pub fn get_or_create_collection(&self, name: impl Into<String>) -> Arc<SegmentManager> {
249        let name = name.into();
250        // Fast path: shared read lock — zero contention for existing collections
251        {
252            let collections = self.collections.read();
253            if let Some(manager) = collections.get(&name) {
254                return Arc::clone(manager);
255            }
256        }
257        // Slow path: exclusive write lock — only when collection is missing
258        let mut collections = self.collections.write();
259        // Double-check after acquiring write lock (another thread may have created it)
260        if let Some(manager) = collections.get(&name) {
261            return Arc::clone(manager);
262        }
263        let manager = Arc::new(SegmentManager::with_config(
264            &name,
265            self.config.manager_config.clone(),
266        ));
267        collections.insert(name, Arc::clone(&manager));
268        self.mark_paged_registry_dirty();
269        manager
270    }
271
272    /// Get a collection
273    pub fn get_collection(&self, name: &str) -> Option<Arc<SegmentManager>> {
274        self.collections.read().get(name).map(Arc::clone)
275    }
276
    /// Get the context index for cross-structure search.
    ///
    /// Cheap borrowed accessor — the reference is valid for as long as the
    /// store itself.
    pub fn context_index(&self) -> &ContextIndex {
        &self.context_index
    }
281
282    /// Set multiple config KV pairs at once from a JSON tree.
283    /// Keys are flattened with dot-notation: `{"a":{"b":1}}` → `a.b = 1`.
284    pub fn set_config_tree(&self, prefix: &str, json: &crate::serde_json::Value) -> usize {
285        let _ = self.get_or_create_collection("red_config");
286        let mut pairs = Vec::new();
287        flatten_config_json(prefix, json, &mut pairs);
288        let mut saved = 0;
289        for (key, value) in pairs {
290            let entity = UnifiedEntity::new(
291                EntityId::new(0),
292                EntityKind::TableRow {
293                    table: Arc::from("red_config"),
294                    row_id: 0,
295                },
296                EntityData::Row(RowData {
297                    columns: Vec::new(),
298                    named: Some(
299                        [
300                            ("key".to_string(), crate::storage::schema::Value::text(key)),
301                            ("value".to_string(), value),
302                        ]
303                        .into_iter()
304                        .collect(),
305                    ),
306                    schema: None,
307                }),
308            );
309            if self.insert_auto("red_config", entity).is_ok() {
310                saved += 1;
311            }
312        }
313        saved
314    }
315
316    /// Read a single config value from `red_config` by dot-notation key.
317    pub fn get_config(&self, key: &str) -> Option<crate::storage::schema::Value> {
318        let manager = self.get_collection("red_config")?;
319        for entity in manager.query_all(|_| true) {
320            if let EntityData::Row(row) = &entity.data {
321                if let Some(named) = &row.named {
322                    let key_matches = named
323                        .get("key")
324                        .and_then(|v| match v {
325                            crate::storage::schema::Value::Text(s) => Some(s.as_ref() == key),
326                            _ => None,
327                        })
328                        .unwrap_or(false);
329                    if key_matches {
330                        return named.get("value").cloned();
331                    }
332                }
333            }
334        }
335        None
336    }
337
338    /// List all collections
339    pub fn list_collections(&self) -> Vec<String> {
340        self.collections.read().keys().cloned().collect()
341    }
342
343    /// Drop a collection
344    pub fn drop_collection(&self, name: &str) -> Result<(), StoreError> {
345        let manager = {
346            let mut collections = self.collections.write();
347
348            collections
349                .remove(name)
350                .ok_or_else(|| StoreError::CollectionNotFound(name.to_string()))?
351        };
352
353        let entities = manager.query_all(|_| true);
354        let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
355
356        for entity_id in &entity_ids {
357            self.context_index.remove_entity(*entity_id);
358            let _ = self.unindex_cross_refs(*entity_id);
359        }
360
361        self.btree_indices.write().remove(name);
362
363        self.entity_cache.retain(|entity_id, (collection, _)| {
364            collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
365        });
366
367        self.cross_refs.write().retain(|source_id, refs| {
368            refs.retain(|(target_id, _, target_collection)| {
369                target_collection != name && !entity_ids.iter().any(|id| id == target_id)
370            });
371            !entity_ids.iter().any(|id| id == source_id)
372        });
373
374        self.reverse_refs.write().retain(|target_id, refs| {
375            refs.retain(|(source_id, _, source_collection)| {
376                source_collection != name && !entity_ids.iter().any(|id| id == source_id)
377            });
378            !entity_ids.iter().any(|id| id == target_id)
379        });
380
381        self.mark_paged_registry_dirty();
382        self.finish_paged_write([StoreWalAction::DropCollection {
383            name: name.to_string(),
384        }])?;
385
386        Ok(())
387    }
388
    /// Insert an entity into a collection.
    ///
    /// Assigns a fresh global entity id when `entity.id == 0` (otherwise
    /// registers the caller-supplied id) and, for `TableRow` kinds, a
    /// per-table sequential `row_id` when it is 0. The entity is stored in
    /// the collection's `SegmentManager`; when a pager is active the
    /// serialized record is also upserted into the collection's B-tree and
    /// an `UpsertEntityRecord` WAL action is written.
    ///
    /// # Errors
    /// `CollectionNotFound` if `collection` does not exist (use
    /// `insert_auto` to create on demand); propagates segment-manager,
    /// cross-ref indexing, and paged-write errors.
    pub fn insert(&self, collection: &str, entity: UnifiedEntity) -> Result<EntityId, StoreError> {
        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;

        let mut entity = entity;
        if entity.id.raw() == 0 {
            entity.id = self.next_entity_id();
        } else {
            // Caller supplied an explicit id — record it so the global
            // allocator never hands it out again.
            self.register_entity_id(entity.id);
        }
        // Assign per-table sequential row_id if not set
        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
            if *row_id == 0 {
                *row_id = manager.next_row_id();
            } else {
                manager.register_row_id(*row_id);
            }
        }
        entity.ensure_table_logical_id();
        // Capture graph node label before entity is moved into the manager
        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
        {
            Some(node.label.clone())
        } else {
            None
        };

        let id = manager.insert(entity)?;
        self.register_entity_id(id);

        // Update graph label index for GraphNode entities
        if let Some(ref label) = graph_node_label {
            self.update_graph_label_index(collection, label, id);
        }

        // Also insert into B-tree index if pager is active
        let mut registry_dirty = false;
        if let Some(pager) = &self.pager {
            if let Some(entity) = manager.get(id) {
                let mut btree_indices = self.btree_indices.write();
                let btree = btree_indices
                    .entry(collection.to_string())
                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
                let root_before = btree.root_page_id();

                let key = id.raw().to_be_bytes();
                let metadata = manager.get_metadata(id);
                let value = Self::serialize_entity_record(
                    &entity,
                    metadata.as_ref(),
                    self.format_version(),
                );
                // Ignore duplicate key errors (update scenario)
                let _ = btree.insert(&key, &value);
                // A changed root page means the tree grew/split and the
                // paged registry must be refreshed.
                registry_dirty = root_before != btree.root_page_id();
            }
        }

        // Index cross-references if enabled
        if self.config.auto_index_refs {
            if let Some(entity) = manager.get(id) {
                self.index_cross_refs(&entity, collection)?;
            }
        }

        // Perf: skip WAL-action construction when the store is
        // pagerless. For in-memory benchmarks this saved another
        // `manager.get(id)` + `serialize_entity_record` per call.
        if self.pager.is_some() {
            let actions = manager
                .get(id)
                .map(|entity| {
                    let metadata = manager.get_metadata(id);
                    vec![StoreWalAction::upsert_entity(
                        collection,
                        &entity,
                        metadata.as_ref(),
                        self.format_version(),
                    )]
                })
                .unwrap_or_default();
            if registry_dirty {
                self.mark_paged_registry_dirty();
            }
            self.finish_paged_write(actions)?;
        }

        Ok(id)
    }
480
    /// Turbo bulk insert — optimized fast path.
    ///
    /// Single lock for the entire batch. Skips bloom filter, memtable,
    /// context index, and cross-ref indexing. B-tree writes are batched.
    ///
    /// Entities with `id == 0` receive ids from one reserved range;
    /// `TableRow` entities with `row_id == 0` likewise draw from one
    /// reserved per-table range. Returns the assigned ids in insertion
    /// order.
    ///
    /// # Errors
    /// Propagates errors from the segment manager's bulk insert and from
    /// the paged write.
    pub fn bulk_insert(
        &self,
        collection: &str,
        mut entities: Vec<UnifiedEntity>,
    ) -> Result<Vec<EntityId>, StoreError> {
        // REDDB_BULK_TIMING=1 prints a per-call breakdown of the bulk
        // insert path to stderr. Off by default — used by the reddb
        // benchmark harness to locate ingest bottlenecks.
        let trace = matches!(
            std::env::var("REDDB_BULK_TIMING").ok().as_deref(),
            Some("1") | Some("true") | Some("on")
        );
        let t_start = std::time::Instant::now();
        let n = entities.len();
        let manager = self.get_or_create_collection(collection);
        let t_get_coll = t_start.elapsed();

        // Assign IDs and per-table row_ids before serialization. Bulk
        // insert must follow the same global ID semantics as
        // insert()/insert_auto(). Reserve ID ranges in single fetch_add
        // calls instead of one-per-entity. The overwhelming common case
        // is "every entity comes in with id==0 and kind==TableRow with
        // row_id==0" (the wire bulk insert path). Any entity that
        // already carries an id or row_id falls back to the individual
        // register_* path to preserve those semantics exactly.
        let t0 = std::time::Instant::now();
        let n_missing_entity_ids = entities.iter().filter(|e| e.id.raw() == 0).count() as u64;
        let n_missing_row_ids = entities
            .iter()
            .filter(|e| matches!(e.kind, EntityKind::TableRow { row_id: 0, .. }))
            .count() as u64;
        let mut entity_id_range = if n_missing_entity_ids > 0 {
            self.reserve_entity_ids(n_missing_entity_ids)
        } else {
            0..0
        };
        let mut row_id_range = if n_missing_row_ids > 0 {
            manager.reserve_row_ids(n_missing_row_ids)
        } else {
            0..0
        };
        for entity in &mut entities {
            if entity.id.raw() == 0 {
                // Ranges were sized by the exact counts above, so
                // exhaustion here would be a logic bug.
                let next = entity_id_range
                    .next()
                    .expect("reserved entity-id range exhausted");
                entity.id = EntityId::new(next);
            } else {
                self.register_entity_id(entity.id);
            }
            if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
                if *row_id == 0 {
                    *row_id = row_id_range
                        .next()
                        .expect("reserved row-id range exhausted");
                } else {
                    manager.register_row_id(*row_id);
                }
            }
            entity.ensure_table_logical_id();
        }
        let t_assign_ids = t0.elapsed();

        // Capture graph node labels before entities are moved into the segment manager
        let graph_labels: Vec<Option<(String, EntityId)>> = entities
            .iter()
            .map(|e| {
                if let EntityKind::GraphNode(ref node) = e.kind {
                    Some((node.label.clone(), e.id))
                } else {
                    None
                }
            })
            .collect();

        // Pre-serialize for B-tree while we still have references.
        // `serialize_entity_record` is pure; with `n >= 1024` the
        // rayon dispatch cost is amortised and the 16-core serialize
        // fan-out becomes the biggest single win on insert_bulk. For
        // smaller batches, stay serial — micro-batches pay more in
        // work-stealing overhead than they save.
        let t0 = std::time::Instant::now();
        let serialized: Option<Vec<(Vec<u8>, Vec<u8>)>> = if self.pager.is_some() {
            let fv = self.format_version();
            // Fresh inserts carry no metadata yet, hence `None`.
            let serial_map = |e: &UnifiedEntity| {
                (
                    e.id.raw().to_be_bytes().to_vec(),
                    Self::serialize_entity_record(e, None, fv),
                )
            };
            // Gate chosen to match the bench's `BULK_BATCH_SIZE=1000`.
            // Rayon's dispatch overhead is ~30µs/batch — on a 15-col
            // row serializing for ~40µs the break-even is around
            // 200-300 rows on a 16-core host. Keep a safety margin
            // at 512.
            if entities.len() >= 512 {
                use rayon::prelude::*;
                Some(entities.par_iter().map(serial_map).collect())
            } else {
                Some(entities.iter().map(serial_map).collect())
            }
        } else {
            None
        };
        let t_serialize = t0.elapsed();

        // Move entities into segment
        let t0 = std::time::Instant::now();
        let ids = manager.bulk_insert(entities)?;
        let t_manager = t0.elapsed();
        for id in &ids {
            self.register_entity_id(*id);
        }

        // Update graph label index for bulk-inserted GraphNode entities
        for (label, entity_id) in graph_labels.iter().flatten() {
            self.update_graph_label_index(collection, label, *entity_id);
        }

        // REDDB_BULK_SKIP_PERSIST_UNSAFE=1 skips the persistent B-tree index
        // during bulk ingest.
        //
        // UNSAFE: for ephemeral benchmark containers ONLY.
        // This flag is silently ignored when a pager (durable storage) is active.
        // In persistent mode, bulk inserts ALWAYS write to the B-tree so the data
        // survives a cold restart without any manual rebuild step.
        //
        // The flag is only honoured when self.pager is None (in-memory / ephemeral).
        let skip_btree_requested = matches!(
            std::env::var("REDDB_BULK_SKIP_PERSIST_UNSAFE")
                .ok()
                .as_deref(),
            Some("1") | Some("true") | Some("on")
        );
        // Honour the flag only when there is no durable pager.
        // If a pager exists we are in persistent mode → always persist.
        let skip_btree = skip_btree_requested && self.pager.is_none();
        if skip_btree_requested && !skip_btree {
            // Flag was set but we ignored it because we have a real pager.
            // OnceLock gates the warning to one emission per process.
            static IGNORED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
            IGNORED.get_or_init(|| {
                tracing::warn!(
                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set but durable pager is \
                     active — flag ignored; bulk inserts will be persisted normally"
                );
            });
        } else if skip_btree {
            // Ephemeral mode and flag is active — warn once.
            static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
            WARNED.get_or_init(|| {
                tracing::warn!(
                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set (ephemeral/no-pager mode) — \
                     bulk inserts NOT durable; data will be lost on restart"
                );
            });
        }

        // Batch B-tree write from pre-serialized data.
        // Uses sorted bulk insert: walks to a leaf once, appends many entries,
        // writes each leaf exactly once per batch — O(N) instead of O(N²).
        let mut t_btree_lock = std::time::Duration::ZERO;
        let mut t_btree_insert = std::time::Duration::ZERO;
        let mut t_flush = std::time::Duration::ZERO;
        if !skip_btree {
            if let (Some(pager), Some(batch)) = (&self.pager, serialized.as_ref()) {
                let t0 = std::time::Instant::now();
                let mut btree_indices = self.btree_indices.write();
                let btree = btree_indices
                    .entry(collection.to_string())
                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
                let root_before = btree.root_page_id();
                t_btree_lock = t0.elapsed();

                let t0 = std::time::Instant::now();
                let _ = btree.bulk_insert_sorted(batch);
                t_btree_insert = t0.elapsed();
                // Changed root → tree grew/split → registry must be updated.
                let registry_dirty = root_before != btree.root_page_id();

                let t0 = std::time::Instant::now();
                if registry_dirty {
                    self.mark_paged_registry_dirty();
                }
                t_flush = t0.elapsed();
            }
        }

        // Fold 25k per-entity `UpsertEntityRecord` actions into one
        // `BulkUpsertEntityRecords` — same on-disk semantics (replay
        // applies each record by id), one `WalRecord::PageWrite`
        // instead of N, and `record.clone()` drops out because we
        // consume `serialized`.
        let actions = serialized
            .map(|batch| {
                let records: Vec<Vec<u8>> =
                    batch.into_iter().map(|(_key, record)| record).collect();
                vec![StoreWalAction::BulkUpsertEntityRecords {
                    collection: collection.to_string(),
                    records,
                }]
            })
            .unwrap_or_default();
        self.finish_paged_write(actions)?;

        if trace {
            tracing::debug!(
                n,
                total = ?t_start.elapsed(),
                get_coll = ?t_get_coll,
                assign = ?t_assign_ids,
                serialize = ?t_serialize,
                manager = ?t_manager,
                btree_lock = ?t_btree_lock,
                btree = ?t_btree_insert,
                flush = ?t_flush,
                "bulk_insert timing"
            );
        }

        Ok(ids)
    }
705
706    /// Insert an entity, creating collection if needed
707    pub fn insert_auto(
708        &self,
709        collection: &str,
710        entity: UnifiedEntity,
711    ) -> Result<EntityId, StoreError> {
712        let manager = self.get_or_create_collection(collection);
713        let mut entity = entity;
714        if entity.id.raw() == 0 {
715            entity.id = self.next_entity_id();
716        } else {
717            self.register_entity_id(entity.id);
718        }
719        // Assign per-table sequential row_id if not set
720        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
721            if *row_id == 0 {
722                *row_id = manager.next_row_id();
723            } else {
724                manager.register_row_id(*row_id);
725            }
726        }
727        entity.ensure_table_logical_id();
728
729        // Capture graph node label before entity is moved into the manager
730        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
731        {
732            Some(node.label.clone())
733        } else {
734            None
735        };
736        // Index into context index before consuming the entity
737        self.context_index.index_entity(collection, &entity);
738
739        // Pre-serialize the entity record (B-tree image == WAL action body)
740        // and index cross-refs while we still own the entity. Mirrors the
741        // bulk_insert path (see line 559-580 above): segment.insert assigns
742        // a fresh `sequence_id`, but the bulk path already accepts a
743        // sequence_id=0 on-disk image (replay calls manager.insert which
744        // re-assigns sequence_id from the segment's allocator), so the
745        // single-row path can do the same. Fresh inserts never carry
746        // metadata — set_metadata is a separate call — so we pass None,
747        // matching how the bulk path serializes records.
748        //
749        // Eliminates the post-insert `manager.get(id)` clone of the just-
750        // inserted UnifiedEntity (HashMap<String, Value> + per-field
751        // Strings). Per `docs/perf/insert_sequential-2026-05-05.md` P2 this
752        // was the dominant clone in the insert_sequential hot path.
753        let id_for_serialize = entity.id;
754        let serialized_record: Option<Vec<u8>> = if self.pager.is_some() {
755            Some(Self::serialize_entity_record(
756                &entity,
757                None,
758                self.format_version(),
759            ))
760        } else {
761            None
762        };
763        if self.config.auto_index_refs {
764            self.index_cross_refs(&entity, collection)?;
765        }
766
767        let id = manager.insert(entity)?;
768        debug_assert_eq!(id, id_for_serialize);
769        // `register_entity_id` already advances the atomic counter on
770        // the allocation path above (`self.next_entity_id()`), so the
771        // second call here is a no-op CAS loop on the hot path. Only
772        // needed for the caller-supplied-id branch which happens via
773        // the `register_entity_id` call on line 573.
774
775        // Update graph label index for GraphNode entities
776        if let Some(ref label) = graph_node_label {
777            self.update_graph_label_index(collection, label, id);
778        }
779
780        let mut registry_dirty = false;
781        if let (Some(_pager), Some(record)) = (&self.pager, serialized_record.as_ref()) {
782            if let Some(btree) = self.get_or_create_btree(collection) {
783                let root_before = btree.root_page_id();
784
785                let key = id.raw().to_be_bytes();
786                btree.insert(&key, record).map_err(|e| {
787                    StoreError::Io(std::io::Error::other(format!(
788                        "B-tree insert error while inserting '{collection}'/{id}: {e}"
789                    )))
790                })?;
791                registry_dirty = root_before != btree.root_page_id();
792            }
793        }
794
795        // Perf: pagerless → skip WAL-action construction (saves a
796        // third manager.get + entity serialize per insert). For
797        // in-memory runtimes finish_paged_write is a no-op.
798        if self.pager.is_some() {
799            let actions = serialized_record
800                .map(|record| {
801                    vec![StoreWalAction::UpsertEntityRecord {
802                        collection: collection.to_string(),
803                        record,
804                    }]
805                })
806                .unwrap_or_default();
807            if registry_dirty {
808                self.mark_paged_registry_dirty();
809            }
810            self.finish_paged_write(actions)?;
811        }
812        Ok(id)
813    }
814
815    /// Get an entity from a collection
816    ///
817    /// Prefers the live SegmentManager view so reads after update/delete observe
818    /// the current in-memory state even when the paged B-tree image has not been
819    /// refreshed yet. Falls back to the B-tree image for recovery-oriented reads.
820    pub fn get(&self, collection: &str, id: EntityId) -> Option<UnifiedEntity> {
821        // Prefer the live manager state to avoid stale reads after manager.update().
822        if let Some(entity) = self
823            .get_collection(collection)
824            .and_then(|manager| manager.get(id))
825        {
826            return Some(entity);
827        }
828
829        // Fall back to the paged B-tree image if the manager does not currently hold the row.
830        if self.pager.is_some() {
831            let btree_indices = self.btree_indices.read();
832            if let Some(btree) = btree_indices.get(collection) {
833                let key = id.raw().to_be_bytes();
834                if let Ok(Some(value)) = btree.get(&key) {
835                    if let Ok((entity, _)) =
836                        Self::deserialize_entity_record(&value, self.format_version())
837                    {
838                        return Some(entity);
839                    }
840                }
841            }
842        }
843
844        None
845    }
846
847    /// Resolve a table row by stable logical identity.
848    ///
849    /// Legacy rows use `logical_id == id`, so the physical-id lookup remains
850    /// the fast path. Versioned rows may have a different physical id and fall
851    /// back to a scan until the history-store/index slices add a dedicated map.
852    pub fn get_table_row_by_logical_id(
853        &self,
854        collection: &str,
855        logical_id: EntityId,
856    ) -> Option<UnifiedEntity> {
857        if let Some(entity) = self.get(collection, logical_id) {
858            if matches!(entity.kind, EntityKind::TableRow { .. })
859                && entity.logical_id() == logical_id
860                && entity.xmax == 0
861            {
862                return Some(entity);
863            }
864        }
865
866        let manager = self.get_collection(collection)?;
867        let mut matches = manager.query_all(|entity| {
868            matches!(entity.kind, EntityKind::TableRow { .. }) && entity.logical_id() == logical_id
869        });
870        matches
871            .iter()
872            .find(|entity| entity.xmax == 0)
873            .cloned()
874            .or_else(|| matches.pop())
875    }
876
877    pub fn table_row_versions_by_logical_id(
878        &self,
879        collection: &str,
880        logical_id: EntityId,
881    ) -> Vec<UnifiedEntity> {
882        self.get_collection(collection)
883            .map(|manager| {
884                manager.query_all(|entity| {
885                    matches!(entity.kind, EntityKind::TableRow { .. })
886                        && entity.logical_id() == logical_id
887                })
888            })
889            .unwrap_or_default()
890    }
891
    /// Reclaim dead MVCC row versions whose `xmax` (superseding transaction
    /// id) is older than `cutoff_xid`.
    ///
    /// Live versions (`xmax == 0`) are never reclaimed — they are skipped
    /// before even being counted as scanned. Returns per-category counters
    /// (tombstones vs. ordinary history, reclaimed vs. retained) so callers
    /// can report vacuum progress.
    pub fn vacuum_mvcc_history(
        &self,
        collection: &str,
        cutoff_xid: u64,
    ) -> Result<MvccVacuumStats, StoreError> {
        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
        let entities =
            manager.query_all(|entity| matches!(entity.kind, EntityKind::TableRow { .. }));
        // Per logical id: (has a live version, max xmin across versions).
        // Used below to classify dead versions as delete tombstones vs.
        // ordinary history.
        let mut logical = HashMap::<EntityId, (bool, u64)>::new();
        for entity in &entities {
            let entry = logical.entry(entity.logical_id()).or_insert((false, 0));
            if entity.xmax == 0 {
                entry.0 = true;
            }
            entry.1 = entry.1.max(entity.xmin);
        }

        let mut stats = MvccVacuumStats::default();
        let mut reclaim_ids = Vec::new();
        for entity in entities {
            // Live versions are never vacuum candidates.
            if entity.xmax == 0 {
                continue;
            }
            stats.scanned_versions += 1;
            let (has_live_version, max_xmin) = logical
                .get(&entity.logical_id())
                .copied()
                .unwrap_or((false, entity.xmin));
            // A delete tombstone is the newest version of a logical row with
            // no live successor; everything else is ordinary history.
            let is_delete_tombstone = !has_live_version && entity.xmin == max_xmin;
            if entity.xmax < cutoff_xid {
                stats.reclaimed_versions += 1;
                if is_delete_tombstone {
                    stats.reclaimed_tombstones += 1;
                } else {
                    stats.reclaimed_history_versions += 1;
                }
                reclaim_ids.push(entity.id);
            } else {
                stats.retained_versions += 1;
                if is_delete_tombstone {
                    stats.retained_tombstones += 1;
                } else {
                    stats.retained_history_versions += 1;
                }
            }
        }

        // Physical removal goes through the normal batched delete path so
        // B-tree, indexes and WAL stay consistent.
        if !reclaim_ids.is_empty() {
            self.delete_batch(collection, &reclaim_ids)?;
        }
        Ok(stats)
    }
946
    /// Install an MVCC update pair: persist `old_version` in place (the
    /// caller has already stamped it as superseded) and insert
    /// `new_version` as its successor.
    ///
    /// Statement order matters here: caches and the context index are
    /// invalidated before the manager is mutated, and both records are
    /// written to the pager/WAL in a single batched action at the end so
    /// the update is atomic from the recovery log's point of view.
    pub(crate) fn install_versioned_table_row_update(
        &self,
        collection: &str,
        old_version: UnifiedEntity,
        mut new_version: UnifiedEntity,
        metadata: Option<&Metadata>,
    ) -> Result<(), StoreError> {
        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;

        let old_id = old_version.id;
        let new_id = new_version.id;
        // Explicit metadata wins; otherwise the new version inherits the
        // old version's metadata.
        let inherited_metadata = metadata.cloned().or_else(|| manager.get_metadata(old_id));

        // Invalidate cached copies of both ids before mutating the manager.
        self.entity_cache.remove(old_id.raw());
        self.entity_cache.remove(new_id.raw());
        manager.update(old_version.clone())?;
        self.context_index.remove_entity(old_id);

        // Register the caller-supplied physical id and assign/record the
        // per-table row_id, mirroring the plain insert path.
        self.register_entity_id(new_version.id);
        if let EntityKind::TableRow { ref mut row_id, .. } = new_version.kind {
            if *row_id == 0 {
                *row_id = manager.next_row_id();
            } else {
                manager.register_row_id(*row_id);
            }
        }
        new_version.ensure_table_logical_id();
        manager.insert(new_version.clone())?;
        if let Some(metadata) = inherited_metadata {
            manager.set_metadata(new_id, metadata)?;
        }
        self.context_index.index_entity(collection, &new_version);
        if self.config.auto_index_refs {
            self.index_cross_refs(&new_version, collection)?;
        }

        // Serialize both versions with their current metadata and flush
        // them as one WAL action.
        let old_metadata = manager.get_metadata(old_id);
        let new_metadata = manager.get_metadata(new_id);
        let fv = self.format_version();
        let records = vec![
            Self::serialize_entity_record(&old_version, old_metadata.as_ref(), fv),
            Self::serialize_entity_record(&new_version, new_metadata.as_ref(), fv),
        ];
        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
            collection: collection.to_string(),
            records,
        }])?;

        Ok(())
    }
999
1000    /// Batch-fetch multiple entities from the same collection in minimal lock acquisitions.
1001    ///
1002    /// Preferred over N individual `get()` calls in indexed-scan loops (sorted index,
1003    /// bitmap, hash). Reduces lock acquisitions from N×3 to 2-3 total.
1004    /// Preserves input order: `result[i]` corresponds to `ids[i]`.
1005    pub fn get_batch(&self, collection: &str, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1006        match self.get_collection(collection) {
1007            Some(manager) => manager.get_many(ids),
1008            None => vec![None; ids.len()],
1009        }
1010    }
1011
1012    /// Get an entity from any collection
1013    pub fn get_any(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1014        // Sharded LRU probe: hits / misses are counted by `EntityCache::get`,
1015        // so external observers can read `entity_cache_hit_rate()` without
1016        // any extra wiring on the call site.
1017        if let Some(cached) = self.entity_cache.get(id.raw()) {
1018            return Some(cached);
1019        }
1020
1021        // Cache miss — fall back to the full collection scan, then memoise.
1022        let collections = self.collections.read();
1023        for (name, manager) in collections.iter() {
1024            if let Some(entity) = manager.get(id) {
1025                let result = (name.clone(), entity);
1026                // Drop the collections read guard before taking any cache
1027                // lock — the cache shards are independent, but releasing
1028                // here keeps `collections` contention low.
1029                drop(collections);
1030                self.entity_cache.insert(id.raw(), result.clone());
1031                return Some(result);
1032            }
1033        }
1034        None
1035    }
1036
    /// Hit rate of the store's entity cache (`get_any` lookups).
    ///
    /// Returns `None` until the cache has served at least one lookup.
    /// Exposed for observability — e.g. dashboards distinguishing graph
    /// workloads (high hit rate) from OLTP DML (≈ 0 % hit rate).
    pub fn entity_cache_hit_rate(&self) -> Option<f64> {
        self.entity_cache.hit_rate()
    }
1045
    /// Snapshot of cache hit / miss / eviction counters and current size.
    ///
    /// Thin delegation to the underlying sharded `EntityCache`.
    pub fn entity_cache_stats(&self) -> super::super::entity_cache::EntityCacheStats {
        self.entity_cache.stats()
    }
1050
1051    /// Delete an entity
1052    pub fn delete(&self, collection: &str, id: EntityId) -> Result<bool, StoreError> {
1053        // Invalidate entity cache (read-lock probe; no write lock when cold).
1054        self.entity_cache.remove(id.raw());
1055        let manager = self
1056            .get_collection(collection)
1057            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1058
1059        let deleted = manager.delete(id)?;
1060        if !deleted {
1061            return Ok(false);
1062        }
1063
1064        // Remove from B-tree index if active
1065        let mut registry_dirty = false;
1066        if self.pager.is_some() {
1067            let btree_indices = self.btree_indices.read();
1068            if let Some(btree) = btree_indices.get(collection) {
1069                let root_before = btree.root_page_id();
1070                let key = id.raw().to_be_bytes();
1071                let _ = btree.delete(&key);
1072                registry_dirty = root_before != btree.root_page_id();
1073            }
1074        }
1075
1076        // Remove cross-references
1077        self.unindex_cross_refs(id)?;
1078
1079        // Remove from graph label index
1080        self.remove_from_graph_label_index(collection, id);
1081
1082        if registry_dirty {
1083            self.mark_paged_registry_dirty();
1084        }
1085        self.finish_paged_write([StoreWalAction::DeleteEntityRecord {
1086            collection: collection.to_string(),
1087            entity_id: id.raw(),
1088        }])?;
1089
1090        Ok(true)
1091    }
1092
    /// Delete a batch of entities from one collection, returning the ids
    /// that were actually deleted.
    ///
    /// Cache invalidation, B-tree removals, cross-ref/graph-index cleanup
    /// and WAL actions are all batched so the per-row cost stays low.
    pub fn delete_batch(
        &self,
        collection: &str,
        ids: &[EntityId],
    ) -> Result<Vec<EntityId>, StoreError> {
        if ids.is_empty() {
            return Ok(Vec::new());
        }

        // Sharded invalidation. The bounded LRU's shard write lock is
        // only acquired when the shard actually carries one of these ids,
        // so the bench-typical zero-hit `delete_sequential` workload never
        // takes a write lock here at all.
        self.entity_cache.remove_many(ids.iter().map(|id| id.raw()));

        let manager = self
            .get_collection(collection)
            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;

        let deleted_ids = manager.delete_batch(ids)?;
        if deleted_ids.is_empty() {
            return Ok(deleted_ids);
        }

        // Remove each deleted key from the paged B-tree image; the registry
        // only needs re-persisting when the root page moved.
        let mut registry_dirty = false;
        if self.pager.is_some() {
            let btree_indices = self.btree_indices.read();
            if let Some(btree) = btree_indices.get(collection) {
                let root_before = btree.root_page_id();
                for id in &deleted_ids {
                    let key = id.raw().to_be_bytes();
                    let _ = btree.delete(&key);
                }
                registry_dirty = root_before != btree.root_page_id();
            }
        }

        self.unindex_cross_refs_batch(&deleted_ids)?;
        self.remove_from_graph_label_index_batch(collection, &deleted_ids);
        if registry_dirty {
            self.mark_paged_registry_dirty();
        }
        // One WAL action per deleted row, flushed in a single paged write.
        let actions = deleted_ids
            .iter()
            .map(|id| StoreWalAction::DeleteEntityRecord {
                collection: collection.to_string(),
                entity_id: id.raw(),
            })
            .collect::<Vec<_>>();
        self.finish_paged_write(actions)?;

        Ok(deleted_ids)
    }
1146
1147    /// Set metadata for an entity
1148    pub fn set_metadata(
1149        &self,
1150        collection: &str,
1151        id: EntityId,
1152        metadata: Metadata,
1153    ) -> Result<(), StoreError> {
1154        let manager = self
1155            .get_collection(collection)
1156            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1157
1158        manager.set_metadata(id, metadata)?;
1159        if let Some(entity) = manager.get(id) {
1160            self.persist_entities_to_pager(collection, std::slice::from_ref(&entity))?;
1161        }
1162        Ok(())
1163    }
1164
    /// Get metadata for an entity
    ///
    /// Returns `None` when either the collection or the entity's metadata
    /// entry does not exist.
    pub fn get_metadata(&self, collection: &str, id: EntityId) -> Option<Metadata> {
        self.get_collection(collection)?.get_metadata(id)
    }
1169
1170    /// Add a cross-reference between entities
1171    pub fn add_cross_ref(
1172        &self,
1173        source_collection: &str,
1174        source_id: EntityId,
1175        target_collection: &str,
1176        target_id: EntityId,
1177        ref_type: RefType,
1178        weight: f32,
1179    ) -> Result<(), StoreError> {
1180        // Check source exists
1181        let source_manager = self
1182            .get_collection(source_collection)
1183            .ok_or_else(|| StoreError::CollectionNotFound(source_collection.to_string()))?;
1184
1185        if source_manager.get(source_id).is_none() {
1186            return Err(StoreError::EntityNotFound(source_id));
1187        }
1188
1189        // Check target exists
1190        let target_manager = self
1191            .get_collection(target_collection)
1192            .ok_or_else(|| StoreError::CollectionNotFound(target_collection.to_string()))?;
1193
1194        if target_manager.get(target_id).is_none() {
1195            return Err(StoreError::EntityNotFound(target_id));
1196        }
1197
1198        // Check limits
1199        let current_refs = self
1200            .cross_refs
1201            .read()
1202            .get(&source_id)
1203            .map_or(0, |v| v.len());
1204
1205        if current_refs >= self.config.max_cross_refs {
1206            return Err(StoreError::TooManyRefs(source_id));
1207        }
1208
1209        let mut registry_dirty = false;
1210        {
1211            let mut forward = self.cross_refs.write();
1212            let refs = forward.entry(source_id).or_default();
1213            let inserted = !refs.iter().any(|(id, kind, coll)| {
1214                *id == target_id && *kind == ref_type && coll == target_collection
1215            });
1216            if inserted {
1217                refs.push((target_id, ref_type, target_collection.to_string()));
1218                registry_dirty = true;
1219            }
1220        }
1221
1222        {
1223            let mut reverse = self.reverse_refs.write();
1224            let refs = reverse.entry(target_id).or_default();
1225            let inserted = !refs.iter().any(|(id, kind, coll)| {
1226                *id == source_id && *kind == ref_type && coll == source_collection
1227            });
1228            if inserted {
1229                refs.push((source_id, ref_type, source_collection.to_string()));
1230                registry_dirty = true;
1231            }
1232        }
1233
1234        if let Some(mut entity) = source_manager.get(source_id) {
1235            if !entity.cross_refs().iter().any(|xref| {
1236                xref.target == target_id
1237                    && xref.ref_type == ref_type
1238                    && xref.target_collection == target_collection
1239            }) {
1240                let cross_ref = CrossRef::with_weight(
1241                    source_id,
1242                    target_id,
1243                    target_collection,
1244                    ref_type,
1245                    weight,
1246                );
1247                entity.add_cross_ref(cross_ref);
1248                let _ = source_manager.update(entity.clone());
1249                registry_dirty = true;
1250                self.persist_entities_to_pager(source_collection, std::slice::from_ref(&entity))?;
1251            }
1252        }
1253
1254        if registry_dirty {
1255            self.mark_paged_registry_dirty();
1256            if matches!(
1257                self.config.durability_mode,
1258                crate::api::DurabilityMode::Strict
1259            ) {
1260                self.flush_paged_state()?;
1261            }
1262        }
1263
1264        Ok(())
1265    }
1266
1267    /// Get cross-references from an entity
1268    pub fn get_refs_from(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1269        self.cross_refs.read().get(&id).cloned().unwrap_or_default()
1270    }
1271
1272    /// Get cross-references to an entity
1273    pub fn get_refs_to(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1274        self.reverse_refs
1275            .read()
1276            .get(&id)
1277            .cloned()
1278            .unwrap_or_default()
1279    }
1280
1281    /// Expand cross-references to get related entities
1282    pub fn expand_refs(
1283        &self,
1284        id: EntityId,
1285        depth: u32,
1286        ref_types: Option<&[RefType]>,
1287    ) -> Vec<(UnifiedEntity, u32, RefType)> {
1288        let mut results = Vec::new();
1289        let mut visited = std::collections::HashSet::new();
1290        visited.insert(id);
1291
1292        self.expand_refs_recursive(id, depth, ref_types, &mut visited, &mut results, 1);
1293
1294        results
1295    }
1296
1297    fn expand_refs_recursive(
1298        &self,
1299        id: EntityId,
1300        max_depth: u32,
1301        ref_types: Option<&[RefType]>,
1302        visited: &mut std::collections::HashSet<EntityId>,
1303        results: &mut Vec<(UnifiedEntity, u32, RefType)>,
1304        current_depth: u32,
1305    ) {
1306        if current_depth > max_depth {
1307            return;
1308        }
1309
1310        for (target_id, ref_type, target_collection) in self.get_refs_from(id) {
1311            if visited.contains(&target_id) {
1312                continue;
1313            }
1314
1315            if let Some(types) = ref_types {
1316                if !types.contains(&ref_type) {
1317                    continue;
1318                }
1319            }
1320
1321            visited.insert(target_id);
1322
1323            if let Some(entity) = self.get(&target_collection, target_id) {
1324                results.push((entity, current_depth, ref_type));
1325
1326                // Recurse
1327                self.expand_refs_recursive(
1328                    target_id,
1329                    max_depth,
1330                    ref_types,
1331                    visited,
1332                    results,
1333                    current_depth + 1,
1334                );
1335            }
1336        }
1337    }
1338
1339    /// Index cross-references from an entity
1340    pub(crate) fn index_cross_refs(
1341        &self,
1342        entity: &UnifiedEntity,
1343        collection: &str,
1344    ) -> Result<(), StoreError> {
1345        let mut registry_dirty = false;
1346        for cross_ref in entity.cross_refs() {
1347            if cross_ref.target_collection.is_empty() {
1348                continue;
1349            }
1350            {
1351                let mut forward = self.cross_refs.write();
1352                let refs = forward.entry(cross_ref.source).or_default();
1353                let inserted = !refs.iter().any(|(id, kind, coll)| {
1354                    *id == cross_ref.target
1355                        && *kind == cross_ref.ref_type
1356                        && coll == &cross_ref.target_collection
1357                });
1358                if inserted {
1359                    refs.push((
1360                        cross_ref.target,
1361                        cross_ref.ref_type,
1362                        cross_ref.target_collection.clone(),
1363                    ));
1364                    registry_dirty = true;
1365                }
1366            }
1367
1368            {
1369                let mut reverse = self.reverse_refs.write();
1370                let refs = reverse.entry(cross_ref.target).or_default();
1371                let inserted = !refs.iter().any(|(id, kind, coll)| {
1372                    *id == cross_ref.source && *kind == cross_ref.ref_type && coll == collection
1373                });
1374                if inserted {
1375                    refs.push((cross_ref.source, cross_ref.ref_type, collection.to_string()));
1376                    registry_dirty = true;
1377                }
1378            }
1379        }
1380
1381        if registry_dirty {
1382            self.mark_paged_registry_dirty();
1383        }
1384
1385        Ok(())
1386    }
1387
1388    /// Remove cross-references for an entity
1389    pub(crate) fn unindex_cross_refs(&self, id: EntityId) -> Result<(), StoreError> {
1390        // Remove forward refs
1391        self.cross_refs.write().remove(&id);
1392
1393        // Remove from reverse refs (scan all)
1394        let mut reverse = self.reverse_refs.write();
1395        for refs in reverse.values_mut() {
1396            refs.retain(|(source, _, _)| *source != id);
1397        }
1398        reverse.remove(&id);
1399        self.mark_paged_registry_dirty();
1400
1401        Ok(())
1402    }
1403
    /// Batched counterpart of `unindex_cross_refs`, with a read-only fast
    /// path for delete workloads where none of the ids carry cross-refs
    /// (fast-path hits are counted in `unindex_cross_refs_fast_path`).
    pub(crate) fn unindex_cross_refs_batch(&self, ids: &[EntityId]) -> Result<(), StoreError> {
        if ids.is_empty() {
            return Ok(());
        }

        let id_set: std::collections::HashSet<EntityId> = ids.iter().copied().collect();

        // Read-only probe: under steady-state delete workloads most rows have
        // no cross-refs at all, so both maps are entirely cold for the batch.
        // Acquiring the write lock to scan `reverse_refs.values_mut()` over an
        // unrelated dictionary was the dominant cost in #85's `delete_sequential`
        // profile. The read-lock probe below is O(|ids|) hashmap lookups against
        // each map, vs. O(|reverse_refs|) for the write-path scan it replaces.
        //
        // Decision: skip the write path iff
        //   - no deleted id is a key in `cross_refs`  (no outbound refs to drop,
        //     and so no `(deleted_id, _, _)` source entry exists anywhere in
        //     `reverse_refs.values()` either — the two maps are kept in sync by
        //     `add_cross_ref`), AND
        //   - no deleted id is a key in `reverse_refs` (no inbound refs to drop).
        let needs_forward_cleanup = {
            let forward = self.cross_refs.read();
            id_set.iter().any(|id| forward.contains_key(id))
        };
        let needs_reverse_cleanup = {
            let reverse = self.reverse_refs.read();
            id_set.iter().any(|id| reverse.contains_key(id))
        };

        if !needs_forward_cleanup && !needs_reverse_cleanup {
            // Fast path taken: count it for tests/benchmarks and skip the
            // write locks entirely.
            self.unindex_cross_refs_fast_path
                .fetch_add(1, Ordering::Relaxed);
            return Ok(());
        }

        if needs_forward_cleanup {
            let mut forward = self.cross_refs.write();
            for id in &id_set {
                forward.remove(id);
            }
        }

        if needs_reverse_cleanup || needs_forward_cleanup {
            // The inner `values_mut()` scan is only needed when at least one
            // deleted id was a *source* somewhere — which, by the invariant
            // above, can only happen when it had outbound forward refs.
            let mut reverse = self.reverse_refs.write();
            if needs_forward_cleanup {
                for refs in reverse.values_mut() {
                    refs.retain(|(source, _, _)| !id_set.contains(source));
                }
            }
            reverse.retain(|target, refs| !id_set.contains(target) && !refs.is_empty());
        }
        self.mark_paged_registry_dirty();

        Ok(())
    }
1462
    /// Test/observability hook: number of times `unindex_cross_refs_batch`
    /// took the read-only fast path. Used to pin the early-exit in tests
    /// and as a cheap signal in delete-heavy benchmarks.
    ///
    /// Relaxed load is sufficient — the counter is purely informational.
    pub fn unindex_cross_refs_fast_path_hits(&self) -> u64 {
        self.unindex_cross_refs_fast_path.load(Ordering::Relaxed)
    }
1469
    /// Query across all collections with a filter
    ///
    /// Fans out over collections on scoped threads when more than one
    /// collection exists and `SystemInfo::should_parallelize()` says it is
    /// worthwhile; otherwise scans sequentially. Results carry the owning
    /// collection's name alongside each matching entity.
    pub fn query_all<F>(&self, filter: F) -> Vec<(String, UnifiedEntity)>
    where
        F: Fn(&UnifiedEntity) -> bool + Clone + Send + Sync,
    {
        let collections = self.collections.read();
        let pairs: Vec<_> = collections.iter().collect();

        let use_parallel = pairs.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
        if !use_parallel {
            // Single collection — no parallelism overhead
            return pairs
                .into_iter()
                .flat_map(|(name, mgr)| {
                    mgr.query_all(filter.clone())
                        .into_iter()
                        .map(move |e| (name.clone(), e))
                })
                .collect();
        }

        // Multiple collections — scan in parallel
        let filter_ref = &filter;
        let collection_results: Vec<Vec<(String, UnifiedEntity)>> = std::thread::scope(|s| {
            pairs
                .iter()
                .map(|(name, manager)| {
                    let name = (*name).clone();
                    s.spawn(move || {
                        manager
                            .query_all(|e| filter_ref(e))
                            .into_iter()
                            .map(|e| (name.clone(), e))
                            .collect::<Vec<_>>()
                    })
                })
                .collect::<Vec<_>>()
                .into_iter()
                // NOTE(review): `unwrap_or_default()` silently drops the
                // results of any worker thread that panicked — confirm this
                // best-effort behavior is intended rather than propagating
                // the panic.
                .map(|h| h.join().unwrap_or_default())
                .collect()
        });

        collection_results.into_iter().flatten().collect()
    }
1514
1515    /// Filter by metadata across all collections
1516    pub fn filter_metadata_all(
1517        &self,
1518        filters: &[(String, MetadataFilter)],
1519    ) -> Vec<(String, EntityId)> {
1520        let mut results = Vec::new();
1521        let collections = self.collections.read();
1522
1523        for (name, manager) in collections.iter() {
1524            for id in manager.filter_metadata(filters) {
1525                results.push((name.clone(), id));
1526            }
1527        }
1528
1529        results
1530    }
1531
1532    /// Get statistics
1533    pub fn stats(&self) -> StoreStats {
1534        let collections = self.collections.read();
1535
1536        let mut stats = StoreStats {
1537            collection_count: collections.len(),
1538            ..Default::default()
1539        };
1540
1541        for (name, manager) in collections.iter() {
1542            let manager_stats = manager.stats();
1543            stats.total_entities += manager_stats.total_entities;
1544            stats.total_memory_bytes += manager_stats.total_memory_bytes;
1545            stats.collections.insert(name.clone(), manager_stats);
1546        }
1547
1548        stats
1549    }
1550
1551    /// Run maintenance on all collections
1552    pub fn run_maintenance(&self) -> Result<(), StoreError> {
1553        let collections = self.collections.read();
1554        for manager in collections.values() {
1555            manager.run_maintenance()?;
1556        }
1557        Ok(())
1558    }
1559}
1560
1561/// Flatten a JSON value into dot-notation key-value pairs for red_config.
1562fn flatten_config_json(
1563    prefix: &str,
1564    value: &crate::serde_json::Value,
1565    out: &mut Vec<(String, crate::storage::schema::Value)>,
1566) {
1567    use crate::storage::schema::Value;
1568    match value {
1569        crate::serde_json::Value::Object(map) => {
1570            for (k, v) in map {
1571                let key = if prefix.is_empty() {
1572                    k.clone()
1573                } else {
1574                    format!("{prefix}.{k}")
1575                };
1576                flatten_config_json(&key, v, out);
1577            }
1578        }
1579        crate::serde_json::Value::String(s) => {
1580            out.push((prefix.to_string(), Value::text(s.clone())));
1581        }
1582        crate::serde_json::Value::Number(n) => {
1583            if n.fract().abs() < f64::EPSILON {
1584                out.push((prefix.to_string(), Value::UnsignedInteger(*n as u64)));
1585            } else {
1586                out.push((prefix.to_string(), Value::Float(*n)));
1587            }
1588        }
1589        crate::serde_json::Value::Bool(b) => {
1590            out.push((prefix.to_string(), Value::Boolean(*b)));
1591        }
1592        crate::serde_json::Value::Null => {
1593            out.push((prefix.to_string(), Value::Null));
1594        }
1595        crate::serde_json::Value::Array(arr) => {
1596            let json_str = crate::serde_json::to_string(value).unwrap_or_default();
1597            out.push((prefix.to_string(), Value::text(json_str)));
1598        }
1599    }
1600}