1use super::*;
2
3impl UnifiedStore {
4 pub fn new() -> Self {
5 Self::with_config(UnifiedStoreConfig::default())
6 }
7
8 pub fn format_version(&self) -> u32 {
10 self.format_version.load(Ordering::SeqCst)
11 }
12
13 pub(crate) fn set_format_version(&self, version: u32) {
14 self.format_version.store(version, Ordering::SeqCst);
15 }
16
17 pub fn next_entity_id(&self) -> EntityId {
19 EntityId::new(self.next_entity_id.fetch_add(1, Ordering::SeqCst))
20 }
21
22 pub fn reserve_entity_ids(&self, n: u64) -> std::ops::Range<u64> {
25 let start = self.next_entity_id.fetch_add(n, Ordering::SeqCst);
26 start..start + n
27 }
28
29 pub(crate) fn register_entity_id(&self, id: EntityId) {
30 let candidate = id.raw().saturating_add(1);
31 let mut current = self.next_entity_id.load(Ordering::SeqCst);
32 while candidate > current {
33 match self.next_entity_id.compare_exchange(
34 current,
35 candidate,
36 Ordering::SeqCst,
37 Ordering::SeqCst,
38 ) {
39 Ok(_) => break,
40 Err(updated) => current = updated,
41 }
42 }
43 }
44
45 pub fn load_from_file(path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
57 let file = File::open(path)?;
58 let mut reader = BufReader::new(file);
59 let mut buf = Vec::new();
60 reader.read_to_end(&mut buf)?;
61
62 if buf.len() < 8 {
64 return Err("File too small".into());
65 }
66 if &buf[0..4] != STORE_MAGIC {
67 return Err("Invalid magic bytes - expected RDST".into());
68 }
69 let mut pos = 4;
70
71 let version = u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]);
73 pos += 4;
74 if version != STORE_VERSION_V1
75 && version != STORE_VERSION_V2
76 && version != STORE_VERSION_V3
77 && version != STORE_VERSION_V4
78 && version != STORE_VERSION_V5
79 && version != STORE_VERSION_V6
80 && version != STORE_VERSION_V7
81 {
82 return Err(format!("Unsupported version: {}", version).into());
83 }
84
85 if version >= STORE_VERSION_V3 {
87 if buf.len() < 12 {
88 return Err("File too small for CRC32 verification".into());
89 }
90 let stored_crc = u32::from_le_bytes([
91 buf[buf.len() - 4],
92 buf[buf.len() - 3],
93 buf[buf.len() - 2],
94 buf[buf.len() - 1],
95 ]);
96 let computed_crc = crate::storage::engine::crc32::crc32(&buf[..buf.len() - 4]);
97 if stored_crc != computed_crc {
98 return Err("Binary store CRC32 mismatch — file corrupted".into());
99 }
100 buf.truncate(buf.len() - 4);
102 }
103
104 let store = Self::with_config(UnifiedStoreConfig::default());
105 store.set_format_version(version);
106
107 let collection_count = read_varu32(&buf, &mut pos)
109 .map_err(|e| format!("Failed to read collection count: {:?}", e))?;
110
111 for _ in 0..collection_count {
113 let name_len = read_varu32(&buf, &mut pos)
115 .map_err(|e| format!("Failed to read name length: {:?}", e))?
116 as usize;
117 let name = String::from_utf8(buf[pos..pos + name_len].to_vec())
118 .map_err(|e| format!("Invalid UTF-8 in collection name: {}", e))?;
119 pos += name_len;
120
121 let entity_count = read_varu32(&buf, &mut pos)
123 .map_err(|e| format!("Failed to read entity count: {:?}", e))?;
124
125 for _ in 0..entity_count {
127 if version >= STORE_VERSION_V7 {
128 if pos + 4 > buf.len() {
130 return Err("Truncated entity record length".into());
131 }
132 let record_len =
133 u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]])
134 as usize;
135 pos += 4;
136 if pos + record_len > buf.len() {
137 return Err("Truncated entity record payload".into());
138 }
139 let record_bytes = &buf[pos..pos + record_len];
140 pos += record_len;
141
142 let (entity, metadata) = Self::deserialize_entity_record(record_bytes, version)
143 .map_err(|e| format!("Entity record deserialization error: {e}"))?;
144
145 store.insert_auto(&name, entity.clone())?;
146
147 if let Some(metadata) = metadata {
148 if let Some(manager) = store.get_collection(&name) {
149 let _ = manager.set_metadata(entity.id, metadata);
150 }
151 }
152 } else {
153 let entity = Self::read_entity_binary(&buf, &mut pos, version)?;
155 store.insert_auto(&name, entity)?;
156 }
157 }
158 }
159
160 if pos < buf.len() {
161 let cross_ref_count = read_varu32(&buf, &mut pos)
163 .map_err(|e| format!("Failed to read cross-ref count: {:?}", e))?;
164
165 for _ in 0..cross_ref_count {
166 let source_id = read_varu64(&buf, &mut pos)
167 .map_err(|e| format!("Failed to read source_id: {:?}", e))?;
168 let target_id = read_varu64(&buf, &mut pos)
169 .map_err(|e| format!("Failed to read target_id: {:?}", e))?;
170 let ref_type_byte = buf[pos];
171 pos += 1;
172 let ref_type = RefType::from_byte(ref_type_byte);
173
174 let coll_len = read_varu32(&buf, &mut pos)
175 .map_err(|e| format!("Failed to read collection length: {:?}", e))?
176 as usize;
177 let collection = String::from_utf8(buf[pos..pos + coll_len].to_vec())
178 .map_err(|e| format!("Invalid UTF-8 in collection: {}", e))?;
179 pos += coll_len;
180
181 let source_collection = store
182 .get_any(EntityId::new(source_id))
183 .map(|(name, _)| name)
184 .unwrap_or_else(|| collection.clone());
185 let _ = store.add_cross_ref(
186 &source_collection,
187 EntityId::new(source_id),
188 &collection,
189 EntityId::new(target_id),
190 ref_type,
191 1.0,
192 );
193 }
194 }
195
196 if store.format_version() < STORE_VERSION_V7 {
197 store.set_format_version(STORE_VERSION_V7);
198 }
199
200 Ok(store)
201 }
202
203 pub fn save_to_file(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
208 let tmp_path = path.with_extension("rdb-tmp");
210 let file = File::create(&tmp_path)?;
211 let mut writer = BufWriter::new(file);
212 let mut buf = Vec::new();
213
214 buf.extend_from_slice(STORE_MAGIC);
216
217 buf.extend_from_slice(&STORE_VERSION_V7.to_le_bytes());
220
221 let collections = self.collections.read();
223 write_varu32(&mut buf, collections.len() as u32);
224
225 let fv = STORE_VERSION_V7;
226 for (name, manager) in collections.iter() {
227 write_varu32(&mut buf, name.len() as u32);
229 buf.extend_from_slice(name.as_bytes());
230
231 let entities = manager.query_all(|_| true);
233 write_varu32(&mut buf, entities.len() as u32);
234
235 for entity in entities {
238 let metadata = manager.get_metadata(entity.id);
239 let record = Self::serialize_entity_record(&entity, metadata.as_ref(), fv);
240 buf.extend_from_slice(&(record.len() as u32).to_le_bytes());
241 buf.extend_from_slice(&record);
242 }
243 }
244
245 let cross_refs = self.cross_refs.read();
247 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
248 write_varu32(&mut buf, total_refs as u32);
249
250 for (source_id, refs) in cross_refs.iter() {
251 for (target_id, ref_type, collection) in refs {
252 write_varu64(&mut buf, source_id.raw());
253 write_varu64(&mut buf, target_id.raw());
254 buf.push(ref_type.to_byte());
255 write_varu32(&mut buf, collection.len() as u32);
256 buf.extend_from_slice(collection.as_bytes());
257 }
258 }
259
260 self.set_format_version(STORE_VERSION_V7);
261
262 let checksum = crate::storage::engine::crc32::crc32(&buf);
264 buf.extend_from_slice(&checksum.to_le_bytes());
265
266 writer.write_all(&buf)?;
267 writer.flush()?;
268 writer.get_ref().sync_all()?;
269 drop(writer);
270
271 std::fs::rename(&tmp_path, path)?;
273
274 if let Some(parent) = path.parent() {
276 if let Ok(dir) = File::open(parent) {
277 let _ = dir.sync_all();
278 }
279 }
280
281 Ok(())
282 }
283
284 pub(crate) fn read_entity_binary(
286 buf: &[u8],
287 pos: &mut usize,
288 format_version: u32,
289 ) -> Result<UnifiedEntity, Box<dyn std::error::Error>> {
290 let id = read_varu64(buf, pos).map_err(|e| format!("Failed to read entity id: {:?}", e))?;
292
293 let kind_type = buf[*pos];
295 *pos += 1;
296
297 let kind = match kind_type {
299 0 => {
300 let table_len = Self::read_varu32_safe(buf, pos)?;
302 let table = String::from_utf8(buf[*pos..*pos + table_len].to_vec())?;
303 *pos += table_len;
304 let row_id = Self::read_varu64_safe(buf, pos)?;
305 EntityKind::TableRow {
306 table: table.into(),
307 row_id,
308 }
309 }
310 1 => {
311 let label_len = Self::read_varu32_safe(buf, pos)?;
313 let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
314 *pos += label_len;
315 let node_type_len = Self::read_varu32_safe(buf, pos)?;
316 let node_type = String::from_utf8(buf[*pos..*pos + node_type_len].to_vec())?;
317 *pos += node_type_len;
318 EntityKind::GraphNode(Box::new(GraphNodeKind { label, node_type }))
319 }
320 2 => {
321 let label_len = Self::read_varu32_safe(buf, pos)?;
323 let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
324 *pos += label_len;
325 let from_node_len = Self::read_varu32_safe(buf, pos)?;
326 let from_node = String::from_utf8(buf[*pos..*pos + from_node_len].to_vec())?;
327 *pos += from_node_len;
328 let to_node_len = Self::read_varu32_safe(buf, pos)?;
329 let to_node = String::from_utf8(buf[*pos..*pos + to_node_len].to_vec())?;
330 *pos += to_node_len;
331 let weight =
332 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
333 *pos += 4;
334 EntityKind::GraphEdge(Box::new(GraphEdgeKind {
335 label,
336 from_node,
337 to_node,
338 weight,
339 }))
340 }
341 3 => {
342 let collection_len = Self::read_varu32_safe(buf, pos)?;
344 let collection = String::from_utf8(buf[*pos..*pos + collection_len].to_vec())?;
345 *pos += collection_len;
346 EntityKind::Vector { collection }
347 }
348 4 => {
349 let series_len = Self::read_varu32_safe(buf, pos)?;
351 let series = String::from_utf8(buf[*pos..*pos + series_len].to_vec())?;
352 *pos += series_len;
353 let metric_len = Self::read_varu32_safe(buf, pos)?;
354 let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
355 *pos += metric_len;
356 EntityKind::TimeSeriesPoint(Box::new(TimeSeriesPointKind { series, metric }))
357 }
358 5 => {
359 let queue_len = Self::read_varu32_safe(buf, pos)?;
361 let queue = String::from_utf8(buf[*pos..*pos + queue_len].to_vec())?;
362 *pos += queue_len;
363 let position = Self::read_varu64_safe(buf, pos)?;
364 EntityKind::QueueMessage { queue, position }
365 }
366 _ => return Err(format!("Unknown EntityKind type: {}", kind_type).into()),
367 };
368
369 let data_type = buf[*pos];
371 *pos += 1;
372
373 let mut data = match data_type {
375 0 => {
376 let col_count = Self::read_varu32_safe(buf, pos)?;
378 let mut columns = Vec::with_capacity(col_count);
379 for _ in 0..col_count {
380 columns.push(Self::read_value_binary(buf, pos)?);
381 }
382 EntityData::Row(RowData::new(columns))
383 }
384 1 => {
385 let prop_count = Self::read_varu32_safe(buf, pos)?;
387 let mut properties = HashMap::new();
388 for _ in 0..prop_count {
389 let key_len = Self::read_varu32_safe(buf, pos)?;
390 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
391 *pos += key_len;
392 let value = Self::read_value_binary(buf, pos)?;
393 properties.insert(key, value);
394 }
395 EntityData::Node(NodeData::with_properties(properties))
396 }
397 2 => {
398 let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
400 *pos += 4;
401 let weight = f32::from_le_bytes(weight_bytes);
402 let prop_count = Self::read_varu32_safe(buf, pos)?;
403 let mut properties = HashMap::new();
404 for _ in 0..prop_count {
405 let key_len = Self::read_varu32_safe(buf, pos)?;
406 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
407 *pos += key_len;
408 let value = Self::read_value_binary(buf, pos)?;
409 properties.insert(key, value);
410 }
411 let mut edge = EdgeData::new(weight);
412 edge.properties = properties;
413 EntityData::Edge(edge)
414 }
415 3 => {
416 let dim = Self::read_varu32_safe(buf, pos)?;
418 let mut dense = Vec::with_capacity(dim);
419 for _ in 0..dim {
420 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
421 *pos += 4;
422 dense.push(f32::from_le_bytes(bytes));
423 }
424 let mut vector = VectorData::new(dense);
425 if format_version >= STORE_VERSION_V6 {
426 let has_content = buf[*pos] != 0;
427 *pos += 1;
428 if has_content {
429 let content_len = Self::read_varu32_safe(buf, pos)?;
430 vector.content =
431 Some(String::from_utf8(buf[*pos..*pos + content_len].to_vec())?);
432 *pos += content_len;
433 }
434 }
435 EntityData::Vector(vector)
436 }
437 4 => {
438 let metric_len = Self::read_varu32_safe(buf, pos)?;
440 let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
441 *pos += metric_len;
442 let timestamp_ns = Self::read_varu64_safe(buf, pos)?;
443 let value_bytes = [
444 buf[*pos],
445 buf[*pos + 1],
446 buf[*pos + 2],
447 buf[*pos + 3],
448 buf[*pos + 4],
449 buf[*pos + 5],
450 buf[*pos + 6],
451 buf[*pos + 7],
452 ];
453 *pos += 8;
454 let mut tags = HashMap::new();
455 if format_version >= STORE_VERSION_V5 {
456 let tag_count = Self::read_varu32_safe(buf, pos)?;
457 tags = HashMap::with_capacity(tag_count);
458 for _ in 0..tag_count {
459 let key_len = Self::read_varu32_safe(buf, pos)?;
460 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
461 *pos += key_len;
462 let value_len = Self::read_varu32_safe(buf, pos)?;
463 let value = String::from_utf8(buf[*pos..*pos + value_len].to_vec())?;
464 *pos += value_len;
465 tags.insert(key, value);
466 }
467 }
468 EntityData::TimeSeries(crate::storage::unified::entity::TimeSeriesData {
469 metric,
470 timestamp_ns,
471 value: f64::from_le_bytes(value_bytes),
472 tags,
473 })
474 }
475 5 => {
476 let payload = Self::read_value_binary(buf, pos)?;
478 let enqueued_at_ns = Self::read_varu64_safe(buf, pos)?;
479 let attempts = Self::read_varu32_safe(buf, pos)? as u32;
480 EntityData::QueueMessage(crate::storage::unified::entity::QueueMessageData {
481 payload,
482 priority: None,
483 enqueued_at_ns,
484 attempts,
485 max_attempts: 3,
486 acked: false,
487 })
488 }
489 6 => {
490 let field_count = Self::read_varu32_safe(buf, pos)?;
492 let mut named = HashMap::with_capacity(field_count);
493 for _ in 0..field_count {
494 let key_len = Self::read_varu32_safe(buf, pos)?;
495 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
496 *pos += key_len;
497 let value = Self::read_value_binary(buf, pos)?;
498 named.insert(key, value);
499 }
500 EntityData::Row(RowData {
501 columns: Vec::new(),
502 named: Some(named),
503 schema: None,
504 })
505 }
506 _ => return Err(format!("Unknown EntityData type: {}", data_type).into()),
507 };
508
509 let created_at = Self::read_varu64_safe(buf, pos)?;
511 let updated_at = Self::read_varu64_safe(buf, pos)?;
512
513 let embedding_count = Self::read_varu32_safe(buf, pos)?;
515 let mut embeddings = Vec::with_capacity(embedding_count);
516 for _ in 0..embedding_count {
517 let name_len = Self::read_varu32_safe(buf, pos)?;
518 let name = String::from_utf8(buf[*pos..*pos + name_len].to_vec())?;
519 *pos += name_len;
520
521 let dim = Self::read_varu32_safe(buf, pos)?;
522 let mut vector = Vec::with_capacity(dim);
523 for _ in 0..dim {
524 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
525 *pos += 4;
526 vector.push(f32::from_le_bytes(bytes));
527 }
528
529 let model_len = Self::read_varu32_safe(buf, pos)?;
530 let model = String::from_utf8(buf[*pos..*pos + model_len].to_vec())?;
531 *pos += model_len;
532
533 embeddings.push(EmbeddingSlot::new(name, vector, model));
534 }
535
536 let cross_ref_count = Self::read_varu32_safe(buf, pos)?;
538 let mut cross_refs = Vec::with_capacity(cross_ref_count);
539 for _ in 0..cross_ref_count {
540 let source = Self::read_varu64_safe(buf, pos)?;
541 let target = Self::read_varu64_safe(buf, pos)?;
542 let ref_type_byte = buf[*pos];
543 *pos += 1;
544 let (target_collection, weight, created_at) = if format_version >= STORE_VERSION_V2 {
545 let coll_len = Self::read_varu32_safe(buf, pos)?;
546 let collection = String::from_utf8(buf[*pos..*pos + coll_len].to_vec())?;
547 *pos += coll_len;
548 let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
549 *pos += 4;
550 let weight = f32::from_le_bytes(weight_bytes);
551 let created_at = Self::read_varu64_safe(buf, pos)?;
552 (collection, weight, created_at)
553 } else {
554 (String::new(), 1.0, 0)
555 };
556
557 let mut cross_ref = CrossRef::new(
558 EntityId::new(source),
559 EntityId::new(target),
560 target_collection,
561 RefType::from_byte(ref_type_byte),
562 );
563 cross_ref.weight = weight;
564 cross_ref.created_at = created_at;
565 cross_refs.push(cross_ref);
566 }
567
568 let sequence_id = Self::read_varu64_safe(buf, pos)?;
570
571 if format_version >= STORE_VERSION_V4 {
572 if let EntityData::QueueMessage(message) = &mut data {
573 if *pos < buf.len() {
574 let priority_present = buf[*pos] != 0;
575 *pos += 1;
576 message.priority = if priority_present {
577 if *pos + 4 > buf.len() {
578 return Err("truncated queue priority".into());
579 }
580 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
581 *pos += 4;
582 Some(i32::from_le_bytes(bytes))
583 } else {
584 None
585 };
586 message.max_attempts = Self::read_varu32_safe(buf, pos)? as u32;
587 if *pos >= buf.len() {
588 return Err("truncated queue ack flag".into());
589 }
590 message.acked = buf[*pos] != 0;
591 *pos += 1;
592 }
593 }
594 }
595
596 let mut entity = UnifiedEntity::new(EntityId::new(id), kind, data);
597 entity.created_at = created_at;
598 entity.updated_at = updated_at;
599 entity.sequence_id = sequence_id;
600 if !embeddings.is_empty() || !cross_refs.is_empty() {
601 entity.embeddings_mut().extend(embeddings);
602 entity.cross_refs_mut().extend(cross_refs);
603 }
604
605 Ok(entity)
606 }
607
608 fn read_varu32_safe(buf: &[u8], pos: &mut usize) -> Result<usize, Box<dyn std::error::Error>> {
610 read_varu32(buf, pos)
611 .map(|v| v as usize)
612 .map_err(|e| format!("Decode error: {:?}", e).into())
613 }
614
615 fn read_varu64_safe(buf: &[u8], pos: &mut usize) -> Result<u64, Box<dyn std::error::Error>> {
617 read_varu64(buf, pos).map_err(|e| format!("Decode error: {:?}", e).into())
618 }
619
620 pub(crate) fn write_entity_binary(
622 buf: &mut Vec<u8>,
623 entity: &UnifiedEntity,
624 format_version: u32,
625 ) {
626 write_varu64(buf, entity.id.raw());
628
629 match &entity.kind {
631 EntityKind::TableRow { table, row_id } => {
632 buf.push(0);
633 write_varu32(buf, table.len() as u32);
634 buf.extend_from_slice(table.as_bytes());
635 write_varu64(buf, *row_id);
636 }
637 EntityKind::GraphNode(ref node) => {
638 buf.push(1);
639 write_varu32(buf, node.label.len() as u32);
640 buf.extend_from_slice(node.label.as_bytes());
641 write_varu32(buf, node.node_type.len() as u32);
642 buf.extend_from_slice(node.node_type.as_bytes());
643 }
644 EntityKind::GraphEdge(ref edge) => {
645 buf.push(2);
646 write_varu32(buf, edge.label.len() as u32);
647 buf.extend_from_slice(edge.label.as_bytes());
648 write_varu32(buf, edge.from_node.len() as u32);
649 buf.extend_from_slice(edge.from_node.as_bytes());
650 write_varu32(buf, edge.to_node.len() as u32);
651 buf.extend_from_slice(edge.to_node.as_bytes());
652 buf.extend_from_slice(&edge.weight.to_le_bytes());
653 }
654 EntityKind::Vector { collection } => {
655 buf.push(3);
656 write_varu32(buf, collection.len() as u32);
657 buf.extend_from_slice(collection.as_bytes());
658 }
659 EntityKind::TimeSeriesPoint(ref ts) => {
660 buf.push(4);
661 write_varu32(buf, ts.series.len() as u32);
662 buf.extend_from_slice(ts.series.as_bytes());
663 write_varu32(buf, ts.metric.len() as u32);
664 buf.extend_from_slice(ts.metric.as_bytes());
665 }
666 EntityKind::QueueMessage { queue, position } => {
667 buf.push(5);
668 write_varu32(buf, queue.len() as u32);
669 buf.extend_from_slice(queue.as_bytes());
670 write_varu64(buf, *position);
671 }
672 }
673
674 match &entity.data {
676 EntityData::Row(row) => {
677 if let Some(ref named) = row.named {
678 buf.push(6);
685 write_varu32(buf, named.len() as u32);
686 let mut entries: Vec<_> = named.iter().collect();
687 entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
688 for (key, value) in entries {
689 write_varu32(buf, key.len() as u32);
690 buf.extend_from_slice(key.as_bytes());
691 Self::write_value_binary(buf, value);
692 }
693 } else {
694 buf.push(0);
695 write_varu32(buf, row.columns.len() as u32);
696 for col in &row.columns {
697 Self::write_value_binary(buf, col);
698 }
699 }
700 }
701 EntityData::Node(node) => {
702 buf.push(1);
703 write_varu32(buf, node.properties.len() as u32);
704 let mut entries: Vec<_> = node.properties.iter().collect();
705 entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
706 for (key, value) in entries {
707 write_varu32(buf, key.len() as u32);
708 buf.extend_from_slice(key.as_bytes());
709 Self::write_value_binary(buf, value);
710 }
711 }
712 EntityData::Edge(edge) => {
713 buf.push(2);
714 buf.extend_from_slice(&edge.weight.to_le_bytes());
715 write_varu32(buf, edge.properties.len() as u32);
716 let mut entries: Vec<_> = edge.properties.iter().collect();
717 entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
718 for (key, value) in entries {
719 write_varu32(buf, key.len() as u32);
720 buf.extend_from_slice(key.as_bytes());
721 Self::write_value_binary(buf, value);
722 }
723 }
724 EntityData::Vector(vec) => {
725 buf.push(3);
726 write_varu32(buf, vec.dense.len() as u32);
727 for f in &vec.dense {
728 buf.extend_from_slice(&f.to_le_bytes());
729 }
730 if format_version >= STORE_VERSION_V6 {
731 buf.push(u8::from(vec.content.is_some()));
732 if let Some(content) = &vec.content {
733 write_varu32(buf, content.len() as u32);
734 buf.extend_from_slice(content.as_bytes());
735 }
736 }
737 }
738 EntityData::TimeSeries(ts) => {
739 buf.push(4);
740 write_varu32(buf, ts.metric.len() as u32);
741 buf.extend_from_slice(ts.metric.as_bytes());
742 write_varu64(buf, ts.timestamp_ns);
743 buf.extend_from_slice(&ts.value.to_le_bytes());
744 if format_version >= STORE_VERSION_V5 {
745 write_varu32(buf, ts.tags.len() as u32);
746 let mut tag_entries: Vec<_> = ts.tags.iter().collect();
747 tag_entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
748 for (key, value) in tag_entries {
749 write_varu32(buf, key.len() as u32);
750 buf.extend_from_slice(key.as_bytes());
751 write_varu32(buf, value.len() as u32);
752 buf.extend_from_slice(value.as_bytes());
753 }
754 }
755 }
756 EntityData::QueueMessage(msg) => {
757 buf.push(5);
758 Self::write_value_binary(buf, &msg.payload);
759 write_varu64(buf, msg.enqueued_at_ns);
760 write_varu32(buf, msg.attempts);
761 }
762 }
763
764 write_varu64(buf, entity.created_at);
766 write_varu64(buf, entity.updated_at);
767
768 write_varu32(buf, entity.embeddings().len() as u32);
770 for emb in entity.embeddings() {
771 write_varu32(buf, emb.name.len() as u32);
772 buf.extend_from_slice(emb.name.as_bytes());
773 write_varu32(buf, emb.vector.len() as u32);
774 for f in &emb.vector {
775 buf.extend_from_slice(&f.to_le_bytes());
776 }
777 write_varu32(buf, emb.model.len() as u32);
778 buf.extend_from_slice(emb.model.as_bytes());
779 }
780
781 write_varu32(buf, entity.cross_refs().len() as u32);
783 for cross_ref in entity.cross_refs() {
784 write_varu64(buf, cross_ref.source.raw());
785 write_varu64(buf, cross_ref.target.raw());
786 buf.push(cross_ref.ref_type.to_byte());
787 if format_version >= STORE_VERSION_V2 {
788 write_varu32(buf, cross_ref.target_collection.len() as u32);
789 buf.extend_from_slice(cross_ref.target_collection.as_bytes());
790 buf.extend_from_slice(&cross_ref.weight.to_le_bytes());
791 write_varu64(buf, cross_ref.created_at);
792 }
793 }
794
795 write_varu64(buf, entity.sequence_id);
797
798 if format_version >= STORE_VERSION_V4 {
799 if let EntityData::QueueMessage(message) = &entity.data {
800 buf.push(u8::from(message.priority.is_some()));
801 if let Some(priority) = message.priority {
802 buf.extend_from_slice(&priority.to_le_bytes());
803 }
804 write_varu32(buf, message.max_attempts);
805 buf.push(u8::from(message.acked));
806 }
807 }
808 }
809
810 fn read_value_binary(buf: &[u8], pos: &mut usize) -> Result<Value, Box<dyn std::error::Error>> {
815 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
816
817 let type_byte = buf[*pos];
818 *pos += 1;
819
820 Ok(match type_byte {
821 0 => Value::Null,
822 1 => {
823 let b = buf[*pos] != 0;
824 *pos += 1;
825 Value::Boolean(b)
826 }
827 2 => {
828 let val = i64::from_le_bytes([
829 buf[*pos],
830 buf[*pos + 1],
831 buf[*pos + 2],
832 buf[*pos + 3],
833 buf[*pos + 4],
834 buf[*pos + 5],
835 buf[*pos + 6],
836 buf[*pos + 7],
837 ]);
838 *pos += 8;
839 Value::Integer(val)
840 }
841 3 => {
842 let val = u64::from_le_bytes([
843 buf[*pos],
844 buf[*pos + 1],
845 buf[*pos + 2],
846 buf[*pos + 3],
847 buf[*pos + 4],
848 buf[*pos + 5],
849 buf[*pos + 6],
850 buf[*pos + 7],
851 ]);
852 *pos += 8;
853 Value::UnsignedInteger(val)
854 }
855 4 => {
856 let val = f64::from_le_bytes([
857 buf[*pos],
858 buf[*pos + 1],
859 buf[*pos + 2],
860 buf[*pos + 3],
861 buf[*pos + 4],
862 buf[*pos + 5],
863 buf[*pos + 6],
864 buf[*pos + 7],
865 ]);
866 *pos += 8;
867 Value::Float(val)
868 }
869 5 => {
870 let len = Self::read_varu32_safe(buf, pos)?;
871 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
872 *pos += len;
873 Value::text(s)
874 }
875 6 => {
876 let len = Self::read_varu32_safe(buf, pos)?;
877 let bytes = buf[*pos..*pos + len].to_vec();
878 *pos += len;
879 Value::Blob(bytes)
880 }
881 7 => {
882 let val = i64::from_le_bytes([
883 buf[*pos],
884 buf[*pos + 1],
885 buf[*pos + 2],
886 buf[*pos + 3],
887 buf[*pos + 4],
888 buf[*pos + 5],
889 buf[*pos + 6],
890 buf[*pos + 7],
891 ]);
892 *pos += 8;
893 Value::Timestamp(val)
894 }
895 8 => {
896 let val = i64::from_le_bytes([
897 buf[*pos],
898 buf[*pos + 1],
899 buf[*pos + 2],
900 buf[*pos + 3],
901 buf[*pos + 4],
902 buf[*pos + 5],
903 buf[*pos + 6],
904 buf[*pos + 7],
905 ]);
906 *pos += 8;
907 Value::Duration(val)
908 }
909 9 => {
910 let version = buf[*pos];
912 *pos += 1;
913 if version == 4 {
914 let octets = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
915 *pos += 4;
916 Value::IpAddr(IpAddr::V4(Ipv4Addr::from(octets)))
917 } else {
918 let mut octets = [0u8; 16];
919 octets.copy_from_slice(&buf[*pos..*pos + 16]);
920 *pos += 16;
921 Value::IpAddr(IpAddr::V6(Ipv6Addr::from(octets)))
922 }
923 }
924 10 => {
925 let mut mac = [0u8; 6];
926 mac.copy_from_slice(&buf[*pos..*pos + 6]);
927 *pos += 6;
928 Value::MacAddr(mac)
929 }
930 11 => {
931 let len = Self::read_varu32_safe(buf, pos)?;
932 let mut vector = Vec::with_capacity(len);
933 for _ in 0..len {
934 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
935 *pos += 4;
936 vector.push(f32::from_le_bytes(bytes));
937 }
938 Value::Vector(vector)
939 }
940 12 => {
941 let len = Self::read_varu32_safe(buf, pos)?;
942 let bytes = buf[*pos..*pos + len].to_vec();
943 *pos += len;
944 Value::Json(bytes)
945 }
946 13 => {
947 let mut uuid = [0u8; 16];
948 uuid.copy_from_slice(&buf[*pos..*pos + 16]);
949 *pos += 16;
950 Value::Uuid(uuid)
951 }
952 14 => {
953 let len = Self::read_varu32_safe(buf, pos)?;
954 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
955 *pos += len;
956 Value::NodeRef(s)
957 }
958 15 => {
959 let len = Self::read_varu32_safe(buf, pos)?;
960 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
961 *pos += len;
962 Value::EdgeRef(s)
963 }
964 16 => {
965 let len = Self::read_varu32_safe(buf, pos)?;
966 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
967 *pos += len;
968 let id = u64::from_le_bytes([
969 buf[*pos],
970 buf[*pos + 1],
971 buf[*pos + 2],
972 buf[*pos + 3],
973 buf[*pos + 4],
974 buf[*pos + 5],
975 buf[*pos + 6],
976 buf[*pos + 7],
977 ]);
978 *pos += 8;
979 Value::VectorRef(s, id)
980 }
981 17 => {
982 let len = Self::read_varu32_safe(buf, pos)?;
983 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
984 *pos += len;
985 let id = u64::from_le_bytes([
986 buf[*pos],
987 buf[*pos + 1],
988 buf[*pos + 2],
989 buf[*pos + 3],
990 buf[*pos + 4],
991 buf[*pos + 5],
992 buf[*pos + 6],
993 buf[*pos + 7],
994 ]);
995 *pos += 8;
996 Value::RowRef(s, id)
997 }
998 18 => {
999 let rgb = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1000 *pos += 3;
1001 Value::Color(rgb)
1002 }
1003 19 => {
1004 let len = Self::read_varu32_safe(buf, pos)?;
1005 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1006 *pos += len;
1007 Value::Email(s)
1008 }
1009 20 => {
1010 let len = Self::read_varu32_safe(buf, pos)?;
1011 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1012 *pos += len;
1013 Value::Url(s)
1014 }
1015 21 => {
1016 let val = u64::from_le_bytes([
1017 buf[*pos],
1018 buf[*pos + 1],
1019 buf[*pos + 2],
1020 buf[*pos + 3],
1021 buf[*pos + 4],
1022 buf[*pos + 5],
1023 buf[*pos + 6],
1024 buf[*pos + 7],
1025 ]);
1026 *pos += 8;
1027 Value::Phone(val)
1028 }
1029 22 => {
1030 let val =
1031 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1032 *pos += 4;
1033 Value::Semver(val)
1034 }
1035 23 => {
1036 let ip =
1037 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1038 *pos += 4;
1039 let prefix = buf[*pos];
1040 *pos += 1;
1041 Value::Cidr(ip, prefix)
1042 }
1043 24 => {
1044 let val =
1045 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1046 *pos += 4;
1047 Value::Date(val)
1048 }
1049 25 => {
1050 let val =
1051 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1052 *pos += 4;
1053 Value::Time(val)
1054 }
1055 26 => {
1056 let val = i64::from_le_bytes([
1057 buf[*pos],
1058 buf[*pos + 1],
1059 buf[*pos + 2],
1060 buf[*pos + 3],
1061 buf[*pos + 4],
1062 buf[*pos + 5],
1063 buf[*pos + 6],
1064 buf[*pos + 7],
1065 ]);
1066 *pos += 8;
1067 Value::Decimal(val)
1068 }
1069 27 => {
1070 let val = buf[*pos];
1071 *pos += 1;
1072 Value::EnumValue(val)
1073 }
1074 28 => {
1075 let len = Self::read_varu32_safe(buf, pos)?;
1076 let mut elems = Vec::with_capacity(len);
1077 for _ in 0..len {
1078 elems.push(Self::read_value_binary(buf, pos)?);
1079 }
1080 Value::Array(elems)
1081 }
1082 29 => {
1083 let val = i64::from_le_bytes([
1084 buf[*pos],
1085 buf[*pos + 1],
1086 buf[*pos + 2],
1087 buf[*pos + 3],
1088 buf[*pos + 4],
1089 buf[*pos + 5],
1090 buf[*pos + 6],
1091 buf[*pos + 7],
1092 ]);
1093 *pos += 8;
1094 Value::TimestampMs(val)
1095 }
1096 30 => {
1097 let val =
1098 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1099 *pos += 4;
1100 Value::Ipv4(val)
1101 }
1102 31 => {
1103 let mut bytes = [0u8; 16];
1104 bytes.copy_from_slice(&buf[*pos..*pos + 16]);
1105 *pos += 16;
1106 Value::Ipv6(bytes)
1107 }
1108 32 => {
1109 let ip =
1110 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1111 *pos += 4;
1112 let mask =
1113 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1114 *pos += 4;
1115 Value::Subnet(ip, mask)
1116 }
1117 33 => {
1118 let val = u16::from_le_bytes([buf[*pos], buf[*pos + 1]]);
1119 *pos += 2;
1120 Value::Port(val)
1121 }
1122 34 => {
1123 let val =
1124 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1125 *pos += 4;
1126 Value::Latitude(val)
1127 }
1128 35 => {
1129 let val =
1130 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1131 *pos += 4;
1132 Value::Longitude(val)
1133 }
1134 36 => {
1135 let lat =
1136 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1137 *pos += 4;
1138 let lon =
1139 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1140 *pos += 4;
1141 Value::GeoPoint(lat, lon)
1142 }
1143 37 => {
1144 let c = [buf[*pos], buf[*pos + 1]];
1145 *pos += 2;
1146 Value::Country2(c)
1147 }
1148 38 => {
1149 let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1150 *pos += 3;
1151 Value::Country3(c)
1152 }
1153 39 => {
1154 let c = [buf[*pos], buf[*pos + 1]];
1155 *pos += 2;
1156 Value::Lang2(c)
1157 }
1158 40 => {
1159 let c = [
1160 buf[*pos],
1161 buf[*pos + 1],
1162 buf[*pos + 2],
1163 buf[*pos + 3],
1164 buf[*pos + 4],
1165 ];
1166 *pos += 5;
1167 Value::Lang5(c)
1168 }
1169 41 => {
1170 let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1171 *pos += 3;
1172 Value::Currency(c)
1173 }
1174 42 => {
1175 let rgba = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
1176 *pos += 4;
1177 Value::ColorAlpha(rgba)
1178 }
1179 43 => {
1180 let val = i64::from_le_bytes([
1181 buf[*pos],
1182 buf[*pos + 1],
1183 buf[*pos + 2],
1184 buf[*pos + 3],
1185 buf[*pos + 4],
1186 buf[*pos + 5],
1187 buf[*pos + 6],
1188 buf[*pos + 7],
1189 ]);
1190 *pos += 8;
1191 Value::BigInt(val)
1192 }
1193 44 => {
1194 let col_len = Self::read_varu32_safe(buf, pos)?;
1195 let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1196 *pos += col_len;
1197 let key_len = Self::read_varu32_safe(buf, pos)?;
1198 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
1199 *pos += key_len;
1200 Value::KeyRef(col, key)
1201 }
1202 45 => {
1203 let col_len = Self::read_varu32_safe(buf, pos)?;
1204 let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1205 *pos += col_len;
1206 let id = u64::from_le_bytes([
1207 buf[*pos],
1208 buf[*pos + 1],
1209 buf[*pos + 2],
1210 buf[*pos + 3],
1211 buf[*pos + 4],
1212 buf[*pos + 5],
1213 buf[*pos + 6],
1214 buf[*pos + 7],
1215 ]);
1216 *pos += 8;
1217 Value::DocRef(col, id)
1218 }
1219 46 => {
1220 let len = Self::read_varu32_safe(buf, pos)?;
1221 let name = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1222 *pos += len;
1223 Value::TableRef(name)
1224 }
1225 47 => {
1226 let val =
1227 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1228 *pos += 4;
1229 Value::PageRef(val)
1230 }
1231 48 => {
1232 let len = Self::read_varu32_safe(buf, pos)?;
1233 let bytes = buf[*pos..*pos + len].to_vec();
1234 *pos += len;
1235 Value::Secret(bytes)
1236 }
1237 49 => {
1238 let len = Self::read_varu32_safe(buf, pos)?;
1239 let hash = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1240 *pos += len;
1241 Value::Password(hash)
1242 }
1243 50 => {
1244 let len = Self::read_varu32_safe(buf, pos)?;
1245 let code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1246 *pos += len;
1247 Value::AssetCode(code)
1248 }
1249 51 => {
1250 let len = Self::read_varu32_safe(buf, pos)?;
1251 let asset_code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1252 *pos += len;
1253 let scale = buf[*pos];
1254 *pos += 1;
1255 let minor_units = i64::from_le_bytes([
1256 buf[*pos],
1257 buf[*pos + 1],
1258 buf[*pos + 2],
1259 buf[*pos + 3],
1260 buf[*pos + 4],
1261 buf[*pos + 5],
1262 buf[*pos + 6],
1263 buf[*pos + 7],
1264 ]);
1265 *pos += 8;
1266 Value::Money {
1267 asset_code,
1268 minor_units,
1269 scale,
1270 }
1271 }
1272 0x85 => {
1274 let orig_len = Self::read_varu32_safe(buf, pos)?;
1275 let comp_len = Self::read_varu32_safe(buf, pos)?;
1276 let compressed = &buf[*pos..*pos + comp_len];
1277 *pos += comp_len;
1278 let mut out = vec![0u8; orig_len];
1279 zstd::bulk::decompress_to_buffer(compressed, &mut out)
1280 .map_err(|e| format!("C3 Text decompress: {e}"))?;
1281 Value::text(String::from_utf8(out).map_err(|e| format!("C3 Text UTF-8: {e}"))?)
1282 }
1283 0x86 => {
1284 let orig_len = Self::read_varu32_safe(buf, pos)?;
1285 let comp_len = Self::read_varu32_safe(buf, pos)?;
1286 let compressed = &buf[*pos..*pos + comp_len];
1287 *pos += comp_len;
1288 let mut out = vec![0u8; orig_len];
1289 zstd::bulk::decompress_to_buffer(compressed, &mut out)
1290 .map_err(|e| format!("C3 Blob decompress: {e}"))?;
1291 Value::Blob(out)
1292 }
1293 _ => return Err(format!("Unknown Value type: {}", type_byte).into()),
1294 })
1295 }
1296
1297 fn write_value_binary(buf: &mut Vec<u8>, value: &Value) {
1302 use std::net::IpAddr;
1303
1304 match value {
1305 Value::Null => buf.push(0),
1306 Value::Boolean(b) => {
1307 buf.push(1);
1308 buf.push(if *b { 1 } else { 0 });
1309 }
1310 Value::Integer(i) => {
1311 buf.push(2);
1312 buf.extend_from_slice(&i.to_le_bytes());
1313 }
1314 Value::UnsignedInteger(u) => {
1315 buf.push(3);
1316 buf.extend_from_slice(&u.to_le_bytes());
1317 }
1318 Value::Float(f) => {
1319 buf.push(4);
1320 buf.extend_from_slice(&f.to_le_bytes());
1321 }
1322 Value::Text(s) => {
1323 const TEXT_COMPRESS_THRESHOLD: usize = 2048;
1327 if s.len() >= TEXT_COMPRESS_THRESHOLD {
1328 if let Ok(compressed) = zstd::bulk::compress(s.as_bytes(), 3) {
1329 if compressed.len() < s.len() {
1330 buf.push(0x85); write_varu32(buf, s.len() as u32); write_varu32(buf, compressed.len() as u32);
1333 buf.extend_from_slice(&compressed);
1334 return; }
1336 }
1337 }
1338 buf.push(5);
1339 write_varu32(buf, s.len() as u32);
1340 buf.extend_from_slice(s.as_bytes());
1341 }
1342 Value::Blob(bytes) => {
1343 const BLOB_COMPRESS_THRESHOLD: usize = 2048;
1346 if bytes.len() >= BLOB_COMPRESS_THRESHOLD {
1347 if let Ok(compressed) = zstd::bulk::compress(bytes.as_slice(), 3) {
1348 if compressed.len() < bytes.len() {
1349 buf.push(0x86); write_varu32(buf, bytes.len() as u32);
1351 write_varu32(buf, compressed.len() as u32);
1352 buf.extend_from_slice(&compressed);
1353 return;
1354 }
1355 }
1356 }
1357 buf.push(6);
1358 write_varu32(buf, bytes.len() as u32);
1359 buf.extend_from_slice(bytes);
1360 }
1361 Value::Timestamp(t) => {
1362 buf.push(7);
1363 buf.extend_from_slice(&t.to_le_bytes());
1364 }
1365 Value::Duration(d) => {
1366 buf.push(8);
1367 buf.extend_from_slice(&d.to_le_bytes());
1368 }
1369 Value::IpAddr(ip) => {
1370 buf.push(9);
1371 match ip {
1372 IpAddr::V4(v4) => {
1373 buf.push(4);
1374 buf.extend_from_slice(&v4.octets());
1375 }
1376 IpAddr::V6(v6) => {
1377 buf.push(6);
1378 buf.extend_from_slice(&v6.octets());
1379 }
1380 }
1381 }
1382 Value::MacAddr(mac) => {
1383 buf.push(10);
1384 buf.extend_from_slice(mac);
1385 }
1386 Value::Vector(vec) => {
1387 buf.push(11);
1388 write_varu32(buf, vec.len() as u32);
1389 for f in vec {
1390 buf.extend_from_slice(&f.to_le_bytes());
1391 }
1392 }
1393 Value::Json(bytes) => {
1394 buf.push(12);
1395 write_varu32(buf, bytes.len() as u32);
1396 buf.extend_from_slice(bytes);
1397 }
1398 Value::Uuid(uuid) => {
1399 buf.push(13);
1400 buf.extend_from_slice(uuid);
1401 }
1402 Value::NodeRef(s) => {
1403 buf.push(14);
1404 write_varu32(buf, s.len() as u32);
1405 buf.extend_from_slice(s.as_bytes());
1406 }
1407 Value::EdgeRef(s) => {
1408 buf.push(15);
1409 write_varu32(buf, s.len() as u32);
1410 buf.extend_from_slice(s.as_bytes());
1411 }
1412 Value::VectorRef(s, id) => {
1413 buf.push(16);
1414 write_varu32(buf, s.len() as u32);
1415 buf.extend_from_slice(s.as_bytes());
1416 buf.extend_from_slice(&id.to_le_bytes());
1417 }
1418 Value::RowRef(s, id) => {
1419 buf.push(17);
1420 write_varu32(buf, s.len() as u32);
1421 buf.extend_from_slice(s.as_bytes());
1422 buf.extend_from_slice(&id.to_le_bytes());
1423 }
1424 Value::Color(rgb) => {
1425 buf.push(18);
1426 buf.extend_from_slice(rgb);
1427 }
1428 Value::Email(s) => {
1429 buf.push(19);
1430 write_varu32(buf, s.len() as u32);
1431 buf.extend_from_slice(s.as_bytes());
1432 }
1433 Value::Url(s) => {
1434 buf.push(20);
1435 write_varu32(buf, s.len() as u32);
1436 buf.extend_from_slice(s.as_bytes());
1437 }
1438 Value::Phone(n) => {
1439 buf.push(21);
1440 buf.extend_from_slice(&n.to_le_bytes());
1441 }
1442 Value::Semver(v) => {
1443 buf.push(22);
1444 buf.extend_from_slice(&v.to_le_bytes());
1445 }
1446 Value::Cidr(ip, prefix) => {
1447 buf.push(23);
1448 buf.extend_from_slice(&ip.to_le_bytes());
1449 buf.push(*prefix);
1450 }
1451 Value::Date(d) => {
1452 buf.push(24);
1453 buf.extend_from_slice(&d.to_le_bytes());
1454 }
1455 Value::Time(t) => {
1456 buf.push(25);
1457 buf.extend_from_slice(&t.to_le_bytes());
1458 }
1459 Value::Decimal(v) => {
1460 buf.push(26);
1461 buf.extend_from_slice(&v.to_le_bytes());
1462 }
1463 Value::EnumValue(i) => {
1464 buf.push(27);
1465 buf.push(*i);
1466 }
1467 Value::Array(elems) => {
1468 buf.push(28);
1469 write_varu32(buf, elems.len() as u32);
1470 for elem in elems {
1471 Self::write_value_binary(buf, elem);
1472 }
1473 }
1474 Value::TimestampMs(v) => {
1475 buf.push(29);
1476 buf.extend_from_slice(&v.to_le_bytes());
1477 }
1478 Value::Ipv4(v) => {
1479 buf.push(30);
1480 buf.extend_from_slice(&v.to_le_bytes());
1481 }
1482 Value::Ipv6(bytes) => {
1483 buf.push(31);
1484 buf.extend_from_slice(bytes);
1485 }
1486 Value::Subnet(ip, mask) => {
1487 buf.push(32);
1488 buf.extend_from_slice(&ip.to_le_bytes());
1489 buf.extend_from_slice(&mask.to_le_bytes());
1490 }
1491 Value::Port(v) => {
1492 buf.push(33);
1493 buf.extend_from_slice(&v.to_le_bytes());
1494 }
1495 Value::Latitude(v) => {
1496 buf.push(34);
1497 buf.extend_from_slice(&v.to_le_bytes());
1498 }
1499 Value::Longitude(v) => {
1500 buf.push(35);
1501 buf.extend_from_slice(&v.to_le_bytes());
1502 }
1503 Value::GeoPoint(lat, lon) => {
1504 buf.push(36);
1505 buf.extend_from_slice(&lat.to_le_bytes());
1506 buf.extend_from_slice(&lon.to_le_bytes());
1507 }
1508 Value::Country2(c) => {
1509 buf.push(37);
1510 buf.extend_from_slice(c);
1511 }
1512 Value::Country3(c) => {
1513 buf.push(38);
1514 buf.extend_from_slice(c);
1515 }
1516 Value::Lang2(c) => {
1517 buf.push(39);
1518 buf.extend_from_slice(c);
1519 }
1520 Value::Lang5(c) => {
1521 buf.push(40);
1522 buf.extend_from_slice(c);
1523 }
1524 Value::Currency(c) => {
1525 buf.push(41);
1526 buf.extend_from_slice(c);
1527 }
1528 Value::AssetCode(code) => {
1529 buf.push(50);
1530 write_varu32(buf, code.len() as u32);
1531 buf.extend_from_slice(code.as_bytes());
1532 }
1533 Value::Money {
1534 asset_code,
1535 minor_units,
1536 scale,
1537 } => {
1538 buf.push(51);
1539 write_varu32(buf, asset_code.len() as u32);
1540 buf.extend_from_slice(asset_code.as_bytes());
1541 buf.push(*scale);
1542 buf.extend_from_slice(&minor_units.to_le_bytes());
1543 }
1544 Value::ColorAlpha(rgba) => {
1545 buf.push(42);
1546 buf.extend_from_slice(rgba);
1547 }
1548 Value::BigInt(v) => {
1549 buf.push(43);
1550 buf.extend_from_slice(&v.to_le_bytes());
1551 }
1552 Value::KeyRef(col, key) => {
1553 buf.push(44);
1554 write_varu32(buf, col.len() as u32);
1555 buf.extend_from_slice(col.as_bytes());
1556 write_varu32(buf, key.len() as u32);
1557 buf.extend_from_slice(key.as_bytes());
1558 }
1559 Value::DocRef(col, id) => {
1560 buf.push(45);
1561 write_varu32(buf, col.len() as u32);
1562 buf.extend_from_slice(col.as_bytes());
1563 buf.extend_from_slice(&id.to_le_bytes());
1564 }
1565 Value::TableRef(name) => {
1566 buf.push(46);
1567 write_varu32(buf, name.len() as u32);
1568 buf.extend_from_slice(name.as_bytes());
1569 }
1570 Value::PageRef(page_id) => {
1571 buf.push(47);
1572 buf.extend_from_slice(&page_id.to_le_bytes());
1573 }
1574 Value::Secret(bytes) => {
1575 buf.push(48);
1576 write_varu32(buf, bytes.len() as u32);
1577 buf.extend_from_slice(bytes);
1578 }
1579 Value::Password(hash) => {
1580 buf.push(49);
1581 write_varu32(buf, hash.len() as u32);
1582 buf.extend_from_slice(hash.as_bytes());
1583 }
1584 }
1585 }
1586}