1use std::collections::BTreeMap;
8use std::fs::{self, File};
9use std::io::{BufWriter, Write};
10use std::path::Path;
11
12use crate::embedded::{RdbFileError, RdbFileResult};
13use crate::physical_metadata::{ManifestEvent, ManifestEventKind};
14
/// Four-byte magic (`RDST`) that opens a native store header.
pub const STORE_MAGIC: &[u8; 4] = b"RDST";

/// Store format version 1.
pub const STORE_VERSION_V1: u32 = 1;
/// Store format version 2.
pub const STORE_VERSION_V2: u32 = 2;
/// Store format version 3. `verify_native_store_crc32_footer` skips the CRC32
/// footer check for versions below this one.
pub const STORE_VERSION_V3: u32 = 3;
/// Store format version 4.
pub const STORE_VERSION_V4: u32 = 4;
/// Store format version 5.
pub const STORE_VERSION_V5: u32 = 5;
/// Store format version 6.
pub const STORE_VERSION_V6: u32 = 6;
/// Store format version 7.
pub const STORE_VERSION_V7: u32 = 7;
/// Store format version 8.
pub const STORE_VERSION_V8: u32 = 8;
/// Store format version 9.
pub const STORE_VERSION_V9: u32 = 9;
/// Alias for the newest store format version declared in this module.
pub const STORE_VERSION_CURRENT: u32 = STORE_VERSION_V9;
29
/// Magic (`RDM2`) written at the start of a paged metadata header.
pub const METADATA_MAGIC: &[u8; 4] = b"RDM2";
/// Size of the paged metadata header: the magic plus two little-endian `u32` fields.
pub const METADATA_HEADER_BYTES: usize = 12;

/// Magic (`RDRT`) that opens a collection roots page.
pub const NATIVE_COLLECTION_ROOTS_MAGIC: &[u8; 4] = b"RDRT";
/// Magic (`RDMF`) that opens a manifest summary page.
pub const NATIVE_MANIFEST_MAGIC: &[u8; 4] = b"RDMF";
/// Magic (`RDRG`) that opens a registry summary page.
pub const NATIVE_REGISTRY_MAGIC: &[u8; 4] = b"RDRG";
/// Magic (`RDRV`). NOTE(review): by its name this tags the recovery summary page; its
/// codec is not visible in this part of the file — confirm.
pub const NATIVE_RECOVERY_MAGIC: &[u8; 4] = b"RDRV";
/// Magic (`RDCL`). NOTE(review): by its name this tags the catalog summary page — confirm.
pub const NATIVE_CATALOG_MAGIC: &[u8; 4] = b"RDCL";
/// Magic (`RDMS`). NOTE(review): by its name this tags the metadata state page — confirm.
pub const NATIVE_METADATA_STATE_MAGIC: &[u8; 4] = b"RDMS";
/// Magic (`RDVA`). NOTE(review): by its name this tags vector artifact pages — confirm.
pub const NATIVE_VECTOR_ARTIFACT_MAGIC: &[u8; 4] = b"RDVA";
/// Magic (`RDBL`). NOTE(review): by its name this tags blob pages — confirm.
pub const NATIVE_BLOB_MAGIC: &[u8; 4] = b"RDBL";
/// Maximum number of trailing manifest events embedded in a manifest summary page.
pub const NATIVE_MANIFEST_SAMPLE_LIMIT: usize = 16;

/// Magic (`RER1`) that opens an entity record frame.
pub const ENTITY_RECORD_MAGIC: &[u8; 4] = b"RER1";
/// Magic (`RDM3`) that opens a metadata overflow header.
pub const METADATA_OVERFLOW_MAGIC: &[u8; 4] = b"RDM3";
/// Size of the metadata overflow header: the magic plus three little-endian `u32` fields.
pub const METADATA_OVERFLOW_HEADER_BYTES: usize = 16;
/// Size of the overflow continuation header: two little-endian `u32` fields, no magic.
pub const METADATA_OVERFLOW_CONTINUATION_HEADER_BYTES: usize = 8;
45
/// Borrowed view of a decoded entity record frame; both slices point into the
/// buffer that was decoded.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NativeEntityRecordFrame<'a> {
    /// Entity payload bytes.
    pub entity: &'a [u8],
    /// Metadata payload bytes; empty when the frame carries no metadata.
    pub metadata: &'a [u8],
}
51
/// Fields of a metadata overflow header, listed in the order they follow the
/// magic on disk (each a little-endian `u32`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NativeMetadataOverflowHeader {
    /// Format version word (bytes 4..8 of the header).
    pub format_version: u32,
    /// Payload size word (bytes 8..12 of the header).
    pub total_payload_bytes: u32,
    /// Page id word (bytes 12..16). NOTE(review): presumably the next page of
    /// the overflow chain — confirm against the pager.
    pub next_overflow_page_id: u32,
}
58
/// Fields of a metadata overflow continuation header, in on-disk order (each a
/// little-endian `u32`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NativeMetadataOverflowContinuationHeader {
    /// Page id word (bytes 0..4). NOTE(review): presumably the next page of the
    /// overflow chain — confirm against the pager.
    pub next_overflow_page_id: u32,
    /// Chunk size word (bytes 4..8).
    pub chunk_bytes: u32,
}
64
/// Fields of the paged metadata header that follow the magic (each a
/// little-endian `u32`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct NativePagedMetadataHeader {
    /// Format version word (bytes 4..8).
    pub format_version: u32,
    /// Collection count word (bytes 8..12).
    pub collection_count: u32,
}
70
/// A decoded paged collection root entry: a collection name and its root page number.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativePagedCollectionRoot {
    /// Collection name, decoded from a length-prefixed UTF-8 string.
    pub collection: String,
    /// Root page number stored after the name.
    pub root_page: u32,
}
76
/// A decoded cross-reference in the paged (fixed-width) encoding.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativePagedCrossRef {
    /// Id of the referencing entity.
    pub source_id: u64,
    /// Id of the referenced entity.
    pub target_id: u64,
    /// One-byte reference type tag; its values are defined outside this file.
    pub ref_type: u8,
    /// Collection name stored with the reference.
    pub target_collection: String,
}
84
/// A decoded collection header in the dump (varint) encoding.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeDumpCollectionHeader {
    /// Collection name.
    pub name: String,
    /// Entity count stored after the name.
    pub entity_count: u32,
}
90
/// A decoded cross-reference in the dump (varint) encoding.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeDumpCrossRef {
    /// Id of the referencing entity.
    pub source_id: u64,
    /// Id of the referenced entity.
    pub target_id: u64,
    /// One-byte reference type tag; its values are defined outside this file.
    pub ref_type: u8,
    /// Collection name stored with the reference.
    pub target_collection: String,
}
98
99pub fn native_store_magic_matches(bytes: &[u8]) -> bool {
100 bytes.len() >= STORE_MAGIC.len() && &bytes[..STORE_MAGIC.len()] == STORE_MAGIC
101}
102
103pub fn write_native_store_bytes_atomically(path: &Path, bytes: &[u8]) -> RdbFileResult<()> {
104 let tmp_path = crate::layout::temp_path(path);
105 {
106 let file = File::create(&tmp_path)?;
107 let mut writer = BufWriter::new(file);
108 writer.write_all(bytes)?;
109 writer.flush()?;
110 writer.get_ref().sync_all()?;
111 }
112
113 fs::rename(&tmp_path, path)?;
114 if let Some(parent) = path.parent() {
115 if let Ok(dir) = File::open(parent) {
116 let _ = dir.sync_all();
117 }
118 }
119 Ok(())
120}
121
122pub fn encode_native_entity_record_frame(entity: &[u8], metadata: Option<&[u8]>) -> Vec<u8> {
123 let metadata = metadata.unwrap_or(&[]);
124 let mut out = Vec::with_capacity(12 + entity.len() + metadata.len());
125 out.extend_from_slice(ENTITY_RECORD_MAGIC);
126 out.extend_from_slice(&(entity.len() as u32).to_le_bytes());
127 out.extend_from_slice(entity);
128 out.extend_from_slice(&(metadata.len() as u32).to_le_bytes());
129 out.extend_from_slice(metadata);
130 out
131}
132
133pub fn decode_native_entity_record_frame(
134 data: &[u8],
135) -> RdbFileResult<Option<NativeEntityRecordFrame<'_>>> {
136 if data.len() < 8 || &data[..4] != ENTITY_RECORD_MAGIC {
137 return Ok(None);
138 }
139
140 let mut pos = 4usize;
141 let entity_len =
142 read_native_u32(data, &mut pos, "truncated entity record entity length")? as usize;
143 let entity = read_native_bytes(
144 data,
145 &mut pos,
146 entity_len,
147 "truncated entity record payload",
148 )?;
149 let metadata_len =
150 read_native_u32(data, &mut pos, "truncated entity record metadata length")? as usize;
151 let metadata = read_native_bytes(
152 data,
153 &mut pos,
154 metadata_len,
155 "truncated entity record metadata",
156 )?;
157
158 Ok(Some(NativeEntityRecordFrame { entity, metadata }))
159}
160
161pub fn encode_native_metadata_overflow_header(
162 out: &mut [u8],
163 header: NativeMetadataOverflowHeader,
164) -> RdbFileResult<()> {
165 if out.len() < METADATA_OVERFLOW_HEADER_BYTES {
166 return Err(RdbFileError::InvalidOperation(
167 "metadata overflow header buffer too small".to_string(),
168 ));
169 }
170 out[0..4].copy_from_slice(METADATA_OVERFLOW_MAGIC);
171 out[4..8].copy_from_slice(&header.format_version.to_le_bytes());
172 out[8..12].copy_from_slice(&header.total_payload_bytes.to_le_bytes());
173 out[12..16].copy_from_slice(&header.next_overflow_page_id.to_le_bytes());
174 Ok(())
175}
176
177pub fn decode_native_metadata_overflow_header(
178 data: &[u8],
179) -> RdbFileResult<Option<NativeMetadataOverflowHeader>> {
180 if data.len() < 4 || &data[..4] != METADATA_OVERFLOW_MAGIC {
181 return Ok(None);
182 }
183 if data.len() < METADATA_OVERFLOW_HEADER_BYTES {
184 return Err(RdbFileError::InvalidOperation(
185 "truncated metadata overflow header".to_string(),
186 ));
187 }
188 Ok(Some(NativeMetadataOverflowHeader {
189 format_version: u32::from_le_bytes([data[4], data[5], data[6], data[7]]),
190 total_payload_bytes: u32::from_le_bytes([data[8], data[9], data[10], data[11]]),
191 next_overflow_page_id: u32::from_le_bytes([data[12], data[13], data[14], data[15]]),
192 }))
193}
194
195pub fn encode_native_metadata_overflow_continuation_header(
196 out: &mut [u8],
197 header: NativeMetadataOverflowContinuationHeader,
198) -> RdbFileResult<()> {
199 if out.len() < METADATA_OVERFLOW_CONTINUATION_HEADER_BYTES {
200 return Err(RdbFileError::InvalidOperation(
201 "metadata overflow continuation header buffer too small".to_string(),
202 ));
203 }
204 out[0..4].copy_from_slice(&header.next_overflow_page_id.to_le_bytes());
205 out[4..8].copy_from_slice(&header.chunk_bytes.to_le_bytes());
206 Ok(())
207}
208
209pub fn decode_native_metadata_overflow_continuation_header(
210 data: &[u8],
211) -> RdbFileResult<NativeMetadataOverflowContinuationHeader> {
212 if data.len() < METADATA_OVERFLOW_CONTINUATION_HEADER_BYTES {
213 return Err(RdbFileError::InvalidOperation(
214 "truncated metadata overflow continuation header".to_string(),
215 ));
216 }
217 Ok(NativeMetadataOverflowContinuationHeader {
218 next_overflow_page_id: u32::from_le_bytes([data[0], data[1], data[2], data[3]]),
219 chunk_bytes: u32::from_le_bytes([data[4], data[5], data[6], data[7]]),
220 })
221}
222
223pub fn encode_native_paged_metadata_header(out: &mut Vec<u8>, header: NativePagedMetadataHeader) {
224 out.extend_from_slice(METADATA_MAGIC);
225 out.extend_from_slice(&header.format_version.to_le_bytes());
226 out.extend_from_slice(&header.collection_count.to_le_bytes());
227}
228
229pub fn decode_native_paged_metadata_header(
230 data: &[u8],
231) -> RdbFileResult<Option<NativePagedMetadataHeader>> {
232 if data.len() < 4 || &data[..4] != METADATA_MAGIC {
233 return Ok(None);
234 }
235 if data.len() < METADATA_HEADER_BYTES {
236 return Err(RdbFileError::InvalidOperation(
237 "truncated native paged metadata header".to_string(),
238 ));
239 }
240 Ok(Some(NativePagedMetadataHeader {
241 format_version: u32::from_le_bytes([data[4], data[5], data[6], data[7]]),
242 collection_count: u32::from_le_bytes([data[8], data[9], data[10], data[11]]),
243 }))
244}
245
/// Appends `bytes` to `out`, preceded by their length as a little-endian `u32`.
///
/// The length is cast to `u32` without a range check.
pub fn encode_native_len_prefixed_bytes(out: &mut Vec<u8>, bytes: &[u8]) {
    let prefix = (bytes.len() as u32).to_le_bytes();
    out.reserve(prefix.len() + bytes.len());
    out.extend_from_slice(&prefix);
    out.extend_from_slice(bytes);
}
250
251pub fn encode_native_len_prefixed_str(out: &mut Vec<u8>, value: &str) {
252 encode_native_len_prefixed_bytes(out, value.as_bytes());
253}
254
255pub fn decode_native_len_prefixed_bytes<'a>(
256 data: &'a [u8],
257 pos: &mut usize,
258) -> RdbFileResult<&'a [u8]> {
259 let len = read_native_u32(data, pos, "truncated native length-prefixed length")? as usize;
260 read_native_bytes(data, pos, len, "truncated native length-prefixed payload")
261}
262
263pub fn decode_native_len_prefixed_string(data: &[u8], pos: &mut usize) -> RdbFileResult<String> {
264 let bytes = decode_native_len_prefixed_bytes(data, pos)?;
265 String::from_utf8(bytes.to_vec()).map_err(|err| RdbFileError::InvalidOperation(err.to_string()))
266}
267
268pub fn encode_native_paged_collection_root(out: &mut Vec<u8>, collection: &str, root_page: u32) {
269 encode_native_len_prefixed_str(out, collection);
270 out.extend_from_slice(&root_page.to_le_bytes());
271}
272
273pub fn decode_native_paged_collection_root(
274 data: &[u8],
275 pos: &mut usize,
276) -> RdbFileResult<NativePagedCollectionRoot> {
277 let collection = decode_native_len_prefixed_string(data, pos)?;
278 let root_page = read_native_u32(data, pos, "truncated native paged collection root")?;
279 Ok(NativePagedCollectionRoot {
280 collection,
281 root_page,
282 })
283}
284
285pub fn encode_native_paged_cross_ref(
286 out: &mut Vec<u8>,
287 source_id: u64,
288 target_id: u64,
289 ref_type: u8,
290 target_collection: &str,
291) {
292 out.extend_from_slice(&source_id.to_le_bytes());
293 out.extend_from_slice(&target_id.to_le_bytes());
294 out.push(ref_type);
295 encode_native_len_prefixed_str(out, target_collection);
296}
297
298pub fn decode_native_paged_cross_ref(
299 data: &[u8],
300 pos: &mut usize,
301) -> RdbFileResult<NativePagedCrossRef> {
302 let source_id = read_native_u64(data, pos, "truncated native paged cross-ref source")?;
303 let target_id = read_native_u64(data, pos, "truncated native paged cross-ref target")?;
304 let ref_type = read_native_u8(data, pos, "truncated native paged cross-ref type")?;
305 let target_collection = decode_native_len_prefixed_string(data, pos)?;
306 Ok(NativePagedCrossRef {
307 source_id,
308 target_id,
309 ref_type,
310 target_collection,
311 })
312}
313
314pub fn encode_native_dump_count(out: &mut Vec<u8>, count: u32) {
315 write_native_varu32(out, count);
316}
317
318pub fn decode_native_dump_count(data: &[u8], pos: &mut usize) -> RdbFileResult<u32> {
319 read_native_varu32(data, pos, "truncated native dump count")
320}
321
322pub fn encode_native_dump_collection_header(out: &mut Vec<u8>, name: &str, entity_count: u32) {
323 write_native_varu32(out, name.len() as u32);
324 out.extend_from_slice(name.as_bytes());
325 write_native_varu32(out, entity_count);
326}
327
328pub fn decode_native_dump_collection_header(
329 data: &[u8],
330 pos: &mut usize,
331) -> RdbFileResult<NativeDumpCollectionHeader> {
332 let name_len =
333 read_native_varu32(data, pos, "truncated native dump collection name length")? as usize;
334 let name_bytes =
335 read_native_bytes(data, pos, name_len, "truncated native dump collection name")?;
336 let name = String::from_utf8(name_bytes.to_vec()).map_err(|err| {
337 RdbFileError::InvalidOperation(format!("invalid native dump collection name utf8: {err}"))
338 })?;
339 let entity_count =
340 read_native_varu32(data, pos, "truncated native dump collection entity count")?;
341 Ok(NativeDumpCollectionHeader { name, entity_count })
342}
343
/// Appends `record` to `out`, preceded by its length as a little-endian `u32`.
///
/// The length is cast to `u32` without a range check.
pub fn encode_native_dump_entity_record(out: &mut Vec<u8>, record: &[u8]) {
    let prefix = (record.len() as u32).to_le_bytes();
    out.reserve(prefix.len() + record.len());
    out.extend_from_slice(&prefix);
    out.extend_from_slice(record);
}
348
349pub fn decode_native_dump_entity_record<'a>(
350 data: &'a [u8],
351 pos: &mut usize,
352) -> RdbFileResult<&'a [u8]> {
353 let record_len =
354 read_native_u32(data, pos, "truncated native dump entity record length")? as usize;
355 read_native_bytes(
356 data,
357 pos,
358 record_len,
359 "truncated native dump entity record payload",
360 )
361}
362
363pub fn encode_native_dump_cross_ref(
364 out: &mut Vec<u8>,
365 source_id: u64,
366 target_id: u64,
367 ref_type: u8,
368 target_collection: &str,
369) {
370 write_native_varu64(out, source_id);
371 write_native_varu64(out, target_id);
372 out.push(ref_type);
373 write_native_varu32(out, target_collection.len() as u32);
374 out.extend_from_slice(target_collection.as_bytes());
375}
376
377pub fn decode_native_dump_cross_ref(
378 data: &[u8],
379 pos: &mut usize,
380) -> RdbFileResult<NativeDumpCrossRef> {
381 let source_id = read_native_varu64(data, pos, "truncated native dump cross-ref source")?;
382 let target_id = read_native_varu64(data, pos, "truncated native dump cross-ref target")?;
383 let ref_type = read_native_u8(data, pos, "truncated native dump cross-ref type")?;
384 let collection_len = read_native_varu32(
385 data,
386 pos,
387 "truncated native dump cross-ref collection length",
388 )? as usize;
389 let collection_bytes = read_native_bytes(
390 data,
391 pos,
392 collection_len,
393 "truncated native dump cross-ref collection",
394 )?;
395 let target_collection = String::from_utf8(collection_bytes.to_vec()).map_err(|err| {
396 RdbFileError::InvalidOperation(format!(
397 "invalid native dump cross-ref collection utf8: {err}"
398 ))
399 })?;
400 Ok(NativeDumpCrossRef {
401 source_id,
402 target_id,
403 ref_type,
404 target_collection,
405 })
406}
407
408pub fn is_supported_store_version(version: u32) -> bool {
409 matches!(
410 version,
411 STORE_VERSION_V1
412 | STORE_VERSION_V2
413 | STORE_VERSION_V3
414 | STORE_VERSION_V4
415 | STORE_VERSION_V5
416 | STORE_VERSION_V6
417 | STORE_VERSION_V7
418 | STORE_VERSION_V8
419 | STORE_VERSION_V9
420 )
421}
422
423pub fn encode_native_store_header(version: u32) -> Vec<u8> {
424 let mut out = Vec::with_capacity(8);
425 out.extend_from_slice(STORE_MAGIC);
426 out.extend_from_slice(&version.to_le_bytes());
427 out
428}
429
430pub fn decode_native_store_header(bytes: &[u8]) -> RdbFileResult<u32> {
431 if bytes.len() < 8 {
432 return Err(RdbFileError::InvalidOperation("File too small".to_string()));
433 }
434 if &bytes[0..4] != STORE_MAGIC {
435 return Err(RdbFileError::InvalidOperation(
436 "Invalid magic bytes - expected RDST".to_string(),
437 ));
438 }
439 let version = u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
440 if !is_supported_store_version(version) {
441 return Err(RdbFileError::InvalidOperation(format!(
442 "Unsupported version: {version}"
443 )));
444 }
445 Ok(version)
446}
447
448pub fn append_native_store_crc32_footer(bytes: &mut Vec<u8>) {
449 let checksum = native_store_dump_crc32(bytes);
450 bytes.extend_from_slice(&checksum.to_le_bytes());
451}
452
453pub fn verify_native_store_crc32_footer(bytes: &mut Vec<u8>, version: u32) -> RdbFileResult<()> {
454 if version < STORE_VERSION_V3 {
455 return Ok(());
456 }
457 if bytes.len() < 12 {
458 return Err(RdbFileError::InvalidOperation(
459 "File too small for CRC32 verification".to_string(),
460 ));
461 }
462 let footer_at = bytes.len() - 4;
463 let stored_crc = u32::from_le_bytes([
464 bytes[footer_at],
465 bytes[footer_at + 1],
466 bytes[footer_at + 2],
467 bytes[footer_at + 3],
468 ]);
469 let computed_crc = native_store_dump_crc32(&bytes[..footer_at]);
470 if stored_crc != computed_crc {
471 return Err(RdbFileError::InvalidOperation(
472 "Binary store CRC32 mismatch — file corrupted".to_string(),
473 ));
474 }
475 bytes.truncate(footer_at);
476 Ok(())
477}
478
479fn native_store_dump_crc32(bytes: &[u8]) -> u32 {
480 let mut hasher = crc32fast::Hasher::new();
481 hasher.update(bytes);
482 hasher.finalize()
483}
484
485fn read_native_bytes<'a>(
486 data: &'a [u8],
487 pos: &mut usize,
488 len: usize,
489 err: &'static str,
490) -> RdbFileResult<&'a [u8]> {
491 let end = pos
492 .checked_add(len)
493 .ok_or_else(|| RdbFileError::InvalidOperation(err.to_string()))?;
494 if end > data.len() {
495 return Err(RdbFileError::InvalidOperation(err.to_string()));
496 }
497 let bytes = &data[*pos..end];
498 *pos = end;
499 Ok(bytes)
500}
501
502fn read_native_u32(data: &[u8], pos: &mut usize, err: &'static str) -> RdbFileResult<u32> {
503 let bytes = read_native_bytes(data, pos, 4, err)?;
504 let mut raw = [0u8; 4];
505 raw.copy_from_slice(bytes);
506 Ok(u32::from_le_bytes(raw))
507}
508
509fn read_native_u64(data: &[u8], pos: &mut usize, err: &'static str) -> RdbFileResult<u64> {
510 let bytes = read_native_bytes(data, pos, 8, err)?;
511 let mut raw = [0u8; 8];
512 raw.copy_from_slice(bytes);
513 Ok(u64::from_le_bytes(raw))
514}
515
516fn read_native_u8(data: &[u8], pos: &mut usize, err: &'static str) -> RdbFileResult<u8> {
517 let bytes = read_native_bytes(data, pos, 1, err)?;
518 Ok(bytes[0])
519}
520
/// Appends `value` to `out` as a variable-length integer: seven payload bits per
/// byte, least-significant group first, with the high bit set on every byte
/// except the last.
fn write_native_varu32(out: &mut Vec<u8>, mut value: u32) {
    loop {
        let group = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            out.push(group);
            return;
        }
        out.push(group | 0x80);
    }
}
528
/// Appends `value` to `out` as a variable-length integer: seven payload bits per
/// byte, least-significant group first, with the high bit set on every byte
/// except the last.
fn write_native_varu64(out: &mut Vec<u8>, mut value: u64) {
    loop {
        let group = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            out.push(group);
            return;
        }
        out.push(group | 0x80);
    }
}
536
537fn read_native_varu32(data: &[u8], pos: &mut usize, err: &'static str) -> RdbFileResult<u32> {
538 let mut result = 0u32;
539 let mut shift = 0u32;
540 for _ in 0..5 {
541 let byte = read_native_u8(data, pos, err)?;
542 result |= ((byte & 0x7f) as u32) << shift;
543 if byte & 0x80 == 0 {
544 return Ok(result);
545 }
546 shift += 7;
547 }
548 Err(RdbFileError::InvalidOperation(
549 "invalid native dump varu32".to_string(),
550 ))
551}
552
553fn read_native_varu64(data: &[u8], pos: &mut usize, err: &'static str) -> RdbFileResult<u64> {
554 let mut result = 0u64;
555 let mut shift = 0u32;
556 for _ in 0..10 {
557 let byte = read_native_u8(data, pos, err)?;
558 result |= ((byte & 0x7f) as u64) << shift;
559 if byte & 0x80 == 0 {
560 return Ok(result);
561 }
562 shift += 7;
563 }
564 Err(RdbFileError::InvalidOperation(
565 "invalid native dump varu64".to_string(),
566 ))
567}
568
/// One manifest event as it is stored in a manifest summary page.
#[derive(Clone, Debug)]
pub struct NativeManifestEntrySummary {
    /// Collection name of the event.
    pub collection: String,
    /// Object key of the event.
    pub object_key: String,
    /// Event kind rendered as text (from the one-byte kind tag on disk).
    pub kind: String,
    /// Block index recorded for the event.
    pub block_index: u64,
    /// Block checksum, stored on disk as sixteen little-endian bytes.
    pub block_checksum: u128,
    /// Lower snapshot bound recorded for the event.
    pub snapshot_min: u64,
    /// Upper snapshot bound, when one was recorded.
    pub snapshot_max: Option<u64>,
}
579
580#[derive(Debug, Clone)]
581pub struct NativeManifestSummary {
582 pub sequence: u64,
583 pub event_count: u32,
584 pub events_complete: bool,
585 pub omitted_event_count: u32,
586 pub recent_events: Vec<NativeManifestEntrySummary>,
587}
588
/// Summary of one index as stored in a registry summary page.
///
/// NOTE(review): field meanings below are taken from the field names; the
/// producer of these values is outside this file — confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeRegistryIndexSummary {
    /// Index name.
    pub name: String,
    /// Index kind, as text.
    pub kind: String,
    /// Collection the index belongs to, when there is one.
    pub collection: Option<String>,
    /// Whether the index is enabled.
    pub enabled: bool,
    /// Entry count reported for the index.
    pub entries: u64,
    /// Estimated memory use in bytes.
    pub estimated_memory_bytes: u64,
    /// Last refresh time in milliseconds, when known.
    pub last_refresh_ms: Option<u128>,
    /// Backend name, as text.
    pub backend: String,
}
600
/// Summary of one graph projection as stored in a registry summary page.
///
/// NOTE(review): field meanings below are taken from the field names; the
/// producer of these values is outside this file — confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeRegistryProjectionSummary {
    /// Projection name.
    pub name: String,
    /// Projection source, as text.
    pub source: String,
    /// Creation time in Unix milliseconds.
    pub created_at_unix_ms: u128,
    /// Last update time in Unix milliseconds.
    pub updated_at_unix_ms: u128,
    /// Node labels listed for the projection.
    pub node_labels: Vec<String>,
    /// Node types listed for the projection.
    pub node_types: Vec<String>,
    /// Edge labels listed for the projection.
    pub edge_labels: Vec<String>,
    /// Sequence at which the projection was last materialized, when known.
    pub last_materialized_sequence: Option<u64>,
}
612
/// Summary of one analytics job as stored in a registry summary page.
///
/// NOTE(review): field meanings below are taken from the field names; the
/// producer of these values is outside this file — confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeRegistryJobSummary {
    /// Job identifier.
    pub id: String,
    /// Job kind, as text.
    pub kind: String,
    /// Projection the job refers to, when there is one.
    pub projection: Option<String>,
    /// Job state, as text.
    pub state: String,
    /// Creation time in Unix milliseconds.
    pub created_at_unix_ms: u128,
    /// Last update time in Unix milliseconds.
    pub updated_at_unix_ms: u128,
    /// Sequence of the last run, when known.
    pub last_run_sequence: Option<u64>,
    /// Free-form key/value metadata, kept in key order.
    pub metadata: BTreeMap<String, String>,
}
624
/// Summary of one vector artifact as stored in a registry summary page.
///
/// NOTE(review): field meanings below are taken from the field names; the
/// producer of these values is outside this file — confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeVectorArtifactSummary {
    /// Collection the artifact belongs to.
    pub collection: String,
    /// Artifact kind, as text.
    pub artifact_kind: String,
    /// Number of vectors in the artifact.
    pub vector_count: u64,
    /// Vector dimension.
    pub dimension: u32,
    /// Highest layer number recorded for the artifact.
    pub max_layer: u32,
    /// Serialized size in bytes.
    pub serialized_bytes: u64,
    /// Checksum recorded for the artifact.
    pub checksum: u64,
}
635
/// Location and size information for a vector artifact stored in pages.
///
/// NOTE(review): field meanings below are taken from the field names; the
/// producer of these values is outside this file — confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeVectorArtifactPageSummary {
    /// Collection the artifact belongs to.
    pub collection: String,
    /// Artifact kind, as text.
    pub artifact_kind: String,
    /// First page of the artifact.
    pub root_page: u32,
    /// Number of pages the artifact occupies.
    pub page_count: u32,
    /// Artifact length in bytes.
    pub byte_len: u64,
    /// Checksum recorded for the artifact.
    pub checksum: u64,
}
645
646#[derive(Debug, Clone, PartialEq, Eq)]
647pub struct NativeRegistrySummary {
648 pub collection_count: u32,
649 pub index_count: u32,
650 pub graph_projection_count: u32,
651 pub analytics_job_count: u32,
652 pub vector_artifact_count: u32,
653 pub collections_complete: bool,
654 pub indexes_complete: bool,
655 pub graph_projections_complete: bool,
656 pub analytics_jobs_complete: bool,
657 pub vector_artifacts_complete: bool,
658 pub omitted_collection_count: u32,
659 pub omitted_index_count: u32,
660 pub omitted_graph_projection_count: u32,
661 pub omitted_analytics_job_count: u32,
662 pub omitted_vector_artifact_count: u32,
663 pub collection_names: Vec<String>,
664 pub indexes: Vec<NativeRegistryIndexSummary>,
665 pub graph_projections: Vec<NativeRegistryProjectionSummary>,
666 pub analytics_jobs: Vec<NativeRegistryJobSummary>,
667 pub vector_artifacts: Vec<NativeVectorArtifactSummary>,
668}
669
/// Summary of one snapshot.
///
/// NOTE(review): field meanings below are taken from the field names; the
/// producer of these values is outside this file — confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeSnapshotSummary {
    /// Snapshot identifier.
    pub snapshot_id: u64,
    /// Creation time in Unix milliseconds.
    pub created_at_unix_ms: u128,
    /// Superblock sequence recorded with the snapshot.
    pub superblock_sequence: u64,
    /// Number of collections in the snapshot.
    pub collection_count: u32,
    /// Number of entities in the snapshot.
    pub total_entities: u64,
}
678
/// Summary of one export.
///
/// NOTE(review): field meanings below are taken from the field names; the
/// producer of these values is outside this file — confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeExportSummary {
    /// Export name.
    pub name: String,
    /// Creation time in Unix milliseconds.
    pub created_at_unix_ms: u128,
    /// Snapshot the export was taken from, when known.
    pub snapshot_id: Option<u64>,
    /// Superblock sequence recorded with the export.
    pub superblock_sequence: u64,
    /// Number of collections in the export.
    pub collection_count: u32,
    /// Number of entities in the export.
    pub total_entities: u64,
}
688
689#[derive(Debug, Clone, PartialEq, Eq)]
690pub struct NativeRecoverySummary {
691 pub snapshot_count: u32,
692 pub export_count: u32,
693 pub snapshots_complete: bool,
694 pub exports_complete: bool,
695 pub omitted_snapshot_count: u32,
696 pub omitted_export_count: u32,
697 pub snapshots: Vec<NativeSnapshotSummary>,
698 pub exports: Vec<NativeExportSummary>,
699}
700
/// Per-collection figures in a catalog summary.
///
/// NOTE(review): field meanings below are taken from the field names; the
/// producer of these values is outside this file — confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeCatalogCollectionSummary {
    /// Collection name.
    pub name: String,
    /// Number of entities in the collection.
    pub entities: u64,
    /// Number of cross-references in the collection.
    pub cross_refs: u64,
    /// Number of segments in the collection.
    pub segments: u32,
}
708
709#[derive(Debug, Clone, PartialEq, Eq)]
710pub struct NativeCatalogSummary {
711 pub collection_count: u32,
712 pub total_entities: u64,
713 pub collections_complete: bool,
714 pub omitted_collection_count: u32,
715 pub collections: Vec<NativeCatalogCollectionSummary>,
716}
717
/// Metadata state summary.
///
/// NOTE(review): field meanings below are taken from the field names; the
/// producer of these values is outside this file — confirm.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct NativeMetadataStateSummary {
    /// Protocol version, as text.
    pub protocol_version: String,
    /// Generation time in Unix milliseconds.
    pub generated_at_unix_ms: u128,
    /// Where the metadata was last loaded from, when known.
    pub last_loaded_from: Option<String>,
    /// Time of the last heal in Unix milliseconds, when there was one.
    pub last_healed_at_unix_ms: Option<u128>,
}
725
726pub fn native_store_page_checksum(data: &[u8]) -> u64 {
727 let mut hasher = crc32fast::Hasher::new();
728 hasher.update(data);
729 hasher.finalize() as u64
730}
731
732pub fn encode_native_collection_roots_page(roots: &BTreeMap<String, u64>) -> Vec<u8> {
733 let mut data = Vec::with_capacity(1024);
734 data.extend_from_slice(NATIVE_COLLECTION_ROOTS_MAGIC);
735 data.extend_from_slice(&(roots.len() as u32).to_le_bytes());
736 for (collection, root) in roots {
737 data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
738 data.extend_from_slice(collection.as_bytes());
739 data.extend_from_slice(&root.to_le_bytes());
740 }
741 data
742}
743
744pub fn decode_native_collection_roots_page(content: &[u8]) -> RdbFileResult<BTreeMap<String, u64>> {
745 if content.len() < 8 || &content[0..4] != NATIVE_COLLECTION_ROOTS_MAGIC {
746 return Err(RdbFileError::InvalidOperation(
747 "invalid native collection roots page".to_string(),
748 ));
749 }
750
751 let count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]) as usize;
752 let mut pos = 8usize;
753 let mut roots = BTreeMap::new();
754
755 for _ in 0..count {
756 if pos + 4 > content.len() {
757 break;
758 }
759 let name_len = u32::from_le_bytes([
760 content[pos],
761 content[pos + 1],
762 content[pos + 2],
763 content[pos + 3],
764 ]) as usize;
765 pos += 4;
766 if pos + name_len + 8 > content.len() {
767 break;
768 }
769 let name = String::from_utf8(content[pos..pos + name_len].to_vec())
770 .map_err(|err| RdbFileError::InvalidOperation(err.to_string()))?;
771 pos += name_len;
772 let root = u64::from_le_bytes([
773 content[pos],
774 content[pos + 1],
775 content[pos + 2],
776 content[pos + 3],
777 content[pos + 4],
778 content[pos + 5],
779 content[pos + 6],
780 content[pos + 7],
781 ]);
782 pos += 8;
783 roots.insert(name, root);
784 }
785
786 Ok(roots)
787}
788
789pub fn encode_native_manifest_summary_page(sequence: u64, events: &[ManifestEvent]) -> Vec<u8> {
790 let sample_start = events.len().saturating_sub(NATIVE_MANIFEST_SAMPLE_LIMIT);
791 let sample = &events[sample_start..];
792
793 let mut data = Vec::with_capacity(1024);
794 data.extend_from_slice(NATIVE_MANIFEST_MAGIC);
795 data.extend_from_slice(&sequence.to_le_bytes());
796 data.extend_from_slice(&(events.len() as u32).to_le_bytes());
797 data.push(u8::from(events.len() <= NATIVE_MANIFEST_SAMPLE_LIMIT));
798 data.extend_from_slice(&(events.len().saturating_sub(sample.len()) as u32).to_le_bytes());
799 data.extend_from_slice(&(sample.len() as u32).to_le_bytes());
800 for event in sample {
801 data.push(native_manifest_kind_to_byte(event.kind));
802 data.extend_from_slice(&(event.collection.len() as u16).to_le_bytes());
803 data.extend_from_slice(event.collection.as_bytes());
804 data.extend_from_slice(&(event.object_key.len() as u16).to_le_bytes());
805 data.extend_from_slice(event.object_key.as_bytes());
806 data.extend_from_slice(&event.block.index.to_le_bytes());
807 data.extend_from_slice(&event.block.checksum.to_le_bytes());
808 data.extend_from_slice(&event.snapshot_min.to_le_bytes());
809 match event.snapshot_max {
810 Some(value) => {
811 data.push(1);
812 data.extend_from_slice(&value.to_le_bytes());
813 }
814 None => data.push(0),
815 }
816 }
817
818 data
819}
820
821pub fn decode_native_manifest_summary_page(content: &[u8]) -> RdbFileResult<NativeManifestSummary> {
822 if content.len() < 25 || &content[0..4] != NATIVE_MANIFEST_MAGIC {
823 return Err(RdbFileError::InvalidOperation(
824 "invalid native manifest summary page".to_string(),
825 ));
826 }
827
828 let sequence = u64::from_le_bytes([
829 content[4],
830 content[5],
831 content[6],
832 content[7],
833 content[8],
834 content[9],
835 content[10],
836 content[11],
837 ]);
838 let event_count = u32::from_le_bytes([content[12], content[13], content[14], content[15]]);
839 let events_complete = content[16] == 1;
840 let omitted_event_count =
841 u32::from_le_bytes([content[17], content[18], content[19], content[20]]);
842 let sample_count =
843 u32::from_le_bytes([content[21], content[22], content[23], content[24]]) as usize;
844
845 let mut pos = 25usize;
846 let mut recent_events = Vec::with_capacity(sample_count);
847 for _ in 0..sample_count {
848 if pos + 1 + 2 > content.len() {
849 break;
850 }
851 let kind = native_manifest_kind_from_byte(content[pos]).to_string();
852 pos += 1;
853 let collection_len = u16::from_le_bytes([content[pos], content[pos + 1]]) as usize;
854 pos += 2;
855 if pos + collection_len + 2 > content.len() {
856 break;
857 }
858 let collection = String::from_utf8(content[pos..pos + collection_len].to_vec())
859 .map_err(|err| RdbFileError::InvalidOperation(err.to_string()))?;
860 pos += collection_len;
861 let object_key_len = u16::from_le_bytes([content[pos], content[pos + 1]]) as usize;
862 pos += 2;
863 if pos + object_key_len + 8 + 16 + 8 + 1 > content.len() {
864 break;
865 }
866 let object_key = String::from_utf8(content[pos..pos + object_key_len].to_vec())
867 .map_err(|err| RdbFileError::InvalidOperation(err.to_string()))?;
868 pos += object_key_len;
869 let block_index = u64::from_le_bytes([
870 content[pos],
871 content[pos + 1],
872 content[pos + 2],
873 content[pos + 3],
874 content[pos + 4],
875 content[pos + 5],
876 content[pos + 6],
877 content[pos + 7],
878 ]);
879 pos += 8;
880 let mut checksum_bytes = [0u8; 16];
881 checksum_bytes.copy_from_slice(&content[pos..pos + 16]);
882 pos += 16;
883 let snapshot_min = u64::from_le_bytes([
884 content[pos],
885 content[pos + 1],
886 content[pos + 2],
887 content[pos + 3],
888 content[pos + 4],
889 content[pos + 5],
890 content[pos + 6],
891 content[pos + 7],
892 ]);
893 pos += 8;
894 let snapshot_max = match content.get(pos).copied() {
895 Some(1) => {
896 pos += 1;
897 if pos + 8 > content.len() {
898 return Err(RdbFileError::InvalidOperation(
899 "truncated native manifest snapshot_max".to_string(),
900 ));
901 }
902 let value = u64::from_le_bytes([
903 content[pos],
904 content[pos + 1],
905 content[pos + 2],
906 content[pos + 3],
907 content[pos + 4],
908 content[pos + 5],
909 content[pos + 6],
910 content[pos + 7],
911 ]);
912 pos += 8;
913 Some(value)
914 }
915 Some(_) => {
916 pos += 1;
917 None
918 }
919 None => None,
920 };
921
922 recent_events.push(NativeManifestEntrySummary {
923 collection,
924 object_key,
925 kind,
926 block_index,
927 block_checksum: u128::from_le_bytes(checksum_bytes),
928 snapshot_min,
929 snapshot_max,
930 });
931 }
932
933 Ok(NativeManifestSummary {
934 sequence,
935 event_count,
936 events_complete,
937 omitted_event_count,
938 recent_events,
939 })
940}
941
942pub fn encode_native_registry_summary_page(summary: &NativeRegistrySummary) -> Vec<u8> {
943 let mut data = Vec::with_capacity(2048);
944 data.extend_from_slice(NATIVE_REGISTRY_MAGIC);
945 data.extend_from_slice(&summary.collection_count.to_le_bytes());
946 data.extend_from_slice(&summary.index_count.to_le_bytes());
947 data.extend_from_slice(&summary.graph_projection_count.to_le_bytes());
948 data.extend_from_slice(&summary.analytics_job_count.to_le_bytes());
949 data.extend_from_slice(&summary.vector_artifact_count.to_le_bytes());
950 data.push(u8::from(summary.collections_complete));
951 data.push(u8::from(summary.indexes_complete));
952 data.push(u8::from(summary.graph_projections_complete));
953 data.push(u8::from(summary.analytics_jobs_complete));
954 data.push(u8::from(summary.vector_artifacts_complete));
955 data.extend_from_slice(&summary.omitted_collection_count.to_le_bytes());
956 data.extend_from_slice(&summary.omitted_index_count.to_le_bytes());
957 data.extend_from_slice(&summary.omitted_graph_projection_count.to_le_bytes());
958 data.extend_from_slice(&summary.omitted_analytics_job_count.to_le_bytes());
959 data.extend_from_slice(&summary.omitted_vector_artifact_count.to_le_bytes());
960 data.extend_from_slice(&(summary.collection_names.len() as u32).to_le_bytes());
961 data.extend_from_slice(&(summary.indexes.len() as u32).to_le_bytes());
962 data.extend_from_slice(&(summary.graph_projections.len() as u32).to_le_bytes());
963 data.extend_from_slice(&(summary.analytics_jobs.len() as u32).to_le_bytes());
964 data.extend_from_slice(&(summary.vector_artifacts.len() as u32).to_le_bytes());
965
966 for name in &summary.collection_names {
967 push_native_string(&mut data, name);
968 }
969 for index in &summary.indexes {
970 push_native_string(&mut data, &index.name);
971 push_native_string(&mut data, &index.kind);
972 match &index.collection {
973 Some(collection) => {
974 data.push(1);
975 push_native_string(&mut data, collection);
976 }
977 None => data.push(0),
978 }
979 data.push(u8::from(index.enabled));
980 data.extend_from_slice(&index.entries.to_le_bytes());
981 data.extend_from_slice(&index.estimated_memory_bytes.to_le_bytes());
982 match index.last_refresh_ms {
983 Some(value) => {
984 data.push(1);
985 data.extend_from_slice(&value.to_le_bytes());
986 }
987 None => data.push(0),
988 }
989 push_native_string(&mut data, &index.backend);
990 }
991 for projection in &summary.graph_projections {
992 push_native_string(&mut data, &projection.name);
993 push_native_string(&mut data, &projection.source);
994 data.extend_from_slice(&projection.created_at_unix_ms.to_le_bytes());
995 data.extend_from_slice(&projection.updated_at_unix_ms.to_le_bytes());
996 push_native_string_list(&mut data, &projection.node_labels);
997 push_native_string_list(&mut data, &projection.node_types);
998 push_native_string_list(&mut data, &projection.edge_labels);
999 match projection.last_materialized_sequence {
1000 Some(value) => {
1001 data.push(1);
1002 data.extend_from_slice(&value.to_le_bytes());
1003 }
1004 None => data.push(0),
1005 }
1006 }
1007 for job in &summary.analytics_jobs {
1008 push_native_string(&mut data, &job.id);
1009 push_native_string(&mut data, &job.kind);
1010 match &job.projection {
1011 Some(projection) => {
1012 data.push(1);
1013 push_native_string(&mut data, projection);
1014 }
1015 None => data.push(0),
1016 }
1017 push_native_string(&mut data, &job.state);
1018 data.extend_from_slice(&job.created_at_unix_ms.to_le_bytes());
1019 data.extend_from_slice(&job.updated_at_unix_ms.to_le_bytes());
1020 match job.last_run_sequence {
1021 Some(value) => {
1022 data.push(1);
1023 data.extend_from_slice(&value.to_le_bytes());
1024 }
1025 None => data.push(0),
1026 }
1027 let metadata_count = job.metadata.len().min(u16::MAX as usize) as u16;
1028 data.extend_from_slice(&metadata_count.to_le_bytes());
1029 for (key, value) in job.metadata.iter().take(metadata_count as usize) {
1030 push_native_string(&mut data, key);
1031 push_native_string(&mut data, value);
1032 }
1033 }
1034 for artifact in &summary.vector_artifacts {
1035 push_native_string(&mut data, &artifact.collection);
1036 push_native_string(&mut data, &artifact.artifact_kind);
1037 data.extend_from_slice(&artifact.vector_count.to_le_bytes());
1038 data.extend_from_slice(&artifact.dimension.to_le_bytes());
1039 data.extend_from_slice(&artifact.max_layer.to_le_bytes());
1040 data.extend_from_slice(&artifact.serialized_bytes.to_le_bytes());
1041 data.extend_from_slice(&artifact.checksum.to_le_bytes());
1042 }
1043
1044 data
1045}
1046
1047pub fn decode_native_registry_summary_page(content: &[u8]) -> RdbFileResult<NativeRegistrySummary> {
1048 if content.len() < 77 || &content[0..4] != NATIVE_REGISTRY_MAGIC {
1049 return Err(RdbFileError::InvalidOperation(
1050 "invalid native registry summary page".to_string(),
1051 ));
1052 }
1053
1054 let collection_count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
1055 let index_count = u32::from_le_bytes([content[8], content[9], content[10], content[11]]);
1056 let graph_projection_count =
1057 u32::from_le_bytes([content[12], content[13], content[14], content[15]]);
1058 let analytics_job_count =
1059 u32::from_le_bytes([content[16], content[17], content[18], content[19]]);
1060 let vector_artifact_count =
1061 u32::from_le_bytes([content[20], content[21], content[22], content[23]]);
1062 let collections_complete = content[24] == 1;
1063 let indexes_complete = content[25] == 1;
1064 let graph_projections_complete = content[26] == 1;
1065 let analytics_jobs_complete = content[27] == 1;
1066 let vector_artifacts_complete = content[28] == 1;
1067 let omitted_collection_count =
1068 u32::from_le_bytes([content[29], content[30], content[31], content[32]]);
1069 let omitted_index_count =
1070 u32::from_le_bytes([content[33], content[34], content[35], content[36]]);
1071 let omitted_graph_projection_count =
1072 u32::from_le_bytes([content[37], content[38], content[39], content[40]]);
1073 let omitted_analytics_job_count =
1074 u32::from_le_bytes([content[41], content[42], content[43], content[44]]);
1075 let omitted_vector_artifact_count =
1076 u32::from_le_bytes([content[45], content[46], content[47], content[48]]);
1077 let collection_sample_count =
1078 u32::from_le_bytes([content[49], content[50], content[51], content[52]]) as usize;
1079 let index_sample_count =
1080 u32::from_le_bytes([content[53], content[54], content[55], content[56]]) as usize;
1081 let projection_sample_count =
1082 u32::from_le_bytes([content[57], content[58], content[59], content[60]]) as usize;
1083 let job_sample_count =
1084 u32::from_le_bytes([content[61], content[62], content[63], content[64]]) as usize;
1085 let vector_artifact_sample_count =
1086 u32::from_le_bytes([content[65], content[66], content[67], content[68]]) as usize;
1087
1088 let mut pos = 69usize;
1089 let mut collection_names = Vec::with_capacity(collection_sample_count);
1090 for _ in 0..collection_sample_count {
1091 collection_names.push(read_native_string(content, &mut pos)?);
1092 }
1093
1094 let mut indexes = Vec::with_capacity(index_sample_count);
1095 for _ in 0..index_sample_count {
1096 let name = read_native_string(content, &mut pos)?;
1097 let kind = read_native_string(content, &mut pos)?;
1098 let collection = read_flagged_string(content, &mut pos)?;
1099 let enabled = content.get(pos).copied().unwrap_or(0) == 1;
1100 pos = pos.saturating_add(1);
1101 if pos + 16 > content.len() {
1102 break;
1103 }
1104 let entries = read_u64(content, &mut pos);
1105 let estimated_memory_bytes = read_u64(content, &mut pos);
1106 let last_refresh_ms =
1107 read_flagged_u128(content, &mut pos, "native registry refresh timestamp")?;
1108 let backend = read_native_string(content, &mut pos)?;
1109 indexes.push(NativeRegistryIndexSummary {
1110 name,
1111 kind,
1112 collection,
1113 enabled,
1114 entries,
1115 estimated_memory_bytes,
1116 last_refresh_ms,
1117 backend,
1118 });
1119 }
1120
1121 let mut graph_projections = Vec::with_capacity(projection_sample_count);
1122 for _ in 0..projection_sample_count {
1123 let name = read_native_string(content, &mut pos)?;
1124 let source = read_native_string(content, &mut pos)?;
1125 if pos + 32 > content.len() {
1126 break;
1127 }
1128 let created_at_unix_ms = read_u128(content, &mut pos);
1129 let updated_at_unix_ms = read_u128(content, &mut pos);
1130 let node_labels = read_native_string_list(content, &mut pos)?;
1131 let node_types = read_native_string_list(content, &mut pos)?;
1132 let edge_labels = read_native_string_list(content, &mut pos)?;
1133 let last_materialized_sequence = read_flagged_u64(
1134 content,
1135 &mut pos,
1136 "native projection materialization sequence",
1137 )?;
1138 graph_projections.push(NativeRegistryProjectionSummary {
1139 name,
1140 source,
1141 created_at_unix_ms,
1142 updated_at_unix_ms,
1143 node_labels,
1144 node_types,
1145 edge_labels,
1146 last_materialized_sequence,
1147 });
1148 }
1149
1150 let mut analytics_jobs = Vec::with_capacity(job_sample_count);
1151 for _ in 0..job_sample_count {
1152 let id = read_native_string(content, &mut pos)?;
1153 let kind = read_native_string(content, &mut pos)?;
1154 let projection = read_flagged_string(content, &mut pos)?;
1155 let state = read_native_string(content, &mut pos)?;
1156 if pos + 32 > content.len() {
1157 break;
1158 }
1159 let created_at_unix_ms = read_u128(content, &mut pos);
1160 let updated_at_unix_ms = read_u128(content, &mut pos);
1161 let last_run_sequence =
1162 read_flagged_u64(content, &mut pos, "native analytics job run sequence")?;
1163 if pos + 2 > content.len() {
1164 return Err(RdbFileError::InvalidOperation(
1165 "truncated native analytics job metadata count".to_string(),
1166 ));
1167 }
1168 let metadata_count = u16::from_le_bytes([content[pos], content[pos + 1]]) as usize;
1169 pos += 2;
1170 let mut metadata = BTreeMap::new();
1171 for _ in 0..metadata_count {
1172 let key = read_native_string(content, &mut pos)?;
1173 let value = read_native_string(content, &mut pos)?;
1174 metadata.insert(key, value);
1175 }
1176 analytics_jobs.push(NativeRegistryJobSummary {
1177 id,
1178 kind,
1179 projection,
1180 state,
1181 created_at_unix_ms,
1182 updated_at_unix_ms,
1183 last_run_sequence,
1184 metadata,
1185 });
1186 }
1187
1188 let mut vector_artifacts = Vec::with_capacity(vector_artifact_sample_count);
1189 for _ in 0..vector_artifact_sample_count {
1190 let collection = read_native_string(content, &mut pos)?;
1191 let artifact_kind = read_native_string(content, &mut pos)?;
1192 if pos + 32 > content.len() {
1193 break;
1194 }
1195 let vector_count = read_u64(content, &mut pos);
1196 let dimension = read_u32(content, &mut pos);
1197 let max_layer = read_u32(content, &mut pos);
1198 let serialized_bytes = read_u64(content, &mut pos);
1199 let checksum = read_u64(content, &mut pos);
1200 vector_artifacts.push(NativeVectorArtifactSummary {
1201 collection,
1202 artifact_kind,
1203 vector_count,
1204 dimension,
1205 max_layer,
1206 serialized_bytes,
1207 checksum,
1208 });
1209 }
1210
1211 Ok(NativeRegistrySummary {
1212 collection_count,
1213 index_count,
1214 graph_projection_count,
1215 analytics_job_count,
1216 vector_artifact_count,
1217 collections_complete,
1218 indexes_complete,
1219 graph_projections_complete,
1220 analytics_jobs_complete,
1221 vector_artifacts_complete,
1222 omitted_collection_count,
1223 omitted_index_count,
1224 omitted_graph_projection_count,
1225 omitted_analytics_job_count,
1226 omitted_vector_artifact_count,
1227 collection_names,
1228 indexes,
1229 graph_projections,
1230 analytics_jobs,
1231 vector_artifacts,
1232 })
1233}
1234
1235pub fn encode_native_recovery_summary_page(summary: &NativeRecoverySummary) -> Vec<u8> {
1236 let mut data = Vec::with_capacity(2048);
1237 data.extend_from_slice(NATIVE_RECOVERY_MAGIC);
1238 data.extend_from_slice(&summary.snapshot_count.to_le_bytes());
1239 data.extend_from_slice(&summary.export_count.to_le_bytes());
1240 data.push(u8::from(summary.snapshots_complete));
1241 data.push(u8::from(summary.exports_complete));
1242 data.extend_from_slice(&summary.omitted_snapshot_count.to_le_bytes());
1243 data.extend_from_slice(&summary.omitted_export_count.to_le_bytes());
1244 data.extend_from_slice(&(summary.snapshots.len() as u32).to_le_bytes());
1245 data.extend_from_slice(&(summary.exports.len() as u32).to_le_bytes());
1246
1247 for snapshot in &summary.snapshots {
1248 data.extend_from_slice(&snapshot.snapshot_id.to_le_bytes());
1249 data.extend_from_slice(&snapshot.created_at_unix_ms.to_le_bytes());
1250 data.extend_from_slice(&snapshot.superblock_sequence.to_le_bytes());
1251 data.extend_from_slice(&snapshot.collection_count.to_le_bytes());
1252 data.extend_from_slice(&snapshot.total_entities.to_le_bytes());
1253 }
1254
1255 for export in &summary.exports {
1256 push_native_string(&mut data, &export.name);
1257 data.extend_from_slice(&export.created_at_unix_ms.to_le_bytes());
1258 match export.snapshot_id {
1259 Some(snapshot_id) => {
1260 data.push(1);
1261 data.extend_from_slice(&snapshot_id.to_le_bytes());
1262 }
1263 None => data.push(0),
1264 }
1265 data.extend_from_slice(&export.superblock_sequence.to_le_bytes());
1266 data.extend_from_slice(&export.collection_count.to_le_bytes());
1267 data.extend_from_slice(&export.total_entities.to_le_bytes());
1268 }
1269
1270 data
1271}
1272
1273pub fn decode_native_recovery_summary_page(content: &[u8]) -> RdbFileResult<NativeRecoverySummary> {
1274 if content.len() < 30 || &content[0..4] != NATIVE_RECOVERY_MAGIC {
1275 return Err(RdbFileError::InvalidOperation(
1276 "invalid native recovery summary page".to_string(),
1277 ));
1278 }
1279
1280 let snapshot_count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
1281 let export_count = u32::from_le_bytes([content[8], content[9], content[10], content[11]]);
1282 let snapshots_complete = content[12] == 1;
1283 let exports_complete = content[13] == 1;
1284 let omitted_snapshot_count =
1285 u32::from_le_bytes([content[14], content[15], content[16], content[17]]);
1286 let omitted_export_count =
1287 u32::from_le_bytes([content[18], content[19], content[20], content[21]]);
1288 let snapshot_sample_count =
1289 u32::from_le_bytes([content[22], content[23], content[24], content[25]]) as usize;
1290 let export_sample_count =
1291 u32::from_le_bytes([content[26], content[27], content[28], content[29]]) as usize;
1292
1293 let mut pos = 30usize;
1294 let mut snapshots = Vec::with_capacity(snapshot_sample_count);
1295 for _ in 0..snapshot_sample_count {
1296 if pos + 44 > content.len() {
1297 break;
1298 }
1299 let snapshot_id = read_u64(content, &mut pos);
1300 let created_at_unix_ms = read_u128(content, &mut pos);
1301 let superblock_sequence = read_u64(content, &mut pos);
1302 let collection_count = read_u32(content, &mut pos);
1303 let total_entities = read_u64(content, &mut pos);
1304 snapshots.push(NativeSnapshotSummary {
1305 snapshot_id,
1306 created_at_unix_ms,
1307 superblock_sequence,
1308 collection_count,
1309 total_entities,
1310 });
1311 }
1312
1313 let mut exports = Vec::with_capacity(export_sample_count);
1314 for _ in 0..export_sample_count {
1315 let name = read_native_string(content, &mut pos)?;
1316 if pos + 16 > content.len() {
1317 break;
1318 }
1319 let created_at_unix_ms = read_u128(content, &mut pos);
1320 let snapshot_id = read_flagged_u64(content, &mut pos, "native export snapshot id")?;
1321 if pos + 20 > content.len() {
1322 break;
1323 }
1324 let superblock_sequence = read_u64(content, &mut pos);
1325 let collection_count = read_u32(content, &mut pos);
1326 let total_entities = read_u64(content, &mut pos);
1327 exports.push(NativeExportSummary {
1328 name,
1329 created_at_unix_ms,
1330 snapshot_id,
1331 superblock_sequence,
1332 collection_count,
1333 total_entities,
1334 });
1335 }
1336
1337 Ok(NativeRecoverySummary {
1338 snapshot_count,
1339 export_count,
1340 snapshots_complete,
1341 exports_complete,
1342 omitted_snapshot_count,
1343 omitted_export_count,
1344 snapshots,
1345 exports,
1346 })
1347}
1348
1349pub fn encode_native_catalog_summary_page(summary: &NativeCatalogSummary) -> Vec<u8> {
1350 let mut data = Vec::with_capacity(2048);
1351 data.extend_from_slice(NATIVE_CATALOG_MAGIC);
1352 data.extend_from_slice(&summary.collection_count.to_le_bytes());
1353 data.extend_from_slice(&summary.total_entities.to_le_bytes());
1354 data.push(u8::from(summary.collections_complete));
1355 data.extend_from_slice(&summary.omitted_collection_count.to_le_bytes());
1356 data.extend_from_slice(&(summary.collections.len() as u32).to_le_bytes());
1357 for collection in &summary.collections {
1358 push_native_string(&mut data, &collection.name);
1359 data.extend_from_slice(&collection.entities.to_le_bytes());
1360 data.extend_from_slice(&collection.cross_refs.to_le_bytes());
1361 data.extend_from_slice(&collection.segments.to_le_bytes());
1362 }
1363 data
1364}
1365
1366pub fn decode_native_catalog_summary_page(content: &[u8]) -> RdbFileResult<NativeCatalogSummary> {
1367 if content.len() < 25 || &content[0..4] != NATIVE_CATALOG_MAGIC {
1368 return Err(RdbFileError::InvalidOperation(
1369 "invalid native catalog summary page".to_string(),
1370 ));
1371 }
1372
1373 let collection_count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
1374 let mut pos = 8usize;
1375 let total_entities = read_u64(content, &mut pos);
1376 let collections_complete = content[16] == 1;
1377 let omitted_collection_count =
1378 u32::from_le_bytes([content[17], content[18], content[19], content[20]]);
1379 let sample_count =
1380 u32::from_le_bytes([content[21], content[22], content[23], content[24]]) as usize;
1381
1382 let mut pos = 25usize;
1383 let mut collections = Vec::with_capacity(sample_count);
1384 for _ in 0..sample_count {
1385 let name = read_native_string(content, &mut pos)?;
1386 if pos + 20 > content.len() {
1387 break;
1388 }
1389 let entities = read_u64(content, &mut pos);
1390 let cross_refs = read_u64(content, &mut pos);
1391 let segments = read_u32(content, &mut pos);
1392 collections.push(NativeCatalogCollectionSummary {
1393 name,
1394 entities,
1395 cross_refs,
1396 segments,
1397 });
1398 }
1399
1400 Ok(NativeCatalogSummary {
1401 collection_count,
1402 total_entities,
1403 collections_complete,
1404 omitted_collection_count,
1405 collections,
1406 })
1407}
1408
1409pub fn encode_native_metadata_state_summary_page(summary: &NativeMetadataStateSummary) -> Vec<u8> {
1410 let mut data = Vec::with_capacity(512);
1411 data.extend_from_slice(NATIVE_METADATA_STATE_MAGIC);
1412 push_native_string(&mut data, &summary.protocol_version);
1413 data.extend_from_slice(&summary.generated_at_unix_ms.to_le_bytes());
1414 match &summary.last_loaded_from {
1415 Some(value) => {
1416 data.push(1);
1417 push_native_string(&mut data, value);
1418 }
1419 None => data.push(0),
1420 }
1421 match summary.last_healed_at_unix_ms {
1422 Some(value) => {
1423 data.push(1);
1424 data.extend_from_slice(&value.to_le_bytes());
1425 }
1426 None => data.push(0),
1427 }
1428 data
1429}
1430
1431pub fn decode_native_metadata_state_summary_page(
1432 content: &[u8],
1433) -> RdbFileResult<NativeMetadataStateSummary> {
1434 if content.len() < 22 || &content[0..4] != NATIVE_METADATA_STATE_MAGIC {
1435 return Err(RdbFileError::InvalidOperation(
1436 "invalid native metadata state page".to_string(),
1437 ));
1438 }
1439
1440 let mut pos = 4usize;
1441 let protocol_version = read_native_string(content, &mut pos)?;
1442 if pos + 16 > content.len() {
1443 return Err(RdbFileError::InvalidOperation(
1444 "truncated native metadata state timestamp".to_string(),
1445 ));
1446 }
1447 let generated_at_unix_ms = read_u128(content, &mut pos);
1448 let last_loaded_from = read_flagged_string(content, &mut pos)?;
1449 let last_healed_at_unix_ms =
1450 read_flagged_u128(content, &mut pos, "native metadata heal timestamp")?;
1451
1452 Ok(NativeMetadataStateSummary {
1453 protocol_version,
1454 generated_at_unix_ms,
1455 last_loaded_from,
1456 last_healed_at_unix_ms,
1457 })
1458}
1459
/// Size in bytes of the header at the start of every native blob page:
/// magic (4) + next page id (4) + chunk length (4).
pub const NATIVE_BLOB_PAGE_HEADER_BYTES: usize = 12;

/// Returns how many blob payload bytes fit in one page of `page_size` bytes
/// once `page_header_size` bytes and the blob page header are reserved.
///
/// Returns `0` when the page is too small to hold both headers. The previous
/// unchecked subtraction panicked in debug builds and wrapped around to a
/// huge capacity in release builds for such inputs.
pub fn native_blob_chunk_capacity(page_size: usize, page_header_size: usize) -> usize {
    page_size
        .saturating_sub(page_header_size)
        .saturating_sub(NATIVE_BLOB_PAGE_HEADER_BYTES)
}
1465
1466pub fn encode_native_blob_page(next_page: u32, chunk: &[u8]) -> Vec<u8> {
1467 let mut data = Vec::with_capacity(chunk.len() + NATIVE_BLOB_PAGE_HEADER_BYTES);
1468 data.extend_from_slice(NATIVE_BLOB_MAGIC);
1469 data.extend_from_slice(&next_page.to_le_bytes());
1470 data.extend_from_slice(&(chunk.len() as u32).to_le_bytes());
1471 data.extend_from_slice(chunk);
1472 data
1473}
1474
1475pub fn decode_native_blob_page(content: &[u8]) -> RdbFileResult<(u32, Vec<u8>)> {
1476 if content.len() < NATIVE_BLOB_PAGE_HEADER_BYTES || &content[0..4] != NATIVE_BLOB_MAGIC {
1477 return Err(RdbFileError::InvalidOperation(
1478 "invalid native blob page".to_string(),
1479 ));
1480 }
1481 let next_page = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
1482 let chunk_len = u32::from_le_bytes([content[8], content[9], content[10], content[11]]) as usize;
1483 if NATIVE_BLOB_PAGE_HEADER_BYTES + chunk_len > content.len() {
1484 return Err(RdbFileError::InvalidOperation(
1485 "truncated native blob page".to_string(),
1486 ));
1487 }
1488 Ok((
1489 next_page,
1490 content[NATIVE_BLOB_PAGE_HEADER_BYTES..NATIVE_BLOB_PAGE_HEADER_BYTES + chunk_len].to_vec(),
1491 ))
1492}
1493
1494pub fn encode_native_vector_artifact_store_page(
1495 summaries: &[NativeVectorArtifactPageSummary],
1496) -> Vec<u8> {
1497 let mut data = Vec::with_capacity(1024 + summaries.len() * 64);
1498 data.extend_from_slice(NATIVE_VECTOR_ARTIFACT_MAGIC);
1499 data.extend_from_slice(&(summaries.len() as u32).to_le_bytes());
1500 for summary in summaries {
1501 push_native_string(&mut data, &summary.collection);
1502 push_native_string(&mut data, &summary.artifact_kind);
1503 data.extend_from_slice(&summary.root_page.to_le_bytes());
1504 data.extend_from_slice(&summary.page_count.to_le_bytes());
1505 data.extend_from_slice(&summary.byte_len.to_le_bytes());
1506 data.extend_from_slice(&summary.checksum.to_le_bytes());
1507 }
1508 data
1509}
1510
1511pub fn decode_native_vector_artifact_store_page(
1512 content: &[u8],
1513) -> RdbFileResult<Vec<NativeVectorArtifactPageSummary>> {
1514 if content.len() < 8 || &content[0..4] != NATIVE_VECTOR_ARTIFACT_MAGIC {
1515 return Err(RdbFileError::InvalidOperation(
1516 "invalid native vector artifact store page".to_string(),
1517 ));
1518 }
1519 let count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]) as usize;
1520 let mut pos = 8usize;
1521 let mut summaries = Vec::with_capacity(count);
1522 for _ in 0..count {
1523 let collection = read_native_string(content, &mut pos)?;
1524 let artifact_kind = read_native_string(content, &mut pos)?;
1525 if pos + 24 > content.len() {
1526 break;
1527 }
1528 let root_page = read_u32(content, &mut pos);
1529 let page_count = read_u32(content, &mut pos);
1530 let byte_len = read_u64(content, &mut pos);
1531 let checksum = read_u64(content, &mut pos);
1532 summaries.push(NativeVectorArtifactPageSummary {
1533 collection,
1534 artifact_kind,
1535 root_page,
1536 page_count,
1537 byte_len,
1538 checksum,
1539 });
1540 }
1541 Ok(summaries)
1542}
1543
1544fn native_manifest_kind_to_byte(kind: ManifestEventKind) -> u8 {
1545 match kind {
1546 ManifestEventKind::Insert => 1,
1547 ManifestEventKind::Update => 2,
1548 ManifestEventKind::Remove => 3,
1549 ManifestEventKind::Checkpoint => 4,
1550 }
1551}
1552
/// Appends `value` to `data` as a little-endian `u16` byte length followed by
/// the UTF-8 bytes.
///
/// Strings longer than `u16::MAX` bytes are truncated. The cut is moved back
/// to the nearest character boundary so the stored payload is always valid
/// UTF-8; cutting in the middle of a multi-byte character would make
/// `String::from_utf8` fail when the page is read back.
fn push_native_string(data: &mut Vec<u8>, value: &str) {
    let mut len = value.len().min(u16::MAX as usize);
    // Index 0 is always a boundary, so this terminates.
    while !value.is_char_boundary(len) {
        len -= 1;
    }
    data.extend_from_slice(&(len as u16).to_le_bytes());
    data.extend_from_slice(&value.as_bytes()[..len]);
}
1559
1560fn read_native_string(content: &[u8], pos: &mut usize) -> RdbFileResult<String> {
1561 if *pos + 2 > content.len() {
1562 return Err(RdbFileError::InvalidOperation(
1563 "truncated native string length".to_string(),
1564 ));
1565 }
1566 let len = u16::from_le_bytes([content[*pos], content[*pos + 1]]) as usize;
1567 *pos += 2;
1568 if *pos + len > content.len() {
1569 return Err(RdbFileError::InvalidOperation(
1570 "truncated native string payload".to_string(),
1571 ));
1572 }
1573 let value = String::from_utf8(content[*pos..*pos + len].to_vec())
1574 .map_err(|err| RdbFileError::InvalidOperation(err.to_string()))?;
1575 *pos += len;
1576 Ok(value)
1577}
1578
1579fn push_native_string_list(data: &mut Vec<u8>, values: &[String]) {
1580 let count = values.len().min(u16::MAX as usize) as u16;
1581 data.extend_from_slice(&count.to_le_bytes());
1582 for value in values.iter().take(count as usize) {
1583 push_native_string(data, value);
1584 }
1585}
1586
1587fn read_native_string_list(content: &[u8], pos: &mut usize) -> RdbFileResult<Vec<String>> {
1588 if *pos + 2 > content.len() {
1589 return Err(RdbFileError::InvalidOperation(
1590 "truncated native string list count".to_string(),
1591 ));
1592 }
1593 let count = u16::from_le_bytes([content[*pos], content[*pos + 1]]) as usize;
1594 *pos += 2;
1595 let mut values = Vec::with_capacity(count);
1596 for _ in 0..count {
1597 values.push(read_native_string(content, pos)?);
1598 }
1599 Ok(values)
1600}
1601
1602fn read_flagged_string(content: &[u8], pos: &mut usize) -> RdbFileResult<Option<String>> {
1603 match content.get(*pos).copied() {
1604 Some(1) => {
1605 *pos += 1;
1606 Ok(Some(read_native_string(content, pos)?))
1607 }
1608 Some(_) => {
1609 *pos += 1;
1610 Ok(None)
1611 }
1612 None => Ok(None),
1613 }
1614}
1615
1616fn read_flagged_u64(content: &[u8], pos: &mut usize, label: &str) -> RdbFileResult<Option<u64>> {
1617 match content.get(*pos).copied() {
1618 Some(1) => {
1619 *pos += 1;
1620 if *pos + 8 > content.len() {
1621 return Err(RdbFileError::InvalidOperation(format!("truncated {label}")));
1622 }
1623 Ok(Some(read_u64(content, pos)))
1624 }
1625 Some(_) => {
1626 *pos += 1;
1627 Ok(None)
1628 }
1629 None => Ok(None),
1630 }
1631}
1632
1633fn read_flagged_u128(content: &[u8], pos: &mut usize, label: &str) -> RdbFileResult<Option<u128>> {
1634 match content.get(*pos).copied() {
1635 Some(1) => {
1636 *pos += 1;
1637 if *pos + 16 > content.len() {
1638 return Err(RdbFileError::InvalidOperation(format!("truncated {label}")));
1639 }
1640 Ok(Some(read_u128(content, pos)))
1641 }
1642 Some(_) => {
1643 *pos += 1;
1644 Ok(None)
1645 }
1646 None => Ok(None),
1647 }
1648}
1649
/// Reads a little-endian `u32` at `*pos` and advances `*pos` by four bytes.
///
/// # Panics
///
/// Panics if fewer than four bytes remain at `*pos`; callers check the length
/// before calling.
fn read_u32(content: &[u8], pos: &mut usize) -> u32 {
    let start = *pos;
    let mut raw = [0u8; 4];
    raw.copy_from_slice(&content[start..start + 4]);
    *pos = start + 4;
    u32::from_le_bytes(raw)
}
1660
/// Reads a little-endian `u64` at `*pos` and advances `*pos` by eight bytes.
///
/// # Panics
///
/// Panics if fewer than eight bytes remain at `*pos`; callers check the
/// length before calling.
fn read_u64(content: &[u8], pos: &mut usize) -> u64 {
    let start = *pos;
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&content[start..start + 8]);
    *pos = start + 8;
    u64::from_le_bytes(raw)
}
1675
/// Reads a little-endian `u128` at `*pos` and advances `*pos` by sixteen
/// bytes.
///
/// # Panics
///
/// Panics if fewer than sixteen bytes remain at `*pos`; callers check the
/// length before calling.
fn read_u128(content: &[u8], pos: &mut usize) -> u128 {
    let start = *pos;
    let end = start + 16;
    // Little-endian: the last byte is the most significant, so fold from the
    // back, shifting the accumulator up one byte at a time.
    let value = content[start..end]
        .iter()
        .rev()
        .fold(0u128, |acc, &byte| (acc << 8) | u128::from(byte));
    *pos = end;
    value
}
1682
/// Maps a one-byte manifest event tag to its lowercase name.
///
/// Tags 1 through 4 map to `"insert"`, `"update"`, `"remove"` and
/// `"checkpoint"`; every other value maps to `"unknown"`.
fn native_manifest_kind_from_byte(byte: u8) -> &'static str {
    // Names indexed by `tag - 1`.
    const KIND_NAMES: [&str; 4] = ["insert", "update", "remove", "checkpoint"];

    byte.checked_sub(1)
        .and_then(|index| KIND_NAMES.get(usize::from(index)))
        .copied()
        .unwrap_or("unknown")
}
1692
1693#[cfg(test)]
1694mod tests;