Skip to main content

reddb_server/storage/unified/store/
impl_file.rs

1use super::*;
2
3impl UnifiedStore {
4    pub fn new() -> Self {
5        Self::with_config(UnifiedStoreConfig::default())
6    }
7
8    /// Get the current storage format version
9    pub fn format_version(&self) -> u32 {
10        self.format_version.load(Ordering::SeqCst)
11    }
12
13    pub(crate) fn set_format_version(&self, version: u32) {
14        self.format_version.store(version, Ordering::SeqCst);
15    }
16
17    /// Allocate a global entity ID
18    pub fn next_entity_id(&self) -> EntityId {
19        EntityId::new(self.next_entity_id.fetch_add(1, Ordering::SeqCst))
20    }
21
22    /// Reserve `n` contiguous global entity IDs with one fetch_add.
23    /// Caller assigns `id = EntityId::new(start + i)` per entity.
24    pub fn reserve_entity_ids(&self, n: u64) -> std::ops::Range<u64> {
25        let start = self.next_entity_id.fetch_add(n, Ordering::SeqCst);
26        start..start + n
27    }
28
29    pub(crate) fn register_entity_id(&self, id: EntityId) {
30        let candidate = id.raw().saturating_add(1);
31        let mut current = self.next_entity_id.load(Ordering::SeqCst);
32        while candidate > current {
33            match self.next_entity_id.compare_exchange(
34                current,
35                candidate,
36                Ordering::SeqCst,
37                Ordering::SeqCst,
38            ) {
39                Ok(_) => break,
40                Err(updated) => current = updated,
41            }
42        }
43    }
44
45    /// Load store from binary file
46    ///
47    /// Binary dump framing is defined by `reddb_file::native_store`.
48    ///
49    /// Store payload shape:
50    /// ```text
51    /// [collection_count: varu32]
52    /// [collections...]
53    /// [cross_ref_count: varu32]
54    /// [cross_refs...]
55    /// ```
56    pub fn load_from_file(path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
57        let file = File::open(path)?;
58        let mut reader = BufReader::new(file);
59        let mut buf = Vec::new();
60        reader.read_to_end(&mut buf)?;
61        Self::load_from_bytes(&buf)
62    }
63
64    pub(crate) fn load_from_bytes(buf: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
65        Self::load_from_bytes_with_config(buf, UnifiedStoreConfig::default())
66    }
67
68    pub(crate) fn load_from_bytes_with_config(
69        buf: &[u8],
70        config: UnifiedStoreConfig,
71    ) -> Result<Self, Box<dyn std::error::Error>> {
72        let mut buf = buf.to_vec();
73        let version =
74            reddb_file::decode_native_store_header(&buf).map_err(|err| err.to_string())?;
75        let mut pos = 8;
76
77        // V3+ has CRC32 footer — verify integrity before parsing
78        reddb_file::verify_native_store_crc32_footer(&mut buf, version)
79            .map_err(|err| err.to_string())?;
80
81        let store = Self::with_config(config);
82        store.set_format_version(version);
83
84        // Read collection count
85        let collection_count =
86            reddb_file::decode_native_dump_count(&buf, &mut pos).map_err(|e| e.to_string())?;
87
88        // Read each collection
89        for _ in 0..collection_count {
90            let collection_header =
91                reddb_file::decode_native_dump_collection_header(&buf, &mut pos)
92                    .map_err(|e| e.to_string())?;
93
94            // Read each entity — V7+ includes metadata alongside entity data.
95            for _ in 0..collection_header.entity_count {
96                if version >= STORE_VERSION_V7 {
97                    // Length-prefixed entity+metadata record (serialize_entity_record format)
98                    let record_bytes = reddb_file::decode_native_dump_entity_record(&buf, &mut pos)
99                        .map_err(|e| e.to_string())?;
100
101                    let (entity, metadata) = Self::deserialize_entity_record(record_bytes, version)
102                        .map_err(|e| format!("Entity record deserialization error: {e}"))?;
103
104                    store.insert_auto(&collection_header.name, entity.clone())?;
105
106                    if let Some(metadata) = metadata {
107                        if let Some(manager) = store.get_collection(&collection_header.name) {
108                            let _ = manager.set_metadata(entity.id, metadata);
109                        }
110                    }
111                } else {
112                    // V1–V6: entity only, no metadata
113                    let entity = Self::read_entity_binary(&buf, &mut pos, version)?;
114                    store.insert_auto(&collection_header.name, entity)?;
115                }
116            }
117        }
118
119        if pos < buf.len() {
120            // Read cross-references section
121            let cross_ref_count =
122                reddb_file::decode_native_dump_count(&buf, &mut pos).map_err(|e| e.to_string())?;
123
124            for _ in 0..cross_ref_count {
125                let cross_ref = reddb_file::decode_native_dump_cross_ref(&buf, &mut pos)
126                    .map_err(|e| e.to_string())?;
127                let ref_type = RefType::from_byte(cross_ref.ref_type);
128
129                let source_collection = store
130                    .get_any(EntityId::new(cross_ref.source_id))
131                    .map(|(name, _)| name)
132                    .unwrap_or_else(|| cross_ref.target_collection.clone());
133                let _ = store.add_cross_ref(
134                    &source_collection,
135                    EntityId::new(cross_ref.source_id),
136                    &cross_ref.target_collection,
137                    EntityId::new(cross_ref.target_id),
138                    ref_type,
139                    1.0,
140                );
141            }
142        }
143
144        if store.format_version() < STORE_VERSION_CURRENT {
145            store.set_format_version(STORE_VERSION_CURRENT);
146        }
147
148        Ok(store)
149    }
150
151    /// Save store to binary file
152    ///
153    /// Uses compact binary encoding with varint for efficient storage.
154    /// No JSON - pure binary with pages and indices.
155    pub fn save_to_file(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
156        let buf = self.to_binary_dump_bytes();
157        reddb_file::write_native_store_bytes_atomically(path, &buf)?;
158        Ok(())
159    }
160
161    pub(crate) fn to_binary_dump_bytes(&self) -> Vec<u8> {
162        let mut buf = Vec::new();
163
164        // Version 9 includes explicit table-row logical identity plus MVCC
165        // xmin/xmax alongside the V7 metadata envelope.
166        buf.extend_from_slice(&reddb_file::encode_native_store_header(
167            STORE_VERSION_CURRENT,
168        ));
169
170        // Get all collections
171        let collections = self.collections.read();
172        reddb_file::encode_native_dump_count(&mut buf, collections.len() as u32);
173
174        let fv = STORE_VERSION_CURRENT;
175        for (name, manager) in collections.iter() {
176            // Get all entities from this collection
177            let entities = manager.query_all(|_| true);
178            reddb_file::encode_native_dump_collection_header(&mut buf, name, entities.len() as u32);
179
180            // V7+: serialize entity+metadata as a length-prefixed record.
181            // Each record: [u32 len][serialize_entity_record bytes]
182            for entity in entities {
183                let metadata = manager.get_metadata(entity.id);
184                let record = Self::serialize_entity_record(&entity, metadata.as_ref(), fv);
185                reddb_file::encode_native_dump_entity_record(&mut buf, &record);
186            }
187        }
188
189        // Write cross-references
190        let cross_refs = self.cross_refs.read();
191        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
192        reddb_file::encode_native_dump_count(&mut buf, total_refs as u32);
193
194        for (source_id, refs) in cross_refs.iter() {
195            for (target_id, ref_type, collection) in refs {
196                reddb_file::encode_native_dump_cross_ref(
197                    &mut buf,
198                    source_id.raw(),
199                    target_id.raw(),
200                    ref_type.to_byte(),
201                    collection,
202                );
203            }
204        }
205
206        self.set_format_version(STORE_VERSION_CURRENT);
207
208        reddb_file::append_native_store_crc32_footer(&mut buf);
209        buf
210    }
211
212    /// Read entity from binary buffer
213    pub(crate) fn read_entity_binary(
214        buf: &[u8],
215        pos: &mut usize,
216        format_version: u32,
217    ) -> Result<UnifiedEntity, Box<dyn std::error::Error>> {
218        // Entity ID
219        let id = read_varu64(buf, pos).map_err(|e| format!("Failed to read entity id: {:?}", e))?;
220
221        // EntityKind type byte
222        let kind_type = buf[*pos];
223        *pos += 1;
224
225        // EntityKind details
226        let kind = match kind_type {
227            0 => {
228                // TableRow
229                let table_len = Self::read_varu32_safe(buf, pos)?;
230                let table = String::from_utf8(buf[*pos..*pos + table_len].to_vec())?;
231                *pos += table_len;
232                let row_id = Self::read_varu64_safe(buf, pos)?;
233                EntityKind::TableRow {
234                    table: table.into(),
235                    row_id,
236                }
237            }
238            1 => {
239                // GraphNode
240                let label_len = Self::read_varu32_safe(buf, pos)?;
241                let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
242                *pos += label_len;
243                let node_type_len = Self::read_varu32_safe(buf, pos)?;
244                let node_type = String::from_utf8(buf[*pos..*pos + node_type_len].to_vec())?;
245                *pos += node_type_len;
246                EntityKind::GraphNode(Box::new(GraphNodeKind { label, node_type }))
247            }
248            2 => {
249                // GraphEdge
250                let label_len = Self::read_varu32_safe(buf, pos)?;
251                let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
252                *pos += label_len;
253                let from_node_len = Self::read_varu32_safe(buf, pos)?;
254                let from_node = String::from_utf8(buf[*pos..*pos + from_node_len].to_vec())?;
255                *pos += from_node_len;
256                let to_node_len = Self::read_varu32_safe(buf, pos)?;
257                let to_node = String::from_utf8(buf[*pos..*pos + to_node_len].to_vec())?;
258                *pos += to_node_len;
259                let weight =
260                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
261                *pos += 4;
262                EntityKind::GraphEdge(Box::new(GraphEdgeKind {
263                    label,
264                    from_node,
265                    to_node,
266                    weight,
267                }))
268            }
269            3 => {
270                // Vector
271                let collection_len = Self::read_varu32_safe(buf, pos)?;
272                let collection = String::from_utf8(buf[*pos..*pos + collection_len].to_vec())?;
273                *pos += collection_len;
274                EntityKind::Vector { collection }
275            }
276            4 => {
277                // TimeSeriesPoint
278                let series_len = Self::read_varu32_safe(buf, pos)?;
279                let series = String::from_utf8(buf[*pos..*pos + series_len].to_vec())?;
280                *pos += series_len;
281                let metric_len = Self::read_varu32_safe(buf, pos)?;
282                let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
283                *pos += metric_len;
284                EntityKind::TimeSeriesPoint(Box::new(TimeSeriesPointKind { series, metric }))
285            }
286            5 => {
287                // QueueMessage
288                let queue_len = Self::read_varu32_safe(buf, pos)?;
289                let queue = String::from_utf8(buf[*pos..*pos + queue_len].to_vec())?;
290                *pos += queue_len;
291                let position = Self::read_varu64_safe(buf, pos)?;
292                EntityKind::QueueMessage { queue, position }
293            }
294            _ => return Err(format!("Unknown EntityKind type: {}", kind_type).into()),
295        };
296
297        // EntityData type byte
298        let data_type = buf[*pos];
299        *pos += 1;
300
301        // EntityData
302        let mut data = match data_type {
303            0 => {
304                // Row
305                let col_count = Self::read_varu32_safe(buf, pos)?;
306                let mut columns = Vec::with_capacity(col_count);
307                for _ in 0..col_count {
308                    columns.push(Self::read_value_binary(buf, pos)?);
309                }
310                EntityData::Row(RowData::new(columns))
311            }
312            1 => {
313                // Node
314                let prop_count = Self::read_varu32_safe(buf, pos)?;
315                let mut properties = HashMap::new();
316                for _ in 0..prop_count {
317                    let key_len = Self::read_varu32_safe(buf, pos)?;
318                    let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
319                    *pos += key_len;
320                    let value = Self::read_value_binary(buf, pos)?;
321                    properties.insert(key, value);
322                }
323                EntityData::Node(NodeData::with_properties(properties))
324            }
325            2 => {
326                // Edge
327                let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
328                *pos += 4;
329                let weight = f32::from_le_bytes(weight_bytes);
330                let prop_count = Self::read_varu32_safe(buf, pos)?;
331                let mut properties = HashMap::new();
332                for _ in 0..prop_count {
333                    let key_len = Self::read_varu32_safe(buf, pos)?;
334                    let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
335                    *pos += key_len;
336                    let value = Self::read_value_binary(buf, pos)?;
337                    properties.insert(key, value);
338                }
339                let mut edge = EdgeData::new(weight);
340                edge.properties = properties;
341                EntityData::Edge(edge)
342            }
343            3 => {
344                // Vector
345                let dim = Self::read_varu32_safe(buf, pos)?;
346                let mut dense = Vec::with_capacity(dim);
347                for _ in 0..dim {
348                    let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
349                    *pos += 4;
350                    dense.push(f32::from_le_bytes(bytes));
351                }
352                let mut vector = VectorData::new(dense);
353                if format_version >= STORE_VERSION_V6 {
354                    let has_content = buf[*pos] != 0;
355                    *pos += 1;
356                    if has_content {
357                        let content_len = Self::read_varu32_safe(buf, pos)?;
358                        vector.content =
359                            Some(String::from_utf8(buf[*pos..*pos + content_len].to_vec())?);
360                        *pos += content_len;
361                    }
362                }
363                EntityData::Vector(vector)
364            }
365            4 => {
366                // TimeSeries
367                let metric_len = Self::read_varu32_safe(buf, pos)?;
368                let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
369                *pos += metric_len;
370                let timestamp_ns = Self::read_varu64_safe(buf, pos)?;
371                let value_bytes = [
372                    buf[*pos],
373                    buf[*pos + 1],
374                    buf[*pos + 2],
375                    buf[*pos + 3],
376                    buf[*pos + 4],
377                    buf[*pos + 5],
378                    buf[*pos + 6],
379                    buf[*pos + 7],
380                ];
381                *pos += 8;
382                let mut tags = HashMap::new();
383                if format_version >= STORE_VERSION_V5 {
384                    let tag_count = Self::read_varu32_safe(buf, pos)?;
385                    tags = HashMap::with_capacity(tag_count);
386                    for _ in 0..tag_count {
387                        let key_len = Self::read_varu32_safe(buf, pos)?;
388                        let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
389                        *pos += key_len;
390                        let value_len = Self::read_varu32_safe(buf, pos)?;
391                        let value = String::from_utf8(buf[*pos..*pos + value_len].to_vec())?;
392                        *pos += value_len;
393                        tags.insert(key, value);
394                    }
395                }
396                EntityData::TimeSeries(crate::storage::unified::entity::TimeSeriesData {
397                    metric,
398                    timestamp_ns,
399                    value: f64::from_le_bytes(value_bytes),
400                    tags,
401                })
402            }
403            5 => {
404                // QueueMessage
405                let payload = Self::read_value_binary(buf, pos)?;
406                let enqueued_at_ns = Self::read_varu64_safe(buf, pos)?;
407                let attempts = Self::read_varu32_safe(buf, pos)? as u32;
408                EntityData::QueueMessage(crate::storage::unified::entity::QueueMessageData {
409                    payload,
410                    priority: None,
411                    enqueued_at_ns,
412                    attempts,
413                    max_attempts: 3,
414                    acked: false,
415                })
416            }
417            6 => {
418                // Row with named HashMap
419                let field_count = Self::read_varu32_safe(buf, pos)?;
420                let mut named = HashMap::with_capacity(field_count);
421                for _ in 0..field_count {
422                    let key_len = Self::read_varu32_safe(buf, pos)?;
423                    let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
424                    *pos += key_len;
425                    let value = Self::read_value_binary(buf, pos)?;
426                    named.insert(key, value);
427                }
428                EntityData::Row(RowData {
429                    columns: Vec::new(),
430                    named: Some(named),
431                    schema: None,
432                })
433            }
434            _ => return Err(format!("Unknown EntityData type: {}", data_type).into()),
435        };
436
437        // Timestamps
438        let created_at = Self::read_varu64_safe(buf, pos)?;
439        let updated_at = Self::read_varu64_safe(buf, pos)?;
440
441        // Embeddings count
442        let embedding_count = Self::read_varu32_safe(buf, pos)?;
443        let mut embeddings = Vec::with_capacity(embedding_count);
444        for _ in 0..embedding_count {
445            let name_len = Self::read_varu32_safe(buf, pos)?;
446            let name = String::from_utf8(buf[*pos..*pos + name_len].to_vec())?;
447            *pos += name_len;
448
449            let dim = Self::read_varu32_safe(buf, pos)?;
450            let mut vector = Vec::with_capacity(dim);
451            for _ in 0..dim {
452                let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
453                *pos += 4;
454                vector.push(f32::from_le_bytes(bytes));
455            }
456
457            let model_len = Self::read_varu32_safe(buf, pos)?;
458            let model = String::from_utf8(buf[*pos..*pos + model_len].to_vec())?;
459            *pos += model_len;
460
461            embeddings.push(EmbeddingSlot::new(name, vector, model));
462        }
463
464        // Cross-refs count
465        let cross_ref_count = Self::read_varu32_safe(buf, pos)?;
466        let mut cross_refs = Vec::with_capacity(cross_ref_count);
467        for _ in 0..cross_ref_count {
468            let source = Self::read_varu64_safe(buf, pos)?;
469            let target = Self::read_varu64_safe(buf, pos)?;
470            let ref_type_byte = buf[*pos];
471            *pos += 1;
472            let (target_collection, weight, created_at) = if format_version >= STORE_VERSION_V2 {
473                let coll_len = Self::read_varu32_safe(buf, pos)?;
474                let collection = String::from_utf8(buf[*pos..*pos + coll_len].to_vec())?;
475                *pos += coll_len;
476                let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
477                *pos += 4;
478                let weight = f32::from_le_bytes(weight_bytes);
479                let created_at = Self::read_varu64_safe(buf, pos)?;
480                (collection, weight, created_at)
481            } else {
482                (String::new(), 1.0, 0)
483            };
484
485            let mut cross_ref = CrossRef::new(
486                EntityId::new(source),
487                EntityId::new(target),
488                target_collection,
489                RefType::from_byte(ref_type_byte),
490            );
491            cross_ref.weight = weight;
492            cross_ref.created_at = created_at;
493            cross_refs.push(cross_ref);
494        }
495
496        // Sequence ID
497        let sequence_id = Self::read_varu64_safe(buf, pos)?;
498
499        if format_version >= STORE_VERSION_V4 {
500            if let EntityData::QueueMessage(message) = &mut data {
501                if *pos < buf.len() {
502                    let priority_present = buf[*pos] != 0;
503                    *pos += 1;
504                    message.priority = if priority_present {
505                        if *pos + 4 > buf.len() {
506                            return Err("truncated queue priority".into());
507                        }
508                        let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
509                        *pos += 4;
510                        Some(i32::from_le_bytes(bytes))
511                    } else {
512                        None
513                    };
514                    message.max_attempts = Self::read_varu32_safe(buf, pos)? as u32;
515                    if *pos >= buf.len() {
516                        return Err("truncated queue ack flag".into());
517                    }
518                    message.acked = buf[*pos] != 0;
519                    *pos += 1;
520                }
521            }
522        }
523
524        let mut entity = UnifiedEntity::new(EntityId::new(id), kind, data);
525        entity.created_at = created_at;
526        entity.updated_at = updated_at;
527        entity.sequence_id = sequence_id;
528        if format_version >= STORE_VERSION_V8 && *pos < buf.len() {
529            let has_logical_id = buf[*pos] != 0;
530            *pos += 1;
531            if has_logical_id {
532                let logical_id = Self::read_varu64_safe(buf, pos)?;
533                entity.set_logical_id(EntityId::new(logical_id));
534            }
535        }
536        if format_version >= STORE_VERSION_V9 && *pos < buf.len() {
537            let xmin = Self::read_varu64_safe(buf, pos)?;
538            let xmax = Self::read_varu64_safe(buf, pos)?;
539            entity.set_xmin(xmin);
540            entity.set_xmax(xmax);
541        }
542        if !embeddings.is_empty() || !cross_refs.is_empty() {
543            entity.embeddings_mut().extend(embeddings);
544            entity.cross_refs_mut().extend(cross_refs);
545        }
546
547        Ok(entity)
548    }
549
550    /// Safe varu32 reader that converts DecodeError to Box<dyn Error>
551    fn read_varu32_safe(buf: &[u8], pos: &mut usize) -> Result<usize, Box<dyn std::error::Error>> {
552        read_varu32(buf, pos)
553            .map(|v| v as usize)
554            .map_err(|e| format!("Decode error: {:?}", e).into())
555    }
556
557    /// Safe varu64 reader that converts DecodeError to Box<dyn Error>
558    fn read_varu64_safe(buf: &[u8], pos: &mut usize) -> Result<u64, Box<dyn std::error::Error>> {
559        read_varu64(buf, pos).map_err(|e| format!("Decode error: {:?}", e).into())
560    }
561
562    /// Write entity to binary buffer
563    pub(crate) fn write_entity_binary(
564        buf: &mut Vec<u8>,
565        entity: &UnifiedEntity,
566        format_version: u32,
567    ) {
568        // Entity ID
569        write_varu64(buf, entity.id.raw());
570
571        // EntityKind
572        match &entity.kind {
573            EntityKind::TableRow { table, row_id } => {
574                buf.push(0);
575                write_varu32(buf, table.len() as u32);
576                buf.extend_from_slice(table.as_bytes());
577                write_varu64(buf, *row_id);
578            }
579            EntityKind::GraphNode(ref node) => {
580                buf.push(1);
581                write_varu32(buf, node.label.len() as u32);
582                buf.extend_from_slice(node.label.as_bytes());
583                write_varu32(buf, node.node_type.len() as u32);
584                buf.extend_from_slice(node.node_type.as_bytes());
585            }
586            EntityKind::GraphEdge(ref edge) => {
587                buf.push(2);
588                write_varu32(buf, edge.label.len() as u32);
589                buf.extend_from_slice(edge.label.as_bytes());
590                write_varu32(buf, edge.from_node.len() as u32);
591                buf.extend_from_slice(edge.from_node.as_bytes());
592                write_varu32(buf, edge.to_node.len() as u32);
593                buf.extend_from_slice(edge.to_node.as_bytes());
594                buf.extend_from_slice(&edge.weight.to_le_bytes());
595            }
596            EntityKind::Vector { collection } => {
597                buf.push(3);
598                write_varu32(buf, collection.len() as u32);
599                buf.extend_from_slice(collection.as_bytes());
600            }
601            EntityKind::TimeSeriesPoint(ref ts) => {
602                buf.push(4);
603                write_varu32(buf, ts.series.len() as u32);
604                buf.extend_from_slice(ts.series.as_bytes());
605                write_varu32(buf, ts.metric.len() as u32);
606                buf.extend_from_slice(ts.metric.as_bytes());
607            }
608            EntityKind::QueueMessage { queue, position } => {
609                buf.push(5);
610                write_varu32(buf, queue.len() as u32);
611                buf.extend_from_slice(queue.as_bytes());
612                write_varu64(buf, *position);
613            }
614        }
615
616        // EntityData
617        match &entity.data {
618            EntityData::Row(row) => {
619                if let Some(ref named) = row.named {
620                    // Named row: type 6 = Row with named HashMap.
621                    // Sort by key so the on-disk byte sequence is
622                    // deterministic — replication relies on hashing the
623                    // serialized record to detect divergence, and a
624                    // HashMap-iteration-order race produces spurious
625                    // mismatches.
626                    buf.push(6);
627                    write_varu32(buf, named.len() as u32);
628                    let mut entries: Vec<_> = named.iter().collect();
629                    entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
630                    for (key, value) in entries {
631                        write_varu32(buf, key.len() as u32);
632                        buf.extend_from_slice(key.as_bytes());
633                        Self::write_value_binary(buf, value);
634                    }
635                } else {
636                    buf.push(0);
637                    write_varu32(buf, row.columns.len() as u32);
638                    for col in &row.columns {
639                        Self::write_value_binary(buf, col);
640                    }
641                }
642            }
643            EntityData::Node(node) => {
644                buf.push(1);
645                write_varu32(buf, node.properties.len() as u32);
646                let mut entries: Vec<_> = node.properties.iter().collect();
647                entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
648                for (key, value) in entries {
649                    write_varu32(buf, key.len() as u32);
650                    buf.extend_from_slice(key.as_bytes());
651                    Self::write_value_binary(buf, value);
652                }
653            }
654            EntityData::Edge(edge) => {
655                buf.push(2);
656                buf.extend_from_slice(&edge.weight.to_le_bytes());
657                write_varu32(buf, edge.properties.len() as u32);
658                let mut entries: Vec<_> = edge.properties.iter().collect();
659                entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
660                for (key, value) in entries {
661                    write_varu32(buf, key.len() as u32);
662                    buf.extend_from_slice(key.as_bytes());
663                    Self::write_value_binary(buf, value);
664                }
665            }
666            EntityData::Vector(vec) => {
667                buf.push(3);
668                write_varu32(buf, vec.dense.len() as u32);
669                for f in &vec.dense {
670                    buf.extend_from_slice(&f.to_le_bytes());
671                }
672                if format_version >= STORE_VERSION_V6 {
673                    buf.push(u8::from(vec.content.is_some()));
674                    if let Some(content) = &vec.content {
675                        write_varu32(buf, content.len() as u32);
676                        buf.extend_from_slice(content.as_bytes());
677                    }
678                }
679            }
680            EntityData::TimeSeries(ts) => {
681                buf.push(4);
682                write_varu32(buf, ts.metric.len() as u32);
683                buf.extend_from_slice(ts.metric.as_bytes());
684                write_varu64(buf, ts.timestamp_ns);
685                buf.extend_from_slice(&ts.value.to_le_bytes());
686                if format_version >= STORE_VERSION_V5 {
687                    write_varu32(buf, ts.tags.len() as u32);
688                    let mut tag_entries: Vec<_> = ts.tags.iter().collect();
689                    tag_entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
690                    for (key, value) in tag_entries {
691                        write_varu32(buf, key.len() as u32);
692                        buf.extend_from_slice(key.as_bytes());
693                        write_varu32(buf, value.len() as u32);
694                        buf.extend_from_slice(value.as_bytes());
695                    }
696                }
697            }
698            EntityData::QueueMessage(msg) => {
699                buf.push(5);
700                Self::write_value_binary(buf, &msg.payload);
701                write_varu64(buf, msg.enqueued_at_ns);
702                write_varu32(buf, msg.attempts);
703            }
704        }
705
706        // Timestamps
707        write_varu64(buf, entity.created_at);
708        write_varu64(buf, entity.updated_at);
709
710        // Embeddings
711        write_varu32(buf, entity.embeddings().len() as u32);
712        for emb in entity.embeddings() {
713            write_varu32(buf, emb.name.len() as u32);
714            buf.extend_from_slice(emb.name.as_bytes());
715            write_varu32(buf, emb.vector.len() as u32);
716            for f in &emb.vector {
717                buf.extend_from_slice(&f.to_le_bytes());
718            }
719            write_varu32(buf, emb.model.len() as u32);
720            buf.extend_from_slice(emb.model.as_bytes());
721        }
722
723        // Cross-refs
724        write_varu32(buf, entity.cross_refs().len() as u32);
725        for cross_ref in entity.cross_refs() {
726            write_varu64(buf, cross_ref.source.raw());
727            write_varu64(buf, cross_ref.target.raw());
728            buf.push(cross_ref.ref_type.to_byte());
729            if format_version >= STORE_VERSION_V2 {
730                write_varu32(buf, cross_ref.target_collection.len() as u32);
731                buf.extend_from_slice(cross_ref.target_collection.as_bytes());
732                buf.extend_from_slice(&cross_ref.weight.to_le_bytes());
733                write_varu64(buf, cross_ref.created_at);
734            }
735        }
736
737        // Sequence ID
738        write_varu64(buf, entity.sequence_id);
739
740        if format_version >= STORE_VERSION_V4 {
741            if let EntityData::QueueMessage(message) = &entity.data {
742                buf.push(u8::from(message.priority.is_some()));
743                if let Some(priority) = message.priority {
744                    buf.extend_from_slice(&priority.to_le_bytes());
745                }
746                write_varu32(buf, message.max_attempts);
747                buf.push(u8::from(message.acked));
748            }
749        }
750
751        if format_version >= STORE_VERSION_V8 {
752            buf.push(u8::from(entity.has_explicit_logical_id()));
753            if entity.has_explicit_logical_id() {
754                write_varu64(buf, entity.logical_id().raw());
755            }
756        }
757        if format_version >= STORE_VERSION_V9 {
758            write_varu64(buf, entity.xmin);
759            write_varu64(buf, entity.xmax);
760        }
761    }
762
763    /// Read a Value from binary buffer
764    /// Type bytes: 0=Null, 1=Boolean, 2=Integer, 3=UnsignedInteger, 4=Float,
765    /// 5=Text, 6=Blob, 7=Timestamp, 8=Duration, 9=IpAddr, 10=MacAddr,
766    /// 11=Vector, 12=Json, 13=Uuid, 14=NodeRef, 15=EdgeRef, 16=VectorRef, 17=RowRef
767    fn read_value_binary(buf: &[u8], pos: &mut usize) -> Result<Value, Box<dyn std::error::Error>> {
768        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
769
770        let type_byte = buf[*pos];
771        *pos += 1;
772
773        Ok(match type_byte {
774            0 => Value::Null,
775            1 => {
776                let b = buf[*pos] != 0;
777                *pos += 1;
778                Value::Boolean(b)
779            }
780            2 => {
781                let val = i64::from_le_bytes([
782                    buf[*pos],
783                    buf[*pos + 1],
784                    buf[*pos + 2],
785                    buf[*pos + 3],
786                    buf[*pos + 4],
787                    buf[*pos + 5],
788                    buf[*pos + 6],
789                    buf[*pos + 7],
790                ]);
791                *pos += 8;
792                Value::Integer(val)
793            }
794            3 => {
795                let val = u64::from_le_bytes([
796                    buf[*pos],
797                    buf[*pos + 1],
798                    buf[*pos + 2],
799                    buf[*pos + 3],
800                    buf[*pos + 4],
801                    buf[*pos + 5],
802                    buf[*pos + 6],
803                    buf[*pos + 7],
804                ]);
805                *pos += 8;
806                Value::UnsignedInteger(val)
807            }
808            4 => {
809                let val = f64::from_le_bytes([
810                    buf[*pos],
811                    buf[*pos + 1],
812                    buf[*pos + 2],
813                    buf[*pos + 3],
814                    buf[*pos + 4],
815                    buf[*pos + 5],
816                    buf[*pos + 6],
817                    buf[*pos + 7],
818                ]);
819                *pos += 8;
820                Value::Float(val)
821            }
822            5 => {
823                let len = Self::read_varu32_safe(buf, pos)?;
824                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
825                *pos += len;
826                Value::text(s)
827            }
828            6 => {
829                let len = Self::read_varu32_safe(buf, pos)?;
830                let bytes = buf[*pos..*pos + len].to_vec();
831                *pos += len;
832                Value::Blob(bytes)
833            }
834            7 => {
835                let val = i64::from_le_bytes([
836                    buf[*pos],
837                    buf[*pos + 1],
838                    buf[*pos + 2],
839                    buf[*pos + 3],
840                    buf[*pos + 4],
841                    buf[*pos + 5],
842                    buf[*pos + 6],
843                    buf[*pos + 7],
844                ]);
845                *pos += 8;
846                Value::Timestamp(val)
847            }
848            8 => {
849                let val = i64::from_le_bytes([
850                    buf[*pos],
851                    buf[*pos + 1],
852                    buf[*pos + 2],
853                    buf[*pos + 3],
854                    buf[*pos + 4],
855                    buf[*pos + 5],
856                    buf[*pos + 6],
857                    buf[*pos + 7],
858                ]);
859                *pos += 8;
860                Value::Duration(val)
861            }
862            9 => {
863                // IpAddr: first byte = version (4 or 6)
864                let version = buf[*pos];
865                *pos += 1;
866                if version == 4 {
867                    let octets = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
868                    *pos += 4;
869                    Value::IpAddr(IpAddr::V4(Ipv4Addr::from(octets)))
870                } else {
871                    let mut octets = [0u8; 16];
872                    octets.copy_from_slice(&buf[*pos..*pos + 16]);
873                    *pos += 16;
874                    Value::IpAddr(IpAddr::V6(Ipv6Addr::from(octets)))
875                }
876            }
877            10 => {
878                let mut mac = [0u8; 6];
879                mac.copy_from_slice(&buf[*pos..*pos + 6]);
880                *pos += 6;
881                Value::MacAddr(mac)
882            }
883            11 => {
884                let len = Self::read_varu32_safe(buf, pos)?;
885                let mut vector = Vec::with_capacity(len);
886                for _ in 0..len {
887                    let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
888                    *pos += 4;
889                    vector.push(f32::from_le_bytes(bytes));
890                }
891                Value::Vector(vector)
892            }
893            12 => {
894                let len = Self::read_varu32_safe(buf, pos)?;
895                let bytes = buf[*pos..*pos + len].to_vec();
896                *pos += len;
897                Value::Json(bytes)
898            }
899            13 => {
900                let mut uuid = [0u8; 16];
901                uuid.copy_from_slice(&buf[*pos..*pos + 16]);
902                *pos += 16;
903                Value::Uuid(uuid)
904            }
905            14 => {
906                let len = Self::read_varu32_safe(buf, pos)?;
907                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
908                *pos += len;
909                Value::NodeRef(s)
910            }
911            15 => {
912                let len = Self::read_varu32_safe(buf, pos)?;
913                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
914                *pos += len;
915                Value::EdgeRef(s)
916            }
917            16 => {
918                let len = Self::read_varu32_safe(buf, pos)?;
919                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
920                *pos += len;
921                let id = u64::from_le_bytes([
922                    buf[*pos],
923                    buf[*pos + 1],
924                    buf[*pos + 2],
925                    buf[*pos + 3],
926                    buf[*pos + 4],
927                    buf[*pos + 5],
928                    buf[*pos + 6],
929                    buf[*pos + 7],
930                ]);
931                *pos += 8;
932                Value::VectorRef(s, id)
933            }
934            17 => {
935                let len = Self::read_varu32_safe(buf, pos)?;
936                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
937                *pos += len;
938                let id = u64::from_le_bytes([
939                    buf[*pos],
940                    buf[*pos + 1],
941                    buf[*pos + 2],
942                    buf[*pos + 3],
943                    buf[*pos + 4],
944                    buf[*pos + 5],
945                    buf[*pos + 6],
946                    buf[*pos + 7],
947                ]);
948                *pos += 8;
949                Value::RowRef(s, id)
950            }
951            18 => {
952                let rgb = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
953                *pos += 3;
954                Value::Color(rgb)
955            }
956            19 => {
957                let len = Self::read_varu32_safe(buf, pos)?;
958                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
959                *pos += len;
960                Value::Email(s)
961            }
962            20 => {
963                let len = Self::read_varu32_safe(buf, pos)?;
964                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
965                *pos += len;
966                Value::Url(s)
967            }
968            21 => {
969                let val = u64::from_le_bytes([
970                    buf[*pos],
971                    buf[*pos + 1],
972                    buf[*pos + 2],
973                    buf[*pos + 3],
974                    buf[*pos + 4],
975                    buf[*pos + 5],
976                    buf[*pos + 6],
977                    buf[*pos + 7],
978                ]);
979                *pos += 8;
980                Value::Phone(val)
981            }
982            22 => {
983                let val =
984                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
985                *pos += 4;
986                Value::Semver(val)
987            }
988            23 => {
989                let ip =
990                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
991                *pos += 4;
992                let prefix = buf[*pos];
993                *pos += 1;
994                Value::Cidr(ip, prefix)
995            }
996            24 => {
997                let val =
998                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
999                *pos += 4;
1000                Value::Date(val)
1001            }
1002            25 => {
1003                let val =
1004                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1005                *pos += 4;
1006                Value::Time(val)
1007            }
1008            26 => {
1009                let val = i64::from_le_bytes([
1010                    buf[*pos],
1011                    buf[*pos + 1],
1012                    buf[*pos + 2],
1013                    buf[*pos + 3],
1014                    buf[*pos + 4],
1015                    buf[*pos + 5],
1016                    buf[*pos + 6],
1017                    buf[*pos + 7],
1018                ]);
1019                *pos += 8;
1020                Value::Decimal(val)
1021            }
1022            27 => {
1023                let val = buf[*pos];
1024                *pos += 1;
1025                Value::EnumValue(val)
1026            }
1027            28 => {
1028                let len = Self::read_varu32_safe(buf, pos)?;
1029                let mut elems = Vec::with_capacity(len);
1030                for _ in 0..len {
1031                    elems.push(Self::read_value_binary(buf, pos)?);
1032                }
1033                Value::Array(elems)
1034            }
1035            29 => {
1036                let val = i64::from_le_bytes([
1037                    buf[*pos],
1038                    buf[*pos + 1],
1039                    buf[*pos + 2],
1040                    buf[*pos + 3],
1041                    buf[*pos + 4],
1042                    buf[*pos + 5],
1043                    buf[*pos + 6],
1044                    buf[*pos + 7],
1045                ]);
1046                *pos += 8;
1047                Value::TimestampMs(val)
1048            }
1049            30 => {
1050                let val =
1051                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1052                *pos += 4;
1053                Value::Ipv4(val)
1054            }
1055            31 => {
1056                let mut bytes = [0u8; 16];
1057                bytes.copy_from_slice(&buf[*pos..*pos + 16]);
1058                *pos += 16;
1059                Value::Ipv6(bytes)
1060            }
1061            32 => {
1062                let ip =
1063                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1064                *pos += 4;
1065                let mask =
1066                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1067                *pos += 4;
1068                Value::Subnet(ip, mask)
1069            }
1070            33 => {
1071                let val = u16::from_le_bytes([buf[*pos], buf[*pos + 1]]);
1072                *pos += 2;
1073                Value::Port(val)
1074            }
1075            34 => {
1076                let val =
1077                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1078                *pos += 4;
1079                Value::Latitude(val)
1080            }
1081            35 => {
1082                let val =
1083                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1084                *pos += 4;
1085                Value::Longitude(val)
1086            }
1087            36 => {
1088                let lat =
1089                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1090                *pos += 4;
1091                let lon =
1092                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1093                *pos += 4;
1094                Value::GeoPoint(lat, lon)
1095            }
1096            37 => {
1097                let c = [buf[*pos], buf[*pos + 1]];
1098                *pos += 2;
1099                Value::Country2(c)
1100            }
1101            38 => {
1102                let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1103                *pos += 3;
1104                Value::Country3(c)
1105            }
1106            39 => {
1107                let c = [buf[*pos], buf[*pos + 1]];
1108                *pos += 2;
1109                Value::Lang2(c)
1110            }
1111            40 => {
1112                let c = [
1113                    buf[*pos],
1114                    buf[*pos + 1],
1115                    buf[*pos + 2],
1116                    buf[*pos + 3],
1117                    buf[*pos + 4],
1118                ];
1119                *pos += 5;
1120                Value::Lang5(c)
1121            }
1122            41 => {
1123                let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1124                *pos += 3;
1125                Value::Currency(c)
1126            }
1127            42 => {
1128                let rgba = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
1129                *pos += 4;
1130                Value::ColorAlpha(rgba)
1131            }
1132            43 => {
1133                let val = i64::from_le_bytes([
1134                    buf[*pos],
1135                    buf[*pos + 1],
1136                    buf[*pos + 2],
1137                    buf[*pos + 3],
1138                    buf[*pos + 4],
1139                    buf[*pos + 5],
1140                    buf[*pos + 6],
1141                    buf[*pos + 7],
1142                ]);
1143                *pos += 8;
1144                Value::BigInt(val)
1145            }
1146            44 => {
1147                let col_len = Self::read_varu32_safe(buf, pos)?;
1148                let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1149                *pos += col_len;
1150                let key_len = Self::read_varu32_safe(buf, pos)?;
1151                let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
1152                *pos += key_len;
1153                Value::KeyRef(col, key)
1154            }
1155            45 => {
1156                let col_len = Self::read_varu32_safe(buf, pos)?;
1157                let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1158                *pos += col_len;
1159                let id = u64::from_le_bytes([
1160                    buf[*pos],
1161                    buf[*pos + 1],
1162                    buf[*pos + 2],
1163                    buf[*pos + 3],
1164                    buf[*pos + 4],
1165                    buf[*pos + 5],
1166                    buf[*pos + 6],
1167                    buf[*pos + 7],
1168                ]);
1169                *pos += 8;
1170                Value::DocRef(col, id)
1171            }
1172            46 => {
1173                let len = Self::read_varu32_safe(buf, pos)?;
1174                let name = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1175                *pos += len;
1176                Value::TableRef(name)
1177            }
1178            47 => {
1179                let val =
1180                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1181                *pos += 4;
1182                Value::PageRef(val)
1183            }
1184            48 => {
1185                let len = Self::read_varu32_safe(buf, pos)?;
1186                let bytes = buf[*pos..*pos + len].to_vec();
1187                *pos += len;
1188                Value::Secret(bytes)
1189            }
1190            49 => {
1191                let len = Self::read_varu32_safe(buf, pos)?;
1192                let hash = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1193                *pos += len;
1194                Value::Password(hash)
1195            }
1196            50 => {
1197                let len = Self::read_varu32_safe(buf, pos)?;
1198                let code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1199                *pos += len;
1200                Value::AssetCode(code)
1201            }
1202            51 => {
1203                let len = Self::read_varu32_safe(buf, pos)?;
1204                let asset_code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1205                *pos += len;
1206                let scale = buf[*pos];
1207                *pos += 1;
1208                let minor_units = i64::from_le_bytes([
1209                    buf[*pos],
1210                    buf[*pos + 1],
1211                    buf[*pos + 2],
1212                    buf[*pos + 3],
1213                    buf[*pos + 4],
1214                    buf[*pos + 5],
1215                    buf[*pos + 6],
1216                    buf[*pos + 7],
1217                ]);
1218                *pos += 8;
1219                Value::Money {
1220                    asset_code,
1221                    minor_units,
1222                    scale,
1223                }
1224            }
1225            // C3 TOAST: compressed Text (0x85) and Blob (0x86).
1226            0x85 => {
1227                let orig_len = Self::read_varu32_safe(buf, pos)?;
1228                let comp_len = Self::read_varu32_safe(buf, pos)?;
1229                let compressed = &buf[*pos..*pos + comp_len];
1230                *pos += comp_len;
1231                let mut out = vec![0u8; orig_len];
1232                zstd::bulk::decompress_to_buffer(compressed, &mut out)
1233                    .map_err(|e| format!("C3 Text decompress: {e}"))?;
1234                Value::text(String::from_utf8(out).map_err(|e| format!("C3 Text UTF-8: {e}"))?)
1235            }
1236            0x86 => {
1237                let orig_len = Self::read_varu32_safe(buf, pos)?;
1238                let comp_len = Self::read_varu32_safe(buf, pos)?;
1239                let compressed = &buf[*pos..*pos + comp_len];
1240                *pos += comp_len;
1241                let mut out = vec![0u8; orig_len];
1242                zstd::bulk::decompress_to_buffer(compressed, &mut out)
1243                    .map_err(|e| format!("C3 Blob decompress: {e}"))?;
1244                Value::Blob(out)
1245            }
1246            _ => return Err(format!("Unknown Value type: {}", type_byte).into()),
1247        })
1248    }
1249
1250    /// Write a Value to binary buffer
1251    /// Type bytes: 0=Null, 1=Boolean, 2=Integer, 3=UnsignedInteger, 4=Float,
1252    /// 5=Text, 6=Blob, 7=Timestamp, 8=Duration, 9=IpAddr, 10=MacAddr,
1253    /// 11=Vector, 12=Json, 13=Uuid, 14=NodeRef, 15=EdgeRef, 16=VectorRef, 17=RowRef
1254    fn write_value_binary(buf: &mut Vec<u8>, value: &Value) {
1255        use std::net::IpAddr;
1256
1257        match value {
1258            Value::Null => buf.push(0),
1259            Value::Boolean(b) => {
1260                buf.push(1);
1261                buf.push(if *b { 1 } else { 0 });
1262            }
1263            Value::Integer(i) => {
1264                buf.push(2);
1265                buf.extend_from_slice(&i.to_le_bytes());
1266            }
1267            Value::UnsignedInteger(u) => {
1268                buf.push(3);
1269                buf.extend_from_slice(&u.to_le_bytes());
1270            }
1271            Value::Float(f) => {
1272                buf.push(4);
1273                buf.extend_from_slice(&f.to_le_bytes());
1274            }
1275            Value::Text(s) => {
1276                // C3 TOAST: compress Text values > 2 KiB with zstd.
1277                // Type 0x85 = compressed Text; type 5 = uncompressed (existing).
1278                // Layout for 0x85: [0x85][orig_len: varuint][comp_len: varuint][compressed bytes]
1279                const TEXT_COMPRESS_THRESHOLD: usize = 2048;
1280                if s.len() >= TEXT_COMPRESS_THRESHOLD {
1281                    if let Ok(compressed) = zstd::bulk::compress(s.as_bytes(), 3) {
1282                        if compressed.len() < s.len() {
1283                            buf.push(0x85); // compressed Text marker
1284                            write_varu32(buf, s.len() as u32); // orig_len (decompression hint)
1285                            write_varu32(buf, compressed.len() as u32);
1286                            buf.extend_from_slice(&compressed);
1287                            return; // written — skip uncompressed path
1288                        }
1289                    }
1290                }
1291                buf.push(5);
1292                write_varu32(buf, s.len() as u32);
1293                buf.extend_from_slice(s.as_bytes());
1294            }
1295            Value::Blob(bytes) => {
1296                // C3 TOAST: compress Blob values > 2 KiB with zstd.
1297                // Type 0x86 = compressed Blob; type 6 = uncompressed (existing).
1298                const BLOB_COMPRESS_THRESHOLD: usize = 2048;
1299                if bytes.len() >= BLOB_COMPRESS_THRESHOLD {
1300                    if let Ok(compressed) = zstd::bulk::compress(bytes.as_slice(), 3) {
1301                        if compressed.len() < bytes.len() {
1302                            buf.push(0x86); // compressed Blob marker
1303                            write_varu32(buf, bytes.len() as u32);
1304                            write_varu32(buf, compressed.len() as u32);
1305                            buf.extend_from_slice(&compressed);
1306                            return;
1307                        }
1308                    }
1309                }
1310                buf.push(6);
1311                write_varu32(buf, bytes.len() as u32);
1312                buf.extend_from_slice(bytes);
1313            }
1314            Value::Timestamp(t) => {
1315                buf.push(7);
1316                buf.extend_from_slice(&t.to_le_bytes());
1317            }
1318            Value::Duration(d) => {
1319                buf.push(8);
1320                buf.extend_from_slice(&d.to_le_bytes());
1321            }
1322            Value::IpAddr(ip) => {
1323                buf.push(9);
1324                match ip {
1325                    IpAddr::V4(v4) => {
1326                        buf.push(4);
1327                        buf.extend_from_slice(&v4.octets());
1328                    }
1329                    IpAddr::V6(v6) => {
1330                        buf.push(6);
1331                        buf.extend_from_slice(&v6.octets());
1332                    }
1333                }
1334            }
1335            Value::MacAddr(mac) => {
1336                buf.push(10);
1337                buf.extend_from_slice(mac);
1338            }
1339            Value::Vector(vec) => {
1340                buf.push(11);
1341                write_varu32(buf, vec.len() as u32);
1342                for f in vec {
1343                    buf.extend_from_slice(&f.to_le_bytes());
1344                }
1345            }
1346            Value::Json(bytes) => {
1347                buf.push(12);
1348                write_varu32(buf, bytes.len() as u32);
1349                buf.extend_from_slice(bytes);
1350            }
1351            Value::Uuid(uuid) => {
1352                buf.push(13);
1353                buf.extend_from_slice(uuid);
1354            }
1355            Value::NodeRef(s) => {
1356                buf.push(14);
1357                write_varu32(buf, s.len() as u32);
1358                buf.extend_from_slice(s.as_bytes());
1359            }
1360            Value::EdgeRef(s) => {
1361                buf.push(15);
1362                write_varu32(buf, s.len() as u32);
1363                buf.extend_from_slice(s.as_bytes());
1364            }
1365            Value::VectorRef(s, id) => {
1366                buf.push(16);
1367                write_varu32(buf, s.len() as u32);
1368                buf.extend_from_slice(s.as_bytes());
1369                buf.extend_from_slice(&id.to_le_bytes());
1370            }
1371            Value::RowRef(s, id) => {
1372                buf.push(17);
1373                write_varu32(buf, s.len() as u32);
1374                buf.extend_from_slice(s.as_bytes());
1375                buf.extend_from_slice(&id.to_le_bytes());
1376            }
1377            Value::Color(rgb) => {
1378                buf.push(18);
1379                buf.extend_from_slice(rgb);
1380            }
1381            Value::Email(s) => {
1382                buf.push(19);
1383                write_varu32(buf, s.len() as u32);
1384                buf.extend_from_slice(s.as_bytes());
1385            }
1386            Value::Url(s) => {
1387                buf.push(20);
1388                write_varu32(buf, s.len() as u32);
1389                buf.extend_from_slice(s.as_bytes());
1390            }
1391            Value::Phone(n) => {
1392                buf.push(21);
1393                buf.extend_from_slice(&n.to_le_bytes());
1394            }
1395            Value::Semver(v) => {
1396                buf.push(22);
1397                buf.extend_from_slice(&v.to_le_bytes());
1398            }
1399            Value::Cidr(ip, prefix) => {
1400                buf.push(23);
1401                buf.extend_from_slice(&ip.to_le_bytes());
1402                buf.push(*prefix);
1403            }
1404            Value::Date(d) => {
1405                buf.push(24);
1406                buf.extend_from_slice(&d.to_le_bytes());
1407            }
1408            Value::Time(t) => {
1409                buf.push(25);
1410                buf.extend_from_slice(&t.to_le_bytes());
1411            }
1412            Value::Decimal(v) => {
1413                buf.push(26);
1414                buf.extend_from_slice(&v.to_le_bytes());
1415            }
1416            Value::EnumValue(i) => {
1417                buf.push(27);
1418                buf.push(*i);
1419            }
1420            Value::Array(elems) => {
1421                buf.push(28);
1422                write_varu32(buf, elems.len() as u32);
1423                for elem in elems {
1424                    Self::write_value_binary(buf, elem);
1425                }
1426            }
1427            Value::TimestampMs(v) => {
1428                buf.push(29);
1429                buf.extend_from_slice(&v.to_le_bytes());
1430            }
1431            Value::Ipv4(v) => {
1432                buf.push(30);
1433                buf.extend_from_slice(&v.to_le_bytes());
1434            }
1435            Value::Ipv6(bytes) => {
1436                buf.push(31);
1437                buf.extend_from_slice(bytes);
1438            }
1439            Value::Subnet(ip, mask) => {
1440                buf.push(32);
1441                buf.extend_from_slice(&ip.to_le_bytes());
1442                buf.extend_from_slice(&mask.to_le_bytes());
1443            }
1444            Value::Port(v) => {
1445                buf.push(33);
1446                buf.extend_from_slice(&v.to_le_bytes());
1447            }
1448            Value::Latitude(v) => {
1449                buf.push(34);
1450                buf.extend_from_slice(&v.to_le_bytes());
1451            }
1452            Value::Longitude(v) => {
1453                buf.push(35);
1454                buf.extend_from_slice(&v.to_le_bytes());
1455            }
1456            Value::GeoPoint(lat, lon) => {
1457                buf.push(36);
1458                buf.extend_from_slice(&lat.to_le_bytes());
1459                buf.extend_from_slice(&lon.to_le_bytes());
1460            }
1461            Value::Country2(c) => {
1462                buf.push(37);
1463                buf.extend_from_slice(c);
1464            }
1465            Value::Country3(c) => {
1466                buf.push(38);
1467                buf.extend_from_slice(c);
1468            }
1469            Value::Lang2(c) => {
1470                buf.push(39);
1471                buf.extend_from_slice(c);
1472            }
1473            Value::Lang5(c) => {
1474                buf.push(40);
1475                buf.extend_from_slice(c);
1476            }
1477            Value::Currency(c) => {
1478                buf.push(41);
1479                buf.extend_from_slice(c);
1480            }
1481            Value::AssetCode(code) => {
1482                buf.push(50);
1483                write_varu32(buf, code.len() as u32);
1484                buf.extend_from_slice(code.as_bytes());
1485            }
1486            Value::Money {
1487                asset_code,
1488                minor_units,
1489                scale,
1490            } => {
1491                buf.push(51);
1492                write_varu32(buf, asset_code.len() as u32);
1493                buf.extend_from_slice(asset_code.as_bytes());
1494                buf.push(*scale);
1495                buf.extend_from_slice(&minor_units.to_le_bytes());
1496            }
1497            Value::ColorAlpha(rgba) => {
1498                buf.push(42);
1499                buf.extend_from_slice(rgba);
1500            }
1501            Value::BigInt(v) => {
1502                buf.push(43);
1503                buf.extend_from_slice(&v.to_le_bytes());
1504            }
1505            Value::KeyRef(col, key) => {
1506                buf.push(44);
1507                write_varu32(buf, col.len() as u32);
1508                buf.extend_from_slice(col.as_bytes());
1509                write_varu32(buf, key.len() as u32);
1510                buf.extend_from_slice(key.as_bytes());
1511            }
1512            Value::DocRef(col, id) => {
1513                buf.push(45);
1514                write_varu32(buf, col.len() as u32);
1515                buf.extend_from_slice(col.as_bytes());
1516                buf.extend_from_slice(&id.to_le_bytes());
1517            }
1518            Value::TableRef(name) => {
1519                buf.push(46);
1520                write_varu32(buf, name.len() as u32);
1521                buf.extend_from_slice(name.as_bytes());
1522            }
1523            Value::PageRef(page_id) => {
1524                buf.push(47);
1525                buf.extend_from_slice(&page_id.to_le_bytes());
1526            }
1527            Value::Secret(bytes) => {
1528                buf.push(48);
1529                write_varu32(buf, bytes.len() as u32);
1530                buf.extend_from_slice(bytes);
1531            }
1532            Value::Password(hash) => {
1533                buf.push(49);
1534                write_varu32(buf, hash.len() as u32);
1535                buf.extend_from_slice(hash.as_bytes());
1536            }
1537        }
1538    }
1539}