1use super::*;
2
3impl UnifiedStore {
4 pub fn new() -> Self {
5 Self::with_config(UnifiedStoreConfig::default())
6 }
7
8 pub fn format_version(&self) -> u32 {
10 self.format_version.load(Ordering::SeqCst)
11 }
12
13 pub(crate) fn set_format_version(&self, version: u32) {
14 self.format_version.store(version, Ordering::SeqCst);
15 }
16
17 pub fn next_entity_id(&self) -> EntityId {
19 EntityId::new(self.next_entity_id.fetch_add(1, Ordering::SeqCst))
20 }
21
22 pub fn reserve_entity_ids(&self, n: u64) -> std::ops::Range<u64> {
25 let start = self.next_entity_id.fetch_add(n, Ordering::SeqCst);
26 start..start + n
27 }
28
29 pub(crate) fn register_entity_id(&self, id: EntityId) {
30 let candidate = id.raw().saturating_add(1);
31 let mut current = self.next_entity_id.load(Ordering::SeqCst);
32 while candidate > current {
33 match self.next_entity_id.compare_exchange(
34 current,
35 candidate,
36 Ordering::SeqCst,
37 Ordering::SeqCst,
38 ) {
39 Ok(_) => break,
40 Err(updated) => current = updated,
41 }
42 }
43 }
44
45 pub fn load_from_file(path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
57 let file = File::open(path)?;
58 let mut reader = BufReader::new(file);
59 let mut buf = Vec::new();
60 reader.read_to_end(&mut buf)?;
61 Self::load_from_bytes(&buf)
62 }
63
64 pub(crate) fn load_from_bytes(buf: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
65 Self::load_from_bytes_with_config(buf, UnifiedStoreConfig::default())
66 }
67
68 pub(crate) fn load_from_bytes_with_config(
69 buf: &[u8],
70 config: UnifiedStoreConfig,
71 ) -> Result<Self, Box<dyn std::error::Error>> {
72 let mut buf = buf.to_vec();
73 let version =
74 reddb_file::decode_native_store_header(&buf).map_err(|err| err.to_string())?;
75 let mut pos = 8;
76
77 reddb_file::verify_native_store_crc32_footer(&mut buf, version)
79 .map_err(|err| err.to_string())?;
80
81 let store = Self::with_config(config);
82 store.set_format_version(version);
83
84 let collection_count =
86 reddb_file::decode_native_dump_count(&buf, &mut pos).map_err(|e| e.to_string())?;
87
88 for _ in 0..collection_count {
90 let collection_header =
91 reddb_file::decode_native_dump_collection_header(&buf, &mut pos)
92 .map_err(|e| e.to_string())?;
93
94 for _ in 0..collection_header.entity_count {
96 if version >= STORE_VERSION_V7 {
97 let record_bytes = reddb_file::decode_native_dump_entity_record(&buf, &mut pos)
99 .map_err(|e| e.to_string())?;
100
101 let (entity, metadata) = Self::deserialize_entity_record(record_bytes, version)
102 .map_err(|e| format!("Entity record deserialization error: {e}"))?;
103
104 store.insert_auto(&collection_header.name, entity.clone())?;
105
106 if let Some(metadata) = metadata {
107 if let Some(manager) = store.get_collection(&collection_header.name) {
108 let _ = manager.set_metadata(entity.id, metadata);
109 }
110 }
111 } else {
112 let entity = Self::read_entity_binary(&buf, &mut pos, version)?;
114 store.insert_auto(&collection_header.name, entity)?;
115 }
116 }
117 }
118
119 if pos < buf.len() {
120 let cross_ref_count =
122 reddb_file::decode_native_dump_count(&buf, &mut pos).map_err(|e| e.to_string())?;
123
124 for _ in 0..cross_ref_count {
125 let cross_ref = reddb_file::decode_native_dump_cross_ref(&buf, &mut pos)
126 .map_err(|e| e.to_string())?;
127 let ref_type = RefType::from_byte(cross_ref.ref_type);
128
129 let source_collection = store
130 .get_any(EntityId::new(cross_ref.source_id))
131 .map(|(name, _)| name)
132 .unwrap_or_else(|| cross_ref.target_collection.clone());
133 let _ = store.add_cross_ref(
134 &source_collection,
135 EntityId::new(cross_ref.source_id),
136 &cross_ref.target_collection,
137 EntityId::new(cross_ref.target_id),
138 ref_type,
139 1.0,
140 );
141 }
142 }
143
144 if version >= reddb_file::STORE_VERSION_V10 && pos < buf.len() {
149 let aux = reddb_file::decode_native_dump_entity_record(&buf, &mut pos)
150 .map_err(|e| e.to_string())?;
151 if !aux.is_empty() {
152 store.set_aux_metadata(aux.to_vec());
153 }
154 }
155
156 if store.format_version() < STORE_VERSION_CURRENT {
157 store.set_format_version(STORE_VERSION_CURRENT);
158 }
159
160 Ok(store)
161 }
162
163 pub fn save_to_file(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
168 let buf = self.to_binary_dump_bytes();
169 reddb_file::write_native_store_bytes_atomically(path, &buf)?;
170 Ok(())
171 }
172
173 pub(crate) fn to_binary_dump_bytes(&self) -> Vec<u8> {
174 let mut buf = Vec::new();
175
176 buf.extend_from_slice(&reddb_file::encode_native_store_header(
179 STORE_VERSION_CURRENT,
180 ));
181
182 let collections = self.collections.read();
184 reddb_file::encode_native_dump_count(&mut buf, collections.len() as u32);
185
186 let fv = STORE_VERSION_CURRENT;
187 for (name, manager) in collections.iter() {
188 let entities = manager.query_all(|_| true);
190 reddb_file::encode_native_dump_collection_header(&mut buf, name, entities.len() as u32);
191
192 for entity in entities {
195 let metadata = manager.get_metadata(entity.id);
196 let record = Self::serialize_entity_record(&entity, metadata.as_ref(), fv);
197 reddb_file::encode_native_dump_entity_record(&mut buf, &record);
198 }
199 }
200
201 let cross_refs = self.cross_refs.read();
203 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
204 reddb_file::encode_native_dump_count(&mut buf, total_refs as u32);
205
206 for (source_id, refs) in cross_refs.iter() {
207 for (target_id, ref_type, collection) in refs {
208 reddb_file::encode_native_dump_cross_ref(
209 &mut buf,
210 source_id.raw(),
211 target_id.raw(),
212 ref_type.to_byte(),
213 collection,
214 );
215 }
216 }
217
218 {
222 let aux = self.aux_metadata.read();
223 reddb_file::encode_native_dump_entity_record(&mut buf, &aux);
224 }
225
226 self.set_format_version(STORE_VERSION_CURRENT);
227
228 reddb_file::append_native_store_crc32_footer(&mut buf);
229 buf
230 }
231
232 pub(crate) fn read_entity_binary(
234 buf: &[u8],
235 pos: &mut usize,
236 format_version: u32,
237 ) -> Result<UnifiedEntity, Box<dyn std::error::Error>> {
238 let id = read_varu64(buf, pos).map_err(|e| format!("Failed to read entity id: {:?}", e))?;
240
241 let kind_type = buf[*pos];
243 *pos += 1;
244
245 let kind = match kind_type {
247 0 => {
248 let table_len = Self::read_varu32_safe(buf, pos)?;
250 let table = String::from_utf8(buf[*pos..*pos + table_len].to_vec())?;
251 *pos += table_len;
252 let row_id = Self::read_varu64_safe(buf, pos)?;
253 EntityKind::TableRow {
254 table: table.into(),
255 row_id,
256 }
257 }
258 1 => {
259 let label_len = Self::read_varu32_safe(buf, pos)?;
261 let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
262 *pos += label_len;
263 let node_type_len = Self::read_varu32_safe(buf, pos)?;
264 let node_type = String::from_utf8(buf[*pos..*pos + node_type_len].to_vec())?;
265 *pos += node_type_len;
266 EntityKind::GraphNode(Box::new(GraphNodeKind { label, node_type }))
267 }
268 2 => {
269 let label_len = Self::read_varu32_safe(buf, pos)?;
271 let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
272 *pos += label_len;
273 let from_node_len = Self::read_varu32_safe(buf, pos)?;
274 let from_node = String::from_utf8(buf[*pos..*pos + from_node_len].to_vec())?;
275 *pos += from_node_len;
276 let to_node_len = Self::read_varu32_safe(buf, pos)?;
277 let to_node = String::from_utf8(buf[*pos..*pos + to_node_len].to_vec())?;
278 *pos += to_node_len;
279 let weight =
280 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
281 *pos += 4;
282 EntityKind::GraphEdge(Box::new(GraphEdgeKind {
283 label,
284 from_node,
285 to_node,
286 weight,
287 }))
288 }
289 3 => {
290 let collection_len = Self::read_varu32_safe(buf, pos)?;
292 let collection = String::from_utf8(buf[*pos..*pos + collection_len].to_vec())?;
293 *pos += collection_len;
294 EntityKind::Vector { collection }
295 }
296 4 => {
297 let series_len = Self::read_varu32_safe(buf, pos)?;
299 let series = String::from_utf8(buf[*pos..*pos + series_len].to_vec())?;
300 *pos += series_len;
301 let metric_len = Self::read_varu32_safe(buf, pos)?;
302 let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
303 *pos += metric_len;
304 EntityKind::TimeSeriesPoint(Box::new(TimeSeriesPointKind { series, metric }))
305 }
306 5 => {
307 let queue_len = Self::read_varu32_safe(buf, pos)?;
309 let queue = String::from_utf8(buf[*pos..*pos + queue_len].to_vec())?;
310 *pos += queue_len;
311 let position = Self::read_varu64_safe(buf, pos)?;
312 EntityKind::QueueMessage { queue, position }
313 }
314 _ => return Err(format!("Unknown EntityKind type: {}", kind_type).into()),
315 };
316
317 let data_type = buf[*pos];
319 *pos += 1;
320
321 let mut data = match data_type {
323 0 => {
324 let col_count = Self::read_varu32_safe(buf, pos)?;
326 let mut columns = Vec::with_capacity(col_count);
327 for _ in 0..col_count {
328 columns.push(Self::read_value_binary(buf, pos)?);
329 }
330 EntityData::Row(RowData::new(columns))
331 }
332 1 => {
333 let prop_count = Self::read_varu32_safe(buf, pos)?;
335 let mut properties = HashMap::new();
336 for _ in 0..prop_count {
337 let key_len = Self::read_varu32_safe(buf, pos)?;
338 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
339 *pos += key_len;
340 let value = Self::read_value_binary(buf, pos)?;
341 properties.insert(key, value);
342 }
343 EntityData::Node(NodeData::with_properties(properties))
344 }
345 2 => {
346 let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
348 *pos += 4;
349 let weight = f32::from_le_bytes(weight_bytes);
350 let prop_count = Self::read_varu32_safe(buf, pos)?;
351 let mut properties = HashMap::new();
352 for _ in 0..prop_count {
353 let key_len = Self::read_varu32_safe(buf, pos)?;
354 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
355 *pos += key_len;
356 let value = Self::read_value_binary(buf, pos)?;
357 properties.insert(key, value);
358 }
359 let mut edge = EdgeData::new(weight);
360 edge.properties = properties;
361 EntityData::Edge(edge)
362 }
363 3 => {
364 let dim = Self::read_varu32_safe(buf, pos)?;
366 let mut dense = Vec::with_capacity(dim);
367 for _ in 0..dim {
368 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
369 *pos += 4;
370 dense.push(f32::from_le_bytes(bytes));
371 }
372 let mut vector = VectorData::new(dense);
373 if format_version >= STORE_VERSION_V6 {
374 let has_content = buf[*pos] != 0;
375 *pos += 1;
376 if has_content {
377 let content_len = Self::read_varu32_safe(buf, pos)?;
378 vector.content =
379 Some(String::from_utf8(buf[*pos..*pos + content_len].to_vec())?);
380 *pos += content_len;
381 }
382 }
383 EntityData::Vector(vector)
384 }
385 4 => {
386 let metric_len = Self::read_varu32_safe(buf, pos)?;
388 let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
389 *pos += metric_len;
390 let timestamp_ns = Self::read_varu64_safe(buf, pos)?;
391 let value_bytes = [
392 buf[*pos],
393 buf[*pos + 1],
394 buf[*pos + 2],
395 buf[*pos + 3],
396 buf[*pos + 4],
397 buf[*pos + 5],
398 buf[*pos + 6],
399 buf[*pos + 7],
400 ];
401 *pos += 8;
402 let mut tags = HashMap::new();
403 if format_version >= STORE_VERSION_V5 {
404 let tag_count = Self::read_varu32_safe(buf, pos)?;
405 tags = HashMap::with_capacity(tag_count);
406 for _ in 0..tag_count {
407 let key_len = Self::read_varu32_safe(buf, pos)?;
408 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
409 *pos += key_len;
410 let value_len = Self::read_varu32_safe(buf, pos)?;
411 let value = String::from_utf8(buf[*pos..*pos + value_len].to_vec())?;
412 *pos += value_len;
413 tags.insert(key, value);
414 }
415 }
416 EntityData::TimeSeries(crate::storage::unified::entity::TimeSeriesData {
417 metric,
418 timestamp_ns,
419 value: f64::from_le_bytes(value_bytes),
420 tags,
421 })
422 }
423 5 => {
424 let payload = Self::read_value_binary(buf, pos)?;
426 let enqueued_at_ns = Self::read_varu64_safe(buf, pos)?;
427 let attempts = Self::read_varu32_safe(buf, pos)? as u32;
428 EntityData::QueueMessage(crate::storage::unified::entity::QueueMessageData {
429 payload,
430 priority: None,
431 enqueued_at_ns,
432 attempts,
433 max_attempts: 3,
434 acked: false,
435 })
436 }
437 6 => {
438 let field_count = Self::read_varu32_safe(buf, pos)?;
440 let mut named = HashMap::with_capacity(field_count);
441 for _ in 0..field_count {
442 let key_len = Self::read_varu32_safe(buf, pos)?;
443 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
444 *pos += key_len;
445 let value = Self::read_value_binary(buf, pos)?;
446 named.insert(key, value);
447 }
448 EntityData::Row(RowData {
449 columns: Vec::new(),
450 named: Some(named),
451 schema: None,
452 })
453 }
454 _ => return Err(format!("Unknown EntityData type: {}", data_type).into()),
455 };
456
457 let created_at = Self::read_varu64_safe(buf, pos)?;
459 let updated_at = Self::read_varu64_safe(buf, pos)?;
460
461 let embedding_count = Self::read_varu32_safe(buf, pos)?;
463 let mut embeddings = Vec::with_capacity(embedding_count);
464 for _ in 0..embedding_count {
465 let name_len = Self::read_varu32_safe(buf, pos)?;
466 let name = String::from_utf8(buf[*pos..*pos + name_len].to_vec())?;
467 *pos += name_len;
468
469 let dim = Self::read_varu32_safe(buf, pos)?;
470 let mut vector = Vec::with_capacity(dim);
471 for _ in 0..dim {
472 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
473 *pos += 4;
474 vector.push(f32::from_le_bytes(bytes));
475 }
476
477 let model_len = Self::read_varu32_safe(buf, pos)?;
478 let model = String::from_utf8(buf[*pos..*pos + model_len].to_vec())?;
479 *pos += model_len;
480
481 embeddings.push(EmbeddingSlot::new(name, vector, model));
482 }
483
484 let cross_ref_count = Self::read_varu32_safe(buf, pos)?;
486 let mut cross_refs = Vec::with_capacity(cross_ref_count);
487 for _ in 0..cross_ref_count {
488 let source = Self::read_varu64_safe(buf, pos)?;
489 let target = Self::read_varu64_safe(buf, pos)?;
490 let ref_type_byte = buf[*pos];
491 *pos += 1;
492 let (target_collection, weight, created_at) = if format_version >= STORE_VERSION_V2 {
493 let coll_len = Self::read_varu32_safe(buf, pos)?;
494 let collection = String::from_utf8(buf[*pos..*pos + coll_len].to_vec())?;
495 *pos += coll_len;
496 let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
497 *pos += 4;
498 let weight = f32::from_le_bytes(weight_bytes);
499 let created_at = Self::read_varu64_safe(buf, pos)?;
500 (collection, weight, created_at)
501 } else {
502 (String::new(), 1.0, 0)
503 };
504
505 let mut cross_ref = CrossRef::new(
506 EntityId::new(source),
507 EntityId::new(target),
508 target_collection,
509 RefType::from_byte(ref_type_byte),
510 );
511 cross_ref.weight = weight;
512 cross_ref.created_at = created_at;
513 cross_refs.push(cross_ref);
514 }
515
516 let sequence_id = Self::read_varu64_safe(buf, pos)?;
518
519 if format_version >= STORE_VERSION_V4 {
520 if let EntityData::QueueMessage(message) = &mut data {
521 if *pos < buf.len() {
522 let priority_present = buf[*pos] != 0;
523 *pos += 1;
524 message.priority = if priority_present {
525 if *pos + 4 > buf.len() {
526 return Err("truncated queue priority".into());
527 }
528 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
529 *pos += 4;
530 Some(i32::from_le_bytes(bytes))
531 } else {
532 None
533 };
534 message.max_attempts = Self::read_varu32_safe(buf, pos)? as u32;
535 if *pos >= buf.len() {
536 return Err("truncated queue ack flag".into());
537 }
538 message.acked = buf[*pos] != 0;
539 *pos += 1;
540 }
541 }
542 }
543
544 let mut entity = UnifiedEntity::new(EntityId::new(id), kind, data);
545 entity.created_at = created_at;
546 entity.updated_at = updated_at;
547 entity.sequence_id = sequence_id;
548 if format_version >= STORE_VERSION_V8 && *pos < buf.len() {
549 let has_logical_id = buf[*pos] != 0;
550 *pos += 1;
551 if has_logical_id {
552 let logical_id = Self::read_varu64_safe(buf, pos)?;
553 entity.set_logical_id(EntityId::new(logical_id));
554 }
555 }
556 if format_version >= STORE_VERSION_V9 && *pos < buf.len() {
557 let xmin = Self::read_varu64_safe(buf, pos)?;
558 let xmax = Self::read_varu64_safe(buf, pos)?;
559 entity.set_xmin(xmin);
560 entity.set_xmax(xmax);
561 }
562 if !embeddings.is_empty() || !cross_refs.is_empty() {
563 entity.embeddings_mut().extend(embeddings);
564 entity.cross_refs_mut().extend(cross_refs);
565 }
566
567 Ok(entity)
568 }
569
570 fn read_varu32_safe(buf: &[u8], pos: &mut usize) -> Result<usize, Box<dyn std::error::Error>> {
572 read_varu32(buf, pos)
573 .map(|v| v as usize)
574 .map_err(|e| format!("Decode error: {:?}", e).into())
575 }
576
577 fn read_varu64_safe(buf: &[u8], pos: &mut usize) -> Result<u64, Box<dyn std::error::Error>> {
579 read_varu64(buf, pos).map_err(|e| format!("Decode error: {:?}", e).into())
580 }
581
582 pub(crate) fn write_entity_binary(
584 buf: &mut Vec<u8>,
585 entity: &UnifiedEntity,
586 format_version: u32,
587 ) {
588 write_varu64(buf, entity.id.raw());
590
591 match &entity.kind {
593 EntityKind::TableRow { table, row_id } => {
594 buf.push(0);
595 write_varu32(buf, table.len() as u32);
596 buf.extend_from_slice(table.as_bytes());
597 write_varu64(buf, *row_id);
598 }
599 EntityKind::GraphNode(ref node) => {
600 buf.push(1);
601 write_varu32(buf, node.label.len() as u32);
602 buf.extend_from_slice(node.label.as_bytes());
603 write_varu32(buf, node.node_type.len() as u32);
604 buf.extend_from_slice(node.node_type.as_bytes());
605 }
606 EntityKind::GraphEdge(ref edge) => {
607 buf.push(2);
608 write_varu32(buf, edge.label.len() as u32);
609 buf.extend_from_slice(edge.label.as_bytes());
610 write_varu32(buf, edge.from_node.len() as u32);
611 buf.extend_from_slice(edge.from_node.as_bytes());
612 write_varu32(buf, edge.to_node.len() as u32);
613 buf.extend_from_slice(edge.to_node.as_bytes());
614 buf.extend_from_slice(&edge.weight.to_le_bytes());
615 }
616 EntityKind::Vector { collection } => {
617 buf.push(3);
618 write_varu32(buf, collection.len() as u32);
619 buf.extend_from_slice(collection.as_bytes());
620 }
621 EntityKind::TimeSeriesPoint(ref ts) => {
622 buf.push(4);
623 write_varu32(buf, ts.series.len() as u32);
624 buf.extend_from_slice(ts.series.as_bytes());
625 write_varu32(buf, ts.metric.len() as u32);
626 buf.extend_from_slice(ts.metric.as_bytes());
627 }
628 EntityKind::QueueMessage { queue, position } => {
629 buf.push(5);
630 write_varu32(buf, queue.len() as u32);
631 buf.extend_from_slice(queue.as_bytes());
632 write_varu64(buf, *position);
633 }
634 }
635
636 match &entity.data {
638 EntityData::Row(row) => {
639 if let Some(ref named) = row.named {
640 buf.push(6);
647 write_varu32(buf, named.len() as u32);
648 let mut entries: Vec<_> = named.iter().collect();
649 entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
650 for (key, value) in entries {
651 write_varu32(buf, key.len() as u32);
652 buf.extend_from_slice(key.as_bytes());
653 Self::write_value_binary(buf, value);
654 }
655 } else {
656 buf.push(0);
657 write_varu32(buf, row.columns.len() as u32);
658 for col in &row.columns {
659 Self::write_value_binary(buf, col);
660 }
661 }
662 }
663 EntityData::Node(node) => {
664 buf.push(1);
665 write_varu32(buf, node.properties.len() as u32);
666 let mut entries: Vec<_> = node.properties.iter().collect();
667 entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
668 for (key, value) in entries {
669 write_varu32(buf, key.len() as u32);
670 buf.extend_from_slice(key.as_bytes());
671 Self::write_value_binary(buf, value);
672 }
673 }
674 EntityData::Edge(edge) => {
675 buf.push(2);
676 buf.extend_from_slice(&edge.weight.to_le_bytes());
677 write_varu32(buf, edge.properties.len() as u32);
678 let mut entries: Vec<_> = edge.properties.iter().collect();
679 entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
680 for (key, value) in entries {
681 write_varu32(buf, key.len() as u32);
682 buf.extend_from_slice(key.as_bytes());
683 Self::write_value_binary(buf, value);
684 }
685 }
686 EntityData::Vector(vec) => {
687 buf.push(3);
688 write_varu32(buf, vec.dense.len() as u32);
689 for f in &vec.dense {
690 buf.extend_from_slice(&f.to_le_bytes());
691 }
692 if format_version >= STORE_VERSION_V6 {
693 buf.push(u8::from(vec.content.is_some()));
694 if let Some(content) = &vec.content {
695 write_varu32(buf, content.len() as u32);
696 buf.extend_from_slice(content.as_bytes());
697 }
698 }
699 }
700 EntityData::TimeSeries(ts) => {
701 buf.push(4);
702 write_varu32(buf, ts.metric.len() as u32);
703 buf.extend_from_slice(ts.metric.as_bytes());
704 write_varu64(buf, ts.timestamp_ns);
705 buf.extend_from_slice(&ts.value.to_le_bytes());
706 if format_version >= STORE_VERSION_V5 {
707 write_varu32(buf, ts.tags.len() as u32);
708 let mut tag_entries: Vec<_> = ts.tags.iter().collect();
709 tag_entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
710 for (key, value) in tag_entries {
711 write_varu32(buf, key.len() as u32);
712 buf.extend_from_slice(key.as_bytes());
713 write_varu32(buf, value.len() as u32);
714 buf.extend_from_slice(value.as_bytes());
715 }
716 }
717 }
718 EntityData::QueueMessage(msg) => {
719 buf.push(5);
720 Self::write_value_binary(buf, &msg.payload);
721 write_varu64(buf, msg.enqueued_at_ns);
722 write_varu32(buf, msg.attempts);
723 }
724 }
725
726 write_varu64(buf, entity.created_at);
728 write_varu64(buf, entity.updated_at);
729
730 write_varu32(buf, entity.embeddings().len() as u32);
732 for emb in entity.embeddings() {
733 write_varu32(buf, emb.name.len() as u32);
734 buf.extend_from_slice(emb.name.as_bytes());
735 write_varu32(buf, emb.vector.len() as u32);
736 for f in &emb.vector {
737 buf.extend_from_slice(&f.to_le_bytes());
738 }
739 write_varu32(buf, emb.model.len() as u32);
740 buf.extend_from_slice(emb.model.as_bytes());
741 }
742
743 write_varu32(buf, entity.cross_refs().len() as u32);
745 for cross_ref in entity.cross_refs() {
746 write_varu64(buf, cross_ref.source.raw());
747 write_varu64(buf, cross_ref.target.raw());
748 buf.push(cross_ref.ref_type.to_byte());
749 if format_version >= STORE_VERSION_V2 {
750 write_varu32(buf, cross_ref.target_collection.len() as u32);
751 buf.extend_from_slice(cross_ref.target_collection.as_bytes());
752 buf.extend_from_slice(&cross_ref.weight.to_le_bytes());
753 write_varu64(buf, cross_ref.created_at);
754 }
755 }
756
757 write_varu64(buf, entity.sequence_id);
759
760 if format_version >= STORE_VERSION_V4 {
761 if let EntityData::QueueMessage(message) = &entity.data {
762 buf.push(u8::from(message.priority.is_some()));
763 if let Some(priority) = message.priority {
764 buf.extend_from_slice(&priority.to_le_bytes());
765 }
766 write_varu32(buf, message.max_attempts);
767 buf.push(u8::from(message.acked));
768 }
769 }
770
771 if format_version >= STORE_VERSION_V8 {
772 buf.push(u8::from(entity.has_explicit_logical_id()));
773 if entity.has_explicit_logical_id() {
774 write_varu64(buf, entity.logical_id().raw());
775 }
776 }
777 if format_version >= STORE_VERSION_V9 {
778 write_varu64(buf, entity.xmin);
779 write_varu64(buf, entity.xmax);
780 }
781 }
782
783 fn read_value_binary(buf: &[u8], pos: &mut usize) -> Result<Value, Box<dyn std::error::Error>> {
788 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
789
790 let type_byte = buf[*pos];
791 *pos += 1;
792
793 Ok(match type_byte {
794 0 => Value::Null,
795 1 => {
796 let b = buf[*pos] != 0;
797 *pos += 1;
798 Value::Boolean(b)
799 }
800 2 => {
801 let val = i64::from_le_bytes([
802 buf[*pos],
803 buf[*pos + 1],
804 buf[*pos + 2],
805 buf[*pos + 3],
806 buf[*pos + 4],
807 buf[*pos + 5],
808 buf[*pos + 6],
809 buf[*pos + 7],
810 ]);
811 *pos += 8;
812 Value::Integer(val)
813 }
814 3 => {
815 let val = u64::from_le_bytes([
816 buf[*pos],
817 buf[*pos + 1],
818 buf[*pos + 2],
819 buf[*pos + 3],
820 buf[*pos + 4],
821 buf[*pos + 5],
822 buf[*pos + 6],
823 buf[*pos + 7],
824 ]);
825 *pos += 8;
826 Value::UnsignedInteger(val)
827 }
828 4 => {
829 let val = f64::from_le_bytes([
830 buf[*pos],
831 buf[*pos + 1],
832 buf[*pos + 2],
833 buf[*pos + 3],
834 buf[*pos + 4],
835 buf[*pos + 5],
836 buf[*pos + 6],
837 buf[*pos + 7],
838 ]);
839 *pos += 8;
840 Value::Float(val)
841 }
842 5 => {
843 let len = Self::read_varu32_safe(buf, pos)?;
844 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
845 *pos += len;
846 Value::text(s)
847 }
848 6 => {
849 let len = Self::read_varu32_safe(buf, pos)?;
850 let bytes = buf[*pos..*pos + len].to_vec();
851 *pos += len;
852 Value::Blob(bytes)
853 }
854 7 => {
855 let val = i64::from_le_bytes([
856 buf[*pos],
857 buf[*pos + 1],
858 buf[*pos + 2],
859 buf[*pos + 3],
860 buf[*pos + 4],
861 buf[*pos + 5],
862 buf[*pos + 6],
863 buf[*pos + 7],
864 ]);
865 *pos += 8;
866 Value::Timestamp(val)
867 }
868 8 => {
869 let val = i64::from_le_bytes([
870 buf[*pos],
871 buf[*pos + 1],
872 buf[*pos + 2],
873 buf[*pos + 3],
874 buf[*pos + 4],
875 buf[*pos + 5],
876 buf[*pos + 6],
877 buf[*pos + 7],
878 ]);
879 *pos += 8;
880 Value::Duration(val)
881 }
882 9 => {
883 let version = buf[*pos];
885 *pos += 1;
886 if version == 4 {
887 let octets = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
888 *pos += 4;
889 Value::IpAddr(IpAddr::V4(Ipv4Addr::from(octets)))
890 } else {
891 let mut octets = [0u8; 16];
892 octets.copy_from_slice(&buf[*pos..*pos + 16]);
893 *pos += 16;
894 Value::IpAddr(IpAddr::V6(Ipv6Addr::from(octets)))
895 }
896 }
897 10 => {
898 let mut mac = [0u8; 6];
899 mac.copy_from_slice(&buf[*pos..*pos + 6]);
900 *pos += 6;
901 Value::MacAddr(mac)
902 }
903 11 => {
904 let len = Self::read_varu32_safe(buf, pos)?;
905 let mut vector = Vec::with_capacity(len);
906 for _ in 0..len {
907 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
908 *pos += 4;
909 vector.push(f32::from_le_bytes(bytes));
910 }
911 Value::Vector(vector)
912 }
913 12 => {
914 let len = Self::read_varu32_safe(buf, pos)?;
915 let bytes = buf[*pos..*pos + len].to_vec();
916 *pos += len;
917 Value::Json(bytes)
918 }
919 13 => {
920 let mut uuid = [0u8; 16];
921 uuid.copy_from_slice(&buf[*pos..*pos + 16]);
922 *pos += 16;
923 Value::Uuid(uuid)
924 }
925 14 => {
926 let len = Self::read_varu32_safe(buf, pos)?;
927 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
928 *pos += len;
929 Value::NodeRef(s)
930 }
931 15 => {
932 let len = Self::read_varu32_safe(buf, pos)?;
933 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
934 *pos += len;
935 Value::EdgeRef(s)
936 }
937 16 => {
938 let len = Self::read_varu32_safe(buf, pos)?;
939 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
940 *pos += len;
941 let id = u64::from_le_bytes([
942 buf[*pos],
943 buf[*pos + 1],
944 buf[*pos + 2],
945 buf[*pos + 3],
946 buf[*pos + 4],
947 buf[*pos + 5],
948 buf[*pos + 6],
949 buf[*pos + 7],
950 ]);
951 *pos += 8;
952 Value::VectorRef(s, id)
953 }
954 17 => {
955 let len = Self::read_varu32_safe(buf, pos)?;
956 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
957 *pos += len;
958 let id = u64::from_le_bytes([
959 buf[*pos],
960 buf[*pos + 1],
961 buf[*pos + 2],
962 buf[*pos + 3],
963 buf[*pos + 4],
964 buf[*pos + 5],
965 buf[*pos + 6],
966 buf[*pos + 7],
967 ]);
968 *pos += 8;
969 Value::RowRef(s, id)
970 }
971 18 => {
972 let rgb = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
973 *pos += 3;
974 Value::Color(rgb)
975 }
976 19 => {
977 let len = Self::read_varu32_safe(buf, pos)?;
978 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
979 *pos += len;
980 Value::Email(s)
981 }
982 20 => {
983 let len = Self::read_varu32_safe(buf, pos)?;
984 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
985 *pos += len;
986 Value::Url(s)
987 }
988 21 => {
989 let val = u64::from_le_bytes([
990 buf[*pos],
991 buf[*pos + 1],
992 buf[*pos + 2],
993 buf[*pos + 3],
994 buf[*pos + 4],
995 buf[*pos + 5],
996 buf[*pos + 6],
997 buf[*pos + 7],
998 ]);
999 *pos += 8;
1000 Value::Phone(val)
1001 }
1002 22 => {
1003 let val =
1004 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1005 *pos += 4;
1006 Value::Semver(val)
1007 }
1008 23 => {
1009 let ip =
1010 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1011 *pos += 4;
1012 let prefix = buf[*pos];
1013 *pos += 1;
1014 Value::Cidr(ip, prefix)
1015 }
1016 24 => {
1017 let val =
1018 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1019 *pos += 4;
1020 Value::Date(val)
1021 }
1022 25 => {
1023 let val =
1024 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1025 *pos += 4;
1026 Value::Time(val)
1027 }
1028 26 => {
1029 let val = i64::from_le_bytes([
1030 buf[*pos],
1031 buf[*pos + 1],
1032 buf[*pos + 2],
1033 buf[*pos + 3],
1034 buf[*pos + 4],
1035 buf[*pos + 5],
1036 buf[*pos + 6],
1037 buf[*pos + 7],
1038 ]);
1039 *pos += 8;
1040 Value::Decimal(val)
1041 }
1042 27 => {
1043 let val = buf[*pos];
1044 *pos += 1;
1045 Value::EnumValue(val)
1046 }
1047 28 => {
1048 let len = Self::read_varu32_safe(buf, pos)?;
1049 let mut elems = Vec::with_capacity(len);
1050 for _ in 0..len {
1051 elems.push(Self::read_value_binary(buf, pos)?);
1052 }
1053 Value::Array(elems)
1054 }
1055 29 => {
1056 let val = i64::from_le_bytes([
1057 buf[*pos],
1058 buf[*pos + 1],
1059 buf[*pos + 2],
1060 buf[*pos + 3],
1061 buf[*pos + 4],
1062 buf[*pos + 5],
1063 buf[*pos + 6],
1064 buf[*pos + 7],
1065 ]);
1066 *pos += 8;
1067 Value::TimestampMs(val)
1068 }
1069 30 => {
1070 let val =
1071 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1072 *pos += 4;
1073 Value::Ipv4(val)
1074 }
1075 31 => {
1076 let mut bytes = [0u8; 16];
1077 bytes.copy_from_slice(&buf[*pos..*pos + 16]);
1078 *pos += 16;
1079 Value::Ipv6(bytes)
1080 }
1081 32 => {
1082 let ip =
1083 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1084 *pos += 4;
1085 let mask =
1086 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1087 *pos += 4;
1088 Value::Subnet(ip, mask)
1089 }
1090 33 => {
1091 let val = u16::from_le_bytes([buf[*pos], buf[*pos + 1]]);
1092 *pos += 2;
1093 Value::Port(val)
1094 }
1095 34 => {
1096 let val =
1097 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1098 *pos += 4;
1099 Value::Latitude(val)
1100 }
1101 35 => {
1102 let val =
1103 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1104 *pos += 4;
1105 Value::Longitude(val)
1106 }
1107 36 => {
1108 let lat =
1109 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1110 *pos += 4;
1111 let lon =
1112 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1113 *pos += 4;
1114 Value::GeoPoint(lat, lon)
1115 }
1116 37 => {
1117 let c = [buf[*pos], buf[*pos + 1]];
1118 *pos += 2;
1119 Value::Country2(c)
1120 }
1121 38 => {
1122 let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1123 *pos += 3;
1124 Value::Country3(c)
1125 }
1126 39 => {
1127 let c = [buf[*pos], buf[*pos + 1]];
1128 *pos += 2;
1129 Value::Lang2(c)
1130 }
1131 40 => {
1132 let c = [
1133 buf[*pos],
1134 buf[*pos + 1],
1135 buf[*pos + 2],
1136 buf[*pos + 3],
1137 buf[*pos + 4],
1138 ];
1139 *pos += 5;
1140 Value::Lang5(c)
1141 }
1142 41 => {
1143 let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1144 *pos += 3;
1145 Value::Currency(c)
1146 }
1147 42 => {
1148 let rgba = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
1149 *pos += 4;
1150 Value::ColorAlpha(rgba)
1151 }
1152 43 => {
1153 let val = i64::from_le_bytes([
1154 buf[*pos],
1155 buf[*pos + 1],
1156 buf[*pos + 2],
1157 buf[*pos + 3],
1158 buf[*pos + 4],
1159 buf[*pos + 5],
1160 buf[*pos + 6],
1161 buf[*pos + 7],
1162 ]);
1163 *pos += 8;
1164 Value::BigInt(val)
1165 }
1166 44 => {
1167 let col_len = Self::read_varu32_safe(buf, pos)?;
1168 let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1169 *pos += col_len;
1170 let key_len = Self::read_varu32_safe(buf, pos)?;
1171 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
1172 *pos += key_len;
1173 Value::KeyRef(col, key)
1174 }
1175 45 => {
1176 let col_len = Self::read_varu32_safe(buf, pos)?;
1177 let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1178 *pos += col_len;
1179 let id = u64::from_le_bytes([
1180 buf[*pos],
1181 buf[*pos + 1],
1182 buf[*pos + 2],
1183 buf[*pos + 3],
1184 buf[*pos + 4],
1185 buf[*pos + 5],
1186 buf[*pos + 6],
1187 buf[*pos + 7],
1188 ]);
1189 *pos += 8;
1190 Value::DocRef(col, id)
1191 }
1192 46 => {
1193 let len = Self::read_varu32_safe(buf, pos)?;
1194 let name = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1195 *pos += len;
1196 Value::TableRef(name)
1197 }
1198 47 => {
1199 let val =
1200 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1201 *pos += 4;
1202 Value::PageRef(val)
1203 }
1204 48 => {
1205 let len = Self::read_varu32_safe(buf, pos)?;
1206 let bytes = buf[*pos..*pos + len].to_vec();
1207 *pos += len;
1208 Value::Secret(bytes)
1209 }
1210 49 => {
1211 let len = Self::read_varu32_safe(buf, pos)?;
1212 let hash = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1213 *pos += len;
1214 Value::Password(hash)
1215 }
1216 50 => {
1217 let len = Self::read_varu32_safe(buf, pos)?;
1218 let code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1219 *pos += len;
1220 Value::AssetCode(code)
1221 }
1222 51 => {
1223 let len = Self::read_varu32_safe(buf, pos)?;
1224 let asset_code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1225 *pos += len;
1226 let scale = buf[*pos];
1227 *pos += 1;
1228 let minor_units = i64::from_le_bytes([
1229 buf[*pos],
1230 buf[*pos + 1],
1231 buf[*pos + 2],
1232 buf[*pos + 3],
1233 buf[*pos + 4],
1234 buf[*pos + 5],
1235 buf[*pos + 6],
1236 buf[*pos + 7],
1237 ]);
1238 *pos += 8;
1239 Value::Money {
1240 asset_code,
1241 minor_units,
1242 scale,
1243 }
1244 }
1245 0x85 => {
1247 let orig_len = Self::read_varu32_safe(buf, pos)?;
1248 let comp_len = Self::read_varu32_safe(buf, pos)?;
1249 let compressed = &buf[*pos..*pos + comp_len];
1250 *pos += comp_len;
1251 let mut out = vec![0u8; orig_len];
1252 zstd::bulk::decompress_to_buffer(compressed, &mut out)
1253 .map_err(|e| format!("C3 Text decompress: {e}"))?;
1254 Value::text(String::from_utf8(out).map_err(|e| format!("C3 Text UTF-8: {e}"))?)
1255 }
1256 0x86 => {
1257 let orig_len = Self::read_varu32_safe(buf, pos)?;
1258 let comp_len = Self::read_varu32_safe(buf, pos)?;
1259 let compressed = &buf[*pos..*pos + comp_len];
1260 *pos += comp_len;
1261 let mut out = vec![0u8; orig_len];
1262 zstd::bulk::decompress_to_buffer(compressed, &mut out)
1263 .map_err(|e| format!("C3 Blob decompress: {e}"))?;
1264 Value::Blob(out)
1265 }
1266 _ => return Err(format!("Unknown Value type: {}", type_byte).into()),
1267 })
1268 }
1269
1270 fn write_value_binary(buf: &mut Vec<u8>, value: &Value) {
1275 use std::net::IpAddr;
1276
1277 match value {
1278 Value::Null => buf.push(0),
1279 Value::Boolean(b) => {
1280 buf.push(1);
1281 buf.push(if *b { 1 } else { 0 });
1282 }
1283 Value::Integer(i) => {
1284 buf.push(2);
1285 buf.extend_from_slice(&i.to_le_bytes());
1286 }
1287 Value::UnsignedInteger(u) => {
1288 buf.push(3);
1289 buf.extend_from_slice(&u.to_le_bytes());
1290 }
1291 Value::Float(f) => {
1292 buf.push(4);
1293 buf.extend_from_slice(&f.to_le_bytes());
1294 }
1295 Value::Text(s) => {
1296 const TEXT_COMPRESS_THRESHOLD: usize = 2048;
1300 if s.len() >= TEXT_COMPRESS_THRESHOLD {
1301 if let Ok(compressed) = zstd::bulk::compress(s.as_bytes(), 3) {
1302 if compressed.len() < s.len() {
1303 buf.push(0x85); write_varu32(buf, s.len() as u32); write_varu32(buf, compressed.len() as u32);
1306 buf.extend_from_slice(&compressed);
1307 return; }
1309 }
1310 }
1311 buf.push(5);
1312 write_varu32(buf, s.len() as u32);
1313 buf.extend_from_slice(s.as_bytes());
1314 }
1315 Value::Blob(bytes) => {
1316 const BLOB_COMPRESS_THRESHOLD: usize = 2048;
1319 if bytes.len() >= BLOB_COMPRESS_THRESHOLD {
1320 if let Ok(compressed) = zstd::bulk::compress(bytes.as_slice(), 3) {
1321 if compressed.len() < bytes.len() {
1322 buf.push(0x86); write_varu32(buf, bytes.len() as u32);
1324 write_varu32(buf, compressed.len() as u32);
1325 buf.extend_from_slice(&compressed);
1326 return;
1327 }
1328 }
1329 }
1330 buf.push(6);
1331 write_varu32(buf, bytes.len() as u32);
1332 buf.extend_from_slice(bytes);
1333 }
1334 Value::Timestamp(t) => {
1335 buf.push(7);
1336 buf.extend_from_slice(&t.to_le_bytes());
1337 }
1338 Value::Duration(d) => {
1339 buf.push(8);
1340 buf.extend_from_slice(&d.to_le_bytes());
1341 }
1342 Value::IpAddr(ip) => {
1343 buf.push(9);
1344 match ip {
1345 IpAddr::V4(v4) => {
1346 buf.push(4);
1347 buf.extend_from_slice(&v4.octets());
1348 }
1349 IpAddr::V6(v6) => {
1350 buf.push(6);
1351 buf.extend_from_slice(&v6.octets());
1352 }
1353 }
1354 }
1355 Value::MacAddr(mac) => {
1356 buf.push(10);
1357 buf.extend_from_slice(mac);
1358 }
1359 Value::Vector(vec) => {
1360 buf.push(11);
1361 write_varu32(buf, vec.len() as u32);
1362 for f in vec {
1363 buf.extend_from_slice(&f.to_le_bytes());
1364 }
1365 }
1366 Value::Json(bytes) => {
1367 buf.push(12);
1368 write_varu32(buf, bytes.len() as u32);
1369 buf.extend_from_slice(bytes);
1370 }
1371 Value::Uuid(uuid) => {
1372 buf.push(13);
1373 buf.extend_from_slice(uuid);
1374 }
1375 Value::NodeRef(s) => {
1376 buf.push(14);
1377 write_varu32(buf, s.len() as u32);
1378 buf.extend_from_slice(s.as_bytes());
1379 }
1380 Value::EdgeRef(s) => {
1381 buf.push(15);
1382 write_varu32(buf, s.len() as u32);
1383 buf.extend_from_slice(s.as_bytes());
1384 }
1385 Value::VectorRef(s, id) => {
1386 buf.push(16);
1387 write_varu32(buf, s.len() as u32);
1388 buf.extend_from_slice(s.as_bytes());
1389 buf.extend_from_slice(&id.to_le_bytes());
1390 }
1391 Value::RowRef(s, id) => {
1392 buf.push(17);
1393 write_varu32(buf, s.len() as u32);
1394 buf.extend_from_slice(s.as_bytes());
1395 buf.extend_from_slice(&id.to_le_bytes());
1396 }
1397 Value::Color(rgb) => {
1398 buf.push(18);
1399 buf.extend_from_slice(rgb);
1400 }
1401 Value::Email(s) => {
1402 buf.push(19);
1403 write_varu32(buf, s.len() as u32);
1404 buf.extend_from_slice(s.as_bytes());
1405 }
1406 Value::Url(s) => {
1407 buf.push(20);
1408 write_varu32(buf, s.len() as u32);
1409 buf.extend_from_slice(s.as_bytes());
1410 }
1411 Value::Phone(n) => {
1412 buf.push(21);
1413 buf.extend_from_slice(&n.to_le_bytes());
1414 }
1415 Value::Semver(v) => {
1416 buf.push(22);
1417 buf.extend_from_slice(&v.to_le_bytes());
1418 }
1419 Value::Cidr(ip, prefix) => {
1420 buf.push(23);
1421 buf.extend_from_slice(&ip.to_le_bytes());
1422 buf.push(*prefix);
1423 }
1424 Value::Date(d) => {
1425 buf.push(24);
1426 buf.extend_from_slice(&d.to_le_bytes());
1427 }
1428 Value::Time(t) => {
1429 buf.push(25);
1430 buf.extend_from_slice(&t.to_le_bytes());
1431 }
1432 Value::Decimal(v) => {
1433 buf.push(26);
1434 buf.extend_from_slice(&v.to_le_bytes());
1435 }
1436 Value::EnumValue(i) => {
1437 buf.push(27);
1438 buf.push(*i);
1439 }
1440 Value::Array(elems) => {
1441 buf.push(28);
1442 write_varu32(buf, elems.len() as u32);
1443 for elem in elems {
1444 Self::write_value_binary(buf, elem);
1445 }
1446 }
1447 Value::TimestampMs(v) => {
1448 buf.push(29);
1449 buf.extend_from_slice(&v.to_le_bytes());
1450 }
1451 Value::Ipv4(v) => {
1452 buf.push(30);
1453 buf.extend_from_slice(&v.to_le_bytes());
1454 }
1455 Value::Ipv6(bytes) => {
1456 buf.push(31);
1457 buf.extend_from_slice(bytes);
1458 }
1459 Value::Subnet(ip, mask) => {
1460 buf.push(32);
1461 buf.extend_from_slice(&ip.to_le_bytes());
1462 buf.extend_from_slice(&mask.to_le_bytes());
1463 }
1464 Value::Port(v) => {
1465 buf.push(33);
1466 buf.extend_from_slice(&v.to_le_bytes());
1467 }
1468 Value::Latitude(v) => {
1469 buf.push(34);
1470 buf.extend_from_slice(&v.to_le_bytes());
1471 }
1472 Value::Longitude(v) => {
1473 buf.push(35);
1474 buf.extend_from_slice(&v.to_le_bytes());
1475 }
1476 Value::GeoPoint(lat, lon) => {
1477 buf.push(36);
1478 buf.extend_from_slice(&lat.to_le_bytes());
1479 buf.extend_from_slice(&lon.to_le_bytes());
1480 }
1481 Value::Country2(c) => {
1482 buf.push(37);
1483 buf.extend_from_slice(c);
1484 }
1485 Value::Country3(c) => {
1486 buf.push(38);
1487 buf.extend_from_slice(c);
1488 }
1489 Value::Lang2(c) => {
1490 buf.push(39);
1491 buf.extend_from_slice(c);
1492 }
1493 Value::Lang5(c) => {
1494 buf.push(40);
1495 buf.extend_from_slice(c);
1496 }
1497 Value::Currency(c) => {
1498 buf.push(41);
1499 buf.extend_from_slice(c);
1500 }
1501 Value::AssetCode(code) => {
1502 buf.push(50);
1503 write_varu32(buf, code.len() as u32);
1504 buf.extend_from_slice(code.as_bytes());
1505 }
1506 Value::Money {
1507 asset_code,
1508 minor_units,
1509 scale,
1510 } => {
1511 buf.push(51);
1512 write_varu32(buf, asset_code.len() as u32);
1513 buf.extend_from_slice(asset_code.as_bytes());
1514 buf.push(*scale);
1515 buf.extend_from_slice(&minor_units.to_le_bytes());
1516 }
1517 Value::ColorAlpha(rgba) => {
1518 buf.push(42);
1519 buf.extend_from_slice(rgba);
1520 }
1521 Value::BigInt(v) => {
1522 buf.push(43);
1523 buf.extend_from_slice(&v.to_le_bytes());
1524 }
1525 Value::KeyRef(col, key) => {
1526 buf.push(44);
1527 write_varu32(buf, col.len() as u32);
1528 buf.extend_from_slice(col.as_bytes());
1529 write_varu32(buf, key.len() as u32);
1530 buf.extend_from_slice(key.as_bytes());
1531 }
1532 Value::DocRef(col, id) => {
1533 buf.push(45);
1534 write_varu32(buf, col.len() as u32);
1535 buf.extend_from_slice(col.as_bytes());
1536 buf.extend_from_slice(&id.to_le_bytes());
1537 }
1538 Value::TableRef(name) => {
1539 buf.push(46);
1540 write_varu32(buf, name.len() as u32);
1541 buf.extend_from_slice(name.as_bytes());
1542 }
1543 Value::PageRef(page_id) => {
1544 buf.push(47);
1545 buf.extend_from_slice(&page_id.to_le_bytes());
1546 }
1547 Value::Secret(bytes) => {
1548 buf.push(48);
1549 write_varu32(buf, bytes.len() as u32);
1550 buf.extend_from_slice(bytes);
1551 }
1552 Value::Password(hash) => {
1553 buf.push(49);
1554 write_varu32(buf, hash.len() as u32);
1555 buf.extend_from_slice(hash.as_bytes());
1556 }
1557 }
1558 }
1559}
1560
#[cfg(test)]
mod aux_metadata_dump_tests {
    use super::*;

    /// Serializes `store` with `to_binary_dump_bytes` and loads a new store
    /// from the resulting bytes, panicking if the reload fails.
    fn dump_and_reload(store: UnifiedStore) -> UnifiedStore {
        let dump = store.to_binary_dump_bytes();
        UnifiedStore::load_from_bytes(&dump).expect("reload dump")
    }

    /// Aux metadata set before a dump is returned unchanged after reloading.
    #[test]
    fn aux_metadata_round_trips_through_binary_dump() {
        let payload = b"contract-blob".to_vec();

        let original = UnifiedStore::with_config(UnifiedStoreConfig::default());
        original.create_collection("k").expect("create collection");
        original.set_aux_metadata(payload.clone());

        let restored = dump_and_reload(original);

        assert_eq!(restored.aux_metadata(), payload);
    }

    /// A store that never had aux metadata set reloads with none.
    #[test]
    fn empty_aux_metadata_round_trips_as_empty() {
        let original = UnifiedStore::with_config(UnifiedStoreConfig::default());

        let restored = dump_and_reload(original);

        assert!(restored.aux_metadata().is_empty());
    }
}