// rvf_runtime/store.rs
1//! Main RvfStore API — the primary user-facing interface.
2//!
3//! Ties together the write path, read path, indexing, deletion, and
4//! compaction into a single cohesive store.
5
6use std::collections::BinaryHeap;
7use std::fs::{self, File, OpenOptions};
8use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
9use std::path::{Path, PathBuf};
10
11use rvf_types::{
12    DomainProfile, ErrorCode, FileIdentity, RvfError, SegmentType,
13    SEGMENT_HEADER_SIZE, SEGMENT_MAGIC,
14};
15use rvf_types::kernel::{KernelHeader, KERNEL_MAGIC};
16use rvf_types::kernel_binding::KernelBinding;
17use rvf_types::ebpf::{EbpfHeader, EBPF_MAGIC};
18
19use crate::cow::{CowEngine, CowStats};
20use crate::deletion::DeletionBitmap;
21use crate::filter::{self, FilterExpr, FilterValue, MetadataStore, metadata_value_to_filter};
22use crate::locking::WriterLock;
23use crate::membership::MembershipFilter;
24use crate::options::*;
25use crate::read_path::{self, VectorData};
26use crate::status::{CompactionState, StoreStatus};
27use crate::write_path::SegmentWriter;
28
/// Helper to convert any error into an RvfError with the given code.
///
/// Used throughout this module to collapse heterogeneous failures
/// (I/O, locking, parsing) into the crate's single coded error type,
/// discarding the underlying error detail.
fn err(code: ErrorCode) -> RvfError {
    RvfError::Code(code)
}
33
/// The main RVF store handle.
///
/// Provides create, open, ingest, query, delete, compact, and close.
pub struct RvfStore {
    /// Filesystem path of the backing RVF file.
    path: PathBuf,
    /// Options this store was created/opened with.
    options: RvfOptions,
    /// Open handle to the backing file (replaced after compaction).
    file: File,
    /// Segment writer for append operations (None until `boot` provides one
    /// for opened stores; always Some for freshly created stores).
    seg_writer: Option<SegmentWriter>,
    /// Exclusive writer lock (None for read-only handles).
    writer_lock: Option<WriterLock>,
    /// In-memory vector data mirror used by the query/read path.
    vectors: VectorData,
    /// Soft-deletion bitmap; deleted IDs are skipped by queries.
    deletion_bitmap: DeletionBitmap,
    /// Per-vector metadata used to evaluate filter expressions.
    metadata: MetadataStore,
    /// Monotonic epoch, bumped by each mutating operation.
    epoch: u32,
    /// Segment directory entries: (seg_id, file_offset, payload_len, seg_type).
    segment_dir: Vec<(u64, u64, u64, u8)>,
    /// True when opened via `open_readonly`; all mutations are rejected.
    read_only: bool,
    /// Timestamp (via `now_secs`) of the last completed compaction; 0 if never.
    last_compaction_time: u64,
    /// File identity (root vs. branch lineage information).
    file_identity: FileIdentity,
    /// COW engine for branched/snapshot stores (None for root stores).
    cow_engine: Option<CowEngine>,
    /// Membership filter for branch-level vector visibility (None if unused).
    membership_filter: Option<MembershipFilter>,
    /// Path to the parent file (for COW reads that need parent data).
    parent_path: Option<PathBuf>,
}
58
59impl RvfStore {
60    /// Create a new RVF store at the given path.
61    pub fn create(path: &Path, options: RvfOptions) -> Result<Self, RvfError> {
62        if options.dimension == 0 {
63            return Err(err(ErrorCode::InvalidManifest));
64        }
65
66        let file = OpenOptions::new()
67            .read(true)
68            .write(true)
69            .create_new(true)
70            .open(path)
71            .map_err(|_| err(ErrorCode::FsyncFailed))?;
72
73        let writer_lock = WriterLock::acquire(path)
74            .map_err(|_| err(ErrorCode::LockHeld))?;
75
76        // Generate a random file_id from path hash + timestamp
77        let file_id = generate_file_id(path);
78
79        // Detect domain profile from file extension
80        let domain_profile = path
81            .extension()
82            .and_then(|ext| ext.to_str())
83            .and_then(DomainProfile::from_extension)
84            .unwrap_or(options.domain_profile);
85
86        let mut opts = options.clone();
87        opts.domain_profile = domain_profile;
88
89        let mut store = Self {
90            path: path.to_path_buf(),
91            options: opts,
92            file,
93            seg_writer: Some(SegmentWriter::new(1)),
94            writer_lock: Some(writer_lock),
95            vectors: VectorData::new(options.dimension),
96            deletion_bitmap: DeletionBitmap::new(),
97            metadata: MetadataStore::new(),
98            epoch: 0,
99            segment_dir: Vec::new(),
100            read_only: false,
101            last_compaction_time: 0,
102            file_identity: FileIdentity::new_root(file_id),
103            cow_engine: None,
104            membership_filter: None,
105            parent_path: None,
106        };
107
108        store.write_manifest()?;
109        Ok(store)
110    }
111
112    /// Open an existing RVF store for read-write access.
113    pub fn open(path: &Path) -> Result<Self, RvfError> {
114        if !path.exists() {
115            return Err(err(ErrorCode::ManifestNotFound));
116        }
117
118        let writer_lock = WriterLock::acquire(path)
119            .map_err(|_| err(ErrorCode::LockHeld))?;
120
121        let file = OpenOptions::new()
122            .read(true)
123            .write(true)
124            .open(path)
125            .map_err(|_| err(ErrorCode::InvalidManifest))?;
126
127        // Detect domain profile from extension
128        let domain_profile = path
129            .extension()
130            .and_then(|ext| ext.to_str())
131            .and_then(DomainProfile::from_extension)
132            .unwrap_or(DomainProfile::Generic);
133
134        let opts = RvfOptions {
135            domain_profile,
136            ..Default::default()
137        };
138
139        let mut store = Self {
140            path: path.to_path_buf(),
141            options: opts,
142            file,
143            seg_writer: None,
144            writer_lock: Some(writer_lock),
145            vectors: VectorData::new(0),
146            deletion_bitmap: DeletionBitmap::new(),
147            metadata: MetadataStore::new(),
148            epoch: 0,
149            segment_dir: Vec::new(),
150            read_only: false,
151            last_compaction_time: 0,
152            file_identity: FileIdentity::zeroed(),
153            cow_engine: None,
154            membership_filter: None,
155            parent_path: None,
156        };
157
158        store.boot()?;
159        Ok(store)
160    }
161
162    /// Open an existing RVF store for read-only access (no lock required).
163    pub fn open_readonly(path: &Path) -> Result<Self, RvfError> {
164        if !path.exists() {
165            return Err(err(ErrorCode::ManifestNotFound));
166        }
167
168        let file = OpenOptions::new()
169            .read(true)
170            .open(path)
171            .map_err(|_| err(ErrorCode::InvalidManifest))?;
172
173        let domain_profile = path
174            .extension()
175            .and_then(|ext| ext.to_str())
176            .and_then(DomainProfile::from_extension)
177            .unwrap_or(DomainProfile::Generic);
178
179        let opts = RvfOptions {
180            domain_profile,
181            ..Default::default()
182        };
183
184        let mut store = Self {
185            path: path.to_path_buf(),
186            options: opts,
187            file,
188            seg_writer: None,
189            writer_lock: None,
190            vectors: VectorData::new(0),
191            deletion_bitmap: DeletionBitmap::new(),
192            metadata: MetadataStore::new(),
193            epoch: 0,
194            segment_dir: Vec::new(),
195            read_only: true,
196            last_compaction_time: 0,
197            file_identity: FileIdentity::zeroed(),
198            cow_engine: None,
199            membership_filter: None,
200            parent_path: None,
201        };
202
203        store.boot()?;
204        Ok(store)
205    }
206
207    /// Ingest a batch of vectors into the store.
208    pub fn ingest_batch(
209        &mut self,
210        vectors: &[&[f32]],
211        ids: &[u64],
212        metadata: Option<&[MetadataEntry]>,
213    ) -> Result<IngestResult, RvfError> {
214        if self.read_only {
215            return Err(err(ErrorCode::ReadOnly));
216        }
217        if vectors.len() != ids.len() {
218            return Err(err(ErrorCode::DimensionMismatch));
219        }
220
221        let dim = self.options.dimension as usize;
222        let mut accepted = 0u64;
223        let mut rejected = 0u64;
224
225        let mut valid_vectors: Vec<&[f32]> = Vec::with_capacity(vectors.len());
226        let mut valid_ids: Vec<u64> = Vec::with_capacity(ids.len());
227
228        for (i, &vec_data) in vectors.iter().enumerate() {
229            if vec_data.len() != dim {
230                rejected += 1;
231                continue;
232            }
233            valid_vectors.push(vec_data);
234            valid_ids.push(ids[i]);
235            accepted += 1;
236        }
237
238        if valid_vectors.is_empty() {
239            self.epoch += 1;
240            return Ok(IngestResult { accepted: 0, rejected, epoch: self.epoch });
241        }
242
243        let writer = self.seg_writer.as_mut().ok_or_else(|| err(ErrorCode::InvalidManifest))?;
244
245        let (vec_seg_id, vec_seg_offset) = {
246            let mut buf_writer = BufWriter::with_capacity(256 * 1024, &self.file);
247            buf_writer.seek(SeekFrom::End(0)).map_err(|_| err(ErrorCode::FsyncFailed))?;
248            writer.write_vec_seg(&mut buf_writer, &valid_vectors, &valid_ids, self.options.dimension)
249                .map_err(|_| err(ErrorCode::FsyncFailed))?
250        };
251
252        let bytes_per_vec = (self.options.dimension as usize) * 4;
253        let vec_payload_len = (2 + 4 + valid_vectors.len() * (8 + bytes_per_vec)) as u64;
254
255        self.segment_dir.push((vec_seg_id, vec_seg_offset, vec_payload_len, SegmentType::Vec as u8));
256
257        for (vec_data, &vec_id) in valid_vectors.iter().zip(valid_ids.iter()) {
258            self.vectors.insert(vec_id, vec_data.to_vec());
259        }
260
261        if let Some(meta_entries) = metadata {
262            let entries_per_id = meta_entries.len() / valid_ids.len().max(1);
263            if entries_per_id > 0 {
264                for (i, &vid) in valid_ids.iter().enumerate() {
265                    let start = i * entries_per_id;
266                    let end = ((i + 1) * entries_per_id).min(meta_entries.len());
267                    let fields: Vec<(u16, FilterValue)> = meta_entries[start..end]
268                        .iter()
269                        .map(|e| (e.field_id, metadata_value_to_filter(&e.value)))
270                        .collect();
271                    self.metadata.insert(vid, fields);
272                }
273            }
274        }
275
276        self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
277
278        self.epoch += 1;
279        self.write_manifest()?;
280
281        Ok(IngestResult { accepted, rejected, epoch: self.epoch })
282    }
283
284    /// Query the store for the k nearest neighbors of the given vector.
285    pub fn query(
286        &self,
287        vector: &[f32],
288        k: usize,
289        options: &QueryOptions,
290    ) -> Result<Vec<SearchResult>, RvfError> {
291        let dim = self.options.dimension as usize;
292        if vector.len() != dim {
293            return Err(err(ErrorCode::DimensionMismatch));
294        }
295
296        if self.vectors.len() == 0 {
297            return Ok(Vec::new());
298        }
299
300        // Max-heap: peek() returns the largest (farthest) distance in our k set.
301        // When a closer vector is found, evict the farthest.
302        let mut heap: BinaryHeap<(OrderedFloat, u64)> = BinaryHeap::new();
303
304        for &vec_id in self.vectors.ids() {
305            if self.deletion_bitmap.is_deleted(vec_id) {
306                continue;
307            }
308            if let Some(ref filter_expr) = options.filter {
309                if !filter::evaluate(filter_expr, vec_id, &self.metadata) {
310                    continue;
311                }
312            }
313            if let Some(stored_vec) = self.vectors.get(vec_id) {
314                let dist = compute_distance(vector, stored_vec, &self.options.metric);
315                if heap.len() < k {
316                    heap.push((OrderedFloat(dist), vec_id));
317                } else if let Some(&(OrderedFloat(worst), _)) = heap.peek() {
318                    if dist < worst {
319                        heap.pop();
320                        heap.push((OrderedFloat(dist), vec_id));
321                    }
322                }
323            }
324        }
325
326        // Drain the max-heap into sorted results (closest first).
327        let mut results: Vec<SearchResult> = heap
328            .into_iter()
329            .map(|(OrderedFloat(dist), id)| SearchResult { id, distance: dist })
330            .collect();
331        results.sort_by(|a, b| a.distance.partial_cmp(&b.distance).unwrap_or(std::cmp::Ordering::Equal));
332        Ok(results)
333    }
334
335    /// Soft-delete vectors by ID.
336    pub fn delete(&mut self, ids: &[u64]) -> Result<DeleteResult, RvfError> {
337        if self.read_only {
338            return Err(err(ErrorCode::ReadOnly));
339        }
340
341        let writer = self.seg_writer.as_mut().ok_or_else(|| err(ErrorCode::InvalidManifest))?;
342        let epoch = self.epoch + 1;
343
344        let (journal_seg_id, journal_offset) = {
345            let mut buf_writer = BufWriter::new(&self.file);
346            buf_writer.seek(SeekFrom::End(0)).map_err(|_| err(ErrorCode::FsyncFailed))?;
347            writer.write_journal_seg(&mut buf_writer, ids, epoch)
348                .map_err(|_| err(ErrorCode::FsyncFailed))?
349        };
350
351        let journal_payload_len = (16 + ids.len() * 12) as u64;
352        self.segment_dir.push((journal_seg_id, journal_offset, journal_payload_len, SegmentType::Journal as u8));
353
354        self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
355
356        let mut deleted = 0u64;
357        for &id in ids {
358            if self.vectors.get(id).is_some() && !self.deletion_bitmap.is_deleted(id) {
359                self.deletion_bitmap.delete(id);
360                deleted += 1;
361            }
362        }
363
364        self.epoch = epoch;
365        self.write_manifest()?;
366
367        Ok(DeleteResult { deleted, epoch: self.epoch })
368    }
369
370    /// Soft-delete vectors matching a filter expression.
371    pub fn delete_by_filter(&mut self, filter_expr: &FilterExpr) -> Result<DeleteResult, RvfError> {
372        if self.read_only {
373            return Err(err(ErrorCode::ReadOnly));
374        }
375
376        let matching_ids: Vec<u64> = self.vectors.ids()
377            .filter(|&&id| {
378                !self.deletion_bitmap.is_deleted(id)
379                    && filter::evaluate(filter_expr, id, &self.metadata)
380            })
381            .copied()
382            .collect();
383
384        if matching_ids.is_empty() {
385            return Ok(DeleteResult { deleted: 0, epoch: self.epoch });
386        }
387
388        self.delete(&matching_ids)
389    }
390
391    /// Get the current store status.
392    pub fn status(&self) -> StoreStatus {
393        let total_vectors = (self.vectors.len() as u64).saturating_sub(self.deletion_bitmap.count() as u64);
394        let file_size = self.file.metadata().map(|m| m.len()).unwrap_or(0);
395        let dead_space_ratio = {
396            let total = self.vectors.len() as f64;
397            let deleted = self.deletion_bitmap.count() as f64;
398            if total > 0.0 { deleted / total } else { 0.0 }
399        };
400
401        StoreStatus {
402            total_vectors,
403            total_segments: self.segment_dir.len() as u32,
404            file_size,
405            current_epoch: self.epoch,
406            profile_id: self.options.profile,
407            compaction_state: CompactionState::Idle,
408            dead_space_ratio,
409            read_only: self.read_only,
410        }
411    }
412
    /// Run compaction to reclaim dead space.
    ///
    /// Preserves all non-Vec, non-Manifest, non-Journal segments byte-for-byte
    /// to maintain forward compatibility with segment types this version does
    /// not understand (e.g., future Kernel, Ebpf, or vendor-extension segments).
    ///
    /// Strategy: rewrite live vectors plus preserved segments into a temp
    /// file, append a fresh manifest, fsync, then atomically rename over the
    /// original. The statement order below is crash-safety critical.
    ///
    /// NOTE(review): in-memory state (vectors, metadata, bitmap) is mutated
    /// *before* the on-disk rewrite; a failure partway through leaves memory
    /// ahead of disk until the next boot — confirm the recovery path.
    pub fn compact(&mut self) -> Result<CompactionResult, RvfError> {
        if self.read_only {
            return Err(err(ErrorCode::ReadOnly));
        }

        // Drop soft-deleted vectors and their metadata from memory first.
        let deleted_ids = self.deletion_bitmap.to_sorted_ids();
        for &id in &deleted_ids {
            self.vectors.remove(id);
        }
        self.metadata.remove_ids(&deleted_ids);

        // Reported stats: one "segment" per reclaimed vector, and the raw
        // f32 payload bytes (dimension * 4) each vector occupied.
        let segments_compacted = deleted_ids.len() as u32;
        let bytes_reclaimed = (deleted_ids.len() as u64) * (self.options.dimension as u64) * 4;

        self.deletion_bitmap.clear();

        // Read the entire original file into memory so we can scan for segments
        // that may not be in the manifest (e.g., unknown types appended by newer tools).
        let original_bytes = {
            let mut reader = BufReader::new(&self.file);
            reader.seek(SeekFrom::Start(0)).map_err(|_| err(ErrorCode::FsyncFailed))?;
            let mut buf = Vec::new();
            reader.read_to_end(&mut buf).map_err(|_| err(ErrorCode::FsyncFailed))?;
            buf
        };

        let temp_path = self.path.with_extension("rvf.compact.tmp");
        let mut new_segment_dir = Vec::new();
        let mut seg_writer = SegmentWriter::new(1);
        // Scope: temp_file/temp_writer must be flushed and closed before rename.
        {
            let temp_file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(true)
                .open(&temp_path)
                .map_err(|_| err(ErrorCode::DiskFull))?;

            let mut temp_writer = BufWriter::new(&temp_file);

            // Snapshot the surviving vectors (ids + owned copies of the data).
            let live_ids: Vec<u64> = self.vectors.ids().copied().collect();
            let live_vecs: Vec<Vec<f32>> = live_ids.iter()
                .filter_map(|&id| self.vectors.get(id).map(|v| v.to_vec()))
                .collect();

            // Write all live vectors as a single consolidated VEC_SEG.
            if !live_ids.is_empty() {
                let vec_refs: Vec<&[f32]> = live_vecs.iter().map(|v| v.as_slice()).collect();
                let (seg_id, offset) = seg_writer.write_vec_seg(
                    &mut temp_writer, &vec_refs, &live_ids, self.options.dimension,
                ).map_err(|_| err(ErrorCode::FsyncFailed))?;

                // Same payload-length formula as the ingest path.
                let bytes_per_vec = (self.options.dimension as usize) * 4;
                let payload_len = (2 + 4 + live_ids.len() * (8 + bytes_per_vec)) as u64;
                new_segment_dir.push((seg_id, offset, payload_len, SegmentType::Vec as u8));
            }

            // Preserve non-Vec, non-Manifest, non-Journal segments from the
            // original file. This includes both segments recorded in the old
            // manifest and segments appended after it (e.g., unknown types from
            // newer format versions).
            let preserved = scan_preservable_segments(&original_bytes);
            for (orig_offset, seg_id, payload_len, seg_type) in &preserved {
                // Use checked arithmetic for bounds safety.
                let total_bytes = match (*payload_len as usize).checked_add(SEGMENT_HEADER_SIZE) {
                    Some(t) => t,
                    None => continue, // skip segment with implausible size
                };
                let end = match orig_offset.checked_add(total_bytes) {
                    Some(e) if e <= original_bytes.len() => e,
                    _ => continue, // skip out-of-bounds segment
                };
                let src = &original_bytes[*orig_offset..end];

                // Flush the BufWriter so stream_position reflects the true offset.
                temp_writer.flush().map_err(|_| err(ErrorCode::FsyncFailed))?;
                let new_offset = temp_writer.stream_position()
                    .map_err(|_| err(ErrorCode::FsyncFailed))?;

                // Byte-for-byte copy: header + payload, unmodified.
                temp_writer.write_all(src).map_err(|_| err(ErrorCode::FsyncFailed))?;

                // Ensure the seg_writer's next_seg_id stays above any preserved ID.
                while seg_writer.next_id() <= *seg_id {
                    seg_writer.alloc_seg_id();
                }

                new_segment_dir.push((*seg_id, new_offset, *payload_len, *seg_type));
            }

            self.epoch += 1;
            let total_vectors = live_ids.len() as u64;
            // Post-compaction there are no pending deletions to record.
            let empty_dels: Vec<u64> = Vec::new();
            // Only carry the identity forward if it was ever initialized
            // (open/open_readonly start from a zeroed identity).
            let fi = if self.file_identity.file_id != [0u8; 16] {
                Some(&self.file_identity)
            } else {
                None
            };
            // Flush before writing manifest so offsets are accurate.
            temp_writer.flush().map_err(|_| err(ErrorCode::FsyncFailed))?;
            seg_writer.write_manifest_seg_with_identity(
                &mut temp_writer, self.epoch, self.options.dimension,
                total_vectors, self.options.profile, &new_segment_dir, &empty_dels, fi,
            ).map_err(|_| err(ErrorCode::FsyncFailed))?;

            // Flush + fsync the temp file so the rename publishes complete data.
            temp_writer.flush().map_err(|_| err(ErrorCode::FsyncFailed))?;
            temp_file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
        }

        // Atomic publish: replace the original file with the compacted one.
        fs::rename(&temp_path, &self.path).map_err(|_| err(ErrorCode::FsyncFailed))?;

        // Sync parent directory to make rename durable
        if let Some(parent) = self.path.parent() {
            if let Ok(dir) = std::fs::File::open(parent) {
                let _ = dir.sync_all();
            }
        }

        // Reopen the (now-compacted) file; the old handle points at the
        // unlinked original.
        self.file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&self.path)
            .map_err(|_| err(ErrorCode::InvalidManifest))?;

        self.segment_dir = new_segment_dir;
        self.seg_writer = Some(seg_writer);
        self.last_compaction_time = now_secs();

        Ok(CompactionResult { segments_compacted, bytes_reclaimed, epoch: self.epoch })
    }
546
547    /// Close the store, releasing the writer lock.
548    pub fn close(self) -> Result<(), RvfError> {
549        self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
550
551        if let Some(lock) = self.writer_lock {
552            lock.release().map_err(|_| err(ErrorCode::LockHeld))?;
553        }
554
555        Ok(())
556    }
557
558
559    // -- Kernel / eBPF embedding API --
560
561    /// Embed a kernel image into this RVF file as a KERNEL_SEG.
562    ///
563    /// Builds a 128-byte KernelHeader, serializes it, then delegates to
564    /// the write path. Returns the segment_id of the new KERNEL_SEG.
565    pub fn embed_kernel(
566        &mut self,
567        arch: u8,
568        kernel_type: u8,
569        kernel_flags: u32,
570        kernel_image: &[u8],
571        api_port: u16,
572        cmdline: Option<&str>,
573    ) -> Result<u64, RvfError> {
574        if self.read_only {
575            return Err(err(ErrorCode::ReadOnly));
576        }
577
578        let image_hash = simple_shake256_256(kernel_image);
579        let header = KernelHeader {
580            kernel_magic: KERNEL_MAGIC,
581            header_version: 1,
582            arch,
583            kernel_type,
584            kernel_flags,
585            min_memory_mb: 0,
586            entry_point: 0,
587            image_size: kernel_image.len() as u64,
588            compressed_size: kernel_image.len() as u64,
589            compression: 0,
590            api_transport: 0,
591            api_port,
592            api_version: 1,
593            image_hash,
594            build_id: [0u8; 16],
595            build_timestamp: 0,
596            vcpu_count: 0,
597            reserved_0: 0,
598            cmdline_offset: 128,
599            cmdline_length: cmdline.map_or(0, |s| s.len() as u32),
600            reserved_1: 0,
601        };
602        let header_bytes = header.to_bytes();
603
604        let cmdline_bytes = cmdline.map(|s| s.as_bytes());
605
606        let writer = self.seg_writer.as_mut()
607            .ok_or_else(|| err(ErrorCode::InvalidManifest))?;
608        let (seg_id, seg_offset) = {
609            let mut buf_writer = BufWriter::new(&self.file);
610            buf_writer.seek(SeekFrom::End(0))
611                .map_err(|_| err(ErrorCode::FsyncFailed))?;
612            writer.write_kernel_seg(
613                &mut buf_writer, &header_bytes, kernel_image, cmdline_bytes,
614            ).map_err(|_| err(ErrorCode::FsyncFailed))?
615        };
616
617        let cmdline_len = cmdline_bytes.map_or(0, |c| c.len());
618        let payload_len = (128 + kernel_image.len() + cmdline_len) as u64;
619        self.segment_dir.push((
620            seg_id, seg_offset, payload_len, SegmentType::Kernel as u8,
621        ));
622
623        self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
624        self.epoch += 1;
625        self.write_manifest()?;
626
627        Ok(seg_id)
628    }
629
630    /// Embed a kernel image with a KernelBinding footer.
631    ///
632    /// The new KERNEL_SEG wire format is:
633    ///   KernelHeader (128B) || KernelBinding (128B) || cmdline || kernel_image
634    ///
635    /// The KernelBinding ties the manifest root hash to the kernel, preventing
636    /// segment-swap attacks.
637    pub fn embed_kernel_with_binding(
638        &mut self,
639        arch: u8,
640        kernel_type: u8,
641        kernel_flags: u32,
642        kernel_image: &[u8],
643        api_port: u16,
644        cmdline: Option<&str>,
645        binding: &KernelBinding,
646    ) -> Result<u64, RvfError> {
647        if self.read_only {
648            return Err(err(ErrorCode::ReadOnly));
649        }
650
651        let image_hash = simple_shake256_256(kernel_image);
652        let cmdline_len = cmdline.map_or(0u32, |s| s.len() as u32);
653        let header = KernelHeader {
654            kernel_magic: KERNEL_MAGIC,
655            header_version: 1,
656            arch,
657            kernel_type,
658            kernel_flags,
659            min_memory_mb: 0,
660            entry_point: 0,
661            image_size: kernel_image.len() as u64,
662            compressed_size: kernel_image.len() as u64,
663            compression: 0,
664            api_transport: 0,
665            api_port,
666            api_version: 1,
667            image_hash,
668            build_id: [0u8; 16],
669            build_timestamp: 0,
670            vcpu_count: 0,
671            reserved_0: 0,
672            // cmdline_offset now accounts for KernelBinding (128 + 128 = 256)
673            cmdline_offset: 128 + 128,
674            cmdline_length: cmdline_len,
675            reserved_1: 0,
676        };
677        let header_bytes = header.to_bytes();
678        let binding_bytes = binding.to_bytes();
679
680        // Build the combined payload: header(128) + binding(128) + cmdline + image
681        let cmdline_data = cmdline.map(|s| s.as_bytes());
682        let cmdline_slice = cmdline_data.unwrap_or(&[]);
683
684        let mut payload = Vec::with_capacity(128 + 128 + cmdline_slice.len() + kernel_image.len());
685        payload.extend_from_slice(&header_bytes);
686        payload.extend_from_slice(&binding_bytes);
687        payload.extend_from_slice(cmdline_slice);
688        payload.extend_from_slice(kernel_image);
689
690        let writer = self.seg_writer.as_mut()
691            .ok_or_else(|| err(ErrorCode::InvalidManifest))?;
692
693        let (seg_id, seg_offset) = {
694            let mut buf_writer = BufWriter::new(&self.file);
695            buf_writer.seek(SeekFrom::End(0))
696                .map_err(|_| err(ErrorCode::FsyncFailed))?;
697            // Write as raw kernel segment: the write_kernel_seg expects
698            // header_bytes separately, but we need to include binding in
699            // the "image" portion to keep the wire format correct.
700            // So we pass the full payload minus the header as "image".
701            writer.write_kernel_seg(
702                &mut buf_writer,
703                &header_bytes,
704                &payload[128..], // binding + cmdline + image
705                None,            // cmdline already included above
706            ).map_err(|_| err(ErrorCode::FsyncFailed))?
707        };
708
709        let total_payload_len = payload.len() as u64;
710        self.segment_dir.push((
711            seg_id, seg_offset, total_payload_len, SegmentType::Kernel as u8,
712        ));
713
714        self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
715        self.epoch += 1;
716        self.write_manifest()?;
717
718        Ok(seg_id)
719    }
720
721    /// Extract the kernel image from this RVF file.
722    ///
723    /// Scans the segment directory for a KERNEL_SEG (type 0x0E) and returns
724    /// the first 128 bytes (serialized KernelHeader) plus the remainder
725    /// (kernel image bytes). Returns None if no KERNEL_SEG is present.
726    ///
727    /// For files with KernelBinding (ADR-031), the remainder includes the
728    /// 128-byte binding followed by optional cmdline and the kernel image.
729    /// Use `extract_kernel_binding` to parse the binding separately.
730    #[allow(clippy::type_complexity)]
731    pub fn extract_kernel(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>, RvfError> {
732        let entry = self.segment_dir.iter()
733            .find(|&&(_, _, _, stype)| stype == SegmentType::Kernel as u8);
734
735        let entry = match entry {
736            Some(e) => e,
737            None => return Ok(None),
738        };
739
740        let (_header, payload) = {
741            let mut reader = BufReader::new(&self.file);
742            read_path::read_segment_payload(&mut reader, entry.1)
743                .map_err(|_| err(ErrorCode::InvalidChecksum))?
744        };
745
746        if payload.len() < 128 {
747            return Err(err(ErrorCode::TruncatedSegment));
748        }
749
750        let kernel_header = payload[..128].to_vec();
751        let kernel_image = payload[128..].to_vec();
752
753        Ok(Some((kernel_header, kernel_image)))
754    }
755
756    /// Extract the KernelBinding from a KERNEL_SEG, if present.
757    ///
758    /// Returns `None` if no KERNEL_SEG exists or if the payload is too short
759    /// to contain a KernelBinding (backward-compatible with old format).
760    pub fn extract_kernel_binding(&self) -> Result<Option<KernelBinding>, RvfError> {
761        let result = self.extract_kernel()?;
762        match result {
763            None => Ok(None),
764            Some((_header_bytes, remainder)) => {
765                if remainder.len() < 128 {
766                    // Old format: no KernelBinding present
767                    return Ok(None);
768                }
769                let mut binding_data = [0u8; 128];
770                binding_data.copy_from_slice(&remainder[..128]);
771                let binding = KernelBinding::from_bytes(&binding_data);
772                // Check if this looks like a real binding (version > 0)
773                if binding.binding_version == 0 {
774                    return Ok(None);
775                }
776                Ok(Some(binding))
777            }
778        }
779    }
780
781    /// Embed an eBPF program into this RVF file as an EBPF_SEG.
782    ///
783    /// Builds a 64-byte EbpfHeader, serializes it, then delegates to
784    /// the write path. Returns the segment_id of the new EBPF_SEG.
785    pub fn embed_ebpf(
786        &mut self,
787        program_type: u8,
788        attach_type: u8,
789        max_dimension: u16,
790        program_bytecode: &[u8],
791        btf_data: Option<&[u8]>,
792    ) -> Result<u64, RvfError> {
793        if self.read_only {
794            return Err(err(ErrorCode::ReadOnly));
795        }
796
797        let program_hash = simple_shake256_256(program_bytecode);
798        let header = EbpfHeader {
799            ebpf_magic: EBPF_MAGIC,
800            header_version: 1,
801            program_type,
802            attach_type,
803            program_flags: 0,
804            insn_count: (program_bytecode.len() / 8) as u16,
805            max_dimension,
806            program_size: program_bytecode.len() as u64,
807            map_count: 0,
808            btf_size: btf_data.map_or(0, |b| b.len() as u32),
809            program_hash,
810        };
811        let header_bytes = header.to_bytes();
812
813        let writer = self.seg_writer.as_mut()
814            .ok_or_else(|| err(ErrorCode::InvalidManifest))?;
815        let (seg_id, seg_offset) = {
816            let mut buf_writer = BufWriter::new(&self.file);
817            buf_writer.seek(SeekFrom::End(0))
818                .map_err(|_| err(ErrorCode::FsyncFailed))?;
819            writer.write_ebpf_seg(
820                &mut buf_writer, &header_bytes, program_bytecode, btf_data,
821            ).map_err(|_| err(ErrorCode::FsyncFailed))?
822        };
823
824        let btf_len = btf_data.map_or(0, |b| b.len());
825        let payload_len = (64 + program_bytecode.len() + btf_len) as u64;
826        self.segment_dir.push((
827            seg_id, seg_offset, payload_len, SegmentType::Ebpf as u8,
828        ));
829
830        self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
831        self.epoch += 1;
832        self.write_manifest()?;
833
834        Ok(seg_id)
835    }
836
837    /// Extract eBPF program bytecode from this RVF file.
838    ///
839    /// Scans the segment directory for an EBPF_SEG (type 0x0F) and returns
840    /// the first 64 bytes (serialized EbpfHeader) plus the remainder
841    /// (program bytecode + optional BTF). Returns None if no EBPF_SEG.
842    #[allow(clippy::type_complexity)]
843    pub fn extract_ebpf(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>, RvfError> {
844        let entry = self.segment_dir.iter()
845            .find(|&&(_, _, _, stype)| stype == SegmentType::Ebpf as u8);
846
847        let entry = match entry {
848            Some(e) => e,
849            None => return Ok(None),
850        };
851
852        let (_header, payload) = {
853            let mut reader = BufReader::new(&self.file);
854            read_path::read_segment_payload(&mut reader, entry.1)
855                .map_err(|_| err(ErrorCode::InvalidChecksum))?
856        };
857
858        if payload.len() < 64 {
859            return Err(err(ErrorCode::TruncatedSegment));
860        }
861
862        let ebpf_header = payload[..64].to_vec();
863        let ebpf_bytecode = payload[64..].to_vec();
864
865        Ok(Some((ebpf_header, ebpf_bytecode)))
866    }
867
868    /// Get the segment directory.
869    pub fn segment_dir(&self) -> &[(u64, u64, u64, u8)] {
870        &self.segment_dir
871    }
872
    /// Get the store's vector dimensionality.
    ///
    /// Mirrors `options.dimension`, which `boot` refreshes from the
    /// on-disk manifest when an existing store is opened.
    pub fn dimension(&self) -> u16 {
        self.options.dimension
    }
877
    /// Get the file identity (lineage metadata) for this store.
    ///
    /// Carries `file_id`, `parent_id`, `parent_hash`, and `lineage_depth`.
    pub fn file_identity(&self) -> &FileIdentity {
        &self.file_identity
    }
882
    /// Get this file's unique identifier.
    ///
    /// May be all zeros for stores created without lineage metadata
    /// (`write_manifest` treats a zero id as "no identity").
    pub fn file_id(&self) -> &[u8; 16] {
        &self.file_identity.file_id
    }
887
    /// Get the parent file's identifier (all zeros if root).
    ///
    /// Set by `derive` to the parent's `file_id` when a child is created.
    pub fn parent_id(&self) -> &[u8; 16] {
        &self.file_identity.parent_id
    }
892
    /// Get the lineage depth (0 for root files).
    ///
    /// Incremented by one for each `derive` step in the chain.
    pub fn lineage_depth(&self) -> u32 {
        self.file_identity.lineage_depth
    }
897
    /// Create a COW branch from this store.
    ///
    /// Creates a new child file that inherits all vectors from the parent via
    /// COW references. Writes to the child only allocate local clusters as
    /// needed. The parent should be frozen first to ensure immutability.
    pub fn branch(&self, child_path: &Path) -> Result<Self, RvfError> {
        // Compute cluster geometry from the vector data
        let dim = self.options.dimension as u32;
        let bytes_per_vec = dim * 4; // f32
        // Target ~4096-byte clusters, with at least one vector per cluster;
        // the 64 arm is a fallback for a degenerate zero-dimension store.
        let vectors_per_cluster = if bytes_per_vec > 0 {
            (4096 / bytes_per_vec).max(1)
        } else {
            64
        };
        let cluster_size = vectors_per_cluster * bytes_per_vec;
        let total_vecs = self.vectors.len() as u64;
        // Number of clusters needed to cover every vector, rounded up.
        let cluster_count = if vectors_per_cluster > 0 {
            total_vecs.div_ceil(vectors_per_cluster as u64) as u32
        } else {
            0
        };

        // Derive the child via the standard lineage path
        let mut child = self.derive(
            child_path,
            rvf_types::DerivationType::Clone,
            Some(self.options.clone()),
        )?;

        // Initialize COW engine on the child with all clusters pointing to parent
        child.cow_engine = Some(CowEngine::from_parent(
            cluster_count,
            cluster_size,
            vectors_per_cluster,
            bytes_per_vec,
        ));

        // Initialize membership filter with all parent vectors visible.
        // Vectors already deleted in the parent are excluded from the
        // child's view.
        let mut filter = MembershipFilter::new_include(total_vecs);
        for &vid in self.vectors.ids() {
            if !self.deletion_bitmap.is_deleted(vid) {
                filter.add(vid);
            }
        }
        child.membership_filter = Some(filter);

        Ok(child)
    }
946
947    /// Freeze (snapshot) this store. Prevents further writes to this generation.
948    pub fn freeze(&mut self) -> Result<(), RvfError> {
949        if self.read_only {
950            return Err(err(ErrorCode::ReadOnly));
951        }
952
953        if let Some(ref mut engine) = self.cow_engine {
954            engine.freeze(self.epoch)?;
955        }
956
957        // Set read_only to prevent further mutations
958        self.read_only = true;
959        Ok(())
960    }
961
    /// Check if this store is a COW child (has a parent).
    ///
    /// True exactly when a `CowEngine` is attached (e.g. by `branch`).
    pub fn is_cow_child(&self) -> bool {
        self.cow_engine.is_some()
    }
966
    /// Get COW statistics, if this store uses COW.
    ///
    /// Returns `None` for root stores that have no `CowEngine` attached.
    pub fn cow_stats(&self) -> Option<CowStats> {
        self.cow_engine.as_ref().map(|e| e.stats())
    }
971
    /// Get the membership filter, if present.
    ///
    /// Populated by `branch` to control branch-level vector visibility;
    /// `None` for stores that never branched.
    pub fn membership_filter(&self) -> Option<&MembershipFilter> {
        self.membership_filter.as_ref()
    }
976
    /// Get a mutable reference to the membership filter.
    ///
    /// Mutable counterpart of `membership_filter`; `None` when unset.
    pub fn membership_filter_mut(&mut self) -> Option<&mut MembershipFilter> {
        self.membership_filter.as_mut()
    }
981
    /// Get the parent file path, if this is a COW child.
    ///
    /// `derive` records the parent's path here; `None` when no parent
    /// was recorded.
    pub fn parent_path(&self) -> Option<&Path> {
        self.parent_path.as_deref()
    }
986
987    /// Derive a child store from this parent.
988    ///
989    /// Creates a new RVF file at `child_path` that records this store as its
990    /// parent. The child gets a new file_id, inherits dimensions and options,
991    /// and records the parent's manifest hash for provenance verification.
992    pub fn derive(
993        &self,
994        child_path: &Path,
995        _derivation_type: rvf_types::DerivationType,
996        child_options: Option<RvfOptions>,
997    ) -> Result<Self, RvfError> {
998        let opts = child_options.unwrap_or_else(|| self.options.clone());
999
1000        let child_file_id = generate_file_id(child_path);
1001
1002        // Compute parent manifest hash from the file on disk
1003        let parent_hash = self.compute_own_manifest_hash()?;
1004
1005        let new_depth = self.file_identity.lineage_depth.checked_add(1)
1006            .ok_or_else(|| err(ErrorCode::LineageBroken))?;
1007
1008        let child_identity = FileIdentity {
1009            file_id: child_file_id,
1010            parent_id: self.file_identity.file_id,
1011            parent_hash,
1012            lineage_depth: new_depth,
1013        };
1014
1015        let file = OpenOptions::new()
1016            .read(true)
1017            .write(true)
1018            .create_new(true)
1019            .open(child_path)
1020            .map_err(|_| err(ErrorCode::FsyncFailed))?;
1021
1022        let writer_lock = WriterLock::acquire(child_path)
1023            .map_err(|_| err(ErrorCode::LockHeld))?;
1024
1025        // Detect domain profile from child extension
1026        let domain_profile = child_path
1027            .extension()
1028            .and_then(|ext| ext.to_str())
1029            .and_then(DomainProfile::from_extension)
1030            .unwrap_or(opts.domain_profile);
1031
1032        let mut child_opts = opts;
1033        child_opts.domain_profile = domain_profile;
1034
1035        let mut store = Self {
1036            path: child_path.to_path_buf(),
1037            options: child_opts,
1038            file,
1039            seg_writer: Some(SegmentWriter::new(1)),
1040            writer_lock: Some(writer_lock),
1041            vectors: VectorData::new(self.options.dimension),
1042            deletion_bitmap: DeletionBitmap::new(),
1043            metadata: MetadataStore::new(),
1044            epoch: 0,
1045            segment_dir: Vec::new(),
1046            read_only: false,
1047            last_compaction_time: 0,
1048            file_identity: child_identity,
1049            cow_engine: None,
1050            membership_filter: None,
1051            parent_path: Some(self.path.clone()),
1052        };
1053
1054        store.write_manifest()?;
1055        Ok(store)
1056    }
1057
1058    /// Compute a hash of this file's content for use as parent_hash in derivation.
1059    fn compute_own_manifest_hash(&self) -> Result<[u8; 32], RvfError> {
1060        use std::io::Read;
1061        let file_len = self.file.metadata()
1062            .map_err(|_| err(ErrorCode::InvalidManifest))?
1063            .len();
1064        if file_len == 0 {
1065            return Ok([0u8; 32]);
1066        }
1067        // Hash up to 64KB from the end of the file (covers manifest segments)
1068        let read_len = file_len.min(65536) as usize;
1069        let mut reader = BufReader::new(&self.file);
1070        reader.seek(SeekFrom::End(-(read_len as i64)))
1071            .map_err(|_| err(ErrorCode::InvalidManifest))?;
1072        let mut buf = vec![0u8; read_len];
1073        reader.read_exact(&mut buf).map_err(|_| err(ErrorCode::InvalidManifest))?;
1074        Ok(simple_shake256_256(&buf))
1075    }
1076
1077    // ── Internal methods ──────────────────────────────────────────────
1078
    /// Rehydrate in-memory state from the newest manifest on disk.
    ///
    /// Restores epoch, dimension, profile, the deletion bitmap, the segment
    /// directory, and every VEC_SEG payload; finally seeds the segment
    /// writer (when writable) past the highest existing segment id.
    fn boot(&mut self) -> Result<(), RvfError> {
        // Locate the most recent manifest segment in the file.
        let manifest = {
            let mut reader = BufReader::new(&self.file);
            read_path::find_latest_manifest(&mut reader)
                .map_err(|_| err(ErrorCode::ManifestNotFound))?
        };

        let manifest = match manifest {
            Some(m) => m,
            None => return Err(err(ErrorCode::ManifestNotFound)),
        };

        // Restore scalar state recorded in the manifest.
        self.epoch = manifest.epoch;
        self.options.dimension = manifest.dimension;
        self.options.profile = manifest.profile_id;
        self.vectors = VectorData::new(manifest.dimension);
        self.deletion_bitmap = DeletionBitmap::from_ids(&manifest.deleted_ids);

        self.segment_dir = manifest.segment_dir.iter()
            .map(|e| (e.seg_id, e.offset, e.payload_length, e.seg_type))
            .collect();

        // Reload every vector segment's payload into memory.
        let vec_seg_entries: Vec<_> = manifest.segment_dir.iter()
            .filter(|e| e.seg_type == SegmentType::Vec as u8)
            .collect();

        for entry in vec_seg_entries {
            let (_header, payload) = {
                let mut reader = BufReader::new(&self.file);
                read_path::read_segment_payload(&mut reader, entry.offset)
                    .map_err(|_| err(ErrorCode::InvalidChecksum))?
            };

            // NOTE(review): a VEC_SEG payload that fails to parse is
            // silently skipped rather than surfaced as an error — confirm
            // this best-effort recovery is intentional.
            if let Some(vec_entries) = read_path::read_vec_seg_payload(&payload) {
                for (vec_id, vec_data) in vec_entries {
                    self.vectors.insert(vec_id, vec_data);
                }
            }
        }

        // Restore FileIdentity from manifest if present
        if let Some(fi) = manifest.file_identity {
            self.file_identity = fi;
        }

        // Writable stores need a segment writer whose next id does not
        // collide with any existing segment.
        if !self.read_only {
            let max_seg_id = self.segment_dir.iter()
                .map(|&(id, _, _, _)| id)
                .max()
                .unwrap_or(0);
            self.seg_writer = Some(SegmentWriter::new(max_seg_id + 1));
        }

        Ok(())
    }
1134
1135    fn write_manifest(&mut self) -> Result<(), RvfError> {
1136        let writer = self.seg_writer.as_mut().ok_or_else(|| err(ErrorCode::InvalidManifest))?;
1137
1138        let total_vectors = self.vectors.len() as u64;
1139        let deleted_ids = self.deletion_bitmap.to_sorted_ids();
1140
1141        // Include FileIdentity if this file has a non-zero file_id
1142        let fi = if self.file_identity.file_id != [0u8; 16] {
1143            Some(&self.file_identity)
1144        } else {
1145            None
1146        };
1147
1148        let (manifest_seg_id, manifest_offset) = {
1149            let mut buf_writer = BufWriter::new(&self.file);
1150            buf_writer.seek(SeekFrom::End(0)).map_err(|_| err(ErrorCode::FsyncFailed))?;
1151            writer.write_manifest_seg_with_identity(
1152                &mut buf_writer, self.epoch, self.options.dimension,
1153                total_vectors, self.options.profile, &self.segment_dir, &deleted_ids, fi,
1154            ).map_err(|_| err(ErrorCode::FsyncFailed))?
1155        };
1156
1157        let mut manifest_payload_len = (22 + self.segment_dir.len() * 25 + 4 + deleted_ids.len() * 8) as u64;
1158        if fi.is_some() {
1159            manifest_payload_len += 4 + 68; // FIDI marker + FileIdentity
1160        }
1161        self.segment_dir.push((manifest_seg_id, manifest_offset, manifest_payload_len, SegmentType::Manifest as u8));
1162
1163        self.file.sync_all().map_err(|_| err(ErrorCode::FsyncFailed))?;
1164        Ok(())
1165    }
1166}
1167
1168fn compute_distance(a: &[f32], b: &[f32], metric: &DistanceMetric) -> f32 {
1169    match metric {
1170        DistanceMetric::L2 => {
1171            a.iter().zip(b.iter()).map(|(x, y)| { let d = x - y; d * d }).sum()
1172        }
1173        DistanceMetric::InnerProduct => {
1174            let dot: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
1175            -dot
1176        }
1177        DistanceMetric::Cosine => {
1178            let mut dot = 0.0f32;
1179            let mut norm_a = 0.0f32;
1180            let mut norm_b = 0.0f32;
1181            for (x, y) in a.iter().zip(b.iter()) {
1182                dot += x * y;
1183                norm_a += x * x;
1184                norm_b += y * y;
1185            }
1186            let denom = (norm_a * norm_b).sqrt();
1187            if denom < f32::EPSILON { 1.0 } else { 1.0 - dot / denom }
1188        }
1189    }
1190}
1191
/// `f32` wrapper supplying a total order (e.g. for `BinaryHeap` use).
///
/// NaN compares as `Equal` to everything, so NaN values neither win nor
/// lose an ordering comparison.
#[derive(Clone, Copy, Debug, PartialEq)]
struct OrderedFloat(f32);

impl Eq for OrderedFloat {}

impl PartialOrd for OrderedFloat {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Delegate to the total `Ord` impl, as clippy recommends.
        Some(self.cmp(other))
    }
}

impl Ord for OrderedFloat {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Fall back to Equal when the IEEE comparison is undefined (NaN).
        match self.0.partial_cmp(&other.0) {
            Some(ordering) => ordering,
            None => std::cmp::Ordering::Equal,
        }
    }
}
1208
1209/// Generate a file_id from path + timestamp using `simple_shake256_256`.
1210///
1211/// Previous implementation used XOR mixing which has very poor distribution
1212/// (e.g. paths differing in a single byte could collide). Now we hash the
1213/// concatenation of path bytes and nanosecond timestamp through
1214/// `simple_shake256_256` and take the first 16 bytes for much better
1215/// collision resistance.
1216fn generate_file_id(path: &Path) -> [u8; 16] {
1217    let path_str = path.to_string_lossy();
1218    let path_bytes = path_str.as_bytes();
1219
1220    let ts = std::time::SystemTime::now()
1221        .duration_since(std::time::UNIX_EPOCH)
1222        .map(|d| d.as_nanos() as u64)
1223        .unwrap_or(0);
1224    let ts_bytes = ts.to_le_bytes();
1225
1226    // Concatenate path + timestamp, then hash for uniform distribution
1227    let mut input = Vec::with_capacity(path_bytes.len() + 8);
1228    input.extend_from_slice(path_bytes);
1229    input.extend_from_slice(&ts_bytes);
1230
1231    let digest = simple_shake256_256(&input);
1232    let mut id = [0u8; 16];
1233    id.copy_from_slice(&digest[..16]);
1234    id
1235}
1236
/// Minimal stand-in hash used because rvf-runtime does not depend on
/// rvf-crypto.
///
/// NOT cryptographically secure: an additive byte-fold with a rotate-based
/// avalanche step producing a 32-byte digest. For production lineage
/// verification use `rvf_crypto::compute_manifest_hash`.
pub(crate) fn simple_shake256_256(data: &[u8]) -> [u8; 32] {
    let mut digest = [0u8; 32];
    for (idx, &byte) in data.iter().enumerate() {
        let slot = idx % 32;
        digest[slot] = digest[slot].wrapping_add(byte);
        // Avalanche: spill a rotated copy into a second slot 13 ahead.
        let spill = (idx + 13) % 32;
        digest[spill] = digest[spill].wrapping_add(digest[slot].rotate_left(3));
    }
    digest
}
1252
/// Scan raw file bytes for segment headers whose type should be preserved
/// during compaction. Returns `(file_offset, seg_id, payload_len, seg_type)`
/// for every segment that is NOT Vec (0x01), Manifest (0x05), or Journal (0x04).
///
/// This ensures forward compatibility: segment types unknown to this version
/// of the runtime (e.g., Kernel, Ebpf, or vendor extensions) survive a
/// compact/rewrite cycle byte-for-byte.
fn scan_preservable_segments(file_bytes: &[u8]) -> Vec<(usize, u64, u64, u8)> {
    let magic_bytes = SEGMENT_MAGIC.to_le_bytes();
    let mut results = Vec::new();

    // A file shorter than one header cannot contain any segment at all.
    if file_bytes.len() < SEGMENT_HEADER_SIZE {
        return results;
    }

    let last_possible = file_bytes.len() - SEGMENT_HEADER_SIZE;
    let mut i = 0;
    while i <= last_possible {
        if file_bytes[i..i + 4] == magic_bytes {
            // Header layout assumed here: seg_type at +0x05, seg_id (LE u64)
            // at +0x08, payload_len (LE u64) at +0x10 — TODO confirm against
            // the segment header specification.
            let seg_type = file_bytes[i + 5];
            let seg_id = u64::from_le_bytes([
                file_bytes[i + 0x08], file_bytes[i + 0x09],
                file_bytes[i + 0x0A], file_bytes[i + 0x0B],
                file_bytes[i + 0x0C], file_bytes[i + 0x0D],
                file_bytes[i + 0x0E], file_bytes[i + 0x0F],
            ]);
            let payload_len = u64::from_le_bytes([
                file_bytes[i + 0x10], file_bytes[i + 0x11],
                file_bytes[i + 0x12], file_bytes[i + 0x13],
                file_bytes[i + 0x14], file_bytes[i + 0x15],
                file_bytes[i + 0x16], file_bytes[i + 0x17],
            ]);

            // Use checked arithmetic to prevent overflow on crafted payload_len.
            // NOTE(review): this guard only bounds payload_len by the file
            // size (a plausibility check); the strict header+payload
            // containment check happens just before the push below.
            let total = match (payload_len as usize).checked_add(SEGMENT_HEADER_SIZE) {
                Some(t) if payload_len <= file_bytes.len() as u64 => t,
                _ => {
                    // Payload length is implausibly large; skip this byte.
                    i += 1;
                    continue;
                }
            };

            // Skip Vec, Manifest, and Journal segments -- these are
            // reconstructed by the compaction logic itself.
            if seg_type != SegmentType::Vec as u8
                && seg_type != SegmentType::Manifest as u8
                && seg_type != SegmentType::Journal as u8
            {
                // Only include if the full segment fits in the file.
                if i.checked_add(total).is_some_and(|end| end <= file_bytes.len()) {
                    results.push((i, seg_id, payload_len, seg_type));
                }
            }

            // Advance past this segment (header + payload) to avoid
            // false magic matches inside payload data.
            if total > 0 {
                match i.checked_add(total) {
                    Some(next) if next > i => i = next,
                    _ => i += 1,
                }
            } else {
                i += 1;
            }
        } else {
            i += 1;
        }
    }

    results
}
1325
/// Current wall-clock time as whole seconds since the Unix epoch
/// (0 if the system clock reads before the epoch).
fn now_secs() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
1332
1333#[cfg(test)]
1334mod tests {
1335    use super::*;
1336    use crate::filter::FilterValue;
1337    use tempfile::TempDir;
1338
1339    fn random_vector(dim: usize, seed: u64) -> Vec<f32> {
1340        let mut v = Vec::with_capacity(dim);
1341        let mut x = seed;
1342        for _ in 0..dim {
1343            x = x.wrapping_mul(6364136223846793005).wrapping_add(1442695040888963407);
1344            v.push(((x >> 33) as f32) / (u32::MAX as f32) - 0.5);
1345        }
1346        v
1347    }
1348
    // End-to-end smoke test: create a store, ingest 100 vectors, and check
    // that a query returns sorted results with the exact-match vector first.
    #[test]
    fn create_ingest_query() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.rvf");

        let options = RvfOptions {
            dimension: 8,
            metric: DistanceMetric::L2,
            ..Default::default()
        };

        let mut store = RvfStore::create(&path, options).unwrap();

        let dim = 8;
        let vecs: Vec<Vec<f32>> = (0..100).map(|i| random_vector(dim, i)).collect();
        let vec_refs: Vec<&[f32]> = vecs.iter().map(|v| v.as_slice()).collect();
        let ids: Vec<u64> = (0..100).collect();

        let result = store.ingest_batch(&vec_refs, &ids, None).unwrap();
        assert_eq!(result.accepted, 100);
        assert_eq!(result.rejected, 0);

        // Querying with vector #42's exact data: it must rank first with
        // (near-)zero distance, and distances must be non-decreasing.
        let query_vec = random_vector(dim, 42);
        let results = store.query(&query_vec, 10, &QueryOptions::default()).unwrap();
        assert_eq!(results.len(), 10);

        for i in 1..results.len() {
            assert!(results[i].distance >= results[i - 1].distance);
        }

        assert_eq!(results[0].id, 42);
        assert!(results[0].distance < f32::EPSILON);

        store.close().unwrap();
    }
1384
    // Persistence round trip: ingest, close, reopen, and query — the
    // reopened store must serve the same vectors.
    #[test]
    fn open_existing_store() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("reopen.rvf");

        let options = RvfOptions {
            dimension: 4,
            metric: DistanceMetric::L2,
            ..Default::default()
        };

        {
            let mut store = RvfStore::create(&path, options.clone()).unwrap();
            let v1 = vec![1.0, 0.0, 0.0, 0.0];
            let v2 = vec![0.0, 1.0, 0.0, 0.0];
            let vecs: Vec<&[f32]> = vec![&v1, &v2];
            let ids = vec![10, 20];
            store.ingest_batch(&vecs, &ids, None).unwrap();
            store.close().unwrap();
        }

        {
            let store = RvfStore::open(&path).unwrap();
            let query = vec![1.0, 0.0, 0.0, 0.0];
            let results = store.query(&query, 2, &QueryOptions::default()).unwrap();
            assert_eq!(results.len(), 2);
            // The exact-match vector (id 10) must come back first.
            assert_eq!(results[0].id, 10);
            assert!(results[0].distance < f32::EPSILON);
            store.close().unwrap();
        }
    }
1416
    // Deleting by id must remove the vector from subsequent query results.
    #[test]
    fn delete_vectors() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("delete.rvf");

        let options = RvfOptions {
            dimension: 4,
            metric: DistanceMetric::L2,
            ..Default::default()
        };

        let mut store = RvfStore::create(&path, options).unwrap();

        let v1 = vec![1.0, 0.0, 0.0, 0.0];
        let v2 = vec![0.0, 1.0, 0.0, 0.0];
        let v3 = vec![0.0, 0.0, 1.0, 0.0];
        let vecs: Vec<&[f32]> = vec![&v1, &v2, &v3];
        let ids = vec![1, 2, 3];
        store.ingest_batch(&vecs, &ids, None).unwrap();

        let del_result = store.delete(&[2]).unwrap();
        assert_eq!(del_result.deleted, 1);

        // Even querying with the deleted vector's own data, id 2 must not
        // appear in the results.
        let query = vec![0.0, 1.0, 0.0, 0.0];
        let results = store.query(&query, 10, &QueryOptions::default()).unwrap();
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.id != 2));

        store.close().unwrap();
    }
1447
    // Metadata filtering: only vectors whose field 0 equals "cat_a" may be
    // returned when an Eq filter is attached to the query.
    #[test]
    fn filter_query() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("filter.rvf");

        let options = RvfOptions {
            dimension: 4,
            metric: DistanceMetric::L2,
            ..Default::default()
        };

        let mut store = RvfStore::create(&path, options).unwrap();

        let v1 = vec![1.0, 0.0, 0.0, 0.0];
        let v2 = vec![0.0, 1.0, 0.0, 0.0];
        let v3 = vec![0.0, 0.0, 1.0, 0.0];
        let vecs: Vec<&[f32]> = vec![&v1, &v2, &v3];
        let ids = vec![1, 2, 3];
        // One metadata entry per ingested vector, in the same order.
        let metadata = vec![
            MetadataEntry { field_id: 0, value: MetadataValue::String("cat_a".into()) },
            MetadataEntry { field_id: 0, value: MetadataValue::String("cat_b".into()) },
            MetadataEntry { field_id: 0, value: MetadataValue::String("cat_a".into()) },
        ];
        store.ingest_batch(&vecs, &ids, Some(&metadata)).unwrap();

        let query = vec![0.5, 0.5, 0.5, 0.0];
        let query_opts = QueryOptions {
            filter: Some(FilterExpr::Eq(0, FilterValue::String("cat_a".into()))),
            ..Default::default()
        };
        let results = store.query(&query, 10, &query_opts).unwrap();
        // Ids 1 and 3 carry "cat_a"; id 2 ("cat_b") must be filtered out.
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.id == 1 || r.id == 3));

        store.close().unwrap();
    }
1484
    // `status()` must track vector count, the read-only flag, and report a
    // non-zero file size once data has been written.
    #[test]
    fn status_reports() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("status.rvf");

        let options = RvfOptions {
            dimension: 4,
            metric: DistanceMetric::L2,
            ..Default::default()
        };

        let mut store = RvfStore::create(&path, options).unwrap();

        let status = store.status();
        assert_eq!(status.total_vectors, 0);
        assert!(!status.read_only);

        let v1 = [1.0, 0.0, 0.0, 0.0];
        store.ingest_batch(&[&v1[..]], &[1], None).unwrap();

        let status = store.status();
        assert_eq!(status.total_vectors, 1);
        assert!(status.file_size > 0);

        store.close().unwrap();
    }
1511
    // Compaction after deleting half the vectors must report the deleted
    // segments as compacted while keeping the survivors queryable.
    #[test]
    fn compact_reclaims_space() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("compact.rvf");

        let options = RvfOptions {
            dimension: 4,
            metric: DistanceMetric::L2,
            ..Default::default()
        };

        let mut store = RvfStore::create(&path, options).unwrap();

        let vecs: Vec<Vec<f32>> = (0..10).map(|i| vec![i as f32, 0.0, 0.0, 0.0]).collect();
        let vec_refs: Vec<&[f32]> = vecs.iter().map(|v| v.as_slice()).collect();
        let ids: Vec<u64> = (0..10).collect();
        store.ingest_batch(&vec_refs, &ids, None).unwrap();

        // Delete all even ids; the dead-space ratio must become non-zero.
        store.delete(&[0, 2, 4, 6, 8]).unwrap();

        let status = store.status();
        assert_eq!(status.total_vectors, 5);
        assert!(status.dead_space_ratio > 0.0);

        let compact_result = store.compact().unwrap();
        assert_eq!(compact_result.segments_compacted, 5);

        // All odd ids (the survivors) must still be retrievable.
        let query = vec![1.0, 0.0, 0.0, 0.0];
        let results = store.query(&query, 10, &QueryOptions::default()).unwrap();
        assert_eq!(results.len(), 5);
        let result_ids: Vec<u64> = results.iter().map(|r| r.id).collect();
        for id in &[1, 3, 5, 7, 9] {
            assert!(result_ids.contains(id));
        }

        store.close().unwrap();
    }
1549
    // The writer lock held by the first handle must make a second
    // writable open of the same file fail.
    #[test]
    fn lock_prevents_two_writers() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("locked.rvf");

        let options = RvfOptions {
            dimension: 4,
            metric: DistanceMetric::L2,
            ..Default::default()
        };

        // Keep the first store alive so its lock is still held below.
        let _store1 = RvfStore::create(&path, options.clone()).unwrap();

        let result = RvfStore::open(&path);
        assert!(result.is_err());
    }
1566
    // A read-only open must report read_only status, see the persisted
    // vectors, and still serve queries.
    #[test]
    fn readonly_open() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("readonly.rvf");

        let options = RvfOptions {
            dimension: 4,
            metric: DistanceMetric::L2,
            ..Default::default()
        };

        {
            let mut store = RvfStore::create(&path, options).unwrap();
            let v1 = [1.0, 0.0, 0.0, 0.0];
            store.ingest_batch(&[&v1[..]], &[1], None).unwrap();
            store.close().unwrap();
        }

        let store = RvfStore::open_readonly(&path).unwrap();
        let status = store.status();
        assert!(status.read_only);
        assert_eq!(status.total_vectors, 1);

        let query = vec![1.0, 0.0, 0.0, 0.0];
        let results = store.query(&query, 1, &QueryOptions::default()).unwrap();
        assert_eq!(results.len(), 1);
    }
1594
    // delete_by_filter with Gt(field 0, 15) must remove the two vectors
    // whose metadata value exceeds 15 and leave the third queryable.
    #[test]
    fn delete_by_filter_works() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("del_filter.rvf");

        let options = RvfOptions {
            dimension: 4,
            metric: DistanceMetric::L2,
            ..Default::default()
        };

        let mut store = RvfStore::create(&path, options).unwrap();

        let v1 = vec![1.0, 0.0, 0.0, 0.0];
        let v2 = vec![0.0, 1.0, 0.0, 0.0];
        let v3 = vec![0.0, 0.0, 1.0, 0.0];
        let vecs: Vec<&[f32]> = vec![&v1, &v2, &v3];
        let ids = vec![1, 2, 3];
        // Metadata values 10/20/30 on field 0, one per vector in order.
        let metadata = vec![
            MetadataEntry { field_id: 0, value: MetadataValue::U64(10) },
            MetadataEntry { field_id: 0, value: MetadataValue::U64(20) },
            MetadataEntry { field_id: 0, value: MetadataValue::U64(30) },
        ];
        store.ingest_batch(&vecs, &ids, Some(&metadata)).unwrap();

        // Values 20 and 30 exceed 15, so ids 2 and 3 must be deleted.
        let filter = FilterExpr::Gt(0, FilterValue::U64(15));
        let del_result = store.delete_by_filter(&filter).unwrap();
        assert_eq!(del_result.deleted, 2);

        let query = vec![0.0, 0.0, 0.0, 0.0];
        let results = store.query(&query, 10, &QueryOptions::default()).unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].id, 1);

        store.close().unwrap();
    }
1631
    // Kernel segment round trip: embed a fake kernel image, extract it, and
    // verify the 128-byte header's magic, arch byte, and api_port fields.
    #[test]
    fn embed_extract_kernel_round_trip() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("kernel_rt.rvf");

        let options = RvfOptions {
            dimension: 4,
            metric: DistanceMetric::L2,
            ..Default::default()
        };

        let mut store = RvfStore::create(&path, options).unwrap();

        let kernel_image = b"fake-compressed-kernel-image-0123456789abcdef";
        let seg_id = store.embed_kernel(
            1,    // arch: x86_64
            0,    // kernel_type: unikernel
            0x01, // kernel_flags
            kernel_image,
            8080, // api_port
            Some("console=ttyS0 quiet"),
        ).unwrap();
        assert!(seg_id > 0);

        let result = store.extract_kernel().unwrap();
        assert!(result.is_some());
        let (header_bytes, image_bytes) = result.unwrap();
        assert_eq!(header_bytes.len(), 128);

        // Verify the image portion matches what we embedded
        // (image_bytes includes the cmdline appended after the kernel)
        assert!(image_bytes.starts_with(kernel_image));

        // Verify magic in the header
        let magic = u32::from_le_bytes([
            header_bytes[0], header_bytes[1],
            header_bytes[2], header_bytes[3],
        ]);
        assert_eq!(magic, KERNEL_MAGIC);

        // Verify arch (offset 0x06)
        assert_eq!(header_bytes[0x06], 1);

        // Verify api_port (offset 0x2A, big-endian)
        let port = u16::from_be_bytes([header_bytes[0x2A], header_bytes[0x2B]]);
        assert_eq!(port, 8080);

        store.close().unwrap();
    }
1681
1682    #[test]
1683    fn embed_extract_ebpf_round_trip() {
1684        let dir = TempDir::new().unwrap();
1685        let path = dir.path().join("ebpf_rt.rvf");
1686
1687        let options = RvfOptions {
1688            dimension: 4,
1689            metric: DistanceMetric::L2,
1690            ..Default::default()
1691        };
1692
1693        let mut store = RvfStore::create(&path, options).unwrap();
1694
1695        let bytecode = b"ebpf-program-instructions-here";
1696        let btf = b"btf-type-information";
1697        let seg_id = store.embed_ebpf(
1698            2,     // program_type: XDP
1699            1,     // attach_type
1700            1024,  // max_dimension
1701            bytecode,
1702            Some(btf),
1703        ).unwrap();
1704        assert!(seg_id > 0);
1705
1706        let result = store.extract_ebpf().unwrap();
1707        assert!(result.is_some());
1708        let (header_bytes, payload_bytes) = result.unwrap();
1709        assert_eq!(header_bytes.len(), 64);
1710
1711        // Payload should be bytecode + btf
1712        assert_eq!(payload_bytes.len(), bytecode.len() + btf.len());
1713        assert_eq!(&payload_bytes[..bytecode.len()], bytecode);
1714        assert_eq!(&payload_bytes[bytecode.len()..], btf);
1715
1716        // Verify magic
1717        let magic = u32::from_le_bytes([
1718            header_bytes[0], header_bytes[1],
1719            header_bytes[2], header_bytes[3],
1720        ]);
1721        assert_eq!(magic, EBPF_MAGIC);
1722
1723        // Verify program_type (offset 0x06)
1724        assert_eq!(header_bytes[0x06], 2);
1725
1726        // Verify max_dimension (offset 0x0E)
1727        let dim = u16::from_le_bytes([header_bytes[0x0E], header_bytes[0x0F]]);
1728        assert_eq!(dim, 1024);
1729
1730        store.close().unwrap();
1731    }
1732
1733    #[test]
1734    fn embed_kernel_persists_through_reopen() {
1735        let dir = TempDir::new().unwrap();
1736        let path = dir.path().join("kernel_persist.rvf");
1737
1738        let options = RvfOptions {
1739            dimension: 4,
1740            metric: DistanceMetric::L2,
1741            ..Default::default()
1742        };
1743
1744        let kernel_image = b"persistent-kernel-image-data";
1745
1746        {
1747            let mut store = RvfStore::create(&path, options).unwrap();
1748            store.embed_kernel(
1749                2,    // arch: aarch64
1750                1,    // kernel_type
1751                0,    // flags
1752                kernel_image,
1753                9090,
1754                None,
1755            ).unwrap();
1756            store.close().unwrap();
1757        }
1758
1759        {
1760            let store = RvfStore::open_readonly(&path).unwrap();
1761            let result = store.extract_kernel().unwrap();
1762            assert!(result.is_some());
1763            let (header_bytes, image_bytes) = result.unwrap();
1764            assert_eq!(header_bytes.len(), 128);
1765            assert_eq!(image_bytes, kernel_image);
1766
1767            // Verify arch (offset 0x06)
1768            assert_eq!(header_bytes[0x06], 2);
1769
1770            // Verify api_port (offset 0x2A, big-endian)
1771            let port = u16::from_be_bytes([header_bytes[0x2A], header_bytes[0x2B]]);
1772            assert_eq!(port, 9090);
1773        }
1774    }
1775
1776    #[test]
1777    fn extract_returns_none_when_no_segment() {
1778        let dir = TempDir::new().unwrap();
1779        let path = dir.path().join("no_kernel.rvf");
1780
1781        let options = RvfOptions {
1782            dimension: 4,
1783            metric: DistanceMetric::L2,
1784            ..Default::default()
1785        };
1786
1787        let store = RvfStore::create(&path, options).unwrap();
1788        assert!(store.extract_kernel().unwrap().is_none());
1789        assert!(store.extract_ebpf().unwrap().is_none());
1790        store.close().unwrap();
1791    }
1792
1793}