Skip to main content

reddb_server/storage/unified/store/
impl_pages.rs

1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
5const ENTITY_RECORD_MAGIC: &[u8; 4] = b"RER1";
6
7impl UnifiedStore {
    /// Flag the on-disk collection registry as stale so the next
    /// `flush_paged_state` call rewrites metadata page 1.
    pub(crate) fn mark_paged_registry_dirty(&self) {
        // Release pairs with the Acquire load in `flush_paged_state`.
        self.paged_registry_dirty.store(true, Ordering::Release);
    }
11
12    /// Get (or lazily create) the per-collection B-tree under a *read*
13    /// lock whenever possible. Returns a cloned `Arc<BTree>` so callers
14    /// can mutate the tree without holding the outer map's RwLock —
15    /// previously every insert serialised on `btree_indices.write()`,
16    /// costing ~60% of the concurrent-insert throughput ceiling.
17    pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
18        let pager = self.pager.as_ref()?;
19        if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
20            return Some(btree);
21        }
22        let mut write = self.btree_indices.write();
23        let btree = write
24            .entry(collection.to_string())
25            .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
26            .clone();
27        Some(btree)
28    }
29
30    pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
31        let Some(pager) = &self.pager else {
32            return Ok(());
33        };
34
35        if self.paged_registry_dirty.load(Ordering::Acquire) {
36            self.flush_paged_registry()?;
37            self.paged_registry_dirty.store(false, Ordering::Release);
38            return Ok(());
39        }
40
41        pager
42            .flush()
43            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
44    }
45
    /// Serialize the collection registry (names + B-tree root pages) and
    /// the cross-reference table into metadata page 1, then flush the pager.
    ///
    /// Layout written: `METADATA_MAGIC | version:u32 | count:u32 |
    /// (name_len:u32, name, root:u32)* | total_refs:u32 |
    /// (source:u64, target:u64, ref_type:u8, coll_len:u32, coll)*`,
    /// all little-endian. `load_from_pages` is the reader.
    ///
    /// # Errors
    ///
    /// Returns `StoreError::Io` when the pager cannot allocate, read,
    /// write, or flush the metadata page.
    pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
        // Memory-only store: nothing to persist.
        let Some(pager) = &self.pager else {
            return Ok(());
        };

        // Make sure page 1 exists before we overwrite it below; on a
        // fresh file the first allocation lands there.
        match pager.read_page(1) {
            Ok(_) => {}
            Err(PagerError::PageNotFound(_)) => {
                let meta_page = pager
                    .allocate_page(crate::storage::engine::PageType::Header)
                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
                pager
                    .write_page(meta_page.page_id(), meta_page)
                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
            }
            Err(e) => {
                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
            }
        }

        // Registry is always written in the latest format.
        let format_version = STORE_VERSION_V6;
        self.set_format_version(format_version);

        // Snapshot (name, root page) pairs under both read locks, then
        // drop the guards before doing any pager I/O.
        let collections = self.collections.read();
        let btree_indices = self.btree_indices.read();
        let mut collection_roots = Vec::with_capacity(collections.len());
        for (name, _) in collections.iter() {
            // Collections without a B-tree yet serialize root page 0.
            let root_page = btree_indices
                .get(name)
                .map_or(0, |btree| btree.root_page_id());
            collection_roots.push((name.clone(), root_page));
        }
        drop(btree_indices);
        drop(collections);

        let mut meta_data = Vec::with_capacity(4096);
        meta_data.extend_from_slice(METADATA_MAGIC);
        meta_data.extend_from_slice(&format_version.to_le_bytes());
        meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
        for (name, root_page) in &collection_roots {
            meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
            meta_data.extend_from_slice(name.as_bytes());
            meta_data.extend_from_slice(&root_page.to_le_bytes());
        }

        let cross_refs = self.cross_refs.read();
        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
        for (source_id, refs) in cross_refs.iter() {
            for (target_id, ref_type, collection) in refs {
                meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
                meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
                meta_data.push(ref_type.to_byte());
                meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
                meta_data.extend_from_slice(collection.as_bytes());
            }
        }

        let mut meta_page =
            crate::storage::engine::Page::new(crate::storage::engine::PageType::Header, 1);
        let page_data = meta_page.as_bytes_mut();
        let content_start = crate::storage::engine::HEADER_SIZE;
        // NOTE(review): metadata longer than one page is silently
        // truncated here (copy_len clamps to the content area) — a
        // registry with many collections/cross-refs would lose entries
        // on reload. Confirm an upstream bound exists, or surface an
        // error instead of clamping.
        let copy_len = meta_data.len().min(page_data.len() - content_start);
        page_data[content_start..content_start + copy_len].copy_from_slice(&meta_data[..copy_len]);

        // Shadow copy first so a torn write of page 1 stays recoverable
        // (load_from_pages falls back to the shadow on read failure).
        pager
            .write_meta_shadow(&meta_page)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
        pager
            .write_page(1, meta_page)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
        pager
            .flush()
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        Ok(())
    }
123
    /// Get a reference to the underlying pager (if in paged mode).
    ///
    /// Returns `None` for memory-only stores (e.g. those built via
    /// [`Self::with_config`], which sets `pager` to `None`).
    pub fn pager(&self) -> Option<&Arc<Pager>> {
        self.pager.as_ref()
    }
128
    /// Borrow the immutable store configuration. Runtime hooks (e.g. the
    /// `auto_index_id` first-insert hook in `MutationEngine`) read knobs
    /// off this struct without going through the legacy global config tree.
    ///
    /// The returned reference is tied to `self`; the config is fixed at
    /// construction time (`with_config` / `open_with_config`).
    pub fn config(&self) -> &UnifiedStoreConfig {
        &self.config
    }
135
    /// Build a memory-only store with the given configuration.
    ///
    /// `pager` and `db_path` are `None`, so the paged persistence paths
    /// (`flush_paged_state`, `flush_paged_registry`) become no-ops and
    /// `persist` falls back to the binary-file path only if one is set
    /// later. No WAL commit coordinator is attached.
    pub fn with_config(config: UnifiedStoreConfig) -> Self {
        Self {
            config,
            // New stores always start at the latest on-disk format.
            format_version: AtomicU32::new(STORE_VERSION_V6),
            // Entity IDs are 1-based; 0 is never handed out.
            next_entity_id: AtomicU64::new(1),
            collections: RwLock::new(HashMap::new()),
            cross_refs: RwLock::new(HashMap::new()),
            reverse_refs: RwLock::new(HashMap::new()),
            pager: None,
            db_path: None,
            btree_indices: RwLock::new(HashMap::new()),
            context_index: ContextIndex::new(),
            entity_cache: EntityCache::new(),
            graph_label_index: RwLock::new(HashMap::new()),
            paged_registry_dirty: AtomicBool::new(false),
            commit: None,
            unindex_cross_refs_fast_path: AtomicU64::new(0),
        }
    }
155
    /// Open or create a page-based database
    ///
    /// This uses the page engine for ACID durability with B-tree indices.
    /// The database file uses 4KB pages with checksums and efficient caching.
    /// Delegates to [`Self::open_with_config`] with the default
    /// `UnifiedStoreConfig`.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to the database file (e.g., "data.rdb")
    ///
    /// # Errors
    ///
    /// Propagates any `StoreError` from `open_with_config` (pager open,
    /// WAL coordinator open, or page loading failures).
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let store = UnifiedStore::open("security.rdb")?;
    /// store.create_collection("hosts")?;
    /// // ... operations ...
    /// store.persist()?; // Flush to disk
    /// ```
    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
        Self::open_with_config(path, UnifiedStoreConfig::default())
    }
176
177    pub fn open_with_config(
178        path: impl AsRef<Path>,
179        config: UnifiedStoreConfig,
180    ) -> Result<Self, StoreError> {
181        let path = path.as_ref();
182        let mut pager_config = PagerConfig::default();
183        // Tunables via env — experimental, used by the benchmark harness
184        // to compare durability profiles head-to-head with Postgres.
185        // REDDB_DOUBLE_WRITE=0 disables the double-write buffer, which
186        // otherwise adds two fsyncs per pager flush (one on DWB, one
187        // on the main file). With DWB off the pager behaves more like
188        // Postgres + full_page_writes=off — we trade torn-page
189        // protection for ingest throughput.
190        if matches!(
191            std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
192            Some("0") | Some("false") | Some("off")
193        ) {
194            pager_config.double_write = false;
195        }
196        let pager = Pager::open(path, pager_config)
197            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
198
199        let wal_path = Self::wal_path_for_db(path);
200        let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
201            Some(Arc::new(
202                StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
203                    .map_err(StoreError::Io)?,
204            ))
205        } else {
206            None
207        };
208
209        let store = Self {
210            config,
211            format_version: AtomicU32::new(STORE_VERSION_V6),
212            next_entity_id: AtomicU64::new(1),
213            collections: RwLock::new(HashMap::new()),
214            cross_refs: RwLock::new(HashMap::new()),
215            reverse_refs: RwLock::new(HashMap::new()),
216            pager: Some(Arc::new(pager)),
217            db_path: Some(path.to_path_buf()),
218            btree_indices: RwLock::new(HashMap::new()),
219            context_index: ContextIndex::new(),
220            entity_cache: EntityCache::new(),
221            graph_label_index: RwLock::new(HashMap::new()),
222            paged_registry_dirty: AtomicBool::new(false),
223            commit,
224            unindex_cross_refs_fast_path: AtomicU64::new(0),
225        };
226
227        // Load existing data from pages if database exists
228        store.load_from_pages()?;
229        if let Some(commit) = &store.commit {
230            commit.replay_into(&store).map_err(StoreError::Io)?;
231        }
232
233        Ok(store)
234    }
235
    /// Load data from page-based storage
    ///
    /// Reads the B-tree indices and reconstructs collections from pages.
    ///
    /// Registry layout read from page 1 (writer side is
    /// `flush_paged_registry`): optional magic + version header (V2+),
    /// collection count, `(name_len, name, root_page)` triples, then —
    /// for V2+ files — the serialized cross-reference table. All bounds
    /// checks `break` on truncation rather than erroring: a partially
    /// readable registry loads as much as it can.
    fn load_from_pages(&self) -> Result<(), StoreError> {
        let pager = match &self.pager {
            Some(p) => p,
            None => return Ok(()), // No pager, nothing to load
        };

        // Get page count
        let page_count = pager.page_count().map_err(|e| {
            StoreError::Io(std::io::Error::other(format!(
                "failed to read page count: {}",
                e
            )))
        })?;
        if page_count <= 1 {
            // Empty database (only header page)
            return Ok(());
        }

        // Read metadata from page 1 (collections registry)
        // Falls back to metadata shadow if page 1 is corrupted
        let meta_page_result = pager
            .read_page(1)
            .or_else(|_| pager.recover_meta_from_shadow());
        if let Ok(meta_page) = meta_page_result {
            let data = meta_page.as_bytes();
            // Skip header (32 bytes), read content area
            let content = &data[crate::storage::engine::HEADER_SIZE..];
            if content.len() >= 4 {
                let mut pos = 0;
                // Pre-V2 registries carry no magic/version header.
                let mut format_version = STORE_VERSION_V1;

                if content.len() >= 8 && &content[0..4] == METADATA_MAGIC {
                    format_version =
                        u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
                    pos += 8;
                }

                self.set_format_version(format_version);

                // Collection count
                let collection_count = u32::from_le_bytes([
                    content[pos],
                    content[pos + 1],
                    content[pos + 2],
                    content[pos + 3],
                ]) as usize;
                pos += 4;

                // Read collection names and their B-tree root pages
                for _ in 0..collection_count {
                    // Truncated registry: stop parsing instead of
                    // panicking on an out-of-bounds index.
                    if pos + 4 > content.len() {
                        break;
                    }

                    let name_len = u32::from_le_bytes([
                        content[pos],
                        content[pos + 1],
                        content[pos + 2],
                        content[pos + 3],
                    ]) as usize;
                    pos += 4;

                    if pos + name_len + 4 > content.len() {
                        break;
                    }

                    if let Ok(name) = String::from_utf8(content[pos..pos + name_len].to_vec()) {
                        pos += name_len;

                        // Root page ID for this collection's B-tree
                        let root_page = u32::from_le_bytes([
                            content[pos],
                            content[pos + 1],
                            content[pos + 2],
                            content[pos + 3],
                        ]);
                        pos += 4;

                        // Hydrate the collection in memory only. Loading must
                        // not emit WAL entries or rewrite the on-disk registry
                        // before the existing B-tree roots are attached.
                        let _ = self.create_collection_in_memory(&name);

                        // Load B-tree with root page if it exists
                        if root_page > 0 {
                            let btree = BTree::with_root(Arc::clone(pager), root_page);

                            // Load all entities from B-tree into the collection
                            if let Ok(mut cursor) = btree.cursor_first() {
                                let manager = self.get_collection(&name);
                                // NOTE(review): the scan stops silently on the
                                // first cursor error — rows after a corrupt
                                // record are absent from memory. Confirm this
                                // best-effort behavior is intended.
                                while let Ok(Some((key, value))) = cursor.next() {
                                    // Deserialize entity from value bytes
                                    if let Ok((entity, metadata)) = Self::deserialize_entity_record(
                                        &value,
                                        self.format_version(),
                                    ) {
                                        if let Some(m) = &manager {
                                            let id = entity.id;
                                            // Re-reserve row ids so fresh
                                            // inserts don't collide.
                                            if let EntityKind::TableRow { row_id, .. } =
                                                &entity.kind
                                            {
                                                m.register_row_id(*row_id);
                                            }
                                            self.context_index.index_entity(&name, &entity);
                                            let _ = m.insert(entity.clone());
                                            if let Some(metadata) = metadata {
                                                let _ = m.set_metadata(id, metadata);
                                            }
                                            self.register_entity_id(id);
                                            if self.config.auto_index_refs {
                                                self.index_cross_refs(&entity, &name)?;
                                            }
                                        }
                                    }
                                }
                            }

                            // Store the B-tree for future lookups
                            self.btree_indices.write().insert(name, Arc::new(btree));
                        }
                    } else {
                        // Undecodable (non-UTF-8) name: skip the name bytes
                        // plus the 4-byte root page and try the next entry.
                        pos += name_len + 4;
                    }
                }

                if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
                    let cross_ref_count = u32::from_le_bytes([
                        content[pos],
                        content[pos + 1],
                        content[pos + 2],
                        content[pos + 3],
                    ]) as usize;
                    pos += 4;

                    for _ in 0..cross_ref_count {
                        // 8 + 8 + 1 bytes: fixed-width prefix of each ref.
                        if pos + 17 > content.len() {
                            break;
                        }
                        let source_id = u64::from_le_bytes([
                            content[pos],
                            content[pos + 1],
                            content[pos + 2],
                            content[pos + 3],
                            content[pos + 4],
                            content[pos + 5],
                            content[pos + 6],
                            content[pos + 7],
                        ]);
                        pos += 8;
                        let target_id = u64::from_le_bytes([
                            content[pos],
                            content[pos + 1],
                            content[pos + 2],
                            content[pos + 3],
                            content[pos + 4],
                            content[pos + 5],
                            content[pos + 6],
                            content[pos + 7],
                        ]);
                        pos += 8;
                        let ref_type = RefType::from_byte(content[pos]);
                        pos += 1;

                        if pos + 4 > content.len() {
                            break;
                        }
                        let name_len = u32::from_le_bytes([
                            content[pos],
                            content[pos + 1],
                            content[pos + 2],
                            content[pos + 3],
                        ]) as usize;
                        pos += 4;
                        if pos + name_len > content.len() {
                            break;
                        }
                        let target_collection =
                            String::from_utf8_lossy(&content[pos..pos + name_len]).to_string();
                        pos += name_len;

                        let source_id = EntityId::new(source_id);
                        let target_id = EntityId::new(target_id);

                        self.cross_refs.write().entry(source_id).or_default().push((
                            target_id,
                            ref_type,
                            target_collection.clone(),
                        ));

                        // Mirror the ref onto the source entity unless it is
                        // already embedded there (avoids duplicates when the
                        // entity record also carried the ref).
                        if let Some((collection, mut entity)) = self.get_any(source_id) {
                            let exists = entity.cross_refs().iter().any(|xref| {
                                xref.target == target_id
                                    && xref.ref_type == ref_type
                                    && xref.target_collection == target_collection
                            });
                            if !exists {
                                entity.cross_refs_mut().push(CrossRef::new(
                                    source_id,
                                    target_id,
                                    target_collection.clone(),
                                    ref_type,
                                ));
                                if let Some(manager) = self.get_collection(&collection) {
                                    let _ = manager.update(entity);
                                }
                            }
                        }
                    }
                }
            }
        }

        // Older files are upgraded in memory; the V6 format is written
        // back on the next registry flush.
        if self.format_version() < STORE_VERSION_V6 {
            self.set_format_version(STORE_VERSION_V6);
        }

        Ok(())
    }
457
458    /// Deserialize an entity from binary bytes
459    pub(crate) fn deserialize_entity(
460        data: &[u8],
461        format_version: u32,
462    ) -> Result<UnifiedEntity, StoreError> {
463        let mut pos = 0;
464        Self::read_entity_binary(data, &mut pos, format_version)
465            .map_err(|e| StoreError::Serialization(e.to_string()))
466    }
467
468    /// Serialize an entity to binary bytes
469    pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
470        // Pre-allocate ~256 bytes to cover the typical 15-column
471        // typed row without any Vec growth. Bulk insert calls this
472        // millions of times per bench run; saving 2-3 reallocs per
473        // entity amortises.
474        let mut buf = Vec::with_capacity(256);
475        Self::write_entity_binary(&mut buf, entity, format_version);
476        buf
477    }
478
479    pub(crate) fn serialize_entity_record(
480        entity: &UnifiedEntity,
481        metadata: Option<&Metadata>,
482        format_version: u32,
483    ) -> Vec<u8> {
484        let entity_bytes = Self::serialize_entity(entity, format_version);
485        // Skip the intermediate metadata Vec when there's no metadata
486        // (common OLTP bulk-insert case): write a zero-length prefix
487        // directly into the record buffer. Only fall back to the old
488        // serialize_metadata() allocation when the caller actually
489        // has fields to persist.
490        let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
491        if has_meta {
492            let metadata_bytes = serialize_metadata(metadata);
493            let mut buf = Vec::with_capacity(12 + entity_bytes.len() + metadata_bytes.len());
494            buf.extend_from_slice(ENTITY_RECORD_MAGIC);
495            buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
496            buf.extend_from_slice(&entity_bytes);
497            buf.extend_from_slice(&(metadata_bytes.len() as u32).to_le_bytes());
498            buf.extend_from_slice(&metadata_bytes);
499            buf
500        } else {
501            let mut buf = Vec::with_capacity(12 + entity_bytes.len());
502            buf.extend_from_slice(ENTITY_RECORD_MAGIC);
503            buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
504            buf.extend_from_slice(&entity_bytes);
505            buf.extend_from_slice(&0u32.to_le_bytes());
506            buf
507        }
508    }
509
510    pub(crate) fn deserialize_entity_record(
511        data: &[u8],
512        format_version: u32,
513    ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
514        if data.len() < 8 || &data[..4] != ENTITY_RECORD_MAGIC {
515            return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
516        }
517
518        let mut pos = 4usize;
519        let entity_len = read_u32(data, &mut pos)? as usize;
520        if pos + entity_len > data.len() {
521            return Err(StoreError::Serialization(
522                "truncated entity record payload".to_string(),
523            ));
524        }
525        let entity = Self::deserialize_entity(&data[pos..pos + entity_len], format_version)?;
526        pos += entity_len;
527
528        let metadata_len = read_u32(data, &mut pos)? as usize;
529        if pos + metadata_len > data.len() {
530            return Err(StoreError::Serialization(
531                "truncated entity record metadata".to_string(),
532            ));
533        }
534        let metadata = if metadata_len == 0 {
535            None
536        } else {
537            let metadata = deserialize_metadata(&data[pos..pos + metadata_len])?;
538            if metadata.is_empty() {
539                None
540            } else {
541                Some(metadata)
542            }
543        };
544
545        Ok((entity, metadata))
546    }
547
548    /// Persist all data to page-based storage
549    ///
550    /// Writes all entities to B-tree pages and flushes to disk.
551    /// This provides ACID durability guarantees.
552    pub fn persist(&self) -> Result<(), StoreError> {
553        let pager = match &self.pager {
554            Some(p) => p,
555            None => {
556                // No pager attached - use binary file fallback if path available
557                if let Some(path) = &self.db_path {
558                    return self
559                        .save_to_file(path)
560                        .map_err(|e| StoreError::Serialization(e.to_string()));
561                }
562                return Err(StoreError::Io(std::io::Error::other(
563                    "No pager or path configured for persistence",
564                )));
565            }
566        };
567
568        match pager.read_page(1) {
569            Ok(_) => {}
570            Err(PagerError::PageNotFound(_)) => {
571                let meta_page = pager
572                    .allocate_page(crate::storage::engine::PageType::Header)
573                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
574                pager
575                    .write_page(meta_page.page_id(), meta_page)
576                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
577            }
578            Err(e) => {
579                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
580            }
581        }
582
583        if let Some(commit) = &self.commit {
584            commit.force_sync().map_err(StoreError::Io)?;
585        }
586
587        let collections = self.collections.read();
588        let mut btree_indices = self.btree_indices.write();
589
590        // Collect collection names and their B-tree root pages
591        let mut collection_roots: Vec<(String, u32)> = Vec::new();
592
593        // For each collection, rebuild the B-tree from the live manager state.
594        // A checkpoint must preserve deletes too, not just upsert the current rows.
595        for (name, manager) in collections.iter() {
596            let btree = btree_indices
597                .entry(name.clone())
598                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
599
600            let mut existing_keys = Vec::new();
601            if !btree.is_empty() {
602                let mut cursor = btree.cursor_first().map_err(|e| {
603                    StoreError::Io(std::io::Error::other(format!(
604                        "B-tree cursor error while rebuilding '{name}': {e}"
605                    )))
606                })?;
607                while let Some((key, _)) = cursor.next().map_err(|e| {
608                    StoreError::Io(std::io::Error::other(format!(
609                        "B-tree scan error while rebuilding '{name}': {e}"
610                    )))
611                })? {
612                    existing_keys.push(key);
613                }
614            }
615
616            for key in existing_keys {
617                btree.delete(&key).map_err(|e| {
618                    StoreError::Io(std::io::Error::other(format!(
619                        "B-tree delete error while rebuilding '{name}': {e}"
620                    )))
621                })?;
622            }
623
624            let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
625                .query_all(|_| true)
626                .into_iter()
627                .map(|entity| {
628                    let metadata = manager.get_metadata(entity.id);
629                    (
630                        entity.id.raw().to_be_bytes().to_vec(),
631                        Self::serialize_entity_record(
632                            &entity,
633                            metadata.as_ref(),
634                            self.format_version(),
635                        ),
636                    )
637                })
638                .collect();
639            records.sort_by(|left, right| left.0.cmp(&right.0));
640
641            // Skip rows whose serialised value exceeds the B-tree's
642            // per-value limit. The bulk insert otherwise aborts the
643            // whole rebuild, which historically poisoned downstream
644            // operations on this collection (callers saw the rebuild
645            // error bubble up as `grpc BulkInsertBinary` failures even
646            // though the user-issued write itself was harmless). The
647            // primary write path enforces `MAX_VALUE_SIZE` already,
648            // so oversized rows here are leftovers from older inserts
649            // (e.g. `red_stats` MCV/histogram arrays predating the
650            // size cap in `stats_catalog`). Logging keeps them
651            // visible for VACUUM-style cleanup later.
652            let max_value_size = crate::storage::engine::btree::MAX_VALUE_SIZE;
653            let original_len = records.len();
654            records.retain(|(_, value)| {
655                if value.len() > max_value_size {
656                    // F-04: `name` is a tenant-supplied collection name.
657                    // Route through LogField escaper to neutralise
658                    // CR/LF/control-byte injection (ADR 0010).
659                    tracing::warn!(
660                        collection = %reddb_wire::audit_safe_log_field(name),
661                        bytes = value.len(),
662                        max = max_value_size,
663                        "skipping oversized row during B-tree bulk rebuild"
664                    );
665                    false
666                } else {
667                    true
668                }
669            });
670            let dropped = original_len - records.len();
671            if dropped > 0 {
672                // F-04: tenant-supplied `name` interpolated into the
673                // structured `collection` field AND the message
674                // string. Sanitize both via the LogField escaper.
675                let safe_name = format!("{}", reddb_wire::audit_safe_log_field(name));
676                tracing::warn!(
677                    collection = %safe_name,
678                    dropped,
679                    "dropped {dropped} oversized row(s) from '{safe_name}' on rebuild — \
680                     the rows remain readable via the in-memory entity store but \
681                     are absent from the on-disk B-tree until they are rewritten"
682                );
683            }
684
685            if !records.is_empty() {
686                btree.bulk_insert_sorted(&records).map_err(|e| {
687                    StoreError::Io(std::io::Error::other(format!(
688                        "B-tree bulk rebuild error for '{name}': {e}"
689                    )))
690                })?;
691            }
692
693            collection_roots.push((name.clone(), btree.root_page_id()));
694        }
695
696        // Write collection metadata to page 1
697        let mut meta_data = Vec::with_capacity(4096);
698
699        let format_version = STORE_VERSION_V6;
700        self.set_format_version(format_version);
701
702        // Metadata header: magic + version + collection count
703        meta_data.extend_from_slice(METADATA_MAGIC);
704        meta_data.extend_from_slice(&format_version.to_le_bytes());
705        meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
706
707        // Write each collection's name and B-tree root page
708        for (name, root_page) in &collection_roots {
709            // Name length
710            meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
711            // Name
712            meta_data.extend_from_slice(name.as_bytes());
713            // Root page ID from actual B-tree
714            meta_data.extend_from_slice(&root_page.to_le_bytes());
715        }
716
717        // Write cross-reference metadata
718        let cross_refs = self.cross_refs.read();
719        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
720        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
721        for (source_id, refs) in cross_refs.iter() {
722            for (target_id, ref_type, collection) in refs {
723                meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
724                meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
725                meta_data.push(ref_type.to_byte());
726                meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
727                meta_data.extend_from_slice(collection.as_bytes());
728            }
729        }
730
731        // Create metadata page with Header type
732        let mut meta_page = crate::storage::engine::Page::new(
733            crate::storage::engine::PageType::Header,
734            1, // page_id = 1
735        );
736        // Copy metadata into page content area (after header)
737        let page_data = meta_page.as_bytes_mut();
738        let content_start = crate::storage::engine::HEADER_SIZE;
739        let copy_len = meta_data.len().min(page_data.len() - content_start);
740        page_data[content_start..content_start + copy_len].copy_from_slice(&meta_data[..copy_len]);
741
742        // Write metadata shadow FIRST (intact copy in case main write fails)
743        pager
744            .write_meta_shadow(&meta_page)
745            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
746
747        // Write page
748        pager
749            .write_page(1, meta_page)
750            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
751
752        // Flush and fsync all pages to disk
753        pager
754            .sync()
755            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
756
757        if let Some(commit) = &self.commit {
758            commit.truncate().map_err(StoreError::Io)?;
759        }
760
761        Ok(())
762    }
763
    /// Check if the store is using page-based persistence
    ///
    /// `true` exactly when a pager was attached at construction time;
    /// otherwise the store operates without on-disk pages.
    pub fn is_paged(&self) -> bool {
        self.pager.is_some()
    }
768
769    /// Current root page for a collection's primary B-tree, if one has
770    /// been materialized in this store.
771    pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
772        self.btree_indices
773            .read()
774            .get(collection)
775            .map(|btree| btree.root_page_id())
776            .filter(|root| *root != 0)
777    }
778
    /// Get the database file path (if using paged mode)
    ///
    /// Returns `None` when no path was recorded for this store.
    pub fn db_path(&self) -> Option<&Path> {
        self.db_path.as_deref()
    }
783}
784
785fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
786    let Some(metadata) = metadata else {
787        return Vec::new();
788    };
789    if metadata.is_empty() {
790        return Vec::new();
791    }
792
793    let mut entries: Vec<_> = metadata.iter().collect();
794    entries.sort_by_key(|(a, _)| *a);
795
796    let mut buf = Vec::new();
797    buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
798    for (key, value) in entries {
799        write_string(&mut buf, key);
800        write_metadata_value(&mut buf, value);
801    }
802    buf
803}
804
805fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
806    let mut pos = 0usize;
807    let count = read_u32(data, &mut pos)? as usize;
808    let mut metadata = Metadata::new();
809    for _ in 0..count {
810        let key = read_string(data, &mut pos)?;
811        let value = read_metadata_value(data, &mut pos)?;
812        metadata.set(key, value);
813    }
814    Ok(metadata)
815}
816
/// Append a length-prefixed (u32 LE) UTF-8 string to `buf`.
/// NOTE(review): a length above `u32::MAX` would silently truncate via
/// `as`; assumed unreachable for key/collection names — confirm upstream.
fn write_string(buf: &mut Vec<u8>, value: &str) {
    let len = value.len() as u32;
    buf.extend_from_slice(&len.to_le_bytes());
    buf.extend_from_slice(value.as_bytes());
}
821
/// Append a length-prefixed (u32 LE) byte slice to `buf`.
/// NOTE(review): same `as u32` truncation caveat as `write_string`.
fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
    let len = value.len() as u32;
    buf.extend_from_slice(&len.to_le_bytes());
    buf.extend_from_slice(value);
}
826
827fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
828    use crate::storage::unified::RefTarget;
829
830    match target {
831        RefTarget::TableRow { table, row_id } => {
832            buf.push(0);
833            write_string(buf, table);
834            buf.extend_from_slice(&row_id.to_le_bytes());
835        }
836        RefTarget::Node {
837            collection,
838            node_id,
839        } => {
840            buf.push(1);
841            write_string(buf, collection);
842            buf.extend_from_slice(&node_id.raw().to_le_bytes());
843        }
844        RefTarget::Edge {
845            collection,
846            edge_id,
847        } => {
848            buf.push(2);
849            write_string(buf, collection);
850            buf.extend_from_slice(&edge_id.raw().to_le_bytes());
851        }
852        RefTarget::Vector {
853            collection,
854            vector_id,
855        } => {
856            buf.push(3);
857            write_string(buf, collection);
858            buf.extend_from_slice(&vector_id.raw().to_le_bytes());
859        }
860        RefTarget::Entity {
861            collection,
862            entity_id,
863        } => {
864            buf.push(4);
865            write_string(buf, collection);
866            buf.extend_from_slice(&entity_id.raw().to_le_bytes());
867        }
868    }
869}
870
871fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
872    match value {
873        MetadataValue::Null => buf.push(0),
874        MetadataValue::Bool(v) => {
875            buf.push(1);
876            buf.push(u8::from(*v));
877        }
878        MetadataValue::Int(v) => {
879            buf.push(2);
880            buf.extend_from_slice(&v.to_le_bytes());
881        }
882        MetadataValue::Float(v) => {
883            buf.push(3);
884            buf.extend_from_slice(&v.to_le_bytes());
885        }
886        MetadataValue::String(v) => {
887            buf.push(4);
888            write_string(buf, v);
889        }
890        MetadataValue::Bytes(v) => {
891            buf.push(5);
892            write_bytes(buf, v);
893        }
894        MetadataValue::Array(values) => {
895            buf.push(6);
896            buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
897            for value in values {
898                write_metadata_value(buf, value);
899            }
900        }
901        MetadataValue::Object(values) => {
902            buf.push(7);
903            let mut entries: Vec<_> = values.iter().collect();
904            entries.sort_by_key(|(a, _)| *a);
905            buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
906            for (key, value) in entries {
907                write_string(buf, key);
908                write_metadata_value(buf, value);
909            }
910        }
911        MetadataValue::Timestamp(v) => {
912            buf.push(8);
913            buf.extend_from_slice(&v.to_le_bytes());
914        }
915        MetadataValue::Geo { lat, lon } => {
916            buf.push(9);
917            buf.extend_from_slice(&lat.to_le_bytes());
918            buf.extend_from_slice(&lon.to_le_bytes());
919        }
920        MetadataValue::Reference(target) => {
921            buf.push(10);
922            write_ref_target(buf, target);
923        }
924        MetadataValue::References(targets) => {
925            buf.push(11);
926            buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
927            for target in targets {
928                write_ref_target(buf, target);
929            }
930        }
931    }
932}
933
934fn read_exact_slice<'a>(
935    data: &'a [u8],
936    pos: &mut usize,
937    len: usize,
938) -> Result<&'a [u8], StoreError> {
939    if *pos + len > data.len() {
940        return Err(StoreError::Serialization(
941            "truncated metadata payload".to_string(),
942        ));
943    }
944    let slice = &data[*pos..*pos + len];
945    *pos += len;
946    Ok(slice)
947}
948
949fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
950    let bytes = read_exact_slice(data, pos, 4)?;
951    let mut raw = [0u8; 4];
952    raw.copy_from_slice(bytes);
953    Ok(u32::from_le_bytes(raw))
954}
955
956fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
957    let bytes = read_exact_slice(data, pos, 8)?;
958    let mut raw = [0u8; 8];
959    raw.copy_from_slice(bytes);
960    Ok(u64::from_le_bytes(raw))
961}
962
963fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
964    let bytes = read_exact_slice(data, pos, 8)?;
965    let mut raw = [0u8; 8];
966    raw.copy_from_slice(bytes);
967    Ok(i64::from_le_bytes(raw))
968}
969
970fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
971    let bytes = read_exact_slice(data, pos, 8)?;
972    let mut raw = [0u8; 8];
973    raw.copy_from_slice(bytes);
974    Ok(f64::from_le_bytes(raw))
975}
976
977fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
978    let bytes = read_exact_slice(data, pos, 1)?;
979    Ok(bytes[0])
980}
981
982fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
983    let len = read_u32(data, pos)? as usize;
984    let bytes = read_exact_slice(data, pos, len)?;
985    String::from_utf8(bytes.to_vec()).map_err(|err| StoreError::Serialization(err.to_string()))
986}
987
988fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
989    let len = read_u32(data, pos)? as usize;
990    Ok(read_exact_slice(data, pos, len)?.to_vec())
991}
992
993fn read_ref_target(
994    data: &[u8],
995    pos: &mut usize,
996) -> Result<crate::storage::unified::RefTarget, StoreError> {
997    use crate::storage::unified::RefTarget;
998
999    match read_u8(data, pos)? {
1000        0 => Ok(RefTarget::TableRow {
1001            table: read_string(data, pos)?,
1002            row_id: read_u64(data, pos)?,
1003        }),
1004        1 => Ok(RefTarget::Node {
1005            collection: read_string(data, pos)?,
1006            node_id: EntityId::new(read_u64(data, pos)?),
1007        }),
1008        2 => Ok(RefTarget::Edge {
1009            collection: read_string(data, pos)?,
1010            edge_id: EntityId::new(read_u64(data, pos)?),
1011        }),
1012        3 => Ok(RefTarget::Vector {
1013            collection: read_string(data, pos)?,
1014            vector_id: EntityId::new(read_u64(data, pos)?),
1015        }),
1016        4 => Ok(RefTarget::Entity {
1017            collection: read_string(data, pos)?,
1018            entity_id: EntityId::new(read_u64(data, pos)?),
1019        }),
1020        tag => Err(StoreError::Serialization(format!(
1021            "unknown metadata ref target tag {tag}"
1022        ))),
1023    }
1024}
1025
1026fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1027    match read_u8(data, pos)? {
1028        0 => Ok(MetadataValue::Null),
1029        1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1030        2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1031        3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1032        4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1033        5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1034        6 => {
1035            let count = read_u32(data, pos)? as usize;
1036            let mut values = Vec::with_capacity(count);
1037            for _ in 0..count {
1038                values.push(read_metadata_value(data, pos)?);
1039            }
1040            Ok(MetadataValue::Array(values))
1041        }
1042        7 => {
1043            let count = read_u32(data, pos)? as usize;
1044            let mut values = std::collections::HashMap::with_capacity(count);
1045            for _ in 0..count {
1046                let key = read_string(data, pos)?;
1047                let value = read_metadata_value(data, pos)?;
1048                values.insert(key, value);
1049            }
1050            Ok(MetadataValue::Object(values))
1051        }
1052        8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1053        9 => Ok(MetadataValue::Geo {
1054            lat: read_f64(data, pos)?,
1055            lon: read_f64(data, pos)?,
1056        }),
1057        10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1058        11 => {
1059            let count = read_u32(data, pos)? as usize;
1060            let mut targets = Vec::with_capacity(count);
1061            for _ in 0..count {
1062                targets.push(read_ref_target(data, pos)?);
1063            }
1064            Ok(MetadataValue::References(targets))
1065        }
1066        tag => Err(StoreError::Serialization(format!(
1067            "unknown metadata value tag {tag}"
1068        ))),
1069    }
1070}