Skip to main content

reddb_server/storage/unified/store/
impl_file.rs

1use super::*;
2
3impl UnifiedStore {
4    pub fn new() -> Self {
5        Self::with_config(UnifiedStoreConfig::default())
6    }
7
8    /// Get the current storage format version
9    pub fn format_version(&self) -> u32 {
10        self.format_version.load(Ordering::SeqCst)
11    }
12
13    pub(crate) fn set_format_version(&self, version: u32) {
14        self.format_version.store(version, Ordering::SeqCst);
15    }
16
17    /// Allocate a global entity ID
18    pub fn next_entity_id(&self) -> EntityId {
19        EntityId::new(self.next_entity_id.fetch_add(1, Ordering::SeqCst))
20    }
21
22    /// Reserve `n` contiguous global entity IDs with one fetch_add.
23    /// Caller assigns `id = EntityId::new(start + i)` per entity.
24    pub fn reserve_entity_ids(&self, n: u64) -> std::ops::Range<u64> {
25        let start = self.next_entity_id.fetch_add(n, Ordering::SeqCst);
26        start..start + n
27    }
28
29    pub(crate) fn register_entity_id(&self, id: EntityId) {
30        let candidate = id.raw().saturating_add(1);
31        let mut current = self.next_entity_id.load(Ordering::SeqCst);
32        while candidate > current {
33            match self.next_entity_id.compare_exchange(
34                current,
35                candidate,
36                Ordering::SeqCst,
37                Ordering::SeqCst,
38            ) {
39                Ok(_) => break,
40                Err(updated) => current = updated,
41            }
42        }
43    }
44
45    /// Load store from binary file
46    ///
47    /// Binary format:
48    /// ```text
49    /// [magic: 4 bytes "RDST"]
50    /// [version: u32]
51    /// [collection_count: varu32]
52    /// [collections...]
53    /// [cross_ref_count: varu32]
54    /// [cross_refs...]
55    /// ```
56    pub fn load_from_file(path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
57        let file = File::open(path)?;
58        let mut reader = BufReader::new(file);
59        let mut buf = Vec::new();
60        reader.read_to_end(&mut buf)?;
61
62        // Verify magic bytes "RDST" (RedDB Store)
63        if buf.len() < 8 {
64            return Err("File too small".into());
65        }
66        if &buf[0..4] != STORE_MAGIC {
67            return Err("Invalid magic bytes - expected RDST".into());
68        }
69        let mut pos = 4;
70
71        // Version check
72        let version = u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]]);
73        pos += 4;
74        if version != STORE_VERSION_V1
75            && version != STORE_VERSION_V2
76            && version != STORE_VERSION_V3
77            && version != STORE_VERSION_V4
78            && version != STORE_VERSION_V5
79            && version != STORE_VERSION_V6
80            && version != STORE_VERSION_V7
81        {
82            return Err(format!("Unsupported version: {}", version).into());
83        }
84
85        // V3+ has CRC32 footer — verify integrity before parsing
86        if version >= STORE_VERSION_V3 {
87            if buf.len() < 12 {
88                return Err("File too small for CRC32 verification".into());
89            }
90            let stored_crc = u32::from_le_bytes([
91                buf[buf.len() - 4],
92                buf[buf.len() - 3],
93                buf[buf.len() - 2],
94                buf[buf.len() - 1],
95            ]);
96            let computed_crc = crate::storage::engine::crc32::crc32(&buf[..buf.len() - 4]);
97            if stored_crc != computed_crc {
98                return Err("Binary store CRC32 mismatch — file corrupted".into());
99            }
100            // Trim the CRC footer so parsing doesn't read into it
101            buf.truncate(buf.len() - 4);
102        }
103
104        let store = Self::with_config(UnifiedStoreConfig::default());
105        store.set_format_version(version);
106
107        // Read collection count
108        let collection_count = read_varu32(&buf, &mut pos)
109            .map_err(|e| format!("Failed to read collection count: {:?}", e))?;
110
111        // Read each collection
112        for _ in 0..collection_count {
113            // Collection name
114            let name_len = read_varu32(&buf, &mut pos)
115                .map_err(|e| format!("Failed to read name length: {:?}", e))?
116                as usize;
117            let name = String::from_utf8(buf[pos..pos + name_len].to_vec())
118                .map_err(|e| format!("Invalid UTF-8 in collection name: {}", e))?;
119            pos += name_len;
120
121            // Entity count
122            let entity_count = read_varu32(&buf, &mut pos)
123                .map_err(|e| format!("Failed to read entity count: {:?}", e))?;
124
125            // Read each entity — V7+ includes metadata alongside entity data.
126            for _ in 0..entity_count {
127                if version >= STORE_VERSION_V7 {
128                    // Length-prefixed entity+metadata record (serialize_entity_record format)
129                    if pos + 4 > buf.len() {
130                        return Err("Truncated entity record length".into());
131                    }
132                    let record_len =
133                        u32::from_le_bytes([buf[pos], buf[pos + 1], buf[pos + 2], buf[pos + 3]])
134                            as usize;
135                    pos += 4;
136                    if pos + record_len > buf.len() {
137                        return Err("Truncated entity record payload".into());
138                    }
139                    let record_bytes = &buf[pos..pos + record_len];
140                    pos += record_len;
141
142                    let (entity, metadata) = Self::deserialize_entity_record(record_bytes, version)
143                        .map_err(|e| format!("Entity record deserialization error: {e}"))?;
144
145                    store.insert_auto(&name, entity.clone())?;
146
147                    if let Some(metadata) = metadata {
148                        if let Some(manager) = store.get_collection(&name) {
149                            let _ = manager.set_metadata(entity.id, metadata);
150                        }
151                    }
152                } else {
153                    // V1–V6: entity only, no metadata
154                    let entity = Self::read_entity_binary(&buf, &mut pos, version)?;
155                    store.insert_auto(&name, entity)?;
156                }
157            }
158        }
159
160        if pos < buf.len() {
161            // Read cross-references section
162            let cross_ref_count = read_varu32(&buf, &mut pos)
163                .map_err(|e| format!("Failed to read cross-ref count: {:?}", e))?;
164
165            for _ in 0..cross_ref_count {
166                let source_id = read_varu64(&buf, &mut pos)
167                    .map_err(|e| format!("Failed to read source_id: {:?}", e))?;
168                let target_id = read_varu64(&buf, &mut pos)
169                    .map_err(|e| format!("Failed to read target_id: {:?}", e))?;
170                let ref_type_byte = buf[pos];
171                pos += 1;
172                let ref_type = RefType::from_byte(ref_type_byte);
173
174                let coll_len = read_varu32(&buf, &mut pos)
175                    .map_err(|e| format!("Failed to read collection length: {:?}", e))?
176                    as usize;
177                let collection = String::from_utf8(buf[pos..pos + coll_len].to_vec())
178                    .map_err(|e| format!("Invalid UTF-8 in collection: {}", e))?;
179                pos += coll_len;
180
181                let source_collection = store
182                    .get_any(EntityId::new(source_id))
183                    .map(|(name, _)| name)
184                    .unwrap_or_else(|| collection.clone());
185                let _ = store.add_cross_ref(
186                    &source_collection,
187                    EntityId::new(source_id),
188                    &collection,
189                    EntityId::new(target_id),
190                    ref_type,
191                    1.0,
192                );
193            }
194        }
195
196        if store.format_version() < STORE_VERSION_V7 {
197            store.set_format_version(STORE_VERSION_V7);
198        }
199
200        Ok(store)
201    }
202
203    /// Save store to binary file
204    ///
205    /// Uses compact binary encoding with varint for efficient storage.
206    /// No JSON - pure binary with pages and indices.
207    pub fn save_to_file(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
208        // Write to temp file first, then atomic rename
209        let tmp_path = path.with_extension("rdb-tmp");
210        let file = File::create(&tmp_path)?;
211        let mut writer = BufWriter::new(file);
212        let mut buf = Vec::new();
213
214        // Magic bytes "RDST"
215        buf.extend_from_slice(STORE_MAGIC);
216
217        // Version (7 — includes entity-record metadata alongside the V6
218        // CRC/footer, queue tail, timeseries tag, and vector persistence work).
219        buf.extend_from_slice(&STORE_VERSION_V7.to_le_bytes());
220
221        // Get all collections
222        let collections = self.collections.read();
223        write_varu32(&mut buf, collections.len() as u32);
224
225        let fv = STORE_VERSION_V7;
226        for (name, manager) in collections.iter() {
227            // Collection name
228            write_varu32(&mut buf, name.len() as u32);
229            buf.extend_from_slice(name.as_bytes());
230
231            // Get all entities from this collection
232            let entities = manager.query_all(|_| true);
233            write_varu32(&mut buf, entities.len() as u32);
234
235            // V7+: serialize entity+metadata as a length-prefixed record.
236            // Each record: [u32 len][serialize_entity_record bytes]
237            for entity in entities {
238                let metadata = manager.get_metadata(entity.id);
239                let record = Self::serialize_entity_record(&entity, metadata.as_ref(), fv);
240                buf.extend_from_slice(&(record.len() as u32).to_le_bytes());
241                buf.extend_from_slice(&record);
242            }
243        }
244
245        // Write cross-references
246        let cross_refs = self.cross_refs.read();
247        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
248        write_varu32(&mut buf, total_refs as u32);
249
250        for (source_id, refs) in cross_refs.iter() {
251            for (target_id, ref_type, collection) in refs {
252                write_varu64(&mut buf, source_id.raw());
253                write_varu64(&mut buf, target_id.raw());
254                buf.push(ref_type.to_byte());
255                write_varu32(&mut buf, collection.len() as u32);
256                buf.extend_from_slice(collection.as_bytes());
257            }
258        }
259
260        self.set_format_version(STORE_VERSION_V7);
261
262        // Append CRC32 footer over entire content
263        let checksum = crate::storage::engine::crc32::crc32(&buf);
264        buf.extend_from_slice(&checksum.to_le_bytes());
265
266        writer.write_all(&buf)?;
267        writer.flush()?;
268        writer.get_ref().sync_all()?;
269        drop(writer);
270
271        // Atomic rename: tmp → final
272        std::fs::rename(&tmp_path, path)?;
273
274        // fsync parent directory for rename durability
275        if let Some(parent) = path.parent() {
276            if let Ok(dir) = File::open(parent) {
277                let _ = dir.sync_all();
278            }
279        }
280
281        Ok(())
282    }
283
284    /// Read entity from binary buffer
285    pub(crate) fn read_entity_binary(
286        buf: &[u8],
287        pos: &mut usize,
288        format_version: u32,
289    ) -> Result<UnifiedEntity, Box<dyn std::error::Error>> {
290        // Entity ID
291        let id = read_varu64(buf, pos).map_err(|e| format!("Failed to read entity id: {:?}", e))?;
292
293        // EntityKind type byte
294        let kind_type = buf[*pos];
295        *pos += 1;
296
297        // EntityKind details
298        let kind = match kind_type {
299            0 => {
300                // TableRow
301                let table_len = Self::read_varu32_safe(buf, pos)?;
302                let table = String::from_utf8(buf[*pos..*pos + table_len].to_vec())?;
303                *pos += table_len;
304                let row_id = Self::read_varu64_safe(buf, pos)?;
305                EntityKind::TableRow {
306                    table: table.into(),
307                    row_id,
308                }
309            }
310            1 => {
311                // GraphNode
312                let label_len = Self::read_varu32_safe(buf, pos)?;
313                let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
314                *pos += label_len;
315                let node_type_len = Self::read_varu32_safe(buf, pos)?;
316                let node_type = String::from_utf8(buf[*pos..*pos + node_type_len].to_vec())?;
317                *pos += node_type_len;
318                EntityKind::GraphNode(Box::new(GraphNodeKind { label, node_type }))
319            }
320            2 => {
321                // GraphEdge
322                let label_len = Self::read_varu32_safe(buf, pos)?;
323                let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
324                *pos += label_len;
325                let from_node_len = Self::read_varu32_safe(buf, pos)?;
326                let from_node = String::from_utf8(buf[*pos..*pos + from_node_len].to_vec())?;
327                *pos += from_node_len;
328                let to_node_len = Self::read_varu32_safe(buf, pos)?;
329                let to_node = String::from_utf8(buf[*pos..*pos + to_node_len].to_vec())?;
330                *pos += to_node_len;
331                let weight =
332                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
333                *pos += 4;
334                EntityKind::GraphEdge(Box::new(GraphEdgeKind {
335                    label,
336                    from_node,
337                    to_node,
338                    weight,
339                }))
340            }
341            3 => {
342                // Vector
343                let collection_len = Self::read_varu32_safe(buf, pos)?;
344                let collection = String::from_utf8(buf[*pos..*pos + collection_len].to_vec())?;
345                *pos += collection_len;
346                EntityKind::Vector { collection }
347            }
348            4 => {
349                // TimeSeriesPoint
350                let series_len = Self::read_varu32_safe(buf, pos)?;
351                let series = String::from_utf8(buf[*pos..*pos + series_len].to_vec())?;
352                *pos += series_len;
353                let metric_len = Self::read_varu32_safe(buf, pos)?;
354                let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
355                *pos += metric_len;
356                EntityKind::TimeSeriesPoint(Box::new(TimeSeriesPointKind { series, metric }))
357            }
358            5 => {
359                // QueueMessage
360                let queue_len = Self::read_varu32_safe(buf, pos)?;
361                let queue = String::from_utf8(buf[*pos..*pos + queue_len].to_vec())?;
362                *pos += queue_len;
363                let position = Self::read_varu64_safe(buf, pos)?;
364                EntityKind::QueueMessage { queue, position }
365            }
366            _ => return Err(format!("Unknown EntityKind type: {}", kind_type).into()),
367        };
368
369        // EntityData type byte
370        let data_type = buf[*pos];
371        *pos += 1;
372
373        // EntityData
374        let mut data = match data_type {
375            0 => {
376                // Row
377                let col_count = Self::read_varu32_safe(buf, pos)?;
378                let mut columns = Vec::with_capacity(col_count);
379                for _ in 0..col_count {
380                    columns.push(Self::read_value_binary(buf, pos)?);
381                }
382                EntityData::Row(RowData::new(columns))
383            }
384            1 => {
385                // Node
386                let prop_count = Self::read_varu32_safe(buf, pos)?;
387                let mut properties = HashMap::new();
388                for _ in 0..prop_count {
389                    let key_len = Self::read_varu32_safe(buf, pos)?;
390                    let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
391                    *pos += key_len;
392                    let value = Self::read_value_binary(buf, pos)?;
393                    properties.insert(key, value);
394                }
395                EntityData::Node(NodeData::with_properties(properties))
396            }
397            2 => {
398                // Edge
399                let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
400                *pos += 4;
401                let weight = f32::from_le_bytes(weight_bytes);
402                let prop_count = Self::read_varu32_safe(buf, pos)?;
403                let mut properties = HashMap::new();
404                for _ in 0..prop_count {
405                    let key_len = Self::read_varu32_safe(buf, pos)?;
406                    let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
407                    *pos += key_len;
408                    let value = Self::read_value_binary(buf, pos)?;
409                    properties.insert(key, value);
410                }
411                let mut edge = EdgeData::new(weight);
412                edge.properties = properties;
413                EntityData::Edge(edge)
414            }
415            3 => {
416                // Vector
417                let dim = Self::read_varu32_safe(buf, pos)?;
418                let mut dense = Vec::with_capacity(dim);
419                for _ in 0..dim {
420                    let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
421                    *pos += 4;
422                    dense.push(f32::from_le_bytes(bytes));
423                }
424                let mut vector = VectorData::new(dense);
425                if format_version >= STORE_VERSION_V6 {
426                    let has_content = buf[*pos] != 0;
427                    *pos += 1;
428                    if has_content {
429                        let content_len = Self::read_varu32_safe(buf, pos)?;
430                        vector.content =
431                            Some(String::from_utf8(buf[*pos..*pos + content_len].to_vec())?);
432                        *pos += content_len;
433                    }
434                }
435                EntityData::Vector(vector)
436            }
437            4 => {
438                // TimeSeries
439                let metric_len = Self::read_varu32_safe(buf, pos)?;
440                let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
441                *pos += metric_len;
442                let timestamp_ns = Self::read_varu64_safe(buf, pos)?;
443                let value_bytes = [
444                    buf[*pos],
445                    buf[*pos + 1],
446                    buf[*pos + 2],
447                    buf[*pos + 3],
448                    buf[*pos + 4],
449                    buf[*pos + 5],
450                    buf[*pos + 6],
451                    buf[*pos + 7],
452                ];
453                *pos += 8;
454                let mut tags = HashMap::new();
455                if format_version >= STORE_VERSION_V5 {
456                    let tag_count = Self::read_varu32_safe(buf, pos)?;
457                    tags = HashMap::with_capacity(tag_count);
458                    for _ in 0..tag_count {
459                        let key_len = Self::read_varu32_safe(buf, pos)?;
460                        let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
461                        *pos += key_len;
462                        let value_len = Self::read_varu32_safe(buf, pos)?;
463                        let value = String::from_utf8(buf[*pos..*pos + value_len].to_vec())?;
464                        *pos += value_len;
465                        tags.insert(key, value);
466                    }
467                }
468                EntityData::TimeSeries(crate::storage::unified::entity::TimeSeriesData {
469                    metric,
470                    timestamp_ns,
471                    value: f64::from_le_bytes(value_bytes),
472                    tags,
473                })
474            }
475            5 => {
476                // QueueMessage
477                let payload = Self::read_value_binary(buf, pos)?;
478                let enqueued_at_ns = Self::read_varu64_safe(buf, pos)?;
479                let attempts = Self::read_varu32_safe(buf, pos)? as u32;
480                EntityData::QueueMessage(crate::storage::unified::entity::QueueMessageData {
481                    payload,
482                    priority: None,
483                    enqueued_at_ns,
484                    attempts,
485                    max_attempts: 3,
486                    acked: false,
487                })
488            }
489            6 => {
490                // Row with named HashMap
491                let field_count = Self::read_varu32_safe(buf, pos)?;
492                let mut named = HashMap::with_capacity(field_count);
493                for _ in 0..field_count {
494                    let key_len = Self::read_varu32_safe(buf, pos)?;
495                    let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
496                    *pos += key_len;
497                    let value = Self::read_value_binary(buf, pos)?;
498                    named.insert(key, value);
499                }
500                EntityData::Row(RowData {
501                    columns: Vec::new(),
502                    named: Some(named),
503                    schema: None,
504                })
505            }
506            _ => return Err(format!("Unknown EntityData type: {}", data_type).into()),
507        };
508
509        // Timestamps
510        let created_at = Self::read_varu64_safe(buf, pos)?;
511        let updated_at = Self::read_varu64_safe(buf, pos)?;
512
513        // Embeddings count
514        let embedding_count = Self::read_varu32_safe(buf, pos)?;
515        let mut embeddings = Vec::with_capacity(embedding_count);
516        for _ in 0..embedding_count {
517            let name_len = Self::read_varu32_safe(buf, pos)?;
518            let name = String::from_utf8(buf[*pos..*pos + name_len].to_vec())?;
519            *pos += name_len;
520
521            let dim = Self::read_varu32_safe(buf, pos)?;
522            let mut vector = Vec::with_capacity(dim);
523            for _ in 0..dim {
524                let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
525                *pos += 4;
526                vector.push(f32::from_le_bytes(bytes));
527            }
528
529            let model_len = Self::read_varu32_safe(buf, pos)?;
530            let model = String::from_utf8(buf[*pos..*pos + model_len].to_vec())?;
531            *pos += model_len;
532
533            embeddings.push(EmbeddingSlot::new(name, vector, model));
534        }
535
536        // Cross-refs count
537        let cross_ref_count = Self::read_varu32_safe(buf, pos)?;
538        let mut cross_refs = Vec::with_capacity(cross_ref_count);
539        for _ in 0..cross_ref_count {
540            let source = Self::read_varu64_safe(buf, pos)?;
541            let target = Self::read_varu64_safe(buf, pos)?;
542            let ref_type_byte = buf[*pos];
543            *pos += 1;
544            let (target_collection, weight, created_at) = if format_version >= STORE_VERSION_V2 {
545                let coll_len = Self::read_varu32_safe(buf, pos)?;
546                let collection = String::from_utf8(buf[*pos..*pos + coll_len].to_vec())?;
547                *pos += coll_len;
548                let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
549                *pos += 4;
550                let weight = f32::from_le_bytes(weight_bytes);
551                let created_at = Self::read_varu64_safe(buf, pos)?;
552                (collection, weight, created_at)
553            } else {
554                (String::new(), 1.0, 0)
555            };
556
557            let mut cross_ref = CrossRef::new(
558                EntityId::new(source),
559                EntityId::new(target),
560                target_collection,
561                RefType::from_byte(ref_type_byte),
562            );
563            cross_ref.weight = weight;
564            cross_ref.created_at = created_at;
565            cross_refs.push(cross_ref);
566        }
567
568        // Sequence ID
569        let sequence_id = Self::read_varu64_safe(buf, pos)?;
570
571        if format_version >= STORE_VERSION_V4 {
572            if let EntityData::QueueMessage(message) = &mut data {
573                if *pos < buf.len() {
574                    let priority_present = buf[*pos] != 0;
575                    *pos += 1;
576                    message.priority = if priority_present {
577                        if *pos + 4 > buf.len() {
578                            return Err("truncated queue priority".into());
579                        }
580                        let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
581                        *pos += 4;
582                        Some(i32::from_le_bytes(bytes))
583                    } else {
584                        None
585                    };
586                    message.max_attempts = Self::read_varu32_safe(buf, pos)? as u32;
587                    if *pos >= buf.len() {
588                        return Err("truncated queue ack flag".into());
589                    }
590                    message.acked = buf[*pos] != 0;
591                    *pos += 1;
592                }
593            }
594        }
595
596        let mut entity = UnifiedEntity::new(EntityId::new(id), kind, data);
597        entity.created_at = created_at;
598        entity.updated_at = updated_at;
599        entity.sequence_id = sequence_id;
600        if !embeddings.is_empty() || !cross_refs.is_empty() {
601            entity.embeddings_mut().extend(embeddings);
602            entity.cross_refs_mut().extend(cross_refs);
603        }
604
605        Ok(entity)
606    }
607
608    /// Safe varu32 reader that converts DecodeError to Box<dyn Error>
609    fn read_varu32_safe(buf: &[u8], pos: &mut usize) -> Result<usize, Box<dyn std::error::Error>> {
610        read_varu32(buf, pos)
611            .map(|v| v as usize)
612            .map_err(|e| format!("Decode error: {:?}", e).into())
613    }
614
615    /// Safe varu64 reader that converts DecodeError to Box<dyn Error>
616    fn read_varu64_safe(buf: &[u8], pos: &mut usize) -> Result<u64, Box<dyn std::error::Error>> {
617        read_varu64(buf, pos).map_err(|e| format!("Decode error: {:?}", e).into())
618    }
619
620    /// Write entity to binary buffer
621    pub(crate) fn write_entity_binary(
622        buf: &mut Vec<u8>,
623        entity: &UnifiedEntity,
624        format_version: u32,
625    ) {
626        // Entity ID
627        write_varu64(buf, entity.id.raw());
628
629        // EntityKind
630        match &entity.kind {
631            EntityKind::TableRow { table, row_id } => {
632                buf.push(0);
633                write_varu32(buf, table.len() as u32);
634                buf.extend_from_slice(table.as_bytes());
635                write_varu64(buf, *row_id);
636            }
637            EntityKind::GraphNode(ref node) => {
638                buf.push(1);
639                write_varu32(buf, node.label.len() as u32);
640                buf.extend_from_slice(node.label.as_bytes());
641                write_varu32(buf, node.node_type.len() as u32);
642                buf.extend_from_slice(node.node_type.as_bytes());
643            }
644            EntityKind::GraphEdge(ref edge) => {
645                buf.push(2);
646                write_varu32(buf, edge.label.len() as u32);
647                buf.extend_from_slice(edge.label.as_bytes());
648                write_varu32(buf, edge.from_node.len() as u32);
649                buf.extend_from_slice(edge.from_node.as_bytes());
650                write_varu32(buf, edge.to_node.len() as u32);
651                buf.extend_from_slice(edge.to_node.as_bytes());
652                buf.extend_from_slice(&edge.weight.to_le_bytes());
653            }
654            EntityKind::Vector { collection } => {
655                buf.push(3);
656                write_varu32(buf, collection.len() as u32);
657                buf.extend_from_slice(collection.as_bytes());
658            }
659            EntityKind::TimeSeriesPoint(ref ts) => {
660                buf.push(4);
661                write_varu32(buf, ts.series.len() as u32);
662                buf.extend_from_slice(ts.series.as_bytes());
663                write_varu32(buf, ts.metric.len() as u32);
664                buf.extend_from_slice(ts.metric.as_bytes());
665            }
666            EntityKind::QueueMessage { queue, position } => {
667                buf.push(5);
668                write_varu32(buf, queue.len() as u32);
669                buf.extend_from_slice(queue.as_bytes());
670                write_varu64(buf, *position);
671            }
672        }
673
674        // EntityData
675        match &entity.data {
676            EntityData::Row(row) => {
677                if let Some(ref named) = row.named {
678                    // Named row: type 6 = Row with named HashMap.
679                    // Sort by key so the on-disk byte sequence is
680                    // deterministic — replication relies on hashing the
681                    // serialized record to detect divergence, and a
682                    // HashMap-iteration-order race produces spurious
683                    // mismatches.
684                    buf.push(6);
685                    write_varu32(buf, named.len() as u32);
686                    let mut entries: Vec<_> = named.iter().collect();
687                    entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
688                    for (key, value) in entries {
689                        write_varu32(buf, key.len() as u32);
690                        buf.extend_from_slice(key.as_bytes());
691                        Self::write_value_binary(buf, value);
692                    }
693                } else {
694                    buf.push(0);
695                    write_varu32(buf, row.columns.len() as u32);
696                    for col in &row.columns {
697                        Self::write_value_binary(buf, col);
698                    }
699                }
700            }
701            EntityData::Node(node) => {
702                buf.push(1);
703                write_varu32(buf, node.properties.len() as u32);
704                let mut entries: Vec<_> = node.properties.iter().collect();
705                entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
706                for (key, value) in entries {
707                    write_varu32(buf, key.len() as u32);
708                    buf.extend_from_slice(key.as_bytes());
709                    Self::write_value_binary(buf, value);
710                }
711            }
712            EntityData::Edge(edge) => {
713                buf.push(2);
714                buf.extend_from_slice(&edge.weight.to_le_bytes());
715                write_varu32(buf, edge.properties.len() as u32);
716                let mut entries: Vec<_> = edge.properties.iter().collect();
717                entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
718                for (key, value) in entries {
719                    write_varu32(buf, key.len() as u32);
720                    buf.extend_from_slice(key.as_bytes());
721                    Self::write_value_binary(buf, value);
722                }
723            }
724            EntityData::Vector(vec) => {
725                buf.push(3);
726                write_varu32(buf, vec.dense.len() as u32);
727                for f in &vec.dense {
728                    buf.extend_from_slice(&f.to_le_bytes());
729                }
730                if format_version >= STORE_VERSION_V6 {
731                    buf.push(u8::from(vec.content.is_some()));
732                    if let Some(content) = &vec.content {
733                        write_varu32(buf, content.len() as u32);
734                        buf.extend_from_slice(content.as_bytes());
735                    }
736                }
737            }
738            EntityData::TimeSeries(ts) => {
739                buf.push(4);
740                write_varu32(buf, ts.metric.len() as u32);
741                buf.extend_from_slice(ts.metric.as_bytes());
742                write_varu64(buf, ts.timestamp_ns);
743                buf.extend_from_slice(&ts.value.to_le_bytes());
744                if format_version >= STORE_VERSION_V5 {
745                    write_varu32(buf, ts.tags.len() as u32);
746                    let mut tag_entries: Vec<_> = ts.tags.iter().collect();
747                    tag_entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
748                    for (key, value) in tag_entries {
749                        write_varu32(buf, key.len() as u32);
750                        buf.extend_from_slice(key.as_bytes());
751                        write_varu32(buf, value.len() as u32);
752                        buf.extend_from_slice(value.as_bytes());
753                    }
754                }
755            }
756            EntityData::QueueMessage(msg) => {
757                buf.push(5);
758                Self::write_value_binary(buf, &msg.payload);
759                write_varu64(buf, msg.enqueued_at_ns);
760                write_varu32(buf, msg.attempts);
761            }
762        }
763
764        // Timestamps
765        write_varu64(buf, entity.created_at);
766        write_varu64(buf, entity.updated_at);
767
768        // Embeddings
769        write_varu32(buf, entity.embeddings().len() as u32);
770        for emb in entity.embeddings() {
771            write_varu32(buf, emb.name.len() as u32);
772            buf.extend_from_slice(emb.name.as_bytes());
773            write_varu32(buf, emb.vector.len() as u32);
774            for f in &emb.vector {
775                buf.extend_from_slice(&f.to_le_bytes());
776            }
777            write_varu32(buf, emb.model.len() as u32);
778            buf.extend_from_slice(emb.model.as_bytes());
779        }
780
781        // Cross-refs
782        write_varu32(buf, entity.cross_refs().len() as u32);
783        for cross_ref in entity.cross_refs() {
784            write_varu64(buf, cross_ref.source.raw());
785            write_varu64(buf, cross_ref.target.raw());
786            buf.push(cross_ref.ref_type.to_byte());
787            if format_version >= STORE_VERSION_V2 {
788                write_varu32(buf, cross_ref.target_collection.len() as u32);
789                buf.extend_from_slice(cross_ref.target_collection.as_bytes());
790                buf.extend_from_slice(&cross_ref.weight.to_le_bytes());
791                write_varu64(buf, cross_ref.created_at);
792            }
793        }
794
795        // Sequence ID
796        write_varu64(buf, entity.sequence_id);
797
798        if format_version >= STORE_VERSION_V4 {
799            if let EntityData::QueueMessage(message) = &entity.data {
800                buf.push(u8::from(message.priority.is_some()));
801                if let Some(priority) = message.priority {
802                    buf.extend_from_slice(&priority.to_le_bytes());
803                }
804                write_varu32(buf, message.max_attempts);
805                buf.push(u8::from(message.acked));
806            }
807        }
808    }
809
810    /// Read a Value from binary buffer
811    /// Type bytes: 0=Null, 1=Boolean, 2=Integer, 3=UnsignedInteger, 4=Float,
812    /// 5=Text, 6=Blob, 7=Timestamp, 8=Duration, 9=IpAddr, 10=MacAddr,
813    /// 11=Vector, 12=Json, 13=Uuid, 14=NodeRef, 15=EdgeRef, 16=VectorRef, 17=RowRef
814    fn read_value_binary(buf: &[u8], pos: &mut usize) -> Result<Value, Box<dyn std::error::Error>> {
815        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
816
817        let type_byte = buf[*pos];
818        *pos += 1;
819
820        Ok(match type_byte {
821            0 => Value::Null,
822            1 => {
823                let b = buf[*pos] != 0;
824                *pos += 1;
825                Value::Boolean(b)
826            }
827            2 => {
828                let val = i64::from_le_bytes([
829                    buf[*pos],
830                    buf[*pos + 1],
831                    buf[*pos + 2],
832                    buf[*pos + 3],
833                    buf[*pos + 4],
834                    buf[*pos + 5],
835                    buf[*pos + 6],
836                    buf[*pos + 7],
837                ]);
838                *pos += 8;
839                Value::Integer(val)
840            }
841            3 => {
842                let val = u64::from_le_bytes([
843                    buf[*pos],
844                    buf[*pos + 1],
845                    buf[*pos + 2],
846                    buf[*pos + 3],
847                    buf[*pos + 4],
848                    buf[*pos + 5],
849                    buf[*pos + 6],
850                    buf[*pos + 7],
851                ]);
852                *pos += 8;
853                Value::UnsignedInteger(val)
854            }
855            4 => {
856                let val = f64::from_le_bytes([
857                    buf[*pos],
858                    buf[*pos + 1],
859                    buf[*pos + 2],
860                    buf[*pos + 3],
861                    buf[*pos + 4],
862                    buf[*pos + 5],
863                    buf[*pos + 6],
864                    buf[*pos + 7],
865                ]);
866                *pos += 8;
867                Value::Float(val)
868            }
869            5 => {
870                let len = Self::read_varu32_safe(buf, pos)?;
871                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
872                *pos += len;
873                Value::text(s)
874            }
875            6 => {
876                let len = Self::read_varu32_safe(buf, pos)?;
877                let bytes = buf[*pos..*pos + len].to_vec();
878                *pos += len;
879                Value::Blob(bytes)
880            }
881            7 => {
882                let val = i64::from_le_bytes([
883                    buf[*pos],
884                    buf[*pos + 1],
885                    buf[*pos + 2],
886                    buf[*pos + 3],
887                    buf[*pos + 4],
888                    buf[*pos + 5],
889                    buf[*pos + 6],
890                    buf[*pos + 7],
891                ]);
892                *pos += 8;
893                Value::Timestamp(val)
894            }
895            8 => {
896                let val = i64::from_le_bytes([
897                    buf[*pos],
898                    buf[*pos + 1],
899                    buf[*pos + 2],
900                    buf[*pos + 3],
901                    buf[*pos + 4],
902                    buf[*pos + 5],
903                    buf[*pos + 6],
904                    buf[*pos + 7],
905                ]);
906                *pos += 8;
907                Value::Duration(val)
908            }
909            9 => {
910                // IpAddr: first byte = version (4 or 6)
911                let version = buf[*pos];
912                *pos += 1;
913                if version == 4 {
914                    let octets = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
915                    *pos += 4;
916                    Value::IpAddr(IpAddr::V4(Ipv4Addr::from(octets)))
917                } else {
918                    let mut octets = [0u8; 16];
919                    octets.copy_from_slice(&buf[*pos..*pos + 16]);
920                    *pos += 16;
921                    Value::IpAddr(IpAddr::V6(Ipv6Addr::from(octets)))
922                }
923            }
924            10 => {
925                let mut mac = [0u8; 6];
926                mac.copy_from_slice(&buf[*pos..*pos + 6]);
927                *pos += 6;
928                Value::MacAddr(mac)
929            }
930            11 => {
931                let len = Self::read_varu32_safe(buf, pos)?;
932                let mut vector = Vec::with_capacity(len);
933                for _ in 0..len {
934                    let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
935                    *pos += 4;
936                    vector.push(f32::from_le_bytes(bytes));
937                }
938                Value::Vector(vector)
939            }
940            12 => {
941                let len = Self::read_varu32_safe(buf, pos)?;
942                let bytes = buf[*pos..*pos + len].to_vec();
943                *pos += len;
944                Value::Json(bytes)
945            }
946            13 => {
947                let mut uuid = [0u8; 16];
948                uuid.copy_from_slice(&buf[*pos..*pos + 16]);
949                *pos += 16;
950                Value::Uuid(uuid)
951            }
952            14 => {
953                let len = Self::read_varu32_safe(buf, pos)?;
954                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
955                *pos += len;
956                Value::NodeRef(s)
957            }
958            15 => {
959                let len = Self::read_varu32_safe(buf, pos)?;
960                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
961                *pos += len;
962                Value::EdgeRef(s)
963            }
964            16 => {
965                let len = Self::read_varu32_safe(buf, pos)?;
966                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
967                *pos += len;
968                let id = u64::from_le_bytes([
969                    buf[*pos],
970                    buf[*pos + 1],
971                    buf[*pos + 2],
972                    buf[*pos + 3],
973                    buf[*pos + 4],
974                    buf[*pos + 5],
975                    buf[*pos + 6],
976                    buf[*pos + 7],
977                ]);
978                *pos += 8;
979                Value::VectorRef(s, id)
980            }
981            17 => {
982                let len = Self::read_varu32_safe(buf, pos)?;
983                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
984                *pos += len;
985                let id = u64::from_le_bytes([
986                    buf[*pos],
987                    buf[*pos + 1],
988                    buf[*pos + 2],
989                    buf[*pos + 3],
990                    buf[*pos + 4],
991                    buf[*pos + 5],
992                    buf[*pos + 6],
993                    buf[*pos + 7],
994                ]);
995                *pos += 8;
996                Value::RowRef(s, id)
997            }
998            18 => {
999                let rgb = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1000                *pos += 3;
1001                Value::Color(rgb)
1002            }
1003            19 => {
1004                let len = Self::read_varu32_safe(buf, pos)?;
1005                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1006                *pos += len;
1007                Value::Email(s)
1008            }
1009            20 => {
1010                let len = Self::read_varu32_safe(buf, pos)?;
1011                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1012                *pos += len;
1013                Value::Url(s)
1014            }
1015            21 => {
1016                let val = u64::from_le_bytes([
1017                    buf[*pos],
1018                    buf[*pos + 1],
1019                    buf[*pos + 2],
1020                    buf[*pos + 3],
1021                    buf[*pos + 4],
1022                    buf[*pos + 5],
1023                    buf[*pos + 6],
1024                    buf[*pos + 7],
1025                ]);
1026                *pos += 8;
1027                Value::Phone(val)
1028            }
1029            22 => {
1030                let val =
1031                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1032                *pos += 4;
1033                Value::Semver(val)
1034            }
1035            23 => {
1036                let ip =
1037                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1038                *pos += 4;
1039                let prefix = buf[*pos];
1040                *pos += 1;
1041                Value::Cidr(ip, prefix)
1042            }
1043            24 => {
1044                let val =
1045                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1046                *pos += 4;
1047                Value::Date(val)
1048            }
1049            25 => {
1050                let val =
1051                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1052                *pos += 4;
1053                Value::Time(val)
1054            }
1055            26 => {
1056                let val = i64::from_le_bytes([
1057                    buf[*pos],
1058                    buf[*pos + 1],
1059                    buf[*pos + 2],
1060                    buf[*pos + 3],
1061                    buf[*pos + 4],
1062                    buf[*pos + 5],
1063                    buf[*pos + 6],
1064                    buf[*pos + 7],
1065                ]);
1066                *pos += 8;
1067                Value::Decimal(val)
1068            }
1069            27 => {
1070                let val = buf[*pos];
1071                *pos += 1;
1072                Value::EnumValue(val)
1073            }
1074            28 => {
1075                let len = Self::read_varu32_safe(buf, pos)?;
1076                let mut elems = Vec::with_capacity(len);
1077                for _ in 0..len {
1078                    elems.push(Self::read_value_binary(buf, pos)?);
1079                }
1080                Value::Array(elems)
1081            }
1082            29 => {
1083                let val = i64::from_le_bytes([
1084                    buf[*pos],
1085                    buf[*pos + 1],
1086                    buf[*pos + 2],
1087                    buf[*pos + 3],
1088                    buf[*pos + 4],
1089                    buf[*pos + 5],
1090                    buf[*pos + 6],
1091                    buf[*pos + 7],
1092                ]);
1093                *pos += 8;
1094                Value::TimestampMs(val)
1095            }
1096            30 => {
1097                let val =
1098                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1099                *pos += 4;
1100                Value::Ipv4(val)
1101            }
1102            31 => {
1103                let mut bytes = [0u8; 16];
1104                bytes.copy_from_slice(&buf[*pos..*pos + 16]);
1105                *pos += 16;
1106                Value::Ipv6(bytes)
1107            }
1108            32 => {
1109                let ip =
1110                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1111                *pos += 4;
1112                let mask =
1113                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1114                *pos += 4;
1115                Value::Subnet(ip, mask)
1116            }
1117            33 => {
1118                let val = u16::from_le_bytes([buf[*pos], buf[*pos + 1]]);
1119                *pos += 2;
1120                Value::Port(val)
1121            }
1122            34 => {
1123                let val =
1124                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1125                *pos += 4;
1126                Value::Latitude(val)
1127            }
1128            35 => {
1129                let val =
1130                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1131                *pos += 4;
1132                Value::Longitude(val)
1133            }
1134            36 => {
1135                let lat =
1136                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1137                *pos += 4;
1138                let lon =
1139                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1140                *pos += 4;
1141                Value::GeoPoint(lat, lon)
1142            }
1143            37 => {
1144                let c = [buf[*pos], buf[*pos + 1]];
1145                *pos += 2;
1146                Value::Country2(c)
1147            }
1148            38 => {
1149                let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1150                *pos += 3;
1151                Value::Country3(c)
1152            }
1153            39 => {
1154                let c = [buf[*pos], buf[*pos + 1]];
1155                *pos += 2;
1156                Value::Lang2(c)
1157            }
1158            40 => {
1159                let c = [
1160                    buf[*pos],
1161                    buf[*pos + 1],
1162                    buf[*pos + 2],
1163                    buf[*pos + 3],
1164                    buf[*pos + 4],
1165                ];
1166                *pos += 5;
1167                Value::Lang5(c)
1168            }
1169            41 => {
1170                let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1171                *pos += 3;
1172                Value::Currency(c)
1173            }
1174            42 => {
1175                let rgba = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
1176                *pos += 4;
1177                Value::ColorAlpha(rgba)
1178            }
1179            43 => {
1180                let val = i64::from_le_bytes([
1181                    buf[*pos],
1182                    buf[*pos + 1],
1183                    buf[*pos + 2],
1184                    buf[*pos + 3],
1185                    buf[*pos + 4],
1186                    buf[*pos + 5],
1187                    buf[*pos + 6],
1188                    buf[*pos + 7],
1189                ]);
1190                *pos += 8;
1191                Value::BigInt(val)
1192            }
1193            44 => {
1194                let col_len = Self::read_varu32_safe(buf, pos)?;
1195                let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1196                *pos += col_len;
1197                let key_len = Self::read_varu32_safe(buf, pos)?;
1198                let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
1199                *pos += key_len;
1200                Value::KeyRef(col, key)
1201            }
1202            45 => {
1203                let col_len = Self::read_varu32_safe(buf, pos)?;
1204                let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1205                *pos += col_len;
1206                let id = u64::from_le_bytes([
1207                    buf[*pos],
1208                    buf[*pos + 1],
1209                    buf[*pos + 2],
1210                    buf[*pos + 3],
1211                    buf[*pos + 4],
1212                    buf[*pos + 5],
1213                    buf[*pos + 6],
1214                    buf[*pos + 7],
1215                ]);
1216                *pos += 8;
1217                Value::DocRef(col, id)
1218            }
1219            46 => {
1220                let len = Self::read_varu32_safe(buf, pos)?;
1221                let name = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1222                *pos += len;
1223                Value::TableRef(name)
1224            }
1225            47 => {
1226                let val =
1227                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1228                *pos += 4;
1229                Value::PageRef(val)
1230            }
1231            48 => {
1232                let len = Self::read_varu32_safe(buf, pos)?;
1233                let bytes = buf[*pos..*pos + len].to_vec();
1234                *pos += len;
1235                Value::Secret(bytes)
1236            }
1237            49 => {
1238                let len = Self::read_varu32_safe(buf, pos)?;
1239                let hash = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1240                *pos += len;
1241                Value::Password(hash)
1242            }
1243            50 => {
1244                let len = Self::read_varu32_safe(buf, pos)?;
1245                let code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1246                *pos += len;
1247                Value::AssetCode(code)
1248            }
1249            51 => {
1250                let len = Self::read_varu32_safe(buf, pos)?;
1251                let asset_code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1252                *pos += len;
1253                let scale = buf[*pos];
1254                *pos += 1;
1255                let minor_units = i64::from_le_bytes([
1256                    buf[*pos],
1257                    buf[*pos + 1],
1258                    buf[*pos + 2],
1259                    buf[*pos + 3],
1260                    buf[*pos + 4],
1261                    buf[*pos + 5],
1262                    buf[*pos + 6],
1263                    buf[*pos + 7],
1264                ]);
1265                *pos += 8;
1266                Value::Money {
1267                    asset_code,
1268                    minor_units,
1269                    scale,
1270                }
1271            }
1272            // C3 TOAST: compressed Text (0x85) and Blob (0x86).
1273            0x85 => {
1274                let orig_len = Self::read_varu32_safe(buf, pos)?;
1275                let comp_len = Self::read_varu32_safe(buf, pos)?;
1276                let compressed = &buf[*pos..*pos + comp_len];
1277                *pos += comp_len;
1278                let mut out = vec![0u8; orig_len];
1279                zstd::bulk::decompress_to_buffer(compressed, &mut out)
1280                    .map_err(|e| format!("C3 Text decompress: {e}"))?;
1281                Value::text(String::from_utf8(out).map_err(|e| format!("C3 Text UTF-8: {e}"))?)
1282            }
1283            0x86 => {
1284                let orig_len = Self::read_varu32_safe(buf, pos)?;
1285                let comp_len = Self::read_varu32_safe(buf, pos)?;
1286                let compressed = &buf[*pos..*pos + comp_len];
1287                *pos += comp_len;
1288                let mut out = vec![0u8; orig_len];
1289                zstd::bulk::decompress_to_buffer(compressed, &mut out)
1290                    .map_err(|e| format!("C3 Blob decompress: {e}"))?;
1291                Value::Blob(out)
1292            }
1293            _ => return Err(format!("Unknown Value type: {}", type_byte).into()),
1294        })
1295    }
1296
1297    /// Write a Value to binary buffer
1298    /// Type bytes: 0=Null, 1=Boolean, 2=Integer, 3=UnsignedInteger, 4=Float,
1299    /// 5=Text, 6=Blob, 7=Timestamp, 8=Duration, 9=IpAddr, 10=MacAddr,
1300    /// 11=Vector, 12=Json, 13=Uuid, 14=NodeRef, 15=EdgeRef, 16=VectorRef, 17=RowRef
1301    fn write_value_binary(buf: &mut Vec<u8>, value: &Value) {
1302        use std::net::IpAddr;
1303
1304        match value {
1305            Value::Null => buf.push(0),
1306            Value::Boolean(b) => {
1307                buf.push(1);
1308                buf.push(if *b { 1 } else { 0 });
1309            }
1310            Value::Integer(i) => {
1311                buf.push(2);
1312                buf.extend_from_slice(&i.to_le_bytes());
1313            }
1314            Value::UnsignedInteger(u) => {
1315                buf.push(3);
1316                buf.extend_from_slice(&u.to_le_bytes());
1317            }
1318            Value::Float(f) => {
1319                buf.push(4);
1320                buf.extend_from_slice(&f.to_le_bytes());
1321            }
1322            Value::Text(s) => {
1323                // C3 TOAST: compress Text values > 2 KiB with zstd.
1324                // Type 0x85 = compressed Text; type 5 = uncompressed (existing).
1325                // Layout for 0x85: [0x85][orig_len: varuint][comp_len: varuint][compressed bytes]
1326                const TEXT_COMPRESS_THRESHOLD: usize = 2048;
1327                if s.len() >= TEXT_COMPRESS_THRESHOLD {
1328                    if let Ok(compressed) = zstd::bulk::compress(s.as_bytes(), 3) {
1329                        if compressed.len() < s.len() {
1330                            buf.push(0x85); // compressed Text marker
1331                            write_varu32(buf, s.len() as u32); // orig_len (decompression hint)
1332                            write_varu32(buf, compressed.len() as u32);
1333                            buf.extend_from_slice(&compressed);
1334                            return; // written — skip uncompressed path
1335                        }
1336                    }
1337                }
1338                buf.push(5);
1339                write_varu32(buf, s.len() as u32);
1340                buf.extend_from_slice(s.as_bytes());
1341            }
1342            Value::Blob(bytes) => {
1343                // C3 TOAST: compress Blob values > 2 KiB with zstd.
1344                // Type 0x86 = compressed Blob; type 6 = uncompressed (existing).
1345                const BLOB_COMPRESS_THRESHOLD: usize = 2048;
1346                if bytes.len() >= BLOB_COMPRESS_THRESHOLD {
1347                    if let Ok(compressed) = zstd::bulk::compress(bytes.as_slice(), 3) {
1348                        if compressed.len() < bytes.len() {
1349                            buf.push(0x86); // compressed Blob marker
1350                            write_varu32(buf, bytes.len() as u32);
1351                            write_varu32(buf, compressed.len() as u32);
1352                            buf.extend_from_slice(&compressed);
1353                            return;
1354                        }
1355                    }
1356                }
1357                buf.push(6);
1358                write_varu32(buf, bytes.len() as u32);
1359                buf.extend_from_slice(bytes);
1360            }
1361            Value::Timestamp(t) => {
1362                buf.push(7);
1363                buf.extend_from_slice(&t.to_le_bytes());
1364            }
1365            Value::Duration(d) => {
1366                buf.push(8);
1367                buf.extend_from_slice(&d.to_le_bytes());
1368            }
1369            Value::IpAddr(ip) => {
1370                buf.push(9);
1371                match ip {
1372                    IpAddr::V4(v4) => {
1373                        buf.push(4);
1374                        buf.extend_from_slice(&v4.octets());
1375                    }
1376                    IpAddr::V6(v6) => {
1377                        buf.push(6);
1378                        buf.extend_from_slice(&v6.octets());
1379                    }
1380                }
1381            }
1382            Value::MacAddr(mac) => {
1383                buf.push(10);
1384                buf.extend_from_slice(mac);
1385            }
1386            Value::Vector(vec) => {
1387                buf.push(11);
1388                write_varu32(buf, vec.len() as u32);
1389                for f in vec {
1390                    buf.extend_from_slice(&f.to_le_bytes());
1391                }
1392            }
1393            Value::Json(bytes) => {
1394                buf.push(12);
1395                write_varu32(buf, bytes.len() as u32);
1396                buf.extend_from_slice(bytes);
1397            }
1398            Value::Uuid(uuid) => {
1399                buf.push(13);
1400                buf.extend_from_slice(uuid);
1401            }
1402            Value::NodeRef(s) => {
1403                buf.push(14);
1404                write_varu32(buf, s.len() as u32);
1405                buf.extend_from_slice(s.as_bytes());
1406            }
1407            Value::EdgeRef(s) => {
1408                buf.push(15);
1409                write_varu32(buf, s.len() as u32);
1410                buf.extend_from_slice(s.as_bytes());
1411            }
1412            Value::VectorRef(s, id) => {
1413                buf.push(16);
1414                write_varu32(buf, s.len() as u32);
1415                buf.extend_from_slice(s.as_bytes());
1416                buf.extend_from_slice(&id.to_le_bytes());
1417            }
1418            Value::RowRef(s, id) => {
1419                buf.push(17);
1420                write_varu32(buf, s.len() as u32);
1421                buf.extend_from_slice(s.as_bytes());
1422                buf.extend_from_slice(&id.to_le_bytes());
1423            }
1424            Value::Color(rgb) => {
1425                buf.push(18);
1426                buf.extend_from_slice(rgb);
1427            }
1428            Value::Email(s) => {
1429                buf.push(19);
1430                write_varu32(buf, s.len() as u32);
1431                buf.extend_from_slice(s.as_bytes());
1432            }
1433            Value::Url(s) => {
1434                buf.push(20);
1435                write_varu32(buf, s.len() as u32);
1436                buf.extend_from_slice(s.as_bytes());
1437            }
1438            Value::Phone(n) => {
1439                buf.push(21);
1440                buf.extend_from_slice(&n.to_le_bytes());
1441            }
1442            Value::Semver(v) => {
1443                buf.push(22);
1444                buf.extend_from_slice(&v.to_le_bytes());
1445            }
1446            Value::Cidr(ip, prefix) => {
1447                buf.push(23);
1448                buf.extend_from_slice(&ip.to_le_bytes());
1449                buf.push(*prefix);
1450            }
1451            Value::Date(d) => {
1452                buf.push(24);
1453                buf.extend_from_slice(&d.to_le_bytes());
1454            }
1455            Value::Time(t) => {
1456                buf.push(25);
1457                buf.extend_from_slice(&t.to_le_bytes());
1458            }
1459            Value::Decimal(v) => {
1460                buf.push(26);
1461                buf.extend_from_slice(&v.to_le_bytes());
1462            }
1463            Value::EnumValue(i) => {
1464                buf.push(27);
1465                buf.push(*i);
1466            }
1467            Value::Array(elems) => {
1468                buf.push(28);
1469                write_varu32(buf, elems.len() as u32);
1470                for elem in elems {
1471                    Self::write_value_binary(buf, elem);
1472                }
1473            }
1474            Value::TimestampMs(v) => {
1475                buf.push(29);
1476                buf.extend_from_slice(&v.to_le_bytes());
1477            }
1478            Value::Ipv4(v) => {
1479                buf.push(30);
1480                buf.extend_from_slice(&v.to_le_bytes());
1481            }
1482            Value::Ipv6(bytes) => {
1483                buf.push(31);
1484                buf.extend_from_slice(bytes);
1485            }
1486            Value::Subnet(ip, mask) => {
1487                buf.push(32);
1488                buf.extend_from_slice(&ip.to_le_bytes());
1489                buf.extend_from_slice(&mask.to_le_bytes());
1490            }
1491            Value::Port(v) => {
1492                buf.push(33);
1493                buf.extend_from_slice(&v.to_le_bytes());
1494            }
1495            Value::Latitude(v) => {
1496                buf.push(34);
1497                buf.extend_from_slice(&v.to_le_bytes());
1498            }
1499            Value::Longitude(v) => {
1500                buf.push(35);
1501                buf.extend_from_slice(&v.to_le_bytes());
1502            }
1503            Value::GeoPoint(lat, lon) => {
1504                buf.push(36);
1505                buf.extend_from_slice(&lat.to_le_bytes());
1506                buf.extend_from_slice(&lon.to_le_bytes());
1507            }
1508            Value::Country2(c) => {
1509                buf.push(37);
1510                buf.extend_from_slice(c);
1511            }
1512            Value::Country3(c) => {
1513                buf.push(38);
1514                buf.extend_from_slice(c);
1515            }
1516            Value::Lang2(c) => {
1517                buf.push(39);
1518                buf.extend_from_slice(c);
1519            }
1520            Value::Lang5(c) => {
1521                buf.push(40);
1522                buf.extend_from_slice(c);
1523            }
1524            Value::Currency(c) => {
1525                buf.push(41);
1526                buf.extend_from_slice(c);
1527            }
1528            Value::AssetCode(code) => {
1529                buf.push(50);
1530                write_varu32(buf, code.len() as u32);
1531                buf.extend_from_slice(code.as_bytes());
1532            }
1533            Value::Money {
1534                asset_code,
1535                minor_units,
1536                scale,
1537            } => {
1538                buf.push(51);
1539                write_varu32(buf, asset_code.len() as u32);
1540                buf.extend_from_slice(asset_code.as_bytes());
1541                buf.push(*scale);
1542                buf.extend_from_slice(&minor_units.to_le_bytes());
1543            }
1544            Value::ColorAlpha(rgba) => {
1545                buf.push(42);
1546                buf.extend_from_slice(rgba);
1547            }
1548            Value::BigInt(v) => {
1549                buf.push(43);
1550                buf.extend_from_slice(&v.to_le_bytes());
1551            }
1552            Value::KeyRef(col, key) => {
1553                buf.push(44);
1554                write_varu32(buf, col.len() as u32);
1555                buf.extend_from_slice(col.as_bytes());
1556                write_varu32(buf, key.len() as u32);
1557                buf.extend_from_slice(key.as_bytes());
1558            }
1559            Value::DocRef(col, id) => {
1560                buf.push(45);
1561                write_varu32(buf, col.len() as u32);
1562                buf.extend_from_slice(col.as_bytes());
1563                buf.extend_from_slice(&id.to_le_bytes());
1564            }
1565            Value::TableRef(name) => {
1566                buf.push(46);
1567                write_varu32(buf, name.len() as u32);
1568                buf.extend_from_slice(name.as_bytes());
1569            }
1570            Value::PageRef(page_id) => {
1571                buf.push(47);
1572                buf.extend_from_slice(&page_id.to_le_bytes());
1573            }
1574            Value::Secret(bytes) => {
1575                buf.push(48);
1576                write_varu32(buf, bytes.len() as u32);
1577                buf.extend_from_slice(bytes);
1578            }
1579            Value::Password(hash) => {
1580                buf.push(49);
1581                write_varu32(buf, hash.len() as u32);
1582                buf.extend_from_slice(hash.as_bytes());
1583            }
1584        }
1585    }
1586}