Skip to main content

reddb_server/storage/unified/store/
impl_entities.rs

1use super::*;
2
3impl UnifiedStore {
4    pub(crate) fn persist_entities_to_pager(
5        &self,
6        collection: &str,
7        entities: &[UnifiedEntity],
8    ) -> Result<(), StoreError> {
9        self.persist_entities_impl(collection, entities, /* skip_btree= */ false)
10    }
11
12    /// Same as `persist_entities_to_pager` but takes `&[&UnifiedEntity]` so
13    /// callers that already hold references (SQL UPDATE fast path) don't
14    /// have to clone a `Vec<UnifiedEntity>` just to satisfy the signature.
15    pub(crate) fn persist_entity_refs_to_pager(
16        &self,
17        collection: &str,
18        entities: &[&UnifiedEntity],
19    ) -> Result<(), StoreError> {
20        self.persist_entity_refs_impl(collection, entities, /* skip_btree= */ false)
21    }
22
23    /// Reference-taking WAL-only variant — see
24    /// `persist_entities_to_pager_wal_only` for semantics.
25    pub(crate) fn persist_entity_refs_to_pager_wal_only(
26        &self,
27        collection: &str,
28        entities: &[&UnifiedEntity],
29    ) -> Result<(), StoreError> {
30        self.persist_entity_refs_impl(collection, entities, /* skip_btree= */ true)
31    }
32
33    fn persist_entity_refs_impl(
34        &self,
35        collection: &str,
36        entities: &[&UnifiedEntity],
37        skip_btree: bool,
38    ) -> Result<(), StoreError> {
39        if entities.is_empty() {
40            return Ok(());
41        }
42        if self.pager.is_none() && self.config.embedded_wal_path.is_none() {
43            return Ok(());
44        }
45        let fv = self.format_version();
46        let manager = self
47            .get_collection(collection)
48            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
49        let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
50            .iter()
51            .map(|entity| {
52                let metadata = manager.get_metadata(entity.id);
53                (
54                    entity.id.raw().to_be_bytes().to_vec(),
55                    Self::serialize_entity_record(entity, metadata.as_ref(), fv),
56                )
57            })
58            .collect();
59        serialized.sort_by(|a, b| a.0.cmp(&b.0));
60
61        if !skip_btree {
62            let Some(pager) = &self.pager else {
63                let records: Vec<Vec<u8>> =
64                    serialized.into_iter().map(|(_id, record)| record).collect();
65                self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
66                    collection: collection.to_string(),
67                    records,
68                }])?;
69                return Ok(());
70            };
71            let mut btree_indices = self.btree_indices.write();
72            let btree = btree_indices
73                .entry(collection.to_string())
74                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
75            let root_before = btree.root_page_id();
76            btree.upsert_batch_sorted(&serialized).map_err(|e| {
77                StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
78            })?;
79            let root_after = btree.root_page_id();
80            drop(btree_indices);
81            if root_before != root_after {
82                self.mark_paged_registry_dirty();
83            }
84        }
85        let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
86        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
87            collection: collection.to_string(),
88            records,
89        }])?;
90        Ok(())
91    }
92
93    /// PG-HOT-like UPDATE persist: emit the WAL record so the update
94    /// survives a crash, but skip the in-line B-tree upsert. Reads
95    /// continue to see the new entity because `manager.get()` prefers
96    /// the segment over the B-tree; WAL replay re-applies the upsert
97    /// to the B-tree on recovery so the two views converge before
98    /// any stale B-tree page can be surfaced.
99    ///
100    /// Safe to use only when the caller has already updated the
101    /// segment in-place (so live reads see the new value). Cuts out
102    /// the dominant cost of single-row UPDATEs: per-entity serialize
103    /// + leaf descent + page decompress / compress / write_page.
104    pub(crate) fn persist_entities_to_pager_wal_only(
105        &self,
106        collection: &str,
107        entities: &[UnifiedEntity],
108    ) -> Result<(), StoreError> {
109        self.persist_entities_impl(collection, entities, /* skip_btree= */ true)
110    }
111
112    fn persist_entities_impl(
113        &self,
114        collection: &str,
115        entities: &[UnifiedEntity],
116        skip_btree: bool,
117    ) -> Result<(), StoreError> {
118        if entities.is_empty() {
119            return Ok(());
120        }
121
122        if self.pager.is_none() && self.config.embedded_wal_path.is_none() {
123            return Ok(());
124        }
125
126        let fv = self.format_version();
127        let manager = self
128            .get_collection(collection)
129            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
130        let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
131            .iter()
132            .map(|entity| {
133                let metadata = manager.get_metadata(entity.id);
134                (
135                    entity.id.raw().to_be_bytes().to_vec(),
136                    Self::serialize_entity_record(entity, metadata.as_ref(), fv),
137                )
138            })
139            .collect();
140        // u64 IDs encoded as big-endian — lex order = numeric order.
141        serialized.sort_by(|a, b| a.0.cmp(&b.0));
142
143        if !skip_btree {
144            let Some(pager) = &self.pager else {
145                let records: Vec<Vec<u8>> =
146                    serialized.into_iter().map(|(_id, record)| record).collect();
147                self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
148                    collection: collection.to_string(),
149                    records,
150                }])?;
151                return Ok(());
152            };
153            let mut btree_indices = self.btree_indices.write();
154            let btree = btree_indices
155                .entry(collection.to_string())
156                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
157            let root_before = btree.root_page_id();
158
159            // Walks each distinct leaf once, applies all in-place overwrites
160            // that belong there under one read+write. Keys that miss or grow
161            // fall back to the per-key `upsert` path internally.
162            btree.upsert_batch_sorted(&serialized).map_err(|e| {
163                StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
164            })?;
165            let root_after = btree.root_page_id();
166            drop(btree_indices);
167            if root_before != root_after {
168                self.mark_paged_registry_dirty();
169            }
170        }
171        // One WAL action covering the whole bulk. Previously each entity
172        // produced its own `UpsertEntityRecord` action, which then became
173        // its own `WalRecord::PageWrite` triple inside
174        // `StoreCommitCoordinator::append_actions` — N WAL records per
175        // bulk. The batched variant is one WAL record per bulk.
176        //
177        // WAL is emitted even in the `skip_btree` variant — recovery
178        // replay reapplies the upsert to the B-tree, so durability
179        // is unchanged.
180        let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
181        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
182            collection: collection.to_string(),
183            records,
184        }])?;
185
186        Ok(())
187    }
188
189    /// Insert a label→entity_id mapping into the graph label index.
190    pub(crate) fn update_graph_label_index(
191        &self,
192        collection: &str,
193        label: &str,
194        entity_id: EntityId,
195    ) {
196        let key = (collection.to_string(), label.to_string());
197        let mut idx = self.graph_label_index.write();
198        idx.entry(key).or_default().push(entity_id);
199    }
200
201    /// Remove a specific entity_id from the graph label index (called on delete).
202    pub(crate) fn remove_from_graph_label_index(&self, collection: &str, entity_id: EntityId) {
203        let mut idx = self.graph_label_index.write();
204        for ((col, _), ids) in idx.iter_mut() {
205            if col == collection {
206                ids.retain(|&id| id != entity_id);
207            }
208        }
209        // Prune empty entries to keep the index compact
210        idx.retain(|_, ids| !ids.is_empty());
211    }
212
213    pub(crate) fn remove_from_graph_label_index_batch(
214        &self,
215        collection: &str,
216        entity_ids: &[EntityId],
217    ) {
218        if entity_ids.is_empty() {
219            return;
220        }
221        let id_set: std::collections::HashSet<EntityId> = entity_ids.iter().copied().collect();
222        let mut idx = self.graph_label_index.write();
223        for ((col, _), ids) in idx.iter_mut() {
224            if col == collection {
225                ids.retain(|id| !id_set.contains(id));
226            }
227        }
228        idx.retain(|_, ids| !ids.is_empty());
229    }
230
231    /// Look up entity IDs for a graph node label across all collections.
232    pub fn lookup_graph_nodes_by_label(&self, label: &str) -> Vec<EntityId> {
233        let idx = self.graph_label_index.read();
234        idx.iter()
235            .filter(|((_, l), _)| l == label)
236            .flat_map(|(_, ids)| ids.iter().copied())
237            .collect()
238    }
239
240    /// Look up entity IDs for a graph node label scoped to a single collection.
241    pub fn lookup_graph_nodes_by_label_in(&self, collection: &str, label: &str) -> Vec<EntityId> {
242        let idx = self.graph_label_index.read();
243        idx.get(&(collection.to_string(), label.to_string()))
244            .cloned()
245            .unwrap_or_default()
246    }
247
248    pub fn create_collection(&self, name: impl Into<String>) -> Result<(), StoreError> {
249        let name = name.into();
250        let mut collections = self.collections.write();
251
252        if collections.contains_key(&name) {
253            return Err(StoreError::CollectionExists(name));
254        }
255
256        let manager = SegmentManager::with_config(&name, self.config.manager_config.clone());
257        collections.insert(name.clone(), Arc::new(manager));
258        drop(collections);
259
260        if let Err(err) = self.publish_operational_collection_create(&name) {
261            let mut collections = self.collections.write();
262            collections.remove(&name);
263            return Err(err);
264        }
265
266        self.mark_paged_registry_dirty();
267        self.finish_paged_write([StoreWalAction::CreateCollection { name }])?;
268
269        Ok(())
270    }
271
272    /// Get or create a collection
273    pub fn get_or_create_collection(&self, name: impl Into<String>) -> Arc<SegmentManager> {
274        let name = name.into();
275        // Fast path: shared read lock — zero contention for existing collections
276        {
277            let collections = self.collections.read();
278            if let Some(manager) = collections.get(&name) {
279                return Arc::clone(manager);
280            }
281        }
282        // Slow path: exclusive write lock — only when collection is missing
283        let mut collections = self.collections.write();
284        // Double-check after acquiring write lock (another thread may have created it)
285        if let Some(manager) = collections.get(&name) {
286            return Arc::clone(manager);
287        }
288        let manager = Arc::new(SegmentManager::with_config(
289            &name,
290            self.config.manager_config.clone(),
291        ));
292        collections.insert(name, Arc::clone(&manager));
293        self.mark_paged_registry_dirty();
294        manager
295    }
296
297    /// Get a collection
298    pub fn get_collection(&self, name: &str) -> Option<Arc<SegmentManager>> {
299        self.collections.read().get(name).map(Arc::clone)
300    }
301
302    /// Get the context index for cross-structure search.
303    pub fn context_index(&self) -> &ContextIndex {
304        &self.context_index
305    }
306
307    /// Drain WAL-replayed `VectorInsert` records for `collection`
308    /// (issue #694). Returns `None` when nothing was captured at open
309    /// time (in-memory mode, fresh database, or non-turbo collection).
310    /// One-shot: the caller owns the records after this call, the
311    /// store's internal buffer is cleared for that collection.
312    pub fn take_replayed_turbo_inserts(&self, collection: &str) -> Option<Vec<(u64, Vec<f32>)>> {
313        let mut map = self.replayed_turbo_inserts.lock();
314        map.remove(collection)
315    }
316
317    /// Set multiple config KV pairs at once from a JSON tree.
318    /// Keys are flattened with dot-notation: `{"a":{"b":1}}` → `a.b = 1`.
319    pub fn set_config_tree(&self, prefix: &str, json: &crate::serde_json::Value) -> usize {
320        let _ = self.get_or_create_collection("red_config");
321        let mut pairs = Vec::new();
322        flatten_config_json(prefix, json, &mut pairs);
323        let mut saved = 0;
324        for (key, value) in pairs {
325            let entity = UnifiedEntity::new(
326                EntityId::new(0),
327                EntityKind::TableRow {
328                    table: Arc::from("red_config"),
329                    row_id: 0,
330                },
331                EntityData::Row(RowData {
332                    columns: Vec::new(),
333                    named: Some(
334                        [
335                            ("key".to_string(), crate::storage::schema::Value::text(key)),
336                            ("value".to_string(), value),
337                        ]
338                        .into_iter()
339                        .collect(),
340                    ),
341                    schema: None,
342                }),
343            );
344            if self.insert_auto("red_config", entity).is_ok() {
345                saved += 1;
346            }
347        }
348        saved
349    }
350
351    /// Read a single config value from `red_config` by dot-notation key.
352    pub fn get_config(&self, key: &str) -> Option<crate::storage::schema::Value> {
353        let manager = self.get_collection("red_config")?;
354        for entity in manager.query_all(|_| true) {
355            if let EntityData::Row(row) = &entity.data {
356                if let Some(named) = &row.named {
357                    let key_matches = named
358                        .get("key")
359                        .and_then(|v| match v {
360                            crate::storage::schema::Value::Text(s) => Some(s.as_ref() == key),
361                            _ => None,
362                        })
363                        .unwrap_or(false);
364                    if key_matches {
365                        return named.get("value").cloned();
366                    }
367                }
368            }
369        }
370        None
371    }
372
373    /// List all collections
374    pub fn list_collections(&self) -> Vec<String> {
375        self.collections.read().keys().cloned().collect()
376    }
377
378    /// Drop a collection
379    pub fn drop_collection(&self, name: &str) -> Result<(), StoreError> {
380        self.publish_operational_collection_pending_drop(name)?;
381        let manager = {
382            let mut collections = self.collections.write();
383
384            collections
385                .remove(name)
386                .ok_or_else(|| StoreError::CollectionNotFound(name.to_string()))?
387        };
388
389        let entities = manager.query_all(|_| true);
390        let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
391
392        for entity_id in &entity_ids {
393            self.context_index.remove_entity(*entity_id);
394            let _ = self.unindex_cross_refs(*entity_id);
395        }
396
397        self.btree_indices.write().remove(name);
398
399        self.entity_cache.retain(|entity_id, (collection, _)| {
400            collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
401        });
402
403        self.cross_refs.write().retain(|source_id, refs| {
404            refs.retain(|(target_id, _, target_collection)| {
405                target_collection != name && !entity_ids.iter().any(|id| id == target_id)
406            });
407            !entity_ids.iter().any(|id| id == source_id)
408        });
409
410        self.reverse_refs.write().retain(|target_id, refs| {
411            refs.retain(|(source_id, _, source_collection)| {
412                source_collection != name && !entity_ids.iter().any(|id| id == source_id)
413            });
414            !entity_ids.iter().any(|id| id == target_id)
415        });
416
417        self.mark_paged_registry_dirty();
418        self.finish_paged_write([StoreWalAction::DropCollection {
419            name: name.to_string(),
420        }])?;
421        self.publish_operational_collection_drop_finished(name)?;
422
423        Ok(())
424    }
425
426    /// Insert an entity into a collection
427    pub fn insert(&self, collection: &str, entity: UnifiedEntity) -> Result<EntityId, StoreError> {
428        let manager = self
429            .get_collection(collection)
430            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
431
432        let mut entity = entity;
433        if entity.id.raw() == 0 {
434            entity.id = self.next_entity_id();
435        } else {
436            self.register_entity_id(entity.id);
437        }
438        // Assign per-table sequential row_id if not set
439        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
440            if *row_id == 0 {
441                *row_id = manager.next_row_id();
442            } else {
443                manager.register_row_id(*row_id);
444            }
445        }
446        entity.ensure_table_logical_id();
447        // Capture graph node label before entity is moved into the manager
448        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
449        {
450            Some(node.label.clone())
451        } else {
452            None
453        };
454
455        let id = manager.insert(entity)?;
456        self.register_entity_id(id);
457
458        // Update graph label index for GraphNode entities
459        if let Some(ref label) = graph_node_label {
460            self.update_graph_label_index(collection, label, id);
461        }
462
463        // Also insert into B-tree index if pager is active
464        let mut registry_dirty = false;
465        if let Some(pager) = &self.pager {
466            if let Some(entity) = manager.get(id) {
467                let mut btree_indices = self.btree_indices.write();
468                let btree = btree_indices
469                    .entry(collection.to_string())
470                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
471                let root_before = btree.root_page_id();
472
473                let key = id.raw().to_be_bytes();
474                let metadata = manager.get_metadata(id);
475                let value = Self::serialize_entity_record(
476                    &entity,
477                    metadata.as_ref(),
478                    self.format_version(),
479                );
480                // Ignore duplicate key errors (update scenario)
481                let _ = btree.insert(&key, &value);
482                registry_dirty = root_before != btree.root_page_id();
483            }
484        }
485
486        // Index cross-references if enabled
487        if self.config.auto_index_refs {
488            if let Some(entity) = manager.get(id) {
489                self.index_cross_refs(&entity, collection)?;
490            }
491        }
492
493        // Perf: skip WAL-action construction when the store is
494        // pagerless. For in-memory benchmarks this saved another
495        // `manager.get(id)` + `serialize_entity_record` per call.
496        if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
497            let actions = manager
498                .get(id)
499                .map(|entity| {
500                    let metadata = manager.get_metadata(id);
501                    vec![StoreWalAction::upsert_entity(
502                        collection,
503                        &entity,
504                        metadata.as_ref(),
505                        self.format_version(),
506                    )]
507                })
508                .unwrap_or_default();
509            if registry_dirty {
510                self.mark_paged_registry_dirty();
511            }
512            self.finish_paged_write(actions)?;
513        }
514
515        Ok(id)
516    }
517
518    /// Turbo bulk insert — optimized fast path.
519    ///
520    /// Single lock for the entire batch. Skips bloom filter, memtable,
521    /// context index, and cross-ref indexing. B-tree writes are batched.
522    pub fn bulk_insert(
523        &self,
524        collection: &str,
525        mut entities: Vec<UnifiedEntity>,
526    ) -> Result<Vec<EntityId>, StoreError> {
527        // REDDB_BULK_TIMING=1 prints a per-call breakdown of the bulk
528        // insert path to stderr. Off by default — used by the reddb
529        // benchmark harness to locate ingest bottlenecks.
530        let trace = matches!(
531            std::env::var("REDDB_BULK_TIMING").ok().as_deref(),
532            Some("1") | Some("true") | Some("on")
533        );
534        let t_start = std::time::Instant::now();
535        let n = entities.len();
536        let manager = self.get_or_create_collection(collection);
537        let t_get_coll = t_start.elapsed();
538
539        // Assign IDs and per-table row_ids before serialization. Bulk
540        // insert must follow the same global ID semantics as
541        // insert()/insert_auto(). Reserve ID ranges in single fetch_add
542        // calls instead of one-per-entity. The overwhelming common case
543        // is "every entity comes in with id==0 and kind==TableRow with
544        // row_id==0" (the wire bulk insert path). Any entity that
545        // already carries an id or row_id falls back to the individual
546        // register_* path to preserve those semantics exactly.
547        let t0 = std::time::Instant::now();
548        let n_missing_entity_ids = entities.iter().filter(|e| e.id.raw() == 0).count() as u64;
549        let n_missing_row_ids = entities
550            .iter()
551            .filter(|e| matches!(e.kind, EntityKind::TableRow { row_id: 0, .. }))
552            .count() as u64;
553        let mut entity_id_range = if n_missing_entity_ids > 0 {
554            self.reserve_entity_ids(n_missing_entity_ids)
555        } else {
556            0..0
557        };
558        let mut row_id_range = if n_missing_row_ids > 0 {
559            manager.reserve_row_ids(n_missing_row_ids)
560        } else {
561            0..0
562        };
563        for entity in &mut entities {
564            if entity.id.raw() == 0 {
565                let next = entity_id_range
566                    .next()
567                    .expect("reserved entity-id range exhausted");
568                entity.id = EntityId::new(next);
569            } else {
570                self.register_entity_id(entity.id);
571            }
572            if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
573                if *row_id == 0 {
574                    *row_id = row_id_range
575                        .next()
576                        .expect("reserved row-id range exhausted");
577                } else {
578                    manager.register_row_id(*row_id);
579                }
580            }
581            entity.ensure_table_logical_id();
582        }
583        let t_assign_ids = t0.elapsed();
584
585        // Capture graph node labels before entities are moved into the segment manager
586        let graph_labels: Vec<Option<(String, EntityId)>> = entities
587            .iter()
588            .map(|e| {
589                if let EntityKind::GraphNode(ref node) = e.kind {
590                    Some((node.label.clone(), e.id))
591                } else {
592                    None
593                }
594            })
595            .collect();
596
597        // Pre-serialize for B-tree while we still have references.
598        // `serialize_entity_record` is pure; with `n >= 1024` the
599        // rayon dispatch cost is amortised and the 16-core serialize
600        // fan-out becomes the biggest single win on insert_bulk. For
601        // smaller batches, stay serial — micro-batches pay more in
602        // work-stealing overhead than they save.
603        let t0 = std::time::Instant::now();
604        let serialized: Option<Vec<(Vec<u8>, Vec<u8>)>> =
605            if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
606                let fv = self.format_version();
607                let serial_map = |e: &UnifiedEntity| {
608                    (
609                        e.id.raw().to_be_bytes().to_vec(),
610                        Self::serialize_entity_record(e, None, fv),
611                    )
612                };
613                // Gate chosen to match the bench's `BULK_BATCH_SIZE=1000`.
614                // Rayon's dispatch overhead is ~30µs/batch — on a 15-col
615                // row serializing for ~40µs the break-even is around
616                // 200-300 rows on a 16-core host. Keep a safety margin
617                // at 512.
618                if entities.len() >= 512 {
619                    use rayon::prelude::*;
620                    Some(entities.par_iter().map(serial_map).collect())
621                } else {
622                    Some(entities.iter().map(serial_map).collect())
623                }
624            } else {
625                None
626            };
627        let t_serialize = t0.elapsed();
628
629        // Move entities into segment
630        let t0 = std::time::Instant::now();
631        let ids = manager.bulk_insert(entities)?;
632        let t_manager = t0.elapsed();
633        for id in &ids {
634            self.register_entity_id(*id);
635        }
636
637        // Update graph label index for bulk-inserted GraphNode entities
638        for (label, entity_id) in graph_labels.iter().flatten() {
639            self.update_graph_label_index(collection, label, *entity_id);
640        }
641
642        // REDDB_BULK_SKIP_PERSIST_UNSAFE=1 skips the persistent B-tree index
643        // during bulk ingest.
644        //
645        // UNSAFE: for ephemeral benchmark containers ONLY.
646        // This flag is silently ignored when a pager (durable storage) is active.
647        // In persistent mode, bulk inserts ALWAYS write to the B-tree so the data
648        // survives a cold restart without any manual rebuild step.
649        //
650        // The flag is only honoured when self.pager is None (in-memory / ephemeral).
651        let skip_btree_requested = matches!(
652            std::env::var("REDDB_BULK_SKIP_PERSIST_UNSAFE")
653                .ok()
654                .as_deref(),
655            Some("1") | Some("true") | Some("on")
656        );
657        // Honour the flag only when there is no durable pager.
658        // If a pager exists we are in persistent mode → always persist.
659        let skip_btree = skip_btree_requested && self.pager.is_none();
660        if skip_btree_requested && !skip_btree {
661            // Flag was set but we ignored it because we have a real pager.
662            static IGNORED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
663            IGNORED.get_or_init(|| {
664                tracing::warn!(
665                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set but durable pager is \
666                     active — flag ignored; bulk inserts will be persisted normally"
667                );
668            });
669        } else if skip_btree {
670            // Ephemeral mode and flag is active — warn once.
671            static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
672            WARNED.get_or_init(|| {
673                tracing::warn!(
674                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set (ephemeral/no-pager mode) — \
675                     bulk inserts NOT durable; data will be lost on restart"
676                );
677            });
678        }
679
680        // Batch B-tree write from pre-serialized data.
681        // Uses sorted bulk insert: walks to a leaf once, appends many entries,
682        // writes each leaf exactly once per batch — O(N) instead of O(N²).
683        let mut t_btree_lock = std::time::Duration::ZERO;
684        let mut t_btree_insert = std::time::Duration::ZERO;
685        let mut t_flush = std::time::Duration::ZERO;
686        if !skip_btree {
687            if let (Some(pager), Some(batch)) = (&self.pager, serialized.as_ref()) {
688                let t0 = std::time::Instant::now();
689                let mut btree_indices = self.btree_indices.write();
690                let btree = btree_indices
691                    .entry(collection.to_string())
692                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
693                let root_before = btree.root_page_id();
694                t_btree_lock = t0.elapsed();
695
696                let t0 = std::time::Instant::now();
697                let _ = btree.bulk_insert_sorted(batch);
698                t_btree_insert = t0.elapsed();
699                let registry_dirty = root_before != btree.root_page_id();
700
701                let t0 = std::time::Instant::now();
702                if registry_dirty {
703                    self.mark_paged_registry_dirty();
704                }
705                t_flush = t0.elapsed();
706            }
707        }
708
709        // Fold 25k per-entity `UpsertEntityRecord` actions into one
710        // `BulkUpsertEntityRecords` — same on-disk semantics (replay
711        // applies each record by id), one `WalRecord::PageWrite`
712        // instead of N, and `record.clone()` drops out because we
713        // consume `serialized`.
714        let actions = serialized
715            .map(|batch| {
716                let records: Vec<Vec<u8>> =
717                    batch.into_iter().map(|(_key, record)| record).collect();
718                vec![StoreWalAction::BulkUpsertEntityRecords {
719                    collection: collection.to_string(),
720                    records,
721                }]
722            })
723            .unwrap_or_default();
724        self.finish_paged_write(actions)?;
725
726        if trace {
727            tracing::debug!(
728                n,
729                total = ?t_start.elapsed(),
730                get_coll = ?t_get_coll,
731                assign = ?t_assign_ids,
732                serialize = ?t_serialize,
733                manager = ?t_manager,
734                btree_lock = ?t_btree_lock,
735                btree = ?t_btree_insert,
736                flush = ?t_flush,
737                "bulk_insert timing"
738            );
739        }
740
741        Ok(ids)
742    }
743
744    /// Insert an entity, creating collection if needed
745    pub fn insert_auto(
746        &self,
747        collection: &str,
748        entity: UnifiedEntity,
749    ) -> Result<EntityId, StoreError> {
750        let manager = self.get_or_create_collection(collection);
751        let mut entity = entity;
752        if entity.id.raw() == 0 {
753            entity.id = self.next_entity_id();
754        } else {
755            self.register_entity_id(entity.id);
756        }
757        // Assign per-table sequential row_id if not set
758        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
759            if *row_id == 0 {
760                *row_id = manager.next_row_id();
761            } else {
762                manager.register_row_id(*row_id);
763            }
764        }
765        entity.ensure_table_logical_id();
766
767        // Capture graph node label before entity is moved into the manager
768        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
769        {
770            Some(node.label.clone())
771        } else {
772            None
773        };
774        // Index into context index before consuming the entity
775        self.context_index.index_entity(collection, &entity);
776
777        // Pre-serialize the entity record (B-tree image == WAL action body)
778        // and index cross-refs while we still own the entity. Mirrors the
779        // bulk_insert path (see line 559-580 above): segment.insert assigns
780        // a fresh `sequence_id`, but the bulk path already accepts a
781        // sequence_id=0 on-disk image (replay calls manager.insert which
782        // re-assigns sequence_id from the segment's allocator), so the
783        // single-row path can do the same. Fresh inserts never carry
784        // metadata — set_metadata is a separate call — so we pass None,
785        // matching how the bulk path serializes records.
786        //
787        // Eliminates the post-insert `manager.get(id)` clone of the just-
788        // inserted UnifiedEntity (HashMap<String, Value> + per-field
789        // Strings). Per `docs/perf/insert_sequential-2026-05-05.md` P2 this
790        // was the dominant clone in the insert_sequential hot path.
791        let id_for_serialize = entity.id;
792        let serialized_record: Option<Vec<u8>> =
793            if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
794                Some(Self::serialize_entity_record(
795                    &entity,
796                    None,
797                    self.format_version(),
798                ))
799            } else {
800                None
801            };
802        if self.config.auto_index_refs {
803            self.index_cross_refs(&entity, collection)?;
804        }
805
806        let id = manager.insert(entity)?;
807        debug_assert_eq!(id, id_for_serialize);
808        // `register_entity_id` already advances the atomic counter on
809        // the allocation path above (`self.next_entity_id()`), so the
810        // second call here is a no-op CAS loop on the hot path. Only
811        // needed for the caller-supplied-id branch which happens via
812        // the `register_entity_id` call on line 573.
813
814        // Update graph label index for GraphNode entities
815        if let Some(ref label) = graph_node_label {
816            self.update_graph_label_index(collection, label, id);
817        }
818
819        let mut registry_dirty = false;
820        if let (Some(_pager), Some(record)) = (&self.pager, serialized_record.as_ref()) {
821            if let Some(btree) = self.get_or_create_btree(collection) {
822                let root_before = btree.root_page_id();
823
824                let key = id.raw().to_be_bytes();
825                btree.insert(&key, record).map_err(|e| {
826                    StoreError::Io(std::io::Error::other(format!(
827                        "B-tree insert error while inserting '{collection}'/{id}: {e}"
828                    )))
829                })?;
830                registry_dirty = root_before != btree.root_page_id();
831            }
832        }
833
834        // Perf: pagerless → skip WAL-action construction (saves a
835        // third manager.get + entity serialize per insert). For
836        // in-memory runtimes finish_paged_write is a no-op.
837        if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
838            let actions = serialized_record
839                .map(|record| {
840                    vec![StoreWalAction::UpsertEntityRecord {
841                        collection: collection.to_string(),
842                        record,
843                    }]
844                })
845                .unwrap_or_default();
846            if registry_dirty {
847                self.mark_paged_registry_dirty();
848            }
849            self.finish_paged_write(actions)?;
850        }
851        Ok(id)
852    }
853
854    /// Get an entity from a collection
855    ///
856    /// Prefers the live SegmentManager view so reads after update/delete observe
857    /// the current in-memory state even when the paged B-tree image has not been
858    /// refreshed yet. Falls back to the B-tree image for recovery-oriented reads.
859    pub fn get(&self, collection: &str, id: EntityId) -> Option<UnifiedEntity> {
860        // Prefer the live manager state to avoid stale reads after manager.update().
861        if let Some(entity) = self
862            .get_collection(collection)
863            .and_then(|manager| manager.get(id))
864        {
865            return Some(entity);
866        }
867
868        // Fall back to the paged B-tree image if the manager does not currently hold the row.
869        if self.pager.is_some() {
870            let btree_indices = self.btree_indices.read();
871            if let Some(btree) = btree_indices.get(collection) {
872                let key = id.raw().to_be_bytes();
873                if let Ok(Some(value)) = btree.get(&key) {
874                    if let Ok((entity, _)) =
875                        Self::deserialize_entity_record(&value, self.format_version())
876                    {
877                        return Some(entity);
878                    }
879                }
880            }
881        }
882
883        None
884    }
885
886    /// Resolve a table row by stable logical identity.
887    ///
888    /// Legacy rows use `logical_id == id`, so the physical-id lookup remains
889    /// the fast path. Versioned rows may have a different physical id and fall
890    /// back to a scan until the history-store/index slices add a dedicated map.
891    pub fn get_table_row_by_logical_id(
892        &self,
893        collection: &str,
894        logical_id: EntityId,
895    ) -> Option<UnifiedEntity> {
896        if let Some(entity) = self.get(collection, logical_id) {
897            if matches!(entity.kind, EntityKind::TableRow { .. })
898                && entity.logical_id() == logical_id
899                && entity.xmax == 0
900            {
901                return Some(entity);
902            }
903        }
904
905        let manager = self.get_collection(collection)?;
906        let mut matches = manager.query_all(|entity| {
907            matches!(entity.kind, EntityKind::TableRow { .. }) && entity.logical_id() == logical_id
908        });
909        matches
910            .iter()
911            .find(|entity| entity.xmax == 0)
912            .cloned()
913            .or_else(|| matches.pop())
914    }
915
916    pub fn table_row_versions_by_logical_id(
917        &self,
918        collection: &str,
919        logical_id: EntityId,
920    ) -> Vec<UnifiedEntity> {
921        self.get_collection(collection)
922            .map(|manager| {
923                manager.query_all(|entity| {
924                    matches!(entity.kind, EntityKind::TableRow { .. })
925                        && entity.logical_id() == logical_id
926                })
927            })
928            .unwrap_or_default()
929    }
930
931    pub fn vacuum_mvcc_history(
932        &self,
933        collection: &str,
934        cutoff_xid: u64,
935    ) -> Result<MvccVacuumStats, StoreError> {
936        let manager = self
937            .get_collection(collection)
938            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
939        let entities =
940            manager.query_all(|entity| matches!(entity.kind, EntityKind::TableRow { .. }));
941        let mut logical = HashMap::<EntityId, (bool, u64)>::new();
942        for entity in &entities {
943            let entry = logical.entry(entity.logical_id()).or_insert((false, 0));
944            if entity.xmax == 0 {
945                entry.0 = true;
946            }
947            entry.1 = entry.1.max(entity.xmin);
948        }
949
950        let mut stats = MvccVacuumStats::default();
951        let mut reclaim_ids = Vec::new();
952        for entity in entities {
953            if entity.xmax == 0 {
954                continue;
955            }
956            stats.scanned_versions += 1;
957            let (has_live_version, max_xmin) = logical
958                .get(&entity.logical_id())
959                .copied()
960                .unwrap_or((false, entity.xmin));
961            let is_delete_tombstone = !has_live_version && entity.xmin == max_xmin;
962            if entity.xmax < cutoff_xid {
963                stats.reclaimed_versions += 1;
964                if is_delete_tombstone {
965                    stats.reclaimed_tombstones += 1;
966                } else {
967                    stats.reclaimed_history_versions += 1;
968                }
969                reclaim_ids.push(entity.id);
970            } else {
971                stats.retained_versions += 1;
972                if is_delete_tombstone {
973                    stats.retained_tombstones += 1;
974                } else {
975                    stats.retained_history_versions += 1;
976                }
977            }
978        }
979
980        if !reclaim_ids.is_empty() {
981            self.delete_batch(collection, &reclaim_ids)?;
982        }
983        Ok(stats)
984    }
985
986    pub(crate) fn install_versioned_table_row_update(
987        &self,
988        collection: &str,
989        old_version: UnifiedEntity,
990        mut new_version: UnifiedEntity,
991        metadata: Option<&Metadata>,
992    ) -> Result<(), StoreError> {
993        let manager = self
994            .get_collection(collection)
995            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
996
997        let old_id = old_version.id;
998        let new_id = new_version.id;
999        let inherited_metadata = metadata.cloned().or_else(|| manager.get_metadata(old_id));
1000
1001        self.entity_cache.remove(old_id.raw());
1002        self.entity_cache.remove(new_id.raw());
1003        manager.update(old_version.clone())?;
1004        self.context_index.remove_entity(old_id);
1005
1006        self.register_entity_id(new_version.id);
1007        if let EntityKind::TableRow { ref mut row_id, .. } = new_version.kind {
1008            if *row_id == 0 {
1009                *row_id = manager.next_row_id();
1010            } else {
1011                manager.register_row_id(*row_id);
1012            }
1013        }
1014        new_version.ensure_table_logical_id();
1015        manager.insert(new_version.clone())?;
1016        if let Some(metadata) = inherited_metadata {
1017            manager.set_metadata(new_id, metadata)?;
1018        }
1019        self.context_index.index_entity(collection, &new_version);
1020        if self.config.auto_index_refs {
1021            self.index_cross_refs(&new_version, collection)?;
1022        }
1023
1024        let old_metadata = manager.get_metadata(old_id);
1025        let new_metadata = manager.get_metadata(new_id);
1026        let fv = self.format_version();
1027        let records = vec![
1028            Self::serialize_entity_record(&old_version, old_metadata.as_ref(), fv),
1029            Self::serialize_entity_record(&new_version, new_metadata.as_ref(), fv),
1030        ];
1031        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
1032            collection: collection.to_string(),
1033            records,
1034        }])?;
1035
1036        Ok(())
1037    }
1038
1039    /// Batch-fetch multiple entities from the same collection in minimal lock acquisitions.
1040    ///
1041    /// Preferred over N individual `get()` calls in indexed-scan loops (sorted index,
1042    /// bitmap, hash). Reduces lock acquisitions from N×3 to 2-3 total.
1043    /// Preserves input order: `result[i]` corresponds to `ids[i]`.
1044    pub fn get_batch(&self, collection: &str, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1045        match self.get_collection(collection) {
1046            Some(manager) => manager.get_many(ids),
1047            None => vec![None; ids.len()],
1048        }
1049    }
1050
1051    /// Get an entity from any collection
1052    pub fn get_any(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1053        // Sharded LRU probe: hits / misses are counted by `EntityCache::get`,
1054        // so external observers can read `entity_cache_hit_rate()` without
1055        // any extra wiring on the call site.
1056        if let Some(cached) = self.entity_cache.get(id.raw()) {
1057            return Some(cached);
1058        }
1059
1060        // Cache miss — fall back to the full collection scan, then memoise.
1061        let collections = self.collections.read();
1062        for (name, manager) in collections.iter() {
1063            if let Some(entity) = manager.get(id) {
1064                let result = (name.clone(), entity);
1065                // Drop the collections read guard before taking any cache
1066                // lock — the cache shards are independent, but releasing
1067                // here keeps `collections` contention low.
1068                drop(collections);
1069                self.entity_cache.insert(id.raw(), result.clone());
1070                return Some(result);
1071            }
1072        }
1073        None
1074    }
1075
1076    /// Hit rate of the store's entity cache (`get_any` lookups).
1077    ///
1078    /// Returns `None` until the cache has served at least one lookup.
1079    /// Exposed for observability — e.g. dashboards distinguishing graph
1080    /// workloads (high hit rate) from OLTP DML (≈ 0 % hit rate).
1081    pub fn entity_cache_hit_rate(&self) -> Option<f64> {
1082        self.entity_cache.hit_rate()
1083    }
1084
1085    /// Snapshot of cache hit / miss / eviction counters and current size.
1086    pub fn entity_cache_stats(&self) -> super::super::entity_cache::EntityCacheStats {
1087        self.entity_cache.stats()
1088    }
1089
1090    /// Delete an entity
1091    pub fn delete(&self, collection: &str, id: EntityId) -> Result<bool, StoreError> {
1092        // Invalidate entity cache (read-lock probe; no write lock when cold).
1093        self.entity_cache.remove(id.raw());
1094        let manager = self
1095            .get_collection(collection)
1096            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1097
1098        let deleted = manager.delete(id)?;
1099        if !deleted {
1100            return Ok(false);
1101        }
1102
1103        // Remove from B-tree index if active
1104        let mut registry_dirty = false;
1105        if self.pager.is_some() {
1106            let btree_indices = self.btree_indices.read();
1107            if let Some(btree) = btree_indices.get(collection) {
1108                let root_before = btree.root_page_id();
1109                let key = id.raw().to_be_bytes();
1110                let _ = btree.delete(&key);
1111                registry_dirty = root_before != btree.root_page_id();
1112            }
1113        }
1114
1115        // Remove cross-references
1116        self.unindex_cross_refs(id)?;
1117
1118        // Remove from graph label index
1119        self.remove_from_graph_label_index(collection, id);
1120
1121        if registry_dirty {
1122            self.mark_paged_registry_dirty();
1123        }
1124        self.finish_paged_write([StoreWalAction::DeleteEntityRecord {
1125            collection: collection.to_string(),
1126            entity_id: id.raw(),
1127        }])?;
1128
1129        Ok(true)
1130    }
1131
1132    pub fn delete_batch(
1133        &self,
1134        collection: &str,
1135        ids: &[EntityId],
1136    ) -> Result<Vec<EntityId>, StoreError> {
1137        if ids.is_empty() {
1138            return Ok(Vec::new());
1139        }
1140
1141        // Sharded invalidation. The bounded LRU's shard write lock is
1142        // only acquired when the shard actually carries one of these ids,
1143        // so the bench-typical zero-hit `delete_sequential` workload never
1144        // takes a write lock here at all.
1145        self.entity_cache.remove_many(ids.iter().map(|id| id.raw()));
1146
1147        let manager = self
1148            .get_collection(collection)
1149            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1150
1151        let deleted_ids = manager.delete_batch(ids)?;
1152        if deleted_ids.is_empty() {
1153            return Ok(deleted_ids);
1154        }
1155
1156        let mut registry_dirty = false;
1157        if self.pager.is_some() {
1158            let btree_indices = self.btree_indices.read();
1159            if let Some(btree) = btree_indices.get(collection) {
1160                let root_before = btree.root_page_id();
1161                for id in &deleted_ids {
1162                    let key = id.raw().to_be_bytes();
1163                    let _ = btree.delete(&key);
1164                }
1165                registry_dirty = root_before != btree.root_page_id();
1166            }
1167        }
1168
1169        self.unindex_cross_refs_batch(&deleted_ids)?;
1170        self.remove_from_graph_label_index_batch(collection, &deleted_ids);
1171        if registry_dirty {
1172            self.mark_paged_registry_dirty();
1173        }
1174        let actions = deleted_ids
1175            .iter()
1176            .map(|id| StoreWalAction::DeleteEntityRecord {
1177                collection: collection.to_string(),
1178                entity_id: id.raw(),
1179            })
1180            .collect::<Vec<_>>();
1181        self.finish_paged_write(actions)?;
1182
1183        Ok(deleted_ids)
1184    }
1185
1186    /// Set metadata for an entity
1187    pub fn set_metadata(
1188        &self,
1189        collection: &str,
1190        id: EntityId,
1191        metadata: Metadata,
1192    ) -> Result<(), StoreError> {
1193        let manager = self
1194            .get_collection(collection)
1195            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1196
1197        manager.set_metadata(id, metadata)?;
1198        if let Some(entity) = manager.get(id) {
1199            self.persist_entities_to_pager(collection, std::slice::from_ref(&entity))?;
1200        }
1201        Ok(())
1202    }
1203
1204    /// Get metadata for an entity
1205    pub fn get_metadata(&self, collection: &str, id: EntityId) -> Option<Metadata> {
1206        self.get_collection(collection)?.get_metadata(id)
1207    }
1208
1209    /// Add a cross-reference between entities
1210    pub fn add_cross_ref(
1211        &self,
1212        source_collection: &str,
1213        source_id: EntityId,
1214        target_collection: &str,
1215        target_id: EntityId,
1216        ref_type: RefType,
1217        weight: f32,
1218    ) -> Result<(), StoreError> {
1219        // Check source exists
1220        let source_manager = self
1221            .get_collection(source_collection)
1222            .ok_or_else(|| StoreError::CollectionNotFound(source_collection.to_string()))?;
1223
1224        if source_manager.get(source_id).is_none() {
1225            return Err(StoreError::EntityNotFound(source_id));
1226        }
1227
1228        // Check target exists
1229        let target_manager = self
1230            .get_collection(target_collection)
1231            .ok_or_else(|| StoreError::CollectionNotFound(target_collection.to_string()))?;
1232
1233        if target_manager.get(target_id).is_none() {
1234            return Err(StoreError::EntityNotFound(target_id));
1235        }
1236
1237        // Check limits
1238        let current_refs = self
1239            .cross_refs
1240            .read()
1241            .get(&source_id)
1242            .map_or(0, |v| v.len());
1243
1244        if current_refs >= self.config.max_cross_refs {
1245            return Err(StoreError::TooManyRefs(source_id));
1246        }
1247
1248        let mut registry_dirty = false;
1249        {
1250            let mut forward = self.cross_refs.write();
1251            let refs = forward.entry(source_id).or_default();
1252            let inserted = !refs.iter().any(|(id, kind, coll)| {
1253                *id == target_id && *kind == ref_type && coll == target_collection
1254            });
1255            if inserted {
1256                refs.push((target_id, ref_type, target_collection.to_string()));
1257                registry_dirty = true;
1258            }
1259        }
1260
1261        {
1262            let mut reverse = self.reverse_refs.write();
1263            let refs = reverse.entry(target_id).or_default();
1264            let inserted = !refs.iter().any(|(id, kind, coll)| {
1265                *id == source_id && *kind == ref_type && coll == source_collection
1266            });
1267            if inserted {
1268                refs.push((source_id, ref_type, source_collection.to_string()));
1269                registry_dirty = true;
1270            }
1271        }
1272
1273        if let Some(mut entity) = source_manager.get(source_id) {
1274            if !entity.cross_refs().iter().any(|xref| {
1275                xref.target == target_id
1276                    && xref.ref_type == ref_type
1277                    && xref.target_collection == target_collection
1278            }) {
1279                let cross_ref = CrossRef::with_weight(
1280                    source_id,
1281                    target_id,
1282                    target_collection,
1283                    ref_type,
1284                    weight,
1285                );
1286                entity.add_cross_ref(cross_ref);
1287                let _ = source_manager.update(entity.clone());
1288                registry_dirty = true;
1289                self.persist_entities_to_pager(source_collection, std::slice::from_ref(&entity))?;
1290            }
1291        }
1292
1293        if registry_dirty {
1294            self.mark_paged_registry_dirty();
1295            if matches!(
1296                self.config.durability_mode,
1297                crate::api::DurabilityMode::Strict
1298            ) {
1299                self.flush_paged_state()?;
1300            }
1301        }
1302
1303        Ok(())
1304    }
1305
1306    /// Get cross-references from an entity
1307    pub fn get_refs_from(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1308        self.cross_refs.read().get(&id).cloned().unwrap_or_default()
1309    }
1310
1311    /// Get cross-references to an entity
1312    pub fn get_refs_to(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1313        self.reverse_refs
1314            .read()
1315            .get(&id)
1316            .cloned()
1317            .unwrap_or_default()
1318    }
1319
1320    /// Expand cross-references to get related entities
1321    pub fn expand_refs(
1322        &self,
1323        id: EntityId,
1324        depth: u32,
1325        ref_types: Option<&[RefType]>,
1326    ) -> Vec<(UnifiedEntity, u32, RefType)> {
1327        let mut results = Vec::new();
1328        let mut visited = std::collections::HashSet::new();
1329        visited.insert(id);
1330
1331        self.expand_refs_recursive(id, depth, ref_types, &mut visited, &mut results, 1);
1332
1333        results
1334    }
1335
1336    fn expand_refs_recursive(
1337        &self,
1338        id: EntityId,
1339        max_depth: u32,
1340        ref_types: Option<&[RefType]>,
1341        visited: &mut std::collections::HashSet<EntityId>,
1342        results: &mut Vec<(UnifiedEntity, u32, RefType)>,
1343        current_depth: u32,
1344    ) {
1345        if current_depth > max_depth {
1346            return;
1347        }
1348
1349        for (target_id, ref_type, target_collection) in self.get_refs_from(id) {
1350            if visited.contains(&target_id) {
1351                continue;
1352            }
1353
1354            if let Some(types) = ref_types {
1355                if !types.contains(&ref_type) {
1356                    continue;
1357                }
1358            }
1359
1360            visited.insert(target_id);
1361
1362            if let Some(entity) = self.get(&target_collection, target_id) {
1363                results.push((entity, current_depth, ref_type));
1364
1365                // Recurse
1366                self.expand_refs_recursive(
1367                    target_id,
1368                    max_depth,
1369                    ref_types,
1370                    visited,
1371                    results,
1372                    current_depth + 1,
1373                );
1374            }
1375        }
1376    }
1377
1378    /// Index cross-references from an entity
1379    pub(crate) fn index_cross_refs(
1380        &self,
1381        entity: &UnifiedEntity,
1382        collection: &str,
1383    ) -> Result<(), StoreError> {
1384        let mut registry_dirty = false;
1385        for cross_ref in entity.cross_refs() {
1386            if cross_ref.target_collection.is_empty() {
1387                continue;
1388            }
1389            {
1390                let mut forward = self.cross_refs.write();
1391                let refs = forward.entry(cross_ref.source).or_default();
1392                let inserted = !refs.iter().any(|(id, kind, coll)| {
1393                    *id == cross_ref.target
1394                        && *kind == cross_ref.ref_type
1395                        && coll == &cross_ref.target_collection
1396                });
1397                if inserted {
1398                    refs.push((
1399                        cross_ref.target,
1400                        cross_ref.ref_type,
1401                        cross_ref.target_collection.clone(),
1402                    ));
1403                    registry_dirty = true;
1404                }
1405            }
1406
1407            {
1408                let mut reverse = self.reverse_refs.write();
1409                let refs = reverse.entry(cross_ref.target).or_default();
1410                let inserted = !refs.iter().any(|(id, kind, coll)| {
1411                    *id == cross_ref.source && *kind == cross_ref.ref_type && coll == collection
1412                });
1413                if inserted {
1414                    refs.push((cross_ref.source, cross_ref.ref_type, collection.to_string()));
1415                    registry_dirty = true;
1416                }
1417            }
1418        }
1419
1420        if registry_dirty {
1421            self.mark_paged_registry_dirty();
1422        }
1423
1424        Ok(())
1425    }
1426
1427    /// Remove cross-references for an entity
1428    pub(crate) fn unindex_cross_refs(&self, id: EntityId) -> Result<(), StoreError> {
1429        // Remove forward refs
1430        self.cross_refs.write().remove(&id);
1431
1432        // Remove from reverse refs (scan all)
1433        let mut reverse = self.reverse_refs.write();
1434        for refs in reverse.values_mut() {
1435            refs.retain(|(source, _, _)| *source != id);
1436        }
1437        reverse.remove(&id);
1438        self.mark_paged_registry_dirty();
1439
1440        Ok(())
1441    }
1442
1443    pub(crate) fn unindex_cross_refs_batch(&self, ids: &[EntityId]) -> Result<(), StoreError> {
1444        if ids.is_empty() {
1445            return Ok(());
1446        }
1447
1448        let id_set: std::collections::HashSet<EntityId> = ids.iter().copied().collect();
1449
1450        // Read-only probe: under steady-state delete workloads most rows have
1451        // no cross-refs at all, so both maps are entirely cold for the batch.
1452        // Acquiring the write lock to scan `reverse_refs.values_mut()` over an
1453        // unrelated dictionary was the dominant cost in #85's `delete_sequential`
1454        // profile. The read-lock probe below is O(|ids|) hashmap lookups against
1455        // each map, vs. O(|reverse_refs|) for the write-path scan it replaces.
1456        //
1457        // Decision: skip the write path iff
1458        //   - no deleted id is a key in `cross_refs`  (no outbound refs to drop,
1459        //     and so no `(deleted_id, _, _)` source entry exists anywhere in
1460        //     `reverse_refs.values()` either — the two maps are kept in sync by
1461        //     `add_cross_ref`), AND
1462        //   - no deleted id is a key in `reverse_refs` (no inbound refs to drop).
1463        let needs_forward_cleanup = {
1464            let forward = self.cross_refs.read();
1465            id_set.iter().any(|id| forward.contains_key(id))
1466        };
1467        let needs_reverse_cleanup = {
1468            let reverse = self.reverse_refs.read();
1469            id_set.iter().any(|id| reverse.contains_key(id))
1470        };
1471
1472        if !needs_forward_cleanup && !needs_reverse_cleanup {
1473            self.unindex_cross_refs_fast_path
1474                .fetch_add(1, Ordering::Relaxed);
1475            return Ok(());
1476        }
1477
1478        if needs_forward_cleanup {
1479            let mut forward = self.cross_refs.write();
1480            for id in &id_set {
1481                forward.remove(id);
1482            }
1483        }
1484
1485        if needs_reverse_cleanup || needs_forward_cleanup {
1486            // The inner `values_mut()` scan is only needed when at least one
1487            // deleted id was a *source* somewhere — which, by the invariant
1488            // above, can only happen when it had outbound forward refs.
1489            let mut reverse = self.reverse_refs.write();
1490            if needs_forward_cleanup {
1491                for refs in reverse.values_mut() {
1492                    refs.retain(|(source, _, _)| !id_set.contains(source));
1493                }
1494            }
1495            reverse.retain(|target, refs| !id_set.contains(target) && !refs.is_empty());
1496        }
1497        self.mark_paged_registry_dirty();
1498
1499        Ok(())
1500    }
1501
1502    /// Test/observability hook: number of times `unindex_cross_refs_batch`
1503    /// took the read-only fast path. Used to pin the early-exit in tests
1504    /// and as a cheap signal in delete-heavy benchmarks.
1505    pub fn unindex_cross_refs_fast_path_hits(&self) -> u64 {
1506        self.unindex_cross_refs_fast_path.load(Ordering::Relaxed)
1507    }
1508
1509    /// Query across all collections with a filter
1510    pub fn query_all<F>(&self, filter: F) -> Vec<(String, UnifiedEntity)>
1511    where
1512        F: Fn(&UnifiedEntity) -> bool + Clone + Send + Sync,
1513    {
1514        // Issue #815 — snapshot the (name, manager) handles under a brief
1515        // read lock, then release the `collections` lock *before* the
1516        // potentially long per-collection scan. Holding `collections.read()`
1517        // across a full-store scan let a large `/catalog` snapshot block (and
1518        // be blocked by) the replica apply loop's collection-creation writes:
1519        // under parking_lot a writer waiting behind the long read parks every
1520        // subsequent reader, so `/catalog` and readiness wedged during a big
1521        // WAL re-apply. The managers are `Arc`-shared with their own internal
1522        // locks, so scanning them off the map lock is safe and keeps the HTTP
1523        // surface responsive.
1524        let pairs: Vec<(String, Arc<SegmentManager>)> = {
1525            let collections = self.collections.read();
1526            collections
1527                .iter()
1528                .map(|(name, mgr)| (name.clone(), Arc::clone(mgr)))
1529                .collect()
1530        };
1531
1532        let use_parallel = pairs.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1533        if !use_parallel {
1534            // Single collection — no parallelism overhead
1535            return pairs
1536                .into_iter()
1537                .flat_map(|(name, mgr)| {
1538                    mgr.query_all(filter.clone())
1539                        .into_iter()
1540                        .map(move |e| (name.clone(), e))
1541                })
1542                .collect();
1543        }
1544
1545        // Multiple collections — scan in parallel
1546        let filter_ref = &filter;
1547        let collection_results: Vec<Vec<(String, UnifiedEntity)>> = std::thread::scope(|s| {
1548            pairs
1549                .iter()
1550                .map(|(name, manager)| {
1551                    let name = (*name).clone();
1552                    s.spawn(move || {
1553                        manager
1554                            .query_all(|e| filter_ref(e))
1555                            .into_iter()
1556                            .map(|e| (name.clone(), e))
1557                            .collect::<Vec<_>>()
1558                    })
1559                })
1560                .collect::<Vec<_>>()
1561                .into_iter()
1562                .map(|h| h.join().unwrap_or_default())
1563                .collect()
1564        });
1565
1566        collection_results.into_iter().flatten().collect()
1567    }
1568
1569    /// Filter by metadata across all collections
1570    pub fn filter_metadata_all(
1571        &self,
1572        filters: &[(String, MetadataFilter)],
1573    ) -> Vec<(String, EntityId)> {
1574        let mut results = Vec::new();
1575        let collections = self.collections.read();
1576
1577        for (name, manager) in collections.iter() {
1578            for id in manager.filter_metadata(filters) {
1579                results.push((name.clone(), id));
1580            }
1581        }
1582
1583        results
1584    }
1585
1586    /// Get statistics
1587    pub fn stats(&self) -> StoreStats {
1588        // Issue #815 — snapshot the manager handles, drop the `collections`
1589        // read lock, then gather per-collection stats off the map lock. The
1590        // readiness probe (via `health()`) walks `stats()`, so holding the
1591        // store lock across a full-store stats scan was part of the same
1592        // wedge that froze `/catalog` during a large replica re-apply.
1593        let pairs: Vec<(String, Arc<SegmentManager>)> = {
1594            let collections = self.collections.read();
1595            collections
1596                .iter()
1597                .map(|(name, mgr)| (name.clone(), Arc::clone(mgr)))
1598                .collect()
1599        };
1600
1601        let mut stats = StoreStats {
1602            collection_count: pairs.len(),
1603            ..Default::default()
1604        };
1605
1606        for (name, manager) in &pairs {
1607            let manager_stats = manager.stats();
1608            stats.total_entities += manager_stats.total_entities;
1609            stats.total_memory_bytes += manager_stats.total_memory_bytes;
1610            stats.collections.insert(name.clone(), manager_stats);
1611        }
1612
1613        stats
1614    }
1615
1616    /// Run maintenance on all collections
1617    pub fn run_maintenance(&self) -> Result<(), StoreError> {
1618        let collections = self.collections.read();
1619        for manager in collections.values() {
1620            manager.run_maintenance()?;
1621        }
1622        Ok(())
1623    }
1624}
1625
1626/// Flatten a JSON value into dot-notation key-value pairs for red_config.
1627fn flatten_config_json(
1628    prefix: &str,
1629    value: &crate::serde_json::Value,
1630    out: &mut Vec<(String, crate::storage::schema::Value)>,
1631) {
1632    use crate::storage::schema::Value;
1633    match value {
1634        crate::serde_json::Value::Object(map) => {
1635            for (k, v) in map {
1636                let key = if prefix.is_empty() {
1637                    k.clone()
1638                } else {
1639                    format!("{prefix}.{k}")
1640                };
1641                flatten_config_json(&key, v, out);
1642            }
1643        }
1644        crate::serde_json::Value::String(s) => {
1645            out.push((prefix.to_string(), Value::text(s.clone())));
1646        }
1647        crate::serde_json::Value::Number(n) => {
1648            if n.fract().abs() < f64::EPSILON {
1649                out.push((prefix.to_string(), Value::UnsignedInteger(*n as u64)));
1650            } else {
1651                out.push((prefix.to_string(), Value::Float(*n)));
1652            }
1653        }
1654        crate::serde_json::Value::Bool(b) => {
1655            out.push((prefix.to_string(), Value::Boolean(*b)));
1656        }
1657        crate::serde_json::Value::Null => {
1658            out.push((prefix.to_string(), Value::Null));
1659        }
1660        crate::serde_json::Value::Array(arr) => {
1661            let json_str = crate::serde_json::to_string(value).unwrap_or_default();
1662            out.push((prefix.to_string(), Value::text(json_str)));
1663        }
1664    }
1665}