reddb_server/storage/unified/store/
impl_native_a.rs1use super::*;
2
3impl UnifiedStore {
4 pub fn update_physical_file_header(
5 &self,
6 physical: PhysicalFileHeader,
7 ) -> Result<(), StoreError> {
8 let Some(pager) = &self.pager else {
9 return Ok(());
10 };
11 pager
12 .update_physical_header(physical)
13 .map_err(|err| StoreError::Serialization(err.to_string()))
14 }
15
16 pub fn physical_file_header(&self) -> Option<PhysicalFileHeader> {
18 self.pager
19 .as_ref()
20 .and_then(|pager| pager.physical_header().ok())
21 }
22
23 pub fn write_native_collection_roots(
25 &self,
26 roots: &BTreeMap<String, u64>,
27 existing_page: Option<u32>,
28 ) -> Result<(u32, u64), StoreError> {
29 let Some(pager) = &self.pager else {
30 return Ok((0, 0));
31 };
32
33 let page_id = match existing_page.filter(|page| *page != 0) {
34 Some(page) => page,
35 None => pager
36 .allocate_page(crate::storage::engine::PageType::NativeMeta)
37 .map_err(|err| StoreError::Serialization(err.to_string()))?
38 .page_id(),
39 };
40
41 let mut data = Vec::with_capacity(1024);
42 data.extend_from_slice(NATIVE_COLLECTION_ROOTS_MAGIC);
43 data.extend_from_slice(&(roots.len() as u32).to_le_bytes());
44 for (collection, root) in roots {
45 data.extend_from_slice(&(collection.len() as u32).to_le_bytes());
46 data.extend_from_slice(collection.as_bytes());
47 data.extend_from_slice(&root.to_le_bytes());
48 }
49
50 let checksum = crate::storage::engine::crc32(&data) as u64;
51 let mut page = crate::storage::engine::Page::new(
52 crate::storage::engine::PageType::NativeMeta,
53 page_id,
54 );
55 let bytes = page.as_bytes_mut();
56 let content_start = crate::storage::engine::HEADER_SIZE;
57 let copy_len = data.len().min(bytes.len() - content_start);
58 bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
59 pager
60 .write_page(page_id, page)
61 .map_err(|err| StoreError::Serialization(err.to_string()))?;
62 Ok((page_id, checksum))
63 }
64
65 pub fn read_native_collection_roots(
67 &self,
68 page_id: u32,
69 ) -> Result<BTreeMap<String, u64>, StoreError> {
70 let Some(pager) = &self.pager else {
71 return Ok(BTreeMap::new());
72 };
73 if page_id == 0 {
74 return Ok(BTreeMap::new());
75 }
76
77 let page = pager
78 .read_page(page_id)
79 .map_err(|err| StoreError::Serialization(err.to_string()))?;
80 let bytes = page.as_bytes();
81 let content = &bytes[crate::storage::engine::HEADER_SIZE..];
82 if content.len() < 8 || &content[0..4] != NATIVE_COLLECTION_ROOTS_MAGIC {
83 return Err(StoreError::Serialization(
84 "invalid native collection roots page".to_string(),
85 ));
86 }
87
88 let count = u32::from_le_bytes([content[4], content[5], content[6], content[7]]) as usize;
89 let mut pos = 8usize;
90 let mut roots = BTreeMap::new();
91
92 for _ in 0..count {
93 if pos + 4 > content.len() {
94 break;
95 }
96 let name_len = u32::from_le_bytes([
97 content[pos],
98 content[pos + 1],
99 content[pos + 2],
100 content[pos + 3],
101 ]) as usize;
102 pos += 4;
103 if pos + name_len + 8 > content.len() {
104 break;
105 }
106 let name = String::from_utf8(content[pos..pos + name_len].to_vec())
107 .map_err(|err| StoreError::Serialization(err.to_string()))?;
108 pos += name_len;
109 let root = u64::from_le_bytes([
110 content[pos],
111 content[pos + 1],
112 content[pos + 2],
113 content[pos + 3],
114 content[pos + 4],
115 content[pos + 5],
116 content[pos + 6],
117 content[pos + 7],
118 ]);
119 pos += 8;
120 roots.insert(name, root);
121 }
122
123 Ok(roots)
124 }
125
126 pub fn write_native_manifest_summary(
128 &self,
129 sequence: u64,
130 events: &[ManifestEvent],
131 existing_page: Option<u32>,
132 ) -> Result<(u32, u64), StoreError> {
133 let Some(pager) = &self.pager else {
134 return Ok((0, 0));
135 };
136
137 let page_id = match existing_page.filter(|page| *page != 0) {
138 Some(page) => page,
139 None => pager
140 .allocate_page(crate::storage::engine::PageType::NativeMeta)
141 .map_err(|err| StoreError::Serialization(err.to_string()))?
142 .page_id(),
143 };
144
145 let sample_start = events.len().saturating_sub(NATIVE_MANIFEST_SAMPLE_LIMIT);
146 let sample = &events[sample_start..];
147
148 let mut data = Vec::with_capacity(1024);
149 data.extend_from_slice(NATIVE_MANIFEST_MAGIC);
150 data.extend_from_slice(&sequence.to_le_bytes());
151 data.extend_from_slice(&(events.len() as u32).to_le_bytes());
152 data.push(u8::from(events.len() <= NATIVE_MANIFEST_SAMPLE_LIMIT));
153 data.extend_from_slice(&(events.len().saturating_sub(sample.len()) as u32).to_le_bytes());
154 data.extend_from_slice(&(sample.len() as u32).to_le_bytes());
155 for event in sample {
156 data.push(native_manifest_kind_to_byte(event.kind));
157 data.extend_from_slice(&(event.collection.len() as u16).to_le_bytes());
158 data.extend_from_slice(event.collection.as_bytes());
159 data.extend_from_slice(&(event.object_key.len() as u16).to_le_bytes());
160 data.extend_from_slice(event.object_key.as_bytes());
161 data.extend_from_slice(&event.block.index.to_le_bytes());
162 data.extend_from_slice(&event.block.checksum.to_le_bytes());
163 data.extend_from_slice(&event.snapshot_min.to_le_bytes());
164 match event.snapshot_max {
165 Some(value) => {
166 data.push(1);
167 data.extend_from_slice(&value.to_le_bytes());
168 }
169 None => data.push(0),
170 }
171 }
172
173 let checksum = crate::storage::engine::crc32(&data) as u64;
174 let mut page = crate::storage::engine::Page::new(
175 crate::storage::engine::PageType::NativeMeta,
176 page_id,
177 );
178 let bytes = page.as_bytes_mut();
179 let content_start = crate::storage::engine::HEADER_SIZE;
180 let copy_len = data.len().min(bytes.len() - content_start);
181 bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
182 pager
183 .write_page(page_id, page)
184 .map_err(|err| StoreError::Serialization(err.to_string()))?;
185 Ok((page_id, checksum))
186 }
187
188 pub fn read_native_manifest_summary(
190 &self,
191 page_id: u32,
192 ) -> Result<NativeManifestSummary, StoreError> {
193 let Some(pager) = &self.pager else {
194 return Err(StoreError::Serialization(
195 "native manifest summary requires paged mode".to_string(),
196 ));
197 };
198 if page_id == 0 {
199 return Err(StoreError::Serialization(
200 "native manifest summary page is not set".to_string(),
201 ));
202 }
203
204 let page = pager
205 .read_page(page_id)
206 .map_err(|err| StoreError::Serialization(err.to_string()))?;
207 let bytes = page.as_bytes();
208 let content = &bytes[crate::storage::engine::HEADER_SIZE..];
209 if content.len() < 25 || &content[0..4] != NATIVE_MANIFEST_MAGIC {
210 return Err(StoreError::Serialization(
211 "invalid native manifest summary page".to_string(),
212 ));
213 }
214
215 let sequence = u64::from_le_bytes([
216 content[4],
217 content[5],
218 content[6],
219 content[7],
220 content[8],
221 content[9],
222 content[10],
223 content[11],
224 ]);
225 let event_count = u32::from_le_bytes([content[12], content[13], content[14], content[15]]);
226 let events_complete = content[16] == 1;
227 let omitted_event_count =
228 u32::from_le_bytes([content[17], content[18], content[19], content[20]]);
229 let sample_count =
230 u32::from_le_bytes([content[21], content[22], content[23], content[24]]) as usize;
231
232 let mut pos = 25usize;
233 let mut recent_events = Vec::with_capacity(sample_count);
234 for _ in 0..sample_count {
235 if pos + 1 + 2 > content.len() {
236 break;
237 }
238 let kind = native_manifest_kind_from_byte(content[pos]).to_string();
239 pos += 1;
240 let collection_len = u16::from_le_bytes([content[pos], content[pos + 1]]) as usize;
241 pos += 2;
242 if pos + collection_len + 2 > content.len() {
243 break;
244 }
245 let collection = String::from_utf8(content[pos..pos + collection_len].to_vec())
246 .map_err(|err| StoreError::Serialization(err.to_string()))?;
247 pos += collection_len;
248 let object_key_len = u16::from_le_bytes([content[pos], content[pos + 1]]) as usize;
249 pos += 2;
250 if pos + object_key_len + 8 + 16 + 8 + 1 > content.len() {
251 break;
252 }
253 let object_key = String::from_utf8(content[pos..pos + object_key_len].to_vec())
254 .map_err(|err| StoreError::Serialization(err.to_string()))?;
255 pos += object_key_len;
256 let block_index = u64::from_le_bytes([
257 content[pos],
258 content[pos + 1],
259 content[pos + 2],
260 content[pos + 3],
261 content[pos + 4],
262 content[pos + 5],
263 content[pos + 6],
264 content[pos + 7],
265 ]);
266 pos += 8;
267 let mut checksum_bytes = [0u8; 16];
268 checksum_bytes.copy_from_slice(&content[pos..pos + 16]);
269 pos += 16;
270 let snapshot_min = u64::from_le_bytes([
271 content[pos],
272 content[pos + 1],
273 content[pos + 2],
274 content[pos + 3],
275 content[pos + 4],
276 content[pos + 5],
277 content[pos + 6],
278 content[pos + 7],
279 ]);
280 pos += 8;
281 let snapshot_max = match content.get(pos).copied() {
282 Some(1) => {
283 pos += 1;
284 if pos + 8 > content.len() {
285 return Err(StoreError::Serialization(
286 "truncated native manifest snapshot_max".to_string(),
287 ));
288 }
289 let value = u64::from_le_bytes([
290 content[pos],
291 content[pos + 1],
292 content[pos + 2],
293 content[pos + 3],
294 content[pos + 4],
295 content[pos + 5],
296 content[pos + 6],
297 content[pos + 7],
298 ]);
299 pos += 8;
300 Some(value)
301 }
302 Some(_) => {
303 pos += 1;
304 None
305 }
306 None => None,
307 };
308
309 recent_events.push(NativeManifestEntrySummary {
310 collection,
311 object_key,
312 kind,
313 block_index,
314 block_checksum: u128::from_le_bytes(checksum_bytes),
315 snapshot_min,
316 snapshot_max,
317 });
318 }
319
320 Ok(NativeManifestSummary {
321 sequence,
322 event_count,
323 events_complete,
324 omitted_event_count,
325 recent_events,
326 })
327 }
328
329 pub fn write_native_registry_summary(
331 &self,
332 summary: &NativeRegistrySummary,
333 existing_page: Option<u32>,
334 ) -> Result<(u32, u64), StoreError> {
335 let Some(pager) = &self.pager else {
336 return Ok((0, 0));
337 };
338
339 let page_id = match existing_page.filter(|page| *page != 0) {
340 Some(page) => page,
341 None => pager
342 .allocate_page(crate::storage::engine::PageType::NativeMeta)
343 .map_err(|err| StoreError::Serialization(err.to_string()))?
344 .page_id(),
345 };
346
347 let mut data = Vec::with_capacity(2048);
348 data.extend_from_slice(NATIVE_REGISTRY_MAGIC);
349 data.extend_from_slice(&summary.collection_count.to_le_bytes());
350 data.extend_from_slice(&summary.index_count.to_le_bytes());
351 data.extend_from_slice(&summary.graph_projection_count.to_le_bytes());
352 data.extend_from_slice(&summary.analytics_job_count.to_le_bytes());
353 data.extend_from_slice(&summary.vector_artifact_count.to_le_bytes());
354 data.push(u8::from(summary.collections_complete));
355 data.push(u8::from(summary.indexes_complete));
356 data.push(u8::from(summary.graph_projections_complete));
357 data.push(u8::from(summary.analytics_jobs_complete));
358 data.push(u8::from(summary.vector_artifacts_complete));
359 data.extend_from_slice(&summary.omitted_collection_count.to_le_bytes());
360 data.extend_from_slice(&summary.omitted_index_count.to_le_bytes());
361 data.extend_from_slice(&summary.omitted_graph_projection_count.to_le_bytes());
362 data.extend_from_slice(&summary.omitted_analytics_job_count.to_le_bytes());
363 data.extend_from_slice(&summary.omitted_vector_artifact_count.to_le_bytes());
364 data.extend_from_slice(&(summary.collection_names.len() as u32).to_le_bytes());
365 data.extend_from_slice(&(summary.indexes.len() as u32).to_le_bytes());
366 data.extend_from_slice(&(summary.graph_projections.len() as u32).to_le_bytes());
367 data.extend_from_slice(&(summary.analytics_jobs.len() as u32).to_le_bytes());
368 data.extend_from_slice(&(summary.vector_artifacts.len() as u32).to_le_bytes());
369
370 for name in &summary.collection_names {
371 push_native_string(&mut data, name);
372 }
373 for index in &summary.indexes {
374 push_native_string(&mut data, &index.name);
375 push_native_string(&mut data, &index.kind);
376 match &index.collection {
377 Some(collection) => {
378 data.push(1);
379 push_native_string(&mut data, collection);
380 }
381 None => data.push(0),
382 }
383 data.push(u8::from(index.enabled));
384 data.extend_from_slice(&index.entries.to_le_bytes());
385 data.extend_from_slice(&index.estimated_memory_bytes.to_le_bytes());
386 match index.last_refresh_ms {
387 Some(value) => {
388 data.push(1);
389 data.extend_from_slice(&value.to_le_bytes());
390 }
391 None => data.push(0),
392 }
393 push_native_string(&mut data, &index.backend);
394 }
395 for projection in &summary.graph_projections {
396 push_native_string(&mut data, &projection.name);
397 push_native_string(&mut data, &projection.source);
398 data.extend_from_slice(&projection.created_at_unix_ms.to_le_bytes());
399 data.extend_from_slice(&projection.updated_at_unix_ms.to_le_bytes());
400 push_native_string_list(&mut data, &projection.node_labels);
401 push_native_string_list(&mut data, &projection.node_types);
402 push_native_string_list(&mut data, &projection.edge_labels);
403 match projection.last_materialized_sequence {
404 Some(value) => {
405 data.push(1);
406 data.extend_from_slice(&value.to_le_bytes());
407 }
408 None => data.push(0),
409 }
410 }
411 for job in &summary.analytics_jobs {
412 push_native_string(&mut data, &job.id);
413 push_native_string(&mut data, &job.kind);
414 match &job.projection {
415 Some(projection) => {
416 data.push(1);
417 push_native_string(&mut data, projection);
418 }
419 None => data.push(0),
420 }
421 push_native_string(&mut data, &job.state);
422 data.extend_from_slice(&job.created_at_unix_ms.to_le_bytes());
423 data.extend_from_slice(&job.updated_at_unix_ms.to_le_bytes());
424 match job.last_run_sequence {
425 Some(value) => {
426 data.push(1);
427 data.extend_from_slice(&value.to_le_bytes());
428 }
429 None => data.push(0),
430 }
431 let metadata_count = job.metadata.len().min(u16::MAX as usize) as u16;
432 data.extend_from_slice(&metadata_count.to_le_bytes());
433 for (key, value) in job.metadata.iter().take(metadata_count as usize) {
434 push_native_string(&mut data, key);
435 push_native_string(&mut data, value);
436 }
437 }
438 for artifact in &summary.vector_artifacts {
439 push_native_string(&mut data, &artifact.collection);
440 push_native_string(&mut data, &artifact.artifact_kind);
441 data.extend_from_slice(&artifact.vector_count.to_le_bytes());
442 data.extend_from_slice(&artifact.dimension.to_le_bytes());
443 data.extend_from_slice(&artifact.max_layer.to_le_bytes());
444 data.extend_from_slice(&artifact.serialized_bytes.to_le_bytes());
445 data.extend_from_slice(&artifact.checksum.to_le_bytes());
446 }
447
448 let checksum = crate::storage::engine::crc32(&data) as u64;
449 let mut page = crate::storage::engine::Page::new(
450 crate::storage::engine::PageType::NativeMeta,
451 page_id,
452 );
453 let bytes = page.as_bytes_mut();
454 let content_start = crate::storage::engine::HEADER_SIZE;
455 let copy_len = data.len().min(bytes.len() - content_start);
456 bytes[content_start..content_start + copy_len].copy_from_slice(&data[..copy_len]);
457 pager
458 .write_page(page_id, page)
459 .map_err(|err| StoreError::Serialization(err.to_string()))?;
460 Ok((page_id, checksum))
461 }
462}