Skip to main content

reddb_server/storage/unified/store/
impl_native_b.rs

1use super::*;
2
3impl UnifiedStore {
4    pub fn read_native_registry_summary(
5        &self,
6        page_id: u32,
7    ) -> Result<NativeRegistrySummary, StoreError> {
8        let Some(pager) = &self.pager else {
9            return Err(StoreError::Serialization(
10                "native registry summary requires paged mode".to_string(),
11            ));
12        };
13        if page_id == 0 {
14            return Err(StoreError::Serialization(
15                "native registry summary page is not set".to_string(),
16            ));
17        }
18
19        let page = pager
20            .read_page(page_id)
21            .map_err(|err| StoreError::Serialization(err.to_string()))?;
22        let bytes = page.as_bytes();
23        let content = &bytes[crate::storage::engine::HEADER_SIZE..];
24        if content.len() < 77 || &content[0..4] != NATIVE_REGISTRY_MAGIC {
25            return Err(StoreError::Serialization(
26                "invalid native registry summary page".to_string(),
27            ));
28        }
29
30        let collection_count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
31        let index_count = u32::from_le_bytes([content[8], content[9], content[10], content[11]]);
32        let graph_projection_count =
33            u32::from_le_bytes([content[12], content[13], content[14], content[15]]);
34        let analytics_job_count =
35            u32::from_le_bytes([content[16], content[17], content[18], content[19]]);
36        let vector_artifact_count =
37            u32::from_le_bytes([content[20], content[21], content[22], content[23]]);
38        let collections_complete = content[24] == 1;
39        let indexes_complete = content[25] == 1;
40        let graph_projections_complete = content[26] == 1;
41        let analytics_jobs_complete = content[27] == 1;
42        let vector_artifacts_complete = content[28] == 1;
43        let omitted_collection_count =
44            u32::from_le_bytes([content[29], content[30], content[31], content[32]]);
45        let omitted_index_count =
46            u32::from_le_bytes([content[33], content[34], content[35], content[36]]);
47        let omitted_graph_projection_count =
48            u32::from_le_bytes([content[37], content[38], content[39], content[40]]);
49        let omitted_analytics_job_count =
50            u32::from_le_bytes([content[41], content[42], content[43], content[44]]);
51        let omitted_vector_artifact_count =
52            u32::from_le_bytes([content[45], content[46], content[47], content[48]]);
53        let collection_sample_count =
54            u32::from_le_bytes([content[49], content[50], content[51], content[52]]) as usize;
55        let index_sample_count =
56            u32::from_le_bytes([content[53], content[54], content[55], content[56]]) as usize;
57        let projection_sample_count =
58            u32::from_le_bytes([content[57], content[58], content[59], content[60]]) as usize;
59        let job_sample_count =
60            u32::from_le_bytes([content[61], content[62], content[63], content[64]]) as usize;
61        let vector_artifact_sample_count =
62            u32::from_le_bytes([content[65], content[66], content[67], content[68]]) as usize;
63
64        let mut pos = 69usize;
65        let mut collection_names = Vec::with_capacity(collection_sample_count);
66        for _ in 0..collection_sample_count {
67            collection_names.push(read_native_string(content, &mut pos)?);
68        }
69
70        let mut indexes = Vec::with_capacity(index_sample_count);
71        for _ in 0..index_sample_count {
72            let name = read_native_string(content, &mut pos)?;
73            let kind = read_native_string(content, &mut pos)?;
74            let collection = match content.get(pos).copied() {
75                Some(1) => {
76                    pos += 1;
77                    Some(read_native_string(content, &mut pos)?)
78                }
79                Some(_) => {
80                    pos += 1;
81                    None
82                }
83                None => None,
84            };
85            let enabled = content.get(pos).copied().unwrap_or(0) == 1;
86            pos = pos.saturating_add(1);
87            if pos + 16 > content.len() {
88                break;
89            }
90            let entries = u64::from_le_bytes([
91                content[pos],
92                content[pos + 1],
93                content[pos + 2],
94                content[pos + 3],
95                content[pos + 4],
96                content[pos + 5],
97                content[pos + 6],
98                content[pos + 7],
99            ]);
100            pos += 8;
101            let estimated_memory_bytes = u64::from_le_bytes([
102                content[pos],
103                content[pos + 1],
104                content[pos + 2],
105                content[pos + 3],
106                content[pos + 4],
107                content[pos + 5],
108                content[pos + 6],
109                content[pos + 7],
110            ]);
111            pos += 8;
112            let last_refresh_ms = match content.get(pos).copied() {
113                Some(1) => {
114                    pos += 1;
115                    if pos + 16 > content.len() {
116                        return Err(StoreError::Serialization(
117                            "truncated native registry refresh timestamp".to_string(),
118                        ));
119                    }
120                    let mut bytes = [0u8; 16];
121                    bytes.copy_from_slice(&content[pos..pos + 16]);
122                    pos += 16;
123                    Some(u128::from_le_bytes(bytes))
124                }
125                Some(_) => {
126                    pos += 1;
127                    None
128                }
129                None => None,
130            };
131            let backend = read_native_string(content, &mut pos)?;
132            indexes.push(NativeRegistryIndexSummary {
133                name,
134                kind,
135                collection,
136                enabled,
137                entries,
138                estimated_memory_bytes,
139                last_refresh_ms,
140                backend,
141            });
142        }
143
144        let mut graph_projections = Vec::with_capacity(projection_sample_count);
145        for _ in 0..projection_sample_count {
146            let name = read_native_string(content, &mut pos)?;
147            let source = read_native_string(content, &mut pos)?;
148            if pos + 32 > content.len() {
149                break;
150            }
151            let mut created_bytes = [0u8; 16];
152            created_bytes.copy_from_slice(&content[pos..pos + 16]);
153            pos += 16;
154            let mut updated_bytes = [0u8; 16];
155            updated_bytes.copy_from_slice(&content[pos..pos + 16]);
156            pos += 16;
157            let node_labels = read_native_string_list(content, &mut pos)?;
158            let node_types = read_native_string_list(content, &mut pos)?;
159            let edge_labels = read_native_string_list(content, &mut pos)?;
160            let last_materialized_sequence = match content.get(pos).copied() {
161                Some(1) => {
162                    pos += 1;
163                    if pos + 8 > content.len() {
164                        return Err(StoreError::Serialization(
165                            "truncated native projection materialization sequence".to_string(),
166                        ));
167                    }
168                    let value = u64::from_le_bytes([
169                        content[pos],
170                        content[pos + 1],
171                        content[pos + 2],
172                        content[pos + 3],
173                        content[pos + 4],
174                        content[pos + 5],
175                        content[pos + 6],
176                        content[pos + 7],
177                    ]);
178                    pos += 8;
179                    Some(value)
180                }
181                Some(_) => {
182                    pos += 1;
183                    None
184                }
185                None => None,
186            };
187            graph_projections.push(NativeRegistryProjectionSummary {
188                name,
189                source,
190                created_at_unix_ms: u128::from_le_bytes(created_bytes),
191                updated_at_unix_ms: u128::from_le_bytes(updated_bytes),
192                node_labels,
193                node_types,
194                edge_labels,
195                last_materialized_sequence,
196            });
197        }
198
199        let mut analytics_jobs = Vec::with_capacity(job_sample_count);
200        for _ in 0..job_sample_count {
201            let id = read_native_string(content, &mut pos)?;
202            let kind = read_native_string(content, &mut pos)?;
203            let projection = match content.get(pos).copied() {
204                Some(1) => {
205                    pos += 1;
206                    Some(read_native_string(content, &mut pos)?)
207                }
208                Some(_) => {
209                    pos += 1;
210                    None
211                }
212                None => None,
213            };
214            let state = read_native_string(content, &mut pos)?;
215            if pos + 32 > content.len() {
216                break;
217            }
218            let mut created_bytes = [0u8; 16];
219            created_bytes.copy_from_slice(&content[pos..pos + 16]);
220            pos += 16;
221            let mut updated_bytes = [0u8; 16];
222            updated_bytes.copy_from_slice(&content[pos..pos + 16]);
223            pos += 16;
224            let last_run_sequence = match content.get(pos).copied() {
225                Some(1) => {
226                    pos += 1;
227                    if pos + 8 > content.len() {
228                        return Err(StoreError::Serialization(
229                            "truncated native analytics job run sequence".to_string(),
230                        ));
231                    }
232                    let value = u64::from_le_bytes([
233                        content[pos],
234                        content[pos + 1],
235                        content[pos + 2],
236                        content[pos + 3],
237                        content[pos + 4],
238                        content[pos + 5],
239                        content[pos + 6],
240                        content[pos + 7],
241                    ]);
242                    pos += 8;
243                    Some(value)
244                }
245                Some(_) => {
246                    pos += 1;
247                    None
248                }
249                None => None,
250            };
251            if pos + 2 > content.len() {
252                return Err(StoreError::Serialization(
253                    "truncated native analytics job metadata count".to_string(),
254                ));
255            }
256            let metadata_count = u16::from_le_bytes([content[pos], content[pos + 1]]) as usize;
257            pos += 2;
258            let mut metadata = BTreeMap::new();
259            for _ in 0..metadata_count {
260                let key = read_native_string(content, &mut pos)?;
261                let value = read_native_string(content, &mut pos)?;
262                metadata.insert(key, value);
263            }
264            analytics_jobs.push(NativeRegistryJobSummary {
265                id,
266                kind,
267                projection,
268                state,
269                created_at_unix_ms: u128::from_le_bytes(created_bytes),
270                updated_at_unix_ms: u128::from_le_bytes(updated_bytes),
271                last_run_sequence,
272                metadata,
273            });
274        }
275        let mut vector_artifacts = Vec::with_capacity(vector_artifact_sample_count);
276        for _ in 0..vector_artifact_sample_count {
277            let collection = read_native_string(content, &mut pos)?;
278            let artifact_kind = read_native_string(content, &mut pos)?;
279            if pos + 32 > content.len() {
280                break;
281            }
282            let vector_count = u64::from_le_bytes([
283                content[pos],
284                content[pos + 1],
285                content[pos + 2],
286                content[pos + 3],
287                content[pos + 4],
288                content[pos + 5],
289                content[pos + 6],
290                content[pos + 7],
291            ]);
292            pos += 8;
293            let dimension = u32::from_le_bytes([
294                content[pos],
295                content[pos + 1],
296                content[pos + 2],
297                content[pos + 3],
298            ]);
299            pos += 4;
300            let max_layer = u32::from_le_bytes([
301                content[pos],
302                content[pos + 1],
303                content[pos + 2],
304                content[pos + 3],
305            ]);
306            pos += 4;
307            let serialized_bytes = u64::from_le_bytes([
308                content[pos],
309                content[pos + 1],
310                content[pos + 2],
311                content[pos + 3],
312                content[pos + 4],
313                content[pos + 5],
314                content[pos + 6],
315                content[pos + 7],
316            ]);
317            pos += 8;
318            let checksum = u64::from_le_bytes([
319                content[pos],
320                content[pos + 1],
321                content[pos + 2],
322                content[pos + 3],
323                content[pos + 4],
324                content[pos + 5],
325                content[pos + 6],
326                content[pos + 7],
327            ]);
328            pos += 8;
329            vector_artifacts.push(NativeVectorArtifactSummary {
330                collection,
331                artifact_kind,
332                vector_count,
333                dimension,
334                max_layer,
335                serialized_bytes,
336                checksum,
337            });
338        }
339
340        Ok(NativeRegistrySummary {
341            collection_count,
342            index_count,
343            graph_projection_count,
344            analytics_job_count,
345            vector_artifact_count,
346            collections_complete,
347            indexes_complete,
348            graph_projections_complete,
349            analytics_jobs_complete,
350            vector_artifacts_complete,
351            omitted_collection_count,
352            omitted_index_count,
353            omitted_graph_projection_count,
354            omitted_analytics_job_count,
355            omitted_vector_artifact_count,
356            collection_names,
357            indexes,
358            graph_projections,
359            analytics_jobs,
360            vector_artifacts,
361        })
362    }
363
364    /// Persist a compact native snapshot/export summary into a dedicated page.
365    pub fn write_native_recovery_summary(
366        &self,
367        summary: &NativeRecoverySummary,
368        existing_page: Option<u32>,
369    ) -> Result<(u32, u64), StoreError> {
370        let Some(pager) = &self.pager else {
371            return Ok((0, 0));
372        };
373
374        let page_id = match existing_page.filter(|page| *page != 0) {
375            Some(page) => page,
376            None => pager
377                .allocate_page(crate::storage::engine::PageType::NativeMeta)
378                .map_err(|err| StoreError::Serialization(err.to_string()))?
379                .page_id(),
380        };
381
382        let mut data = Vec::with_capacity(2048);
383        data.extend_from_slice(NATIVE_RECOVERY_MAGIC);
384        data.extend_from_slice(&summary.snapshot_count.to_le_bytes());
385        data.extend_from_slice(&summary.export_count.to_le_bytes());
386        data.push(u8::from(summary.snapshots_complete));
387        data.push(u8::from(summary.exports_complete));
388        data.extend_from_slice(&summary.omitted_snapshot_count.to_le_bytes());
389        data.extend_from_slice(&summary.omitted_export_count.to_le_bytes());
390        data.extend_from_slice(&(summary.snapshots.len() as u32).to_le_bytes());
391        data.extend_from_slice(&(summary.exports.len() as u32).to_le_bytes());
392
393        for snapshot in &summary.snapshots {
394            data.extend_from_slice(&snapshot.snapshot_id.to_le_bytes());
395            data.extend_from_slice(&snapshot.created_at_unix_ms.to_le_bytes());
396            data.extend_from_slice(&snapshot.superblock_sequence.to_le_bytes());
397            data.extend_from_slice(&snapshot.collection_count.to_le_bytes());
398            data.extend_from_slice(&snapshot.total_entities.to_le_bytes());
399        }
400
401        for export in &summary.exports {
402            push_native_string(&mut data, &export.name);
403            data.extend_from_slice(&export.created_at_unix_ms.to_le_bytes());
404            match export.snapshot_id {
405                Some(snapshot_id) => {
406                    data.push(1);
407                    data.extend_from_slice(&snapshot_id.to_le_bytes());
408                }
409                None => data.push(0),
410            }
411            data.extend_from_slice(&export.superblock_sequence.to_le_bytes());
412            data.extend_from_slice(&export.collection_count.to_le_bytes());
413            data.extend_from_slice(&export.total_entities.to_le_bytes());
414        }
415
416        let checksum = crate::storage::engine::crc32(&data) as u64;
417        let mut page = crate::storage::engine::Page::new(
418            crate::storage::engine::PageType::NativeMeta,
419            page_id,
420        );
421        let bytes = page.as_bytes_mut();
422        let content_start = crate::storage::engine::HEADER_SIZE;
423        let copy_len = data.len().min(bytes.len() - content_start);
424        bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
425        pager
426            .write_page(page_id, page)
427            .map_err(|err| StoreError::Serialization(err.to_string()))?;
428        Ok((page_id, checksum))
429    }
430
431    /// Read a compact native snapshot/export summary from a dedicated page.
432    pub fn read_native_recovery_summary(
433        &self,
434        page_id: u32,
435    ) -> Result<NativeRecoverySummary, StoreError> {
436        let Some(pager) = &self.pager else {
437            return Err(StoreError::Serialization(
438                "native recovery summary requires paged mode".to_string(),
439            ));
440        };
441        if page_id == 0 {
442            return Err(StoreError::Serialization(
443                "native recovery summary page is not set".to_string(),
444            ));
445        }
446
447        let page = pager
448            .read_page(page_id)
449            .map_err(|err| StoreError::Serialization(err.to_string()))?;
450        let bytes = page.as_bytes();
451        let content = &bytes[crate::storage::engine::HEADER_SIZE..];
452        if content.len() < 30 || &content[0..4] != NATIVE_RECOVERY_MAGIC {
453            return Err(StoreError::Serialization(
454                "invalid native recovery summary page".to_string(),
455            ));
456        }
457
458        let snapshot_count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
459        let export_count = u32::from_le_bytes([content[8], content[9], content[10], content[11]]);
460        let snapshots_complete = content[12] == 1;
461        let exports_complete = content[13] == 1;
462        let omitted_snapshot_count =
463            u32::from_le_bytes([content[14], content[15], content[16], content[17]]);
464        let omitted_export_count =
465            u32::from_le_bytes([content[18], content[19], content[20], content[21]]);
466        let snapshot_sample_count =
467            u32::from_le_bytes([content[22], content[23], content[24], content[25]]) as usize;
468        let export_sample_count =
469            u32::from_le_bytes([content[26], content[27], content[28], content[29]]) as usize;
470
471        let mut pos = 30usize;
472        let mut snapshots = Vec::with_capacity(snapshot_sample_count);
473        for _ in 0..snapshot_sample_count {
474            if pos + 44 > content.len() {
475                break;
476            }
477            let snapshot_id = u64::from_le_bytes([
478                content[pos],
479                content[pos + 1],
480                content[pos + 2],
481                content[pos + 3],
482                content[pos + 4],
483                content[pos + 5],
484                content[pos + 6],
485                content[pos + 7],
486            ]);
487            pos += 8;
488            let mut created_bytes = [0u8; 16];
489            created_bytes.copy_from_slice(&content[pos..pos + 16]);
490            pos += 16;
491            let superblock_sequence = u64::from_le_bytes([
492                content[pos],
493                content[pos + 1],
494                content[pos + 2],
495                content[pos + 3],
496                content[pos + 4],
497                content[pos + 5],
498                content[pos + 6],
499                content[pos + 7],
500            ]);
501            pos += 8;
502            let collection_count = u32::from_le_bytes([
503                content[pos],
504                content[pos + 1],
505                content[pos + 2],
506                content[pos + 3],
507            ]);
508            pos += 4;
509            let total_entities = u64::from_le_bytes([
510                content[pos],
511                content[pos + 1],
512                content[pos + 2],
513                content[pos + 3],
514                content[pos + 4],
515                content[pos + 5],
516                content[pos + 6],
517                content[pos + 7],
518            ]);
519            pos += 8;
520            snapshots.push(NativeSnapshotSummary {
521                snapshot_id,
522                created_at_unix_ms: u128::from_le_bytes(created_bytes),
523                superblock_sequence,
524                collection_count,
525                total_entities,
526            });
527        }
528
529        let mut exports = Vec::with_capacity(export_sample_count);
530        for _ in 0..export_sample_count {
531            let name = read_native_string(content, &mut pos)?;
532            if pos + 16 > content.len() {
533                break;
534            }
535            let mut created_bytes = [0u8; 16];
536            created_bytes.copy_from_slice(&content[pos..pos + 16]);
537            pos += 16;
538            let snapshot_id = match content.get(pos).copied() {
539                Some(1) => {
540                    pos += 1;
541                    if pos + 8 > content.len() {
542                        return Err(StoreError::Serialization(
543                            "truncated native export snapshot id".to_string(),
544                        ));
545                    }
546                    let value = u64::from_le_bytes([
547                        content[pos],
548                        content[pos + 1],
549                        content[pos + 2],
550                        content[pos + 3],
551                        content[pos + 4],
552                        content[pos + 5],
553                        content[pos + 6],
554                        content[pos + 7],
555                    ]);
556                    pos += 8;
557                    Some(value)
558                }
559                Some(_) => {
560                    pos += 1;
561                    None
562                }
563                None => None,
564            };
565            if pos + 20 > content.len() {
566                break;
567            }
568            let superblock_sequence = u64::from_le_bytes([
569                content[pos],
570                content[pos + 1],
571                content[pos + 2],
572                content[pos + 3],
573                content[pos + 4],
574                content[pos + 5],
575                content[pos + 6],
576                content[pos + 7],
577            ]);
578            pos += 8;
579            let collection_count = u32::from_le_bytes([
580                content[pos],
581                content[pos + 1],
582                content[pos + 2],
583                content[pos + 3],
584            ]);
585            pos += 4;
586            let total_entities = u64::from_le_bytes([
587                content[pos],
588                content[pos + 1],
589                content[pos + 2],
590                content[pos + 3],
591                content[pos + 4],
592                content[pos + 5],
593                content[pos + 6],
594                content[pos + 7],
595            ]);
596            pos += 8;
597            exports.push(NativeExportSummary {
598                name,
599                created_at_unix_ms: u128::from_le_bytes(created_bytes),
600                snapshot_id,
601                superblock_sequence,
602                collection_count,
603                total_entities,
604            });
605        }
606
607        Ok(NativeRecoverySummary {
608            snapshot_count,
609            export_count,
610            snapshots_complete,
611            exports_complete,
612            omitted_snapshot_count,
613            omitted_export_count,
614            snapshots,
615            exports,
616        })
617    }
618
619    /// Persist a compact native catalog summary into a dedicated page.
620    pub fn write_native_catalog_summary(
621        &self,
622        summary: &NativeCatalogSummary,
623        existing_page: Option<u32>,
624    ) -> Result<(u32, u64), StoreError> {
625        let Some(pager) = &self.pager else {
626            return Ok((0, 0));
627        };
628
629        let page_id = match existing_page.filter(|page| *page != 0) {
630            Some(page) => page,
631            None => pager
632                .allocate_page(crate::storage::engine::PageType::NativeMeta)
633                .map_err(|err| StoreError::Serialization(err.to_string()))?
634                .page_id(),
635        };
636
637        let mut data = Vec::with_capacity(2048);
638        data.extend_from_slice(NATIVE_CATALOG_MAGIC);
639        data.extend_from_slice(&summary.collection_count.to_le_bytes());
640        data.extend_from_slice(&summary.total_entities.to_le_bytes());
641        data.push(u8::from(summary.collections_complete));
642        data.extend_from_slice(&summary.omitted_collection_count.to_le_bytes());
643        data.extend_from_slice(&(summary.collections.len() as u32).to_le_bytes());
644        for collection in &summary.collections {
645            push_native_string(&mut data, &collection.name);
646            data.extend_from_slice(&collection.entities.to_le_bytes());
647            data.extend_from_slice(&collection.cross_refs.to_le_bytes());
648            data.extend_from_slice(&collection.segments.to_le_bytes());
649        }
650
651        let checksum = crate::storage::engine::crc32(&data) as u64;
652        let mut page = crate::storage::engine::Page::new(
653            crate::storage::engine::PageType::NativeMeta,
654            page_id,
655        );
656        let bytes = page.as_bytes_mut();
657        let content_start = crate::storage::engine::HEADER_SIZE;
658        let copy_len = data.len().min(bytes.len() - content_start);
659        bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
660        pager
661            .write_page(page_id, page)
662            .map_err(|err| StoreError::Serialization(err.to_string()))?;
663        Ok((page_id, checksum))
664    }
665
666    /// Read a compact native catalog summary from a dedicated page.
667    pub fn read_native_catalog_summary(
668        &self,
669        page_id: u32,
670    ) -> Result<NativeCatalogSummary, StoreError> {
671        let Some(pager) = &self.pager else {
672            return Err(StoreError::Serialization(
673                "native catalog summary requires paged mode".to_string(),
674            ));
675        };
676        if page_id == 0 {
677            return Err(StoreError::Serialization(
678                "native catalog summary page is not set".to_string(),
679            ));
680        }
681
682        let page = pager
683            .read_page(page_id)
684            .map_err(|err| StoreError::Serialization(err.to_string()))?;
685        let bytes = page.as_bytes();
686        let content = &bytes[crate::storage::engine::HEADER_SIZE..];
687        if content.len() < 25 || &content[0..4] != NATIVE_CATALOG_MAGIC {
688            return Err(StoreError::Serialization(
689                "invalid native catalog summary page".to_string(),
690            ));
691        }
692
693        let collection_count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
694        let total_entities = u64::from_le_bytes([
695            content[8],
696            content[9],
697            content[10],
698            content[11],
699            content[12],
700            content[13],
701            content[14],
702            content[15],
703        ]);
704        let collections_complete = content[16] == 1;
705        let omitted_collection_count =
706            u32::from_le_bytes([content[17], content[18], content[19], content[20]]);
707        let sample_count =
708            u32::from_le_bytes([content[21], content[22], content[23], content[24]]) as usize;
709
710        let mut pos = 25usize;
711        let mut collections = Vec::with_capacity(sample_count);
712        for _ in 0..sample_count {
713            let name = read_native_string(content, &mut pos)?;
714            if pos + 20 > content.len() {
715                break;
716            }
717            let entities = u64::from_le_bytes([
718                content[pos],
719                content[pos + 1],
720                content[pos + 2],
721                content[pos + 3],
722                content[pos + 4],
723                content[pos + 5],
724                content[pos + 6],
725                content[pos + 7],
726            ]);
727            pos += 8;
728            let cross_refs = u64::from_le_bytes([
729                content[pos],
730                content[pos + 1],
731                content[pos + 2],
732                content[pos + 3],
733                content[pos + 4],
734                content[pos + 5],
735                content[pos + 6],
736                content[pos + 7],
737            ]);
738            pos += 8;
739            let segments = u32::from_le_bytes([
740                content[pos],
741                content[pos + 1],
742                content[pos + 2],
743                content[pos + 3],
744            ]);
745            pos += 4;
746            collections.push(NativeCatalogCollectionSummary {
747                name,
748                entities,
749                cross_refs,
750                segments,
751            });
752        }
753
754        Ok(NativeCatalogSummary {
755            collection_count,
756            total_entities,
757            collections_complete,
758            omitted_collection_count,
759            collections,
760        })
761    }
762
763    /// Persist a compact native metadata state summary into a dedicated page.
764    pub fn write_native_metadata_state_summary(
765        &self,
766        summary: &NativeMetadataStateSummary,
767        existing_page: Option<u32>,
768    ) -> Result<(u32, u64), StoreError> {
769        let Some(pager) = &self.pager else {
770            return Ok((0, 0));
771        };
772
773        let page_id = match existing_page.filter(|page| *page != 0) {
774            Some(page) => page,
775            None => pager
776                .allocate_page(crate::storage::engine::PageType::NativeMeta)
777                .map_err(|err| StoreError::Serialization(err.to_string()))?
778                .page_id(),
779        };
780
781        let mut data = Vec::with_capacity(512);
782        data.extend_from_slice(NATIVE_METADATA_STATE_MAGIC);
783        push_native_string(&mut data, &summary.protocol_version);
784        data.extend_from_slice(&summary.generated_at_unix_ms.to_le_bytes());
785        match &summary.last_loaded_from {
786            Some(value) => {
787                data.push(1);
788                push_native_string(&mut data, value);
789            }
790            None => data.push(0),
791        }
792        match summary.last_healed_at_unix_ms {
793            Some(value) => {
794                data.push(1);
795                data.extend_from_slice(&value.to_le_bytes());
796            }
797            None => data.push(0),
798        }
799
800        let checksum = crate::storage::engine::crc32(&data) as u64;
801        let mut page = crate::storage::engine::Page::new(
802            crate::storage::engine::PageType::NativeMeta,
803            page_id,
804        );
805        let bytes = page.as_bytes_mut();
806        let content_start = crate::storage::engine::HEADER_SIZE;
807        let copy_len = data.len().min(bytes.len() - content_start);
808        bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
809        pager
810            .write_page(page_id, page)
811            .map_err(|err| StoreError::Serialization(err.to_string()))?;
812        Ok((page_id, checksum))
813    }
814
815    /// Read a compact native metadata state summary from a dedicated page.
816    pub fn read_native_metadata_state_summary(
817        &self,
818        page_id: u32,
819    ) -> Result<NativeMetadataStateSummary, StoreError> {
820        let Some(pager) = &self.pager else {
821            return Err(StoreError::Serialization(
822                "native metadata state summary requires paged mode".to_string(),
823            ));
824        };
825        if page_id == 0 {
826            return Err(StoreError::Serialization(
827                "native metadata state page is not set".to_string(),
828            ));
829        }
830
831        let page = pager
832            .read_page(page_id)
833            .map_err(|err| StoreError::Serialization(err.to_string()))?;
834        let bytes = page.as_bytes();
835        let content = &bytes[crate::storage::engine::HEADER_SIZE..];
836        if content.len() < 22 || &content[0..4] != NATIVE_METADATA_STATE_MAGIC {
837            return Err(StoreError::Serialization(
838                "invalid native metadata state page".to_string(),
839            ));
840        }
841
842        let mut pos = 4usize;
843        let protocol_version = read_native_string(content, &mut pos)?;
844        if pos + 16 > content.len() {
845            return Err(StoreError::Serialization(
846                "truncated native metadata state timestamp".to_string(),
847            ));
848        }
849        let mut generated_bytes = [0u8; 16];
850        generated_bytes.copy_from_slice(&content[pos..pos + 16]);
851        pos += 16;
852        let last_loaded_from = match content.get(pos).copied() {
853            Some(1) => {
854                pos += 1;
855                Some(read_native_string(content, &mut pos)?)
856            }
857            Some(_) => {
858                pos += 1;
859                None
860            }
861            None => None,
862        };
863        let last_healed_at_unix_ms = match content.get(pos).copied() {
864            Some(1) => {
865                pos += 1;
866                if pos + 16 > content.len() {
867                    return Err(StoreError::Serialization(
868                        "truncated native metadata heal timestamp".to_string(),
869                    ));
870                }
871                let mut healed_bytes = [0u8; 16];
872                healed_bytes.copy_from_slice(&content[pos..pos + 16]);
873                Some(u128::from_le_bytes(healed_bytes))
874            }
875            Some(_) => None,
876            None => None,
877        };
878
879        Ok(NativeMetadataStateSummary {
880            protocol_version,
881            generated_at_unix_ms: u128::from_le_bytes(generated_bytes),
882            last_loaded_from,
883            last_healed_at_unix_ms,
884        })
885    }
886}