Skip to main content

reddb_server/storage/unified/store/
impl_pages.rs

1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
/// Magic prefix of the framed entity-record format produced by
/// `UnifiedStore::serialize_entity_record`: `"RER1"`, u32 entity length,
/// entity bytes, u32 metadata length, metadata bytes (lengths little-endian).
/// Values without this prefix are decoded as bare legacy entity encodings by
/// `UnifiedStore::deserialize_entity_record`.
const ENTITY_RECORD_MAGIC: &[u8; 4] = b"RER1";

// ── Pager-meta overflow chain (gh-477) ──────────────────────────────────────
// When the serialized collection registry + cross-refs exceed a single page,
// page 1 carries an "RDM3" wrapper header pointing at an overflow chain of
// `PageType::Overflow` pages. Metadata that fits in one page's content area
// (`META_PAGE_CONTENT_CAP` bytes) keeps the historical bit-identical layout
// (`METADATA_MAGIC = "RDM2"` written directly at the content offset).
//
// All integers below are little-endian.
//
// Page 1 (overflow form), starting at `HEADER_SIZE`:
//   [0..4]   magic = "RDM3"
//   [4..8]   format_version (u32, mirrors the inner payload version; debug aid only)
//   [8..12]  total_payload_bytes (u32)
//   [12..16] next_overflow_page_id (u32, > 0)
//   [16..]   first payload chunk (up to META_V3_FIRST_PAYLOAD_CAP bytes)
//
// Overflow continuation page, starting at `HEADER_SIZE`:
//   [0..4]   next_overflow_page_id (u32, 0 if last)
//   [4..8]   chunk_bytes (u32)
//   [8..]    chunk payload (up to META_V3_OVERFLOW_PAYLOAD_CAP bytes)
const METADATA_OVERFLOW_MAGIC: &[u8; 4] = b"RDM3";
/// Bytes available on one page after the pager's page header.
const META_PAGE_CONTENT_CAP: usize =
    crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE;
/// Length of the `RDM3` wrapper header on page 1 (magic, version, total, next).
const META_V3_PAGE1_HEADER: usize = 16;
/// Length of the per-page header on an overflow page (next id, chunk length).
const META_V3_OVERFLOW_HEADER: usize = 8;
/// Payload bytes that fit on page 1 behind the `RDM3` header.
const META_V3_FIRST_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_PAGE1_HEADER;
/// Payload bytes that fit on one overflow page behind its header.
const META_V3_OVERFLOW_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_OVERFLOW_HEADER;
32
33fn free_existing_overflow_chain(pager: &Pager) -> Result<(), PagerError> {
34    let cs = crate::storage::engine::HEADER_SIZE;
35    let page = match pager.read_page(1) {
36        Ok(p) => p,
37        Err(_) => return Ok(()),
38    };
39    let bytes = page.as_bytes();
40    if bytes.len() < cs + META_V3_PAGE1_HEADER {
41        return Ok(());
42    }
43    if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
44        return Ok(());
45    }
46    let mut next = u32::from_le_bytes([
47        bytes[cs + 12],
48        bytes[cs + 13],
49        bytes[cs + 14],
50        bytes[cs + 15],
51    ]);
52    while next != 0 {
53        let ov = match pager.read_page(next) {
54            Ok(p) => p,
55            Err(_) => break,
56        };
57        let ob = ov.as_bytes();
58        let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
59        let _ = pager.free_page(next);
60        next = nn;
61    }
62    Ok(())
63}
64
65fn build_meta_page1_with_overflow(
66    pager: &Pager,
67    meta_data: &[u8],
68) -> Result<crate::storage::engine::Page, PagerError> {
69    use crate::storage::engine::{Page, PageType, HEADER_SIZE};
70    free_existing_overflow_chain(pager)?;
71
72    let mut page1 = Page::new(PageType::Header, 1);
73    let cs = HEADER_SIZE;
74
75    if meta_data.len() <= META_PAGE_CONTENT_CAP {
76        // Single-page: bit-identical to the historical layout.
77        let buf = page1.as_bytes_mut();
78        buf[cs..cs + meta_data.len()].copy_from_slice(meta_data);
79        return Ok(page1);
80    }
81
82    // Multi-page overflow form. Split the inner payload into the first chunk
83    // (held on page 1) followed by zero-or-more continuation chunks chained
84    // through `PageType::Overflow` pages.
85    let first_chunk = &meta_data[..META_V3_FIRST_PAYLOAD_CAP];
86    let mut tail = &meta_data[META_V3_FIRST_PAYLOAD_CAP..];
87    let mut chunks: Vec<&[u8]> = Vec::new();
88    while !tail.is_empty() {
89        let take = tail.len().min(META_V3_OVERFLOW_PAYLOAD_CAP);
90        chunks.push(&tail[..take]);
91        tail = &tail[take..];
92    }
93
94    let mut overflow_pages: Vec<Page> = Vec::with_capacity(chunks.len());
95    let mut overflow_ids: Vec<u32> = Vec::with_capacity(chunks.len());
96    for _ in 0..chunks.len() {
97        let pg = pager.allocate_page(PageType::Overflow)?;
98        overflow_ids.push(pg.page_id());
99        overflow_pages.push(pg);
100    }
101
102    for i in 0..chunks.len() {
103        let next = if i + 1 < chunks.len() {
104            overflow_ids[i + 1]
105        } else {
106            0u32
107        };
108        let len = chunks[i].len() as u32;
109        let buf = overflow_pages[i].as_bytes_mut();
110        buf[cs..cs + 4].copy_from_slice(&next.to_le_bytes());
111        buf[cs + 4..cs + 8].copy_from_slice(&len.to_le_bytes());
112        buf[cs + 8..cs + 8 + chunks[i].len()].copy_from_slice(chunks[i]);
113    }
114    for (idx, page) in overflow_pages.into_iter().enumerate() {
115        let id = overflow_ids[idx];
116        pager.write_page(id, page)?;
117    }
118
119    // Mirror the inner format_version for debug-friendly hex dumps.
120    let format_version = if meta_data.len() >= 8 && &meta_data[0..4] == METADATA_MAGIC {
121        u32::from_le_bytes([meta_data[4], meta_data[5], meta_data[6], meta_data[7]])
122    } else {
123        0
124    };
125
126    let buf = page1.as_bytes_mut();
127    buf[cs..cs + 4].copy_from_slice(METADATA_OVERFLOW_MAGIC);
128    buf[cs + 4..cs + 8].copy_from_slice(&format_version.to_le_bytes());
129    buf[cs + 8..cs + 12].copy_from_slice(&(meta_data.len() as u32).to_le_bytes());
130    buf[cs + 12..cs + 16].copy_from_slice(&overflow_ids[0].to_le_bytes());
131    buf[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_chunk.len()]
132        .copy_from_slice(first_chunk);
133
134    Ok(page1)
135}
136
137/// Assemble the full metadata payload from page 1 (plus its overflow chain
138/// when the `RDM3` wrapper is present). Returns the bytes that the metadata
139/// parser would see starting from the content offset of page 1. Single-page
140/// metadata returns the raw page content (including trailing zero-pad), so
141/// the legacy parser sees the same bytes it always saw.
142fn read_meta_payload(pager: &Pager) -> Option<Vec<u8>> {
143    let cs = crate::storage::engine::HEADER_SIZE;
144    let meta_page = pager
145        .read_page(1)
146        .or_else(|_| pager.recover_meta_from_shadow())
147        .ok()?;
148    let bytes = meta_page.as_bytes();
149    if bytes.len() < cs + 4 {
150        return Some(bytes.get(cs..).unwrap_or(&[]).to_vec());
151    }
152    if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
153        return Some(bytes[cs..].to_vec());
154    }
155    if bytes.len() < cs + META_V3_PAGE1_HEADER {
156        return None;
157    }
158    let total =
159        u32::from_le_bytes([bytes[cs + 8], bytes[cs + 9], bytes[cs + 10], bytes[cs + 11]]) as usize;
160    let mut next = u32::from_le_bytes([
161        bytes[cs + 12],
162        bytes[cs + 13],
163        bytes[cs + 14],
164        bytes[cs + 15],
165    ]);
166    let mut payload: Vec<u8> = Vec::with_capacity(total);
167    let first_take = total.min(META_V3_FIRST_PAYLOAD_CAP);
168    payload.extend_from_slice(
169        &bytes[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_take],
170    );
171    while next != 0 && payload.len() < total {
172        let ov = pager.read_page(next).ok()?;
173        let ob = ov.as_bytes();
174        if ob.len() < cs + META_V3_OVERFLOW_HEADER {
175            return None;
176        }
177        let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
178        let len = u32::from_le_bytes([ob[cs + 4], ob[cs + 5], ob[cs + 6], ob[cs + 7]]) as usize;
179        let remaining = total - payload.len();
180        let take = len.min(remaining).min(META_V3_OVERFLOW_PAYLOAD_CAP);
181        payload.extend_from_slice(
182            &ob[cs + META_V3_OVERFLOW_HEADER..cs + META_V3_OVERFLOW_HEADER + take],
183        );
184        next = nn;
185    }
186    Some(payload)
187}
188
189impl UnifiedStore {
190    pub(crate) fn mark_paged_registry_dirty(&self) {
191        self.paged_registry_dirty.store(true, Ordering::Release);
192    }
193
194    /// Get (or lazily create) the per-collection B-tree under a *read*
195    /// lock whenever possible. Returns a cloned `Arc<BTree>` so callers
196    /// can mutate the tree without holding the outer map's RwLock —
197    /// previously every insert serialised on `btree_indices.write()`,
198    /// costing ~60% of the concurrent-insert throughput ceiling.
199    pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
200        let pager = self.pager.as_ref()?;
201        if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
202            return Some(btree);
203        }
204        let mut write = self.btree_indices.write();
205        let btree = write
206            .entry(collection.to_string())
207            .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
208            .clone();
209        Some(btree)
210    }
211
212    pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
213        let Some(pager) = &self.pager else {
214            return Ok(());
215        };
216
217        if self.paged_registry_dirty.load(Ordering::Acquire) {
218            self.flush_paged_registry()?;
219            self.paged_registry_dirty.store(false, Ordering::Release);
220            return Ok(());
221        }
222
223        pager
224            .flush()
225            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
226    }
227
228    pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
229        let Some(pager) = &self.pager else {
230            return Ok(());
231        };
232
233        match pager.read_page(1) {
234            Ok(_) => {}
235            Err(PagerError::PageNotFound(_)) => {
236                let meta_page = pager
237                    .allocate_page(crate::storage::engine::PageType::Header)
238                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
239                pager
240                    .write_page(meta_page.page_id(), meta_page)
241                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
242            }
243            Err(e) => {
244                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
245            }
246        }
247
248        let format_version = STORE_VERSION_V9;
249        self.set_format_version(format_version);
250
251        let collections = self.collections.read();
252        let btree_indices = self.btree_indices.read();
253        let mut collection_roots = Vec::with_capacity(collections.len());
254        for (name, _) in collections.iter() {
255            let root_page = btree_indices
256                .get(name)
257                .map_or(0, |btree| btree.root_page_id());
258            collection_roots.push((name.clone(), root_page));
259        }
260        drop(btree_indices);
261        drop(collections);
262
263        let mut meta_data = Vec::with_capacity(4096);
264        meta_data.extend_from_slice(METADATA_MAGIC);
265        meta_data.extend_from_slice(&format_version.to_le_bytes());
266        meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
267        for (name, root_page) in &collection_roots {
268            meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
269            meta_data.extend_from_slice(name.as_bytes());
270            meta_data.extend_from_slice(&root_page.to_le_bytes());
271        }
272
273        let cross_refs = self.cross_refs.read();
274        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
275        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
276        for (source_id, refs) in cross_refs.iter() {
277            for (target_id, ref_type, collection) in refs {
278                meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
279                meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
280                meta_data.push(ref_type.to_byte());
281                meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
282                meta_data.extend_from_slice(collection.as_bytes());
283            }
284        }
285
286        let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
287            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
288
289        pager
290            .write_meta_shadow(&meta_page)
291            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
292        pager
293            .write_page(1, meta_page)
294            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
295        pager
296            .flush()
297            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
298
299        Ok(())
300    }
301
302    /// Get a reference to the underlying pager (if in paged mode).
303    pub fn pager(&self) -> Option<&Arc<Pager>> {
304        self.pager.as_ref()
305    }
306
307    /// Borrow the immutable store configuration. Runtime hooks (e.g. the
308    /// `auto_index_id` first-insert hook in `MutationEngine`) read knobs
309    /// off this struct without going through the legacy global config tree.
310    pub fn config(&self) -> &UnifiedStoreConfig {
311        &self.config
312    }
313
314    pub fn with_config(config: UnifiedStoreConfig) -> Self {
315        Self {
316            config,
317            format_version: AtomicU32::new(STORE_VERSION_V9),
318            next_entity_id: AtomicU64::new(1),
319            collections: RwLock::new(HashMap::new()),
320            cross_refs: RwLock::new(HashMap::new()),
321            reverse_refs: RwLock::new(HashMap::new()),
322            pager: None,
323            db_path: None,
324            btree_indices: RwLock::new(HashMap::new()),
325            context_index: ContextIndex::new(),
326            entity_cache: EntityCache::new(),
327            graph_label_index: RwLock::new(HashMap::new()),
328            paged_registry_dirty: AtomicBool::new(false),
329            commit: None,
330            unindex_cross_refs_fast_path: AtomicU64::new(0),
331            replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
332        }
333    }
334
335    /// Open or create a page-based database
336    ///
337    /// This uses the page engine for ACID durability with B-tree indices.
338    /// The database file uses 4KB pages with checksums and efficient caching.
339    ///
340    /// # Arguments
341    ///
342    /// * `path` - Path to the database file (e.g., "data.rdb")
343    ///
344    /// # Example
345    ///
346    /// ```rust,ignore
347    /// let store = UnifiedStore::open("security.rdb")?;
348    /// store.create_collection("hosts")?;
349    /// // ... operations ...
350    /// store.persist()?; // Flush to disk
351    /// ```
352    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
353        Self::open_with_config(path, UnifiedStoreConfig::default())
354    }
355
356    pub fn open_with_config(
357        path: impl AsRef<Path>,
358        config: UnifiedStoreConfig,
359    ) -> Result<Self, StoreError> {
360        let path = path.as_ref();
361        let mut pager_config = PagerConfig::default();
362        // Tunables via env — experimental, used by the benchmark harness
363        // to compare durability profiles head-to-head with Postgres.
364        // REDDB_DOUBLE_WRITE=0 disables the double-write buffer, which
365        // otherwise adds two fsyncs per pager flush (one on DWB, one
366        // on the main file). With DWB off the pager behaves more like
367        // Postgres + full_page_writes=off — we trade torn-page
368        // protection for ingest throughput.
369        if matches!(
370            std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
371            Some("0") | Some("false") | Some("off")
372        ) {
373            pager_config.double_write = false;
374        }
375        let pager = Pager::open(path, pager_config)
376            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
377
378        let wal_path = Self::wal_path_for_db(path);
379        let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
380            Some(Arc::new(
381                StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
382                    .map_err(StoreError::Io)?,
383            ))
384        } else {
385            None
386        };
387
388        let store = Self {
389            config,
390            format_version: AtomicU32::new(STORE_VERSION_V9),
391            next_entity_id: AtomicU64::new(1),
392            collections: RwLock::new(HashMap::new()),
393            cross_refs: RwLock::new(HashMap::new()),
394            reverse_refs: RwLock::new(HashMap::new()),
395            pager: Some(Arc::new(pager)),
396            db_path: Some(path.to_path_buf()),
397            btree_indices: RwLock::new(HashMap::new()),
398            context_index: ContextIndex::new(),
399            entity_cache: EntityCache::new(),
400            graph_label_index: RwLock::new(HashMap::new()),
401            paged_registry_dirty: AtomicBool::new(false),
402            commit,
403            unindex_cross_refs_fast_path: AtomicU64::new(0),
404            replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
405        };
406
407        // Load existing data from pages if database exists
408        store.load_from_pages()?;
409        if let Some(commit) = &store.commit {
410            commit.replay_into(&store).map_err(StoreError::Io)?;
411        }
412
413        Ok(store)
414    }
415
416    /// Load data from page-based storage
417    ///
418    /// Reads the B-tree indices and reconstructs collections from pages.
419    fn load_from_pages(&self) -> Result<(), StoreError> {
420        let pager = match &self.pager {
421            Some(p) => p,
422            None => return Ok(()), // No pager, nothing to load
423        };
424
425        // Get page count
426        let page_count = pager.page_count().map_err(|e| {
427            StoreError::Io(std::io::Error::other(format!(
428                "failed to read page count: {}",
429                e
430            )))
431        })?;
432        if page_count <= 1 {
433            // Empty database (only header page)
434            return Ok(());
435        }
436
437        // Read metadata starting from page 1 (collections registry). The
438        // helper transparently follows the `RDM3` overflow chain when the
439        // metadata blob spans multiple pages and falls back to the legacy
440        // `<data>-meta` shadow when page 1 itself is corrupted.
441        if let Some(content_vec) = read_meta_payload(pager) {
442            let content: &[u8] = &content_vec;
443            if content.len() >= 4 {
444                let mut pos = 0;
445                let mut format_version = STORE_VERSION_V1;
446
447                if content.len() >= 8 && &content[0..4] == METADATA_MAGIC {
448                    format_version =
449                        u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
450                    pos += 8;
451                }
452
453                self.set_format_version(format_version);
454
455                // Collection count
456                let collection_count = u32::from_le_bytes([
457                    content[pos],
458                    content[pos + 1],
459                    content[pos + 2],
460                    content[pos + 3],
461                ]) as usize;
462                pos += 4;
463
464                // Read collection names and their B-tree root pages
465                for _ in 0..collection_count {
466                    if pos + 4 > content.len() {
467                        break;
468                    }
469
470                    let name_len = u32::from_le_bytes([
471                        content[pos],
472                        content[pos + 1],
473                        content[pos + 2],
474                        content[pos + 3],
475                    ]) as usize;
476                    pos += 4;
477
478                    if pos + name_len + 4 > content.len() {
479                        break;
480                    }
481
482                    if let Ok(name) = String::from_utf8(content[pos..pos + name_len].to_vec()) {
483                        pos += name_len;
484
485                        // Root page ID for this collection's B-tree
486                        let root_page = u32::from_le_bytes([
487                            content[pos],
488                            content[pos + 1],
489                            content[pos + 2],
490                            content[pos + 3],
491                        ]);
492                        pos += 4;
493
494                        // Hydrate the collection in memory only. Loading must
495                        // not emit WAL entries or rewrite the on-disk registry
496                        // before the existing B-tree roots are attached.
497                        let _ = self.create_collection_in_memory(&name);
498
499                        // Load B-tree with root page if it exists
500                        if root_page > 0 {
501                            let btree = BTree::with_root(Arc::clone(pager), root_page);
502
503                            // Load all entities from B-tree into the collection
504                            if let Ok(mut cursor) = btree.cursor_first() {
505                                let manager = self.get_collection(&name);
506                                while let Ok(Some((key, value))) = cursor.next() {
507                                    // Deserialize entity from value bytes
508                                    if let Ok((entity, metadata)) = Self::deserialize_entity_record(
509                                        &value,
510                                        self.format_version(),
511                                    ) {
512                                        if let Some(m) = &manager {
513                                            let id = entity.id;
514                                            if let EntityKind::TableRow { row_id, .. } =
515                                                &entity.kind
516                                            {
517                                                m.register_row_id(*row_id);
518                                            }
519                                            self.context_index.index_entity(&name, &entity);
520                                            let _ = m.insert(entity.clone());
521                                            if let Some(metadata) = metadata {
522                                                let _ = m.set_metadata(id, metadata);
523                                            }
524                                            self.register_entity_id(id);
525                                            if self.config.auto_index_refs {
526                                                self.index_cross_refs(&entity, &name)?;
527                                            }
528                                        }
529                                    }
530                                }
531                            }
532
533                            // Store the B-tree for future lookups
534                            self.btree_indices.write().insert(name, Arc::new(btree));
535                        }
536                    } else {
537                        pos += name_len + 4;
538                    }
539                }
540
541                if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
542                    let cross_ref_count = u32::from_le_bytes([
543                        content[pos],
544                        content[pos + 1],
545                        content[pos + 2],
546                        content[pos + 3],
547                    ]) as usize;
548                    pos += 4;
549
550                    for _ in 0..cross_ref_count {
551                        if pos + 17 > content.len() {
552                            break;
553                        }
554                        let source_id = u64::from_le_bytes([
555                            content[pos],
556                            content[pos + 1],
557                            content[pos + 2],
558                            content[pos + 3],
559                            content[pos + 4],
560                            content[pos + 5],
561                            content[pos + 6],
562                            content[pos + 7],
563                        ]);
564                        pos += 8;
565                        let target_id = u64::from_le_bytes([
566                            content[pos],
567                            content[pos + 1],
568                            content[pos + 2],
569                            content[pos + 3],
570                            content[pos + 4],
571                            content[pos + 5],
572                            content[pos + 6],
573                            content[pos + 7],
574                        ]);
575                        pos += 8;
576                        let ref_type = RefType::from_byte(content[pos]);
577                        pos += 1;
578
579                        if pos + 4 > content.len() {
580                            break;
581                        }
582                        let name_len = u32::from_le_bytes([
583                            content[pos],
584                            content[pos + 1],
585                            content[pos + 2],
586                            content[pos + 3],
587                        ]) as usize;
588                        pos += 4;
589                        if pos + name_len > content.len() {
590                            break;
591                        }
592                        let target_collection =
593                            String::from_utf8_lossy(&content[pos..pos + name_len]).to_string();
594                        pos += name_len;
595
596                        let source_id = EntityId::new(source_id);
597                        let target_id = EntityId::new(target_id);
598
599                        self.cross_refs.write().entry(source_id).or_default().push((
600                            target_id,
601                            ref_type,
602                            target_collection.clone(),
603                        ));
604
605                        if let Some((collection, mut entity)) = self.get_any(source_id) {
606                            let exists = entity.cross_refs().iter().any(|xref| {
607                                xref.target == target_id
608                                    && xref.ref_type == ref_type
609                                    && xref.target_collection == target_collection
610                            });
611                            if !exists {
612                                entity.cross_refs_mut().push(CrossRef::new(
613                                    source_id,
614                                    target_id,
615                                    target_collection.clone(),
616                                    ref_type,
617                                ));
618                                if let Some(manager) = self.get_collection(&collection) {
619                                    let _ = manager.update(entity);
620                                }
621                            }
622                        }
623                    }
624                }
625            }
626        }
627
628        if self.format_version() < STORE_VERSION_V9 {
629            self.set_format_version(STORE_VERSION_V9);
630        }
631
632        Ok(())
633    }
634
635    /// Deserialize an entity from binary bytes
636    pub(crate) fn deserialize_entity(
637        data: &[u8],
638        format_version: u32,
639    ) -> Result<UnifiedEntity, StoreError> {
640        let mut pos = 0;
641        Self::read_entity_binary(data, &mut pos, format_version)
642            .map_err(|e| StoreError::Serialization(e.to_string()))
643    }
644
645    /// Serialize an entity to binary bytes
646    pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
647        // Pre-allocate ~256 bytes to cover the typical 15-column
648        // typed row without any Vec growth. Bulk insert calls this
649        // millions of times per bench run; saving 2-3 reallocs per
650        // entity amortises.
651        let mut buf = Vec::with_capacity(256);
652        Self::write_entity_binary(&mut buf, entity, format_version);
653        buf
654    }
655
656    pub(crate) fn serialize_entity_record(
657        entity: &UnifiedEntity,
658        metadata: Option<&Metadata>,
659        format_version: u32,
660    ) -> Vec<u8> {
661        let entity_bytes = Self::serialize_entity(entity, format_version);
662        // Skip the intermediate metadata Vec when there's no metadata
663        // (common OLTP bulk-insert case): write a zero-length prefix
664        // directly into the record buffer. Only fall back to the old
665        // serialize_metadata() allocation when the caller actually
666        // has fields to persist.
667        let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
668        if has_meta {
669            let metadata_bytes = serialize_metadata(metadata);
670            let mut buf = Vec::with_capacity(12 + entity_bytes.len() + metadata_bytes.len());
671            buf.extend_from_slice(ENTITY_RECORD_MAGIC);
672            buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
673            buf.extend_from_slice(&entity_bytes);
674            buf.extend_from_slice(&(metadata_bytes.len() as u32).to_le_bytes());
675            buf.extend_from_slice(&metadata_bytes);
676            buf
677        } else {
678            let mut buf = Vec::with_capacity(12 + entity_bytes.len());
679            buf.extend_from_slice(ENTITY_RECORD_MAGIC);
680            buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
681            buf.extend_from_slice(&entity_bytes);
682            buf.extend_from_slice(&0u32.to_le_bytes());
683            buf
684        }
685    }
686
687    pub(crate) fn deserialize_entity_record(
688        data: &[u8],
689        format_version: u32,
690    ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
691        if data.len() < 8 || &data[..4] != ENTITY_RECORD_MAGIC {
692            return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
693        }
694
695        let mut pos = 4usize;
696        let entity_len = read_u32(data, &mut pos)? as usize;
697        if pos + entity_len > data.len() {
698            return Err(StoreError::Serialization(
699                "truncated entity record payload".to_string(),
700            ));
701        }
702        let entity = Self::deserialize_entity(&data[pos..pos + entity_len], format_version)?;
703        pos += entity_len;
704
705        let metadata_len = read_u32(data, &mut pos)? as usize;
706        if pos + metadata_len > data.len() {
707            return Err(StoreError::Serialization(
708                "truncated entity record metadata".to_string(),
709            ));
710        }
711        let metadata = if metadata_len == 0 {
712            None
713        } else {
714            let metadata = deserialize_metadata(&data[pos..pos + metadata_len])?;
715            if metadata.is_empty() {
716                None
717            } else {
718                Some(metadata)
719            }
720        };
721
722        Ok((entity, metadata))
723    }
724
725    /// Persist all data to page-based storage
726    ///
727    /// Writes all entities to B-tree pages and flushes to disk.
728    /// This provides ACID durability guarantees.
729    pub fn persist(&self) -> Result<(), StoreError> {
730        let pager = match &self.pager {
731            Some(p) => p,
732            None => {
733                // No pager attached - use binary file fallback if path available
734                if let Some(path) = &self.db_path {
735                    return self
736                        .save_to_file(path)
737                        .map_err(|e| StoreError::Serialization(e.to_string()));
738                }
739                return Err(StoreError::Io(std::io::Error::other(
740                    "No pager or path configured for persistence",
741                )));
742            }
743        };
744
745        match pager.read_page(1) {
746            Ok(_) => {}
747            Err(PagerError::PageNotFound(_)) => {
748                let meta_page = pager
749                    .allocate_page(crate::storage::engine::PageType::Header)
750                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
751                pager
752                    .write_page(meta_page.page_id(), meta_page)
753                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
754            }
755            Err(e) => {
756                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
757            }
758        }
759
760        if let Some(commit) = &self.commit {
761            commit.force_sync().map_err(StoreError::Io)?;
762        }
763
764        let collections = self.collections.read();
765        let mut btree_indices = self.btree_indices.write();
766
767        // Collect collection names and their B-tree root pages
768        let mut collection_roots: Vec<(String, u32)> = Vec::new();
769
770        // For each collection, rebuild the B-tree from the live manager state.
771        // A checkpoint must preserve deletes too, not just upsert the current rows.
772        for (name, manager) in collections.iter() {
773            let btree = btree_indices
774                .entry(name.clone())
775                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
776
777            let mut existing_keys = Vec::new();
778            if !btree.is_empty() {
779                let mut cursor = btree.cursor_first().map_err(|e| {
780                    StoreError::Io(std::io::Error::other(format!(
781                        "B-tree cursor error while rebuilding '{name}': {e}"
782                    )))
783                })?;
784                while let Some((key, _)) = cursor.next().map_err(|e| {
785                    StoreError::Io(std::io::Error::other(format!(
786                        "B-tree scan error while rebuilding '{name}': {e}"
787                    )))
788                })? {
789                    existing_keys.push(key);
790                }
791            }
792
793            for key in existing_keys {
794                btree.delete(&key).map_err(|e| {
795                    StoreError::Io(std::io::Error::other(format!(
796                        "B-tree delete error while rebuilding '{name}': {e}"
797                    )))
798                })?;
799            }
800
801            let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
802                .query_all(|_| true)
803                .into_iter()
804                .map(|entity| {
805                    let metadata = manager.get_metadata(entity.id);
806                    (
807                        entity.id.raw().to_be_bytes().to_vec(),
808                        Self::serialize_entity_record(
809                            &entity,
810                            metadata.as_ref(),
811                            self.format_version(),
812                        ),
813                    )
814                })
815                .collect();
816            records.sort_by(|left, right| left.0.cmp(&right.0));
817
818            // Slice G (#704): no per-row skip. Oversized values are
819            // spilled through the slice-E write ladder inside
820            // `bulk_insert_sorted` (inline → compressed inline →
821            // overflow chain). The only rejection is the hard
822            // `MAX_VALUE_SIZE` (256 MiB) ceiling, which surfaces as
823            // `ValueTooLarge` from the bulk path after the rest of
824            // the batch has landed.
825            if !records.is_empty() {
826                btree.bulk_insert_sorted(&records).map_err(|e| {
827                    StoreError::Io(std::io::Error::other(format!(
828                        "B-tree bulk rebuild error for '{name}': {e}"
829                    )))
830                })?;
831            }
832
833            collection_roots.push((name.clone(), btree.root_page_id()));
834        }
835
836        // Write collection metadata to page 1
837        let mut meta_data = Vec::with_capacity(4096);
838
839        let format_version = STORE_VERSION_V9;
840        self.set_format_version(format_version);
841
842        // Metadata header: magic + version + collection count
843        meta_data.extend_from_slice(METADATA_MAGIC);
844        meta_data.extend_from_slice(&format_version.to_le_bytes());
845        meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
846
847        // Write each collection's name and B-tree root page
848        for (name, root_page) in &collection_roots {
849            // Name length
850            meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
851            // Name
852            meta_data.extend_from_slice(name.as_bytes());
853            // Root page ID from actual B-tree
854            meta_data.extend_from_slice(&root_page.to_le_bytes());
855        }
856
857        // Write cross-reference metadata
858        let cross_refs = self.cross_refs.read();
859        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
860        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
861        for (source_id, refs) in cross_refs.iter() {
862            for (target_id, ref_type, collection) in refs {
863                meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
864                meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
865                meta_data.push(ref_type.to_byte());
866                meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
867                meta_data.extend_from_slice(collection.as_bytes());
868            }
869        }
870
871        // Build page 1 (+ overflow chain when needed) for the metadata blob.
872        let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
873            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
874
875        // Write metadata shadow FIRST (intact copy in case main write fails).
876        // The shadow is a no-op when `fold_pager_meta` is enabled.
877        pager
878            .write_meta_shadow(&meta_page)
879            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
880
881        // Write page
882        pager
883            .write_page(1, meta_page)
884            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
885
886        // Flush and fsync all pages to disk
887        pager
888            .sync()
889            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
890
891        if let Some(commit) = &self.commit {
892            commit.truncate().map_err(StoreError::Io)?;
893        }
894
895        Ok(())
896    }
897
898    /// Check if the store is using page-based persistence
899    pub fn is_paged(&self) -> bool {
900        self.pager.is_some()
901    }
902
903    /// Current root page for a collection's primary B-tree, if one has
904    /// been materialized in this store.
905    pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
906        self.btree_indices
907            .read()
908            .get(collection)
909            .map(|btree| btree.root_page_id())
910            .filter(|root| *root != 0)
911    }
912
913    /// Get the database file path (if using paged mode)
914    pub fn db_path(&self) -> Option<&Path> {
915        self.db_path.as_deref()
916    }
917}
918
919fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
920    let Some(metadata) = metadata else {
921        return Vec::new();
922    };
923    if metadata.is_empty() {
924        return Vec::new();
925    }
926
927    let mut entries: Vec<_> = metadata.iter().collect();
928    entries.sort_by_key(|(a, _)| *a);
929
930    let mut buf = Vec::new();
931    buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
932    for (key, value) in entries {
933        write_string(&mut buf, key);
934        write_metadata_value(&mut buf, value);
935    }
936    buf
937}
938
939fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
940    let mut pos = 0usize;
941    let count = read_u32(data, &mut pos)? as usize;
942    let mut metadata = Metadata::new();
943    for _ in 0..count {
944        let key = read_string(data, &mut pos)?;
945        let value = read_metadata_value(data, &mut pos)?;
946        metadata.set(key, value);
947    }
948    Ok(metadata)
949}
950
/// Appends `value` as a little-endian `u32` byte length followed by its UTF-8
/// bytes.
///
/// NOTE: the length is narrowed with `as u32`, so strings longer than
/// `u32::MAX` bytes would get a truncated prefix.
fn write_string(buf: &mut Vec<u8>, value: &str) {
    let bytes = value.as_bytes();
    let len_prefix = (bytes.len() as u32).to_le_bytes();
    buf.extend(len_prefix.iter().chain(bytes).copied());
}
955
/// Appends `value` prefixed by its length as a little-endian `u32`.
///
/// NOTE: the length is narrowed with `as u32`, so slices longer than
/// `u32::MAX` bytes would get a truncated prefix.
fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
    let len = value.len() as u32;
    buf.extend(len.to_le_bytes());
    buf.extend_from_slice(value);
}
960
961fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
962    use crate::storage::unified::RefTarget;
963
964    match target {
965        RefTarget::TableRow { table, row_id } => {
966            buf.push(0);
967            write_string(buf, table);
968            buf.extend_from_slice(&row_id.to_le_bytes());
969        }
970        RefTarget::Node {
971            collection,
972            node_id,
973        } => {
974            buf.push(1);
975            write_string(buf, collection);
976            buf.extend_from_slice(&node_id.raw().to_le_bytes());
977        }
978        RefTarget::Edge {
979            collection,
980            edge_id,
981        } => {
982            buf.push(2);
983            write_string(buf, collection);
984            buf.extend_from_slice(&edge_id.raw().to_le_bytes());
985        }
986        RefTarget::Vector {
987            collection,
988            vector_id,
989        } => {
990            buf.push(3);
991            write_string(buf, collection);
992            buf.extend_from_slice(&vector_id.raw().to_le_bytes());
993        }
994        RefTarget::Entity {
995            collection,
996            entity_id,
997        } => {
998            buf.push(4);
999            write_string(buf, collection);
1000            buf.extend_from_slice(&entity_id.raw().to_le_bytes());
1001        }
1002    }
1003}
1004
1005fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
1006    match value {
1007        MetadataValue::Null => buf.push(0),
1008        MetadataValue::Bool(v) => {
1009            buf.push(1);
1010            buf.push(u8::from(*v));
1011        }
1012        MetadataValue::Int(v) => {
1013            buf.push(2);
1014            buf.extend_from_slice(&v.to_le_bytes());
1015        }
1016        MetadataValue::Float(v) => {
1017            buf.push(3);
1018            buf.extend_from_slice(&v.to_le_bytes());
1019        }
1020        MetadataValue::String(v) => {
1021            buf.push(4);
1022            write_string(buf, v);
1023        }
1024        MetadataValue::Bytes(v) => {
1025            buf.push(5);
1026            write_bytes(buf, v);
1027        }
1028        MetadataValue::Array(values) => {
1029            buf.push(6);
1030            buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1031            for value in values {
1032                write_metadata_value(buf, value);
1033            }
1034        }
1035        MetadataValue::Object(values) => {
1036            buf.push(7);
1037            let mut entries: Vec<_> = values.iter().collect();
1038            entries.sort_by_key(|(a, _)| *a);
1039            buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
1040            for (key, value) in entries {
1041                write_string(buf, key);
1042                write_metadata_value(buf, value);
1043            }
1044        }
1045        MetadataValue::Timestamp(v) => {
1046            buf.push(8);
1047            buf.extend_from_slice(&v.to_le_bytes());
1048        }
1049        MetadataValue::Geo { lat, lon } => {
1050            buf.push(9);
1051            buf.extend_from_slice(&lat.to_le_bytes());
1052            buf.extend_from_slice(&lon.to_le_bytes());
1053        }
1054        MetadataValue::Reference(target) => {
1055            buf.push(10);
1056            write_ref_target(buf, target);
1057        }
1058        MetadataValue::References(targets) => {
1059            buf.push(11);
1060            buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
1061            for target in targets {
1062                write_ref_target(buf, target);
1063            }
1064        }
1065    }
1066}
1067
1068fn read_exact_slice<'a>(
1069    data: &'a [u8],
1070    pos: &mut usize,
1071    len: usize,
1072) -> Result<&'a [u8], StoreError> {
1073    if *pos + len > data.len() {
1074        return Err(StoreError::Serialization(
1075            "truncated metadata payload".to_string(),
1076        ));
1077    }
1078    let slice = &data[*pos..*pos + len];
1079    *pos += len;
1080    Ok(slice)
1081}
1082
1083fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
1084    let bytes = read_exact_slice(data, pos, 4)?;
1085    let mut raw = [0u8; 4];
1086    raw.copy_from_slice(bytes);
1087    Ok(u32::from_le_bytes(raw))
1088}
1089
1090fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
1091    let bytes = read_exact_slice(data, pos, 8)?;
1092    let mut raw = [0u8; 8];
1093    raw.copy_from_slice(bytes);
1094    Ok(u64::from_le_bytes(raw))
1095}
1096
1097fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
1098    let bytes = read_exact_slice(data, pos, 8)?;
1099    let mut raw = [0u8; 8];
1100    raw.copy_from_slice(bytes);
1101    Ok(i64::from_le_bytes(raw))
1102}
1103
1104fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
1105    let bytes = read_exact_slice(data, pos, 8)?;
1106    let mut raw = [0u8; 8];
1107    raw.copy_from_slice(bytes);
1108    Ok(f64::from_le_bytes(raw))
1109}
1110
1111fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
1112    let bytes = read_exact_slice(data, pos, 1)?;
1113    Ok(bytes[0])
1114}
1115
1116fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
1117    let len = read_u32(data, pos)? as usize;
1118    let bytes = read_exact_slice(data, pos, len)?;
1119    String::from_utf8(bytes.to_vec()).map_err(|err| StoreError::Serialization(err.to_string()))
1120}
1121
1122fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
1123    let len = read_u32(data, pos)? as usize;
1124    Ok(read_exact_slice(data, pos, len)?.to_vec())
1125}
1126
1127fn read_ref_target(
1128    data: &[u8],
1129    pos: &mut usize,
1130) -> Result<crate::storage::unified::RefTarget, StoreError> {
1131    use crate::storage::unified::RefTarget;
1132
1133    match read_u8(data, pos)? {
1134        0 => Ok(RefTarget::TableRow {
1135            table: read_string(data, pos)?,
1136            row_id: read_u64(data, pos)?,
1137        }),
1138        1 => Ok(RefTarget::Node {
1139            collection: read_string(data, pos)?,
1140            node_id: EntityId::new(read_u64(data, pos)?),
1141        }),
1142        2 => Ok(RefTarget::Edge {
1143            collection: read_string(data, pos)?,
1144            edge_id: EntityId::new(read_u64(data, pos)?),
1145        }),
1146        3 => Ok(RefTarget::Vector {
1147            collection: read_string(data, pos)?,
1148            vector_id: EntityId::new(read_u64(data, pos)?),
1149        }),
1150        4 => Ok(RefTarget::Entity {
1151            collection: read_string(data, pos)?,
1152            entity_id: EntityId::new(read_u64(data, pos)?),
1153        }),
1154        tag => Err(StoreError::Serialization(format!(
1155            "unknown metadata ref target tag {tag}"
1156        ))),
1157    }
1158}
1159
1160fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1161    match read_u8(data, pos)? {
1162        0 => Ok(MetadataValue::Null),
1163        1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1164        2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1165        3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1166        4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1167        5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1168        6 => {
1169            let count = read_u32(data, pos)? as usize;
1170            let mut values = Vec::with_capacity(count);
1171            for _ in 0..count {
1172                values.push(read_metadata_value(data, pos)?);
1173            }
1174            Ok(MetadataValue::Array(values))
1175        }
1176        7 => {
1177            let count = read_u32(data, pos)? as usize;
1178            let mut values = std::collections::HashMap::with_capacity(count);
1179            for _ in 0..count {
1180                let key = read_string(data, pos)?;
1181                let value = read_metadata_value(data, pos)?;
1182                values.insert(key, value);
1183            }
1184            Ok(MetadataValue::Object(values))
1185        }
1186        8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1187        9 => Ok(MetadataValue::Geo {
1188            lat: read_f64(data, pos)?,
1189            lon: read_f64(data, pos)?,
1190        }),
1191        10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1192        11 => {
1193            let count = read_u32(data, pos)? as usize;
1194            let mut targets = Vec::with_capacity(count);
1195            for _ in 0..count {
1196                targets.push(read_ref_target(data, pos)?);
1197            }
1198            Ok(MetadataValue::References(targets))
1199        }
1200        tag => Err(StoreError::Serialization(format!(
1201            "unknown metadata value tag {tag}"
1202        ))),
1203    }
1204}