Skip to main content

reddb_server/storage/unified/store/
impl_file.rs

1use super::*;
2
3impl UnifiedStore {
4    pub fn new() -> Self {
5        Self::with_config(UnifiedStoreConfig::default())
6    }
7
8    /// Get the current storage format version
9    pub fn format_version(&self) -> u32 {
10        self.format_version.load(Ordering::SeqCst)
11    }
12
13    pub(crate) fn set_format_version(&self, version: u32) {
14        self.format_version.store(version, Ordering::SeqCst);
15    }
16
17    /// Allocate a global entity ID
18    pub fn next_entity_id(&self) -> EntityId {
19        EntityId::new(self.next_entity_id.fetch_add(1, Ordering::SeqCst))
20    }
21
22    /// Reserve `n` contiguous global entity IDs with one fetch_add.
23    /// Caller assigns `id = EntityId::new(start + i)` per entity.
24    pub fn reserve_entity_ids(&self, n: u64) -> std::ops::Range<u64> {
25        let start = self.next_entity_id.fetch_add(n, Ordering::SeqCst);
26        start..start + n
27    }
28
29    pub(crate) fn register_entity_id(&self, id: EntityId) {
30        let candidate = id.raw().saturating_add(1);
31        let mut current = self.next_entity_id.load(Ordering::SeqCst);
32        while candidate > current {
33            match self.next_entity_id.compare_exchange(
34                current,
35                candidate,
36                Ordering::SeqCst,
37                Ordering::SeqCst,
38            ) {
39                Ok(_) => break,
40                Err(updated) => current = updated,
41            }
42        }
43    }
44
45    /// Load store from binary file
46    ///
47    /// Binary dump framing is defined by `reddb_file::native_store`.
48    ///
49    /// Store payload shape:
50    /// ```text
51    /// [collection_count: varu32]
52    /// [collections...]
53    /// [cross_ref_count: varu32]
54    /// [cross_refs...]
55    /// ```
56    pub fn load_from_file(path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
57        let file = File::open(path)?;
58        let mut reader = BufReader::new(file);
59        let mut buf = Vec::new();
60        reader.read_to_end(&mut buf)?;
61        Self::load_from_bytes(&buf)
62    }
63
64    pub(crate) fn load_from_bytes(buf: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
65        Self::load_from_bytes_with_config(buf, UnifiedStoreConfig::default())
66    }
67
68    pub(crate) fn load_from_bytes_with_config(
69        buf: &[u8],
70        config: UnifiedStoreConfig,
71    ) -> Result<Self, Box<dyn std::error::Error>> {
72        let mut buf = buf.to_vec();
73        let version =
74            reddb_file::decode_native_store_header(&buf).map_err(|err| err.to_string())?;
75        let mut pos = 8;
76
77        // V3+ has CRC32 footer — verify integrity before parsing
78        reddb_file::verify_native_store_crc32_footer(&mut buf, version)
79            .map_err(|err| err.to_string())?;
80
81        let store = Self::with_config(config);
82        store.set_format_version(version);
83
84        // Read collection count
85        let collection_count =
86            reddb_file::decode_native_dump_count(&buf, &mut pos).map_err(|e| e.to_string())?;
87
88        // Read each collection
89        for _ in 0..collection_count {
90            let collection_header =
91                reddb_file::decode_native_dump_collection_header(&buf, &mut pos)
92                    .map_err(|e| e.to_string())?;
93
94            // Read each entity — V7+ includes metadata alongside entity data.
95            for _ in 0..collection_header.entity_count {
96                if version >= STORE_VERSION_V7 {
97                    // Length-prefixed entity+metadata record (serialize_entity_record format)
98                    let record_bytes = reddb_file::decode_native_dump_entity_record(&buf, &mut pos)
99                        .map_err(|e| e.to_string())?;
100
101                    let (entity, metadata) = Self::deserialize_entity_record(record_bytes, version)
102                        .map_err(|e| format!("Entity record deserialization error: {e}"))?;
103
104                    store.insert_auto(&collection_header.name, entity.clone())?;
105
106                    if let Some(metadata) = metadata {
107                        if let Some(manager) = store.get_collection(&collection_header.name) {
108                            let _ = manager.set_metadata(entity.id, metadata);
109                        }
110                    }
111                } else {
112                    // V1–V6: entity only, no metadata
113                    let entity = Self::read_entity_binary(&buf, &mut pos, version)?;
114                    store.insert_auto(&collection_header.name, entity)?;
115                }
116            }
117        }
118
119        if pos < buf.len() {
120            // Read cross-references section
121            let cross_ref_count =
122                reddb_file::decode_native_dump_count(&buf, &mut pos).map_err(|e| e.to_string())?;
123
124            for _ in 0..cross_ref_count {
125                let cross_ref = reddb_file::decode_native_dump_cross_ref(&buf, &mut pos)
126                    .map_err(|e| e.to_string())?;
127                let ref_type = RefType::from_byte(cross_ref.ref_type);
128
129                let source_collection = store
130                    .get_any(EntityId::new(cross_ref.source_id))
131                    .map(|(name, _)| name)
132                    .unwrap_or_else(|| cross_ref.target_collection.clone());
133                let _ = store.add_cross_ref(
134                    &source_collection,
135                    EntityId::new(cross_ref.source_id),
136                    &cross_ref.target_collection,
137                    EntityId::new(cross_ref.target_id),
138                    ref_type,
139                    1.0,
140                );
141            }
142        }
143
144        // V10+: a trailing opaque auxiliary-metadata blob follows the
145        // cross-references. RedDB stores the collection contracts here so the
146        // single-file artifact carries each collection's declared model across
147        // a restart. Older dumps end at the cross-refs and have no blob.
148        if version >= reddb_file::STORE_VERSION_V10 && pos < buf.len() {
149            let aux = reddb_file::decode_native_dump_entity_record(&buf, &mut pos)
150                .map_err(|e| e.to_string())?;
151            if !aux.is_empty() {
152                store.set_aux_metadata(aux.to_vec());
153            }
154        }
155
156        if store.format_version() < STORE_VERSION_CURRENT {
157            store.set_format_version(STORE_VERSION_CURRENT);
158        }
159
160        Ok(store)
161    }
162
163    /// Save store to binary file
164    ///
165    /// Uses compact binary encoding with varint for efficient storage.
166    /// No JSON - pure binary with pages and indices.
167    pub fn save_to_file(&self, path: &Path) -> Result<(), Box<dyn std::error::Error>> {
168        let buf = self.to_binary_dump_bytes();
169        reddb_file::write_native_store_bytes_atomically(path, &buf)?;
170        Ok(())
171    }
172
173    pub(crate) fn to_binary_dump_bytes(&self) -> Vec<u8> {
174        let mut buf = Vec::new();
175
176        // Version 9 includes explicit table-row logical identity plus MVCC
177        // xmin/xmax alongside the V7 metadata envelope.
178        buf.extend_from_slice(&reddb_file::encode_native_store_header(
179            STORE_VERSION_CURRENT,
180        ));
181
182        // Get all collections
183        let collections = self.collections.read();
184        reddb_file::encode_native_dump_count(&mut buf, collections.len() as u32);
185
186        let fv = STORE_VERSION_CURRENT;
187        for (name, manager) in collections.iter() {
188            // Get all entities from this collection
189            let entities = manager.query_all(|_| true);
190            reddb_file::encode_native_dump_collection_header(&mut buf, name, entities.len() as u32);
191
192            // V7+: serialize entity+metadata as a length-prefixed record.
193            // Each record: [u32 len][serialize_entity_record bytes]
194            for entity in entities {
195                let metadata = manager.get_metadata(entity.id);
196                let record = Self::serialize_entity_record(&entity, metadata.as_ref(), fv);
197                reddb_file::encode_native_dump_entity_record(&mut buf, &record);
198            }
199        }
200
201        // Write cross-references
202        let cross_refs = self.cross_refs.read();
203        let total_refs: usize = cross_refs.values().map(|v| v.len()).sum();
204        reddb_file::encode_native_dump_count(&mut buf, total_refs as u32);
205
206        for (source_id, refs) in cross_refs.iter() {
207            for (target_id, ref_type, collection) in refs {
208                reddb_file::encode_native_dump_cross_ref(
209                    &mut buf,
210                    source_id.raw(),
211                    target_id.raw(),
212                    ref_type.to_byte(),
213                    collection,
214                );
215            }
216        }
217
218        // V10+: trailing opaque auxiliary-metadata blob (collection contracts
219        // for the single-file artifact). Always written — empty when unused —
220        // so the read side can unconditionally expect it for V10 dumps.
221        {
222            let aux = self.aux_metadata.read();
223            reddb_file::encode_native_dump_entity_record(&mut buf, &aux);
224        }
225
226        self.set_format_version(STORE_VERSION_CURRENT);
227
228        reddb_file::append_native_store_crc32_footer(&mut buf);
229        buf
230    }
231
232    /// Read entity from binary buffer
233    pub(crate) fn read_entity_binary(
234        buf: &[u8],
235        pos: &mut usize,
236        format_version: u32,
237    ) -> Result<UnifiedEntity, Box<dyn std::error::Error>> {
238        // Entity ID
239        let id = read_varu64(buf, pos).map_err(|e| format!("Failed to read entity id: {:?}", e))?;
240
241        // EntityKind type byte
242        let kind_type = buf[*pos];
243        *pos += 1;
244
245        // EntityKind details
246        let kind = match kind_type {
247            0 => {
248                // TableRow
249                let table_len = Self::read_varu32_safe(buf, pos)?;
250                let table = String::from_utf8(buf[*pos..*pos + table_len].to_vec())?;
251                *pos += table_len;
252                let row_id = Self::read_varu64_safe(buf, pos)?;
253                EntityKind::TableRow {
254                    table: table.into(),
255                    row_id,
256                }
257            }
258            1 => {
259                // GraphNode
260                let label_len = Self::read_varu32_safe(buf, pos)?;
261                let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
262                *pos += label_len;
263                let node_type_len = Self::read_varu32_safe(buf, pos)?;
264                let node_type = String::from_utf8(buf[*pos..*pos + node_type_len].to_vec())?;
265                *pos += node_type_len;
266                EntityKind::GraphNode(Box::new(GraphNodeKind { label, node_type }))
267            }
268            2 => {
269                // GraphEdge
270                let label_len = Self::read_varu32_safe(buf, pos)?;
271                let label = String::from_utf8(buf[*pos..*pos + label_len].to_vec())?;
272                *pos += label_len;
273                let from_node_len = Self::read_varu32_safe(buf, pos)?;
274                let from_node = String::from_utf8(buf[*pos..*pos + from_node_len].to_vec())?;
275                *pos += from_node_len;
276                let to_node_len = Self::read_varu32_safe(buf, pos)?;
277                let to_node = String::from_utf8(buf[*pos..*pos + to_node_len].to_vec())?;
278                *pos += to_node_len;
279                let weight =
280                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
281                *pos += 4;
282                EntityKind::GraphEdge(Box::new(GraphEdgeKind {
283                    label,
284                    from_node,
285                    to_node,
286                    weight,
287                }))
288            }
289            3 => {
290                // Vector
291                let collection_len = Self::read_varu32_safe(buf, pos)?;
292                let collection = String::from_utf8(buf[*pos..*pos + collection_len].to_vec())?;
293                *pos += collection_len;
294                EntityKind::Vector { collection }
295            }
296            4 => {
297                // TimeSeriesPoint
298                let series_len = Self::read_varu32_safe(buf, pos)?;
299                let series = String::from_utf8(buf[*pos..*pos + series_len].to_vec())?;
300                *pos += series_len;
301                let metric_len = Self::read_varu32_safe(buf, pos)?;
302                let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
303                *pos += metric_len;
304                EntityKind::TimeSeriesPoint(Box::new(TimeSeriesPointKind { series, metric }))
305            }
306            5 => {
307                // QueueMessage
308                let queue_len = Self::read_varu32_safe(buf, pos)?;
309                let queue = String::from_utf8(buf[*pos..*pos + queue_len].to_vec())?;
310                *pos += queue_len;
311                let position = Self::read_varu64_safe(buf, pos)?;
312                EntityKind::QueueMessage { queue, position }
313            }
314            _ => return Err(format!("Unknown EntityKind type: {}", kind_type).into()),
315        };
316
317        // EntityData type byte
318        let data_type = buf[*pos];
319        *pos += 1;
320
321        // EntityData
322        let mut data = match data_type {
323            0 => {
324                // Row
325                let col_count = Self::read_varu32_safe(buf, pos)?;
326                let mut columns = Vec::with_capacity(col_count);
327                for _ in 0..col_count {
328                    columns.push(Self::read_value_binary(buf, pos)?);
329                }
330                EntityData::Row(RowData::new(columns))
331            }
332            1 => {
333                // Node
334                let prop_count = Self::read_varu32_safe(buf, pos)?;
335                let mut properties = HashMap::new();
336                for _ in 0..prop_count {
337                    let key_len = Self::read_varu32_safe(buf, pos)?;
338                    let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
339                    *pos += key_len;
340                    let value = Self::read_value_binary(buf, pos)?;
341                    properties.insert(key, value);
342                }
343                EntityData::Node(NodeData::with_properties(properties))
344            }
345            2 => {
346                // Edge
347                let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
348                *pos += 4;
349                let weight = f32::from_le_bytes(weight_bytes);
350                let prop_count = Self::read_varu32_safe(buf, pos)?;
351                let mut properties = HashMap::new();
352                for _ in 0..prop_count {
353                    let key_len = Self::read_varu32_safe(buf, pos)?;
354                    let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
355                    *pos += key_len;
356                    let value = Self::read_value_binary(buf, pos)?;
357                    properties.insert(key, value);
358                }
359                let mut edge = EdgeData::new(weight);
360                edge.properties = properties;
361                EntityData::Edge(edge)
362            }
363            3 => {
364                // Vector
365                let dim = Self::read_varu32_safe(buf, pos)?;
366                let mut dense = Vec::with_capacity(dim);
367                for _ in 0..dim {
368                    let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
369                    *pos += 4;
370                    dense.push(f32::from_le_bytes(bytes));
371                }
372                let mut vector = VectorData::new(dense);
373                if format_version >= STORE_VERSION_V6 {
374                    let has_content = buf[*pos] != 0;
375                    *pos += 1;
376                    if has_content {
377                        let content_len = Self::read_varu32_safe(buf, pos)?;
378                        vector.content =
379                            Some(String::from_utf8(buf[*pos..*pos + content_len].to_vec())?);
380                        *pos += content_len;
381                    }
382                }
383                EntityData::Vector(vector)
384            }
385            4 => {
386                // TimeSeries
387                let metric_len = Self::read_varu32_safe(buf, pos)?;
388                let metric = String::from_utf8(buf[*pos..*pos + metric_len].to_vec())?;
389                *pos += metric_len;
390                let timestamp_ns = Self::read_varu64_safe(buf, pos)?;
391                let value_bytes = [
392                    buf[*pos],
393                    buf[*pos + 1],
394                    buf[*pos + 2],
395                    buf[*pos + 3],
396                    buf[*pos + 4],
397                    buf[*pos + 5],
398                    buf[*pos + 6],
399                    buf[*pos + 7],
400                ];
401                *pos += 8;
402                let mut tags = HashMap::new();
403                if format_version >= STORE_VERSION_V5 {
404                    let tag_count = Self::read_varu32_safe(buf, pos)?;
405                    tags = HashMap::with_capacity(tag_count);
406                    for _ in 0..tag_count {
407                        let key_len = Self::read_varu32_safe(buf, pos)?;
408                        let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
409                        *pos += key_len;
410                        let value_len = Self::read_varu32_safe(buf, pos)?;
411                        let value = String::from_utf8(buf[*pos..*pos + value_len].to_vec())?;
412                        *pos += value_len;
413                        tags.insert(key, value);
414                    }
415                }
416                EntityData::TimeSeries(crate::storage::unified::entity::TimeSeriesData {
417                    metric,
418                    timestamp_ns,
419                    value: f64::from_le_bytes(value_bytes),
420                    tags,
421                })
422            }
423            5 => {
424                // QueueMessage
425                let payload = Self::read_value_binary(buf, pos)?;
426                let enqueued_at_ns = Self::read_varu64_safe(buf, pos)?;
427                let attempts = Self::read_varu32_safe(buf, pos)? as u32;
428                EntityData::QueueMessage(crate::storage::unified::entity::QueueMessageData {
429                    payload,
430                    priority: None,
431                    enqueued_at_ns,
432                    attempts,
433                    max_attempts: 3,
434                    acked: false,
435                })
436            }
437            6 => {
438                // Row with named HashMap
439                let field_count = Self::read_varu32_safe(buf, pos)?;
440                let mut named = HashMap::with_capacity(field_count);
441                for _ in 0..field_count {
442                    let key_len = Self::read_varu32_safe(buf, pos)?;
443                    let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
444                    *pos += key_len;
445                    let value = Self::read_value_binary(buf, pos)?;
446                    named.insert(key, value);
447                }
448                EntityData::Row(RowData {
449                    columns: Vec::new(),
450                    named: Some(named),
451                    schema: None,
452                })
453            }
454            _ => return Err(format!("Unknown EntityData type: {}", data_type).into()),
455        };
456
457        // Timestamps
458        let created_at = Self::read_varu64_safe(buf, pos)?;
459        let updated_at = Self::read_varu64_safe(buf, pos)?;
460
461        // Embeddings count
462        let embedding_count = Self::read_varu32_safe(buf, pos)?;
463        let mut embeddings = Vec::with_capacity(embedding_count);
464        for _ in 0..embedding_count {
465            let name_len = Self::read_varu32_safe(buf, pos)?;
466            let name = String::from_utf8(buf[*pos..*pos + name_len].to_vec())?;
467            *pos += name_len;
468
469            let dim = Self::read_varu32_safe(buf, pos)?;
470            let mut vector = Vec::with_capacity(dim);
471            for _ in 0..dim {
472                let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
473                *pos += 4;
474                vector.push(f32::from_le_bytes(bytes));
475            }
476
477            let model_len = Self::read_varu32_safe(buf, pos)?;
478            let model = String::from_utf8(buf[*pos..*pos + model_len].to_vec())?;
479            *pos += model_len;
480
481            embeddings.push(EmbeddingSlot::new(name, vector, model));
482        }
483
484        // Cross-refs count
485        let cross_ref_count = Self::read_varu32_safe(buf, pos)?;
486        let mut cross_refs = Vec::with_capacity(cross_ref_count);
487        for _ in 0..cross_ref_count {
488            let source = Self::read_varu64_safe(buf, pos)?;
489            let target = Self::read_varu64_safe(buf, pos)?;
490            let ref_type_byte = buf[*pos];
491            *pos += 1;
492            let (target_collection, weight, created_at) = if format_version >= STORE_VERSION_V2 {
493                let coll_len = Self::read_varu32_safe(buf, pos)?;
494                let collection = String::from_utf8(buf[*pos..*pos + coll_len].to_vec())?;
495                *pos += coll_len;
496                let weight_bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
497                *pos += 4;
498                let weight = f32::from_le_bytes(weight_bytes);
499                let created_at = Self::read_varu64_safe(buf, pos)?;
500                (collection, weight, created_at)
501            } else {
502                (String::new(), 1.0, 0)
503            };
504
505            let mut cross_ref = CrossRef::new(
506                EntityId::new(source),
507                EntityId::new(target),
508                target_collection,
509                RefType::from_byte(ref_type_byte),
510            );
511            cross_ref.weight = weight;
512            cross_ref.created_at = created_at;
513            cross_refs.push(cross_ref);
514        }
515
516        // Sequence ID
517        let sequence_id = Self::read_varu64_safe(buf, pos)?;
518
519        if format_version >= STORE_VERSION_V4 {
520            if let EntityData::QueueMessage(message) = &mut data {
521                if *pos < buf.len() {
522                    let priority_present = buf[*pos] != 0;
523                    *pos += 1;
524                    message.priority = if priority_present {
525                        if *pos + 4 > buf.len() {
526                            return Err("truncated queue priority".into());
527                        }
528                        let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
529                        *pos += 4;
530                        Some(i32::from_le_bytes(bytes))
531                    } else {
532                        None
533                    };
534                    message.max_attempts = Self::read_varu32_safe(buf, pos)? as u32;
535                    if *pos >= buf.len() {
536                        return Err("truncated queue ack flag".into());
537                    }
538                    message.acked = buf[*pos] != 0;
539                    *pos += 1;
540                }
541            }
542        }
543
544        let mut entity = UnifiedEntity::new(EntityId::new(id), kind, data);
545        entity.created_at = created_at;
546        entity.updated_at = updated_at;
547        entity.sequence_id = sequence_id;
548        if format_version >= STORE_VERSION_V8 && *pos < buf.len() {
549            let has_logical_id = buf[*pos] != 0;
550            *pos += 1;
551            if has_logical_id {
552                let logical_id = Self::read_varu64_safe(buf, pos)?;
553                entity.set_logical_id(EntityId::new(logical_id));
554            }
555        }
556        if format_version >= STORE_VERSION_V9 && *pos < buf.len() {
557            let xmin = Self::read_varu64_safe(buf, pos)?;
558            let xmax = Self::read_varu64_safe(buf, pos)?;
559            entity.set_xmin(xmin);
560            entity.set_xmax(xmax);
561        }
562        if !embeddings.is_empty() || !cross_refs.is_empty() {
563            entity.embeddings_mut().extend(embeddings);
564            entity.cross_refs_mut().extend(cross_refs);
565        }
566
567        Ok(entity)
568    }
569
570    /// Safe varu32 reader that converts DecodeError to Box<dyn Error>
571    fn read_varu32_safe(buf: &[u8], pos: &mut usize) -> Result<usize, Box<dyn std::error::Error>> {
572        read_varu32(buf, pos)
573            .map(|v| v as usize)
574            .map_err(|e| format!("Decode error: {:?}", e).into())
575    }
576
577    /// Safe varu64 reader that converts DecodeError to Box<dyn Error>
578    fn read_varu64_safe(buf: &[u8], pos: &mut usize) -> Result<u64, Box<dyn std::error::Error>> {
579        read_varu64(buf, pos).map_err(|e| format!("Decode error: {:?}", e).into())
580    }
581
582    /// Write entity to binary buffer
583    pub(crate) fn write_entity_binary(
584        buf: &mut Vec<u8>,
585        entity: &UnifiedEntity,
586        format_version: u32,
587    ) {
588        // Entity ID
589        write_varu64(buf, entity.id.raw());
590
591        // EntityKind
592        match &entity.kind {
593            EntityKind::TableRow { table, row_id } => {
594                buf.push(0);
595                write_varu32(buf, table.len() as u32);
596                buf.extend_from_slice(table.as_bytes());
597                write_varu64(buf, *row_id);
598            }
599            EntityKind::GraphNode(ref node) => {
600                buf.push(1);
601                write_varu32(buf, node.label.len() as u32);
602                buf.extend_from_slice(node.label.as_bytes());
603                write_varu32(buf, node.node_type.len() as u32);
604                buf.extend_from_slice(node.node_type.as_bytes());
605            }
606            EntityKind::GraphEdge(ref edge) => {
607                buf.push(2);
608                write_varu32(buf, edge.label.len() as u32);
609                buf.extend_from_slice(edge.label.as_bytes());
610                write_varu32(buf, edge.from_node.len() as u32);
611                buf.extend_from_slice(edge.from_node.as_bytes());
612                write_varu32(buf, edge.to_node.len() as u32);
613                buf.extend_from_slice(edge.to_node.as_bytes());
614                buf.extend_from_slice(&edge.weight.to_le_bytes());
615            }
616            EntityKind::Vector { collection } => {
617                buf.push(3);
618                write_varu32(buf, collection.len() as u32);
619                buf.extend_from_slice(collection.as_bytes());
620            }
621            EntityKind::TimeSeriesPoint(ref ts) => {
622                buf.push(4);
623                write_varu32(buf, ts.series.len() as u32);
624                buf.extend_from_slice(ts.series.as_bytes());
625                write_varu32(buf, ts.metric.len() as u32);
626                buf.extend_from_slice(ts.metric.as_bytes());
627            }
628            EntityKind::QueueMessage { queue, position } => {
629                buf.push(5);
630                write_varu32(buf, queue.len() as u32);
631                buf.extend_from_slice(queue.as_bytes());
632                write_varu64(buf, *position);
633            }
634        }
635
636        // EntityData
637        match &entity.data {
638            EntityData::Row(row) => {
639                if let Some(ref named) = row.named {
640                    // Named row: type 6 = Row with named HashMap.
641                    // Sort by key so the on-disk byte sequence is
642                    // deterministic — replication relies on hashing the
643                    // serialized record to detect divergence, and a
644                    // HashMap-iteration-order race produces spurious
645                    // mismatches.
646                    buf.push(6);
647                    write_varu32(buf, named.len() as u32);
648                    let mut entries: Vec<_> = named.iter().collect();
649                    entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
650                    for (key, value) in entries {
651                        write_varu32(buf, key.len() as u32);
652                        buf.extend_from_slice(key.as_bytes());
653                        Self::write_value_binary(buf, value);
654                    }
655                } else {
656                    buf.push(0);
657                    write_varu32(buf, row.columns.len() as u32);
658                    for col in &row.columns {
659                        Self::write_value_binary(buf, col);
660                    }
661                }
662            }
663            EntityData::Node(node) => {
664                buf.push(1);
665                write_varu32(buf, node.properties.len() as u32);
666                let mut entries: Vec<_> = node.properties.iter().collect();
667                entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
668                for (key, value) in entries {
669                    write_varu32(buf, key.len() as u32);
670                    buf.extend_from_slice(key.as_bytes());
671                    Self::write_value_binary(buf, value);
672                }
673            }
674            EntityData::Edge(edge) => {
675                buf.push(2);
676                buf.extend_from_slice(&edge.weight.to_le_bytes());
677                write_varu32(buf, edge.properties.len() as u32);
678                let mut entries: Vec<_> = edge.properties.iter().collect();
679                entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
680                for (key, value) in entries {
681                    write_varu32(buf, key.len() as u32);
682                    buf.extend_from_slice(key.as_bytes());
683                    Self::write_value_binary(buf, value);
684                }
685            }
686            EntityData::Vector(vec) => {
687                buf.push(3);
688                write_varu32(buf, vec.dense.len() as u32);
689                for f in &vec.dense {
690                    buf.extend_from_slice(&f.to_le_bytes());
691                }
692                if format_version >= STORE_VERSION_V6 {
693                    buf.push(u8::from(vec.content.is_some()));
694                    if let Some(content) = &vec.content {
695                        write_varu32(buf, content.len() as u32);
696                        buf.extend_from_slice(content.as_bytes());
697                    }
698                }
699            }
700            EntityData::TimeSeries(ts) => {
701                buf.push(4);
702                write_varu32(buf, ts.metric.len() as u32);
703                buf.extend_from_slice(ts.metric.as_bytes());
704                write_varu64(buf, ts.timestamp_ns);
705                buf.extend_from_slice(&ts.value.to_le_bytes());
706                if format_version >= STORE_VERSION_V5 {
707                    write_varu32(buf, ts.tags.len() as u32);
708                    let mut tag_entries: Vec<_> = ts.tags.iter().collect();
709                    tag_entries.sort_by(|a, b| a.0.as_str().cmp(b.0.as_str()));
710                    for (key, value) in tag_entries {
711                        write_varu32(buf, key.len() as u32);
712                        buf.extend_from_slice(key.as_bytes());
713                        write_varu32(buf, value.len() as u32);
714                        buf.extend_from_slice(value.as_bytes());
715                    }
716                }
717            }
718            EntityData::QueueMessage(msg) => {
719                buf.push(5);
720                Self::write_value_binary(buf, &msg.payload);
721                write_varu64(buf, msg.enqueued_at_ns);
722                write_varu32(buf, msg.attempts);
723            }
724        }
725
726        // Timestamps
727        write_varu64(buf, entity.created_at);
728        write_varu64(buf, entity.updated_at);
729
730        // Embeddings
731        write_varu32(buf, entity.embeddings().len() as u32);
732        for emb in entity.embeddings() {
733            write_varu32(buf, emb.name.len() as u32);
734            buf.extend_from_slice(emb.name.as_bytes());
735            write_varu32(buf, emb.vector.len() as u32);
736            for f in &emb.vector {
737                buf.extend_from_slice(&f.to_le_bytes());
738            }
739            write_varu32(buf, emb.model.len() as u32);
740            buf.extend_from_slice(emb.model.as_bytes());
741        }
742
743        // Cross-refs
744        write_varu32(buf, entity.cross_refs().len() as u32);
745        for cross_ref in entity.cross_refs() {
746            write_varu64(buf, cross_ref.source.raw());
747            write_varu64(buf, cross_ref.target.raw());
748            buf.push(cross_ref.ref_type.to_byte());
749            if format_version >= STORE_VERSION_V2 {
750                write_varu32(buf, cross_ref.target_collection.len() as u32);
751                buf.extend_from_slice(cross_ref.target_collection.as_bytes());
752                buf.extend_from_slice(&cross_ref.weight.to_le_bytes());
753                write_varu64(buf, cross_ref.created_at);
754            }
755        }
756
757        // Sequence ID
758        write_varu64(buf, entity.sequence_id);
759
760        if format_version >= STORE_VERSION_V4 {
761            if let EntityData::QueueMessage(message) = &entity.data {
762                buf.push(u8::from(message.priority.is_some()));
763                if let Some(priority) = message.priority {
764                    buf.extend_from_slice(&priority.to_le_bytes());
765                }
766                write_varu32(buf, message.max_attempts);
767                buf.push(u8::from(message.acked));
768            }
769        }
770
771        if format_version >= STORE_VERSION_V8 {
772            buf.push(u8::from(entity.has_explicit_logical_id()));
773            if entity.has_explicit_logical_id() {
774                write_varu64(buf, entity.logical_id().raw());
775            }
776        }
777        if format_version >= STORE_VERSION_V9 {
778            write_varu64(buf, entity.xmin);
779            write_varu64(buf, entity.xmax);
780        }
781    }
782
783    /// Read a Value from binary buffer
784    /// Type bytes: 0=Null, 1=Boolean, 2=Integer, 3=UnsignedInteger, 4=Float,
785    /// 5=Text, 6=Blob, 7=Timestamp, 8=Duration, 9=IpAddr, 10=MacAddr,
786    /// 11=Vector, 12=Json, 13=Uuid, 14=NodeRef, 15=EdgeRef, 16=VectorRef, 17=RowRef
787    fn read_value_binary(buf: &[u8], pos: &mut usize) -> Result<Value, Box<dyn std::error::Error>> {
788        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
789
790        let type_byte = buf[*pos];
791        *pos += 1;
792
793        Ok(match type_byte {
794            0 => Value::Null,
795            1 => {
796                let b = buf[*pos] != 0;
797                *pos += 1;
798                Value::Boolean(b)
799            }
800            2 => {
801                let val = i64::from_le_bytes([
802                    buf[*pos],
803                    buf[*pos + 1],
804                    buf[*pos + 2],
805                    buf[*pos + 3],
806                    buf[*pos + 4],
807                    buf[*pos + 5],
808                    buf[*pos + 6],
809                    buf[*pos + 7],
810                ]);
811                *pos += 8;
812                Value::Integer(val)
813            }
814            3 => {
815                let val = u64::from_le_bytes([
816                    buf[*pos],
817                    buf[*pos + 1],
818                    buf[*pos + 2],
819                    buf[*pos + 3],
820                    buf[*pos + 4],
821                    buf[*pos + 5],
822                    buf[*pos + 6],
823                    buf[*pos + 7],
824                ]);
825                *pos += 8;
826                Value::UnsignedInteger(val)
827            }
828            4 => {
829                let val = f64::from_le_bytes([
830                    buf[*pos],
831                    buf[*pos + 1],
832                    buf[*pos + 2],
833                    buf[*pos + 3],
834                    buf[*pos + 4],
835                    buf[*pos + 5],
836                    buf[*pos + 6],
837                    buf[*pos + 7],
838                ]);
839                *pos += 8;
840                Value::Float(val)
841            }
842            5 => {
843                let len = Self::read_varu32_safe(buf, pos)?;
844                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
845                *pos += len;
846                Value::text(s)
847            }
848            6 => {
849                let len = Self::read_varu32_safe(buf, pos)?;
850                let bytes = buf[*pos..*pos + len].to_vec();
851                *pos += len;
852                Value::Blob(bytes)
853            }
854            7 => {
855                let val = i64::from_le_bytes([
856                    buf[*pos],
857                    buf[*pos + 1],
858                    buf[*pos + 2],
859                    buf[*pos + 3],
860                    buf[*pos + 4],
861                    buf[*pos + 5],
862                    buf[*pos + 6],
863                    buf[*pos + 7],
864                ]);
865                *pos += 8;
866                Value::Timestamp(val)
867            }
868            8 => {
869                let val = i64::from_le_bytes([
870                    buf[*pos],
871                    buf[*pos + 1],
872                    buf[*pos + 2],
873                    buf[*pos + 3],
874                    buf[*pos + 4],
875                    buf[*pos + 5],
876                    buf[*pos + 6],
877                    buf[*pos + 7],
878                ]);
879                *pos += 8;
880                Value::Duration(val)
881            }
882            9 => {
883                // IpAddr: first byte = version (4 or 6)
884                let version = buf[*pos];
885                *pos += 1;
886                if version == 4 {
887                    let octets = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
888                    *pos += 4;
889                    Value::IpAddr(IpAddr::V4(Ipv4Addr::from(octets)))
890                } else {
891                    let mut octets = [0u8; 16];
892                    octets.copy_from_slice(&buf[*pos..*pos + 16]);
893                    *pos += 16;
894                    Value::IpAddr(IpAddr::V6(Ipv6Addr::from(octets)))
895                }
896            }
897            10 => {
898                let mut mac = [0u8; 6];
899                mac.copy_from_slice(&buf[*pos..*pos + 6]);
900                *pos += 6;
901                Value::MacAddr(mac)
902            }
903            11 => {
904                let len = Self::read_varu32_safe(buf, pos)?;
905                let mut vector = Vec::with_capacity(len);
906                for _ in 0..len {
907                    let bytes = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
908                    *pos += 4;
909                    vector.push(f32::from_le_bytes(bytes));
910                }
911                Value::Vector(vector)
912            }
913            12 => {
914                let len = Self::read_varu32_safe(buf, pos)?;
915                let bytes = buf[*pos..*pos + len].to_vec();
916                *pos += len;
917                Value::Json(bytes)
918            }
919            13 => {
920                let mut uuid = [0u8; 16];
921                uuid.copy_from_slice(&buf[*pos..*pos + 16]);
922                *pos += 16;
923                Value::Uuid(uuid)
924            }
925            14 => {
926                let len = Self::read_varu32_safe(buf, pos)?;
927                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
928                *pos += len;
929                Value::NodeRef(s)
930            }
931            15 => {
932                let len = Self::read_varu32_safe(buf, pos)?;
933                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
934                *pos += len;
935                Value::EdgeRef(s)
936            }
937            16 => {
938                let len = Self::read_varu32_safe(buf, pos)?;
939                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
940                *pos += len;
941                let id = u64::from_le_bytes([
942                    buf[*pos],
943                    buf[*pos + 1],
944                    buf[*pos + 2],
945                    buf[*pos + 3],
946                    buf[*pos + 4],
947                    buf[*pos + 5],
948                    buf[*pos + 6],
949                    buf[*pos + 7],
950                ]);
951                *pos += 8;
952                Value::VectorRef(s, id)
953            }
954            17 => {
955                let len = Self::read_varu32_safe(buf, pos)?;
956                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
957                *pos += len;
958                let id = u64::from_le_bytes([
959                    buf[*pos],
960                    buf[*pos + 1],
961                    buf[*pos + 2],
962                    buf[*pos + 3],
963                    buf[*pos + 4],
964                    buf[*pos + 5],
965                    buf[*pos + 6],
966                    buf[*pos + 7],
967                ]);
968                *pos += 8;
969                Value::RowRef(s, id)
970            }
971            18 => {
972                let rgb = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
973                *pos += 3;
974                Value::Color(rgb)
975            }
976            19 => {
977                let len = Self::read_varu32_safe(buf, pos)?;
978                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
979                *pos += len;
980                Value::Email(s)
981            }
982            20 => {
983                let len = Self::read_varu32_safe(buf, pos)?;
984                let s = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
985                *pos += len;
986                Value::Url(s)
987            }
988            21 => {
989                let val = u64::from_le_bytes([
990                    buf[*pos],
991                    buf[*pos + 1],
992                    buf[*pos + 2],
993                    buf[*pos + 3],
994                    buf[*pos + 4],
995                    buf[*pos + 5],
996                    buf[*pos + 6],
997                    buf[*pos + 7],
998                ]);
999                *pos += 8;
1000                Value::Phone(val)
1001            }
1002            22 => {
1003                let val =
1004                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1005                *pos += 4;
1006                Value::Semver(val)
1007            }
1008            23 => {
1009                let ip =
1010                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1011                *pos += 4;
1012                let prefix = buf[*pos];
1013                *pos += 1;
1014                Value::Cidr(ip, prefix)
1015            }
1016            24 => {
1017                let val =
1018                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1019                *pos += 4;
1020                Value::Date(val)
1021            }
1022            25 => {
1023                let val =
1024                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1025                *pos += 4;
1026                Value::Time(val)
1027            }
1028            26 => {
1029                let val = i64::from_le_bytes([
1030                    buf[*pos],
1031                    buf[*pos + 1],
1032                    buf[*pos + 2],
1033                    buf[*pos + 3],
1034                    buf[*pos + 4],
1035                    buf[*pos + 5],
1036                    buf[*pos + 6],
1037                    buf[*pos + 7],
1038                ]);
1039                *pos += 8;
1040                Value::Decimal(val)
1041            }
1042            27 => {
1043                let val = buf[*pos];
1044                *pos += 1;
1045                Value::EnumValue(val)
1046            }
1047            28 => {
1048                let len = Self::read_varu32_safe(buf, pos)?;
1049                let mut elems = Vec::with_capacity(len);
1050                for _ in 0..len {
1051                    elems.push(Self::read_value_binary(buf, pos)?);
1052                }
1053                Value::Array(elems)
1054            }
1055            29 => {
1056                let val = i64::from_le_bytes([
1057                    buf[*pos],
1058                    buf[*pos + 1],
1059                    buf[*pos + 2],
1060                    buf[*pos + 3],
1061                    buf[*pos + 4],
1062                    buf[*pos + 5],
1063                    buf[*pos + 6],
1064                    buf[*pos + 7],
1065                ]);
1066                *pos += 8;
1067                Value::TimestampMs(val)
1068            }
1069            30 => {
1070                let val =
1071                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1072                *pos += 4;
1073                Value::Ipv4(val)
1074            }
1075            31 => {
1076                let mut bytes = [0u8; 16];
1077                bytes.copy_from_slice(&buf[*pos..*pos + 16]);
1078                *pos += 16;
1079                Value::Ipv6(bytes)
1080            }
1081            32 => {
1082                let ip =
1083                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1084                *pos += 4;
1085                let mask =
1086                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1087                *pos += 4;
1088                Value::Subnet(ip, mask)
1089            }
1090            33 => {
1091                let val = u16::from_le_bytes([buf[*pos], buf[*pos + 1]]);
1092                *pos += 2;
1093                Value::Port(val)
1094            }
1095            34 => {
1096                let val =
1097                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1098                *pos += 4;
1099                Value::Latitude(val)
1100            }
1101            35 => {
1102                let val =
1103                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1104                *pos += 4;
1105                Value::Longitude(val)
1106            }
1107            36 => {
1108                let lat =
1109                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1110                *pos += 4;
1111                let lon =
1112                    i32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1113                *pos += 4;
1114                Value::GeoPoint(lat, lon)
1115            }
1116            37 => {
1117                let c = [buf[*pos], buf[*pos + 1]];
1118                *pos += 2;
1119                Value::Country2(c)
1120            }
1121            38 => {
1122                let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1123                *pos += 3;
1124                Value::Country3(c)
1125            }
1126            39 => {
1127                let c = [buf[*pos], buf[*pos + 1]];
1128                *pos += 2;
1129                Value::Lang2(c)
1130            }
1131            40 => {
1132                let c = [
1133                    buf[*pos],
1134                    buf[*pos + 1],
1135                    buf[*pos + 2],
1136                    buf[*pos + 3],
1137                    buf[*pos + 4],
1138                ];
1139                *pos += 5;
1140                Value::Lang5(c)
1141            }
1142            41 => {
1143                let c = [buf[*pos], buf[*pos + 1], buf[*pos + 2]];
1144                *pos += 3;
1145                Value::Currency(c)
1146            }
1147            42 => {
1148                let rgba = [buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]];
1149                *pos += 4;
1150                Value::ColorAlpha(rgba)
1151            }
1152            43 => {
1153                let val = i64::from_le_bytes([
1154                    buf[*pos],
1155                    buf[*pos + 1],
1156                    buf[*pos + 2],
1157                    buf[*pos + 3],
1158                    buf[*pos + 4],
1159                    buf[*pos + 5],
1160                    buf[*pos + 6],
1161                    buf[*pos + 7],
1162                ]);
1163                *pos += 8;
1164                Value::BigInt(val)
1165            }
1166            44 => {
1167                let col_len = Self::read_varu32_safe(buf, pos)?;
1168                let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1169                *pos += col_len;
1170                let key_len = Self::read_varu32_safe(buf, pos)?;
1171                let key = String::from_utf8(buf[*pos..*pos + key_len].to_vec())?;
1172                *pos += key_len;
1173                Value::KeyRef(col, key)
1174            }
1175            45 => {
1176                let col_len = Self::read_varu32_safe(buf, pos)?;
1177                let col = String::from_utf8(buf[*pos..*pos + col_len].to_vec())?;
1178                *pos += col_len;
1179                let id = u64::from_le_bytes([
1180                    buf[*pos],
1181                    buf[*pos + 1],
1182                    buf[*pos + 2],
1183                    buf[*pos + 3],
1184                    buf[*pos + 4],
1185                    buf[*pos + 5],
1186                    buf[*pos + 6],
1187                    buf[*pos + 7],
1188                ]);
1189                *pos += 8;
1190                Value::DocRef(col, id)
1191            }
1192            46 => {
1193                let len = Self::read_varu32_safe(buf, pos)?;
1194                let name = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1195                *pos += len;
1196                Value::TableRef(name)
1197            }
1198            47 => {
1199                let val =
1200                    u32::from_le_bytes([buf[*pos], buf[*pos + 1], buf[*pos + 2], buf[*pos + 3]]);
1201                *pos += 4;
1202                Value::PageRef(val)
1203            }
1204            48 => {
1205                let len = Self::read_varu32_safe(buf, pos)?;
1206                let bytes = buf[*pos..*pos + len].to_vec();
1207                *pos += len;
1208                Value::Secret(bytes)
1209            }
1210            49 => {
1211                let len = Self::read_varu32_safe(buf, pos)?;
1212                let hash = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1213                *pos += len;
1214                Value::Password(hash)
1215            }
1216            50 => {
1217                let len = Self::read_varu32_safe(buf, pos)?;
1218                let code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1219                *pos += len;
1220                Value::AssetCode(code)
1221            }
1222            51 => {
1223                let len = Self::read_varu32_safe(buf, pos)?;
1224                let asset_code = String::from_utf8(buf[*pos..*pos + len].to_vec())?;
1225                *pos += len;
1226                let scale = buf[*pos];
1227                *pos += 1;
1228                let minor_units = i64::from_le_bytes([
1229                    buf[*pos],
1230                    buf[*pos + 1],
1231                    buf[*pos + 2],
1232                    buf[*pos + 3],
1233                    buf[*pos + 4],
1234                    buf[*pos + 5],
1235                    buf[*pos + 6],
1236                    buf[*pos + 7],
1237                ]);
1238                *pos += 8;
1239                Value::Money {
1240                    asset_code,
1241                    minor_units,
1242                    scale,
1243                }
1244            }
1245            // C3 TOAST: compressed Text (0x85) and Blob (0x86).
1246            0x85 => {
1247                let orig_len = Self::read_varu32_safe(buf, pos)?;
1248                let comp_len = Self::read_varu32_safe(buf, pos)?;
1249                let compressed = &buf[*pos..*pos + comp_len];
1250                *pos += comp_len;
1251                let mut out = vec![0u8; orig_len];
1252                zstd::bulk::decompress_to_buffer(compressed, &mut out)
1253                    .map_err(|e| format!("C3 Text decompress: {e}"))?;
1254                Value::text(String::from_utf8(out).map_err(|e| format!("C3 Text UTF-8: {e}"))?)
1255            }
1256            0x86 => {
1257                let orig_len = Self::read_varu32_safe(buf, pos)?;
1258                let comp_len = Self::read_varu32_safe(buf, pos)?;
1259                let compressed = &buf[*pos..*pos + comp_len];
1260                *pos += comp_len;
1261                let mut out = vec![0u8; orig_len];
1262                zstd::bulk::decompress_to_buffer(compressed, &mut out)
1263                    .map_err(|e| format!("C3 Blob decompress: {e}"))?;
1264                Value::Blob(out)
1265            }
1266            _ => return Err(format!("Unknown Value type: {}", type_byte).into()),
1267        })
1268    }
1269
1270    /// Write a Value to binary buffer
1271    /// Type bytes: 0=Null, 1=Boolean, 2=Integer, 3=UnsignedInteger, 4=Float,
1272    /// 5=Text, 6=Blob, 7=Timestamp, 8=Duration, 9=IpAddr, 10=MacAddr,
1273    /// 11=Vector, 12=Json, 13=Uuid, 14=NodeRef, 15=EdgeRef, 16=VectorRef, 17=RowRef
1274    fn write_value_binary(buf: &mut Vec<u8>, value: &Value) {
1275        use std::net::IpAddr;
1276
1277        match value {
1278            Value::Null => buf.push(0),
1279            Value::Boolean(b) => {
1280                buf.push(1);
1281                buf.push(if *b { 1 } else { 0 });
1282            }
1283            Value::Integer(i) => {
1284                buf.push(2);
1285                buf.extend_from_slice(&i.to_le_bytes());
1286            }
1287            Value::UnsignedInteger(u) => {
1288                buf.push(3);
1289                buf.extend_from_slice(&u.to_le_bytes());
1290            }
1291            Value::Float(f) => {
1292                buf.push(4);
1293                buf.extend_from_slice(&f.to_le_bytes());
1294            }
1295            Value::Text(s) => {
1296                // C3 TOAST: compress Text values > 2 KiB with zstd.
1297                // Type 0x85 = compressed Text; type 5 = uncompressed (existing).
1298                // Layout for 0x85: [0x85][orig_len: varuint][comp_len: varuint][compressed bytes]
1299                const TEXT_COMPRESS_THRESHOLD: usize = 2048;
1300                if s.len() >= TEXT_COMPRESS_THRESHOLD {
1301                    if let Ok(compressed) = zstd::bulk::compress(s.as_bytes(), 3) {
1302                        if compressed.len() < s.len() {
1303                            buf.push(0x85); // compressed Text marker
1304                            write_varu32(buf, s.len() as u32); // orig_len (decompression hint)
1305                            write_varu32(buf, compressed.len() as u32);
1306                            buf.extend_from_slice(&compressed);
1307                            return; // written — skip uncompressed path
1308                        }
1309                    }
1310                }
1311                buf.push(5);
1312                write_varu32(buf, s.len() as u32);
1313                buf.extend_from_slice(s.as_bytes());
1314            }
1315            Value::Blob(bytes) => {
1316                // C3 TOAST: compress Blob values > 2 KiB with zstd.
1317                // Type 0x86 = compressed Blob; type 6 = uncompressed (existing).
1318                const BLOB_COMPRESS_THRESHOLD: usize = 2048;
1319                if bytes.len() >= BLOB_COMPRESS_THRESHOLD {
1320                    if let Ok(compressed) = zstd::bulk::compress(bytes.as_slice(), 3) {
1321                        if compressed.len() < bytes.len() {
1322                            buf.push(0x86); // compressed Blob marker
1323                            write_varu32(buf, bytes.len() as u32);
1324                            write_varu32(buf, compressed.len() as u32);
1325                            buf.extend_from_slice(&compressed);
1326                            return;
1327                        }
1328                    }
1329                }
1330                buf.push(6);
1331                write_varu32(buf, bytes.len() as u32);
1332                buf.extend_from_slice(bytes);
1333            }
1334            Value::Timestamp(t) => {
1335                buf.push(7);
1336                buf.extend_from_slice(&t.to_le_bytes());
1337            }
1338            Value::Duration(d) => {
1339                buf.push(8);
1340                buf.extend_from_slice(&d.to_le_bytes());
1341            }
1342            Value::IpAddr(ip) => {
1343                buf.push(9);
1344                match ip {
1345                    IpAddr::V4(v4) => {
1346                        buf.push(4);
1347                        buf.extend_from_slice(&v4.octets());
1348                    }
1349                    IpAddr::V6(v6) => {
1350                        buf.push(6);
1351                        buf.extend_from_slice(&v6.octets());
1352                    }
1353                }
1354            }
1355            Value::MacAddr(mac) => {
1356                buf.push(10);
1357                buf.extend_from_slice(mac);
1358            }
1359            Value::Vector(vec) => {
1360                buf.push(11);
1361                write_varu32(buf, vec.len() as u32);
1362                for f in vec {
1363                    buf.extend_from_slice(&f.to_le_bytes());
1364                }
1365            }
1366            Value::Json(bytes) => {
1367                buf.push(12);
1368                write_varu32(buf, bytes.len() as u32);
1369                buf.extend_from_slice(bytes);
1370            }
1371            Value::Uuid(uuid) => {
1372                buf.push(13);
1373                buf.extend_from_slice(uuid);
1374            }
1375            Value::NodeRef(s) => {
1376                buf.push(14);
1377                write_varu32(buf, s.len() as u32);
1378                buf.extend_from_slice(s.as_bytes());
1379            }
1380            Value::EdgeRef(s) => {
1381                buf.push(15);
1382                write_varu32(buf, s.len() as u32);
1383                buf.extend_from_slice(s.as_bytes());
1384            }
1385            Value::VectorRef(s, id) => {
1386                buf.push(16);
1387                write_varu32(buf, s.len() as u32);
1388                buf.extend_from_slice(s.as_bytes());
1389                buf.extend_from_slice(&id.to_le_bytes());
1390            }
1391            Value::RowRef(s, id) => {
1392                buf.push(17);
1393                write_varu32(buf, s.len() as u32);
1394                buf.extend_from_slice(s.as_bytes());
1395                buf.extend_from_slice(&id.to_le_bytes());
1396            }
1397            Value::Color(rgb) => {
1398                buf.push(18);
1399                buf.extend_from_slice(rgb);
1400            }
1401            Value::Email(s) => {
1402                buf.push(19);
1403                write_varu32(buf, s.len() as u32);
1404                buf.extend_from_slice(s.as_bytes());
1405            }
1406            Value::Url(s) => {
1407                buf.push(20);
1408                write_varu32(buf, s.len() as u32);
1409                buf.extend_from_slice(s.as_bytes());
1410            }
1411            Value::Phone(n) => {
1412                buf.push(21);
1413                buf.extend_from_slice(&n.to_le_bytes());
1414            }
1415            Value::Semver(v) => {
1416                buf.push(22);
1417                buf.extend_from_slice(&v.to_le_bytes());
1418            }
1419            Value::Cidr(ip, prefix) => {
1420                buf.push(23);
1421                buf.extend_from_slice(&ip.to_le_bytes());
1422                buf.push(*prefix);
1423            }
1424            Value::Date(d) => {
1425                buf.push(24);
1426                buf.extend_from_slice(&d.to_le_bytes());
1427            }
1428            Value::Time(t) => {
1429                buf.push(25);
1430                buf.extend_from_slice(&t.to_le_bytes());
1431            }
1432            Value::Decimal(v) => {
1433                buf.push(26);
1434                buf.extend_from_slice(&v.to_le_bytes());
1435            }
1436            Value::EnumValue(i) => {
1437                buf.push(27);
1438                buf.push(*i);
1439            }
1440            Value::Array(elems) => {
1441                buf.push(28);
1442                write_varu32(buf, elems.len() as u32);
1443                for elem in elems {
1444                    Self::write_value_binary(buf, elem);
1445                }
1446            }
1447            Value::TimestampMs(v) => {
1448                buf.push(29);
1449                buf.extend_from_slice(&v.to_le_bytes());
1450            }
1451            Value::Ipv4(v) => {
1452                buf.push(30);
1453                buf.extend_from_slice(&v.to_le_bytes());
1454            }
1455            Value::Ipv6(bytes) => {
1456                buf.push(31);
1457                buf.extend_from_slice(bytes);
1458            }
1459            Value::Subnet(ip, mask) => {
1460                buf.push(32);
1461                buf.extend_from_slice(&ip.to_le_bytes());
1462                buf.extend_from_slice(&mask.to_le_bytes());
1463            }
1464            Value::Port(v) => {
1465                buf.push(33);
1466                buf.extend_from_slice(&v.to_le_bytes());
1467            }
1468            Value::Latitude(v) => {
1469                buf.push(34);
1470                buf.extend_from_slice(&v.to_le_bytes());
1471            }
1472            Value::Longitude(v) => {
1473                buf.push(35);
1474                buf.extend_from_slice(&v.to_le_bytes());
1475            }
1476            Value::GeoPoint(lat, lon) => {
1477                buf.push(36);
1478                buf.extend_from_slice(&lat.to_le_bytes());
1479                buf.extend_from_slice(&lon.to_le_bytes());
1480            }
1481            Value::Country2(c) => {
1482                buf.push(37);
1483                buf.extend_from_slice(c);
1484            }
1485            Value::Country3(c) => {
1486                buf.push(38);
1487                buf.extend_from_slice(c);
1488            }
1489            Value::Lang2(c) => {
1490                buf.push(39);
1491                buf.extend_from_slice(c);
1492            }
1493            Value::Lang5(c) => {
1494                buf.push(40);
1495                buf.extend_from_slice(c);
1496            }
1497            Value::Currency(c) => {
1498                buf.push(41);
1499                buf.extend_from_slice(c);
1500            }
1501            Value::AssetCode(code) => {
1502                buf.push(50);
1503                write_varu32(buf, code.len() as u32);
1504                buf.extend_from_slice(code.as_bytes());
1505            }
1506            Value::Money {
1507                asset_code,
1508                minor_units,
1509                scale,
1510            } => {
1511                buf.push(51);
1512                write_varu32(buf, asset_code.len() as u32);
1513                buf.extend_from_slice(asset_code.as_bytes());
1514                buf.push(*scale);
1515                buf.extend_from_slice(&minor_units.to_le_bytes());
1516            }
1517            Value::ColorAlpha(rgba) => {
1518                buf.push(42);
1519                buf.extend_from_slice(rgba);
1520            }
1521            Value::BigInt(v) => {
1522                buf.push(43);
1523                buf.extend_from_slice(&v.to_le_bytes());
1524            }
1525            Value::KeyRef(col, key) => {
1526                buf.push(44);
1527                write_varu32(buf, col.len() as u32);
1528                buf.extend_from_slice(col.as_bytes());
1529                write_varu32(buf, key.len() as u32);
1530                buf.extend_from_slice(key.as_bytes());
1531            }
1532            Value::DocRef(col, id) => {
1533                buf.push(45);
1534                write_varu32(buf, col.len() as u32);
1535                buf.extend_from_slice(col.as_bytes());
1536                buf.extend_from_slice(&id.to_le_bytes());
1537            }
1538            Value::TableRef(name) => {
1539                buf.push(46);
1540                write_varu32(buf, name.len() as u32);
1541                buf.extend_from_slice(name.as_bytes());
1542            }
1543            Value::PageRef(page_id) => {
1544                buf.push(47);
1545                buf.extend_from_slice(&page_id.to_le_bytes());
1546            }
1547            Value::Secret(bytes) => {
1548                buf.push(48);
1549                write_varu32(buf, bytes.len() as u32);
1550                buf.extend_from_slice(bytes);
1551            }
1552            Value::Password(hash) => {
1553                buf.push(49);
1554                write_varu32(buf, hash.len() as u32);
1555                buf.extend_from_slice(hash.as_bytes());
1556            }
1557        }
1558    }
1559}
1560
#[cfg(test)]
mod aux_metadata_dump_tests {
    use super::*;

    /// Serialises `store` into a binary dump and immediately loads a fresh
    /// store back from those bytes, panicking if the reload fails.
    fn dump_and_reload(store: UnifiedStore) -> UnifiedStore {
        let dump = store.to_binary_dump_bytes();
        UnifiedStore::load_from_bytes(&dump).expect("reload dump")
    }

    /// Aux metadata set on a store that has one collection must come back
    /// unchanged after a dump/reload cycle.
    #[test]
    fn aux_metadata_round_trips_through_binary_dump() {
        let payload = b"contract-blob".to_vec();

        let original = UnifiedStore::with_config(UnifiedStoreConfig::default());
        original.create_collection("k").expect("create collection");
        original.set_aux_metadata(payload.clone());

        let restored = dump_and_reload(original);

        assert_eq!(restored.aux_metadata(), payload);
    }

    /// A store whose aux metadata was never set must reload with empty aux
    /// metadata.
    #[test]
    fn empty_aux_metadata_round_trips_as_empty() {
        let original = UnifiedStore::with_config(UnifiedStoreConfig::default());

        let restored = dump_and_reload(original);

        assert!(restored.aux_metadata().is_empty());
    }
}