1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
/// Bytes of payload space on one page once the engine page header is skipped.
const META_PAGE_CONTENT_CAP: usize =
    crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE;
/// Size of the metadata-overflow header written at the start of page 1's
/// content area when the metadata payload does not fit on page 1 alone.
const META_V3_PAGE1_HEADER: usize = reddb_file::METADATA_OVERFLOW_HEADER_BYTES;
/// Size of the continuation header written at the start of each overflow
/// page's content area (next page id + chunk length).
const META_V3_OVERFLOW_HEADER: usize = reddb_file::METADATA_OVERFLOW_CONTINUATION_HEADER_BYTES;
/// Metadata payload bytes that fit on page 1 behind its overflow header.
const META_V3_FIRST_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_PAGE1_HEADER;
/// Metadata payload bytes that fit on each overflow page behind its
/// continuation header.
const META_V3_OVERFLOW_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_OVERFLOW_HEADER;
29
30fn free_existing_overflow_chain(pager: &Pager) -> Result<(), PagerError> {
31 let cs = crate::storage::engine::HEADER_SIZE;
32 let page = match pager.read_page(1) {
33 Ok(p) => p,
34 Err(_) => return Ok(()),
35 };
36 let bytes = page.as_bytes();
37 if bytes.len() < cs + META_V3_PAGE1_HEADER {
38 return Ok(());
39 }
40 let Some(header) =
41 reddb_file::decode_native_metadata_overflow_header(&bytes[cs..]).map_err(|err| {
42 PagerError::InvalidDatabase(format!("invalid metadata overflow header: {err}"))
43 })?
44 else {
45 return Ok(());
46 };
47 let mut next = header.next_overflow_page_id;
48 while next != 0 {
49 let ov = match pager.read_page(next) {
50 Ok(p) => p,
51 Err(_) => break,
52 };
53 let ob = ov.as_bytes();
54 let nn = match reddb_file::decode_native_metadata_overflow_continuation_header(&ob[cs..]) {
55 Ok(header) => header.next_overflow_page_id,
56 Err(_) => 0,
57 };
58 let _ = pager.free_page(next);
59 next = nn;
60 }
61 Ok(())
62}
63
64fn build_meta_page1_with_overflow(
65 pager: &Pager,
66 meta_data: &[u8],
67) -> Result<crate::storage::engine::Page, PagerError> {
68 use crate::storage::engine::{Page, PageType, HEADER_SIZE};
69 free_existing_overflow_chain(pager)?;
70
71 let mut page1 = Page::new(PageType::Header, 1);
72 let cs = HEADER_SIZE;
73
74 if meta_data.len() <= META_PAGE_CONTENT_CAP {
75 let buf = page1.as_bytes_mut();
77 buf[cs..cs + meta_data.len()].copy_from_slice(meta_data);
78 return Ok(page1);
79 }
80
81 let first_chunk = &meta_data[..META_V3_FIRST_PAYLOAD_CAP];
85 let mut tail = &meta_data[META_V3_FIRST_PAYLOAD_CAP..];
86 let mut chunks: Vec<&[u8]> = Vec::new();
87 while !tail.is_empty() {
88 let take = tail.len().min(META_V3_OVERFLOW_PAYLOAD_CAP);
89 chunks.push(&tail[..take]);
90 tail = &tail[take..];
91 }
92
93 let mut overflow_pages: Vec<Page> = Vec::with_capacity(chunks.len());
94 let mut overflow_ids: Vec<u32> = Vec::with_capacity(chunks.len());
95 for _ in 0..chunks.len() {
96 let pg = pager.allocate_page(PageType::Overflow)?;
97 overflow_ids.push(pg.page_id());
98 overflow_pages.push(pg);
99 }
100
101 for i in 0..chunks.len() {
102 let next = if i + 1 < chunks.len() {
103 overflow_ids[i + 1]
104 } else {
105 0u32
106 };
107 let len = chunks[i].len() as u32;
108 let buf = overflow_pages[i].as_bytes_mut();
109 reddb_file::encode_native_metadata_overflow_continuation_header(
110 &mut buf[cs..cs + META_V3_OVERFLOW_HEADER],
111 reddb_file::NativeMetadataOverflowContinuationHeader {
112 next_overflow_page_id: next,
113 chunk_bytes: len,
114 },
115 )
116 .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
117 buf[cs + 8..cs + 8 + chunks[i].len()].copy_from_slice(chunks[i]);
118 }
119 for (idx, page) in overflow_pages.into_iter().enumerate() {
120 let id = overflow_ids[idx];
121 pager.write_page(id, page)?;
122 }
123
124 let format_version = reddb_file::decode_native_paged_metadata_header(meta_data)
126 .ok()
127 .flatten()
128 .map_or(0, |header| header.format_version);
129
130 let buf = page1.as_bytes_mut();
131 reddb_file::encode_native_metadata_overflow_header(
132 &mut buf[cs..cs + META_V3_PAGE1_HEADER],
133 reddb_file::NativeMetadataOverflowHeader {
134 format_version,
135 total_payload_bytes: meta_data.len() as u32,
136 next_overflow_page_id: overflow_ids[0],
137 },
138 )
139 .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
140 buf[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_chunk.len()]
141 .copy_from_slice(first_chunk);
142
143 Ok(page1)
144}
145
146fn read_meta_payload(pager: &Pager) -> Option<Vec<u8>> {
152 let cs = crate::storage::engine::HEADER_SIZE;
153 let meta_page = pager
154 .read_page(1)
155 .or_else(|_| pager.recover_meta_from_shadow())
156 .ok()?;
157 let bytes = meta_page.as_bytes();
158 if bytes.len() < cs + 4 {
159 return Some(bytes.get(cs..).unwrap_or(&[]).to_vec());
160 }
161 let header = match reddb_file::decode_native_metadata_overflow_header(&bytes[cs..]).ok()? {
162 Some(header) => header,
163 None => {
164 return Some(bytes[cs..].to_vec());
165 }
166 };
167 if bytes.len() < cs + META_V3_PAGE1_HEADER {
168 return None;
169 }
170 let total = header.total_payload_bytes as usize;
171 let mut next = header.next_overflow_page_id;
172 let mut payload: Vec<u8> = Vec::with_capacity(total);
173 let first_take = total.min(META_V3_FIRST_PAYLOAD_CAP);
174 payload.extend_from_slice(
175 &bytes[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_take],
176 );
177 while next != 0 && payload.len() < total {
178 let ov = pager.read_page(next).ok()?;
179 let ob = ov.as_bytes();
180 if ob.len() < cs + META_V3_OVERFLOW_HEADER {
181 return None;
182 }
183 let continuation =
184 reddb_file::decode_native_metadata_overflow_continuation_header(&ob[cs..]).ok()?;
185 let nn = continuation.next_overflow_page_id;
186 let len = continuation.chunk_bytes as usize;
187 let remaining = total - payload.len();
188 let take = len.min(remaining).min(META_V3_OVERFLOW_PAYLOAD_CAP);
189 payload.extend_from_slice(
190 &ob[cs + META_V3_OVERFLOW_HEADER..cs + META_V3_OVERFLOW_HEADER + take],
191 );
192 next = nn;
193 }
194 Some(payload)
195}
196
197impl UnifiedStore {
198 pub(crate) fn mark_paged_registry_dirty(&self) {
199 self.paged_registry_dirty.store(true, Ordering::Release);
200 }
201
202 pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
208 let pager = self.pager.as_ref()?;
209 if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
210 return Some(btree);
211 }
212 let mut write = self.btree_indices.write();
213 let btree = write
214 .entry(collection.to_string())
215 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
216 .clone();
217 Some(btree)
218 }
219
220 pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
221 let Some(pager) = &self.pager else {
222 return Ok(());
223 };
224
225 if self.paged_registry_dirty.load(Ordering::Acquire) {
226 self.flush_paged_registry()?;
227 self.paged_registry_dirty.store(false, Ordering::Release);
228 return Ok(());
229 }
230
231 pager
232 .flush()
233 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
234 }
235
236 pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
237 let Some(pager) = &self.pager else {
238 return Ok(());
239 };
240
241 match pager.read_page(1) {
242 Ok(_) => {}
243 Err(PagerError::PageNotFound(_)) => {
244 let meta_page = pager
245 .allocate_page(crate::storage::engine::PageType::Header)
246 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
247 pager
248 .write_page(meta_page.page_id(), meta_page)
249 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
250 }
251 Err(e) => {
252 return Err(StoreError::Io(std::io::Error::other(e.to_string())));
253 }
254 }
255
256 let format_version = STORE_VERSION_V9;
257 self.set_format_version(format_version);
258
259 let collections = self.collections.read();
260 let btree_indices = self.btree_indices.read();
261 let mut collection_roots = Vec::with_capacity(collections.len());
262 for (name, _) in collections.iter() {
263 let root_page = btree_indices
264 .get(name)
265 .map_or(0, |btree| btree.root_page_id());
266 collection_roots.push((name.clone(), root_page));
267 }
268 drop(btree_indices);
269 drop(collections);
270
271 let mut meta_data = Vec::with_capacity(4096);
272 reddb_file::encode_native_paged_metadata_header(
273 &mut meta_data,
274 reddb_file::NativePagedMetadataHeader {
275 format_version,
276 collection_count: collection_roots.len() as u32,
277 },
278 );
279 for (name, root_page) in &collection_roots {
280 reddb_file::encode_native_paged_collection_root(&mut meta_data, name, *root_page);
281 }
282
283 let cross_refs = self.cross_refs.read();
284 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
285 meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
286 for (source_id, refs) in cross_refs.iter() {
287 for (target_id, ref_type, collection) in refs {
288 reddb_file::encode_native_paged_cross_ref(
289 &mut meta_data,
290 source_id.raw(),
291 target_id.raw(),
292 ref_type.to_byte(),
293 collection,
294 );
295 }
296 }
297
298 let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
299 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
300
301 pager
302 .write_meta_shadow(&meta_page)
303 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
304 pager
305 .write_page(1, meta_page)
306 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
307 pager
308 .flush()
309 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
310
311 Ok(())
312 }
313
314 pub fn pager(&self) -> Option<&Arc<Pager>> {
316 self.pager.as_ref()
317 }
318
319 pub fn config(&self) -> &UnifiedStoreConfig {
323 &self.config
324 }
325
326 pub fn with_config(config: UnifiedStoreConfig) -> Self {
327 Self {
328 config,
329 format_version: AtomicU32::new(STORE_VERSION_V9),
330 next_entity_id: AtomicU64::new(1),
331 collections: RwLock::new(HashMap::new()),
332 cross_refs: RwLock::new(HashMap::new()),
333 reverse_refs: RwLock::new(HashMap::new()),
334 pager: None,
335 db_path: None,
336 btree_indices: RwLock::new(HashMap::new()),
337 context_index: ContextIndex::new(),
338 entity_cache: EntityCache::new(),
339 graph_label_index: RwLock::new(HashMap::new()),
340 paged_registry_dirty: AtomicBool::new(false),
341 commit: None,
342 unindex_cross_refs_fast_path: AtomicU64::new(0),
343 replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
344 aux_metadata: RwLock::new(Vec::new()),
345 }
346 }
347
348 pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
366 Self::open_with_config(path, UnifiedStoreConfig::default())
367 }
368
369 pub fn open_with_config(
370 path: impl AsRef<Path>,
371 config: UnifiedStoreConfig,
372 ) -> Result<Self, StoreError> {
373 let path = path.as_ref();
374 let mut pager_config = PagerConfig::default();
375 if matches!(
383 std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
384 Some("0") | Some("false") | Some("off")
385 ) {
386 pager_config.double_write = false;
387 }
388 let pager = Pager::open(path, pager_config)
389 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
390
391 let wal_path = reddb_file::layout::unified_wal_path(path);
392 let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
393 Some(Arc::new(
394 StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
395 .map_err(StoreError::Io)?,
396 ))
397 } else {
398 None
399 };
400
401 let store = Self {
402 config,
403 format_version: AtomicU32::new(STORE_VERSION_V9),
404 next_entity_id: AtomicU64::new(1),
405 collections: RwLock::new(HashMap::new()),
406 cross_refs: RwLock::new(HashMap::new()),
407 reverse_refs: RwLock::new(HashMap::new()),
408 pager: Some(Arc::new(pager)),
409 db_path: Some(path.to_path_buf()),
410 btree_indices: RwLock::new(HashMap::new()),
411 context_index: ContextIndex::new(),
412 entity_cache: EntityCache::new(),
413 graph_label_index: RwLock::new(HashMap::new()),
414 paged_registry_dirty: AtomicBool::new(false),
415 commit,
416 unindex_cross_refs_fast_path: AtomicU64::new(0),
417 replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
418 aux_metadata: RwLock::new(Vec::new()),
419 };
420
421 store.load_from_pages()?;
423 if let Some(commit) = &store.commit {
424 commit.replay_into(&store).map_err(StoreError::Io)?;
425 }
426 store.recover_operational_manifest()?;
427
428 Ok(store)
429 }
430
431 pub(crate) fn recover_operational_manifest(&self) -> Result<(), StoreError> {
432 let Some(path) = &self.db_path else {
433 return Ok(());
434 };
435 let mut collections = self.list_collections();
436 collections.sort();
437 let pending_drops =
438 crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
439 .recover_or_bootstrap(&collections)
440 .map_err(StoreError::Io)?;
441 for name in pending_drops {
442 if self.get_collection(&name).is_some() {
443 self.drop_collection(&name)?;
444 }
445 }
446 Ok(())
447 }
448
449 pub(crate) fn publish_operational_collection_create(
450 &self,
451 name: &str,
452 ) -> Result<(), StoreError> {
453 let Some(path) = &self.db_path else {
454 return Ok(());
455 };
456 crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
457 .create_collection(name)
458 .map_err(StoreError::Io)
459 }
460
461 pub(crate) fn publish_operational_collection_pending_drop(
462 &self,
463 name: &str,
464 ) -> Result<(), StoreError> {
465 let Some(path) = &self.db_path else {
466 return Ok(());
467 };
468 crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
469 .begin_drop_collection(name)
470 .map_err(StoreError::Io)
471 }
472
473 pub(crate) fn publish_operational_collection_drop_finished(
474 &self,
475 name: &str,
476 ) -> Result<(), StoreError> {
477 let Some(path) = &self.db_path else {
478 return Ok(());
479 };
480 crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
481 .finish_drop_collection(name)
482 .map_err(StoreError::Io)
483 }
484
485 fn load_from_pages(&self) -> Result<(), StoreError> {
489 let pager = match &self.pager {
490 Some(p) => p,
491 None => return Ok(()), };
493
494 let page_count = pager.page_count().map_err(|e| {
496 StoreError::Io(std::io::Error::other(format!(
497 "failed to read page count: {}",
498 e
499 )))
500 })?;
501 if page_count <= 1 {
502 return Ok(());
504 }
505
506 if let Some(content_vec) = read_meta_payload(pager) {
511 let content: &[u8] = &content_vec;
512 if content.len() >= 4 {
513 let mut pos = 0;
514 let mut format_version = STORE_VERSION_V1;
515
516 let collection_count = if let Some(header) =
517 reddb_file::decode_native_paged_metadata_header(content)
518 .map_err(|err| StoreError::Serialization(err.to_string()))?
519 {
520 format_version = header.format_version;
521 pos += reddb_file::METADATA_HEADER_BYTES;
522 header.collection_count as usize
523 } else {
524 let count = u32::from_le_bytes([
525 content[pos],
526 content[pos + 1],
527 content[pos + 2],
528 content[pos + 3],
529 ]) as usize;
530 pos += 4;
531 count
532 };
533
534 self.set_format_version(format_version);
535
536 if pos > content.len() {
537 return Ok(());
538 }
539
540 for _ in 0..collection_count {
542 if let Ok(root) =
543 reddb_file::decode_native_paged_collection_root(content, &mut pos)
544 {
545 let root_page = root.root_page;
547 let name = root.collection;
548
549 let _ = self.create_collection_in_memory(&name);
553
554 if root_page > 0 {
556 let btree = BTree::with_root(Arc::clone(pager), root_page);
557
558 if let Ok(mut cursor) = btree.cursor_first() {
560 let manager = self.get_collection(&name);
561 while let Ok(Some((key, value))) = cursor.next() {
562 if let Ok((entity, metadata)) = Self::deserialize_entity_record(
564 &value,
565 self.format_version(),
566 ) {
567 if let Some(m) = &manager {
568 let id = entity.id;
569 if let EntityKind::TableRow { row_id, .. } =
570 &entity.kind
571 {
572 m.register_row_id(*row_id);
573 }
574 self.context_index.index_entity(&name, &entity);
575 let _ = m.insert(entity.clone());
576 if let Some(metadata) = metadata {
577 let _ = m.set_metadata(id, metadata);
578 }
579 self.register_entity_id(id);
580 if self.config.auto_index_refs {
581 self.index_cross_refs(&entity, &name)?;
582 }
583 }
584 }
585 }
586 }
587
588 self.btree_indices.write().insert(name, Arc::new(btree));
590 }
591 } else {
592 break;
593 }
594 }
595
596 if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
597 let cross_ref_count = u32::from_le_bytes([
598 content[pos],
599 content[pos + 1],
600 content[pos + 2],
601 content[pos + 3],
602 ]) as usize;
603 pos += 4;
604
605 for _ in 0..cross_ref_count {
606 let Ok(cross_ref) =
607 reddb_file::decode_native_paged_cross_ref(content, &mut pos)
608 else {
609 break;
610 };
611 let source_id = EntityId::new(cross_ref.source_id);
612 let target_id = EntityId::new(cross_ref.target_id);
613 let ref_type = RefType::from_byte(cross_ref.ref_type);
614 let target_collection = cross_ref.target_collection;
615
616 self.cross_refs.write().entry(source_id).or_default().push((
617 target_id,
618 ref_type,
619 target_collection.clone(),
620 ));
621
622 if let Some((collection, mut entity)) = self.get_any(source_id) {
623 let exists = entity.cross_refs().iter().any(|xref| {
624 xref.target == target_id
625 && xref.ref_type == ref_type
626 && xref.target_collection == target_collection
627 });
628 if !exists {
629 entity.cross_refs_mut().push(CrossRef::new(
630 source_id,
631 target_id,
632 target_collection.clone(),
633 ref_type,
634 ));
635 if let Some(manager) = self.get_collection(&collection) {
636 let _ = manager.update(entity);
637 }
638 }
639 }
640 }
641 }
642 }
643 }
644
645 if self.format_version() < STORE_VERSION_V9 {
646 self.set_format_version(STORE_VERSION_V9);
647 }
648
649 Ok(())
650 }
651
652 pub(crate) fn deserialize_entity(
654 data: &[u8],
655 format_version: u32,
656 ) -> Result<UnifiedEntity, StoreError> {
657 let mut pos = 0;
658 Self::read_entity_binary(data, &mut pos, format_version)
659 .map_err(|e| StoreError::Serialization(e.to_string()))
660 }
661
662 pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
664 let mut buf = Vec::with_capacity(256);
669 Self::write_entity_binary(&mut buf, entity, format_version);
670 buf
671 }
672
673 pub(crate) fn serialize_entity_record(
674 entity: &UnifiedEntity,
675 metadata: Option<&Metadata>,
676 format_version: u32,
677 ) -> Vec<u8> {
678 let entity_bytes = Self::serialize_entity(entity, format_version);
679 let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
685 if has_meta {
686 let metadata_bytes = serialize_metadata(metadata);
687 reddb_file::encode_native_entity_record_frame(&entity_bytes, Some(&metadata_bytes))
688 } else {
689 reddb_file::encode_native_entity_record_frame(&entity_bytes, None)
690 }
691 }
692
693 pub(crate) fn deserialize_entity_record(
694 data: &[u8],
695 format_version: u32,
696 ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
697 let Some(frame) = reddb_file::decode_native_entity_record_frame(data)
698 .map_err(|err| StoreError::Serialization(err.to_string()))?
699 else {
700 return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
701 };
702
703 let entity = Self::deserialize_entity(frame.entity, format_version)?;
704 let metadata = if frame.metadata.is_empty() {
705 None
706 } else {
707 let metadata = deserialize_metadata(frame.metadata)?;
708 if metadata.is_empty() {
709 None
710 } else {
711 Some(metadata)
712 }
713 };
714
715 Ok((entity, metadata))
716 }
717
718 pub fn persist(&self) -> Result<(), StoreError> {
723 let pager = match &self.pager {
724 Some(p) => p,
725 None => {
726 if let Some(path) = &self.db_path {
728 return self
729 .save_to_file(path)
730 .map_err(|e| StoreError::Serialization(e.to_string()));
731 }
732 return Err(StoreError::Io(std::io::Error::other(
733 "No pager or path configured for persistence",
734 )));
735 }
736 };
737
738 match pager.read_page(1) {
739 Ok(_) => {}
740 Err(PagerError::PageNotFound(_)) => {
741 let meta_page = pager
742 .allocate_page(crate::storage::engine::PageType::Header)
743 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
744 pager
745 .write_page(meta_page.page_id(), meta_page)
746 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
747 }
748 Err(e) => {
749 return Err(StoreError::Io(std::io::Error::other(e.to_string())));
750 }
751 }
752
753 if let Some(commit) = &self.commit {
754 commit.force_sync().map_err(StoreError::Io)?;
755 }
756
757 let collections = self.collections.read();
758 let mut btree_indices = self.btree_indices.write();
759
760 let mut collection_roots: Vec<(String, u32)> = Vec::new();
762
763 for (name, manager) in collections.iter() {
766 let btree = btree_indices
767 .entry(name.clone())
768 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
769
770 let mut existing_keys = Vec::new();
771 if !btree.is_empty() {
772 let mut cursor = btree.cursor_first().map_err(|e| {
773 StoreError::Io(std::io::Error::other(format!(
774 "B-tree cursor error while rebuilding '{name}': {e}"
775 )))
776 })?;
777 while let Some((key, _)) = cursor.next().map_err(|e| {
778 StoreError::Io(std::io::Error::other(format!(
779 "B-tree scan error while rebuilding '{name}': {e}"
780 )))
781 })? {
782 existing_keys.push(key);
783 }
784 }
785
786 for key in existing_keys {
787 btree.delete(&key).map_err(|e| {
788 StoreError::Io(std::io::Error::other(format!(
789 "B-tree delete error while rebuilding '{name}': {e}"
790 )))
791 })?;
792 }
793
794 let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
795 .query_all(|_| true)
796 .into_iter()
797 .map(|entity| {
798 let metadata = manager.get_metadata(entity.id);
799 (
800 entity.id.raw().to_be_bytes().to_vec(),
801 Self::serialize_entity_record(
802 &entity,
803 metadata.as_ref(),
804 self.format_version(),
805 ),
806 )
807 })
808 .collect();
809 records.sort_by(|left, right| left.0.cmp(&right.0));
810
811 if !records.is_empty() {
819 btree.bulk_insert_sorted(&records).map_err(|e| {
820 StoreError::Io(std::io::Error::other(format!(
821 "B-tree bulk rebuild error for '{name}': {e}"
822 )))
823 })?;
824 }
825
826 collection_roots.push((name.clone(), btree.root_page_id()));
827 }
828
829 let mut meta_data = Vec::with_capacity(4096);
831
832 let format_version = STORE_VERSION_V9;
833 self.set_format_version(format_version);
834
835 reddb_file::encode_native_paged_metadata_header(
836 &mut meta_data,
837 reddb_file::NativePagedMetadataHeader {
838 format_version,
839 collection_count: collection_roots.len() as u32,
840 },
841 );
842
843 for (name, root_page) in &collection_roots {
845 reddb_file::encode_native_paged_collection_root(&mut meta_data, name, *root_page);
846 }
847
848 let cross_refs = self.cross_refs.read();
850 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
851 meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
852 for (source_id, refs) in cross_refs.iter() {
853 for (target_id, ref_type, collection) in refs {
854 reddb_file::encode_native_paged_cross_ref(
855 &mut meta_data,
856 source_id.raw(),
857 target_id.raw(),
858 ref_type.to_byte(),
859 collection,
860 );
861 }
862 }
863
864 let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
866 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
867
868 pager
871 .write_meta_shadow(&meta_page)
872 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
873
874 pager
876 .write_page(1, meta_page)
877 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
878
879 pager
881 .sync()
882 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
883
884 if let Some(commit) = &self.commit {
885 commit.truncate().map_err(StoreError::Io)?;
886 }
887
888 Ok(())
889 }
890
891 pub fn is_paged(&self) -> bool {
893 self.pager.is_some()
894 }
895
896 pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
899 self.btree_indices
900 .read()
901 .get(collection)
902 .map(|btree| btree.root_page_id())
903 .filter(|root| *root != 0)
904 }
905
906 pub fn db_path(&self) -> Option<&Path> {
908 self.db_path.as_deref()
909 }
910}
911
912fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
913 let Some(metadata) = metadata else {
914 return Vec::new();
915 };
916 if metadata.is_empty() {
917 return Vec::new();
918 }
919
920 let mut entries: Vec<_> = metadata.iter().collect();
921 entries.sort_by_key(|(a, _)| *a);
922
923 let mut buf = Vec::new();
924 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
925 for (key, value) in entries {
926 write_string(&mut buf, key);
927 write_metadata_value(&mut buf, value);
928 }
929 buf
930}
931
932fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
933 let mut pos = 0usize;
934 let count = read_u32(data, &mut pos)? as usize;
935 let mut metadata = Metadata::new();
936 for _ in 0..count {
937 let key = read_string(data, &mut pos)?;
938 let value = read_metadata_value(data, &mut pos)?;
939 metadata.set(key, value);
940 }
941 Ok(metadata)
942}
943
944fn write_string(buf: &mut Vec<u8>, value: &str) {
945 reddb_file::encode_native_len_prefixed_str(buf, value);
946}
947
948fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
949 reddb_file::encode_native_len_prefixed_bytes(buf, value);
950}
951
952fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
953 use crate::storage::unified::RefTarget;
954
955 match target {
956 RefTarget::TableRow { table, row_id } => {
957 buf.push(0);
958 write_string(buf, table);
959 buf.extend_from_slice(&row_id.to_le_bytes());
960 }
961 RefTarget::Node {
962 collection,
963 node_id,
964 } => {
965 buf.push(1);
966 write_string(buf, collection);
967 buf.extend_from_slice(&node_id.raw().to_le_bytes());
968 }
969 RefTarget::Edge {
970 collection,
971 edge_id,
972 } => {
973 buf.push(2);
974 write_string(buf, collection);
975 buf.extend_from_slice(&edge_id.raw().to_le_bytes());
976 }
977 RefTarget::Vector {
978 collection,
979 vector_id,
980 } => {
981 buf.push(3);
982 write_string(buf, collection);
983 buf.extend_from_slice(&vector_id.raw().to_le_bytes());
984 }
985 RefTarget::Entity {
986 collection,
987 entity_id,
988 } => {
989 buf.push(4);
990 write_string(buf, collection);
991 buf.extend_from_slice(&entity_id.raw().to_le_bytes());
992 }
993 }
994}
995
996fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
997 match value {
998 MetadataValue::Null => buf.push(0),
999 MetadataValue::Bool(v) => {
1000 buf.push(1);
1001 buf.push(u8::from(*v));
1002 }
1003 MetadataValue::Int(v) => {
1004 buf.push(2);
1005 buf.extend_from_slice(&v.to_le_bytes());
1006 }
1007 MetadataValue::Float(v) => {
1008 buf.push(3);
1009 buf.extend_from_slice(&v.to_le_bytes());
1010 }
1011 MetadataValue::String(v) => {
1012 buf.push(4);
1013 write_string(buf, v);
1014 }
1015 MetadataValue::Bytes(v) => {
1016 buf.push(5);
1017 write_bytes(buf, v);
1018 }
1019 MetadataValue::Array(values) => {
1020 buf.push(6);
1021 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1022 for value in values {
1023 write_metadata_value(buf, value);
1024 }
1025 }
1026 MetadataValue::Object(values) => {
1027 buf.push(7);
1028 let mut entries: Vec<_> = values.iter().collect();
1029 entries.sort_by_key(|(a, _)| *a);
1030 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
1031 for (key, value) in entries {
1032 write_string(buf, key);
1033 write_metadata_value(buf, value);
1034 }
1035 }
1036 MetadataValue::Timestamp(v) => {
1037 buf.push(8);
1038 buf.extend_from_slice(&v.to_le_bytes());
1039 }
1040 MetadataValue::Geo { lat, lon } => {
1041 buf.push(9);
1042 buf.extend_from_slice(&lat.to_le_bytes());
1043 buf.extend_from_slice(&lon.to_le_bytes());
1044 }
1045 MetadataValue::Reference(target) => {
1046 buf.push(10);
1047 write_ref_target(buf, target);
1048 }
1049 MetadataValue::References(targets) => {
1050 buf.push(11);
1051 buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
1052 for target in targets {
1053 write_ref_target(buf, target);
1054 }
1055 }
1056 }
1057}
1058
1059fn read_exact_slice<'a>(
1060 data: &'a [u8],
1061 pos: &mut usize,
1062 len: usize,
1063) -> Result<&'a [u8], StoreError> {
1064 if *pos + len > data.len() {
1065 return Err(StoreError::Serialization(
1066 "truncated metadata payload".to_string(),
1067 ));
1068 }
1069 let slice = &data[*pos..*pos + len];
1070 *pos += len;
1071 Ok(slice)
1072}
1073
1074fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
1075 let bytes = read_exact_slice(data, pos, 4)?;
1076 let mut raw = [0u8; 4];
1077 raw.copy_from_slice(bytes);
1078 Ok(u32::from_le_bytes(raw))
1079}
1080
1081fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
1082 let bytes = read_exact_slice(data, pos, 8)?;
1083 let mut raw = [0u8; 8];
1084 raw.copy_from_slice(bytes);
1085 Ok(u64::from_le_bytes(raw))
1086}
1087
1088fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
1089 let bytes = read_exact_slice(data, pos, 8)?;
1090 let mut raw = [0u8; 8];
1091 raw.copy_from_slice(bytes);
1092 Ok(i64::from_le_bytes(raw))
1093}
1094
1095fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
1096 let bytes = read_exact_slice(data, pos, 8)?;
1097 let mut raw = [0u8; 8];
1098 raw.copy_from_slice(bytes);
1099 Ok(f64::from_le_bytes(raw))
1100}
1101
1102fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
1103 let bytes = read_exact_slice(data, pos, 1)?;
1104 Ok(bytes[0])
1105}
1106
1107fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
1108 reddb_file::decode_native_len_prefixed_string(data, pos)
1109 .map_err(|err| StoreError::Serialization(err.to_string()))
1110}
1111
1112fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
1113 reddb_file::decode_native_len_prefixed_bytes(data, pos)
1114 .map(|bytes| bytes.to_vec())
1115 .map_err(|err| StoreError::Serialization(err.to_string()))
1116}
1117
1118fn read_ref_target(
1119 data: &[u8],
1120 pos: &mut usize,
1121) -> Result<crate::storage::unified::RefTarget, StoreError> {
1122 use crate::storage::unified::RefTarget;
1123
1124 match read_u8(data, pos)? {
1125 0 => Ok(RefTarget::TableRow {
1126 table: read_string(data, pos)?,
1127 row_id: read_u64(data, pos)?,
1128 }),
1129 1 => Ok(RefTarget::Node {
1130 collection: read_string(data, pos)?,
1131 node_id: EntityId::new(read_u64(data, pos)?),
1132 }),
1133 2 => Ok(RefTarget::Edge {
1134 collection: read_string(data, pos)?,
1135 edge_id: EntityId::new(read_u64(data, pos)?),
1136 }),
1137 3 => Ok(RefTarget::Vector {
1138 collection: read_string(data, pos)?,
1139 vector_id: EntityId::new(read_u64(data, pos)?),
1140 }),
1141 4 => Ok(RefTarget::Entity {
1142 collection: read_string(data, pos)?,
1143 entity_id: EntityId::new(read_u64(data, pos)?),
1144 }),
1145 tag => Err(StoreError::Serialization(format!(
1146 "unknown metadata ref target tag {tag}"
1147 ))),
1148 }
1149}
1150
1151fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1152 match read_u8(data, pos)? {
1153 0 => Ok(MetadataValue::Null),
1154 1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1155 2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1156 3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1157 4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1158 5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1159 6 => {
1160 let count = read_u32(data, pos)? as usize;
1161 let mut values = Vec::with_capacity(count);
1162 for _ in 0..count {
1163 values.push(read_metadata_value(data, pos)?);
1164 }
1165 Ok(MetadataValue::Array(values))
1166 }
1167 7 => {
1168 let count = read_u32(data, pos)? as usize;
1169 let mut values = std::collections::HashMap::with_capacity(count);
1170 for _ in 0..count {
1171 let key = read_string(data, pos)?;
1172 let value = read_metadata_value(data, pos)?;
1173 values.insert(key, value);
1174 }
1175 Ok(MetadataValue::Object(values))
1176 }
1177 8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1178 9 => Ok(MetadataValue::Geo {
1179 lat: read_f64(data, pos)?,
1180 lon: read_f64(data, pos)?,
1181 }),
1182 10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1183 11 => {
1184 let count = read_u32(data, pos)? as usize;
1185 let mut targets = Vec::with_capacity(count);
1186 for _ in 0..count {
1187 targets.push(read_ref_target(data, pos)?);
1188 }
1189 Ok(MetadataValue::References(targets))
1190 }
1191 tag => Err(StoreError::Serialization(format!(
1192 "unknown metadata value tag {tag}"
1193 ))),
1194 }
1195}