Skip to main content

reddb_server/storage/unified/store/
impl_native_c.rs

1use super::*;
2
3impl UnifiedStore {
4    pub fn read_native_physical_state(&self) -> Result<NativePhysicalState, StoreError> {
5        let header = self.physical_file_header().ok_or_else(|| {
6            StoreError::Serialization("native physical header is not available".to_string())
7        })?;
8
9        let collection_roots = self.read_native_collection_roots(header.collection_roots_page)?;
10        let manifest = if header.manifest_page != 0 {
11            self.read_native_manifest_summary(header.manifest_page).ok()
12        } else {
13            None
14        };
15        let registry = if header.registry_page != 0 {
16            self.read_native_registry_summary(header.registry_page).ok()
17        } else {
18            None
19        };
20        let recovery = if header.recovery_page != 0 {
21            self.read_native_recovery_summary(header.recovery_page).ok()
22        } else {
23            None
24        };
25        let catalog = if header.catalog_page != 0 {
26            self.read_native_catalog_summary(header.catalog_page).ok()
27        } else {
28            None
29        };
30        let metadata_state = if header.metadata_state_page != 0 {
31            self.read_native_metadata_state_summary(header.metadata_state_page)
32                .ok()
33        } else {
34            None
35        };
36        let vector_artifact_pages = if header.vector_artifact_page != 0 {
37            self.read_native_vector_artifact_store(header.vector_artifact_page)
38                .ok()
39        } else {
40            None
41        };
42
43        Ok(NativePhysicalState {
44            header,
45            collection_roots,
46            manifest,
47            registry,
48            recovery,
49            catalog,
50            metadata_state,
51            vector_artifact_pages,
52        })
53    }
54
55    fn read_native_blob_chain_page_ids(&self, root_page: u32) -> Result<Vec<u32>, StoreError> {
56        let Some(pager) = &self.pager else {
57            return Err(StoreError::Serialization(
58                "native blob chain requires paged mode".to_string(),
59            ));
60        };
61        if root_page == 0 {
62            return Ok(Vec::new());
63        }
64        let mut pages = Vec::new();
65        let mut current = root_page;
66        while current != 0 {
67            pages.push(current);
68            let page = pager
69                .read_page(current)
70                .map_err(|err| StoreError::Serialization(err.to_string()))?;
71            let bytes = page.as_bytes();
72            let content = &bytes[crate::storage::engine::HEADER_SIZE..];
73            let (next_page, _) = reddb_file::decode_native_blob_page(content)
74                .map_err(|err| StoreError::Serialization(err.to_string()))?;
75            current = next_page;
76        }
77        Ok(pages)
78    }
79
80    fn write_native_blob_chain(
81        &self,
82        payload: &[u8],
83        existing_root: Option<u32>,
84    ) -> Result<(u32, u32, u64), StoreError> {
85        let Some(pager) = &self.pager else {
86            return Ok((0, 0, 0));
87        };
88        if payload.is_empty() {
89            return Ok((0, 0, 0));
90        }
91
92        let chunk_capacity = reddb_file::native_blob_chunk_capacity(
93            crate::storage::engine::PAGE_SIZE,
94            crate::storage::engine::HEADER_SIZE,
95        );
96        let page_count = payload.len().div_ceil(chunk_capacity) as u32;
97        let mut page_ids = existing_root
98            .map(|root| self.read_native_blob_chain_page_ids(root))
99            .transpose()?
100            .unwrap_or_default();
101        while page_ids.len() < page_count as usize {
102            page_ids.push(
103                pager
104                    .allocate_page(crate::storage::engine::PageType::NativeMeta)
105                    .map_err(|err| StoreError::Serialization(err.to_string()))?
106                    .page_id(),
107            );
108        }
109
110        for (index, chunk) in payload.chunks(chunk_capacity).enumerate() {
111            let page_id = page_ids[index];
112            let next_page = page_ids.get(index + 1).copied().unwrap_or(0);
113            let data = reddb_file::encode_native_blob_page(next_page, chunk);
114
115            let mut page = crate::storage::engine::Page::new(
116                crate::storage::engine::PageType::NativeMeta,
117                page_id,
118            );
119            let bytes = page.as_bytes_mut();
120            let content_start = crate::storage::engine::HEADER_SIZE;
121            let copy_len = data.len().min(bytes.len() - content_start);
122            bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
123            pager
124                .write_page(page_id, page)
125                .map_err(|err| StoreError::Serialization(err.to_string()))?;
126        }
127
128        Ok((
129            page_ids[0],
130            page_count,
131            crate::storage::engine::crc32(payload) as u64,
132        ))
133    }
134
135    pub fn read_native_blob_chain(&self, root_page: u32) -> Result<Vec<u8>, StoreError> {
136        let Some(pager) = &self.pager else {
137            return Err(StoreError::Serialization(
138                "native blob chain requires paged mode".to_string(),
139            ));
140        };
141        if root_page == 0 {
142            return Ok(Vec::new());
143        }
144        let mut current = root_page;
145        let mut payload = Vec::new();
146        while current != 0 {
147            let page = pager
148                .read_page(current)
149                .map_err(|err| StoreError::Serialization(err.to_string()))?;
150            let bytes = page.as_bytes();
151            let content = &bytes[crate::storage::engine::HEADER_SIZE..];
152            let (next_page, chunk) = reddb_file::decode_native_blob_page(content)
153                .map_err(|err| StoreError::Serialization(err.to_string()))?;
154            payload.extend_from_slice(&chunk);
155            current = next_page;
156        }
157        Ok(payload)
158    }
159
160    pub fn write_native_vector_artifact_store(
161        &self,
162        artifacts: &[(String, String, Vec<u8>)],
163        existing_page: Option<u32>,
164    ) -> Result<(u32, u64, Vec<NativeVectorArtifactPageSummary>), StoreError> {
165        let Some(pager) = &self.pager else {
166            return Ok((0, 0, Vec::new()));
167        };
168
169        let existing = existing_page
170            .map(|page| self.read_native_vector_artifact_store(page))
171            .transpose()?
172            .unwrap_or_default();
173        let page_id = match existing_page.filter(|page| *page != 0) {
174            Some(page) => page,
175            None => pager
176                .allocate_page(crate::storage::engine::PageType::NativeMeta)
177                .map_err(|err| StoreError::Serialization(err.to_string()))?
178                .page_id(),
179        };
180
181        let mut summaries = Vec::new();
182        for (collection, artifact_kind, bytes) in artifacts {
183            let existing_root = existing
184                .iter()
185                .find(|entry| {
186                    entry.collection == *collection && entry.artifact_kind == *artifact_kind
187                })
188                .map(|entry| entry.root_page);
189            let (root_page, page_count, checksum) =
190                self.write_native_blob_chain(bytes, existing_root)?;
191            summaries.push(NativeVectorArtifactPageSummary {
192                collection: collection.clone(),
193                artifact_kind: artifact_kind.clone(),
194                root_page,
195                page_count,
196                byte_len: bytes.len() as u64,
197                checksum,
198            });
199        }
200
201        let data = reddb_file::encode_native_vector_artifact_store_page(&summaries);
202        let checksum = reddb_file::native_store_page_checksum(&data);
203        let mut page = crate::storage::engine::Page::new(
204            crate::storage::engine::PageType::NativeMeta,
205            page_id,
206        );
207        let bytes = page.as_bytes_mut();
208        let content_start = crate::storage::engine::HEADER_SIZE;
209        let copy_len = data.len().min(bytes.len() - content_start);
210        bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
211        pager
212            .write_page(page_id, page)
213            .map_err(|err| StoreError::Serialization(err.to_string()))?;
214        Ok((page_id, checksum, summaries))
215    }
216
217    pub fn read_native_vector_artifact_store(
218        &self,
219        page_id: u32,
220    ) -> Result<Vec<NativeVectorArtifactPageSummary>, StoreError> {
221        let Some(pager) = &self.pager else {
222            return Err(StoreError::Serialization(
223                "native vector artifact store requires paged mode".to_string(),
224            ));
225        };
226        if page_id == 0 {
227            return Ok(Vec::new());
228        }
229        let page = pager
230            .read_page(page_id)
231            .map_err(|err| StoreError::Serialization(err.to_string()))?;
232        let bytes = page.as_bytes();
233        let content = &bytes[crate::storage::engine::HEADER_SIZE..];
234        reddb_file::decode_native_vector_artifact_store_page(content)
235            .map_err(|err| StoreError::Serialization(err.to_string()))
236    }
237
238    pub fn read_native_vector_artifact_blob(
239        &self,
240        page_id: u32,
241        collection: &str,
242        artifact_kind: Option<&str>,
243    ) -> Result<Option<(NativeVectorArtifactPageSummary, Vec<u8>)>, StoreError> {
244        let artifact_kind = artifact_kind.unwrap_or("hnsw");
245        let summaries = self.read_native_vector_artifact_store(page_id)?;
246        let Some(summary) = summaries.into_iter().find(|summary| {
247            summary.collection == collection && summary.artifact_kind == artifact_kind
248        }) else {
249            return Ok(None);
250        };
251        let bytes = self.read_native_blob_chain(summary.root_page)?;
252        Ok(Some((summary, bytes)))
253    }
254}