1use super::*;
2use crate::storage::unified::entity_cache::EntityCache;
3use parking_lot::RwLock;
4
5const ENTITY_RECORD_MAGIC: &[u8; 4] = b"RER1";
6
7impl UnifiedStore {
    /// Flags the paged metadata registry as out of sync with disk.
    ///
    /// The next `flush_paged_state` call will then rewrite the full registry
    /// page instead of merely flushing the pager.
    pub(crate) fn mark_paged_registry_dirty(&self) {
        // Release pairs with the Acquire load in `flush_paged_state`.
        self.paged_registry_dirty.store(true, Ordering::Release);
    }
11
12 pub(crate) fn get_or_create_btree(&self, collection: &str) -> Option<Arc<BTree>> {
18 let pager = self.pager.as_ref()?;
19 if let Some(btree) = self.btree_indices.read().get(collection).cloned() {
20 return Some(btree);
21 }
22 let mut write = self.btree_indices.write();
23 let btree = write
24 .entry(collection.to_string())
25 .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))))
26 .clone();
27 Some(btree)
28 }
29
30 pub(crate) fn flush_paged_state(&self) -> Result<(), StoreError> {
31 let Some(pager) = &self.pager else {
32 return Ok(());
33 };
34
35 if self.paged_registry_dirty.load(Ordering::Acquire) {
36 self.flush_paged_registry()?;
37 self.paged_registry_dirty.store(false, Ordering::Release);
38 return Ok(());
39 }
40
41 pager
42 .flush()
43 .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))
44 }
45
    /// Serializes the collection registry (name → B-tree root page) and all
    /// cross-references into metadata page 1, then flushes the pager.
    ///
    /// No-op for in-memory stores. The page is written to the shadow slot
    /// first so a torn primary write can be recovered on the next open.
    pub(crate) fn flush_paged_registry(&self) -> Result<(), StoreError> {
        let Some(pager) = &self.pager else {
            return Ok(());
        };

        // Ensure page 1 exists: the very first flush of a fresh file must
        // allocate the metadata page before it can be overwritten below.
        match pager.read_page(1) {
            Ok(_) => {}
            Err(PagerError::PageNotFound(_)) => {
                let meta_page = pager
                    .allocate_page(crate::storage::engine::PageType::Header)
                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
                pager
                    .write_page(meta_page.page_id(), meta_page)
                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
            }
            Err(e) => {
                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
            }
        }

        // The registry is always written in the newest on-disk format.
        let format_version = STORE_VERSION_V9;
        self.set_format_version(format_version);

        // Snapshot (name, root page) pairs under both read locks, releasing
        // them before any I/O happens.
        let collections = self.collections.read();
        let btree_indices = self.btree_indices.read();
        let mut collection_roots = Vec::with_capacity(collections.len());
        for (name, _) in collections.iter() {
            // A collection with no materialized B-tree is recorded as root 0.
            let root_page = btree_indices
                .get(name)
                .map_or(0, |btree| btree.root_page_id());
            collection_roots.push((name.clone(), root_page));
        }
        drop(btree_indices);
        drop(collections);

        // Layout: MAGIC | version:u32 | count:u32 | (name_len:u32 name root:u32)*
        let mut meta_data = Vec::with_capacity(4096);
        meta_data.extend_from_slice(METADATA_MAGIC);
        meta_data.extend_from_slice(&format_version.to_le_bytes());
        meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());
        for (name, root_page) in &collection_roots {
            meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
            meta_data.extend_from_slice(name.as_bytes());
            meta_data.extend_from_slice(&root_page.to_le_bytes());
        }

        // Followed by: total:u32 | (source:u64 target:u64 type:u8
        // collection_len:u32 collection)* — one entry per cross-reference.
        let cross_refs = self.cross_refs.read();
        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
        for (source_id, refs) in cross_refs.iter() {
            for (target_id, ref_type, collection) in refs {
                meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
                meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
                meta_data.push(ref_type.to_byte());
                meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
                meta_data.extend_from_slice(collection.as_bytes());
            }
        }

        // Copy the serialized registry into the page body after the header.
        // NOTE(review): a payload longer than one page body is silently
        // truncated by `copy_len` — confirm registries always fit one page.
        let mut meta_page =
            crate::storage::engine::Page::new(crate::storage::engine::PageType::Header, 1);
        let page_data = meta_page.as_bytes_mut();
        let content_start = crate::storage::engine::HEADER_SIZE;
        let copy_len = meta_data.len().min(page_data.len() - content_start);
        page_data[content_start..content_start + copy_len].copy_from_slice(&meta_data[..copy_len]);

        // Shadow copy first, then the primary page, then flush to disk.
        pager
            .write_meta_shadow(&meta_page)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
        pager
            .write_page(1, meta_page)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
        pager
            .flush()
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        Ok(())
    }
123
    /// Returns the underlying pager, or `None` for an in-memory store.
    pub fn pager(&self) -> Option<&Arc<Pager>> {
        self.pager.as_ref()
    }
128
    /// Read-only access to the configuration this store was built with.
    pub fn config(&self) -> &UnifiedStoreConfig {
        &self.config
    }
135
    /// Creates an empty, purely in-memory store (no pager, no path) using
    /// the supplied configuration.
    pub fn with_config(config: UnifiedStoreConfig) -> Self {
        Self {
            config,
            // New stores always start at the current on-disk format version.
            format_version: AtomicU32::new(STORE_VERSION_V9),
            // Entity ids start at 1.
            next_entity_id: AtomicU64::new(1),
            collections: RwLock::new(HashMap::new()),
            cross_refs: RwLock::new(HashMap::new()),
            reverse_refs: RwLock::new(HashMap::new()),
            // No pager/path: this store never touches disk.
            pager: None,
            db_path: None,
            btree_indices: RwLock::new(HashMap::new()),
            context_index: ContextIndex::new(),
            entity_cache: EntityCache::new(),
            graph_label_index: RwLock::new(HashMap::new()),
            paged_registry_dirty: AtomicBool::new(false),
            // No WAL commit coordinator for in-memory stores.
            commit: None,
            unindex_cross_refs_fast_path: AtomicU64::new(0),
        }
    }
155
    /// Opens (or creates) a paged store at `path` with the default
    /// configuration. See [`Self::open_with_config`] for the open sequence.
    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
        Self::open_with_config(path, UnifiedStoreConfig::default())
    }
176
    /// Opens (or creates) a paged store at `path` with the given config.
    ///
    /// Sequence: open the pager (honoring the `REDDB_DOUBLE_WRITE` env
    /// override), optionally open the WAL commit coordinator, load durable
    /// state from pages, then replay outstanding WAL entries on top of it.
    pub fn open_with_config(
        path: impl AsRef<Path>,
        config: UnifiedStoreConfig,
    ) -> Result<Self, StoreError> {
        let path = path.as_ref();
        let mut pager_config = PagerConfig::default();
        // Escape hatch: REDDB_DOUBLE_WRITE=0/false/off disables the pager's
        // double-write buffer.
        if matches!(
            std::env::var("REDDB_DOUBLE_WRITE").ok().as_deref(),
            Some("0") | Some("false") | Some("off")
        ) {
            pager_config.double_write = false;
        }
        let pager = Pager::open(path, pager_config)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        // The WAL coordinator is only opened when the durability mode (or an
        // already-present WAL file) requires it.
        let wal_path = Self::wal_path_for_db(path);
        let commit = if StoreCommitCoordinator::should_open(&wal_path, config.durability_mode) {
            Some(Arc::new(
                StoreCommitCoordinator::open(wal_path, config.durability_mode, config.group_commit)
                    .map_err(StoreError::Io)?,
            ))
        } else {
            None
        };

        let store = Self {
            config,
            format_version: AtomicU32::new(STORE_VERSION_V9),
            next_entity_id: AtomicU64::new(1),
            collections: RwLock::new(HashMap::new()),
            cross_refs: RwLock::new(HashMap::new()),
            reverse_refs: RwLock::new(HashMap::new()),
            pager: Some(Arc::new(pager)),
            db_path: Some(path.to_path_buf()),
            btree_indices: RwLock::new(HashMap::new()),
            context_index: ContextIndex::new(),
            entity_cache: EntityCache::new(),
            graph_label_index: RwLock::new(HashMap::new()),
            paged_registry_dirty: AtomicBool::new(false),
            commit,
            unindex_cross_refs_fast_path: AtomicU64::new(0),
        };

        // Durable pages first, then WAL replay, so uncheckpointed commits
        // overwrite the page-level state.
        store.load_from_pages()?;
        if let Some(commit) = &store.commit {
            commit.replay_into(&store).map_err(StoreError::Io)?;
        }

        Ok(store)
    }
235
    /// Rebuilds the in-memory store from the on-disk pages.
    ///
    /// Reads metadata page 1 (falling back to the shadow copy if the primary
    /// read fails), decodes the collection registry, loads each collection's
    /// entities by scanning its B-tree, and finally restores persisted
    /// cross-references. Parsing is best-effort: truncated sections `break`
    /// out of their loops rather than erroring, so a partially written
    /// registry loads as much as possible.
    fn load_from_pages(&self) -> Result<(), StoreError> {
        let pager = match &self.pager {
            Some(p) => p,
            // In-memory store: nothing on disk to load.
            None => return Ok(()),
        };

        let page_count = pager.page_count().map_err(|e| {
            StoreError::Io(std::io::Error::other(format!(
                "failed to read page count: {}",
                e
            )))
        })?;
        // Page 1 is the metadata page; with at most one page there is no data.
        if page_count <= 1 {
            return Ok(());
        }

        // Prefer the primary metadata page; fall back to the shadow copy
        // written by `flush_paged_registry`/`persist` for torn-write recovery.
        let meta_page_result = pager
            .read_page(1)
            .or_else(|_| pager.recover_meta_from_shadow());
        if let Ok(meta_page) = meta_page_result {
            let data = meta_page.as_bytes();
            let content = &data[crate::storage::engine::HEADER_SIZE..];
            if content.len() >= 4 {
                let mut pos = 0;
                // Pages without the magic/version prefix are legacy V1.
                let mut format_version = STORE_VERSION_V1;

                if content.len() >= 8 && &content[0..4] == METADATA_MAGIC {
                    format_version =
                        u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
                    pos += 8;
                }

                self.set_format_version(format_version);

                // Registry layout: count:u32 | (name_len:u32 name root:u32)*
                let collection_count = u32::from_le_bytes([
                    content[pos],
                    content[pos + 1],
                    content[pos + 2],
                    content[pos + 3],
                ]) as usize;
                pos += 4;

                for _ in 0..collection_count {
                    // Truncated registry: stop decoding, keep what we have.
                    if pos + 4 > content.len() {
                        break;
                    }

                    let name_len = u32::from_le_bytes([
                        content[pos],
                        content[pos + 1],
                        content[pos + 2],
                        content[pos + 3],
                    ]) as usize;
                    pos += 4;

                    // Need the name plus its trailing root-page u32.
                    if pos + name_len + 4 > content.len() {
                        break;
                    }

                    if let Ok(name) = String::from_utf8(content[pos..pos + name_len].to_vec()) {
                        pos += name_len;

                        let root_page = u32::from_le_bytes([
                            content[pos],
                            content[pos + 1],
                            content[pos + 2],
                            content[pos + 3],
                        ]);
                        pos += 4;

                        // Register the collection even if it has no pages yet.
                        let _ = self.create_collection_in_memory(&name);

                        // Root 0 means "no B-tree materialized" — skip the scan.
                        if root_page > 0 {
                            let btree = BTree::with_root(Arc::clone(pager), root_page);

                            // Full scan: load every record into the in-memory
                            // collection manager and secondary indexes.
                            if let Ok(mut cursor) = btree.cursor_first() {
                                let manager = self.get_collection(&name);
                                while let Ok(Some((key, value))) = cursor.next() {
                                    if let Ok((entity, metadata)) = Self::deserialize_entity_record(
                                        &value,
                                        self.format_version(),
                                    ) {
                                        if let Some(m) = &manager {
                                            let id = entity.id;
                                            // Reserve table row ids so new rows
                                            // don't collide with loaded ones.
                                            if let EntityKind::TableRow { row_id, .. } =
                                                &entity.kind
                                            {
                                                m.register_row_id(*row_id);
                                            }
                                            self.context_index.index_entity(&name, &entity);
                                            let _ = m.insert(entity.clone());
                                            if let Some(metadata) = metadata {
                                                let _ = m.set_metadata(id, metadata);
                                            }
                                            // Keeps `next_entity_id` ahead of
                                            // every persisted id.
                                            self.register_entity_id(id);
                                            if self.config.auto_index_refs {
                                                self.index_cross_refs(&entity, &name)?;
                                            }
                                        }
                                    }
                                }
                            }

                            self.btree_indices.write().insert(name, Arc::new(btree));
                        }
                    } else {
                        // Undecodable name: skip this entry's bytes and continue.
                        pos += name_len + 4;
                    }
                }

                // V2+ registries append persisted cross-references.
                if format_version >= STORE_VERSION_V2 && pos + 4 <= content.len() {
                    let cross_ref_count = u32::from_le_bytes([
                        content[pos],
                        content[pos + 1],
                        content[pos + 2],
                        content[pos + 3],
                    ]) as usize;
                    pos += 4;

                    for _ in 0..cross_ref_count {
                        // Fixed part: source u64 + target u64 + type u8 = 17 bytes.
                        if pos + 17 > content.len() {
                            break;
                        }
                        let source_id = u64::from_le_bytes([
                            content[pos],
                            content[pos + 1],
                            content[pos + 2],
                            content[pos + 3],
                            content[pos + 4],
                            content[pos + 5],
                            content[pos + 6],
                            content[pos + 7],
                        ]);
                        pos += 8;
                        let target_id = u64::from_le_bytes([
                            content[pos],
                            content[pos + 1],
                            content[pos + 2],
                            content[pos + 3],
                            content[pos + 4],
                            content[pos + 5],
                            content[pos + 6],
                            content[pos + 7],
                        ]);
                        pos += 8;
                        let ref_type = RefType::from_byte(content[pos]);
                        pos += 1;

                        if pos + 4 > content.len() {
                            break;
                        }
                        let name_len = u32::from_le_bytes([
                            content[pos],
                            content[pos + 1],
                            content[pos + 2],
                            content[pos + 3],
                        ]) as usize;
                        pos += 4;
                        if pos + name_len > content.len() {
                            break;
                        }
                        let target_collection =
                            String::from_utf8_lossy(&content[pos..pos + name_len]).to_string();
                        pos += name_len;

                        let source_id = EntityId::new(source_id);
                        let target_id = EntityId::new(target_id);

                        // Restore the forward cross-ref index entry.
                        self.cross_refs.write().entry(source_id).or_default().push((
                            target_id,
                            ref_type,
                            target_collection.clone(),
                        ));

                        // Mirror the ref onto the source entity itself, unless
                        // the deserialized entity already carries it.
                        if let Some((collection, mut entity)) = self.get_any(source_id) {
                            let exists = entity.cross_refs().iter().any(|xref| {
                                xref.target == target_id
                                    && xref.ref_type == ref_type
                                    && xref.target_collection == target_collection
                            });
                            if !exists {
                                entity.cross_refs_mut().push(CrossRef::new(
                                    source_id,
                                    target_id,
                                    target_collection.clone(),
                                    ref_type,
                                ));
                                if let Some(manager) = self.get_collection(&collection) {
                                    let _ = manager.update(entity);
                                }
                            }
                        }
                    }
                }
            }
        }

        // Older files are upgraded in memory; the next write persists V9.
        if self.format_version() < STORE_VERSION_V9 {
            self.set_format_version(STORE_VERSION_V9);
        }

        Ok(())
    }
457
458 pub(crate) fn deserialize_entity(
460 data: &[u8],
461 format_version: u32,
462 ) -> Result<UnifiedEntity, StoreError> {
463 let mut pos = 0;
464 Self::read_entity_binary(data, &mut pos, format_version)
465 .map_err(|e| StoreError::Serialization(e.to_string()))
466 }
467
468 pub(crate) fn serialize_entity(entity: &UnifiedEntity, format_version: u32) -> Vec<u8> {
470 let mut buf = Vec::with_capacity(256);
475 Self::write_entity_binary(&mut buf, entity, format_version);
476 buf
477 }
478
479 pub(crate) fn serialize_entity_record(
480 entity: &UnifiedEntity,
481 metadata: Option<&Metadata>,
482 format_version: u32,
483 ) -> Vec<u8> {
484 let entity_bytes = Self::serialize_entity(entity, format_version);
485 let has_meta = matches!(metadata, Some(m) if !m.fields.is_empty());
491 if has_meta {
492 let metadata_bytes = serialize_metadata(metadata);
493 let mut buf = Vec::with_capacity(12 + entity_bytes.len() + metadata_bytes.len());
494 buf.extend_from_slice(ENTITY_RECORD_MAGIC);
495 buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
496 buf.extend_from_slice(&entity_bytes);
497 buf.extend_from_slice(&(metadata_bytes.len() as u32).to_le_bytes());
498 buf.extend_from_slice(&metadata_bytes);
499 buf
500 } else {
501 let mut buf = Vec::with_capacity(12 + entity_bytes.len());
502 buf.extend_from_slice(ENTITY_RECORD_MAGIC);
503 buf.extend_from_slice(&(entity_bytes.len() as u32).to_le_bytes());
504 buf.extend_from_slice(&entity_bytes);
505 buf.extend_from_slice(&0u32.to_le_bytes());
506 buf
507 }
508 }
509
510 pub(crate) fn deserialize_entity_record(
511 data: &[u8],
512 format_version: u32,
513 ) -> Result<(UnifiedEntity, Option<Metadata>), StoreError> {
514 if data.len() < 8 || &data[..4] != ENTITY_RECORD_MAGIC {
515 return Self::deserialize_entity(data, format_version).map(|entity| (entity, None));
516 }
517
518 let mut pos = 4usize;
519 let entity_len = read_u32(data, &mut pos)? as usize;
520 if pos + entity_len > data.len() {
521 return Err(StoreError::Serialization(
522 "truncated entity record payload".to_string(),
523 ));
524 }
525 let entity = Self::deserialize_entity(&data[pos..pos + entity_len], format_version)?;
526 pos += entity_len;
527
528 let metadata_len = read_u32(data, &mut pos)? as usize;
529 if pos + metadata_len > data.len() {
530 return Err(StoreError::Serialization(
531 "truncated entity record metadata".to_string(),
532 ));
533 }
534 let metadata = if metadata_len == 0 {
535 None
536 } else {
537 let metadata = deserialize_metadata(&data[pos..pos + metadata_len])?;
538 if metadata.is_empty() {
539 None
540 } else {
541 Some(metadata)
542 }
543 };
544
545 Ok((entity, metadata))
546 }
547
    /// Fully persists the store: rebuilds every collection's on-disk B-tree
    /// from the in-memory entities, rewrites the metadata registry page,
    /// syncs the pager, and finally truncates the WAL.
    ///
    /// Without a pager, falls back to `save_to_file` when a path is
    /// configured, and errors otherwise.
    pub fn persist(&self) -> Result<(), StoreError> {
        let pager = match &self.pager {
            Some(p) => p,
            None => {
                // Non-paged store: whole-file snapshot if we know where.
                if let Some(path) = &self.db_path {
                    return self
                        .save_to_file(path)
                        .map_err(|e| StoreError::Serialization(e.to_string()));
                }
                return Err(StoreError::Io(std::io::Error::other(
                    "No pager or path configured for persistence",
                )));
            }
        };

        // Ensure metadata page 1 exists before it is rewritten below.
        match pager.read_page(1) {
            Ok(_) => {}
            Err(PagerError::PageNotFound(_)) => {
                let meta_page = pager
                    .allocate_page(crate::storage::engine::PageType::Header)
                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
                pager
                    .write_page(meta_page.page_id(), meta_page)
                    .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;
            }
            Err(e) => {
                return Err(StoreError::Io(std::io::Error::other(e.to_string())));
            }
        }

        // Flush any buffered WAL entries before the rebuild begins.
        if let Some(commit) = &self.commit {
            commit.force_sync().map_err(StoreError::Io)?;
        }

        let collections = self.collections.read();
        let mut btree_indices = self.btree_indices.write();

        let mut collection_roots: Vec<(String, u32)> = Vec::new();

        // Rebuild each collection's B-tree: delete every existing key, then
        // bulk-insert the current in-memory entities in sorted order.
        for (name, manager) in collections.iter() {
            let btree = btree_indices
                .entry(name.clone())
                .or_insert_with(|| Arc::new(BTree::new(Arc::clone(pager))));

            // Collect keys first; deleting while a cursor is open would
            // invalidate the scan.
            let mut existing_keys = Vec::new();
            if !btree.is_empty() {
                let mut cursor = btree.cursor_first().map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree cursor error while rebuilding '{name}': {e}"
                    )))
                })?;
                while let Some((key, _)) = cursor.next().map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree scan error while rebuilding '{name}': {e}"
                    )))
                })? {
                    existing_keys.push(key);
                }
            }

            for key in existing_keys {
                btree.delete(&key).map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree delete error while rebuilding '{name}': {e}"
                    )))
                })?;
            }

            // Keys are big-endian entity ids so byte order == numeric order.
            let mut records: Vec<(Vec<u8>, Vec<u8>)> = manager
                .query_all(|_| true)
                .into_iter()
                .map(|entity| {
                    let metadata = manager.get_metadata(entity.id);
                    (
                        entity.id.raw().to_be_bytes().to_vec(),
                        Self::serialize_entity_record(
                            &entity,
                            metadata.as_ref(),
                            self.format_version(),
                        ),
                    )
                })
                .collect();
            records.sort_by(|left, right| left.0.cmp(&right.0));

            // Rows whose serialized form exceeds the B-tree value limit are
            // skipped (with a warning) instead of failing the whole persist.
            let max_value_size = crate::storage::engine::btree::MAX_VALUE_SIZE;
            let original_len = records.len();
            records.retain(|(_, value)| {
                if value.len() > max_value_size {
                    tracing::warn!(
                        collection = %reddb_wire::audit_safe_log_field(name),
                        bytes = value.len(),
                        max = max_value_size,
                        "skipping oversized row during B-tree bulk rebuild"
                    );
                    false
                } else {
                    true
                }
            });
            let dropped = original_len - records.len();
            if dropped > 0 {
                let safe_name = format!("{}", reddb_wire::audit_safe_log_field(name));
                tracing::warn!(
                    collection = %safe_name,
                    dropped,
                    "dropped {dropped} oversized row(s) from '{safe_name}' on rebuild — \
                     the rows remain readable via the in-memory entity store but \
                     are absent from the on-disk B-tree until they are rewritten"
                );
            }

            if !records.is_empty() {
                btree.bulk_insert_sorted(&records).map_err(|e| {
                    StoreError::Io(std::io::Error::other(format!(
                        "B-tree bulk rebuild error for '{name}': {e}"
                    )))
                })?;
            }

            collection_roots.push((name.clone(), btree.root_page_id()));
        }

        // Serialize the registry page; layout matches `flush_paged_registry`
        // and is decoded by `load_from_pages`.
        let mut meta_data = Vec::with_capacity(4096);

        let format_version = STORE_VERSION_V9;
        self.set_format_version(format_version);

        meta_data.extend_from_slice(METADATA_MAGIC);
        meta_data.extend_from_slice(&format_version.to_le_bytes());
        meta_data.extend_from_slice(&(collection_roots.len() as u32).to_le_bytes());

        for (name, root_page) in &collection_roots {
            meta_data.extend_from_slice(&(name.len() as u32).to_le_bytes());
            meta_data.extend_from_slice(name.as_bytes());
            meta_data.extend_from_slice(&root_page.to_le_bytes());
        }

        let cross_refs = self.cross_refs.read();
        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
        meta_data.extend_from_slice(&(total_refs as u32).to_le_bytes());
        for (source_id, refs) in cross_refs.iter() {
            for (target_id, ref_type, collection) in refs {
                meta_data.extend_from_slice(&source_id.raw().to_le_bytes());
                meta_data.extend_from_slice(&target_id.raw().to_le_bytes());
                meta_data.push(ref_type.to_byte());
                meta_data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
                meta_data.extend_from_slice(collection.as_bytes());
            }
        }

        // NOTE(review): payloads larger than one page body are truncated by
        // `copy_len` — confirm the registry always fits a single page.
        let mut meta_page = crate::storage::engine::Page::new(
            crate::storage::engine::PageType::Header,
            1,
        );
        let page_data = meta_page.as_bytes_mut();
        let content_start = crate::storage::engine::HEADER_SIZE;
        let copy_len = meta_data.len().min(page_data.len() - content_start);
        page_data[content_start..content_start + copy_len].copy_from_slice(&meta_data[..copy_len]);

        // Shadow copy first for torn-write recovery, then the primary page.
        pager
            .write_meta_shadow(&meta_page)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        pager
            .write_page(1, meta_page)
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        pager
            .sync()
            .map_err(|e| StoreError::Io(std::io::Error::other(e.to_string())))?;

        // Everything is durable on pages now; the WAL can be emptied.
        if let Some(commit) = &self.commit {
            commit.truncate().map_err(StoreError::Io)?;
        }

        Ok(())
    }
763
    /// `true` when this store is backed by an on-disk pager (as opposed to
    /// being purely in-memory).
    pub fn is_paged(&self) -> bool {
        self.pager.is_some()
    }
768
769 pub(crate) fn collection_root_page(&self, collection: &str) -> Option<u32> {
772 self.btree_indices
773 .read()
774 .get(collection)
775 .map(|btree| btree.root_page_id())
776 .filter(|root| *root != 0)
777 }
778
    /// Filesystem path of the backing database file, if this store was
    /// opened from one.
    pub fn db_path(&self) -> Option<&Path> {
        self.db_path.as_deref()
    }
783}
784
785fn serialize_metadata(metadata: Option<&Metadata>) -> Vec<u8> {
786 let Some(metadata) = metadata else {
787 return Vec::new();
788 };
789 if metadata.is_empty() {
790 return Vec::new();
791 }
792
793 let mut entries: Vec<_> = metadata.iter().collect();
794 entries.sort_by_key(|(a, _)| *a);
795
796 let mut buf = Vec::new();
797 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
798 for (key, value) in entries {
799 write_string(&mut buf, key);
800 write_metadata_value(&mut buf, value);
801 }
802 buf
803}
804
805fn deserialize_metadata(data: &[u8]) -> Result<Metadata, StoreError> {
806 let mut pos = 0usize;
807 let count = read_u32(data, &mut pos)? as usize;
808 let mut metadata = Metadata::new();
809 for _ in 0..count {
810 let key = read_string(data, &mut pos)?;
811 let value = read_metadata_value(data, &mut pos)?;
812 metadata.set(key, value);
813 }
814 Ok(metadata)
815}
816
/// Appends a u32-length-prefixed (little-endian) UTF-8 string to `buf`.
fn write_string(buf: &mut Vec<u8>, value: &str) {
    let bytes = value.as_bytes();
    buf.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
    buf.extend_from_slice(bytes);
}
821
/// Appends a u32-length-prefixed (little-endian) byte slice to `buf`.
fn write_bytes(buf: &mut Vec<u8>, value: &[u8]) {
    let len = value.len() as u32;
    buf.extend_from_slice(&len.to_le_bytes());
    buf.extend_from_slice(value);
}
826
827fn write_ref_target(buf: &mut Vec<u8>, target: &crate::storage::unified::RefTarget) {
828 use crate::storage::unified::RefTarget;
829
830 match target {
831 RefTarget::TableRow { table, row_id } => {
832 buf.push(0);
833 write_string(buf, table);
834 buf.extend_from_slice(&row_id.to_le_bytes());
835 }
836 RefTarget::Node {
837 collection,
838 node_id,
839 } => {
840 buf.push(1);
841 write_string(buf, collection);
842 buf.extend_from_slice(&node_id.raw().to_le_bytes());
843 }
844 RefTarget::Edge {
845 collection,
846 edge_id,
847 } => {
848 buf.push(2);
849 write_string(buf, collection);
850 buf.extend_from_slice(&edge_id.raw().to_le_bytes());
851 }
852 RefTarget::Vector {
853 collection,
854 vector_id,
855 } => {
856 buf.push(3);
857 write_string(buf, collection);
858 buf.extend_from_slice(&vector_id.raw().to_le_bytes());
859 }
860 RefTarget::Entity {
861 collection,
862 entity_id,
863 } => {
864 buf.push(4);
865 write_string(buf, collection);
866 buf.extend_from_slice(&entity_id.raw().to_le_bytes());
867 }
868 }
869}
870
871fn write_metadata_value(buf: &mut Vec<u8>, value: &MetadataValue) {
872 match value {
873 MetadataValue::Null => buf.push(0),
874 MetadataValue::Bool(v) => {
875 buf.push(1);
876 buf.push(u8::from(*v));
877 }
878 MetadataValue::Int(v) => {
879 buf.push(2);
880 buf.extend_from_slice(&v.to_le_bytes());
881 }
882 MetadataValue::Float(v) => {
883 buf.push(3);
884 buf.extend_from_slice(&v.to_le_bytes());
885 }
886 MetadataValue::String(v) => {
887 buf.push(4);
888 write_string(buf, v);
889 }
890 MetadataValue::Bytes(v) => {
891 buf.push(5);
892 write_bytes(buf, v);
893 }
894 MetadataValue::Array(values) => {
895 buf.push(6);
896 buf.extend_from_slice(&(values.len() as u32).to_le_bytes());
897 for value in values {
898 write_metadata_value(buf, value);
899 }
900 }
901 MetadataValue::Object(values) => {
902 buf.push(7);
903 let mut entries: Vec<_> = values.iter().collect();
904 entries.sort_by_key(|(a, _)| *a);
905 buf.extend_from_slice(&(entries.len() as u32).to_le_bytes());
906 for (key, value) in entries {
907 write_string(buf, key);
908 write_metadata_value(buf, value);
909 }
910 }
911 MetadataValue::Timestamp(v) => {
912 buf.push(8);
913 buf.extend_from_slice(&v.to_le_bytes());
914 }
915 MetadataValue::Geo { lat, lon } => {
916 buf.push(9);
917 buf.extend_from_slice(&lat.to_le_bytes());
918 buf.extend_from_slice(&lon.to_le_bytes());
919 }
920 MetadataValue::Reference(target) => {
921 buf.push(10);
922 write_ref_target(buf, target);
923 }
924 MetadataValue::References(targets) => {
925 buf.push(11);
926 buf.extend_from_slice(&(targets.len() as u32).to_le_bytes());
927 for target in targets {
928 write_ref_target(buf, target);
929 }
930 }
931 }
932}
933
934fn read_exact_slice<'a>(
935 data: &'a [u8],
936 pos: &mut usize,
937 len: usize,
938) -> Result<&'a [u8], StoreError> {
939 if *pos + len > data.len() {
940 return Err(StoreError::Serialization(
941 "truncated metadata payload".to_string(),
942 ));
943 }
944 let slice = &data[*pos..*pos + len];
945 *pos += len;
946 Ok(slice)
947}
948
949fn read_u32(data: &[u8], pos: &mut usize) -> Result<u32, StoreError> {
950 let bytes = read_exact_slice(data, pos, 4)?;
951 let mut raw = [0u8; 4];
952 raw.copy_from_slice(bytes);
953 Ok(u32::from_le_bytes(raw))
954}
955
956fn read_u64(data: &[u8], pos: &mut usize) -> Result<u64, StoreError> {
957 let bytes = read_exact_slice(data, pos, 8)?;
958 let mut raw = [0u8; 8];
959 raw.copy_from_slice(bytes);
960 Ok(u64::from_le_bytes(raw))
961}
962
963fn read_i64(data: &[u8], pos: &mut usize) -> Result<i64, StoreError> {
964 let bytes = read_exact_slice(data, pos, 8)?;
965 let mut raw = [0u8; 8];
966 raw.copy_from_slice(bytes);
967 Ok(i64::from_le_bytes(raw))
968}
969
970fn read_f64(data: &[u8], pos: &mut usize) -> Result<f64, StoreError> {
971 let bytes = read_exact_slice(data, pos, 8)?;
972 let mut raw = [0u8; 8];
973 raw.copy_from_slice(bytes);
974 Ok(f64::from_le_bytes(raw))
975}
976
977fn read_u8(data: &[u8], pos: &mut usize) -> Result<u8, StoreError> {
978 let bytes = read_exact_slice(data, pos, 1)?;
979 Ok(bytes[0])
980}
981
982fn read_string(data: &[u8], pos: &mut usize) -> Result<String, StoreError> {
983 let len = read_u32(data, pos)? as usize;
984 let bytes = read_exact_slice(data, pos, len)?;
985 String::from_utf8(bytes.to_vec()).map_err(|err| StoreError::Serialization(err.to_string()))
986}
987
988fn read_bytes(data: &[u8], pos: &mut usize) -> Result<Vec<u8>, StoreError> {
989 let len = read_u32(data, pos)? as usize;
990 Ok(read_exact_slice(data, pos, len)?.to_vec())
991}
992
993fn read_ref_target(
994 data: &[u8],
995 pos: &mut usize,
996) -> Result<crate::storage::unified::RefTarget, StoreError> {
997 use crate::storage::unified::RefTarget;
998
999 match read_u8(data, pos)? {
1000 0 => Ok(RefTarget::TableRow {
1001 table: read_string(data, pos)?,
1002 row_id: read_u64(data, pos)?,
1003 }),
1004 1 => Ok(RefTarget::Node {
1005 collection: read_string(data, pos)?,
1006 node_id: EntityId::new(read_u64(data, pos)?),
1007 }),
1008 2 => Ok(RefTarget::Edge {
1009 collection: read_string(data, pos)?,
1010 edge_id: EntityId::new(read_u64(data, pos)?),
1011 }),
1012 3 => Ok(RefTarget::Vector {
1013 collection: read_string(data, pos)?,
1014 vector_id: EntityId::new(read_u64(data, pos)?),
1015 }),
1016 4 => Ok(RefTarget::Entity {
1017 collection: read_string(data, pos)?,
1018 entity_id: EntityId::new(read_u64(data, pos)?),
1019 }),
1020 tag => Err(StoreError::Serialization(format!(
1021 "unknown metadata ref target tag {tag}"
1022 ))),
1023 }
1024}
1025
1026fn read_metadata_value(data: &[u8], pos: &mut usize) -> Result<MetadataValue, StoreError> {
1027 match read_u8(data, pos)? {
1028 0 => Ok(MetadataValue::Null),
1029 1 => Ok(MetadataValue::Bool(read_u8(data, pos)? != 0)),
1030 2 => Ok(MetadataValue::Int(read_i64(data, pos)?)),
1031 3 => Ok(MetadataValue::Float(read_f64(data, pos)?)),
1032 4 => Ok(MetadataValue::String(read_string(data, pos)?)),
1033 5 => Ok(MetadataValue::Bytes(read_bytes(data, pos)?)),
1034 6 => {
1035 let count = read_u32(data, pos)? as usize;
1036 let mut values = Vec::with_capacity(count);
1037 for _ in 0..count {
1038 values.push(read_metadata_value(data, pos)?);
1039 }
1040 Ok(MetadataValue::Array(values))
1041 }
1042 7 => {
1043 let count = read_u32(data, pos)? as usize;
1044 let mut values = std::collections::HashMap::with_capacity(count);
1045 for _ in 0..count {
1046 let key = read_string(data, pos)?;
1047 let value = read_metadata_value(data, pos)?;
1048 values.insert(key, value);
1049 }
1050 Ok(MetadataValue::Object(values))
1051 }
1052 8 => Ok(MetadataValue::Timestamp(read_u64(data, pos)?)),
1053 9 => Ok(MetadataValue::Geo {
1054 lat: read_f64(data, pos)?,
1055 lon: read_f64(data, pos)?,
1056 }),
1057 10 => Ok(MetadataValue::Reference(read_ref_target(data, pos)?)),
1058 11 => {
1059 let count = read_u32(data, pos)? as usize;
1060 let mut targets = Vec::with_capacity(count);
1061 for _ in 0..count {
1062 targets.push(read_ref_target(data, pos)?);
1063 }
1064 Ok(MetadataValue::References(targets))
1065 }
1066 tag => Err(StoreError::Serialization(format!(
1067 "unknown metadata value tag {tag}"
1068 ))),
1069 }
1070}