1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
5const ENTITY_RECORD_MAGIC: &[u8; 4] = b"RER1";
6
7const METADATA_OVERFLOW_MAGIC: &[u8; 4] = b"RDM3";
26const META_PAGE_CONTENT_CAP: usize =
27 crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE;
28const META_V3_PAGE1_HEADER: usize = 16;
29const META_V3_OVERFLOW_HEADER: usize = 8;
30const META_V3_FIRST_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_PAGE1_HEADER;
31const META_V3_OVERFLOW_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_OVERFLOW_HEADER;
32
33fn free_existing_overflow_chain(pager: &Pager) -> Result<(), PagerError> {
34 let cs = crate::storage::engine::HEADER_SIZE;
35 let page = match pager.read_page(1) {
36 Ok(p) => p,
37 Err(_) => return Ok(()),
38 };
39 let bytes = page.as_bytes();
40 if bytes.len() < cs + META_V3_PAGE1_HEADER {
41 return Ok(());
42 }
43 if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
44 return Ok(());
45 }
46 let mut next = u32::from_le_bytes([
47 bytes[cs + 12],
48 bytes[cs + 13],
49 bytes[cs + 14],
50 bytes[cs + 15],
51 ]);
52 while next != 0 {
53 let ov = match pager.read_page(next) {
54 Ok(p) => p,
55 Err(_) => break,
56 };
57 let ob = ov.as_bytes();
58 let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
59 let _ = pager.free_page(next);
60 next = nn;
61 }
62 Ok(())
63}
64
65fn build_meta_page1_with_overflow(
66 pager: &Pager,
67 meta_data: &[u8],
68) -> Result<crate::storage::engine::Page, PagerError> {
69 use crate::storage::engine::{Page, PageType, HEADER_SIZE};
70 free_existing_overflow_chain(pager)?;
71
72 let mut page1 = Page::new(PageType::Header, 1);
73 let cs = HEADER_SIZE;
74
75 if meta_data.len() <= META_PAGE_CONTENT_CAP {
76 let buf = page1.as_bytes_mut();
78 buf[cs..cs + meta_data.len()].copy_from_slice(meta_data);
79 return Ok(page1);
80 }
81
82 let first_chunk = &meta_data[..META_V3_FIRST_PAYLOAD_CAP];
86 let mut tail = &meta_data[META_V3_FIRST_PAYLOAD_CAP..];
87 let mut chunks: Vec<&[u8]> = Vec::new();
88 while !tail.is_empty() {
89 let take = tail.len().min(META_V3_OVERFLOW_PAYLOAD_CAP);
90 chunks.push(&tail[..take]);
91 tail = &tail[take..];
92 }
93
94 let mut overflow_pages: Vec<Page> = Vec::with_capacity(chunks.len());
95 let mut overflow_ids: Vec<u32> = Vec::with_capacity(chunks.len());
96 for _ in 0..chunks.len() {
97 let pg = pager.allocate_page(PageType::Overflow)?;
98 overflow_ids.push(pg.page_id());
99 overflow_pages.push(pg);
100 }
101
102 for i in 0..chunks.len() {
103 let next = if i + 1 < chunks.len() {
104 overflow_ids[i + 1]
105 } else {
106 0u32
107 };
108 let len = chunks[i].len() as u32;
109 let buf = overflow_pages[i].as_bytes_mut();
110 buf[cs..cs + 4].copy_from_slice(&next.to_le_bytes());
111 buf[cs + 4..cs + 8].copy_from_slice(&len.to_le_bytes());
112 buf[cs + 8..cs + 8 + chunks[i].len()].copy_from_slice(chunks[i]);
113 }
114 for (idx, page) in overflow_pages.into_iter().enumerate() {
115 let id = overflow_ids[idx];
116 pager.write_page(id, page)?;
117 }
118
119 let format_version = if meta_data.len() >= 8 && &meta_data[0..4] == METADATA_MAGIC {
121 u32::from_le_bytes([meta_data[4], meta_data[5], meta_data[6], meta_data[7]])
122 } else {
123 0
124 };
125
126 let buf = page1.as_bytes_mut();
127 buf[cs..cs + 4].copy_from_slice(METADATA_OVERFLOW_MAGIC);
128 buf[cs + 4..cs + 8].copy_from_slice(&format_version.to_le_bytes());
129 buf[cs + 8..cs + 12].copy_from_slice(&(meta_data.len() as u32).to_le_bytes());
130 buf[cs + 12..cs + 16].copy_from_slice(&overflow_ids[0].to_le_bytes());
131 buf[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_chunk.len()]
132 .copy_from_slice(first_chunk);
133
134 Ok(page1)
135}
136
137fn read_meta_payload(pager: &Pager) -> Option<Vec<u8>> {
143 let cs = crate::storage::engine::HEADER_SIZE;
144 let meta_page = pager
145 .read_page(1)
146 .or_else(|_| pager.recover_meta_from_shadow())
147 .ok()?;
148 let bytes = meta_page.as_bytes();
149 if bytes.len() < cs + 4 {
150 return Some(bytes.get(cs..).unwrap_or(&[]).to_vec());
151 }
152 if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
153 return Some(bytes[cs..].to_vec());
154 }
155 if bytes.len() < cs + META_V3_PAGE1_HEADER {
156 return None;
157 }
158 let total =
159 u32::from_le_bytes([bytes[cs + 8], bytes[cs + 9], bytes[cs + 10], bytes[cs + 11]]) as usize;
160 let mut next = u32::from_le_bytes([
161 bytes[cs + 12],
162 bytes[cs + 13],
163 bytes[cs + 14],
164 bytes[cs + 15],
165 ]);
166 let mut payload: Vec<u8> = Vec::with_capacity(total);
167 let first_take = total.min(META_V3_FIRST_PAYLOAD_CAP);
168 payload.extend_from_slice(
169 &bytes[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_take],
170 );
171 while next != 0 && payload.len() < total {
172 let ov = pager.read_page(next).ok()?;
173 let ob = ov.as_bytes();
174 if ob.len() < cs + META_V3_OVERFLOW_HEADER {
175 return None;
176 }
177 let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
178 let len =
179 u32::from_le_bytes([ob[cs + 4], ob[cs + 5], ob[cs + 6], ob[cs + 7]]) as usize;
180 let remaining = total - payload.len();
181 let take = len.min(remaining).min(META_V3_OVERFLOW_PAYLOAD_CAP);
182 payload.extend_from_slice(&ob[cs + META_V3_OVERFLOW_HEADER..cs + META_V3_OVERFLOW_HEADER + take]);
183 next = nn;
184 }
185 Some(payload)
186}
187
188impl UnifiedStore {
189 pub(crate) fn mark_paged_registry_dirty(&self) {
190 self.paged_registry_dirty.store(true, Ordering::Release);
191 }
192
193 pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
199 let pager = self.pager.as_ref()?;
200 if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
201 return Some(btree);
202 }
203 let mut write = self.btree_indices.write();
204 let btree = write
205 .entry(collection.to_string())
206 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
207 .clone();
208 Some(btree)
209 }
210
211 pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
212 let Some(pager) = &self.pager else {
213 return Ok(());
214 };
215
216 if self.paged_registry_dirty.load(Ordering::Acquire) {
217 self.flush_paged_registry()?;
218 self.paged_registry_dirty.store(false, Ordering::Release);
219 return Ok(());
220 }
221
222 pager
223 .flush()
224 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
225 }
226
227 pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
228 let Some(pager) = &self.pager else {
229 return Ok(());
230 };
231
232 match pager.read_page(1) {
233 Ok(_) => {}
234 Err(PagerError::PageNotFound(_)) => {
235 let meta_page = pager
236 .allocate_page(crate::storage::engine::PageType::Header)
237 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
238 pager
239 .write_page(meta_page.page_id(), meta_page)
240 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
241 }
242 Err(e) => {
243 return Err(StoreError::Io(std::io::Error::other(e.to_string())));
244 }
245 }
246
247 let format_version = STORE_VERSION_V9;
248 self.set_format_version(format_version);
249
250 let collections = self.collections.read();
251 let btree_indices = self.btree_indices.read();
252 let mut collection_roots = Vec::with_capacity(collections.len());
253 for (name, _) in collections.iter() {
254 let root_page = btree_indices
255 .get(name)
256 .map_or(0, |btree| btree.root_page_id());
257 collection_roots.push((name.clone(), root_page));
258 }
259 drop(btree_indices);
260 drop(collections);
261
262 let mut meta_data = Vec::with_capacity(4096);
263 meta_data.extend_from_slice(METADATA_MAGIC);
264 meta_data.extend_from_slice(&format_version.to_le_bytes());
265 meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
266 for (name, root_page) in &collection_roots {
267 meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
268 meta_data.extend_from_slice(name.as_bytes());
269 meta_data.extend_from_slice(&root_page.to_le_bytes());
270 }
271
272 let cross_refs = self.cross_refs.read();
273 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
274 meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
275 for (source_id, refs) in cross_refs.iter() {
276 for (target_id, ref_type, collection) in refs {
277 meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
278 meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
279 meta_data.push(ref_type.to_byte());
280 meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
281 meta_data.extend_from_slice(collection.as_bytes());
282 }
283 }
284
285 let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
286 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
287
288 pager
289 .write_meta_shadow(&meta_page)
290 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
291 pager
292 .write_page(1, meta_page)
293 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
294 pager
295 .flush()
296 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
297
298 Ok(())
299 }
300
301 pub fn pager(&self) -> Option<&Arc<Pager>> {
303 self.pager.as_ref()
304 }
305
306 pub fn config(&self) -> &UnifiedStoreConfig {
310 &self.config
311 }
312
313 pub fn with_config(config: UnifiedStoreConfig) -> Self {
314 Self {
315 config,
316 format_version: AtomicU32::new(STORE_VERSION_V9),
317 next_entity_id: AtomicU64::new(1),
318 collections: RwLock::new(HashMap::new()),
319 cross_refs: RwLock::new(HashMap::new()),
320 reverse_refs: RwLock::new(HashMap::new()),
321 pager: None,
322 db_path: None,
323 btree_indices: RwLock::new(HashMap::new()),
324 context_index: ContextIndex::new(),
325 entity_cache: EntityCache::new(),
326 graph_label_index: RwLock::new(HashMap::new()),
327 paged_registry_dirty: AtomicBool::new(false),
328 commit: None,
329 unindex_cross_refs_fast_path: AtomicU64::new(0),
330 }
331 }
332
333 pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
351 Self::open_with_config(path, UnifiedStoreConfig::default())
352 }
353
354 pub fn open_with_config(
355 path: impl AsRef<Path>,
356 config: UnifiedStoreConfig,
357 ) -> Result<Self, StoreError> {
358 let path = path.as_ref();
359 let mut pager_config = PagerConfig::default();
360 if matches!(
368 std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
369 Some("0") | Some("false") | Some("off")
370 ) {
371 pager_config.double_write = false;
372 }
373 let pager = Pager::open(path, pager_config)
374 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
375
376 let wal_path = Self::wal_path_for_db(path);
377 let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
378 Some(Arc::new(
379 StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
380 .map_err(StoreError::Io)?,
381 ))
382 } else {
383 None
384 };
385
386 let store = Self {
387 config,
388 format_version: AtomicU32::new(STORE_VERSION_V9),
389 next_entity_id: AtomicU64::new(1),
390 collections: RwLock::new(HashMap::new()),
391 cross_refs: RwLock::new(HashMap::new()),
392 reverse_refs: RwLock::new(HashMap::new()),
393 pager: Some(Arc::new(pager)),
394 db_path: Some(path.to_path_buf()),
395 btree_indices: RwLock::new(HashMap::new()),
396 context_index: ContextIndex::new(),
397 entity_cache: EntityCache::new(),
398 graph_label_index: RwLock::new(HashMap::new()),
399 paged_registry_dirty: AtomicBool::new(false),
400 commit,
401 unindex_cross_refs_fast_path: AtomicU64::new(0),
402 };
403
404 store.load_from_pages()?;
406 if let Some(commit) = &store.commit {
407 commit.replay_into(&store).map_err(StoreError::Io)?;
408 }
409
410 Ok(store)
411 }
412
413 fn load_from_pages(&self) -> Result<(), StoreError> {
417 let pager = match &self.pager {
418 Some(p) => p,
419 None => return Ok(()), };
421
422 let page_count = pager.page_count().map_err(|e| {
424 StoreError::Io(std::io::Error::other(format!(
425 "failed to read page count: {}",
426 e
427 )))
428 })?;
429 if page_count <= 1 {
430 return Ok(());
432 }
433
434 if let Some(content_vec) = read_meta_payload(pager) {
439 let content: &[u8] = &content_vec;
440 if content.len() >= 4 {
441 let mut pos = 0;
442 let mut format_version = STORE_VERSION_V1;
443
444 if content.len() >= 8 && &content[0..4] == METADATA_MAGIC {
445 format_version =
446 u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
447 pos += 8;
448 }
449
450 self.set_format_version(format_version);
451
452 let collection_count = u32::from_le_bytes([
454 content[pos],
455 content[pos + 1],
456 content[pos + 2],
457 content[pos + 3],
458 ]) as usize;
459 pos += 4;
460
461 for _ in 0..collection_count {
463 if pos + 4 > content.len() {
464 break;
465 }
466
467 let name_len = u32::from_le_bytes([
468 content[pos],
469 content[pos + 1],
470 content[pos + 2],
471 content[pos + 3],
472 ]) as usize;
473 pos += 4;
474
475 if pos + name_len + 4 > content.len() {
476 break;
477 }
478
479 if let Ok(name) = String::from_utf8(content[pos..pos + name_len].to_vec()) {
480 pos += name_len;
481
482 let root_page = u32::from_le_bytes([
484 content[pos],
485 content[pos + 1],
486 content[pos + 2],
487 content[pos + 3],
488 ]);
489 pos += 4;
490
491 let _ = self.create_collection_in_memory(&name);
495
496 if root_page > 0 {
498 let btree = BTree::with_root(Arc::clone(pager), root_page);
499
500 if let Ok(mut cursor) = btree.cursor_first() {
502 let manager = self.get_collection(&name);
503 while let Ok(Some((key, value))) = cursor.next() {
504 if let Ok((entity, metadata)) = Self::deserialize_entity_record(
506 &value,
507 self.format_version(),
508 ) {
509 if let Some(m) = &manager {
510 let id = entity.id;
511 if let EntityKind::TableRow { row_id, .. } =
512 &entity.kind
513 {
514 m.register_row_id(*row_id);
515 }
516 self.context_index.index_entity(&name, &entity);
517 let _ = m.insert(entity.clone());
518 if let Some(metadata) = metadata {
519 let _ = m.set_metadata(id, metadata);
520 }
521 self.register_entity_id(id);
522 if self.config.auto_index_refs {
523 self.index_cross_refs(&entity, &name)?;
524 }
525 }
526 }
527 }
528 }
529
530 self.btree_indices.write().insert(name, Arc::new(btree));
532 }
533 } else {
534 pos += name_len + 4;
535 }
536 }
537
538 if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
539 let cross_ref_count = u32::from_le_bytes([
540 content[pos],
541 content[pos + 1],
542 content[pos + 2],
543 content[pos + 3],
544 ]) as usize;
545 pos += 4;
546
547 for _ in 0..cross_ref_count {
548 if pos + 17 > content.len() {
549 break;
550 }
551 let source_id = u64::from_le_bytes([
552 content[pos],
553 content[pos + 1],
554 content[pos + 2],
555 content[pos + 3],
556 content[pos + 4],
557 content[pos + 5],
558 content[pos + 6],
559 content[pos + 7],
560 ]);
561 pos += 8;
562 let target_id = u64::from_le_bytes([
563 content[pos],
564 content[pos + 1],
565 content[pos + 2],
566 content[pos + 3],
567 content[pos + 4],
568 content[pos + 5],
569 content[pos + 6],
570 content[pos + 7],
571 ]);
572 pos += 8;
573 let ref_type = RefType::from_byte(content[pos]);
574 pos += 1;
575
576 if pos + 4 > content.len() {
577 break;
578 }
579 let name_len = u32::from_le_bytes([
580 content[pos],
581 content[pos + 1],
582 content[pos + 2],
583 content[pos + 3],
584 ]) as usize;
585 pos += 4;
586 if pos + name_len > content.len() {
587 break;
588 }
589 let target_collection =
590 String::from_utf8_lossy(&content[pos..pos + name_len]).to_string();
591 pos += name_len;
592
593 let source_id = EntityId::new(source_id);
594 let target_id = EntityId::new(target_id);
595
596 self.cross_refs.write().entry(source_id).or_default().push((
597 target_id,
598 ref_type,
599 target_collection.clone(),
600 ));
601
602 if let Some((collection, mut entity)) = self.get_any(source_id) {
603 let exists = entity.cross_refs().iter().any(|xref| {
604 xref.target == target_id
605 && xref.ref_type == ref_type
606 && xref.target_collection == target_collection
607 });
608 if !exists {
609 entity.cross_refs_mut().push(CrossRef::new(
610 source_id,
611 target_id,
612 target_collection.clone(),
613 ref_type,
614 ));
615 if let Some(manager) = self.get_collection(&collection) {
616 let _ = manager.update(entity);
617 }
618 }
619 }
620 }
621 }
622 }
623 }
624
625 if self.format_version() < STORE_VERSION_V9 {
626 self.set_format_version(STORE_VERSION_V9);
627 }
628
629 Ok(())
630 }
631
632 pub(crate) fn deserialize_entity(
634 data: &[u8],
635 format_version: u32,
636 ) -> Result<UnifiedEntity, StoreError> {
637 let mut pos = 0;
638 Self::read_entity_binary(data, &mut pos, format_version)
639 .map_err(|e| StoreError::Serialization(e.to_string()))
640 }
641
642 pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
644 let mut buf = Vec::with_capacity(256);
649 Self::write_entity_binary(&mut buf, entity, format_version);
650 buf
651 }
652
653 pub(crate) fn serialize_entity_record(
654 entity: &UnifiedEntity,
655 metadata: Option<&Metadata>,
656 format_version: u32,
657 ) -> Vec<u8> {
658 let entity_bytes = Self::serialize_entity(entity, format_version);
659 let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
665 if has_meta {
666 let metadata_bytes = serialize_metadata(metadata);
667 let mut buf = Vec::with_capacity(12 + entity_bytes.len() + metadata_bytes.len());
668 buf.extend_from_slice(ENTITY_RECORD_MAGIC);
669 buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
670 buf.extend_from_slice(&entity_bytes);
671 buf.extend_from_slice(&(metadata_bytes.len() as u32).to_le_bytes());
672 buf.extend_from_slice(&metadata_bytes);
673 buf
674 } else {
675 let mut buf = Vec::with_capacity(12 + entity_bytes.len());
676 buf.extend_from_slice(ENTITY_RECORD_MAGIC);
677 buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
678 buf.extend_from_slice(&entity_bytes);
679 buf.extend_from_slice(&0u32.to_le_bytes());
680 buf
681 }
682 }
683
684 pub(crate) fn deserialize_entity_record(
685 data: &[u8],
686 format_version: u32,
687 ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
688 if data.len() < 8 || &data[..4] != ENTITY_RECORD_MAGIC {
689 return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
690 }
691
692 let mut pos = 4usize;
693 let entity_len = read_u32(data, &mut pos)? as usize;
694 if pos + entity_len > data.len() {
695 return Err(StoreError::Serialization(
696 "truncated entity record payload".to_string(),
697 ));
698 }
699 let entity = Self::deserialize_entity(&data[pos..pos + entity_len], format_version)?;
700 pos += entity_len;
701
702 let metadata_len = read_u32(data, &mut pos)? as usize;
703 if pos + metadata_len > data.len() {
704 return Err(StoreError::Serialization(
705 "truncated entity record metadata".to_string(),
706 ));
707 }
708 let metadata = if metadata_len == 0 {
709 None
710 } else {
711 let metadata = deserialize_metadata(&data[pos..pos + metadata_len])?;
712 if metadata.is_empty() {
713 None
714 } else {
715 Some(metadata)
716 }
717 };
718
719 Ok((entity, metadata))
720 }
721
722 pub fn persist(&self) -> Result<(), StoreError> {
727 let pager = match &self.pager {
728 Some(p) => p,
729 None => {
730 if let Some(path) = &self.db_path {
732 return self
733 .save_to_file(path)
734 .map_err(|e| StoreError::Serialization(e.to_string()));
735 }
736 return Err(StoreError::Io(std::io::Error::other(
737 "No pager or path configured for persistence",
738 )));
739 }
740 };
741
742 match pager.read_page(1) {
743 Ok(_) => {}
744 Err(PagerError::PageNotFound(_)) => {
745 let meta_page = pager
746 .allocate_page(crate::storage::engine::PageType::Header)
747 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
748 pager
749 .write_page(meta_page.page_id(), meta_page)
750 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
751 }
752 Err(e) => {
753 return Err(StoreError::Io(std::io::Error::other(e.to_string())));
754 }
755 }
756
757 if let Some(commit) = &self.commit {
758 commit.force_sync().map_err(StoreError::Io)?;
759 }
760
761 let collections = self.collections.read();
762 let mut btree_indices = self.btree_indices.write();
763
764 let mut collection_roots: Vec<(String, u32)> = Vec::new();
766
767 for (name, manager) in collections.iter() {
770 let btree = btree_indices
771 .entry(name.clone())
772 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
773
774 let mut existing_keys = Vec::new();
775 if !btree.is_empty() {
776 let mut cursor = btree.cursor_first().map_err(|e| {
777 StoreError::Io(std::io::Error::other(format!(
778 "B-tree cursor error while rebuilding '{name}': {e}"
779 )))
780 })?;
781 while let Some((key, _)) = cursor.next().map_err(|e| {
782 StoreError::Io(std::io::Error::other(format!(
783 "B-tree scan error while rebuilding '{name}': {e}"
784 )))
785 })? {
786 existing_keys.push(key);
787 }
788 }
789
790 for key in existing_keys {
791 btree.delete(&key).map_err(|e| {
792 StoreError::Io(std::io::Error::other(format!(
793 "B-tree delete error while rebuilding '{name}': {e}"
794 )))
795 })?;
796 }
797
798 let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
799 .query_all(|_| true)
800 .into_iter()
801 .map(|entity| {
802 let metadata = manager.get_metadata(entity.id);
803 (
804 entity.id.raw().to_be_bytes().to_vec(),
805 Self::serialize_entity_record(
806 &entity,
807 metadata.as_ref(),
808 self.format_version(),
809 ),
810 )
811 })
812 .collect();
813 records.sort_by(|left, right| left.0.cmp(&right.0));
814
815 let max_value_size = crate::storage::engine::btree::MAX_VALUE_SIZE;
827 let original_len = records.len();
828 records.retain(|(_, value)| {
829 if value.len() > max_value_size {
830 tracing::warn!(
834 collection = %reddb_wire::audit_safe_log_field(name),
835 bytes = value.len(),
836 max = max_value_size,
837 "skipping oversized row during B-tree bulk rebuild"
838 );
839 false
840 } else {
841 true
842 }
843 });
844 let dropped = original_len - records.len();
845 if dropped > 0 {
846 let safe_name = format!("{}", reddb_wire::audit_safe_log_field(name));
850 tracing::warn!(
851 collection = %safe_name,
852 dropped,
853 "dropped {dropped} oversized row(s) from '{safe_name}' on rebuild — \
854 the rows remain readable via the in-memory entity store but \
855 are absent from the on-disk B-tree until they are rewritten"
856 );
857 }
858
859 if !records.is_empty() {
860 btree.bulk_insert_sorted(&records).map_err(|e| {
861 StoreError::Io(std::io::Error::other(format!(
862 "B-tree bulk rebuild error for '{name}': {e}"
863 )))
864 })?;
865 }
866
867 collection_roots.push((name.clone(), btree.root_page_id()));
868 }
869
870 let mut meta_data = Vec::with_capacity(4096);
872
873 let format_version = STORE_VERSION_V9;
874 self.set_format_version(format_version);
875
876 meta_data.extend_from_slice(METADATA_MAGIC);
878 meta_data.extend_from_slice(&format_version.to_le_bytes());
879 meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
880
881 for (name, root_page) in &collection_roots {
883 meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
885 meta_data.extend_from_slice(name.as_bytes());
887 meta_data.extend_from_slice(&root_page.to_le_bytes());
889 }
890
891 let cross_refs = self.cross_refs.read();
893 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
894 meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
895 for (source_id, refs) in cross_refs.iter() {
896 for (target_id, ref_type, collection) in refs {
897 meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
898 meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
899 meta_data.push(ref_type.to_byte());
900 meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
901 meta_data.extend_from_slice(collection.as_bytes());
902 }
903 }
904
905 let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
907 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
908
909 pager
912 .write_meta_shadow(&meta_page)
913 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
914
915 pager
917 .write_page(1, meta_page)
918 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
919
920 pager
922 .sync()
923 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
924
925 if let Some(commit) = &self.commit {
926 commit.truncate().map_err(StoreError::Io)?;
927 }
928
929 Ok(())
930 }
931
932 pub fn is_paged(&self) -> bool {
934 self.pager.is_some()
935 }
936
937 pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
940 self.btree_indices
941 .read()
942 .get(collection)
943 .map(|btree| btree.root_page_id())
944 .filter(|root| *root != 0)
945 }
946
947 pub fn db_path(&self) -> Option<&Path> {
949 self.db_path.as_deref()
950 }
951}
952
953fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
954 let Some(metadata) = metadata else {
955 return Vec::new();
956 };
957 if metadata.is_empty() {
958 return Vec::new();
959 }
960
961 let mut entries: Vec<_> = metadata.iter().collect();
962 entries.sort_by_key(|(a, _)| *a);
963
964 let mut buf = Vec::new();
965 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
966 for (key, value) in entries {
967 write_string(&mut buf, key);
968 write_metadata_value(&mut buf, value);
969 }
970 buf
971}
972
973fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
974 let mut pos = 0usize;
975 let count = read_u32(data, &mut pos)? as usize;
976 let mut metadata = Metadata::new();
977 for _ in 0..count {
978 let key = read_string(data, &mut pos)?;
979 let value = read_metadata_value(data, &mut pos)?;
980 metadata.set(key, value);
981 }
982 Ok(metadata)
983}
984
/// Appends `value` as a little-endian `u32` byte length followed by its UTF-8 bytes.
fn write_string(buf: &mut Vec<u8>, value: &str) {
    let raw = value.as_bytes();
    let prefix = (raw.len() as u32).to_le_bytes();
    buf.extend(prefix.iter().chain(raw).copied());
}
989
/// Appends `value` as a little-endian `u32` length followed by the raw bytes.
fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
    let header = u32::to_le_bytes(value.len() as u32);
    buf.extend_from_slice(&header);
    buf.extend(value.iter().copied());
}
994
995fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
996 use crate::storage::unified::RefTarget;
997
998 match target {
999 RefTarget::TableRow { table, row_id } => {
1000 buf.push(0);
1001 write_string(buf, table);
1002 buf.extend_from_slice(&row_id.to_le_bytes());
1003 }
1004 RefTarget::Node {
1005 collection,
1006 node_id,
1007 } => {
1008 buf.push(1);
1009 write_string(buf, collection);
1010 buf.extend_from_slice(&node_id.raw().to_le_bytes());
1011 }
1012 RefTarget::Edge {
1013 collection,
1014 edge_id,
1015 } => {
1016 buf.push(2);
1017 write_string(buf, collection);
1018 buf.extend_from_slice(&edge_id.raw().to_le_bytes());
1019 }
1020 RefTarget::Vector {
1021 collection,
1022 vector_id,
1023 } => {
1024 buf.push(3);
1025 write_string(buf, collection);
1026 buf.extend_from_slice(&vector_id.raw().to_le_bytes());
1027 }
1028 RefTarget::Entity {
1029 collection,
1030 entity_id,
1031 } => {
1032 buf.push(4);
1033 write_string(buf, collection);
1034 buf.extend_from_slice(&entity_id.raw().to_le_bytes());
1035 }
1036 }
1037}
1038
1039fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
1040 match value {
1041 MetadataValue::Null => buf.push(0),
1042 MetadataValue::Bool(v) => {
1043 buf.push(1);
1044 buf.push(u8::from(*v));
1045 }
1046 MetadataValue::Int(v) => {
1047 buf.push(2);
1048 buf.extend_from_slice(&v.to_le_bytes());
1049 }
1050 MetadataValue::Float(v) => {
1051 buf.push(3);
1052 buf.extend_from_slice(&v.to_le_bytes());
1053 }
1054 MetadataValue::String(v) => {
1055 buf.push(4);
1056 write_string(buf, v);
1057 }
1058 MetadataValue::Bytes(v) => {
1059 buf.push(5);
1060 write_bytes(buf, v);
1061 }
1062 MetadataValue::Array(values) => {
1063 buf.push(6);
1064 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1065 for value in values {
1066 write_metadata_value(buf, value);
1067 }
1068 }
1069 MetadataValue::Object(values) => {
1070 buf.push(7);
1071 let mut entries: Vec<_> = values.iter().collect();
1072 entries.sort_by_key(|(a, _)| *a);
1073 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
1074 for (key, value) in entries {
1075 write_string(buf, key);
1076 write_metadata_value(buf, value);
1077 }
1078 }
1079 MetadataValue::Timestamp(v) => {
1080 buf.push(8);
1081 buf.extend_from_slice(&v.to_le_bytes());
1082 }
1083 MetadataValue::Geo { lat, lon } => {
1084 buf.push(9);
1085 buf.extend_from_slice(&lat.to_le_bytes());
1086 buf.extend_from_slice(&lon.to_le_bytes());
1087 }
1088 MetadataValue::Reference(target) => {
1089 buf.push(10);
1090 write_ref_target(buf, target);
1091 }
1092 MetadataValue::References(targets) => {
1093 buf.push(11);
1094 buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
1095 for target in targets {
1096 write_ref_target(buf, target);
1097 }
1098 }
1099 }
1100}
1101
1102fn read_exact_slice<'a>(
1103 data: &'a [u8],
1104 pos: &mut usize,
1105 len: usize,
1106) -> Result<&'a [u8], StoreError> {
1107 if *pos + len > data.len() {
1108 return Err(StoreError::Serialization(
1109 "truncated metadata payload".to_string(),
1110 ));
1111 }
1112 let slice = &data[*pos..*pos + len];
1113 *pos += len;
1114 Ok(slice)
1115}
1116
1117fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
1118 let bytes = read_exact_slice(data, pos, 4)?;
1119 let mut raw = [0u8; 4];
1120 raw.copy_from_slice(bytes);
1121 Ok(u32::from_le_bytes(raw))
1122}
1123
1124fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
1125 let bytes = read_exact_slice(data, pos, 8)?;
1126 let mut raw = [0u8; 8];
1127 raw.copy_from_slice(bytes);
1128 Ok(u64::from_le_bytes(raw))
1129}
1130
1131fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
1132 let bytes = read_exact_slice(data, pos, 8)?;
1133 let mut raw = [0u8; 8];
1134 raw.copy_from_slice(bytes);
1135 Ok(i64::from_le_bytes(raw))
1136}
1137
1138fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
1139 let bytes = read_exact_slice(data, pos, 8)?;
1140 let mut raw = [0u8; 8];
1141 raw.copy_from_slice(bytes);
1142 Ok(f64::from_le_bytes(raw))
1143}
1144
1145fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
1146 let bytes = read_exact_slice(data, pos, 1)?;
1147 Ok(bytes[0])
1148}
1149
1150fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
1151 let len = read_u32(data, pos)? as usize;
1152 let bytes = read_exact_slice(data, pos, len)?;
1153 String::from_utf8(bytes.to_vec()).map_err(|err| StoreError::Serialization(err.to_string()))
1154}
1155
1156fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
1157 let len = read_u32(data, pos)? as usize;
1158 Ok(read_exact_slice(data, pos, len)?.to_vec())
1159}
1160
1161fn read_ref_target(
1162 data: &[u8],
1163 pos: &mut usize,
1164) -> Result<crate::storage::unified::RefTarget, StoreError> {
1165 use crate::storage::unified::RefTarget;
1166
1167 match read_u8(data, pos)? {
1168 0 => Ok(RefTarget::TableRow {
1169 table: read_string(data, pos)?,
1170 row_id: read_u64(data, pos)?,
1171 }),
1172 1 => Ok(RefTarget::Node {
1173 collection: read_string(data, pos)?,
1174 node_id: EntityId::new(read_u64(data, pos)?),
1175 }),
1176 2 => Ok(RefTarget::Edge {
1177 collection: read_string(data, pos)?,
1178 edge_id: EntityId::new(read_u64(data, pos)?),
1179 }),
1180 3 => Ok(RefTarget::Vector {
1181 collection: read_string(data, pos)?,
1182 vector_id: EntityId::new(read_u64(data, pos)?),
1183 }),
1184 4 => Ok(RefTarget::Entity {
1185 collection: read_string(data, pos)?,
1186 entity_id: EntityId::new(read_u64(data, pos)?),
1187 }),
1188 tag => Err(StoreError::Serialization(format!(
1189 "unknown metadata ref target tag {tag}"
1190 ))),
1191 }
1192}
1193
1194fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1195 match read_u8(data, pos)? {
1196 0 => Ok(MetadataValue::Null),
1197 1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1198 2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1199 3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1200 4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1201 5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1202 6 => {
1203 let count = read_u32(data, pos)? as usize;
1204 let mut values = Vec::with_capacity(count);
1205 for _ in 0..count {
1206 values.push(read_metadata_value(data, pos)?);
1207 }
1208 Ok(MetadataValue::Array(values))
1209 }
1210 7 => {
1211 let count = read_u32(data, pos)? as usize;
1212 let mut values = std::collections::HashMap::with_capacity(count);
1213 for _ in 0..count {
1214 let key = read_string(data, pos)?;
1215 let value = read_metadata_value(data, pos)?;
1216 values.insert(key, value);
1217 }
1218 Ok(MetadataValue::Object(values))
1219 }
1220 8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1221 9 => Ok(MetadataValue::Geo {
1222 lat: read_f64(data, pos)?,
1223 lon: read_f64(data, pos)?,
1224 }),
1225 10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1226 11 => {
1227 let count = read_u32(data, pos)? as usize;
1228 let mut targets = Vec::with_capacity(count);
1229 for _ in 0..count {
1230 targets.push(read_ref_target(data, pos)?);
1231 }
1232 Ok(MetadataValue::References(targets))
1233 }
1234 tag => Err(StoreError::Serialization(format!(
1235 "unknown metadata value tag {tag}"
1236 ))),
1237 }
1238}