Skip to main content

reddb_server/storage/unified/store/
impl_native_c.rs

1use super::*;
2
3impl UnifiedStore {
4    pub fn read_native_physical_state(&self) -> Result<NativePhysicalState, StoreError> {
5        let header = self.physical_file_header().ok_or_else(|| {
6            StoreError::Serialization("native physical header is not available".to_string())
7        })?;
8
9        let collection_roots = self.read_native_collection_roots(header.collection_roots_page)?;
10        let manifest = if header.manifest_page != 0 {
11            self.read_native_manifest_summary(header.manifest_page).ok()
12        } else {
13            None
14        };
15        let registry = if header.registry_page != 0 {
16            self.read_native_registry_summary(header.registry_page).ok()
17        } else {
18            None
19        };
20        let recovery = if header.recovery_page != 0 {
21            self.read_native_recovery_summary(header.recovery_page).ok()
22        } else {
23            None
24        };
25        let catalog = if header.catalog_page != 0 {
26            self.read_native_catalog_summary(header.catalog_page).ok()
27        } else {
28            None
29        };
30        let metadata_state = if header.metadata_state_page != 0 {
31            self.read_native_metadata_state_summary(header.metadata_state_page)
32                .ok()
33        } else {
34            None
35        };
36        let vector_artifact_pages = if header.vector_artifact_page != 0 {
37            self.read_native_vector_artifact_store(header.vector_artifact_page)
38                .ok()
39        } else {
40            None
41        };
42
43        Ok(NativePhysicalState {
44            header,
45            collection_roots,
46            manifest,
47            registry,
48            recovery,
49            catalog,
50            metadata_state,
51            vector_artifact_pages,
52        })
53    }
54
55    fn read_native_blob_chain_page_ids(&self, root_page: u32) -> Result<Vec<u32>, StoreError> {
56        let Some(pager) = &self.pager else {
57            return Err(StoreError::Serialization(
58                "native blob chain requires paged mode".to_string(),
59            ));
60        };
61        if root_page == 0 {
62            return Ok(Vec::new());
63        }
64        let mut pages = Vec::new();
65        let mut current = root_page;
66        while current != 0 {
67            pages.push(current);
68            let page = pager
69                .read_page(current)
70                .map_err(|err| StoreError::Serialization(err.to_string()))?;
71            let bytes = page.as_bytes();
72            let content = &bytes[crate::storage::engine::HEADER_SIZE..];
73            if content.len() < 12 || &content[0..4] != NATIVE_BLOB_MAGIC {
74                return Err(StoreError::Serialization(
75                    "invalid native blob page".to_string(),
76                ));
77            }
78            current = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
79        }
80        Ok(pages)
81    }
82
83    fn write_native_blob_chain(
84        &self,
85        payload: &[u8],
86        existing_root: Option<u32>,
87    ) -> Result<(u32, u32, u64), StoreError> {
88        let Some(pager) = &self.pager else {
89            return Ok((0, 0, 0));
90        };
91        if payload.is_empty() {
92            return Ok((0, 0, 0));
93        }
94
95        let chunk_capacity =
96            crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE - 12;
97        let page_count = payload.len().div_ceil(chunk_capacity) as u32;
98        let mut page_ids = existing_root
99            .map(|root| self.read_native_blob_chain_page_ids(root))
100            .transpose()?
101            .unwrap_or_default();
102        while page_ids.len() < page_count as usize {
103            page_ids.push(
104                pager
105                    .allocate_page(crate::storage::engine::PageType::NativeMeta)
106                    .map_err(|err| StoreError::Serialization(err.to_string()))?
107                    .page_id(),
108            );
109        }
110
111        for (index, chunk) in payload.chunks(chunk_capacity).enumerate() {
112            let page_id = page_ids[index];
113            let next_page = page_ids.get(index + 1).copied().unwrap_or(0);
114            let mut data = Vec::with_capacity(chunk.len() + 12);
115            data.extend_from_slice(NATIVE_BLOB_MAGIC);
116            data.extend_from_slice(&next_page.to_le_bytes());
117            data.extend_from_slice(&(chunk.len() as u32).to_le_bytes());
118            data.extend_from_slice(chunk);
119
120            let mut page = crate::storage::engine::Page::new(
121                crate::storage::engine::PageType::NativeMeta,
122                page_id,
123            );
124            let bytes = page.as_bytes_mut();
125            let content_start = crate::storage::engine::HEADER_SIZE;
126            let copy_len = data.len().min(bytes.len() - content_start);
127            bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
128            pager
129                .write_page(page_id, page)
130                .map_err(|err| StoreError::Serialization(err.to_string()))?;
131        }
132
133        Ok((
134            page_ids[0],
135            page_count,
136            crate::storage::engine::crc32(payload) as u64,
137        ))
138    }
139
140    pub fn read_native_blob_chain(&self, root_page: u32) -> Result<Vec<u8>, StoreError> {
141        let Some(pager) = &self.pager else {
142            return Err(StoreError::Serialization(
143                "native blob chain requires paged mode".to_string(),
144            ));
145        };
146        if root_page == 0 {
147            return Ok(Vec::new());
148        }
149        let mut current = root_page;
150        let mut payload = Vec::new();
151        while current != 0 {
152            let page = pager
153                .read_page(current)
154                .map_err(|err| StoreError::Serialization(err.to_string()))?;
155            let bytes = page.as_bytes();
156            let content = &bytes[crate::storage::engine::HEADER_SIZE..];
157            if content.len() < 12 || &content[0..4] != NATIVE_BLOB_MAGIC {
158                return Err(StoreError::Serialization(
159                    "invalid native blob page".to_string(),
160                ));
161            }
162            let next_page = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
163            let chunk_len =
164                u32::from_le_bytes([content[8], content[9], content[10], content[11]]) as usize;
165            if 12 + chunk_len > content.len() {
166                return Err(StoreError::Serialization(
167                    "truncated native blob page".to_string(),
168                ));
169            }
170            payload.extend_from_slice(&content[12..12 + chunk_len]);
171            current = next_page;
172        }
173        Ok(payload)
174    }
175
176    pub fn write_native_vector_artifact_store(
177        &self,
178        artifacts: &[(String, String, Vec<u8>)],
179        existing_page: Option<u32>,
180    ) -> Result<(u32, u64, Vec<NativeVectorArtifactPageSummary>), StoreError> {
181        let Some(pager) = &self.pager else {
182            return Ok((0, 0, Vec::new()));
183        };
184
185        let existing = existing_page
186            .map(|page| self.read_native_vector_artifact_store(page))
187            .transpose()?
188            .unwrap_or_default();
189        let page_id = match existing_page.filter(|page| *page != 0) {
190            Some(page) => page,
191            None => pager
192                .allocate_page(crate::storage::engine::PageType::NativeMeta)
193                .map_err(|err| StoreError::Serialization(err.to_string()))?
194                .page_id(),
195        };
196
197        let mut summaries = Vec::new();
198        for (collection, artifact_kind, bytes) in artifacts {
199            let existing_root = existing
200                .iter()
201                .find(|entry| {
202                    entry.collection == *collection && entry.artifact_kind == *artifact_kind
203                })
204                .map(|entry| entry.root_page);
205            let (root_page, page_count, checksum) =
206                self.write_native_blob_chain(bytes, existing_root)?;
207            summaries.push(NativeVectorArtifactPageSummary {
208                collection: collection.clone(),
209                artifact_kind: artifact_kind.clone(),
210                root_page,
211                page_count,
212                byte_len: bytes.len() as u64,
213                checksum,
214            });
215        }
216
217        let mut data = Vec::with_capacity(1024 + summaries.len() * 64);
218        data.extend_from_slice(NATIVE_VECTOR_ARTIFACT_MAGIC);
219        data.extend_from_slice(&(summaries.len() as u32).to_le_bytes());
220        for summary in &summaries {
221            push_native_string(&mut data, &summary.collection);
222            push_native_string(&mut data, &summary.artifact_kind);
223            data.extend_from_slice(&summary.root_page.to_le_bytes());
224            data.extend_from_slice(&summary.page_count.to_le_bytes());
225            data.extend_from_slice(&summary.byte_len.to_le_bytes());
226            data.extend_from_slice(&summary.checksum.to_le_bytes());
227        }
228
229        let checksum = crate::storage::engine::crc32(&data) as u64;
230        let mut page = crate::storage::engine::Page::new(
231            crate::storage::engine::PageType::NativeMeta,
232            page_id,
233        );
234        let bytes = page.as_bytes_mut();
235        let content_start = crate::storage::engine::HEADER_SIZE;
236        let copy_len = data.len().min(bytes.len() - content_start);
237        bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
238        pager
239            .write_page(page_id, page)
240            .map_err(|err| StoreError::Serialization(err.to_string()))?;
241        Ok((page_id, checksum, summaries))
242    }
243
244    pub fn read_native_vector_artifact_store(
245        &self,
246        page_id: u32,
247    ) -> Result<Vec<NativeVectorArtifactPageSummary>, StoreError> {
248        let Some(pager) = &self.pager else {
249            return Err(StoreError::Serialization(
250                "native vector artifact store requires paged mode".to_string(),
251            ));
252        };
253        if page_id == 0 {
254            return Ok(Vec::new());
255        }
256        let page = pager
257            .read_page(page_id)
258            .map_err(|err| StoreError::Serialization(err.to_string()))?;
259        let bytes = page.as_bytes();
260        let content = &bytes[crate::storage::engine::HEADER_SIZE..];
261        if content.len() < 8 || &content[0..4] != NATIVE_VECTOR_ARTIFACT_MAGIC {
262            return Err(StoreError::Serialization(
263                "invalid native vector artifact store page".to_string(),
264            ));
265        }
266        let count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]) as usize;
267        let mut pos = 8usize;
268        let mut summaries = Vec::with_capacity(count);
269        for _ in 0..count {
270            let collection = read_native_string(content, &mut pos)?;
271            let artifact_kind = read_native_string(content, &mut pos)?;
272            if pos + 24 > content.len() {
273                break;
274            }
275            let root_page = u32::from_le_bytes([
276                content[pos],
277                content[pos + 1],
278                content[pos + 2],
279                content[pos + 3],
280            ]);
281            pos += 4;
282            let page_count = u32::from_le_bytes([
283                content[pos],
284                content[pos + 1],
285                content[pos + 2],
286                content[pos + 3],
287            ]);
288            pos += 4;
289            let byte_len = u64::from_le_bytes([
290                content[pos],
291                content[pos + 1],
292                content[pos + 2],
293                content[pos + 3],
294                content[pos + 4],
295                content[pos + 5],
296                content[pos + 6],
297                content[pos + 7],
298            ]);
299            pos += 8;
300            let checksum = u64::from_le_bytes([
301                content[pos],
302                content[pos + 1],
303                content[pos + 2],
304                content[pos + 3],
305                content[pos + 4],
306                content[pos + 5],
307                content[pos + 6],
308                content[pos + 7],
309            ]);
310            pos += 8;
311            summaries.push(NativeVectorArtifactPageSummary {
312                collection,
313                artifact_kind,
314                root_page,
315                page_count,
316                byte_len,
317                checksum,
318            });
319        }
320        Ok(summaries)
321    }
322
323    pub fn read_native_vector_artifact_blob(
324        &self,
325        page_id: u32,
326        collection: &str,
327        artifact_kind: Option<&str>,
328    ) -> Result<Option<(NativeVectorArtifactPageSummary, Vec<u8>)>, StoreError> {
329        let artifact_kind = artifact_kind.unwrap_or("hnsw");
330        let summaries = self.read_native_vector_artifact_store(page_id)?;
331        let Some(summary) = summaries.into_iter().find(|summary| {
332            summary.collection == collection && summary.artifact_kind == artifact_kind
333        }) else {
334            return Ok(None);
335        };
336        let bytes = self.read_native_blob_chain(summary.root_page)?;
337        Ok(Some((summary, bytes)))
338    }
339}