1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
/// Magic prefix written at the start of every framed entity record
/// (length-prefixed entity bytes followed by a length-prefixed metadata section).
const ENTITY_RECORD_MAGIC: &[u8; 4] = b"RER1";

/// Magic written at the start of page 1's content area when the registry
/// payload does not fit in a single page and continues in an overflow chain.
const METADATA_OVERFLOW_MAGIC: &[u8; 4] = b"RDM3";
/// Bytes available for content in one page once the engine page header is subtracted.
const META_PAGE_CONTENT_CAP: usize =
    crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE;
/// Size of the page-1 overflow header: magic (4) + format version (4) +
/// total payload length (4) + first overflow page id (4).
const META_V3_PAGE1_HEADER: usize = 16;
/// Size of an overflow page header: next page id (4) + chunk length (4).
const META_V3_OVERFLOW_HEADER: usize = 8;
/// Payload bytes that fit in page 1 after its overflow header.
const META_V3_FIRST_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_PAGE1_HEADER;
/// Payload bytes that fit in one overflow page after its header.
const META_V3_OVERFLOW_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_OVERFLOW_HEADER;
32
33fn free_existing_overflow_chain(pager: &Pager) -> Result<(), PagerError> {
34 let cs = crate::storage::engine::HEADER_SIZE;
35 let page = match pager.read_page(1) {
36 Ok(p) => p,
37 Err(_) => return Ok(()),
38 };
39 let bytes = page.as_bytes();
40 if bytes.len() < cs + META_V3_PAGE1_HEADER {
41 return Ok(());
42 }
43 if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
44 return Ok(());
45 }
46 let mut next = u32::from_le_bytes([
47 bytes[cs + 12],
48 bytes[cs + 13],
49 bytes[cs + 14],
50 bytes[cs + 15],
51 ]);
52 while next != 0 {
53 let ov = match pager.read_page(next) {
54 Ok(p) => p,
55 Err(_) => break,
56 };
57 let ob = ov.as_bytes();
58 let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
59 let _ = pager.free_page(next);
60 next = nn;
61 }
62 Ok(())
63}
64
65fn build_meta_page1_with_overflow(
66 pager: &Pager,
67 meta_data: &[u8],
68) -> Result<crate::storage::engine::Page, PagerError> {
69 use crate::storage::engine::{Page, PageType, HEADER_SIZE};
70 free_existing_overflow_chain(pager)?;
71
72 let mut page1 = Page::new(PageType::Header, 1);
73 let cs = HEADER_SIZE;
74
75 if meta_data.len() <= META_PAGE_CONTENT_CAP {
76 let buf = page1.as_bytes_mut();
78 buf[cs..cs + meta_data.len()].copy_from_slice(meta_data);
79 return Ok(page1);
80 }
81
82 let first_chunk = &meta_data[..META_V3_FIRST_PAYLOAD_CAP];
86 let mut tail = &meta_data[META_V3_FIRST_PAYLOAD_CAP..];
87 let mut chunks: Vec<&[u8]> = Vec::new();
88 while !tail.is_empty() {
89 let take = tail.len().min(META_V3_OVERFLOW_PAYLOAD_CAP);
90 chunks.push(&tail[..take]);
91 tail = &tail[take..];
92 }
93
94 let mut overflow_pages: Vec<Page> = Vec::with_capacity(chunks.len());
95 let mut overflow_ids: Vec<u32> = Vec::with_capacity(chunks.len());
96 for _ in 0..chunks.len() {
97 let pg = pager.allocate_page(PageType::Overflow)?;
98 overflow_ids.push(pg.page_id());
99 overflow_pages.push(pg);
100 }
101
102 for i in 0..chunks.len() {
103 let next = if i + 1 < chunks.len() {
104 overflow_ids[i + 1]
105 } else {
106 0u32
107 };
108 let len = chunks[i].len() as u32;
109 let buf = overflow_pages[i].as_bytes_mut();
110 buf[cs..cs + 4].copy_from_slice(&next.to_le_bytes());
111 buf[cs + 4..cs + 8].copy_from_slice(&len.to_le_bytes());
112 buf[cs + 8..cs + 8 + chunks[i].len()].copy_from_slice(chunks[i]);
113 }
114 for (idx, page) in overflow_pages.into_iter().enumerate() {
115 let id = overflow_ids[idx];
116 pager.write_page(id, page)?;
117 }
118
119 let format_version = if meta_data.len() >= 8 && &meta_data[0..4] == METADATA_MAGIC {
121 u32::from_le_bytes([meta_data[4], meta_data[5], meta_data[6], meta_data[7]])
122 } else {
123 0
124 };
125
126 let buf = page1.as_bytes_mut();
127 buf[cs..cs + 4].copy_from_slice(METADATA_OVERFLOW_MAGIC);
128 buf[cs + 4..cs + 8].copy_from_slice(&format_version.to_le_bytes());
129 buf[cs + 8..cs + 12].copy_from_slice(&(meta_data.len() as u32).to_le_bytes());
130 buf[cs + 12..cs + 16].copy_from_slice(&overflow_ids[0].to_le_bytes());
131 buf[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_chunk.len()]
132 .copy_from_slice(first_chunk);
133
134 Ok(page1)
135}
136
137fn read_meta_payload(pager: &Pager) -> Option<Vec<u8>> {
143 let cs = crate::storage::engine::HEADER_SIZE;
144 let meta_page = pager
145 .read_page(1)
146 .or_else(|_| pager.recover_meta_from_shadow())
147 .ok()?;
148 let bytes = meta_page.as_bytes();
149 if bytes.len() < cs + 4 {
150 return Some(bytes.get(cs..).unwrap_or(&[]).to_vec());
151 }
152 if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
153 return Some(bytes[cs..].to_vec());
154 }
155 if bytes.len() < cs + META_V3_PAGE1_HEADER {
156 return None;
157 }
158 let total =
159 u32::from_le_bytes([bytes[cs + 8], bytes[cs + 9], bytes[cs + 10], bytes[cs + 11]]) as usize;
160 let mut next = u32::from_le_bytes([
161 bytes[cs + 12],
162 bytes[cs + 13],
163 bytes[cs + 14],
164 bytes[cs + 15],
165 ]);
166 let mut payload: Vec<u8> = Vec::with_capacity(total);
167 let first_take = total.min(META_V3_FIRST_PAYLOAD_CAP);
168 payload.extend_from_slice(
169 &bytes[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_take],
170 );
171 while next != 0 && payload.len() < total {
172 let ov = pager.read_page(next).ok()?;
173 let ob = ov.as_bytes();
174 if ob.len() < cs + META_V3_OVERFLOW_HEADER {
175 return None;
176 }
177 let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
178 let len = u32::from_le_bytes([ob[cs + 4], ob[cs + 5], ob[cs + 6], ob[cs + 7]]) as usize;
179 let remaining = total - payload.len();
180 let take = len.min(remaining).min(META_V3_OVERFLOW_PAYLOAD_CAP);
181 payload.extend_from_slice(
182 &ob[cs + META_V3_OVERFLOW_HEADER..cs + META_V3_OVERFLOW_HEADER + take],
183 );
184 next = nn;
185 }
186 Some(payload)
187}
188
189impl UnifiedStore {
190 pub(crate) fn mark_paged_registry_dirty(&self) {
191 self.paged_registry_dirty.store(true, Ordering::Release);
192 }
193
194 pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
200 let pager = self.pager.as_ref()?;
201 if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
202 return Some(btree);
203 }
204 let mut write = self.btree_indices.write();
205 let btree = write
206 .entry(collection.to_string())
207 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
208 .clone();
209 Some(btree)
210 }
211
212 pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
213 let Some(pager) = &self.pager else {
214 return Ok(());
215 };
216
217 if self.paged_registry_dirty.load(Ordering::Acquire) {
218 self.flush_paged_registry()?;
219 self.paged_registry_dirty.store(false, Ordering::Release);
220 return Ok(());
221 }
222
223 pager
224 .flush()
225 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
226 }
227
228 pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
229 let Some(pager) = &self.pager else {
230 return Ok(());
231 };
232
233 match pager.read_page(1) {
234 Ok(_) => {}
235 Err(PagerError::PageNotFound(_)) => {
236 let meta_page = pager
237 .allocate_page(crate::storage::engine::PageType::Header)
238 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
239 pager
240 .write_page(meta_page.page_id(), meta_page)
241 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
242 }
243 Err(e) => {
244 return Err(StoreError::Io(std::io::Error::other(e.to_string())));
245 }
246 }
247
248 let format_version = STORE_VERSION_V9;
249 self.set_format_version(format_version);
250
251 let collections = self.collections.read();
252 let btree_indices = self.btree_indices.read();
253 let mut collection_roots = Vec::with_capacity(collections.len());
254 for (name, _) in collections.iter() {
255 let root_page = btree_indices
256 .get(name)
257 .map_or(0, |btree| btree.root_page_id());
258 collection_roots.push((name.clone(), root_page));
259 }
260 drop(btree_indices);
261 drop(collections);
262
263 let mut meta_data = Vec::with_capacity(4096);
264 meta_data.extend_from_slice(METADATA_MAGIC);
265 meta_data.extend_from_slice(&format_version.to_le_bytes());
266 meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
267 for (name, root_page) in &collection_roots {
268 meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
269 meta_data.extend_from_slice(name.as_bytes());
270 meta_data.extend_from_slice(&root_page.to_le_bytes());
271 }
272
273 let cross_refs = self.cross_refs.read();
274 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
275 meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
276 for (source_id, refs) in cross_refs.iter() {
277 for (target_id, ref_type, collection) in refs {
278 meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
279 meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
280 meta_data.push(ref_type.to_byte());
281 meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
282 meta_data.extend_from_slice(collection.as_bytes());
283 }
284 }
285
286 let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
287 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
288
289 pager
290 .write_meta_shadow(&meta_page)
291 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
292 pager
293 .write_page(1, meta_page)
294 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
295 pager
296 .flush()
297 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
298
299 Ok(())
300 }
301
302 pub fn pager(&self) -> Option<&Arc<Pager>> {
304 self.pager.as_ref()
305 }
306
307 pub fn config(&self) -> &UnifiedStoreConfig {
311 &self.config
312 }
313
314 pub fn with_config(config: UnifiedStoreConfig) -> Self {
315 Self {
316 config,
317 format_version: AtomicU32::new(STORE_VERSION_V9),
318 next_entity_id: AtomicU64::new(1),
319 collections: RwLock::new(HashMap::new()),
320 cross_refs: RwLock::new(HashMap::new()),
321 reverse_refs: RwLock::new(HashMap::new()),
322 pager: None,
323 db_path: None,
324 btree_indices: RwLock::new(HashMap::new()),
325 context_index: ContextIndex::new(),
326 entity_cache: EntityCache::new(),
327 graph_label_index: RwLock::new(HashMap::new()),
328 paged_registry_dirty: AtomicBool::new(false),
329 commit: None,
330 unindex_cross_refs_fast_path: AtomicU64::new(0),
331 }
332 }
333
334 pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
352 Self::open_with_config(path, UnifiedStoreConfig::default())
353 }
354
355 pub fn open_with_config(
356 path: impl AsRef<Path>,
357 config: UnifiedStoreConfig,
358 ) -> Result<Self, StoreError> {
359 let path = path.as_ref();
360 let mut pager_config = PagerConfig::default();
361 if matches!(
369 std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
370 Some("0") | Some("false") | Some("off")
371 ) {
372 pager_config.double_write = false;
373 }
374 let pager = Pager::open(path, pager_config)
375 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
376
377 let wal_path = Self::wal_path_for_db(path);
378 let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
379 Some(Arc::new(
380 StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
381 .map_err(StoreError::Io)?,
382 ))
383 } else {
384 None
385 };
386
387 let store = Self {
388 config,
389 format_version: AtomicU32::new(STORE_VERSION_V9),
390 next_entity_id: AtomicU64::new(1),
391 collections: RwLock::new(HashMap::new()),
392 cross_refs: RwLock::new(HashMap::new()),
393 reverse_refs: RwLock::new(HashMap::new()),
394 pager: Some(Arc::new(pager)),
395 db_path: Some(path.to_path_buf()),
396 btree_indices: RwLock::new(HashMap::new()),
397 context_index: ContextIndex::new(),
398 entity_cache: EntityCache::new(),
399 graph_label_index: RwLock::new(HashMap::new()),
400 paged_registry_dirty: AtomicBool::new(false),
401 commit,
402 unindex_cross_refs_fast_path: AtomicU64::new(0),
403 };
404
405 store.load_from_pages()?;
407 if let Some(commit) = &store.commit {
408 commit.replay_into(&store).map_err(StoreError::Io)?;
409 }
410
411 Ok(store)
412 }
413
414 fn load_from_pages(&self) -> Result<(), StoreError> {
418 let pager = match &self.pager {
419 Some(p) => p,
420 None => return Ok(()), };
422
423 let page_count = pager.page_count().map_err(|e| {
425 StoreError::Io(std::io::Error::other(format!(
426 "failed to read page count: {}",
427 e
428 )))
429 })?;
430 if page_count <= 1 {
431 return Ok(());
433 }
434
435 if let Some(content_vec) = read_meta_payload(pager) {
440 let content: &[u8] = &content_vec;
441 if content.len() >= 4 {
442 let mut pos = 0;
443 let mut format_version = STORE_VERSION_V1;
444
445 if content.len() >= 8 && &content[0..4] == METADATA_MAGIC {
446 format_version =
447 u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
448 pos += 8;
449 }
450
451 self.set_format_version(format_version);
452
453 let collection_count = u32::from_le_bytes([
455 content[pos],
456 content[pos + 1],
457 content[pos + 2],
458 content[pos + 3],
459 ]) as usize;
460 pos += 4;
461
462 for _ in 0..collection_count {
464 if pos + 4 > content.len() {
465 break;
466 }
467
468 let name_len = u32::from_le_bytes([
469 content[pos],
470 content[pos + 1],
471 content[pos + 2],
472 content[pos + 3],
473 ]) as usize;
474 pos += 4;
475
476 if pos + name_len + 4 > content.len() {
477 break;
478 }
479
480 if let Ok(name) = String::from_utf8(content[pos..pos + name_len].to_vec()) {
481 pos += name_len;
482
483 let root_page = u32::from_le_bytes([
485 content[pos],
486 content[pos + 1],
487 content[pos + 2],
488 content[pos + 3],
489 ]);
490 pos += 4;
491
492 let _ = self.create_collection_in_memory(&name);
496
497 if root_page > 0 {
499 let btree = BTree::with_root(Arc::clone(pager), root_page);
500
501 if let Ok(mut cursor) = btree.cursor_first() {
503 let manager = self.get_collection(&name);
504 while let Ok(Some((key, value))) = cursor.next() {
505 if let Ok((entity, metadata)) = Self::deserialize_entity_record(
507 &value,
508 self.format_version(),
509 ) {
510 if let Some(m) = &manager {
511 let id = entity.id;
512 if let EntityKind::TableRow { row_id, .. } =
513 &entity.kind
514 {
515 m.register_row_id(*row_id);
516 }
517 self.context_index.index_entity(&name, &entity);
518 let _ = m.insert(entity.clone());
519 if let Some(metadata) = metadata {
520 let _ = m.set_metadata(id, metadata);
521 }
522 self.register_entity_id(id);
523 if self.config.auto_index_refs {
524 self.index_cross_refs(&entity, &name)?;
525 }
526 }
527 }
528 }
529 }
530
531 self.btree_indices.write().insert(name, Arc::new(btree));
533 }
534 } else {
535 pos += name_len + 4;
536 }
537 }
538
539 if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
540 let cross_ref_count = u32::from_le_bytes([
541 content[pos],
542 content[pos + 1],
543 content[pos + 2],
544 content[pos + 3],
545 ]) as usize;
546 pos += 4;
547
548 for _ in 0..cross_ref_count {
549 if pos + 17 > content.len() {
550 break;
551 }
552 let source_id = u64::from_le_bytes([
553 content[pos],
554 content[pos + 1],
555 content[pos + 2],
556 content[pos + 3],
557 content[pos + 4],
558 content[pos + 5],
559 content[pos + 6],
560 content[pos + 7],
561 ]);
562 pos += 8;
563 let target_id = u64::from_le_bytes([
564 content[pos],
565 content[pos + 1],
566 content[pos + 2],
567 content[pos + 3],
568 content[pos + 4],
569 content[pos + 5],
570 content[pos + 6],
571 content[pos + 7],
572 ]);
573 pos += 8;
574 let ref_type = RefType::from_byte(content[pos]);
575 pos += 1;
576
577 if pos + 4 > content.len() {
578 break;
579 }
580 let name_len = u32::from_le_bytes([
581 content[pos],
582 content[pos + 1],
583 content[pos + 2],
584 content[pos + 3],
585 ]) as usize;
586 pos += 4;
587 if pos + name_len > content.len() {
588 break;
589 }
590 let target_collection =
591 String::from_utf8_lossy(&content[pos..pos + name_len]).to_string();
592 pos += name_len;
593
594 let source_id = EntityId::new(source_id);
595 let target_id = EntityId::new(target_id);
596
597 self.cross_refs.write().entry(source_id).or_default().push((
598 target_id,
599 ref_type,
600 target_collection.clone(),
601 ));
602
603 if let Some((collection, mut entity)) = self.get_any(source_id) {
604 let exists = entity.cross_refs().iter().any(|xref| {
605 xref.target == target_id
606 && xref.ref_type == ref_type
607 && xref.target_collection == target_collection
608 });
609 if !exists {
610 entity.cross_refs_mut().push(CrossRef::new(
611 source_id,
612 target_id,
613 target_collection.clone(),
614 ref_type,
615 ));
616 if let Some(manager) = self.get_collection(&collection) {
617 let _ = manager.update(entity);
618 }
619 }
620 }
621 }
622 }
623 }
624 }
625
626 if self.format_version() < STORE_VERSION_V9 {
627 self.set_format_version(STORE_VERSION_V9);
628 }
629
630 Ok(())
631 }
632
633 pub(crate) fn deserialize_entity(
635 data: &[u8],
636 format_version: u32,
637 ) -> Result<UnifiedEntity, StoreError> {
638 let mut pos = 0;
639 Self::read_entity_binary(data, &mut pos, format_version)
640 .map_err(|e| StoreError::Serialization(e.to_string()))
641 }
642
643 pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
645 let mut buf = Vec::with_capacity(256);
650 Self::write_entity_binary(&mut buf, entity, format_version);
651 buf
652 }
653
654 pub(crate) fn serialize_entity_record(
655 entity: &UnifiedEntity,
656 metadata: Option<&Metadata>,
657 format_version: u32,
658 ) -> Vec<u8> {
659 let entity_bytes = Self::serialize_entity(entity, format_version);
660 let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
666 if has_meta {
667 let metadata_bytes = serialize_metadata(metadata);
668 let mut buf = Vec::with_capacity(12 + entity_bytes.len() + metadata_bytes.len());
669 buf.extend_from_slice(ENTITY_RECORD_MAGIC);
670 buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
671 buf.extend_from_slice(&entity_bytes);
672 buf.extend_from_slice(&(metadata_bytes.len() as u32).to_le_bytes());
673 buf.extend_from_slice(&metadata_bytes);
674 buf
675 } else {
676 let mut buf = Vec::with_capacity(12 + entity_bytes.len());
677 buf.extend_from_slice(ENTITY_RECORD_MAGIC);
678 buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
679 buf.extend_from_slice(&entity_bytes);
680 buf.extend_from_slice(&0u32.to_le_bytes());
681 buf
682 }
683 }
684
685 pub(crate) fn deserialize_entity_record(
686 data: &[u8],
687 format_version: u32,
688 ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
689 if data.len() < 8 || &data[..4] != ENTITY_RECORD_MAGIC {
690 return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
691 }
692
693 let mut pos = 4usize;
694 let entity_len = read_u32(data, &mut pos)? as usize;
695 if pos + entity_len > data.len() {
696 return Err(StoreError::Serialization(
697 "truncated entity record payload".to_string(),
698 ));
699 }
700 let entity = Self::deserialize_entity(&data[pos..pos + entity_len], format_version)?;
701 pos += entity_len;
702
703 let metadata_len = read_u32(data, &mut pos)? as usize;
704 if pos + metadata_len > data.len() {
705 return Err(StoreError::Serialization(
706 "truncated entity record metadata".to_string(),
707 ));
708 }
709 let metadata = if metadata_len == 0 {
710 None
711 } else {
712 let metadata = deserialize_metadata(&data[pos..pos + metadata_len])?;
713 if metadata.is_empty() {
714 None
715 } else {
716 Some(metadata)
717 }
718 };
719
720 Ok((entity, metadata))
721 }
722
/// Writes the whole store to its backing storage.
///
/// Without a pager this falls back to `save_to_file` on the configured
/// database path, or fails when no path is configured either. With a pager,
/// every collection's B-tree is emptied and rebuilt from the in-memory
/// entities, the registry (collection roots and cross-references) is written
/// to page 1 and its shadow, the pager is synced and the commit log, if any,
/// is truncated.
///
/// Records whose serialized size exceeds the B-tree `MAX_VALUE_SIZE` are left
/// out of the rebuilt tree and reported through `tracing::warn!`.
///
/// # Errors
/// Pager, B-tree and commit-log failures are returned as `StoreError::Io`;
/// a failing `save_to_file` is returned as `StoreError::Serialization`.
pub fn persist(&self) -> Result<(), StoreError> {
    let pager = match &self.pager {
        Some(p) => p,
        None => {
            // File-backed fallback for stores opened without a pager.
            if let Some(path) = &self.db_path {
                return self
                    .save_to_file(path)
                    .map_err(|e| StoreError::Serialization(e.to_string()));
            }
            return Err(StoreError::Io(std::io::Error::other(
                "No pager or path configured for persistence",
            )));
        }
    };

    // Make sure page 1 exists before it is overwritten further down.
    match pager.read_page(1) {
        Ok(_) => {}
        Err(PagerError::PageNotFound(_)) => {
            let meta_page = pager
                .allocate_page(crate::storage::engine::PageType::Header)
                .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
            pager
                .write_page(meta_page.page_id(), meta_page)
                .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
        }
        Err(e) => {
            return Err(StoreError::Io(std::io::Error::other(e.to_string())));
        }
    }

    // Force the commit log to be synced before the pages are rewritten.
    if let Some(commit) = &self.commit {
        commit.force_sync().map_err(StoreError::Io)?;
    }

    // Both guards stay held for the remainder of the function.
    let collections = self.collections.read();
    let mut btree_indices = self.btree_indices.write();

    // (collection name, root page id) pairs for the registry.
    let mut collection_roots: Vec<(String, u32)> = Vec::new();

    for (name, manager) in collections.iter() {
        let btree = btree_indices
            .entry(name.clone())
            .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));

        // Pass 1: collect every key currently in the tree.
        let mut existing_keys = Vec::new();
        if !btree.is_empty() {
            let mut cursor = btree.cursor_first().map_err(|e| {
                StoreError::Io(std::io::Error::other(format!(
                    "B-tree cursor error while rebuilding '{name}': {e}"
                )))
            })?;
            while let Some((key, _)) = cursor.next().map_err(|e| {
                StoreError::Io(std::io::Error::other(format!(
                    "B-tree scan error while rebuilding '{name}': {e}"
                )))
            })? {
                existing_keys.push(key);
            }
        }

        // Pass 2: delete them, leaving an empty tree to rebuild into.
        for key in existing_keys {
            btree.delete(&key).map_err(|e| {
                StoreError::Io(std::io::Error::other(format!(
                    "B-tree delete error while rebuilding '{name}': {e}"
                )))
            })?;
        }

        // Serialize every entity with its metadata. Keys are the big-endian
        // bytes of the entity id, so byte-wise ordering equals numeric ordering.
        let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
            .query_all(|_| true)
            .into_iter()
            .map(|entity| {
                let metadata = manager.get_metadata(entity.id);
                (
                    entity.id.raw().to_be_bytes().to_vec(),
                    Self::serialize_entity_record(
                        &entity,
                        metadata.as_ref(),
                        self.format_version(),
                    ),
                )
            })
            .collect();
        records.sort_by(|left, right| left.0.cmp(&right.0));

        // Drop values the B-tree cannot store, logging each one.
        let max_value_size = crate::storage::engine::btree::MAX_VALUE_SIZE;
        let original_len = records.len();
        records.retain(|(_, value)| {
            if value.len() > max_value_size {
                tracing::warn!(
                    collection = %reddb_wire::audit_safe_log_field(name),
                    bytes = value.len(),
                    max = max_value_size,
                    "skipping oversized row during B-tree bulk rebuild"
                );
                false
            } else {
                true
            }
        });
        // One summary warning per collection that lost rows.
        let dropped = original_len - records.len();
        if dropped > 0 {
            let safe_name = format!("{}", reddb_wire::audit_safe_log_field(name));
            tracing::warn!(
                collection = %safe_name,
                dropped,
                "dropped {dropped} oversized row(s) from '{safe_name}' on rebuild — \
                 the rows remain readable via the in-memory entity store but \
                 are absent from the on-disk B-tree until they are rewritten"
            );
        }

        if !records.is_empty() {
            btree.bulk_insert_sorted(&records).map_err(|e| {
                StoreError::Io(std::io::Error::other(format!(
                    "B-tree bulk rebuild error for '{name}': {e}"
                )))
            })?;
        }

        collection_roots.push((name.clone(), btree.root_page_id()));
    }

    let mut meta_data = Vec::with_capacity(4096);

    // Everything written by this function uses the V9 format.
    let format_version = STORE_VERSION_V9;
    self.set_format_version(format_version);

    // Registry header: magic, version, collection count.
    meta_data.extend_from_slice(METADATA_MAGIC);
    meta_data.extend_from_slice(&format_version.to_le_bytes());
    meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());

    // Per collection: name length, name bytes, root page id.
    for (name, root_page) in &collection_roots {
        meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
        meta_data.extend_from_slice(name.as_bytes());
        meta_data.extend_from_slice(&root_page.to_le_bytes());
    }

    // Cross-references: total count, then one flat record per reference
    // (source id, target id, type byte, collection name length, name bytes).
    let cross_refs = self.cross_refs.read();
    let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
    meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
    for (source_id, refs) in cross_refs.iter() {
        for (target_id, ref_type, collection) in refs {
            meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
            meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
            meta_data.push(ref_type.to_byte());
            meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
            meta_data.extend_from_slice(collection.as_bytes());
        }
    }

    // Builds page 1 and writes any overflow pages the payload needs.
    let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
        .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

    // Shadow copy first, then the live page 1.
    pager
        .write_meta_shadow(&meta_page)
        .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

    pager
        .write_page(1, meta_page)
        .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

    pager
        .sync()
        .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

    // Only after the sync succeeded is the commit log truncated.
    if let Some(commit) = &self.commit {
        commit.truncate().map_err(StoreError::Io)?;
    }

    Ok(())
}
932
933 pub fn is_paged(&self) -> bool {
935 self.pager.is_some()
936 }
937
938 pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
941 self.btree_indices
942 .read()
943 .get(collection)
944 .map(|btree| btree.root_page_id())
945 .filter(|root| *root != 0)
946 }
947
948 pub fn db_path(&self) -> Option<&Path> {
950 self.db_path.as_deref()
951 }
952}
953
954fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
955 let Some(metadata) = metadata else {
956 return Vec::new();
957 };
958 if metadata.is_empty() {
959 return Vec::new();
960 }
961
962 let mut entries: Vec<_> = metadata.iter().collect();
963 entries.sort_by_key(|(a, _)| *a);
964
965 let mut buf = Vec::new();
966 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
967 for (key, value) in entries {
968 write_string(&mut buf, key);
969 write_metadata_value(&mut buf, value);
970 }
971 buf
972}
973
974fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
975 let mut pos = 0usize;
976 let count = read_u32(data, &mut pos)? as usize;
977 let mut metadata = Metadata::new();
978 for _ in 0..count {
979 let key = read_string(data, &mut pos)?;
980 let value = read_metadata_value(data, &mut pos)?;
981 metadata.set(key, value);
982 }
983 Ok(metadata)
984}
985
/// Appends `value` to `buf` as a u32 little-endian byte length followed by
/// its UTF-8 bytes.
fn write_string(buf: &mut Vec<u8>, value: &str) {
    let raw = value.as_bytes();
    let len_prefix = (raw.len() as u32).to_le_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(raw);
}
990
/// Appends `value` to `buf` as a u32 little-endian length followed by the
/// bytes themselves.
fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
    let len_prefix = (value.len() as u32).to_le_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(value);
}
995
996fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
997 use crate::storage::unified::RefTarget;
998
999 match target {
1000 RefTarget::TableRow { table, row_id } => {
1001 buf.push(0);
1002 write_string(buf, table);
1003 buf.extend_from_slice(&row_id.to_le_bytes());
1004 }
1005 RefTarget::Node {
1006 collection,
1007 node_id,
1008 } => {
1009 buf.push(1);
1010 write_string(buf, collection);
1011 buf.extend_from_slice(&node_id.raw().to_le_bytes());
1012 }
1013 RefTarget::Edge {
1014 collection,
1015 edge_id,
1016 } => {
1017 buf.push(2);
1018 write_string(buf, collection);
1019 buf.extend_from_slice(&edge_id.raw().to_le_bytes());
1020 }
1021 RefTarget::Vector {
1022 collection,
1023 vector_id,
1024 } => {
1025 buf.push(3);
1026 write_string(buf, collection);
1027 buf.extend_from_slice(&vector_id.raw().to_le_bytes());
1028 }
1029 RefTarget::Entity {
1030 collection,
1031 entity_id,
1032 } => {
1033 buf.push(4);
1034 write_string(buf, collection);
1035 buf.extend_from_slice(&entity_id.raw().to_le_bytes());
1036 }
1037 }
1038}
1039
1040fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
1041 match value {
1042 MetadataValue::Null => buf.push(0),
1043 MetadataValue::Bool(v) => {
1044 buf.push(1);
1045 buf.push(u8::from(*v));
1046 }
1047 MetadataValue::Int(v) => {
1048 buf.push(2);
1049 buf.extend_from_slice(&v.to_le_bytes());
1050 }
1051 MetadataValue::Float(v) => {
1052 buf.push(3);
1053 buf.extend_from_slice(&v.to_le_bytes());
1054 }
1055 MetadataValue::String(v) => {
1056 buf.push(4);
1057 write_string(buf, v);
1058 }
1059 MetadataValue::Bytes(v) => {
1060 buf.push(5);
1061 write_bytes(buf, v);
1062 }
1063 MetadataValue::Array(values) => {
1064 buf.push(6);
1065 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1066 for value in values {
1067 write_metadata_value(buf, value);
1068 }
1069 }
1070 MetadataValue::Object(values) => {
1071 buf.push(7);
1072 let mut entries: Vec<_> = values.iter().collect();
1073 entries.sort_by_key(|(a, _)| *a);
1074 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
1075 for (key, value) in entries {
1076 write_string(buf, key);
1077 write_metadata_value(buf, value);
1078 }
1079 }
1080 MetadataValue::Timestamp(v) => {
1081 buf.push(8);
1082 buf.extend_from_slice(&v.to_le_bytes());
1083 }
1084 MetadataValue::Geo { lat, lon } => {
1085 buf.push(9);
1086 buf.extend_from_slice(&lat.to_le_bytes());
1087 buf.extend_from_slice(&lon.to_le_bytes());
1088 }
1089 MetadataValue::Reference(target) => {
1090 buf.push(10);
1091 write_ref_target(buf, target);
1092 }
1093 MetadataValue::References(targets) => {
1094 buf.push(11);
1095 buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
1096 for target in targets {
1097 write_ref_target(buf, target);
1098 }
1099 }
1100 }
1101}
1102
1103fn read_exact_slice<'a>(
1104 data: &'a [u8],
1105 pos: &mut usize,
1106 len: usize,
1107) -> Result<&'a [u8], StoreError> {
1108 if *pos + len > data.len() {
1109 return Err(StoreError::Serialization(
1110 "truncated metadata payload".to_string(),
1111 ));
1112 }
1113 let slice = &data[*pos..*pos + len];
1114 *pos += len;
1115 Ok(slice)
1116}
1117
1118fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
1119 let bytes = read_exact_slice(data, pos, 4)?;
1120 let mut raw = [0u8; 4];
1121 raw.copy_from_slice(bytes);
1122 Ok(u32::from_le_bytes(raw))
1123}
1124
1125fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
1126 let bytes = read_exact_slice(data, pos, 8)?;
1127 let mut raw = [0u8; 8];
1128 raw.copy_from_slice(bytes);
1129 Ok(u64::from_le_bytes(raw))
1130}
1131
1132fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
1133 let bytes = read_exact_slice(data, pos, 8)?;
1134 let mut raw = [0u8; 8];
1135 raw.copy_from_slice(bytes);
1136 Ok(i64::from_le_bytes(raw))
1137}
1138
1139fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
1140 let bytes = read_exact_slice(data, pos, 8)?;
1141 let mut raw = [0u8; 8];
1142 raw.copy_from_slice(bytes);
1143 Ok(f64::from_le_bytes(raw))
1144}
1145
1146fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
1147 let bytes = read_exact_slice(data, pos, 1)?;
1148 Ok(bytes[0])
1149}
1150
1151fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
1152 let len = read_u32(data, pos)? as usize;
1153 let bytes = read_exact_slice(data, pos, len)?;
1154 String::from_utf8(bytes.to_vec()).map_err(|err| StoreError::Serialization(err.to_string()))
1155}
1156
1157fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
1158 let len = read_u32(data, pos)? as usize;
1159 Ok(read_exact_slice(data, pos, len)?.to_vec())
1160}
1161
1162fn read_ref_target(
1163 data: &[u8],
1164 pos: &mut usize,
1165) -> Result<crate::storage::unified::RefTarget, StoreError> {
1166 use crate::storage::unified::RefTarget;
1167
1168 match read_u8(data, pos)? {
1169 0 => Ok(RefTarget::TableRow {
1170 table: read_string(data, pos)?,
1171 row_id: read_u64(data, pos)?,
1172 }),
1173 1 => Ok(RefTarget::Node {
1174 collection: read_string(data, pos)?,
1175 node_id: EntityId::new(read_u64(data, pos)?),
1176 }),
1177 2 => Ok(RefTarget::Edge {
1178 collection: read_string(data, pos)?,
1179 edge_id: EntityId::new(read_u64(data, pos)?),
1180 }),
1181 3 => Ok(RefTarget::Vector {
1182 collection: read_string(data, pos)?,
1183 vector_id: EntityId::new(read_u64(data, pos)?),
1184 }),
1185 4 => Ok(RefTarget::Entity {
1186 collection: read_string(data, pos)?,
1187 entity_id: EntityId::new(read_u64(data, pos)?),
1188 }),
1189 tag => Err(StoreError::Serialization(format!(
1190 "unknown metadata ref target tag {tag}"
1191 ))),
1192 }
1193}
1194
1195fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1196 match read_u8(data, pos)? {
1197 0 => Ok(MetadataValue::Null),
1198 1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1199 2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1200 3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1201 4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1202 5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1203 6 => {
1204 let count = read_u32(data, pos)? as usize;
1205 let mut values = Vec::with_capacity(count);
1206 for _ in 0..count {
1207 values.push(read_metadata_value(data, pos)?);
1208 }
1209 Ok(MetadataValue::Array(values))
1210 }
1211 7 => {
1212 let count = read_u32(data, pos)? as usize;
1213 let mut values = std::collections::HashMap::with_capacity(count);
1214 for _ in 0..count {
1215 let key = read_string(data, pos)?;
1216 let value = read_metadata_value(data, pos)?;
1217 values.insert(key, value);
1218 }
1219 Ok(MetadataValue::Object(values))
1220 }
1221 8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1222 9 => Ok(MetadataValue::Geo {
1223 lat: read_f64(data, pos)?,
1224 lon: read_f64(data, pos)?,
1225 }),
1226 10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1227 11 => {
1228 let count = read_u32(data, pos)? as usize;
1229 let mut targets = Vec::with_capacity(count);
1230 for _ in 0..count {
1231 targets.push(read_ref_target(data, pos)?);
1232 }
1233 Ok(MetadataValue::References(targets))
1234 }
1235 tag => Err(StoreError::Serialization(format!(
1236 "unknown metadata value tag {tag}"
1237 ))),
1238 }
1239}