Skip to main content

reddb_server/storage/unified/store/
impl_pages.rs

1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
/// Four-byte tag that `serialize_entity_record` writes at the start of every
/// record. `deserialize_entity_record` treats input that does not start with
/// this tag as a bare serialized entity with no metadata section.
const ENTITY_RECORD_MAGIC: &[u8; 4] = b"RER1";
6
7// ── Pager-meta overflow chain (gh-477) ──────────────────────────────────────
8// When the serialized collection registry + cross-refs exceed a single page,
9// page 1 carries a "RDM3" wrapper header pointing at an overflow chain of
10// `PageType::Overflow` pages. Single-page metadata keeps the historical
11// bit-identical layout (`METADATA_MAGIC = "RDM2"` written directly at the
12// content offset).
13//
14// Page 1 (overflow form), starting at `HEADER_SIZE`:
15//   [0..4]   magic = "RDM3"
16//   [4..8]   format_version (u32, mirrors inner payload version for debug)
17//   [8..12]  total_payload_bytes (u32)
18//   [12..16] next_overflow_page_id (u32, > 0)
19//   [16..]   first payload chunk (up to META_V3_FIRST_PAYLOAD_CAP bytes)
20//
21// Overflow continuation page, starting at `HEADER_SIZE`:
22//   [0..4]   next_overflow_page_id (u32, 0 if last)
23//   [4..8]   chunk_bytes (u32)
24//   [8..]    chunk payload (up to META_V3_OVERFLOW_PAYLOAD_CAP bytes)
/// Magic tag marking page 1 as the multi-page (overflow-chain) metadata form.
const METADATA_OVERFLOW_MAGIC: &[u8; 4] = b"RDM3";
/// Bytes available on a page after the page header.
const META_PAGE_CONTENT_CAP: usize =
    crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE;
/// Size of the `RDM3` wrapper header on page 1
/// (magic, format version, total payload length, first overflow page id).
const META_V3_PAGE1_HEADER: usize = 16;
/// Size of the header on each overflow continuation page
/// (next overflow page id, chunk length).
const META_V3_OVERFLOW_HEADER: usize = 8;
/// Payload bytes that fit on page 1 after the wrapper header.
const META_V3_FIRST_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_PAGE1_HEADER;
/// Payload bytes that fit on one continuation page after its header.
const META_V3_OVERFLOW_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_OVERFLOW_HEADER;
32
33fn free_existing_overflow_chain(pager: &Pager) -> Result<(), PagerError> {
34    let cs = crate::storage::engine::HEADER_SIZE;
35    let page = match pager.read_page(1) {
36        Ok(p) => p,
37        Err(_) => return Ok(()),
38    };
39    let bytes = page.as_bytes();
40    if bytes.len() < cs + META_V3_PAGE1_HEADER {
41        return Ok(());
42    }
43    if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
44        return Ok(());
45    }
46    let mut next = u32::from_le_bytes([
47        bytes[cs + 12],
48        bytes[cs + 13],
49        bytes[cs + 14],
50        bytes[cs + 15],
51    ]);
52    while next != 0 {
53        let ov = match pager.read_page(next) {
54            Ok(p) => p,
55            Err(_) => break,
56        };
57        let ob = ov.as_bytes();
58        let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
59        let _ = pager.free_page(next);
60        next = nn;
61    }
62    Ok(())
63}
64
65fn build_meta_page1_with_overflow(
66    pager: &Pager,
67    meta_data: &[u8],
68) -> Result<crate::storage::engine::Page, PagerError> {
69    use crate::storage::engine::{Page, PageType, HEADER_SIZE};
70    free_existing_overflow_chain(pager)?;
71
72    let mut page1 = Page::new(PageType::Header, 1);
73    let cs = HEADER_SIZE;
74
75    if meta_data.len() <= META_PAGE_CONTENT_CAP {
76        // Single-page: bit-identical to the historical layout.
77        let buf = page1.as_bytes_mut();
78        buf[cs..cs + meta_data.len()].copy_from_slice(meta_data);
79        return Ok(page1);
80    }
81
82    // Multi-page overflow form. Split the inner payload into the first chunk
83    // (held on page 1) followed by zero-or-more continuation chunks chained
84    // through `PageType::Overflow` pages.
85    let first_chunk = &meta_data[..META_V3_FIRST_PAYLOAD_CAP];
86    let mut tail = &meta_data[META_V3_FIRST_PAYLOAD_CAP..];
87    let mut chunks: Vec<&[u8]> = Vec::new();
88    while !tail.is_empty() {
89        let take = tail.len().min(META_V3_OVERFLOW_PAYLOAD_CAP);
90        chunks.push(&tail[..take]);
91        tail = &tail[take..];
92    }
93
94    let mut overflow_pages: Vec<Page> = Vec::with_capacity(chunks.len());
95    let mut overflow_ids: Vec<u32> = Vec::with_capacity(chunks.len());
96    for _ in 0..chunks.len() {
97        let pg = pager.allocate_page(PageType::Overflow)?;
98        overflow_ids.push(pg.page_id());
99        overflow_pages.push(pg);
100    }
101
102    for i in 0..chunks.len() {
103        let next = if i + 1 < chunks.len() {
104            overflow_ids[i + 1]
105        } else {
106            0u32
107        };
108        let len = chunks[i].len() as u32;
109        let buf = overflow_pages[i].as_bytes_mut();
110        buf[cs..cs + 4].copy_from_slice(&next.to_le_bytes());
111        buf[cs + 4..cs + 8].copy_from_slice(&len.to_le_bytes());
112        buf[cs + 8..cs + 8 + chunks[i].len()].copy_from_slice(chunks[i]);
113    }
114    for (idx, page) in overflow_pages.into_iter().enumerate() {
115        let id = overflow_ids[idx];
116        pager.write_page(id, page)?;
117    }
118
119    // Mirror the inner format_version for debug-friendly hex dumps.
120    let format_version = if meta_data.len() >= 8 && &meta_data[0..4] == METADATA_MAGIC {
121        u32::from_le_bytes([meta_data[4], meta_data[5], meta_data[6], meta_data[7]])
122    } else {
123        0
124    };
125
126    let buf = page1.as_bytes_mut();
127    buf[cs..cs + 4].copy_from_slice(METADATA_OVERFLOW_MAGIC);
128    buf[cs + 4..cs + 8].copy_from_slice(&format_version.to_le_bytes());
129    buf[cs + 8..cs + 12].copy_from_slice(&(meta_data.len() as u32).to_le_bytes());
130    buf[cs + 12..cs + 16].copy_from_slice(&overflow_ids[0].to_le_bytes());
131    buf[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_chunk.len()]
132        .copy_from_slice(first_chunk);
133
134    Ok(page1)
135}
136
137/// Assemble the full metadata payload from page 1 (plus its overflow chain
138/// when the `RDM3` wrapper is present). Returns the bytes that the metadata
139/// parser would see starting from the content offset of page 1. Single-page
140/// metadata returns the raw page content (including trailing zero-pad), so
141/// the legacy parser sees the same bytes it always saw.
142fn read_meta_payload(pager: &Pager) -> Option<Vec<u8>> {
143    let cs = crate::storage::engine::HEADER_SIZE;
144    let meta_page = pager
145        .read_page(1)
146        .or_else(|_| pager.recover_meta_from_shadow())
147        .ok()?;
148    let bytes = meta_page.as_bytes();
149    if bytes.len() < cs + 4 {
150        return Some(bytes.get(cs..).unwrap_or(&[]).to_vec());
151    }
152    if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
153        return Some(bytes[cs..].to_vec());
154    }
155    if bytes.len() < cs + META_V3_PAGE1_HEADER {
156        return None;
157    }
158    let total =
159        u32::from_le_bytes([bytes[cs + 8], bytes[cs + 9], bytes[cs + 10], bytes[cs + 11]]) as usize;
160    let mut next = u32::from_le_bytes([
161        bytes[cs + 12],
162        bytes[cs + 13],
163        bytes[cs + 14],
164        bytes[cs + 15],
165    ]);
166    let mut payload: Vec<u8> = Vec::with_capacity(total);
167    let first_take = total.min(META_V3_FIRST_PAYLOAD_CAP);
168    payload.extend_from_slice(
169        &bytes[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_take],
170    );
171    while next != 0 && payload.len() < total {
172        let ov = pager.read_page(next).ok()?;
173        let ob = ov.as_bytes();
174        if ob.len() < cs + META_V3_OVERFLOW_HEADER {
175            return None;
176        }
177        let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
178        let len =
179            u32::from_le_bytes([ob[cs + 4], ob[cs + 5], ob[cs + 6], ob[cs + 7]]) as usize;
180        let remaining = total - payload.len();
181        let take = len.min(remaining).min(META_V3_OVERFLOW_PAYLOAD_CAP);
182        payload.extend_from_slice(&ob[cs + META_V3_OVERFLOW_HEADER..cs + META_V3_OVERFLOW_HEADER + take]);
183        next = nn;
184    }
185    Some(payload)
186}
187
188impl UnifiedStore {
    /// Set the `paged_registry_dirty` flag.
    ///
    /// `flush_paged_state` checks this flag and, when it is set, rewrites the
    /// on-disk registry via `flush_paged_registry` before clearing it.
    pub(crate) fn mark_paged_registry_dirty(&self) {
        self.paged_registry_dirty.store(true, Ordering::Release);
    }
192
193    /// Get (or lazily create) the per-collection B-tree under a *read*
194    /// lock whenever possible. Returns a cloned `Arc<BTree>` so callers
195    /// can mutate the tree without holding the outer map's RwLock —
196    /// previously every insert serialised on `btree_indices.write()`,
197    /// costing ~60% of the concurrent-insert throughput ceiling.
198    pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
199        let pager = self.pager.as_ref()?;
200        if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
201            return Some(btree);
202        }
203        let mut write = self.btree_indices.write();
204        let btree = write
205            .entry(collection.to_string())
206            .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
207            .clone();
208        Some(btree)
209    }
210
211    pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
212        let Some(pager) = &self.pager else {
213            return Ok(());
214        };
215
216        if self.paged_registry_dirty.load(Ordering::Acquire) {
217            self.flush_paged_registry()?;
218            self.paged_registry_dirty.store(false, Ordering::Release);
219            return Ok(());
220        }
221
222        pager
223            .flush()
224            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
225    }
226
227    pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
228        let Some(pager) = &self.pager else {
229            return Ok(());
230        };
231
232        match pager.read_page(1) {
233            Ok(_) => {}
234            Err(PagerError::PageNotFound(_)) => {
235                let meta_page = pager
236                    .allocate_page(crate::storage::engine::PageType::Header)
237                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
238                pager
239                    .write_page(meta_page.page_id(), meta_page)
240                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
241            }
242            Err(e) => {
243                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
244            }
245        }
246
247        let format_version = STORE_VERSION_V9;
248        self.set_format_version(format_version);
249
250        let collections = self.collections.read();
251        let btree_indices = self.btree_indices.read();
252        let mut collection_roots = Vec::with_capacity(collections.len());
253        for (name, _) in collections.iter() {
254            let root_page = btree_indices
255                .get(name)
256                .map_or(0, |btree| btree.root_page_id());
257            collection_roots.push((name.clone(), root_page));
258        }
259        drop(btree_indices);
260        drop(collections);
261
262        let mut meta_data = Vec::with_capacity(4096);
263        meta_data.extend_from_slice(METADATA_MAGIC);
264        meta_data.extend_from_slice(&format_version.to_le_bytes());
265        meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
266        for (name, root_page) in &collection_roots {
267            meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
268            meta_data.extend_from_slice(name.as_bytes());
269            meta_data.extend_from_slice(&root_page.to_le_bytes());
270        }
271
272        let cross_refs = self.cross_refs.read();
273        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
274        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
275        for (source_id, refs) in cross_refs.iter() {
276            for (target_id, ref_type, collection) in refs {
277                meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
278                meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
279                meta_data.push(ref_type.to_byte());
280                meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
281                meta_data.extend_from_slice(collection.as_bytes());
282            }
283        }
284
285        let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
286            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
287
288        pager
289            .write_meta_shadow(&meta_page)
290            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
291        pager
292            .write_page(1, meta_page)
293            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
294        pager
295            .flush()
296            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
297
298        Ok(())
299    }
300
    /// Get a reference to the underlying pager (if in paged mode).
    ///
    /// Returns `None` when the store was built without a pager, as
    /// `with_config` does.
    pub fn pager(&self) -> Option<&Arc<Pager>> {
        self.pager.as_ref()
    }
305
    /// Borrow the immutable store configuration. Runtime hooks (e.g. the
    /// `auto_index_id` first-insert hook in `MutationEngine`) read knobs
    /// off this struct without going through the legacy global config tree.
    ///
    /// The returned reference is to the `UnifiedStoreConfig` the store was
    /// constructed with.
    pub fn config(&self) -> &UnifiedStoreConfig {
        &self.config
    }
312
    /// Build an empty store from `config` with no pager, no database path and
    /// no commit coordinator attached.
    ///
    /// The format version starts at `STORE_VERSION_V9`, the entity id counter
    /// starts at 1, and every map and index starts empty.
    pub fn with_config(config: UnifiedStoreConfig) -> Self {
        Self {
            config,
            format_version: AtomicU32::new(STORE_VERSION_V9),
            next_entity_id: AtomicU64::new(1),
            collections: RwLock::new(HashMap::new()),
            cross_refs: RwLock::new(HashMap::new()),
            reverse_refs: RwLock::new(HashMap::new()),
            // Not backed by a page file.
            pager: None,
            db_path: None,
            btree_indices: RwLock::new(HashMap::new()),
            context_index: ContextIndex::new(),
            entity_cache: EntityCache::new(),
            graph_label_index: RwLock::new(HashMap::new()),
            paged_registry_dirty: AtomicBool::new(false),
            // No commit coordinator is opened for this constructor.
            commit: None,
            unindex_cross_refs_fast_path: AtomicU64::new(0),
        }
    }
332
    /// Open or create a page-based database
    ///
    /// This uses the page engine for ACID durability with B-tree indices.
    /// The database file uses 4KB pages with checksums and efficient caching.
    ///
    /// Equivalent to calling `open_with_config` with
    /// `UnifiedStoreConfig::default()`.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the database file (e.g., "data.rdb")
    ///
    /// # Errors
    ///
    /// Returns whatever error `open_with_config` returns.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let store = UnifiedStore::open("security.rdb")?;
    /// store.create_collection("hosts")?;
    /// // ... operations ...
    /// store.persist()?; // Flush to disk
    /// ```
    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
        Self::open_with_config(path, UnifiedStoreConfig::default())
    }
353
354    pub fn open_with_config(
355        path: impl AsRef<Path>,
356        config: UnifiedStoreConfig,
357    ) -> Result<Self, StoreError> {
358        let path = path.as_ref();
359        let mut pager_config = PagerConfig::default();
360        // Tunables via env — experimental, used by the benchmark harness
361        // to compare durability profiles head-to-head with Postgres.
362        // REDDB_DOUBLE_WRITE=0 disables the double-write buffer, which
363        // otherwise adds two fsyncs per pager flush (one on DWB, one
364        // on the main file). With DWB off the pager behaves more like
365        // Postgres + full_page_writes=off — we trade torn-page
366        // protection for ingest throughput.
367        if matches!(
368            std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
369            Some("0") | Some("false") | Some("off")
370        ) {
371            pager_config.double_write = false;
372        }
373        let pager = Pager::open(path, pager_config)
374            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
375
376        let wal_path = Self::wal_path_for_db(path);
377        let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
378            Some(Arc::new(
379                StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
380                    .map_err(StoreError::Io)?,
381            ))
382        } else {
383            None
384        };
385
386        let store = Self {
387            config,
388            format_version: AtomicU32::new(STORE_VERSION_V9),
389            next_entity_id: AtomicU64::new(1),
390            collections: RwLock::new(HashMap::new()),
391            cross_refs: RwLock::new(HashMap::new()),
392            reverse_refs: RwLock::new(HashMap::new()),
393            pager: Some(Arc::new(pager)),
394            db_path: Some(path.to_path_buf()),
395            btree_indices: RwLock::new(HashMap::new()),
396            context_index: ContextIndex::new(),
397            entity_cache: EntityCache::new(),
398            graph_label_index: RwLock::new(HashMap::new()),
399            paged_registry_dirty: AtomicBool::new(false),
400            commit,
401            unindex_cross_refs_fast_path: AtomicU64::new(0),
402        };
403
404        // Load existing data from pages if database exists
405        store.load_from_pages()?;
406        if let Some(commit) = &store.commit {
407            commit.replay_into(&store).map_err(StoreError::Io)?;
408        }
409
410        Ok(store)
411    }
412
413    /// Load data from page-based storage
414    ///
415    /// Reads the B-tree indices and reconstructs collections from pages.
416    fn load_from_pages(&self) -> Result<(), StoreError> {
417        let pager = match &self.pager {
418            Some(p) => p,
419            None => return Ok(()), // No pager, nothing to load
420        };
421
422        // Get page count
423        let page_count = pager.page_count().map_err(|e| {
424            StoreError::Io(std::io::Error::other(format!(
425                "failed to read page count: {}",
426                e
427            )))
428        })?;
429        if page_count <= 1 {
430            // Empty database (only header page)
431            return Ok(());
432        }
433
434        // Read metadata starting from page 1 (collections registry). The
435        // helper transparently follows the `RDM3` overflow chain when the
436        // metadata blob spans multiple pages and falls back to the legacy
437        // `<data>-meta` shadow when page 1 itself is corrupted.
438        if let Some(content_vec) = read_meta_payload(pager) {
439            let content: &[u8] = &content_vec;
440            if content.len() >= 4 {
441                let mut pos = 0;
442                let mut format_version = STORE_VERSION_V1;
443
444                if content.len() >= 8 && &content[0..4] == METADATA_MAGIC {
445                    format_version =
446                        u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
447                    pos += 8;
448                }
449
450                self.set_format_version(format_version);
451
452                // Collection count
453                let collection_count = u32::from_le_bytes([
454                    content[pos],
455                    content[pos + 1],
456                    content[pos + 2],
457                    content[pos + 3],
458                ]) as usize;
459                pos += 4;
460
461                // Read collection names and their B-tree root pages
462                for _ in 0..collection_count {
463                    if pos + 4 > content.len() {
464                        break;
465                    }
466
467                    let name_len = u32::from_le_bytes([
468                        content[pos],
469                        content[pos + 1],
470                        content[pos + 2],
471                        content[pos + 3],
472                    ]) as usize;
473                    pos += 4;
474
475                    if pos + name_len + 4 > content.len() {
476                        break;
477                    }
478
479                    if let Ok(name) = String::from_utf8(content[pos..pos + name_len].to_vec()) {
480                        pos += name_len;
481
482                        // Root page ID for this collection's B-tree
483                        let root_page = u32::from_le_bytes([
484                            content[pos],
485                            content[pos + 1],
486                            content[pos + 2],
487                            content[pos + 3],
488                        ]);
489                        pos += 4;
490
491                        // Hydrate the collection in memory only. Loading must
492                        // not emit WAL entries or rewrite the on-disk registry
493                        // before the existing B-tree roots are attached.
494                        let _ = self.create_collection_in_memory(&name);
495
496                        // Load B-tree with root page if it exists
497                        if root_page > 0 {
498                            let btree = BTree::with_root(Arc::clone(pager), root_page);
499
500                            // Load all entities from B-tree into the collection
501                            if let Ok(mut cursor) = btree.cursor_first() {
502                                let manager = self.get_collection(&name);
503                                while let Ok(Some((key, value))) = cursor.next() {
504                                    // Deserialize entity from value bytes
505                                    if let Ok((entity, metadata)) = Self::deserialize_entity_record(
506                                        &value,
507                                        self.format_version(),
508                                    ) {
509                                        if let Some(m) = &manager {
510                                            let id = entity.id;
511                                            if let EntityKind::TableRow { row_id, .. } =
512                                                &entity.kind
513                                            {
514                                                m.register_row_id(*row_id);
515                                            }
516                                            self.context_index.index_entity(&name, &entity);
517                                            let _ = m.insert(entity.clone());
518                                            if let Some(metadata) = metadata {
519                                                let _ = m.set_metadata(id, metadata);
520                                            }
521                                            self.register_entity_id(id);
522                                            if self.config.auto_index_refs {
523                                                self.index_cross_refs(&entity, &name)?;
524                                            }
525                                        }
526                                    }
527                                }
528                            }
529
530                            // Store the B-tree for future lookups
531                            self.btree_indices.write().insert(name, Arc::new(btree));
532                        }
533                    } else {
534                        pos += name_len + 4;
535                    }
536                }
537
538                if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
539                    let cross_ref_count = u32::from_le_bytes([
540                        content[pos],
541                        content[pos + 1],
542                        content[pos + 2],
543                        content[pos + 3],
544                    ]) as usize;
545                    pos += 4;
546
547                    for _ in 0..cross_ref_count {
548                        if pos + 17 > content.len() {
549                            break;
550                        }
551                        let source_id = u64::from_le_bytes([
552                            content[pos],
553                            content[pos + 1],
554                            content[pos + 2],
555                            content[pos + 3],
556                            content[pos + 4],
557                            content[pos + 5],
558                            content[pos + 6],
559                            content[pos + 7],
560                        ]);
561                        pos += 8;
562                        let target_id = u64::from_le_bytes([
563                            content[pos],
564                            content[pos + 1],
565                            content[pos + 2],
566                            content[pos + 3],
567                            content[pos + 4],
568                            content[pos + 5],
569                            content[pos + 6],
570                            content[pos + 7],
571                        ]);
572                        pos += 8;
573                        let ref_type = RefType::from_byte(content[pos]);
574                        pos += 1;
575
576                        if pos + 4 > content.len() {
577                            break;
578                        }
579                        let name_len = u32::from_le_bytes([
580                            content[pos],
581                            content[pos + 1],
582                            content[pos + 2],
583                            content[pos + 3],
584                        ]) as usize;
585                        pos += 4;
586                        if pos + name_len > content.len() {
587                            break;
588                        }
589                        let target_collection =
590                            String::from_utf8_lossy(&content[pos..pos + name_len]).to_string();
591                        pos += name_len;
592
593                        let source_id = EntityId::new(source_id);
594                        let target_id = EntityId::new(target_id);
595
596                        self.cross_refs.write().entry(source_id).or_default().push((
597                            target_id,
598                            ref_type,
599                            target_collection.clone(),
600                        ));
601
602                        if let Some((collection, mut entity)) = self.get_any(source_id) {
603                            let exists = entity.cross_refs().iter().any(|xref| {
604                                xref.target == target_id
605                                    && xref.ref_type == ref_type
606                                    && xref.target_collection == target_collection
607                            });
608                            if !exists {
609                                entity.cross_refs_mut().push(CrossRef::new(
610                                    source_id,
611                                    target_id,
612                                    target_collection.clone(),
613                                    ref_type,
614                                ));
615                                if let Some(manager) = self.get_collection(&collection) {
616                                    let _ = manager.update(entity);
617                                }
618                            }
619                        }
620                    }
621                }
622            }
623        }
624
625        if self.format_version() < STORE_VERSION_V9 {
626            self.set_format_version(STORE_VERSION_V9);
627        }
628
629        Ok(())
630    }
631
632    /// Deserialize an entity from binary bytes
633    pub(crate) fn deserialize_entity(
634        data: &[u8],
635        format_version: u32,
636    ) -> Result<UnifiedEntity, StoreError> {
637        let mut pos = 0;
638        Self::read_entity_binary(data, &mut pos, format_version)
639            .map_err(|e| StoreError::Serialization(e.to_string()))
640    }
641
642    /// Serialize an entity to binary bytes
643    pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
644        // Pre-allocate ~256 bytes to cover the typical 15-column
645        // typed row without any Vec growth. Bulk insert calls this
646        // millions of times per bench run; saving 2-3 reallocs per
647        // entity amortises.
648        let mut buf = Vec::with_capacity(256);
649        Self::write_entity_binary(&mut buf, entity, format_version);
650        buf
651    }
652
653    pub(crate) fn serialize_entity_record(
654        entity: &UnifiedEntity,
655        metadata: Option<&Metadata>,
656        format_version: u32,
657    ) -> Vec<u8> {
658        let entity_bytes = Self::serialize_entity(entity, format_version);
659        // Skip the intermediate metadata Vec when there's no metadata
660        // (common OLTP bulk-insert case): write a zero-length prefix
661        // directly into the record buffer. Only fall back to the old
662        // serialize_metadata() allocation when the caller actually
663        // has fields to persist.
664        let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
665        if has_meta {
666            let metadata_bytes = serialize_metadata(metadata);
667            let mut buf = Vec::with_capacity(12 + entity_bytes.len() + metadata_bytes.len());
668            buf.extend_from_slice(ENTITY_RECORD_MAGIC);
669            buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
670            buf.extend_from_slice(&entity_bytes);
671            buf.extend_from_slice(&(metadata_bytes.len() as u32).to_le_bytes());
672            buf.extend_from_slice(&metadata_bytes);
673            buf
674        } else {
675            let mut buf = Vec::with_capacity(12 + entity_bytes.len());
676            buf.extend_from_slice(ENTITY_RECORD_MAGIC);
677            buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
678            buf.extend_from_slice(&entity_bytes);
679            buf.extend_from_slice(&0u32.to_le_bytes());
680            buf
681        }
682    }
683
684    pub(crate) fn deserialize_entity_record(
685        data: &[u8],
686        format_version: u32,
687    ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
688        if data.len() < 8 || &data[..4] != ENTITY_RECORD_MAGIC {
689            return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
690        }
691
692        let mut pos = 4usize;
693        let entity_len = read_u32(data, &mut pos)? as usize;
694        if pos + entity_len > data.len() {
695            return Err(StoreError::Serialization(
696                "truncated entity record payload".to_string(),
697            ));
698        }
699        let entity = Self::deserialize_entity(&data[pos..pos + entity_len], format_version)?;
700        pos += entity_len;
701
702        let metadata_len = read_u32(data, &mut pos)? as usize;
703        if pos + metadata_len > data.len() {
704            return Err(StoreError::Serialization(
705                "truncated entity record metadata".to_string(),
706            ));
707        }
708        let metadata = if metadata_len == 0 {
709            None
710        } else {
711            let metadata = deserialize_metadata(&data[pos..pos + metadata_len])?;
712            if metadata.is_empty() {
713                None
714            } else {
715                Some(metadata)
716            }
717        };
718
719        Ok((entity, metadata))
720    }
721
    /// Persist all data to page-based storage.
    ///
    /// Checkpoints the in-memory store: every collection's B-tree is rebuilt
    /// from the live manager state, the collection registry and the
    /// cross-references are serialized into the page-1 metadata blob, and the
    /// pager is synced.
    ///
    /// Steps, in order:
    ///
    /// 1. Without a pager, fall back to `save_to_file` when `db_path` is set;
    ///    otherwise fail.
    /// 2. Make sure page 1 exists, allocating a `PageType::Header` page when
    ///    the pager reports `PageNotFound`.
    /// 3. `force_sync` on `self.commit`, if present.
    /// 4. For each collection: delete every key currently in its B-tree, then
    ///    bulk-insert the current entities sorted by big-endian id bytes,
    ///    skipping rows larger than `MAX_VALUE_SIZE`.
    /// 5. Serialize the metadata, write the metadata shadow, write page 1,
    ///    and `sync` the pager.
    /// 6. `truncate` on `self.commit`, if present.
    ///
    /// The `collections` read guard, the `btree_indices` write guard and the
    /// `cross_refs` read guard are bound to locals and therefore held until
    /// the function returns.
    ///
    /// # Errors
    ///
    /// Pager, B-tree and commit failures are returned as `StoreError::Io`.
    /// A failure of the `save_to_file` fallback is returned as
    /// `StoreError::Serialization`.
    pub fn persist(&self) -> Result<(), StoreError> {
        let pager = match &self.pager {
            Some(p) => p,
            None => {
                // No pager attached - use binary file fallback if path available
                if let Some(path) = &self.db_path {
                    return self
                        .save_to_file(path)
                        .map_err(|e| StoreError::Serialization(e.to_string()));
                }
                return Err(StoreError::Io(std::io::Error::other(
                    "No pager or path configured for persistence",
                )));
            }
        };

        // Page 1 holds the store metadata; create it on first use so the
        // later `write_page(1, ..)` has a page to overwrite.
        match pager.read_page(1) {
            Ok(_) => {}
            Err(PagerError::PageNotFound(_)) => {
                let meta_page = pager
                    .allocate_page(crate::storage::engine::PageType::Header)
                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
                pager
                    .write_page(meta_page.page_id(), meta_page)
                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
            }
            Err(e) => {
                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
            }
        }

        // Sync the commit handle before any B-tree page is rewritten.
        if let Some(commit) = &self.commit {
            commit.force_sync().map_err(StoreError::Io)?;
        }

        let collections = self.collections.read();
        let mut btree_indices = self.btree_indices.write();

        // Collect collection names and their B-tree root pages
        let mut collection_roots: Vec<(String, u32)> = Vec::new();

        // For each collection, rebuild the B-tree from the live manager state.
        // A checkpoint must preserve deletes too, not just upsert the current rows.
        for (name, manager) in collections.iter() {
            // Reuse the collection's existing tree, or create one on this pager.
            let btree = btree_indices
                .entry(name.clone())
                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));

            // Snapshot the keys first, then delete them one by one below.
            let mut existing_keys = Vec::new();
            if !btree.is_empty() {
                let mut cursor = btree.cursor_first().map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree cursor error while rebuilding '{name}': {e}"
                    )))
                })?;
                while let Some((key, _)) = cursor.next().map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree scan error while rebuilding '{name}': {e}"
                    )))
                })? {
                    existing_keys.push(key);
                }
            }

            for key in existing_keys {
                btree.delete(&key).map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree delete error while rebuilding '{name}': {e}"
                    )))
                })?;
            }

            // Key = entity id as big-endian bytes, so byte-wise ordering of
            // the keys matches numeric ordering of the ids.
            let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
                .query_all(|_| true)
                .into_iter()
                .map(|entity| {
                    let metadata = manager.get_metadata(entity.id);
                    (
                        entity.id.raw().to_be_bytes().to_vec(),
                        Self::serialize_entity_record(
                            &entity,
                            metadata.as_ref(),
                            self.format_version(),
                        ),
                    )
                })
                .collect();
            records.sort_by(|left, right| left.0.cmp(&right.0));

            // Skip rows whose serialised value exceeds the B-tree's
            // per-value limit. The bulk insert otherwise aborts the
            // whole rebuild, which historically poisoned downstream
            // operations on this collection (callers saw the rebuild
            // error bubble up as `grpc BulkInsertBinary` failures even
            // though the user-issued write itself was harmless). The
            // primary write path enforces `MAX_VALUE_SIZE` already,
            // so oversized rows here are leftovers from older inserts
            // (e.g. `red_stats` MCV/histogram arrays predating the
            // size cap in `stats_catalog`). Logging keeps them
            // visible for VACUUM-style cleanup later.
            let max_value_size = crate::storage::engine::btree::MAX_VALUE_SIZE;
            let original_len = records.len();
            records.retain(|(_, value)| {
                if value.len() > max_value_size {
                    // F-04: `name` is a tenant-supplied collection name.
                    // Route through LogField escaper to neutralise
                    // CR/LF/control-byte injection (ADR 0010).
                    tracing::warn!(
                        collection = %reddb_wire::audit_safe_log_field(name),
                        bytes = value.len(),
                        max = max_value_size,
                        "skipping oversized row during B-tree bulk rebuild"
                    );
                    false
                } else {
                    true
                }
            });
            let dropped = original_len - records.len();
            if dropped > 0 {
                // F-04: tenant-supplied `name` interpolated into the
                // structured `collection` field AND the message
                // string. Sanitize both via the LogField escaper.
                let safe_name = format!("{}", reddb_wire::audit_safe_log_field(name));
                tracing::warn!(
                    collection = %safe_name,
                    dropped,
                    "dropped {dropped} oversized row(s) from '{safe_name}' on rebuild — \
                     the rows remain readable via the in-memory entity store but \
                     are absent from the on-disk B-tree until they are rewritten"
                );
            }

            if !records.is_empty() {
                btree.bulk_insert_sorted(&records).map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree bulk rebuild error for '{name}': {e}"
                    )))
                })?;
            }

            // Record the root only after the rebuild, when it is final for
            // this checkpoint.
            collection_roots.push((name.clone(), btree.root_page_id()));
        }

        // Write collection metadata to page 1
        let mut meta_data = Vec::with_capacity(4096);

        // NOTE(review): the rows above were serialized with
        // `self.format_version()` *before* this bump to `STORE_VERSION_V9`.
        // If the two can differ, the on-disk rows and the version declared in
        // the metadata disagree for this checkpoint — confirm this is intended.
        let format_version = STORE_VERSION_V9;
        self.set_format_version(format_version);

        // Metadata header: magic + version + collection count
        meta_data.extend_from_slice(METADATA_MAGIC);
        meta_data.extend_from_slice(&format_version.to_le_bytes());
        meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());

        // Write each collection's name and B-tree root page
        for (name, root_page) in &collection_roots {
            // Name length
            meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
            // Name
            meta_data.extend_from_slice(name.as_bytes());
            // Root page ID from actual B-tree
            meta_data.extend_from_slice(&root_page.to_le_bytes());
        }

        // Write cross-reference metadata: a u32 total, then one entry per
        // reference (source id, target id, type byte, length-prefixed
        // collection name), all little-endian.
        let cross_refs = self.cross_refs.read();
        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
        for (source_id, refs) in cross_refs.iter() {
            for (target_id, ref_type, collection) in refs {
                meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
                meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
                meta_data.push(ref_type.to_byte());
                meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
                meta_data.extend_from_slice(collection.as_bytes());
            }
        }

        // Build page 1 (+ overflow chain when needed) for the metadata blob.
        let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        // Write metadata shadow FIRST (intact copy in case main write fails).
        // The shadow is a no-op when `fold_pager_meta` is enabled.
        pager
            .write_meta_shadow(&meta_page)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        // Write page
        pager
            .write_page(1, meta_page)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        // Flush and fsync all pages to disk
        pager
            .sync()
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        // Truncate the commit handle only after the pager sync has succeeded.
        if let Some(commit) = &self.commit {
            commit.truncate().map_err(StoreError::Io)?;
        }

        Ok(())
    }
931
    /// Report whether this store persists through the page-based engine.
    ///
    /// Returns `true` exactly when a pager is attached (`self.pager` is
    /// `Some`), which is the condition `persist` uses to take its paged path.
    pub fn is_paged(&self) -> bool {
        self.pager.is_some()
    }
936
937    /// Current root page for a collection's primary B-tree, if one has
938    /// been materialized in this store.
939    pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
940        self.btree_indices
941            .read()
942            .get(collection)
943            .map(|btree| btree.root_page_id())
944            .filter(|root| *root != 0)
945    }
946
    /// Borrow the database file path, if one is configured.
    ///
    /// Returns `None` when `self.db_path` is unset. A configured path does
    /// not by itself imply paged mode: `persist` also uses it for the
    /// `save_to_file` fallback when no pager is attached.
    pub fn db_path(&self) -> Option<&Path> {
        self.db_path.as_deref()
    }
951}
952
953fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
954    let Some(metadata) = metadata else {
955        return Vec::new();
956    };
957    if metadata.is_empty() {
958        return Vec::new();
959    }
960
961    let mut entries: Vec<_> = metadata.iter().collect();
962    entries.sort_by_key(|(a, _)| *a);
963
964    let mut buf = Vec::new();
965    buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
966    for (key, value) in entries {
967        write_string(&mut buf, key);
968        write_metadata_value(&mut buf, value);
969    }
970    buf
971}
972
973fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
974    let mut pos = 0usize;
975    let count = read_u32(data, &mut pos)? as usize;
976    let mut metadata = Metadata::new();
977    for _ in 0..count {
978        let key = read_string(data, &mut pos)?;
979        let value = read_metadata_value(data, &mut pos)?;
980        metadata.set(key, value);
981    }
982    Ok(metadata)
983}
984
/// Append `value` to `buf` as a u32 LE byte length followed by its UTF-8
/// bytes.
///
/// The length is narrowed with `as u32`, so only the low 32 bits of a longer
/// length would be recorded.
fn write_string(buf: &mut Vec<u8>, value: &str) {
    let utf8 = value.as_bytes();
    let len_prefix = (utf8.len() as u32).to_le_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(utf8);
}
989
/// Append `value` to `buf` as a u32 LE length followed by the raw bytes.
///
/// The length is narrowed with `as u32`, so only the low 32 bits of a longer
/// length would be recorded.
fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
    let len_prefix = (value.len() as u32).to_le_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(value);
}
994
995fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
996    use crate::storage::unified::RefTarget;
997
998    match target {
999        RefTarget::TableRow { table, row_id } => {
1000            buf.push(0);
1001            write_string(buf, table);
1002            buf.extend_from_slice(&row_id.to_le_bytes());
1003        }
1004        RefTarget::Node {
1005            collection,
1006            node_id,
1007        } => {
1008            buf.push(1);
1009            write_string(buf, collection);
1010            buf.extend_from_slice(&node_id.raw().to_le_bytes());
1011        }
1012        RefTarget::Edge {
1013            collection,
1014            edge_id,
1015        } => {
1016            buf.push(2);
1017            write_string(buf, collection);
1018            buf.extend_from_slice(&edge_id.raw().to_le_bytes());
1019        }
1020        RefTarget::Vector {
1021            collection,
1022            vector_id,
1023        } => {
1024            buf.push(3);
1025            write_string(buf, collection);
1026            buf.extend_from_slice(&vector_id.raw().to_le_bytes());
1027        }
1028        RefTarget::Entity {
1029            collection,
1030            entity_id,
1031        } => {
1032            buf.push(4);
1033            write_string(buf, collection);
1034            buf.extend_from_slice(&entity_id.raw().to_le_bytes());
1035        }
1036    }
1037}
1038
1039fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
1040    match value {
1041        MetadataValue::Null => buf.push(0),
1042        MetadataValue::Bool(v) => {
1043            buf.push(1);
1044            buf.push(u8::from(*v));
1045        }
1046        MetadataValue::Int(v) => {
1047            buf.push(2);
1048            buf.extend_from_slice(&v.to_le_bytes());
1049        }
1050        MetadataValue::Float(v) => {
1051            buf.push(3);
1052            buf.extend_from_slice(&v.to_le_bytes());
1053        }
1054        MetadataValue::String(v) => {
1055            buf.push(4);
1056            write_string(buf, v);
1057        }
1058        MetadataValue::Bytes(v) => {
1059            buf.push(5);
1060            write_bytes(buf, v);
1061        }
1062        MetadataValue::Array(values) => {
1063            buf.push(6);
1064            buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1065            for value in values {
1066                write_metadata_value(buf, value);
1067            }
1068        }
1069        MetadataValue::Object(values) => {
1070            buf.push(7);
1071            let mut entries: Vec<_> = values.iter().collect();
1072            entries.sort_by_key(|(a, _)| *a);
1073            buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
1074            for (key, value) in entries {
1075                write_string(buf, key);
1076                write_metadata_value(buf, value);
1077            }
1078        }
1079        MetadataValue::Timestamp(v) => {
1080            buf.push(8);
1081            buf.extend_from_slice(&v.to_le_bytes());
1082        }
1083        MetadataValue::Geo { lat, lon } => {
1084            buf.push(9);
1085            buf.extend_from_slice(&lat.to_le_bytes());
1086            buf.extend_from_slice(&lon.to_le_bytes());
1087        }
1088        MetadataValue::Reference(target) => {
1089            buf.push(10);
1090            write_ref_target(buf, target);
1091        }
1092        MetadataValue::References(targets) => {
1093            buf.push(11);
1094            buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
1095            for target in targets {
1096                write_ref_target(buf, target);
1097            }
1098        }
1099    }
1100}
1101
1102fn read_exact_slice<'a>(
1103    data: &'a [u8],
1104    pos: &mut usize,
1105    len: usize,
1106) -> Result<&'a [u8], StoreError> {
1107    if *pos + len > data.len() {
1108        return Err(StoreError::Serialization(
1109            "truncated metadata payload".to_string(),
1110        ));
1111    }
1112    let slice = &data[*pos..*pos + len];
1113    *pos += len;
1114    Ok(slice)
1115}
1116
1117fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
1118    let bytes = read_exact_slice(data, pos, 4)?;
1119    let mut raw = [0u8; 4];
1120    raw.copy_from_slice(bytes);
1121    Ok(u32::from_le_bytes(raw))
1122}
1123
1124fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
1125    let bytes = read_exact_slice(data, pos, 8)?;
1126    let mut raw = [0u8; 8];
1127    raw.copy_from_slice(bytes);
1128    Ok(u64::from_le_bytes(raw))
1129}
1130
1131fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
1132    let bytes = read_exact_slice(data, pos, 8)?;
1133    let mut raw = [0u8; 8];
1134    raw.copy_from_slice(bytes);
1135    Ok(i64::from_le_bytes(raw))
1136}
1137
1138fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
1139    let bytes = read_exact_slice(data, pos, 8)?;
1140    let mut raw = [0u8; 8];
1141    raw.copy_from_slice(bytes);
1142    Ok(f64::from_le_bytes(raw))
1143}
1144
1145fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
1146    let bytes = read_exact_slice(data, pos, 1)?;
1147    Ok(bytes[0])
1148}
1149
1150fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
1151    let len = read_u32(data, pos)? as usize;
1152    let bytes = read_exact_slice(data, pos, len)?;
1153    String::from_utf8(bytes.to_vec()).map_err(|err| StoreError::Serialization(err.to_string()))
1154}
1155
1156fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
1157    let len = read_u32(data, pos)? as usize;
1158    Ok(read_exact_slice(data, pos, len)?.to_vec())
1159}
1160
1161fn read_ref_target(
1162    data: &[u8],
1163    pos: &mut usize,
1164) -> Result<crate::storage::unified::RefTarget, StoreError> {
1165    use crate::storage::unified::RefTarget;
1166
1167    match read_u8(data, pos)? {
1168        0 => Ok(RefTarget::TableRow {
1169            table: read_string(data, pos)?,
1170            row_id: read_u64(data, pos)?,
1171        }),
1172        1 => Ok(RefTarget::Node {
1173            collection: read_string(data, pos)?,
1174            node_id: EntityId::new(read_u64(data, pos)?),
1175        }),
1176        2 => Ok(RefTarget::Edge {
1177            collection: read_string(data, pos)?,
1178            edge_id: EntityId::new(read_u64(data, pos)?),
1179        }),
1180        3 => Ok(RefTarget::Vector {
1181            collection: read_string(data, pos)?,
1182            vector_id: EntityId::new(read_u64(data, pos)?),
1183        }),
1184        4 => Ok(RefTarget::Entity {
1185            collection: read_string(data, pos)?,
1186            entity_id: EntityId::new(read_u64(data, pos)?),
1187        }),
1188        tag => Err(StoreError::Serialization(format!(
1189            "unknown metadata ref target tag {tag}"
1190        ))),
1191    }
1192}
1193
1194fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1195    match read_u8(data, pos)? {
1196        0 => Ok(MetadataValue::Null),
1197        1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1198        2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1199        3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1200        4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1201        5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1202        6 => {
1203            let count = read_u32(data, pos)? as usize;
1204            let mut values = Vec::with_capacity(count);
1205            for _ in 0..count {
1206                values.push(read_metadata_value(data, pos)?);
1207            }
1208            Ok(MetadataValue::Array(values))
1209        }
1210        7 => {
1211            let count = read_u32(data, pos)? as usize;
1212            let mut values = std::collections::HashMap::with_capacity(count);
1213            for _ in 0..count {
1214                let key = read_string(data, pos)?;
1215                let value = read_metadata_value(data, pos)?;
1216                values.insert(key, value);
1217            }
1218            Ok(MetadataValue::Object(values))
1219        }
1220        8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1221        9 => Ok(MetadataValue::Geo {
1222            lat: read_f64(data, pos)?,
1223            lon: read_f64(data, pos)?,
1224        }),
1225        10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1226        11 => {
1227            let count = read_u32(data, pos)? as usize;
1228            let mut targets = Vec::with_capacity(count);
1229            for _ in 0..count {
1230                targets.push(read_ref_target(data, pos)?);
1231            }
1232            Ok(MetadataValue::References(targets))
1233        }
1234        tag => Err(StoreError::Serialization(format!(
1235            "unknown metadata value tag {tag}"
1236        ))),
1237    }
1238}