Skip to main content

reddb_server/storage/unified/store/
impl_pages.rs

1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
/// Four-byte tag that prefixes entity records produced by
/// `serialize_entity_record`. `deserialize_entity_record` treats any input
/// that does not start with this tag as a bare entity with no metadata
/// section.
const ENTITY_RECORD_MAGIC: &[u8; 4] = b"RER1";
6
7// ── Pager-meta overflow chain (gh-477) ──────────────────────────────────────
8// When the serialized collection registry + cross-refs exceed a single page,
9// page 1 carries a "RDM3" wrapper header pointing at an overflow chain of
10// `PageType::Overflow` pages. Single-page metadata keeps the historical
11// bit-identical layout (`METADATA_MAGIC = "RDM2"` written directly at the
12// content offset).
13//
14// Page 1 (overflow form), starting at `HEADER_SIZE`:
15//   [0..4]   magic = "RDM3"
16//   [4..8]   format_version (u32, mirrors inner payload version for debug)
17//   [8..12]  total_payload_bytes (u32)
18//   [12..16] next_overflow_page_id (u32, > 0)
19//   [16..]   first payload chunk (up to META_V3_FIRST_PAYLOAD_CAP bytes)
20//
21// Overflow continuation page, starting at `HEADER_SIZE`:
22//   [0..4]   next_overflow_page_id (u32, 0 if last)
23//   [4..8]   chunk_bytes (u32)
24//   [8..]    chunk payload (up to META_V3_OVERFLOW_PAYLOAD_CAP bytes)
/// Magic written at the start of page 1's content when the metadata payload
/// spills into an overflow chain.
const METADATA_OVERFLOW_MAGIC: &[u8; 4] = b"RDM3";
/// Bytes available for content on a single page once the page header is
/// subtracted.
const META_PAGE_CONTENT_CAP: usize =
    crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE;
/// Size of the `RDM3` wrapper header on page 1 (magic, format version, total
/// payload length, first overflow page id).
const META_V3_PAGE1_HEADER: usize = 16;
/// Size of the header on each overflow continuation page (next page id,
/// chunk length).
const META_V3_OVERFLOW_HEADER: usize = 8;
/// Maximum payload bytes carried on page 1 when the overflow form is used.
const META_V3_FIRST_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_PAGE1_HEADER;
/// Maximum payload bytes carried by a single overflow continuation page.
const META_V3_OVERFLOW_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_OVERFLOW_HEADER;
32
33fn free_existing_overflow_chain(pager: &Pager) -> Result<(), PagerError> {
34    let cs = crate::storage::engine::HEADER_SIZE;
35    let page = match pager.read_page(1) {
36        Ok(p) => p,
37        Err(_) => return Ok(()),
38    };
39    let bytes = page.as_bytes();
40    if bytes.len() < cs + META_V3_PAGE1_HEADER {
41        return Ok(());
42    }
43    if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
44        return Ok(());
45    }
46    let mut next = u32::from_le_bytes([
47        bytes[cs + 12],
48        bytes[cs + 13],
49        bytes[cs + 14],
50        bytes[cs + 15],
51    ]);
52    while next != 0 {
53        let ov = match pager.read_page(next) {
54            Ok(p) => p,
55            Err(_) => break,
56        };
57        let ob = ov.as_bytes();
58        let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
59        let _ = pager.free_page(next);
60        next = nn;
61    }
62    Ok(())
63}
64
65fn build_meta_page1_with_overflow(
66    pager: &Pager,
67    meta_data: &[u8],
68) -> Result<crate::storage::engine::Page, PagerError> {
69    use crate::storage::engine::{Page, PageType, HEADER_SIZE};
70    free_existing_overflow_chain(pager)?;
71
72    let mut page1 = Page::new(PageType::Header, 1);
73    let cs = HEADER_SIZE;
74
75    if meta_data.len() <= META_PAGE_CONTENT_CAP {
76        // Single-page: bit-identical to the historical layout.
77        let buf = page1.as_bytes_mut();
78        buf[cs..cs + meta_data.len()].copy_from_slice(meta_data);
79        return Ok(page1);
80    }
81
82    // Multi-page overflow form. Split the inner payload into the first chunk
83    // (held on page 1) followed by zero-or-more continuation chunks chained
84    // through `PageType::Overflow` pages.
85    let first_chunk = &meta_data[..META_V3_FIRST_PAYLOAD_CAP];
86    let mut tail = &meta_data[META_V3_FIRST_PAYLOAD_CAP..];
87    let mut chunks: Vec<&[u8]> = Vec::new();
88    while !tail.is_empty() {
89        let take = tail.len().min(META_V3_OVERFLOW_PAYLOAD_CAP);
90        chunks.push(&tail[..take]);
91        tail = &tail[take..];
92    }
93
94    let mut overflow_pages: Vec<Page> = Vec::with_capacity(chunks.len());
95    let mut overflow_ids: Vec<u32> = Vec::with_capacity(chunks.len());
96    for _ in 0..chunks.len() {
97        let pg = pager.allocate_page(PageType::Overflow)?;
98        overflow_ids.push(pg.page_id());
99        overflow_pages.push(pg);
100    }
101
102    for i in 0..chunks.len() {
103        let next = if i + 1 < chunks.len() {
104            overflow_ids[i + 1]
105        } else {
106            0u32
107        };
108        let len = chunks[i].len() as u32;
109        let buf = overflow_pages[i].as_bytes_mut();
110        buf[cs..cs + 4].copy_from_slice(&next.to_le_bytes());
111        buf[cs + 4..cs + 8].copy_from_slice(&len.to_le_bytes());
112        buf[cs + 8..cs + 8 + chunks[i].len()].copy_from_slice(chunks[i]);
113    }
114    for (idx, page) in overflow_pages.into_iter().enumerate() {
115        let id = overflow_ids[idx];
116        pager.write_page(id, page)?;
117    }
118
119    // Mirror the inner format_version for debug-friendly hex dumps.
120    let format_version = if meta_data.len() >= 8 && &meta_data[0..4] == METADATA_MAGIC {
121        u32::from_le_bytes([meta_data[4], meta_data[5], meta_data[6], meta_data[7]])
122    } else {
123        0
124    };
125
126    let buf = page1.as_bytes_mut();
127    buf[cs..cs + 4].copy_from_slice(METADATA_OVERFLOW_MAGIC);
128    buf[cs + 4..cs + 8].copy_from_slice(&format_version.to_le_bytes());
129    buf[cs + 8..cs + 12].copy_from_slice(&(meta_data.len() as u32).to_le_bytes());
130    buf[cs + 12..cs + 16].copy_from_slice(&overflow_ids[0].to_le_bytes());
131    buf[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_chunk.len()]
132        .copy_from_slice(first_chunk);
133
134    Ok(page1)
135}
136
137/// Assemble the full metadata payload from page 1 (plus its overflow chain
138/// when the `RDM3` wrapper is present). Returns the bytes that the metadata
139/// parser would see starting from the content offset of page 1. Single-page
140/// metadata returns the raw page content (including trailing zero-pad), so
141/// the legacy parser sees the same bytes it always saw.
142fn read_meta_payload(pager: &Pager) -> Option<Vec<u8>> {
143    let cs = crate::storage::engine::HEADER_SIZE;
144    let meta_page = pager
145        .read_page(1)
146        .or_else(|_| pager.recover_meta_from_shadow())
147        .ok()?;
148    let bytes = meta_page.as_bytes();
149    if bytes.len() < cs + 4 {
150        return Some(bytes.get(cs..).unwrap_or(&[]).to_vec());
151    }
152    if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
153        return Some(bytes[cs..].to_vec());
154    }
155    if bytes.len() < cs + META_V3_PAGE1_HEADER {
156        return None;
157    }
158    let total =
159        u32::from_le_bytes([bytes[cs + 8], bytes[cs + 9], bytes[cs + 10], bytes[cs + 11]]) as usize;
160    let mut next = u32::from_le_bytes([
161        bytes[cs + 12],
162        bytes[cs + 13],
163        bytes[cs + 14],
164        bytes[cs + 15],
165    ]);
166    let mut payload: Vec<u8> = Vec::with_capacity(total);
167    let first_take = total.min(META_V3_FIRST_PAYLOAD_CAP);
168    payload.extend_from_slice(
169        &bytes[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_take],
170    );
171    while next != 0 && payload.len() < total {
172        let ov = pager.read_page(next).ok()?;
173        let ob = ov.as_bytes();
174        if ob.len() < cs + META_V3_OVERFLOW_HEADER {
175            return None;
176        }
177        let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
178        let len = u32::from_le_bytes([ob[cs + 4], ob[cs + 5], ob[cs + 6], ob[cs + 7]]) as usize;
179        let remaining = total - payload.len();
180        let take = len.min(remaining).min(META_V3_OVERFLOW_PAYLOAD_CAP);
181        payload.extend_from_slice(
182            &ob[cs + META_V3_OVERFLOW_HEADER..cs + META_V3_OVERFLOW_HEADER + take],
183        );
184        next = nn;
185    }
186    Some(payload)
187}
188
189impl UnifiedStore {
    /// Flag the paged registry as needing a rewrite. `flush_paged_state`
    /// checks this flag and, when it is set, calls `flush_paged_registry`
    /// instead of performing a plain pager flush.
    pub(crate) fn mark_paged_registry_dirty(&self) {
        self.paged_registry_dirty.store(true, Ordering::Release);
    }
193
194    /// Get (or lazily create) the per-collection B-tree under a *read*
195    /// lock whenever possible. Returns a cloned `Arc<BTree>` so callers
196    /// can mutate the tree without holding the outer map's RwLock —
197    /// previously every insert serialised on `btree_indices.write()`,
198    /// costing ~60% of the concurrent-insert throughput ceiling.
199    pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
200        let pager = self.pager.as_ref()?;
201        if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
202            return Some(btree);
203        }
204        let mut write = self.btree_indices.write();
205        let btree = write
206            .entry(collection.to_string())
207            .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
208            .clone();
209        Some(btree)
210    }
211
212    pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
213        let Some(pager) = &self.pager else {
214            return Ok(());
215        };
216
217        if self.paged_registry_dirty.load(Ordering::Acquire) {
218            self.flush_paged_registry()?;
219            self.paged_registry_dirty.store(false, Ordering::Release);
220            return Ok(());
221        }
222
223        pager
224            .flush()
225            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
226    }
227
228    pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
229        let Some(pager) = &self.pager else {
230            return Ok(());
231        };
232
233        match pager.read_page(1) {
234            Ok(_) => {}
235            Err(PagerError::PageNotFound(_)) => {
236                let meta_page = pager
237                    .allocate_page(crate::storage::engine::PageType::Header)
238                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
239                pager
240                    .write_page(meta_page.page_id(), meta_page)
241                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
242            }
243            Err(e) => {
244                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
245            }
246        }
247
248        let format_version = STORE_VERSION_V9;
249        self.set_format_version(format_version);
250
251        let collections = self.collections.read();
252        let btree_indices = self.btree_indices.read();
253        let mut collection_roots = Vec::with_capacity(collections.len());
254        for (name, _) in collections.iter() {
255            let root_page = btree_indices
256                .get(name)
257                .map_or(0, |btree| btree.root_page_id());
258            collection_roots.push((name.clone(), root_page));
259        }
260        drop(btree_indices);
261        drop(collections);
262
263        let mut meta_data = Vec::with_capacity(4096);
264        meta_data.extend_from_slice(METADATA_MAGIC);
265        meta_data.extend_from_slice(&format_version.to_le_bytes());
266        meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
267        for (name, root_page) in &collection_roots {
268            meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
269            meta_data.extend_from_slice(name.as_bytes());
270            meta_data.extend_from_slice(&root_page.to_le_bytes());
271        }
272
273        let cross_refs = self.cross_refs.read();
274        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
275        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
276        for (source_id, refs) in cross_refs.iter() {
277            for (target_id, ref_type, collection) in refs {
278                meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
279                meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
280                meta_data.push(ref_type.to_byte());
281                meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
282                meta_data.extend_from_slice(collection.as_bytes());
283            }
284        }
285
286        let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
287            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
288
289        pager
290            .write_meta_shadow(&meta_page)
291            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
292        pager
293            .write_page(1, meta_page)
294            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
295        pager
296            .flush()
297            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
298
299        Ok(())
300    }
301
    /// Get a reference to the underlying pager (if in paged mode).
    ///
    /// Returns `None` when the store has no pager attached, which is the
    /// case for stores built with `with_config`.
    pub fn pager(&self) -> Option<&Arc<Pager>> {
        self.pager.as_ref()
    }
306
    /// Borrow the immutable store configuration. Runtime hooks (e.g. the
    /// `auto_index_id` first-insert hook in `MutationEngine`) read knobs
    /// off this struct without going through the legacy global config tree.
    ///
    /// The returned reference borrows from `self`; nothing is cloned.
    pub fn config(&self) -> &UnifiedStoreConfig {
        &self.config
    }
313
    /// Build an empty store from `config` with no pager, no database path
    /// and no commit coordinator attached.
    ///
    /// All registries and indices start empty, the entity id counter starts
    /// at 1, and the format version is `STORE_VERSION_V9`. Nothing is read
    /// from or written to disk here.
    pub fn with_config(config: UnifiedStoreConfig) -> Self {
        Self {
            config,
            format_version: AtomicU32::new(STORE_VERSION_V9),
            next_entity_id: AtomicU64::new(1),
            collections: RwLock::new(HashMap::new()),
            cross_refs: RwLock::new(HashMap::new()),
            reverse_refs: RwLock::new(HashMap::new()),
            // No paged backend: `pager()` returns `None` for this store.
            pager: None,
            db_path: None,
            btree_indices: RwLock::new(HashMap::new()),
            context_index: ContextIndex::new(),
            entity_cache: EntityCache::new(),
            graph_label_index: RwLock::new(HashMap::new()),
            paged_registry_dirty: AtomicBool::new(false),
            commit: None,
            unindex_cross_refs_fast_path: AtomicU64::new(0),
        }
    }
333
    /// Open or create a page-based database
    ///
    /// This uses the page engine for ACID durability with B-tree indices.
    /// The database file uses 4KB pages with checksums and efficient caching.
    ///
    /// Equivalent to `open_with_config(path, UnifiedStoreConfig::default())`.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the database file (e.g., "data.rdb")
    ///
    /// # Errors
    ///
    /// Returns whatever `open_with_config` returns on failure.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let store = UnifiedStore::open("security.rdb")?;
    /// store.create_collection("hosts")?;
    /// // ... operations ...
    /// store.persist()?; // Flush to disk
    /// ```
    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
        Self::open_with_config(path, UnifiedStoreConfig::default())
    }
354
355    pub fn open_with_config(
356        path: impl AsRef<Path>,
357        config: UnifiedStoreConfig,
358    ) -> Result<Self, StoreError> {
359        let path = path.as_ref();
360        let mut pager_config = PagerConfig::default();
361        // Tunables via env — experimental, used by the benchmark harness
362        // to compare durability profiles head-to-head with Postgres.
363        // REDDB_DOUBLE_WRITE=0 disables the double-write buffer, which
364        // otherwise adds two fsyncs per pager flush (one on DWB, one
365        // on the main file). With DWB off the pager behaves more like
366        // Postgres + full_page_writes=off — we trade torn-page
367        // protection for ingest throughput.
368        if matches!(
369            std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
370            Some("0") | Some("false") | Some("off")
371        ) {
372            pager_config.double_write = false;
373        }
374        let pager = Pager::open(path, pager_config)
375            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
376
377        let wal_path = Self::wal_path_for_db(path);
378        let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
379            Some(Arc::new(
380                StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
381                    .map_err(StoreError::Io)?,
382            ))
383        } else {
384            None
385        };
386
387        let store = Self {
388            config,
389            format_version: AtomicU32::new(STORE_VERSION_V9),
390            next_entity_id: AtomicU64::new(1),
391            collections: RwLock::new(HashMap::new()),
392            cross_refs: RwLock::new(HashMap::new()),
393            reverse_refs: RwLock::new(HashMap::new()),
394            pager: Some(Arc::new(pager)),
395            db_path: Some(path.to_path_buf()),
396            btree_indices: RwLock::new(HashMap::new()),
397            context_index: ContextIndex::new(),
398            entity_cache: EntityCache::new(),
399            graph_label_index: RwLock::new(HashMap::new()),
400            paged_registry_dirty: AtomicBool::new(false),
401            commit,
402            unindex_cross_refs_fast_path: AtomicU64::new(0),
403        };
404
405        // Load existing data from pages if database exists
406        store.load_from_pages()?;
407        if let Some(commit) = &store.commit {
408            commit.replay_into(&store).map_err(StoreError::Io)?;
409        }
410
411        Ok(store)
412    }
413
414    /// Load data from page-based storage
415    ///
416    /// Reads the B-tree indices and reconstructs collections from pages.
417    fn load_from_pages(&self) -> Result<(), StoreError> {
418        let pager = match &self.pager {
419            Some(p) => p,
420            None => return Ok(()), // No pager, nothing to load
421        };
422
423        // Get page count
424        let page_count = pager.page_count().map_err(|e| {
425            StoreError::Io(std::io::Error::other(format!(
426                "failed to read page count: {}",
427                e
428            )))
429        })?;
430        if page_count <= 1 {
431            // Empty database (only header page)
432            return Ok(());
433        }
434
435        // Read metadata starting from page 1 (collections registry). The
436        // helper transparently follows the `RDM3` overflow chain when the
437        // metadata blob spans multiple pages and falls back to the legacy
438        // `<data>-meta` shadow when page 1 itself is corrupted.
439        if let Some(content_vec) = read_meta_payload(pager) {
440            let content: &[u8] = &content_vec;
441            if content.len() >= 4 {
442                let mut pos = 0;
443                let mut format_version = STORE_VERSION_V1;
444
445                if content.len() >= 8 && &content[0..4] == METADATA_MAGIC {
446                    format_version =
447                        u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
448                    pos += 8;
449                }
450
451                self.set_format_version(format_version);
452
453                // Collection count
454                let collection_count = u32::from_le_bytes([
455                    content[pos],
456                    content[pos + 1],
457                    content[pos + 2],
458                    content[pos + 3],
459                ]) as usize;
460                pos += 4;
461
462                // Read collection names and their B-tree root pages
463                for _ in 0..collection_count {
464                    if pos + 4 > content.len() {
465                        break;
466                    }
467
468                    let name_len = u32::from_le_bytes([
469                        content[pos],
470                        content[pos + 1],
471                        content[pos + 2],
472                        content[pos + 3],
473                    ]) as usize;
474                    pos += 4;
475
476                    if pos + name_len + 4 > content.len() {
477                        break;
478                    }
479
480                    if let Ok(name) = String::from_utf8(content[pos..pos + name_len].to_vec()) {
481                        pos += name_len;
482
483                        // Root page ID for this collection's B-tree
484                        let root_page = u32::from_le_bytes([
485                            content[pos],
486                            content[pos + 1],
487                            content[pos + 2],
488                            content[pos + 3],
489                        ]);
490                        pos += 4;
491
492                        // Hydrate the collection in memory only. Loading must
493                        // not emit WAL entries or rewrite the on-disk registry
494                        // before the existing B-tree roots are attached.
495                        let _ = self.create_collection_in_memory(&name);
496
497                        // Load B-tree with root page if it exists
498                        if root_page > 0 {
499                            let btree = BTree::with_root(Arc::clone(pager), root_page);
500
501                            // Load all entities from B-tree into the collection
502                            if let Ok(mut cursor) = btree.cursor_first() {
503                                let manager = self.get_collection(&name);
504                                while let Ok(Some((key, value))) = cursor.next() {
505                                    // Deserialize entity from value bytes
506                                    if let Ok((entity, metadata)) = Self::deserialize_entity_record(
507                                        &value,
508                                        self.format_version(),
509                                    ) {
510                                        if let Some(m) = &manager {
511                                            let id = entity.id;
512                                            if let EntityKind::TableRow { row_id, .. } =
513                                                &entity.kind
514                                            {
515                                                m.register_row_id(*row_id);
516                                            }
517                                            self.context_index.index_entity(&name, &entity);
518                                            let _ = m.insert(entity.clone());
519                                            if let Some(metadata) = metadata {
520                                                let _ = m.set_metadata(id, metadata);
521                                            }
522                                            self.register_entity_id(id);
523                                            if self.config.auto_index_refs {
524                                                self.index_cross_refs(&entity, &name)?;
525                                            }
526                                        }
527                                    }
528                                }
529                            }
530
531                            // Store the B-tree for future lookups
532                            self.btree_indices.write().insert(name, Arc::new(btree));
533                        }
534                    } else {
535                        pos += name_len + 4;
536                    }
537                }
538
539                if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
540                    let cross_ref_count = u32::from_le_bytes([
541                        content[pos],
542                        content[pos + 1],
543                        content[pos + 2],
544                        content[pos + 3],
545                    ]) as usize;
546                    pos += 4;
547
548                    for _ in 0..cross_ref_count {
549                        if pos + 17 > content.len() {
550                            break;
551                        }
552                        let source_id = u64::from_le_bytes([
553                            content[pos],
554                            content[pos + 1],
555                            content[pos + 2],
556                            content[pos + 3],
557                            content[pos + 4],
558                            content[pos + 5],
559                            content[pos + 6],
560                            content[pos + 7],
561                        ]);
562                        pos += 8;
563                        let target_id = u64::from_le_bytes([
564                            content[pos],
565                            content[pos + 1],
566                            content[pos + 2],
567                            content[pos + 3],
568                            content[pos + 4],
569                            content[pos + 5],
570                            content[pos + 6],
571                            content[pos + 7],
572                        ]);
573                        pos += 8;
574                        let ref_type = RefType::from_byte(content[pos]);
575                        pos += 1;
576
577                        if pos + 4 > content.len() {
578                            break;
579                        }
580                        let name_len = u32::from_le_bytes([
581                            content[pos],
582                            content[pos + 1],
583                            content[pos + 2],
584                            content[pos + 3],
585                        ]) as usize;
586                        pos += 4;
587                        if pos + name_len > content.len() {
588                            break;
589                        }
590                        let target_collection =
591                            String::from_utf8_lossy(&content[pos..pos + name_len]).to_string();
592                        pos += name_len;
593
594                        let source_id = EntityId::new(source_id);
595                        let target_id = EntityId::new(target_id);
596
597                        self.cross_refs.write().entry(source_id).or_default().push((
598                            target_id,
599                            ref_type,
600                            target_collection.clone(),
601                        ));
602
603                        if let Some((collection, mut entity)) = self.get_any(source_id) {
604                            let exists = entity.cross_refs().iter().any(|xref| {
605                                xref.target == target_id
606                                    && xref.ref_type == ref_type
607                                    && xref.target_collection == target_collection
608                            });
609                            if !exists {
610                                entity.cross_refs_mut().push(CrossRef::new(
611                                    source_id,
612                                    target_id,
613                                    target_collection.clone(),
614                                    ref_type,
615                                ));
616                                if let Some(manager) = self.get_collection(&collection) {
617                                    let _ = manager.update(entity);
618                                }
619                            }
620                        }
621                    }
622                }
623            }
624        }
625
626        if self.format_version() < STORE_VERSION_V9 {
627            self.set_format_version(STORE_VERSION_V9);
628        }
629
630        Ok(())
631    }
632
633    /// Deserialize an entity from binary bytes
634    pub(crate) fn deserialize_entity(
635        data: &[u8],
636        format_version: u32,
637    ) -> Result<UnifiedEntity, StoreError> {
638        let mut pos = 0;
639        Self::read_entity_binary(data, &mut pos, format_version)
640            .map_err(|e| StoreError::Serialization(e.to_string()))
641    }
642
643    /// Serialize an entity to binary bytes
644    pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
645        // Pre-allocate ~256 bytes to cover the typical 15-column
646        // typed row without any Vec growth. Bulk insert calls this
647        // millions of times per bench run; saving 2-3 reallocs per
648        // entity amortises.
649        let mut buf = Vec::with_capacity(256);
650        Self::write_entity_binary(&mut buf, entity, format_version);
651        buf
652    }
653
654    pub(crate) fn serialize_entity_record(
655        entity: &UnifiedEntity,
656        metadata: Option<&Metadata>,
657        format_version: u32,
658    ) -> Vec<u8> {
659        let entity_bytes = Self::serialize_entity(entity, format_version);
660        // Skip the intermediate metadata Vec when there's no metadata
661        // (common OLTP bulk-insert case): write a zero-length prefix
662        // directly into the record buffer. Only fall back to the old
663        // serialize_metadata() allocation when the caller actually
664        // has fields to persist.
665        let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
666        if has_meta {
667            let metadata_bytes = serialize_metadata(metadata);
668            let mut buf = Vec::with_capacity(12 + entity_bytes.len() + metadata_bytes.len());
669            buf.extend_from_slice(ENTITY_RECORD_MAGIC);
670            buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
671            buf.extend_from_slice(&entity_bytes);
672            buf.extend_from_slice(&(metadata_bytes.len() as u32).to_le_bytes());
673            buf.extend_from_slice(&metadata_bytes);
674            buf
675        } else {
676            let mut buf = Vec::with_capacity(12 + entity_bytes.len());
677            buf.extend_from_slice(ENTITY_RECORD_MAGIC);
678            buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
679            buf.extend_from_slice(&entity_bytes);
680            buf.extend_from_slice(&0u32.to_le_bytes());
681            buf
682        }
683    }
684
685    pub(crate) fn deserialize_entity_record(
686        data: &[u8],
687        format_version: u32,
688    ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
689        if data.len() < 8 || &data[..4] != ENTITY_RECORD_MAGIC {
690            return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
691        }
692
693        let mut pos = 4usize;
694        let entity_len = read_u32(data, &mut pos)? as usize;
695        if pos + entity_len > data.len() {
696            return Err(StoreError::Serialization(
697                "truncated entity record payload".to_string(),
698            ));
699        }
700        let entity = Self::deserialize_entity(&data[pos..pos + entity_len], format_version)?;
701        pos += entity_len;
702
703        let metadata_len = read_u32(data, &mut pos)? as usize;
704        if pos + metadata_len > data.len() {
705            return Err(StoreError::Serialization(
706                "truncated entity record metadata".to_string(),
707            ));
708        }
709        let metadata = if metadata_len == 0 {
710            None
711        } else {
712            let metadata = deserialize_metadata(&data[pos..pos + metadata_len])?;
713            if metadata.is_empty() {
714                None
715            } else {
716                Some(metadata)
717            }
718        };
719
720        Ok((entity, metadata))
721    }
722
723    /// Persist all data to page-based storage
724    ///
725    /// Writes all entities to B-tree pages and flushes to disk.
726    /// This provides ACID durability guarantees.
727    pub fn persist(&self) -> Result<(), StoreError> {
728        let pager = match &self.pager {
729            Some(p) => p,
730            None => {
731                // No pager attached - use binary file fallback if path available
732                if let Some(path) = &self.db_path {
733                    return self
734                        .save_to_file(path)
735                        .map_err(|e| StoreError::Serialization(e.to_string()));
736                }
737                return Err(StoreError::Io(std::io::Error::other(
738                    "No pager or path configured for persistence",
739                )));
740            }
741        };
742
743        match pager.read_page(1) {
744            Ok(_) => {}
745            Err(PagerError::PageNotFound(_)) => {
746                let meta_page = pager
747                    .allocate_page(crate::storage::engine::PageType::Header)
748                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
749                pager
750                    .write_page(meta_page.page_id(), meta_page)
751                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
752            }
753            Err(e) => {
754                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
755            }
756        }
757
758        if let Some(commit) = &self.commit {
759            commit.force_sync().map_err(StoreError::Io)?;
760        }
761
762        let collections = self.collections.read();
763        let mut btree_indices = self.btree_indices.write();
764
765        // Collect collection names and their B-tree root pages
766        let mut collection_roots: Vec<(String, u32)> = Vec::new();
767
768        // For each collection, rebuild the B-tree from the live manager state.
769        // A checkpoint must preserve deletes too, not just upsert the current rows.
770        for (name, manager) in collections.iter() {
771            let btree = btree_indices
772                .entry(name.clone())
773                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
774
775            let mut existing_keys = Vec::new();
776            if !btree.is_empty() {
777                let mut cursor = btree.cursor_first().map_err(|e| {
778                    StoreError::Io(std::io::Error::other(format!(
779                        "B-tree cursor error while rebuilding '{name}': {e}"
780                    )))
781                })?;
782                while let Some((key, _)) = cursor.next().map_err(|e| {
783                    StoreError::Io(std::io::Error::other(format!(
784                        "B-tree scan error while rebuilding '{name}': {e}"
785                    )))
786                })? {
787                    existing_keys.push(key);
788                }
789            }
790
791            for key in existing_keys {
792                btree.delete(&key).map_err(|e| {
793                    StoreError::Io(std::io::Error::other(format!(
794                        "B-tree delete error while rebuilding '{name}': {e}"
795                    )))
796                })?;
797            }
798
799            let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
800                .query_all(|_| true)
801                .into_iter()
802                .map(|entity| {
803                    let metadata = manager.get_metadata(entity.id);
804                    (
805                        entity.id.raw().to_be_bytes().to_vec(),
806                        Self::serialize_entity_record(
807                            &entity,
808                            metadata.as_ref(),
809                            self.format_version(),
810                        ),
811                    )
812                })
813                .collect();
814            records.sort_by(|left, right| left.0.cmp(&right.0));
815
816            // Skip rows whose serialised value exceeds the B-tree's
817            // per-value limit. The bulk insert otherwise aborts the
818            // whole rebuild, which historically poisoned downstream
819            // operations on this collection (callers saw the rebuild
820            // error bubble up as `grpc BulkInsertBinary` failures even
821            // though the user-issued write itself was harmless). The
822            // primary write path enforces `MAX_VALUE_SIZE` already,
823            // so oversized rows here are leftovers from older inserts
824            // (e.g. `red_stats` MCV/histogram arrays predating the
825            // size cap in `stats_catalog`). Logging keeps them
826            // visible for VACUUM-style cleanup later.
827            let max_value_size = crate::storage::engine::btree::MAX_VALUE_SIZE;
828            let original_len = records.len();
829            records.retain(|(_, value)| {
830                if value.len() > max_value_size {
831                    // F-04: `name` is a tenant-supplied collection name.
832                    // Route through LogField escaper to neutralise
833                    // CR/LF/control-byte injection (ADR 0010).
834                    tracing::warn!(
835                        collection = %reddb_wire::audit_safe_log_field(name),
836                        bytes = value.len(),
837                        max = max_value_size,
838                        "skipping oversized row during B-tree bulk rebuild"
839                    );
840                    false
841                } else {
842                    true
843                }
844            });
845            let dropped = original_len - records.len();
846            if dropped > 0 {
847                // F-04: tenant-supplied `name` interpolated into the
848                // structured `collection` field AND the message
849                // string. Sanitize both via the LogField escaper.
850                let safe_name = format!("{}", reddb_wire::audit_safe_log_field(name));
851                tracing::warn!(
852                    collection = %safe_name,
853                    dropped,
854                    "dropped {dropped} oversized row(s) from '{safe_name}' on rebuild — \
855                     the rows remain readable via the in-memory entity store but \
856                     are absent from the on-disk B-tree until they are rewritten"
857                );
858            }
859
860            if !records.is_empty() {
861                btree.bulk_insert_sorted(&records).map_err(|e| {
862                    StoreError::Io(std::io::Error::other(format!(
863                        "B-tree bulk rebuild error for '{name}': {e}"
864                    )))
865                })?;
866            }
867
868            collection_roots.push((name.clone(), btree.root_page_id()));
869        }
870
871        // Write collection metadata to page 1
872        let mut meta_data = Vec::with_capacity(4096);
873
874        let format_version = STORE_VERSION_V9;
875        self.set_format_version(format_version);
876
877        // Metadata header: magic + version + collection count
878        meta_data.extend_from_slice(METADATA_MAGIC);
879        meta_data.extend_from_slice(&format_version.to_le_bytes());
880        meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
881
882        // Write each collection's name and B-tree root page
883        for (name, root_page) in &collection_roots {
884            // Name length
885            meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
886            // Name
887            meta_data.extend_from_slice(name.as_bytes());
888            // Root page ID from actual B-tree
889            meta_data.extend_from_slice(&root_page.to_le_bytes());
890        }
891
892        // Write cross-reference metadata
893        let cross_refs = self.cross_refs.read();
894        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
895        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
896        for (source_id, refs) in cross_refs.iter() {
897            for (target_id, ref_type, collection) in refs {
898                meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
899                meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
900                meta_data.push(ref_type.to_byte());
901                meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
902                meta_data.extend_from_slice(collection.as_bytes());
903            }
904        }
905
906        // Build page 1 (+ overflow chain when needed) for the metadata blob.
907        let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
908            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
909
910        // Write metadata shadow FIRST (intact copy in case main write fails).
911        // The shadow is a no-op when `fold_pager_meta` is enabled.
912        pager
913            .write_meta_shadow(&meta_page)
914            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
915
916        // Write page
917        pager
918            .write_page(1, meta_page)
919            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
920
921        // Flush and fsync all pages to disk
922        pager
923            .sync()
924            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
925
926        if let Some(commit) = &self.commit {
927            commit.truncate().map_err(StoreError::Io)?;
928        }
929
930        Ok(())
931    }
932
933    /// Check if the store is using page-based persistence
934    pub fn is_paged(&self) -> bool {
935        self.pager.is_some()
936    }
937
938    /// Current root page for a collection's primary B-tree, if one has
939    /// been materialized in this store.
940    pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
941        self.btree_indices
942            .read()
943            .get(collection)
944            .map(|btree| btree.root_page_id())
945            .filter(|root| *root != 0)
946    }
947
948    /// Get the database file path (if using paged mode)
949    pub fn db_path(&self) -> Option<&Path> {
950        self.db_path.as_deref()
951    }
952}
953
954fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
955    let Some(metadata) = metadata else {
956        return Vec::new();
957    };
958    if metadata.is_empty() {
959        return Vec::new();
960    }
961
962    let mut entries: Vec<_> = metadata.iter().collect();
963    entries.sort_by_key(|(a, _)| *a);
964
965    let mut buf = Vec::new();
966    buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
967    for (key, value) in entries {
968        write_string(&mut buf, key);
969        write_metadata_value(&mut buf, value);
970    }
971    buf
972}
973
974fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
975    let mut pos = 0usize;
976    let count = read_u32(data, &mut pos)? as usize;
977    let mut metadata = Metadata::new();
978    for _ in 0..count {
979        let key = read_string(data, &mut pos)?;
980        let value = read_metadata_value(data, &mut pos)?;
981        metadata.set(key, value);
982    }
983    Ok(metadata)
984}
985
/// Appends `value` to `buf` as a little-endian `u32` byte length followed by
/// its UTF-8 bytes.
fn write_string(buf: &mut Vec<u8>, value: &str) {
    let payload = value.as_bytes();
    let length_prefix = (payload.len() as u32).to_le_bytes();
    buf.extend_from_slice(&length_prefix);
    buf.extend_from_slice(payload);
}
990
/// Appends `value` to `buf` as a little-endian `u32` length followed by the
/// raw bytes.
fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
    let length_prefix = (value.len() as u32).to_le_bytes();
    buf.extend_from_slice(&length_prefix);
    buf.extend_from_slice(value);
}
995
996fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
997    use crate::storage::unified::RefTarget;
998
999    match target {
1000        RefTarget::TableRow { table, row_id } => {
1001            buf.push(0);
1002            write_string(buf, table);
1003            buf.extend_from_slice(&row_id.to_le_bytes());
1004        }
1005        RefTarget::Node {
1006            collection,
1007            node_id,
1008        } => {
1009            buf.push(1);
1010            write_string(buf, collection);
1011            buf.extend_from_slice(&node_id.raw().to_le_bytes());
1012        }
1013        RefTarget::Edge {
1014            collection,
1015            edge_id,
1016        } => {
1017            buf.push(2);
1018            write_string(buf, collection);
1019            buf.extend_from_slice(&edge_id.raw().to_le_bytes());
1020        }
1021        RefTarget::Vector {
1022            collection,
1023            vector_id,
1024        } => {
1025            buf.push(3);
1026            write_string(buf, collection);
1027            buf.extend_from_slice(&vector_id.raw().to_le_bytes());
1028        }
1029        RefTarget::Entity {
1030            collection,
1031            entity_id,
1032        } => {
1033            buf.push(4);
1034            write_string(buf, collection);
1035            buf.extend_from_slice(&entity_id.raw().to_le_bytes());
1036        }
1037    }
1038}
1039
1040fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
1041    match value {
1042        MetadataValue::Null => buf.push(0),
1043        MetadataValue::Bool(v) => {
1044            buf.push(1);
1045            buf.push(u8::from(*v));
1046        }
1047        MetadataValue::Int(v) => {
1048            buf.push(2);
1049            buf.extend_from_slice(&v.to_le_bytes());
1050        }
1051        MetadataValue::Float(v) => {
1052            buf.push(3);
1053            buf.extend_from_slice(&v.to_le_bytes());
1054        }
1055        MetadataValue::String(v) => {
1056            buf.push(4);
1057            write_string(buf, v);
1058        }
1059        MetadataValue::Bytes(v) => {
1060            buf.push(5);
1061            write_bytes(buf, v);
1062        }
1063        MetadataValue::Array(values) => {
1064            buf.push(6);
1065            buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1066            for value in values {
1067                write_metadata_value(buf, value);
1068            }
1069        }
1070        MetadataValue::Object(values) => {
1071            buf.push(7);
1072            let mut entries: Vec<_> = values.iter().collect();
1073            entries.sort_by_key(|(a, _)| *a);
1074            buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
1075            for (key, value) in entries {
1076                write_string(buf, key);
1077                write_metadata_value(buf, value);
1078            }
1079        }
1080        MetadataValue::Timestamp(v) => {
1081            buf.push(8);
1082            buf.extend_from_slice(&v.to_le_bytes());
1083        }
1084        MetadataValue::Geo { lat, lon } => {
1085            buf.push(9);
1086            buf.extend_from_slice(&lat.to_le_bytes());
1087            buf.extend_from_slice(&lon.to_le_bytes());
1088        }
1089        MetadataValue::Reference(target) => {
1090            buf.push(10);
1091            write_ref_target(buf, target);
1092        }
1093        MetadataValue::References(targets) => {
1094            buf.push(11);
1095            buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
1096            for target in targets {
1097                write_ref_target(buf, target);
1098            }
1099        }
1100    }
1101}
1102
1103fn read_exact_slice<'a>(
1104    data: &'a [u8],
1105    pos: &mut usize,
1106    len: usize,
1107) -> Result<&'a [u8], StoreError> {
1108    if *pos + len > data.len() {
1109        return Err(StoreError::Serialization(
1110            "truncated metadata payload".to_string(),
1111        ));
1112    }
1113    let slice = &data[*pos..*pos + len];
1114    *pos += len;
1115    Ok(slice)
1116}
1117
1118fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
1119    let bytes = read_exact_slice(data, pos, 4)?;
1120    let mut raw = [0u8; 4];
1121    raw.copy_from_slice(bytes);
1122    Ok(u32::from_le_bytes(raw))
1123}
1124
1125fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
1126    let bytes = read_exact_slice(data, pos, 8)?;
1127    let mut raw = [0u8; 8];
1128    raw.copy_from_slice(bytes);
1129    Ok(u64::from_le_bytes(raw))
1130}
1131
1132fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
1133    let bytes = read_exact_slice(data, pos, 8)?;
1134    let mut raw = [0u8; 8];
1135    raw.copy_from_slice(bytes);
1136    Ok(i64::from_le_bytes(raw))
1137}
1138
1139fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
1140    let bytes = read_exact_slice(data, pos, 8)?;
1141    let mut raw = [0u8; 8];
1142    raw.copy_from_slice(bytes);
1143    Ok(f64::from_le_bytes(raw))
1144}
1145
1146fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
1147    let bytes = read_exact_slice(data, pos, 1)?;
1148    Ok(bytes[0])
1149}
1150
1151fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
1152    let len = read_u32(data, pos)? as usize;
1153    let bytes = read_exact_slice(data, pos, len)?;
1154    String::from_utf8(bytes.to_vec()).map_err(|err| StoreError::Serialization(err.to_string()))
1155}
1156
1157fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
1158    let len = read_u32(data, pos)? as usize;
1159    Ok(read_exact_slice(data, pos, len)?.to_vec())
1160}
1161
1162fn read_ref_target(
1163    data: &[u8],
1164    pos: &mut usize,
1165) -> Result<crate::storage::unified::RefTarget, StoreError> {
1166    use crate::storage::unified::RefTarget;
1167
1168    match read_u8(data, pos)? {
1169        0 => Ok(RefTarget::TableRow {
1170            table: read_string(data, pos)?,
1171            row_id: read_u64(data, pos)?,
1172        }),
1173        1 => Ok(RefTarget::Node {
1174            collection: read_string(data, pos)?,
1175            node_id: EntityId::new(read_u64(data, pos)?),
1176        }),
1177        2 => Ok(RefTarget::Edge {
1178            collection: read_string(data, pos)?,
1179            edge_id: EntityId::new(read_u64(data, pos)?),
1180        }),
1181        3 => Ok(RefTarget::Vector {
1182            collection: read_string(data, pos)?,
1183            vector_id: EntityId::new(read_u64(data, pos)?),
1184        }),
1185        4 => Ok(RefTarget::Entity {
1186            collection: read_string(data, pos)?,
1187            entity_id: EntityId::new(read_u64(data, pos)?),
1188        }),
1189        tag => Err(StoreError::Serialization(format!(
1190            "unknown metadata ref target tag {tag}"
1191        ))),
1192    }
1193}
1194
1195fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1196    match read_u8(data, pos)? {
1197        0 => Ok(MetadataValue::Null),
1198        1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1199        2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1200        3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1201        4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1202        5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1203        6 => {
1204            let count = read_u32(data, pos)? as usize;
1205            let mut values = Vec::with_capacity(count);
1206            for _ in 0..count {
1207                values.push(read_metadata_value(data, pos)?);
1208            }
1209            Ok(MetadataValue::Array(values))
1210        }
1211        7 => {
1212            let count = read_u32(data, pos)? as usize;
1213            let mut values = std::collections::HashMap::with_capacity(count);
1214            for _ in 0..count {
1215                let key = read_string(data, pos)?;
1216                let value = read_metadata_value(data, pos)?;
1217                values.insert(key, value);
1218            }
1219            Ok(MetadataValue::Object(values))
1220        }
1221        8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1222        9 => Ok(MetadataValue::Geo {
1223            lat: read_f64(data, pos)?,
1224            lon: read_f64(data, pos)?,
1225        }),
1226        10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1227        11 => {
1228            let count = read_u32(data, pos)? as usize;
1229            let mut targets = Vec::with_capacity(count);
1230            for _ in 0..count {
1231                targets.push(read_ref_target(data, pos)?);
1232            }
1233            Ok(MetadataValue::References(targets))
1234        }
1235        tag => Err(StoreError::Serialization(format!(
1236            "unknown metadata value tag {tag}"
1237        ))),
1238    }
1239}