1use std::collections::BinaryHeap;
7use std::fs::{self, File, OpenOptions};
8use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
9use std::path::{Path, PathBuf};
10
11use rvf_types::{
12 DomainProfile, ErrorCode, FileIdentity, RvfError, SegmentType,
13 SEGMENT_HEADER_SIZE, SEGMENT_MAGIC,
14};
15use rvf_types::kernel::{KernelHeader, KERNEL_MAGIC};
16use rvf_types::kernel_binding::KernelBinding;
17use rvf_types::ebpf::{EbpfHeader, EBPF_MAGIC};
18
19use crate::cow::{CowEngine, CowStats};
20use crate::deletion::DeletionBitmap;
21use crate::filter::{self, FilterExpr, FilterValue, MetadataStore, metadata_value_to_filter};
22use crate::locking::WriterLock;
23use crate::membership::MembershipFilter;
24use crate::options::*;
25use crate::read_path::{self, VectorData};
26use crate::status::{CompactionState, StoreStatus};
27use crate::write_path::SegmentWriter;
28
29fn err(code: ErrorCode) -> RvfError {
31 RvfError::Code(code)
32}
33
34pub struct RvfStore {
38 path: PathBuf,
39 options: RvfOptions,
40 file: File,
41 seg_writer: Option<SegmentWriter>,
42 writer_lock: Option<WriterLock>,
43 vectors: VectorData,
44 deletion_bitmap: DeletionBitmap,
45 metadata: MetadataStore,
46 epoch: u32,
47 segment_dir: Vec<(u64, u64, u64, u8)>,
48 read_only: bool,
49 last_compaction_time: u64,
50 file_identity: FileIdentity,
51 cow_engine: Option<CowEngine>,
53 membership_filter: Option<MembershipFilter>,
55 parent_path: Option<PathBuf>,
57}
58
59impl RvfStore {
60 pub fn create(path: &Path, options: RvfOptions) -> Result<Self, RvfError> {
62 if options.dimension == 0 {
63 return Err(err(ErrorCode::InvalidManifest));
64 }
65
66 let file = OpenOptions::new()
67 .read(true)
68 .write(true)
69 .create_new(true)
70 .open(path)
71 .map_err(|_| err(ErrorCode::FsyncFailed))?;
72
73 let writer_lock = WriterLock::acquire(path)
74 .map_err(|_| err(ErrorCode::LockHeld))?;
75
76 let file_id = generate_file_id(path);
78
79 let domain_profile = path
81 .extension()
82 .and_then(|ext| ext.to_str())
83 .and_then(DomainProfile::from_extension)
84 .unwrap_or(options.domain_profile);
85
86 let mut opts = options.clone();
87 opts.domain_profile = domain_profile;
88
89 let mut store = Self {
90 path: path.to_path_buf(),
91 options: opts,
92 file,
93 seg_writer: Some(SegmentWriter::new(1)),
94 writer_lock: Some(writer_lock),
95 vectors: VectorData::new(options.dimension),
96 deletion_bitmap: DeletionBitmap::new(),
97 metadata: MetadataStore::new(),
98 epoch: 0,
99 segment_dir: Vec::new(),
100 read_only: false,
101 last_compaction_time: 0,
102 file_identity: FileIdentity::new_root(file_id),
103 cow_engine: None,
104 membership_filter: None,
105 parent_path: None,
106 };
107
108 store.write_manifest()?;
109 Ok(store)
110 }
111
112 pub fn open(path: &Path) -> Result<Self, RvfError> {
114 if !path.exists() {
115 return Err(err(ErrorCode::ManifestNotFound));
116 }
117
118 let writer_lock = WriterLock::acquire(path)
119 .map_err(|_| err(ErrorCode::LockHeld))?;
120
121 let file = OpenOptions::new()
122 .read(true)
123 .write(true)
124 .open(path)
125 .map_err(|_| err(ErrorCode::InvalidManifest))?;
126
127 let domain_profile = path
129 .extension()
130 .and_then(|ext| ext.to_str())
131 .and_then(DomainProfile::from_extension)
132 .unwrap_or(DomainProfile::Generic);
133
134 let opts = RvfOptions {
135 domain_profile,
136 ..Default::default()
137 };
138
139 let mut store = Self {
140 path: path.to_path_buf(),
141 options: opts,
142 file,
143 seg_writer: None,
144 writer_lock: Some(writer_lock),
145 vectors: VectorData::new(0),
146 deletion_bitmap: DeletionBitmap::new(),
147 metadata: MetadataStore::new(),
148 epoch: 0,
149 segment_dir: Vec::new(),
150 read_only: false,
151 last_compaction_time: 0,
152 file_identity: FileIdentity::zeroed(),
153 cow_engine: None,
154 membership_filter: None,
155 parent_path: None,
156 };
157
158 store.boot()?;
159 Ok(store)
160 }
161
162 pub fn open_readonly(path: &Path) -> Result<Self, RvfError> {
164 if !path.exists() {
165 return Err(err(ErrorCode::ManifestNotFound));
166 }
167
168 let file = OpenOptions::new()
169 .read(true)
170 .open(path)
171 .map_err(|_| err(ErrorCode::InvalidManifest))?;
172
173 let domain_profile = path
174 .extension()
175 .and_then(|ext| ext.to_str())
176 .and_then(DomainProfile::from_extension)
177 .unwrap_or(DomainProfile::Generic);
178
179 let opts = RvfOptions {
180 domain_profile,
181 ..Default::default()
182 };
183
184 let mut store = Self {
185 path: path.to_path_buf(),
186 options: opts,
187 file,
188 seg_writer: None,
189 writer_lock: None,
190 vectors: VectorData::new(0),
191 deletion_bitmap: DeletionBitmap::new(),
192 metadata: MetadataStore::new(),
193 epoch: 0,
194 segment_dir: Vec::new(),
195 read_only: true,
196 last_compaction_time: 0,
197 file_identity: FileIdentity::zeroed(),
198 cow_engine: None,
199 membership_filter: None,
200 parent_path: None,
201 };
202
203 store.boot()?;
204 Ok(store)
205 }
206
207 pub fn ingest_batch(
209 &mut self,
210 vectors: &[&[f32]],
211 ids: &[u64],
212 metadata: Option<&[MetadataEntry]>,
213 ) -> Result<IngestResult, RvfError> {
214 if self.read_only {
215 return Err(err(ErrorCode::ReadOnly));
216 }
217 if vectors.len() != ids.len() {
218 return Err(err(ErrorCode::DimensionMismatch));
219 }
220
221 let dim = self.options.dimension as usize;
222 let mut accepted = 0u64;
223 let mut rejected = 0u64;
224
225 let mut valid_vectors: Vec<&[f32]> = Vec::with_capacity(vectors.len());
226 let mut valid_ids: Vec<u64> = Vec::with_capacity(ids.len());
227
228 for (i, &vec_data) in vectors.iter().enumerate() {
229 if vec_data.len() != dim {
230 rejected += 1;
231 continue;
232 }
233 valid_vectors.push(vec_data);
234 valid_ids.push(ids[i]);
235 accepted += 1;
236 }
237
238 if valid_vectors.is_empty() {
239 self.epoch += 1;
240 return Ok(IngestResult { accepted: 0, rejected, epoch: self.epoch });
241 }
242
243 let writer = self.seg_writer.as_mut().ok_or_else(|| err(ErrorCode::InvalidManifest))?;
244
245 let (vec_seg_id, vec_seg_offset) = {
246 let mut buf_writer = BufWriter::with_capacity(256 * 1024, &self.file);
247 buf_writer.seek(SeekFrom::End(0)).map_err(|_| err(ErrorCode::FsyncFailed))?;
248 writer.write_vec_seg(&mut buf_writer, &valid_vectors, &valid_ids, self.options.dimension)
249 .map_err(|_| err(ErrorCode::FsyncFailed))?
250 };
251
252 let bytes_per_vec = (self.options.dimension as usize) * 4;
253 let vec_payload_len = (2 + 4 + valid_vectors.len() * (8 + bytes_per_vec)) as u64;
254
255 self.segment_dir.push((vec_seg_id, vec_seg_offset, vec_payload_len, SegmentType::Vec as u8));
256
257 for (vec_data, &vec_id) in valid_vectors.iter().zip(valid_ids.iter()) {
258 self.vectors.insert(vec_id, vec_data.to_vec());
259 }
260
261 if let Some(meta_entries) = metadata {
262 let entries_per_id = meta_entries.len() / valid_ids.len().max(1);
263 if entries_per_id > 0 {
264 for (i, &vid) in valid_ids.iter().enumerate() {
265 let start = i * entries_per_id;
266 let end = ((i + 1) * entries_per_id).min(meta_entries.len());
267 let fields: Vec<(u16, FilterValue)> = meta_entries[start..end]
268 .iter()
269 .map(|e| (e.field_id, metadata_value_to_filter(&e.value)))
270 .collect();
271 self.metadata.insert(vid, fields);
272 }
273 }
274 }
275
276 self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
277
278 self.epoch += 1;
279 self.write_manifest()?;
280
281 Ok(IngestResult { accepted, rejected, epoch: self.epoch })
282 }
283
284 pub fn query(
286 &self,
287 vector: &[f32],
288 k: usize,
289 options: &QueryOptions,
290 ) -> Result<Vec<SearchResult>, RvfError> {
291 let dim = self.options.dimension as usize;
292 if vector.len() != dim {
293 return Err(err(ErrorCode::DimensionMismatch));
294 }
295
296 if self.vectors.len() == 0 {
297 return Ok(Vec::new());
298 }
299
300 let mut heap: BinaryHeap<(OrderedFloat, u64)> = BinaryHeap::new();
303
304 for &vec_id in self.vectors.ids() {
305 if self.deletion_bitmap.is_deleted(vec_id) {
306 continue;
307 }
308 if let Some(ref filter_expr) = options.filter {
309 if !filter::evaluate(filter_expr, vec_id, &self.metadata) {
310 continue;
311 }
312 }
313 if let Some(stored_vec) = self.vectors.get(vec_id) {
314 let dist = compute_distance(vector, stored_vec, &self.options.metric);
315 if heap.len() < k {
316 heap.push((OrderedFloat(dist), vec_id));
317 } else if let Some(&(OrderedFloat(worst), _)) = heap.peek() {
318 if dist < worst {
319 heap.pop();
320 heap.push((OrderedFloat(dist), vec_id));
321 }
322 }
323 }
324 }
325
326 let mut results: Vec<SearchResult> = heap
328 .into_iter()
329 .map(|(OrderedFloat(dist), id)| SearchResult { id, distance: dist })
330 .collect();
331 results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap_or(std::cmp::Ordering::Equal));
332 Ok(results)
333 }
334
335 pub fn delete(&mut self, ids: &[u64]) -> Result<DeleteResult, RvfError> {
337 if self.read_only {
338 return Err(err(ErrorCode::ReadOnly));
339 }
340
341 let writer = self.seg_writer.as_mut().ok_or_else(|| err(ErrorCode::InvalidManifest))?;
342 let epoch = self.epoch + 1;
343
344 let (journal_seg_id, journal_offset) = {
345 let mut buf_writer = BufWriter::new(&self.file);
346 buf_writer.seek(SeekFrom::End(0)).map_err(|_| err(ErrorCode::FsyncFailed))?;
347 writer.write_journal_seg(&mut buf_writer, ids, epoch)
348 .map_err(|_| err(ErrorCode::FsyncFailed))?
349 };
350
351 let journal_payload_len = (16 + ids.len() * 12) as u64;
352 self.segment_dir.push((journal_seg_id, journal_offset, journal_payload_len, SegmentType::Journal as u8));
353
354 self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
355
356 let mut deleted = 0u64;
357 for &id in ids {
358 if self.vectors.get(id).is_some() && !self.deletion_bitmap.is_deleted(id) {
359 self.deletion_bitmap.delete(id);
360 deleted += 1;
361 }
362 }
363
364 self.epoch = epoch;
365 self.write_manifest()?;
366
367 Ok(DeleteResult { deleted, epoch: self.epoch })
368 }
369
370 pub fn delete_by_filter(&mut self, filter_expr: &FilterExpr) -> Result<DeleteResult, RvfError> {
372 if self.read_only {
373 return Err(err(ErrorCode::ReadOnly));
374 }
375
376 let matching_ids: Vec<u64> = self.vectors.ids()
377 .filter(|&&id| {
378 !self.deletion_bitmap.is_deleted(id)
379 && filter::evaluate(filter_expr, id, &self.metadata)
380 })
381 .copied()
382 .collect();
383
384 if matching_ids.is_empty() {
385 return Ok(DeleteResult { deleted: 0, epoch: self.epoch });
386 }
387
388 self.delete(&matching_ids)
389 }
390
391 pub fn status(&self) -> StoreStatus {
393 let total_vectors = (self.vectors.len() as u64).saturating_sub(self.deletion_bitmap.count() as u64);
394 let file_size = self.file.metadata().map(|m| m.len()).unwrap_or(0);
395 let dead_space_ratio = {
396 let total = self.vectors.len() as f64;
397 let deleted = self.deletion_bitmap.count() as f64;
398 if total > 0.0 { deleted / total } else { 0.0 }
399 };
400
401 StoreStatus {
402 total_vectors,
403 total_segments: self.segment_dir.len() as u32,
404 file_size,
405 current_epoch: self.epoch,
406 profile_id: self.options.profile,
407 compaction_state: CompactionState::Idle,
408 dead_space_ratio,
409 read_only: self.read_only,
410 }
411 }
412
413 pub fn compact(&mut self) -> Result<CompactionResult, RvfError> {
419 if self.read_only {
420 return Err(err(ErrorCode::ReadOnly));
421 }
422
423 let deleted_ids = self.deletion_bitmap.to_sorted_ids();
424 for &id in &deleted_ids {
425 self.vectors.remove(id);
426 }
427 self.metadata.remove_ids(&deleted_ids);
428
429 let segments_compacted = deleted_ids.len() as u32;
430 let bytes_reclaimed = (deleted_ids.len() as u64) * (self.options.dimension as u64) * 4;
431
432 self.deletion_bitmap.clear();
433
434 let original_bytes = {
437 let mut reader = BufReader::new(&self.file);
438 reader.seek(SeekFrom::Start(0)).map_err(|_| err(ErrorCode::FsyncFailed))?;
439 let mut buf = Vec::new();
440 reader.read_to_end(&mut buf).map_err(|_| err(ErrorCode::FsyncFailed))?;
441 buf
442 };
443
444 let temp_path = self.path.with_extension("rvf.compact.tmp");
445 let mut new_segment_dir = Vec::new();
446 let mut seg_writer = SegmentWriter::new(1);
447 {
448 let temp_file = OpenOptions::new()
449 .read(true)
450 .write(true)
451 .create(true)
452 .truncate(true)
453 .open(&temp_path)
454 .map_err(|_| err(ErrorCode::DiskFull))?;
455
456 let mut temp_writer = BufWriter::new(&temp_file);
457
458 let live_ids: Vec<u64> = self.vectors.ids().copied().collect();
459 let live_vecs: Vec<Vec<f32>> = live_ids.iter()
460 .filter_map(|&id| self.vectors.get(id).map(|v| v.to_vec()))
461 .collect();
462
463 if !live_ids.is_empty() {
464 let vec_refs: Vec<&[f32]> = live_vecs.iter().map(|v| v.as_slice()).collect();
465 let (seg_id, offset) = seg_writer.write_vec_seg(
466 &mut temp_writer, &vec_refs, &live_ids, self.options.dimension,
467 ).map_err(|_| err(ErrorCode::FsyncFailed))?;
468
469 let bytes_per_vec = (self.options.dimension as usize) * 4;
470 let payload_len = (2 + 4 + live_ids.len() * (8 + bytes_per_vec)) as u64;
471 new_segment_dir.push((seg_id, offset, payload_len, SegmentType::Vec as u8));
472 }
473
474 let preserved = scan_preservable_segments(&original_bytes);
479 for (orig_offset, seg_id, payload_len, seg_type) in &preserved {
480 let total_bytes = match (*payload_len as usize).checked_add(SEGMENT_HEADER_SIZE) {
482 Some(t) => t,
483 None => continue, };
485 let end = match orig_offset.checked_add(total_bytes) {
486 Some(e) if e <= original_bytes.len() => e,
487 _ => continue, };
489 let src = &original_bytes[*orig_offset..end];
490
491 temp_writer.flush().map_err(|_| err(ErrorCode::FsyncFailed))?;
493 let new_offset = temp_writer.stream_position()
494 .map_err(|_| err(ErrorCode::FsyncFailed))?;
495
496 temp_writer.write_all(src).map_err(|_| err(ErrorCode::FsyncFailed))?;
497
498 while seg_writer.next_id() <= *seg_id {
500 seg_writer.alloc_seg_id();
501 }
502
503 new_segment_dir.push((*seg_id, new_offset, *payload_len, *seg_type));
504 }
505
506 self.epoch += 1;
507 let total_vectors = live_ids.len() as u64;
508 let empty_dels: Vec<u64> = Vec::new();
509 let fi = if self.file_identity.file_id != [0u8; 16] {
510 Some(&self.file_identity)
511 } else {
512 None
513 };
514 temp_writer.flush().map_err(|_| err(ErrorCode::FsyncFailed))?;
516 seg_writer.write_manifest_seg_with_identity(
517 &mut temp_writer, self.epoch, self.options.dimension,
518 total_vectors, self.options.profile, &new_segment_dir, &empty_dels, fi,
519 ).map_err(|_| err(ErrorCode::FsyncFailed))?;
520
521 temp_writer.flush().map_err(|_| err(ErrorCode::FsyncFailed))?;
522 temp_file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
523 }
524
525 fs::rename(&temp_path, &self.path).map_err(|_| err(ErrorCode::FsyncFailed))?;
526
527 if let Some(parent) = self.path.parent() {
529 if let Ok(dir) = std::fs::File::open(parent) {
530 let _ = dir.sync_all();
531 }
532 }
533
534 self.file = OpenOptions::new()
535 .read(true)
536 .write(true)
537 .open(&self.path)
538 .map_err(|_| err(ErrorCode::InvalidManifest))?;
539
540 self.segment_dir = new_segment_dir;
541 self.seg_writer = Some(seg_writer);
542 self.last_compaction_time = now_secs();
543
544 Ok(CompactionResult { segments_compacted, bytes_reclaimed, epoch: self.epoch })
545 }
546
547 pub fn close(self) -> Result<(), RvfError> {
549 self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
550
551 if let Some(lock) = self.writer_lock {
552 lock.release().map_err(|_| err(ErrorCode::LockHeld))?;
553 }
554
555 Ok(())
556 }
557
558
559 pub fn embed_kernel(
566 &mut self,
567 arch: u8,
568 kernel_type: u8,
569 kernel_flags: u32,
570 kernel_image: &[u8],
571 api_port: u16,
572 cmdline: Option<&str>,
573 ) -> Result<u64, RvfError> {
574 if self.read_only {
575 return Err(err(ErrorCode::ReadOnly));
576 }
577
578 let image_hash = simple_shake256_256(kernel_image);
579 let header = KernelHeader {
580 kernel_magic: KERNEL_MAGIC,
581 header_version: 1,
582 arch,
583 kernel_type,
584 kernel_flags,
585 min_memory_mb: 0,
586 entry_point: 0,
587 image_size: kernel_image.len() as u64,
588 compressed_size: kernel_image.len() as u64,
589 compression: 0,
590 api_transport: 0,
591 api_port,
592 api_version: 1,
593 image_hash,
594 build_id: [0u8; 16],
595 build_timestamp: 0,
596 vcpu_count: 0,
597 reserved_0: 0,
598 cmdline_offset: 128,
599 cmdline_length: cmdline.map_or(0, |s| s.len() as u32),
600 reserved_1: 0,
601 };
602 let header_bytes = header.to_bytes();
603
604 let cmdline_bytes = cmdline.map(|s| s.as_bytes());
605
606 let writer = self.seg_writer.as_mut()
607 .ok_or_else(|| err(ErrorCode::InvalidManifest))?;
608 let (seg_id, seg_offset) = {
609 let mut buf_writer = BufWriter::new(&self.file);
610 buf_writer.seek(SeekFrom::End(0))
611 .map_err(|_| err(ErrorCode::FsyncFailed))?;
612 writer.write_kernel_seg(
613 &mut buf_writer, &header_bytes, kernel_image, cmdline_bytes,
614 ).map_err(|_| err(ErrorCode::FsyncFailed))?
615 };
616
617 let cmdline_len = cmdline_bytes.map_or(0, |c| c.len());
618 let payload_len = (128 + kernel_image.len() + cmdline_len) as u64;
619 self.segment_dir.push((
620 seg_id, seg_offset, payload_len, SegmentType::Kernel as u8,
621 ));
622
623 self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
624 self.epoch += 1;
625 self.write_manifest()?;
626
627 Ok(seg_id)
628 }
629
630 pub fn embed_kernel_with_binding(
638 &mut self,
639 arch: u8,
640 kernel_type: u8,
641 kernel_flags: u32,
642 kernel_image: &[u8],
643 api_port: u16,
644 cmdline: Option<&str>,
645 binding: &KernelBinding,
646 ) -> Result<u64, RvfError> {
647 if self.read_only {
648 return Err(err(ErrorCode::ReadOnly));
649 }
650
651 let image_hash = simple_shake256_256(kernel_image);
652 let cmdline_len = cmdline.map_or(0u32, |s| s.len() as u32);
653 let header = KernelHeader {
654 kernel_magic: KERNEL_MAGIC,
655 header_version: 1,
656 arch,
657 kernel_type,
658 kernel_flags,
659 min_memory_mb: 0,
660 entry_point: 0,
661 image_size: kernel_image.len() as u64,
662 compressed_size: kernel_image.len() as u64,
663 compression: 0,
664 api_transport: 0,
665 api_port,
666 api_version: 1,
667 image_hash,
668 build_id: [0u8; 16],
669 build_timestamp: 0,
670 vcpu_count: 0,
671 reserved_0: 0,
672 cmdline_offset: 128 + 128,
674 cmdline_length: cmdline_len,
675 reserved_1: 0,
676 };
677 let header_bytes = header.to_bytes();
678 let binding_bytes = binding.to_bytes();
679
680 let cmdline_data = cmdline.map(|s| s.as_bytes());
682 let cmdline_slice = cmdline_data.unwrap_or(&[]);
683
684 let mut payload = Vec::with_capacity(128 + 128 + cmdline_slice.len() + kernel_image.len());
685 payload.extend_from_slice(&header_bytes);
686 payload.extend_from_slice(&binding_bytes);
687 payload.extend_from_slice(cmdline_slice);
688 payload.extend_from_slice(kernel_image);
689
690 let writer = self.seg_writer.as_mut()
691 .ok_or_else(|| err(ErrorCode::InvalidManifest))?;
692
693 let (seg_id, seg_offset) = {
694 let mut buf_writer = BufWriter::new(&self.file);
695 buf_writer.seek(SeekFrom::End(0))
696 .map_err(|_| err(ErrorCode::FsyncFailed))?;
697 writer.write_kernel_seg(
702 &mut buf_writer,
703 &header_bytes,
704 &payload[128..], None, ).map_err(|_| err(ErrorCode::FsyncFailed))?
707 };
708
709 let total_payload_len = payload.len() as u64;
710 self.segment_dir.push((
711 seg_id, seg_offset, total_payload_len, SegmentType::Kernel as u8,
712 ));
713
714 self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
715 self.epoch += 1;
716 self.write_manifest()?;
717
718 Ok(seg_id)
719 }
720
721 #[allow(clippy::type_complexity)]
731 pub fn extract_kernel(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>, RvfError> {
732 let entry = self.segment_dir.iter()
733 .find(|&&(_, _, _, stype)| stype == SegmentType::Kernel as u8);
734
735 let entry = match entry {
736 Some(e) => e,
737 None => return Ok(None),
738 };
739
740 let (_header, payload) = {
741 let mut reader = BufReader::new(&self.file);
742 read_path::read_segment_payload(&mut reader, entry.1)
743 .map_err(|_| err(ErrorCode::InvalidChecksum))?
744 };
745
746 if payload.len() < 128 {
747 return Err(err(ErrorCode::TruncatedSegment));
748 }
749
750 let kernel_header = payload[..128].to_vec();
751 let kernel_image = payload[128..].to_vec();
752
753 Ok(Some((kernel_header, kernel_image)))
754 }
755
756 pub fn extract_kernel_binding(&self) -> Result<Option<KernelBinding>, RvfError> {
761 let result = self.extract_kernel()?;
762 match result {
763 None => Ok(None),
764 Some((_header_bytes, remainder)) => {
765 if remainder.len() < 128 {
766 return Ok(None);
768 }
769 let mut binding_data = [0u8; 128];
770 binding_data.copy_from_slice(&remainder[..128]);
771 let binding = KernelBinding::from_bytes(&binding_data);
772 if binding.binding_version == 0 {
774 return Ok(None);
775 }
776 Ok(Some(binding))
777 }
778 }
779 }
780
781 pub fn embed_ebpf(
786 &mut self,
787 program_type: u8,
788 attach_type: u8,
789 max_dimension: u16,
790 program_bytecode: &[u8],
791 btf_data: Option<&[u8]>,
792 ) -> Result<u64, RvfError> {
793 if self.read_only {
794 return Err(err(ErrorCode::ReadOnly));
795 }
796
797 let program_hash = simple_shake256_256(program_bytecode);
798 let header = EbpfHeader {
799 ebpf_magic: EBPF_MAGIC,
800 header_version: 1,
801 program_type,
802 attach_type,
803 program_flags: 0,
804 insn_count: (program_bytecode.len() / 8) as u16,
805 max_dimension,
806 program_size: program_bytecode.len() as u64,
807 map_count: 0,
808 btf_size: btf_data.map_or(0, |b| b.len() as u32),
809 program_hash,
810 };
811 let header_bytes = header.to_bytes();
812
813 let writer = self.seg_writer.as_mut()
814 .ok_or_else(|| err(ErrorCode::InvalidManifest))?;
815 let (seg_id, seg_offset) = {
816 let mut buf_writer = BufWriter::new(&self.file);
817 buf_writer.seek(SeekFrom::End(0))
818 .map_err(|_| err(ErrorCode::FsyncFailed))?;
819 writer.write_ebpf_seg(
820 &mut buf_writer, &header_bytes, program_bytecode, btf_data,
821 ).map_err(|_| err(ErrorCode::FsyncFailed))?
822 };
823
824 let btf_len = btf_data.map_or(0, |b| b.len());
825 let payload_len = (64 + program_bytecode.len() + btf_len) as u64;
826 self.segment_dir.push((
827 seg_id, seg_offset, payload_len, SegmentType::Ebpf as u8,
828 ));
829
830 self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
831 self.epoch += 1;
832 self.write_manifest()?;
833
834 Ok(seg_id)
835 }
836
837 #[allow(clippy::type_complexity)]
843 pub fn extract_ebpf(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>, RvfError> {
844 let entry = self.segment_dir.iter()
845 .find(|&&(_, _, _, stype)| stype == SegmentType::Ebpf as u8);
846
847 let entry = match entry {
848 Some(e) => e,
849 None => return Ok(None),
850 };
851
852 let (_header, payload) = {
853 let mut reader = BufReader::new(&self.file);
854 read_path::read_segment_payload(&mut reader, entry.1)
855 .map_err(|_| err(ErrorCode::InvalidChecksum))?
856 };
857
858 if payload.len() < 64 {
859 return Err(err(ErrorCode::TruncatedSegment));
860 }
861
862 let ebpf_header = payload[..64].to_vec();
863 let ebpf_bytecode = payload[64..].to_vec();
864
865 Ok(Some((ebpf_header, ebpf_bytecode)))
866 }
867
868 pub fn segment_dir(&self) -> &[(u64, u64, u64, u8)] {
870 &self.segment_dir
871 }
872
873 pub fn dimension(&self) -> u16 {
875 self.options.dimension
876 }
877
878 pub fn file_identity(&self) -> &FileIdentity {
880 &self.file_identity
881 }
882
883 pub fn file_id(&self) -> &[u8; 16] {
885 &self.file_identity.file_id
886 }
887
888 pub fn parent_id(&self) -> &[u8; 16] {
890 &self.file_identity.parent_id
891 }
892
893 pub fn lineage_depth(&self) -> u32 {
895 self.file_identity.lineage_depth
896 }
897
898 pub fn branch(&self, child_path: &Path) -> Result<Self, RvfError> {
904 let dim = self.options.dimension as u32;
906 let bytes_per_vec = dim * 4; let vectors_per_cluster = if bytes_per_vec > 0 {
908 (4096 / bytes_per_vec).max(1)
909 } else {
910 64
911 };
912 let cluster_size = vectors_per_cluster * bytes_per_vec;
913 let total_vecs = self.vectors.len() as u64;
914 let cluster_count = if vectors_per_cluster > 0 {
915 total_vecs.div_ceil(vectors_per_cluster as u64) as u32
916 } else {
917 0
918 };
919
920 let mut child = self.derive(
922 child_path,
923 rvf_types::DerivationType::Clone,
924 Some(self.options.clone()),
925 )?;
926
927 child.cow_engine = Some(CowEngine::from_parent(
929 cluster_count,
930 cluster_size,
931 vectors_per_cluster,
932 bytes_per_vec,
933 ));
934
935 let mut filter = MembershipFilter::new_include(total_vecs);
937 for &vid in self.vectors.ids() {
938 if !self.deletion_bitmap.is_deleted(vid) {
939 filter.add(vid);
940 }
941 }
942 child.membership_filter = Some(filter);
943
944 Ok(child)
945 }
946
947 pub fn freeze(&mut self) -> Result<(), RvfError> {
949 if self.read_only {
950 return Err(err(ErrorCode::ReadOnly));
951 }
952
953 if let Some(ref mut engine) = self.cow_engine {
954 engine.freeze(self.epoch)?;
955 }
956
957 self.read_only = true;
959 Ok(())
960 }
961
962 pub fn is_cow_child(&self) -> bool {
964 self.cow_engine.is_some()
965 }
966
967 pub fn cow_stats(&self) -> Option<CowStats> {
969 self.cow_engine.as_ref().map(|e| e.stats())
970 }
971
972 pub fn membership_filter(&self) -> Option<&MembershipFilter> {
974 self.membership_filter.as_ref()
975 }
976
977 pub fn membership_filter_mut(&mut self) -> Option<&mut MembershipFilter> {
979 self.membership_filter.as_mut()
980 }
981
982 pub fn parent_path(&self) -> Option<&Path> {
984 self.parent_path.as_deref()
985 }
986
987 pub fn derive(
993 &self,
994 child_path: &Path,
995 _derivation_type: rvf_types::DerivationType,
996 child_options: Option<RvfOptions>,
997 ) -> Result<Self, RvfError> {
998 let opts = child_options.unwrap_or_else(|| self.options.clone());
999
1000 let child_file_id = generate_file_id(child_path);
1001
1002 let parent_hash = self.compute_own_manifest_hash()?;
1004
1005 let new_depth = self.file_identity.lineage_depth.checked_add(1)
1006 .ok_or_else(|| err(ErrorCode::LineageBroken))?;
1007
1008 let child_identity = FileIdentity {
1009 file_id: child_file_id,
1010 parent_id: self.file_identity.file_id,
1011 parent_hash,
1012 lineage_depth: new_depth,
1013 };
1014
1015 let file = OpenOptions::new()
1016 .read(true)
1017 .write(true)
1018 .create_new(true)
1019 .open(child_path)
1020 .map_err(|_| err(ErrorCode::FsyncFailed))?;
1021
1022 let writer_lock = WriterLock::acquire(child_path)
1023 .map_err(|_| err(ErrorCode::LockHeld))?;
1024
1025 let domain_profile = child_path
1027 .extension()
1028 .and_then(|ext| ext.to_str())
1029 .and_then(DomainProfile::from_extension)
1030 .unwrap_or(opts.domain_profile);
1031
1032 let mut child_opts = opts;
1033 child_opts.domain_profile = domain_profile;
1034
1035 let mut store = Self {
1036 path: child_path.to_path_buf(),
1037 options: child_opts,
1038 file,
1039 seg_writer: Some(SegmentWriter::new(1)),
1040 writer_lock: Some(writer_lock),
1041 vectors: VectorData::new(self.options.dimension),
1042 deletion_bitmap: DeletionBitmap::new(),
1043 metadata: MetadataStore::new(),
1044 epoch: 0,
1045 segment_dir: Vec::new(),
1046 read_only: false,
1047 last_compaction_time: 0,
1048 file_identity: child_identity,
1049 cow_engine: None,
1050 membership_filter: None,
1051 parent_path: Some(self.path.clone()),
1052 };
1053
1054 store.write_manifest()?;
1055 Ok(store)
1056 }
1057
1058 fn compute_own_manifest_hash(&self) -> Result<[u8; 32], RvfError> {
1060 use std::io::Read;
1061 let file_len = self.file.metadata()
1062 .map_err(|_| err(ErrorCode::InvalidManifest))?
1063 .len();
1064 if file_len == 0 {
1065 return Ok([0u8; 32]);
1066 }
1067 let read_len = file_len.min(65536) as usize;
1069 let mut reader = BufReader::new(&self.file);
1070 reader.seek(SeekFrom::End(-(read_len as i64)))
1071 .map_err(|_| err(ErrorCode::InvalidManifest))?;
1072 let mut buf = vec![0u8; read_len];
1073 reader.read_exact(&mut buf).map_err(|_| err(ErrorCode::InvalidManifest))?;
1074 Ok(simple_shake256_256(&buf))
1075 }
1076
1077 fn boot(&mut self) -> Result<(), RvfError> {
1080 let manifest = {
1081 let mut reader = BufReader::new(&self.file);
1082 read_path::find_latest_manifest(&mut reader)
1083 .map_err(|_| err(ErrorCode::ManifestNotFound))?
1084 };
1085
1086 let manifest = match manifest {
1087 Some(m) => m,
1088 None => return Err(err(ErrorCode::ManifestNotFound)),
1089 };
1090
1091 self.epoch = manifest.epoch;
1092 self.options.dimension = manifest.dimension;
1093 self.options.profile = manifest.profile_id;
1094 self.vectors = VectorData::new(manifest.dimension);
1095 self.deletion_bitmap = DeletionBitmap::from_ids(&manifest.deleted_ids);
1096
1097 self.segment_dir = manifest.segment_dir.iter()
1098 .map(|e| (e.seg_id, e.offset, e.payload_length, e.seg_type))
1099 .collect();
1100
1101 let vec_seg_entries: Vec<_> = manifest.segment_dir.iter()
1102 .filter(|e| e.seg_type == SegmentType::Vec as u8)
1103 .collect();
1104
1105 for entry in vec_seg_entries {
1106 let (_header, payload) = {
1107 let mut reader = BufReader::new(&self.file);
1108 read_path::read_segment_payload(&mut reader, entry.offset)
1109 .map_err(|_| err(ErrorCode::InvalidChecksum))?
1110 };
1111
1112 if let Some(vec_entries) = read_path::read_vec_seg_payload(&payload) {
1113 for (vec_id, vec_data) in vec_entries {
1114 self.vectors.insert(vec_id, vec_data);
1115 }
1116 }
1117 }
1118
1119 if let Some(fi) = manifest.file_identity {
1121 self.file_identity = fi;
1122 }
1123
1124 if !self.read_only {
1125 let max_seg_id = self.segment_dir.iter()
1126 .map(|&(id, _, _, _)| id)
1127 .max()
1128 .unwrap_or(0);
1129 self.seg_writer = Some(SegmentWriter::new(max_seg_id + 1));
1130 }
1131
1132 Ok(())
1133 }
1134
1135 fn write_manifest(&mut self) -> Result<(), RvfError> {
1136 let writer = self.seg_writer.as_mut().ok_or_else(|| err(ErrorCode::InvalidManifest))?;
1137
1138 let total_vectors = self.vectors.len() as u64;
1139 let deleted_ids = self.deletion_bitmap.to_sorted_ids();
1140
1141 let fi = if self.file_identity.file_id != [0u8; 16] {
1143 Some(&self.file_identity)
1144 } else {
1145 None
1146 };
1147
1148 let (manifest_seg_id, manifest_offset) = {
1149 let mut buf_writer = BufWriter::new(&self.file);
1150 buf_writer.seek(SeekFrom::End(0)).map_err(|_| err(ErrorCode::FsyncFailed))?;
1151 writer.write_manifest_seg_with_identity(
1152 &mut buf_writer, self.epoch, self.options.dimension,
1153 total_vectors, self.options.profile, &self.segment_dir, &deleted_ids, fi,
1154 ).map_err(|_| err(ErrorCode::FsyncFailed))?
1155 };
1156
1157 let mut manifest_payload_len = (22 + self.segment_dir.len() * 25 + 4 + deleted_ids.len() * 8) as u64;
1158 if fi.is_some() {
1159 manifest_payload_len += 4 + 68; }
1161 self.segment_dir.push((manifest_seg_id, manifest_offset, manifest_payload_len, SegmentType::Manifest as u8));
1162
1163 self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
1164 Ok(())
1165 }
1166}
1167
1168fn compute_distance(a: &[f32], b: &[f32], metric: &DistanceMetric) -> f32 {
1169 match metric {
1170 DistanceMetric::L2 => {
1171 a.iter().zip(b.iter()).map(|(x, y)| { let d = x - y; d * d }).sum()
1172 }
1173 DistanceMetric::InnerProduct => {
1174 let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
1175 -dot
1176 }
1177 DistanceMetric::Cosine => {
1178 let mut dot = 0.0f32;
1179 let mut norm_a = 0.0f32;
1180 let mut norm_b = 0.0f32;
1181 for (x, y) in a.iter().zip(b.iter()) {
1182 dot += x * y;
1183 norm_a += x * x;
1184 norm_b += y * y;
1185 }
1186 let denom = (norm_a * norm_b).sqrt();
1187 if denom < f32::EPSILON { 1.0 } else { 1.0 - dot / denom }
1188 }
1189 }
1190}
1191
/// `f32` wrapper with a total order, so distances can be stored in
/// ordered collections such as `BinaryHeap`.
///
/// Non-NaN values compare exactly as `f32::partial_cmp` does. NaN compares
/// equal to NaN and greater than every other value, so a NaN distance is
/// always treated as the worst candidate. Equality is defined through the
/// same ordering, which keeps `PartialEq`, `Eq`, `PartialOrd` and `Ord`
/// consistent with each other.
#[derive(Clone, Copy, Debug)]
struct OrderedFloat(f32);

impl PartialEq for OrderedFloat {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == std::cmp::Ordering::Equal
    }
}

impl Eq for OrderedFloat {}

impl PartialOrd for OrderedFloat {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OrderedFloat {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        match (self.0.is_nan(), other.0.is_nan()) {
            (true, true) => std::cmp::Ordering::Equal,
            (true, false) => std::cmp::Ordering::Greater,
            (false, true) => std::cmp::Ordering::Less,
            // Neither side is NaN, so `partial_cmp` cannot return `None`.
            (false, false) => self
                .0
                .partial_cmp(&other.0)
                .unwrap_or(std::cmp::Ordering::Equal),
        }
    }
}
1208
1209fn generate_file_id(path: &Path) -> [u8; 16] {
1217 let path_str = path.to_string_lossy();
1218 let path_bytes = path_str.as_bytes();
1219
1220 let ts = std::time::SystemTime::now()
1221 .duration_since(std::time::UNIX_EPOCH)
1222 .map(|d| d.as_nanos() as u64)
1223 .unwrap_or(0);
1224 let ts_bytes = ts.to_le_bytes();
1225
1226 let mut input = Vec::with_capacity(path_bytes.len() + 8);
1228 input.extend_from_slice(path_bytes);
1229 input.extend_from_slice(&ts_bytes);
1230
1231 let digest = simple_shake256_256(&input);
1232 let mut id = [0u8; 16];
1233 id.copy_from_slice(&digest[..16]);
1234 id
1235}
1236
/// Folds `data` into a 32-byte digest with a simple additive mixing loop.
///
/// For the byte at position `pos`, the byte is added (wrapping) into slot
/// `pos % 32`, and that slot's new value, rotated left by three bits, is then
/// added (wrapping) into slot `(pos + 13) % 32`. Empty input yields all zeros.
///
/// NOTE(review): despite the name, this loop is plain byte arithmetic rather
/// than a Keccak/SHAKE sponge — do not rely on it for cryptographic strength.
pub(crate) fn simple_shake256_256(data: &[u8]) -> [u8; 32] {
    const WIDTH: usize = 32;
    const SPREAD: usize = 13;

    let mut state = [0u8; WIDTH];
    for (pos, byte) in data.iter().copied().enumerate() {
        let slot = pos % WIDTH;
        let absorbed = state[slot].wrapping_add(byte);
        state[slot] = absorbed;

        let partner = (pos + SPREAD) % WIDTH;
        state[partner] = state[partner].wrapping_add(absorbed.rotate_left(3));
    }
    state
}
1252
1253fn scan_preservable_segments(file_bytes: &[u8]) -> Vec<(usize, u64, u64, u8)> {
1261 let magic_bytes = SEGMENT_MAGIC.to_le_bytes();
1262 let mut results = Vec::new();
1263
1264 if file_bytes.len() < SEGMENT_HEADER_SIZE {
1265 return results;
1266 }
1267
1268 let last_possible = file_bytes.len() - SEGMENT_HEADER_SIZE;
1269 let mut i = 0;
1270 while i <= last_possible {
1271 if file_bytes[i..i + 4] == magic_bytes {
1272 let seg_type = file_bytes[i + 5];
1273 let seg_id = u64::from_le_bytes([
1274 file_bytes[i + 0x08], file_bytes[i + 0x09],
1275 file_bytes[i + 0x0A], file_bytes[i + 0x0B],
1276 file_bytes[i + 0x0C], file_bytes[i + 0x0D],
1277 file_bytes[i + 0x0E], file_bytes[i + 0x0F],
1278 ]);
1279 let payload_len = u64::from_le_bytes([
1280 file_bytes[i + 0x10], file_bytes[i + 0x11],
1281 file_bytes[i + 0x12], file_bytes[i + 0x13],
1282 file_bytes[i + 0x14], file_bytes[i + 0x15],
1283 file_bytes[i + 0x16], file_bytes[i + 0x17],
1284 ]);
1285
1286 let total = match (payload_len as usize).checked_add(SEGMENT_HEADER_SIZE) {
1288 Some(t) if payload_len <= file_bytes.len() as u64 => t,
1289 _ => {
1290 i += 1;
1292 continue;
1293 }
1294 };
1295
1296 if seg_type != SegmentType::Vec as u8
1299 && seg_type != SegmentType::Manifest as u8
1300 && seg_type != SegmentType::Journal as u8
1301 {
1302 if i.checked_add(total).is_some_and(|end| end <= file_bytes.len()) {
1304 results.push((i, seg_id, payload_len, seg_type));
1305 }
1306 }
1307
1308 if total > 0 {
1311 match i.checked_add(total) {
1312 Some(next) if next > i => i = next,
1313 _ => i += 1,
1314 }
1315 } else {
1316 i += 1;
1317 }
1318 } else {
1319 i += 1;
1320 }
1321 }
1322
1323 results
1324}
1325
/// Current wall-clock time as whole seconds since the Unix epoch, or `0` if
/// the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
1332
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterValue;
    use tempfile::TempDir;

    /// Builds a reproducible pseudo-random vector of `dim` components by
    /// stepping a 64-bit linear congruential generator seeded with `seed`.
    fn random_vector(dim: usize, seed: u64) -> Vec<f32> {
        let mut state = seed;
        (0..dim)
            .map(|_| {
                state = state
                    .wrapping_mul(6364136223846793005)
                    .wrapping_add(1442695040888963407);
                ((state >> 33) as f32) / (u32::MAX as f32) - 0.5
            })
            .collect()
    }

    /// Creates a fresh temporary directory and the path of `file_name` inside
    /// it. The directory handle is returned so the caller can keep it alive
    /// for the whole test.
    fn scratch_file(file_name: &str) -> (TempDir, std::path::PathBuf) {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join(file_name);
        (dir, path)
    }

    /// Options used by most tests: four dimensions, L2 metric, defaults
    /// for everything else.
    fn l2_options_dim4() -> RvfOptions {
        RvfOptions {
            dimension: 4,
            metric: DistanceMetric::L2,
            ..Default::default()
        }
    }

    /// Reads the first four bytes of `bytes` as a little-endian `u32`.
    fn leading_u32_le(bytes: &[u8]) -> u32 {
        u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])
    }

    /// Ingests 100 vectors and checks that a query for one of them returns it
    /// first, with results in ascending distance order.
    #[test]
    fn create_ingest_query() {
        const DIM: usize = 8;
        let (_dir, path) = scratch_file("test.rvf");

        let mut store = RvfStore::create(
            &path,
            RvfOptions {
                dimension: 8,
                metric: DistanceMetric::L2,
                ..Default::default()
            },
        )
        .unwrap();

        let ids: Vec<u64> = (0..100).collect();
        let vecs: Vec<Vec<f32>> = ids.iter().map(|&seed| random_vector(DIM, seed)).collect();
        let vec_refs: Vec<&[f32]> = vecs.iter().map(|v| v.as_slice()).collect();

        let outcome = store.ingest_batch(&vec_refs, &ids, None).unwrap();
        assert_eq!(outcome.accepted, 100);
        assert_eq!(outcome.rejected, 0);

        // Seed 42 reproduces the vector stored under id 42.
        let probe = random_vector(DIM, 42);
        let hits = store.query(&probe, 10, &QueryOptions::default()).unwrap();
        assert_eq!(hits.len(), 10);

        for (earlier, later) in hits.iter().zip(hits.iter().skip(1)) {
            assert!(later.distance >= earlier.distance);
        }

        assert_eq!(hits[0].id, 42);
        assert!(hits[0].distance < f32::EPSILON);

        store.close().unwrap();
    }

    /// Data written in one session is queryable after reopening the file.
    #[test]
    fn open_existing_store() {
        let (_dir, path) = scratch_file("reopen.rvf");

        {
            let mut writer = RvfStore::create(&path, l2_options_dim4()).unwrap();
            let unit_x = [1.0f32, 0.0, 0.0, 0.0];
            let unit_y = [0.0f32, 1.0, 0.0, 0.0];
            writer
                .ingest_batch(&[&unit_x[..], &unit_y[..]], &[10, 20], None)
                .unwrap();
            writer.close().unwrap();
        }

        {
            let reopened = RvfStore::open(&path).unwrap();
            let probe = vec![1.0, 0.0, 0.0, 0.0];
            let hits = reopened.query(&probe, 2, &QueryOptions::default()).unwrap();
            assert_eq!(hits.len(), 2);
            assert_eq!(hits[0].id, 10);
            assert!(hits[0].distance < f32::EPSILON);
            reopened.close().unwrap();
        }
    }

    /// A deleted id no longer appears in query results.
    #[test]
    fn delete_vectors() {
        let (_dir, path) = scratch_file("delete.rvf");
        let mut store = RvfStore::create(&path, l2_options_dim4()).unwrap();

        let unit_x = [1.0f32, 0.0, 0.0, 0.0];
        let unit_y = [0.0f32, 1.0, 0.0, 0.0];
        let unit_z = [0.0f32, 0.0, 1.0, 0.0];
        store
            .ingest_batch(&[&unit_x[..], &unit_y[..], &unit_z[..]], &[1, 2, 3], None)
            .unwrap();

        let removal = store.delete(&[2]).unwrap();
        assert_eq!(removal.deleted, 1);

        let probe = vec![0.0, 1.0, 0.0, 0.0];
        let hits = store.query(&probe, 10, &QueryOptions::default()).unwrap();
        assert_eq!(hits.len(), 2);
        assert!(!hits.iter().any(|hit| hit.id == 2));

        store.close().unwrap();
    }

    /// An equality filter on metadata field 0 restricts the result set.
    #[test]
    fn filter_query() {
        let (_dir, path) = scratch_file("filter.rvf");
        let mut store = RvfStore::create(&path, l2_options_dim4()).unwrap();

        let unit_x = [1.0f32, 0.0, 0.0, 0.0];
        let unit_y = [0.0f32, 1.0, 0.0, 0.0];
        let unit_z = [0.0f32, 0.0, 1.0, 0.0];
        let category = |label: &str| MetadataEntry {
            field_id: 0,
            value: MetadataValue::String(label.into()),
        };
        let metadata = vec![category("cat_a"), category("cat_b"), category("cat_a")];
        store
            .ingest_batch(
                &[&unit_x[..], &unit_y[..], &unit_z[..]],
                &[1, 2, 3],
                Some(&metadata),
            )
            .unwrap();

        let probe = vec![0.5, 0.5, 0.5, 0.0];
        let only_cat_a = QueryOptions {
            filter: Some(FilterExpr::Eq(0, FilterValue::String("cat_a".into()))),
            ..Default::default()
        };
        let hits = store.query(&probe, 10, &only_cat_a).unwrap();
        assert_eq!(hits.len(), 2);
        assert!(hits.iter().all(|hit| hit.id == 1 || hit.id == 3));

        store.close().unwrap();
    }

    /// `status()` reflects vector count, writability and file size.
    #[test]
    fn status_reports() {
        let (_dir, path) = scratch_file("status.rvf");
        let mut store = RvfStore::create(&path, l2_options_dim4()).unwrap();

        let before = store.status();
        assert_eq!(before.total_vectors, 0);
        assert!(!before.read_only);

        let unit_x = [1.0, 0.0, 0.0, 0.0];
        store.ingest_batch(&[&unit_x[..]], &[1], None).unwrap();

        let after = store.status();
        assert_eq!(after.total_vectors, 1);
        assert!(after.file_size > 0);

        store.close().unwrap();
    }

    /// Compaction after deleting half the vectors keeps the survivors queryable.
    #[test]
    fn compact_reclaims_space() {
        let (_dir, path) = scratch_file("compact.rvf");
        let mut store = RvfStore::create(&path, l2_options_dim4()).unwrap();

        let ids: Vec<u64> = (0..10).collect();
        let vecs: Vec<Vec<f32>> = ids
            .iter()
            .map(|&i| vec![i as f32, 0.0, 0.0, 0.0])
            .collect();
        let vec_refs: Vec<&[f32]> = vecs.iter().map(|v| v.as_slice()).collect();
        store.ingest_batch(&vec_refs, &ids, None).unwrap();

        store.delete(&[0, 2, 4, 6, 8]).unwrap();

        let status = store.status();
        assert_eq!(status.total_vectors, 5);
        assert!(status.dead_space_ratio > 0.0);

        let compaction = store.compact().unwrap();
        assert_eq!(compaction.segments_compacted, 5);

        let probe = vec![1.0, 0.0, 0.0, 0.0];
        let hits = store.query(&probe, 10, &QueryOptions::default()).unwrap();
        assert_eq!(hits.len(), 5);
        let surviving_ids: Vec<u64> = hits.iter().map(|hit| hit.id).collect();
        for expected in &[1, 3, 5, 7, 9] {
            assert!(surviving_ids.contains(expected));
        }

        store.close().unwrap();
    }

    /// While the creating store is still alive, a second writable open fails.
    #[test]
    fn lock_prevents_two_writers() {
        let (_dir, path) = scratch_file("locked.rvf");

        let _first_writer = RvfStore::create(&path, l2_options_dim4()).unwrap();

        let second_attempt = RvfStore::open(&path);
        assert!(second_attempt.is_err());
    }

    /// A read-only open reports `read_only` and still serves queries.
    #[test]
    fn readonly_open() {
        let (_dir, path) = scratch_file("readonly.rvf");

        {
            let mut writer = RvfStore::create(&path, l2_options_dim4()).unwrap();
            let unit_x = [1.0, 0.0, 0.0, 0.0];
            writer.ingest_batch(&[&unit_x[..]], &[1], None).unwrap();
            writer.close().unwrap();
        }

        let reader = RvfStore::open_readonly(&path).unwrap();
        let status = reader.status();
        assert!(status.read_only);
        assert_eq!(status.total_vectors, 1);

        let probe = vec![1.0, 0.0, 0.0, 0.0];
        let hits = reader.query(&probe, 1, &QueryOptions::default()).unwrap();
        assert_eq!(hits.len(), 1);
    }

    /// `delete_by_filter` removes exactly the vectors matching the filter.
    #[test]
    fn delete_by_filter_works() {
        let (_dir, path) = scratch_file("del_filter.rvf");
        let mut store = RvfStore::create(&path, l2_options_dim4()).unwrap();

        let unit_x = [1.0f32, 0.0, 0.0, 0.0];
        let unit_y = [0.0f32, 1.0, 0.0, 0.0];
        let unit_z = [0.0f32, 0.0, 1.0, 0.0];
        let metadata = vec![
            MetadataEntry { field_id: 0, value: MetadataValue::U64(10) },
            MetadataEntry { field_id: 0, value: MetadataValue::U64(20) },
            MetadataEntry { field_id: 0, value: MetadataValue::U64(30) },
        ];
        store
            .ingest_batch(
                &[&unit_x[..], &unit_y[..], &unit_z[..]],
                &[1, 2, 3],
                Some(&metadata),
            )
            .unwrap();

        // Field 0 > 15 matches the entries tagged 20 and 30.
        let above_fifteen = FilterExpr::Gt(0, FilterValue::U64(15));
        let removal = store.delete_by_filter(&above_fifteen).unwrap();
        assert_eq!(removal.deleted, 2);

        let probe = vec![0.0, 0.0, 0.0, 0.0];
        let hits = store.query(&probe, 10, &QueryOptions::default()).unwrap();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].id, 1);

        store.close().unwrap();
    }

    /// An embedded kernel can be extracted again within the same session.
    #[test]
    fn embed_extract_kernel_round_trip() {
        let (_dir, path) = scratch_file("kernel_rt.rvf");
        let mut store = RvfStore::create(&path, l2_options_dim4()).unwrap();

        let kernel_image = b"fake-compressed-kernel-image-0123456789abcdef";
        let seg_id = store
            .embed_kernel(1, 0, 0x01, kernel_image, 8080, Some("console=ttyS0 quiet"))
            .unwrap();
        assert!(seg_id > 0);

        let extracted = store.extract_kernel().unwrap();
        assert!(extracted.is_some());
        let (header_bytes, image_bytes) = extracted.unwrap();
        assert_eq!(header_bytes.len(), 128);

        // Only a prefix check: the extracted bytes may be longer than the image
        // (presumably the command line follows it — confirm against embed_kernel).
        assert!(image_bytes.starts_with(kernel_image));

        assert_eq!(leading_u32_le(&header_bytes), KERNEL_MAGIC);

        // Header byte 0x06 echoes the first `embed_kernel` argument.
        assert_eq!(header_bytes[0x06], 1);

        let port = u16::from_be_bytes([header_bytes[0x2A], header_bytes[0x2B]]);
        assert_eq!(port, 8080);

        store.close().unwrap();
    }

    /// An embedded eBPF program and its BTF section round-trip intact.
    #[test]
    fn embed_extract_ebpf_round_trip() {
        let (_dir, path) = scratch_file("ebpf_rt.rvf");
        let mut store = RvfStore::create(&path, l2_options_dim4()).unwrap();

        let bytecode = b"ebpf-program-instructions-here";
        let btf = b"btf-type-information";
        let seg_id = store.embed_ebpf(2, 1, 1024, bytecode, Some(btf)).unwrap();
        assert!(seg_id > 0);

        let extracted = store.extract_ebpf().unwrap();
        assert!(extracted.is_some());
        let (header_bytes, payload_bytes) = extracted.unwrap();
        assert_eq!(header_bytes.len(), 64);

        // Payload is the bytecode immediately followed by the BTF bytes.
        assert_eq!(payload_bytes.len(), bytecode.len() + btf.len());
        assert_eq!(&payload_bytes[..bytecode.len()], bytecode);
        assert_eq!(&payload_bytes[bytecode.len()..], btf);

        assert_eq!(leading_u32_le(&header_bytes), EBPF_MAGIC);

        // Header byte 0x06 echoes the first `embed_ebpf` argument.
        assert_eq!(header_bytes[0x06], 2);

        // The third argument (1024) is stored little-endian at 0x0E..0x10.
        let dim = u16::from_le_bytes([header_bytes[0x0E], header_bytes[0x0F]]);
        assert_eq!(dim, 1024);

        store.close().unwrap();
    }

    /// A kernel embedded in one session is still present after a read-only reopen.
    #[test]
    fn embed_kernel_persists_through_reopen() {
        let (_dir, path) = scratch_file("kernel_persist.rvf");
        let kernel_image = b"persistent-kernel-image-data";

        {
            let mut writer = RvfStore::create(&path, l2_options_dim4()).unwrap();
            writer
                .embed_kernel(2, 1, 0, kernel_image, 9090, None)
                .unwrap();
            writer.close().unwrap();
        }

        {
            let reader = RvfStore::open_readonly(&path).unwrap();
            let extracted = reader.extract_kernel().unwrap();
            assert!(extracted.is_some());
            let (header_bytes, image_bytes) = extracted.unwrap();
            assert_eq!(header_bytes.len(), 128);
            assert_eq!(image_bytes, kernel_image);

            assert_eq!(header_bytes[0x06], 2);

            let port = u16::from_be_bytes([header_bytes[0x2A], header_bytes[0x2B]]);
            assert_eq!(port, 9090);
        }
    }

    /// A store with nothing embedded yields `None` from both extractors.
    #[test]
    fn extract_returns_none_when_no_segment() {
        let (_dir, path) = scratch_file("no_kernel.rvf");

        let store = RvfStore::create(&path, l2_options_dim4()).unwrap();
        assert!(store.extract_kernel().unwrap().is_none());
        assert!(store.extract_ebpf().unwrap().is_none());
        store.close().unwrap();
    }
}