1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
/// Magic prefix of the framed entity-record encoding
/// (`magic | entity_len | entity | metadata_len | metadata`).
const ENTITY_RECORD_MAGIC: &[u8; 4] = b"RER1";

/// Magic written at the start of page 1's content area when the metadata
/// payload does not fit in page 1 and continues in a chain of overflow pages.
const METADATA_OVERFLOW_MAGIC: &[u8; 4] = b"RDM3";
/// Number of content bytes in a page, i.e. the page size minus the page header.
const META_PAGE_CONTENT_CAP: usize =
    crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE;
/// Size of the page-1 overflow header: magic (4) + format version (4) +
/// total payload length (4) + first overflow page id (4).
const META_V3_PAGE1_HEADER: usize = 16;
/// Size of the per-overflow-page header: next page id (4) + chunk length (4).
const META_V3_OVERFLOW_HEADER: usize = 8;
/// Payload bytes that fit in page 1 after its overflow header.
const META_V3_FIRST_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_PAGE1_HEADER;
/// Payload bytes that fit in one overflow page after its header.
const META_V3_OVERFLOW_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_OVERFLOW_HEADER;
32
33fn free_existing_overflow_chain(pager: &Pager) -> Result<(), PagerError> {
34 let cs = crate::storage::engine::HEADER_SIZE;
35 let page = match pager.read_page(1) {
36 Ok(p) => p,
37 Err(_) => return Ok(()),
38 };
39 let bytes = page.as_bytes();
40 if bytes.len() < cs + META_V3_PAGE1_HEADER {
41 return Ok(());
42 }
43 if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
44 return Ok(());
45 }
46 let mut next = u32::from_le_bytes([
47 bytes[cs + 12],
48 bytes[cs + 13],
49 bytes[cs + 14],
50 bytes[cs + 15],
51 ]);
52 while next != 0 {
53 let ov = match pager.read_page(next) {
54 Ok(p) => p,
55 Err(_) => break,
56 };
57 let ob = ov.as_bytes();
58 let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
59 let _ = pager.free_page(next);
60 next = nn;
61 }
62 Ok(())
63}
64
65fn build_meta_page1_with_overflow(
66 pager: &Pager,
67 meta_data: &[u8],
68) -> Result<crate::storage::engine::Page, PagerError> {
69 use crate::storage::engine::{Page, PageType, HEADER_SIZE};
70 free_existing_overflow_chain(pager)?;
71
72 let mut page1 = Page::new(PageType::Header, 1);
73 let cs = HEADER_SIZE;
74
75 if meta_data.len() <= META_PAGE_CONTENT_CAP {
76 let buf = page1.as_bytes_mut();
78 buf[cs..cs + meta_data.len()].copy_from_slice(meta_data);
79 return Ok(page1);
80 }
81
82 let first_chunk = &meta_data[..META_V3_FIRST_PAYLOAD_CAP];
86 let mut tail = &meta_data[META_V3_FIRST_PAYLOAD_CAP..];
87 let mut chunks: Vec<&[u8]> = Vec::new();
88 while !tail.is_empty() {
89 let take = tail.len().min(META_V3_OVERFLOW_PAYLOAD_CAP);
90 chunks.push(&tail[..take]);
91 tail = &tail[take..];
92 }
93
94 let mut overflow_pages: Vec<Page> = Vec::with_capacity(chunks.len());
95 let mut overflow_ids: Vec<u32> = Vec::with_capacity(chunks.len());
96 for _ in 0..chunks.len() {
97 let pg = pager.allocate_page(PageType::Overflow)?;
98 overflow_ids.push(pg.page_id());
99 overflow_pages.push(pg);
100 }
101
102 for i in 0..chunks.len() {
103 let next = if i + 1 < chunks.len() {
104 overflow_ids[i + 1]
105 } else {
106 0u32
107 };
108 let len = chunks[i].len() as u32;
109 let buf = overflow_pages[i].as_bytes_mut();
110 buf[cs..cs + 4].copy_from_slice(&next.to_le_bytes());
111 buf[cs + 4..cs + 8].copy_from_slice(&len.to_le_bytes());
112 buf[cs + 8..cs + 8 + chunks[i].len()].copy_from_slice(chunks[i]);
113 }
114 for (idx, page) in overflow_pages.into_iter().enumerate() {
115 let id = overflow_ids[idx];
116 pager.write_page(id, page)?;
117 }
118
119 let format_version = if meta_data.len() >= 8 && &meta_data[0..4] == METADATA_MAGIC {
121 u32::from_le_bytes([meta_data[4], meta_data[5], meta_data[6], meta_data[7]])
122 } else {
123 0
124 };
125
126 let buf = page1.as_bytes_mut();
127 buf[cs..cs + 4].copy_from_slice(METADATA_OVERFLOW_MAGIC);
128 buf[cs + 4..cs + 8].copy_from_slice(&format_version.to_le_bytes());
129 buf[cs + 8..cs + 12].copy_from_slice(&(meta_data.len() as u32).to_le_bytes());
130 buf[cs + 12..cs + 16].copy_from_slice(&overflow_ids[0].to_le_bytes());
131 buf[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_chunk.len()]
132 .copy_from_slice(first_chunk);
133
134 Ok(page1)
135}
136
137fn read_meta_payload(pager: &Pager) -> Option<Vec<u8>> {
143 let cs = crate::storage::engine::HEADER_SIZE;
144 let meta_page = pager
145 .read_page(1)
146 .or_else(|_| pager.recover_meta_from_shadow())
147 .ok()?;
148 let bytes = meta_page.as_bytes();
149 if bytes.len() < cs + 4 {
150 return Some(bytes.get(cs..).unwrap_or(&[]).to_vec());
151 }
152 if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
153 return Some(bytes[cs..].to_vec());
154 }
155 if bytes.len() < cs + META_V3_PAGE1_HEADER {
156 return None;
157 }
158 let total =
159 u32::from_le_bytes([bytes[cs + 8], bytes[cs + 9], bytes[cs + 10], bytes[cs + 11]]) as usize;
160 let mut next = u32::from_le_bytes([
161 bytes[cs + 12],
162 bytes[cs + 13],
163 bytes[cs + 14],
164 bytes[cs + 15],
165 ]);
166 let mut payload: Vec<u8> = Vec::with_capacity(total);
167 let first_take = total.min(META_V3_FIRST_PAYLOAD_CAP);
168 payload.extend_from_slice(
169 &bytes[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_take],
170 );
171 while next != 0 && payload.len() < total {
172 let ov = pager.read_page(next).ok()?;
173 let ob = ov.as_bytes();
174 if ob.len() < cs + META_V3_OVERFLOW_HEADER {
175 return None;
176 }
177 let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
178 let len = u32::from_le_bytes([ob[cs + 4], ob[cs + 5], ob[cs + 6], ob[cs + 7]]) as usize;
179 let remaining = total - payload.len();
180 let take = len.min(remaining).min(META_V3_OVERFLOW_PAYLOAD_CAP);
181 payload.extend_from_slice(
182 &ob[cs + META_V3_OVERFLOW_HEADER..cs + META_V3_OVERFLOW_HEADER + take],
183 );
184 next = nn;
185 }
186 Some(payload)
187}
188
189impl UnifiedStore {
    /// Flags the paged registry as dirty so that the next
    /// `flush_paged_state` call rewrites it via `flush_paged_registry`.
    pub(crate) fn mark_paged_registry_dirty(&self) {
        self.paged_registry_dirty.store(true, Ordering::Release);
    }
193
194 pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
200 let pager = self.pager.as_ref()?;
201 if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
202 return Some(btree);
203 }
204 let mut write = self.btree_indices.write();
205 let btree = write
206 .entry(collection.to_string())
207 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
208 .clone();
209 Some(btree)
210 }
211
212 pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
213 let Some(pager) = &self.pager else {
214 return Ok(());
215 };
216
217 if self.paged_registry_dirty.load(Ordering::Acquire) {
218 self.flush_paged_registry()?;
219 self.paged_registry_dirty.store(false, Ordering::Release);
220 return Ok(());
221 }
222
223 pager
224 .flush()
225 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
226 }
227
228 pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
229 let Some(pager) = &self.pager else {
230 return Ok(());
231 };
232
233 match pager.read_page(1) {
234 Ok(_) => {}
235 Err(PagerError::PageNotFound(_)) => {
236 let meta_page = pager
237 .allocate_page(crate::storage::engine::PageType::Header)
238 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
239 pager
240 .write_page(meta_page.page_id(), meta_page)
241 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
242 }
243 Err(e) => {
244 return Err(StoreError::Io(std::io::Error::other(e.to_string())));
245 }
246 }
247
248 let format_version = STORE_VERSION_V9;
249 self.set_format_version(format_version);
250
251 let collections = self.collections.read();
252 let btree_indices = self.btree_indices.read();
253 let mut collection_roots = Vec::with_capacity(collections.len());
254 for (name, _) in collections.iter() {
255 let root_page = btree_indices
256 .get(name)
257 .map_or(0, |btree| btree.root_page_id());
258 collection_roots.push((name.clone(), root_page));
259 }
260 drop(btree_indices);
261 drop(collections);
262
263 let mut meta_data = Vec::with_capacity(4096);
264 meta_data.extend_from_slice(METADATA_MAGIC);
265 meta_data.extend_from_slice(&format_version.to_le_bytes());
266 meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
267 for (name, root_page) in &collection_roots {
268 meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
269 meta_data.extend_from_slice(name.as_bytes());
270 meta_data.extend_from_slice(&root_page.to_le_bytes());
271 }
272
273 let cross_refs = self.cross_refs.read();
274 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
275 meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
276 for (source_id, refs) in cross_refs.iter() {
277 for (target_id, ref_type, collection) in refs {
278 meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
279 meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
280 meta_data.push(ref_type.to_byte());
281 meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
282 meta_data.extend_from_slice(collection.as_bytes());
283 }
284 }
285
286 let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
287 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
288
289 pager
290 .write_meta_shadow(&meta_page)
291 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
292 pager
293 .write_page(1, meta_page)
294 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
295 pager
296 .flush()
297 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
298
299 Ok(())
300 }
301
    /// Returns the store's pager, or `None` when the store has no pager.
    pub fn pager(&self) -> Option<&Arc<Pager>> {
        self.pager.as_ref()
    }
306
    /// Returns the configuration this store was created with.
    pub fn config(&self) -> &UnifiedStoreConfig {
        &self.config
    }
313
    /// Creates an empty store with the given configuration.
    ///
    /// The store has no pager, no database path and no commit coordinator;
    /// its format version starts at `STORE_VERSION_V9` and entity ids start
    /// at 1.
    pub fn with_config(config: UnifiedStoreConfig) -> Self {
        Self {
            config,
            format_version: AtomicU32::new(STORE_VERSION_V9),
            next_entity_id: AtomicU64::new(1),
            collections: RwLock::new(HashMap::new()),
            cross_refs: RwLock::new(HashMap::new()),
            reverse_refs: RwLock::new(HashMap::new()),
            pager: None,
            db_path: None,
            btree_indices: RwLock::new(HashMap::new()),
            context_index: ContextIndex::new(),
            entity_cache: EntityCache::new(),
            graph_label_index: RwLock::new(HashMap::new()),
            paged_registry_dirty: AtomicBool::new(false),
            commit: None,
            unindex_cross_refs_fast_path: AtomicU64::new(0),
            replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
        }
    }
334
    /// Opens the store at `path` using `UnifiedStoreConfig::default()`.
    ///
    /// # Errors
    /// Propagates any error from `open_with_config`.
    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
        Self::open_with_config(path, UnifiedStoreConfig::default())
    }
355
356 pub fn open_with_config(
357 path: impl AsRef<Path>,
358 config: UnifiedStoreConfig,
359 ) -> Result<Self, StoreError> {
360 let path = path.as_ref();
361 let mut pager_config = PagerConfig::default();
362 if matches!(
370 std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
371 Some("0") | Some("false") | Some("off")
372 ) {
373 pager_config.double_write = false;
374 }
375 let pager = Pager::open(path, pager_config)
376 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
377
378 let wal_path = Self::wal_path_for_db(path);
379 let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
380 Some(Arc::new(
381 StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
382 .map_err(StoreError::Io)?,
383 ))
384 } else {
385 None
386 };
387
388 let store = Self {
389 config,
390 format_version: AtomicU32::new(STORE_VERSION_V9),
391 next_entity_id: AtomicU64::new(1),
392 collections: RwLock::new(HashMap::new()),
393 cross_refs: RwLock::new(HashMap::new()),
394 reverse_refs: RwLock::new(HashMap::new()),
395 pager: Some(Arc::new(pager)),
396 db_path: Some(path.to_path_buf()),
397 btree_indices: RwLock::new(HashMap::new()),
398 context_index: ContextIndex::new(),
399 entity_cache: EntityCache::new(),
400 graph_label_index: RwLock::new(HashMap::new()),
401 paged_registry_dirty: AtomicBool::new(false),
402 commit,
403 unindex_cross_refs_fast_path: AtomicU64::new(0),
404 replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
405 };
406
407 store.load_from_pages()?;
409 if let Some(commit) = &store.commit {
410 commit.replay_into(&store).map_err(StoreError::Io)?;
411 }
412
413 Ok(store)
414 }
415
416 fn load_from_pages(&self) -> Result<(), StoreError> {
420 let pager = match &self.pager {
421 Some(p) => p,
422 None => return Ok(()), };
424
425 let page_count = pager.page_count().map_err(|e| {
427 StoreError::Io(std::io::Error::other(format!(
428 "failed to read page count: {}",
429 e
430 )))
431 })?;
432 if page_count <= 1 {
433 return Ok(());
435 }
436
437 if let Some(content_vec) = read_meta_payload(pager) {
442 let content: &[u8] = &content_vec;
443 if content.len() >= 4 {
444 let mut pos = 0;
445 let mut format_version = STORE_VERSION_V1;
446
447 if content.len() >= 8 && &content[0..4] == METADATA_MAGIC {
448 format_version =
449 u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
450 pos += 8;
451 }
452
453 self.set_format_version(format_version);
454
455 let collection_count = u32::from_le_bytes([
457 content[pos],
458 content[pos + 1],
459 content[pos + 2],
460 content[pos + 3],
461 ]) as usize;
462 pos += 4;
463
464 for _ in 0..collection_count {
466 if pos + 4 > content.len() {
467 break;
468 }
469
470 let name_len = u32::from_le_bytes([
471 content[pos],
472 content[pos + 1],
473 content[pos + 2],
474 content[pos + 3],
475 ]) as usize;
476 pos += 4;
477
478 if pos + name_len + 4 > content.len() {
479 break;
480 }
481
482 if let Ok(name) = String::from_utf8(content[pos..pos + name_len].to_vec()) {
483 pos += name_len;
484
485 let root_page = u32::from_le_bytes([
487 content[pos],
488 content[pos + 1],
489 content[pos + 2],
490 content[pos + 3],
491 ]);
492 pos += 4;
493
494 let _ = self.create_collection_in_memory(&name);
498
499 if root_page > 0 {
501 let btree = BTree::with_root(Arc::clone(pager), root_page);
502
503 if let Ok(mut cursor) = btree.cursor_first() {
505 let manager = self.get_collection(&name);
506 while let Ok(Some((key, value))) = cursor.next() {
507 if let Ok((entity, metadata)) = Self::deserialize_entity_record(
509 &value,
510 self.format_version(),
511 ) {
512 if let Some(m) = &manager {
513 let id = entity.id;
514 if let EntityKind::TableRow { row_id, .. } =
515 &entity.kind
516 {
517 m.register_row_id(*row_id);
518 }
519 self.context_index.index_entity(&name, &entity);
520 let _ = m.insert(entity.clone());
521 if let Some(metadata) = metadata {
522 let _ = m.set_metadata(id, metadata);
523 }
524 self.register_entity_id(id);
525 if self.config.auto_index_refs {
526 self.index_cross_refs(&entity, &name)?;
527 }
528 }
529 }
530 }
531 }
532
533 self.btree_indices.write().insert(name, Arc::new(btree));
535 }
536 } else {
537 pos += name_len + 4;
538 }
539 }
540
541 if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
542 let cross_ref_count = u32::from_le_bytes([
543 content[pos],
544 content[pos + 1],
545 content[pos + 2],
546 content[pos + 3],
547 ]) as usize;
548 pos += 4;
549
550 for _ in 0..cross_ref_count {
551 if pos + 17 > content.len() {
552 break;
553 }
554 let source_id = u64::from_le_bytes([
555 content[pos],
556 content[pos + 1],
557 content[pos + 2],
558 content[pos + 3],
559 content[pos + 4],
560 content[pos + 5],
561 content[pos + 6],
562 content[pos + 7],
563 ]);
564 pos += 8;
565 let target_id = u64::from_le_bytes([
566 content[pos],
567 content[pos + 1],
568 content[pos + 2],
569 content[pos + 3],
570 content[pos + 4],
571 content[pos + 5],
572 content[pos + 6],
573 content[pos + 7],
574 ]);
575 pos += 8;
576 let ref_type = RefType::from_byte(content[pos]);
577 pos += 1;
578
579 if pos + 4 > content.len() {
580 break;
581 }
582 let name_len = u32::from_le_bytes([
583 content[pos],
584 content[pos + 1],
585 content[pos + 2],
586 content[pos + 3],
587 ]) as usize;
588 pos += 4;
589 if pos + name_len > content.len() {
590 break;
591 }
592 let target_collection =
593 String::from_utf8_lossy(&content[pos..pos + name_len]).to_string();
594 pos += name_len;
595
596 let source_id = EntityId::new(source_id);
597 let target_id = EntityId::new(target_id);
598
599 self.cross_refs.write().entry(source_id).or_default().push((
600 target_id,
601 ref_type,
602 target_collection.clone(),
603 ));
604
605 if let Some((collection, mut entity)) = self.get_any(source_id) {
606 let exists = entity.cross_refs().iter().any(|xref| {
607 xref.target == target_id
608 && xref.ref_type == ref_type
609 && xref.target_collection == target_collection
610 });
611 if !exists {
612 entity.cross_refs_mut().push(CrossRef::new(
613 source_id,
614 target_id,
615 target_collection.clone(),
616 ref_type,
617 ));
618 if let Some(manager) = self.get_collection(&collection) {
619 let _ = manager.update(entity);
620 }
621 }
622 }
623 }
624 }
625 }
626 }
627
628 if self.format_version() < STORE_VERSION_V9 {
629 self.set_format_version(STORE_VERSION_V9);
630 }
631
632 Ok(())
633 }
634
635 pub(crate) fn deserialize_entity(
637 data: &[u8],
638 format_version: u32,
639 ) -> Result<UnifiedEntity, StoreError> {
640 let mut pos = 0;
641 Self::read_entity_binary(data, &mut pos, format_version)
642 .map_err(|e| StoreError::Serialization(e.to_string()))
643 }
644
    /// Encodes `entity` into a freshly allocated byte buffer using
    /// `write_entity_binary` for the given `format_version`.
    pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
        // 256 bytes is only an initial capacity; the buffer grows as needed.
        let mut buf = Vec::with_capacity(256);
        Self::write_entity_binary(&mut buf, entity, format_version);
        buf
    }
655
656 pub(crate) fn serialize_entity_record(
657 entity: &UnifiedEntity,
658 metadata: Option<&Metadata>,
659 format_version: u32,
660 ) -> Vec<u8> {
661 let entity_bytes = Self::serialize_entity(entity, format_version);
662 let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
668 if has_meta {
669 let metadata_bytes = serialize_metadata(metadata);
670 let mut buf = Vec::with_capacity(12 + entity_bytes.len() + metadata_bytes.len());
671 buf.extend_from_slice(ENTITY_RECORD_MAGIC);
672 buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
673 buf.extend_from_slice(&entity_bytes);
674 buf.extend_from_slice(&(metadata_bytes.len() as u32).to_le_bytes());
675 buf.extend_from_slice(&metadata_bytes);
676 buf
677 } else {
678 let mut buf = Vec::with_capacity(12 + entity_bytes.len());
679 buf.extend_from_slice(ENTITY_RECORD_MAGIC);
680 buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
681 buf.extend_from_slice(&entity_bytes);
682 buf.extend_from_slice(&0u32.to_le_bytes());
683 buf
684 }
685 }
686
687 pub(crate) fn deserialize_entity_record(
688 data: &[u8],
689 format_version: u32,
690 ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
691 if data.len() < 8 || &data[..4] != ENTITY_RECORD_MAGIC {
692 return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
693 }
694
695 let mut pos = 4usize;
696 let entity_len = read_u32(data, &mut pos)? as usize;
697 if pos + entity_len > data.len() {
698 return Err(StoreError::Serialization(
699 "truncated entity record payload".to_string(),
700 ));
701 }
702 let entity = Self::deserialize_entity(&data[pos..pos + entity_len], format_version)?;
703 pos += entity_len;
704
705 let metadata_len = read_u32(data, &mut pos)? as usize;
706 if pos + metadata_len > data.len() {
707 return Err(StoreError::Serialization(
708 "truncated entity record metadata".to_string(),
709 ));
710 }
711 let metadata = if metadata_len == 0 {
712 None
713 } else {
714 let metadata = deserialize_metadata(&data[pos..pos + metadata_len])?;
715 if metadata.is_empty() {
716 None
717 } else {
718 Some(metadata)
719 }
720 };
721
722 Ok((entity, metadata))
723 }
724
    /// Persists the whole store.
    ///
    /// Without a pager, the store is written through `save_to_file` to
    /// `db_path` if one is set; with neither configured an I/O error is
    /// returned.
    ///
    /// With a pager, the steps are, in order:
    /// 1. make sure page 1 exists;
    /// 2. force-sync the commit coordinator, if any;
    /// 3. for every collection, empty its B-tree and bulk-insert all current
    ///    entities (keyed by the big-endian entity id), skipping records
    ///    larger than the B-tree's maximum value size with a warning;
    /// 4. serialize the registry and cross-references and write them to the
    ///    meta shadow and then to page 1;
    /// 5. sync the pager;
    /// 6. truncate the commit coordinator's log, if any.
    ///
    /// The `collections` read lock and the `btree_indices` write lock are
    /// held from step 3 until the function returns.
    ///
    /// # Errors
    /// Returns `StoreError::Io` for pager, B-tree and commit-coordinator
    /// failures, and `StoreError::Serialization` when `save_to_file` fails.
    pub fn persist(&self) -> Result<(), StoreError> {
        let pager = match &self.pager {
            Some(p) => p,
            None => {
                // No pager: fall back to the file-based snapshot if a path is known.
                if let Some(path) = &self.db_path {
                    return self
                        .save_to_file(path)
                        .map_err(|e| StoreError::Serialization(e.to_string()));
                }
                return Err(StoreError::Io(std::io::Error::other(
                    "No pager or path configured for persistence",
                )));
            }
        };

        // Make sure page 1 exists before it is rewritten below.
        match pager.read_page(1) {
            Ok(_) => {}
            Err(PagerError::PageNotFound(_)) => {
                let meta_page = pager
                    .allocate_page(crate::storage::engine::PageType::Header)
                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
                pager
                    .write_page(meta_page.page_id(), meta_page)
                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
            }
            Err(e) => {
                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
            }
        }

        // Sync the commit log before any page is rewritten.
        if let Some(commit) = &self.commit {
            commit.force_sync().map_err(StoreError::Io)?;
        }

        let collections = self.collections.read();
        let mut btree_indices = self.btree_indices.write();

        let mut collection_roots: Vec<(String, u32)> = Vec::new();

        for (name, manager) in collections.iter() {
            let btree = btree_indices
                .entry(name.clone())
                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));

            // Collect the existing keys first, then delete them, so the tree
            // is not mutated while the cursor is walking it.
            let mut existing_keys = Vec::new();
            if !btree.is_empty() {
                let mut cursor = btree.cursor_first().map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree cursor error while rebuilding '{name}': {e}"
                    )))
                })?;
                while let Some((key, _)) = cursor.next().map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree scan error while rebuilding '{name}': {e}"
                    )))
                })? {
                    existing_keys.push(key);
                }
            }

            for key in existing_keys {
                btree.delete(&key).map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree delete error while rebuilding '{name}': {e}"
                    )))
                })?;
            }

            // Serialize every entity of the collection; the key is the
            // big-endian entity id so byte order equals numeric order.
            let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
                .query_all(|_| true)
                .into_iter()
                .map(|entity| {
                    let metadata = manager.get_metadata(entity.id);
                    (
                        entity.id.raw().to_be_bytes().to_vec(),
                        Self::serialize_entity_record(
                            &entity,
                            metadata.as_ref(),
                            self.format_version(),
                        ),
                    )
                })
                .collect();
            records.sort_by(|left, right| left.0.cmp(&right.0));

            // Records above the B-tree value limit are logged and left out of
            // the on-disk tree rather than failing the whole persist.
            let max_value_size = crate::storage::engine::btree::MAX_VALUE_SIZE;
            let original_len = records.len();
            records.retain(|(_, value)| {
                if value.len() > max_value_size {
                    tracing::warn!(
                        collection = %reddb_wire::audit_safe_log_field(name),
                        bytes = value.len(),
                        max = max_value_size,
                        "skipping oversized row during B-tree bulk rebuild"
                    );
                    false
                } else {
                    true
                }
            });
            let dropped = original_len - records.len();
            if dropped > 0 {
                let safe_name = format!("{}", reddb_wire::audit_safe_log_field(name));
                tracing::warn!(
                    collection = %safe_name,
                    dropped,
                    "dropped {dropped} oversized row(s) from '{safe_name}' on rebuild — \
                     the rows remain readable via the in-memory entity store but \
                     are absent from the on-disk B-tree until they are rewritten"
                );
            }

            if !records.is_empty() {
                btree.bulk_insert_sorted(&records).map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree bulk rebuild error for '{name}': {e}"
                    )))
                })?;
            }

            // Record the root after the rebuild.
            collection_roots.push((name.clone(), btree.root_page_id()));
        }

        let mut meta_data = Vec::with_capacity(4096);

        let format_version = STORE_VERSION_V9;
        self.set_format_version(format_version);

        // Layout: magic | version | collection count | (name_len, name, root)*
        meta_data.extend_from_slice(METADATA_MAGIC);
        meta_data.extend_from_slice(&format_version.to_le_bytes());
        meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());

        for (name, root_page) in &collection_roots {
            meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
            meta_data.extend_from_slice(name.as_bytes());
            meta_data.extend_from_slice(&root_page.to_le_bytes());
        }

        // ... | ref count | (source, target, type, coll_len, coll)*
        let cross_refs = self.cross_refs.read();
        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
        for (source_id, refs) in cross_refs.iter() {
            for (target_id, ref_type, collection) in refs {
                meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
                meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
                meta_data.push(ref_type.to_byte());
                meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
                meta_data.extend_from_slice(collection.as_bytes());
            }
        }

        let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        // Shadow copy first; `read_meta_payload` falls back to it when page 1
        // cannot be read.
        pager
            .write_meta_shadow(&meta_page)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        pager
            .write_page(1, meta_page)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        pager
            .sync()
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        // The commit log is truncated only after the pager sync succeeded.
        if let Some(commit) = &self.commit {
            commit.truncate().map_err(StoreError::Io)?;
        }

        Ok(())
    }
934
    /// Returns `true` when the store has a pager.
    pub fn is_paged(&self) -> bool {
        self.pager.is_some()
    }
939
940 pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
943 self.btree_indices
944 .read()
945 .get(collection)
946 .map(|btree| btree.root_page_id())
947 .filter(|root| *root != 0)
948 }
949
    /// Returns the database path the store was opened with, if any.
    pub fn db_path(&self) -> Option<&Path> {
        self.db_path.as_deref()
    }
954}
955
956fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
957 let Some(metadata) = metadata else {
958 return Vec::new();
959 };
960 if metadata.is_empty() {
961 return Vec::new();
962 }
963
964 let mut entries: Vec<_> = metadata.iter().collect();
965 entries.sort_by_key(|(a, _)| *a);
966
967 let mut buf = Vec::new();
968 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
969 for (key, value) in entries {
970 write_string(&mut buf, key);
971 write_metadata_value(&mut buf, value);
972 }
973 buf
974}
975
976fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
977 let mut pos = 0usize;
978 let count = read_u32(data, &mut pos)? as usize;
979 let mut metadata = Metadata::new();
980 for _ in 0..count {
981 let key = read_string(data, &mut pos)?;
982 let value = read_metadata_value(data, &mut pos)?;
983 metadata.set(key, value);
984 }
985 Ok(metadata)
986}
987
/// Appends `value` to `buf` as a little-endian `u32` byte length followed by
/// the UTF-8 bytes.
fn write_string(buf: &mut Vec<u8>, value: &str) {
    let payload = value.as_bytes();
    let prefix = (payload.len() as u32).to_le_bytes();
    buf.extend(prefix.iter().chain(payload.iter()).copied());
}
992
/// Appends `value` to `buf` as a little-endian `u32` length followed by the
/// raw bytes.
fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
    let prefix = (value.len() as u32).to_le_bytes();
    buf.extend(prefix.iter().chain(value.iter()).copied());
}
997
998fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
999 use crate::storage::unified::RefTarget;
1000
1001 match target {
1002 RefTarget::TableRow { table, row_id } => {
1003 buf.push(0);
1004 write_string(buf, table);
1005 buf.extend_from_slice(&row_id.to_le_bytes());
1006 }
1007 RefTarget::Node {
1008 collection,
1009 node_id,
1010 } => {
1011 buf.push(1);
1012 write_string(buf, collection);
1013 buf.extend_from_slice(&node_id.raw().to_le_bytes());
1014 }
1015 RefTarget::Edge {
1016 collection,
1017 edge_id,
1018 } => {
1019 buf.push(2);
1020 write_string(buf, collection);
1021 buf.extend_from_slice(&edge_id.raw().to_le_bytes());
1022 }
1023 RefTarget::Vector {
1024 collection,
1025 vector_id,
1026 } => {
1027 buf.push(3);
1028 write_string(buf, collection);
1029 buf.extend_from_slice(&vector_id.raw().to_le_bytes());
1030 }
1031 RefTarget::Entity {
1032 collection,
1033 entity_id,
1034 } => {
1035 buf.push(4);
1036 write_string(buf, collection);
1037 buf.extend_from_slice(&entity_id.raw().to_le_bytes());
1038 }
1039 }
1040}
1041
1042fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
1043 match value {
1044 MetadataValue::Null => buf.push(0),
1045 MetadataValue::Bool(v) => {
1046 buf.push(1);
1047 buf.push(u8::from(*v));
1048 }
1049 MetadataValue::Int(v) => {
1050 buf.push(2);
1051 buf.extend_from_slice(&v.to_le_bytes());
1052 }
1053 MetadataValue::Float(v) => {
1054 buf.push(3);
1055 buf.extend_from_slice(&v.to_le_bytes());
1056 }
1057 MetadataValue::String(v) => {
1058 buf.push(4);
1059 write_string(buf, v);
1060 }
1061 MetadataValue::Bytes(v) => {
1062 buf.push(5);
1063 write_bytes(buf, v);
1064 }
1065 MetadataValue::Array(values) => {
1066 buf.push(6);
1067 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1068 for value in values {
1069 write_metadata_value(buf, value);
1070 }
1071 }
1072 MetadataValue::Object(values) => {
1073 buf.push(7);
1074 let mut entries: Vec<_> = values.iter().collect();
1075 entries.sort_by_key(|(a, _)| *a);
1076 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
1077 for (key, value) in entries {
1078 write_string(buf, key);
1079 write_metadata_value(buf, value);
1080 }
1081 }
1082 MetadataValue::Timestamp(v) => {
1083 buf.push(8);
1084 buf.extend_from_slice(&v.to_le_bytes());
1085 }
1086 MetadataValue::Geo { lat, lon } => {
1087 buf.push(9);
1088 buf.extend_from_slice(&lat.to_le_bytes());
1089 buf.extend_from_slice(&lon.to_le_bytes());
1090 }
1091 MetadataValue::Reference(target) => {
1092 buf.push(10);
1093 write_ref_target(buf, target);
1094 }
1095 MetadataValue::References(targets) => {
1096 buf.push(11);
1097 buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
1098 for target in targets {
1099 write_ref_target(buf, target);
1100 }
1101 }
1102 }
1103}
1104
1105fn read_exact_slice<'a>(
1106 data: &'a [u8],
1107 pos: &mut usize,
1108 len: usize,
1109) -> Result<&'a [u8], StoreError> {
1110 if *pos + len > data.len() {
1111 return Err(StoreError::Serialization(
1112 "truncated metadata payload".to_string(),
1113 ));
1114 }
1115 let slice = &data[*pos..*pos + len];
1116 *pos += len;
1117 Ok(slice)
1118}
1119
1120fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
1121 let bytes = read_exact_slice(data, pos, 4)?;
1122 let mut raw = [0u8; 4];
1123 raw.copy_from_slice(bytes);
1124 Ok(u32::from_le_bytes(raw))
1125}
1126
1127fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
1128 let bytes = read_exact_slice(data, pos, 8)?;
1129 let mut raw = [0u8; 8];
1130 raw.copy_from_slice(bytes);
1131 Ok(u64::from_le_bytes(raw))
1132}
1133
1134fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
1135 let bytes = read_exact_slice(data, pos, 8)?;
1136 let mut raw = [0u8; 8];
1137 raw.copy_from_slice(bytes);
1138 Ok(i64::from_le_bytes(raw))
1139}
1140
1141fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
1142 let bytes = read_exact_slice(data, pos, 8)?;
1143 let mut raw = [0u8; 8];
1144 raw.copy_from_slice(bytes);
1145 Ok(f64::from_le_bytes(raw))
1146}
1147
1148fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
1149 let bytes = read_exact_slice(data, pos, 1)?;
1150 Ok(bytes[0])
1151}
1152
1153fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
1154 let len = read_u32(data, pos)? as usize;
1155 let bytes = read_exact_slice(data, pos, len)?;
1156 String::from_utf8(bytes.to_vec()).map_err(|err| StoreError::Serialization(err.to_string()))
1157}
1158
1159fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
1160 let len = read_u32(data, pos)? as usize;
1161 Ok(read_exact_slice(data, pos, len)?.to_vec())
1162}
1163
1164fn read_ref_target(
1165 data: &[u8],
1166 pos: &mut usize,
1167) -> Result<crate::storage::unified::RefTarget, StoreError> {
1168 use crate::storage::unified::RefTarget;
1169
1170 match read_u8(data, pos)? {
1171 0 => Ok(RefTarget::TableRow {
1172 table: read_string(data, pos)?,
1173 row_id: read_u64(data, pos)?,
1174 }),
1175 1 => Ok(RefTarget::Node {
1176 collection: read_string(data, pos)?,
1177 node_id: EntityId::new(read_u64(data, pos)?),
1178 }),
1179 2 => Ok(RefTarget::Edge {
1180 collection: read_string(data, pos)?,
1181 edge_id: EntityId::new(read_u64(data, pos)?),
1182 }),
1183 3 => Ok(RefTarget::Vector {
1184 collection: read_string(data, pos)?,
1185 vector_id: EntityId::new(read_u64(data, pos)?),
1186 }),
1187 4 => Ok(RefTarget::Entity {
1188 collection: read_string(data, pos)?,
1189 entity_id: EntityId::new(read_u64(data, pos)?),
1190 }),
1191 tag => Err(StoreError::Serialization(format!(
1192 "unknown metadata ref target tag {tag}"
1193 ))),
1194 }
1195}
1196
1197fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1198 match read_u8(data, pos)? {
1199 0 => Ok(MetadataValue::Null),
1200 1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1201 2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1202 3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1203 4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1204 5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1205 6 => {
1206 let count = read_u32(data, pos)? as usize;
1207 let mut values = Vec::with_capacity(count);
1208 for _ in 0..count {
1209 values.push(read_metadata_value(data, pos)?);
1210 }
1211 Ok(MetadataValue::Array(values))
1212 }
1213 7 => {
1214 let count = read_u32(data, pos)? as usize;
1215 let mut values = std::collections::HashMap::with_capacity(count);
1216 for _ in 0..count {
1217 let key = read_string(data, pos)?;
1218 let value = read_metadata_value(data, pos)?;
1219 values.insert(key, value);
1220 }
1221 Ok(MetadataValue::Object(values))
1222 }
1223 8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1224 9 => Ok(MetadataValue::Geo {
1225 lat: read_f64(data, pos)?,
1226 lon: read_f64(data, pos)?,
1227 }),
1228 10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1229 11 => {
1230 let count = read_u32(data, pos)? as usize;
1231 let mut targets = Vec::with_capacity(count);
1232 for _ in 0..count {
1233 targets.push(read_ref_target(data, pos)?);
1234 }
1235 Ok(MetadataValue::References(targets))
1236 }
1237 tag => Err(StoreError::Serialization(format!(
1238 "unknown metadata value tag {tag}"
1239 ))),
1240 }
1241}