Skip to main content

reddb_server/storage/unified/store/
impl_native_a.rs

1use super::*;
2
3impl UnifiedStore {
4    pub fn update_physical_file_header(
5        &self,
6        physical: PhysicalFileHeader,
7    ) -> Result<(), StoreError> {
8        let Some(pager) = &self.pager else {
9            return Ok(());
10        };
11        pager
12            .update_physical_header(physical)
13            .map_err(|err| StoreError::Serialization(err.to_string()))
14    }
15
16    /// Read the minimal physical header mirrored into page 0 for paged databases.
17    pub fn physical_file_header(&self) -> Option<PhysicalFileHeader> {
18        self.pager
19            .as_ref()
20            .and_then(|pager| pager.physical_header().ok())
21    }
22
23    /// Persist native collection roots into a dedicated page in the paged file.
24    pub fn write_native_collection_roots(
25        &self,
26        roots: &BTreeMap<String, u64>,
27        existing_page: Option<u32>,
28    ) -> Result<(u32, u64), StoreError> {
29        let Some(pager) = &self.pager else {
30            return Ok((0, 0));
31        };
32
33        let page_id = match existing_page.filter(|page| *page != 0) {
34            Some(page) => page,
35            None => pager
36                .allocate_page(crate::storage::engine::PageType::NativeMeta)
37                .map_err(|err| StoreError::Serialization(err.to_string()))?
38                .page_id(),
39        };
40
41        let mut data = Vec::with_capacity(1024);
42        data.extend_from_slice(NATIVE_COLLECTION_ROOTS_MAGIC);
43        data.extend_from_slice(&(roots.len() as u32).to_le_bytes());
44        for (collection, root) in roots {
45            data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
46            data.extend_from_slice(collection.as_bytes());
47            data.extend_from_slice(&root.to_le_bytes());
48        }
49
50        let checksum = crate::storage::engine::crc32(&data) as u64;
51        let mut page = crate::storage::engine::Page::new(
52            crate::storage::engine::PageType::NativeMeta,
53            page_id,
54        );
55        let bytes = page.as_bytes_mut();
56        let content_start = crate::storage::engine::HEADER_SIZE;
57        let copy_len = data.len().min(bytes.len() - content_start);
58        bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
59        pager
60            .write_page(page_id, page)
61            .map_err(|err| StoreError::Serialization(err.to_string()))?;
62        Ok((page_id, checksum))
63    }
64
65    /// Read native collection roots from a dedicated page in the paged file.
66    pub fn read_native_collection_roots(
67        &self,
68        page_id: u32,
69    ) -> Result<BTreeMap<String, u64>, StoreError> {
70        let Some(pager) = &self.pager else {
71            return Ok(BTreeMap::new());
72        };
73        if page_id == 0 {
74            return Ok(BTreeMap::new());
75        }
76
77        let page = pager
78            .read_page(page_id)
79            .map_err(|err| StoreError::Serialization(err.to_string()))?;
80        let bytes = page.as_bytes();
81        let content = &bytes[crate::storage::engine::HEADER_SIZE..];
82        if content.len() < 8 || &content[0..4] != NATIVE_COLLECTION_ROOTS_MAGIC {
83            return Err(StoreError::Serialization(
84                "invalid native collection roots page".to_string(),
85            ));
86        }
87
88        let count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]) as usize;
89        let mut pos = 8usize;
90        let mut roots = BTreeMap::new();
91
92        for _ in 0..count {
93            if pos + 4 > content.len() {
94                break;
95            }
96            let name_len = u32::from_le_bytes([
97                content[pos],
98                content[pos + 1],
99                content[pos + 2],
100                content[pos + 3],
101            ]) as usize;
102            pos += 4;
103            if pos + name_len + 8 > content.len() {
104                break;
105            }
106            let name = String::from_utf8(content[pos..pos + name_len].to_vec())
107                .map_err(|err| StoreError::Serialization(err.to_string()))?;
108            pos += name_len;
109            let root = u64::from_le_bytes([
110                content[pos],
111                content[pos + 1],
112                content[pos + 2],
113                content[pos + 3],
114                content[pos + 4],
115                content[pos + 5],
116                content[pos + 6],
117                content[pos + 7],
118            ]);
119            pos += 8;
120            roots.insert(name, root);
121        }
122
123        Ok(roots)
124    }
125
126    /// Persist a compact native manifest summary into a dedicated page in the paged file.
127    pub fn write_native_manifest_summary(
128        &self,
129        sequence: u64,
130        events: &[ManifestEvent],
131        existing_page: Option<u32>,
132    ) -> Result<(u32, u64), StoreError> {
133        let Some(pager) = &self.pager else {
134            return Ok((0, 0));
135        };
136
137        let page_id = match existing_page.filter(|page| *page != 0) {
138            Some(page) => page,
139            None => pager
140                .allocate_page(crate::storage::engine::PageType::NativeMeta)
141                .map_err(|err| StoreError::Serialization(err.to_string()))?
142                .page_id(),
143        };
144
145        let sample_start = events.len().saturating_sub(NATIVE_MANIFEST_SAMPLE_LIMIT);
146        let sample = &events[sample_start..];
147
148        let mut data = Vec::with_capacity(1024);
149        data.extend_from_slice(NATIVE_MANIFEST_MAGIC);
150        data.extend_from_slice(&sequence.to_le_bytes());
151        data.extend_from_slice(&(events.len() as u32).to_le_bytes());
152        data.push(u8::from(events.len() <= NATIVE_MANIFEST_SAMPLE_LIMIT));
153        data.extend_from_slice(&(events.len().saturating_sub(sample.len()) as u32).to_le_bytes());
154        data.extend_from_slice(&(sample.len() as u32).to_le_bytes());
155        for event in sample {
156            data.push(native_manifest_kind_to_byte(event.kind));
157            data.extend_from_slice(&(event.collection.len() as u16).to_le_bytes());
158            data.extend_from_slice(event.collection.as_bytes());
159            data.extend_from_slice(&(event.object_key.len() as u16).to_le_bytes());
160            data.extend_from_slice(event.object_key.as_bytes());
161            data.extend_from_slice(&event.block.index.to_le_bytes());
162            data.extend_from_slice(&event.block.checksum.to_le_bytes());
163            data.extend_from_slice(&event.snapshot_min.to_le_bytes());
164            match event.snapshot_max {
165                Some(value) => {
166                    data.push(1);
167                    data.extend_from_slice(&value.to_le_bytes());
168                }
169                None => data.push(0),
170            }
171        }
172
173        let checksum = crate::storage::engine::crc32(&data) as u64;
174        let mut page = crate::storage::engine::Page::new(
175            crate::storage::engine::PageType::NativeMeta,
176            page_id,
177        );
178        let bytes = page.as_bytes_mut();
179        let content_start = crate::storage::engine::HEADER_SIZE;
180        let copy_len = data.len().min(bytes.len() - content_start);
181        bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
182        pager
183            .write_page(page_id, page)
184            .map_err(|err| StoreError::Serialization(err.to_string()))?;
185        Ok((page_id, checksum))
186    }
187
188    /// Read a compact native manifest summary from a dedicated page in the paged file.
189    pub fn read_native_manifest_summary(
190        &self,
191        page_id: u32,
192    ) -> Result<NativeManifestSummary, StoreError> {
193        let Some(pager) = &self.pager else {
194            return Err(StoreError::Serialization(
195                "native manifest summary requires paged mode".to_string(),
196            ));
197        };
198        if page_id == 0 {
199            return Err(StoreError::Serialization(
200                "native manifest summary page is not set".to_string(),
201            ));
202        }
203
204        let page = pager
205            .read_page(page_id)
206            .map_err(|err| StoreError::Serialization(err.to_string()))?;
207        let bytes = page.as_bytes();
208        let content = &bytes[crate::storage::engine::HEADER_SIZE..];
209        if content.len() < 25 || &content[0..4] != NATIVE_MANIFEST_MAGIC {
210            return Err(StoreError::Serialization(
211                "invalid native manifest summary page".to_string(),
212            ));
213        }
214
215        let sequence = u64::from_le_bytes([
216            content[4],
217            content[5],
218            content[6],
219            content[7],
220            content[8],
221            content[9],
222            content[10],
223            content[11],
224        ]);
225        let event_count = u32::from_le_bytes([content[12], content[13], content[14], content[15]]);
226        let events_complete = content[16] == 1;
227        let omitted_event_count =
228            u32::from_le_bytes([content[17], content[18], content[19], content[20]]);
229        let sample_count =
230            u32::from_le_bytes([content[21], content[22], content[23], content[24]]) as usize;
231
232        let mut pos = 25usize;
233        let mut recent_events = Vec::with_capacity(sample_count);
234        for _ in 0..sample_count {
235            if pos + 1 + 2 > content.len() {
236                break;
237            }
238            let kind = native_manifest_kind_from_byte(content[pos]).to_string();
239            pos += 1;
240            let collection_len = u16::from_le_bytes([content[pos], content[pos + 1]]) as usize;
241            pos += 2;
242            if pos + collection_len + 2 > content.len() {
243                break;
244            }
245            let collection = String::from_utf8(content[pos..pos + collection_len].to_vec())
246                .map_err(|err| StoreError::Serialization(err.to_string()))?;
247            pos += collection_len;
248            let object_key_len = u16::from_le_bytes([content[pos], content[pos + 1]]) as usize;
249            pos += 2;
250            if pos + object_key_len + 8 + 16 + 8 + 1 > content.len() {
251                break;
252            }
253            let object_key = String::from_utf8(content[pos..pos + object_key_len].to_vec())
254                .map_err(|err| StoreError::Serialization(err.to_string()))?;
255            pos += object_key_len;
256            let block_index = u64::from_le_bytes([
257                content[pos],
258                content[pos + 1],
259                content[pos + 2],
260                content[pos + 3],
261                content[pos + 4],
262                content[pos + 5],
263                content[pos + 6],
264                content[pos + 7],
265            ]);
266            pos += 8;
267            let mut checksum_bytes = [0u8; 16];
268            checksum_bytes.copy_from_slice(&content[pos..pos + 16]);
269            pos += 16;
270            let snapshot_min = u64::from_le_bytes([
271                content[pos],
272                content[pos + 1],
273                content[pos + 2],
274                content[pos + 3],
275                content[pos + 4],
276                content[pos + 5],
277                content[pos + 6],
278                content[pos + 7],
279            ]);
280            pos += 8;
281            let snapshot_max = match content.get(pos).copied() {
282                Some(1) => {
283                    pos += 1;
284                    if pos + 8 > content.len() {
285                        return Err(StoreError::Serialization(
286                            "truncated native manifest snapshot_max".to_string(),
287                        ));
288                    }
289                    let value = u64::from_le_bytes([
290                        content[pos],
291                        content[pos + 1],
292                        content[pos + 2],
293                        content[pos + 3],
294                        content[pos + 4],
295                        content[pos + 5],
296                        content[pos + 6],
297                        content[pos + 7],
298                    ]);
299                    pos += 8;
300                    Some(value)
301                }
302                Some(_) => {
303                    pos += 1;
304                    None
305                }
306                None => None,
307            };
308
309            recent_events.push(NativeManifestEntrySummary {
310                collection,
311                object_key,
312                kind,
313                block_index,
314                block_checksum: u128::from_le_bytes(checksum_bytes),
315                snapshot_min,
316                snapshot_max,
317            });
318        }
319
320        Ok(NativeManifestSummary {
321            sequence,
322            event_count,
323            events_complete,
324            omitted_event_count,
325            recent_events,
326        })
327    }
328
329    /// Persist a compact native operational registry summary into a dedicated page.
330    pub fn write_native_registry_summary(
331        &self,
332        summary: &NativeRegistrySummary,
333        existing_page: Option<u32>,
334    ) -> Result<(u32, u64), StoreError> {
335        let Some(pager) = &self.pager else {
336            return Ok((0, 0));
337        };
338
339        let page_id = match existing_page.filter(|page| *page != 0) {
340            Some(page) => page,
341            None => pager
342                .allocate_page(crate::storage::engine::PageType::NativeMeta)
343                .map_err(|err| StoreError::Serialization(err.to_string()))?
344                .page_id(),
345        };
346
347        let mut data = Vec::with_capacity(2048);
348        data.extend_from_slice(NATIVE_REGISTRY_MAGIC);
349        data.extend_from_slice(&summary.collection_count.to_le_bytes());
350        data.extend_from_slice(&summary.index_count.to_le_bytes());
351        data.extend_from_slice(&summary.graph_projection_count.to_le_bytes());
352        data.extend_from_slice(&summary.analytics_job_count.to_le_bytes());
353        data.extend_from_slice(&summary.vector_artifact_count.to_le_bytes());
354        data.push(u8::from(summary.collections_complete));
355        data.push(u8::from(summary.indexes_complete));
356        data.push(u8::from(summary.graph_projections_complete));
357        data.push(u8::from(summary.analytics_jobs_complete));
358        data.push(u8::from(summary.vector_artifacts_complete));
359        data.extend_from_slice(&summary.omitted_collection_count.to_le_bytes());
360        data.extend_from_slice(&summary.omitted_index_count.to_le_bytes());
361        data.extend_from_slice(&summary.omitted_graph_projection_count.to_le_bytes());
362        data.extend_from_slice(&summary.omitted_analytics_job_count.to_le_bytes());
363        data.extend_from_slice(&summary.omitted_vector_artifact_count.to_le_bytes());
364        data.extend_from_slice(&(summary.collection_names.len() as u32).to_le_bytes());
365        data.extend_from_slice(&(summary.indexes.len() as u32).to_le_bytes());
366        data.extend_from_slice(&(summary.graph_projections.len() as u32).to_le_bytes());
367        data.extend_from_slice(&(summary.analytics_jobs.len() as u32).to_le_bytes());
368        data.extend_from_slice(&(summary.vector_artifacts.len() as u32).to_le_bytes());
369
370        for name in &summary.collection_names {
371            push_native_string(&mut data, name);
372        }
373        for index in &summary.indexes {
374            push_native_string(&mut data, &index.name);
375            push_native_string(&mut data, &index.kind);
376            match &index.collection {
377                Some(collection) => {
378                    data.push(1);
379                    push_native_string(&mut data, collection);
380                }
381                None => data.push(0),
382            }
383            data.push(u8::from(index.enabled));
384            data.extend_from_slice(&index.entries.to_le_bytes());
385            data.extend_from_slice(&index.estimated_memory_bytes.to_le_bytes());
386            match index.last_refresh_ms {
387                Some(value) => {
388                    data.push(1);
389                    data.extend_from_slice(&value.to_le_bytes());
390                }
391                None => data.push(0),
392            }
393            push_native_string(&mut data, &index.backend);
394        }
395        for projection in &summary.graph_projections {
396            push_native_string(&mut data, &projection.name);
397            push_native_string(&mut data, &projection.source);
398            data.extend_from_slice(&projection.created_at_unix_ms.to_le_bytes());
399            data.extend_from_slice(&projection.updated_at_unix_ms.to_le_bytes());
400            push_native_string_list(&mut data, &projection.node_labels);
401            push_native_string_list(&mut data, &projection.node_types);
402            push_native_string_list(&mut data, &projection.edge_labels);
403            match projection.last_materialized_sequence {
404                Some(value) => {
405                    data.push(1);
406                    data.extend_from_slice(&value.to_le_bytes());
407                }
408                None => data.push(0),
409            }
410        }
411        for job in &summary.analytics_jobs {
412            push_native_string(&mut data, &job.id);
413            push_native_string(&mut data, &job.kind);
414            match &job.projection {
415                Some(projection) => {
416                    data.push(1);
417                    push_native_string(&mut data, projection);
418                }
419                None => data.push(0),
420            }
421            push_native_string(&mut data, &job.state);
422            data.extend_from_slice(&job.created_at_unix_ms.to_le_bytes());
423            data.extend_from_slice(&job.updated_at_unix_ms.to_le_bytes());
424            match job.last_run_sequence {
425                Some(value) => {
426                    data.push(1);
427                    data.extend_from_slice(&value.to_le_bytes());
428                }
429                None => data.push(0),
430            }
431            let metadata_count = job.metadata.len().min(u16::MAX as usize) as u16;
432            data.extend_from_slice(&metadata_count.to_le_bytes());
433            for (key, value) in job.metadata.iter().take(metadata_count as usize) {
434                push_native_string(&mut data, key);
435                push_native_string(&mut data, value);
436            }
437        }
438        for artifact in &summary.vector_artifacts {
439            push_native_string(&mut data, &artifact.collection);
440            push_native_string(&mut data, &artifact.artifact_kind);
441            data.extend_from_slice(&artifact.vector_count.to_le_bytes());
442            data.extend_from_slice(&artifact.dimension.to_le_bytes());
443            data.extend_from_slice(&artifact.max_layer.to_le_bytes());
444            data.extend_from_slice(&artifact.serialized_bytes.to_le_bytes());
445            data.extend_from_slice(&artifact.checksum.to_le_bytes());
446        }
447
448        let checksum = crate::storage::engine::crc32(&data) as u64;
449        let mut page = crate::storage::engine::Page::new(
450            crate::storage::engine::PageType::NativeMeta,
451            page_id,
452        );
453        let bytes = page.as_bytes_mut();
454        let content_start = crate::storage::engine::HEADER_SIZE;
455        let copy_len = data.len().min(bytes.len() - content_start);
456        bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
457        pager
458            .write_page(page_id, page)
459            .map_err(|err| StoreError::Serialization(err.to_string()))?;
460        Ok((page_id, checksum))
461    }
462}