1use super::*;
2
3impl UnifiedStore {
4 pub fn new() -> Self {
5 Self::with_config(UnifiedStoreConfig::default())
6 }
7
8 pub fn format_version(&self) -> u32 {
10 self.format_version.load(Ordering::SeqCst)
11 }
12
13 pub(crate) fn set_format_version(&self, version: u32) {
14 self.format_version.store(version, Ordering::SeqCst);
15 }
16
17 pub fn next_entity_id(&self) -> EntityId {
19 EntityId::new(self.next_entity_id.fetch_add(1, Ordering::SeqCst))
20 }
21
22 pub fn reserve_entity_ids(&self, n: u64) -> std::ops::Range<u64> {
25 let start = self.next_entity_id.fetch_add(n, Ordering::SeqCst);
26 start..start + n
27 }
28
29 pub(crate) fn register_entity_id(&self, id: EntityId) {
30 let candidate = id.raw().saturating_add(1);
31 let mut current = self.next_entity_id.load(Ordering::SeqCst);
32 while candidate > current {
33 match self.next_entity_id.compare_exchange(
34 current,
35 candidate,
36 Ordering::SeqCst,
37 Ordering::SeqCst,
38 ) {
39 Ok(_) => break,
40 Err(updated) => current = updated,
41 }
42 }
43 }
44
45 pub fn load_from_file(path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
57 let file = File::open(path)?;
58 let mut reader = BufReader::new(file);
59 let mut buf = Vec::new();
60 reader.read_to_end(&mut buf)?;
61
62 if buf.len() < 8 {
64 return Err("File too small".into());
65 }
66 if &buf[0..4] != STORE_MAGIC {
67 return Err("Invalid magic bytes - expected RDST".into());
68 }
69 let mut pos = 4;
70
71 let version = u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]);
73 pos += 4;
74 if version != STORE_VERSION_V1
75 && version != STORE_VERSION_V2
76 && version != STORE_VERSION_V3
77 && version != STORE_VERSION_V4
78 && version != STORE_VERSION_V5
79 && version != STORE_VERSION_V6
80 && version != STORE_VERSION_V7
81 && version != STORE_VERSION_V8
82 && version != STORE_VERSION_V9
83 {
84 return Err(format!("Unsupported version: {}", version).into());
85 }
86
87 if version >= STORE_VERSION_V3 {
89 if buf.len() < 12 {
90 return Err("File too small for CRC32 verification".into());
91 }
92 let stored_crc = u32::from_le_bytes([
93 buf[buf.len() - 4],
94 buf[buf.len() - 3],
95 buf[buf.len() - 2],
96 buf[buf.len() - 1],
97 ]);
98 let computed_crc = crate::storage::engine::crc32::crc32(&buf[..buf.len() - 4]);
99 if stored_crc != computed_crc {
100 return Err("Binary store CRC32 mismatch — file corrupted".into());
101 }
102 buf.truncate(buf.len() - 4);
104 }
105
106 let store = Self::with_config(UnifiedStoreConfig::default());
107 store.set_format_version(version);
108
109 let collection_count = read_varu32(&buf, &mut pos)
111 .map_err(|e| format!("Failed to read collection count: {:?}", e))?;
112
113 for _ in 0..collection_count {
115 let name_len = read_varu32(&buf, &mut pos)
117 .map_err(|e| format!("Failed to read name length: {:?}", e))?
118 as usize;
119 let name = String::from_utf8(buf[pos..pos + name_len].to_vec())
120 .map_err(|e| format!("Invalid UTF-8 in collection name: {}", e))?;
121 pos += name_len;
122
123 let entity_count = read_varu32(&buf, &mut pos)
125 .map_err(|e| format!("Failed to read entity count: {:?}", e))?;
126
127 for _ in 0..entity_count {
129 if version >= STORE_VERSION_V7 {
130 if pos + 4 > buf.len() {
132 return Err("Truncated entity record length".into());
133 }
134 let record_len =
135 u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]])
136 as usize;
137 pos += 4;
138 if pos + record_len > buf.len() {
139 return Err("Truncated entity record payload".into());
140 }
141 let record_bytes = &buf[pos..pos + record_len];
142 pos += record_len;
143
144 let (entity, metadata) = Self::deserialize_entity_record(record_bytes, version)
145 .map_err(|e| format!("Entity record deserialization error: {e}"))?;
146
147 store.insert_auto(&name, entity.clone())?;
148
149 if let Some(metadata) = metadata {
150 if let Some(manager) = store.get_collection(&name) {
151 let _ = manager.set_metadata(entity.id, metadata);
152 }
153 }
154 } else {
155 let entity = Self::read_entity_binary(&buf, &mut pos, version)?;
157 store.insert_auto(&name, entity)?;
158 }
159 }
160 }
161
162 if pos < buf.len() {
163 let cross_ref_count = read_varu32(&buf, &mut pos)
165 .map_err(|e| format!("Failed to read cross-ref count: {:?}", e))?;
166
167 for _ in 0..cross_ref_count {
168 let source_id = read_varu64(&buf, &mut pos)
169 .map_err(|e| format!("Failed to read source_id: {:?}", e))?;
170 let target_id = read_varu64(&buf, &mut pos)
171 .map_err(|e| format!("Failed to read target_id: {:?}", e))?;
172 let ref_type_byte = buf[pos];
173 pos += 1;
174 let ref_type = RefType::from_byte(ref_type_byte);
175
176 let coll_len = read_varu32(&buf, &mut pos)
177 .map_err(|e| format!("Failed to read collection length: {:?}", e))?
178 as usize;
179 let collection = String::from_utf8(buf[pos..pos + coll_len].to_vec())
180 .map_err(|e| format!("Invalid UTF-8 in collection: {}", e))?;
181 pos += coll_len;
182
183 let source_collection = store
184 .get_any(EntityId::new(source_id))
185 .map(|(name, _)| name)
186 .unwrap_or_else(|| collection.clone());
187 let _ = store.add_cross_ref(
188 &source_collection,
189 EntityId::new(source_id),
190 &collection,
191 EntityId::new(target_id),
192 ref_type,
193 1.0,
194 );
195 }
196 }
197
198 if store.format_version() < STORE_VERSION_V9 {
199 store.set_format_version(STORE_VERSION_V9);
200 }
201
202 Ok(store)
203 }
204
205 pub fn save_to_file(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
210 let tmp_path = path.with_extension("rdb-tmp");
212 let file = File::create(&tmp_path)?;
213 let mut writer = BufWriter::new(file);
214 let mut buf = Vec::new();
215
216 buf.extend_from_slice(STORE_MAGIC);
218
219 buf.extend_from_slice(&STORE_VERSION_V9.to_le_bytes());
222
223 let collections = self.collections.read();
225 write_varu32(&mut buf, collections.len() as u32);
226
227 let fv = STORE_VERSION_V9;
228 for (name, manager) in collections.iter() {
229 write_varu32(&mut buf, name.len() as u32);
231 buf.extend_from_slice(name.as_bytes());
232
233 let entities = manager.query_all(|_| true);
235 write_varu32(&mut buf, entities.len() as u32);
236
237 for entity in entities {
240 let metadata = manager.get_metadata(entity.id);
241 let record = Self::serialize_entity_record(&entity, metadata.as_ref(), fv);
242 buf.extend_from_slice(&(record.len() as u32).to_le_bytes());
243 buf.extend_from_slice(&record);
244 }
245 }
246
247 let cross_refs = self.cross_refs.read();
249 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
250 write_varu32(&mut buf, total_refs as u32);
251
252 for (source_id, refs) in cross_refs.iter() {
253 for (target_id, ref_type, collection) in refs {
254 write_varu64(&mut buf, source_id.raw());
255 write_varu64(&mut buf, target_id.raw());
256 buf.push(ref_type.to_byte());
257 write_varu32(&mut buf, collection.len() as u32);
258 buf.extend_from_slice(collection.as_bytes());
259 }
260 }
261
262 self.set_format_version(STORE_VERSION_V9);
263
264 let checksum = crate::storage::engine::crc32::crc32(&buf);
266 buf.extend_from_slice(&checksum.to_le_bytes());
267
268 writer.write_all(&buf)?;
269 writer.flush()?;
270 writer.get_ref().sync_all()?;
271 drop(writer);
272
273 std::fs::rename(&tmp_path, path)?;
275
276 if let Some(parent) = path.parent() {
278 if let Ok(dir) = File::open(parent) {
279 let _ = dir.sync_all();
280 }
281 }
282
283 Ok(())
284 }
285
286 pub(crate) fn read_entity_binary(
288 buf: &[u8],
289 pos: &mut usize,
290 format_version: u32,
291 ) -> Result<UnifiedEntity, Box<dyn std::error::Error>> {
292 let id = read_varu64(buf, pos).map_err(|e| format!("Failed to read entity id: {:?}", e))?;
294
295 let kind_type = buf[*pos];
297 *pos += 1;
298
299 let kind = match kind_type {
301 0 => {
302 let table_len = Self::read_varu32_safe(buf, pos)?;
304 let table = String::from_utf8(buf[*pos..*pos + table_len].to_vec())?;
305 *pos += table_len;
306 let row_id = Self::read_varu64_safe(buf, pos)?;
307 EntityKind::TableRow {
308 table: table.into(),
309 row_id,
310 }
311 }
312 1 => {
313 let label_len = Self::read_varu32_safe(buf, pos)?;
315 let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
316 *pos += label_len;
317 let node_type_len = Self::read_varu32_safe(buf, pos)?;
318 let node_type = String::from_utf8(buf[*pos..*pos + node_type_len].to_vec())?;
319 *pos += node_type_len;
320 EntityKind::GraphNode(Box::new(GraphNodeKind { label, node_type }))
321 }
322 2 => {
323 let label_len = Self::read_varu32_safe(buf, pos)?;
325 let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
326 *pos += label_len;
327 let from_node_len = Self::read_varu32_safe(buf, pos)?;
328 let from_node = String::from_utf8(buf[*pos..*pos + from_node_len].to_vec())?;
329 *pos += from_node_len;
330 let to_node_len = Self::read_varu32_safe(buf, pos)?;
331 let to_node = String::from_utf8(buf[*pos..*pos + to_node_len].to_vec())?;
332 *pos += to_node_len;
333 let weight =
334 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
335 *pos += 4;
336 EntityKind::GraphEdge(Box::new(GraphEdgeKind {
337 label,
338 from_node,
339 to_node,
340 weight,
341 }))
342 }
343 3 => {
344 let collection_len = Self::read_varu32_safe(buf, pos)?;
346 let collection = String::from_utf8(buf[*pos..*pos + collection_len].to_vec())?;
347 *pos += collection_len;
348 EntityKind::Vector { collection }
349 }
350 4 => {
351 let series_len = Self::read_varu32_safe(buf, pos)?;
353 let series = String::from_utf8(buf[*pos..*pos + series_len].to_vec())?;
354 *pos += series_len;
355 let metric_len = Self::read_varu32_safe(buf, pos)?;
356 let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
357 *pos += metric_len;
358 EntityKind::TimeSeriesPoint(Box::new(TimeSeriesPointKind { series, metric }))
359 }
360 5 => {
361 let queue_len = Self::read_varu32_safe(buf, pos)?;
363 let queue = String::from_utf8(buf[*pos..*pos + queue_len].to_vec())?;
364 *pos += queue_len;
365 let position = Self::read_varu64_safe(buf, pos)?;
366 EntityKind::QueueMessage { queue, position }
367 }
368 _ => return Err(format!("Unknown EntityKind type: {}", kind_type).into()),
369 };
370
371 let data_type = buf[*pos];
373 *pos += 1;
374
375 let mut data = match data_type {
377 0 => {
378 let col_count = Self::read_varu32_safe(buf, pos)?;
380 let mut columns = Vec::with_capacity(col_count);
381 for _ in 0..col_count {
382 columns.push(Self::read_value_binary(buf, pos)?);
383 }
384 EntityData::Row(RowData::new(columns))
385 }
386 1 => {
387 let prop_count = Self::read_varu32_safe(buf, pos)?;
389 let mut properties = HashMap::new();
390 for _ in 0..prop_count {
391 let key_len = Self::read_varu32_safe(buf, pos)?;
392 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
393 *pos += key_len;
394 let value = Self::read_value_binary(buf, pos)?;
395 properties.insert(key, value);
396 }
397 EntityData::Node(NodeData::with_properties(properties))
398 }
399 2 => {
400 let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
402 *pos += 4;
403 let weight = f32::from_le_bytes(weight_bytes);
404 let prop_count = Self::read_varu32_safe(buf, pos)?;
405 let mut properties = HashMap::new();
406 for _ in 0..prop_count {
407 let key_len = Self::read_varu32_safe(buf, pos)?;
408 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
409 *pos += key_len;
410 let value = Self::read_value_binary(buf, pos)?;
411 properties.insert(key, value);
412 }
413 let mut edge = EdgeData::new(weight);
414 edge.properties = properties;
415 EntityData::Edge(edge)
416 }
417 3 => {
418 let dim = Self::read_varu32_safe(buf, pos)?;
420 let mut dense = Vec::with_capacity(dim);
421 for _ in 0..dim {
422 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
423 *pos += 4;
424 dense.push(f32::from_le_bytes(bytes));
425 }
426 let mut vector = VectorData::new(dense);
427 if format_version >= STORE_VERSION_V6 {
428 let has_content = buf[*pos] != 0;
429 *pos += 1;
430 if has_content {
431 let content_len = Self::read_varu32_safe(buf, pos)?;
432 vector.content =
433 Some(String::from_utf8(buf[*pos..*pos + content_len].to_vec())?);
434 *pos += content_len;
435 }
436 }
437 EntityData::Vector(vector)
438 }
439 4 => {
440 let metric_len = Self::read_varu32_safe(buf, pos)?;
442 let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
443 *pos += metric_len;
444 let timestamp_ns = Self::read_varu64_safe(buf, pos)?;
445 let value_bytes = [
446 buf[*pos],
447 buf[*pos + 1],
448 buf[*pos + 2],
449 buf[*pos + 3],
450 buf[*pos + 4],
451 buf[*pos + 5],
452 buf[*pos + 6],
453 buf[*pos + 7],
454 ];
455 *pos += 8;
456 let mut tags = HashMap::new();
457 if format_version >= STORE_VERSION_V5 {
458 let tag_count = Self::read_varu32_safe(buf, pos)?;
459 tags = HashMap::with_capacity(tag_count);
460 for _ in 0..tag_count {
461 let key_len = Self::read_varu32_safe(buf, pos)?;
462 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
463 *pos += key_len;
464 let value_len = Self::read_varu32_safe(buf, pos)?;
465 let value = String::from_utf8(buf[*pos..*pos + value_len].to_vec())?;
466 *pos += value_len;
467 tags.insert(key, value);
468 }
469 }
470 EntityData::TimeSeries(crate::storage::unified::entity::TimeSeriesData {
471 metric,
472 timestamp_ns,
473 value: f64::from_le_bytes(value_bytes),
474 tags,
475 })
476 }
477 5 => {
478 let payload = Self::read_value_binary(buf, pos)?;
480 let enqueued_at_ns = Self::read_varu64_safe(buf, pos)?;
481 let attempts = Self::read_varu32_safe(buf, pos)? as u32;
482 EntityData::QueueMessage(crate::storage::unified::entity::QueueMessageData {
483 payload,
484 priority: None,
485 enqueued_at_ns,
486 attempts,
487 max_attempts: 3,
488 acked: false,
489 })
490 }
491 6 => {
492 let field_count = Self::read_varu32_safe(buf, pos)?;
494 let mut named = HashMap::with_capacity(field_count);
495 for _ in 0..field_count {
496 let key_len = Self::read_varu32_safe(buf, pos)?;
497 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
498 *pos += key_len;
499 let value = Self::read_value_binary(buf, pos)?;
500 named.insert(key, value);
501 }
502 EntityData::Row(RowData {
503 columns: Vec::new(),
504 named: Some(named),
505 schema: None,
506 })
507 }
508 _ => return Err(format!("Unknown EntityData type: {}", data_type).into()),
509 };
510
511 let created_at = Self::read_varu64_safe(buf, pos)?;
513 let updated_at = Self::read_varu64_safe(buf, pos)?;
514
515 let embedding_count = Self::read_varu32_safe(buf, pos)?;
517 let mut embeddings = Vec::with_capacity(embedding_count);
518 for _ in 0..embedding_count {
519 let name_len = Self::read_varu32_safe(buf, pos)?;
520 let name = String::from_utf8(buf[*pos..*pos + name_len].to_vec())?;
521 *pos += name_len;
522
523 let dim = Self::read_varu32_safe(buf, pos)?;
524 let mut vector = Vec::with_capacity(dim);
525 for _ in 0..dim {
526 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
527 *pos += 4;
528 vector.push(f32::from_le_bytes(bytes));
529 }
530
531 let model_len = Self::read_varu32_safe(buf, pos)?;
532 let model = String::from_utf8(buf[*pos..*pos + model_len].to_vec())?;
533 *pos += model_len;
534
535 embeddings.push(EmbeddingSlot::new(name, vector, model));
536 }
537
538 let cross_ref_count = Self::read_varu32_safe(buf, pos)?;
540 let mut cross_refs = Vec::with_capacity(cross_ref_count);
541 for _ in 0..cross_ref_count {
542 let source = Self::read_varu64_safe(buf, pos)?;
543 let target = Self::read_varu64_safe(buf, pos)?;
544 let ref_type_byte = buf[*pos];
545 *pos += 1;
546 let (target_collection, weight, created_at) = if format_version >= STORE_VERSION_V2 {
547 let coll_len = Self::read_varu32_safe(buf, pos)?;
548 let collection = String::from_utf8(buf[*pos..*pos + coll_len].to_vec())?;
549 *pos += coll_len;
550 let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
551 *pos += 4;
552 let weight = f32::from_le_bytes(weight_bytes);
553 let created_at = Self::read_varu64_safe(buf, pos)?;
554 (collection, weight, created_at)
555 } else {
556 (String::new(), 1.0, 0)
557 };
558
559 let mut cross_ref = CrossRef::new(
560 EntityId::new(source),
561 EntityId::new(target),
562 target_collection,
563 RefType::from_byte(ref_type_byte),
564 );
565 cross_ref.weight = weight;
566 cross_ref.created_at = created_at;
567 cross_refs.push(cross_ref);
568 }
569
570 let sequence_id = Self::read_varu64_safe(buf, pos)?;
572
573 if format_version >= STORE_VERSION_V4 {
574 if let EntityData::QueueMessage(message) = &mut data {
575 if *pos < buf.len() {
576 let priority_present = buf[*pos] != 0;
577 *pos += 1;
578 message.priority = if priority_present {
579 if *pos + 4 > buf.len() {
580 return Err("truncated queue priority".into());
581 }
582 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
583 *pos += 4;
584 Some(i32::from_le_bytes(bytes))
585 } else {
586 None
587 };
588 message.max_attempts = Self::read_varu32_safe(buf, pos)? as u32;
589 if *pos >= buf.len() {
590 return Err("truncated queue ack flag".into());
591 }
592 message.acked = buf[*pos] != 0;
593 *pos += 1;
594 }
595 }
596 }
597
598 let mut entity = UnifiedEntity::new(EntityId::new(id), kind, data);
599 entity.created_at = created_at;
600 entity.updated_at = updated_at;
601 entity.sequence_id = sequence_id;
602 if format_version >= STORE_VERSION_V8 && *pos < buf.len() {
603 let has_logical_id = buf[*pos] != 0;
604 *pos += 1;
605 if has_logical_id {
606 let logical_id = Self::read_varu64_safe(buf, pos)?;
607 entity.set_logical_id(EntityId::new(logical_id));
608 }
609 }
610 if format_version >= STORE_VERSION_V9 && *pos < buf.len() {
611 let xmin = Self::read_varu64_safe(buf, pos)?;
612 let xmax = Self::read_varu64_safe(buf, pos)?;
613 entity.set_xmin(xmin);
614 entity.set_xmax(xmax);
615 }
616 if !embeddings.is_empty() || !cross_refs.is_empty() {
617 entity.embeddings_mut().extend(embeddings);
618 entity.cross_refs_mut().extend(cross_refs);
619 }
620
621 Ok(entity)
622 }
623
624 fn read_varu32_safe(buf: &[u8], pos: &mut usize) -> Result<usize, Box<dyn std::error::Error>> {
626 read_varu32(buf, pos)
627 .map(|v| v as usize)
628 .map_err(|e| format!("Decode error: {:?}", e).into())
629 }
630
631 fn read_varu64_safe(buf: &[u8], pos: &mut usize) -> Result<u64, Box<dyn std::error::Error>> {
633 read_varu64(buf, pos).map_err(|e| format!("Decode error: {:?}", e).into())
634 }
635
636 pub(crate) fn write_entity_binary(
638 buf: &mut Vec<u8>,
639 entity: &UnifiedEntity,
640 format_version: u32,
641 ) {
642 write_varu64(buf, entity.id.raw());
644
645 match &entity.kind {
647 EntityKind::TableRow { table, row_id } => {
648 buf.push(0);
649 write_varu32(buf, table.len() as u32);
650 buf.extend_from_slice(table.as_bytes());
651 write_varu64(buf, *row_id);
652 }
653 EntityKind::GraphNode(ref node) => {
654 buf.push(1);
655 write_varu32(buf, node.label.len() as u32);
656 buf.extend_from_slice(node.label.as_bytes());
657 write_varu32(buf, node.node_type.len() as u32);
658 buf.extend_from_slice(node.node_type.as_bytes());
659 }
660 EntityKind::GraphEdge(ref edge) => {
661 buf.push(2);
662 write_varu32(buf, edge.label.len() as u32);
663 buf.extend_from_slice(edge.label.as_bytes());
664 write_varu32(buf, edge.from_node.len() as u32);
665 buf.extend_from_slice(edge.from_node.as_bytes());
666 write_varu32(buf, edge.to_node.len() as u32);
667 buf.extend_from_slice(edge.to_node.as_bytes());
668 buf.extend_from_slice(&edge.weight.to_le_bytes());
669 }
670 EntityKind::Vector { collection } => {
671 buf.push(3);
672 write_varu32(buf, collection.len() as u32);
673 buf.extend_from_slice(collection.as_bytes());
674 }
675 EntityKind::TimeSeriesPoint(ref ts) => {
676 buf.push(4);
677 write_varu32(buf, ts.series.len() as u32);
678 buf.extend_from_slice(ts.series.as_bytes());
679 write_varu32(buf, ts.metric.len() as u32);
680 buf.extend_from_slice(ts.metric.as_bytes());
681 }
682 EntityKind::QueueMessage { queue, position } => {
683 buf.push(5);
684 write_varu32(buf, queue.len() as u32);
685 buf.extend_from_slice(queue.as_bytes());
686 write_varu64(buf, *position);
687 }
688 }
689
690 match &entity.data {
692 EntityData::Row(row) => {
693 if let Some(ref named) = row.named {
694 buf.push(6);
701 write_varu32(buf, named.len() as u32);
702 let mut entries: Vec<_> = named.iter().collect();
703 entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
704 for (key, value) in entries {
705 write_varu32(buf, key.len() as u32);
706 buf.extend_from_slice(key.as_bytes());
707 Self::write_value_binary(buf, value);
708 }
709 } else {
710 buf.push(0);
711 write_varu32(buf, row.columns.len() as u32);
712 for col in &row.columns {
713 Self::write_value_binary(buf, col);
714 }
715 }
716 }
717 EntityData::Node(node) => {
718 buf.push(1);
719 write_varu32(buf, node.properties.len() as u32);
720 let mut entries: Vec<_> = node.properties.iter().collect();
721 entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
722 for (key, value) in entries {
723 write_varu32(buf, key.len() as u32);
724 buf.extend_from_slice(key.as_bytes());
725 Self::write_value_binary(buf, value);
726 }
727 }
728 EntityData::Edge(edge) => {
729 buf.push(2);
730 buf.extend_from_slice(&edge.weight.to_le_bytes());
731 write_varu32(buf, edge.properties.len() as u32);
732 let mut entries: Vec<_> = edge.properties.iter().collect();
733 entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
734 for (key, value) in entries {
735 write_varu32(buf, key.len() as u32);
736 buf.extend_from_slice(key.as_bytes());
737 Self::write_value_binary(buf, value);
738 }
739 }
740 EntityData::Vector(vec) => {
741 buf.push(3);
742 write_varu32(buf, vec.dense.len() as u32);
743 for f in &vec.dense {
744 buf.extend_from_slice(&f.to_le_bytes());
745 }
746 if format_version >= STORE_VERSION_V6 {
747 buf.push(u8::from(vec.content.is_some()));
748 if let Some(content) = &vec.content {
749 write_varu32(buf, content.len() as u32);
750 buf.extend_from_slice(content.as_bytes());
751 }
752 }
753 }
754 EntityData::TimeSeries(ts) => {
755 buf.push(4);
756 write_varu32(buf, ts.metric.len() as u32);
757 buf.extend_from_slice(ts.metric.as_bytes());
758 write_varu64(buf, ts.timestamp_ns);
759 buf.extend_from_slice(&ts.value.to_le_bytes());
760 if format_version >= STORE_VERSION_V5 {
761 write_varu32(buf, ts.tags.len() as u32);
762 let mut tag_entries: Vec<_> = ts.tags.iter().collect();
763 tag_entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
764 for (key, value) in tag_entries {
765 write_varu32(buf, key.len() as u32);
766 buf.extend_from_slice(key.as_bytes());
767 write_varu32(buf, value.len() as u32);
768 buf.extend_from_slice(value.as_bytes());
769 }
770 }
771 }
772 EntityData::QueueMessage(msg) => {
773 buf.push(5);
774 Self::write_value_binary(buf, &msg.payload);
775 write_varu64(buf, msg.enqueued_at_ns);
776 write_varu32(buf, msg.attempts);
777 }
778 }
779
780 write_varu64(buf, entity.created_at);
782 write_varu64(buf, entity.updated_at);
783
784 write_varu32(buf, entity.embeddings().len() as u32);
786 for emb in entity.embeddings() {
787 write_varu32(buf, emb.name.len() as u32);
788 buf.extend_from_slice(emb.name.as_bytes());
789 write_varu32(buf, emb.vector.len() as u32);
790 for f in &emb.vector {
791 buf.extend_from_slice(&f.to_le_bytes());
792 }
793 write_varu32(buf, emb.model.len() as u32);
794 buf.extend_from_slice(emb.model.as_bytes());
795 }
796
797 write_varu32(buf, entity.cross_refs().len() as u32);
799 for cross_ref in entity.cross_refs() {
800 write_varu64(buf, cross_ref.source.raw());
801 write_varu64(buf, cross_ref.target.raw());
802 buf.push(cross_ref.ref_type.to_byte());
803 if format_version >= STORE_VERSION_V2 {
804 write_varu32(buf, cross_ref.target_collection.len() as u32);
805 buf.extend_from_slice(cross_ref.target_collection.as_bytes());
806 buf.extend_from_slice(&cross_ref.weight.to_le_bytes());
807 write_varu64(buf, cross_ref.created_at);
808 }
809 }
810
811 write_varu64(buf, entity.sequence_id);
813
814 if format_version >= STORE_VERSION_V4 {
815 if let EntityData::QueueMessage(message) = &entity.data {
816 buf.push(u8::from(message.priority.is_some()));
817 if let Some(priority) = message.priority {
818 buf.extend_from_slice(&priority.to_le_bytes());
819 }
820 write_varu32(buf, message.max_attempts);
821 buf.push(u8::from(message.acked));
822 }
823 }
824
825 if format_version >= STORE_VERSION_V8 {
826 buf.push(u8::from(entity.has_explicit_logical_id()));
827 if entity.has_explicit_logical_id() {
828 write_varu64(buf, entity.logical_id().raw());
829 }
830 }
831 if format_version >= STORE_VERSION_V9 {
832 write_varu64(buf, entity.xmin);
833 write_varu64(buf, entity.xmax);
834 }
835 }
836
837 fn read_value_binary(buf: &[u8], pos: &mut usize) -> Result<Value, Box<dyn std::error::Error>> {
842 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
843
844 let type_byte = buf[*pos];
845 *pos += 1;
846
847 Ok(match type_byte {
848 0 => Value::Null,
849 1 => {
850 let b = buf[*pos] != 0;
851 *pos += 1;
852 Value::Boolean(b)
853 }
854 2 => {
855 let val = i64::from_le_bytes([
856 buf[*pos],
857 buf[*pos + 1],
858 buf[*pos + 2],
859 buf[*pos + 3],
860 buf[*pos + 4],
861 buf[*pos + 5],
862 buf[*pos + 6],
863 buf[*pos + 7],
864 ]);
865 *pos += 8;
866 Value::Integer(val)
867 }
868 3 => {
869 let val = u64::from_le_bytes([
870 buf[*pos],
871 buf[*pos + 1],
872 buf[*pos + 2],
873 buf[*pos + 3],
874 buf[*pos + 4],
875 buf[*pos + 5],
876 buf[*pos + 6],
877 buf[*pos + 7],
878 ]);
879 *pos += 8;
880 Value::UnsignedInteger(val)
881 }
882 4 => {
883 let val = f64::from_le_bytes([
884 buf[*pos],
885 buf[*pos + 1],
886 buf[*pos + 2],
887 buf[*pos + 3],
888 buf[*pos + 4],
889 buf[*pos + 5],
890 buf[*pos + 6],
891 buf[*pos + 7],
892 ]);
893 *pos += 8;
894 Value::Float(val)
895 }
896 5 => {
897 let len = Self::read_varu32_safe(buf, pos)?;
898 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
899 *pos += len;
900 Value::text(s)
901 }
902 6 => {
903 let len = Self::read_varu32_safe(buf, pos)?;
904 let bytes = buf[*pos..*pos + len].to_vec();
905 *pos += len;
906 Value::Blob(bytes)
907 }
908 7 => {
909 let val = i64::from_le_bytes([
910 buf[*pos],
911 buf[*pos + 1],
912 buf[*pos + 2],
913 buf[*pos + 3],
914 buf[*pos + 4],
915 buf[*pos + 5],
916 buf[*pos + 6],
917 buf[*pos + 7],
918 ]);
919 *pos += 8;
920 Value::Timestamp(val)
921 }
922 8 => {
923 let val = i64::from_le_bytes([
924 buf[*pos],
925 buf[*pos + 1],
926 buf[*pos + 2],
927 buf[*pos + 3],
928 buf[*pos + 4],
929 buf[*pos + 5],
930 buf[*pos + 6],
931 buf[*pos + 7],
932 ]);
933 *pos += 8;
934 Value::Duration(val)
935 }
936 9 => {
937 let version = buf[*pos];
939 *pos += 1;
940 if version == 4 {
941 let octets = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
942 *pos += 4;
943 Value::IpAddr(IpAddr::V4(Ipv4Addr::from(octets)))
944 } else {
945 let mut octets = [0u8; 16];
946 octets.copy_from_slice(&buf[*pos..*pos + 16]);
947 *pos += 16;
948 Value::IpAddr(IpAddr::V6(Ipv6Addr::from(octets)))
949 }
950 }
951 10 => {
952 let mut mac = [0u8; 6];
953 mac.copy_from_slice(&buf[*pos..*pos + 6]);
954 *pos += 6;
955 Value::MacAddr(mac)
956 }
957 11 => {
958 let len = Self::read_varu32_safe(buf, pos)?;
959 let mut vector = Vec::with_capacity(len);
960 for _ in 0..len {
961 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
962 *pos += 4;
963 vector.push(f32::from_le_bytes(bytes));
964 }
965 Value::Vector(vector)
966 }
967 12 => {
968 let len = Self::read_varu32_safe(buf, pos)?;
969 let bytes = buf[*pos..*pos + len].to_vec();
970 *pos += len;
971 Value::Json(bytes)
972 }
973 13 => {
974 let mut uuid = [0u8; 16];
975 uuid.copy_from_slice(&buf[*pos..*pos + 16]);
976 *pos += 16;
977 Value::Uuid(uuid)
978 }
979 14 => {
980 let len = Self::read_varu32_safe(buf, pos)?;
981 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
982 *pos += len;
983 Value::NodeRef(s)
984 }
985 15 => {
986 let len = Self::read_varu32_safe(buf, pos)?;
987 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
988 *pos += len;
989 Value::EdgeRef(s)
990 }
991 16 => {
992 let len = Self::read_varu32_safe(buf, pos)?;
993 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
994 *pos += len;
995 let id = u64::from_le_bytes([
996 buf[*pos],
997 buf[*pos + 1],
998 buf[*pos + 2],
999 buf[*pos + 3],
1000 buf[*pos + 4],
1001 buf[*pos + 5],
1002 buf[*pos + 6],
1003 buf[*pos + 7],
1004 ]);
1005 *pos += 8;
1006 Value::VectorRef(s, id)
1007 }
1008 17 => {
1009 let len = Self::read_varu32_safe(buf, pos)?;
1010 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1011 *pos += len;
1012 let id = u64::from_le_bytes([
1013 buf[*pos],
1014 buf[*pos + 1],
1015 buf[*pos + 2],
1016 buf[*pos + 3],
1017 buf[*pos + 4],
1018 buf[*pos + 5],
1019 buf[*pos + 6],
1020 buf[*pos + 7],
1021 ]);
1022 *pos += 8;
1023 Value::RowRef(s, id)
1024 }
1025 18 => {
1026 let rgb = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1027 *pos += 3;
1028 Value::Color(rgb)
1029 }
1030 19 => {
1031 let len = Self::read_varu32_safe(buf, pos)?;
1032 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1033 *pos += len;
1034 Value::Email(s)
1035 }
1036 20 => {
1037 let len = Self::read_varu32_safe(buf, pos)?;
1038 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1039 *pos += len;
1040 Value::Url(s)
1041 }
1042 21 => {
1043 let val = u64::from_le_bytes([
1044 buf[*pos],
1045 buf[*pos + 1],
1046 buf[*pos + 2],
1047 buf[*pos + 3],
1048 buf[*pos + 4],
1049 buf[*pos + 5],
1050 buf[*pos + 6],
1051 buf[*pos + 7],
1052 ]);
1053 *pos += 8;
1054 Value::Phone(val)
1055 }
1056 22 => {
1057 let val =
1058 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1059 *pos += 4;
1060 Value::Semver(val)
1061 }
1062 23 => {
1063 let ip =
1064 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1065 *pos += 4;
1066 let prefix = buf[*pos];
1067 *pos += 1;
1068 Value::Cidr(ip, prefix)
1069 }
1070 24 => {
1071 let val =
1072 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1073 *pos += 4;
1074 Value::Date(val)
1075 }
1076 25 => {
1077 let val =
1078 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1079 *pos += 4;
1080 Value::Time(val)
1081 }
1082 26 => {
1083 let val = i64::from_le_bytes([
1084 buf[*pos],
1085 buf[*pos + 1],
1086 buf[*pos + 2],
1087 buf[*pos + 3],
1088 buf[*pos + 4],
1089 buf[*pos + 5],
1090 buf[*pos + 6],
1091 buf[*pos + 7],
1092 ]);
1093 *pos += 8;
1094 Value::Decimal(val)
1095 }
1096 27 => {
1097 let val = buf[*pos];
1098 *pos += 1;
1099 Value::EnumValue(val)
1100 }
1101 28 => {
1102 let len = Self::read_varu32_safe(buf, pos)?;
1103 let mut elems = Vec::with_capacity(len);
1104 for _ in 0..len {
1105 elems.push(Self::read_value_binary(buf, pos)?);
1106 }
1107 Value::Array(elems)
1108 }
1109 29 => {
1110 let val = i64::from_le_bytes([
1111 buf[*pos],
1112 buf[*pos + 1],
1113 buf[*pos + 2],
1114 buf[*pos + 3],
1115 buf[*pos + 4],
1116 buf[*pos + 5],
1117 buf[*pos + 6],
1118 buf[*pos + 7],
1119 ]);
1120 *pos += 8;
1121 Value::TimestampMs(val)
1122 }
1123 30 => {
1124 let val =
1125 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1126 *pos += 4;
1127 Value::Ipv4(val)
1128 }
1129 31 => {
1130 let mut bytes = [0u8; 16];
1131 bytes.copy_from_slice(&buf[*pos..*pos + 16]);
1132 *pos += 16;
1133 Value::Ipv6(bytes)
1134 }
1135 32 => {
1136 let ip =
1137 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1138 *pos += 4;
1139 let mask =
1140 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1141 *pos += 4;
1142 Value::Subnet(ip, mask)
1143 }
1144 33 => {
1145 let val = u16::from_le_bytes([buf[*pos], buf[*pos + 1]]);
1146 *pos += 2;
1147 Value::Port(val)
1148 }
1149 34 => {
1150 let val =
1151 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1152 *pos += 4;
1153 Value::Latitude(val)
1154 }
1155 35 => {
1156 let val =
1157 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1158 *pos += 4;
1159 Value::Longitude(val)
1160 }
1161 36 => {
1162 let lat =
1163 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1164 *pos += 4;
1165 let lon =
1166 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1167 *pos += 4;
1168 Value::GeoPoint(lat, lon)
1169 }
1170 37 => {
1171 let c = [buf[*pos], buf[*pos + 1]];
1172 *pos += 2;
1173 Value::Country2(c)
1174 }
1175 38 => {
1176 let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1177 *pos += 3;
1178 Value::Country3(c)
1179 }
1180 39 => {
1181 let c = [buf[*pos], buf[*pos + 1]];
1182 *pos += 2;
1183 Value::Lang2(c)
1184 }
1185 40 => {
1186 let c = [
1187 buf[*pos],
1188 buf[*pos + 1],
1189 buf[*pos + 2],
1190 buf[*pos + 3],
1191 buf[*pos + 4],
1192 ];
1193 *pos += 5;
1194 Value::Lang5(c)
1195 }
1196 41 => {
1197 let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1198 *pos += 3;
1199 Value::Currency(c)
1200 }
1201 42 => {
1202 let rgba = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
1203 *pos += 4;
1204 Value::ColorAlpha(rgba)
1205 }
1206 43 => {
1207 let val = i64::from_le_bytes([
1208 buf[*pos],
1209 buf[*pos + 1],
1210 buf[*pos + 2],
1211 buf[*pos + 3],
1212 buf[*pos + 4],
1213 buf[*pos + 5],
1214 buf[*pos + 6],
1215 buf[*pos + 7],
1216 ]);
1217 *pos += 8;
1218 Value::BigInt(val)
1219 }
1220 44 => {
1221 let col_len = Self::read_varu32_safe(buf, pos)?;
1222 let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1223 *pos += col_len;
1224 let key_len = Self::read_varu32_safe(buf, pos)?;
1225 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
1226 *pos += key_len;
1227 Value::KeyRef(col, key)
1228 }
1229 45 => {
1230 let col_len = Self::read_varu32_safe(buf, pos)?;
1231 let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1232 *pos += col_len;
1233 let id = u64::from_le_bytes([
1234 buf[*pos],
1235 buf[*pos + 1],
1236 buf[*pos + 2],
1237 buf[*pos + 3],
1238 buf[*pos + 4],
1239 buf[*pos + 5],
1240 buf[*pos + 6],
1241 buf[*pos + 7],
1242 ]);
1243 *pos += 8;
1244 Value::DocRef(col, id)
1245 }
1246 46 => {
1247 let len = Self::read_varu32_safe(buf, pos)?;
1248 let name = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1249 *pos += len;
1250 Value::TableRef(name)
1251 }
1252 47 => {
1253 let val =
1254 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1255 *pos += 4;
1256 Value::PageRef(val)
1257 }
1258 48 => {
1259 let len = Self::read_varu32_safe(buf, pos)?;
1260 let bytes = buf[*pos..*pos + len].to_vec();
1261 *pos += len;
1262 Value::Secret(bytes)
1263 }
1264 49 => {
1265 let len = Self::read_varu32_safe(buf, pos)?;
1266 let hash = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1267 *pos += len;
1268 Value::Password(hash)
1269 }
1270 50 => {
1271 let len = Self::read_varu32_safe(buf, pos)?;
1272 let code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1273 *pos += len;
1274 Value::AssetCode(code)
1275 }
1276 51 => {
1277 let len = Self::read_varu32_safe(buf, pos)?;
1278 let asset_code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1279 *pos += len;
1280 let scale = buf[*pos];
1281 *pos += 1;
1282 let minor_units = i64::from_le_bytes([
1283 buf[*pos],
1284 buf[*pos + 1],
1285 buf[*pos + 2],
1286 buf[*pos + 3],
1287 buf[*pos + 4],
1288 buf[*pos + 5],
1289 buf[*pos + 6],
1290 buf[*pos + 7],
1291 ]);
1292 *pos += 8;
1293 Value::Money {
1294 asset_code,
1295 minor_units,
1296 scale,
1297 }
1298 }
1299 0x85 => {
1301 let orig_len = Self::read_varu32_safe(buf, pos)?;
1302 let comp_len = Self::read_varu32_safe(buf, pos)?;
1303 let compressed = &buf[*pos..*pos + comp_len];
1304 *pos += comp_len;
1305 let mut out = vec![0u8; orig_len];
1306 zstd::bulk::decompress_to_buffer(compressed, &mut out)
1307 .map_err(|e| format!("C3 Text decompress: {e}"))?;
1308 Value::text(String::from_utf8(out).map_err(|e| format!("C3 Text UTF-8: {e}"))?)
1309 }
1310 0x86 => {
1311 let orig_len = Self::read_varu32_safe(buf, pos)?;
1312 let comp_len = Self::read_varu32_safe(buf, pos)?;
1313 let compressed = &buf[*pos..*pos + comp_len];
1314 *pos += comp_len;
1315 let mut out = vec![0u8; orig_len];
1316 zstd::bulk::decompress_to_buffer(compressed, &mut out)
1317 .map_err(|e| format!("C3 Blob decompress: {e}"))?;
1318 Value::Blob(out)
1319 }
1320 _ => return Err(format!("Unknown Value type: {}", type_byte).into()),
1321 })
1322 }
1323
1324 fn write_value_binary(buf: &mut Vec<u8>, value: &Value) {
1329 use std::net::IpAddr;
1330
1331 match value {
1332 Value::Null => buf.push(0),
1333 Value::Boolean(b) => {
1334 buf.push(1);
1335 buf.push(if *b { 1 } else { 0 });
1336 }
1337 Value::Integer(i) => {
1338 buf.push(2);
1339 buf.extend_from_slice(&i.to_le_bytes());
1340 }
1341 Value::UnsignedInteger(u) => {
1342 buf.push(3);
1343 buf.extend_from_slice(&u.to_le_bytes());
1344 }
1345 Value::Float(f) => {
1346 buf.push(4);
1347 buf.extend_from_slice(&f.to_le_bytes());
1348 }
1349 Value::Text(s) => {
1350 const TEXT_COMPRESS_THRESHOLD: usize = 2048;
1354 if s.len() >= TEXT_COMPRESS_THRESHOLD {
1355 if let Ok(compressed) = zstd::bulk::compress(s.as_bytes(), 3) {
1356 if compressed.len() < s.len() {
1357 buf.push(0x85); write_varu32(buf, s.len() as u32); write_varu32(buf, compressed.len() as u32);
1360 buf.extend_from_slice(&compressed);
1361 return; }
1363 }
1364 }
1365 buf.push(5);
1366 write_varu32(buf, s.len() as u32);
1367 buf.extend_from_slice(s.as_bytes());
1368 }
1369 Value::Blob(bytes) => {
1370 const BLOB_COMPRESS_THRESHOLD: usize = 2048;
1373 if bytes.len() >= BLOB_COMPRESS_THRESHOLD {
1374 if let Ok(compressed) = zstd::bulk::compress(bytes.as_slice(), 3) {
1375 if compressed.len() < bytes.len() {
1376 buf.push(0x86); write_varu32(buf, bytes.len() as u32);
1378 write_varu32(buf, compressed.len() as u32);
1379 buf.extend_from_slice(&compressed);
1380 return;
1381 }
1382 }
1383 }
1384 buf.push(6);
1385 write_varu32(buf, bytes.len() as u32);
1386 buf.extend_from_slice(bytes);
1387 }
1388 Value::Timestamp(t) => {
1389 buf.push(7);
1390 buf.extend_from_slice(&t.to_le_bytes());
1391 }
1392 Value::Duration(d) => {
1393 buf.push(8);
1394 buf.extend_from_slice(&d.to_le_bytes());
1395 }
1396 Value::IpAddr(ip) => {
1397 buf.push(9);
1398 match ip {
1399 IpAddr::V4(v4) => {
1400 buf.push(4);
1401 buf.extend_from_slice(&v4.octets());
1402 }
1403 IpAddr::V6(v6) => {
1404 buf.push(6);
1405 buf.extend_from_slice(&v6.octets());
1406 }
1407 }
1408 }
1409 Value::MacAddr(mac) => {
1410 buf.push(10);
1411 buf.extend_from_slice(mac);
1412 }
1413 Value::Vector(vec) => {
1414 buf.push(11);
1415 write_varu32(buf, vec.len() as u32);
1416 for f in vec {
1417 buf.extend_from_slice(&f.to_le_bytes());
1418 }
1419 }
1420 Value::Json(bytes) => {
1421 buf.push(12);
1422 write_varu32(buf, bytes.len() as u32);
1423 buf.extend_from_slice(bytes);
1424 }
1425 Value::Uuid(uuid) => {
1426 buf.push(13);
1427 buf.extend_from_slice(uuid);
1428 }
1429 Value::NodeRef(s) => {
1430 buf.push(14);
1431 write_varu32(buf, s.len() as u32);
1432 buf.extend_from_slice(s.as_bytes());
1433 }
1434 Value::EdgeRef(s) => {
1435 buf.push(15);
1436 write_varu32(buf, s.len() as u32);
1437 buf.extend_from_slice(s.as_bytes());
1438 }
1439 Value::VectorRef(s, id) => {
1440 buf.push(16);
1441 write_varu32(buf, s.len() as u32);
1442 buf.extend_from_slice(s.as_bytes());
1443 buf.extend_from_slice(&id.to_le_bytes());
1444 }
1445 Value::RowRef(s, id) => {
1446 buf.push(17);
1447 write_varu32(buf, s.len() as u32);
1448 buf.extend_from_slice(s.as_bytes());
1449 buf.extend_from_slice(&id.to_le_bytes());
1450 }
1451 Value::Color(rgb) => {
1452 buf.push(18);
1453 buf.extend_from_slice(rgb);
1454 }
1455 Value::Email(s) => {
1456 buf.push(19);
1457 write_varu32(buf, s.len() as u32);
1458 buf.extend_from_slice(s.as_bytes());
1459 }
1460 Value::Url(s) => {
1461 buf.push(20);
1462 write_varu32(buf, s.len() as u32);
1463 buf.extend_from_slice(s.as_bytes());
1464 }
1465 Value::Phone(n) => {
1466 buf.push(21);
1467 buf.extend_from_slice(&n.to_le_bytes());
1468 }
1469 Value::Semver(v) => {
1470 buf.push(22);
1471 buf.extend_from_slice(&v.to_le_bytes());
1472 }
1473 Value::Cidr(ip, prefix) => {
1474 buf.push(23);
1475 buf.extend_from_slice(&ip.to_le_bytes());
1476 buf.push(*prefix);
1477 }
1478 Value::Date(d) => {
1479 buf.push(24);
1480 buf.extend_from_slice(&d.to_le_bytes());
1481 }
1482 Value::Time(t) => {
1483 buf.push(25);
1484 buf.extend_from_slice(&t.to_le_bytes());
1485 }
1486 Value::Decimal(v) => {
1487 buf.push(26);
1488 buf.extend_from_slice(&v.to_le_bytes());
1489 }
1490 Value::EnumValue(i) => {
1491 buf.push(27);
1492 buf.push(*i);
1493 }
1494 Value::Array(elems) => {
1495 buf.push(28);
1496 write_varu32(buf, elems.len() as u32);
1497 for elem in elems {
1498 Self::write_value_binary(buf, elem);
1499 }
1500 }
1501 Value::TimestampMs(v) => {
1502 buf.push(29);
1503 buf.extend_from_slice(&v.to_le_bytes());
1504 }
1505 Value::Ipv4(v) => {
1506 buf.push(30);
1507 buf.extend_from_slice(&v.to_le_bytes());
1508 }
1509 Value::Ipv6(bytes) => {
1510 buf.push(31);
1511 buf.extend_from_slice(bytes);
1512 }
1513 Value::Subnet(ip, mask) => {
1514 buf.push(32);
1515 buf.extend_from_slice(&ip.to_le_bytes());
1516 buf.extend_from_slice(&mask.to_le_bytes());
1517 }
1518 Value::Port(v) => {
1519 buf.push(33);
1520 buf.extend_from_slice(&v.to_le_bytes());
1521 }
1522 Value::Latitude(v) => {
1523 buf.push(34);
1524 buf.extend_from_slice(&v.to_le_bytes());
1525 }
1526 Value::Longitude(v) => {
1527 buf.push(35);
1528 buf.extend_from_slice(&v.to_le_bytes());
1529 }
1530 Value::GeoPoint(lat, lon) => {
1531 buf.push(36);
1532 buf.extend_from_slice(&lat.to_le_bytes());
1533 buf.extend_from_slice(&lon.to_le_bytes());
1534 }
1535 Value::Country2(c) => {
1536 buf.push(37);
1537 buf.extend_from_slice(c);
1538 }
1539 Value::Country3(c) => {
1540 buf.push(38);
1541 buf.extend_from_slice(c);
1542 }
1543 Value::Lang2(c) => {
1544 buf.push(39);
1545 buf.extend_from_slice(c);
1546 }
1547 Value::Lang5(c) => {
1548 buf.push(40);
1549 buf.extend_from_slice(c);
1550 }
1551 Value::Currency(c) => {
1552 buf.push(41);
1553 buf.extend_from_slice(c);
1554 }
1555 Value::AssetCode(code) => {
1556 buf.push(50);
1557 write_varu32(buf, code.len() as u32);
1558 buf.extend_from_slice(code.as_bytes());
1559 }
1560 Value::Money {
1561 asset_code,
1562 minor_units,
1563 scale,
1564 } => {
1565 buf.push(51);
1566 write_varu32(buf, asset_code.len() as u32);
1567 buf.extend_from_slice(asset_code.as_bytes());
1568 buf.push(*scale);
1569 buf.extend_from_slice(&minor_units.to_le_bytes());
1570 }
1571 Value::ColorAlpha(rgba) => {
1572 buf.push(42);
1573 buf.extend_from_slice(rgba);
1574 }
1575 Value::BigInt(v) => {
1576 buf.push(43);
1577 buf.extend_from_slice(&v.to_le_bytes());
1578 }
1579 Value::KeyRef(col, key) => {
1580 buf.push(44);
1581 write_varu32(buf, col.len() as u32);
1582 buf.extend_from_slice(col.as_bytes());
1583 write_varu32(buf, key.len() as u32);
1584 buf.extend_from_slice(key.as_bytes());
1585 }
1586 Value::DocRef(col, id) => {
1587 buf.push(45);
1588 write_varu32(buf, col.len() as u32);
1589 buf.extend_from_slice(col.as_bytes());
1590 buf.extend_from_slice(&id.to_le_bytes());
1591 }
1592 Value::TableRef(name) => {
1593 buf.push(46);
1594 write_varu32(buf, name.len() as u32);
1595 buf.extend_from_slice(name.as_bytes());
1596 }
1597 Value::PageRef(page_id) => {
1598 buf.push(47);
1599 buf.extend_from_slice(&page_id.to_le_bytes());
1600 }
1601 Value::Secret(bytes) => {
1602 buf.push(48);
1603 write_varu32(buf, bytes.len() as u32);
1604 buf.extend_from_slice(bytes);
1605 }
1606 Value::Password(hash) => {
1607 buf.push(49);
1608 write_varu32(buf, hash.len() as u32);
1609 buf.extend_from_slice(hash.as_bytes());
1610 }
1611 }
1612 }
1613}