1use super::*;
2
3impl UnifiedStore {
4 pub fn read_native_registry_summary(
5 &self,
6 page_id: u32,
7 ) -> Result<NativeRegistrySummary, StoreError> {
8 let Some(pager) = &self.pager else {
9 return Err(StoreError::Serialization(
10 "native registry summary requires paged mode".to_string(),
11 ));
12 };
13 if page_id == 0 {
14 return Err(StoreError::Serialization(
15 "native registry summary page is not set".to_string(),
16 ));
17 }
18
19 let page = pager
20 .read_page(page_id)
21 .map_err(|err| StoreError::Serialization(err.to_string()))?;
22 let bytes = page.as_bytes();
23 let content = &bytes[crate::storage::engine::HEADER_SIZE..];
24 if content.len() < 77 || &content[0..4] != NATIVE_REGISTRY_MAGIC {
25 return Err(StoreError::Serialization(
26 "invalid native registry summary page".to_string(),
27 ));
28 }
29
30 let collection_count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
31 let index_count = u32::from_le_bytes([content[8], content[9], content[10], content[11]]);
32 let graph_projection_count =
33 u32::from_le_bytes([content[12], content[13], content[14], content[15]]);
34 let analytics_job_count =
35 u32::from_le_bytes([content[16], content[17], content[18], content[19]]);
36 let vector_artifact_count =
37 u32::from_le_bytes([content[20], content[21], content[22], content[23]]);
38 let collections_complete = content[24] == 1;
39 let indexes_complete = content[25] == 1;
40 let graph_projections_complete = content[26] == 1;
41 let analytics_jobs_complete = content[27] == 1;
42 let vector_artifacts_complete = content[28] == 1;
43 let omitted_collection_count =
44 u32::from_le_bytes([content[29], content[30], content[31], content[32]]);
45 let omitted_index_count =
46 u32::from_le_bytes([content[33], content[34], content[35], content[36]]);
47 let omitted_graph_projection_count =
48 u32::from_le_bytes([content[37], content[38], content[39], content[40]]);
49 let omitted_analytics_job_count =
50 u32::from_le_bytes([content[41], content[42], content[43], content[44]]);
51 let omitted_vector_artifact_count =
52 u32::from_le_bytes([content[45], content[46], content[47], content[48]]);
53 let collection_sample_count =
54 u32::from_le_bytes([content[49], content[50], content[51], content[52]]) as usize;
55 let index_sample_count =
56 u32::from_le_bytes([content[53], content[54], content[55], content[56]]) as usize;
57 let projection_sample_count =
58 u32::from_le_bytes([content[57], content[58], content[59], content[60]]) as usize;
59 let job_sample_count =
60 u32::from_le_bytes([content[61], content[62], content[63], content[64]]) as usize;
61 let vector_artifact_sample_count =
62 u32::from_le_bytes([content[65], content[66], content[67], content[68]]) as usize;
63
64 let mut pos = 69usize;
65 let mut collection_names = Vec::with_capacity(collection_sample_count);
66 for _ in 0..collection_sample_count {
67 collection_names.push(read_native_string(content, &mut pos)?);
68 }
69
70 let mut indexes = Vec::with_capacity(index_sample_count);
71 for _ in 0..index_sample_count {
72 let name = read_native_string(content, &mut pos)?;
73 let kind = read_native_string(content, &mut pos)?;
74 let collection = match content.get(pos).copied() {
75 Some(1) => {
76 pos += 1;
77 Some(read_native_string(content, &mut pos)?)
78 }
79 Some(_) => {
80 pos += 1;
81 None
82 }
83 None => None,
84 };
85 let enabled = content.get(pos).copied().unwrap_or(0) == 1;
86 pos = pos.saturating_add(1);
87 if pos + 16 > content.len() {
88 break;
89 }
90 let entries = u64::from_le_bytes([
91 content[pos],
92 content[pos + 1],
93 content[pos + 2],
94 content[pos + 3],
95 content[pos + 4],
96 content[pos + 5],
97 content[pos + 6],
98 content[pos + 7],
99 ]);
100 pos += 8;
101 let estimated_memory_bytes = u64::from_le_bytes([
102 content[pos],
103 content[pos + 1],
104 content[pos + 2],
105 content[pos + 3],
106 content[pos + 4],
107 content[pos + 5],
108 content[pos + 6],
109 content[pos + 7],
110 ]);
111 pos += 8;
112 let last_refresh_ms = match content.get(pos).copied() {
113 Some(1) => {
114 pos += 1;
115 if pos + 16 > content.len() {
116 return Err(StoreError::Serialization(
117 "truncated native registry refresh timestamp".to_string(),
118 ));
119 }
120 let mut bytes = [0u8; 16];
121 bytes.copy_from_slice(&content[pos..pos + 16]);
122 pos += 16;
123 Some(u128::from_le_bytes(bytes))
124 }
125 Some(_) => {
126 pos += 1;
127 None
128 }
129 None => None,
130 };
131 let backend = read_native_string(content, &mut pos)?;
132 indexes.push(NativeRegistryIndexSummary {
133 name,
134 kind,
135 collection,
136 enabled,
137 entries,
138 estimated_memory_bytes,
139 last_refresh_ms,
140 backend,
141 });
142 }
143
144 let mut graph_projections = Vec::with_capacity(projection_sample_count);
145 for _ in 0..projection_sample_count {
146 let name = read_native_string(content, &mut pos)?;
147 let source = read_native_string(content, &mut pos)?;
148 if pos + 32 > content.len() {
149 break;
150 }
151 let mut created_bytes = [0u8; 16];
152 created_bytes.copy_from_slice(&content[pos..pos + 16]);
153 pos += 16;
154 let mut updated_bytes = [0u8; 16];
155 updated_bytes.copy_from_slice(&content[pos..pos + 16]);
156 pos += 16;
157 let node_labels = read_native_string_list(content, &mut pos)?;
158 let node_types = read_native_string_list(content, &mut pos)?;
159 let edge_labels = read_native_string_list(content, &mut pos)?;
160 let last_materialized_sequence = match content.get(pos).copied() {
161 Some(1) => {
162 pos += 1;
163 if pos + 8 > content.len() {
164 return Err(StoreError::Serialization(
165 "truncated native projection materialization sequence".to_string(),
166 ));
167 }
168 let value = u64::from_le_bytes([
169 content[pos],
170 content[pos + 1],
171 content[pos + 2],
172 content[pos + 3],
173 content[pos + 4],
174 content[pos + 5],
175 content[pos + 6],
176 content[pos + 7],
177 ]);
178 pos += 8;
179 Some(value)
180 }
181 Some(_) => {
182 pos += 1;
183 None
184 }
185 None => None,
186 };
187 graph_projections.push(NativeRegistryProjectionSummary {
188 name,
189 source,
190 created_at_unix_ms: u128::from_le_bytes(created_bytes),
191 updated_at_unix_ms: u128::from_le_bytes(updated_bytes),
192 node_labels,
193 node_types,
194 edge_labels,
195 last_materialized_sequence,
196 });
197 }
198
199 let mut analytics_jobs = Vec::with_capacity(job_sample_count);
200 for _ in 0..job_sample_count {
201 let id = read_native_string(content, &mut pos)?;
202 let kind = read_native_string(content, &mut pos)?;
203 let projection = match content.get(pos).copied() {
204 Some(1) => {
205 pos += 1;
206 Some(read_native_string(content, &mut pos)?)
207 }
208 Some(_) => {
209 pos += 1;
210 None
211 }
212 None => None,
213 };
214 let state = read_native_string(content, &mut pos)?;
215 if pos + 32 > content.len() {
216 break;
217 }
218 let mut created_bytes = [0u8; 16];
219 created_bytes.copy_from_slice(&content[pos..pos + 16]);
220 pos += 16;
221 let mut updated_bytes = [0u8; 16];
222 updated_bytes.copy_from_slice(&content[pos..pos + 16]);
223 pos += 16;
224 let last_run_sequence = match content.get(pos).copied() {
225 Some(1) => {
226 pos += 1;
227 if pos + 8 > content.len() {
228 return Err(StoreError::Serialization(
229 "truncated native analytics job run sequence".to_string(),
230 ));
231 }
232 let value = u64::from_le_bytes([
233 content[pos],
234 content[pos + 1],
235 content[pos + 2],
236 content[pos + 3],
237 content[pos + 4],
238 content[pos + 5],
239 content[pos + 6],
240 content[pos + 7],
241 ]);
242 pos += 8;
243 Some(value)
244 }
245 Some(_) => {
246 pos += 1;
247 None
248 }
249 None => None,
250 };
251 if pos + 2 > content.len() {
252 return Err(StoreError::Serialization(
253 "truncated native analytics job metadata count".to_string(),
254 ));
255 }
256 let metadata_count = u16::from_le_bytes([content[pos], content[pos + 1]]) as usize;
257 pos += 2;
258 let mut metadata = BTreeMap::new();
259 for _ in 0..metadata_count {
260 let key = read_native_string(content, &mut pos)?;
261 let value = read_native_string(content, &mut pos)?;
262 metadata.insert(key, value);
263 }
264 analytics_jobs.push(NativeRegistryJobSummary {
265 id,
266 kind,
267 projection,
268 state,
269 created_at_unix_ms: u128::from_le_bytes(created_bytes),
270 updated_at_unix_ms: u128::from_le_bytes(updated_bytes),
271 last_run_sequence,
272 metadata,
273 });
274 }
275 let mut vector_artifacts = Vec::with_capacity(vector_artifact_sample_count);
276 for _ in 0..vector_artifact_sample_count {
277 let collection = read_native_string(content, &mut pos)?;
278 let artifact_kind = read_native_string(content, &mut pos)?;
279 if pos + 32 > content.len() {
280 break;
281 }
282 let vector_count = u64::from_le_bytes([
283 content[pos],
284 content[pos + 1],
285 content[pos + 2],
286 content[pos + 3],
287 content[pos + 4],
288 content[pos + 5],
289 content[pos + 6],
290 content[pos + 7],
291 ]);
292 pos += 8;
293 let dimension = u32::from_le_bytes([
294 content[pos],
295 content[pos + 1],
296 content[pos + 2],
297 content[pos + 3],
298 ]);
299 pos += 4;
300 let max_layer = u32::from_le_bytes([
301 content[pos],
302 content[pos + 1],
303 content[pos + 2],
304 content[pos + 3],
305 ]);
306 pos += 4;
307 let serialized_bytes = u64::from_le_bytes([
308 content[pos],
309 content[pos + 1],
310 content[pos + 2],
311 content[pos + 3],
312 content[pos + 4],
313 content[pos + 5],
314 content[pos + 6],
315 content[pos + 7],
316 ]);
317 pos += 8;
318 let checksum = u64::from_le_bytes([
319 content[pos],
320 content[pos + 1],
321 content[pos + 2],
322 content[pos + 3],
323 content[pos + 4],
324 content[pos + 5],
325 content[pos + 6],
326 content[pos + 7],
327 ]);
328 pos += 8;
329 vector_artifacts.push(NativeVectorArtifactSummary {
330 collection,
331 artifact_kind,
332 vector_count,
333 dimension,
334 max_layer,
335 serialized_bytes,
336 checksum,
337 });
338 }
339
340 Ok(NativeRegistrySummary {
341 collection_count,
342 index_count,
343 graph_projection_count,
344 analytics_job_count,
345 vector_artifact_count,
346 collections_complete,
347 indexes_complete,
348 graph_projections_complete,
349 analytics_jobs_complete,
350 vector_artifacts_complete,
351 omitted_collection_count,
352 omitted_index_count,
353 omitted_graph_projection_count,
354 omitted_analytics_job_count,
355 omitted_vector_artifact_count,
356 collection_names,
357 indexes,
358 graph_projections,
359 analytics_jobs,
360 vector_artifacts,
361 })
362 }
363
364 pub fn write_native_recovery_summary(
366 &self,
367 summary: &NativeRecoverySummary,
368 existing_page: Option<u32>,
369 ) -> Result<(u32, u64), StoreError> {
370 let Some(pager) = &self.pager else {
371 return Ok((0, 0));
372 };
373
374 let page_id = match existing_page.filter(|page| *page != 0) {
375 Some(page) => page,
376 None => pager
377 .allocate_page(crate::storage::engine::PageType::NativeMeta)
378 .map_err(|err| StoreError::Serialization(err.to_string()))?
379 .page_id(),
380 };
381
382 let mut data = Vec::with_capacity(2048);
383 data.extend_from_slice(NATIVE_RECOVERY_MAGIC);
384 data.extend_from_slice(&summary.snapshot_count.to_le_bytes());
385 data.extend_from_slice(&summary.export_count.to_le_bytes());
386 data.push(u8::from(summary.snapshots_complete));
387 data.push(u8::from(summary.exports_complete));
388 data.extend_from_slice(&summary.omitted_snapshot_count.to_le_bytes());
389 data.extend_from_slice(&summary.omitted_export_count.to_le_bytes());
390 data.extend_from_slice(&(summary.snapshots.len() as u32).to_le_bytes());
391 data.extend_from_slice(&(summary.exports.len() as u32).to_le_bytes());
392
393 for snapshot in &summary.snapshots {
394 data.extend_from_slice(&snapshot.snapshot_id.to_le_bytes());
395 data.extend_from_slice(&snapshot.created_at_unix_ms.to_le_bytes());
396 data.extend_from_slice(&snapshot.superblock_sequence.to_le_bytes());
397 data.extend_from_slice(&snapshot.collection_count.to_le_bytes());
398 data.extend_from_slice(&snapshot.total_entities.to_le_bytes());
399 }
400
401 for export in &summary.exports {
402 push_native_string(&mut data, &export.name);
403 data.extend_from_slice(&export.created_at_unix_ms.to_le_bytes());
404 match export.snapshot_id {
405 Some(snapshot_id) => {
406 data.push(1);
407 data.extend_from_slice(&snapshot_id.to_le_bytes());
408 }
409 None => data.push(0),
410 }
411 data.extend_from_slice(&export.superblock_sequence.to_le_bytes());
412 data.extend_from_slice(&export.collection_count.to_le_bytes());
413 data.extend_from_slice(&export.total_entities.to_le_bytes());
414 }
415
416 let checksum = crate::storage::engine::crc32(&data) as u64;
417 let mut page = crate::storage::engine::Page::new(
418 crate::storage::engine::PageType::NativeMeta,
419 page_id,
420 );
421 let bytes = page.as_bytes_mut();
422 let content_start = crate::storage::engine::HEADER_SIZE;
423 let copy_len = data.len().min(bytes.len() - content_start);
424 bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
425 pager
426 .write_page(page_id, page)
427 .map_err(|err| StoreError::Serialization(err.to_string()))?;
428 Ok((page_id, checksum))
429 }
430
431 pub fn read_native_recovery_summary(
433 &self,
434 page_id: u32,
435 ) -> Result<NativeRecoverySummary, StoreError> {
436 let Some(pager) = &self.pager else {
437 return Err(StoreError::Serialization(
438 "native recovery summary requires paged mode".to_string(),
439 ));
440 };
441 if page_id == 0 {
442 return Err(StoreError::Serialization(
443 "native recovery summary page is not set".to_string(),
444 ));
445 }
446
447 let page = pager
448 .read_page(page_id)
449 .map_err(|err| StoreError::Serialization(err.to_string()))?;
450 let bytes = page.as_bytes();
451 let content = &bytes[crate::storage::engine::HEADER_SIZE..];
452 if content.len() < 30 || &content[0..4] != NATIVE_RECOVERY_MAGIC {
453 return Err(StoreError::Serialization(
454 "invalid native recovery summary page".to_string(),
455 ));
456 }
457
458 let snapshot_count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
459 let export_count = u32::from_le_bytes([content[8], content[9], content[10], content[11]]);
460 let snapshots_complete = content[12] == 1;
461 let exports_complete = content[13] == 1;
462 let omitted_snapshot_count =
463 u32::from_le_bytes([content[14], content[15], content[16], content[17]]);
464 let omitted_export_count =
465 u32::from_le_bytes([content[18], content[19], content[20], content[21]]);
466 let snapshot_sample_count =
467 u32::from_le_bytes([content[22], content[23], content[24], content[25]]) as usize;
468 let export_sample_count =
469 u32::from_le_bytes([content[26], content[27], content[28], content[29]]) as usize;
470
471 let mut pos = 30usize;
472 let mut snapshots = Vec::with_capacity(snapshot_sample_count);
473 for _ in 0..snapshot_sample_count {
474 if pos + 44 > content.len() {
475 break;
476 }
477 let snapshot_id = u64::from_le_bytes([
478 content[pos],
479 content[pos + 1],
480 content[pos + 2],
481 content[pos + 3],
482 content[pos + 4],
483 content[pos + 5],
484 content[pos + 6],
485 content[pos + 7],
486 ]);
487 pos += 8;
488 let mut created_bytes = [0u8; 16];
489 created_bytes.copy_from_slice(&content[pos..pos + 16]);
490 pos += 16;
491 let superblock_sequence = u64::from_le_bytes([
492 content[pos],
493 content[pos + 1],
494 content[pos + 2],
495 content[pos + 3],
496 content[pos + 4],
497 content[pos + 5],
498 content[pos + 6],
499 content[pos + 7],
500 ]);
501 pos += 8;
502 let collection_count = u32::from_le_bytes([
503 content[pos],
504 content[pos + 1],
505 content[pos + 2],
506 content[pos + 3],
507 ]);
508 pos += 4;
509 let total_entities = u64::from_le_bytes([
510 content[pos],
511 content[pos + 1],
512 content[pos + 2],
513 content[pos + 3],
514 content[pos + 4],
515 content[pos + 5],
516 content[pos + 6],
517 content[pos + 7],
518 ]);
519 pos += 8;
520 snapshots.push(NativeSnapshotSummary {
521 snapshot_id,
522 created_at_unix_ms: u128::from_le_bytes(created_bytes),
523 superblock_sequence,
524 collection_count,
525 total_entities,
526 });
527 }
528
529 let mut exports = Vec::with_capacity(export_sample_count);
530 for _ in 0..export_sample_count {
531 let name = read_native_string(content, &mut pos)?;
532 if pos + 16 > content.len() {
533 break;
534 }
535 let mut created_bytes = [0u8; 16];
536 created_bytes.copy_from_slice(&content[pos..pos + 16]);
537 pos += 16;
538 let snapshot_id = match content.get(pos).copied() {
539 Some(1) => {
540 pos += 1;
541 if pos + 8 > content.len() {
542 return Err(StoreError::Serialization(
543 "truncated native export snapshot id".to_string(),
544 ));
545 }
546 let value = u64::from_le_bytes([
547 content[pos],
548 content[pos + 1],
549 content[pos + 2],
550 content[pos + 3],
551 content[pos + 4],
552 content[pos + 5],
553 content[pos + 6],
554 content[pos + 7],
555 ]);
556 pos += 8;
557 Some(value)
558 }
559 Some(_) => {
560 pos += 1;
561 None
562 }
563 None => None,
564 };
565 if pos + 20 > content.len() {
566 break;
567 }
568 let superblock_sequence = u64::from_le_bytes([
569 content[pos],
570 content[pos + 1],
571 content[pos + 2],
572 content[pos + 3],
573 content[pos + 4],
574 content[pos + 5],
575 content[pos + 6],
576 content[pos + 7],
577 ]);
578 pos += 8;
579 let collection_count = u32::from_le_bytes([
580 content[pos],
581 content[pos + 1],
582 content[pos + 2],
583 content[pos + 3],
584 ]);
585 pos += 4;
586 let total_entities = u64::from_le_bytes([
587 content[pos],
588 content[pos + 1],
589 content[pos + 2],
590 content[pos + 3],
591 content[pos + 4],
592 content[pos + 5],
593 content[pos + 6],
594 content[pos + 7],
595 ]);
596 pos += 8;
597 exports.push(NativeExportSummary {
598 name,
599 created_at_unix_ms: u128::from_le_bytes(created_bytes),
600 snapshot_id,
601 superblock_sequence,
602 collection_count,
603 total_entities,
604 });
605 }
606
607 Ok(NativeRecoverySummary {
608 snapshot_count,
609 export_count,
610 snapshots_complete,
611 exports_complete,
612 omitted_snapshot_count,
613 omitted_export_count,
614 snapshots,
615 exports,
616 })
617 }
618
619 pub fn write_native_catalog_summary(
621 &self,
622 summary: &NativeCatalogSummary,
623 existing_page: Option<u32>,
624 ) -> Result<(u32, u64), StoreError> {
625 let Some(pager) = &self.pager else {
626 return Ok((0, 0));
627 };
628
629 let page_id = match existing_page.filter(|page| *page != 0) {
630 Some(page) => page,
631 None => pager
632 .allocate_page(crate::storage::engine::PageType::NativeMeta)
633 .map_err(|err| StoreError::Serialization(err.to_string()))?
634 .page_id(),
635 };
636
637 let mut data = Vec::with_capacity(2048);
638 data.extend_from_slice(NATIVE_CATALOG_MAGIC);
639 data.extend_from_slice(&summary.collection_count.to_le_bytes());
640 data.extend_from_slice(&summary.total_entities.to_le_bytes());
641 data.push(u8::from(summary.collections_complete));
642 data.extend_from_slice(&summary.omitted_collection_count.to_le_bytes());
643 data.extend_from_slice(&(summary.collections.len() as u32).to_le_bytes());
644 for collection in &summary.collections {
645 push_native_string(&mut data, &collection.name);
646 data.extend_from_slice(&collection.entities.to_le_bytes());
647 data.extend_from_slice(&collection.cross_refs.to_le_bytes());
648 data.extend_from_slice(&collection.segments.to_le_bytes());
649 }
650
651 let checksum = crate::storage::engine::crc32(&data) as u64;
652 let mut page = crate::storage::engine::Page::new(
653 crate::storage::engine::PageType::NativeMeta,
654 page_id,
655 );
656 let bytes = page.as_bytes_mut();
657 let content_start = crate::storage::engine::HEADER_SIZE;
658 let copy_len = data.len().min(bytes.len() - content_start);
659 bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
660 pager
661 .write_page(page_id, page)
662 .map_err(|err| StoreError::Serialization(err.to_string()))?;
663 Ok((page_id, checksum))
664 }
665
666 pub fn read_native_catalog_summary(
668 &self,
669 page_id: u32,
670 ) -> Result<NativeCatalogSummary, StoreError> {
671 let Some(pager) = &self.pager else {
672 return Err(StoreError::Serialization(
673 "native catalog summary requires paged mode".to_string(),
674 ));
675 };
676 if page_id == 0 {
677 return Err(StoreError::Serialization(
678 "native catalog summary page is not set".to_string(),
679 ));
680 }
681
682 let page = pager
683 .read_page(page_id)
684 .map_err(|err| StoreError::Serialization(err.to_string()))?;
685 let bytes = page.as_bytes();
686 let content = &bytes[crate::storage::engine::HEADER_SIZE..];
687 if content.len() < 25 || &content[0..4] != NATIVE_CATALOG_MAGIC {
688 return Err(StoreError::Serialization(
689 "invalid native catalog summary page".to_string(),
690 ));
691 }
692
693 let collection_count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
694 let total_entities = u64::from_le_bytes([
695 content[8],
696 content[9],
697 content[10],
698 content[11],
699 content[12],
700 content[13],
701 content[14],
702 content[15],
703 ]);
704 let collections_complete = content[16] == 1;
705 let omitted_collection_count =
706 u32::from_le_bytes([content[17], content[18], content[19], content[20]]);
707 let sample_count =
708 u32::from_le_bytes([content[21], content[22], content[23], content[24]]) as usize;
709
710 let mut pos = 25usize;
711 let mut collections = Vec::with_capacity(sample_count);
712 for _ in 0..sample_count {
713 let name = read_native_string(content, &mut pos)?;
714 if pos + 20 > content.len() {
715 break;
716 }
717 let entities = u64::from_le_bytes([
718 content[pos],
719 content[pos + 1],
720 content[pos + 2],
721 content[pos + 3],
722 content[pos + 4],
723 content[pos + 5],
724 content[pos + 6],
725 content[pos + 7],
726 ]);
727 pos += 8;
728 let cross_refs = u64::from_le_bytes([
729 content[pos],
730 content[pos + 1],
731 content[pos + 2],
732 content[pos + 3],
733 content[pos + 4],
734 content[pos + 5],
735 content[pos + 6],
736 content[pos + 7],
737 ]);
738 pos += 8;
739 let segments = u32::from_le_bytes([
740 content[pos],
741 content[pos + 1],
742 content[pos + 2],
743 content[pos + 3],
744 ]);
745 pos += 4;
746 collections.push(NativeCatalogCollectionSummary {
747 name,
748 entities,
749 cross_refs,
750 segments,
751 });
752 }
753
754 Ok(NativeCatalogSummary {
755 collection_count,
756 total_entities,
757 collections_complete,
758 omitted_collection_count,
759 collections,
760 })
761 }
762
763 pub fn write_native_metadata_state_summary(
765 &self,
766 summary: &NativeMetadataStateSummary,
767 existing_page: Option<u32>,
768 ) -> Result<(u32, u64), StoreError> {
769 let Some(pager) = &self.pager else {
770 return Ok((0, 0));
771 };
772
773 let page_id = match existing_page.filter(|page| *page != 0) {
774 Some(page) => page,
775 None => pager
776 .allocate_page(crate::storage::engine::PageType::NativeMeta)
777 .map_err(|err| StoreError::Serialization(err.to_string()))?
778 .page_id(),
779 };
780
781 let mut data = Vec::with_capacity(512);
782 data.extend_from_slice(NATIVE_METADATA_STATE_MAGIC);
783 push_native_string(&mut data, &summary.protocol_version);
784 data.extend_from_slice(&summary.generated_at_unix_ms.to_le_bytes());
785 match &summary.last_loaded_from {
786 Some(value) => {
787 data.push(1);
788 push_native_string(&mut data, value);
789 }
790 None => data.push(0),
791 }
792 match summary.last_healed_at_unix_ms {
793 Some(value) => {
794 data.push(1);
795 data.extend_from_slice(&value.to_le_bytes());
796 }
797 None => data.push(0),
798 }
799
800 let checksum = crate::storage::engine::crc32(&data) as u64;
801 let mut page = crate::storage::engine::Page::new(
802 crate::storage::engine::PageType::NativeMeta,
803 page_id,
804 );
805 let bytes = page.as_bytes_mut();
806 let content_start = crate::storage::engine::HEADER_SIZE;
807 let copy_len = data.len().min(bytes.len() - content_start);
808 bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
809 pager
810 .write_page(page_id, page)
811 .map_err(|err| StoreError::Serialization(err.to_string()))?;
812 Ok((page_id, checksum))
813 }
814
815 pub fn read_native_metadata_state_summary(
817 &self,
818 page_id: u32,
819 ) -> Result<NativeMetadataStateSummary, StoreError> {
820 let Some(pager) = &self.pager else {
821 return Err(StoreError::Serialization(
822 "native metadata state summary requires paged mode".to_string(),
823 ));
824 };
825 if page_id == 0 {
826 return Err(StoreError::Serialization(
827 "native metadata state page is not set".to_string(),
828 ));
829 }
830
831 let page = pager
832 .read_page(page_id)
833 .map_err(|err| StoreError::Serialization(err.to_string()))?;
834 let bytes = page.as_bytes();
835 let content = &bytes[crate::storage::engine::HEADER_SIZE..];
836 if content.len() < 22 || &content[0..4] != NATIVE_METADATA_STATE_MAGIC {
837 return Err(StoreError::Serialization(
838 "invalid native metadata state page".to_string(),
839 ));
840 }
841
842 let mut pos = 4usize;
843 let protocol_version = read_native_string(content, &mut pos)?;
844 if pos + 16 > content.len() {
845 return Err(StoreError::Serialization(
846 "truncated native metadata state timestamp".to_string(),
847 ));
848 }
849 let mut generated_bytes = [0u8; 16];
850 generated_bytes.copy_from_slice(&content[pos..pos + 16]);
851 pos += 16;
852 let last_loaded_from = match content.get(pos).copied() {
853 Some(1) => {
854 pos += 1;
855 Some(read_native_string(content, &mut pos)?)
856 }
857 Some(_) => {
858 pos += 1;
859 None
860 }
861 None => None,
862 };
863 let last_healed_at_unix_ms = match content.get(pos).copied() {
864 Some(1) => {
865 pos += 1;
866 if pos + 16 > content.len() {
867 return Err(StoreError::Serialization(
868 "truncated native metadata heal timestamp".to_string(),
869 ));
870 }
871 let mut healed_bytes = [0u8; 16];
872 healed_bytes.copy_from_slice(&content[pos..pos + 16]);
873 Some(u128::from_le_bytes(healed_bytes))
874 }
875 Some(_) => None,
876 None => None,
877 };
878
879 Ok(NativeMetadataStateSummary {
880 protocol_version,
881 generated_at_unix_ms: u128::from_le_bytes(generated_bytes),
882 last_loaded_from,
883 last_healed_at_unix_ms,
884 })
885 }
886}