Skip to main content

reddb_file/
native_store.rs

1//! Native unified-store file contracts.
2//!
3//! The server owns the storage-engine semantics, but these magic bytes, format
4//! versions, and persisted summary shapes are file contracts. They live here so
5//! `.rdb` compatibility is governed by `reddb-file`.
6
7use std::collections::BTreeMap;
8use std::fs::{self, File};
9use std::io::{BufWriter, Write};
10use std::path::Path;
11
12use crate::embedded::{RdbFileError, RdbFileResult};
13use crate::physical_metadata::{ManifestEvent, ManifestEventKind};
14
15pub const STORE_MAGIC: &[u8; 4] = b"RDST";
16pub const STORE_VERSION_V1: u32 = 1;
17pub const STORE_VERSION_V2: u32 = 2;
18pub const STORE_VERSION_V3: u32 = 3;
19pub const STORE_VERSION_V4: u32 = 4;
20pub const STORE_VERSION_V5: u32 = 5;
21pub const STORE_VERSION_V6: u32 = 6;
22/// Entity records include metadata (`serialize_entity_record` format).
23pub const STORE_VERSION_V7: u32 = 7;
24/// Table rows may carry explicit logical identity.
25pub const STORE_VERSION_V8: u32 = 8;
26/// Entity records persist MVCC xmin/xmax.
27pub const STORE_VERSION_V9: u32 = 9;
28pub const STORE_VERSION_CURRENT: u32 = STORE_VERSION_V9;
29
30pub const METADATA_MAGIC: &[u8; 4] = b"RDM2";
31pub const METADATA_HEADER_BYTES: usize = 12;
32pub const NATIVE_COLLECTION_ROOTS_MAGIC: &[u8; 4] = b"RDRT";
33pub const NATIVE_MANIFEST_MAGIC: &[u8; 4] = b"RDMF";
34pub const NATIVE_REGISTRY_MAGIC: &[u8; 4] = b"RDRG";
35pub const NATIVE_RECOVERY_MAGIC: &[u8; 4] = b"RDRV";
36pub const NATIVE_CATALOG_MAGIC: &[u8; 4] = b"RDCL";
37pub const NATIVE_METADATA_STATE_MAGIC: &[u8; 4] = b"RDMS";
38pub const NATIVE_VECTOR_ARTIFACT_MAGIC: &[u8; 4] = b"RDVA";
39pub const NATIVE_BLOB_MAGIC: &[u8; 4] = b"RDBL";
40pub const NATIVE_MANIFEST_SAMPLE_LIMIT: usize = 16;
41pub const ENTITY_RECORD_MAGIC: &[u8; 4] = b"RER1";
42pub const METADATA_OVERFLOW_MAGIC: &[u8; 4] = b"RDM3";
43pub const METADATA_OVERFLOW_HEADER_BYTES: usize = 16;
44pub const METADATA_OVERFLOW_CONTINUATION_HEADER_BYTES: usize = 8;
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub struct NativeEntityRecordFrame<'a> {
48    pub entity: &'a [u8],
49    pub metadata: &'a [u8],
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq)]
53pub struct NativeMetadataOverflowHeader {
54    pub format_version: u32,
55    pub total_payload_bytes: u32,
56    pub next_overflow_page_id: u32,
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq)]
60pub struct NativeMetadataOverflowContinuationHeader {
61    pub next_overflow_page_id: u32,
62    pub chunk_bytes: u32,
63}
64
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub struct NativePagedMetadataHeader {
67    pub format_version: u32,
68    pub collection_count: u32,
69}
70
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct NativePagedCollectionRoot {
73    pub collection: String,
74    pub root_page: u32,
75}
76
77#[derive(Debug, Clone, PartialEq, Eq)]
78pub struct NativePagedCrossRef {
79    pub source_id: u64,
80    pub target_id: u64,
81    pub ref_type: u8,
82    pub target_collection: String,
83}
84
85#[derive(Debug, Clone, PartialEq, Eq)]
86pub struct NativeDumpCollectionHeader {
87    pub name: String,
88    pub entity_count: u32,
89}
90
91#[derive(Debug, Clone, PartialEq, Eq)]
92pub struct NativeDumpCrossRef {
93    pub source_id: u64,
94    pub target_id: u64,
95    pub ref_type: u8,
96    pub target_collection: String,
97}
98
99pub fn native_store_magic_matches(bytes: &[u8]) -> bool {
100    bytes.len() >= STORE_MAGIC.len() && &bytes[..STORE_MAGIC.len()] == STORE_MAGIC
101}
102
103pub fn write_native_store_bytes_atomically(path: &Path, bytes: &[u8]) -> RdbFileResult<()> {
104    let tmp_path = crate::layout::temp_path(path);
105    {
106        let file = File::create(&tmp_path)?;
107        let mut writer = BufWriter::new(file);
108        writer.write_all(bytes)?;
109        writer.flush()?;
110        writer.get_ref().sync_all()?;
111    }
112
113    fs::rename(&tmp_path, path)?;
114    if let Some(parent) = path.parent() {
115        if let Ok(dir) = File::open(parent) {
116            let _ = dir.sync_all();
117        }
118    }
119    Ok(())
120}
121
122pub fn encode_native_entity_record_frame(entity: &[u8], metadata: Option<&[u8]>) -> Vec<u8> {
123    let metadata = metadata.unwrap_or(&[]);
124    let mut out = Vec::with_capacity(12 + entity.len() + metadata.len());
125    out.extend_from_slice(ENTITY_RECORD_MAGIC);
126    out.extend_from_slice(&(entity.len() as u32).to_le_bytes());
127    out.extend_from_slice(entity);
128    out.extend_from_slice(&(metadata.len() as u32).to_le_bytes());
129    out.extend_from_slice(metadata);
130    out
131}
132
133pub fn decode_native_entity_record_frame(
134    data: &[u8],
135) -> RdbFileResult<Option<NativeEntityRecordFrame<'_>>> {
136    if data.len() < 8 || &data[..4] != ENTITY_RECORD_MAGIC {
137        return Ok(None);
138    }
139
140    let mut pos = 4usize;
141    let entity_len =
142        read_native_u32(data, &mut pos, "truncated entity record entity length")? as usize;
143    let entity = read_native_bytes(
144        data,
145        &mut pos,
146        entity_len,
147        "truncated entity record payload",
148    )?;
149    let metadata_len =
150        read_native_u32(data, &mut pos, "truncated entity record metadata length")? as usize;
151    let metadata = read_native_bytes(
152        data,
153        &mut pos,
154        metadata_len,
155        "truncated entity record metadata",
156    )?;
157
158    Ok(Some(NativeEntityRecordFrame { entity, metadata }))
159}
160
161pub fn encode_native_metadata_overflow_header(
162    out: &mut [u8],
163    header: NativeMetadataOverflowHeader,
164) -> RdbFileResult<()> {
165    if out.len() < METADATA_OVERFLOW_HEADER_BYTES {
166        return Err(RdbFileError::InvalidOperation(
167            "metadata overflow header buffer too small".to_string(),
168        ));
169    }
170    out[0..4].copy_from_slice(METADATA_OVERFLOW_MAGIC);
171    out[4..8].copy_from_slice(&header.format_version.to_le_bytes());
172    out[8..12].copy_from_slice(&header.total_payload_bytes.to_le_bytes());
173    out[12..16].copy_from_slice(&header.next_overflow_page_id.to_le_bytes());
174    Ok(())
175}
176
177pub fn decode_native_metadata_overflow_header(
178    data: &[u8],
179) -> RdbFileResult<Option<NativeMetadataOverflowHeader>> {
180    if data.len() < 4 || &data[..4] != METADATA_OVERFLOW_MAGIC {
181        return Ok(None);
182    }
183    if data.len() < METADATA_OVERFLOW_HEADER_BYTES {
184        return Err(RdbFileError::InvalidOperation(
185            "truncated metadata overflow header".to_string(),
186        ));
187    }
188    Ok(Some(NativeMetadataOverflowHeader {
189        format_version: u32::from_le_bytes([data[4], data[5], data[6], data[7]]),
190        total_payload_bytes: u32::from_le_bytes([data[8], data[9], data[10], data[11]]),
191        next_overflow_page_id: u32::from_le_bytes([data[12], data[13], data[14], data[15]]),
192    }))
193}
194
195pub fn encode_native_metadata_overflow_continuation_header(
196    out: &mut [u8],
197    header: NativeMetadataOverflowContinuationHeader,
198) -> RdbFileResult<()> {
199    if out.len() < METADATA_OVERFLOW_CONTINUATION_HEADER_BYTES {
200        return Err(RdbFileError::InvalidOperation(
201            "metadata overflow continuation header buffer too small".to_string(),
202        ));
203    }
204    out[0..4].copy_from_slice(&header.next_overflow_page_id.to_le_bytes());
205    out[4..8].copy_from_slice(&header.chunk_bytes.to_le_bytes());
206    Ok(())
207}
208
209pub fn decode_native_metadata_overflow_continuation_header(
210    data: &[u8],
211) -> RdbFileResult<NativeMetadataOverflowContinuationHeader> {
212    if data.len() < METADATA_OVERFLOW_CONTINUATION_HEADER_BYTES {
213        return Err(RdbFileError::InvalidOperation(
214            "truncated metadata overflow continuation header".to_string(),
215        ));
216    }
217    Ok(NativeMetadataOverflowContinuationHeader {
218        next_overflow_page_id: u32::from_le_bytes([data[0], data[1], data[2], data[3]]),
219        chunk_bytes: u32::from_le_bytes([data[4], data[5], data[6], data[7]]),
220    })
221}
222
223pub fn encode_native_paged_metadata_header(out: &mut Vec<u8>, header: NativePagedMetadataHeader) {
224    out.extend_from_slice(METADATA_MAGIC);
225    out.extend_from_slice(&header.format_version.to_le_bytes());
226    out.extend_from_slice(&header.collection_count.to_le_bytes());
227}
228
229pub fn decode_native_paged_metadata_header(
230    data: &[u8],
231) -> RdbFileResult<Option<NativePagedMetadataHeader>> {
232    if data.len() < 4 || &data[..4] != METADATA_MAGIC {
233        return Ok(None);
234    }
235    if data.len() < METADATA_HEADER_BYTES {
236        return Err(RdbFileError::InvalidOperation(
237            "truncated native paged metadata header".to_string(),
238        ));
239    }
240    Ok(Some(NativePagedMetadataHeader {
241        format_version: u32::from_le_bytes([data[4], data[5], data[6], data[7]]),
242        collection_count: u32::from_le_bytes([data[8], data[9], data[10], data[11]]),
243    }))
244}
245
246pub fn encode_native_len_prefixed_bytes(out: &mut Vec<u8>, bytes: &[u8]) {
247    out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
248    out.extend_from_slice(bytes);
249}
250
251pub fn encode_native_len_prefixed_str(out: &mut Vec<u8>, value: &str) {
252    encode_native_len_prefixed_bytes(out, value.as_bytes());
253}
254
255pub fn decode_native_len_prefixed_bytes<'a>(
256    data: &'a [u8],
257    pos: &mut usize,
258) -> RdbFileResult<&'a [u8]> {
259    let len = read_native_u32(data, pos, "truncated native length-prefixed length")? as usize;
260    read_native_bytes(data, pos, len, "truncated native length-prefixed payload")
261}
262
263pub fn decode_native_len_prefixed_string(data: &[u8], pos: &mut usize) -> RdbFileResult<String> {
264    let bytes = decode_native_len_prefixed_bytes(data, pos)?;
265    String::from_utf8(bytes.to_vec()).map_err(|err| RdbFileError::InvalidOperation(err.to_string()))
266}
267
268pub fn encode_native_paged_collection_root(out: &mut Vec<u8>, collection: &str, root_page: u32) {
269    encode_native_len_prefixed_str(out, collection);
270    out.extend_from_slice(&root_page.to_le_bytes());
271}
272
273pub fn decode_native_paged_collection_root(
274    data: &[u8],
275    pos: &mut usize,
276) -> RdbFileResult<NativePagedCollectionRoot> {
277    let collection = decode_native_len_prefixed_string(data, pos)?;
278    let root_page = read_native_u32(data, pos, "truncated native paged collection root")?;
279    Ok(NativePagedCollectionRoot {
280        collection,
281        root_page,
282    })
283}
284
285pub fn encode_native_paged_cross_ref(
286    out: &mut Vec<u8>,
287    source_id: u64,
288    target_id: u64,
289    ref_type: u8,
290    target_collection: &str,
291) {
292    out.extend_from_slice(&source_id.to_le_bytes());
293    out.extend_from_slice(&target_id.to_le_bytes());
294    out.push(ref_type);
295    encode_native_len_prefixed_str(out, target_collection);
296}
297
298pub fn decode_native_paged_cross_ref(
299    data: &[u8],
300    pos: &mut usize,
301) -> RdbFileResult<NativePagedCrossRef> {
302    let source_id = read_native_u64(data, pos, "truncated native paged cross-ref source")?;
303    let target_id = read_native_u64(data, pos, "truncated native paged cross-ref target")?;
304    let ref_type = read_native_u8(data, pos, "truncated native paged cross-ref type")?;
305    let target_collection = decode_native_len_prefixed_string(data, pos)?;
306    Ok(NativePagedCrossRef {
307        source_id,
308        target_id,
309        ref_type,
310        target_collection,
311    })
312}
313
314pub fn encode_native_dump_count(out: &mut Vec<u8>, count: u32) {
315    write_native_varu32(out, count);
316}
317
318pub fn decode_native_dump_count(data: &[u8], pos: &mut usize) -> RdbFileResult<u32> {
319    read_native_varu32(data, pos, "truncated native dump count")
320}
321
322pub fn encode_native_dump_collection_header(out: &mut Vec<u8>, name: &str, entity_count: u32) {
323    write_native_varu32(out, name.len() as u32);
324    out.extend_from_slice(name.as_bytes());
325    write_native_varu32(out, entity_count);
326}
327
328pub fn decode_native_dump_collection_header(
329    data: &[u8],
330    pos: &mut usize,
331) -> RdbFileResult<NativeDumpCollectionHeader> {
332    let name_len =
333        read_native_varu32(data, pos, "truncated native dump collection name length")? as usize;
334    let name_bytes =
335        read_native_bytes(data, pos, name_len, "truncated native dump collection name")?;
336    let name = String::from_utf8(name_bytes.to_vec()).map_err(|err| {
337        RdbFileError::InvalidOperation(format!("invalid native dump collection name utf8: {err}"))
338    })?;
339    let entity_count =
340        read_native_varu32(data, pos, "truncated native dump collection entity count")?;
341    Ok(NativeDumpCollectionHeader { name, entity_count })
342}
343
344pub fn encode_native_dump_entity_record(out: &mut Vec<u8>, record: &[u8]) {
345    out.extend_from_slice(&(record.len() as u32).to_le_bytes());
346    out.extend_from_slice(record);
347}
348
349pub fn decode_native_dump_entity_record<'a>(
350    data: &'a [u8],
351    pos: &mut usize,
352) -> RdbFileResult<&'a [u8]> {
353    let record_len =
354        read_native_u32(data, pos, "truncated native dump entity record length")? as usize;
355    read_native_bytes(
356        data,
357        pos,
358        record_len,
359        "truncated native dump entity record payload",
360    )
361}
362
363pub fn encode_native_dump_cross_ref(
364    out: &mut Vec<u8>,
365    source_id: u64,
366    target_id: u64,
367    ref_type: u8,
368    target_collection: &str,
369) {
370    write_native_varu64(out, source_id);
371    write_native_varu64(out, target_id);
372    out.push(ref_type);
373    write_native_varu32(out, target_collection.len() as u32);
374    out.extend_from_slice(target_collection.as_bytes());
375}
376
377pub fn decode_native_dump_cross_ref(
378    data: &[u8],
379    pos: &mut usize,
380) -> RdbFileResult<NativeDumpCrossRef> {
381    let source_id = read_native_varu64(data, pos, "truncated native dump cross-ref source")?;
382    let target_id = read_native_varu64(data, pos, "truncated native dump cross-ref target")?;
383    let ref_type = read_native_u8(data, pos, "truncated native dump cross-ref type")?;
384    let collection_len = read_native_varu32(
385        data,
386        pos,
387        "truncated native dump cross-ref collection length",
388    )? as usize;
389    let collection_bytes = read_native_bytes(
390        data,
391        pos,
392        collection_len,
393        "truncated native dump cross-ref collection",
394    )?;
395    let target_collection = String::from_utf8(collection_bytes.to_vec()).map_err(|err| {
396        RdbFileError::InvalidOperation(format!(
397            "invalid native dump cross-ref collection utf8: {err}"
398        ))
399    })?;
400    Ok(NativeDumpCrossRef {
401        source_id,
402        target_id,
403        ref_type,
404        target_collection,
405    })
406}
407
408pub fn is_supported_store_version(version: u32) -> bool {
409    matches!(
410        version,
411        STORE_VERSION_V1
412            | STORE_VERSION_V2
413            | STORE_VERSION_V3
414            | STORE_VERSION_V4
415            | STORE_VERSION_V5
416            | STORE_VERSION_V6
417            | STORE_VERSION_V7
418            | STORE_VERSION_V8
419            | STORE_VERSION_V9
420    )
421}
422
423pub fn encode_native_store_header(version: u32) -> Vec<u8> {
424    let mut out = Vec::with_capacity(8);
425    out.extend_from_slice(STORE_MAGIC);
426    out.extend_from_slice(&version.to_le_bytes());
427    out
428}
429
430pub fn decode_native_store_header(bytes: &[u8]) -> RdbFileResult<u32> {
431    if bytes.len() < 8 {
432        return Err(RdbFileError::InvalidOperation("File too small".to_string()));
433    }
434    if &bytes[0..4] != STORE_MAGIC {
435        return Err(RdbFileError::InvalidOperation(
436            "Invalid magic bytes - expected RDST".to_string(),
437        ));
438    }
439    let version = u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
440    if !is_supported_store_version(version) {
441        return Err(RdbFileError::InvalidOperation(format!(
442            "Unsupported version: {version}"
443        )));
444    }
445    Ok(version)
446}
447
448pub fn append_native_store_crc32_footer(bytes: &mut Vec<u8>) {
449    let checksum = native_store_dump_crc32(bytes);
450    bytes.extend_from_slice(&checksum.to_le_bytes());
451}
452
453pub fn verify_native_store_crc32_footer(bytes: &mut Vec<u8>, version: u32) -> RdbFileResult<()> {
454    if version < STORE_VERSION_V3 {
455        return Ok(());
456    }
457    if bytes.len() < 12 {
458        return Err(RdbFileError::InvalidOperation(
459            "File too small for CRC32 verification".to_string(),
460        ));
461    }
462    let footer_at = bytes.len() - 4;
463    let stored_crc = u32::from_le_bytes([
464        bytes[footer_at],
465        bytes[footer_at + 1],
466        bytes[footer_at + 2],
467        bytes[footer_at + 3],
468    ]);
469    let computed_crc = native_store_dump_crc32(&bytes[..footer_at]);
470    if stored_crc != computed_crc {
471        return Err(RdbFileError::InvalidOperation(
472            "Binary store CRC32 mismatch — file corrupted".to_string(),
473        ));
474    }
475    bytes.truncate(footer_at);
476    Ok(())
477}
478
479fn native_store_dump_crc32(bytes: &[u8]) -> u32 {
480    let mut hasher = crc32fast::Hasher::new();
481    hasher.update(bytes);
482    hasher.finalize()
483}
484
485fn read_native_bytes<'a>(
486    data: &'a [u8],
487    pos: &mut usize,
488    len: usize,
489    err: &'static str,
490) -> RdbFileResult<&'a [u8]> {
491    let end = pos
492        .checked_add(len)
493        .ok_or_else(|| RdbFileError::InvalidOperation(err.to_string()))?;
494    if end > data.len() {
495        return Err(RdbFileError::InvalidOperation(err.to_string()));
496    }
497    let bytes = &data[*pos..end];
498    *pos = end;
499    Ok(bytes)
500}
501
502fn read_native_u32(data: &[u8], pos: &mut usize, err: &'static str) -> RdbFileResult<u32> {
503    let bytes = read_native_bytes(data, pos, 4, err)?;
504    let mut raw = [0u8; 4];
505    raw.copy_from_slice(bytes);
506    Ok(u32::from_le_bytes(raw))
507}
508
509fn read_native_u64(data: &[u8], pos: &mut usize, err: &'static str) -> RdbFileResult<u64> {
510    let bytes = read_native_bytes(data, pos, 8, err)?;
511    let mut raw = [0u8; 8];
512    raw.copy_from_slice(bytes);
513    Ok(u64::from_le_bytes(raw))
514}
515
516fn read_native_u8(data: &[u8], pos: &mut usize, err: &'static str) -> RdbFileResult<u8> {
517    let bytes = read_native_bytes(data, pos, 1, err)?;
518    Ok(bytes[0])
519}
520
521fn write_native_varu32(out: &mut Vec<u8>, mut value: u32) {
522    while value >= 0x80 {
523        out.push((value as u8) | 0x80);
524        value >>= 7;
525    }
526    out.push(value as u8);
527}
528
529fn write_native_varu64(out: &mut Vec<u8>, mut value: u64) {
530    while value >= 0x80 {
531        out.push((value as u8) | 0x80);
532        value >>= 7;
533    }
534    out.push(value as u8);
535}
536
537fn read_native_varu32(data: &[u8], pos: &mut usize, err: &'static str) -> RdbFileResult<u32> {
538    let mut result = 0u32;
539    let mut shift = 0u32;
540    for _ in 0..5 {
541        let byte = read_native_u8(data, pos, err)?;
542        result |= ((byte & 0x7f) as u32) << shift;
543        if byte & 0x80 == 0 {
544            return Ok(result);
545        }
546        shift += 7;
547    }
548    Err(RdbFileError::InvalidOperation(
549        "invalid native dump varu32".to_string(),
550    ))
551}
552
553fn read_native_varu64(data: &[u8], pos: &mut usize, err: &'static str) -> RdbFileResult<u64> {
554    let mut result = 0u64;
555    let mut shift = 0u32;
556    for _ in 0..10 {
557        let byte = read_native_u8(data, pos, err)?;
558        result |= ((byte & 0x7f) as u64) << shift;
559        if byte & 0x80 == 0 {
560            return Ok(result);
561        }
562        shift += 7;
563    }
564    Err(RdbFileError::InvalidOperation(
565        "invalid native dump varu64".to_string(),
566    ))
567}
568
569#[derive(Debug, Clone)]
570pub struct NativeManifestEntrySummary {
571    pub collection: String,
572    pub object_key: String,
573    pub kind: String,
574    pub block_index: u64,
575    pub block_checksum: u128,
576    pub snapshot_min: u64,
577    pub snapshot_max: Option<u64>,
578}
579
580#[derive(Debug, Clone)]
581pub struct NativeManifestSummary {
582    pub sequence: u64,
583    pub event_count: u32,
584    pub events_complete: bool,
585    pub omitted_event_count: u32,
586    pub recent_events: Vec<NativeManifestEntrySummary>,
587}
588
589#[derive(Debug, Clone, PartialEq, Eq)]
590pub struct NativeRegistryIndexSummary {
591    pub name: String,
592    pub kind: String,
593    pub collection: Option<String>,
594    pub enabled: bool,
595    pub entries: u64,
596    pub estimated_memory_bytes: u64,
597    pub last_refresh_ms: Option<u128>,
598    pub backend: String,
599}
600
601#[derive(Debug, Clone, PartialEq, Eq)]
602pub struct NativeRegistryProjectionSummary {
603    pub name: String,
604    pub source: String,
605    pub created_at_unix_ms: u128,
606    pub updated_at_unix_ms: u128,
607    pub node_labels: Vec<String>,
608    pub node_types: Vec<String>,
609    pub edge_labels: Vec<String>,
610    pub last_materialized_sequence: Option<u64>,
611}
612
613#[derive(Debug, Clone, PartialEq, Eq)]
614pub struct NativeRegistryJobSummary {
615    pub id: String,
616    pub kind: String,
617    pub projection: Option<String>,
618    pub state: String,
619    pub created_at_unix_ms: u128,
620    pub updated_at_unix_ms: u128,
621    pub last_run_sequence: Option<u64>,
622    pub metadata: BTreeMap<String, String>,
623}
624
625#[derive(Debug, Clone, PartialEq, Eq)]
626pub struct NativeVectorArtifactSummary {
627    pub collection: String,
628    pub artifact_kind: String,
629    pub vector_count: u64,
630    pub dimension: u32,
631    pub max_layer: u32,
632    pub serialized_bytes: u64,
633    pub checksum: u64,
634}
635
636#[derive(Debug, Clone, PartialEq, Eq)]
637pub struct NativeVectorArtifactPageSummary {
638    pub collection: String,
639    pub artifact_kind: String,
640    pub root_page: u32,
641    pub page_count: u32,
642    pub byte_len: u64,
643    pub checksum: u64,
644}
645
646#[derive(Debug, Clone, PartialEq, Eq)]
647pub struct NativeRegistrySummary {
648    pub collection_count: u32,
649    pub index_count: u32,
650    pub graph_projection_count: u32,
651    pub analytics_job_count: u32,
652    pub vector_artifact_count: u32,
653    pub collections_complete: bool,
654    pub indexes_complete: bool,
655    pub graph_projections_complete: bool,
656    pub analytics_jobs_complete: bool,
657    pub vector_artifacts_complete: bool,
658    pub omitted_collection_count: u32,
659    pub omitted_index_count: u32,
660    pub omitted_graph_projection_count: u32,
661    pub omitted_analytics_job_count: u32,
662    pub omitted_vector_artifact_count: u32,
663    pub collection_names: Vec<String>,
664    pub indexes: Vec<NativeRegistryIndexSummary>,
665    pub graph_projections: Vec<NativeRegistryProjectionSummary>,
666    pub analytics_jobs: Vec<NativeRegistryJobSummary>,
667    pub vector_artifacts: Vec<NativeVectorArtifactSummary>,
668}
669
670#[derive(Debug, Clone, PartialEq, Eq)]
671pub struct NativeSnapshotSummary {
672    pub snapshot_id: u64,
673    pub created_at_unix_ms: u128,
674    pub superblock_sequence: u64,
675    pub collection_count: u32,
676    pub total_entities: u64,
677}
678
679#[derive(Debug, Clone, PartialEq, Eq)]
680pub struct NativeExportSummary {
681    pub name: String,
682    pub created_at_unix_ms: u128,
683    pub snapshot_id: Option<u64>,
684    pub superblock_sequence: u64,
685    pub collection_count: u32,
686    pub total_entities: u64,
687}
688
689#[derive(Debug, Clone, PartialEq, Eq)]
690pub struct NativeRecoverySummary {
691    pub snapshot_count: u32,
692    pub export_count: u32,
693    pub snapshots_complete: bool,
694    pub exports_complete: bool,
695    pub omitted_snapshot_count: u32,
696    pub omitted_export_count: u32,
697    pub snapshots: Vec<NativeSnapshotSummary>,
698    pub exports: Vec<NativeExportSummary>,
699}
700
701#[derive(Debug, Clone, PartialEq, Eq)]
702pub struct NativeCatalogCollectionSummary {
703    pub name: String,
704    pub entities: u64,
705    pub cross_refs: u64,
706    pub segments: u32,
707}
708
709#[derive(Debug, Clone, PartialEq, Eq)]
710pub struct NativeCatalogSummary {
711    pub collection_count: u32,
712    pub total_entities: u64,
713    pub collections_complete: bool,
714    pub omitted_collection_count: u32,
715    pub collections: Vec<NativeCatalogCollectionSummary>,
716}
717
718#[derive(Debug, Clone, PartialEq, Eq)]
719pub struct NativeMetadataStateSummary {
720    pub protocol_version: String,
721    pub generated_at_unix_ms: u128,
722    pub last_loaded_from: Option<String>,
723    pub last_healed_at_unix_ms: Option<u128>,
724}
725
726pub fn native_store_page_checksum(data: &[u8]) -> u64 {
727    let mut hasher = crc32fast::Hasher::new();
728    hasher.update(data);
729    hasher.finalize() as u64
730}
731
732pub fn encode_native_collection_roots_page(roots: &BTreeMap<String, u64>) -> Vec<u8> {
733    let mut data = Vec::with_capacity(1024);
734    data.extend_from_slice(NATIVE_COLLECTION_ROOTS_MAGIC);
735    data.extend_from_slice(&(roots.len() as u32).to_le_bytes());
736    for (collection, root) in roots {
737        data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
738        data.extend_from_slice(collection.as_bytes());
739        data.extend_from_slice(&root.to_le_bytes());
740    }
741    data
742}
743
744pub fn decode_native_collection_roots_page(content: &[u8]) -> RdbFileResult<BTreeMap<String, u64>> {
745    if content.len() < 8 || &content[0..4] != NATIVE_COLLECTION_ROOTS_MAGIC {
746        return Err(RdbFileError::InvalidOperation(
747            "invalid native collection roots page".to_string(),
748        ));
749    }
750
751    let count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]) as usize;
752    let mut pos = 8usize;
753    let mut roots = BTreeMap::new();
754
755    for _ in 0..count {
756        if pos + 4 > content.len() {
757            break;
758        }
759        let name_len = u32::from_le_bytes([
760            content[pos],
761            content[pos + 1],
762            content[pos + 2],
763            content[pos + 3],
764        ]) as usize;
765        pos += 4;
766        if pos + name_len + 8 > content.len() {
767            break;
768        }
769        let name = String::from_utf8(content[pos..pos + name_len].to_vec())
770            .map_err(|err| RdbFileError::InvalidOperation(err.to_string()))?;
771        pos += name_len;
772        let root = u64::from_le_bytes([
773            content[pos],
774            content[pos + 1],
775            content[pos + 2],
776            content[pos + 3],
777            content[pos + 4],
778            content[pos + 5],
779            content[pos + 6],
780            content[pos + 7],
781        ]);
782        pos += 8;
783        roots.insert(name, root);
784    }
785
786    Ok(roots)
787}
788
789pub fn encode_native_manifest_summary_page(sequence: u64, events: &[ManifestEvent]) -> Vec<u8> {
790    let sample_start = events.len().saturating_sub(NATIVE_MANIFEST_SAMPLE_LIMIT);
791    let sample = &events[sample_start..];
792
793    let mut data = Vec::with_capacity(1024);
794    data.extend_from_slice(NATIVE_MANIFEST_MAGIC);
795    data.extend_from_slice(&sequence.to_le_bytes());
796    data.extend_from_slice(&(events.len() as u32).to_le_bytes());
797    data.push(u8::from(events.len() <= NATIVE_MANIFEST_SAMPLE_LIMIT));
798    data.extend_from_slice(&(events.len().saturating_sub(sample.len()) as u32).to_le_bytes());
799    data.extend_from_slice(&(sample.len() as u32).to_le_bytes());
800    for event in sample {
801        data.push(native_manifest_kind_to_byte(event.kind));
802        data.extend_from_slice(&(event.collection.len() as u16).to_le_bytes());
803        data.extend_from_slice(event.collection.as_bytes());
804        data.extend_from_slice(&(event.object_key.len() as u16).to_le_bytes());
805        data.extend_from_slice(event.object_key.as_bytes());
806        data.extend_from_slice(&event.block.index.to_le_bytes());
807        data.extend_from_slice(&event.block.checksum.to_le_bytes());
808        data.extend_from_slice(&event.snapshot_min.to_le_bytes());
809        match event.snapshot_max {
810            Some(value) => {
811                data.push(1);
812                data.extend_from_slice(&value.to_le_bytes());
813            }
814            None => data.push(0),
815        }
816    }
817
818    data
819}
820
821pub fn decode_native_manifest_summary_page(content: &[u8]) -> RdbFileResult<NativeManifestSummary> {
822    if content.len() < 25 || &content[0..4] != NATIVE_MANIFEST_MAGIC {
823        return Err(RdbFileError::InvalidOperation(
824            "invalid native manifest summary page".to_string(),
825        ));
826    }
827
828    let sequence = u64::from_le_bytes([
829        content[4],
830        content[5],
831        content[6],
832        content[7],
833        content[8],
834        content[9],
835        content[10],
836        content[11],
837    ]);
838    let event_count = u32::from_le_bytes([content[12], content[13], content[14], content[15]]);
839    let events_complete = content[16] == 1;
840    let omitted_event_count =
841        u32::from_le_bytes([content[17], content[18], content[19], content[20]]);
842    let sample_count =
843        u32::from_le_bytes([content[21], content[22], content[23], content[24]]) as usize;
844
845    let mut pos = 25usize;
846    let mut recent_events = Vec::with_capacity(sample_count);
847    for _ in 0..sample_count {
848        if pos + 1 + 2 > content.len() {
849            break;
850        }
851        let kind = native_manifest_kind_from_byte(content[pos]).to_string();
852        pos += 1;
853        let collection_len = u16::from_le_bytes([content[pos], content[pos + 1]]) as usize;
854        pos += 2;
855        if pos + collection_len + 2 > content.len() {
856            break;
857        }
858        let collection = String::from_utf8(content[pos..pos + collection_len].to_vec())
859            .map_err(|err| RdbFileError::InvalidOperation(err.to_string()))?;
860        pos += collection_len;
861        let object_key_len = u16::from_le_bytes([content[pos], content[pos + 1]]) as usize;
862        pos += 2;
863        if pos + object_key_len + 8 + 16 + 8 + 1 > content.len() {
864            break;
865        }
866        let object_key = String::from_utf8(content[pos..pos + object_key_len].to_vec())
867            .map_err(|err| RdbFileError::InvalidOperation(err.to_string()))?;
868        pos += object_key_len;
869        let block_index = u64::from_le_bytes([
870            content[pos],
871            content[pos + 1],
872            content[pos + 2],
873            content[pos + 3],
874            content[pos + 4],
875            content[pos + 5],
876            content[pos + 6],
877            content[pos + 7],
878        ]);
879        pos += 8;
880        let mut checksum_bytes = [0u8; 16];
881        checksum_bytes.copy_from_slice(&content[pos..pos + 16]);
882        pos += 16;
883        let snapshot_min = u64::from_le_bytes([
884            content[pos],
885            content[pos + 1],
886            content[pos + 2],
887            content[pos + 3],
888            content[pos + 4],
889            content[pos + 5],
890            content[pos + 6],
891            content[pos + 7],
892        ]);
893        pos += 8;
894        let snapshot_max = match content.get(pos).copied() {
895            Some(1) => {
896                pos += 1;
897                if pos + 8 > content.len() {
898                    return Err(RdbFileError::InvalidOperation(
899                        "truncated native manifest snapshot_max".to_string(),
900                    ));
901                }
902                let value = u64::from_le_bytes([
903                    content[pos],
904                    content[pos + 1],
905                    content[pos + 2],
906                    content[pos + 3],
907                    content[pos + 4],
908                    content[pos + 5],
909                    content[pos + 6],
910                    content[pos + 7],
911                ]);
912                pos += 8;
913                Some(value)
914            }
915            Some(_) => {
916                pos += 1;
917                None
918            }
919            None => None,
920        };
921
922        recent_events.push(NativeManifestEntrySummary {
923            collection,
924            object_key,
925            kind,
926            block_index,
927            block_checksum: u128::from_le_bytes(checksum_bytes),
928            snapshot_min,
929            snapshot_max,
930        });
931    }
932
933    Ok(NativeManifestSummary {
934        sequence,
935        event_count,
936        events_complete,
937        omitted_event_count,
938        recent_events,
939    })
940}
941
942pub fn encode_native_registry_summary_page(summary: &NativeRegistrySummary) -> Vec<u8> {
943    let mut data = Vec::with_capacity(2048);
944    data.extend_from_slice(NATIVE_REGISTRY_MAGIC);
945    data.extend_from_slice(&summary.collection_count.to_le_bytes());
946    data.extend_from_slice(&summary.index_count.to_le_bytes());
947    data.extend_from_slice(&summary.graph_projection_count.to_le_bytes());
948    data.extend_from_slice(&summary.analytics_job_count.to_le_bytes());
949    data.extend_from_slice(&summary.vector_artifact_count.to_le_bytes());
950    data.push(u8::from(summary.collections_complete));
951    data.push(u8::from(summary.indexes_complete));
952    data.push(u8::from(summary.graph_projections_complete));
953    data.push(u8::from(summary.analytics_jobs_complete));
954    data.push(u8::from(summary.vector_artifacts_complete));
955    data.extend_from_slice(&summary.omitted_collection_count.to_le_bytes());
956    data.extend_from_slice(&summary.omitted_index_count.to_le_bytes());
957    data.extend_from_slice(&summary.omitted_graph_projection_count.to_le_bytes());
958    data.extend_from_slice(&summary.omitted_analytics_job_count.to_le_bytes());
959    data.extend_from_slice(&summary.omitted_vector_artifact_count.to_le_bytes());
960    data.extend_from_slice(&(summary.collection_names.len() as u32).to_le_bytes());
961    data.extend_from_slice(&(summary.indexes.len() as u32).to_le_bytes());
962    data.extend_from_slice(&(summary.graph_projections.len() as u32).to_le_bytes());
963    data.extend_from_slice(&(summary.analytics_jobs.len() as u32).to_le_bytes());
964    data.extend_from_slice(&(summary.vector_artifacts.len() as u32).to_le_bytes());
965
966    for name in &summary.collection_names {
967        push_native_string(&mut data, name);
968    }
969    for index in &summary.indexes {
970        push_native_string(&mut data, &index.name);
971        push_native_string(&mut data, &index.kind);
972        match &index.collection {
973            Some(collection) => {
974                data.push(1);
975                push_native_string(&mut data, collection);
976            }
977            None => data.push(0),
978        }
979        data.push(u8::from(index.enabled));
980        data.extend_from_slice(&index.entries.to_le_bytes());
981        data.extend_from_slice(&index.estimated_memory_bytes.to_le_bytes());
982        match index.last_refresh_ms {
983            Some(value) => {
984                data.push(1);
985                data.extend_from_slice(&value.to_le_bytes());
986            }
987            None => data.push(0),
988        }
989        push_native_string(&mut data, &index.backend);
990    }
991    for projection in &summary.graph_projections {
992        push_native_string(&mut data, &projection.name);
993        push_native_string(&mut data, &projection.source);
994        data.extend_from_slice(&projection.created_at_unix_ms.to_le_bytes());
995        data.extend_from_slice(&projection.updated_at_unix_ms.to_le_bytes());
996        push_native_string_list(&mut data, &projection.node_labels);
997        push_native_string_list(&mut data, &projection.node_types);
998        push_native_string_list(&mut data, &projection.edge_labels);
999        match projection.last_materialized_sequence {
1000            Some(value) => {
1001                data.push(1);
1002                data.extend_from_slice(&value.to_le_bytes());
1003            }
1004            None => data.push(0),
1005        }
1006    }
1007    for job in &summary.analytics_jobs {
1008        push_native_string(&mut data, &job.id);
1009        push_native_string(&mut data, &job.kind);
1010        match &job.projection {
1011            Some(projection) => {
1012                data.push(1);
1013                push_native_string(&mut data, projection);
1014            }
1015            None => data.push(0),
1016        }
1017        push_native_string(&mut data, &job.state);
1018        data.extend_from_slice(&job.created_at_unix_ms.to_le_bytes());
1019        data.extend_from_slice(&job.updated_at_unix_ms.to_le_bytes());
1020        match job.last_run_sequence {
1021            Some(value) => {
1022                data.push(1);
1023                data.extend_from_slice(&value.to_le_bytes());
1024            }
1025            None => data.push(0),
1026        }
1027        let metadata_count = job.metadata.len().min(u16::MAX as usize) as u16;
1028        data.extend_from_slice(&metadata_count.to_le_bytes());
1029        for (key, value) in job.metadata.iter().take(metadata_count as usize) {
1030            push_native_string(&mut data, key);
1031            push_native_string(&mut data, value);
1032        }
1033    }
1034    for artifact in &summary.vector_artifacts {
1035        push_native_string(&mut data, &artifact.collection);
1036        push_native_string(&mut data, &artifact.artifact_kind);
1037        data.extend_from_slice(&artifact.vector_count.to_le_bytes());
1038        data.extend_from_slice(&artifact.dimension.to_le_bytes());
1039        data.extend_from_slice(&artifact.max_layer.to_le_bytes());
1040        data.extend_from_slice(&artifact.serialized_bytes.to_le_bytes());
1041        data.extend_from_slice(&artifact.checksum.to_le_bytes());
1042    }
1043
1044    data
1045}
1046
1047pub fn decode_native_registry_summary_page(content: &[u8]) -> RdbFileResult<NativeRegistrySummary> {
1048    if content.len() < 77 || &content[0..4] != NATIVE_REGISTRY_MAGIC {
1049        return Err(RdbFileError::InvalidOperation(
1050            "invalid native registry summary page".to_string(),
1051        ));
1052    }
1053
1054    let collection_count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
1055    let index_count = u32::from_le_bytes([content[8], content[9], content[10], content[11]]);
1056    let graph_projection_count =
1057        u32::from_le_bytes([content[12], content[13], content[14], content[15]]);
1058    let analytics_job_count =
1059        u32::from_le_bytes([content[16], content[17], content[18], content[19]]);
1060    let vector_artifact_count =
1061        u32::from_le_bytes([content[20], content[21], content[22], content[23]]);
1062    let collections_complete = content[24] == 1;
1063    let indexes_complete = content[25] == 1;
1064    let graph_projections_complete = content[26] == 1;
1065    let analytics_jobs_complete = content[27] == 1;
1066    let vector_artifacts_complete = content[28] == 1;
1067    let omitted_collection_count =
1068        u32::from_le_bytes([content[29], content[30], content[31], content[32]]);
1069    let omitted_index_count =
1070        u32::from_le_bytes([content[33], content[34], content[35], content[36]]);
1071    let omitted_graph_projection_count =
1072        u32::from_le_bytes([content[37], content[38], content[39], content[40]]);
1073    let omitted_analytics_job_count =
1074        u32::from_le_bytes([content[41], content[42], content[43], content[44]]);
1075    let omitted_vector_artifact_count =
1076        u32::from_le_bytes([content[45], content[46], content[47], content[48]]);
1077    let collection_sample_count =
1078        u32::from_le_bytes([content[49], content[50], content[51], content[52]]) as usize;
1079    let index_sample_count =
1080        u32::from_le_bytes([content[53], content[54], content[55], content[56]]) as usize;
1081    let projection_sample_count =
1082        u32::from_le_bytes([content[57], content[58], content[59], content[60]]) as usize;
1083    let job_sample_count =
1084        u32::from_le_bytes([content[61], content[62], content[63], content[64]]) as usize;
1085    let vector_artifact_sample_count =
1086        u32::from_le_bytes([content[65], content[66], content[67], content[68]]) as usize;
1087
1088    let mut pos = 69usize;
1089    let mut collection_names = Vec::with_capacity(collection_sample_count);
1090    for _ in 0..collection_sample_count {
1091        collection_names.push(read_native_string(content, &mut pos)?);
1092    }
1093
1094    let mut indexes = Vec::with_capacity(index_sample_count);
1095    for _ in 0..index_sample_count {
1096        let name = read_native_string(content, &mut pos)?;
1097        let kind = read_native_string(content, &mut pos)?;
1098        let collection = read_flagged_string(content, &mut pos)?;
1099        let enabled = content.get(pos).copied().unwrap_or(0) == 1;
1100        pos = pos.saturating_add(1);
1101        if pos + 16 > content.len() {
1102            break;
1103        }
1104        let entries = read_u64(content, &mut pos);
1105        let estimated_memory_bytes = read_u64(content, &mut pos);
1106        let last_refresh_ms =
1107            read_flagged_u128(content, &mut pos, "native registry refresh timestamp")?;
1108        let backend = read_native_string(content, &mut pos)?;
1109        indexes.push(NativeRegistryIndexSummary {
1110            name,
1111            kind,
1112            collection,
1113            enabled,
1114            entries,
1115            estimated_memory_bytes,
1116            last_refresh_ms,
1117            backend,
1118        });
1119    }
1120
1121    let mut graph_projections = Vec::with_capacity(projection_sample_count);
1122    for _ in 0..projection_sample_count {
1123        let name = read_native_string(content, &mut pos)?;
1124        let source = read_native_string(content, &mut pos)?;
1125        if pos + 32 > content.len() {
1126            break;
1127        }
1128        let created_at_unix_ms = read_u128(content, &mut pos);
1129        let updated_at_unix_ms = read_u128(content, &mut pos);
1130        let node_labels = read_native_string_list(content, &mut pos)?;
1131        let node_types = read_native_string_list(content, &mut pos)?;
1132        let edge_labels = read_native_string_list(content, &mut pos)?;
1133        let last_materialized_sequence = read_flagged_u64(
1134            content,
1135            &mut pos,
1136            "native projection materialization sequence",
1137        )?;
1138        graph_projections.push(NativeRegistryProjectionSummary {
1139            name,
1140            source,
1141            created_at_unix_ms,
1142            updated_at_unix_ms,
1143            node_labels,
1144            node_types,
1145            edge_labels,
1146            last_materialized_sequence,
1147        });
1148    }
1149
1150    let mut analytics_jobs = Vec::with_capacity(job_sample_count);
1151    for _ in 0..job_sample_count {
1152        let id = read_native_string(content, &mut pos)?;
1153        let kind = read_native_string(content, &mut pos)?;
1154        let projection = read_flagged_string(content, &mut pos)?;
1155        let state = read_native_string(content, &mut pos)?;
1156        if pos + 32 > content.len() {
1157            break;
1158        }
1159        let created_at_unix_ms = read_u128(content, &mut pos);
1160        let updated_at_unix_ms = read_u128(content, &mut pos);
1161        let last_run_sequence =
1162            read_flagged_u64(content, &mut pos, "native analytics job run sequence")?;
1163        if pos + 2 > content.len() {
1164            return Err(RdbFileError::InvalidOperation(
1165                "truncated native analytics job metadata count".to_string(),
1166            ));
1167        }
1168        let metadata_count = u16::from_le_bytes([content[pos], content[pos + 1]]) as usize;
1169        pos += 2;
1170        let mut metadata = BTreeMap::new();
1171        for _ in 0..metadata_count {
1172            let key = read_native_string(content, &mut pos)?;
1173            let value = read_native_string(content, &mut pos)?;
1174            metadata.insert(key, value);
1175        }
1176        analytics_jobs.push(NativeRegistryJobSummary {
1177            id,
1178            kind,
1179            projection,
1180            state,
1181            created_at_unix_ms,
1182            updated_at_unix_ms,
1183            last_run_sequence,
1184            metadata,
1185        });
1186    }
1187
1188    let mut vector_artifacts = Vec::with_capacity(vector_artifact_sample_count);
1189    for _ in 0..vector_artifact_sample_count {
1190        let collection = read_native_string(content, &mut pos)?;
1191        let artifact_kind = read_native_string(content, &mut pos)?;
1192        if pos + 32 > content.len() {
1193            break;
1194        }
1195        let vector_count = read_u64(content, &mut pos);
1196        let dimension = read_u32(content, &mut pos);
1197        let max_layer = read_u32(content, &mut pos);
1198        let serialized_bytes = read_u64(content, &mut pos);
1199        let checksum = read_u64(content, &mut pos);
1200        vector_artifacts.push(NativeVectorArtifactSummary {
1201            collection,
1202            artifact_kind,
1203            vector_count,
1204            dimension,
1205            max_layer,
1206            serialized_bytes,
1207            checksum,
1208        });
1209    }
1210
1211    Ok(NativeRegistrySummary {
1212        collection_count,
1213        index_count,
1214        graph_projection_count,
1215        analytics_job_count,
1216        vector_artifact_count,
1217        collections_complete,
1218        indexes_complete,
1219        graph_projections_complete,
1220        analytics_jobs_complete,
1221        vector_artifacts_complete,
1222        omitted_collection_count,
1223        omitted_index_count,
1224        omitted_graph_projection_count,
1225        omitted_analytics_job_count,
1226        omitted_vector_artifact_count,
1227        collection_names,
1228        indexes,
1229        graph_projections,
1230        analytics_jobs,
1231        vector_artifacts,
1232    })
1233}
1234
1235pub fn encode_native_recovery_summary_page(summary: &NativeRecoverySummary) -> Vec<u8> {
1236    let mut data = Vec::with_capacity(2048);
1237    data.extend_from_slice(NATIVE_RECOVERY_MAGIC);
1238    data.extend_from_slice(&summary.snapshot_count.to_le_bytes());
1239    data.extend_from_slice(&summary.export_count.to_le_bytes());
1240    data.push(u8::from(summary.snapshots_complete));
1241    data.push(u8::from(summary.exports_complete));
1242    data.extend_from_slice(&summary.omitted_snapshot_count.to_le_bytes());
1243    data.extend_from_slice(&summary.omitted_export_count.to_le_bytes());
1244    data.extend_from_slice(&(summary.snapshots.len() as u32).to_le_bytes());
1245    data.extend_from_slice(&(summary.exports.len() as u32).to_le_bytes());
1246
1247    for snapshot in &summary.snapshots {
1248        data.extend_from_slice(&snapshot.snapshot_id.to_le_bytes());
1249        data.extend_from_slice(&snapshot.created_at_unix_ms.to_le_bytes());
1250        data.extend_from_slice(&snapshot.superblock_sequence.to_le_bytes());
1251        data.extend_from_slice(&snapshot.collection_count.to_le_bytes());
1252        data.extend_from_slice(&snapshot.total_entities.to_le_bytes());
1253    }
1254
1255    for export in &summary.exports {
1256        push_native_string(&mut data, &export.name);
1257        data.extend_from_slice(&export.created_at_unix_ms.to_le_bytes());
1258        match export.snapshot_id {
1259            Some(snapshot_id) => {
1260                data.push(1);
1261                data.extend_from_slice(&snapshot_id.to_le_bytes());
1262            }
1263            None => data.push(0),
1264        }
1265        data.extend_from_slice(&export.superblock_sequence.to_le_bytes());
1266        data.extend_from_slice(&export.collection_count.to_le_bytes());
1267        data.extend_from_slice(&export.total_entities.to_le_bytes());
1268    }
1269
1270    data
1271}
1272
1273pub fn decode_native_recovery_summary_page(content: &[u8]) -> RdbFileResult<NativeRecoverySummary> {
1274    if content.len() < 30 || &content[0..4] != NATIVE_RECOVERY_MAGIC {
1275        return Err(RdbFileError::InvalidOperation(
1276            "invalid native recovery summary page".to_string(),
1277        ));
1278    }
1279
1280    let snapshot_count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
1281    let export_count = u32::from_le_bytes([content[8], content[9], content[10], content[11]]);
1282    let snapshots_complete = content[12] == 1;
1283    let exports_complete = content[13] == 1;
1284    let omitted_snapshot_count =
1285        u32::from_le_bytes([content[14], content[15], content[16], content[17]]);
1286    let omitted_export_count =
1287        u32::from_le_bytes([content[18], content[19], content[20], content[21]]);
1288    let snapshot_sample_count =
1289        u32::from_le_bytes([content[22], content[23], content[24], content[25]]) as usize;
1290    let export_sample_count =
1291        u32::from_le_bytes([content[26], content[27], content[28], content[29]]) as usize;
1292
1293    let mut pos = 30usize;
1294    let mut snapshots = Vec::with_capacity(snapshot_sample_count);
1295    for _ in 0..snapshot_sample_count {
1296        if pos + 44 > content.len() {
1297            break;
1298        }
1299        let snapshot_id = read_u64(content, &mut pos);
1300        let created_at_unix_ms = read_u128(content, &mut pos);
1301        let superblock_sequence = read_u64(content, &mut pos);
1302        let collection_count = read_u32(content, &mut pos);
1303        let total_entities = read_u64(content, &mut pos);
1304        snapshots.push(NativeSnapshotSummary {
1305            snapshot_id,
1306            created_at_unix_ms,
1307            superblock_sequence,
1308            collection_count,
1309            total_entities,
1310        });
1311    }
1312
1313    let mut exports = Vec::with_capacity(export_sample_count);
1314    for _ in 0..export_sample_count {
1315        let name = read_native_string(content, &mut pos)?;
1316        if pos + 16 > content.len() {
1317            break;
1318        }
1319        let created_at_unix_ms = read_u128(content, &mut pos);
1320        let snapshot_id = read_flagged_u64(content, &mut pos, "native export snapshot id")?;
1321        if pos + 20 > content.len() {
1322            break;
1323        }
1324        let superblock_sequence = read_u64(content, &mut pos);
1325        let collection_count = read_u32(content, &mut pos);
1326        let total_entities = read_u64(content, &mut pos);
1327        exports.push(NativeExportSummary {
1328            name,
1329            created_at_unix_ms,
1330            snapshot_id,
1331            superblock_sequence,
1332            collection_count,
1333            total_entities,
1334        });
1335    }
1336
1337    Ok(NativeRecoverySummary {
1338        snapshot_count,
1339        export_count,
1340        snapshots_complete,
1341        exports_complete,
1342        omitted_snapshot_count,
1343        omitted_export_count,
1344        snapshots,
1345        exports,
1346    })
1347}
1348
1349pub fn encode_native_catalog_summary_page(summary: &NativeCatalogSummary) -> Vec<u8> {
1350    let mut data = Vec::with_capacity(2048);
1351    data.extend_from_slice(NATIVE_CATALOG_MAGIC);
1352    data.extend_from_slice(&summary.collection_count.to_le_bytes());
1353    data.extend_from_slice(&summary.total_entities.to_le_bytes());
1354    data.push(u8::from(summary.collections_complete));
1355    data.extend_from_slice(&summary.omitted_collection_count.to_le_bytes());
1356    data.extend_from_slice(&(summary.collections.len() as u32).to_le_bytes());
1357    for collection in &summary.collections {
1358        push_native_string(&mut data, &collection.name);
1359        data.extend_from_slice(&collection.entities.to_le_bytes());
1360        data.extend_from_slice(&collection.cross_refs.to_le_bytes());
1361        data.extend_from_slice(&collection.segments.to_le_bytes());
1362    }
1363    data
1364}
1365
1366pub fn decode_native_catalog_summary_page(content: &[u8]) -> RdbFileResult<NativeCatalogSummary> {
1367    if content.len() < 25 || &content[0..4] != NATIVE_CATALOG_MAGIC {
1368        return Err(RdbFileError::InvalidOperation(
1369            "invalid native catalog summary page".to_string(),
1370        ));
1371    }
1372
1373    let collection_count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
1374    let mut pos = 8usize;
1375    let total_entities = read_u64(content, &mut pos);
1376    let collections_complete = content[16] == 1;
1377    let omitted_collection_count =
1378        u32::from_le_bytes([content[17], content[18], content[19], content[20]]);
1379    let sample_count =
1380        u32::from_le_bytes([content[21], content[22], content[23], content[24]]) as usize;
1381
1382    let mut pos = 25usize;
1383    let mut collections = Vec::with_capacity(sample_count);
1384    for _ in 0..sample_count {
1385        let name = read_native_string(content, &mut pos)?;
1386        if pos + 20 > content.len() {
1387            break;
1388        }
1389        let entities = read_u64(content, &mut pos);
1390        let cross_refs = read_u64(content, &mut pos);
1391        let segments = read_u32(content, &mut pos);
1392        collections.push(NativeCatalogCollectionSummary {
1393            name,
1394            entities,
1395            cross_refs,
1396            segments,
1397        });
1398    }
1399
1400    Ok(NativeCatalogSummary {
1401        collection_count,
1402        total_entities,
1403        collections_complete,
1404        omitted_collection_count,
1405        collections,
1406    })
1407}
1408
1409pub fn encode_native_metadata_state_summary_page(summary: &NativeMetadataStateSummary) -> Vec<u8> {
1410    let mut data = Vec::with_capacity(512);
1411    data.extend_from_slice(NATIVE_METADATA_STATE_MAGIC);
1412    push_native_string(&mut data, &summary.protocol_version);
1413    data.extend_from_slice(&summary.generated_at_unix_ms.to_le_bytes());
1414    match &summary.last_loaded_from {
1415        Some(value) => {
1416            data.push(1);
1417            push_native_string(&mut data, value);
1418        }
1419        None => data.push(0),
1420    }
1421    match summary.last_healed_at_unix_ms {
1422        Some(value) => {
1423            data.push(1);
1424            data.extend_from_slice(&value.to_le_bytes());
1425        }
1426        None => data.push(0),
1427    }
1428    data
1429}
1430
1431pub fn decode_native_metadata_state_summary_page(
1432    content: &[u8],
1433) -> RdbFileResult<NativeMetadataStateSummary> {
1434    if content.len() < 22 || &content[0..4] != NATIVE_METADATA_STATE_MAGIC {
1435        return Err(RdbFileError::InvalidOperation(
1436            "invalid native metadata state page".to_string(),
1437        ));
1438    }
1439
1440    let mut pos = 4usize;
1441    let protocol_version = read_native_string(content, &mut pos)?;
1442    if pos + 16 > content.len() {
1443        return Err(RdbFileError::InvalidOperation(
1444            "truncated native metadata state timestamp".to_string(),
1445        ));
1446    }
1447    let generated_at_unix_ms = read_u128(content, &mut pos);
1448    let last_loaded_from = read_flagged_string(content, &mut pos)?;
1449    let last_healed_at_unix_ms =
1450        read_flagged_u128(content, &mut pos, "native metadata heal timestamp")?;
1451
1452    Ok(NativeMetadataStateSummary {
1453        protocol_version,
1454        generated_at_unix_ms,
1455        last_loaded_from,
1456        last_healed_at_unix_ms,
1457    })
1458}
1459
1460pub const NATIVE_BLOB_PAGE_HEADER_BYTES: usize = 12;
1461
1462pub fn native_blob_chunk_capacity(page_size: usize, page_header_size: usize) -> usize {
1463    page_size - page_header_size - NATIVE_BLOB_PAGE_HEADER_BYTES
1464}
1465
1466pub fn encode_native_blob_page(next_page: u32, chunk: &[u8]) -> Vec<u8> {
1467    let mut data = Vec::with_capacity(chunk.len() + NATIVE_BLOB_PAGE_HEADER_BYTES);
1468    data.extend_from_slice(NATIVE_BLOB_MAGIC);
1469    data.extend_from_slice(&next_page.to_le_bytes());
1470    data.extend_from_slice(&(chunk.len() as u32).to_le_bytes());
1471    data.extend_from_slice(chunk);
1472    data
1473}
1474
1475pub fn decode_native_blob_page(content: &[u8]) -> RdbFileResult<(u32, Vec<u8>)> {
1476    if content.len() < NATIVE_BLOB_PAGE_HEADER_BYTES || &content[0..4] != NATIVE_BLOB_MAGIC {
1477        return Err(RdbFileError::InvalidOperation(
1478            "invalid native blob page".to_string(),
1479        ));
1480    }
1481    let next_page = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
1482    let chunk_len = u32::from_le_bytes([content[8], content[9], content[10], content[11]]) as usize;
1483    if NATIVE_BLOB_PAGE_HEADER_BYTES + chunk_len > content.len() {
1484        return Err(RdbFileError::InvalidOperation(
1485            "truncated native blob page".to_string(),
1486        ));
1487    }
1488    Ok((
1489        next_page,
1490        content[NATIVE_BLOB_PAGE_HEADER_BYTES..NATIVE_BLOB_PAGE_HEADER_BYTES + chunk_len].to_vec(),
1491    ))
1492}
1493
1494pub fn encode_native_vector_artifact_store_page(
1495    summaries: &[NativeVectorArtifactPageSummary],
1496) -> Vec<u8> {
1497    let mut data = Vec::with_capacity(1024 + summaries.len() * 64);
1498    data.extend_from_slice(NATIVE_VECTOR_ARTIFACT_MAGIC);
1499    data.extend_from_slice(&(summaries.len() as u32).to_le_bytes());
1500    for summary in summaries {
1501        push_native_string(&mut data, &summary.collection);
1502        push_native_string(&mut data, &summary.artifact_kind);
1503        data.extend_from_slice(&summary.root_page.to_le_bytes());
1504        data.extend_from_slice(&summary.page_count.to_le_bytes());
1505        data.extend_from_slice(&summary.byte_len.to_le_bytes());
1506        data.extend_from_slice(&summary.checksum.to_le_bytes());
1507    }
1508    data
1509}
1510
1511pub fn decode_native_vector_artifact_store_page(
1512    content: &[u8],
1513) -> RdbFileResult<Vec<NativeVectorArtifactPageSummary>> {
1514    if content.len() < 8 || &content[0..4] != NATIVE_VECTOR_ARTIFACT_MAGIC {
1515        return Err(RdbFileError::InvalidOperation(
1516            "invalid native vector artifact store page".to_string(),
1517        ));
1518    }
1519    let count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]) as usize;
1520    let mut pos = 8usize;
1521    let mut summaries = Vec::with_capacity(count);
1522    for _ in 0..count {
1523        let collection = read_native_string(content, &mut pos)?;
1524        let artifact_kind = read_native_string(content, &mut pos)?;
1525        if pos + 24 > content.len() {
1526            break;
1527        }
1528        let root_page = read_u32(content, &mut pos);
1529        let page_count = read_u32(content, &mut pos);
1530        let byte_len = read_u64(content, &mut pos);
1531        let checksum = read_u64(content, &mut pos);
1532        summaries.push(NativeVectorArtifactPageSummary {
1533            collection,
1534            artifact_kind,
1535            root_page,
1536            page_count,
1537            byte_len,
1538            checksum,
1539        });
1540    }
1541    Ok(summaries)
1542}
1543
1544fn native_manifest_kind_to_byte(kind: ManifestEventKind) -> u8 {
1545    match kind {
1546        ManifestEventKind::Insert => 1,
1547        ManifestEventKind::Update => 2,
1548        ManifestEventKind::Remove => 3,
1549        ManifestEventKind::Checkpoint => 4,
1550    }
1551}
1552
1553fn push_native_string(data: &mut Vec<u8>, value: &str) {
1554    let bytes = value.as_bytes();
1555    let len = bytes.len().min(u16::MAX as usize) as u16;
1556    data.extend_from_slice(&len.to_le_bytes());
1557    data.extend_from_slice(&bytes[..len as usize]);
1558}
1559
1560fn read_native_string(content: &[u8], pos: &mut usize) -> RdbFileResult<String> {
1561    if *pos + 2 > content.len() {
1562        return Err(RdbFileError::InvalidOperation(
1563            "truncated native string length".to_string(),
1564        ));
1565    }
1566    let len = u16::from_le_bytes([content[*pos], content[*pos + 1]]) as usize;
1567    *pos += 2;
1568    if *pos + len > content.len() {
1569        return Err(RdbFileError::InvalidOperation(
1570            "truncated native string payload".to_string(),
1571        ));
1572    }
1573    let value = String::from_utf8(content[*pos..*pos + len].to_vec())
1574        .map_err(|err| RdbFileError::InvalidOperation(err.to_string()))?;
1575    *pos += len;
1576    Ok(value)
1577}
1578
1579fn push_native_string_list(data: &mut Vec<u8>, values: &[String]) {
1580    let count = values.len().min(u16::MAX as usize) as u16;
1581    data.extend_from_slice(&count.to_le_bytes());
1582    for value in values.iter().take(count as usize) {
1583        push_native_string(data, value);
1584    }
1585}
1586
1587fn read_native_string_list(content: &[u8], pos: &mut usize) -> RdbFileResult<Vec<String>> {
1588    if *pos + 2 > content.len() {
1589        return Err(RdbFileError::InvalidOperation(
1590            "truncated native string list count".to_string(),
1591        ));
1592    }
1593    let count = u16::from_le_bytes([content[*pos], content[*pos + 1]]) as usize;
1594    *pos += 2;
1595    let mut values = Vec::with_capacity(count);
1596    for _ in 0..count {
1597        values.push(read_native_string(content, pos)?);
1598    }
1599    Ok(values)
1600}
1601
1602fn read_flagged_string(content: &[u8], pos: &mut usize) -> RdbFileResult<Option<String>> {
1603    match content.get(*pos).copied() {
1604        Some(1) => {
1605            *pos += 1;
1606            Ok(Some(read_native_string(content, pos)?))
1607        }
1608        Some(_) => {
1609            *pos += 1;
1610            Ok(None)
1611        }
1612        None => Ok(None),
1613    }
1614}
1615
1616fn read_flagged_u64(content: &[u8], pos: &mut usize, label: &str) -> RdbFileResult<Option<u64>> {
1617    match content.get(*pos).copied() {
1618        Some(1) => {
1619            *pos += 1;
1620            if *pos + 8 > content.len() {
1621                return Err(RdbFileError::InvalidOperation(format!("truncated {label}")));
1622            }
1623            Ok(Some(read_u64(content, pos)))
1624        }
1625        Some(_) => {
1626            *pos += 1;
1627            Ok(None)
1628        }
1629        None => Ok(None),
1630    }
1631}
1632
1633fn read_flagged_u128(content: &[u8], pos: &mut usize, label: &str) -> RdbFileResult<Option<u128>> {
1634    match content.get(*pos).copied() {
1635        Some(1) => {
1636            *pos += 1;
1637            if *pos + 16 > content.len() {
1638                return Err(RdbFileError::InvalidOperation(format!("truncated {label}")));
1639            }
1640            Ok(Some(read_u128(content, pos)))
1641        }
1642        Some(_) => {
1643            *pos += 1;
1644            Ok(None)
1645        }
1646        None => Ok(None),
1647    }
1648}
1649
1650fn read_u32(content: &[u8], pos: &mut usize) -> u32 {
1651    let value = u32::from_le_bytes([
1652        content[*pos],
1653        content[*pos + 1],
1654        content[*pos + 2],
1655        content[*pos + 3],
1656    ]);
1657    *pos += 4;
1658    value
1659}
1660
1661fn read_u64(content: &[u8], pos: &mut usize) -> u64 {
1662    let value = u64::from_le_bytes([
1663        content[*pos],
1664        content[*pos + 1],
1665        content[*pos + 2],
1666        content[*pos + 3],
1667        content[*pos + 4],
1668        content[*pos + 5],
1669        content[*pos + 6],
1670        content[*pos + 7],
1671    ]);
1672    *pos += 8;
1673    value
1674}
1675
1676fn read_u128(content: &[u8], pos: &mut usize) -> u128 {
1677    let mut bytes = [0u8; 16];
1678    bytes.copy_from_slice(&content[*pos..*pos + 16]);
1679    *pos += 16;
1680    u128::from_le_bytes(bytes)
1681}
1682
1683fn native_manifest_kind_from_byte(byte: u8) -> &'static str {
1684    match byte {
1685        1 => "insert",
1686        2 => "update",
1687        3 => "remove",
1688        4 => "checkpoint",
1689        _ => "unknown",
1690    }
1691}
1692
1693#[cfg(test)]
1694mod tests;