Skip to main content

reddb_server/storage/unified/store/
impl_entities.rs

1use super::*;
2
3impl UnifiedStore {
4    pub(crate) fn persist_entities_to_pager(
5        &self,
6        collection: &str,
7        entities: &[UnifiedEntity],
8    ) -> Result<(), StoreError> {
9        self.persist_entities_impl(collection, entities, /* skip_btree= */ false)
10    }
11
12    /// Same as `persist_entities_to_pager` but takes `&[&UnifiedEntity]` so
13    /// callers that already hold references (SQL UPDATE fast path) don't
14    /// have to clone a `Vec<UnifiedEntity>` just to satisfy the signature.
15    pub(crate) fn persist_entity_refs_to_pager(
16        &self,
17        collection: &str,
18        entities: &[&UnifiedEntity],
19    ) -> Result<(), StoreError> {
20        self.persist_entity_refs_impl(collection, entities, /* skip_btree= */ false)
21    }
22
23    /// Reference-taking WAL-only variant — see
24    /// `persist_entities_to_pager_wal_only` for semantics.
25    pub(crate) fn persist_entity_refs_to_pager_wal_only(
26        &self,
27        collection: &str,
28        entities: &[&UnifiedEntity],
29    ) -> Result<(), StoreError> {
30        self.persist_entity_refs_impl(collection, entities, /* skip_btree= */ true)
31    }
32
33    fn persist_entity_refs_impl(
34        &self,
35        collection: &str,
36        entities: &[&UnifiedEntity],
37        skip_btree: bool,
38    ) -> Result<(), StoreError> {
39        if entities.is_empty() {
40            return Ok(());
41        }
42        let Some(pager) = &self.pager else {
43            return Ok(());
44        };
45        let fv = self.format_version();
46        let manager = self
47            .get_collection(collection)
48            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
49        let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
50            .iter()
51            .map(|entity| {
52                let metadata = manager.get_metadata(entity.id);
53                (
54                    entity.id.raw().to_be_bytes().to_vec(),
55                    Self::serialize_entity_record(entity, metadata.as_ref(), fv),
56                )
57            })
58            .collect();
59        serialized.sort_by(|a, b| a.0.cmp(&b.0));
60
61        if !skip_btree {
62            let mut btree_indices = self.btree_indices.write();
63            let btree = btree_indices
64                .entry(collection.to_string())
65                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
66            let root_before = btree.root_page_id();
67            btree.upsert_batch_sorted(&serialized).map_err(|e| {
68                StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
69            })?;
70            let root_after = btree.root_page_id();
71            drop(btree_indices);
72            if root_before != root_after {
73                self.mark_paged_registry_dirty();
74            }
75        }
76        let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
77        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
78            collection: collection.to_string(),
79            records,
80        }])?;
81        Ok(())
82    }
83
84    /// PG-HOT-like UPDATE persist: emit the WAL record so the update
85    /// survives a crash, but skip the in-line B-tree upsert. Reads
86    /// continue to see the new entity because `manager.get()` prefers
87    /// the segment over the B-tree; WAL replay re-applies the upsert
88    /// to the B-tree on recovery so the two views converge before
89    /// any stale B-tree page can be surfaced.
90    ///
91    /// Safe to use only when the caller has already updated the
92    /// segment in-place (so live reads see the new value). Cuts out
93    /// the dominant cost of single-row UPDATEs: per-entity serialize
94    /// + leaf descent + page decompress / compress / write_page.
95    pub(crate) fn persist_entities_to_pager_wal_only(
96        &self,
97        collection: &str,
98        entities: &[UnifiedEntity],
99    ) -> Result<(), StoreError> {
100        self.persist_entities_impl(collection, entities, /* skip_btree= */ true)
101    }
102
103    fn persist_entities_impl(
104        &self,
105        collection: &str,
106        entities: &[UnifiedEntity],
107        skip_btree: bool,
108    ) -> Result<(), StoreError> {
109        if entities.is_empty() {
110            return Ok(());
111        }
112
113        let Some(pager) = &self.pager else {
114            return Ok(());
115        };
116
117        let fv = self.format_version();
118        let manager = self
119            .get_collection(collection)
120            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
121        let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
122            .iter()
123            .map(|entity| {
124                let metadata = manager.get_metadata(entity.id);
125                (
126                    entity.id.raw().to_be_bytes().to_vec(),
127                    Self::serialize_entity_record(entity, metadata.as_ref(), fv),
128                )
129            })
130            .collect();
131        // u64 IDs encoded as big-endian — lex order = numeric order.
132        serialized.sort_by(|a, b| a.0.cmp(&b.0));
133
134        if !skip_btree {
135            let mut btree_indices = self.btree_indices.write();
136            let btree = btree_indices
137                .entry(collection.to_string())
138                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
139            let root_before = btree.root_page_id();
140
141            // Walks each distinct leaf once, applies all in-place overwrites
142            // that belong there under one read+write. Keys that miss or grow
143            // fall back to the per-key `upsert` path internally.
144            btree.upsert_batch_sorted(&serialized).map_err(|e| {
145                StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
146            })?;
147            let root_after = btree.root_page_id();
148            drop(btree_indices);
149            if root_before != root_after {
150                self.mark_paged_registry_dirty();
151            }
152        }
153        // One WAL action covering the whole bulk. Previously each entity
154        // produced its own `UpsertEntityRecord` action, which then became
155        // its own `WalRecord::PageWrite` triple inside
156        // `StoreCommitCoordinator::append_actions` — N WAL records per
157        // bulk. The batched variant is one WAL record per bulk.
158        //
159        // WAL is emitted even in the `skip_btree` variant — recovery
160        // replay reapplies the upsert to the B-tree, so durability
161        // is unchanged.
162        let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
163        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
164            collection: collection.to_string(),
165            records,
166        }])?;
167
168        Ok(())
169    }
170
171    /// Insert a label→entity_id mapping into the graph label index.
172    pub(crate) fn update_graph_label_index(
173        &self,
174        collection: &str,
175        label: &str,
176        entity_id: EntityId,
177    ) {
178        let key = (collection.to_string(), label.to_string());
179        let mut idx = self.graph_label_index.write();
180        idx.entry(key).or_default().push(entity_id);
181    }
182
183    /// Remove a specific entity_id from the graph label index (called on delete).
184    pub(crate) fn remove_from_graph_label_index(&self, collection: &str, entity_id: EntityId) {
185        let mut idx = self.graph_label_index.write();
186        for ((col, _), ids) in idx.iter_mut() {
187            if col == collection {
188                ids.retain(|&id| id != entity_id);
189            }
190        }
191        // Prune empty entries to keep the index compact
192        idx.retain(|_, ids| !ids.is_empty());
193    }
194
195    pub(crate) fn remove_from_graph_label_index_batch(
196        &self,
197        collection: &str,
198        entity_ids: &[EntityId],
199    ) {
200        if entity_ids.is_empty() {
201            return;
202        }
203        let id_set: std::collections::HashSet<EntityId> = entity_ids.iter().copied().collect();
204        let mut idx = self.graph_label_index.write();
205        for ((col, _), ids) in idx.iter_mut() {
206            if col == collection {
207                ids.retain(|id| !id_set.contains(id));
208            }
209        }
210        idx.retain(|_, ids| !ids.is_empty());
211    }
212
213    /// Look up entity IDs for a graph node label across all collections.
214    pub fn lookup_graph_nodes_by_label(&self, label: &str) -> Vec<EntityId> {
215        let idx = self.graph_label_index.read();
216        idx.iter()
217            .filter(|((_, l), _)| l == label)
218            .flat_map(|(_, ids)| ids.iter().copied())
219            .collect()
220    }
221
222    /// Look up entity IDs for a graph node label scoped to a single collection.
223    pub fn lookup_graph_nodes_by_label_in(&self, collection: &str, label: &str) -> Vec<EntityId> {
224        let idx = self.graph_label_index.read();
225        idx.get(&(collection.to_string(), label.to_string()))
226            .cloned()
227            .unwrap_or_default()
228    }
229
230    pub fn create_collection(&self, name: impl Into<String>) -> Result<(), StoreError> {
231        let name = name.into();
232        let mut collections = self.collections.write();
233
234        if collections.contains_key(&name) {
235            return Err(StoreError::CollectionExists(name));
236        }
237
238        let manager = SegmentManager::with_config(&name, self.config.manager_config.clone());
239        collections.insert(name.clone(), Arc::new(manager));
240        drop(collections);
241        self.mark_paged_registry_dirty();
242        self.finish_paged_write([StoreWalAction::CreateCollection { name }])?;
243
244        Ok(())
245    }
246
247    /// Get or create a collection
248    pub fn get_or_create_collection(&self, name: impl Into<String>) -> Arc<SegmentManager> {
249        let name = name.into();
250        // Fast path: shared read lock — zero contention for existing collections
251        {
252            let collections = self.collections.read();
253            if let Some(manager) = collections.get(&name) {
254                return Arc::clone(manager);
255            }
256        }
257        // Slow path: exclusive write lock — only when collection is missing
258        let mut collections = self.collections.write();
259        // Double-check after acquiring write lock (another thread may have created it)
260        if let Some(manager) = collections.get(&name) {
261            return Arc::clone(manager);
262        }
263        let manager = Arc::new(SegmentManager::with_config(
264            &name,
265            self.config.manager_config.clone(),
266        ));
267        collections.insert(name, Arc::clone(&manager));
268        self.mark_paged_registry_dirty();
269        manager
270    }
271
272    /// Get a collection
273    pub fn get_collection(&self, name: &str) -> Option<Arc<SegmentManager>> {
274        self.collections.read().get(name).map(Arc::clone)
275    }
276
277    /// Get the context index for cross-structure search.
278    pub fn context_index(&self) -> &ContextIndex {
279        &self.context_index
280    }
281
282    /// Drain WAL-replayed `VectorInsert` records for `collection`
283    /// (issue #694). Returns `None` when nothing was captured at open
284    /// time (in-memory mode, fresh database, or non-turbo collection).
285    /// One-shot: the caller owns the records after this call, the
286    /// store's internal buffer is cleared for that collection.
287    pub fn take_replayed_turbo_inserts(&self, collection: &str) -> Option<Vec<(u64, Vec<f32>)>> {
288        let mut map = self.replayed_turbo_inserts.lock();
289        map.remove(collection)
290    }
291
292    /// Set multiple config KV pairs at once from a JSON tree.
293    /// Keys are flattened with dot-notation: `{"a":{"b":1}}` → `a.b = 1`.
294    pub fn set_config_tree(&self, prefix: &str, json: &crate::serde_json::Value) -> usize {
295        let _ = self.get_or_create_collection("red_config");
296        let mut pairs = Vec::new();
297        flatten_config_json(prefix, json, &mut pairs);
298        let mut saved = 0;
299        for (key, value) in pairs {
300            let entity = UnifiedEntity::new(
301                EntityId::new(0),
302                EntityKind::TableRow {
303                    table: Arc::from("red_config"),
304                    row_id: 0,
305                },
306                EntityData::Row(RowData {
307                    columns: Vec::new(),
308                    named: Some(
309                        [
310                            ("key".to_string(), crate::storage::schema::Value::text(key)),
311                            ("value".to_string(), value),
312                        ]
313                        .into_iter()
314                        .collect(),
315                    ),
316                    schema: None,
317                }),
318            );
319            if self.insert_auto("red_config", entity).is_ok() {
320                saved += 1;
321            }
322        }
323        saved
324    }
325
326    /// Read a single config value from `red_config` by dot-notation key.
327    pub fn get_config(&self, key: &str) -> Option<crate::storage::schema::Value> {
328        let manager = self.get_collection("red_config")?;
329        for entity in manager.query_all(|_| true) {
330            if let EntityData::Row(row) = &entity.data {
331                if let Some(named) = &row.named {
332                    let key_matches = named
333                        .get("key")
334                        .and_then(|v| match v {
335                            crate::storage::schema::Value::Text(s) => Some(s.as_ref() == key),
336                            _ => None,
337                        })
338                        .unwrap_or(false);
339                    if key_matches {
340                        return named.get("value").cloned();
341                    }
342                }
343            }
344        }
345        None
346    }
347
348    /// List all collections
349    pub fn list_collections(&self) -> Vec<String> {
350        self.collections.read().keys().cloned().collect()
351    }
352
353    /// Drop a collection
354    pub fn drop_collection(&self, name: &str) -> Result<(), StoreError> {
355        let manager = {
356            let mut collections = self.collections.write();
357
358            collections
359                .remove(name)
360                .ok_or_else(|| StoreError::CollectionNotFound(name.to_string()))?
361        };
362
363        let entities = manager.query_all(|_| true);
364        let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
365
366        for entity_id in &entity_ids {
367            self.context_index.remove_entity(*entity_id);
368            let _ = self.unindex_cross_refs(*entity_id);
369        }
370
371        self.btree_indices.write().remove(name);
372
373        self.entity_cache.retain(|entity_id, (collection, _)| {
374            collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
375        });
376
377        self.cross_refs.write().retain(|source_id, refs| {
378            refs.retain(|(target_id, _, target_collection)| {
379                target_collection != name && !entity_ids.iter().any(|id| id == target_id)
380            });
381            !entity_ids.iter().any(|id| id == source_id)
382        });
383
384        self.reverse_refs.write().retain(|target_id, refs| {
385            refs.retain(|(source_id, _, source_collection)| {
386                source_collection != name && !entity_ids.iter().any(|id| id == source_id)
387            });
388            !entity_ids.iter().any(|id| id == target_id)
389        });
390
391        self.mark_paged_registry_dirty();
392        self.finish_paged_write([StoreWalAction::DropCollection {
393            name: name.to_string(),
394        }])?;
395
396        Ok(())
397    }
398
399    /// Insert an entity into a collection
400    pub fn insert(&self, collection: &str, entity: UnifiedEntity) -> Result<EntityId, StoreError> {
401        let manager = self
402            .get_collection(collection)
403            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
404
405        let mut entity = entity;
406        if entity.id.raw() == 0 {
407            entity.id = self.next_entity_id();
408        } else {
409            self.register_entity_id(entity.id);
410        }
411        // Assign per-table sequential row_id if not set
412        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
413            if *row_id == 0 {
414                *row_id = manager.next_row_id();
415            } else {
416                manager.register_row_id(*row_id);
417            }
418        }
419        entity.ensure_table_logical_id();
420        // Capture graph node label before entity is moved into the manager
421        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
422        {
423            Some(node.label.clone())
424        } else {
425            None
426        };
427
428        let id = manager.insert(entity)?;
429        self.register_entity_id(id);
430
431        // Update graph label index for GraphNode entities
432        if let Some(ref label) = graph_node_label {
433            self.update_graph_label_index(collection, label, id);
434        }
435
436        // Also insert into B-tree index if pager is active
437        let mut registry_dirty = false;
438        if let Some(pager) = &self.pager {
439            if let Some(entity) = manager.get(id) {
440                let mut btree_indices = self.btree_indices.write();
441                let btree = btree_indices
442                    .entry(collection.to_string())
443                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
444                let root_before = btree.root_page_id();
445
446                let key = id.raw().to_be_bytes();
447                let metadata = manager.get_metadata(id);
448                let value = Self::serialize_entity_record(
449                    &entity,
450                    metadata.as_ref(),
451                    self.format_version(),
452                );
453                // Ignore duplicate key errors (update scenario)
454                let _ = btree.insert(&key, &value);
455                registry_dirty = root_before != btree.root_page_id();
456            }
457        }
458
459        // Index cross-references if enabled
460        if self.config.auto_index_refs {
461            if let Some(entity) = manager.get(id) {
462                self.index_cross_refs(&entity, collection)?;
463            }
464        }
465
466        // Perf: skip WAL-action construction when the store is
467        // pagerless. For in-memory benchmarks this saved another
468        // `manager.get(id)` + `serialize_entity_record` per call.
469        if self.pager.is_some() {
470            let actions = manager
471                .get(id)
472                .map(|entity| {
473                    let metadata = manager.get_metadata(id);
474                    vec![StoreWalAction::upsert_entity(
475                        collection,
476                        &entity,
477                        metadata.as_ref(),
478                        self.format_version(),
479                    )]
480                })
481                .unwrap_or_default();
482            if registry_dirty {
483                self.mark_paged_registry_dirty();
484            }
485            self.finish_paged_write(actions)?;
486        }
487
488        Ok(id)
489    }
490
491    /// Turbo bulk insert — optimized fast path.
492    ///
493    /// Single lock for the entire batch. Skips bloom filter, memtable,
494    /// context index, and cross-ref indexing. B-tree writes are batched.
495    pub fn bulk_insert(
496        &self,
497        collection: &str,
498        mut entities: Vec<UnifiedEntity>,
499    ) -> Result<Vec<EntityId>, StoreError> {
500        // REDDB_BULK_TIMING=1 prints a per-call breakdown of the bulk
501        // insert path to stderr. Off by default — used by the reddb
502        // benchmark harness to locate ingest bottlenecks.
503        let trace = matches!(
504            std::env::var("REDDB_BULK_TIMING").ok().as_deref(),
505            Some("1") | Some("true") | Some("on")
506        );
507        let t_start = std::time::Instant::now();
508        let n = entities.len();
509        let manager = self.get_or_create_collection(collection);
510        let t_get_coll = t_start.elapsed();
511
512        // Assign IDs and per-table row_ids before serialization. Bulk
513        // insert must follow the same global ID semantics as
514        // insert()/insert_auto(). Reserve ID ranges in single fetch_add
515        // calls instead of one-per-entity. The overwhelming common case
516        // is "every entity comes in with id==0 and kind==TableRow with
517        // row_id==0" (the wire bulk insert path). Any entity that
518        // already carries an id or row_id falls back to the individual
519        // register_* path to preserve those semantics exactly.
520        let t0 = std::time::Instant::now();
521        let n_missing_entity_ids = entities.iter().filter(|e| e.id.raw() == 0).count() as u64;
522        let n_missing_row_ids = entities
523            .iter()
524            .filter(|e| matches!(e.kind, EntityKind::TableRow { row_id: 0, .. }))
525            .count() as u64;
526        let mut entity_id_range = if n_missing_entity_ids > 0 {
527            self.reserve_entity_ids(n_missing_entity_ids)
528        } else {
529            0..0
530        };
531        let mut row_id_range = if n_missing_row_ids > 0 {
532            manager.reserve_row_ids(n_missing_row_ids)
533        } else {
534            0..0
535        };
536        for entity in &mut entities {
537            if entity.id.raw() == 0 {
538                let next = entity_id_range
539                    .next()
540                    .expect("reserved entity-id range exhausted");
541                entity.id = EntityId::new(next);
542            } else {
543                self.register_entity_id(entity.id);
544            }
545            if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
546                if *row_id == 0 {
547                    *row_id = row_id_range
548                        .next()
549                        .expect("reserved row-id range exhausted");
550                } else {
551                    manager.register_row_id(*row_id);
552                }
553            }
554            entity.ensure_table_logical_id();
555        }
556        let t_assign_ids = t0.elapsed();
557
558        // Capture graph node labels before entities are moved into the segment manager
559        let graph_labels: Vec<Option<(String, EntityId)>> = entities
560            .iter()
561            .map(|e| {
562                if let EntityKind::GraphNode(ref node) = e.kind {
563                    Some((node.label.clone(), e.id))
564                } else {
565                    None
566                }
567            })
568            .collect();
569
570        // Pre-serialize for B-tree while we still have references.
571        // `serialize_entity_record` is pure; with `n >= 1024` the
572        // rayon dispatch cost is amortised and the 16-core serialize
573        // fan-out becomes the biggest single win on insert_bulk. For
574        // smaller batches, stay serial — micro-batches pay more in
575        // work-stealing overhead than they save.
576        let t0 = std::time::Instant::now();
577        let serialized: Option<Vec<(Vec<u8>, Vec<u8>)>> = if self.pager.is_some() {
578            let fv = self.format_version();
579            let serial_map = |e: &UnifiedEntity| {
580                (
581                    e.id.raw().to_be_bytes().to_vec(),
582                    Self::serialize_entity_record(e, None, fv),
583                )
584            };
585            // Gate chosen to match the bench's `BULK_BATCH_SIZE=1000`.
586            // Rayon's dispatch overhead is ~30µs/batch — on a 15-col
587            // row serializing for ~40µs the break-even is around
588            // 200-300 rows on a 16-core host. Keep a safety margin
589            // at 512.
590            if entities.len() >= 512 {
591                use rayon::prelude::*;
592                Some(entities.par_iter().map(serial_map).collect())
593            } else {
594                Some(entities.iter().map(serial_map).collect())
595            }
596        } else {
597            None
598        };
599        let t_serialize = t0.elapsed();
600
601        // Move entities into segment
602        let t0 = std::time::Instant::now();
603        let ids = manager.bulk_insert(entities)?;
604        let t_manager = t0.elapsed();
605        for id in &ids {
606            self.register_entity_id(*id);
607        }
608
609        // Update graph label index for bulk-inserted GraphNode entities
610        for (label, entity_id) in graph_labels.iter().flatten() {
611            self.update_graph_label_index(collection, label, *entity_id);
612        }
613
614        // REDDB_BULK_SKIP_PERSIST_UNSAFE=1 skips the persistent B-tree index
615        // during bulk ingest.
616        //
617        // UNSAFE: for ephemeral benchmark containers ONLY.
618        // This flag is silently ignored when a pager (durable storage) is active.
619        // In persistent mode, bulk inserts ALWAYS write to the B-tree so the data
620        // survives a cold restart without any manual rebuild step.
621        //
622        // The flag is only honoured when self.pager is None (in-memory / ephemeral).
623        let skip_btree_requested = matches!(
624            std::env::var("REDDB_BULK_SKIP_PERSIST_UNSAFE")
625                .ok()
626                .as_deref(),
627            Some("1") | Some("true") | Some("on")
628        );
629        // Honour the flag only when there is no durable pager.
630        // If a pager exists we are in persistent mode → always persist.
631        let skip_btree = skip_btree_requested && self.pager.is_none();
632        if skip_btree_requested && !skip_btree {
633            // Flag was set but we ignored it because we have a real pager.
634            static IGNORED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
635            IGNORED.get_or_init(|| {
636                tracing::warn!(
637                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set but durable pager is \
638                     active — flag ignored; bulk inserts will be persisted normally"
639                );
640            });
641        } else if skip_btree {
642            // Ephemeral mode and flag is active — warn once.
643            static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
644            WARNED.get_or_init(|| {
645                tracing::warn!(
646                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set (ephemeral/no-pager mode) — \
647                     bulk inserts NOT durable; data will be lost on restart"
648                );
649            });
650        }
651
652        // Batch B-tree write from pre-serialized data.
653        // Uses sorted bulk insert: walks to a leaf once, appends many entries,
654        // writes each leaf exactly once per batch — O(N) instead of O(N²).
655        let mut t_btree_lock = std::time::Duration::ZERO;
656        let mut t_btree_insert = std::time::Duration::ZERO;
657        let mut t_flush = std::time::Duration::ZERO;
658        if !skip_btree {
659            if let (Some(pager), Some(batch)) = (&self.pager, serialized.as_ref()) {
660                let t0 = std::time::Instant::now();
661                let mut btree_indices = self.btree_indices.write();
662                let btree = btree_indices
663                    .entry(collection.to_string())
664                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
665                let root_before = btree.root_page_id();
666                t_btree_lock = t0.elapsed();
667
668                let t0 = std::time::Instant::now();
669                let _ = btree.bulk_insert_sorted(batch);
670                t_btree_insert = t0.elapsed();
671                let registry_dirty = root_before != btree.root_page_id();
672
673                let t0 = std::time::Instant::now();
674                if registry_dirty {
675                    self.mark_paged_registry_dirty();
676                }
677                t_flush = t0.elapsed();
678            }
679        }
680
681        // Fold 25k per-entity `UpsertEntityRecord` actions into one
682        // `BulkUpsertEntityRecords` — same on-disk semantics (replay
683        // applies each record by id), one `WalRecord::PageWrite`
684        // instead of N, and `record.clone()` drops out because we
685        // consume `serialized`.
686        let actions = serialized
687            .map(|batch| {
688                let records: Vec<Vec<u8>> =
689                    batch.into_iter().map(|(_key, record)| record).collect();
690                vec![StoreWalAction::BulkUpsertEntityRecords {
691                    collection: collection.to_string(),
692                    records,
693                }]
694            })
695            .unwrap_or_default();
696        self.finish_paged_write(actions)?;
697
698        if trace {
699            tracing::debug!(
700                n,
701                total = ?t_start.elapsed(),
702                get_coll = ?t_get_coll,
703                assign = ?t_assign_ids,
704                serialize = ?t_serialize,
705                manager = ?t_manager,
706                btree_lock = ?t_btree_lock,
707                btree = ?t_btree_insert,
708                flush = ?t_flush,
709                "bulk_insert timing"
710            );
711        }
712
713        Ok(ids)
714    }
715
716    /// Insert an entity, creating collection if needed
717    pub fn insert_auto(
718        &self,
719        collection: &str,
720        entity: UnifiedEntity,
721    ) -> Result<EntityId, StoreError> {
722        let manager = self.get_or_create_collection(collection);
723        let mut entity = entity;
724        if entity.id.raw() == 0 {
725            entity.id = self.next_entity_id();
726        } else {
727            self.register_entity_id(entity.id);
728        }
729        // Assign per-table sequential row_id if not set
730        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
731            if *row_id == 0 {
732                *row_id = manager.next_row_id();
733            } else {
734                manager.register_row_id(*row_id);
735            }
736        }
737        entity.ensure_table_logical_id();
738
739        // Capture graph node label before entity is moved into the manager
740        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
741        {
742            Some(node.label.clone())
743        } else {
744            None
745        };
746        // Index into context index before consuming the entity
747        self.context_index.index_entity(collection, &entity);
748
749        // Pre-serialize the entity record (B-tree image == WAL action body)
750        // and index cross-refs while we still own the entity. Mirrors the
751        // bulk_insert path (see line 559-580 above): segment.insert assigns
752        // a fresh `sequence_id`, but the bulk path already accepts a
753        // sequence_id=0 on-disk image (replay calls manager.insert which
754        // re-assigns sequence_id from the segment's allocator), so the
755        // single-row path can do the same. Fresh inserts never carry
756        // metadata — set_metadata is a separate call — so we pass None,
757        // matching how the bulk path serializes records.
758        //
759        // Eliminates the post-insert `manager.get(id)` clone of the just-
760        // inserted UnifiedEntity (HashMap<String, Value> + per-field
761        // Strings). Per `docs/perf/insert_sequential-2026-05-05.md` P2 this
762        // was the dominant clone in the insert_sequential hot path.
763        let id_for_serialize = entity.id;
764        let serialized_record: Option<Vec<u8>> = if self.pager.is_some() {
765            Some(Self::serialize_entity_record(
766                &entity,
767                None,
768                self.format_version(),
769            ))
770        } else {
771            None
772        };
773        if self.config.auto_index_refs {
774            self.index_cross_refs(&entity, collection)?;
775        }
776
777        let id = manager.insert(entity)?;
778        debug_assert_eq!(id, id_for_serialize);
779        // `register_entity_id` already advances the atomic counter on
780        // the allocation path above (`self.next_entity_id()`), so the
781        // second call here is a no-op CAS loop on the hot path. Only
782        // needed for the caller-supplied-id branch which happens via
783        // the `register_entity_id` call on line 573.
784
785        // Update graph label index for GraphNode entities
786        if let Some(ref label) = graph_node_label {
787            self.update_graph_label_index(collection, label, id);
788        }
789
790        let mut registry_dirty = false;
791        if let (Some(_pager), Some(record)) = (&self.pager, serialized_record.as_ref()) {
792            if let Some(btree) = self.get_or_create_btree(collection) {
793                let root_before = btree.root_page_id();
794
795                let key = id.raw().to_be_bytes();
796                btree.insert(&key, record).map_err(|e| {
797                    StoreError::Io(std::io::Error::other(format!(
798                        "B-tree insert error while inserting '{collection}'/{id}: {e}"
799                    )))
800                })?;
801                registry_dirty = root_before != btree.root_page_id();
802            }
803        }
804
805        // Perf: pagerless → skip WAL-action construction (saves a
806        // third manager.get + entity serialize per insert). For
807        // in-memory runtimes finish_paged_write is a no-op.
808        if self.pager.is_some() {
809            let actions = serialized_record
810                .map(|record| {
811                    vec![StoreWalAction::UpsertEntityRecord {
812                        collection: collection.to_string(),
813                        record,
814                    }]
815                })
816                .unwrap_or_default();
817            if registry_dirty {
818                self.mark_paged_registry_dirty();
819            }
820            self.finish_paged_write(actions)?;
821        }
822        Ok(id)
823    }
824
825    /// Get an entity from a collection
826    ///
827    /// Prefers the live SegmentManager view so reads after update/delete observe
828    /// the current in-memory state even when the paged B-tree image has not been
829    /// refreshed yet. Falls back to the B-tree image for recovery-oriented reads.
830    pub fn get(&self, collection: &str, id: EntityId) -> Option<UnifiedEntity> {
831        // Prefer the live manager state to avoid stale reads after manager.update().
832        if let Some(entity) = self
833            .get_collection(collection)
834            .and_then(|manager| manager.get(id))
835        {
836            return Some(entity);
837        }
838
839        // Fall back to the paged B-tree image if the manager does not currently hold the row.
840        if self.pager.is_some() {
841            let btree_indices = self.btree_indices.read();
842            if let Some(btree) = btree_indices.get(collection) {
843                let key = id.raw().to_be_bytes();
844                if let Ok(Some(value)) = btree.get(&key) {
845                    if let Ok((entity, _)) =
846                        Self::deserialize_entity_record(&value, self.format_version())
847                    {
848                        return Some(entity);
849                    }
850                }
851            }
852        }
853
854        None
855    }
856
857    /// Resolve a table row by stable logical identity.
858    ///
859    /// Legacy rows use `logical_id == id`, so the physical-id lookup remains
860    /// the fast path. Versioned rows may have a different physical id and fall
861    /// back to a scan until the history-store/index slices add a dedicated map.
862    pub fn get_table_row_by_logical_id(
863        &self,
864        collection: &str,
865        logical_id: EntityId,
866    ) -> Option<UnifiedEntity> {
867        if let Some(entity) = self.get(collection, logical_id) {
868            if matches!(entity.kind, EntityKind::TableRow { .. })
869                && entity.logical_id() == logical_id
870                && entity.xmax == 0
871            {
872                return Some(entity);
873            }
874        }
875
876        let manager = self.get_collection(collection)?;
877        let mut matches = manager.query_all(|entity| {
878            matches!(entity.kind, EntityKind::TableRow { .. }) && entity.logical_id() == logical_id
879        });
880        matches
881            .iter()
882            .find(|entity| entity.xmax == 0)
883            .cloned()
884            .or_else(|| matches.pop())
885    }
886
887    pub fn table_row_versions_by_logical_id(
888        &self,
889        collection: &str,
890        logical_id: EntityId,
891    ) -> Vec<UnifiedEntity> {
892        self.get_collection(collection)
893            .map(|manager| {
894                manager.query_all(|entity| {
895                    matches!(entity.kind, EntityKind::TableRow { .. })
896                        && entity.logical_id() == logical_id
897                })
898            })
899            .unwrap_or_default()
900    }
901
902    pub fn vacuum_mvcc_history(
903        &self,
904        collection: &str,
905        cutoff_xid: u64,
906    ) -> Result<MvccVacuumStats, StoreError> {
907        let manager = self
908            .get_collection(collection)
909            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
910        let entities =
911            manager.query_all(|entity| matches!(entity.kind, EntityKind::TableRow { .. }));
912        let mut logical = HashMap::<EntityId, (bool, u64)>::new();
913        for entity in &entities {
914            let entry = logical.entry(entity.logical_id()).or_insert((false, 0));
915            if entity.xmax == 0 {
916                entry.0 = true;
917            }
918            entry.1 = entry.1.max(entity.xmin);
919        }
920
921        let mut stats = MvccVacuumStats::default();
922        let mut reclaim_ids = Vec::new();
923        for entity in entities {
924            if entity.xmax == 0 {
925                continue;
926            }
927            stats.scanned_versions += 1;
928            let (has_live_version, max_xmin) = logical
929                .get(&entity.logical_id())
930                .copied()
931                .unwrap_or((false, entity.xmin));
932            let is_delete_tombstone = !has_live_version && entity.xmin == max_xmin;
933            if entity.xmax < cutoff_xid {
934                stats.reclaimed_versions += 1;
935                if is_delete_tombstone {
936                    stats.reclaimed_tombstones += 1;
937                } else {
938                    stats.reclaimed_history_versions += 1;
939                }
940                reclaim_ids.push(entity.id);
941            } else {
942                stats.retained_versions += 1;
943                if is_delete_tombstone {
944                    stats.retained_tombstones += 1;
945                } else {
946                    stats.retained_history_versions += 1;
947                }
948            }
949        }
950
951        if !reclaim_ids.is_empty() {
952            self.delete_batch(collection, &reclaim_ids)?;
953        }
954        Ok(stats)
955    }
956
957    pub(crate) fn install_versioned_table_row_update(
958        &self,
959        collection: &str,
960        old_version: UnifiedEntity,
961        mut new_version: UnifiedEntity,
962        metadata: Option<&Metadata>,
963    ) -> Result<(), StoreError> {
964        let manager = self
965            .get_collection(collection)
966            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
967
968        let old_id = old_version.id;
969        let new_id = new_version.id;
970        let inherited_metadata = metadata.cloned().or_else(|| manager.get_metadata(old_id));
971
972        self.entity_cache.remove(old_id.raw());
973        self.entity_cache.remove(new_id.raw());
974        manager.update(old_version.clone())?;
975        self.context_index.remove_entity(old_id);
976
977        self.register_entity_id(new_version.id);
978        if let EntityKind::TableRow { ref mut row_id, .. } = new_version.kind {
979            if *row_id == 0 {
980                *row_id = manager.next_row_id();
981            } else {
982                manager.register_row_id(*row_id);
983            }
984        }
985        new_version.ensure_table_logical_id();
986        manager.insert(new_version.clone())?;
987        if let Some(metadata) = inherited_metadata {
988            manager.set_metadata(new_id, metadata)?;
989        }
990        self.context_index.index_entity(collection, &new_version);
991        if self.config.auto_index_refs {
992            self.index_cross_refs(&new_version, collection)?;
993        }
994
995        let old_metadata = manager.get_metadata(old_id);
996        let new_metadata = manager.get_metadata(new_id);
997        let fv = self.format_version();
998        let records = vec![
999            Self::serialize_entity_record(&old_version, old_metadata.as_ref(), fv),
1000            Self::serialize_entity_record(&new_version, new_metadata.as_ref(), fv),
1001        ];
1002        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
1003            collection: collection.to_string(),
1004            records,
1005        }])?;
1006
1007        Ok(())
1008    }
1009
1010    /// Batch-fetch multiple entities from the same collection in minimal lock acquisitions.
1011    ///
1012    /// Preferred over N individual `get()` calls in indexed-scan loops (sorted index,
1013    /// bitmap, hash). Reduces lock acquisitions from N×3 to 2-3 total.
1014    /// Preserves input order: `result[i]` corresponds to `ids[i]`.
1015    pub fn get_batch(&self, collection: &str, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1016        match self.get_collection(collection) {
1017            Some(manager) => manager.get_many(ids),
1018            None => vec![None; ids.len()],
1019        }
1020    }
1021
1022    /// Get an entity from any collection
1023    pub fn get_any(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1024        // Sharded LRU probe: hits / misses are counted by `EntityCache::get`,
1025        // so external observers can read `entity_cache_hit_rate()` without
1026        // any extra wiring on the call site.
1027        if let Some(cached) = self.entity_cache.get(id.raw()) {
1028            return Some(cached);
1029        }
1030
1031        // Cache miss — fall back to the full collection scan, then memoise.
1032        let collections = self.collections.read();
1033        for (name, manager) in collections.iter() {
1034            if let Some(entity) = manager.get(id) {
1035                let result = (name.clone(), entity);
1036                // Drop the collections read guard before taking any cache
1037                // lock — the cache shards are independent, but releasing
1038                // here keeps `collections` contention low.
1039                drop(collections);
1040                self.entity_cache.insert(id.raw(), result.clone());
1041                return Some(result);
1042            }
1043        }
1044        None
1045    }
1046
1047    /// Hit rate of the store's entity cache (`get_any` lookups).
1048    ///
1049    /// Returns `None` until the cache has served at least one lookup.
1050    /// Exposed for observability — e.g. dashboards distinguishing graph
1051    /// workloads (high hit rate) from OLTP DML (≈ 0 % hit rate).
1052    pub fn entity_cache_hit_rate(&self) -> Option<f64> {
1053        self.entity_cache.hit_rate()
1054    }
1055
1056    /// Snapshot of cache hit / miss / eviction counters and current size.
1057    pub fn entity_cache_stats(&self) -> super::super::entity_cache::EntityCacheStats {
1058        self.entity_cache.stats()
1059    }
1060
1061    /// Delete an entity
1062    pub fn delete(&self, collection: &str, id: EntityId) -> Result<bool, StoreError> {
1063        // Invalidate entity cache (read-lock probe; no write lock when cold).
1064        self.entity_cache.remove(id.raw());
1065        let manager = self
1066            .get_collection(collection)
1067            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1068
1069        let deleted = manager.delete(id)?;
1070        if !deleted {
1071            return Ok(false);
1072        }
1073
1074        // Remove from B-tree index if active
1075        let mut registry_dirty = false;
1076        if self.pager.is_some() {
1077            let btree_indices = self.btree_indices.read();
1078            if let Some(btree) = btree_indices.get(collection) {
1079                let root_before = btree.root_page_id();
1080                let key = id.raw().to_be_bytes();
1081                let _ = btree.delete(&key);
1082                registry_dirty = root_before != btree.root_page_id();
1083            }
1084        }
1085
1086        // Remove cross-references
1087        self.unindex_cross_refs(id)?;
1088
1089        // Remove from graph label index
1090        self.remove_from_graph_label_index(collection, id);
1091
1092        if registry_dirty {
1093            self.mark_paged_registry_dirty();
1094        }
1095        self.finish_paged_write([StoreWalAction::DeleteEntityRecord {
1096            collection: collection.to_string(),
1097            entity_id: id.raw(),
1098        }])?;
1099
1100        Ok(true)
1101    }
1102
1103    pub fn delete_batch(
1104        &self,
1105        collection: &str,
1106        ids: &[EntityId],
1107    ) -> Result<Vec<EntityId>, StoreError> {
1108        if ids.is_empty() {
1109            return Ok(Vec::new());
1110        }
1111
1112        // Sharded invalidation. The bounded LRU's shard write lock is
1113        // only acquired when the shard actually carries one of these ids,
1114        // so the bench-typical zero-hit `delete_sequential` workload never
1115        // takes a write lock here at all.
1116        self.entity_cache.remove_many(ids.iter().map(|id| id.raw()));
1117
1118        let manager = self
1119            .get_collection(collection)
1120            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1121
1122        let deleted_ids = manager.delete_batch(ids)?;
1123        if deleted_ids.is_empty() {
1124            return Ok(deleted_ids);
1125        }
1126
1127        let mut registry_dirty = false;
1128        if self.pager.is_some() {
1129            let btree_indices = self.btree_indices.read();
1130            if let Some(btree) = btree_indices.get(collection) {
1131                let root_before = btree.root_page_id();
1132                for id in &deleted_ids {
1133                    let key = id.raw().to_be_bytes();
1134                    let _ = btree.delete(&key);
1135                }
1136                registry_dirty = root_before != btree.root_page_id();
1137            }
1138        }
1139
1140        self.unindex_cross_refs_batch(&deleted_ids)?;
1141        self.remove_from_graph_label_index_batch(collection, &deleted_ids);
1142        if registry_dirty {
1143            self.mark_paged_registry_dirty();
1144        }
1145        let actions = deleted_ids
1146            .iter()
1147            .map(|id| StoreWalAction::DeleteEntityRecord {
1148                collection: collection.to_string(),
1149                entity_id: id.raw(),
1150            })
1151            .collect::<Vec<_>>();
1152        self.finish_paged_write(actions)?;
1153
1154        Ok(deleted_ids)
1155    }
1156
1157    /// Set metadata for an entity
1158    pub fn set_metadata(
1159        &self,
1160        collection: &str,
1161        id: EntityId,
1162        metadata: Metadata,
1163    ) -> Result<(), StoreError> {
1164        let manager = self
1165            .get_collection(collection)
1166            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1167
1168        manager.set_metadata(id, metadata)?;
1169        if let Some(entity) = manager.get(id) {
1170            self.persist_entities_to_pager(collection, std::slice::from_ref(&entity))?;
1171        }
1172        Ok(())
1173    }
1174
1175    /// Get metadata for an entity
1176    pub fn get_metadata(&self, collection: &str, id: EntityId) -> Option<Metadata> {
1177        self.get_collection(collection)?.get_metadata(id)
1178    }
1179
1180    /// Add a cross-reference between entities
1181    pub fn add_cross_ref(
1182        &self,
1183        source_collection: &str,
1184        source_id: EntityId,
1185        target_collection: &str,
1186        target_id: EntityId,
1187        ref_type: RefType,
1188        weight: f32,
1189    ) -> Result<(), StoreError> {
1190        // Check source exists
1191        let source_manager = self
1192            .get_collection(source_collection)
1193            .ok_or_else(|| StoreError::CollectionNotFound(source_collection.to_string()))?;
1194
1195        if source_manager.get(source_id).is_none() {
1196            return Err(StoreError::EntityNotFound(source_id));
1197        }
1198
1199        // Check target exists
1200        let target_manager = self
1201            .get_collection(target_collection)
1202            .ok_or_else(|| StoreError::CollectionNotFound(target_collection.to_string()))?;
1203
1204        if target_manager.get(target_id).is_none() {
1205            return Err(StoreError::EntityNotFound(target_id));
1206        }
1207
1208        // Check limits
1209        let current_refs = self
1210            .cross_refs
1211            .read()
1212            .get(&source_id)
1213            .map_or(0, |v| v.len());
1214
1215        if current_refs >= self.config.max_cross_refs {
1216            return Err(StoreError::TooManyRefs(source_id));
1217        }
1218
1219        let mut registry_dirty = false;
1220        {
1221            let mut forward = self.cross_refs.write();
1222            let refs = forward.entry(source_id).or_default();
1223            let inserted = !refs.iter().any(|(id, kind, coll)| {
1224                *id == target_id && *kind == ref_type && coll == target_collection
1225            });
1226            if inserted {
1227                refs.push((target_id, ref_type, target_collection.to_string()));
1228                registry_dirty = true;
1229            }
1230        }
1231
1232        {
1233            let mut reverse = self.reverse_refs.write();
1234            let refs = reverse.entry(target_id).or_default();
1235            let inserted = !refs.iter().any(|(id, kind, coll)| {
1236                *id == source_id && *kind == ref_type && coll == source_collection
1237            });
1238            if inserted {
1239                refs.push((source_id, ref_type, source_collection.to_string()));
1240                registry_dirty = true;
1241            }
1242        }
1243
1244        if let Some(mut entity) = source_manager.get(source_id) {
1245            if !entity.cross_refs().iter().any(|xref| {
1246                xref.target == target_id
1247                    && xref.ref_type == ref_type
1248                    && xref.target_collection == target_collection
1249            }) {
1250                let cross_ref = CrossRef::with_weight(
1251                    source_id,
1252                    target_id,
1253                    target_collection,
1254                    ref_type,
1255                    weight,
1256                );
1257                entity.add_cross_ref(cross_ref);
1258                let _ = source_manager.update(entity.clone());
1259                registry_dirty = true;
1260                self.persist_entities_to_pager(source_collection, std::slice::from_ref(&entity))?;
1261            }
1262        }
1263
1264        if registry_dirty {
1265            self.mark_paged_registry_dirty();
1266            if matches!(
1267                self.config.durability_mode,
1268                crate::api::DurabilityMode::Strict
1269            ) {
1270                self.flush_paged_state()?;
1271            }
1272        }
1273
1274        Ok(())
1275    }
1276
1277    /// Get cross-references from an entity
1278    pub fn get_refs_from(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1279        self.cross_refs.read().get(&id).cloned().unwrap_or_default()
1280    }
1281
1282    /// Get cross-references to an entity
1283    pub fn get_refs_to(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1284        self.reverse_refs
1285            .read()
1286            .get(&id)
1287            .cloned()
1288            .unwrap_or_default()
1289    }
1290
1291    /// Expand cross-references to get related entities
1292    pub fn expand_refs(
1293        &self,
1294        id: EntityId,
1295        depth: u32,
1296        ref_types: Option<&[RefType]>,
1297    ) -> Vec<(UnifiedEntity, u32, RefType)> {
1298        let mut results = Vec::new();
1299        let mut visited = std::collections::HashSet::new();
1300        visited.insert(id);
1301
1302        self.expand_refs_recursive(id, depth, ref_types, &mut visited, &mut results, 1);
1303
1304        results
1305    }
1306
1307    fn expand_refs_recursive(
1308        &self,
1309        id: EntityId,
1310        max_depth: u32,
1311        ref_types: Option<&[RefType]>,
1312        visited: &mut std::collections::HashSet<EntityId>,
1313        results: &mut Vec<(UnifiedEntity, u32, RefType)>,
1314        current_depth: u32,
1315    ) {
1316        if current_depth > max_depth {
1317            return;
1318        }
1319
1320        for (target_id, ref_type, target_collection) in self.get_refs_from(id) {
1321            if visited.contains(&target_id) {
1322                continue;
1323            }
1324
1325            if let Some(types) = ref_types {
1326                if !types.contains(&ref_type) {
1327                    continue;
1328                }
1329            }
1330
1331            visited.insert(target_id);
1332
1333            if let Some(entity) = self.get(&target_collection, target_id) {
1334                results.push((entity, current_depth, ref_type));
1335
1336                // Recurse
1337                self.expand_refs_recursive(
1338                    target_id,
1339                    max_depth,
1340                    ref_types,
1341                    visited,
1342                    results,
1343                    current_depth + 1,
1344                );
1345            }
1346        }
1347    }
1348
1349    /// Index cross-references from an entity
1350    pub(crate) fn index_cross_refs(
1351        &self,
1352        entity: &UnifiedEntity,
1353        collection: &str,
1354    ) -> Result<(), StoreError> {
1355        let mut registry_dirty = false;
1356        for cross_ref in entity.cross_refs() {
1357            if cross_ref.target_collection.is_empty() {
1358                continue;
1359            }
1360            {
1361                let mut forward = self.cross_refs.write();
1362                let refs = forward.entry(cross_ref.source).or_default();
1363                let inserted = !refs.iter().any(|(id, kind, coll)| {
1364                    *id == cross_ref.target
1365                        && *kind == cross_ref.ref_type
1366                        && coll == &cross_ref.target_collection
1367                });
1368                if inserted {
1369                    refs.push((
1370                        cross_ref.target,
1371                        cross_ref.ref_type,
1372                        cross_ref.target_collection.clone(),
1373                    ));
1374                    registry_dirty = true;
1375                }
1376            }
1377
1378            {
1379                let mut reverse = self.reverse_refs.write();
1380                let refs = reverse.entry(cross_ref.target).or_default();
1381                let inserted = !refs.iter().any(|(id, kind, coll)| {
1382                    *id == cross_ref.source && *kind == cross_ref.ref_type && coll == collection
1383                });
1384                if inserted {
1385                    refs.push((cross_ref.source, cross_ref.ref_type, collection.to_string()));
1386                    registry_dirty = true;
1387                }
1388            }
1389        }
1390
1391        if registry_dirty {
1392            self.mark_paged_registry_dirty();
1393        }
1394
1395        Ok(())
1396    }
1397
1398    /// Remove cross-references for an entity
1399    pub(crate) fn unindex_cross_refs(&self, id: EntityId) -> Result<(), StoreError> {
1400        // Remove forward refs
1401        self.cross_refs.write().remove(&id);
1402
1403        // Remove from reverse refs (scan all)
1404        let mut reverse = self.reverse_refs.write();
1405        for refs in reverse.values_mut() {
1406            refs.retain(|(source, _, _)| *source != id);
1407        }
1408        reverse.remove(&id);
1409        self.mark_paged_registry_dirty();
1410
1411        Ok(())
1412    }
1413
1414    pub(crate) fn unindex_cross_refs_batch(&self, ids: &[EntityId]) -> Result<(), StoreError> {
1415        if ids.is_empty() {
1416            return Ok(());
1417        }
1418
1419        let id_set: std::collections::HashSet<EntityId> = ids.iter().copied().collect();
1420
1421        // Read-only probe: under steady-state delete workloads most rows have
1422        // no cross-refs at all, so both maps are entirely cold for the batch.
1423        // Acquiring the write lock to scan `reverse_refs.values_mut()` over an
1424        // unrelated dictionary was the dominant cost in #85's `delete_sequential`
1425        // profile. The read-lock probe below is O(|ids|) hashmap lookups against
1426        // each map, vs. O(|reverse_refs|) for the write-path scan it replaces.
1427        //
1428        // Decision: skip the write path iff
1429        //   - no deleted id is a key in `cross_refs`  (no outbound refs to drop,
1430        //     and so no `(deleted_id, _, _)` source entry exists anywhere in
1431        //     `reverse_refs.values()` either — the two maps are kept in sync by
1432        //     `add_cross_ref`), AND
1433        //   - no deleted id is a key in `reverse_refs` (no inbound refs to drop).
1434        let needs_forward_cleanup = {
1435            let forward = self.cross_refs.read();
1436            id_set.iter().any(|id| forward.contains_key(id))
1437        };
1438        let needs_reverse_cleanup = {
1439            let reverse = self.reverse_refs.read();
1440            id_set.iter().any(|id| reverse.contains_key(id))
1441        };
1442
1443        if !needs_forward_cleanup && !needs_reverse_cleanup {
1444            self.unindex_cross_refs_fast_path
1445                .fetch_add(1, Ordering::Relaxed);
1446            return Ok(());
1447        }
1448
1449        if needs_forward_cleanup {
1450            let mut forward = self.cross_refs.write();
1451            for id in &id_set {
1452                forward.remove(id);
1453            }
1454        }
1455
1456        if needs_reverse_cleanup || needs_forward_cleanup {
1457            // The inner `values_mut()` scan is only needed when at least one
1458            // deleted id was a *source* somewhere — which, by the invariant
1459            // above, can only happen when it had outbound forward refs.
1460            let mut reverse = self.reverse_refs.write();
1461            if needs_forward_cleanup {
1462                for refs in reverse.values_mut() {
1463                    refs.retain(|(source, _, _)| !id_set.contains(source));
1464                }
1465            }
1466            reverse.retain(|target, refs| !id_set.contains(target) && !refs.is_empty());
1467        }
1468        self.mark_paged_registry_dirty();
1469
1470        Ok(())
1471    }
1472
1473    /// Test/observability hook: number of times `unindex_cross_refs_batch`
1474    /// took the read-only fast path. Used to pin the early-exit in tests
1475    /// and as a cheap signal in delete-heavy benchmarks.
1476    pub fn unindex_cross_refs_fast_path_hits(&self) -> u64 {
1477        self.unindex_cross_refs_fast_path.load(Ordering::Relaxed)
1478    }
1479
1480    /// Query across all collections with a filter
1481    pub fn query_all<F>(&self, filter: F) -> Vec<(String, UnifiedEntity)>
1482    where
1483        F: Fn(&UnifiedEntity) -> bool + Clone + Send + Sync,
1484    {
1485        // Issue #815 — snapshot the (name, manager) handles under a brief
1486        // read lock, then release the `collections` lock *before* the
1487        // potentially long per-collection scan. Holding `collections.read()`
1488        // across a full-store scan let a large `/catalog` snapshot block (and
1489        // be blocked by) the replica apply loop's collection-creation writes:
1490        // under parking_lot a writer waiting behind the long read parks every
1491        // subsequent reader, so `/catalog` and readiness wedged during a big
1492        // WAL re-apply. The managers are `Arc`-shared with their own internal
1493        // locks, so scanning them off the map lock is safe and keeps the HTTP
1494        // surface responsive.
1495        let pairs: Vec<(String, Arc<SegmentManager>)> = {
1496            let collections = self.collections.read();
1497            collections
1498                .iter()
1499                .map(|(name, mgr)| (name.clone(), Arc::clone(mgr)))
1500                .collect()
1501        };
1502
1503        let use_parallel = pairs.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1504        if !use_parallel {
1505            // Single collection — no parallelism overhead
1506            return pairs
1507                .into_iter()
1508                .flat_map(|(name, mgr)| {
1509                    mgr.query_all(filter.clone())
1510                        .into_iter()
1511                        .map(move |e| (name.clone(), e))
1512                })
1513                .collect();
1514        }
1515
1516        // Multiple collections — scan in parallel
1517        let filter_ref = &filter;
1518        let collection_results: Vec<Vec<(String, UnifiedEntity)>> = std::thread::scope(|s| {
1519            pairs
1520                .iter()
1521                .map(|(name, manager)| {
1522                    let name = (*name).clone();
1523                    s.spawn(move || {
1524                        manager
1525                            .query_all(|e| filter_ref(e))
1526                            .into_iter()
1527                            .map(|e| (name.clone(), e))
1528                            .collect::<Vec<_>>()
1529                    })
1530                })
1531                .collect::<Vec<_>>()
1532                .into_iter()
1533                .map(|h| h.join().unwrap_or_default())
1534                .collect()
1535        });
1536
1537        collection_results.into_iter().flatten().collect()
1538    }
1539
1540    /// Filter by metadata across all collections
1541    pub fn filter_metadata_all(
1542        &self,
1543        filters: &[(String, MetadataFilter)],
1544    ) -> Vec<(String, EntityId)> {
1545        let mut results = Vec::new();
1546        let collections = self.collections.read();
1547
1548        for (name, manager) in collections.iter() {
1549            for id in manager.filter_metadata(filters) {
1550                results.push((name.clone(), id));
1551            }
1552        }
1553
1554        results
1555    }
1556
1557    /// Get statistics
1558    pub fn stats(&self) -> StoreStats {
1559        // Issue #815 — snapshot the manager handles, drop the `collections`
1560        // read lock, then gather per-collection stats off the map lock. The
1561        // readiness probe (via `health()`) walks `stats()`, so holding the
1562        // store lock across a full-store stats scan was part of the same
1563        // wedge that froze `/catalog` during a large replica re-apply.
1564        let pairs: Vec<(String, Arc<SegmentManager>)> = {
1565            let collections = self.collections.read();
1566            collections
1567                .iter()
1568                .map(|(name, mgr)| (name.clone(), Arc::clone(mgr)))
1569                .collect()
1570        };
1571
1572        let mut stats = StoreStats {
1573            collection_count: pairs.len(),
1574            ..Default::default()
1575        };
1576
1577        for (name, manager) in &pairs {
1578            let manager_stats = manager.stats();
1579            stats.total_entities += manager_stats.total_entities;
1580            stats.total_memory_bytes += manager_stats.total_memory_bytes;
1581            stats.collections.insert(name.clone(), manager_stats);
1582        }
1583
1584        stats
1585    }
1586
1587    /// Run maintenance on all collections
1588    pub fn run_maintenance(&self) -> Result<(), StoreError> {
1589        let collections = self.collections.read();
1590        for manager in collections.values() {
1591            manager.run_maintenance()?;
1592        }
1593        Ok(())
1594    }
1595}
1596
1597/// Flatten a JSON value into dot-notation key-value pairs for red_config.
1598fn flatten_config_json(
1599    prefix: &str,
1600    value: &crate::serde_json::Value,
1601    out: &mut Vec<(String, crate::storage::schema::Value)>,
1602) {
1603    use crate::storage::schema::Value;
1604    match value {
1605        crate::serde_json::Value::Object(map) => {
1606            for (k, v) in map {
1607                let key = if prefix.is_empty() {
1608                    k.clone()
1609                } else {
1610                    format!("{prefix}.{k}")
1611                };
1612                flatten_config_json(&key, v, out);
1613            }
1614        }
1615        crate::serde_json::Value::String(s) => {
1616            out.push((prefix.to_string(), Value::text(s.clone())));
1617        }
1618        crate::serde_json::Value::Number(n) => {
1619            if n.fract().abs() < f64::EPSILON {
1620                out.push((prefix.to_string(), Value::UnsignedInteger(*n as u64)));
1621            } else {
1622                out.push((prefix.to_string(), Value::Float(*n)));
1623            }
1624        }
1625        crate::serde_json::Value::Bool(b) => {
1626            out.push((prefix.to_string(), Value::Boolean(*b)));
1627        }
1628        crate::serde_json::Value::Null => {
1629            out.push((prefix.to_string(), Value::Null));
1630        }
1631        crate::serde_json::Value::Array(arr) => {
1632            let json_str = crate::serde_json::to_string(value).unwrap_or_default();
1633            out.push((prefix.to_string(), Value::text(json_str)));
1634        }
1635    }
1636}