1use super::*;
2
3impl UnifiedStore {
4 pub fn new() -> Self {
5 Self::with_config(UnifiedStoreConfig::default())
6 }
7
8 pub fn format_version(&self) -> u32 {
10 self.format_version.load(Ordering::SeqCst)
11 }
12
13 pub(crate) fn set_format_version(&self, version: u32) {
14 self.format_version.store(version, Ordering::SeqCst);
15 }
16
17 pub fn next_entity_id(&self) -> EntityId {
19 EntityId::new(self.next_entity_id.fetch_add(1, Ordering::SeqCst))
20 }
21
22 pub fn reserve_entity_ids(&self, n: u64) -> std::ops::Range<u64> {
25 let start = self.next_entity_id.fetch_add(n, Ordering::SeqCst);
26 start..start + n
27 }
28
29 pub(crate) fn register_entity_id(&self, id: EntityId) {
30 let candidate = id.raw().saturating_add(1);
31 let mut current = self.next_entity_id.load(Ordering::SeqCst);
32 while candidate > current {
33 match self.next_entity_id.compare_exchange(
34 current,
35 candidate,
36 Ordering::SeqCst,
37 Ordering::SeqCst,
38 ) {
39 Ok(_) => break,
40 Err(updated) => current = updated,
41 }
42 }
43 }
44
45 pub fn load_from_file(path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
57 let file = File::open(path)?;
58 let mut reader = BufReader::new(file);
59 let mut buf = Vec::new();
60 reader.read_to_end(&mut buf)?;
61 Self::load_from_bytes(&buf)
62 }
63
64 pub(crate) fn load_from_bytes(buf: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
65 Self::load_from_bytes_with_config(buf, UnifiedStoreConfig::default())
66 }
67
68 pub(crate) fn load_from_bytes_with_config(
69 buf: &[u8],
70 config: UnifiedStoreConfig,
71 ) -> Result<Self, Box<dyn std::error::Error>> {
72 let mut buf = buf.to_vec();
73 let version =
74 reddb_file::decode_native_store_header(&buf).map_err(|err| err.to_string())?;
75 let mut pos = 8;
76
77 reddb_file::verify_native_store_crc32_footer(&mut buf, version)
79 .map_err(|err| err.to_string())?;
80
81 let store = Self::with_config(config);
82 store.set_format_version(version);
83
84 let collection_count =
86 reddb_file::decode_native_dump_count(&buf, &mut pos).map_err(|e| e.to_string())?;
87
88 for _ in 0..collection_count {
90 let collection_header =
91 reddb_file::decode_native_dump_collection_header(&buf, &mut pos)
92 .map_err(|e| e.to_string())?;
93
94 for _ in 0..collection_header.entity_count {
96 if version >= STORE_VERSION_V7 {
97 let record_bytes = reddb_file::decode_native_dump_entity_record(&buf, &mut pos)
99 .map_err(|e| e.to_string())?;
100
101 let (entity, metadata) = Self::deserialize_entity_record(record_bytes, version)
102 .map_err(|e| format!("Entity record deserialization error: {e}"))?;
103
104 store.insert_auto(&collection_header.name, entity.clone())?;
105
106 if let Some(metadata) = metadata {
107 if let Some(manager) = store.get_collection(&collection_header.name) {
108 let _ = manager.set_metadata(entity.id, metadata);
109 }
110 }
111 } else {
112 let entity = Self::read_entity_binary(&buf, &mut pos, version)?;
114 store.insert_auto(&collection_header.name, entity)?;
115 }
116 }
117 }
118
119 if pos < buf.len() {
120 let cross_ref_count =
122 reddb_file::decode_native_dump_count(&buf, &mut pos).map_err(|e| e.to_string())?;
123
124 for _ in 0..cross_ref_count {
125 let cross_ref = reddb_file::decode_native_dump_cross_ref(&buf, &mut pos)
126 .map_err(|e| e.to_string())?;
127 let ref_type = RefType::from_byte(cross_ref.ref_type);
128
129 let source_collection = store
130 .get_any(EntityId::new(cross_ref.source_id))
131 .map(|(name, _)| name)
132 .unwrap_or_else(|| cross_ref.target_collection.clone());
133 let _ = store.add_cross_ref(
134 &source_collection,
135 EntityId::new(cross_ref.source_id),
136 &cross_ref.target_collection,
137 EntityId::new(cross_ref.target_id),
138 ref_type,
139 1.0,
140 );
141 }
142 }
143
144 if store.format_version() < STORE_VERSION_CURRENT {
145 store.set_format_version(STORE_VERSION_CURRENT);
146 }
147
148 Ok(store)
149 }
150
151 pub fn save_to_file(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
156 let buf = self.to_binary_dump_bytes();
157 reddb_file::write_native_store_bytes_atomically(path, &buf)?;
158 Ok(())
159 }
160
161 pub(crate) fn to_binary_dump_bytes(&self) -> Vec<u8> {
162 let mut buf = Vec::new();
163
164 buf.extend_from_slice(&reddb_file::encode_native_store_header(
167 STORE_VERSION_CURRENT,
168 ));
169
170 let collections = self.collections.read();
172 reddb_file::encode_native_dump_count(&mut buf, collections.len() as u32);
173
174 let fv = STORE_VERSION_CURRENT;
175 for (name, manager) in collections.iter() {
176 let entities = manager.query_all(|_| true);
178 reddb_file::encode_native_dump_collection_header(&mut buf, name, entities.len() as u32);
179
180 for entity in entities {
183 let metadata = manager.get_metadata(entity.id);
184 let record = Self::serialize_entity_record(&entity, metadata.as_ref(), fv);
185 reddb_file::encode_native_dump_entity_record(&mut buf, &record);
186 }
187 }
188
189 let cross_refs = self.cross_refs.read();
191 let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
192 reddb_file::encode_native_dump_count(&mut buf, total_refs as u32);
193
194 for (source_id, refs) in cross_refs.iter() {
195 for (target_id, ref_type, collection) in refs {
196 reddb_file::encode_native_dump_cross_ref(
197 &mut buf,
198 source_id.raw(),
199 target_id.raw(),
200 ref_type.to_byte(),
201 collection,
202 );
203 }
204 }
205
206 self.set_format_version(STORE_VERSION_CURRENT);
207
208 reddb_file::append_native_store_crc32_footer(&mut buf);
209 buf
210 }
211
212 pub(crate) fn read_entity_binary(
214 buf: &[u8],
215 pos: &mut usize,
216 format_version: u32,
217 ) -> Result<UnifiedEntity, Box<dyn std::error::Error>> {
218 let id = read_varu64(buf, pos).map_err(|e| format!("Failed to read entity id: {:?}", e))?;
220
221 let kind_type = buf[*pos];
223 *pos += 1;
224
225 let kind = match kind_type {
227 0 => {
228 let table_len = Self::read_varu32_safe(buf, pos)?;
230 let table = String::from_utf8(buf[*pos..*pos + table_len].to_vec())?;
231 *pos += table_len;
232 let row_id = Self::read_varu64_safe(buf, pos)?;
233 EntityKind::TableRow {
234 table: table.into(),
235 row_id,
236 }
237 }
238 1 => {
239 let label_len = Self::read_varu32_safe(buf, pos)?;
241 let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
242 *pos += label_len;
243 let node_type_len = Self::read_varu32_safe(buf, pos)?;
244 let node_type = String::from_utf8(buf[*pos..*pos + node_type_len].to_vec())?;
245 *pos += node_type_len;
246 EntityKind::GraphNode(Box::new(GraphNodeKind { label, node_type }))
247 }
248 2 => {
249 let label_len = Self::read_varu32_safe(buf, pos)?;
251 let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
252 *pos += label_len;
253 let from_node_len = Self::read_varu32_safe(buf, pos)?;
254 let from_node = String::from_utf8(buf[*pos..*pos + from_node_len].to_vec())?;
255 *pos += from_node_len;
256 let to_node_len = Self::read_varu32_safe(buf, pos)?;
257 let to_node = String::from_utf8(buf[*pos..*pos + to_node_len].to_vec())?;
258 *pos += to_node_len;
259 let weight =
260 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
261 *pos += 4;
262 EntityKind::GraphEdge(Box::new(GraphEdgeKind {
263 label,
264 from_node,
265 to_node,
266 weight,
267 }))
268 }
269 3 => {
270 let collection_len = Self::read_varu32_safe(buf, pos)?;
272 let collection = String::from_utf8(buf[*pos..*pos + collection_len].to_vec())?;
273 *pos += collection_len;
274 EntityKind::Vector { collection }
275 }
276 4 => {
277 let series_len = Self::read_varu32_safe(buf, pos)?;
279 let series = String::from_utf8(buf[*pos..*pos + series_len].to_vec())?;
280 *pos += series_len;
281 let metric_len = Self::read_varu32_safe(buf, pos)?;
282 let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
283 *pos += metric_len;
284 EntityKind::TimeSeriesPoint(Box::new(TimeSeriesPointKind { series, metric }))
285 }
286 5 => {
287 let queue_len = Self::read_varu32_safe(buf, pos)?;
289 let queue = String::from_utf8(buf[*pos..*pos + queue_len].to_vec())?;
290 *pos += queue_len;
291 let position = Self::read_varu64_safe(buf, pos)?;
292 EntityKind::QueueMessage { queue, position }
293 }
294 _ => return Err(format!("Unknown EntityKind type: {}", kind_type).into()),
295 };
296
297 let data_type = buf[*pos];
299 *pos += 1;
300
301 let mut data = match data_type {
303 0 => {
304 let col_count = Self::read_varu32_safe(buf, pos)?;
306 let mut columns = Vec::with_capacity(col_count);
307 for _ in 0..col_count {
308 columns.push(Self::read_value_binary(buf, pos)?);
309 }
310 EntityData::Row(RowData::new(columns))
311 }
312 1 => {
313 let prop_count = Self::read_varu32_safe(buf, pos)?;
315 let mut properties = HashMap::new();
316 for _ in 0..prop_count {
317 let key_len = Self::read_varu32_safe(buf, pos)?;
318 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
319 *pos += key_len;
320 let value = Self::read_value_binary(buf, pos)?;
321 properties.insert(key, value);
322 }
323 EntityData::Node(NodeData::with_properties(properties))
324 }
325 2 => {
326 let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
328 *pos += 4;
329 let weight = f32::from_le_bytes(weight_bytes);
330 let prop_count = Self::read_varu32_safe(buf, pos)?;
331 let mut properties = HashMap::new();
332 for _ in 0..prop_count {
333 let key_len = Self::read_varu32_safe(buf, pos)?;
334 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
335 *pos += key_len;
336 let value = Self::read_value_binary(buf, pos)?;
337 properties.insert(key, value);
338 }
339 let mut edge = EdgeData::new(weight);
340 edge.properties = properties;
341 EntityData::Edge(edge)
342 }
343 3 => {
344 let dim = Self::read_varu32_safe(buf, pos)?;
346 let mut dense = Vec::with_capacity(dim);
347 for _ in 0..dim {
348 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
349 *pos += 4;
350 dense.push(f32::from_le_bytes(bytes));
351 }
352 let mut vector = VectorData::new(dense);
353 if format_version >= STORE_VERSION_V6 {
354 let has_content = buf[*pos] != 0;
355 *pos += 1;
356 if has_content {
357 let content_len = Self::read_varu32_safe(buf, pos)?;
358 vector.content =
359 Some(String::from_utf8(buf[*pos..*pos + content_len].to_vec())?);
360 *pos += content_len;
361 }
362 }
363 EntityData::Vector(vector)
364 }
365 4 => {
366 let metric_len = Self::read_varu32_safe(buf, pos)?;
368 let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
369 *pos += metric_len;
370 let timestamp_ns = Self::read_varu64_safe(buf, pos)?;
371 let value_bytes = [
372 buf[*pos],
373 buf[*pos + 1],
374 buf[*pos + 2],
375 buf[*pos + 3],
376 buf[*pos + 4],
377 buf[*pos + 5],
378 buf[*pos + 6],
379 buf[*pos + 7],
380 ];
381 *pos += 8;
382 let mut tags = HashMap::new();
383 if format_version >= STORE_VERSION_V5 {
384 let tag_count = Self::read_varu32_safe(buf, pos)?;
385 tags = HashMap::with_capacity(tag_count);
386 for _ in 0..tag_count {
387 let key_len = Self::read_varu32_safe(buf, pos)?;
388 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
389 *pos += key_len;
390 let value_len = Self::read_varu32_safe(buf, pos)?;
391 let value = String::from_utf8(buf[*pos..*pos + value_len].to_vec())?;
392 *pos += value_len;
393 tags.insert(key, value);
394 }
395 }
396 EntityData::TimeSeries(crate::storage::unified::entity::TimeSeriesData {
397 metric,
398 timestamp_ns,
399 value: f64::from_le_bytes(value_bytes),
400 tags,
401 })
402 }
403 5 => {
404 let payload = Self::read_value_binary(buf, pos)?;
406 let enqueued_at_ns = Self::read_varu64_safe(buf, pos)?;
407 let attempts = Self::read_varu32_safe(buf, pos)? as u32;
408 EntityData::QueueMessage(crate::storage::unified::entity::QueueMessageData {
409 payload,
410 priority: None,
411 enqueued_at_ns,
412 attempts,
413 max_attempts: 3,
414 acked: false,
415 })
416 }
417 6 => {
418 let field_count = Self::read_varu32_safe(buf, pos)?;
420 let mut named = HashMap::with_capacity(field_count);
421 for _ in 0..field_count {
422 let key_len = Self::read_varu32_safe(buf, pos)?;
423 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
424 *pos += key_len;
425 let value = Self::read_value_binary(buf, pos)?;
426 named.insert(key, value);
427 }
428 EntityData::Row(RowData {
429 columns: Vec::new(),
430 named: Some(named),
431 schema: None,
432 })
433 }
434 _ => return Err(format!("Unknown EntityData type: {}", data_type).into()),
435 };
436
437 let created_at = Self::read_varu64_safe(buf, pos)?;
439 let updated_at = Self::read_varu64_safe(buf, pos)?;
440
441 let embedding_count = Self::read_varu32_safe(buf, pos)?;
443 let mut embeddings = Vec::with_capacity(embedding_count);
444 for _ in 0..embedding_count {
445 let name_len = Self::read_varu32_safe(buf, pos)?;
446 let name = String::from_utf8(buf[*pos..*pos + name_len].to_vec())?;
447 *pos += name_len;
448
449 let dim = Self::read_varu32_safe(buf, pos)?;
450 let mut vector = Vec::with_capacity(dim);
451 for _ in 0..dim {
452 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
453 *pos += 4;
454 vector.push(f32::from_le_bytes(bytes));
455 }
456
457 let model_len = Self::read_varu32_safe(buf, pos)?;
458 let model = String::from_utf8(buf[*pos..*pos + model_len].to_vec())?;
459 *pos += model_len;
460
461 embeddings.push(EmbeddingSlot::new(name, vector, model));
462 }
463
464 let cross_ref_count = Self::read_varu32_safe(buf, pos)?;
466 let mut cross_refs = Vec::with_capacity(cross_ref_count);
467 for _ in 0..cross_ref_count {
468 let source = Self::read_varu64_safe(buf, pos)?;
469 let target = Self::read_varu64_safe(buf, pos)?;
470 let ref_type_byte = buf[*pos];
471 *pos += 1;
472 let (target_collection, weight, created_at) = if format_version >= STORE_VERSION_V2 {
473 let coll_len = Self::read_varu32_safe(buf, pos)?;
474 let collection = String::from_utf8(buf[*pos..*pos + coll_len].to_vec())?;
475 *pos += coll_len;
476 let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
477 *pos += 4;
478 let weight = f32::from_le_bytes(weight_bytes);
479 let created_at = Self::read_varu64_safe(buf, pos)?;
480 (collection, weight, created_at)
481 } else {
482 (String::new(), 1.0, 0)
483 };
484
485 let mut cross_ref = CrossRef::new(
486 EntityId::new(source),
487 EntityId::new(target),
488 target_collection,
489 RefType::from_byte(ref_type_byte),
490 );
491 cross_ref.weight = weight;
492 cross_ref.created_at = created_at;
493 cross_refs.push(cross_ref);
494 }
495
496 let sequence_id = Self::read_varu64_safe(buf, pos)?;
498
499 if format_version >= STORE_VERSION_V4 {
500 if let EntityData::QueueMessage(message) = &mut data {
501 if *pos < buf.len() {
502 let priority_present = buf[*pos] != 0;
503 *pos += 1;
504 message.priority = if priority_present {
505 if *pos + 4 > buf.len() {
506 return Err("truncated queue priority".into());
507 }
508 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
509 *pos += 4;
510 Some(i32::from_le_bytes(bytes))
511 } else {
512 None
513 };
514 message.max_attempts = Self::read_varu32_safe(buf, pos)? as u32;
515 if *pos >= buf.len() {
516 return Err("truncated queue ack flag".into());
517 }
518 message.acked = buf[*pos] != 0;
519 *pos += 1;
520 }
521 }
522 }
523
524 let mut entity = UnifiedEntity::new(EntityId::new(id), kind, data);
525 entity.created_at = created_at;
526 entity.updated_at = updated_at;
527 entity.sequence_id = sequence_id;
528 if format_version >= STORE_VERSION_V8 && *pos < buf.len() {
529 let has_logical_id = buf[*pos] != 0;
530 *pos += 1;
531 if has_logical_id {
532 let logical_id = Self::read_varu64_safe(buf, pos)?;
533 entity.set_logical_id(EntityId::new(logical_id));
534 }
535 }
536 if format_version >= STORE_VERSION_V9 && *pos < buf.len() {
537 let xmin = Self::read_varu64_safe(buf, pos)?;
538 let xmax = Self::read_varu64_safe(buf, pos)?;
539 entity.set_xmin(xmin);
540 entity.set_xmax(xmax);
541 }
542 if !embeddings.is_empty() || !cross_refs.is_empty() {
543 entity.embeddings_mut().extend(embeddings);
544 entity.cross_refs_mut().extend(cross_refs);
545 }
546
547 Ok(entity)
548 }
549
550 fn read_varu32_safe(buf: &[u8], pos: &mut usize) -> Result<usize, Box<dyn std::error::Error>> {
552 read_varu32(buf, pos)
553 .map(|v| v as usize)
554 .map_err(|e| format!("Decode error: {:?}", e).into())
555 }
556
557 fn read_varu64_safe(buf: &[u8], pos: &mut usize) -> Result<u64, Box<dyn std::error::Error>> {
559 read_varu64(buf, pos).map_err(|e| format!("Decode error: {:?}", e).into())
560 }
561
562 pub(crate) fn write_entity_binary(
564 buf: &mut Vec<u8>,
565 entity: &UnifiedEntity,
566 format_version: u32,
567 ) {
568 write_varu64(buf, entity.id.raw());
570
571 match &entity.kind {
573 EntityKind::TableRow { table, row_id } => {
574 buf.push(0);
575 write_varu32(buf, table.len() as u32);
576 buf.extend_from_slice(table.as_bytes());
577 write_varu64(buf, *row_id);
578 }
579 EntityKind::GraphNode(ref node) => {
580 buf.push(1);
581 write_varu32(buf, node.label.len() as u32);
582 buf.extend_from_slice(node.label.as_bytes());
583 write_varu32(buf, node.node_type.len() as u32);
584 buf.extend_from_slice(node.node_type.as_bytes());
585 }
586 EntityKind::GraphEdge(ref edge) => {
587 buf.push(2);
588 write_varu32(buf, edge.label.len() as u32);
589 buf.extend_from_slice(edge.label.as_bytes());
590 write_varu32(buf, edge.from_node.len() as u32);
591 buf.extend_from_slice(edge.from_node.as_bytes());
592 write_varu32(buf, edge.to_node.len() as u32);
593 buf.extend_from_slice(edge.to_node.as_bytes());
594 buf.extend_from_slice(&edge.weight.to_le_bytes());
595 }
596 EntityKind::Vector { collection } => {
597 buf.push(3);
598 write_varu32(buf, collection.len() as u32);
599 buf.extend_from_slice(collection.as_bytes());
600 }
601 EntityKind::TimeSeriesPoint(ref ts) => {
602 buf.push(4);
603 write_varu32(buf, ts.series.len() as u32);
604 buf.extend_from_slice(ts.series.as_bytes());
605 write_varu32(buf, ts.metric.len() as u32);
606 buf.extend_from_slice(ts.metric.as_bytes());
607 }
608 EntityKind::QueueMessage { queue, position } => {
609 buf.push(5);
610 write_varu32(buf, queue.len() as u32);
611 buf.extend_from_slice(queue.as_bytes());
612 write_varu64(buf, *position);
613 }
614 }
615
616 match &entity.data {
618 EntityData::Row(row) => {
619 if let Some(ref named) = row.named {
620 buf.push(6);
627 write_varu32(buf, named.len() as u32);
628 let mut entries: Vec<_> = named.iter().collect();
629 entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
630 for (key, value) in entries {
631 write_varu32(buf, key.len() as u32);
632 buf.extend_from_slice(key.as_bytes());
633 Self::write_value_binary(buf, value);
634 }
635 } else {
636 buf.push(0);
637 write_varu32(buf, row.columns.len() as u32);
638 for col in &row.columns {
639 Self::write_value_binary(buf, col);
640 }
641 }
642 }
643 EntityData::Node(node) => {
644 buf.push(1);
645 write_varu32(buf, node.properties.len() as u32);
646 let mut entries: Vec<_> = node.properties.iter().collect();
647 entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
648 for (key, value) in entries {
649 write_varu32(buf, key.len() as u32);
650 buf.extend_from_slice(key.as_bytes());
651 Self::write_value_binary(buf, value);
652 }
653 }
654 EntityData::Edge(edge) => {
655 buf.push(2);
656 buf.extend_from_slice(&edge.weight.to_le_bytes());
657 write_varu32(buf, edge.properties.len() as u32);
658 let mut entries: Vec<_> = edge.properties.iter().collect();
659 entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
660 for (key, value) in entries {
661 write_varu32(buf, key.len() as u32);
662 buf.extend_from_slice(key.as_bytes());
663 Self::write_value_binary(buf, value);
664 }
665 }
666 EntityData::Vector(vec) => {
667 buf.push(3);
668 write_varu32(buf, vec.dense.len() as u32);
669 for f in &vec.dense {
670 buf.extend_from_slice(&f.to_le_bytes());
671 }
672 if format_version >= STORE_VERSION_V6 {
673 buf.push(u8::from(vec.content.is_some()));
674 if let Some(content) = &vec.content {
675 write_varu32(buf, content.len() as u32);
676 buf.extend_from_slice(content.as_bytes());
677 }
678 }
679 }
680 EntityData::TimeSeries(ts) => {
681 buf.push(4);
682 write_varu32(buf, ts.metric.len() as u32);
683 buf.extend_from_slice(ts.metric.as_bytes());
684 write_varu64(buf, ts.timestamp_ns);
685 buf.extend_from_slice(&ts.value.to_le_bytes());
686 if format_version >= STORE_VERSION_V5 {
687 write_varu32(buf, ts.tags.len() as u32);
688 let mut tag_entries: Vec<_> = ts.tags.iter().collect();
689 tag_entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
690 for (key, value) in tag_entries {
691 write_varu32(buf, key.len() as u32);
692 buf.extend_from_slice(key.as_bytes());
693 write_varu32(buf, value.len() as u32);
694 buf.extend_from_slice(value.as_bytes());
695 }
696 }
697 }
698 EntityData::QueueMessage(msg) => {
699 buf.push(5);
700 Self::write_value_binary(buf, &msg.payload);
701 write_varu64(buf, msg.enqueued_at_ns);
702 write_varu32(buf, msg.attempts);
703 }
704 }
705
706 write_varu64(buf, entity.created_at);
708 write_varu64(buf, entity.updated_at);
709
710 write_varu32(buf, entity.embeddings().len() as u32);
712 for emb in entity.embeddings() {
713 write_varu32(buf, emb.name.len() as u32);
714 buf.extend_from_slice(emb.name.as_bytes());
715 write_varu32(buf, emb.vector.len() as u32);
716 for f in &emb.vector {
717 buf.extend_from_slice(&f.to_le_bytes());
718 }
719 write_varu32(buf, emb.model.len() as u32);
720 buf.extend_from_slice(emb.model.as_bytes());
721 }
722
723 write_varu32(buf, entity.cross_refs().len() as u32);
725 for cross_ref in entity.cross_refs() {
726 write_varu64(buf, cross_ref.source.raw());
727 write_varu64(buf, cross_ref.target.raw());
728 buf.push(cross_ref.ref_type.to_byte());
729 if format_version >= STORE_VERSION_V2 {
730 write_varu32(buf, cross_ref.target_collection.len() as u32);
731 buf.extend_from_slice(cross_ref.target_collection.as_bytes());
732 buf.extend_from_slice(&cross_ref.weight.to_le_bytes());
733 write_varu64(buf, cross_ref.created_at);
734 }
735 }
736
737 write_varu64(buf, entity.sequence_id);
739
740 if format_version >= STORE_VERSION_V4 {
741 if let EntityData::QueueMessage(message) = &entity.data {
742 buf.push(u8::from(message.priority.is_some()));
743 if let Some(priority) = message.priority {
744 buf.extend_from_slice(&priority.to_le_bytes());
745 }
746 write_varu32(buf, message.max_attempts);
747 buf.push(u8::from(message.acked));
748 }
749 }
750
751 if format_version >= STORE_VERSION_V8 {
752 buf.push(u8::from(entity.has_explicit_logical_id()));
753 if entity.has_explicit_logical_id() {
754 write_varu64(buf, entity.logical_id().raw());
755 }
756 }
757 if format_version >= STORE_VERSION_V9 {
758 write_varu64(buf, entity.xmin);
759 write_varu64(buf, entity.xmax);
760 }
761 }
762
763 fn read_value_binary(buf: &[u8], pos: &mut usize) -> Result<Value, Box<dyn std::error::Error>> {
768 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
769
770 let type_byte = buf[*pos];
771 *pos += 1;
772
773 Ok(match type_byte {
774 0 => Value::Null,
775 1 => {
776 let b = buf[*pos] != 0;
777 *pos += 1;
778 Value::Boolean(b)
779 }
780 2 => {
781 let val = i64::from_le_bytes([
782 buf[*pos],
783 buf[*pos + 1],
784 buf[*pos + 2],
785 buf[*pos + 3],
786 buf[*pos + 4],
787 buf[*pos + 5],
788 buf[*pos + 6],
789 buf[*pos + 7],
790 ]);
791 *pos += 8;
792 Value::Integer(val)
793 }
794 3 => {
795 let val = u64::from_le_bytes([
796 buf[*pos],
797 buf[*pos + 1],
798 buf[*pos + 2],
799 buf[*pos + 3],
800 buf[*pos + 4],
801 buf[*pos + 5],
802 buf[*pos + 6],
803 buf[*pos + 7],
804 ]);
805 *pos += 8;
806 Value::UnsignedInteger(val)
807 }
808 4 => {
809 let val = f64::from_le_bytes([
810 buf[*pos],
811 buf[*pos + 1],
812 buf[*pos + 2],
813 buf[*pos + 3],
814 buf[*pos + 4],
815 buf[*pos + 5],
816 buf[*pos + 6],
817 buf[*pos + 7],
818 ]);
819 *pos += 8;
820 Value::Float(val)
821 }
822 5 => {
823 let len = Self::read_varu32_safe(buf, pos)?;
824 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
825 *pos += len;
826 Value::text(s)
827 }
828 6 => {
829 let len = Self::read_varu32_safe(buf, pos)?;
830 let bytes = buf[*pos..*pos + len].to_vec();
831 *pos += len;
832 Value::Blob(bytes)
833 }
834 7 => {
835 let val = i64::from_le_bytes([
836 buf[*pos],
837 buf[*pos + 1],
838 buf[*pos + 2],
839 buf[*pos + 3],
840 buf[*pos + 4],
841 buf[*pos + 5],
842 buf[*pos + 6],
843 buf[*pos + 7],
844 ]);
845 *pos += 8;
846 Value::Timestamp(val)
847 }
848 8 => {
849 let val = i64::from_le_bytes([
850 buf[*pos],
851 buf[*pos + 1],
852 buf[*pos + 2],
853 buf[*pos + 3],
854 buf[*pos + 4],
855 buf[*pos + 5],
856 buf[*pos + 6],
857 buf[*pos + 7],
858 ]);
859 *pos += 8;
860 Value::Duration(val)
861 }
862 9 => {
863 let version = buf[*pos];
865 *pos += 1;
866 if version == 4 {
867 let octets = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
868 *pos += 4;
869 Value::IpAddr(IpAddr::V4(Ipv4Addr::from(octets)))
870 } else {
871 let mut octets = [0u8; 16];
872 octets.copy_from_slice(&buf[*pos..*pos + 16]);
873 *pos += 16;
874 Value::IpAddr(IpAddr::V6(Ipv6Addr::from(octets)))
875 }
876 }
877 10 => {
878 let mut mac = [0u8; 6];
879 mac.copy_from_slice(&buf[*pos..*pos + 6]);
880 *pos += 6;
881 Value::MacAddr(mac)
882 }
883 11 => {
884 let len = Self::read_varu32_safe(buf, pos)?;
885 let mut vector = Vec::with_capacity(len);
886 for _ in 0..len {
887 let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
888 *pos += 4;
889 vector.push(f32::from_le_bytes(bytes));
890 }
891 Value::Vector(vector)
892 }
893 12 => {
894 let len = Self::read_varu32_safe(buf, pos)?;
895 let bytes = buf[*pos..*pos + len].to_vec();
896 *pos += len;
897 Value::Json(bytes)
898 }
899 13 => {
900 let mut uuid = [0u8; 16];
901 uuid.copy_from_slice(&buf[*pos..*pos + 16]);
902 *pos += 16;
903 Value::Uuid(uuid)
904 }
905 14 => {
906 let len = Self::read_varu32_safe(buf, pos)?;
907 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
908 *pos += len;
909 Value::NodeRef(s)
910 }
911 15 => {
912 let len = Self::read_varu32_safe(buf, pos)?;
913 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
914 *pos += len;
915 Value::EdgeRef(s)
916 }
917 16 => {
918 let len = Self::read_varu32_safe(buf, pos)?;
919 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
920 *pos += len;
921 let id = u64::from_le_bytes([
922 buf[*pos],
923 buf[*pos + 1],
924 buf[*pos + 2],
925 buf[*pos + 3],
926 buf[*pos + 4],
927 buf[*pos + 5],
928 buf[*pos + 6],
929 buf[*pos + 7],
930 ]);
931 *pos += 8;
932 Value::VectorRef(s, id)
933 }
934 17 => {
935 let len = Self::read_varu32_safe(buf, pos)?;
936 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
937 *pos += len;
938 let id = u64::from_le_bytes([
939 buf[*pos],
940 buf[*pos + 1],
941 buf[*pos + 2],
942 buf[*pos + 3],
943 buf[*pos + 4],
944 buf[*pos + 5],
945 buf[*pos + 6],
946 buf[*pos + 7],
947 ]);
948 *pos += 8;
949 Value::RowRef(s, id)
950 }
951 18 => {
952 let rgb = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
953 *pos += 3;
954 Value::Color(rgb)
955 }
956 19 => {
957 let len = Self::read_varu32_safe(buf, pos)?;
958 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
959 *pos += len;
960 Value::Email(s)
961 }
962 20 => {
963 let len = Self::read_varu32_safe(buf, pos)?;
964 let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
965 *pos += len;
966 Value::Url(s)
967 }
968 21 => {
969 let val = u64::from_le_bytes([
970 buf[*pos],
971 buf[*pos + 1],
972 buf[*pos + 2],
973 buf[*pos + 3],
974 buf[*pos + 4],
975 buf[*pos + 5],
976 buf[*pos + 6],
977 buf[*pos + 7],
978 ]);
979 *pos += 8;
980 Value::Phone(val)
981 }
982 22 => {
983 let val =
984 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
985 *pos += 4;
986 Value::Semver(val)
987 }
988 23 => {
989 let ip =
990 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
991 *pos += 4;
992 let prefix = buf[*pos];
993 *pos += 1;
994 Value::Cidr(ip, prefix)
995 }
996 24 => {
997 let val =
998 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
999 *pos += 4;
1000 Value::Date(val)
1001 }
1002 25 => {
1003 let val =
1004 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1005 *pos += 4;
1006 Value::Time(val)
1007 }
1008 26 => {
1009 let val = i64::from_le_bytes([
1010 buf[*pos],
1011 buf[*pos + 1],
1012 buf[*pos + 2],
1013 buf[*pos + 3],
1014 buf[*pos + 4],
1015 buf[*pos + 5],
1016 buf[*pos + 6],
1017 buf[*pos + 7],
1018 ]);
1019 *pos += 8;
1020 Value::Decimal(val)
1021 }
1022 27 => {
1023 let val = buf[*pos];
1024 *pos += 1;
1025 Value::EnumValue(val)
1026 }
1027 28 => {
1028 let len = Self::read_varu32_safe(buf, pos)?;
1029 let mut elems = Vec::with_capacity(len);
1030 for _ in 0..len {
1031 elems.push(Self::read_value_binary(buf, pos)?);
1032 }
1033 Value::Array(elems)
1034 }
1035 29 => {
1036 let val = i64::from_le_bytes([
1037 buf[*pos],
1038 buf[*pos + 1],
1039 buf[*pos + 2],
1040 buf[*pos + 3],
1041 buf[*pos + 4],
1042 buf[*pos + 5],
1043 buf[*pos + 6],
1044 buf[*pos + 7],
1045 ]);
1046 *pos += 8;
1047 Value::TimestampMs(val)
1048 }
1049 30 => {
1050 let val =
1051 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1052 *pos += 4;
1053 Value::Ipv4(val)
1054 }
1055 31 => {
1056 let mut bytes = [0u8; 16];
1057 bytes.copy_from_slice(&buf[*pos..*pos + 16]);
1058 *pos += 16;
1059 Value::Ipv6(bytes)
1060 }
1061 32 => {
1062 let ip =
1063 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1064 *pos += 4;
1065 let mask =
1066 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1067 *pos += 4;
1068 Value::Subnet(ip, mask)
1069 }
1070 33 => {
1071 let val = u16::from_le_bytes([buf[*pos], buf[*pos + 1]]);
1072 *pos += 2;
1073 Value::Port(val)
1074 }
1075 34 => {
1076 let val =
1077 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1078 *pos += 4;
1079 Value::Latitude(val)
1080 }
1081 35 => {
1082 let val =
1083 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1084 *pos += 4;
1085 Value::Longitude(val)
1086 }
1087 36 => {
1088 let lat =
1089 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1090 *pos += 4;
1091 let lon =
1092 i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1093 *pos += 4;
1094 Value::GeoPoint(lat, lon)
1095 }
1096 37 => {
1097 let c = [buf[*pos], buf[*pos + 1]];
1098 *pos += 2;
1099 Value::Country2(c)
1100 }
1101 38 => {
1102 let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1103 *pos += 3;
1104 Value::Country3(c)
1105 }
1106 39 => {
1107 let c = [buf[*pos], buf[*pos + 1]];
1108 *pos += 2;
1109 Value::Lang2(c)
1110 }
1111 40 => {
1112 let c = [
1113 buf[*pos],
1114 buf[*pos + 1],
1115 buf[*pos + 2],
1116 buf[*pos + 3],
1117 buf[*pos + 4],
1118 ];
1119 *pos += 5;
1120 Value::Lang5(c)
1121 }
1122 41 => {
1123 let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1124 *pos += 3;
1125 Value::Currency(c)
1126 }
1127 42 => {
1128 let rgba = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
1129 *pos += 4;
1130 Value::ColorAlpha(rgba)
1131 }
1132 43 => {
1133 let val = i64::from_le_bytes([
1134 buf[*pos],
1135 buf[*pos + 1],
1136 buf[*pos + 2],
1137 buf[*pos + 3],
1138 buf[*pos + 4],
1139 buf[*pos + 5],
1140 buf[*pos + 6],
1141 buf[*pos + 7],
1142 ]);
1143 *pos += 8;
1144 Value::BigInt(val)
1145 }
1146 44 => {
1147 let col_len = Self::read_varu32_safe(buf, pos)?;
1148 let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1149 *pos += col_len;
1150 let key_len = Self::read_varu32_safe(buf, pos)?;
1151 let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
1152 *pos += key_len;
1153 Value::KeyRef(col, key)
1154 }
1155 45 => {
1156 let col_len = Self::read_varu32_safe(buf, pos)?;
1157 let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1158 *pos += col_len;
1159 let id = u64::from_le_bytes([
1160 buf[*pos],
1161 buf[*pos + 1],
1162 buf[*pos + 2],
1163 buf[*pos + 3],
1164 buf[*pos + 4],
1165 buf[*pos + 5],
1166 buf[*pos + 6],
1167 buf[*pos + 7],
1168 ]);
1169 *pos += 8;
1170 Value::DocRef(col, id)
1171 }
1172 46 => {
1173 let len = Self::read_varu32_safe(buf, pos)?;
1174 let name = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1175 *pos += len;
1176 Value::TableRef(name)
1177 }
1178 47 => {
1179 let val =
1180 u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1181 *pos += 4;
1182 Value::PageRef(val)
1183 }
1184 48 => {
1185 let len = Self::read_varu32_safe(buf, pos)?;
1186 let bytes = buf[*pos..*pos + len].to_vec();
1187 *pos += len;
1188 Value::Secret(bytes)
1189 }
1190 49 => {
1191 let len = Self::read_varu32_safe(buf, pos)?;
1192 let hash = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1193 *pos += len;
1194 Value::Password(hash)
1195 }
1196 50 => {
1197 let len = Self::read_varu32_safe(buf, pos)?;
1198 let code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1199 *pos += len;
1200 Value::AssetCode(code)
1201 }
1202 51 => {
1203 let len = Self::read_varu32_safe(buf, pos)?;
1204 let asset_code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1205 *pos += len;
1206 let scale = buf[*pos];
1207 *pos += 1;
1208 let minor_units = i64::from_le_bytes([
1209 buf[*pos],
1210 buf[*pos + 1],
1211 buf[*pos + 2],
1212 buf[*pos + 3],
1213 buf[*pos + 4],
1214 buf[*pos + 5],
1215 buf[*pos + 6],
1216 buf[*pos + 7],
1217 ]);
1218 *pos += 8;
1219 Value::Money {
1220 asset_code,
1221 minor_units,
1222 scale,
1223 }
1224 }
1225 0x85 => {
1227 let orig_len = Self::read_varu32_safe(buf, pos)?;
1228 let comp_len = Self::read_varu32_safe(buf, pos)?;
1229 let compressed = &buf[*pos..*pos + comp_len];
1230 *pos += comp_len;
1231 let mut out = vec![0u8; orig_len];
1232 zstd::bulk::decompress_to_buffer(compressed, &mut out)
1233 .map_err(|e| format!("C3 Text decompress: {e}"))?;
1234 Value::text(String::from_utf8(out).map_err(|e| format!("C3 Text UTF-8: {e}"))?)
1235 }
1236 0x86 => {
1237 let orig_len = Self::read_varu32_safe(buf, pos)?;
1238 let comp_len = Self::read_varu32_safe(buf, pos)?;
1239 let compressed = &buf[*pos..*pos + comp_len];
1240 *pos += comp_len;
1241 let mut out = vec![0u8; orig_len];
1242 zstd::bulk::decompress_to_buffer(compressed, &mut out)
1243 .map_err(|e| format!("C3 Blob decompress: {e}"))?;
1244 Value::Blob(out)
1245 }
1246 _ => return Err(format!("Unknown Value type: {}", type_byte).into()),
1247 })
1248 }
1249
1250 fn write_value_binary(buf: &mut Vec<u8>, value: &Value) {
1255 use std::net::IpAddr;
1256
1257 match value {
1258 Value::Null => buf.push(0),
1259 Value::Boolean(b) => {
1260 buf.push(1);
1261 buf.push(if *b { 1 } else { 0 });
1262 }
1263 Value::Integer(i) => {
1264 buf.push(2);
1265 buf.extend_from_slice(&i.to_le_bytes());
1266 }
1267 Value::UnsignedInteger(u) => {
1268 buf.push(3);
1269 buf.extend_from_slice(&u.to_le_bytes());
1270 }
1271 Value::Float(f) => {
1272 buf.push(4);
1273 buf.extend_from_slice(&f.to_le_bytes());
1274 }
1275 Value::Text(s) => {
1276 const TEXT_COMPRESS_THRESHOLD: usize = 2048;
1280 if s.len() >= TEXT_COMPRESS_THRESHOLD {
1281 if let Ok(compressed) = zstd::bulk::compress(s.as_bytes(), 3) {
1282 if compressed.len() < s.len() {
1283 buf.push(0x85); write_varu32(buf, s.len() as u32); write_varu32(buf, compressed.len() as u32);
1286 buf.extend_from_slice(&compressed);
1287 return; }
1289 }
1290 }
1291 buf.push(5);
1292 write_varu32(buf, s.len() as u32);
1293 buf.extend_from_slice(s.as_bytes());
1294 }
1295 Value::Blob(bytes) => {
1296 const BLOB_COMPRESS_THRESHOLD: usize = 2048;
1299 if bytes.len() >= BLOB_COMPRESS_THRESHOLD {
1300 if let Ok(compressed) = zstd::bulk::compress(bytes.as_slice(), 3) {
1301 if compressed.len() < bytes.len() {
1302 buf.push(0x86); write_varu32(buf, bytes.len() as u32);
1304 write_varu32(buf, compressed.len() as u32);
1305 buf.extend_from_slice(&compressed);
1306 return;
1307 }
1308 }
1309 }
1310 buf.push(6);
1311 write_varu32(buf, bytes.len() as u32);
1312 buf.extend_from_slice(bytes);
1313 }
1314 Value::Timestamp(t) => {
1315 buf.push(7);
1316 buf.extend_from_slice(&t.to_le_bytes());
1317 }
1318 Value::Duration(d) => {
1319 buf.push(8);
1320 buf.extend_from_slice(&d.to_le_bytes());
1321 }
1322 Value::IpAddr(ip) => {
1323 buf.push(9);
1324 match ip {
1325 IpAddr::V4(v4) => {
1326 buf.push(4);
1327 buf.extend_from_slice(&v4.octets());
1328 }
1329 IpAddr::V6(v6) => {
1330 buf.push(6);
1331 buf.extend_from_slice(&v6.octets());
1332 }
1333 }
1334 }
1335 Value::MacAddr(mac) => {
1336 buf.push(10);
1337 buf.extend_from_slice(mac);
1338 }
1339 Value::Vector(vec) => {
1340 buf.push(11);
1341 write_varu32(buf, vec.len() as u32);
1342 for f in vec {
1343 buf.extend_from_slice(&f.to_le_bytes());
1344 }
1345 }
1346 Value::Json(bytes) => {
1347 buf.push(12);
1348 write_varu32(buf, bytes.len() as u32);
1349 buf.extend_from_slice(bytes);
1350 }
1351 Value::Uuid(uuid) => {
1352 buf.push(13);
1353 buf.extend_from_slice(uuid);
1354 }
1355 Value::NodeRef(s) => {
1356 buf.push(14);
1357 write_varu32(buf, s.len() as u32);
1358 buf.extend_from_slice(s.as_bytes());
1359 }
1360 Value::EdgeRef(s) => {
1361 buf.push(15);
1362 write_varu32(buf, s.len() as u32);
1363 buf.extend_from_slice(s.as_bytes());
1364 }
1365 Value::VectorRef(s, id) => {
1366 buf.push(16);
1367 write_varu32(buf, s.len() as u32);
1368 buf.extend_from_slice(s.as_bytes());
1369 buf.extend_from_slice(&id.to_le_bytes());
1370 }
1371 Value::RowRef(s, id) => {
1372 buf.push(17);
1373 write_varu32(buf, s.len() as u32);
1374 buf.extend_from_slice(s.as_bytes());
1375 buf.extend_from_slice(&id.to_le_bytes());
1376 }
1377 Value::Color(rgb) => {
1378 buf.push(18);
1379 buf.extend_from_slice(rgb);
1380 }
1381 Value::Email(s) => {
1382 buf.push(19);
1383 write_varu32(buf, s.len() as u32);
1384 buf.extend_from_slice(s.as_bytes());
1385 }
1386 Value::Url(s) => {
1387 buf.push(20);
1388 write_varu32(buf, s.len() as u32);
1389 buf.extend_from_slice(s.as_bytes());
1390 }
1391 Value::Phone(n) => {
1392 buf.push(21);
1393 buf.extend_from_slice(&n.to_le_bytes());
1394 }
1395 Value::Semver(v) => {
1396 buf.push(22);
1397 buf.extend_from_slice(&v.to_le_bytes());
1398 }
1399 Value::Cidr(ip, prefix) => {
1400 buf.push(23);
1401 buf.extend_from_slice(&ip.to_le_bytes());
1402 buf.push(*prefix);
1403 }
1404 Value::Date(d) => {
1405 buf.push(24);
1406 buf.extend_from_slice(&d.to_le_bytes());
1407 }
1408 Value::Time(t) => {
1409 buf.push(25);
1410 buf.extend_from_slice(&t.to_le_bytes());
1411 }
1412 Value::Decimal(v) => {
1413 buf.push(26);
1414 buf.extend_from_slice(&v.to_le_bytes());
1415 }
1416 Value::EnumValue(i) => {
1417 buf.push(27);
1418 buf.push(*i);
1419 }
1420 Value::Array(elems) => {
1421 buf.push(28);
1422 write_varu32(buf, elems.len() as u32);
1423 for elem in elems {
1424 Self::write_value_binary(buf, elem);
1425 }
1426 }
1427 Value::TimestampMs(v) => {
1428 buf.push(29);
1429 buf.extend_from_slice(&v.to_le_bytes());
1430 }
1431 Value::Ipv4(v) => {
1432 buf.push(30);
1433 buf.extend_from_slice(&v.to_le_bytes());
1434 }
1435 Value::Ipv6(bytes) => {
1436 buf.push(31);
1437 buf.extend_from_slice(bytes);
1438 }
1439 Value::Subnet(ip, mask) => {
1440 buf.push(32);
1441 buf.extend_from_slice(&ip.to_le_bytes());
1442 buf.extend_from_slice(&mask.to_le_bytes());
1443 }
1444 Value::Port(v) => {
1445 buf.push(33);
1446 buf.extend_from_slice(&v.to_le_bytes());
1447 }
1448 Value::Latitude(v) => {
1449 buf.push(34);
1450 buf.extend_from_slice(&v.to_le_bytes());
1451 }
1452 Value::Longitude(v) => {
1453 buf.push(35);
1454 buf.extend_from_slice(&v.to_le_bytes());
1455 }
1456 Value::GeoPoint(lat, lon) => {
1457 buf.push(36);
1458 buf.extend_from_slice(&lat.to_le_bytes());
1459 buf.extend_from_slice(&lon.to_le_bytes());
1460 }
1461 Value::Country2(c) => {
1462 buf.push(37);
1463 buf.extend_from_slice(c);
1464 }
1465 Value::Country3(c) => {
1466 buf.push(38);
1467 buf.extend_from_slice(c);
1468 }
1469 Value::Lang2(c) => {
1470 buf.push(39);
1471 buf.extend_from_slice(c);
1472 }
1473 Value::Lang5(c) => {
1474 buf.push(40);
1475 buf.extend_from_slice(c);
1476 }
1477 Value::Currency(c) => {
1478 buf.push(41);
1479 buf.extend_from_slice(c);
1480 }
1481 Value::AssetCode(code) => {
1482 buf.push(50);
1483 write_varu32(buf, code.len() as u32);
1484 buf.extend_from_slice(code.as_bytes());
1485 }
1486 Value::Money {
1487 asset_code,
1488 minor_units,
1489 scale,
1490 } => {
1491 buf.push(51);
1492 write_varu32(buf, asset_code.len() as u32);
1493 buf.extend_from_slice(asset_code.as_bytes());
1494 buf.push(*scale);
1495 buf.extend_from_slice(&minor_units.to_le_bytes());
1496 }
1497 Value::ColorAlpha(rgba) => {
1498 buf.push(42);
1499 buf.extend_from_slice(rgba);
1500 }
1501 Value::BigInt(v) => {
1502 buf.push(43);
1503 buf.extend_from_slice(&v.to_le_bytes());
1504 }
1505 Value::KeyRef(col, key) => {
1506 buf.push(44);
1507 write_varu32(buf, col.len() as u32);
1508 buf.extend_from_slice(col.as_bytes());
1509 write_varu32(buf, key.len() as u32);
1510 buf.extend_from_slice(key.as_bytes());
1511 }
1512 Value::DocRef(col, id) => {
1513 buf.push(45);
1514 write_varu32(buf, col.len() as u32);
1515 buf.extend_from_slice(col.as_bytes());
1516 buf.extend_from_slice(&id.to_le_bytes());
1517 }
1518 Value::TableRef(name) => {
1519 buf.push(46);
1520 write_varu32(buf, name.len() as u32);
1521 buf.extend_from_slice(name.as_bytes());
1522 }
1523 Value::PageRef(page_id) => {
1524 buf.push(47);
1525 buf.extend_from_slice(&page_id.to_le_bytes());
1526 }
1527 Value::Secret(bytes) => {
1528 buf.push(48);
1529 write_varu32(buf, bytes.len() as u32);
1530 buf.extend_from_slice(bytes);
1531 }
1532 Value::Password(hash) => {
1533 buf.push(49);
1534 write_varu32(buf, hash.len() as u32);
1535 buf.extend_from_slice(hash.as_bytes());
1536 }
1537 }
1538 }
1539}