
reddb_server/storage/unified/store/impl_entities.rs

1use super::*;
2
3impl UnifiedStore {
4    pub(crate) fn persist_entities_to_pager(
5        &self,
6        collection: &str,
7        entities: &[UnifiedEntity],
8    ) -> Result<(), StoreError> {
9        self.persist_entities_impl(collection, entities, /* skip_btree= */ false)
10    }
11
12    /// Same as `persist_entities_to_pager` but takes `&[&UnifiedEntity]` so
13    /// callers that already hold references (SQL UPDATE fast path) don't
14    /// have to clone a `Vec<UnifiedEntity>` just to satisfy the signature.
15    pub(crate) fn persist_entity_refs_to_pager(
16        &self,
17        collection: &str,
18        entities: &[&UnifiedEntity],
19    ) -> Result<(), StoreError> {
20        self.persist_entity_refs_impl(collection, entities, /* skip_btree= */ false)
21    }
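
    // A minimal caller sketch (hypothetical call site — assumes a
    // `store: &UnifiedStore` with an active pager and a batch of rows the
    // caller already owns):
    //
    //     let refs: Vec<&UnifiedEntity> = updated_rows.iter().collect();
    //     store.persist_entity_refs_to_pager("users", &refs)?;
    //
    // The ref-taking variant exists precisely so this caller never clones
    // `updated_rows` into a second `Vec<UnifiedEntity>`.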
22
23    /// Reference-taking WAL-only variant — see
24    /// `persist_entities_to_pager_wal_only` for semantics.
25    pub(crate) fn persist_entity_refs_to_pager_wal_only(
26        &self,
27        collection: &str,
28        entities: &[&UnifiedEntity],
29    ) -> Result<(), StoreError> {
30        self.persist_entity_refs_impl(collection, entities, /* skip_btree= */ true)
31    }
32
33    fn persist_entity_refs_impl(
34        &self,
35        collection: &str,
36        entities: &[&UnifiedEntity],
37        skip_btree: bool,
38    ) -> Result<(), StoreError> {
39        if entities.is_empty() {
40            return Ok(());
41        }
42        let Some(pager) = &self.pager else {
43            return Ok(());
44        };
45        let fv = self.format_version();
46        let manager = self
47            .get_collection(collection)
48            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
49        let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
50            .iter()
51            .map(|entity| {
52                let metadata = manager.get_metadata(entity.id);
53                (
54                    entity.id.raw().to_be_bytes().to_vec(),
55                    Self::serialize_entity_record(entity, metadata.as_ref(), fv),
56                )
57            })
58            .collect();
59        serialized.sort_by(|a, b| a.0.cmp(&b.0));
60
61        if !skip_btree {
62            let mut btree_indices = self.btree_indices.write();
63            let btree = btree_indices
64                .entry(collection.to_string())
65                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
66            let root_before = btree.root_page_id();
67            btree.upsert_batch_sorted(&serialized).map_err(|e| {
68                StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
69            })?;
70            let root_after = btree.root_page_id();
71            drop(btree_indices);
72            if root_before != root_after {
73                self.mark_paged_registry_dirty();
74            }
75        }
76        let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
77        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
78            collection: collection.to_string(),
79            records,
80        }])?;
81        Ok(())
82    }
83
84    /// PG-HOT-like UPDATE persist: emit the WAL record so the update
85    /// survives a crash, but skip the in-line B-tree upsert. Reads
86    /// continue to see the new entity because `manager.get()` prefers
87    /// the segment over the B-tree; WAL replay re-applies the upsert
88    /// to the B-tree on recovery so the two views converge before
89    /// any stale B-tree page can be surfaced.
90    ///
91    /// Safe to use only when the caller has already updated the
92    /// segment in-place (so live reads see the new value). Cuts out
93    /// the dominant cost of single-row UPDATEs: per-entity serialize
94    /// + leaf descent + page decompress / compress / write_page.
95    pub(crate) fn persist_entities_to_pager_wal_only(
96        &self,
97        collection: &str,
98        entities: &[UnifiedEntity],
99    ) -> Result<(), StoreError> {
100        self.persist_entities_impl(collection, entities, /* skip_btree= */ true)
101    }
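
    // Intended call sequence (a sketch, not a real call site — assumes the
    // caller owns the in-place segment update, as the doc comment requires):
    //
    //     manager.update(new_row.clone())?;             // 1. segment first,
    //     store.persist_entities_to_pager_wal_only(     //    so live reads
    //         "users",                                  //    see the new row
    //         std::slice::from_ref(&new_row),           // 2. then WAL-only
    //     )?;                                           //    durability
    //
    // Skipping step 1 would leave live reads returning the old value even
    // though the WAL already records the new one (until recovery replay).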
102
103    fn persist_entities_impl(
104        &self,
105        collection: &str,
106        entities: &[UnifiedEntity],
107        skip_btree: bool,
108    ) -> Result<(), StoreError> {
109        if entities.is_empty() {
110            return Ok(());
111        }
112
113        let Some(pager) = &self.pager else {
114            return Ok(());
115        };
116
117        let fv = self.format_version();
118        let manager = self
119            .get_collection(collection)
120            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
121        let mut serialized: Vec<(Vec<u8>, Vec<u8>)> = entities
122            .iter()
123            .map(|entity| {
124                let metadata = manager.get_metadata(entity.id);
125                (
126                    entity.id.raw().to_be_bytes().to_vec(),
127                    Self::serialize_entity_record(entity, metadata.as_ref(), fv),
128                )
129            })
130            .collect();
131        // u64 IDs encoded as big-endian — lex order = numeric order.
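        // For example, 5u64 → [0,0,0,0,0,0,0,5] and 300u64 →
        // [0,0,0,0,0,0,1,44]: byte-wise comparison gives 5 < 300, matching
        // numeric order, whereas little-endian would compare 1 → [1,0,…]
        // against 256 → [0,1,…] and wrongly sort 256 first.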
132        serialized.sort_by(|a, b| a.0.cmp(&b.0));
133
134        if !skip_btree {
135            let mut btree_indices = self.btree_indices.write();
136            let btree = btree_indices
137                .entry(collection.to_string())
138                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
139            let root_before = btree.root_page_id();
140
141            // Walks each distinct leaf once, applies all in-place overwrites
142            // that belong there under one read+write. Keys that miss or grow
143            // fall back to the per-key `upsert` path internally.
144            btree.upsert_batch_sorted(&serialized).map_err(|e| {
145                StoreError::Io(std::io::Error::other(format!("B-tree upsert error: {}", e)))
146            })?;
147            let root_after = btree.root_page_id();
148            drop(btree_indices);
149            if root_before != root_after {
150                self.mark_paged_registry_dirty();
151            }
152        }
153        // One WAL action covering the whole bulk. Previously each entity
154        // produced its own `UpsertEntityRecord` action, which then became
155        // its own `WalRecord::PageWrite` triple inside
156        // `StoreCommitCoordinator::append_actions` — N WAL records per
157        // bulk. The batched variant is one WAL record per bulk.
158        //
159        // WAL is emitted even in the `skip_btree` variant — recovery
160        // replay reapplies the upsert to the B-tree, so durability
161        // is unchanged.
162        let records: Vec<Vec<u8>> = serialized.into_iter().map(|(_id, record)| record).collect();
163        self.finish_paged_write([StoreWalAction::BulkUpsertEntityRecords {
164            collection: collection.to_string(),
165            records,
166        }])?;
167
168        Ok(())
169    }
170
171    /// Insert a label→entity_id mapping into the graph label index.
172    pub(crate) fn update_graph_label_index(
173        &self,
174        collection: &str,
175        label: &str,
176        entity_id: EntityId,
177    ) {
178        let key = (collection.to_string(), label.to_string());
179        let mut idx = self.graph_label_index.write();
180        idx.entry(key).or_default().push(entity_id);
181    }
182
183    /// Remove a specific entity_id from the graph label index (called on delete).
184    pub(crate) fn remove_from_graph_label_index(&self, collection: &str, entity_id: EntityId) {
185        let mut idx = self.graph_label_index.write();
186        for ((col, _), ids) in idx.iter_mut() {
187            if col == collection {
188                ids.retain(|&id| id != entity_id);
189            }
190        }
191        // Prune empty entries to keep the index compact
192        idx.retain(|_, ids| !ids.is_empty());
193    }
194
195    pub(crate) fn remove_from_graph_label_index_batch(
196        &self,
197        collection: &str,
198        entity_ids: &[EntityId],
199    ) {
200        if entity_ids.is_empty() {
201            return;
202        }
203        let id_set: std::collections::HashSet<EntityId> = entity_ids.iter().copied().collect();
204        let mut idx = self.graph_label_index.write();
205        for ((col, _), ids) in idx.iter_mut() {
206            if col == collection {
207                ids.retain(|id| !id_set.contains(id));
208            }
209        }
210        idx.retain(|_, ids| !ids.is_empty());
211    }
212
213    /// Look up entity IDs for a graph node label across all collections.
214    pub fn lookup_graph_nodes_by_label(&self, label: &str) -> Vec<EntityId> {
215        let idx = self.graph_label_index.read();
216        idx.iter()
217            .filter(|((_, l), _)| l == label)
218            .flat_map(|(_, ids)| ids.iter().copied())
219            .collect()
220    }
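
    // Lifecycle sketch for the label index (hypothetical ids and labels):
    //
    //     store.update_graph_label_index("graph", "Person", EntityId::new(7));
    //     assert_eq!(store.lookup_graph_nodes_by_label("Person"),
    //                vec![EntityId::new(7)]);
    //     store.remove_from_graph_label_index("graph", EntityId::new(7));
    //     assert!(store.lookup_graph_nodes_by_label("Person").is_empty());
    //
    // Lookup is label-keyed across *all* collections; removal is scoped to
    // a single collection.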
221
222    pub fn create_collection(&self, name: impl Into<String>) -> Result<(), StoreError> {
223        let name = name.into();
224        let mut collections = self.collections.write();
225
226        if collections.contains_key(&name) {
227            return Err(StoreError::CollectionExists(name));
228        }
229
230        let manager = SegmentManager::with_config(&name, self.config.manager_config.clone());
231        collections.insert(name.clone(), Arc::new(manager));
232        drop(collections);
233        self.mark_paged_registry_dirty();
234        self.finish_paged_write([StoreWalAction::CreateCollection { name }])?;
235
236        Ok(())
237    }
238
239    /// Get or create a collection
240    pub fn get_or_create_collection(&self, name: impl Into<String>) -> Arc<SegmentManager> {
241        let name = name.into();
242        // Fast path: shared read lock — zero contention for existing collections
243        {
244            let collections = self.collections.read();
245            if let Some(manager) = collections.get(&name) {
246                return Arc::clone(manager);
247            }
248        }
249        // Slow path: exclusive write lock — only when collection is missing
250        let mut collections = self.collections.write();
251        // Double-check after acquiring write lock (another thread may have created it)
252        if let Some(manager) = collections.get(&name) {
253            return Arc::clone(manager);
254        }
255        let manager = Arc::new(SegmentManager::with_config(
256            &name,
257            self.config.manager_config.clone(),
258        ));
259        collections.insert(name, Arc::clone(&manager));
260        self.mark_paged_registry_dirty();
261        manager
262    }
263
264    /// Get a collection
265    pub fn get_collection(&self, name: &str) -> Option<Arc<SegmentManager>> {
266        self.collections.read().get(name).map(Arc::clone)
267    }
268
269    /// Get the context index for cross-structure search.
270    pub fn context_index(&self) -> &ContextIndex {
271        &self.context_index
272    }
273
274    /// Set multiple config KV pairs at once from a JSON tree.
275    /// Keys are flattened with dot-notation: `{"a":{"b":1}}` → `a.b = 1`.
276    pub fn set_config_tree(&self, prefix: &str, json: &crate::serde_json::Value) -> usize {
277        let _ = self.get_or_create_collection("red_config");
278        let mut pairs = Vec::new();
279        flatten_config_json(prefix, json, &mut pairs);
280        let mut saved = 0;
281        for (key, value) in pairs {
282            let entity = UnifiedEntity::new(
283                EntityId::new(0),
284                EntityKind::TableRow {
285                    table: Arc::from("red_config"),
286                    row_id: 0,
287                },
288                EntityData::Row(RowData {
289                    columns: Vec::new(),
290                    named: Some(
291                        [
292                            ("key".to_string(), crate::storage::schema::Value::text(key)),
293                            ("value".to_string(), value),
294                        ]
295                        .into_iter()
296                        .collect(),
297                    ),
298                    schema: None,
299                }),
300            );
301            if self.insert_auto("red_config", entity).is_ok() {
302                saved += 1;
303            }
304        }
305        saved
306    }
307
308    /// Read a single config value from `red_config` by dot-notation key.
309    pub fn get_config(&self, key: &str) -> Option<crate::storage::schema::Value> {
310        let manager = self.get_collection("red_config")?;
311        for entity in manager.query_all(|_| true) {
312            if let EntityData::Row(row) = &entity.data {
313                if let Some(named) = &row.named {
314                    let key_matches = named
315                        .get("key")
316                        .and_then(|v| match v {
317                            crate::storage::schema::Value::Text(s) => Some(s.as_ref() == key),
318                            _ => None,
319                        })
320                        .unwrap_or(false);
321                    if key_matches {
322                        return named.get("value").cloned();
323                    }
324                }
325            }
326        }
327        None
328    }
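
    // Round-trip sketch (hypothetical tree; assumes the `json!` macro is
    // reachable through the `crate::serde_json` re-export):
    //
    //     let json = crate::serde_json::json!({"wal": {"sync": true}});
    //     let saved = store.set_config_tree("", &json);  // writes `wal.sync`
    //     assert_eq!(saved, 1);
    //     let v = store.get_config("wal.sync");          // Some(Boolean(true))
    //
    // `get_config` is a linear scan over `red_config`, so keep it to
    // startup-time reads rather than hot paths.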
329
330    /// List all collections
331    pub fn list_collections(&self) -> Vec<String> {
332        self.collections.read().keys().cloned().collect()
333    }
334
335    /// Drop a collection
336    pub fn drop_collection(&self, name: &str) -> Result<(), StoreError> {
337        let manager = {
338            let mut collections = self.collections.write();
339
340            collections
341                .remove(name)
342                .ok_or_else(|| StoreError::CollectionNotFound(name.to_string()))?
343        };
344
345        let entities = manager.query_all(|_| true);
346        let entity_ids: Vec<EntityId> = entities.iter().map(|entity| entity.id).collect();
347
348        for entity_id in &entity_ids {
349            self.context_index.remove_entity(*entity_id);
350            let _ = self.unindex_cross_refs(*entity_id);
351        }
352
353        self.btree_indices.write().remove(name);
354
355        self.entity_cache.retain(|entity_id, (collection, _)| {
356            collection != name && !entity_ids.iter().any(|id| id.raw() == entity_id)
357        });
358
359        self.cross_refs.write().retain(|source_id, refs| {
360            refs.retain(|(target_id, _, target_collection)| {
361                target_collection != name && !entity_ids.iter().any(|id| id == target_id)
362            });
363            !entity_ids.iter().any(|id| id == source_id)
364        });
365
366        self.reverse_refs.write().retain(|target_id, refs| {
367            refs.retain(|(source_id, _, source_collection)| {
368                source_collection != name && !entity_ids.iter().any(|id| id == source_id)
369            });
370            !entity_ids.iter().any(|id| id == target_id)
371        });
372
373        self.mark_paged_registry_dirty();
374        self.finish_paged_write([StoreWalAction::DropCollection {
375            name: name.to_string(),
376        }])?;
377
378        Ok(())
379    }
380
381    /// Insert an entity into a collection
382    pub fn insert(&self, collection: &str, entity: UnifiedEntity) -> Result<EntityId, StoreError> {
383        let manager = self
384            .get_collection(collection)
385            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
386
387        let mut entity = entity;
388        if entity.id.raw() == 0 {
389            entity.id = self.next_entity_id();
390        } else {
391            self.register_entity_id(entity.id);
392        }
393        // Assign per-table sequential row_id if not set
394        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
395            if *row_id == 0 {
396                *row_id = manager.next_row_id();
397            } else {
398                manager.register_row_id(*row_id);
399            }
400        }
401        // Capture graph node label before entity is moved into the manager
402        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
403        {
404            Some(node.label.clone())
405        } else {
406            None
407        };
408
409        let id = manager.insert(entity)?;
410        self.register_entity_id(id);
411
412        // Update graph label index for GraphNode entities
413        if let Some(ref label) = graph_node_label {
414            self.update_graph_label_index(collection, label, id);
415        }
416
417        // Also insert into B-tree index if pager is active
418        let mut registry_dirty = false;
419        if let Some(pager) = &self.pager {
420            if let Some(entity) = manager.get(id) {
421                let mut btree_indices = self.btree_indices.write();
422                let btree = btree_indices
423                    .entry(collection.to_string())
424                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
425                let root_before = btree.root_page_id();
426
427                let key = id.raw().to_be_bytes();
428                let metadata = manager.get_metadata(id);
429                let value = Self::serialize_entity_record(
430                    &entity,
431                    metadata.as_ref(),
432                    self.format_version(),
433                );
434                // Ignore duplicate key errors (update scenario)
435                let _ = btree.insert(&key, &value);
436                registry_dirty = root_before != btree.root_page_id();
437            }
438        }
439
440        // Index cross-references if enabled
441        if self.config.auto_index_refs {
442            if let Some(entity) = manager.get(id) {
443                self.index_cross_refs(&entity, collection)?;
444            }
445        }
446
447        // Perf: skip WAL-action construction when the store is
448        // pagerless. For in-memory benchmarks this saved another
449        // `manager.get(id)` + `serialize_entity_record` per call.
450        if self.pager.is_some() {
451            let actions = manager
452                .get(id)
453                .map(|entity| {
454                    let metadata = manager.get_metadata(id);
455                    vec![StoreWalAction::upsert_entity(
456                        collection,
457                        &entity,
458                        metadata.as_ref(),
459                        self.format_version(),
460                    )]
461                })
462                .unwrap_or_default();
463            if registry_dirty {
464                self.mark_paged_registry_dirty();
465            }
466            self.finish_paged_write(actions)?;
467        }
468
469        Ok(id)
470    }
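
    // Caller sketch (hypothetical row; id 0 and row_id 0 request
    // auto-assignment):
    //
    //     let row = UnifiedEntity::new(
    //         EntityId::new(0),
    //         EntityKind::TableRow { table: Arc::from("users"), row_id: 0 },
    //         EntityData::Row(RowData {
    //             columns: Vec::new(),
    //             named: None,
    //             schema: None,
    //         }),
    //     );
    //     let id = store.insert("users", row)?; // fresh EntityId + row_id
    //
    // Unlike `insert_auto`, this returns `CollectionNotFound` if "users"
    // was never created.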
471
472    /// Turbo bulk insert — optimized fast path.
473    ///
474    /// Single lock for the entire batch. Skips bloom filter, memtable,
475    /// context index, and cross-ref indexing. B-tree writes are batched.
476    pub fn bulk_insert(
477        &self,
478        collection: &str,
479        mut entities: Vec<UnifiedEntity>,
480    ) -> Result<Vec<EntityId>, StoreError> {
481        // REDDB_BULK_TIMING=1 prints a per-call breakdown of the bulk
482        // insert path to stderr. Off by default — used by the reddb
483        // benchmark harness to locate ingest bottlenecks.
484        let trace = matches!(
485            std::env::var("REDDB_BULK_TIMING").ok().as_deref(),
486            Some("1") | Some("true") | Some("on")
487        );
488        let t_start = std::time::Instant::now();
489        let n = entities.len();
490        let manager = self.get_or_create_collection(collection);
491        let t_get_coll = t_start.elapsed();
492
493        // Assign IDs and per-table row_ids before serialization. Bulk
494        // insert must follow the same global ID semantics as
495        // insert()/insert_auto(). Reserve ID ranges in single fetch_add
496        // calls instead of one-per-entity. The overwhelmingly common case
497        // is "every entity comes in with id==0 and kind==TableRow with
498        // row_id==0" (the wire bulk insert path). Any entity that
499        // already carries an id or row_id falls back to the individual
500        // register_* path to preserve those semantics exactly.
501        let t0 = std::time::Instant::now();
502        let n_missing_entity_ids = entities.iter().filter(|e| e.id.raw() == 0).count() as u64;
503        let n_missing_row_ids = entities
504            .iter()
505            .filter(|e| matches!(e.kind, EntityKind::TableRow { row_id: 0, .. }))
506            .count() as u64;
507        let mut entity_id_range = if n_missing_entity_ids > 0 {
508            self.reserve_entity_ids(n_missing_entity_ids)
509        } else {
510            0..0
511        };
512        let mut row_id_range = if n_missing_row_ids > 0 {
513            manager.reserve_row_ids(n_missing_row_ids)
514        } else {
515            0..0
516        };
517        for entity in &mut entities {
518            if entity.id.raw() == 0 {
519                let next = entity_id_range
520                    .next()
521                    .expect("reserved entity-id range exhausted");
522                entity.id = EntityId::new(next);
523            } else {
524                self.register_entity_id(entity.id);
525            }
526            if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
527                if *row_id == 0 {
528                    *row_id = row_id_range
529                        .next()
530                        .expect("reserved row-id range exhausted");
531                } else {
532                    manager.register_row_id(*row_id);
533                }
534            }
535        }
536        let t_assign_ids = t0.elapsed();
537
538        // Capture graph node labels before entities are moved into the segment manager
539        let graph_labels: Vec<Option<(String, EntityId)>> = entities
540            .iter()
541            .map(|e| {
542                if let EntityKind::GraphNode(ref node) = e.kind {
543                    Some((node.label.clone(), e.id))
544                } else {
545                    None
546                }
547            })
548            .collect();
549
550        // Pre-serialize for B-tree while we still have references.
551        // `serialize_entity_record` is pure; with `n >= 1024` the
552        // rayon dispatch cost is amortised and the 16-core serialize
553        // fan-out becomes the biggest single win on insert_bulk. For
554        // smaller batches, stay serial — micro-batches pay more in
555        // work-stealing overhead than they save.
556        let t0 = std::time::Instant::now();
557        let serialized: Option<Vec<(Vec<u8>, Vec<u8>)>> = if self.pager.is_some() {
558            let fv = self.format_version();
559            let serial_map = |e: &UnifiedEntity| {
560                (
561                    e.id.raw().to_be_bytes().to_vec(),
562                    Self::serialize_entity_record(e, None, fv),
563                )
564            };
565            // Gate chosen to match the bench's `BULK_BATCH_SIZE=1000`.
566            // Rayon's dispatch overhead is ~30µs/batch — with 15-col
567            // rows the serial serialize work reaches ~40µs around
568            // 200-300 rows on a 16-core host, which is the break-even.
569            // Keep a safety margin at 512.
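            // Break-even arithmetic behind that gate (illustrative, using
            // the figures above): serial cost ≈ N·c, parallel ≈ 30µs +
            // N·c/16; they cross where N·c·(15/16) ≈ 30µs, i.e. once a
            // batch carries ≈32µs of serialize work rayon starts winning.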
570            if entities.len() >= 512 {
571                use rayon::prelude::*;
572                Some(entities.par_iter().map(serial_map).collect())
573            } else {
574                Some(entities.iter().map(serial_map).collect())
575            }
576        } else {
577            None
578        };
579        let t_serialize = t0.elapsed();
580
581        // Move entities into segment
582        let t0 = std::time::Instant::now();
583        let ids = manager.bulk_insert(entities)?;
584        let t_manager = t0.elapsed();
585        for id in &ids {
586            self.register_entity_id(*id);
587        }
588
589        // Update graph label index for bulk-inserted GraphNode entities
590        for (label, entity_id) in graph_labels.iter().flatten() {
591            self.update_graph_label_index(collection, label, *entity_id);
592        }
593
594        // REDDB_BULK_SKIP_PERSIST_UNSAFE=1 skips the persistent B-tree index
595        // during bulk ingest.
596        //
597        // UNSAFE: for ephemeral benchmark containers ONLY.
598        // The flag is only honoured when `self.pager` is None (in-memory /
599        // ephemeral); when a durable pager is active it is ignored with a
600        // one-shot warning (see below). In persistent mode, bulk inserts
601        // ALWAYS write to the B-tree so the data survives a cold restart
602        // without any manual rebuild step.
603        let skip_btree_requested = matches!(
604            std::env::var("REDDB_BULK_SKIP_PERSIST_UNSAFE")
605                .ok()
606                .as_deref(),
607            Some("1") | Some("true") | Some("on")
608        );
609        // Honour the flag only when there is no durable pager.
610        // If a pager exists we are in persistent mode → always persist.
611        let skip_btree = skip_btree_requested && self.pager.is_none();
612        if skip_btree_requested && !skip_btree {
613            // Flag was set but we ignored it because we have a real pager.
614            static IGNORED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
615            IGNORED.get_or_init(|| {
616                tracing::warn!(
617                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set but durable pager is \
618                     active — flag ignored; bulk inserts will be persisted normally"
619                );
620            });
621        } else if skip_btree {
622            // Ephemeral mode and flag is active — warn once.
623            static WARNED: std::sync::OnceLock<()> = std::sync::OnceLock::new();
624            WARNED.get_or_init(|| {
625                tracing::warn!(
626                    "REDDB_BULK_SKIP_PERSIST_UNSAFE set (ephemeral/no-pager mode) — \
627                     bulk inserts NOT durable; data will be lost on restart"
628                );
629            });
630        }
631
632        // Batch B-tree write from pre-serialized data.
633        // Uses sorted bulk insert: walks to a leaf once, appends many entries,
634        // writes each leaf exactly once per batch — O(N) instead of O(N²).
635        let mut t_btree_lock = std::time::Duration::ZERO;
636        let mut t_btree_insert = std::time::Duration::ZERO;
637        let mut t_flush = std::time::Duration::ZERO;
638        if !skip_btree {
639            if let (Some(pager), Some(batch)) = (&self.pager, serialized.as_ref()) {
640                let t0 = std::time::Instant::now();
641                let mut btree_indices = self.btree_indices.write();
642                let btree = btree_indices
643                    .entry(collection.to_string())
644                    .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
645                let root_before = btree.root_page_id();
646                t_btree_lock = t0.elapsed();
647
648                let t0 = std::time::Instant::now();
649                let _ = btree.bulk_insert_sorted(batch);
650                t_btree_insert = t0.elapsed();
651                let registry_dirty = root_before != btree.root_page_id();
652
653                let t0 = std::time::Instant::now();
654                if registry_dirty {
655                    self.mark_paged_registry_dirty();
656                }
657                t_flush = t0.elapsed();
658            }
659        }
660
661        // Fold 25k per-entity `UpsertEntityRecord` actions into one
662        // `BulkUpsertEntityRecords` — same on-disk semantics (replay
663        // applies each record by id), one `WalRecord::PageWrite`
664        // instead of N, and `record.clone()` drops out because we
665        // consume `serialized`.
666        let actions = serialized
667            .map(|batch| {
668                let records: Vec<Vec<u8>> =
669                    batch.into_iter().map(|(_key, record)| record).collect();
670                vec![StoreWalAction::BulkUpsertEntityRecords {
671                    collection: collection.to_string(),
672                    records,
673                }]
674            })
675            .unwrap_or_default();
676        self.finish_paged_write(actions)?;
677
678        if trace {
679            tracing::debug!(
680                n,
681                total = ?t_start.elapsed(),
682                get_coll = ?t_get_coll,
683                assign = ?t_assign_ids,
684                serialize = ?t_serialize,
685                manager = ?t_manager,
686                btree_lock = ?t_btree_lock,
687                btree = ?t_btree_insert,
688                flush = ?t_flush,
689                "bulk_insert timing"
690            );
691        }
692
693        Ok(ids)
694    }
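
    // Ingest sketch (hypothetical batch; setting `REDDB_BULK_TIMING=1` in
    // the environment makes the call emit the timing event traced above):
    //
    //     let ids = store.bulk_insert("events", batch)?;
    //     assert_eq!(ids.len(), n);
    //
    // Callers trade away bloom-filter, memtable, context-index and cross-ref
    // maintenance for throughput — suited to ingest, not OLTP writes.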
695
696    /// Insert an entity, creating collection if needed
697    pub fn insert_auto(
698        &self,
699        collection: &str,
700        entity: UnifiedEntity,
701    ) -> Result<EntityId, StoreError> {
702        let manager = self.get_or_create_collection(collection);
703        let mut entity = entity;
704        if entity.id.raw() == 0 {
705            entity.id = self.next_entity_id();
706        } else {
707            self.register_entity_id(entity.id);
708        }
709        // Assign per-table sequential row_id if not set
710        if let EntityKind::TableRow { ref mut row_id, .. } = entity.kind {
711            if *row_id == 0 {
712                *row_id = manager.next_row_id();
713            } else {
714                manager.register_row_id(*row_id);
715            }
716        }
717
718        // Capture graph node label before entity is moved into the manager
719        let graph_node_label: Option<String> = if let EntityKind::GraphNode(ref node) = entity.kind
720        {
721            Some(node.label.clone())
722        } else {
723            None
724        };
725        // Index into context index before consuming the entity
726        self.context_index.index_entity(collection, &entity);
727
728        // Pre-serialize the entity record (B-tree image == WAL action body)
729        // and index cross-refs while we still own the entity. Mirrors the
730        // bulk_insert path (see lines 556-579 above): segment.insert assigns
731        // a fresh `sequence_id`, but the bulk path already accepts a
732        // sequence_id=0 on-disk image (replay calls manager.insert which
733        // re-assigns sequence_id from the segment's allocator), so the
734        // single-row path can do the same. Fresh inserts never carry
735        // metadata — set_metadata is a separate call — so we pass None,
736        // matching how the bulk path serializes records.
737        //
738        // Eliminates the post-insert `manager.get(id)` clone of the just-
739        // inserted UnifiedEntity (HashMap<String, Value> + per-field
740        // Strings). Per `docs/perf/insert_sequential-2026-05-05.md` P2 this
741        // was the dominant clone in the insert_sequential hot path.
742        let id_for_serialize = entity.id;
743        let serialized_record: Option<Vec<u8>> = if self.pager.is_some() {
744            Some(Self::serialize_entity_record(
745                &entity,
746                None,
747                self.format_version(),
748            ))
749        } else {
750            None
751        };
752        if self.config.auto_index_refs {
753            self.index_cross_refs(&entity, collection)?;
754        }
755
756        let id = manager.insert(entity)?;
757        debug_assert_eq!(id, id_for_serialize);
758        // No post-insert `register_entity_id(id)` is needed here: the
759        // allocation path above (`self.next_entity_id()`) has already
760        // advanced the atomic counter, and the caller-supplied-id branch
761        // has already registered the id via the `register_entity_id`
762        // call on line 707.
763
764        // Update graph label index for GraphNode entities
765        if let Some(ref label) = graph_node_label {
766            self.update_graph_label_index(collection, label, id);
767        }
768
769        let mut registry_dirty = false;
770        if let (Some(_pager), Some(record)) = (&self.pager, serialized_record.as_ref()) {
771            if let Some(btree) = self.get_or_create_btree(collection) {
772                let root_before = btree.root_page_id();
773
774                let key = id.raw().to_be_bytes();
775                btree.insert(&key, record).map_err(|e| {
776                    StoreError::Io(std::io::Error::other(format!(
777                        "B-tree insert error while inserting '{collection}'/{id}: {e}"
778                    )))
779                })?;
780                registry_dirty = root_before != btree.root_page_id();
781            }
782        }
783
784        // Perf: pagerless → skip WAL-action construction (saves a
785        // third manager.get + entity serialize per insert). For
786        // in-memory runtimes finish_paged_write is a no-op.
787        if self.pager.is_some() {
788            let actions = serialized_record
789                .map(|record| {
790                    vec![StoreWalAction::UpsertEntityRecord {
791                        collection: collection.to_string(),
792                        record,
793                    }]
794                })
795                .unwrap_or_default();
796            if registry_dirty {
797                self.mark_paged_registry_dirty();
798            }
799            self.finish_paged_write(actions)?;
800        }
801        Ok(id)
802    }
803
804    /// Get an entity from a collection
805    ///
806    /// Prefers the live SegmentManager view so reads after update/delete observe
807    /// the current in-memory state even when the paged B-tree image has not been
808    /// refreshed yet. Falls back to the B-tree image for recovery-oriented reads.
809    pub fn get(&self, collection: &str, id: EntityId) -> Option<UnifiedEntity> {
810        // Prefer the live manager state to avoid stale reads after manager.update().
811        if let Some(entity) = self
812            .get_collection(collection)
813            .and_then(|manager| manager.get(id))
814        {
815            return Some(entity);
816        }
817
818        // Fall back to the paged B-tree image if the manager does not currently hold the row.
819        if self.pager.is_some() {
820            let btree_indices = self.btree_indices.read();
821            if let Some(btree) = btree_indices.get(collection) {
822                let key = id.raw().to_be_bytes();
823                if let Ok(Some(value)) = btree.get(&key) {
824                    if let Ok((entity, _)) =
825                        Self::deserialize_entity_record(&value, self.format_version())
826                    {
827                        return Some(entity);
828                    }
829                }
830            }
831        }
832
833        None
834    }
835
836    /// Batch-fetch multiple entities from the same collection in minimal lock acquisitions.
837    ///
838    /// Preferred over N individual `get()` calls in indexed-scan loops (sorted index,
839    /// bitmap, hash). Reduces lock acquisitions from N×3 to 2-3 total.
840    /// Preserves input order: `result[i]` corresponds to `ids[i]`.
841    pub fn get_batch(&self, collection: &str, ids: &[EntityId]) -> Vec<Option<UnifiedEntity>> {
842        match self.get_collection(collection) {
843            Some(manager) => manager.get_many(ids),
844            None => vec![None; ids.len()],
845        }
846    }
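
    // Indexed-scan sketch (hypothetical `ids` coming from a sorted index):
    //
    //     let rows = store.get_batch("users", &ids);
    //     for (id, row) in ids.iter().zip(rows) {
    //         // rows[i] corresponds to ids[i]; None = not found
    //     }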
847
848    /// Get an entity from any collection
849    pub fn get_any(&self, id: EntityId) -> Option<(String, UnifiedEntity)> {
850        // Sharded LRU probe: hits / misses are counted by `EntityCache::get`,
851        // so external observers can read `entity_cache_hit_rate()` without
852        // any extra wiring on the call site.
853        if let Some(cached) = self.entity_cache.get(id.raw()) {
854            return Some(cached);
855        }
856
857        // Cache miss — fall back to the full collection scan, then memoise.
858        // The guard can't be dropped mid-`for` (the loop borrows it), so
859        // scope the read lock around the scan and release it before any
860        // cache lock — the cache shards are independent, but releasing
861        // first keeps `collections` contention low.
862        let found = {
863            let collections = self.collections.read();
864            collections
865                .iter()
866                .find_map(|(name, manager)| manager.get(id).map(|e| (name.clone(), e)))
867        };
868        let result = found?;
869        self.entity_cache.insert(id.raw(), result.clone());
870        Some(result)
871    }
872
873    /// Hit rate of the store's entity cache (`get_any` lookups).
874    ///
875    /// Returns `None` until the cache has served at least one lookup.
876    /// Exposed for observability — e.g. dashboards distinguishing graph
877    /// workloads (high hit rate) from OLTP DML (≈ 0 % hit rate).
878    pub fn entity_cache_hit_rate(&self) -> Option<f64> {
879        self.entity_cache.hit_rate()
880    }
881
882    /// Snapshot of cache hit / miss / eviction counters and current size.
883    pub fn entity_cache_stats(&self) -> super::super::entity_cache::EntityCacheStats {
884        self.entity_cache.stats()
885    }
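
    // Observability sketch (hypothetical dashboard poll; assumes the stats
    // struct derives `Debug`):
    //
    //     if let Some(rate) = store.entity_cache_hit_rate() {
    //         tracing::info!(rate, stats = ?store.entity_cache_stats(), "entity cache");
    //     }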
886
887    /// Delete an entity
888    pub fn delete(&self, collection: &str, id: EntityId) -> Result<bool, StoreError> {
889        // Invalidate entity cache (read-lock probe; no write lock when cold).
890        self.entity_cache.remove(id.raw());
891        let manager = self
892            .get_collection(collection)
893            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
894
895        let deleted = manager.delete(id)?;
896        if !deleted {
897            return Ok(false);
898        }
899
900        // Remove from B-tree index if active
901        let mut registry_dirty = false;
902        if self.pager.is_some() {
903            let btree_indices = self.btree_indices.read();
904            if let Some(btree) = btree_indices.get(collection) {
905                let root_before = btree.root_page_id();
906                let key = id.raw().to_be_bytes();
907                let _ = btree.delete(&key);
908                registry_dirty = root_before != btree.root_page_id();
909            }
910        }
911
912        // Remove cross-references
913        self.unindex_cross_refs(id)?;
914
915        // Remove from graph label index
916        self.remove_from_graph_label_index(collection, id);
917
918        if registry_dirty {
919            self.mark_paged_registry_dirty();
920        }
921        self.finish_paged_write([StoreWalAction::DeleteEntityRecord {
922            collection: collection.to_string(),
923            entity_id: id.raw(),
924        }])?;
925
926        Ok(true)
927    }
928
929    pub fn delete_batch(
930        &self,
931        collection: &str,
932        ids: &[EntityId],
933    ) -> Result<Vec<EntityId>, StoreError> {
934        if ids.is_empty() {
935            return Ok(Vec::new());
936        }
937
938        // Sharded invalidation. The bounded LRU's shard write lock is
939        // only acquired when the shard actually carries one of these ids,
940        // so the bench-typical zero-hit `delete_sequential` workload never
941        // takes a write lock here at all.
942        self.entity_cache.remove_many(ids.iter().map(|id| id.raw()));
943
944        let manager = self
945            .get_collection(collection)
946            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
947
948        let deleted_ids = manager.delete_batch(ids)?;
949        if deleted_ids.is_empty() {
950            return Ok(deleted_ids);
951        }
952
953        let mut registry_dirty = false;
954        if self.pager.is_some() {
955            let btree_indices = self.btree_indices.read();
956            if let Some(btree) = btree_indices.get(collection) {
957                let root_before = btree.root_page_id();
958                for id in &deleted_ids {
959                    let key = id.raw().to_be_bytes();
960                    let _ = btree.delete(&key);
961                }
962                registry_dirty = root_before != btree.root_page_id();
963            }
964        }
965
966        self.unindex_cross_refs_batch(&deleted_ids)?;
967        self.remove_from_graph_label_index_batch(collection, &deleted_ids);
968        if registry_dirty {
969            self.mark_paged_registry_dirty();
970        }
971        let actions = deleted_ids
972            .iter()
973            .map(|id| StoreWalAction::DeleteEntityRecord {
974                collection: collection.to_string(),
975                entity_id: id.raw(),
976            })
977            .collect::<Vec<_>>();
978        self.finish_paged_write(actions)?;
979
980        Ok(deleted_ids)
981    }
982
983    /// Set metadata for an entity
984    pub fn set_metadata(
985        &self,
986        collection: &str,
987        id: EntityId,
988        metadata: Metadata,
989    ) -> Result<(), StoreError> {
990        let manager = self
991            .get_collection(collection)
992            .ok_or_else(|| StoreError::CollectionNotFound(collection.to_string()))?;
993
994        manager.set_metadata(id, metadata)?;
995        if let Some(entity) = manager.get(id) {
996            self.persist_entities_to_pager(collection, std::slice::from_ref(&entity))?;
997        }
998        Ok(())
999    }
1000
1001    /// Get metadata for an entity
1002    pub fn get_metadata(&self, collection: &str, id: EntityId) -> Option<Metadata> {
1003        self.get_collection(collection)?.get_metadata(id)
1004    }
1005
1006    /// Add a cross-reference between entities
1007    pub fn add_cross_ref(
1008        &self,
1009        source_collection: &str,
1010        source_id: EntityId,
1011        target_collection: &str,
1012        target_id: EntityId,
1013        ref_type: RefType,
1014        weight: f32,
1015    ) -> Result<(), StoreError> {
1016        // Check source exists
1017        let source_manager = self
1018            .get_collection(source_collection)
1019            .ok_or_else(|| StoreError::CollectionNotFound(source_collection.to_string()))?;
1020
1021        if source_manager.get(source_id).is_none() {
1022            return Err(StoreError::EntityNotFound(source_id));
1023        }
1024
1025        // Check target exists
1026        let target_manager = self
1027            .get_collection(target_collection)
1028            .ok_or_else(|| StoreError::CollectionNotFound(target_collection.to_string()))?;
1029
1030        if target_manager.get(target_id).is_none() {
1031            return Err(StoreError::EntityNotFound(target_id));
1032        }
1033
1034        // Check limits
1035        let current_refs = self
1036            .cross_refs
1037            .read()
1038            .get(&source_id)
1039            .map_or(0, |v| v.len());
1040
1041        if current_refs >= self.config.max_cross_refs {
1042            return Err(StoreError::TooManyRefs(source_id));
1043        }
1044
1045        let mut registry_dirty = false;
1046        {
1047            let mut forward = self.cross_refs.write();
1048            let refs = forward.entry(source_id).or_default();
1049            let inserted = !refs.iter().any(|(id, kind, coll)| {
1050                *id == target_id && *kind == ref_type && coll == target_collection
1051            });
1052            if inserted {
1053                refs.push((target_id, ref_type, target_collection.to_string()));
1054                registry_dirty = true;
1055            }
1056        }
1057
1058        {
1059            let mut reverse = self.reverse_refs.write();
1060            let refs = reverse.entry(target_id).or_default();
1061            let inserted = !refs.iter().any(|(id, kind, coll)| {
1062                *id == source_id && *kind == ref_type && coll == source_collection
1063            });
1064            if inserted {
1065                refs.push((source_id, ref_type, source_collection.to_string()));
1066                registry_dirty = true;
1067            }
1068        }
1069
1070        if let Some(mut entity) = source_manager.get(source_id) {
1071            if !entity.cross_refs().iter().any(|xref| {
1072                xref.target == target_id
1073                    && xref.ref_type == ref_type
1074                    && xref.target_collection == target_collection
1075            }) {
1076                let cross_ref = CrossRef::with_weight(
1077                    source_id,
1078                    target_id,
1079                    target_collection,
1080                    ref_type,
1081                    weight,
1082                );
1083                entity.add_cross_ref(cross_ref);
1084                let _ = source_manager.update(entity.clone());
1085                registry_dirty = true;
1086                self.persist_entities_to_pager(source_collection, std::slice::from_ref(&entity))?;
1087            }
1088        }
1089
1090        if registry_dirty {
1091            self.mark_paged_registry_dirty();
1092            if matches!(
1093                self.config.durability_mode,
1094                crate::api::DurabilityMode::Strict
1095            ) {
1096                self.flush_paged_state()?;
1097            }
1098        }
1099
1100        Ok(())
1101    }
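
    // Linking sketch (hypothetical ids and variant name — `RefType::Reference`
    // is assumed for illustration; both rows must already exist):
    //
    //     store.add_cross_ref("users", user_id, "orders", order_id,
    //                         RefType::Reference, 1.0)?;
    //     assert!(store
    //         .get_refs_from(user_id)
    //         .iter()
    //         .any(|(t, _, coll)| *t == order_id && coll == "orders"));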
1102
1103    /// Get cross-references from an entity
1104    pub fn get_refs_from(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1105        self.cross_refs.read().get(&id).cloned().unwrap_or_default()
1106    }
1107
1108    /// Get cross-references to an entity
1109    pub fn get_refs_to(&self, id: EntityId) -> Vec<(EntityId, RefType, String)> {
1110        self.reverse_refs
1111            .read()
1112            .get(&id)
1113            .cloned()
1114            .unwrap_or_default()
1115    }
1116
1117    /// Expand cross-references to get related entities
1118    pub fn expand_refs(
1119        &self,
1120        id: EntityId,
1121        depth: u32,
1122        ref_types: Option<&[RefType]>,
1123    ) -> Vec<(UnifiedEntity, u32, RefType)> {
1124        let mut results = Vec::new();
1125        let mut visited = std::collections::HashSet::new();
1126        visited.insert(id);
1127
1128        self.expand_refs_recursive(id, depth, ref_types, &mut visited, &mut results, 1);
1129
1130        results
1131    }
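
    // Traversal sketch (hypothetical graph; `RefType::Reference` again an
    // assumed variant): everything reachable within two hops over one edge
    // type —
    //
    //     for (entity, depth, _ref_type) in
    //         store.expand_refs(root_id, 2, Some(&[RefType::Reference]))
    //     {
    //         // depth == 1 for direct refs, 2 for refs-of-refs
    //     }
    //
    // The `visited` set guarantees each entity appears at most once, so the
    // expansion terminates even on cyclic reference graphs.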
1132
1133    fn expand_refs_recursive(
1134        &self,
1135        id: EntityId,
1136        max_depth: u32,
1137        ref_types: Option<&[RefType]>,
1138        visited: &mut std::collections::HashSet<EntityId>,
1139        results: &mut Vec<(UnifiedEntity, u32, RefType)>,
1140        current_depth: u32,
1141    ) {
1142        if current_depth > max_depth {
1143            return;
1144        }
1145
1146        for (target_id, ref_type, target_collection) in self.get_refs_from(id) {
1147            if visited.contains(&target_id) {
1148                continue;
1149            }
1150
1151            if let Some(types) = ref_types {
1152                if !types.contains(&ref_type) {
1153                    continue;
1154                }
1155            }
1156
1157            visited.insert(target_id);
1158
1159            if let Some(entity) = self.get(&target_collection, target_id) {
1160                results.push((entity, current_depth, ref_type));
1161
1162                // Recurse
1163                self.expand_refs_recursive(
1164                    target_id,
1165                    max_depth,
1166                    ref_types,
1167                    visited,
1168                    results,
1169                    current_depth + 1,
1170                );
1171            }
1172        }
1173    }
1174
1175    /// Index cross-references from an entity
1176    pub(crate) fn index_cross_refs(
1177        &self,
1178        entity: &UnifiedEntity,
1179        collection: &str,
1180    ) -> Result<(), StoreError> {
1181        let mut registry_dirty = false;
1182        for cross_ref in entity.cross_refs() {
1183            if cross_ref.target_collection.is_empty() {
1184                continue;
1185            }
1186            {
1187                let mut forward = self.cross_refs.write();
1188                let refs = forward.entry(cross_ref.source).or_default();
1189                let inserted = !refs.iter().any(|(id, kind, coll)| {
1190                    *id == cross_ref.target
1191                        && *kind == cross_ref.ref_type
1192                        && coll == &cross_ref.target_collection
1193                });
1194                if inserted {
1195                    refs.push((
1196                        cross_ref.target,
1197                        cross_ref.ref_type,
1198                        cross_ref.target_collection.clone(),
1199                    ));
1200                    registry_dirty = true;
1201                }
1202            }
1203
1204            {
1205                let mut reverse = self.reverse_refs.write();
1206                let refs = reverse.entry(cross_ref.target).or_default();
1207                let inserted = !refs.iter().any(|(id, kind, coll)| {
1208                    *id == cross_ref.source && *kind == cross_ref.ref_type && coll == collection
1209                });
1210                if inserted {
1211                    refs.push((cross_ref.source, cross_ref.ref_type, collection.to_string()));
1212                    registry_dirty = true;
1213                }
1214            }
1215        }
1216
1217        if registry_dirty {
1218            self.mark_paged_registry_dirty();
1219        }
1220
1221        Ok(())
1222    }
1223
1224    /// Remove cross-references for an entity
1225    pub(crate) fn unindex_cross_refs(&self, id: EntityId) -> Result<(), StoreError> {
1226        // Remove forward refs
1227        self.cross_refs.write().remove(&id);
1228
1229        // Remove from reverse refs (scan all)
1230        let mut reverse = self.reverse_refs.write();
1231        for refs in reverse.values_mut() {
1232            refs.retain(|(source, _, _)| *source != id);
1233        }
1234        reverse.remove(&id);
1235        self.mark_paged_registry_dirty();
1236
1237        Ok(())
1238    }
1239
1240    pub(crate) fn unindex_cross_refs_batch(&self, ids: &[EntityId]) -> Result<(), StoreError> {
1241        if ids.is_empty() {
1242            return Ok(());
1243        }
1244
1245        let id_set: std::collections::HashSet<EntityId> = ids.iter().copied().collect();
1246
1247        // Read-only probe: under steady-state delete workloads most rows have
1248        // no cross-refs at all, so both maps are entirely cold for the batch.
1249        // Acquiring the write lock to scan `reverse_refs.values_mut()` over an
1250        // unrelated dictionary was the dominant cost in #85's `delete_sequential`
1251        // profile. The read-lock probe below is O(|ids|) hashmap lookups against
1252        // each map, vs. O(|reverse_refs|) for the write-path scan it replaces.
1253        //
1254        // Decision: skip the write path iff
1255        //   - no deleted id is a key in `cross_refs`  (no outbound refs to drop,
1256        //     and so no `(deleted_id, _, _)` source entry exists anywhere in
1257        //     `reverse_refs.values()` either — the two maps are kept in sync by
1258        //     `add_cross_ref`), AND
1259        //   - no deleted id is a key in `reverse_refs` (no inbound refs to drop).
1260        let needs_forward_cleanup = {
1261            let forward = self.cross_refs.read();
1262            id_set.iter().any(|id| forward.contains_key(id))
1263        };
1264        let needs_reverse_cleanup = {
1265            let reverse = self.reverse_refs.read();
1266            id_set.iter().any(|id| reverse.contains_key(id))
1267        };
1268
1269        if !needs_forward_cleanup && !needs_reverse_cleanup {
1270            self.unindex_cross_refs_fast_path
1271                .fetch_add(1, Ordering::Relaxed);
1272            return Ok(());
1273        }
1274
1275        if needs_forward_cleanup {
1276            let mut forward = self.cross_refs.write();
1277            for id in &id_set {
1278                forward.remove(id);
1279            }
1280        }
1281
1282        if needs_reverse_cleanup || needs_forward_cleanup {
1283            // The inner `values_mut()` scan is only needed when at least one
1284            // deleted id was a *source* somewhere — which, by the invariant
1285            // above, can only happen when it had outbound forward refs.
1286            let mut reverse = self.reverse_refs.write();
1287            if needs_forward_cleanup {
1288                for refs in reverse.values_mut() {
1289                    refs.retain(|(source, _, _)| !id_set.contains(source));
1290                }
1291            }
1292            reverse.retain(|target, refs| !id_set.contains(target) && !refs.is_empty());
1293        }
1294        self.mark_paged_registry_dirty();
1295
1296        Ok(())
1297    }
1298
1299    /// Test/observability hook: number of times `unindex_cross_refs_batch`
1300    /// took the read-only fast path. Used to pin the early-exit in tests
1301    /// and as a cheap signal in delete-heavy benchmarks.
1302    pub fn unindex_cross_refs_fast_path_hits(&self) -> u64 {
1303        self.unindex_cross_refs_fast_path.load(Ordering::Relaxed)
1304    }
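
    // Test sketch pinning the early exit (assumes the deleted rows never
    // participated in any cross-ref):
    //
    //     let before = store.unindex_cross_refs_fast_path_hits();
    //     store.delete_batch("users", &ids)?;
    //     assert_eq!(store.unindex_cross_refs_fast_path_hits(), before + 1);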
1305
1306    /// Query across all collections with a filter
1307    pub fn query_all<F>(&self, filter: F) -> Vec<(String, UnifiedEntity)>
1308    where
1309        F: Fn(&UnifiedEntity) -> bool + Clone + Send + Sync,
1310    {
1311        let collections = self.collections.read();
1312        let pairs: Vec<_> = collections.iter().collect();
1313
1314        let use_parallel = pairs.len() > 1 && crate::runtime::SystemInfo::should_parallelize();
1315        if !use_parallel {
1316            // Single collection (or parallelism not worthwhile) — stay serial
1317            return pairs
1318                .into_iter()
1319                .flat_map(|(name, mgr)| {
1320                    mgr.query_all(filter.clone())
1321                        .into_iter()
1322                        .map(move |e| (name.clone(), e))
1323                })
1324                .collect();
1325        }
1326
1327        // Multiple collections — scan in parallel
1328        let filter_ref = &filter;
1329        let collection_results: Vec<Vec<(String, UnifiedEntity)>> = std::thread::scope(|s| {
1330            pairs
1331                .iter()
1332                .map(|(name, manager)| {
1333                    let name = (*name).clone();
1334                    s.spawn(move || {
1335                        manager
1336                            .query_all(|e| filter_ref(e))
1337                            .into_iter()
1338                            .map(|e| (name.clone(), e))
1339                            .collect::<Vec<_>>()
1340                    })
1341                })
1342                .collect::<Vec<_>>()
1343                .into_iter()
1344                .map(|h| h.join().unwrap_or_default())
1345                .collect()
1346        });
1347
1348        collection_results.into_iter().flatten().collect()
1349    }
1350
1351    /// Filter by metadata across all collections
1352    pub fn filter_metadata_all(
1353        &self,
1354        filters: &[(String, MetadataFilter)],
1355    ) -> Vec<(String, EntityId)> {
1356        let mut results = Vec::new();
1357        let collections = self.collections.read();
1358
1359        for (name, manager) in collections.iter() {
1360            for id in manager.filter_metadata(filters) {
1361                results.push((name.clone(), id));
1362            }
1363        }
1364
1365        results
1366    }
1367
1368    /// Get statistics
1369    pub fn stats(&self) -> StoreStats {
1370        let collections = self.collections.read();
1371
1372        let mut stats = StoreStats {
1373            collection_count: collections.len(),
1374            ..Default::default()
1375        };
1376
1377        for (name, manager) in collections.iter() {
1378            let manager_stats = manager.stats();
1379            stats.total_entities += manager_stats.total_entities;
1380            stats.total_memory_bytes += manager_stats.total_memory_bytes;
1381            stats.collections.insert(name.clone(), manager_stats);
1382        }
1383
1384        stats
1385    }
1386
1387    /// Run maintenance on all collections
1388    pub fn run_maintenance(&self) -> Result<(), StoreError> {
1389        let collections = self.collections.read();
1390        for manager in collections.values() {
1391            manager.run_maintenance()?;
1392        }
1393        Ok(())
1394    }
1395}
1396
1397/// Flatten a JSON value into dot-notation key-value pairs for red_config.
1398fn flatten_config_json(
1399    prefix: &str,
1400    value: &crate::serde_json::Value,
1401    out: &mut Vec<(String, crate::storage::schema::Value)>,
1402) {
1403    use crate::storage::schema::Value;
1404    match value {
1405        crate::serde_json::Value::Object(map) => {
1406            for (k, v) in map {
1407                let key = if prefix.is_empty() {
1408                    k.clone()
1409                } else {
1410                    format!("{prefix}.{k}")
1411                };
1412                flatten_config_json(&key, v, out);
1413            }
1414        }
1415        crate::serde_json::Value::String(s) => {
1416            out.push((prefix.to_string(), Value::text(s.clone())));
1417        }
1418        crate::serde_json::Value::Number(n) => {
1419            if let Some(u) = n.as_u64() {
1420                out.push((prefix.to_string(), Value::UnsignedInteger(u)));
1421            } else if let Some(f) = n.as_f64() {
1422                out.push((prefix.to_string(), Value::Float(f)));
1423            }
1424        }
1425        crate::serde_json::Value::Bool(b) => {
1426            out.push((prefix.to_string(), Value::Boolean(*b)));
1427        }
1428        crate::serde_json::Value::Null => {
1429            out.push((prefix.to_string(), Value::Null));
1430        }
1431        crate::serde_json::Value::Array(_) => {
1432            let json_str = crate::serde_json::to_string(value).unwrap_or_default();
1433            out.push((prefix.to_string(), Value::text(json_str)));
1434        }
1435    }
1436}
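
// Flattening sketch: with an empty prefix, `{"a": {"b": 1, "c": [2, 3]}}`
// yields two pairs — `("a.b", UnsignedInteger(1))` and
// `("a.c", Text("[2,3]"))` — arrays are stored as their JSON text.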