1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
/// Four-byte tag written at the start of every record produced by
/// `serialize_entity_record`; records without it are decoded as a bare entity.
const ENTITY_RECORD_MAGIC: &[u8; 4] = b"RER1";

/// Four-byte tag at the start of page 1's content when the metadata payload
/// spills into a chain of overflow pages.
const METADATA_OVERFLOW_MAGIC: &[u8; 4] = b"RDM3";
/// Bytes available for content in a single page once the page header is excluded.
const META_PAGE_CONTENT_CAP: usize =
    crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE;
/// Size of the overflow-layout header on page 1: magic, format version,
/// total payload length and first overflow page id (4 bytes each).
const META_V3_PAGE1_HEADER: usize = 16;
/// Size of the header on each overflow page: next page id and chunk length
/// (4 bytes each).
const META_V3_OVERFLOW_HEADER: usize = 8;
/// Payload bytes that fit on page 1 after its overflow-layout header.
const META_V3_FIRST_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_PAGE1_HEADER;
/// Payload bytes that fit on one overflow page after its header.
const META_V3_OVERFLOW_PAYLOAD_CAP: usize = META_PAGE_CONTENT_CAP - META_V3_OVERFLOW_HEADER;
32
33fn free_existing_overflow_chain(pager: &Pager) -> Result<(), PagerError> {
34 let cs = crate::storage::engine::HEADER_SIZE;
35 let page = match pager.read_page(1) {
36 Ok(p) => p,
37 Err(_) => return Ok(()),
38 };
39 let bytes = page.as_bytes();
40 if bytes.len() < cs + META_V3_PAGE1_HEADER {
41 return Ok(());
42 }
43 if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
44 return Ok(());
45 }
46 let mut next = u32::from_le_bytes([
47 bytes[cs + 12],
48 bytes[cs + 13],
49 bytes[cs + 14],
50 bytes[cs + 15],
51 ]);
52 while next != 0 {
53 let ov = match pager.read_page(next) {
54 Ok(p) => p,
55 Err(_) => break,
56 };
57 let ob = ov.as_bytes();
58 let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
59 let _ = pager.free_page(next);
60 next = nn;
61 }
62 Ok(())
63}
64
65fn build_meta_page1_with_overflow(
66 pager: &Pager,
67 meta_data: &[u8],
68) -> Result<crate::storage::engine::Page, PagerError> {
69 use crate::storage::engine::{Page, PageType, HEADER_SIZE};
70 free_existing_overflow_chain(pager)?;
71
72 let mut page1 = Page::new(PageType::Header, 1);
73 let cs = HEADER_SIZE;
74
75 if meta_data.len() <= META_PAGE_CONTENT_CAP {
76 let buf = page1.as_bytes_mut();
78 buf[cs..cs + meta_data.len()].copy_from_slice(meta_data);
79 return Ok(page1);
80 }
81
82 let first_chunk = &meta_data[..META_V3_FIRST_PAYLOAD_CAP];
86 let mut tail = &meta_data[META_V3_FIRST_PAYLOAD_CAP..];
87 let mut chunks: Vec<&[u8]> = Vec::new();
88 while !tail.is_empty() {
89 let take = tail.len().min(META_V3_OVERFLOW_PAYLOAD_CAP);
90 chunks.push(&tail[..take]);
91 tail = &tail[take..];
92 }
93
94 let mut overflow_pages: Vec<Page> = Vec::with_capacity(chunks.len());
95 let mut overflow_ids: Vec<u32> = Vec::with_capacity(chunks.len());
96 for _ in 0..chunks.len() {
97 let pg = pager.allocate_page(PageType::Overflow)?;
98 overflow_ids.push(pg.page_id());
99 overflow_pages.push(pg);
100 }
101
102 for i in 0..chunks.len() {
103 let next = if i + 1 < chunks.len() {
104 overflow_ids[i + 1]
105 } else {
106 0u32
107 };
108 let len = chunks[i].len() as u32;
109 let buf = overflow_pages[i].as_bytes_mut();
110 buf[cs..cs + 4].copy_from_slice(&next.to_le_bytes());
111 buf[cs + 4..cs + 8].copy_from_slice(&len.to_le_bytes());
112 buf[cs + 8..cs + 8 + chunks[i].len()].copy_from_slice(chunks[i]);
113 }
114 for (idx, page) in overflow_pages.into_iter().enumerate() {
115 let id = overflow_ids[idx];
116 pager.write_page(id, page)?;
117 }
118
119 let format_version = if meta_data.len() >= 8 && &meta_data[0..4] == METADATA_MAGIC {
121 u32::from_le_bytes([meta_data[4], meta_data[5], meta_data[6], meta_data[7]])
122 } else {
123 0
124 };
125
126 let buf = page1.as_bytes_mut();
127 buf[cs..cs + 4].copy_from_slice(METADATA_OVERFLOW_MAGIC);
128 buf[cs + 4..cs + 8].copy_from_slice(&format_version.to_le_bytes());
129 buf[cs + 8..cs + 12].copy_from_slice(&(meta_data.len() as u32).to_le_bytes());
130 buf[cs + 12..cs + 16].copy_from_slice(&overflow_ids[0].to_le_bytes());
131 buf[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_chunk.len()]
132 .copy_from_slice(first_chunk);
133
134 Ok(page1)
135}
136
137fn read_meta_payload(pager: &Pager) -> Option<Vec<u8>> {
143 let cs = crate::storage::engine::HEADER_SIZE;
144 let meta_page = pager
145 .read_page(1)
146 .or_else(|_| pager.recover_meta_from_shadow())
147 .ok()?;
148 let bytes = meta_page.as_bytes();
149 if bytes.len() < cs + 4 {
150 return Some(bytes.get(cs..).unwrap_or(&[]).to_vec());
151 }
152 if &bytes[cs..cs + 4] != METADATA_OVERFLOW_MAGIC {
153 return Some(bytes[cs..].to_vec());
154 }
155 if bytes.len() < cs + META_V3_PAGE1_HEADER {
156 return None;
157 }
158 let total =
159 u32::from_le_bytes([bytes[cs + 8], bytes[cs + 9], bytes[cs + 10], bytes[cs + 11]]) as usize;
160 let mut next = u32::from_le_bytes([
161 bytes[cs + 12],
162 bytes[cs + 13],
163 bytes[cs + 14],
164 bytes[cs + 15],
165 ]);
166 let mut payload: Vec<u8> = Vec::with_capacity(total);
167 let first_take = total.min(META_V3_FIRST_PAYLOAD_CAP);
168 payload.extend_from_slice(
169 &bytes[cs + META_V3_PAGE1_HEADER..cs + META_V3_PAGE1_HEADER + first_take],
170 );
171 while next != 0 && payload.len() < total {
172 let ov = pager.read_page(next).ok()?;
173 let ob = ov.as_bytes();
174 if ob.len() < cs + META_V3_OVERFLOW_HEADER {
175 return None;
176 }
177 let nn = u32::from_le_bytes([ob[cs], ob[cs + 1], ob[cs + 2], ob[cs + 3]]);
178 let len = u32::from_le_bytes([ob[cs + 4], ob[cs + 5], ob[cs + 6], ob[cs + 7]]) as usize;
179 let remaining = total - payload.len();
180 let take = len.min(remaining).min(META_V3_OVERFLOW_PAYLOAD_CAP);
181 payload.extend_from_slice(
182 &ob[cs + META_V3_OVERFLOW_HEADER..cs + META_V3_OVERFLOW_HEADER + take],
183 );
184 next = nn;
185 }
186 Some(payload)
187}
188
189impl UnifiedStore {
    /// Flags the paged registry as dirty; `flush_paged_state` checks this
    /// flag to decide whether the registry has to be rewritten.
    pub(crate) fn mark_paged_registry_dirty(&self) {
        self.paged_registry_dirty.store(true, Ordering::Release);
    }
193
194 pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
200 let pager = self.pager.as_ref()?;
201 if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
202 return Some(btree);
203 }
204 let mut write = self.btree_indices.write();
205 let btree = write
206 .entry(collection.to_string())
207 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
208 .clone();
209 Some(btree)
210 }
211
212 pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
213 let Some(pager) = &self.pager else {
214 return Ok(());
215 };
216
217 if self.paged_registry_dirty.load(Ordering::Acquire) {
218 self.flush_paged_registry()?;
219 self.paged_registry_dirty.store(false, Ordering::Release);
220 return Ok(());
221 }
222
223 pager
224 .flush()
225 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
226 }
227
228 pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
229 let Some(pager) = &self.pager else {
230 return Ok(());
231 };
232
233 match pager.read_page(1) {
234 Ok(_) => {}
235 Err(PagerError::PageNotFound(_)) => {
236 let meta_page = pager
237 .allocate_page(crate::storage::engine::PageType::Header)
238 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
239 pager
240 .write_page(meta_page.page_id(), meta_page)
241 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
242 }
243 Err(e) => {
244 return Err(StoreError::Io(std::io::Error::other(e.to_string())));
245 }
246 }
247
248 let format_version = STORE_VERSION_V9;
249 self.set_format_version(format_version);
250
251 let collections = self.collections.read();
252 let btree_indices = self.btree_indices.read();
253 let mut collection_roots = Vec::with_capacity(collections.len());
254 for (name, _) in collections.iter() {
255 let root_page = btree_indices
256 .get(name)
257 .map_or(0, |btree| btree.root_page_id());
258 collection_roots.push((name.clone(), root_page));
259 }
260 drop(btree_indices);
261 drop(collections);
262
263 let mut meta_data = Vec::with_capacity(4096);
264 meta_data.extend_from_slice(METADATA_MAGIC);
265 meta_data.extend_from_slice(&format_version.to_le_bytes());
266 meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
267 for (name, root_page) in &collection_roots {
268 meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
269 meta_data.extend_from_slice(name.as_bytes());
270 meta_data.extend_from_slice(&root_page.to_le_bytes());
271 }
272
273 let cross_refs = self.cross_refs.read();
274 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
275 meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
276 for (source_id, refs) in cross_refs.iter() {
277 for (target_id, ref_type, collection) in refs {
278 meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
279 meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
280 meta_data.push(ref_type.to_byte());
281 meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
282 meta_data.extend_from_slice(collection.as_bytes());
283 }
284 }
285
286 let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
287 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
288
289 pager
290 .write_meta_shadow(&meta_page)
291 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
292 pager
293 .write_page(1, meta_page)
294 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
295 pager
296 .flush()
297 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
298
299 Ok(())
300 }
301
    /// Returns the pager backing this store, or `None` when the store was
    /// created without one.
    pub fn pager(&self) -> Option<&Arc<Pager>> {
        self.pager.as_ref()
    }
306
    /// Returns the configuration this store was created with.
    pub fn config(&self) -> &UnifiedStoreConfig {
        &self.config
    }
313
    /// Creates an empty store with the given configuration.
    ///
    /// The store has no pager, no database path and no commit coordinator;
    /// the format version starts at `STORE_VERSION_V9` and entity ids start
    /// at 1.
    pub fn with_config(config: UnifiedStoreConfig) -> Self {
        Self {
            config,
            format_version: AtomicU32::new(STORE_VERSION_V9),
            next_entity_id: AtomicU64::new(1),
            collections: RwLock::new(HashMap::new()),
            cross_refs: RwLock::new(HashMap::new()),
            reverse_refs: RwLock::new(HashMap::new()),
            pager: None,
            db_path: None,
            btree_indices: RwLock::new(HashMap::new()),
            context_index: ContextIndex::new(),
            entity_cache: EntityCache::new(),
            graph_label_index: RwLock::new(HashMap::new()),
            paged_registry_dirty: AtomicBool::new(false),
            commit: None,
            unindex_cross_refs_fast_path: AtomicU64::new(0),
            replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
        }
    }
334
    /// Opens the paged store at `path` with the default configuration.
    ///
    /// # Errors
    /// Propagates any error from `open_with_config`.
    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
        Self::open_with_config(path, UnifiedStoreConfig::default())
    }
355
356 pub fn open_with_config(
357 path: impl AsRef<Path>,
358 config: UnifiedStoreConfig,
359 ) -> Result<Self, StoreError> {
360 let path = path.as_ref();
361 let mut pager_config = PagerConfig::default();
362 if matches!(
370 std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
371 Some("0") | Some("false") | Some("off")
372 ) {
373 pager_config.double_write = false;
374 }
375 let pager = Pager::open(path, pager_config)
376 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
377
378 let wal_path = Self::wal_path_for_db(path);
379 let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
380 Some(Arc::new(
381 StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
382 .map_err(StoreError::Io)?,
383 ))
384 } else {
385 None
386 };
387
388 let store = Self {
389 config,
390 format_version: AtomicU32::new(STORE_VERSION_V9),
391 next_entity_id: AtomicU64::new(1),
392 collections: RwLock::new(HashMap::new()),
393 cross_refs: RwLock::new(HashMap::new()),
394 reverse_refs: RwLock::new(HashMap::new()),
395 pager: Some(Arc::new(pager)),
396 db_path: Some(path.to_path_buf()),
397 btree_indices: RwLock::new(HashMap::new()),
398 context_index: ContextIndex::new(),
399 entity_cache: EntityCache::new(),
400 graph_label_index: RwLock::new(HashMap::new()),
401 paged_registry_dirty: AtomicBool::new(false),
402 commit,
403 unindex_cross_refs_fast_path: AtomicU64::new(0),
404 replayed_turbo_inserts: parking_lot::Mutex::new(HashMap::new()),
405 };
406
407 store.load_from_pages()?;
409 if let Some(commit) = &store.commit {
410 commit.replay_into(&store).map_err(StoreError::Io)?;
411 }
412
413 Ok(store)
414 }
415
416 fn load_from_pages(&self) -> Result<(), StoreError> {
420 let pager = match &self.pager {
421 Some(p) => p,
422 None => return Ok(()), };
424
425 let page_count = pager.page_count().map_err(|e| {
427 StoreError::Io(std::io::Error::other(format!(
428 "failed to read page count: {}",
429 e
430 )))
431 })?;
432 if page_count <= 1 {
433 return Ok(());
435 }
436
437 if let Some(content_vec) = read_meta_payload(pager) {
442 let content: &[u8] = &content_vec;
443 if content.len() >= 4 {
444 let mut pos = 0;
445 let mut format_version = STORE_VERSION_V1;
446
447 if content.len() >= 8 && &content[0..4] == METADATA_MAGIC {
448 format_version =
449 u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
450 pos += 8;
451 }
452
453 self.set_format_version(format_version);
454
455 let collection_count = u32::from_le_bytes([
457 content[pos],
458 content[pos + 1],
459 content[pos + 2],
460 content[pos + 3],
461 ]) as usize;
462 pos += 4;
463
464 for _ in 0..collection_count {
466 if pos + 4 > content.len() {
467 break;
468 }
469
470 let name_len = u32::from_le_bytes([
471 content[pos],
472 content[pos + 1],
473 content[pos + 2],
474 content[pos + 3],
475 ]) as usize;
476 pos += 4;
477
478 if pos + name_len + 4 > content.len() {
479 break;
480 }
481
482 if let Ok(name) = String::from_utf8(content[pos..pos + name_len].to_vec()) {
483 pos += name_len;
484
485 let root_page = u32::from_le_bytes([
487 content[pos],
488 content[pos + 1],
489 content[pos + 2],
490 content[pos + 3],
491 ]);
492 pos += 4;
493
494 let _ = self.create_collection_in_memory(&name);
498
499 if root_page > 0 {
501 let btree = BTree::with_root(Arc::clone(pager), root_page);
502
503 if let Ok(mut cursor) = btree.cursor_first() {
505 let manager = self.get_collection(&name);
506 while let Ok(Some((key, value))) = cursor.next() {
507 if let Ok((entity, metadata)) = Self::deserialize_entity_record(
509 &value,
510 self.format_version(),
511 ) {
512 if let Some(m) = &manager {
513 let id = entity.id;
514 if let EntityKind::TableRow { row_id, .. } =
515 &entity.kind
516 {
517 m.register_row_id(*row_id);
518 }
519 self.context_index.index_entity(&name, &entity);
520 let _ = m.insert(entity.clone());
521 if let Some(metadata) = metadata {
522 let _ = m.set_metadata(id, metadata);
523 }
524 self.register_entity_id(id);
525 if self.config.auto_index_refs {
526 self.index_cross_refs(&entity, &name)?;
527 }
528 }
529 }
530 }
531 }
532
533 self.btree_indices.write().insert(name, Arc::new(btree));
535 }
536 } else {
537 pos += name_len + 4;
538 }
539 }
540
541 if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
542 let cross_ref_count = u32::from_le_bytes([
543 content[pos],
544 content[pos + 1],
545 content[pos + 2],
546 content[pos + 3],
547 ]) as usize;
548 pos += 4;
549
550 for _ in 0..cross_ref_count {
551 if pos + 17 > content.len() {
552 break;
553 }
554 let source_id = u64::from_le_bytes([
555 content[pos],
556 content[pos + 1],
557 content[pos + 2],
558 content[pos + 3],
559 content[pos + 4],
560 content[pos + 5],
561 content[pos + 6],
562 content[pos + 7],
563 ]);
564 pos += 8;
565 let target_id = u64::from_le_bytes([
566 content[pos],
567 content[pos + 1],
568 content[pos + 2],
569 content[pos + 3],
570 content[pos + 4],
571 content[pos + 5],
572 content[pos + 6],
573 content[pos + 7],
574 ]);
575 pos += 8;
576 let ref_type = RefType::from_byte(content[pos]);
577 pos += 1;
578
579 if pos + 4 > content.len() {
580 break;
581 }
582 let name_len = u32::from_le_bytes([
583 content[pos],
584 content[pos + 1],
585 content[pos + 2],
586 content[pos + 3],
587 ]) as usize;
588 pos += 4;
589 if pos + name_len > content.len() {
590 break;
591 }
592 let target_collection =
593 String::from_utf8_lossy(&content[pos..pos + name_len]).to_string();
594 pos += name_len;
595
596 let source_id = EntityId::new(source_id);
597 let target_id = EntityId::new(target_id);
598
599 self.cross_refs.write().entry(source_id).or_default().push((
600 target_id,
601 ref_type,
602 target_collection.clone(),
603 ));
604
605 if let Some((collection, mut entity)) = self.get_any(source_id) {
606 let exists = entity.cross_refs().iter().any(|xref| {
607 xref.target == target_id
608 && xref.ref_type == ref_type
609 && xref.target_collection == target_collection
610 });
611 if !exists {
612 entity.cross_refs_mut().push(CrossRef::new(
613 source_id,
614 target_id,
615 target_collection.clone(),
616 ref_type,
617 ));
618 if let Some(manager) = self.get_collection(&collection) {
619 let _ = manager.update(entity);
620 }
621 }
622 }
623 }
624 }
625 }
626 }
627
628 if self.format_version() < STORE_VERSION_V9 {
629 self.set_format_version(STORE_VERSION_V9);
630 }
631
632 Ok(())
633 }
634
    /// Decodes a bare entity (no record framing, no metadata) from `data`
    /// using the binary layout of `format_version`.
    ///
    /// # Errors
    /// Any decoding failure is reported as `StoreError::Serialization`.
    pub(crate) fn deserialize_entity(
        data: &[u8],
        format_version: u32,
    ) -> Result<UnifiedEntity, StoreError> {
        // Decoding always starts at the beginning of the buffer.
        let mut pos = 0;
        Self::read_entity_binary(data, &mut pos, format_version)
            .map_err(|e| StoreError::Serialization(e.to_string()))
    }
644
    /// Encodes a bare entity (no record framing, no metadata) using the
    /// binary layout of `format_version` and returns the bytes.
    pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
        // 256 bytes is only an initial capacity; the buffer grows as needed.
        let mut buf = Vec::with_capacity(256);
        Self::write_entity_binary(&mut buf, entity, format_version);
        buf
    }
655
656 pub(crate) fn serialize_entity_record(
657 entity: &UnifiedEntity,
658 metadata: Option<&Metadata>,
659 format_version: u32,
660 ) -> Vec<u8> {
661 let entity_bytes = Self::serialize_entity(entity, format_version);
662 let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
668 if has_meta {
669 let metadata_bytes = serialize_metadata(metadata);
670 let mut buf = Vec::with_capacity(12 + entity_bytes.len() + metadata_bytes.len());
671 buf.extend_from_slice(ENTITY_RECORD_MAGIC);
672 buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
673 buf.extend_from_slice(&entity_bytes);
674 buf.extend_from_slice(&(metadata_bytes.len() as u32).to_le_bytes());
675 buf.extend_from_slice(&metadata_bytes);
676 buf
677 } else {
678 let mut buf = Vec::with_capacity(12 + entity_bytes.len());
679 buf.extend_from_slice(ENTITY_RECORD_MAGIC);
680 buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
681 buf.extend_from_slice(&entity_bytes);
682 buf.extend_from_slice(&0u32.to_le_bytes());
683 buf
684 }
685 }
686
687 pub(crate) fn deserialize_entity_record(
688 data: &[u8],
689 format_version: u32,
690 ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
691 if data.len() < 8 || &data[..4] != ENTITY_RECORD_MAGIC {
692 return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
693 }
694
695 let mut pos = 4usize;
696 let entity_len = read_u32(data, &mut pos)? as usize;
697 if pos + entity_len > data.len() {
698 return Err(StoreError::Serialization(
699 "truncated entity record payload".to_string(),
700 ));
701 }
702 let entity = Self::deserialize_entity(&data[pos..pos + entity_len], format_version)?;
703 pos += entity_len;
704
705 let metadata_len = read_u32(data, &mut pos)? as usize;
706 if pos + metadata_len > data.len() {
707 return Err(StoreError::Serialization(
708 "truncated entity record metadata".to_string(),
709 ));
710 }
711 let metadata = if metadata_len == 0 {
712 None
713 } else {
714 let metadata = deserialize_metadata(&data[pos..pos + metadata_len])?;
715 if metadata.is_empty() {
716 None
717 } else {
718 Some(metadata)
719 }
720 };
721
722 Ok((entity, metadata))
723 }
724
725 pub fn persist(&self) -> Result<(), StoreError> {
730 let pager = match &self.pager {
731 Some(p) => p,
732 None => {
733 if let Some(path) = &self.db_path {
735 return self
736 .save_to_file(path)
737 .map_err(|e| StoreError::Serialization(e.to_string()));
738 }
739 return Err(StoreError::Io(std::io::Error::other(
740 "No pager or path configured for persistence",
741 )));
742 }
743 };
744
745 match pager.read_page(1) {
746 Ok(_) => {}
747 Err(PagerError::PageNotFound(_)) => {
748 let meta_page = pager
749 .allocate_page(crate::storage::engine::PageType::Header)
750 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
751 pager
752 .write_page(meta_page.page_id(), meta_page)
753 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
754 }
755 Err(e) => {
756 return Err(StoreError::Io(std::io::Error::other(e.to_string())));
757 }
758 }
759
760 if let Some(commit) = &self.commit {
761 commit.force_sync().map_err(StoreError::Io)?;
762 }
763
764 let collections = self.collections.read();
765 let mut btree_indices = self.btree_indices.write();
766
767 let mut collection_roots: Vec<(String, u32)> = Vec::new();
769
770 for (name, manager) in collections.iter() {
773 let btree = btree_indices
774 .entry(name.clone())
775 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));
776
777 let mut existing_keys = Vec::new();
778 if !btree.is_empty() {
779 let mut cursor = btree.cursor_first().map_err(|e| {
780 StoreError::Io(std::io::Error::other(format!(
781 "B-tree cursor error while rebuilding '{name}': {e}"
782 )))
783 })?;
784 while let Some((key, _)) = cursor.next().map_err(|e| {
785 StoreError::Io(std::io::Error::other(format!(
786 "B-tree scan error while rebuilding '{name}': {e}"
787 )))
788 })? {
789 existing_keys.push(key);
790 }
791 }
792
793 for key in existing_keys {
794 btree.delete(&key).map_err(|e| {
795 StoreError::Io(std::io::Error::other(format!(
796 "B-tree delete error while rebuilding '{name}': {e}"
797 )))
798 })?;
799 }
800
801 let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
802 .query_all(|_| true)
803 .into_iter()
804 .map(|entity| {
805 let metadata = manager.get_metadata(entity.id);
806 (
807 entity.id.raw().to_be_bytes().to_vec(),
808 Self::serialize_entity_record(
809 &entity,
810 metadata.as_ref(),
811 self.format_version(),
812 ),
813 )
814 })
815 .collect();
816 records.sort_by(|left, right| left.0.cmp(&right.0));
817
818 if !records.is_empty() {
826 btree.bulk_insert_sorted(&records).map_err(|e| {
827 StoreError::Io(std::io::Error::other(format!(
828 "B-tree bulk rebuild error for '{name}': {e}"
829 )))
830 })?;
831 }
832
833 collection_roots.push((name.clone(), btree.root_page_id()));
834 }
835
836 let mut meta_data = Vec::with_capacity(4096);
838
839 let format_version = STORE_VERSION_V9;
840 self.set_format_version(format_version);
841
842 meta_data.extend_from_slice(METADATA_MAGIC);
844 meta_data.extend_from_slice(&format_version.to_le_bytes());
845 meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
846
847 for (name, root_page) in &collection_roots {
849 meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
851 meta_data.extend_from_slice(name.as_bytes());
853 meta_data.extend_from_slice(&root_page.to_le_bytes());
855 }
856
857 let cross_refs = self.cross_refs.read();
859 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
860 meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
861 for (source_id, refs) in cross_refs.iter() {
862 for (target_id, ref_type, collection) in refs {
863 meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
864 meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
865 meta_data.push(ref_type.to_byte());
866 meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
867 meta_data.extend_from_slice(collection.as_bytes());
868 }
869 }
870
871 let meta_page = build_meta_page1_with_overflow(pager, &meta_data)
873 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
874
875 pager
878 .write_meta_shadow(&meta_page)
879 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
880
881 pager
883 .write_page(1, meta_page)
884 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
885
886 pager
888 .sync()
889 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
890
891 if let Some(commit) = &self.commit {
892 commit.truncate().map_err(StoreError::Io)?;
893 }
894
895 Ok(())
896 }
897
    /// Returns `true` when the store is backed by a pager.
    pub fn is_paged(&self) -> bool {
        self.pager.is_some()
    }
902
903 pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
906 self.btree_indices
907 .read()
908 .get(collection)
909 .map(|btree| btree.root_page_id())
910 .filter(|root| *root != 0)
911 }
912
    /// Returns the database path this store was opened from, if any.
    pub fn db_path(&self) -> Option<&Path> {
        self.db_path.as_deref()
    }
917}
918
919fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
920 let Some(metadata) = metadata else {
921 return Vec::new();
922 };
923 if metadata.is_empty() {
924 return Vec::new();
925 }
926
927 let mut entries: Vec<_> = metadata.iter().collect();
928 entries.sort_by_key(|(a, _)| *a);
929
930 let mut buf = Vec::new();
931 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
932 for (key, value) in entries {
933 write_string(&mut buf, key);
934 write_metadata_value(&mut buf, value);
935 }
936 buf
937}
938
939fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
940 let mut pos = 0usize;
941 let count = read_u32(data, &mut pos)? as usize;
942 let mut metadata = Metadata::new();
943 for _ in 0..count {
944 let key = read_string(data, &mut pos)?;
945 let value = read_metadata_value(data, &mut pos)?;
946 metadata.set(key, value);
947 }
948 Ok(metadata)
949}
950
/// Appends `value` to `buf` as a u32 little-endian byte length followed by
/// its UTF-8 bytes.
fn write_string(buf: &mut Vec<u8>, value: &str) {
    let payload = value.as_bytes();
    let len_prefix = (payload.len() as u32).to_le_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(payload);
}
955
/// Appends `value` to `buf` as a u32 little-endian length followed by the
/// raw bytes.
fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
    let len_prefix = (value.len() as u32).to_le_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(value);
}
960
961fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
962 use crate::storage::unified::RefTarget;
963
964 match target {
965 RefTarget::TableRow { table, row_id } => {
966 buf.push(0);
967 write_string(buf, table);
968 buf.extend_from_slice(&row_id.to_le_bytes());
969 }
970 RefTarget::Node {
971 collection,
972 node_id,
973 } => {
974 buf.push(1);
975 write_string(buf, collection);
976 buf.extend_from_slice(&node_id.raw().to_le_bytes());
977 }
978 RefTarget::Edge {
979 collection,
980 edge_id,
981 } => {
982 buf.push(2);
983 write_string(buf, collection);
984 buf.extend_from_slice(&edge_id.raw().to_le_bytes());
985 }
986 RefTarget::Vector {
987 collection,
988 vector_id,
989 } => {
990 buf.push(3);
991 write_string(buf, collection);
992 buf.extend_from_slice(&vector_id.raw().to_le_bytes());
993 }
994 RefTarget::Entity {
995 collection,
996 entity_id,
997 } => {
998 buf.push(4);
999 write_string(buf, collection);
1000 buf.extend_from_slice(&entity_id.raw().to_le_bytes());
1001 }
1002 }
1003}
1004
1005fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
1006 match value {
1007 MetadataValue::Null => buf.push(0),
1008 MetadataValue::Bool(v) => {
1009 buf.push(1);
1010 buf.push(u8::from(*v));
1011 }
1012 MetadataValue::Int(v) => {
1013 buf.push(2);
1014 buf.extend_from_slice(&v.to_le_bytes());
1015 }
1016 MetadataValue::Float(v) => {
1017 buf.push(3);
1018 buf.extend_from_slice(&v.to_le_bytes());
1019 }
1020 MetadataValue::String(v) => {
1021 buf.push(4);
1022 write_string(buf, v);
1023 }
1024 MetadataValue::Bytes(v) => {
1025 buf.push(5);
1026 write_bytes(buf, v);
1027 }
1028 MetadataValue::Array(values) => {
1029 buf.push(6);
1030 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
1031 for value in values {
1032 write_metadata_value(buf, value);
1033 }
1034 }
1035 MetadataValue::Object(values) => {
1036 buf.push(7);
1037 let mut entries: Vec<_> = values.iter().collect();
1038 entries.sort_by_key(|(a, _)| *a);
1039 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
1040 for (key, value) in entries {
1041 write_string(buf, key);
1042 write_metadata_value(buf, value);
1043 }
1044 }
1045 MetadataValue::Timestamp(v) => {
1046 buf.push(8);
1047 buf.extend_from_slice(&v.to_le_bytes());
1048 }
1049 MetadataValue::Geo { lat, lon } => {
1050 buf.push(9);
1051 buf.extend_from_slice(&lat.to_le_bytes());
1052 buf.extend_from_slice(&lon.to_le_bytes());
1053 }
1054 MetadataValue::Reference(target) => {
1055 buf.push(10);
1056 write_ref_target(buf, target);
1057 }
1058 MetadataValue::References(targets) => {
1059 buf.push(11);
1060 buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
1061 for target in targets {
1062 write_ref_target(buf, target);
1063 }
1064 }
1065 }
1066}
1067
1068fn read_exact_slice<'a>(
1069 data: &'a [u8],
1070 pos: &mut usize,
1071 len: usize,
1072) -> Result<&'a [u8], StoreError> {
1073 if *pos + len > data.len() {
1074 return Err(StoreError::Serialization(
1075 "truncated metadata payload".to_string(),
1076 ));
1077 }
1078 let slice = &data[*pos..*pos + len];
1079 *pos += len;
1080 Ok(slice)
1081}
1082
1083fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
1084 let bytes = read_exact_slice(data, pos, 4)?;
1085 let mut raw = [0u8; 4];
1086 raw.copy_from_slice(bytes);
1087 Ok(u32::from_le_bytes(raw))
1088}
1089
1090fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
1091 let bytes = read_exact_slice(data, pos, 8)?;
1092 let mut raw = [0u8; 8];
1093 raw.copy_from_slice(bytes);
1094 Ok(u64::from_le_bytes(raw))
1095}
1096
1097fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
1098 let bytes = read_exact_slice(data, pos, 8)?;
1099 let mut raw = [0u8; 8];
1100 raw.copy_from_slice(bytes);
1101 Ok(i64::from_le_bytes(raw))
1102}
1103
1104fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
1105 let bytes = read_exact_slice(data, pos, 8)?;
1106 let mut raw = [0u8; 8];
1107 raw.copy_from_slice(bytes);
1108 Ok(f64::from_le_bytes(raw))
1109}
1110
1111fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
1112 let bytes = read_exact_slice(data, pos, 1)?;
1113 Ok(bytes[0])
1114}
1115
1116fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
1117 let len = read_u32(data, pos)? as usize;
1118 let bytes = read_exact_slice(data, pos, len)?;
1119 String::from_utf8(bytes.to_vec()).map_err(|err| StoreError::Serialization(err.to_string()))
1120}
1121
1122fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
1123 let len = read_u32(data, pos)? as usize;
1124 Ok(read_exact_slice(data, pos, len)?.to_vec())
1125}
1126
1127fn read_ref_target(
1128 data: &[u8],
1129 pos: &mut usize,
1130) -> Result<crate::storage::unified::RefTarget, StoreError> {
1131 use crate::storage::unified::RefTarget;
1132
1133 match read_u8(data, pos)? {
1134 0 => Ok(RefTarget::TableRow {
1135 table: read_string(data, pos)?,
1136 row_id: read_u64(data, pos)?,
1137 }),
1138 1 => Ok(RefTarget::Node {
1139 collection: read_string(data, pos)?,
1140 node_id: EntityId::new(read_u64(data, pos)?),
1141 }),
1142 2 => Ok(RefTarget::Edge {
1143 collection: read_string(data, pos)?,
1144 edge_id: EntityId::new(read_u64(data, pos)?),
1145 }),
1146 3 => Ok(RefTarget::Vector {
1147 collection: read_string(data, pos)?,
1148 vector_id: EntityId::new(read_u64(data, pos)?),
1149 }),
1150 4 => Ok(RefTarget::Entity {
1151 collection: read_string(data, pos)?,
1152 entity_id: EntityId::new(read_u64(data, pos)?),
1153 }),
1154 tag => Err(StoreError::Serialization(format!(
1155 "unknown metadata ref target tag {tag}"
1156 ))),
1157 }
1158}
1159
1160fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1161 match read_u8(data, pos)? {
1162 0 => Ok(MetadataValue::Null),
1163 1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1164 2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1165 3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1166 4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1167 5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1168 6 => {
1169 let count = read_u32(data, pos)? as usize;
1170 let mut values = Vec::with_capacity(count);
1171 for _ in 0..count {
1172 values.push(read_metadata_value(data, pos)?);
1173 }
1174 Ok(MetadataValue::Array(values))
1175 }
1176 7 => {
1177 let count = read_u32(data, pos)? as usize;
1178 let mut values = std::collections::HashMap::with_capacity(count);
1179 for _ in 0..count {
1180 let key = read_string(data, pos)?;
1181 let value = read_metadata_value(data, pos)?;
1182 values.insert(key, value);
1183 }
1184 Ok(MetadataValue::Object(values))
1185 }
1186 8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1187 9 => Ok(MetadataValue::Geo {
1188 lat: read_f64(data, pos)?,
1189 lon: read_f64(data, pos)?,
1190 }),
1191 10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1192 11 => {
1193 let count = read_u32(data, pos)? as usize;
1194 let mut targets = Vec::with_capacity(count);
1195 for _ in 0..count {
1196 targets.push(read_ref_target(data, pos)?);
1197 }
1198 Ok(MetadataValue::References(targets))
1199 }
1200 tag => Err(StoreError::Serialization(format!(
1201 "unknown metadata value tag {tag}"
1202 ))),
1203 }
1204}