reddb_server/storage/unified/store/
impl_native_c.rs1use super::*;
2
3impl UnifiedStore {
4 pub fn read_native_physical_state(&self) -> Result<NativePhysicalState, StoreError> {
5 let header = self.physical_file_header().ok_or_else(|| {
6 StoreError::Serialization("native physical header is not available".to_string())
7 })?;
8
9 let collection_roots = self.read_native_collection_roots(header.collection_roots_page)?;
10 let manifest = if header.manifest_page != 0 {
11 self.read_native_manifest_summary(header.manifest_page).ok()
12 } else {
13 None
14 };
15 let registry = if header.registry_page != 0 {
16 self.read_native_registry_summary(header.registry_page).ok()
17 } else {
18 None
19 };
20 let recovery = if header.recovery_page != 0 {
21 self.read_native_recovery_summary(header.recovery_page).ok()
22 } else {
23 None
24 };
25 let catalog = if header.catalog_page != 0 {
26 self.read_native_catalog_summary(header.catalog_page).ok()
27 } else {
28 None
29 };
30 let metadata_state = if header.metadata_state_page != 0 {
31 self.read_native_metadata_state_summary(header.metadata_state_page)
32 .ok()
33 } else {
34 None
35 };
36 let vector_artifact_pages = if header.vector_artifact_page != 0 {
37 self.read_native_vector_artifact_store(header.vector_artifact_page)
38 .ok()
39 } else {
40 None
41 };
42
43 Ok(NativePhysicalState {
44 header,
45 collection_roots,
46 manifest,
47 registry,
48 recovery,
49 catalog,
50 metadata_state,
51 vector_artifact_pages,
52 })
53 }
54
55 fn read_native_blob_chain_page_ids(&self, root_page: u32) -> Result<Vec<u32>, StoreError> {
56 let Some(pager) = &self.pager else {
57 return Err(StoreError::Serialization(
58 "native blob chain requires paged mode".to_string(),
59 ));
60 };
61 if root_page == 0 {
62 return Ok(Vec::new());
63 }
64 let mut pages = Vec::new();
65 let mut current = root_page;
66 while current != 0 {
67 pages.push(current);
68 let page = pager
69 .read_page(current)
70 .map_err(|err| StoreError::Serialization(err.to_string()))?;
71 let bytes = page.as_bytes();
72 let content = &bytes[crate::storage::engine::HEADER_SIZE..];
73 if content.len() < 12 || &content[0..4] != NATIVE_BLOB_MAGIC {
74 return Err(StoreError::Serialization(
75 "invalid native blob page".to_string(),
76 ));
77 }
78 current = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
79 }
80 Ok(pages)
81 }
82
83 fn write_native_blob_chain(
84 &self,
85 payload: &[u8],
86 existing_root: Option<u32>,
87 ) -> Result<(u32, u32, u64), StoreError> {
88 let Some(pager) = &self.pager else {
89 return Ok((0, 0, 0));
90 };
91 if payload.is_empty() {
92 return Ok((0, 0, 0));
93 }
94
95 let chunk_capacity =
96 crate::storage::engine::PAGE_SIZE - crate::storage::engine::HEADER_SIZE - 12;
97 let page_count = payload.len().div_ceil(chunk_capacity) as u32;
98 let mut page_ids = existing_root
99 .map(|root| self.read_native_blob_chain_page_ids(root))
100 .transpose()?
101 .unwrap_or_default();
102 while page_ids.len() < page_count as usize {
103 page_ids.push(
104 pager
105 .allocate_page(crate::storage::engine::PageType::NativeMeta)
106 .map_err(|err| StoreError::Serialization(err.to_string()))?
107 .page_id(),
108 );
109 }
110
111 for (index, chunk) in payload.chunks(chunk_capacity).enumerate() {
112 let page_id = page_ids[index];
113 let next_page = page_ids.get(index + 1).copied().unwrap_or(0);
114 let mut data = Vec::with_capacity(chunk.len() + 12);
115 data.extend_from_slice(NATIVE_BLOB_MAGIC);
116 data.extend_from_slice(&next_page.to_le_bytes());
117 data.extend_from_slice(&(chunk.len() as u32).to_le_bytes());
118 data.extend_from_slice(chunk);
119
120 let mut page = crate::storage::engine::Page::new(
121 crate::storage::engine::PageType::NativeMeta,
122 page_id,
123 );
124 let bytes = page.as_bytes_mut();
125 let content_start = crate::storage::engine::HEADER_SIZE;
126 let copy_len = data.len().min(bytes.len() - content_start);
127 bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
128 pager
129 .write_page(page_id, page)
130 .map_err(|err| StoreError::Serialization(err.to_string()))?;
131 }
132
133 Ok((
134 page_ids[0],
135 page_count,
136 crate::storage::engine::crc32(payload) as u64,
137 ))
138 }
139
140 pub fn read_native_blob_chain(&self, root_page: u32) -> Result<Vec<u8>, StoreError> {
141 let Some(pager) = &self.pager else {
142 return Err(StoreError::Serialization(
143 "native blob chain requires paged mode".to_string(),
144 ));
145 };
146 if root_page == 0 {
147 return Ok(Vec::new());
148 }
149 let mut current = root_page;
150 let mut payload = Vec::new();
151 while current != 0 {
152 let page = pager
153 .read_page(current)
154 .map_err(|err| StoreError::Serialization(err.to_string()))?;
155 let bytes = page.as_bytes();
156 let content = &bytes[crate::storage::engine::HEADER_SIZE..];
157 if content.len() < 12 || &content[0..4] != NATIVE_BLOB_MAGIC {
158 return Err(StoreError::Serialization(
159 "invalid native blob page".to_string(),
160 ));
161 }
162 let next_page = u32::from_le_bytes([content[4], content[5], content[6], content[7]]);
163 let chunk_len =
164 u32::from_le_bytes([content[8], content[9], content[10], content[11]]) as usize;
165 if 12 + chunk_len > content.len() {
166 return Err(StoreError::Serialization(
167 "truncated native blob page".to_string(),
168 ));
169 }
170 payload.extend_from_slice(&content[12..12 + chunk_len]);
171 current = next_page;
172 }
173 Ok(payload)
174 }
175
176 pub fn write_native_vector_artifact_store(
177 &self,
178 artifacts: &[(String, String, Vec<u8>)],
179 existing_page: Option<u32>,
180 ) -> Result<(u32, u64, Vec<NativeVectorArtifactPageSummary>), StoreError> {
181 let Some(pager) = &self.pager else {
182 return Ok((0, 0, Vec::new()));
183 };
184
185 let existing = existing_page
186 .map(|page| self.read_native_vector_artifact_store(page))
187 .transpose()?
188 .unwrap_or_default();
189 let page_id = match existing_page.filter(|page| *page != 0) {
190 Some(page) => page,
191 None => pager
192 .allocate_page(crate::storage::engine::PageType::NativeMeta)
193 .map_err(|err| StoreError::Serialization(err.to_string()))?
194 .page_id(),
195 };
196
197 let mut summaries = Vec::new();
198 for (collection, artifact_kind, bytes) in artifacts {
199 let existing_root = existing
200 .iter()
201 .find(|entry| {
202 entry.collection == *collection && entry.artifact_kind == *artifact_kind
203 })
204 .map(|entry| entry.root_page);
205 let (root_page, page_count, checksum) =
206 self.write_native_blob_chain(bytes, existing_root)?;
207 summaries.push(NativeVectorArtifactPageSummary {
208 collection: collection.clone(),
209 artifact_kind: artifact_kind.clone(),
210 root_page,
211 page_count,
212 byte_len: bytes.len() as u64,
213 checksum,
214 });
215 }
216
217 let mut data = Vec::with_capacity(1024 + summaries.len() * 64);
218 data.extend_from_slice(NATIVE_VECTOR_ARTIFACT_MAGIC);
219 data.extend_from_slice(&(summaries.len() as u32).to_le_bytes());
220 for summary in &summaries {
221 push_native_string(&mut data, &summary.collection);
222 push_native_string(&mut data, &summary.artifact_kind);
223 data.extend_from_slice(&summary.root_page.to_le_bytes());
224 data.extend_from_slice(&summary.page_count.to_le_bytes());
225 data.extend_from_slice(&summary.byte_len.to_le_bytes());
226 data.extend_from_slice(&summary.checksum.to_le_bytes());
227 }
228
229 let checksum = crate::storage::engine::crc32(&data) as u64;
230 let mut page = crate::storage::engine::Page::new(
231 crate::storage::engine::PageType::NativeMeta,
232 page_id,
233 );
234 let bytes = page.as_bytes_mut();
235 let content_start = crate::storage::engine::HEADER_SIZE;
236 let copy_len = data.len().min(bytes.len() - content_start);
237 bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
238 pager
239 .write_page(page_id, page)
240 .map_err(|err| StoreError::Serialization(err.to_string()))?;
241 Ok((page_id, checksum, summaries))
242 }
243
244 pub fn read_native_vector_artifact_store(
245 &self,
246 page_id: u32,
247 ) -> Result<Vec<NativeVectorArtifactPageSummary>, StoreError> {
248 let Some(pager) = &self.pager else {
249 return Err(StoreError::Serialization(
250 "native vector artifact store requires paged mode".to_string(),
251 ));
252 };
253 if page_id == 0 {
254 return Ok(Vec::new());
255 }
256 let page = pager
257 .read_page(page_id)
258 .map_err(|err| StoreError::Serialization(err.to_string()))?;
259 let bytes = page.as_bytes();
260 let content = &bytes[crate::storage::engine::HEADER_SIZE..];
261 if content.len() < 8 || &content[0..4] != NATIVE_VECTOR_ARTIFACT_MAGIC {
262 return Err(StoreError::Serialization(
263 "invalid native vector artifact store page".to_string(),
264 ));
265 }
266 let count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]) as usize;
267 let mut pos = 8usize;
268 let mut summaries = Vec::with_capacity(count);
269 for _ in 0..count {
270 let collection = read_native_string(content, &mut pos)?;
271 let artifact_kind = read_native_string(content, &mut pos)?;
272 if pos + 24 > content.len() {
273 break;
274 }
275 let root_page = u32::from_le_bytes([
276 content[pos],
277 content[pos + 1],
278 content[pos + 2],
279 content[pos + 3],
280 ]);
281 pos += 4;
282 let page_count = u32::from_le_bytes([
283 content[pos],
284 content[pos + 1],
285 content[pos + 2],
286 content[pos + 3],
287 ]);
288 pos += 4;
289 let byte_len = u64::from_le_bytes([
290 content[pos],
291 content[pos + 1],
292 content[pos + 2],
293 content[pos + 3],
294 content[pos + 4],
295 content[pos + 5],
296 content[pos + 6],
297 content[pos + 7],
298 ]);
299 pos += 8;
300 let checksum = u64::from_le_bytes([
301 content[pos],
302 content[pos + 1],
303 content[pos + 2],
304 content[pos + 3],
305 content[pos + 4],
306 content[pos + 5],
307 content[pos + 6],
308 content[pos + 7],
309 ]);
310 pos += 8;
311 summaries.push(NativeVectorArtifactPageSummary {
312 collection,
313 artifact_kind,
314 root_page,
315 page_count,
316 byte_len,
317 checksum,
318 });
319 }
320 Ok(summaries)
321 }
322
323 pub fn read_native_vector_artifact_blob(
324 &self,
325 page_id: u32,
326 collection: &str,
327 artifact_kind: Option<&str>,
328 ) -> Result<Option<(NativeVectorArtifactPageSummary, Vec<u8>)>, StoreError> {
329 let artifact_kind = artifact_kind.unwrap_or("hnsw");
330 let summaries = self.read_native_vector_artifact_store(page_id)?;
331 let Some(summary) = summaries.into_iter().find(|summary| {
332 summary.collection == collection && summary.artifact_kind == artifact_kind
333 }) else {
334 return Ok(None);
335 };
336 let bytes = self.read_native_blob_chain(summary.root_page)?;
337 Ok(Some((summary, bytes)))
338 }
339}