reddb_server/storage/unified/store/
impl_native_c.rs1use super::*;
2
3impl UnifiedStore {
4 pub fn read_native_physical_state(&self) -> Result<NativePhysicalState, StoreError> {
5 let header = self.physical_file_header().ok_or_else(|| {
6 StoreError::Serialization("native physical header is not available".to_string())
7 })?;
8
9 let collection_roots = self.read_native_collection_roots(header.collection_roots_page)?;
10 let manifest = if header.manifest_page != 0 {
11 self.read_native_manifest_summary(header.manifest_page).ok()
12 } else {
13 None
14 };
15 let registry = if header.registry_page != 0 {
16 self.read_native_registry_summary(header.registry_page).ok()
17 } else {
18 None
19 };
20 let recovery = if header.recovery_page != 0 {
21 self.read_native_recovery_summary(header.recovery_page).ok()
22 } else {
23 None
24 };
25 let catalog = if header.catalog_page != 0 {
26 self.read_native_catalog_summary(header.catalog_page).ok()
27 } else {
28 None
29 };
30 let metadata_state = if header.metadata_state_page != 0 {
31 self.read_native_metadata_state_summary(header.metadata_state_page)
32 .ok()
33 } else {
34 None
35 };
36 let vector_artifact_pages = if header.vector_artifact_page != 0 {
37 self.read_native_vector_artifact_store(header.vector_artifact_page)
38 .ok()
39 } else {
40 None
41 };
42
43 Ok(NativePhysicalState {
44 header,
45 collection_roots,
46 manifest,
47 registry,
48 recovery,
49 catalog,
50 metadata_state,
51 vector_artifact_pages,
52 })
53 }
54
55 fn read_native_blob_chain_page_ids(&self, root_page: u32) -> Result<Vec<u32>, StoreError> {
56 let Some(pager) = &self.pager else {
57 return Err(StoreError::Serialization(
58 "native blob chain requires paged mode".to_string(),
59 ));
60 };
61 if root_page == 0 {
62 return Ok(Vec::new());
63 }
64 let mut pages = Vec::new();
65 let mut current = root_page;
66 while current != 0 {
67 pages.push(current);
68 let page = pager
69 .read_page(current)
70 .map_err(|err| StoreError::Serialization(err.to_string()))?;
71 let bytes = page.as_bytes();
72 let content = &bytes[crate::storage::engine::HEADER_SIZE..];
73 let (next_page, _) = reddb_file::decode_native_blob_page(content)
74 .map_err(|err| StoreError::Serialization(err.to_string()))?;
75 current = next_page;
76 }
77 Ok(pages)
78 }
79
80 fn write_native_blob_chain(
81 &self,
82 payload: &[u8],
83 existing_root: Option<u32>,
84 ) -> Result<(u32, u32, u64), StoreError> {
85 let Some(pager) = &self.pager else {
86 return Ok((0, 0, 0));
87 };
88 if payload.is_empty() {
89 return Ok((0, 0, 0));
90 }
91
92 let chunk_capacity = reddb_file::native_blob_chunk_capacity(
93 crate::storage::engine::PAGE_SIZE,
94 crate::storage::engine::HEADER_SIZE,
95 );
96 let page_count = payload.len().div_ceil(chunk_capacity) as u32;
97 let mut page_ids = existing_root
98 .map(|root| self.read_native_blob_chain_page_ids(root))
99 .transpose()?
100 .unwrap_or_default();
101 while page_ids.len() < page_count as usize {
102 page_ids.push(
103 pager
104 .allocate_page(crate::storage::engine::PageType::NativeMeta)
105 .map_err(|err| StoreError::Serialization(err.to_string()))?
106 .page_id(),
107 );
108 }
109
110 for (index, chunk) in payload.chunks(chunk_capacity).enumerate() {
111 let page_id = page_ids[index];
112 let next_page = page_ids.get(index + 1).copied().unwrap_or(0);
113 let data = reddb_file::encode_native_blob_page(next_page, chunk);
114
115 let mut page = crate::storage::engine::Page::new(
116 crate::storage::engine::PageType::NativeMeta,
117 page_id,
118 );
119 let bytes = page.as_bytes_mut();
120 let content_start = crate::storage::engine::HEADER_SIZE;
121 let copy_len = data.len().min(bytes.len() - content_start);
122 bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
123 pager
124 .write_page(page_id, page)
125 .map_err(|err| StoreError::Serialization(err.to_string()))?;
126 }
127
128 Ok((
129 page_ids[0],
130 page_count,
131 crate::storage::engine::crc32(payload) as u64,
132 ))
133 }
134
135 pub fn read_native_blob_chain(&self, root_page: u32) -> Result<Vec<u8>, StoreError> {
136 let Some(pager) = &self.pager else {
137 return Err(StoreError::Serialization(
138 "native blob chain requires paged mode".to_string(),
139 ));
140 };
141 if root_page == 0 {
142 return Ok(Vec::new());
143 }
144 let mut current = root_page;
145 let mut payload = Vec::new();
146 while current != 0 {
147 let page = pager
148 .read_page(current)
149 .map_err(|err| StoreError::Serialization(err.to_string()))?;
150 let bytes = page.as_bytes();
151 let content = &bytes[crate::storage::engine::HEADER_SIZE..];
152 let (next_page, chunk) = reddb_file::decode_native_blob_page(content)
153 .map_err(|err| StoreError::Serialization(err.to_string()))?;
154 payload.extend_from_slice(&chunk);
155 current = next_page;
156 }
157 Ok(payload)
158 }
159
160 pub fn write_native_vector_artifact_store(
161 &self,
162 artifacts: &[(String, String, Vec<u8>)],
163 existing_page: Option<u32>,
164 ) -> Result<(u32, u64, Vec<NativeVectorArtifactPageSummary>), StoreError> {
165 let Some(pager) = &self.pager else {
166 return Ok((0, 0, Vec::new()));
167 };
168
169 let existing = existing_page
170 .map(|page| self.read_native_vector_artifact_store(page))
171 .transpose()?
172 .unwrap_or_default();
173 let page_id = match existing_page.filter(|page| *page != 0) {
174 Some(page) => page,
175 None => pager
176 .allocate_page(crate::storage::engine::PageType::NativeMeta)
177 .map_err(|err| StoreError::Serialization(err.to_string()))?
178 .page_id(),
179 };
180
181 let mut summaries = Vec::new();
182 for (collection, artifact_kind, bytes) in artifacts {
183 let existing_root = existing
184 .iter()
185 .find(|entry| {
186 entry.collection == *collection && entry.artifact_kind == *artifact_kind
187 })
188 .map(|entry| entry.root_page);
189 let (root_page, page_count, checksum) =
190 self.write_native_blob_chain(bytes, existing_root)?;
191 summaries.push(NativeVectorArtifactPageSummary {
192 collection: collection.clone(),
193 artifact_kind: artifact_kind.clone(),
194 root_page,
195 page_count,
196 byte_len: bytes.len() as u64,
197 checksum,
198 });
199 }
200
201 let data = reddb_file::encode_native_vector_artifact_store_page(&summaries);
202 let checksum = reddb_file::native_store_page_checksum(&data);
203 let mut page = crate::storage::engine::Page::new(
204 crate::storage::engine::PageType::NativeMeta,
205 page_id,
206 );
207 let bytes = page.as_bytes_mut();
208 let content_start = crate::storage::engine::HEADER_SIZE;
209 let copy_len = data.len().min(bytes.len() - content_start);
210 bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
211 pager
212 .write_page(page_id, page)
213 .map_err(|err| StoreError::Serialization(err.to_string()))?;
214 Ok((page_id, checksum, summaries))
215 }
216
217 pub fn read_native_vector_artifact_store(
218 &self,
219 page_id: u32,
220 ) -> Result<Vec<NativeVectorArtifactPageSummary>, StoreError> {
221 let Some(pager) = &self.pager else {
222 return Err(StoreError::Serialization(
223 "native vector artifact store requires paged mode".to_string(),
224 ));
225 };
226 if page_id == 0 {
227 return Ok(Vec::new());
228 }
229 let page = pager
230 .read_page(page_id)
231 .map_err(|err| StoreError::Serialization(err.to_string()))?;
232 let bytes = page.as_bytes();
233 let content = &bytes[crate::storage::engine::HEADER_SIZE..];
234 reddb_file::decode_native_vector_artifact_store_page(content)
235 .map_err(|err| StoreError::Serialization(err.to_string()))
236 }
237
238 pub fn read_native_vector_artifact_blob(
239 &self,
240 page_id: u32,
241 collection: &str,
242 artifact_kind: Option<&str>,
243 ) -> Result<Option<(NativeVectorArtifactPageSummary, Vec<u8>)>, StoreError> {
244 let artifact_kind = artifact_kind.unwrap_or("hnsw");
245 let summaries = self.read_native_vector_artifact_store(page_id)?;
246 let Some(summary) = summaries.into_iter().find(|summary| {
247 summary.collection == collection && summary.artifact_kind == artifact_kind
248 }) else {
249 return Ok(None);
250 };
251 let bytes = self.read_native_blob_chain(summary.root_page)?;
252 Ok(Some((summary, bytes)))
253 }
254}