Skip to main content

reddb_server/storage/unified/store/
impl_pages.rs

1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
5// ── Pager-meta overflow chain (gh-477) ──────────────────────────────────────
6// When the serialized collection registry + cross-refs exceed a single page,
7// page 1 carries a native metadata-overflow header pointing at an overflow chain of
8// `PageType::Overflow` pages. Single-page metadata keeps the historical
9// bit-identical layout (`METADATA_MAGIC = "RDM2"` written directly at the
10// content offset).
11//
12// Page 1 (overflow form), starting at `HEADER_SIZE`:
13//   [0..4]   native metadata-overflow magic
14//   [4..8]   format_version (u32, mirrors inner payload version for debug)
15//   [8..12]  total_payload_bytes (u32)
16//   [12..16] next_overflow_page_id (u32, > 0)
17//   [16..]   first payload chunk (up to META_V3_FIRST_PAYLOAD_CAP bytes)
18//
19// Overflow continuation page, starting at `HEADER_SIZE`:
20//   [0..4]   next_overflow_page_id (u32, 0 if last)
21//   [4..8]   chunk_bytes (u32)
22//   [8..]    chunk payload (up to META_V3_OVERFLOW_PAYLOAD_CAP bytes)
/// Bytes of a page left for content once the page header (`HEADER_SIZE`) is
/// subtracted from `PAGE_SIZE`.
const META_PAGE_CONTENT_CAP: usize =
    crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE;
/// Size of the overflow header placed at the start of page 1's content when
/// the metadata is stored in overflow form.
const META_V3_PAGE1_HEADER: usize = reddb_file::METADATA_OVERFLOW_HEADER_BYTES;
/// Size of the header placed at the start of every overflow continuation
/// page's content.
const META_V3_OVERFLOW_HEADER: usize = reddb_file::METADATA_OVERFLOW_CONTINUATION_HEADER_BYTES;
/// Maximum payload bytes carried on page 1 after its overflow header.
const META_V3_FIRST_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_PAGE1_HEADER;
/// Maximum payload bytes carried on one continuation page after its header.
const META_V3_OVERFLOW_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_OVERFLOW_HEADER;
29
30fn free_existing_overflow_chain(pager: &Pager) -> Result<(), PagerError> {
31    let cs = crate::storage::engine::HEADER_SIZE;
32    let page = match pager.read_page(1) {
33        Ok(p) => p,
34        Err(_) => return Ok(()),
35    };
36    let bytes = page.as_bytes();
37    if bytes.len() < cs + META_V3_PAGE1_HEADER {
38        return Ok(());
39    }
40    let Some(header) =
41        reddb_file::decode_native_metadata_overflow_header(&bytes[cs..]).map_err(|err| {
42            PagerError::InvalidDatabase(format!("invalid metadata overflow header: {err}"))
43        })?
44    else {
45        return Ok(());
46    };
47    let mut next = header.next_overflow_page_id;
48    while next != 0 {
49        let ov = match pager.read_page(next) {
50            Ok(p) => p,
51            Err(_) => break,
52        };
53        let ob = ov.as_bytes();
54        let nn = match reddb_file::decode_native_metadata_overflow_continuation_header(&ob[cs..]) {
55            Ok(header) => header.next_overflow_page_id,
56            Err(_) => 0,
57        };
58        let _ = pager.free_page(next);
59        next = nn;
60    }
61    Ok(())
62}
63
64fn build_meta_page1_with_overflow(
65    pager: &Pager,
66    meta_data: &[u8],
67) -> Result<crate::storage::engine::Page, PagerError> {
68    use crate::storage::engine::{Page, PageType, HEADER_SIZE};
69    free_existing_overflow_chain(pager)?;
70
71    let mut page1 = Page::new(PageType::Header, 1);
72    let cs = HEADER_SIZE;
73
74    if meta_data.len() <= META_PAGE_CONTENT_CAP {
75        // Single-page: bit-identical to the historical layout.
76        let buf = page1.as_bytes_mut();
77        buf[cs..cs + meta_data.len()].copy_from_slice(meta_data);
78        return Ok(page1);
79    }
80
81    // Multi-page overflow form. Split the inner payload into the first chunk
82    // (held on page 1) followed by zero-or-more continuation chunks chained
83    // through `PageType::Overflow` pages.
84    let first_chunk = &meta_data[..META_V3_FIRST_PAYLOAD_CAP];
85    let mut tail = &meta_data[META_V3_FIRST_PAYLOAD_CAP..];
86    let mut chunks: Vec<&[u8]> = Vec::new();
87    while !tail.is_empty() {
88        let take = tail.len().min(META_V3_OVERFLOW_PAYLOAD_CAP);
89        chunks.push(&tail[..take]);
90        tail = &tail[take..];
91    }
92
93    let mut overflow_pages: Vec<Page> = Vec::with_capacity(chunks.len());
94    let mut overflow_ids: Vec<u32> = Vec::with_capacity(chunks.len());
95    for _ in 0..chunks.len() {
96        let pg = pager.allocate_page(PageType::Overflow)?;
97        overflow_ids.push(pg.page_id());
98        overflow_pages.push(pg);
99    }
100
101    for i in 0..chunks.len() {
102        let next = if i + 1 < chunks.len() {
103            overflow_ids[i + 1]
104        } else {
105            0u32
106        };
107        let len = chunks[i].len() as u32;
108        let buf = overflow_pages[i].as_bytes_mut();
109        reddb_file::encode_native_metadata_overflow_continuation_header(
110            &mut buf[cs..cs + META_V3_OVERFLOW_HEADER],
111            reddb_file::NativeMetadataOverflowContinuationHeader {
112                next_overflow_page_id: next,
113                chunk_bytes: len,
114            },
115        )
116        .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
117        buf[cs + 8..cs + 8 + chunks[i].len()].copy_from_slice(chunks[i]);
118    }
119    for (idx, page) in overflow_pages.into_iter().enumerate() {
120        let id = overflow_ids[idx];
121        pager.write_page(id, page)?;
122    }
123
124    // Mirror the inner format_version for debug-friendly hex dumps.
125    let format_version = reddb_file::decode_native_paged_metadata_header(meta_data)
126        .ok()
127        .flatten()
128        .map_or(0, |header| header.format_version);
129
130    let buf = page1.as_bytes_mut();
131    reddb_file::encode_native_metadata_overflow_header(
132        &mut buf[cs..cs + META_V3_PAGE1_HEADER],
133        reddb_file::NativeMetadataOverflowHeader {
134            format_version,
135            total_payload_bytes: meta_data.len() as u32,
136            next_overflow_page_id: overflow_ids[0],
137        },
138    )
139    .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
140    buf[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_chunk.len()]
141        .copy_from_slice(first_chunk);
142
143    Ok(page1)
144}
145
146/// Assemble the full metadata payload from page 1 (plus its overflow chain
147/// when the native overflow wrapper is present). Returns the bytes that the
148/// metadata parser would see starting from the content offset of page 1.
149/// Single-page metadata returns the raw page content (including trailing
150/// zero-pad), so the legacy parser sees the same bytes it always saw.
151fn read_meta_payload(pager: &Pager) -> Option<Vec<u8>> {
152    let cs = crate::storage::engine::HEADER_SIZE;
153    let meta_page = pager
154        .read_page(1)
155        .or_else(|_| pager.recover_meta_from_shadow())
156        .ok()?;
157    let bytes = meta_page.as_bytes();
158    if bytes.len() < cs + 4 {
159        return Some(bytes.get(cs..).unwrap_or(&[]).to_vec());
160    }
161    let header = match reddb_file::decode_native_metadata_overflow_header(&bytes[cs..]).ok()? {
162        Some(header) => header,
163        None => {
164            return Some(bytes[cs..].to_vec());
165        }
166    };
167    if bytes.len() < cs + META_V3_PAGE1_HEADER {
168        return None;
169    }
170    let total = header.total_payload_bytes as usize;
171    let mut next = header.next_overflow_page_id;
172    let mut payload: Vec<u8> = Vec::with_capacity(total);
173    let first_take = total.min(META_V3_FIRST_PAYLOAD_CAP);
174    payload.extend_from_slice(
175        &bytes[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_take],
176    );
177    while next != 0 && payload.len() < total {
178        let ov = pager.read_page(next).ok()?;
179        let ob = ov.as_bytes();
180        if ob.len() < cs + META_V3_OVERFLOW_HEADER {
181            return None;
182        }
183        let continuation =
184            reddb_file::decode_native_metadata_overflow_continuation_header(&ob[cs..]).ok()?;
185        let nn = continuation.next_overflow_page_id;
186        let len = continuation.chunk_bytes as usize;
187        let remaining = total - payload.len();
188        let take = len.min(remaining).min(META_V3_OVERFLOW_PAYLOAD_CAP);
189        payload.extend_from_slice(
190            &ob[cs + META_V3_OVERFLOW_HEADER..cs + META_V3_OVERFLOW_HEADER + take],
191        );
192        next = nn;
193    }
194    Some(payload)
195}
196
197impl UnifiedStore {
198    pub(crate) fn mark_paged_registry_dirty(&self) {
199        self.paged_registry_dirty.store(true, Ordering::Release);
200    }
201
202    /// Get (or lazily create) the per-collection B-tree under a *read*
203    /// lock whenever possible. Returns a cloned `Arc<BTree>` so callers
204    /// can mutate the tree without holding the outer map's RwLock —
205    /// previously every insert serialised on `btree_indices.write()`,
206    /// costing ~60% of the concurrent-insert throughput ceiling.
207    pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
208        let pager = self.pager.as_ref()?;
209        if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
210            return Some(btree);
211        }
212        let mut write = self.btree_indices.write();
213        let btree = write
214            .entry(collection.to_string())
215            .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
216            .clone();
217        Some(btree)
218    }
219
220    pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
221        let Some(pager) = &self.pager else {
222            return Ok(());
223        };
224
225        if self.paged_registry_dirty.load(Ordering::Acquire) {
226            self.flush_paged_registry()?;
227            self.paged_registry_dirty.store(false, Ordering::Release);
228            return Ok(());
229        }
230
231        pager
232            .flush()
233            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
234    }
235
236    pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
237        let Some(pager) = &self.pager else {
238            return Ok(());
239        };
240
241        match pager.read_page(1) {
242            Ok(_) => {}
243            Err(PagerError::PageNotFound(_)) => {
244                let meta_page = pager
245                    .allocate_page(crate::storage::engine::PageType::Header)
246                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
247                pager
248                    .write_page(meta_page.page_id(), meta_page)
249                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
250            }
251            Err(e) => {
252                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
253            }
254        }
255
256        let format_version = STORE_VERSION_V9;
257        self.set_format_version(format_version);
258
259        let collections = self.collections.read();
260        let btree_indices = self.btree_indices.read();
261        let mut collection_roots = Vec::with_capacity(collections.len());
262        for (name, _) in collections.iter() {
263            let root_page = btree_indices
264                .get(name)
265                .map_or(0, |btree| btree.root_page_id());
266            collection_roots.push((name.clone(), root_page));
267        }
268        drop(btree_indices);
269        drop(collections);
270
271        let mut meta_data = Vec::with_capacity(4096);
272        reddb_file::encode_native_paged_metadata_header(
273            &mut meta_data,
274            reddb_file::NativePagedMetadataHeader {
275                format_version,
276                collection_count: collection_roots.len() as u32,
277            },
278        );
279        for (name, root_page) in &collection_roots {
280            reddb_file::encode_native_paged_collection_root(&mut meta_data, name, *root_page);
281        }
282
283        let cross_refs = self.cross_refs.read();
284        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
285        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
286        for (source_id, refs) in cross_refs.iter() {
287            for (target_id, ref_type, collection) in refs {
288                reddb_file::encode_native_paged_cross_ref(
289                    &mut meta_data,
290                    source_id.raw(),
291                    target_id.raw(),
292                    ref_type.to_byte(),
293                    collection,
294                );
295            }
296        }
297
298        let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
299            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
300
301        pager
302            .write_meta_shadow(&meta_page)
303            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
304        pager
305            .write_page(1, meta_page)
306            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
307        pager
308            .flush()
309            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
310
311        Ok(())
312    }
313
314    /// Get a reference to the underlying pager (if in paged mode).
315    pub fn pager(&self) -> Option<&Arc<Pager>> {
316        self.pager.as_ref()
317    }
318
319    /// Borrow the immutable store configuration. Runtime hooks (e.g. the
320    /// `auto_index_id` first-insert hook in `MutationEngine`) read knobs
321    /// off this struct without going through the legacy global config tree.
322    pub fn config(&self) -> &UnifiedStoreConfig {
323        &self.config
324    }
325
326    pub fn with_config(config: UnifiedStoreConfig) -> Self {
327        Self {
328            config,
329            format_version: AtomicU32::new(STORE_VERSION_V9),
330            next_entity_id: AtomicU64::new(1),
331            collections: RwLock::new(HashMap::new()),
332            cross_refs: RwLock::new(HashMap::new()),
333            reverse_refs: RwLock::new(HashMap::new()),
334            pager: None,
335            db_path: None,
336            btree_indices: RwLock::new(HashMap::new()),
337            context_index: ContextIndex::new(),
338            entity_cache: EntityCache::new(),
339            graph_label_index: RwLock::new(HashMap::new()),
340            paged_registry_dirty: AtomicBool::new(false),
341            commit: None,
342            unindex_cross_refs_fast_path: AtomicU64::new(0),
343            replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
344            aux_metadata: RwLock::new(Vec::new()),
345        }
346    }
347
348    /// Open or create a page-based database
349    ///
350    /// This uses the page engine for ACID durability with B-tree indices.
351    /// The database file uses 4KB pages with checksums and efficient caching.
352    ///
353    /// # Arguments
354    ///
355    /// * `path` - Path to the database file (e.g., "data.rdb")
356    ///
357    /// # Example
358    ///
359    /// ```rust,ignore
360    /// let store = UnifiedStore::open("security.rdb")?;
361    /// store.create_collection("hosts")?;
362    /// // ... operations ...
363    /// store.persist()?; // Flush to disk
364    /// ```
365    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
366        Self::open_with_config(path, UnifiedStoreConfig::default())
367    }
368
369    pub fn open_with_config(
370        path: impl AsRef<Path>,
371        config: UnifiedStoreConfig,
372    ) -> Result<Self, StoreError> {
373        let path = path.as_ref();
374        let mut pager_config = PagerConfig::default();
375        // Tunables via env — experimental, used by the benchmark harness
376        // to compare durability profiles head-to-head with Postgres.
377        // REDDB_DOUBLE_WRITE=0 requests skipping the double-write buffer,
378        // which otherwise adds two fsyncs per pager flush (one on DWB, one
379        // on the main file). The pager honors this only when the actual
380        // data file is proven to live on a CoW filesystem with atomic page
381        // writes; otherwise it fails closed and keeps DWB enabled.
382        if matches!(
383            std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
384            Some("0") | Some("false") | Some("off")
385        ) {
386            pager_config.double_write = false;
387        }
388        let pager = Pager::open(path, pager_config)
389            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
390
391        let wal_path = reddb_file::layout::unified_wal_path(path);
392        let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
393            Some(Arc::new(
394                StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
395                    .map_err(StoreError::Io)?,
396            ))
397        } else {
398            None
399        };
400
401        let store = Self {
402            config,
403            format_version: AtomicU32::new(STORE_VERSION_V9),
404            next_entity_id: AtomicU64::new(1),
405            collections: RwLock::new(HashMap::new()),
406            cross_refs: RwLock::new(HashMap::new()),
407            reverse_refs: RwLock::new(HashMap::new()),
408            pager: Some(Arc::new(pager)),
409            db_path: Some(path.to_path_buf()),
410            btree_indices: RwLock::new(HashMap::new()),
411            context_index: ContextIndex::new(),
412            entity_cache: EntityCache::new(),
413            graph_label_index: RwLock::new(HashMap::new()),
414            paged_registry_dirty: AtomicBool::new(false),
415            commit,
416            unindex_cross_refs_fast_path: AtomicU64::new(0),
417            replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
418            aux_metadata: RwLock::new(Vec::new()),
419        };
420
421        // Load existing data from pages if database exists
422        store.load_from_pages()?;
423        if let Some(commit) = &store.commit {
424            commit.replay_into(&store).map_err(StoreError::Io)?;
425        }
426        store.recover_operational_manifest()?;
427
428        Ok(store)
429    }
430
431    pub(crate) fn recover_operational_manifest(&self) -> Result<(), StoreError> {
432        let Some(path) = &self.db_path else {
433            return Ok(());
434        };
435        let mut collections = self.list_collections();
436        collections.sort();
437        let pending_drops =
438            crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
439                .recover_or_bootstrap(&collections)
440                .map_err(StoreError::Io)?;
441        for name in pending_drops {
442            if self.get_collection(&name).is_some() {
443                self.drop_collection(&name)?;
444            }
445        }
446        Ok(())
447    }
448
449    pub(crate) fn publish_operational_collection_create(
450        &self,
451        name: &str,
452    ) -> Result<(), StoreError> {
453        let Some(path) = &self.db_path else {
454            return Ok(());
455        };
456        crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
457            .create_collection(name)
458            .map_err(StoreError::Io)
459    }
460
461    pub(crate) fn publish_operational_collection_pending_drop(
462        &self,
463        name: &str,
464    ) -> Result<(), StoreError> {
465        let Some(path) = &self.db_path else {
466            return Ok(());
467        };
468        crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
469            .begin_drop_collection(name)
470            .map_err(StoreError::Io)
471    }
472
473    pub(crate) fn publish_operational_collection_drop_finished(
474        &self,
475        name: &str,
476    ) -> Result<(), StoreError> {
477        let Some(path) = &self.db_path else {
478            return Ok(());
479        };
480        crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
481            .finish_drop_collection(name)
482            .map_err(StoreError::Io)
483    }
484
    /// Load data from page-based storage
    ///
    /// Reads the metadata payload rooted at page 1, recreates every listed
    /// collection in memory, walks each collection's B-tree to re-insert its
    /// entities, and finally replays the persisted cross-references.
    ///
    /// The load is deliberately tolerant: a missing/undecodable metadata
    /// payload, a collection-root or cross-ref record that fails to decode,
    /// a B-tree cursor error and an entity that fails to deserialize all stop
    /// or skip the affected part silently instead of failing the open.
    ///
    /// # Errors
    ///
    /// Returns `StoreError::Io` when the page count cannot be read,
    /// `StoreError::Serialization` when the native metadata header is present
    /// but invalid, and propagates errors from `index_cross_refs`.
    fn load_from_pages(&self) -> Result<(), StoreError> {
        let pager = match &self.pager {
            Some(p) => p,
            None => return Ok(()), // No pager, nothing to load
        };

        // Get page count
        let page_count = pager.page_count().map_err(|e| {
            StoreError::Io(std::io::Error::other(format!(
                "failed to read page count: {}",
                e
            )))
        })?;
        if page_count <= 1 {
            // Empty database (only header page)
            return Ok(());
        }

        // Read metadata starting from page 1 (collections registry). The
        // helper transparently follows the `RDM3` overflow chain when the
        // metadata blob spans multiple pages and falls back to the legacy
        // `<data>-meta` shadow when page 1 itself is corrupted.
        if let Some(content_vec) = read_meta_payload(pager) {
            let content: &[u8] = &content_vec;
            if content.len() >= 4 {
                let mut pos = 0;
                // Assumed until a native header says otherwise.
                let mut format_version = STORE_VERSION_V1;

                // Native header present: take version + count from it.
                // Otherwise the first four bytes are read as a bare
                // little-endian collection count and the version stays V1.
                let collection_count = if let Some(header) =
                    reddb_file::decode_native_paged_metadata_header(content)
                        .map_err(|err| StoreError::Serialization(err.to_string()))?
                {
                    format_version = header.format_version;
                    pos += reddb_file::METADATA_HEADER_BYTES;
                    header.collection_count as usize
                } else {
                    let count = u32::from_le_bytes([
                        content[pos],
                        content[pos + 1],
                        content[pos + 2],
                        content[pos + 3],
                    ]) as usize;
                    pos += 4;
                    count
                };

                // Entities below are deserialized with this version.
                self.set_format_version(format_version);

                // Header claims more bytes than the payload holds: nothing
                // further can be parsed.
                if pos > content.len() {
                    return Ok(());
                }

                // Read collection names and their B-tree root pages
                for _ in 0..collection_count {
                    if let Ok(root) =
                        reddb_file::decode_native_paged_collection_root(content, &mut pos)
                    {
                        // Root page ID for this collection's B-tree
                        let root_page = root.root_page;
                        let name = root.collection;

                        // Hydrate the collection in memory only. Loading must
                        // not emit WAL entries or rewrite the on-disk registry
                        // before the existing B-tree roots are attached.
                        let _ = self.create_collection_in_memory(&name);

                        // Load B-tree with root page if it exists
                        if root_page > 0 {
                            let btree = BTree::with_root(Arc::clone(pager), root_page);

                            // Load all entities from B-tree into the collection
                            if let Ok(mut cursor) = btree.cursor_first() {
                                let manager = self.get_collection(&name);
                                // Iteration ends on the first cursor error or
                                // when the cursor is exhausted.
                                while let Ok(Some((key, value))) = cursor.next() {
                                    // Deserialize entity from value bytes
                                    if let Ok((entity, metadata)) = Self::deserialize_entity_record(
                                        &value,
                                        self.format_version(),
                                    ) {
                                        if let Some(m) = &manager {
                                            let id = entity.id;
                                            // Table rows also register their
                                            // row id with the manager.
                                            if let EntityKind::TableRow { row_id, .. } =
                                                &entity.kind
                                            {
                                                m.register_row_id(*row_id);
                                            }
                                            self.context_index.index_entity(&name, &entity);
                                            // Insert/metadata failures are
                                            // ignored during hydration.
                                            let _ = m.insert(entity.clone());
                                            if let Some(metadata) = metadata {
                                                let _ = m.set_metadata(id, metadata);
                                            }
                                            self.register_entity_id(id);
                                            if self.config.auto_index_refs {
                                                self.index_cross_refs(&entity, &name)?;
                                            }
                                        }
                                    }
                                }
                            }

                            // Store the B-tree for future lookups
                            self.btree_indices.write().insert(name, Arc::new(btree));
                        }
                    } else {
                        // Undecodable root record: stop reading the registry.
                        break;
                    }
                }

                // The cross-reference section is only read for V2+ payloads
                // that still have room for its 4-byte count.
                if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
                    let cross_ref_count = u32::from_le_bytes([
                        content[pos],
                        content[pos + 1],
                        content[pos + 2],
                        content[pos + 3],
                    ]) as usize;
                    pos += 4;

                    for _ in 0..cross_ref_count {
                        let Ok(cross_ref) =
                            reddb_file::decode_native_paged_cross_ref(content, &mut pos)
                        else {
                            break;
                        };
                        let source_id = EntityId::new(cross_ref.source_id);
                        let target_id = EntityId::new(cross_ref.target_id);
                        let ref_type = RefType::from_byte(cross_ref.ref_type);
                        let target_collection = cross_ref.target_collection;

                        // Record the reference in the store-level map.
                        self.cross_refs.write().entry(source_id).or_default().push((
                            target_id,
                            ref_type,
                            target_collection.clone(),
                        ));

                        // Mirror the reference onto the source entity itself
                        // unless an identical one is already attached.
                        if let Some((collection, mut entity)) = self.get_any(source_id) {
                            let exists = entity.cross_refs().iter().any(|xref| {
                                xref.target == target_id
                                    && xref.ref_type == ref_type
                                    && xref.target_collection == target_collection
                            });
                            if !exists {
                                entity.cross_refs_mut().push(CrossRef::new(
                                    source_id,
                                    target_id,
                                    target_collection.clone(),
                                    ref_type,
                                ));
                                if let Some(manager) = self.get_collection(&collection) {
                                    let _ = manager.update(entity);
                                }
                            }
                        }
                    }
                }
            }
        }

        // Whatever version was read, the in-memory store runs at V9 or newer
        // from here on.
        if self.format_version() < STORE_VERSION_V9 {
            self.set_format_version(STORE_VERSION_V9);
        }

        Ok(())
    }
651
652    /// Deserialize an entity from binary bytes
653    pub(crate) fn deserialize_entity(
654        data: &[u8],
655        format_version: u32,
656    ) -> Result<UnifiedEntity, StoreError> {
657        let mut pos = 0;
658        Self::read_entity_binary(data, &mut pos, format_version)
659            .map_err(|e| StoreError::Serialization(e.to_string()))
660    }
661
662    /// Serialize an entity to binary bytes
663    pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
664        // Pre-allocate ~256 bytes to cover the typical 15-column
665        // typed row without any Vec growth. Bulk insert calls this
666        // millions of times per bench run; saving 2-3 reallocs per
667        // entity amortises.
668        let mut buf = Vec::with_capacity(256);
669        Self::write_entity_binary(&mut buf, entity, format_version);
670        buf
671    }
672
673    pub(crate) fn serialize_entity_record(
674        entity: &UnifiedEntity,
675        metadata: Option<&Metadata>,
676        format_version: u32,
677    ) -> Vec<u8> {
678        let entity_bytes = Self::serialize_entity(entity, format_version);
679        // Skip the intermediate metadata Vec when there's no metadata
680        // (common OLTP bulk-insert case): write a zero-length prefix
681        // directly into the record buffer. Only fall back to the old
682        // serialize_metadata() allocation when the caller actually
683        // has fields to persist.
684        let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
685        if has_meta {
686            let metadata_bytes = serialize_metadata(metadata);
687            reddb_file::encode_native_entity_record_frame(&entity_bytes, Some(&metadata_bytes))
688        } else {
689            reddb_file::encode_native_entity_record_frame(&entity_bytes, None)
690        }
691    }
692
693    pub(crate) fn deserialize_entity_record(
694        data: &[u8],
695        format_version: u32,
696    ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
697        let Some(frame) = reddb_file::decode_native_entity_record_frame(data)
698            .map_err(|err| StoreError::Serialization(err.to_string()))?
699        else {
700            return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
701        };
702
703        let entity = Self::deserialize_entity(frame.entity, format_version)?;
704        let metadata = if frame.metadata.is_empty() {
705            None
706        } else {
707            let metadata = deserialize_metadata(frame.metadata)?;
708            if metadata.is_empty() {
709                None
710            } else {
711                Some(metadata)
712            }
713        };
714
715        Ok((entity, metadata))
716    }
717
718    /// Persist all data to page-based storage
719    ///
720    /// Writes all entities to B-tree pages and flushes to disk.
721    /// This provides ACID durability guarantees.
722    pub fn persist(&self) -> Result<(), StoreError> {
723        let pager = match &self.pager {
724            Some(p) => p,
725            None => {
726                // No pager attached - use binary file fallback if path available
727                if let Some(path) = &self.db_path {
728                    return self
729                        .save_to_file(path)
730                        .map_err(|e| StoreError::Serialization(e.to_string()));
731                }
732                return Err(StoreError::Io(std::io::Error::other(
733                    "No pager or path configured for persistence",
734                )));
735            }
736        };
737
738        match pager.read_page(1) {
739            Ok(_) => {}
740            Err(PagerError::PageNotFound(_)) => {
741                let meta_page = pager
742                    .allocate_page(crate::storage::engine::PageType::Header)
743                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
744                pager
745                    .write_page(meta_page.page_id(), meta_page)
746                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
747            }
748            Err(e) => {
749                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
750            }
751        }
752
753        if let Some(commit) = &self.commit {
754            commit.force_sync().map_err(StoreError::Io)?;
755        }
756
757        let collections = self.collections.read();
758        let mut btree_indices = self.btree_indices.write();
759
760        // Collect collection names and their B-tree root pages
761        let mut collection_roots: Vec<(String, u32)> = Vec::new();
762
763        // For each collection, rebuild the B-tree from the live manager state.
764        // A checkpoint must preserve deletes too, not just upsert the current rows.
765        for (name, manager) in collections.iter() {
766            let btree = btree_indices
767                .entry(name.clone())
768                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
769
770            let mut existing_keys = Vec::new();
771            if !btree.is_empty() {
772                let mut cursor = btree.cursor_first().map_err(|e| {
773                    StoreError::Io(std::io::Error::other(format!(
774                        "B-tree cursor error while rebuilding '{name}': {e}"
775                    )))
776                })?;
777                while let Some((key, _)) = cursor.next().map_err(|e| {
778                    StoreError::Io(std::io::Error::other(format!(
779                        "B-tree scan error while rebuilding '{name}': {e}"
780                    )))
781                })? {
782                    existing_keys.push(key);
783                }
784            }
785
786            for key in existing_keys {
787                btree.delete(&key).map_err(|e| {
788                    StoreError::Io(std::io::Error::other(format!(
789                        "B-tree delete error while rebuilding '{name}': {e}"
790                    )))
791                })?;
792            }
793
794            let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
795                .query_all(|_| true)
796                .into_iter()
797                .map(|entity| {
798                    let metadata = manager.get_metadata(entity.id);
799                    (
800                        entity.id.raw().to_be_bytes().to_vec(),
801                        Self::serialize_entity_record(
802                            &entity,
803                            metadata.as_ref(),
804                            self.format_version(),
805                        ),
806                    )
807                })
808                .collect();
809            records.sort_by(|left, right| left.0.cmp(&right.0));
810
811            // Slice G (#704): no per-row skip. Oversized values are
812            // spilled through the slice-E write ladder inside
813            // `bulk_insert_sorted` (inline → compressed inline →
814            // overflow chain). The only rejection is the hard
815            // `MAX_VALUE_SIZE` (256 MiB) ceiling, which surfaces as
816            // `ValueTooLarge` from the bulk path after the rest of
817            // the batch has landed.
818            if !records.is_empty() {
819                btree.bulk_insert_sorted(&records).map_err(|e| {
820                    StoreError::Io(std::io::Error::other(format!(
821                        "B-tree bulk rebuild error for '{name}': {e}"
822                    )))
823                })?;
824            }
825
826            collection_roots.push((name.clone(), btree.root_page_id()));
827        }
828
829        // Write collection metadata to page 1
830        let mut meta_data = Vec::with_capacity(4096);
831
832        let format_version = STORE_VERSION_V9;
833        self.set_format_version(format_version);
834
835        reddb_file::encode_native_paged_metadata_header(
836            &mut meta_data,
837            reddb_file::NativePagedMetadataHeader {
838                format_version,
839                collection_count: collection_roots.len() as u32,
840            },
841        );
842
843        // Write each collection's name and B-tree root page
844        for (name, root_page) in &collection_roots {
845            reddb_file::encode_native_paged_collection_root(&mut meta_data, name, *root_page);
846        }
847
848        // Write cross-reference metadata
849        let cross_refs = self.cross_refs.read();
850        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
851        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
852        for (source_id, refs) in cross_refs.iter() {
853            for (target_id, ref_type, collection) in refs {
854                reddb_file::encode_native_paged_cross_ref(
855                    &mut meta_data,
856                    source_id.raw(),
857                    target_id.raw(),
858                    ref_type.to_byte(),
859                    collection,
860                );
861            }
862        }
863
864        // Build page 1 (+ overflow chain when needed) for the metadata blob.
865        let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
866            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
867
868        // Write metadata shadow FIRST (intact copy in case main write fails).
869        // The shadow is a no-op when `fold_pager_meta` is enabled.
870        pager
871            .write_meta_shadow(&meta_page)
872            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
873
874        // Write page
875        pager
876            .write_page(1, meta_page)
877            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
878
879        // Flush and fsync all pages to disk
880        pager
881            .sync()
882            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
883
884        if let Some(commit) = &self.commit {
885            commit.truncate().map_err(StoreError::Io)?;
886        }
887
888        Ok(())
889    }
890
891    /// Check if the store is using page-based persistence
892    pub fn is_paged(&self) -> bool {
893        self.pager.is_some()
894    }
895
896    /// Current root page for a collection's primary B-tree, if one has
897    /// been materialized in this store.
898    pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
899        self.btree_indices
900            .read()
901            .get(collection)
902            .map(|btree| btree.root_page_id())
903            .filter(|root| *root != 0)
904    }
905
906    /// Get the database file path (if using paged mode)
907    pub fn db_path(&self) -> Option<&Path> {
908        self.db_path.as_deref()
909    }
910}
911
912fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
913    let Some(metadata) = metadata else {
914        return Vec::new();
915    };
916    if metadata.is_empty() {
917        return Vec::new();
918    }
919
920    let mut entries: Vec<_> = metadata.iter().collect();
921    entries.sort_by_key(|(a, _)| *a);
922
923    let mut buf = Vec::new();
924    buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
925    for (key, value) in entries {
926        write_string(&mut buf, key);
927        write_metadata_value(&mut buf, value);
928    }
929    buf
930}
931
932fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
933    let mut pos = 0usize;
934    let count = read_u32(data, &mut pos)? as usize;
935    let mut metadata = Metadata::new();
936    for _ in 0..count {
937        let key = read_string(data, &mut pos)?;
938        let value = read_metadata_value(data, &mut pos)?;
939        metadata.set(key, value);
940    }
941    Ok(metadata)
942}
943
944fn write_string(buf: &mut Vec<u8>, value: &str) {
945    reddb_file::encode_native_len_prefixed_str(buf, value);
946}
947
948fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
949    reddb_file::encode_native_len_prefixed_bytes(buf, value);
950}
951
952fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
953    use crate::storage::unified::RefTarget;
954
955    match target {
956        RefTarget::TableRow { table, row_id } => {
957            buf.push(0);
958            write_string(buf, table);
959            buf.extend_from_slice(&row_id.to_le_bytes());
960        }
961        RefTarget::Node {
962            collection,
963            node_id,
964        } => {
965            buf.push(1);
966            write_string(buf, collection);
967            buf.extend_from_slice(&node_id.raw().to_le_bytes());
968        }
969        RefTarget::Edge {
970            collection,
971            edge_id,
972        } => {
973            buf.push(2);
974            write_string(buf, collection);
975            buf.extend_from_slice(&edge_id.raw().to_le_bytes());
976        }
977        RefTarget::Vector {
978            collection,
979            vector_id,
980        } => {
981            buf.push(3);
982            write_string(buf, collection);
983            buf.extend_from_slice(&vector_id.raw().to_le_bytes());
984        }
985        RefTarget::Entity {
986            collection,
987            entity_id,
988        } => {
989            buf.push(4);
990            write_string(buf, collection);
991            buf.extend_from_slice(&entity_id.raw().to_le_bytes());
992        }
993    }
994}
995
996fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
997    match value {
998        MetadataValue::Null => buf.push(0),
999        MetadataValue::Bool(v) => {
1000            buf.push(1);
1001            buf.push(u8::from(*v));
1002        }
1003        MetadataValue::Int(v) => {
1004            buf.push(2);
1005            buf.extend_from_slice(&v.to_le_bytes());
1006        }
1007        MetadataValue::Float(v) => {
1008            buf.push(3);
1009            buf.extend_from_slice(&v.to_le_bytes());
1010        }
1011        MetadataValue::String(v) => {
1012            buf.push(4);
1013            write_string(buf, v);
1014        }
1015        MetadataValue::Bytes(v) => {
1016            buf.push(5);
1017            write_bytes(buf, v);
1018        }
1019        MetadataValue::Array(values) => {
1020            buf.push(6);
1021            buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1022            for value in values {
1023                write_metadata_value(buf, value);
1024            }
1025        }
1026        MetadataValue::Object(values) => {
1027            buf.push(7);
1028            let mut entries: Vec<_> = values.iter().collect();
1029            entries.sort_by_key(|(a, _)| *a);
1030            buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
1031            for (key, value) in entries {
1032                write_string(buf, key);
1033                write_metadata_value(buf, value);
1034            }
1035        }
1036        MetadataValue::Timestamp(v) => {
1037            buf.push(8);
1038            buf.extend_from_slice(&v.to_le_bytes());
1039        }
1040        MetadataValue::Geo { lat, lon } => {
1041            buf.push(9);
1042            buf.extend_from_slice(&lat.to_le_bytes());
1043            buf.extend_from_slice(&lon.to_le_bytes());
1044        }
1045        MetadataValue::Reference(target) => {
1046            buf.push(10);
1047            write_ref_target(buf, target);
1048        }
1049        MetadataValue::References(targets) => {
1050            buf.push(11);
1051            buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
1052            for target in targets {
1053                write_ref_target(buf, target);
1054            }
1055        }
1056    }
1057}
1058
1059fn read_exact_slice<'a>(
1060    data: &'a [u8],
1061    pos: &mut usize,
1062    len: usize,
1063) -> Result<&'a [u8], StoreError> {
1064    if *pos + len > data.len() {
1065        return Err(StoreError::Serialization(
1066            "truncated metadata payload".to_string(),
1067        ));
1068    }
1069    let slice = &data[*pos..*pos + len];
1070    *pos += len;
1071    Ok(slice)
1072}
1073
1074fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
1075    let bytes = read_exact_slice(data, pos, 4)?;
1076    let mut raw = [0u8; 4];
1077    raw.copy_from_slice(bytes);
1078    Ok(u32::from_le_bytes(raw))
1079}
1080
1081fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
1082    let bytes = read_exact_slice(data, pos, 8)?;
1083    let mut raw = [0u8; 8];
1084    raw.copy_from_slice(bytes);
1085    Ok(u64::from_le_bytes(raw))
1086}
1087
1088fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
1089    let bytes = read_exact_slice(data, pos, 8)?;
1090    let mut raw = [0u8; 8];
1091    raw.copy_from_slice(bytes);
1092    Ok(i64::from_le_bytes(raw))
1093}
1094
1095fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
1096    let bytes = read_exact_slice(data, pos, 8)?;
1097    let mut raw = [0u8; 8];
1098    raw.copy_from_slice(bytes);
1099    Ok(f64::from_le_bytes(raw))
1100}
1101
1102fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
1103    let bytes = read_exact_slice(data, pos, 1)?;
1104    Ok(bytes[0])
1105}
1106
1107fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
1108    reddb_file::decode_native_len_prefixed_string(data, pos)
1109        .map_err(|err| StoreError::Serialization(err.to_string()))
1110}
1111
1112fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
1113    reddb_file::decode_native_len_prefixed_bytes(data, pos)
1114        .map(|bytes| bytes.to_vec())
1115        .map_err(|err| StoreError::Serialization(err.to_string()))
1116}
1117
1118fn read_ref_target(
1119    data: &[u8],
1120    pos: &mut usize,
1121) -> Result<crate::storage::unified::RefTarget, StoreError> {
1122    use crate::storage::unified::RefTarget;
1123
1124    match read_u8(data, pos)? {
1125        0 => Ok(RefTarget::TableRow {
1126            table: read_string(data, pos)?,
1127            row_id: read_u64(data, pos)?,
1128        }),
1129        1 => Ok(RefTarget::Node {
1130            collection: read_string(data, pos)?,
1131            node_id: EntityId::new(read_u64(data, pos)?),
1132        }),
1133        2 => Ok(RefTarget::Edge {
1134            collection: read_string(data, pos)?,
1135            edge_id: EntityId::new(read_u64(data, pos)?),
1136        }),
1137        3 => Ok(RefTarget::Vector {
1138            collection: read_string(data, pos)?,
1139            vector_id: EntityId::new(read_u64(data, pos)?),
1140        }),
1141        4 => Ok(RefTarget::Entity {
1142            collection: read_string(data, pos)?,
1143            entity_id: EntityId::new(read_u64(data, pos)?),
1144        }),
1145        tag => Err(StoreError::Serialization(format!(
1146            "unknown metadata ref target tag {tag}"
1147        ))),
1148    }
1149}
1150
1151fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1152    match read_u8(data, pos)? {
1153        0 => Ok(MetadataValue::Null),
1154        1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1155        2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1156        3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1157        4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1158        5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1159        6 => {
1160            let count = read_u32(data, pos)? as usize;
1161            let mut values = Vec::with_capacity(count);
1162            for _ in 0..count {
1163                values.push(read_metadata_value(data, pos)?);
1164            }
1165            Ok(MetadataValue::Array(values))
1166        }
1167        7 => {
1168            let count = read_u32(data, pos)? as usize;
1169            let mut values = std::collections::HashMap::with_capacity(count);
1170            for _ in 0..count {
1171                let key = read_string(data, pos)?;
1172                let value = read_metadata_value(data, pos)?;
1173                values.insert(key, value);
1174            }
1175            Ok(MetadataValue::Object(values))
1176        }
1177        8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1178        9 => Ok(MetadataValue::Geo {
1179            lat: read_f64(data, pos)?,
1180            lon: read_f64(data, pos)?,
1181        }),
1182        10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1183        11 => {
1184            let count = read_u32(data, pos)? as usize;
1185            let mut targets = Vec::with_capacity(count);
1186            for _ in 0..count {
1187                targets.push(read_ref_target(data, pos)?);
1188            }
1189            Ok(MetadataValue::References(targets))
1190        }
1191        tag => Err(StoreError::Serialization(format!(
1192            "unknown metadata value tag {tag}"
1193        ))),
1194    }
1195}