Skip to main content

reddb_server/storage/unified/store/
impl_file.rs

1use super::*;
2
3impl UnifiedStore {
4    pub fn new() -> Self {
5        Self::with_config(UnifiedStoreConfig::default())
6    }
7
8    /// Get the current storage format version
9    pub fn format_version(&self) -> u32 {
10        self.format_version.load(Ordering::SeqCst)
11    }
12
13    pub(crate) fn set_format_version(&self, version: u32) {
14        self.format_version.store(version, Ordering::SeqCst);
15    }
16
17    /// Allocate a global entity ID
18    pub fn next_entity_id(&self) -> EntityId {
19        EntityId::new(self.next_entity_id.fetch_add(1, Ordering::SeqCst))
20    }
21
22    /// Reserve `n` contiguous global entity IDs with one fetch_add.
23    /// Caller assigns `id = EntityId::new(start + i)` per entity.
24    pub fn reserve_entity_ids(&self, n: u64) -> std::ops::Range<u64> {
25        let start = self.next_entity_id.fetch_add(n, Ordering::SeqCst);
26        start..start + n
27    }
28
29    pub(crate) fn register_entity_id(&self, id: EntityId) {
30        let candidate = id.raw().saturating_add(1);
31        let mut current = self.next_entity_id.load(Ordering::SeqCst);
32        while candidate > current {
33            match self.next_entity_id.compare_exchange(
34                current,
35                candidate,
36                Ordering::SeqCst,
37                Ordering::SeqCst,
38            ) {
39                Ok(_) => break,
40                Err(updated) => current = updated,
41            }
42        }
43    }
44
45    /// Load store from binary file
46    ///
47    /// Binary format:
48    /// ```text
49    /// [magic: 4 bytes "RDST"]
50    /// [version: u32]
51    /// [collection_count: varu32]
52    /// [collections...]
53    /// [cross_ref_count: varu32]
54    /// [cross_refs...]
55    /// ```
56    pub fn load_from_file(path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
57        let file = File::open(path)?;
58        let mut reader = BufReader::new(file);
59        let mut buf = Vec::new();
60        reader.read_to_end(&mut buf)?;
61
62        // Verify magic bytes "RDST" (RedDB Store)
63        if buf.len() < 8 {
64            return Err("File too small".into());
65        }
66        if &buf[0..4] != STORE_MAGIC {
67            return Err("Invalid magic bytes - expected RDST".into());
68        }
69        let mut pos = 4;
70
71        // Version check
72        let version = u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]);
73        pos += 4;
74        if version != STORE_VERSION_V1
75            && version != STORE_VERSION_V2
76            && version != STORE_VERSION_V3
77            && version != STORE_VERSION_V4
78            && version != STORE_VERSION_V5
79            && version != STORE_VERSION_V6
80            && version != STORE_VERSION_V7
81            && version != STORE_VERSION_V8
82            && version != STORE_VERSION_V9
83        {
84            return Err(format!("Unsupported version: {}", version).into());
85        }
86
87        // V3+ has CRC32 footer — verify integrity before parsing
88        if version >= STORE_VERSION_V3 {
89            if buf.len() < 12 {
90                return Err("File too small for CRC32 verification".into());
91            }
92            let stored_crc = u32::from_le_bytes([
93                buf[buf.len() - 4],
94                buf[buf.len() - 3],
95                buf[buf.len() - 2],
96                buf[buf.len() - 1],
97            ]);
98            let computed_crc = crate::storage::engine::crc32::crc32(&buf[..buf.len() - 4]);
99            if stored_crc != computed_crc {
100                return Err("Binary store CRC32 mismatch — file corrupted".into());
101            }
102            // Trim the CRC footer so parsing doesn't read into it
103            buf.truncate(buf.len() - 4);
104        }
105
106        let store = Self::with_config(UnifiedStoreConfig::default());
107        store.set_format_version(version);
108
109        // Read collection count
110        let collection_count = read_varu32(&buf, &mut pos)
111            .map_err(|e| format!("Failed to read collection count: {:?}", e))?;
112
113        // Read each collection
114        for _ in 0..collection_count {
115            // Collection name
116            let name_len = read_varu32(&buf, &mut pos)
117                .map_err(|e| format!("Failed to read name length: {:?}", e))?
118                as usize;
119            let name = String::from_utf8(buf[pos..pos + name_len].to_vec())
120                .map_err(|e| format!("Invalid UTF-8 in collection name: {}", e))?;
121            pos += name_len;
122
123            // Entity count
124            let entity_count = read_varu32(&buf, &mut pos)
125                .map_err(|e| format!("Failed to read entity count: {:?}", e))?;
126
127            // Read each entity — V7+ includes metadata alongside entity data.
128            for _ in 0..entity_count {
129                if version >= STORE_VERSION_V7 {
130                    // Length-prefixed entity+metadata record (serialize_entity_record format)
131                    if pos + 4 > buf.len() {
132                        return Err("Truncated entity record length".into());
133                    }
134                    let record_len =
135                        u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]])
136                            as usize;
137                    pos += 4;
138                    if pos + record_len > buf.len() {
139                        return Err("Truncated entity record payload".into());
140                    }
141                    let record_bytes = &buf[pos..pos + record_len];
142                    pos += record_len;
143
144                    let (entity, metadata) = Self::deserialize_entity_record(record_bytes, version)
145                        .map_err(|e| format!("Entity record deserialization error: {e}"))?;
146
147                    store.insert_auto(&name, entity.clone())?;
148
149                    if let Some(metadata) = metadata {
150                        if let Some(manager) = store.get_collection(&name) {
151                            let _ = manager.set_metadata(entity.id, metadata);
152                        }
153                    }
154                } else {
155                    // V1–V6: entity only, no metadata
156                    let entity = Self::read_entity_binary(&buf, &mut pos, version)?;
157                    store.insert_auto(&name, entity)?;
158                }
159            }
160        }
161
162        if pos < buf.len() {
163            // Read cross-references section
164            let cross_ref_count = read_varu32(&buf, &mut pos)
165                .map_err(|e| format!("Failed to read cross-ref count: {:?}", e))?;
166
167            for _ in 0..cross_ref_count {
168                let source_id = read_varu64(&buf, &mut pos)
169                    .map_err(|e| format!("Failed to read source_id: {:?}", e))?;
170                let target_id = read_varu64(&buf, &mut pos)
171                    .map_err(|e| format!("Failed to read target_id: {:?}", e))?;
172                let ref_type_byte = buf[pos];
173                pos += 1;
174                let ref_type = RefType::from_byte(ref_type_byte);
175
176                let coll_len = read_varu32(&buf, &mut pos)
177                    .map_err(|e| format!("Failed to read collection length: {:?}", e))?
178                    as usize;
179                let collection = String::from_utf8(buf[pos..pos + coll_len].to_vec())
180                    .map_err(|e| format!("Invalid UTF-8 in collection: {}", e))?;
181                pos += coll_len;
182
183                let source_collection = store
184                    .get_any(EntityId::new(source_id))
185                    .map(|(name, _)| name)
186                    .unwrap_or_else(|| collection.clone());
187                let _ = store.add_cross_ref(
188                    &source_collection,
189                    EntityId::new(source_id),
190                    &collection,
191                    EntityId::new(target_id),
192                    ref_type,
193                    1.0,
194                );
195            }
196        }
197
198        if store.format_version() < STORE_VERSION_V9 {
199            store.set_format_version(STORE_VERSION_V9);
200        }
201
202        Ok(store)
203    }
204
205    /// Save store to binary file
206    ///
207    /// Uses compact binary encoding with varint for efficient storage.
208    /// No JSON - pure binary with pages and indices.
209    pub fn save_to_file(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
210        // Write to temp file first, then atomic rename
211        let tmp_path = path.with_extension("rdb-tmp");
212        let file = File::create(&tmp_path)?;
213        let mut writer = BufWriter::new(file);
214        let mut buf = Vec::new();
215
216        // Magic bytes "RDST"
217        buf.extend_from_slice(STORE_MAGIC);
218
219        // Version (9 — includes explicit table-row logical identity
220        // plus MVCC xmin/xmax alongside the V7 metadata envelope).
221        buf.extend_from_slice(&STORE_VERSION_V9.to_le_bytes());
222
223        // Get all collections
224        let collections = self.collections.read();
225        write_varu32(&mut buf, collections.len() as u32);
226
227        let fv = STORE_VERSION_V9;
228        for (name, manager) in collections.iter() {
229            // Collection name
230            write_varu32(&mut buf, name.len() as u32);
231            buf.extend_from_slice(name.as_bytes());
232
233            // Get all entities from this collection
234            let entities = manager.query_all(|_| true);
235            write_varu32(&mut buf, entities.len() as u32);
236
237            // V7+: serialize entity+metadata as a length-prefixed record.
238            // Each record: [u32 len][serialize_entity_record bytes]
239            for entity in entities {
240                let metadata = manager.get_metadata(entity.id);
241                let record = Self::serialize_entity_record(&entity, metadata.as_ref(), fv);
242                buf.extend_from_slice(&(record.len() as u32).to_le_bytes());
243                buf.extend_from_slice(&record);
244            }
245        }
246
247        // Write cross-references
248        let cross_refs = self.cross_refs.read();
249        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
250        write_varu32(&mut buf, total_refs as u32);
251
252        for (source_id, refs) in cross_refs.iter() {
253            for (target_id, ref_type, collection) in refs {
254                write_varu64(&mut buf, source_id.raw());
255                write_varu64(&mut buf, target_id.raw());
256                buf.push(ref_type.to_byte());
257                write_varu32(&mut buf, collection.len() as u32);
258                buf.extend_from_slice(collection.as_bytes());
259            }
260        }
261
262        self.set_format_version(STORE_VERSION_V9);
263
264        // Append CRC32 footer over entire content
265        let checksum = crate::storage::engine::crc32::crc32(&buf);
266        buf.extend_from_slice(&checksum.to_le_bytes());
267
268        writer.write_all(&buf)?;
269        writer.flush()?;
270        writer.get_ref().sync_all()?;
271        drop(writer);
272
273        // Atomic rename: tmp → final
274        std::fs::rename(&tmp_path, path)?;
275
276        // fsync parent directory for rename durability
277        if let Some(parent) = path.parent() {
278            if let Ok(dir) = File::open(parent) {
279                let _ = dir.sync_all();
280            }
281        }
282
283        Ok(())
284    }
285
286    /// Read entity from binary buffer
287    pub(crate) fn read_entity_binary(
288        buf: &[u8],
289        pos: &mut usize,
290        format_version: u32,
291    ) -> Result<UnifiedEntity, Box<dyn std::error::Error>> {
292        // Entity ID
293        let id = read_varu64(buf, pos).map_err(|e| format!("Failed to read entity id: {:?}", e))?;
294
295        // EntityKind type byte
296        let kind_type = buf[*pos];
297        *pos += 1;
298
299        // EntityKind details
300        let kind = match kind_type {
301            0 => {
302                // TableRow
303                let table_len = Self::read_varu32_safe(buf, pos)?;
304                let table = String::from_utf8(buf[*pos..*pos + table_len].to_vec())?;
305                *pos += table_len;
306                let row_id = Self::read_varu64_safe(buf, pos)?;
307                EntityKind::TableRow {
308                    table: table.into(),
309                    row_id,
310                }
311            }
312            1 => {
313                // GraphNode
314                let label_len = Self::read_varu32_safe(buf, pos)?;
315                let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
316                *pos += label_len;
317                let node_type_len = Self::read_varu32_safe(buf, pos)?;
318                let node_type = String::from_utf8(buf[*pos..*pos + node_type_len].to_vec())?;
319                *pos += node_type_len;
320                EntityKind::GraphNode(Box::new(GraphNodeKind { label, node_type }))
321            }
322            2 => {
323                // GraphEdge
324                let label_len = Self::read_varu32_safe(buf, pos)?;
325                let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
326                *pos += label_len;
327                let from_node_len = Self::read_varu32_safe(buf, pos)?;
328                let from_node = String::from_utf8(buf[*pos..*pos + from_node_len].to_vec())?;
329                *pos += from_node_len;
330                let to_node_len = Self::read_varu32_safe(buf, pos)?;
331                let to_node = String::from_utf8(buf[*pos..*pos + to_node_len].to_vec())?;
332                *pos += to_node_len;
333                let weight =
334                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
335                *pos += 4;
336                EntityKind::GraphEdge(Box::new(GraphEdgeKind {
337                    label,
338                    from_node,
339                    to_node,
340                    weight,
341                }))
342            }
343            3 => {
344                // Vector
345                let collection_len = Self::read_varu32_safe(buf, pos)?;
346                let collection = String::from_utf8(buf[*pos..*pos + collection_len].to_vec())?;
347                *pos += collection_len;
348                EntityKind::Vector { collection }
349            }
350            4 => {
351                // TimeSeriesPoint
352                let series_len = Self::read_varu32_safe(buf, pos)?;
353                let series = String::from_utf8(buf[*pos..*pos + series_len].to_vec())?;
354                *pos += series_len;
355                let metric_len = Self::read_varu32_safe(buf, pos)?;
356                let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
357                *pos += metric_len;
358                EntityKind::TimeSeriesPoint(Box::new(TimeSeriesPointKind { series, metric }))
359            }
360            5 => {
361                // QueueMessage
362                let queue_len = Self::read_varu32_safe(buf, pos)?;
363                let queue = String::from_utf8(buf[*pos..*pos + queue_len].to_vec())?;
364                *pos += queue_len;
365                let position = Self::read_varu64_safe(buf, pos)?;
366                EntityKind::QueueMessage { queue, position }
367            }
368            _ => return Err(format!("Unknown EntityKind type: {}", kind_type).into()),
369        };
370
371        // EntityData type byte
372        let data_type = buf[*pos];
373        *pos += 1;
374
375        // EntityData
376        let mut data = match data_type {
377            0 => {
378                // Row
379                let col_count = Self::read_varu32_safe(buf, pos)?;
380                let mut columns = Vec::with_capacity(col_count);
381                for _ in 0..col_count {
382                    columns.push(Self::read_value_binary(buf, pos)?);
383                }
384                EntityData::Row(RowData::new(columns))
385            }
386            1 => {
387                // Node
388                let prop_count = Self::read_varu32_safe(buf, pos)?;
389                let mut properties = HashMap::new();
390                for _ in 0..prop_count {
391                    let key_len = Self::read_varu32_safe(buf, pos)?;
392                    let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
393                    *pos += key_len;
394                    let value = Self::read_value_binary(buf, pos)?;
395                    properties.insert(key, value);
396                }
397                EntityData::Node(NodeData::with_properties(properties))
398            }
399            2 => {
400                // Edge
401                let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
402                *pos += 4;
403                let weight = f32::from_le_bytes(weight_bytes);
404                let prop_count = Self::read_varu32_safe(buf, pos)?;
405                let mut properties = HashMap::new();
406                for _ in 0..prop_count {
407                    let key_len = Self::read_varu32_safe(buf, pos)?;
408                    let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
409                    *pos += key_len;
410                    let value = Self::read_value_binary(buf, pos)?;
411                    properties.insert(key, value);
412                }
413                let mut edge = EdgeData::new(weight);
414                edge.properties = properties;
415                EntityData::Edge(edge)
416            }
417            3 => {
418                // Vector
419                let dim = Self::read_varu32_safe(buf, pos)?;
420                let mut dense = Vec::with_capacity(dim);
421                for _ in 0..dim {
422                    let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
423                    *pos += 4;
424                    dense.push(f32::from_le_bytes(bytes));
425                }
426                let mut vector = VectorData::new(dense);
427                if format_version >= STORE_VERSION_V6 {
428                    let has_content = buf[*pos] != 0;
429                    *pos += 1;
430                    if has_content {
431                        let content_len = Self::read_varu32_safe(buf, pos)?;
432                        vector.content =
433                            Some(String::from_utf8(buf[*pos..*pos + content_len].to_vec())?);
434                        *pos += content_len;
435                    }
436                }
437                EntityData::Vector(vector)
438            }
439            4 => {
440                // TimeSeries
441                let metric_len = Self::read_varu32_safe(buf, pos)?;
442                let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
443                *pos += metric_len;
444                let timestamp_ns = Self::read_varu64_safe(buf, pos)?;
445                let value_bytes = [
446                    buf[*pos],
447                    buf[*pos + 1],
448                    buf[*pos + 2],
449                    buf[*pos + 3],
450                    buf[*pos + 4],
451                    buf[*pos + 5],
452                    buf[*pos + 6],
453                    buf[*pos + 7],
454                ];
455                *pos += 8;
456                let mut tags = HashMap::new();
457                if format_version >= STORE_VERSION_V5 {
458                    let tag_count = Self::read_varu32_safe(buf, pos)?;
459                    tags = HashMap::with_capacity(tag_count);
460                    for _ in 0..tag_count {
461                        let key_len = Self::read_varu32_safe(buf, pos)?;
462                        let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
463                        *pos += key_len;
464                        let value_len = Self::read_varu32_safe(buf, pos)?;
465                        let value = String::from_utf8(buf[*pos..*pos + value_len].to_vec())?;
466                        *pos += value_len;
467                        tags.insert(key, value);
468                    }
469                }
470                EntityData::TimeSeries(crate::storage::unified::entity::TimeSeriesData {
471                    metric,
472                    timestamp_ns,
473                    value: f64::from_le_bytes(value_bytes),
474                    tags,
475                })
476            }
477            5 => {
478                // QueueMessage
479                let payload = Self::read_value_binary(buf, pos)?;
480                let enqueued_at_ns = Self::read_varu64_safe(buf, pos)?;
481                let attempts = Self::read_varu32_safe(buf, pos)? as u32;
482                EntityData::QueueMessage(crate::storage::unified::entity::QueueMessageData {
483                    payload,
484                    priority: None,
485                    enqueued_at_ns,
486                    attempts,
487                    max_attempts: 3,
488                    acked: false,
489                })
490            }
491            6 => {
492                // Row with named HashMap
493                let field_count = Self::read_varu32_safe(buf, pos)?;
494                let mut named = HashMap::with_capacity(field_count);
495                for _ in 0..field_count {
496                    let key_len = Self::read_varu32_safe(buf, pos)?;
497                    let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
498                    *pos += key_len;
499                    let value = Self::read_value_binary(buf, pos)?;
500                    named.insert(key, value);
501                }
502                EntityData::Row(RowData {
503                    columns: Vec::new(),
504                    named: Some(named),
505                    schema: None,
506                })
507            }
508            _ => return Err(format!("Unknown EntityData type: {}", data_type).into()),
509        };
510
511        // Timestamps
512        let created_at = Self::read_varu64_safe(buf, pos)?;
513        let updated_at = Self::read_varu64_safe(buf, pos)?;
514
515        // Embeddings count
516        let embedding_count = Self::read_varu32_safe(buf, pos)?;
517        let mut embeddings = Vec::with_capacity(embedding_count);
518        for _ in 0..embedding_count {
519            let name_len = Self::read_varu32_safe(buf, pos)?;
520            let name = String::from_utf8(buf[*pos..*pos + name_len].to_vec())?;
521            *pos += name_len;
522
523            let dim = Self::read_varu32_safe(buf, pos)?;
524            let mut vector = Vec::with_capacity(dim);
525            for _ in 0..dim {
526                let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
527                *pos += 4;
528                vector.push(f32::from_le_bytes(bytes));
529            }
530
531            let model_len = Self::read_varu32_safe(buf, pos)?;
532            let model = String::from_utf8(buf[*pos..*pos + model_len].to_vec())?;
533            *pos += model_len;
534
535            embeddings.push(EmbeddingSlot::new(name, vector, model));
536        }
537
538        // Cross-refs count
539        let cross_ref_count = Self::read_varu32_safe(buf, pos)?;
540        let mut cross_refs = Vec::with_capacity(cross_ref_count);
541        for _ in 0..cross_ref_count {
542            let source = Self::read_varu64_safe(buf, pos)?;
543            let target = Self::read_varu64_safe(buf, pos)?;
544            let ref_type_byte = buf[*pos];
545            *pos += 1;
546            let (target_collection, weight, created_at) = if format_version >= STORE_VERSION_V2 {
547                let coll_len = Self::read_varu32_safe(buf, pos)?;
548                let collection = String::from_utf8(buf[*pos..*pos + coll_len].to_vec())?;
549                *pos += coll_len;
550                let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
551                *pos += 4;
552                let weight = f32::from_le_bytes(weight_bytes);
553                let created_at = Self::read_varu64_safe(buf, pos)?;
554                (collection, weight, created_at)
555            } else {
556                (String::new(), 1.0, 0)
557            };
558
559            let mut cross_ref = CrossRef::new(
560                EntityId::new(source),
561                EntityId::new(target),
562                target_collection,
563                RefType::from_byte(ref_type_byte),
564            );
565            cross_ref.weight = weight;
566            cross_ref.created_at = created_at;
567            cross_refs.push(cross_ref);
568        }
569
570        // Sequence ID
571        let sequence_id = Self::read_varu64_safe(buf, pos)?;
572
573        if format_version >= STORE_VERSION_V4 {
574            if let EntityData::QueueMessage(message) = &mut data {
575                if *pos < buf.len() {
576                    let priority_present = buf[*pos] != 0;
577                    *pos += 1;
578                    message.priority = if priority_present {
579                        if *pos + 4 > buf.len() {
580                            return Err("truncated queue priority".into());
581                        }
582                        let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
583                        *pos += 4;
584                        Some(i32::from_le_bytes(bytes))
585                    } else {
586                        None
587                    };
588                    message.max_attempts = Self::read_varu32_safe(buf, pos)? as u32;
589                    if *pos >= buf.len() {
590                        return Err("truncated queue ack flag".into());
591                    }
592                    message.acked = buf[*pos] != 0;
593                    *pos += 1;
594                }
595            }
596        }
597
598        let mut entity = UnifiedEntity::new(EntityId::new(id), kind, data);
599        entity.created_at = created_at;
600        entity.updated_at = updated_at;
601        entity.sequence_id = sequence_id;
602        if format_version >= STORE_VERSION_V8 && *pos < buf.len() {
603            let has_logical_id = buf[*pos] != 0;
604            *pos += 1;
605            if has_logical_id {
606                let logical_id = Self::read_varu64_safe(buf, pos)?;
607                entity.set_logical_id(EntityId::new(logical_id));
608            }
609        }
610        if format_version >= STORE_VERSION_V9 && *pos < buf.len() {
611            let xmin = Self::read_varu64_safe(buf, pos)?;
612            let xmax = Self::read_varu64_safe(buf, pos)?;
613            entity.set_xmin(xmin);
614            entity.set_xmax(xmax);
615        }
616        if !embeddings.is_empty() || !cross_refs.is_empty() {
617            entity.embeddings_mut().extend(embeddings);
618            entity.cross_refs_mut().extend(cross_refs);
619        }
620
621        Ok(entity)
622    }
623
624    /// Safe varu32 reader that converts DecodeError to Box<dyn Error>
625    fn read_varu32_safe(buf: &[u8], pos: &mut usize) -> Result<usize, Box<dyn std::error::Error>> {
626        read_varu32(buf, pos)
627            .map(|v| v as usize)
628            .map_err(|e| format!("Decode error: {:?}", e).into())
629    }
630
631    /// Safe varu64 reader that converts DecodeError to Box<dyn Error>
632    fn read_varu64_safe(buf: &[u8], pos: &mut usize) -> Result<u64, Box<dyn std::error::Error>> {
633        read_varu64(buf, pos).map_err(|e| format!("Decode error: {:?}", e).into())
634    }
635
636    /// Write entity to binary buffer
637    pub(crate) fn write_entity_binary(
638        buf: &mut Vec<u8>,
639        entity: &UnifiedEntity,
640        format_version: u32,
641    ) {
642        // Entity ID
643        write_varu64(buf, entity.id.raw());
644
645        // EntityKind
646        match &entity.kind {
647            EntityKind::TableRow { table, row_id } => {
648                buf.push(0);
649                write_varu32(buf, table.len() as u32);
650                buf.extend_from_slice(table.as_bytes());
651                write_varu64(buf, *row_id);
652            }
653            EntityKind::GraphNode(ref node) => {
654                buf.push(1);
655                write_varu32(buf, node.label.len() as u32);
656                buf.extend_from_slice(node.label.as_bytes());
657                write_varu32(buf, node.node_type.len() as u32);
658                buf.extend_from_slice(node.node_type.as_bytes());
659            }
660            EntityKind::GraphEdge(ref edge) => {
661                buf.push(2);
662                write_varu32(buf, edge.label.len() as u32);
663                buf.extend_from_slice(edge.label.as_bytes());
664                write_varu32(buf, edge.from_node.len() as u32);
665                buf.extend_from_slice(edge.from_node.as_bytes());
666                write_varu32(buf, edge.to_node.len() as u32);
667                buf.extend_from_slice(edge.to_node.as_bytes());
668                buf.extend_from_slice(&edge.weight.to_le_bytes());
669            }
670            EntityKind::Vector { collection } => {
671                buf.push(3);
672                write_varu32(buf, collection.len() as u32);
673                buf.extend_from_slice(collection.as_bytes());
674            }
675            EntityKind::TimeSeriesPoint(ref ts) => {
676                buf.push(4);
677                write_varu32(buf, ts.series.len() as u32);
678                buf.extend_from_slice(ts.series.as_bytes());
679                write_varu32(buf, ts.metric.len() as u32);
680                buf.extend_from_slice(ts.metric.as_bytes());
681            }
682            EntityKind::QueueMessage { queue, position } => {
683                buf.push(5);
684                write_varu32(buf, queue.len() as u32);
685                buf.extend_from_slice(queue.as_bytes());
686                write_varu64(buf, *position);
687            }
688        }
689
690        // EntityData
691        match &entity.data {
692            EntityData::Row(row) => {
693                if let Some(ref named) = row.named {
694                    // Named row: type 6 = Row with named HashMap.
695                    // Sort by key so the on-disk byte sequence is
696                    // deterministic — replication relies on hashing the
697                    // serialized record to detect divergence, and a
698                    // HashMap-iteration-order race produces spurious
699                    // mismatches.
700                    buf.push(6);
701                    write_varu32(buf, named.len() as u32);
702                    let mut entries: Vec<_> = named.iter().collect();
703                    entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
704                    for (key, value) in entries {
705                        write_varu32(buf, key.len() as u32);
706                        buf.extend_from_slice(key.as_bytes());
707                        Self::write_value_binary(buf, value);
708                    }
709                } else {
710                    buf.push(0);
711                    write_varu32(buf, row.columns.len() as u32);
712                    for col in &row.columns {
713                        Self::write_value_binary(buf, col);
714                    }
715                }
716            }
717            EntityData::Node(node) => {
718                buf.push(1);
719                write_varu32(buf, node.properties.len() as u32);
720                let mut entries: Vec<_> = node.properties.iter().collect();
721                entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
722                for (key, value) in entries {
723                    write_varu32(buf, key.len() as u32);
724                    buf.extend_from_slice(key.as_bytes());
725                    Self::write_value_binary(buf, value);
726                }
727            }
728            EntityData::Edge(edge) => {
729                buf.push(2);
730                buf.extend_from_slice(&edge.weight.to_le_bytes());
731                write_varu32(buf, edge.properties.len() as u32);
732                let mut entries: Vec<_> = edge.properties.iter().collect();
733                entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
734                for (key, value) in entries {
735                    write_varu32(buf, key.len() as u32);
736                    buf.extend_from_slice(key.as_bytes());
737                    Self::write_value_binary(buf, value);
738                }
739            }
740            EntityData::Vector(vec) => {
741                buf.push(3);
742                write_varu32(buf, vec.dense.len() as u32);
743                for f in &vec.dense {
744                    buf.extend_from_slice(&f.to_le_bytes());
745                }
746                if format_version >= STORE_VERSION_V6 {
747                    buf.push(u8::from(vec.content.is_some()));
748                    if let Some(content) = &vec.content {
749                        write_varu32(buf, content.len() as u32);
750                        buf.extend_from_slice(content.as_bytes());
751                    }
752                }
753            }
754            EntityData::TimeSeries(ts) => {
755                buf.push(4);
756                write_varu32(buf, ts.metric.len() as u32);
757                buf.extend_from_slice(ts.metric.as_bytes());
758                write_varu64(buf, ts.timestamp_ns);
759                buf.extend_from_slice(&ts.value.to_le_bytes());
760                if format_version >= STORE_VERSION_V5 {
761                    write_varu32(buf, ts.tags.len() as u32);
762                    let mut tag_entries: Vec<_> = ts.tags.iter().collect();
763                    tag_entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
764                    for (key, value) in tag_entries {
765                        write_varu32(buf, key.len() as u32);
766                        buf.extend_from_slice(key.as_bytes());
767                        write_varu32(buf, value.len() as u32);
768                        buf.extend_from_slice(value.as_bytes());
769                    }
770                }
771            }
772            EntityData::QueueMessage(msg) => {
773                buf.push(5);
774                Self::write_value_binary(buf, &msg.payload);
775                write_varu64(buf, msg.enqueued_at_ns);
776                write_varu32(buf, msg.attempts);
777            }
778        }
779
780        // Timestamps
781        write_varu64(buf, entity.created_at);
782        write_varu64(buf, entity.updated_at);
783
784        // Embeddings
785        write_varu32(buf, entity.embeddings().len() as u32);
786        for emb in entity.embeddings() {
787            write_varu32(buf, emb.name.len() as u32);
788            buf.extend_from_slice(emb.name.as_bytes());
789            write_varu32(buf, emb.vector.len() as u32);
790            for f in &emb.vector {
791                buf.extend_from_slice(&f.to_le_bytes());
792            }
793            write_varu32(buf, emb.model.len() as u32);
794            buf.extend_from_slice(emb.model.as_bytes());
795        }
796
797        // Cross-refs
798        write_varu32(buf, entity.cross_refs().len() as u32);
799        for cross_ref in entity.cross_refs() {
800            write_varu64(buf, cross_ref.source.raw());
801            write_varu64(buf, cross_ref.target.raw());
802            buf.push(cross_ref.ref_type.to_byte());
803            if format_version >= STORE_VERSION_V2 {
804                write_varu32(buf, cross_ref.target_collection.len() as u32);
805                buf.extend_from_slice(cross_ref.target_collection.as_bytes());
806                buf.extend_from_slice(&cross_ref.weight.to_le_bytes());
807                write_varu64(buf, cross_ref.created_at);
808            }
809        }
810
811        // Sequence ID
812        write_varu64(buf, entity.sequence_id);
813
814        if format_version >= STORE_VERSION_V4 {
815            if let EntityData::QueueMessage(message) = &entity.data {
816                buf.push(u8::from(message.priority.is_some()));
817                if let Some(priority) = message.priority {
818                    buf.extend_from_slice(&priority.to_le_bytes());
819                }
820                write_varu32(buf, message.max_attempts);
821                buf.push(u8::from(message.acked));
822            }
823        }
824
825        if format_version >= STORE_VERSION_V8 {
826            buf.push(u8::from(entity.has_explicit_logical_id()));
827            if entity.has_explicit_logical_id() {
828                write_varu64(buf, entity.logical_id().raw());
829            }
830        }
831        if format_version >= STORE_VERSION_V9 {
832            write_varu64(buf, entity.xmin);
833            write_varu64(buf, entity.xmax);
834        }
835    }
836
837    /// Read a Value from binary buffer
838    /// Type bytes: 0=Null, 1=Boolean, 2=Integer, 3=UnsignedInteger, 4=Float,
839    /// 5=Text, 6=Blob, 7=Timestamp, 8=Duration, 9=IpAddr, 10=MacAddr,
840    /// 11=Vector, 12=Json, 13=Uuid, 14=NodeRef, 15=EdgeRef, 16=VectorRef, 17=RowRef
841    fn read_value_binary(buf: &[u8], pos: &mut usize) -> Result<Value, Box<dyn std::error::Error>> {
842        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
843
844        let type_byte = buf[*pos];
845        *pos += 1;
846
847        Ok(match type_byte {
848            0 => Value::Null,
849            1 => {
850                let b = buf[*pos] != 0;
851                *pos += 1;
852                Value::Boolean(b)
853            }
854            2 => {
855                let val = i64::from_le_bytes([
856                    buf[*pos],
857                    buf[*pos + 1],
858                    buf[*pos + 2],
859                    buf[*pos + 3],
860                    buf[*pos + 4],
861                    buf[*pos + 5],
862                    buf[*pos + 6],
863                    buf[*pos + 7],
864                ]);
865                *pos += 8;
866                Value::Integer(val)
867            }
868            3 => {
869                let val = u64::from_le_bytes([
870                    buf[*pos],
871                    buf[*pos + 1],
872                    buf[*pos + 2],
873                    buf[*pos + 3],
874                    buf[*pos + 4],
875                    buf[*pos + 5],
876                    buf[*pos + 6],
877                    buf[*pos + 7],
878                ]);
879                *pos += 8;
880                Value::UnsignedInteger(val)
881            }
882            4 => {
883                let val = f64::from_le_bytes([
884                    buf[*pos],
885                    buf[*pos + 1],
886                    buf[*pos + 2],
887                    buf[*pos + 3],
888                    buf[*pos + 4],
889                    buf[*pos + 5],
890                    buf[*pos + 6],
891                    buf[*pos + 7],
892                ]);
893                *pos += 8;
894                Value::Float(val)
895            }
896            5 => {
897                let len = Self::read_varu32_safe(buf, pos)?;
898                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
899                *pos += len;
900                Value::text(s)
901            }
902            6 => {
903                let len = Self::read_varu32_safe(buf, pos)?;
904                let bytes = buf[*pos..*pos + len].to_vec();
905                *pos += len;
906                Value::Blob(bytes)
907            }
908            7 => {
909                let val = i64::from_le_bytes([
910                    buf[*pos],
911                    buf[*pos + 1],
912                    buf[*pos + 2],
913                    buf[*pos + 3],
914                    buf[*pos + 4],
915                    buf[*pos + 5],
916                    buf[*pos + 6],
917                    buf[*pos + 7],
918                ]);
919                *pos += 8;
920                Value::Timestamp(val)
921            }
922            8 => {
923                let val = i64::from_le_bytes([
924                    buf[*pos],
925                    buf[*pos + 1],
926                    buf[*pos + 2],
927                    buf[*pos + 3],
928                    buf[*pos + 4],
929                    buf[*pos + 5],
930                    buf[*pos + 6],
931                    buf[*pos + 7],
932                ]);
933                *pos += 8;
934                Value::Duration(val)
935            }
936            9 => {
937                // IpAddr: first byte = version (4 or 6)
938                let version = buf[*pos];
939                *pos += 1;
940                if version == 4 {
941                    let octets = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
942                    *pos += 4;
943                    Value::IpAddr(IpAddr::V4(Ipv4Addr::from(octets)))
944                } else {
945                    let mut octets = [0u8; 16];
946                    octets.copy_from_slice(&buf[*pos..*pos + 16]);
947                    *pos += 16;
948                    Value::IpAddr(IpAddr::V6(Ipv6Addr::from(octets)))
949                }
950            }
951            10 => {
952                let mut mac = [0u8; 6];
953                mac.copy_from_slice(&buf[*pos..*pos + 6]);
954                *pos += 6;
955                Value::MacAddr(mac)
956            }
957            11 => {
958                let len = Self::read_varu32_safe(buf, pos)?;
959                let mut vector = Vec::with_capacity(len);
960                for _ in 0..len {
961                    let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
962                    *pos += 4;
963                    vector.push(f32::from_le_bytes(bytes));
964                }
965                Value::Vector(vector)
966            }
967            12 => {
968                let len = Self::read_varu32_safe(buf, pos)?;
969                let bytes = buf[*pos..*pos + len].to_vec();
970                *pos += len;
971                Value::Json(bytes)
972            }
973            13 => {
974                let mut uuid = [0u8; 16];
975                uuid.copy_from_slice(&buf[*pos..*pos + 16]);
976                *pos += 16;
977                Value::Uuid(uuid)
978            }
979            14 => {
980                let len = Self::read_varu32_safe(buf, pos)?;
981                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
982                *pos += len;
983                Value::NodeRef(s)
984            }
985            15 => {
986                let len = Self::read_varu32_safe(buf, pos)?;
987                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
988                *pos += len;
989                Value::EdgeRef(s)
990            }
991            16 => {
992                let len = Self::read_varu32_safe(buf, pos)?;
993                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
994                *pos += len;
995                let id = u64::from_le_bytes([
996                    buf[*pos],
997                    buf[*pos + 1],
998                    buf[*pos + 2],
999                    buf[*pos + 3],
1000                    buf[*pos + 4],
1001                    buf[*pos + 5],
1002                    buf[*pos + 6],
1003                    buf[*pos + 7],
1004                ]);
1005                *pos += 8;
1006                Value::VectorRef(s, id)
1007            }
1008            17 => {
1009                let len = Self::read_varu32_safe(buf, pos)?;
1010                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1011                *pos += len;
1012                let id = u64::from_le_bytes([
1013                    buf[*pos],
1014                    buf[*pos + 1],
1015                    buf[*pos + 2],
1016                    buf[*pos + 3],
1017                    buf[*pos + 4],
1018                    buf[*pos + 5],
1019                    buf[*pos + 6],
1020                    buf[*pos + 7],
1021                ]);
1022                *pos += 8;
1023                Value::RowRef(s, id)
1024            }
1025            18 => {
1026                let rgb = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1027                *pos += 3;
1028                Value::Color(rgb)
1029            }
1030            19 => {
1031                let len = Self::read_varu32_safe(buf, pos)?;
1032                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1033                *pos += len;
1034                Value::Email(s)
1035            }
1036            20 => {
1037                let len = Self::read_varu32_safe(buf, pos)?;
1038                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1039                *pos += len;
1040                Value::Url(s)
1041            }
1042            21 => {
1043                let val = u64::from_le_bytes([
1044                    buf[*pos],
1045                    buf[*pos + 1],
1046                    buf[*pos + 2],
1047                    buf[*pos + 3],
1048                    buf[*pos + 4],
1049                    buf[*pos + 5],
1050                    buf[*pos + 6],
1051                    buf[*pos + 7],
1052                ]);
1053                *pos += 8;
1054                Value::Phone(val)
1055            }
1056            22 => {
1057                let val =
1058                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1059                *pos += 4;
1060                Value::Semver(val)
1061            }
1062            23 => {
1063                let ip =
1064                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1065                *pos += 4;
1066                let prefix = buf[*pos];
1067                *pos += 1;
1068                Value::Cidr(ip, prefix)
1069            }
1070            24 => {
1071                let val =
1072                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1073                *pos += 4;
1074                Value::Date(val)
1075            }
1076            25 => {
1077                let val =
1078                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1079                *pos += 4;
1080                Value::Time(val)
1081            }
1082            26 => {
1083                let val = i64::from_le_bytes([
1084                    buf[*pos],
1085                    buf[*pos + 1],
1086                    buf[*pos + 2],
1087                    buf[*pos + 3],
1088                    buf[*pos + 4],
1089                    buf[*pos + 5],
1090                    buf[*pos + 6],
1091                    buf[*pos + 7],
1092                ]);
1093                *pos += 8;
1094                Value::Decimal(val)
1095            }
1096            27 => {
1097                let val = buf[*pos];
1098                *pos += 1;
1099                Value::EnumValue(val)
1100            }
1101            28 => {
1102                let len = Self::read_varu32_safe(buf, pos)?;
1103                let mut elems = Vec::with_capacity(len);
1104                for _ in 0..len {
1105                    elems.push(Self::read_value_binary(buf, pos)?);
1106                }
1107                Value::Array(elems)
1108            }
1109            29 => {
1110                let val = i64::from_le_bytes([
1111                    buf[*pos],
1112                    buf[*pos + 1],
1113                    buf[*pos + 2],
1114                    buf[*pos + 3],
1115                    buf[*pos + 4],
1116                    buf[*pos + 5],
1117                    buf[*pos + 6],
1118                    buf[*pos + 7],
1119                ]);
1120                *pos += 8;
1121                Value::TimestampMs(val)
1122            }
1123            30 => {
1124                let val =
1125                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1126                *pos += 4;
1127                Value::Ipv4(val)
1128            }
1129            31 => {
1130                let mut bytes = [0u8; 16];
1131                bytes.copy_from_slice(&buf[*pos..*pos + 16]);
1132                *pos += 16;
1133                Value::Ipv6(bytes)
1134            }
1135            32 => {
1136                let ip =
1137                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1138                *pos += 4;
1139                let mask =
1140                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1141                *pos += 4;
1142                Value::Subnet(ip, mask)
1143            }
1144            33 => {
1145                let val = u16::from_le_bytes([buf[*pos], buf[*pos + 1]]);
1146                *pos += 2;
1147                Value::Port(val)
1148            }
1149            34 => {
1150                let val =
1151                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1152                *pos += 4;
1153                Value::Latitude(val)
1154            }
1155            35 => {
1156                let val =
1157                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1158                *pos += 4;
1159                Value::Longitude(val)
1160            }
1161            36 => {
1162                let lat =
1163                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1164                *pos += 4;
1165                let lon =
1166                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1167                *pos += 4;
1168                Value::GeoPoint(lat, lon)
1169            }
1170            37 => {
1171                let c = [buf[*pos], buf[*pos + 1]];
1172                *pos += 2;
1173                Value::Country2(c)
1174            }
1175            38 => {
1176                let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1177                *pos += 3;
1178                Value::Country3(c)
1179            }
1180            39 => {
1181                let c = [buf[*pos], buf[*pos + 1]];
1182                *pos += 2;
1183                Value::Lang2(c)
1184            }
1185            40 => {
1186                let c = [
1187                    buf[*pos],
1188                    buf[*pos + 1],
1189                    buf[*pos + 2],
1190                    buf[*pos + 3],
1191                    buf[*pos + 4],
1192                ];
1193                *pos += 5;
1194                Value::Lang5(c)
1195            }
1196            41 => {
1197                let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1198                *pos += 3;
1199                Value::Currency(c)
1200            }
1201            42 => {
1202                let rgba = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
1203                *pos += 4;
1204                Value::ColorAlpha(rgba)
1205            }
1206            43 => {
1207                let val = i64::from_le_bytes([
1208                    buf[*pos],
1209                    buf[*pos + 1],
1210                    buf[*pos + 2],
1211                    buf[*pos + 3],
1212                    buf[*pos + 4],
1213                    buf[*pos + 5],
1214                    buf[*pos + 6],
1215                    buf[*pos + 7],
1216                ]);
1217                *pos += 8;
1218                Value::BigInt(val)
1219            }
1220            44 => {
1221                let col_len = Self::read_varu32_safe(buf, pos)?;
1222                let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1223                *pos += col_len;
1224                let key_len = Self::read_varu32_safe(buf, pos)?;
1225                let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
1226                *pos += key_len;
1227                Value::KeyRef(col, key)
1228            }
1229            45 => {
1230                let col_len = Self::read_varu32_safe(buf, pos)?;
1231                let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1232                *pos += col_len;
1233                let id = u64::from_le_bytes([
1234                    buf[*pos],
1235                    buf[*pos + 1],
1236                    buf[*pos + 2],
1237                    buf[*pos + 3],
1238                    buf[*pos + 4],
1239                    buf[*pos + 5],
1240                    buf[*pos + 6],
1241                    buf[*pos + 7],
1242                ]);
1243                *pos += 8;
1244                Value::DocRef(col, id)
1245            }
1246            46 => {
1247                let len = Self::read_varu32_safe(buf, pos)?;
1248                let name = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1249                *pos += len;
1250                Value::TableRef(name)
1251            }
1252            47 => {
1253                let val =
1254                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1255                *pos += 4;
1256                Value::PageRef(val)
1257            }
1258            48 => {
1259                let len = Self::read_varu32_safe(buf, pos)?;
1260                let bytes = buf[*pos..*pos + len].to_vec();
1261                *pos += len;
1262                Value::Secret(bytes)
1263            }
1264            49 => {
1265                let len = Self::read_varu32_safe(buf, pos)?;
1266                let hash = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1267                *pos += len;
1268                Value::Password(hash)
1269            }
1270            50 => {
1271                let len = Self::read_varu32_safe(buf, pos)?;
1272                let code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1273                *pos += len;
1274                Value::AssetCode(code)
1275            }
1276            51 => {
1277                let len = Self::read_varu32_safe(buf, pos)?;
1278                let asset_code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1279                *pos += len;
1280                let scale = buf[*pos];
1281                *pos += 1;
1282                let minor_units = i64::from_le_bytes([
1283                    buf[*pos],
1284                    buf[*pos + 1],
1285                    buf[*pos + 2],
1286                    buf[*pos + 3],
1287                    buf[*pos + 4],
1288                    buf[*pos + 5],
1289                    buf[*pos + 6],
1290                    buf[*pos + 7],
1291                ]);
1292                *pos += 8;
1293                Value::Money {
1294                    asset_code,
1295                    minor_units,
1296                    scale,
1297                }
1298            }
1299            // C3 TOAST: compressed Text (0x85) and Blob (0x86).
1300            0x85 => {
1301                let orig_len = Self::read_varu32_safe(buf, pos)?;
1302                let comp_len = Self::read_varu32_safe(buf, pos)?;
1303                let compressed = &buf[*pos..*pos + comp_len];
1304                *pos += comp_len;
1305                let mut out = vec![0u8; orig_len];
1306                zstd::bulk::decompress_to_buffer(compressed, &mut out)
1307                    .map_err(|e| format!("C3 Text decompress: {e}"))?;
1308                Value::text(String::from_utf8(out).map_err(|e| format!("C3 Text UTF-8: {e}"))?)
1309            }
1310            0x86 => {
1311                let orig_len = Self::read_varu32_safe(buf, pos)?;
1312                let comp_len = Self::read_varu32_safe(buf, pos)?;
1313                let compressed = &buf[*pos..*pos + comp_len];
1314                *pos += comp_len;
1315                let mut out = vec![0u8; orig_len];
1316                zstd::bulk::decompress_to_buffer(compressed, &mut out)
1317                    .map_err(|e| format!("C3 Blob decompress: {e}"))?;
1318                Value::Blob(out)
1319            }
1320            _ => return Err(format!("Unknown Value type: {}", type_byte).into()),
1321        })
1322    }
1323
1324    /// Write a Value to binary buffer
1325    /// Type bytes: 0=Null, 1=Boolean, 2=Integer, 3=UnsignedInteger, 4=Float,
1326    /// 5=Text, 6=Blob, 7=Timestamp, 8=Duration, 9=IpAddr, 10=MacAddr,
1327    /// 11=Vector, 12=Json, 13=Uuid, 14=NodeRef, 15=EdgeRef, 16=VectorRef, 17=RowRef
1328    fn write_value_binary(buf: &mut Vec<u8>, value: &Value) {
1329        use std::net::IpAddr;
1330
1331        match value {
1332            Value::Null => buf.push(0),
1333            Value::Boolean(b) => {
1334                buf.push(1);
1335                buf.push(if *b { 1 } else { 0 });
1336            }
1337            Value::Integer(i) => {
1338                buf.push(2);
1339                buf.extend_from_slice(&i.to_le_bytes());
1340            }
1341            Value::UnsignedInteger(u) => {
1342                buf.push(3);
1343                buf.extend_from_slice(&u.to_le_bytes());
1344            }
1345            Value::Float(f) => {
1346                buf.push(4);
1347                buf.extend_from_slice(&f.to_le_bytes());
1348            }
1349            Value::Text(s) => {
1350                // C3 TOAST: compress Text values > 2 KiB with zstd.
1351                // Type 0x85 = compressed Text; type 5 = uncompressed (existing).
1352                // Layout for 0x85: [0x85][orig_len: varuint][comp_len: varuint][compressed bytes]
1353                const TEXT_COMPRESS_THRESHOLD: usize = 2048;
1354                if s.len() >= TEXT_COMPRESS_THRESHOLD {
1355                    if let Ok(compressed) = zstd::bulk::compress(s.as_bytes(), 3) {
1356                        if compressed.len() < s.len() {
1357                            buf.push(0x85); // compressed Text marker
1358                            write_varu32(buf, s.len() as u32); // orig_len (decompression hint)
1359                            write_varu32(buf, compressed.len() as u32);
1360                            buf.extend_from_slice(&compressed);
1361                            return; // written — skip uncompressed path
1362                        }
1363                    }
1364                }
1365                buf.push(5);
1366                write_varu32(buf, s.len() as u32);
1367                buf.extend_from_slice(s.as_bytes());
1368            }
1369            Value::Blob(bytes) => {
1370                // C3 TOAST: compress Blob values > 2 KiB with zstd.
1371                // Type 0x86 = compressed Blob; type 6 = uncompressed (existing).
1372                const BLOB_COMPRESS_THRESHOLD: usize = 2048;
1373                if bytes.len() >= BLOB_COMPRESS_THRESHOLD {
1374                    if let Ok(compressed) = zstd::bulk::compress(bytes.as_slice(), 3) {
1375                        if compressed.len() < bytes.len() {
1376                            buf.push(0x86); // compressed Blob marker
1377                            write_varu32(buf, bytes.len() as u32);
1378                            write_varu32(buf, compressed.len() as u32);
1379                            buf.extend_from_slice(&compressed);
1380                            return;
1381                        }
1382                    }
1383                }
1384                buf.push(6);
1385                write_varu32(buf, bytes.len() as u32);
1386                buf.extend_from_slice(bytes);
1387            }
1388            Value::Timestamp(t) => {
1389                buf.push(7);
1390                buf.extend_from_slice(&t.to_le_bytes());
1391            }
1392            Value::Duration(d) => {
1393                buf.push(8);
1394                buf.extend_from_slice(&d.to_le_bytes());
1395            }
1396            Value::IpAddr(ip) => {
1397                buf.push(9);
1398                match ip {
1399                    IpAddr::V4(v4) => {
1400                        buf.push(4);
1401                        buf.extend_from_slice(&v4.octets());
1402                    }
1403                    IpAddr::V6(v6) => {
1404                        buf.push(6);
1405                        buf.extend_from_slice(&v6.octets());
1406                    }
1407                }
1408            }
1409            Value::MacAddr(mac) => {
1410                buf.push(10);
1411                buf.extend_from_slice(mac);
1412            }
1413            Value::Vector(vec) => {
1414                buf.push(11);
1415                write_varu32(buf, vec.len() as u32);
1416                for f in vec {
1417                    buf.extend_from_slice(&f.to_le_bytes());
1418                }
1419            }
1420            Value::Json(bytes) => {
1421                buf.push(12);
1422                write_varu32(buf, bytes.len() as u32);
1423                buf.extend_from_slice(bytes);
1424            }
1425            Value::Uuid(uuid) => {
1426                buf.push(13);
1427                buf.extend_from_slice(uuid);
1428            }
1429            Value::NodeRef(s) => {
1430                buf.push(14);
1431                write_varu32(buf, s.len() as u32);
1432                buf.extend_from_slice(s.as_bytes());
1433            }
1434            Value::EdgeRef(s) => {
1435                buf.push(15);
1436                write_varu32(buf, s.len() as u32);
1437                buf.extend_from_slice(s.as_bytes());
1438            }
1439            Value::VectorRef(s, id) => {
1440                buf.push(16);
1441                write_varu32(buf, s.len() as u32);
1442                buf.extend_from_slice(s.as_bytes());
1443                buf.extend_from_slice(&id.to_le_bytes());
1444            }
1445            Value::RowRef(s, id) => {
1446                buf.push(17);
1447                write_varu32(buf, s.len() as u32);
1448                buf.extend_from_slice(s.as_bytes());
1449                buf.extend_from_slice(&id.to_le_bytes());
1450            }
1451            Value::Color(rgb) => {
1452                buf.push(18);
1453                buf.extend_from_slice(rgb);
1454            }
1455            Value::Email(s) => {
1456                buf.push(19);
1457                write_varu32(buf, s.len() as u32);
1458                buf.extend_from_slice(s.as_bytes());
1459            }
1460            Value::Url(s) => {
1461                buf.push(20);
1462                write_varu32(buf, s.len() as u32);
1463                buf.extend_from_slice(s.as_bytes());
1464            }
1465            Value::Phone(n) => {
1466                buf.push(21);
1467                buf.extend_from_slice(&n.to_le_bytes());
1468            }
1469            Value::Semver(v) => {
1470                buf.push(22);
1471                buf.extend_from_slice(&v.to_le_bytes());
1472            }
1473            Value::Cidr(ip, prefix) => {
1474                buf.push(23);
1475                buf.extend_from_slice(&ip.to_le_bytes());
1476                buf.push(*prefix);
1477            }
1478            Value::Date(d) => {
1479                buf.push(24);
1480                buf.extend_from_slice(&d.to_le_bytes());
1481            }
1482            Value::Time(t) => {
1483                buf.push(25);
1484                buf.extend_from_slice(&t.to_le_bytes());
1485            }
1486            Value::Decimal(v) => {
1487                buf.push(26);
1488                buf.extend_from_slice(&v.to_le_bytes());
1489            }
1490            Value::EnumValue(i) => {
1491                buf.push(27);
1492                buf.push(*i);
1493            }
1494            Value::Array(elems) => {
1495                buf.push(28);
1496                write_varu32(buf, elems.len() as u32);
1497                for elem in elems {
1498                    Self::write_value_binary(buf, elem);
1499                }
1500            }
1501            Value::TimestampMs(v) => {
1502                buf.push(29);
1503                buf.extend_from_slice(&v.to_le_bytes());
1504            }
1505            Value::Ipv4(v) => {
1506                buf.push(30);
1507                buf.extend_from_slice(&v.to_le_bytes());
1508            }
1509            Value::Ipv6(bytes) => {
1510                buf.push(31);
1511                buf.extend_from_slice(bytes);
1512            }
1513            Value::Subnet(ip, mask) => {
1514                buf.push(32);
1515                buf.extend_from_slice(&ip.to_le_bytes());
1516                buf.extend_from_slice(&mask.to_le_bytes());
1517            }
1518            Value::Port(v) => {
1519                buf.push(33);
1520                buf.extend_from_slice(&v.to_le_bytes());
1521            }
1522            Value::Latitude(v) => {
1523                buf.push(34);
1524                buf.extend_from_slice(&v.to_le_bytes());
1525            }
1526            Value::Longitude(v) => {
1527                buf.push(35);
1528                buf.extend_from_slice(&v.to_le_bytes());
1529            }
1530            Value::GeoPoint(lat, lon) => {
1531                buf.push(36);
1532                buf.extend_from_slice(&lat.to_le_bytes());
1533                buf.extend_from_slice(&lon.to_le_bytes());
1534            }
1535            Value::Country2(c) => {
1536                buf.push(37);
1537                buf.extend_from_slice(c);
1538            }
1539            Value::Country3(c) => {
1540                buf.push(38);
1541                buf.extend_from_slice(c);
1542            }
1543            Value::Lang2(c) => {
1544                buf.push(39);
1545                buf.extend_from_slice(c);
1546            }
1547            Value::Lang5(c) => {
1548                buf.push(40);
1549                buf.extend_from_slice(c);
1550            }
1551            Value::Currency(c) => {
1552                buf.push(41);
1553                buf.extend_from_slice(c);
1554            }
1555            Value::AssetCode(code) => {
1556                buf.push(50);
1557                write_varu32(buf, code.len() as u32);
1558                buf.extend_from_slice(code.as_bytes());
1559            }
1560            Value::Money {
1561                asset_code,
1562                minor_units,
1563                scale,
1564            } => {
1565                buf.push(51);
1566                write_varu32(buf, asset_code.len() as u32);
1567                buf.extend_from_slice(asset_code.as_bytes());
1568                buf.push(*scale);
1569                buf.extend_from_slice(&minor_units.to_le_bytes());
1570            }
1571            Value::ColorAlpha(rgba) => {
1572                buf.push(42);
1573                buf.extend_from_slice(rgba);
1574            }
1575            Value::BigInt(v) => {
1576                buf.push(43);
1577                buf.extend_from_slice(&v.to_le_bytes());
1578            }
1579            Value::KeyRef(col, key) => {
1580                buf.push(44);
1581                write_varu32(buf, col.len() as u32);
1582                buf.extend_from_slice(col.as_bytes());
1583                write_varu32(buf, key.len() as u32);
1584                buf.extend_from_slice(key.as_bytes());
1585            }
1586            Value::DocRef(col, id) => {
1587                buf.push(45);
1588                write_varu32(buf, col.len() as u32);
1589                buf.extend_from_slice(col.as_bytes());
1590                buf.extend_from_slice(&id.to_le_bytes());
1591            }
1592            Value::TableRef(name) => {
1593                buf.push(46);
1594                write_varu32(buf, name.len() as u32);
1595                buf.extend_from_slice(name.as_bytes());
1596            }
1597            Value::PageRef(page_id) => {
1598                buf.push(47);
1599                buf.extend_from_slice(&page_id.to_le_bytes());
1600            }
1601            Value::Secret(bytes) => {
1602                buf.push(48);
1603                write_varu32(buf, bytes.len() as u32);
1604                buf.extend_from_slice(bytes);
1605            }
1606            Value::Password(hash) => {
1607                buf.push(49);
1608                write_varu32(buf, hash.len() as u32);
1609                buf.extend_from_slice(hash.as_bytes());
1610            }
1611        }
1612    }
1613}