Skip to main content

reddb_server/storage/unified/store/
impl_pages.rs

1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
5// ── Pager-meta overflow chain (gh-477) ──────────────────────────────────────
6// When the serialized collection registry + cross-refs exceed a single page,
7// page 1 carries a native metadata-overflow header pointing at an overflow chain of
8// `PageType::Overflow` pages. Single-page metadata keeps the historical
9// bit-identical layout (`METADATA_MAGIC = "RDM2"` written directly at the
10// content offset).
11//
12// Page 1 (overflow form), starting at `HEADER_SIZE`:
13//   [0..4]   native metadata-overflow magic
14//   [4..8]   format_version (u32, mirrors inner payload version for debug)
15//   [8..12]  total_payload_bytes (u32)
16//   [12..16] next_overflow_page_id (u32, > 0)
17//   [16..]   first payload chunk (up to META_V3_FIRST_PAYLOAD_CAP bytes)
18//
19// Overflow continuation page, starting at `HEADER_SIZE`:
20//   [0..4]   next_overflow_page_id (u32, 0 if last)
21//   [4..8]   chunk_bytes (u32)
22//   [8..]    chunk payload (up to META_V3_OVERFLOW_PAYLOAD_CAP bytes)
23const META_PAGE_CONTENT_CAP: usize =
24    crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE;
25const META_V3_PAGE1_HEADER: usize = reddb_file::METADATA_OVERFLOW_HEADER_BYTES;
26const META_V3_OVERFLOW_HEADER: usize = reddb_file::METADATA_OVERFLOW_CONTINUATION_HEADER_BYTES;
27const META_V3_FIRST_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_PAGE1_HEADER;
28const META_V3_OVERFLOW_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_OVERFLOW_HEADER;
29
30fn free_existing_overflow_chain(pager: &Pager) -> Result<(), PagerError> {
31    let cs = crate::storage::engine::HEADER_SIZE;
32    let page = match pager.read_page(1) {
33        Ok(p) => p,
34        Err(_) => return Ok(()),
35    };
36    let bytes = page.as_bytes();
37    if bytes.len() < cs + META_V3_PAGE1_HEADER {
38        return Ok(());
39    }
40    let Some(header) =
41        reddb_file::decode_native_metadata_overflow_header(&bytes[cs..]).map_err(|err| {
42            PagerError::InvalidDatabase(format!("invalid metadata overflow header: {err}"))
43        })?
44    else {
45        return Ok(());
46    };
47    let mut next = header.next_overflow_page_id;
48    while next != 0 {
49        let ov = match pager.read_page(next) {
50            Ok(p) => p,
51            Err(_) => break,
52        };
53        let ob = ov.as_bytes();
54        let nn = match reddb_file::decode_native_metadata_overflow_continuation_header(&ob[cs..]) {
55            Ok(header) => header.next_overflow_page_id,
56            Err(_) => 0,
57        };
58        let _ = pager.free_page(next);
59        next = nn;
60    }
61    Ok(())
62}
63
64fn build_meta_page1_with_overflow(
65    pager: &Pager,
66    meta_data: &[u8],
67) -> Result<crate::storage::engine::Page, PagerError> {
68    use crate::storage::engine::{Page, PageType, HEADER_SIZE};
69    free_existing_overflow_chain(pager)?;
70
71    let mut page1 = Page::new(PageType::Header, 1);
72    let cs = HEADER_SIZE;
73
74    if meta_data.len() <= META_PAGE_CONTENT_CAP {
75        // Single-page: bit-identical to the historical layout.
76        let buf = page1.as_bytes_mut();
77        buf[cs..cs + meta_data.len()].copy_from_slice(meta_data);
78        return Ok(page1);
79    }
80
81    // Multi-page overflow form. Split the inner payload into the first chunk
82    // (held on page 1) followed by zero-or-more continuation chunks chained
83    // through `PageType::Overflow` pages.
84    let first_chunk = &meta_data[..META_V3_FIRST_PAYLOAD_CAP];
85    let mut tail = &meta_data[META_V3_FIRST_PAYLOAD_CAP..];
86    let mut chunks: Vec<&[u8]> = Vec::new();
87    while !tail.is_empty() {
88        let take = tail.len().min(META_V3_OVERFLOW_PAYLOAD_CAP);
89        chunks.push(&tail[..take]);
90        tail = &tail[take..];
91    }
92
93    let mut overflow_pages: Vec<Page> = Vec::with_capacity(chunks.len());
94    let mut overflow_ids: Vec<u32> = Vec::with_capacity(chunks.len());
95    for _ in 0..chunks.len() {
96        let pg = pager.allocate_page(PageType::Overflow)?;
97        overflow_ids.push(pg.page_id());
98        overflow_pages.push(pg);
99    }
100
101    for i in 0..chunks.len() {
102        let next = if i + 1 < chunks.len() {
103            overflow_ids[i + 1]
104        } else {
105            0u32
106        };
107        let len = chunks[i].len() as u32;
108        let buf = overflow_pages[i].as_bytes_mut();
109        reddb_file::encode_native_metadata_overflow_continuation_header(
110            &mut buf[cs..cs + META_V3_OVERFLOW_HEADER],
111            reddb_file::NativeMetadataOverflowContinuationHeader {
112                next_overflow_page_id: next,
113                chunk_bytes: len,
114            },
115        )
116        .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
117        buf[cs + 8..cs + 8 + chunks[i].len()].copy_from_slice(chunks[i]);
118    }
119    for (idx, page) in overflow_pages.into_iter().enumerate() {
120        let id = overflow_ids[idx];
121        pager.write_page(id, page)?;
122    }
123
124    // Mirror the inner format_version for debug-friendly hex dumps.
125    let format_version = reddb_file::decode_native_paged_metadata_header(meta_data)
126        .ok()
127        .flatten()
128        .map_or(0, |header| header.format_version);
129
130    let buf = page1.as_bytes_mut();
131    reddb_file::encode_native_metadata_overflow_header(
132        &mut buf[cs..cs + META_V3_PAGE1_HEADER],
133        reddb_file::NativeMetadataOverflowHeader {
134            format_version,
135            total_payload_bytes: meta_data.len() as u32,
136            next_overflow_page_id: overflow_ids[0],
137        },
138    )
139    .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
140    buf[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_chunk.len()]
141        .copy_from_slice(first_chunk);
142
143    Ok(page1)
144}
145
146/// Assemble the full metadata payload from page 1 (plus its overflow chain
147/// when the native overflow wrapper is present). Returns the bytes that the
148/// metadata parser would see starting from the content offset of page 1.
149/// Single-page metadata returns the raw page content (including trailing
150/// zero-pad), so the legacy parser sees the same bytes it always saw.
151fn read_meta_payload(pager: &Pager) -> Option<Vec<u8>> {
152    let cs = crate::storage::engine::HEADER_SIZE;
153    let meta_page = pager
154        .read_page(1)
155        .or_else(|_| pager.recover_meta_from_shadow())
156        .ok()?;
157    let bytes = meta_page.as_bytes();
158    if bytes.len() < cs + 4 {
159        return Some(bytes.get(cs..).unwrap_or(&[]).to_vec());
160    }
161    let header = match reddb_file::decode_native_metadata_overflow_header(&bytes[cs..]).ok()? {
162        Some(header) => header,
163        None => {
164            return Some(bytes[cs..].to_vec());
165        }
166    };
167    if bytes.len() < cs + META_V3_PAGE1_HEADER {
168        return None;
169    }
170    let total = header.total_payload_bytes as usize;
171    let mut next = header.next_overflow_page_id;
172    let mut payload: Vec<u8> = Vec::with_capacity(total);
173    let first_take = total.min(META_V3_FIRST_PAYLOAD_CAP);
174    payload.extend_from_slice(
175        &bytes[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_take],
176    );
177    while next != 0 && payload.len() < total {
178        let ov = pager.read_page(next).ok()?;
179        let ob = ov.as_bytes();
180        if ob.len() < cs + META_V3_OVERFLOW_HEADER {
181            return None;
182        }
183        let continuation =
184            reddb_file::decode_native_metadata_overflow_continuation_header(&ob[cs..]).ok()?;
185        let nn = continuation.next_overflow_page_id;
186        let len = continuation.chunk_bytes as usize;
187        let remaining = total - payload.len();
188        let take = len.min(remaining).min(META_V3_OVERFLOW_PAYLOAD_CAP);
189        payload.extend_from_slice(
190            &ob[cs + META_V3_OVERFLOW_HEADER..cs + META_V3_OVERFLOW_HEADER + take],
191        );
192        next = nn;
193    }
194    Some(payload)
195}
196
197impl UnifiedStore {
198    pub(crate) fn mark_paged_registry_dirty(&self) {
199        self.paged_registry_dirty.store(true, Ordering::Release);
200    }
201
202    /// Get (or lazily create) the per-collection B-tree under a *read*
203    /// lock whenever possible. Returns a cloned `Arc<BTree>` so callers
204    /// can mutate the tree without holding the outer map's RwLock —
205    /// previously every insert serialised on `btree_indices.write()`,
206    /// costing ~60% of the concurrent-insert throughput ceiling.
207    pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
208        let pager = self.pager.as_ref()?;
209        if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
210            return Some(btree);
211        }
212        let mut write = self.btree_indices.write();
213        let btree = write
214            .entry(collection.to_string())
215            .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
216            .clone();
217        Some(btree)
218    }
219
220    pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
221        let Some(pager) = &self.pager else {
222            return Ok(());
223        };
224
225        if self.paged_registry_dirty.load(Ordering::Acquire) {
226            self.flush_paged_registry()?;
227            self.paged_registry_dirty.store(false, Ordering::Release);
228            return Ok(());
229        }
230
231        pager
232            .flush()
233            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
234    }
235
236    pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
237        let Some(pager) = &self.pager else {
238            return Ok(());
239        };
240
241        match pager.read_page(1) {
242            Ok(_) => {}
243            Err(PagerError::PageNotFound(_)) => {
244                let meta_page = pager
245                    .allocate_page(crate::storage::engine::PageType::Header)
246                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
247                pager
248                    .write_page(meta_page.page_id(), meta_page)
249                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
250            }
251            Err(e) => {
252                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
253            }
254        }
255
256        let format_version = STORE_VERSION_V9;
257        self.set_format_version(format_version);
258
259        let collections = self.collections.read();
260        let btree_indices = self.btree_indices.read();
261        let mut collection_roots = Vec::with_capacity(collections.len());
262        for (name, _) in collections.iter() {
263            let root_page = btree_indices
264                .get(name)
265                .map_or(0, |btree| btree.root_page_id());
266            collection_roots.push((name.clone(), root_page));
267        }
268        drop(btree_indices);
269        drop(collections);
270
271        let mut meta_data = Vec::with_capacity(4096);
272        reddb_file::encode_native_paged_metadata_header(
273            &mut meta_data,
274            reddb_file::NativePagedMetadataHeader {
275                format_version,
276                collection_count: collection_roots.len() as u32,
277            },
278        );
279        for (name, root_page) in &collection_roots {
280            reddb_file::encode_native_paged_collection_root(&mut meta_data, name, *root_page);
281        }
282
283        let cross_refs = self.cross_refs.read();
284        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
285        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
286        for (source_id, refs) in cross_refs.iter() {
287            for (target_id, ref_type, collection) in refs {
288                reddb_file::encode_native_paged_cross_ref(
289                    &mut meta_data,
290                    source_id.raw(),
291                    target_id.raw(),
292                    ref_type.to_byte(),
293                    collection,
294                );
295            }
296        }
297
298        let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
299            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
300
301        pager
302            .write_meta_shadow(&meta_page)
303            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
304        pager
305            .write_page(1, meta_page)
306            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
307        pager
308            .flush()
309            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
310
311        Ok(())
312    }
313
314    /// Get a reference to the underlying pager (if in paged mode).
315    pub fn pager(&self) -> Option<&Arc<Pager>> {
316        self.pager.as_ref()
317    }
318
319    /// Borrow the immutable store configuration. Runtime hooks (e.g. the
320    /// `auto_index_id` first-insert hook in `MutationEngine`) read knobs
321    /// off this struct without going through the legacy global config tree.
322    pub fn config(&self) -> &UnifiedStoreConfig {
323        &self.config
324    }
325
326    pub fn with_config(config: UnifiedStoreConfig) -> Self {
327        Self {
328            config,
329            format_version: AtomicU32::new(STORE_VERSION_V9),
330            next_entity_id: AtomicU64::new(1),
331            collections: RwLock::new(HashMap::new()),
332            cross_refs: RwLock::new(HashMap::new()),
333            reverse_refs: RwLock::new(HashMap::new()),
334            pager: None,
335            db_path: None,
336            btree_indices: RwLock::new(HashMap::new()),
337            context_index: ContextIndex::new(),
338            entity_cache: EntityCache::new(),
339            graph_label_index: RwLock::new(HashMap::new()),
340            paged_registry_dirty: AtomicBool::new(false),
341            commit: None,
342            unindex_cross_refs_fast_path: AtomicU64::new(0),
343            replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
344        }
345    }
346
347    /// Open or create a page-based database
348    ///
349    /// This uses the page engine for ACID durability with B-tree indices.
350    /// The database file uses 4KB pages with checksums and efficient caching.
351    ///
352    /// # Arguments
353    ///
354    /// * `path` - Path to the database file (e.g., "data.rdb")
355    ///
356    /// # Example
357    ///
358    /// ```rust,ignore
359    /// let store = UnifiedStore::open("security.rdb")?;
360    /// store.create_collection("hosts")?;
361    /// // ... operations ...
362    /// store.persist()?; // Flush to disk
363    /// ```
364    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
365        Self::open_with_config(path, UnifiedStoreConfig::default())
366    }
367
368    pub fn open_with_config(
369        path: impl AsRef<Path>,
370        config: UnifiedStoreConfig,
371    ) -> Result<Self, StoreError> {
372        let path = path.as_ref();
373        let mut pager_config = PagerConfig::default();
374        // Tunables via env — experimental, used by the benchmark harness
375        // to compare durability profiles head-to-head with Postgres.
376        // REDDB_DOUBLE_WRITE=0 requests skipping the double-write buffer,
377        // which otherwise adds two fsyncs per pager flush (one on DWB, one
378        // on the main file). The pager honors this only when the actual
379        // data file is proven to live on a CoW filesystem with atomic page
380        // writes; otherwise it fails closed and keeps DWB enabled.
381        if matches!(
382            std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
383            Some("0") | Some("false") | Some("off")
384        ) {
385            pager_config.double_write = false;
386        }
387        let pager = Pager::open(path, pager_config)
388            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
389
390        let wal_path = reddb_file::layout::unified_wal_path(path);
391        let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
392            Some(Arc::new(
393                StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
394                    .map_err(StoreError::Io)?,
395            ))
396        } else {
397            None
398        };
399
400        let store = Self {
401            config,
402            format_version: AtomicU32::new(STORE_VERSION_V9),
403            next_entity_id: AtomicU64::new(1),
404            collections: RwLock::new(HashMap::new()),
405            cross_refs: RwLock::new(HashMap::new()),
406            reverse_refs: RwLock::new(HashMap::new()),
407            pager: Some(Arc::new(pager)),
408            db_path: Some(path.to_path_buf()),
409            btree_indices: RwLock::new(HashMap::new()),
410            context_index: ContextIndex::new(),
411            entity_cache: EntityCache::new(),
412            graph_label_index: RwLock::new(HashMap::new()),
413            paged_registry_dirty: AtomicBool::new(false),
414            commit,
415            unindex_cross_refs_fast_path: AtomicU64::new(0),
416            replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
417        };
418
419        // Load existing data from pages if database exists
420        store.load_from_pages()?;
421        if let Some(commit) = &store.commit {
422            commit.replay_into(&store).map_err(StoreError::Io)?;
423        }
424        store.recover_operational_manifest()?;
425
426        Ok(store)
427    }
428
429    pub(crate) fn recover_operational_manifest(&self) -> Result<(), StoreError> {
430        let Some(path) = &self.db_path else {
431            return Ok(());
432        };
433        let mut collections = self.list_collections();
434        collections.sort();
435        let pending_drops =
436            crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
437                .recover_or_bootstrap(&collections)
438                .map_err(StoreError::Io)?;
439        for name in pending_drops {
440            if self.get_collection(&name).is_some() {
441                self.drop_collection(&name)?;
442            }
443        }
444        Ok(())
445    }
446
447    pub(crate) fn publish_operational_collection_create(
448        &self,
449        name: &str,
450    ) -> Result<(), StoreError> {
451        let Some(path) = &self.db_path else {
452            return Ok(());
453        };
454        crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
455            .create_collection(name)
456            .map_err(StoreError::Io)
457    }
458
459    pub(crate) fn publish_operational_collection_pending_drop(
460        &self,
461        name: &str,
462    ) -> Result<(), StoreError> {
463        let Some(path) = &self.db_path else {
464            return Ok(());
465        };
466        crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
467            .begin_drop_collection(name)
468            .map_err(StoreError::Io)
469    }
470
471    pub(crate) fn publish_operational_collection_drop_finished(
472        &self,
473        name: &str,
474    ) -> Result<(), StoreError> {
475        let Some(path) = &self.db_path else {
476            return Ok(());
477        };
478        crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
479            .finish_drop_collection(name)
480            .map_err(StoreError::Io)
481    }
482
    /// Load data from page-based storage.
    ///
    /// Steps:
    /// 1. Read the metadata payload behind page 1 (via `read_meta_payload`).
    /// 2. Recreate each listed collection in memory.
    /// 3. Attach each collection's B-tree at its recorded root page and load
    ///    every entity stored in it.
    /// 4. Restore the persisted cross-reference table (format V2 and later).
    /// 5. On the normal path, raise the in-memory format version to at least
    ///    `STORE_VERSION_V9`.
    ///
    /// Does nothing when the store has no pager or the file has at most one
    /// page.
    ///
    /// # Errors
    ///
    /// Fails in three cases: the page count cannot be read, the metadata
    /// header decoder rejects the payload, or `index_cross_refs` fails.
    /// Several per-record problems are skipped without an error; see the
    /// NOTE(review) comments below.
    fn load_from_pages(&self) -> Result<(), StoreError> {
        let pager = match &self.pager {
            Some(p) => p,
            None => return Ok(()), // No pager, nothing to load
        };

        // Get page count
        let page_count = pager.page_count().map_err(|e| {
            StoreError::Io(std::io::Error::other(format!(
                "failed to read page count: {}",
                e
            )))
        })?;
        if page_count <= 1 {
            // Empty database (only header page)
            return Ok(());
        }

        // Read the metadata payload behind page 1 (collections registry plus
        // cross-refs). `read_meta_payload` stitches the overflow chain back
        // together when the blob spans several pages. If page 1 cannot be
        // read, it falls back to the pager's meta shadow
        // (`recover_meta_from_shadow`). `None` means nothing is hydrated here.
        if let Some(content_vec) = read_meta_payload(pager) {
            let content: &[u8] = &content_vec;
            if content.len() >= 4 {
                let mut pos = 0;
                let mut format_version = STORE_VERSION_V1;

                // Native payloads begin with a versioned header. Anything else
                // is read as the legacy (`STORE_VERSION_V1`) layout, whose
                // first four bytes are a little-endian collection count.
                let collection_count = if let Some(header) =
                    reddb_file::decode_native_paged_metadata_header(content)
                        .map_err(|err| StoreError::Serialization(err.to_string()))?
                {
                    format_version = header.format_version;
                    pos += reddb_file::METADATA_HEADER_BYTES;
                    header.collection_count as usize
                } else {
                    let count = u32::from_le_bytes([
                        content[pos],
                        content[pos + 1],
                        content[pos + 2],
                        content[pos + 3],
                    ]) as usize;
                    pos += 4;
                    count
                };

                // Set before decoding entities: the loop below reads it back
                // through `self.format_version()`.
                self.set_format_version(format_version);

                // NOTE(review): this early return also skips the
                // `STORE_VERSION_V9` bump at the end of the function.
                if pos > content.len() {
                    return Ok(());
                }

                // Read collection names and their B-tree root pages. The first
                // entry that fails to decode stops the scan (`break` below).
                for _ in 0..collection_count {
                    if let Ok(root) =
                        reddb_file::decode_native_paged_collection_root(content, &mut pos)
                    {
                        // Root page ID for this collection's B-tree
                        let root_page = root.root_page;
                        let name = root.collection;

                        // Hydrate the collection in memory only. Loading must
                        // not emit WAL entries or rewrite the on-disk registry
                        // before the existing B-tree roots are attached.
                        // The result is discarded (presumably "already exists"
                        // is the expected failure — verify).
                        let _ = self.create_collection_in_memory(&name);

                        // Load B-tree with root page if it exists. A root of 0
                        // registers no tree here; one can be created on demand
                        // later (e.g. by `get_or_create_btree`).
                        if root_page > 0 {
                            let btree = BTree::with_root(Arc::clone(pager), root_page);

                            // Load all entities from B-tree into the collection.
                            // NOTE(review): an `Err` from `cursor_first()` or
                            // `cursor.next()` ends this scan, and records that
                            // fail `deserialize_entity_record` are skipped;
                            // neither case is reported to the caller.
                            if let Ok(mut cursor) = btree.cursor_first() {
                                let manager = self.get_collection(&name);
                                // `key` is unused: the entity id comes from the
                                // decoded record (`entity.id`).
                                while let Ok(Some((key, value))) = cursor.next() {
                                    // Deserialize entity from value bytes
                                    if let Ok((entity, metadata)) = Self::deserialize_entity_record(
                                        &value,
                                        self.format_version(),
                                    ) {
                                        if let Some(m) = &manager {
                                            let id = entity.id;
                                            if let EntityKind::TableRow { row_id, .. } =
                                                &entity.kind
                                            {
                                                m.register_row_id(*row_id);
                                            }
                                            self.context_index.index_entity(&name, &entity);
                                            // Insert / set_metadata results are
                                            // deliberately discarded here.
                                            let _ = m.insert(entity.clone());
                                            if let Some(metadata) = metadata {
                                                let _ = m.set_metadata(id, metadata);
                                            }
                                            self.register_entity_id(id);
                                            if self.config.auto_index_refs {
                                                self.index_cross_refs(&entity, &name)?;
                                            }
                                        }
                                    }
                                }
                            }

                            // Store the B-tree for future lookups
                            self.btree_indices.write().insert(name, Arc::new(btree));
                        }
                    } else {
                        break;
                    }
                }

                // The cross-ref table follows the registry for V2+ payloads:
                // a u32 count, then one encoded cross-ref per entry.
                if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
                    let cross_ref_count = u32::from_le_bytes([
                        content[pos],
                        content[pos + 1],
                        content[pos + 2],
                        content[pos + 3],
                    ]) as usize;
                    pos += 4;

                    for _ in 0..cross_ref_count {
                        let Ok(cross_ref) =
                            reddb_file::decode_native_paged_cross_ref(content, &mut pos)
                        else {
                            break;
                        };
                        let source_id = EntityId::new(cross_ref.source_id);
                        let target_id = EntityId::new(cross_ref.target_id);
                        let ref_type = RefType::from_byte(cross_ref.ref_type);
                        let target_collection = cross_ref.target_collection;

                        // Record the edge in the store-level cross-ref map.
                        self.cross_refs.write().entry(source_id).or_default().push((
                            target_id,
                            ref_type,
                            target_collection.clone(),
                        ));

                        // Mirror the edge onto the source entity's own
                        // cross-ref list if it is not already present there.
                        if let Some((collection, mut entity)) = self.get_any(source_id) {
                            let exists = entity.cross_refs().iter().any(|xref| {
                                xref.target == target_id
                                    && xref.ref_type == ref_type
                                    && xref.target_collection == target_collection
                            });
                            if !exists {
                                entity.cross_refs_mut().push(CrossRef::new(
                                    source_id,
                                    target_id,
                                    target_collection.clone(),
                                    ref_type,
                                ));
                                if let Some(manager) = self.get_collection(&collection) {
                                    let _ = manager.update(entity);
                                }
                            }
                        }
                    }
                }
            }
        }

        if self.format_version() < STORE_VERSION_V9 {
            self.set_format_version(STORE_VERSION_V9);
        }

        Ok(())
    }
649
650    /// Deserialize an entity from binary bytes
651    pub(crate) fn deserialize_entity(
652        data: &[u8],
653        format_version: u32,
654    ) -> Result<UnifiedEntity, StoreError> {
655        let mut pos = 0;
656        Self::read_entity_binary(data, &mut pos, format_version)
657            .map_err(|e| StoreError::Serialization(e.to_string()))
658    }
659
660    /// Serialize an entity to binary bytes
661    pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
662        // Pre-allocate ~256 bytes to cover the typical 15-column
663        // typed row without any Vec growth. Bulk insert calls this
664        // millions of times per bench run; saving 2-3 reallocs per
665        // entity amortises.
666        let mut buf = Vec::with_capacity(256);
667        Self::write_entity_binary(&mut buf, entity, format_version);
668        buf
669    }
670
671    pub(crate) fn serialize_entity_record(
672        entity: &UnifiedEntity,
673        metadata: Option<&Metadata>,
674        format_version: u32,
675    ) -> Vec<u8> {
676        let entity_bytes = Self::serialize_entity(entity, format_version);
677        // Skip the intermediate metadata Vec when there's no metadata
678        // (common OLTP bulk-insert case): write a zero-length prefix
679        // directly into the record buffer. Only fall back to the old
680        // serialize_metadata() allocation when the caller actually
681        // has fields to persist.
682        let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
683        if has_meta {
684            let metadata_bytes = serialize_metadata(metadata);
685            reddb_file::encode_native_entity_record_frame(&entity_bytes, Some(&metadata_bytes))
686        } else {
687            reddb_file::encode_native_entity_record_frame(&entity_bytes, None)
688        }
689    }
690
691    pub(crate) fn deserialize_entity_record(
692        data: &[u8],
693        format_version: u32,
694    ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
695        let Some(frame) = reddb_file::decode_native_entity_record_frame(data)
696            .map_err(|err| StoreError::Serialization(err.to_string()))?
697        else {
698            return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
699        };
700
701        let entity = Self::deserialize_entity(frame.entity, format_version)?;
702        let metadata = if frame.metadata.is_empty() {
703            None
704        } else {
705            let metadata = deserialize_metadata(frame.metadata)?;
706            if metadata.is_empty() {
707                None
708            } else {
709                Some(metadata)
710            }
711        };
712
713        Ok((entity, metadata))
714    }
715
716    /// Persist all data to page-based storage
717    ///
718    /// Writes all entities to B-tree pages and flushes to disk.
719    /// This provides ACID durability guarantees.
720    pub fn persist(&self) -> Result<(), StoreError> {
721        let pager = match &self.pager {
722            Some(p) => p,
723            None => {
724                // No pager attached - use binary file fallback if path available
725                if let Some(path) = &self.db_path {
726                    return self
727                        .save_to_file(path)
728                        .map_err(|e| StoreError::Serialization(e.to_string()));
729                }
730                return Err(StoreError::Io(std::io::Error::other(
731                    "No pager or path configured for persistence",
732                )));
733            }
734        };
735
736        match pager.read_page(1) {
737            Ok(_) => {}
738            Err(PagerError::PageNotFound(_)) => {
739                let meta_page = pager
740                    .allocate_page(crate::storage::engine::PageType::Header)
741                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
742                pager
743                    .write_page(meta_page.page_id(), meta_page)
744                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
745            }
746            Err(e) => {
747                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
748            }
749        }
750
751        if let Some(commit) = &self.commit {
752            commit.force_sync().map_err(StoreError::Io)?;
753        }
754
755        let collections = self.collections.read();
756        let mut btree_indices = self.btree_indices.write();
757
758        // Collect collection names and their B-tree root pages
759        let mut collection_roots: Vec<(String, u32)> = Vec::new();
760
761        // For each collection, rebuild the B-tree from the live manager state.
762        // A checkpoint must preserve deletes too, not just upsert the current rows.
763        for (name, manager) in collections.iter() {
764            let btree = btree_indices
765                .entry(name.clone())
766                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
767
768            let mut existing_keys = Vec::new();
769            if !btree.is_empty() {
770                let mut cursor = btree.cursor_first().map_err(|e| {
771                    StoreError::Io(std::io::Error::other(format!(
772                        "B-tree cursor error while rebuilding '{name}': {e}"
773                    )))
774                })?;
775                while let Some((key, _)) = cursor.next().map_err(|e| {
776                    StoreError::Io(std::io::Error::other(format!(
777                        "B-tree scan error while rebuilding '{name}': {e}"
778                    )))
779                })? {
780                    existing_keys.push(key);
781                }
782            }
783
784            for key in existing_keys {
785                btree.delete(&key).map_err(|e| {
786                    StoreError::Io(std::io::Error::other(format!(
787                        "B-tree delete error while rebuilding '{name}': {e}"
788                    )))
789                })?;
790            }
791
792            let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
793                .query_all(|_| true)
794                .into_iter()
795                .map(|entity| {
796                    let metadata = manager.get_metadata(entity.id);
797                    (
798                        entity.id.raw().to_be_bytes().to_vec(),
799                        Self::serialize_entity_record(
800                            &entity,
801                            metadata.as_ref(),
802                            self.format_version(),
803                        ),
804                    )
805                })
806                .collect();
807            records.sort_by(|left, right| left.0.cmp(&right.0));
808
809            // Slice G (#704): no per-row skip. Oversized values are
810            // spilled through the slice-E write ladder inside
811            // `bulk_insert_sorted` (inline → compressed inline →
812            // overflow chain). The only rejection is the hard
813            // `MAX_VALUE_SIZE` (256 MiB) ceiling, which surfaces as
814            // `ValueTooLarge` from the bulk path after the rest of
815            // the batch has landed.
816            if !records.is_empty() {
817                btree.bulk_insert_sorted(&records).map_err(|e| {
818                    StoreError::Io(std::io::Error::other(format!(
819                        "B-tree bulk rebuild error for '{name}': {e}"
820                    )))
821                })?;
822            }
823
824            collection_roots.push((name.clone(), btree.root_page_id()));
825        }
826
827        // Write collection metadata to page 1
828        let mut meta_data = Vec::with_capacity(4096);
829
830        let format_version = STORE_VERSION_V9;
831        self.set_format_version(format_version);
832
833        reddb_file::encode_native_paged_metadata_header(
834            &mut meta_data,
835            reddb_file::NativePagedMetadataHeader {
836                format_version,
837                collection_count: collection_roots.len() as u32,
838            },
839        );
840
841        // Write each collection's name and B-tree root page
842        for (name, root_page) in &collection_roots {
843            reddb_file::encode_native_paged_collection_root(&mut meta_data, name, *root_page);
844        }
845
846        // Write cross-reference metadata
847        let cross_refs = self.cross_refs.read();
848        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
849        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
850        for (source_id, refs) in cross_refs.iter() {
851            for (target_id, ref_type, collection) in refs {
852                reddb_file::encode_native_paged_cross_ref(
853                    &mut meta_data,
854                    source_id.raw(),
855                    target_id.raw(),
856                    ref_type.to_byte(),
857                    collection,
858                );
859            }
860        }
861
862        // Build page 1 (+ overflow chain when needed) for the metadata blob.
863        let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
864            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
865
866        // Write metadata shadow FIRST (intact copy in case main write fails).
867        // The shadow is a no-op when `fold_pager_meta` is enabled.
868        pager
869            .write_meta_shadow(&meta_page)
870            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
871
872        // Write page
873        pager
874            .write_page(1, meta_page)
875            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
876
877        // Flush and fsync all pages to disk
878        pager
879            .sync()
880            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
881
882        if let Some(commit) = &self.commit {
883            commit.truncate().map_err(StoreError::Io)?;
884        }
885
886        Ok(())
887    }
888
889    /// Check if the store is using page-based persistence
890    pub fn is_paged(&self) -> bool {
891        self.pager.is_some()
892    }
893
894    /// Current root page for a collection's primary B-tree, if one has
895    /// been materialized in this store.
896    pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
897        self.btree_indices
898            .read()
899            .get(collection)
900            .map(|btree| btree.root_page_id())
901            .filter(|root| *root != 0)
902    }
903
904    /// Get the database file path (if using paged mode)
905    pub fn db_path(&self) -> Option<&Path> {
906        self.db_path.as_deref()
907    }
908}
909
910fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
911    let Some(metadata) = metadata else {
912        return Vec::new();
913    };
914    if metadata.is_empty() {
915        return Vec::new();
916    }
917
918    let mut entries: Vec<_> = metadata.iter().collect();
919    entries.sort_by_key(|(a, _)| *a);
920
921    let mut buf = Vec::new();
922    buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
923    for (key, value) in entries {
924        write_string(&mut buf, key);
925        write_metadata_value(&mut buf, value);
926    }
927    buf
928}
929
930fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
931    let mut pos = 0usize;
932    let count = read_u32(data, &mut pos)? as usize;
933    let mut metadata = Metadata::new();
934    for _ in 0..count {
935        let key = read_string(data, &mut pos)?;
936        let value = read_metadata_value(data, &mut pos)?;
937        metadata.set(key, value);
938    }
939    Ok(metadata)
940}
941
942fn write_string(buf: &mut Vec<u8>, value: &str) {
943    reddb_file::encode_native_len_prefixed_str(buf, value);
944}
945
946fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
947    reddb_file::encode_native_len_prefixed_bytes(buf, value);
948}
949
950fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
951    use crate::storage::unified::RefTarget;
952
953    match target {
954        RefTarget::TableRow { table, row_id } => {
955            buf.push(0);
956            write_string(buf, table);
957            buf.extend_from_slice(&row_id.to_le_bytes());
958        }
959        RefTarget::Node {
960            collection,
961            node_id,
962        } => {
963            buf.push(1);
964            write_string(buf, collection);
965            buf.extend_from_slice(&node_id.raw().to_le_bytes());
966        }
967        RefTarget::Edge {
968            collection,
969            edge_id,
970        } => {
971            buf.push(2);
972            write_string(buf, collection);
973            buf.extend_from_slice(&edge_id.raw().to_le_bytes());
974        }
975        RefTarget::Vector {
976            collection,
977            vector_id,
978        } => {
979            buf.push(3);
980            write_string(buf, collection);
981            buf.extend_from_slice(&vector_id.raw().to_le_bytes());
982        }
983        RefTarget::Entity {
984            collection,
985            entity_id,
986        } => {
987            buf.push(4);
988            write_string(buf, collection);
989            buf.extend_from_slice(&entity_id.raw().to_le_bytes());
990        }
991    }
992}
993
994fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
995    match value {
996        MetadataValue::Null => buf.push(0),
997        MetadataValue::Bool(v) => {
998            buf.push(1);
999            buf.push(u8::from(*v));
1000        }
1001        MetadataValue::Int(v) => {
1002            buf.push(2);
1003            buf.extend_from_slice(&v.to_le_bytes());
1004        }
1005        MetadataValue::Float(v) => {
1006            buf.push(3);
1007            buf.extend_from_slice(&v.to_le_bytes());
1008        }
1009        MetadataValue::String(v) => {
1010            buf.push(4);
1011            write_string(buf, v);
1012        }
1013        MetadataValue::Bytes(v) => {
1014            buf.push(5);
1015            write_bytes(buf, v);
1016        }
1017        MetadataValue::Array(values) => {
1018            buf.push(6);
1019            buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1020            for value in values {
1021                write_metadata_value(buf, value);
1022            }
1023        }
1024        MetadataValue::Object(values) => {
1025            buf.push(7);
1026            let mut entries: Vec<_> = values.iter().collect();
1027            entries.sort_by_key(|(a, _)| *a);
1028            buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
1029            for (key, value) in entries {
1030                write_string(buf, key);
1031                write_metadata_value(buf, value);
1032            }
1033        }
1034        MetadataValue::Timestamp(v) => {
1035            buf.push(8);
1036            buf.extend_from_slice(&v.to_le_bytes());
1037        }
1038        MetadataValue::Geo { lat, lon } => {
1039            buf.push(9);
1040            buf.extend_from_slice(&lat.to_le_bytes());
1041            buf.extend_from_slice(&lon.to_le_bytes());
1042        }
1043        MetadataValue::Reference(target) => {
1044            buf.push(10);
1045            write_ref_target(buf, target);
1046        }
1047        MetadataValue::References(targets) => {
1048            buf.push(11);
1049            buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
1050            for target in targets {
1051                write_ref_target(buf, target);
1052            }
1053        }
1054    }
1055}
1056
1057fn read_exact_slice<'a>(
1058    data: &'a [u8],
1059    pos: &mut usize,
1060    len: usize,
1061) -> Result<&'a [u8], StoreError> {
1062    if *pos + len > data.len() {
1063        return Err(StoreError::Serialization(
1064            "truncated metadata payload".to_string(),
1065        ));
1066    }
1067    let slice = &data[*pos..*pos + len];
1068    *pos += len;
1069    Ok(slice)
1070}
1071
1072fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
1073    let bytes = read_exact_slice(data, pos, 4)?;
1074    let mut raw = [0u8; 4];
1075    raw.copy_from_slice(bytes);
1076    Ok(u32::from_le_bytes(raw))
1077}
1078
1079fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
1080    let bytes = read_exact_slice(data, pos, 8)?;
1081    let mut raw = [0u8; 8];
1082    raw.copy_from_slice(bytes);
1083    Ok(u64::from_le_bytes(raw))
1084}
1085
1086fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
1087    let bytes = read_exact_slice(data, pos, 8)?;
1088    let mut raw = [0u8; 8];
1089    raw.copy_from_slice(bytes);
1090    Ok(i64::from_le_bytes(raw))
1091}
1092
1093fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
1094    let bytes = read_exact_slice(data, pos, 8)?;
1095    let mut raw = [0u8; 8];
1096    raw.copy_from_slice(bytes);
1097    Ok(f64::from_le_bytes(raw))
1098}
1099
1100fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
1101    let bytes = read_exact_slice(data, pos, 1)?;
1102    Ok(bytes[0])
1103}
1104
1105fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
1106    reddb_file::decode_native_len_prefixed_string(data, pos)
1107        .map_err(|err| StoreError::Serialization(err.to_string()))
1108}
1109
1110fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
1111    reddb_file::decode_native_len_prefixed_bytes(data, pos)
1112        .map(|bytes| bytes.to_vec())
1113        .map_err(|err| StoreError::Serialization(err.to_string()))
1114}
1115
1116fn read_ref_target(
1117    data: &[u8],
1118    pos: &mut usize,
1119) -> Result<crate::storage::unified::RefTarget, StoreError> {
1120    use crate::storage::unified::RefTarget;
1121
1122    match read_u8(data, pos)? {
1123        0 => Ok(RefTarget::TableRow {
1124            table: read_string(data, pos)?,
1125            row_id: read_u64(data, pos)?,
1126        }),
1127        1 => Ok(RefTarget::Node {
1128            collection: read_string(data, pos)?,
1129            node_id: EntityId::new(read_u64(data, pos)?),
1130        }),
1131        2 => Ok(RefTarget::Edge {
1132            collection: read_string(data, pos)?,
1133            edge_id: EntityId::new(read_u64(data, pos)?),
1134        }),
1135        3 => Ok(RefTarget::Vector {
1136            collection: read_string(data, pos)?,
1137            vector_id: EntityId::new(read_u64(data, pos)?),
1138        }),
1139        4 => Ok(RefTarget::Entity {
1140            collection: read_string(data, pos)?,
1141            entity_id: EntityId::new(read_u64(data, pos)?),
1142        }),
1143        tag => Err(StoreError::Serialization(format!(
1144            "unknown metadata ref target tag {tag}"
1145        ))),
1146    }
1147}
1148
1149fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1150    match read_u8(data, pos)? {
1151        0 => Ok(MetadataValue::Null),
1152        1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1153        2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1154        3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1155        4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1156        5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1157        6 => {
1158            let count = read_u32(data, pos)? as usize;
1159            let mut values = Vec::with_capacity(count);
1160            for _ in 0..count {
1161                values.push(read_metadata_value(data, pos)?);
1162            }
1163            Ok(MetadataValue::Array(values))
1164        }
1165        7 => {
1166            let count = read_u32(data, pos)? as usize;
1167            let mut values = std::collections::HashMap::with_capacity(count);
1168            for _ in 0..count {
1169                let key = read_string(data, pos)?;
1170                let value = read_metadata_value(data, pos)?;
1171                values.insert(key, value);
1172            }
1173            Ok(MetadataValue::Object(values))
1174        }
1175        8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1176        9 => Ok(MetadataValue::Geo {
1177            lat: read_f64(data, pos)?,
1178            lon: read_f64(data, pos)?,
1179        }),
1180        10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1181        11 => {
1182            let count = read_u32(data, pos)? as usize;
1183            let mut targets = Vec::with_capacity(count);
1184            for _ in 0..count {
1185                targets.push(read_ref_target(data, pos)?);
1186            }
1187            Ok(MetadataValue::References(targets))
1188        }
1189        tag => Err(StoreError::Serialization(format!(
1190            "unknown metadata value tag {tag}"
1191        ))),
1192    }
1193}