Skip to main content

reddb_server/storage/unified/store/
impl_entities.rs

1use super::*;
2
3impl UnifiedStore {
4    pub(crate) fn persist_entities_to_pager(
5        &self,
6        collection: &str,
7        entities: &[UnifiedEntity],
8    ) -> Result<(), StoreError> {
9        self.persist_entities_impl(collection, entities, /* skip_btree= */ false)
10    }
11
12    /// Same as `persist_entities_to_pager` but takes `&[&UnifiedEntity]` so
13    /// callers that already hold references (SQL UPDATE fast path) don't
14    /// have to clone a `Vec<UnifiedEntity>` just to satisfy the signature.
15    pub(crate) fn persist_entity_refs_to_pager(
16        &self,
17        collection: &str,
18        entities: &[&UnifiedEntity],
19    ) -> Result<(), StoreError> {
20        self.persist_entity_refs_impl(collection, entities, /* skip_btree= */ false)
21    }
22
23    /// Reference-taking WAL-only variant — see
24    /// `persist_entities_to_pager_wal_only` for semantics.
25    pub(crate) fn persist_entity_refs_to_pager_wal_only(
26        &self,
27        collection: &str,
28        entities: &[&UnifiedEntity],
29    ) -> Result<(), StoreError> {
30        self.persist_entity_refs_impl(collection, entities, /* skip_btree= */ true)
31    }
32
33    fn persist_entity_refs_impl(
34        &self,
35        collection: &str,
36        entities: &[&UnifiedEntity],
37        skip_btree: bool,
38    ) -> Result<(), StoreError> {
39        if entities.is_empty() {
40            return Ok(());
41        }
42        if self.pager.is_none() && self.config.embedded_wal_path.is_none() {
43            return Ok(());
44        }
45        let fv = self.format_version();
46        let manager = self
47            .get_collection(collection)
48            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
49        let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
50            .iter()
51            .map(|entity| {
52                let metadata = manager.get_metadata(entity.id);
53                (
54                    entity.id.raw().to_be_bytes().to_vec(),
55                    Self::serialize_entity_record(entity, metadata.as_ref(), fv),
56                )
57            })
58            .collect();
59        serialized.sort_by(|a, b| a.0.cmp(&b.0));
60
61        if !skip_btree {
62            let Some(pager) = &self.pager else {
63                let records: Vec<Vec<u8>> =
64                    serialized.into_iter().map(|(_id, record)| record).collect();
65                self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
66                    collection: collection.to_string(),
67                    records,
68                }])?;
69                return Ok(());
70            };
71            let mut btree_indices = self.btree_indices.write();
72            let btree = btree_indices
73                .entry(collection.to_string())
74                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
75            let root_before = btree.root_page_id();
76            btree.upsert_batch_sorted(&serialized).map_err(|e| {
77                StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
78            })?;
79            let root_after = btree.root_page_id();
80            drop(btree_indices);
81            if root_before != root_after {
82                self.mark_paged_registry_dirty();
83            }
84        }
85        let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
86        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
87            collection: collection.to_string(),
88            records,
89        }])?;
90        Ok(())
91    }
92
93    /// PG-HOT-like UPDATE persist: emit the WAL record so the update
94    /// survives a crash, but skip the in-line B-tree upsert. Reads
95    /// continue to see the new entity because `manager.get()` prefers
96    /// the segment over the B-tree; WAL replay re-applies the upsert
97    /// to the B-tree on recovery so the two views converge before
98    /// any stale B-tree page can be surfaced.
99    ///
100    /// Safe to use only when the caller has already updated the
101    /// segment in-place (so live reads see the new value). Cuts out
102    /// the dominant cost of single-row UPDATEs: per-entity serialize
103    /// + leaf descent + page decompress / compress / write_page.
104    pub(crate) fn persist_entities_to_pager_wal_only(
105        &self,
106        collection: &str,
107        entities: &[UnifiedEntity],
108    ) -> Result<(), StoreError> {
109        self.persist_entities_impl(collection, entities, /* skip_btree= */ true)
110    }
111
112    fn persist_entities_impl(
113        &self,
114        collection: &str,
115        entities: &[UnifiedEntity],
116        skip_btree: bool,
117    ) -> Result<(), StoreError> {
118        if entities.is_empty() {
119            return Ok(());
120        }
121
122        if self.pager.is_none() && self.config.embedded_wal_path.is_none() {
123            return Ok(());
124        }
125
126        let fv = self.format_version();
127        let manager = self
128            .get_collection(collection)
129            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
130        let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
131            .iter()
132            .map(|entity| {
133                let metadata = manager.get_metadata(entity.id);
134                (
135                    entity.id.raw().to_be_bytes().to_vec(),
136                    Self::serialize_entity_record(entity, metadata.as_ref(), fv),
137                )
138            })
139            .collect();
140        // u64 IDs encoded as big-endian — lex order = numeric order.
141        serialized.sort_by(|a, b| a.0.cmp(&b.0));
142
143        if !skip_btree {
144            let Some(pager) = &self.pager else {
145                let records: Vec<Vec<u8>> =
146                    serialized.into_iter().map(|(_id, record)| record).collect();
147                self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
148                    collection: collection.to_string(),
149                    records,
150                }])?;
151                return Ok(());
152            };
153            let mut btree_indices = self.btree_indices.write();
154            let btree = btree_indices
155                .entry(collection.to_string())
156                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
157            let root_before = btree.root_page_id();
158
159            // Walks each distinct leaf once, applies all in-place overwrites
160            // that belong there under one read+write. Keys that miss or grow
161            // fall back to the per-key `upsert` path internally.
162            btree.upsert_batch_sorted(&serialized).map_err(|e| {
163                StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
164            })?;
165            let root_after = btree.root_page_id();
166            drop(btree_indices);
167            if root_before != root_after {
168                self.mark_paged_registry_dirty();
169            }
170        }
171        // One WAL action covering the whole bulk. Previously each entity
172        // produced its own `UpsertEntityRecord` action, which then became
173        // its own `WalRecord::PageWrite` triple inside
174        // `StoreCommitCoordinator::append_actions` — N WAL records per
175        // bulk. The batched variant is one WAL record per bulk.
176        //
177        // WAL is emitted even in the `skip_btree` variant — recovery
178        // replay reapplies the upsert to the B-tree, so durability
179        // is unchanged.
180        let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
181        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
182            collection: collection.to_string(),
183            records,
184        }])?;
185
186        Ok(())
187    }
188
189    /// Insert a label→entity_id mapping into the graph label index.
190    pub(crate) fn update_graph_label_index(
191        &self,
192        collection: &str,
193        label: &str,
194        entity_id: EntityId,
195    ) {
196        let key = (collection.to_string(), label.to_string());
197        let mut idx = self.graph_label_index.write();
198        idx.entry(key).or_default().push(entity_id);
199    }
200
201    /// Remove a specific entity_id from the graph label index (called on delete).
202    pub(crate) fn remove_from_graph_label_index(&self, collection: &str, entity_id: EntityId) {
203        let mut idx = self.graph_label_index.write();
204        for ((col, _), ids) in idx.iter_mut() {
205            if col == collection {
206                ids.retain(|&id| id != entity_id);
207            }
208        }
209        // Prune empty entries to keep the index compact
210        idx.retain(|_, ids| !ids.is_empty());
211    }
212
213    pub(crate) fn remove_from_graph_label_index_batch(
214        &self,
215        collection: &str,
216        entity_ids: &[EntityId],
217    ) {
218        if entity_ids.is_empty() {
219            return;
220        }
221        let id_set: std::collections::HashSet<EntityId> = entity_ids.iter().copied().collect();
222        let mut idx = self.graph_label_index.write();
223        for ((col, _), ids) in idx.iter_mut() {
224            if col == collection {
225                ids.retain(|id| !id_set.contains(id));
226            }
227        }
228        idx.retain(|_, ids| !ids.is_empty());
229    }
230
231    /// Look up entity IDs for a graph node label across all collections.
232    pub fn lookup_graph_nodes_by_label(&self, label: &str) -> Vec<EntityId> {
233        let idx = self.graph_label_index.read();
234        idx.iter()
235            .filter(|((_, l), _)| l == label)
236            .flat_map(|(_, ids)| ids.iter().copied())
237            .collect()
238    }
239
240    /// Look up entity IDs for a graph node label scoped to a single collection.
241    pub fn lookup_graph_nodes_by_label_in(&self, collection: &str, label: &str) -> Vec<EntityId> {
242        let idx = self.graph_label_index.read();
243        idx.get(&(collection.to_string(), label.to_string()))
244            .cloned()
245            .unwrap_or_default()
246    }
247
248    pub fn create_collection(&self, name: impl Into<String>) -> Result<(), StoreError> {
249        let name = name.into();
250        let mut collections = self.collections.write();
251
252        if collections.contains_key(&name) {
253            return Err(StoreError::CollectionExists(name));
254        }
255
256        let manager = SegmentManager::with_config(&name, self.config.manager_config.clone());
257        collections.insert(name.clone(), Arc::new(manager));
258        drop(collections);
259
260        if let Err(err) = self.publish_operational_collection_create(&name) {
261            let mut collections = self.collections.write();
262            collections.remove(&name);
263            return Err(err);
264        }
265
266        self.mark_paged_registry_dirty();
267        self.finish_paged_write([StoreWalAction::CreateCollection { name }])?;
268
269        Ok(())
270    }
271
272    /// Get or create a collection
273    pub fn get_or_create_collection(&self, name: impl Into<String>) -> Arc<SegmentManager> {
274        let name = name.into();
275        // Fast path: shared read lock — zero contention for existing collections
276        {
277            let collections = self.collections.read();
278            if let Some(manager) = collections.get(&name) {
279                return Arc::clone(manager);
280            }
281        }
282        // Slow path: exclusive write lock — only when collection is missing
283        let mut collections = self.collections.write();
284        // Double-check after acquiring write lock (another thread may have created it)
285        if let Some(manager) = collections.get(&name) {
286            return Arc::clone(manager);
287        }
288        let manager = Arc::new(SegmentManager::with_config(
289            &name,
290            self.config.manager_config.clone(),
291        ));
292        collections.insert(name, Arc::clone(&manager));
293        self.mark_paged_registry_dirty();
294        manager
295    }
296
297    /// Get a collection
298    pub fn get_collection(&self, name: &str) -> Option<Arc<SegmentManager>> {
299        self.collections.read().get(name).map(Arc::clone)
300    }
301
302    /// Get the context index for cross-structure search.
303    pub fn context_index(&self) -> &ContextIndex {
304        &self.context_index
305    }
306
307    /// Drain WAL-replayed `VectorInsert` records for `collection`
308    /// (issue #694). Returns `None` when nothing was captured at open
309    /// time (in-memory mode, fresh database, or non-turbo collection).
310    /// One-shot: the caller owns the records after this call, the
311    /// store's internal buffer is cleared for that collection.
312    pub fn take_replayed_turbo_inserts(&self, collection: &str) -> Option<Vec<(u64, Vec<f32>)>> {
313        let mut map = self.replayed_turbo_inserts.lock();
314        map.remove(collection)
315    }
316
317    /// Set multiple config KV pairs at once from a JSON tree.
318    /// Keys are flattened with dot-notation: `{"a":{"b":1}}` → `a.b = 1`.
319    pub fn set_config_tree(&self, prefix: &str, json: &crate::serde_json::Value) -> usize {
320        let _ = self.get_or_create_collection("red_config");
321        let mut pairs = Vec::new();
322        flatten_config_json(prefix, json, &mut pairs);
323        let mut saved = 0;
324        for (key, value) in pairs {
325            let entity = UnifiedEntity::new(
326                EntityId::new(0),
327                EntityKind::TableRow {
328                    table: Arc::from("red_config"),
329                    row_id: 0,
330                },
331                EntityData::Row(RowData {
332                    columns: Vec::new(),
333                    named: Some(
334                        [
335                            ("key".to_string(), crate::storage::schema::Value::text(key)),
336                            ("value".to_string(), value),
337                        ]
338                        .into_iter()
339                        .collect(),
340                    ),
341                    schema: None,
342                }),
343            );
344            if self.insert_auto("red_config", entity).is_ok() {
345                saved += 1;
346            }
347        }
348        saved
349    }
350
351    /// Read a single config value from `red_config` by dot-notation key.
352    pub fn get_config(&self, key: &str) -> Option<crate::storage::schema::Value> {
353        let manager = self.get_collection("red_config")?;
354        for entity in manager.query_all(|_| true) {
355            if let EntityData::Row(row) = &entity.data {
356                if let Some(named) = &row.named {
357                    let key_matches = named
358                        .get("key")
359                        .and_then(|v| match v {
360                            crate::storage::schema::Value::Text(s) => Some(s.as_ref() == key),
361                            _ => None,
362                        })
363                        .unwrap_or(false);
364                    if key_matches {
365                        return named.get("value").cloned();
366                    }
367                }
368            }
369        }
370        None
371    }
372
373    /// Replace the opaque store-level auxiliary metadata blob. Persisted
374    /// inside the binary dump (store format V10+); see the `aux_metadata`
375    /// field. RedDB uses it to carry collection contracts through the
376    /// single-file artifact.
377    pub fn set_aux_metadata(&self, bytes: Vec<u8>) {
378        *self.aux_metadata.write() = bytes;
379    }
380
381    /// Read the opaque store-level auxiliary metadata blob. Empty when unset.
382    pub fn aux_metadata(&self) -> Vec<u8> {
383        self.aux_metadata.read().clone()
384    }
385
386    /// List all collections
387    pub fn list_collections(&self) -> Vec<String> {
388        self.collections.read().keys().cloned().collect()
389    }
390
391    /// Drop a collection
392    pub fn drop_collection(&self, name: &str) -> Result<(), StoreError> {
393        self.publish_operational_collection_pending_drop(name)?;
394        let manager = {
395            let mut collections = self.collections.write();
396
397            collections
398                .remove(name)
399                .ok_or_else(|| StoreError::CollectionNotFound(name.to_string()))?
400        };
401
402        let entities = manager.query_all(|_| true);
403        let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
404
405        for entity_id in &entity_ids {
406            self.context_index.remove_entity(*entity_id);
407            let _ = self.unindex_cross_refs(*entity_id);
408        }
409
410        self.btree_indices.write().remove(name);
411
412        self.entity_cache.retain(|entity_id, (collection, _)| {
413            collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
414        });
415
416        self.cross_refs.write().retain(|source_id, refs| {
417            refs.retain(|(target_id, _, target_collection)| {
418                target_collection != name && !entity_ids.iter().any(|id| id == target_id)
419            });
420            !entity_ids.iter().any(|id| id == source_id)
421        });
422
423        self.reverse_refs.write().retain(|target_id, refs| {
424            refs.retain(|(source_id, _, source_collection)| {
425                source_collection != name && !entity_ids.iter().any(|id| id == source_id)
426            });
427            !entity_ids.iter().any(|id| id == target_id)
428        });
429
430        self.mark_paged_registry_dirty();
431        self.finish_paged_write([StoreWalAction::DropCollection {
432            name: name.to_string(),
433        }])?;
434        self.publish_operational_collection_drop_finished(name)?;
435
436        Ok(())
437    }
438
439    /// Insert an entity into a collection
440    pub fn insert(&self, collection: &str, entity: UnifiedEntity) -> Result<EntityId, StoreError> {
441        let manager = self
442            .get_collection(collection)
443            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
444
445        let mut entity = entity;
446        if entity.id.raw() == 0 {
447            entity.id = self.next_entity_id();
448        } else {
449            self.register_entity_id(entity.id);
450        }
451        // Assign per-table sequential row_id if not set
452        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
453            if *row_id == 0 {
454                *row_id = manager.next_row_id();
455            } else {
456                manager.register_row_id(*row_id);
457            }
458        }
459        entity.ensure_table_logical_id();
460        // Capture graph node label before entity is moved into the manager
461        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
462        {
463            Some(node.label.clone())
464        } else {
465            None
466        };
467
468        let id = manager.insert(entity)?;
469        self.register_entity_id(id);
470
471        // Update graph label index for GraphNode entities
472        if let Some(ref label) = graph_node_label {
473            self.update_graph_label_index(collection, label, id);
474        }
475
476        // Also insert into B-tree index if pager is active
477        let mut registry_dirty = false;
478        if let Some(pager) = &self.pager {
479            if let Some(entity) = manager.get(id) {
480                let mut btree_indices = self.btree_indices.write();
481                let btree = btree_indices
482                    .entry(collection.to_string())
483                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
484                let root_before = btree.root_page_id();
485
486                let key = id.raw().to_be_bytes();
487                let metadata = manager.get_metadata(id);
488                let value = Self::serialize_entity_record(
489                    &entity,
490                    metadata.as_ref(),
491                    self.format_version(),
492                );
493                // Ignore duplicate key errors (update scenario)
494                let _ = btree.insert(&key, &value);
495                registry_dirty = root_before != btree.root_page_id();
496            }
497        }
498
499        // Index cross-references if enabled
500        if self.config.auto_index_refs {
501            if let Some(entity) = manager.get(id) {
502                self.index_cross_refs(&entity, collection)?;
503            }
504        }
505
506        // Perf: skip WAL-action construction when the store is
507        // pagerless. For in-memory benchmarks this saved another
508        // `manager.get(id)` + `serialize_entity_record` per call.
509        if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
510            let actions = manager
511                .get(id)
512                .map(|entity| {
513                    let metadata = manager.get_metadata(id);
514                    vec![StoreWalAction::upsert_entity(
515                        collection,
516                        &entity,
517                        metadata.as_ref(),
518                        self.format_version(),
519                    )]
520                })
521                .unwrap_or_default();
522            if registry_dirty {
523                self.mark_paged_registry_dirty();
524            }
525            self.finish_paged_write(actions)?;
526        }
527
528        Ok(id)
529    }
530
531    /// Turbo bulk insert — optimized fast path.
532    ///
533    /// Single lock for the entire batch. Skips bloom filter, memtable,
534    /// context index, and cross-ref indexing. B-tree writes are batched.
535    pub fn bulk_insert(
536        &self,
537        collection: &str,
538        mut entities: Vec<UnifiedEntity>,
539    ) -> Result<Vec<EntityId>, StoreError> {
540        // REDDB_BULK_TIMING=1 prints a per-call breakdown of the bulk
541        // insert path to stderr. Off by default — used by the reddb
542        // benchmark harness to locate ingest bottlenecks.
543        let trace = matches!(
544            std::env::var("REDDB_BULK_TIMING").ok().as_deref(),
545            Some("1") | Some("true") | Some("on")
546        );
547        let t_start = std::time::Instant::now();
548        let n = entities.len();
549        let manager = self.get_or_create_collection(collection);
550        let t_get_coll = t_start.elapsed();
551
552        // Assign IDs and per-table row_ids before serialization. Bulk
553        // insert must follow the same global ID semantics as
554        // insert()/insert_auto(). Reserve ID ranges in single fetch_add
555        // calls instead of one-per-entity. The overwhelming common case
556        // is "every entity comes in with id==0 and kind==TableRow with
557        // row_id==0" (the wire bulk insert path). Any entity that
558        // already carries an id or row_id falls back to the individual
559        // register_* path to preserve those semantics exactly.
560        let t0 = std::time::Instant::now();
561        let n_missing_entity_ids = entities.iter().filter(|e| e.id.raw() == 0).count() as u64;
562        let n_missing_row_ids = entities
563            .iter()
564            .filter(|e| matches!(e.kind, EntityKind::TableRow { row_id: 0, .. }))
565            .count() as u64;
566        let mut entity_id_range = if n_missing_entity_ids > 0 {
567            self.reserve_entity_ids(n_missing_entity_ids)
568        } else {
569            0..0
570        };
571        let mut row_id_range = if n_missing_row_ids > 0 {
572            manager.reserve_row_ids(n_missing_row_ids)
573        } else {
574            0..0
575        };
576        for entity in &mut entities {
577            if entity.id.raw() == 0 {
578                let next = entity_id_range
579                    .next()
580                    .expect("reserved entity-id range exhausted");
581                entity.id = EntityId::new(next);
582            } else {
583                self.register_entity_id(entity.id);
584            }
585            if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
586                if *row_id == 0 {
587                    *row_id = row_id_range
588                        .next()
589                        .expect("reserved row-id range exhausted");
590                } else {
591                    manager.register_row_id(*row_id);
592                }
593            }
594            entity.ensure_table_logical_id();
595        }
596        let t_assign_ids = t0.elapsed();
597
598        // Capture graph node labels before entities are moved into the segment manager
599        let graph_labels: Vec<Option<(String, EntityId)>> = entities
600            .iter()
601            .map(|e| {
602                if let EntityKind::GraphNode(ref node) = e.kind {
603                    Some((node.label.clone(), e.id))
604                } else {
605                    None
606                }
607            })
608            .collect();
609
610        // Pre-serialize for B-tree while we still have references.
611        // `serialize_entity_record` is pure; with `n >= 1024` the
612        // rayon dispatch cost is amortised and the 16-core serialize
613        // fan-out becomes the biggest single win on insert_bulk. For
614        // smaller batches, stay serial — micro-batches pay more in
615        // work-stealing overhead than they save.
616        let t0 = std::time::Instant::now();
617        let serialized: Option<Vec<(Vec<u8>, Vec<u8>)>> =
618            if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
619                let fv = self.format_version();
620                let serial_map = |e: &UnifiedEntity| {
621                    (
622                        e.id.raw().to_be_bytes().to_vec(),
623                        Self::serialize_entity_record(e, None, fv),
624                    )
625                };
626                // Gate chosen to match the bench's `BULK_BATCH_SIZE=1000`.
627                // Rayon's dispatch overhead is ~30µs/batch — on a 15-col
628                // row serializing for ~40µs the break-even is around
629                // 200-300 rows on a 16-core host. Keep a safety margin
630                // at 512.
631                if entities.len() >= 512 {
632                    use rayon::prelude::*;
633                    Some(entities.par_iter().map(serial_map).collect())
634                } else {
635                    Some(entities.iter().map(serial_map).collect())
636                }
637            } else {
638                None
639            };
640        let t_serialize = t0.elapsed();
641
642        // Move entities into segment
643        let t0 = std::time::Instant::now();
644        let ids = manager.bulk_insert(entities)?;
645        let t_manager = t0.elapsed();
646        for id in &ids {
647            self.register_entity_id(*id);
648        }
649
650        // Update graph label index for bulk-inserted GraphNode entities
651        for (label, entity_id) in graph_labels.iter().flatten() {
652            self.update_graph_label_index(collection, label, *entity_id);
653        }
654
655        // REDDB_BULK_SKIP_PERSIST_UNSAFE=1 skips the persistent B-tree index
656        // during bulk ingest.
657        //
658        // UNSAFE: for ephemeral benchmark containers ONLY.
659        // This flag is silently ignored when a pager (durable storage) is active.
660        // In persistent mode, bulk inserts ALWAYS write to the B-tree so the data
661        // survives a cold restart without any manual rebuild step.
662        //
663        // The flag is only honoured when self.pager is None (in-memory / ephemeral).
664        let skip_btree_requested = matches!(
665            std::env::var("REDDB_BULK_SKIP_PERSIST_UNSAFE")
666                .ok()
667                .as_deref(),
668            Some("1") | Some("true") | Some("on")
669        );
670        // Honour the flag only when there is no durable pager.
671        // If a pager exists we are in persistent mode → always persist.
672        let skip_btree = skip_btree_requested && self.pager.is_none();
673        if skip_btree_requested && !skip_btree {
674            // Flag was set but we ignored it because we have a real pager.
675            static IGNORED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
676            IGNORED.get_or_init(|| {
677                tracing::warn!(
678                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set but durable pager is \
679                     active — flag ignored; bulk inserts will be persisted normally"
680                );
681            });
682        } else if skip_btree {
683            // Ephemeral mode and flag is active — warn once.
684            static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
685            WARNED.get_or_init(|| {
686                tracing::warn!(
687                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set (ephemeral/no-pager mode) — \
688                     bulk inserts NOT durable; data will be lost on restart"
689                );
690            });
691        }
692
693        // Batch B-tree write from pre-serialized data.
694        // Uses sorted bulk insert: walks to a leaf once, appends many entries,
695        // writes each leaf exactly once per batch — O(N) instead of O(N²).
696        let mut t_btree_lock = std::time::Duration::ZERO;
697        let mut t_btree_insert = std::time::Duration::ZERO;
698        let mut t_flush = std::time::Duration::ZERO;
699        if !skip_btree {
700            if let (Some(pager), Some(batch)) = (&self.pager, serialized.as_ref()) {
701                let t0 = std::time::Instant::now();
702                let mut btree_indices = self.btree_indices.write();
703                let btree = btree_indices
704                    .entry(collection.to_string())
705                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
706                let root_before = btree.root_page_id();
707                t_btree_lock = t0.elapsed();
708
709                let t0 = std::time::Instant::now();
710                let _ = btree.bulk_insert_sorted(batch);
711                t_btree_insert = t0.elapsed();
712                let registry_dirty = root_before != btree.root_page_id();
713
714                let t0 = std::time::Instant::now();
715                if registry_dirty {
716                    self.mark_paged_registry_dirty();
717                }
718                t_flush = t0.elapsed();
719            }
720        }
721
722        // Fold 25k per-entity `UpsertEntityRecord` actions into one
723        // `BulkUpsertEntityRecords` — same on-disk semantics (replay
724        // applies each record by id), one `WalRecord::PageWrite`
725        // instead of N, and `record.clone()` drops out because we
726        // consume `serialized`.
727        let actions = serialized
728            .map(|batch| {
729                let records: Vec<Vec<u8>> =
730                    batch.into_iter().map(|(_key, record)| record).collect();
731                vec![StoreWalAction::BulkUpsertEntityRecords {
732                    collection: collection.to_string(),
733                    records,
734                }]
735            })
736            .unwrap_or_default();
737        self.finish_paged_write(actions)?;
738
739        if trace {
740            tracing::debug!(
741                n,
742                total = ?t_start.elapsed(),
743                get_coll = ?t_get_coll,
744                assign = ?t_assign_ids,
745                serialize = ?t_serialize,
746                manager = ?t_manager,
747                btree_lock = ?t_btree_lock,
748                btree = ?t_btree_insert,
749                flush = ?t_flush,
750                "bulk_insert timing"
751            );
752        }
753
754        Ok(ids)
755    }
756
757    /// Insert an entity, creating collection if needed
758    pub fn insert_auto(
759        &self,
760        collection: &str,
761        entity: UnifiedEntity,
762    ) -> Result<EntityId, StoreError> {
763        let manager = self.get_or_create_collection(collection);
764        let mut entity = entity;
765        if entity.id.raw() == 0 {
766            entity.id = self.next_entity_id();
767        } else {
768            self.register_entity_id(entity.id);
769        }
770        // Assign per-table sequential row_id if not set
771        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
772            if *row_id == 0 {
773                *row_id = manager.next_row_id();
774            } else {
775                manager.register_row_id(*row_id);
776            }
777        }
778        entity.ensure_table_logical_id();
779
780        // Capture graph node label before entity is moved into the manager
781        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
782        {
783            Some(node.label.clone())
784        } else {
785            None
786        };
787        // Index into context index before consuming the entity
788        self.context_index.index_entity(collection, &entity);
789
790        // Pre-serialize the entity record (B-tree image == WAL action body)
791        // and index cross-refs while we still own the entity. Mirrors the
792        // bulk_insert path (see line 559-580 above): segment.insert assigns
793        // a fresh `sequence_id`, but the bulk path already accepts a
794        // sequence_id=0 on-disk image (replay calls manager.insert which
795        // re-assigns sequence_id from the segment's allocator), so the
796        // single-row path can do the same. Fresh inserts never carry
797        // metadata — set_metadata is a separate call — so we pass None,
798        // matching how the bulk path serializes records.
799        //
800        // Eliminates the post-insert `manager.get(id)` clone of the just-
801        // inserted UnifiedEntity (HashMap<String, Value> + per-field
802        // Strings). Per `docs/perf/insert_sequential-2026-05-05.md` P2 this
803        // was the dominant clone in the insert_sequential hot path.
804        let id_for_serialize = entity.id;
805        let serialized_record: Option<Vec<u8>> =
806            if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
807                Some(Self::serialize_entity_record(
808                    &entity,
809                    None,
810                    self.format_version(),
811                ))
812            } else {
813                None
814            };
815        if self.config.auto_index_refs {
816            self.index_cross_refs(&entity, collection)?;
817        }
818
819        let id = manager.insert(entity)?;
820        debug_assert_eq!(id, id_for_serialize);
821        // `register_entity_id` already advances the atomic counter on
822        // the allocation path above (`self.next_entity_id()`), so the
823        // second call here is a no-op CAS loop on the hot path. Only
824        // needed for the caller-supplied-id branch which happens via
825        // the `register_entity_id` call on line 573.
826
827        // Update graph label index for GraphNode entities
828        if let Some(ref label) = graph_node_label {
829            self.update_graph_label_index(collection, label, id);
830        }
831
832        let mut registry_dirty = false;
833        if let (Some(_pager), Some(record)) = (&self.pager, serialized_record.as_ref()) {
834            if let Some(btree) = self.get_or_create_btree(collection) {
835                let root_before = btree.root_page_id();
836
837                let key = id.raw().to_be_bytes();
838                btree.insert(&key, record).map_err(|e| {
839                    StoreError::Io(std::io::Error::other(format!(
840                        "B-tree insert error while inserting '{collection}'/{id}: {e}"
841                    )))
842                })?;
843                registry_dirty = root_before != btree.root_page_id();
844            }
845        }
846
847        // Perf: pagerless → skip WAL-action construction (saves a
848        // third manager.get + entity serialize per insert). For
849        // in-memory runtimes finish_paged_write is a no-op.
850        if self.pager.is_some() || self.config.embedded_wal_path.is_some() {
851            let actions = serialized_record
852                .map(|record| {
853                    vec![StoreWalAction::UpsertEntityRecord {
854                        collection: collection.to_string(),
855                        record,
856                    }]
857                })
858                .unwrap_or_default();
859            if registry_dirty {
860                self.mark_paged_registry_dirty();
861            }
862            self.finish_paged_write(actions)?;
863        }
864        Ok(id)
865    }
866
867    /// Get an entity from a collection
868    ///
869    /// Prefers the live SegmentManager view so reads after update/delete observe
870    /// the current in-memory state even when the paged B-tree image has not been
871    /// refreshed yet. Falls back to the B-tree image for recovery-oriented reads.
872    pub fn get(&self, collection: &str, id: EntityId) -> Option<UnifiedEntity> {
873        // Prefer the live manager state to avoid stale reads after manager.update().
874        if let Some(entity) = self
875            .get_collection(collection)
876            .and_then(|manager| manager.get(id))
877        {
878            return Some(entity);
879        }
880
881        // Fall back to the paged B-tree image if the manager does not currently hold the row.
882        if self.pager.is_some() {
883            let btree_indices = self.btree_indices.read();
884            if let Some(btree) = btree_indices.get(collection) {
885                let key = id.raw().to_be_bytes();
886                if let Ok(Some(value)) = btree.get(&key) {
887                    if let Ok((entity, _)) =
888                        Self::deserialize_entity_record(&value, self.format_version())
889                    {
890                        return Some(entity);
891                    }
892                }
893            }
894        }
895
896        None
897    }
898
899    /// Resolve a table row by stable logical identity.
900    ///
901    /// Legacy rows use `logical_id == id`, so the physical-id lookup remains
902    /// the fast path. Versioned rows may have a different physical id and fall
903    /// back to a scan until the history-store/index slices add a dedicated map.
904    pub fn get_table_row_by_logical_id(
905        &self,
906        collection: &str,
907        logical_id: EntityId,
908    ) -> Option<UnifiedEntity> {
909        if let Some(entity) = self.get(collection, logical_id) {
910            if matches!(entity.kind, EntityKind::TableRow { .. })
911                && entity.logical_id() == logical_id
912                && entity.xmax == 0
913            {
914                return Some(entity);
915            }
916        }
917
918        let manager = self.get_collection(collection)?;
919        let mut matches = manager.query_all(|entity| {
920            matches!(entity.kind, EntityKind::TableRow { .. }) && entity.logical_id() == logical_id
921        });
922        matches
923            .iter()
924            .find(|entity| entity.xmax == 0)
925            .cloned()
926            .or_else(|| matches.pop())
927    }
928
929    pub fn table_row_versions_by_logical_id(
930        &self,
931        collection: &str,
932        logical_id: EntityId,
933    ) -> Vec<UnifiedEntity> {
934        self.get_collection(collection)
935            .map(|manager| {
936                manager.query_all(|entity| {
937                    matches!(entity.kind, EntityKind::TableRow { .. })
938                        && entity.logical_id() == logical_id
939                })
940            })
941            .unwrap_or_default()
942    }
943
944    pub fn vacuum_mvcc_history(
945        &self,
946        collection: &str,
947        cutoff_xid: u64,
948    ) -> Result<MvccVacuumStats, StoreError> {
949        let manager = self
950            .get_collection(collection)
951            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
952        let entities =
953            manager.query_all(|entity| matches!(entity.kind, EntityKind::TableRow { .. }));
954        let mut logical = HashMap::<EntityId, (bool, u64)>::new();
955        for entity in &entities {
956            let entry = logical.entry(entity.logical_id()).or_insert((false, 0));
957            if entity.xmax == 0 {
958                entry.0 = true;
959            }
960            entry.1 = entry.1.max(entity.xmin);
961        }
962
963        let mut stats = MvccVacuumStats::default();
964        let mut reclaim_ids = Vec::new();
965        for entity in entities {
966            if entity.xmax == 0 {
967                continue;
968            }
969            stats.scanned_versions += 1;
970            let (has_live_version, max_xmin) = logical
971                .get(&entity.logical_id())
972                .copied()
973                .unwrap_or((false, entity.xmin));
974            let is_delete_tombstone = !has_live_version && entity.xmin == max_xmin;
975            if entity.xmax < cutoff_xid {
976                stats.reclaimed_versions += 1;
977                if is_delete_tombstone {
978                    stats.reclaimed_tombstones += 1;
979                } else {
980                    stats.reclaimed_history_versions += 1;
981                }
982                reclaim_ids.push(entity.id);
983            } else {
984                stats.retained_versions += 1;
985                if is_delete_tombstone {
986                    stats.retained_tombstones += 1;
987                } else {
988                    stats.retained_history_versions += 1;
989                }
990            }
991        }
992
993        if !reclaim_ids.is_empty() {
994            self.delete_batch(collection, &reclaim_ids)?;
995        }
996        Ok(stats)
997    }
998
999    pub(crate) fn install_versioned_table_row_update(
1000        &self,
1001        collection: &str,
1002        old_version: UnifiedEntity,
1003        mut new_version: UnifiedEntity,
1004        metadata: Option<&Metadata>,
1005    ) -> Result<(), StoreError> {
1006        let manager = self
1007            .get_collection(collection)
1008            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1009
1010        let old_id = old_version.id;
1011        let new_id = new_version.id;
1012        let inherited_metadata = metadata.cloned().or_else(|| manager.get_metadata(old_id));
1013
1014        self.entity_cache.remove(old_id.raw());
1015        self.entity_cache.remove(new_id.raw());
1016        manager.update(old_version.clone())?;
1017        self.context_index.remove_entity(old_id);
1018
1019        self.register_entity_id(new_version.id);
1020        if let EntityKind::TableRow { ref mut row_id, .. } = new_version.kind {
1021            if *row_id == 0 {
1022                *row_id = manager.next_row_id();
1023            } else {
1024                manager.register_row_id(*row_id);
1025            }
1026        }
1027        new_version.ensure_table_logical_id();
1028        manager.insert(new_version.clone())?;
1029        if let Some(metadata) = inherited_metadata {
1030            manager.set_metadata(new_id, metadata)?;
1031        }
1032        self.context_index.index_entity(collection, &new_version);
1033        if self.config.auto_index_refs {
1034            self.index_cross_refs(&new_version, collection)?;
1035        }
1036
1037        let old_metadata = manager.get_metadata(old_id);
1038        let new_metadata = manager.get_metadata(new_id);
1039        let fv = self.format_version();
1040        let records = vec![
1041            Self::serialize_entity_record(&old_version, old_metadata.as_ref(), fv),
1042            Self::serialize_entity_record(&new_version, new_metadata.as_ref(), fv),
1043        ];
1044        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
1045            collection: collection.to_string(),
1046            records,
1047        }])?;
1048
1049        Ok(())
1050    }
1051
1052    /// Batch-fetch multiple entities from the same collection in minimal lock acquisitions.
1053    ///
1054    /// Preferred over N individual `get()` calls in indexed-scan loops (sorted index,
1055    /// bitmap, hash). Reduces lock acquisitions from N×3 to 2-3 total.
1056    /// Preserves input order: `result[i]` corresponds to `ids[i]`.
1057    pub fn get_batch(&self, collection: &str, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
1058        match self.get_collection(collection) {
1059            Some(manager) => manager.get_many(ids),
1060            None => vec![None; ids.len()],
1061        }
1062    }
1063
1064    /// Get an entity from any collection
1065    pub fn get_any(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
1066        // Sharded LRU probe: hits / misses are counted by `EntityCache::get`,
1067        // so external observers can read `entity_cache_hit_rate()` without
1068        // any extra wiring on the call site.
1069        if let Some(cached) = self.entity_cache.get(id.raw()) {
1070            return Some(cached);
1071        }
1072
1073        // Cache miss — fall back to the full collection scan, then memoise.
1074        let collections = self.collections.read();
1075        for (name, manager) in collections.iter() {
1076            if let Some(entity) = manager.get(id) {
1077                let result = (name.clone(), entity);
1078                // Drop the collections read guard before taking any cache
1079                // lock — the cache shards are independent, but releasing
1080                // here keeps `collections` contention low.
1081                drop(collections);
1082                self.entity_cache.insert(id.raw(), result.clone());
1083                return Some(result);
1084            }
1085        }
1086        None
1087    }
1088
    /// Hit rate of the store's entity cache, which backs `get_any` lookups.
    ///
    /// Delegates to `EntityCache::hit_rate`. Returns `None` until the cache has
    /// served at least one lookup. Exposed for observability — e.g. dashboards
    /// distinguishing graph workloads (high hit rate) from OLTP DML (≈ 0 % hit
    /// rate).
    pub fn entity_cache_hit_rate(&self) -> Option<f64> {
        self.entity_cache.hit_rate()
    }
1097
    /// Snapshot of the entity cache's hit / miss / eviction counters and its
    /// current size, as returned by `EntityCache::stats`.
    pub fn entity_cache_stats(&self) -> super::super::entity_cache::EntityCacheStats {
        self.entity_cache.stats()
    }
1102
1103    /// Delete an entity
1104    pub fn delete(&self, collection: &str, id: EntityId) -> Result<bool, StoreError> {
1105        // Invalidate entity cache (read-lock probe; no write lock when cold).
1106        self.entity_cache.remove(id.raw());
1107        let manager = self
1108            .get_collection(collection)
1109            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1110
1111        let deleted = manager.delete(id)?;
1112        if !deleted {
1113            return Ok(false);
1114        }
1115
1116        // Remove from B-tree index if active
1117        let mut registry_dirty = false;
1118        if self.pager.is_some() {
1119            let btree_indices = self.btree_indices.read();
1120            if let Some(btree) = btree_indices.get(collection) {
1121                let root_before = btree.root_page_id();
1122                let key = id.raw().to_be_bytes();
1123                let _ = btree.delete(&key);
1124                registry_dirty = root_before != btree.root_page_id();
1125            }
1126        }
1127
1128        // Remove cross-references
1129        self.unindex_cross_refs(id)?;
1130
1131        // Remove from graph label index
1132        self.remove_from_graph_label_index(collection, id);
1133
1134        if registry_dirty {
1135            self.mark_paged_registry_dirty();
1136        }
1137        self.finish_paged_write([StoreWalAction::DeleteEntityRecord {
1138            collection: collection.to_string(),
1139            entity_id: id.raw(),
1140        }])?;
1141
1142        Ok(true)
1143    }
1144
1145    pub fn delete_batch(
1146        &self,
1147        collection: &str,
1148        ids: &[EntityId],
1149    ) -> Result<Vec<EntityId>, StoreError> {
1150        if ids.is_empty() {
1151            return Ok(Vec::new());
1152        }
1153
1154        // Sharded invalidation. The bounded LRU's shard write lock is
1155        // only acquired when the shard actually carries one of these ids,
1156        // so the bench-typical zero-hit `delete_sequential` workload never
1157        // takes a write lock here at all.
1158        self.entity_cache.remove_many(ids.iter().map(|id| id.raw()));
1159
1160        let manager = self
1161            .get_collection(collection)
1162            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1163
1164        let deleted_ids = manager.delete_batch(ids)?;
1165        if deleted_ids.is_empty() {
1166            return Ok(deleted_ids);
1167        }
1168
1169        let mut registry_dirty = false;
1170        if self.pager.is_some() {
1171            let btree_indices = self.btree_indices.read();
1172            if let Some(btree) = btree_indices.get(collection) {
1173                let root_before = btree.root_page_id();
1174                for id in &deleted_ids {
1175                    let key = id.raw().to_be_bytes();
1176                    let _ = btree.delete(&key);
1177                }
1178                registry_dirty = root_before != btree.root_page_id();
1179            }
1180        }
1181
1182        self.unindex_cross_refs_batch(&deleted_ids)?;
1183        self.remove_from_graph_label_index_batch(collection, &deleted_ids);
1184        if registry_dirty {
1185            self.mark_paged_registry_dirty();
1186        }
1187        let actions = deleted_ids
1188            .iter()
1189            .map(|id| StoreWalAction::DeleteEntityRecord {
1190                collection: collection.to_string(),
1191                entity_id: id.raw(),
1192            })
1193            .collect::<Vec<_>>();
1194        self.finish_paged_write(actions)?;
1195
1196        Ok(deleted_ids)
1197    }
1198
1199    /// Set metadata for an entity
1200    pub fn set_metadata(
1201        &self,
1202        collection: &str,
1203        id: EntityId,
1204        metadata: Metadata,
1205    ) -> Result<(), StoreError> {
1206        let manager = self
1207            .get_collection(collection)
1208            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
1209
1210        manager.set_metadata(id, metadata)?;
1211        if let Some(entity) = manager.get(id) {
1212            self.persist_entities_to_pager(collection, std::slice::from_ref(&entity))?;
1213        }
1214        Ok(())
1215    }
1216
1217    /// Get metadata for an entity
1218    pub fn get_metadata(&self, collection: &str, id: EntityId) -> Option<Metadata> {
1219        self.get_collection(collection)?.get_metadata(id)
1220    }
1221
1222    /// Add a cross-reference between entities
1223    pub fn add_cross_ref(
1224        &self,
1225        source_collection: &str,
1226        source_id: EntityId,
1227        target_collection: &str,
1228        target_id: EntityId,
1229        ref_type: RefType,
1230        weight: f32,
1231    ) -> Result<(), StoreError> {
1232        // Check source exists
1233        let source_manager = self
1234            .get_collection(source_collection)
1235            .ok_or_else(|| StoreError::CollectionNotFound(source_collection.to_string()))?;
1236
1237        if source_manager.get(source_id).is_none() {
1238            return Err(StoreError::EntityNotFound(source_id));
1239        }
1240
1241        // Check target exists
1242        let target_manager = self
1243            .get_collection(target_collection)
1244            .ok_or_else(|| StoreError::CollectionNotFound(target_collection.to_string()))?;
1245
1246        if target_manager.get(target_id).is_none() {
1247            return Err(StoreError::EntityNotFound(target_id));
1248        }
1249
1250        // Check limits
1251        let current_refs = self
1252            .cross_refs
1253            .read()
1254            .get(&source_id)
1255            .map_or(0, |v| v.len());
1256
1257        if current_refs >= self.config.max_cross_refs {
1258            return Err(StoreError::TooManyRefs(source_id));
1259        }
1260
1261        let mut registry_dirty = false;
1262        {
1263            let mut forward = self.cross_refs.write();
1264            let refs = forward.entry(source_id).or_default();
1265            let inserted = !refs.iter().any(|(id, kind, coll)| {
1266                *id == target_id && *kind == ref_type && coll == target_collection
1267            });
1268            if inserted {
1269                refs.push((target_id, ref_type, target_collection.to_string()));
1270                registry_dirty = true;
1271            }
1272        }
1273
1274        {
1275            let mut reverse = self.reverse_refs.write();
1276            let refs = reverse.entry(target_id).or_default();
1277            let inserted = !refs.iter().any(|(id, kind, coll)| {
1278                *id == source_id && *kind == ref_type && coll == source_collection
1279            });
1280            if inserted {
1281                refs.push((source_id, ref_type, source_collection.to_string()));
1282                registry_dirty = true;
1283            }
1284        }
1285
1286        if let Some(mut entity) = source_manager.get(source_id) {
1287            if !entity.cross_refs().iter().any(|xref| {
1288                xref.target == target_id
1289                    && xref.ref_type == ref_type
1290                    && xref.target_collection == target_collection
1291            }) {
1292                let cross_ref = CrossRef::with_weight(
1293                    source_id,
1294                    target_id,
1295                    target_collection,
1296                    ref_type,
1297                    weight,
1298                );
1299                entity.add_cross_ref(cross_ref);
1300                let _ = source_manager.update(entity.clone());
1301                registry_dirty = true;
1302                self.persist_entities_to_pager(source_collection, std::slice::from_ref(&entity))?;
1303            }
1304        }
1305
1306        if registry_dirty {
1307            self.mark_paged_registry_dirty();
1308            if matches!(
1309                self.config.durability_mode,
1310                crate::api::DurabilityMode::Strict
1311            ) {
1312                self.flush_paged_state()?;
1313            }
1314        }
1315
1316        Ok(())
1317    }
1318
1319    /// Get cross-references from an entity
1320    pub fn get_refs_from(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1321        self.cross_refs.read().get(&id).cloned().unwrap_or_default()
1322    }
1323
1324    /// Get cross-references to an entity
1325    pub fn get_refs_to(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1326        self.reverse_refs
1327            .read()
1328            .get(&id)
1329            .cloned()
1330            .unwrap_or_default()
1331    }
1332
1333    /// Expand cross-references to get related entities
1334    pub fn expand_refs(
1335        &self,
1336        id: EntityId,
1337        depth: u32,
1338        ref_types: Option<&[RefType]>,
1339    ) -> Vec<(UnifiedEntity, u32, RefType)> {
1340        let mut results = Vec::new();
1341        let mut visited = std::collections::HashSet::new();
1342        visited.insert(id);
1343
1344        self.expand_refs_recursive(id, depth, ref_types, &mut visited, &mut results, 1);
1345
1346        results
1347    }
1348
1349    fn expand_refs_recursive(
1350        &self,
1351        id: EntityId,
1352        max_depth: u32,
1353        ref_types: Option<&[RefType]>,
1354        visited: &mut std::collections::HashSet<EntityId>,
1355        results: &mut Vec<(UnifiedEntity, u32, RefType)>,
1356        current_depth: u32,
1357    ) {
1358        if current_depth > max_depth {
1359            return;
1360        }
1361
1362        for (target_id, ref_type, target_collection) in self.get_refs_from(id) {
1363            if visited.contains(&target_id) {
1364                continue;
1365            }
1366
1367            if let Some(types) = ref_types {
1368                if !types.contains(&ref_type) {
1369                    continue;
1370                }
1371            }
1372
1373            visited.insert(target_id);
1374
1375            if let Some(entity) = self.get(&target_collection, target_id) {
1376                results.push((entity, current_depth, ref_type));
1377
1378                // Recurse
1379                self.expand_refs_recursive(
1380                    target_id,
1381                    max_depth,
1382                    ref_types,
1383                    visited,
1384                    results,
1385                    current_depth + 1,
1386                );
1387            }
1388        }
1389    }
1390
1391    /// Index cross-references from an entity
1392    pub(crate) fn index_cross_refs(
1393        &self,
1394        entity: &UnifiedEntity,
1395        collection: &str,
1396    ) -> Result<(), StoreError> {
1397        let mut registry_dirty = false;
1398        for cross_ref in entity.cross_refs() {
1399            if cross_ref.target_collection.is_empty() {
1400                continue;
1401            }
1402            {
1403                let mut forward = self.cross_refs.write();
1404                let refs = forward.entry(cross_ref.source).or_default();
1405                let inserted = !refs.iter().any(|(id, kind, coll)| {
1406                    *id == cross_ref.target
1407                        && *kind == cross_ref.ref_type
1408                        && coll == &cross_ref.target_collection
1409                });
1410                if inserted {
1411                    refs.push((
1412                        cross_ref.target,
1413                        cross_ref.ref_type,
1414                        cross_ref.target_collection.clone(),
1415                    ));
1416                    registry_dirty = true;
1417                }
1418            }
1419
1420            {
1421                let mut reverse = self.reverse_refs.write();
1422                let refs = reverse.entry(cross_ref.target).or_default();
1423                let inserted = !refs.iter().any(|(id, kind, coll)| {
1424                    *id == cross_ref.source && *kind == cross_ref.ref_type && coll == collection
1425                });
1426                if inserted {
1427                    refs.push((cross_ref.source, cross_ref.ref_type, collection.to_string()));
1428                    registry_dirty = true;
1429                }
1430            }
1431        }
1432
1433        if registry_dirty {
1434            self.mark_paged_registry_dirty();
1435        }
1436
1437        Ok(())
1438    }
1439
1440    /// Remove cross-references for an entity
1441    pub(crate) fn unindex_cross_refs(&self, id: EntityId) -> Result<(), StoreError> {
1442        // Remove forward refs
1443        self.cross_refs.write().remove(&id);
1444
1445        // Remove from reverse refs (scan all)
1446        let mut reverse = self.reverse_refs.write();
1447        for refs in reverse.values_mut() {
1448            refs.retain(|(source, _, _)| *source != id);
1449        }
1450        reverse.remove(&id);
1451        self.mark_paged_registry_dirty();
1452
1453        Ok(())
1454    }
1455
1456    pub(crate) fn unindex_cross_refs_batch(&self, ids: &[EntityId]) -> Result<(), StoreError> {
1457        if ids.is_empty() {
1458            return Ok(());
1459        }
1460
1461        let id_set: std::collections::HashSet<EntityId> = ids.iter().copied().collect();
1462
1463        // Read-only probe: under steady-state delete workloads most rows have
1464        // no cross-refs at all, so both maps are entirely cold for the batch.
1465        // Acquiring the write lock to scan `reverse_refs.values_mut()` over an
1466        // unrelated dictionary was the dominant cost in #85's `delete_sequential`
1467        // profile. The read-lock probe below is O(|ids|) hashmap lookups against
1468        // each map, vs. O(|reverse_refs|) for the write-path scan it replaces.
1469        //
1470        // Decision: skip the write path iff
1471        //   - no deleted id is a key in `cross_refs`  (no outbound refs to drop,
1472        //     and so no `(deleted_id, _, _)` source entry exists anywhere in
1473        //     `reverse_refs.values()` either — the two maps are kept in sync by
1474        //     `add_cross_ref`), AND
1475        //   - no deleted id is a key in `reverse_refs` (no inbound refs to drop).
1476        let needs_forward_cleanup = {
1477            let forward = self.cross_refs.read();
1478            id_set.iter().any(|id| forward.contains_key(id))
1479        };
1480        let needs_reverse_cleanup = {
1481            let reverse = self.reverse_refs.read();
1482            id_set.iter().any(|id| reverse.contains_key(id))
1483        };
1484
1485        if !needs_forward_cleanup && !needs_reverse_cleanup {
1486            self.unindex_cross_refs_fast_path
1487                .fetch_add(1, Ordering::Relaxed);
1488            return Ok(());
1489        }
1490
1491        if needs_forward_cleanup {
1492            let mut forward = self.cross_refs.write();
1493            for id in &id_set {
1494                forward.remove(id);
1495            }
1496        }
1497
1498        if needs_reverse_cleanup || needs_forward_cleanup {
1499            // The inner `values_mut()` scan is only needed when at least one
1500            // deleted id was a *source* somewhere — which, by the invariant
1501            // above, can only happen when it had outbound forward refs.
1502            let mut reverse = self.reverse_refs.write();
1503            if needs_forward_cleanup {
1504                for refs in reverse.values_mut() {
1505                    refs.retain(|(source, _, _)| !id_set.contains(source));
1506                }
1507            }
1508            reverse.retain(|target, refs| !id_set.contains(target) && !refs.is_empty());
1509        }
1510        self.mark_paged_registry_dirty();
1511
1512        Ok(())
1513    }
1514
    /// Test/observability hook: how many times `unindex_cross_refs_batch`
    /// returned early via its read-only fast path, i.e. when none of the ids
    /// was a key of either cross-ref map. Used to pin that early exit in tests
    /// and as a cheap signal in delete-heavy benchmarks. The counter is loaded
    /// with `Relaxed` ordering, so treat it as a statistic rather than a
    /// synchronization point.
    pub fn unindex_cross_refs_fast_path_hits(&self) -> u64 {
        self.unindex_cross_refs_fast_path.load(Ordering::Relaxed)
    }
1521
1522    /// Query across all collections with a filter
1523    pub fn query_all<F>(&self, filter: F) -> Vec<(String, UnifiedEntity)>
1524    where
1525        F: Fn(&UnifiedEntity) -> bool + Clone + Send + Sync,
1526    {
1527        // Issue #815 — snapshot the (name, manager) handles under a brief
1528        // read lock, then release the `collections` lock *before* the
1529        // potentially long per-collection scan. Holding `collections.read()`
1530        // across a full-store scan let a large `/catalog` snapshot block (and
1531        // be blocked by) the replica apply loop's collection-creation writes:
1532        // under parking_lot a writer waiting behind the long read parks every
1533        // subsequent reader, so `/catalog` and readiness wedged during a big
1534        // WAL re-apply. The managers are `Arc`-shared with their own internal
1535        // locks, so scanning them off the map lock is safe and keeps the HTTP
1536        // surface responsive.
1537        let pairs: Vec<(String, Arc<SegmentManager>)> = {
1538            let collections = self.collections.read();
1539            collections
1540                .iter()
1541                .map(|(name, mgr)| (name.clone(), Arc::clone(mgr)))
1542                .collect()
1543        };
1544
1545        let use_parallel = pairs.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1546        if !use_parallel {
1547            // Single collection — no parallelism overhead
1548            return pairs
1549                .into_iter()
1550                .flat_map(|(name, mgr)| {
1551                    mgr.query_all(filter.clone())
1552                        .into_iter()
1553                        .map(move |e| (name.clone(), e))
1554                })
1555                .collect();
1556        }
1557
1558        // Multiple collections — scan in parallel
1559        let filter_ref = &filter;
1560        let collection_results: Vec<Vec<(String, UnifiedEntity)>> = std::thread::scope(|s| {
1561            pairs
1562                .iter()
1563                .map(|(name, manager)| {
1564                    let name = (*name).clone();
1565                    s.spawn(move || {
1566                        manager
1567                            .query_all(|e| filter_ref(e))
1568                            .into_iter()
1569                            .map(|e| (name.clone(), e))
1570                            .collect::<Vec<_>>()
1571                    })
1572                })
1573                .collect::<Vec<_>>()
1574                .into_iter()
1575                .map(|h| h.join().unwrap_or_default())
1576                .collect()
1577        });
1578
1579        collection_results.into_iter().flatten().collect()
1580    }
1581
1582    /// Filter by metadata across all collections
1583    pub fn filter_metadata_all(
1584        &self,
1585        filters: &[(String, MetadataFilter)],
1586    ) -> Vec<(String, EntityId)> {
1587        let mut results = Vec::new();
1588        let collections = self.collections.read();
1589
1590        for (name, manager) in collections.iter() {
1591            for id in manager.filter_metadata(filters) {
1592                results.push((name.clone(), id));
1593            }
1594        }
1595
1596        results
1597    }
1598
1599    /// Get statistics
1600    pub fn stats(&self) -> StoreStats {
1601        // Issue #815 — snapshot the manager handles, drop the `collections`
1602        // read lock, then gather per-collection stats off the map lock. The
1603        // readiness probe (via `health()`) walks `stats()`, so holding the
1604        // store lock across a full-store stats scan was part of the same
1605        // wedge that froze `/catalog` during a large replica re-apply.
1606        let pairs: Vec<(String, Arc<SegmentManager>)> = {
1607            let collections = self.collections.read();
1608            collections
1609                .iter()
1610                .map(|(name, mgr)| (name.clone(), Arc::clone(mgr)))
1611                .collect()
1612        };
1613
1614        let mut stats = StoreStats {
1615            collection_count: pairs.len(),
1616            ..Default::default()
1617        };
1618
1619        for (name, manager) in &pairs {
1620            let manager_stats = manager.stats();
1621            stats.total_entities += manager_stats.total_entities;
1622            stats.total_memory_bytes += manager_stats.total_memory_bytes;
1623            stats.collections.insert(name.clone(), manager_stats);
1624        }
1625
1626        stats
1627    }
1628
1629    /// Run maintenance on all collections
1630    pub fn run_maintenance(&self) -> Result<(), StoreError> {
1631        let collections = self.collections.read();
1632        for manager in collections.values() {
1633            manager.run_maintenance()?;
1634        }
1635        Ok(())
1636    }
1637}
1638
1639/// Flatten a JSON value into dot-notation key-value pairs for red_config.
1640fn flatten_config_json(
1641    prefix: &str,
1642    value: &crate::serde_json::Value,
1643    out: &mut Vec<(String, crate::storage::schema::Value)>,
1644) {
1645    use crate::storage::schema::Value;
1646    match value {
1647        crate::serde_json::Value::Object(map) => {
1648            for (k, v) in map {
1649                let key = if prefix.is_empty() {
1650                    k.clone()
1651                } else {
1652                    format!("{prefix}.{k}")
1653                };
1654                flatten_config_json(&key, v, out);
1655            }
1656        }
1657        crate::serde_json::Value::String(s) => {
1658            out.push((prefix.to_string(), Value::text(s.clone())));
1659        }
1660        crate::serde_json::Value::Number(n) => {
1661            if n.fract().abs() < f64::EPSILON {
1662                out.push((prefix.to_string(), Value::UnsignedInteger(*n as u64)));
1663            } else {
1664                out.push((prefix.to_string(), Value::Float(*n)));
1665            }
1666        }
1667        crate::serde_json::Value::Bool(b) => {
1668            out.push((prefix.to_string(), Value::Boolean(*b)));
1669        }
1670        crate::serde_json::Value::Null => {
1671            out.push((prefix.to_string(), Value::Null));
1672        }
1673        crate::serde_json::Value::Array(arr) => {
1674            let json_str = crate::serde_json::to_string(value).unwrap_or_default();
1675            out.push((prefix.to_string(), Value::text(json_str)));
1676        }
1677    }
1678}