1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
/// Bytes of a page available for content once the engine page header is subtracted.
const META_PAGE_CONTENT_CAP: usize =
    crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE;
/// Size of the overflow header placed at the start of page 1's content area.
const META_V3_PAGE1_HEADER: usize = reddb_file::METADATA_OVERFLOW_HEADER_BYTES;
/// Size of the continuation header placed at the start of each overflow page's content area.
const META_V3_OVERFLOW_HEADER: usize = reddb_file::METADATA_OVERFLOW_CONTINUATION_HEADER_BYTES;
/// Payload bytes that fit on page 1 after its overflow header.
const META_V3_FIRST_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_PAGE1_HEADER;
/// Payload bytes that fit on one overflow page after its continuation header.
const META_V3_OVERFLOW_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_OVERFLOW_HEADER;
29
30fn free_existing_overflow_chain(pager: &Pager) -> Result<(), PagerError> {
31 let cs = crate::storage::engine::HEADER_SIZE;
32 let page = match pager.read_page(1) {
33 Ok(p) => p,
34 Err(_) => return Ok(()),
35 };
36 let bytes = page.as_bytes();
37 if bytes.len() < cs + META_V3_PAGE1_HEADER {
38 return Ok(());
39 }
40 let Some(header) =
41 reddb_file::decode_native_metadata_overflow_header(&bytes[cs..]).map_err(|err| {
42 PagerError::InvalidDatabase(format!("invalid metadata overflow header: {err}"))
43 })?
44 else {
45 return Ok(());
46 };
47 let mut next = header.next_overflow_page_id;
48 while next != 0 {
49 let ov = match pager.read_page(next) {
50 Ok(p) => p,
51 Err(_) => break,
52 };
53 let ob = ov.as_bytes();
54 let nn = match reddb_file::decode_native_metadata_overflow_continuation_header(&ob[cs..]) {
55 Ok(header) => header.next_overflow_page_id,
56 Err(_) => 0,
57 };
58 let _ = pager.free_page(next);
59 next = nn;
60 }
61 Ok(())
62}
63
64fn build_meta_page1_with_overflow(
65 pager: &Pager,
66 meta_data: &[u8],
67) -> Result<crate::storage::engine::Page, PagerError> {
68 use crate::storage::engine::{Page, PageType, HEADER_SIZE};
69 free_existing_overflow_chain(pager)?;
70
71 let mut page1 = Page::new(PageType::Header, 1);
72 let cs = HEADER_SIZE;
73
74 if meta_data.len() <= META_PAGE_CONTENT_CAP {
75 let buf = page1.as_bytes_mut();
77 buf[cs..cs + meta_data.len()].copy_from_slice(meta_data);
78 return Ok(page1);
79 }
80
81 let first_chunk = &meta_data[..META_V3_FIRST_PAYLOAD_CAP];
85 let mut tail = &meta_data[META_V3_FIRST_PAYLOAD_CAP..];
86 let mut chunks: Vec<&[u8]> = Vec::new();
87 while !tail.is_empty() {
88 let take = tail.len().min(META_V3_OVERFLOW_PAYLOAD_CAP);
89 chunks.push(&tail[..take]);
90 tail = &tail[take..];
91 }
92
93 let mut overflow_pages: Vec<Page> = Vec::with_capacity(chunks.len());
94 let mut overflow_ids: Vec<u32> = Vec::with_capacity(chunks.len());
95 for _ in 0..chunks.len() {
96 let pg = pager.allocate_page(PageType::Overflow)?;
97 overflow_ids.push(pg.page_id());
98 overflow_pages.push(pg);
99 }
100
101 for i in 0..chunks.len() {
102 let next = if i + 1 < chunks.len() {
103 overflow_ids[i + 1]
104 } else {
105 0u32
106 };
107 let len = chunks[i].len() as u32;
108 let buf = overflow_pages[i].as_bytes_mut();
109 reddb_file::encode_native_metadata_overflow_continuation_header(
110 &mut buf[cs..cs + META_V3_OVERFLOW_HEADER],
111 reddb_file::NativeMetadataOverflowContinuationHeader {
112 next_overflow_page_id: next,
113 chunk_bytes: len,
114 },
115 )
116 .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
117 buf[cs + 8..cs + 8 + chunks[i].len()].copy_from_slice(chunks[i]);
118 }
119 for (idx, page) in overflow_pages.into_iter().enumerate() {
120 let id = overflow_ids[idx];
121 pager.write_page(id, page)?;
122 }
123
124 let format_version = reddb_file::decode_native_paged_metadata_header(meta_data)
126 .ok()
127 .flatten()
128 .map_or(0, |header| header.format_version);
129
130 let buf = page1.as_bytes_mut();
131 reddb_file::encode_native_metadata_overflow_header(
132 &mut buf[cs..cs + META_V3_PAGE1_HEADER],
133 reddb_file::NativeMetadataOverflowHeader {
134 format_version,
135 total_payload_bytes: meta_data.len() as u32,
136 next_overflow_page_id: overflow_ids[0],
137 },
138 )
139 .map_err(|err| PagerError::InvalidDatabase(err.to_string()))?;
140 buf[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_chunk.len()]
141 .copy_from_slice(first_chunk);
142
143 Ok(page1)
144}
145
146fn read_meta_payload(pager: &Pager) -> Option<Vec<u8>> {
152 let cs = crate::storage::engine::HEADER_SIZE;
153 let meta_page = pager
154 .read_page(1)
155 .or_else(|_| pager.recover_meta_from_shadow())
156 .ok()?;
157 let bytes = meta_page.as_bytes();
158 if bytes.len() < cs + 4 {
159 return Some(bytes.get(cs..).unwrap_or(&[]).to_vec());
160 }
161 let header = match reddb_file::decode_native_metadata_overflow_header(&bytes[cs..]).ok()? {
162 Some(header) => header,
163 None => {
164 return Some(bytes[cs..].to_vec());
165 }
166 };
167 if bytes.len() < cs + META_V3_PAGE1_HEADER {
168 return None;
169 }
170 let total = header.total_payload_bytes as usize;
171 let mut next = header.next_overflow_page_id;
172 let mut payload: Vec<u8> = Vec::with_capacity(total);
173 let first_take = total.min(META_V3_FIRST_PAYLOAD_CAP);
174 payload.extend_from_slice(
175 &bytes[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_take],
176 );
177 while next != 0 && payload.len() < total {
178 let ov = pager.read_page(next).ok()?;
179 let ob = ov.as_bytes();
180 if ob.len() < cs + META_V3_OVERFLOW_HEADER {
181 return None;
182 }
183 let continuation =
184 reddb_file::decode_native_metadata_overflow_continuation_header(&ob[cs..]).ok()?;
185 let nn = continuation.next_overflow_page_id;
186 let len = continuation.chunk_bytes as usize;
187 let remaining = total - payload.len();
188 let take = len.min(remaining).min(META_V3_OVERFLOW_PAYLOAD_CAP);
189 payload.extend_from_slice(
190 &ob[cs + META_V3_OVERFLOW_HEADER..cs + META_V3_OVERFLOW_HEADER + take],
191 );
192 next = nn;
193 }
194 Some(payload)
195}
196
197impl UnifiedStore {
    /// Raises the `paged_registry_dirty` flag (release ordering).
    ///
    /// `flush_paged_state` checks this flag to decide whether the registry must
    /// be rewritten before flushing.
    pub(crate) fn mark_paged_registry_dirty(&self) {
        self.paged_registry_dirty.store(true, Ordering::Release);
    }
201
202 pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
208 let pager = self.pager.as_ref()?;
209 if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
210 return Some(btree);
211 }
212 let mut write = self.btree_indices.write();
213 let btree = write
214 .entry(collection.to_string())
215 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
216 .clone();
217 Some(btree)
218 }
219
220 pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
221 let Some(pager) = &self.pager else {
222 return Ok(());
223 };
224
225 if self.paged_registry_dirty.load(Ordering::Acquire) {
226 self.flush_paged_registry()?;
227 self.paged_registry_dirty.store(false, Ordering::Release);
228 return Ok(());
229 }
230
231 pager
232 .flush()
233 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
234 }
235
236 pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
237 let Some(pager) = &self.pager else {
238 return Ok(());
239 };
240
241 match pager.read_page(1) {
242 Ok(_) => {}
243 Err(PagerError::PageNotFound(_)) => {
244 let meta_page = pager
245 .allocate_page(crate::storage::engine::PageType::Header)
246 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
247 pager
248 .write_page(meta_page.page_id(), meta_page)
249 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
250 }
251 Err(e) => {
252 return Err(StoreError::Io(std::io::Error::other(e.to_string())));
253 }
254 }
255
256 let format_version = STORE_VERSION_V9;
257 self.set_format_version(format_version);
258
259 let collections = self.collections.read();
260 let btree_indices = self.btree_indices.read();
261 let mut collection_roots = Vec::with_capacity(collections.len());
262 for (name, _) in collections.iter() {
263 let root_page = btree_indices
264 .get(name)
265 .map_or(0, |btree| btree.root_page_id());
266 collection_roots.push((name.clone(), root_page));
267 }
268 drop(btree_indices);
269 drop(collections);
270
271 let mut meta_data = Vec::with_capacity(4096);
272 reddb_file::encode_native_paged_metadata_header(
273 &mut meta_data,
274 reddb_file::NativePagedMetadataHeader {
275 format_version,
276 collection_count: collection_roots.len() as u32,
277 },
278 );
279 for (name, root_page) in &collection_roots {
280 reddb_file::encode_native_paged_collection_root(&mut meta_data, name, *root_page);
281 }
282
283 let cross_refs = self.cross_refs.read();
284 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
285 meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
286 for (source_id, refs) in cross_refs.iter() {
287 for (target_id, ref_type, collection) in refs {
288 reddb_file::encode_native_paged_cross_ref(
289 &mut meta_data,
290 source_id.raw(),
291 target_id.raw(),
292 ref_type.to_byte(),
293 collection,
294 );
295 }
296 }
297
298 let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
299 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
300
301 pager
302 .write_meta_shadow(&meta_page)
303 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
304 pager
305 .write_page(1, meta_page)
306 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
307 pager
308 .flush()
309 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
310
311 Ok(())
312 }
313
    /// Returns the shared pager handle, or `None` when the store has no pager.
    pub fn pager(&self) -> Option<&Arc<Pager>> {
        self.pager.as_ref()
    }
318
    /// Returns the configuration this store was built with.
    pub fn config(&self) -> &UnifiedStoreConfig {
        &self.config
    }
325
    /// Builds an empty store from `config` with no pager, no database path and
    /// no commit coordinator attached.
    ///
    /// The format version starts at `STORE_VERSION_V9` and entity ids start at 1.
    pub fn with_config(config: UnifiedStoreConfig) -> Self {
        Self {
            config,
            format_version: AtomicU32::new(STORE_VERSION_V9),
            next_entity_id: AtomicU64::new(1),
            collections: RwLock::new(HashMap::new()),
            cross_refs: RwLock::new(HashMap::new()),
            reverse_refs: RwLock::new(HashMap::new()),
            // No backing file: nothing is paged or logged.
            pager: None,
            db_path: None,
            btree_indices: RwLock::new(HashMap::new()),
            context_index: ContextIndex::new(),
            entity_cache: EntityCache::new(),
            graph_label_index: RwLock::new(HashMap::new()),
            paged_registry_dirty: AtomicBool::new(false),
            commit: None,
            unindex_cross_refs_fast_path: AtomicU64::new(0),
            replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
        }
    }
346
    /// Opens the store at `path` using `UnifiedStoreConfig::default()`.
    ///
    /// See `open_with_config` for the failure modes.
    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
        Self::open_with_config(path, UnifiedStoreConfig::default())
    }
367
368 pub fn open_with_config(
369 path: impl AsRef<Path>,
370 config: UnifiedStoreConfig,
371 ) -> Result<Self, StoreError> {
372 let path = path.as_ref();
373 let mut pager_config = PagerConfig::default();
374 if matches!(
382 std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
383 Some("0") | Some("false") | Some("off")
384 ) {
385 pager_config.double_write = false;
386 }
387 let pager = Pager::open(path, pager_config)
388 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
389
390 let wal_path = reddb_file::layout::unified_wal_path(path);
391 let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
392 Some(Arc::new(
393 StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
394 .map_err(StoreError::Io)?,
395 ))
396 } else {
397 None
398 };
399
400 let store = Self {
401 config,
402 format_version: AtomicU32::new(STORE_VERSION_V9),
403 next_entity_id: AtomicU64::new(1),
404 collections: RwLock::new(HashMap::new()),
405 cross_refs: RwLock::new(HashMap::new()),
406 reverse_refs: RwLock::new(HashMap::new()),
407 pager: Some(Arc::new(pager)),
408 db_path: Some(path.to_path_buf()),
409 btree_indices: RwLock::new(HashMap::new()),
410 context_index: ContextIndex::new(),
411 entity_cache: EntityCache::new(),
412 graph_label_index: RwLock::new(HashMap::new()),
413 paged_registry_dirty: AtomicBool::new(false),
414 commit,
415 unindex_cross_refs_fast_path: AtomicU64::new(0),
416 replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
417 };
418
419 store.load_from_pages()?;
421 if let Some(commit) = &store.commit {
422 commit.replay_into(&store).map_err(StoreError::Io)?;
423 }
424 store.recover_operational_manifest()?;
425
426 Ok(store)
427 }
428
429 pub(crate) fn recover_operational_manifest(&self) -> Result<(), StoreError> {
430 let Some(path) = &self.db_path else {
431 return Ok(());
432 };
433 let mut collections = self.list_collections();
434 collections.sort();
435 let pending_drops =
436 crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
437 .recover_or_bootstrap(&collections)
438 .map_err(StoreError::Io)?;
439 for name in pending_drops {
440 if self.get_collection(&name).is_some() {
441 self.drop_collection(&name)?;
442 }
443 }
444 Ok(())
445 }
446
447 pub(crate) fn publish_operational_collection_create(
448 &self,
449 name: &str,
450 ) -> Result<(), StoreError> {
451 let Some(path) = &self.db_path else {
452 return Ok(());
453 };
454 crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
455 .create_collection(name)
456 .map_err(StoreError::Io)
457 }
458
459 pub(crate) fn publish_operational_collection_pending_drop(
460 &self,
461 name: &str,
462 ) -> Result<(), StoreError> {
463 let Some(path) = &self.db_path else {
464 return Ok(());
465 };
466 crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
467 .begin_drop_collection(name)
468 .map_err(StoreError::Io)
469 }
470
471 pub(crate) fn publish_operational_collection_drop_finished(
472 &self,
473 name: &str,
474 ) -> Result<(), StoreError> {
475 let Some(path) = &self.db_path else {
476 return Ok(());
477 };
478 crate::storage::operational_manifest::OperationalManifest::for_db_path(path)
479 .finish_drop_collection(name)
480 .map_err(StoreError::Io)
481 }
482
    /// Rebuilds the in-memory store from the pager: collections, entities,
    /// per-entity metadata, B-tree handles and cross references.
    ///
    /// Does nothing when the store has no pager, when the pager holds at most
    /// one page, or when no metadata payload can be read. Loading is largely
    /// best-effort: an undecodable collection root or cross reference ends the
    /// corresponding loop, and records that fail to deserialize are skipped.
    ///
    /// # Errors
    ///
    /// Fails when the page count cannot be read, when the native metadata header
    /// is recognised but invalid, or when `index_cross_refs` fails.
    fn load_from_pages(&self) -> Result<(), StoreError> {
        let pager = match &self.pager {
            Some(p) => p,
            None => return Ok(()),
        };

        let page_count = pager.page_count().map_err(|e| {
            StoreError::Io(std::io::Error::other(format!(
                "failed to read page count: {}",
                e
            )))
        })?;
        // With at most one page there is no metadata page to load from.
        if page_count <= 1 {
            return Ok(());
        }

        if let Some(content_vec) = read_meta_payload(pager) {
            let content: &[u8] = &content_vec;
            if content.len() >= 4 {
                let mut pos = 0;
                // Assumed until a native header says otherwise.
                let mut format_version = STORE_VERSION_V1;

                let collection_count = if let Some(header) =
                    reddb_file::decode_native_paged_metadata_header(content)
                        .map_err(|err| StoreError::Serialization(err.to_string()))?
                {
                    format_version = header.format_version;
                    pos += reddb_file::METADATA_HEADER_BYTES;
                    header.collection_count as usize
                } else {
                    // No native header recognised: the payload starts directly
                    // with a little-endian u32 collection count.
                    let count = u32::from_le_bytes([
                        content[pos],
                        content[pos + 1],
                        content[pos + 2],
                        content[pos + 3],
                    ]) as usize;
                    pos += 4;
                    count
                };

                self.set_format_version(format_version);

                if pos > content.len() {
                    return Ok(());
                }

                for _ in 0..collection_count {
                    if let Ok(root) =
                        reddb_file::decode_native_paged_collection_root(content, &mut pos)
                    {
                        let root_page = root.root_page;
                        let name = root.collection;

                        // The result is ignored on purpose: loading continues either way.
                        let _ = self.create_collection_in_memory(&name);

                        // Root 0 means the collection has no B-tree on disk.
                        if root_page > 0 {
                            let btree = BTree::with_root(Arc::clone(pager), root_page);

                            if let Ok(mut cursor) = btree.cursor_first() {
                                let manager = self.get_collection(&name);
                                // NOTE(review): `key` is never used, and a cursor error
                                // silently ends the scan for this collection.
                                while let Ok(Some((key, value))) = cursor.next() {
                                    if let Ok((entity, metadata)) = Self::deserialize_entity_record(
                                        &value,
                                        self.format_version(),
                                    ) {
                                        if let Some(m) = &manager {
                                            let id = entity.id;
                                            if let EntityKind::TableRow { row_id, .. } =
                                                &entity.kind
                                            {
                                                m.register_row_id(*row_id);
                                            }
                                            self.context_index.index_entity(&name, &entity);
                                            let _ = m.insert(entity.clone());
                                            if let Some(metadata) = metadata {
                                                let _ = m.set_metadata(id, metadata);
                                            }
                                            self.register_entity_id(id);
                                            if self.config.auto_index_refs {
                                                self.index_cross_refs(&entity, &name)?;
                                            }
                                        }
                                    }
                                }
                            }

                            self.btree_indices.write().insert(name, Arc::new(btree));
                        }
                    } else {
                        // An undecodable root ends collection loading.
                        break;
                    }
                }

                // The cross-reference section is only read for V2+ payloads that
                // still have at least a count field left.
                if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
                    let cross_ref_count = u32::from_le_bytes([
                        content[pos],
                        content[pos + 1],
                        content[pos + 2],
                        content[pos + 3],
                    ]) as usize;
                    pos += 4;

                    for _ in 0..cross_ref_count {
                        let Ok(cross_ref) =
                            reddb_file::decode_native_paged_cross_ref(content, &mut pos)
                        else {
                            break;
                        };
                        let source_id = EntityId::new(cross_ref.source_id);
                        let target_id = EntityId::new(cross_ref.target_id);
                        let ref_type = RefType::from_byte(cross_ref.ref_type);
                        let target_collection = cross_ref.target_collection;

                        self.cross_refs.write().entry(source_id).or_default().push((
                            target_id,
                            ref_type,
                            target_collection.clone(),
                        ));

                        // Mirror the reference onto the source entity unless it
                        // already carries an identical one.
                        if let Some((collection, mut entity)) = self.get_any(source_id) {
                            let exists = entity.cross_refs().iter().any(|xref| {
                                xref.target == target_id
                                    && xref.ref_type == ref_type
                                    && xref.target_collection == target_collection
                            });
                            if !exists {
                                entity.cross_refs_mut().push(CrossRef::new(
                                    source_id,
                                    target_id,
                                    target_collection.clone(),
                                    ref_type,
                                ));
                                if let Some(manager) = self.get_collection(&collection) {
                                    let _ = manager.update(entity);
                                }
                            }
                        }
                    }
                }
            }
        }

        // Whatever version was loaded, the in-memory store runs at V9 afterwards.
        if self.format_version() < STORE_VERSION_V9 {
            self.set_format_version(STORE_VERSION_V9);
        }

        Ok(())
    }
649
    /// Decodes a single entity from the start of `data` using the binary layout
    /// of `format_version`.
    ///
    /// # Errors
    ///
    /// Decoder failures are reported as `StoreError::Serialization`.
    pub(crate) fn deserialize_entity(
        data: &[u8],
        format_version: u32,
    ) -> Result<UnifiedEntity, StoreError> {
        let mut pos = 0;
        Self::read_entity_binary(data, &mut pos, format_version)
            .map_err(|e| StoreError::Serialization(e.to_string()))
    }
659
    /// Encodes `entity` into a fresh buffer using the binary layout of `format_version`.
    pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
        // 256 bytes is only the initial capacity; the buffer grows as needed.
        let mut buf = Vec::with_capacity(256);
        Self::write_entity_binary(&mut buf, entity, format_version);
        buf
    }
670
671 pub(crate) fn serialize_entity_record(
672 entity: &UnifiedEntity,
673 metadata: Option<&Metadata>,
674 format_version: u32,
675 ) -> Vec<u8> {
676 let entity_bytes = Self::serialize_entity(entity, format_version);
677 let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
683 if has_meta {
684 let metadata_bytes = serialize_metadata(metadata);
685 reddb_file::encode_native_entity_record_frame(&entity_bytes, Some(&metadata_bytes))
686 } else {
687 reddb_file::encode_native_entity_record_frame(&entity_bytes, None)
688 }
689 }
690
691 pub(crate) fn deserialize_entity_record(
692 data: &[u8],
693 format_version: u32,
694 ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
695 let Some(frame) = reddb_file::decode_native_entity_record_frame(data)
696 .map_err(|err| StoreError::Serialization(err.to_string()))?
697 else {
698 return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
699 };
700
701 let entity = Self::deserialize_entity(frame.entity, format_version)?;
702 let metadata = if frame.metadata.is_empty() {
703 None
704 } else {
705 let metadata = deserialize_metadata(frame.metadata)?;
706 if metadata.is_empty() {
707 None
708 } else {
709 Some(metadata)
710 }
711 };
712
713 Ok((entity, metadata))
714 }
715
716 pub fn persist(&self) -> Result<(), StoreError> {
721 let pager = match &self.pager {
722 Some(p) => p,
723 None => {
724 if let Some(path) = &self.db_path {
726 return self
727 .save_to_file(path)
728 .map_err(|e| StoreError::Serialization(e.to_string()));
729 }
730 return Err(StoreError::Io(std::io::Error::other(
731 "No pager or path configured for persistence",
732 )));
733 }
734 };
735
736 match pager.read_page(1) {
737 Ok(_) => {}
738 Err(PagerError::PageNotFound(_)) => {
739 let meta_page = pager
740 .allocate_page(crate::storage::engine::PageType::Header)
741 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
742 pager
743 .write_page(meta_page.page_id(), meta_page)
744 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
745 }
746 Err(e) => {
747 return Err(StoreError::Io(std::io::Error::other(e.to_string())));
748 }
749 }
750
751 if let Some(commit) = &self.commit {
752 commit.force_sync().map_err(StoreError::Io)?;
753 }
754
755 let collections = self.collections.read();
756 let mut btree_indices = self.btree_indices.write();
757
758 let mut collection_roots: Vec<(String, u32)> = Vec::new();
760
761 for (name, manager) in collections.iter() {
764 let btree = btree_indices
765 .entry(name.clone())
766 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
767
768 let mut existing_keys = Vec::new();
769 if !btree.is_empty() {
770 let mut cursor = btree.cursor_first().map_err(|e| {
771 StoreError::Io(std::io::Error::other(format!(
772 "B-tree cursor error while rebuilding '{name}': {e}"
773 )))
774 })?;
775 while let Some((key, _)) = cursor.next().map_err(|e| {
776 StoreError::Io(std::io::Error::other(format!(
777 "B-tree scan error while rebuilding '{name}': {e}"
778 )))
779 })? {
780 existing_keys.push(key);
781 }
782 }
783
784 for key in existing_keys {
785 btree.delete(&key).map_err(|e| {
786 StoreError::Io(std::io::Error::other(format!(
787 "B-tree delete error while rebuilding '{name}': {e}"
788 )))
789 })?;
790 }
791
792 let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
793 .query_all(|_| true)
794 .into_iter()
795 .map(|entity| {
796 let metadata = manager.get_metadata(entity.id);
797 (
798 entity.id.raw().to_be_bytes().to_vec(),
799 Self::serialize_entity_record(
800 &entity,
801 metadata.as_ref(),
802 self.format_version(),
803 ),
804 )
805 })
806 .collect();
807 records.sort_by(|left, right| left.0.cmp(&right.0));
808
809 if !records.is_empty() {
817 btree.bulk_insert_sorted(&records).map_err(|e| {
818 StoreError::Io(std::io::Error::other(format!(
819 "B-tree bulk rebuild error for '{name}': {e}"
820 )))
821 })?;
822 }
823
824 collection_roots.push((name.clone(), btree.root_page_id()));
825 }
826
827 let mut meta_data = Vec::with_capacity(4096);
829
830 let format_version = STORE_VERSION_V9;
831 self.set_format_version(format_version);
832
833 reddb_file::encode_native_paged_metadata_header(
834 &mut meta_data,
835 reddb_file::NativePagedMetadataHeader {
836 format_version,
837 collection_count: collection_roots.len() as u32,
838 },
839 );
840
841 for (name, root_page) in &collection_roots {
843 reddb_file::encode_native_paged_collection_root(&mut meta_data, name, *root_page);
844 }
845
846 let cross_refs = self.cross_refs.read();
848 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
849 meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
850 for (source_id, refs) in cross_refs.iter() {
851 for (target_id, ref_type, collection) in refs {
852 reddb_file::encode_native_paged_cross_ref(
853 &mut meta_data,
854 source_id.raw(),
855 target_id.raw(),
856 ref_type.to_byte(),
857 collection,
858 );
859 }
860 }
861
862 let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
864 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
865
866 pager
869 .write_meta_shadow(&meta_page)
870 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
871
872 pager
874 .write_page(1, meta_page)
875 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
876
877 pager
879 .sync()
880 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
881
882 if let Some(commit) = &self.commit {
883 commit.truncate().map_err(StoreError::Io)?;
884 }
885
886 Ok(())
887 }
888
    /// Returns `true` when the store has a pager attached.
    pub fn is_paged(&self) -> bool {
        self.pager.is_some()
    }
893
894 pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
897 self.btree_indices
898 .read()
899 .get(collection)
900 .map(|btree| btree.root_page_id())
901 .filter(|root| *root != 0)
902 }
903
    /// Returns the database path of the store, or `None` when it has none.
    pub fn db_path(&self) -> Option<&Path> {
        self.db_path.as_deref()
    }
908}
909
910fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
911 let Some(metadata) = metadata else {
912 return Vec::new();
913 };
914 if metadata.is_empty() {
915 return Vec::new();
916 }
917
918 let mut entries: Vec<_> = metadata.iter().collect();
919 entries.sort_by_key(|(a, _)| *a);
920
921 let mut buf = Vec::new();
922 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
923 for (key, value) in entries {
924 write_string(&mut buf, key);
925 write_metadata_value(&mut buf, value);
926 }
927 buf
928}
929
930fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
931 let mut pos = 0usize;
932 let count = read_u32(data, &mut pos)? as usize;
933 let mut metadata = Metadata::new();
934 for _ in 0..count {
935 let key = read_string(data, &mut pos)?;
936 let value = read_metadata_value(data, &mut pos)?;
937 metadata.set(key, value);
938 }
939 Ok(metadata)
940}
941
/// Appends `value` to `buf` using the native length-prefixed string encoding.
fn write_string(buf: &mut Vec<u8>, value: &str) {
    reddb_file::encode_native_len_prefixed_str(buf, value);
}
945
/// Appends `value` to `buf` using the native length-prefixed byte-string encoding.
fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
    reddb_file::encode_native_len_prefixed_bytes(buf, value);
}
949
950fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
951 use crate::storage::unified::RefTarget;
952
953 match target {
954 RefTarget::TableRow { table, row_id } => {
955 buf.push(0);
956 write_string(buf, table);
957 buf.extend_from_slice(&row_id.to_le_bytes());
958 }
959 RefTarget::Node {
960 collection,
961 node_id,
962 } => {
963 buf.push(1);
964 write_string(buf, collection);
965 buf.extend_from_slice(&node_id.raw().to_le_bytes());
966 }
967 RefTarget::Edge {
968 collection,
969 edge_id,
970 } => {
971 buf.push(2);
972 write_string(buf, collection);
973 buf.extend_from_slice(&edge_id.raw().to_le_bytes());
974 }
975 RefTarget::Vector {
976 collection,
977 vector_id,
978 } => {
979 buf.push(3);
980 write_string(buf, collection);
981 buf.extend_from_slice(&vector_id.raw().to_le_bytes());
982 }
983 RefTarget::Entity {
984 collection,
985 entity_id,
986 } => {
987 buf.push(4);
988 write_string(buf, collection);
989 buf.extend_from_slice(&entity_id.raw().to_le_bytes());
990 }
991 }
992}
993
994fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
995 match value {
996 MetadataValue::Null => buf.push(0),
997 MetadataValue::Bool(v) => {
998 buf.push(1);
999 buf.push(u8::from(*v));
1000 }
1001 MetadataValue::Int(v) => {
1002 buf.push(2);
1003 buf.extend_from_slice(&v.to_le_bytes());
1004 }
1005 MetadataValue::Float(v) => {
1006 buf.push(3);
1007 buf.extend_from_slice(&v.to_le_bytes());
1008 }
1009 MetadataValue::String(v) => {
1010 buf.push(4);
1011 write_string(buf, v);
1012 }
1013 MetadataValue::Bytes(v) => {
1014 buf.push(5);
1015 write_bytes(buf, v);
1016 }
1017 MetadataValue::Array(values) => {
1018 buf.push(6);
1019 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1020 for value in values {
1021 write_metadata_value(buf, value);
1022 }
1023 }
1024 MetadataValue::Object(values) => {
1025 buf.push(7);
1026 let mut entries: Vec<_> = values.iter().collect();
1027 entries.sort_by_key(|(a, _)| *a);
1028 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
1029 for (key, value) in entries {
1030 write_string(buf, key);
1031 write_metadata_value(buf, value);
1032 }
1033 }
1034 MetadataValue::Timestamp(v) => {
1035 buf.push(8);
1036 buf.extend_from_slice(&v.to_le_bytes());
1037 }
1038 MetadataValue::Geo { lat, lon } => {
1039 buf.push(9);
1040 buf.extend_from_slice(&lat.to_le_bytes());
1041 buf.extend_from_slice(&lon.to_le_bytes());
1042 }
1043 MetadataValue::Reference(target) => {
1044 buf.push(10);
1045 write_ref_target(buf, target);
1046 }
1047 MetadataValue::References(targets) => {
1048 buf.push(11);
1049 buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
1050 for target in targets {
1051 write_ref_target(buf, target);
1052 }
1053 }
1054 }
1055}
1056
1057fn read_exact_slice<'a>(
1058 data: &'a [u8],
1059 pos: &mut usize,
1060 len: usize,
1061) -> Result<&'a [u8], StoreError> {
1062 if *pos + len > data.len() {
1063 return Err(StoreError::Serialization(
1064 "truncated metadata payload".to_string(),
1065 ));
1066 }
1067 let slice = &data[*pos..*pos + len];
1068 *pos += len;
1069 Ok(slice)
1070}
1071
1072fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
1073 let bytes = read_exact_slice(data, pos, 4)?;
1074 let mut raw = [0u8; 4];
1075 raw.copy_from_slice(bytes);
1076 Ok(u32::from_le_bytes(raw))
1077}
1078
1079fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
1080 let bytes = read_exact_slice(data, pos, 8)?;
1081 let mut raw = [0u8; 8];
1082 raw.copy_from_slice(bytes);
1083 Ok(u64::from_le_bytes(raw))
1084}
1085
1086fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
1087 let bytes = read_exact_slice(data, pos, 8)?;
1088 let mut raw = [0u8; 8];
1089 raw.copy_from_slice(bytes);
1090 Ok(i64::from_le_bytes(raw))
1091}
1092
1093fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
1094 let bytes = read_exact_slice(data, pos, 8)?;
1095 let mut raw = [0u8; 8];
1096 raw.copy_from_slice(bytes);
1097 Ok(f64::from_le_bytes(raw))
1098}
1099
1100fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
1101 let bytes = read_exact_slice(data, pos, 1)?;
1102 Ok(bytes[0])
1103}
1104
/// Reads a native length-prefixed string at `*pos`.
///
/// Decoder failures are reported as `StoreError::Serialization`.
fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
    reddb_file::decode_native_len_prefixed_string(data, pos)
        .map_err(|err| StoreError::Serialization(err.to_string()))
}
1109
/// Reads a native length-prefixed byte string at `*pos` and returns an owned copy.
///
/// Decoder failures are reported as `StoreError::Serialization`.
fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
    reddb_file::decode_native_len_prefixed_bytes(data, pos)
        .map(|bytes| bytes.to_vec())
        .map_err(|err| StoreError::Serialization(err.to_string()))
}
1115
1116fn read_ref_target(
1117 data: &[u8],
1118 pos: &mut usize,
1119) -> Result<crate::storage::unified::RefTarget, StoreError> {
1120 use crate::storage::unified::RefTarget;
1121
1122 match read_u8(data, pos)? {
1123 0 => Ok(RefTarget::TableRow {
1124 table: read_string(data, pos)?,
1125 row_id: read_u64(data, pos)?,
1126 }),
1127 1 => Ok(RefTarget::Node {
1128 collection: read_string(data, pos)?,
1129 node_id: EntityId::new(read_u64(data, pos)?),
1130 }),
1131 2 => Ok(RefTarget::Edge {
1132 collection: read_string(data, pos)?,
1133 edge_id: EntityId::new(read_u64(data, pos)?),
1134 }),
1135 3 => Ok(RefTarget::Vector {
1136 collection: read_string(data, pos)?,
1137 vector_id: EntityId::new(read_u64(data, pos)?),
1138 }),
1139 4 => Ok(RefTarget::Entity {
1140 collection: read_string(data, pos)?,
1141 entity_id: EntityId::new(read_u64(data, pos)?),
1142 }),
1143 tag => Err(StoreError::Serialization(format!(
1144 "unknown metadata ref target tag {tag}"
1145 ))),
1146 }
1147}
1148
1149fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1150 match read_u8(data, pos)? {
1151 0 => Ok(MetadataValue::Null),
1152 1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1153 2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1154 3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1155 4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1156 5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1157 6 => {
1158 let count = read_u32(data, pos)? as usize;
1159 let mut values = Vec::with_capacity(count);
1160 for _ in 0..count {
1161 values.push(read_metadata_value(data, pos)?);
1162 }
1163 Ok(MetadataValue::Array(values))
1164 }
1165 7 => {
1166 let count = read_u32(data, pos)? as usize;
1167 let mut values = std::collections::HashMap::with_capacity(count);
1168 for _ in 0..count {
1169 let key = read_string(data, pos)?;
1170 let value = read_metadata_value(data, pos)?;
1171 values.insert(key, value);
1172 }
1173 Ok(MetadataValue::Object(values))
1174 }
1175 8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1176 9 => Ok(MetadataValue::Geo {
1177 lat: read_f64(data, pos)?,
1178 lon: read_f64(data, pos)?,
1179 }),
1180 10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1181 11 => {
1182 let count = read_u32(data, pos)? as usize;
1183 let mut targets = Vec::with_capacity(count);
1184 for _ in 0..count {
1185 targets.push(read_ref_target(data, pos)?);
1186 }
1187 Ok(MetadataValue::References(targets))
1188 }
1189 tag => Err(StoreError::Serialization(format!(
1190 "unknown metadata value tag {tag}"
1191 ))),
1192 }
1193}