omendb_core/omen/file.rs

//! `OmenFile` - main API for .omen format
//!
//! Storage backend for `VectorStore`. Uses postcard for efficient binary serialization.

use crate::omen::{
    align_to_page,
    header::{OmenHeader, HEADER_SIZE},
    section::{SectionEntry, SectionType},
    vectors::VectorSection,
    wal::{Wal, WalEntry, WalEntryType},
};
use anyhow::Result;
use fs2::FileExt;
use memmap2::MmapMut;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::fs::{File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};

/// Configure OpenOptions for cross-platform compatibility.
/// On Windows, enables full file sharing to avoid "Access is denied" errors.
#[cfg(windows)]
fn configure_open_options(opts: &mut OpenOptions) {
    use std::os::windows::fs::OpenOptionsExt;
    // FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE
    opts.share_mode(0x1 | 0x2 | 0x4);
}

#[cfg(not(windows))]
fn configure_open_options(_opts: &mut OpenOptions) {}

fn lock_exclusive(file: &File) -> io::Result<()> {
    file.try_lock_exclusive().map_err(|_| {
        io::Error::new(
            io::ErrorKind::WouldBlock,
            "Database is locked by another process",
        )
    })
}

#[derive(Serialize, Deserialize, Default)]
struct CheckpointMetadata {
    id_to_index: HashMap<String, u32>,
    index_to_id: HashMap<u32, String>,
    deleted: HashMap<u32, bool>,
    config: HashMap<String, u64>,
    metadata: HashMap<u32, Vec<u8>>,
}

/// Checkpoint threshold (number of WAL entries before compaction)
const CHECKPOINT_THRESHOLD: u64 = 1000;

/// `OmenFile` - single-file vector database
///
/// Storage layer for vectors, metadata, and serialized HNSW index.
/// Graph traversal is handled by `HNSWIndex` in the vector layer.
pub struct OmenFile {
    path: PathBuf,
    file: File,
    mmap: Option<MmapMut>,
    header: OmenHeader,

    // In-memory state (for writes before checkpoint)
    vectors_mem: Vec<Vec<f32>>,
    id_to_index: HashMap<String, u32>,
    index_to_id: HashMap<u32, String>,
    metadata_mem: HashMap<u32, Vec<u8>>,
    deleted: HashMap<u32, bool>,
    config: HashMap<String, u64>,

    // WAL for durability
    wal: Wal,

    // Serialized HNSW index (persisted on checkpoint, loaded on open)
    hnsw_index_bytes: Option<Vec<u8>>,
}

impl OmenFile {
    /// Compute .omen path by appending extension (preserves full filename)
    ///
    /// Handles filenames with multiple dots (e.g., `test.db_64` → `test.db_64.omen`)
    /// by appending `.omen` rather than replacing the extension.
    #[must_use]
    pub fn compute_omen_path(path: &Path) -> PathBuf {
        if path.extension().is_some_and(|ext| ext == "omen") {
            path.to_path_buf()
        } else {
            let mut omen = path.as_os_str().to_os_string();
            omen.push(".omen");
            PathBuf::from(omen)
        }
    }

    /// Compute .wal path by appending extension
    fn compute_wal_path(path: &Path) -> PathBuf {
        let mut wal = path.as_os_str().to_os_string();
        wal.push(".wal");
        PathBuf::from(wal)
    }

    /// Create a new database file, truncating any existing `.omen` file at the
    /// path and acquiring an exclusive lock
    pub fn create(path: impl AsRef<Path>, dimensions: u32) -> io::Result<Self> {
        let path = path.as_ref();
        let omen_path = Self::compute_omen_path(path);
        let wal_path = Self::compute_wal_path(path);

        let mut opts = OpenOptions::new();
        opts.read(true).write(true).create(true).truncate(true);
        configure_open_options(&mut opts);
        let mut file = opts.open(&omen_path)?;
        lock_exclusive(&file)?;

        let header = OmenHeader::new(dimensions);
        file.write_all(&header.to_bytes())?;
        file.sync_all()?;

        Ok(Self {
            path: omen_path,
            file,
            mmap: None,
            header,
            vectors_mem: Vec::new(),
            id_to_index: HashMap::new(),
            index_to_id: HashMap::new(),
            metadata_mem: HashMap::new(),
            deleted: HashMap::new(),
            config: HashMap::from([("dimensions".to_string(), u64::from(dimensions))]),
            wal: Wal::open(&wal_path)?,
            hnsw_index_bytes: None,
        })
    }

    /// Open an existing database file, load checkpointed sections from the mmap,
    /// and replay any WAL entries written after the last checkpoint
    pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
        let path = path.as_ref();
        let omen_path = Self::compute_omen_path(path);
        let wal_path = Self::compute_wal_path(path);

        let mut opts = OpenOptions::new();
        opts.read(true).write(true);
        configure_open_options(&mut opts);
        let mut file = opts.open(&omen_path)?;
        lock_exclusive(&file)?;

        let mut header_buf = [0u8; HEADER_SIZE];
        file.read_exact(&mut header_buf)?;
        let header = OmenHeader::from_bytes(&header_buf)?;

        let file_len = file.metadata()?.len() as usize;
        let mmap = if file_len > HEADER_SIZE {
            Some(unsafe { MmapMut::map_mut(&file)? })
        } else {
            None
        };

        let wal = Wal::open(&wal_path)?;
        let mut config = HashMap::from([
            ("dimensions".to_string(), u64::from(header.dimensions)),
            ("count".to_string(), header.count),
        ]);

        let mut vectors_mem = Vec::new();
        let mut id_to_index = HashMap::new();
        let mut index_to_id = HashMap::new();
        let mut metadata_mem = HashMap::new();
        let mut deleted = HashMap::new();

        if let Some(ref mmap) = mmap {
            // Load vectors
            if let Some(vec_section) = header.get_section(SectionType::Vectors) {
                let vec_offset = vec_section.offset as usize;
                let dim = header.dimensions as usize;
                let count = header.count as usize;
                let vec_size = dim * 4;

                for i in 0..count {
                    let start = vec_offset + i * vec_size;
                    let end = start + vec_size;
                    if end <= mmap.len() {
                        vectors_mem.push(read_vector_from_bytes(&mmap[start..end], dim));
                    }
                }
            }

            // Load metadata section (postcard-encoded CheckpointMetadata)
            if let Some(meta_section) = header.get_section(SectionType::MetadataRaw) {
                let meta_offset = meta_section.offset as usize;
                let meta_len = meta_section.length as usize;
                if meta_offset + meta_len <= mmap.len() {
                    let meta_bytes = &mmap[meta_offset..meta_offset + meta_len];
                    if let Ok(meta) = postcard::from_bytes::<CheckpointMetadata>(meta_bytes) {
                        id_to_index = meta.id_to_index;
                        index_to_id = meta.index_to_id;
                        deleted = meta.deleted;
                        config.extend(meta.config);
                        metadata_mem = meta.metadata;
                    }
                }
            }
        }

        let hnsw_index_bytes = mmap
            .as_ref()
            .and_then(|m| header.get_section(SectionType::HnswIndex).map(|s| (m, s)))
            .and_then(|(m, s)| {
                let start = s.offset as usize;
                let end = start + s.length as usize;
                (end <= m.len()).then(|| m[start..end].to_vec())
            });

        let mut db = Self {
            path: omen_path,
            file,
            mmap,
            header,
            vectors_mem,
            id_to_index,
            index_to_id,
            metadata_mem,
            deleted,
            config,
            wal,
            hnsw_index_bytes,
        };

        // Replay WAL
        db.recover()?;

        Ok(db)
    }

    /// Recover from WAL
    fn recover(&mut self) -> io::Result<()> {
        let entries = self.wal.entries_after_checkpoint()?;

        for entry in entries {
            if !entry.verify() {
                // Log and skip corrupted entries
                tracing::warn!(
                    entry_type = ?entry.header.entry_type,
                    timestamp = entry.header.timestamp,
                    "Skipping corrupted WAL entry during recovery"
                );
                continue;
            }

            match entry.header.entry_type {
                WalEntryType::InsertNode => {
                    self.replay_insert(&entry.data)?;
                }
                WalEntryType::DeleteNode => {
                    self.replay_delete(&entry.data)?;
                }
                WalEntryType::UpdateNeighbors => {
                    self.replay_neighbors(&entry.data)?;
                }
                WalEntryType::UpdateMetadata | WalEntryType::Checkpoint => {
                    // No-op: metadata updates are not replayed here, and checkpoint
                    // entries are markers only
                }
            }
        }

        Ok(())
    }

    fn replay_insert(&mut self, data: &[u8]) -> io::Result<()> {
        let mut cursor = std::io::Cursor::new(data);
        let string_id = read_string_id(&mut cursor)?;

        let mut buf = [0u8; 4];

        // Skip level byte (HNSW graph managed by HNSWIndex)
        cursor.read_exact(&mut buf[..1])?;

        // Read vector
        cursor.read_exact(&mut buf)?;
        let vec_len = u32::from_le_bytes(buf) as usize;
        let mut vec_bytes = vec![0u8; vec_len * 4];
        cursor.read_exact(&mut vec_bytes)?;
        let vector = read_vector_from_bytes(&vec_bytes, vec_len);

        // Read metadata
        cursor.read_exact(&mut buf)?;
        let meta_len = u32::from_le_bytes(buf) as usize;
        let mut metadata = vec![0u8; meta_len];
        cursor.read_exact(&mut metadata)?;

        let index = self.vectors_mem.len() as u32;
        self.vectors_mem.push(vector);
        self.id_to_index.insert(string_id.clone(), index);
        self.index_to_id.insert(index, string_id);
        if !metadata.is_empty() {
            self.metadata_mem.insert(index, metadata);
        }

        Ok(())
    }

    fn replay_delete(&mut self, data: &[u8]) -> io::Result<()> {
        let mut cursor = std::io::Cursor::new(data);
        let string_id = read_string_id(&mut cursor)?;

        if let Some(&index) = self.id_to_index.get(&string_id) {
            self.deleted.insert(index, true);
        }

        Ok(())
    }

    /// Replay neighbors update from WAL (no-op: graph managed by `HNSWIndex`)
    #[allow(clippy::unused_self, clippy::unnecessary_wraps)]
    fn replay_neighbors(&mut self, _data: &[u8]) -> io::Result<()> {
        // Neighbor updates are consumed from WAL but not stored.
        // HNSWIndex rebuilds graph from vectors on recovery.
        Ok(())
    }

    /// Insert a vector
    ///
    /// Note: Graph management (HNSW) is handled by `HNSWIndex` in the vector layer.
    /// This method only handles storage: WAL, vectors, metadata.
    pub fn insert(&mut self, id: &str, vector: &[f32], metadata: Option<&[u8]>) -> io::Result<()> {
        if vector.len() != self.header.dimensions as usize {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "Vector dimensions mismatch: expected {}, got {}",
                    self.header.dimensions,
                    vector.len()
                ),
            ));
        }

        let metadata_bytes = metadata.unwrap_or(b"{}");

        // 1. Append to WAL (durable)
        // Level 0 is placeholder - actual HNSW levels managed by HNSWIndex
        let entry = WalEntry::insert_node(0, id, 0, vector, metadata_bytes);
        self.wal.append(entry)?;
        self.wal.sync()?;

        // 2. Update in-memory state
        let index = self.vectors_mem.len() as u32;
        self.vectors_mem.push(vector.to_vec());
        self.id_to_index.insert(id.to_string(), index);
        self.index_to_id.insert(index, id.to_string());
        if metadata_bytes != b"{}" {
            self.metadata_mem.insert(index, metadata_bytes.to_vec());
        }

        self.header.count += 1;

        // 3. Periodic checkpoint
        if self.wal.len() > CHECKPOINT_THRESHOLD {
            self.checkpoint()?;
        }

        Ok(())
    }

    /// Brute-force exact nearest-neighbor scan over in-memory vectors (skips tombstoned entries)
    fn find_nearest(&self, query: &[f32], k: usize) -> Vec<u32> {
        let mut distances: Vec<(u32, f32)> = self
            .vectors_mem
            .iter()
            .enumerate()
            .filter(|(i, _)| !self.deleted.contains_key(&(*i as u32)))
            .map(|(i, v)| (i as u32, l2_distance(query, v)))
            .collect();

        distances.sort_by(|a, b| a.1.total_cmp(&b.1));
        distances.truncate(k);
        distances.into_iter().map(|(id, _)| id).collect()
    }

    /// Search for k nearest neighbors
    #[must_use]
    pub fn search(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
        if query.len() != self.header.dimensions as usize {
            return Vec::new();
        }

        let indices = self.find_nearest(query, k);

        indices
            .into_iter()
            .filter_map(|idx| {
                let id = self.index_to_id.get(&idx)?;
                let vector = self.vectors_mem.get(idx as usize)?;
                let distance = l2_distance(query, vector);
                Some((id.clone(), distance))
            })
            .collect()
    }

    /// Mark a vector as deleted (tombstone); returns `false` if the ID is unknown
    pub fn delete(&mut self, id: &str) -> io::Result<bool> {
        let Some(&index) = self.id_to_index.get(id) else {
            return Ok(false);
        };

        self.wal.append(WalEntry::delete_node(0, id))?;
        self.wal.sync()?;
        self.deleted.insert(index, true);
        Ok(true)
    }

    /// Get vector count
    #[must_use]
    pub fn len(&self) -> u64 {
        self.header.count
    }

    /// Check if empty
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.header.count == 0
    }

    /// Get dimensions
    #[must_use]
    pub fn dimensions(&self) -> u32 {
        self.header.dimensions
    }

    /// Checkpoint - compact WAL into main file (atomic via temp-file-rename)
    ///
    /// Uses write-to-temp-then-rename pattern for crash safety:
    /// 1. Write complete file to `.omen.tmp`
    /// 2. Fsync temp file
    /// 3. Rename temp to actual (atomic on POSIX)
    /// 4. Fsync directory
    /// 5. Truncate WAL
    pub fn checkpoint(&mut self) -> io::Result<()> {
        if self.vectors_mem.is_empty() && self.hnsw_index_bytes.is_none() {
            return Ok(());
        }

        // Serialize metadata with postcard (compact, fast, actively maintained)
        let checkpoint_meta = CheckpointMetadata {
            id_to_index: self.id_to_index.clone(),
            index_to_id: self.index_to_id.clone(),
            deleted: self.deleted.clone(),
            config: self.config.clone(),
            metadata: self.metadata_mem.clone(),
        };
        let metadata_bytes = postcard::to_allocvec(&checkpoint_meta)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

        // Calculate section sizes
        let vector_size =
            VectorSection::size_for_count(self.header.dimensions, self.vectors_mem.len() as u64)
                as usize;
        let graph_size = 0; // Graph managed by HNSWIndex, not stored in OmenFile
        let metadata_size = metadata_bytes.len();
        let hnsw_size = self.hnsw_index_bytes.as_ref().map_or(0, Vec::len);

        // Calculate offsets (page-aligned)
        let vector_offset = align_to_page(HEADER_SIZE);
        let graph_offset = align_to_page(vector_offset + vector_size);
        let metadata_offset = align_to_page(graph_offset + graph_size);
        let hnsw_offset = align_to_page(metadata_offset + metadata_size);
        let total_size = align_to_page(hnsw_offset + hnsw_size);

        // Create temp file path
        let temp_path = {
            let mut p = self.path.as_os_str().to_os_string();
            p.push(".tmp");
            PathBuf::from(p)
        };

        // Update header for new checkpoint
        self.header.count = self.vectors_mem.len() as u64;
        self.header.entry_point = 0; // Entry point managed by HNSWIndex
        self.header.set_section(SectionEntry::new(
            SectionType::Vectors,
            vector_offset as u64,
            vector_size as u64,
        ));
        self.header.set_section(SectionEntry::new(
            SectionType::Graph,
            graph_offset as u64,
            graph_size as u64,
        ));
        self.header.set_section(SectionEntry::new(
            SectionType::MetadataRaw,
            metadata_offset as u64,
            metadata_size as u64,
        ));
        if hnsw_size > 0 {
            self.header.set_section(SectionEntry::new(
                SectionType::HnswIndex,
                hnsw_offset as u64,
                hnsw_size as u64,
            ));
        }

        // Write to temp file
        {
            let mut opts = OpenOptions::new();
            opts.write(true).create(true).truncate(true);
            configure_open_options(&mut opts);
            let mut temp_file = opts.open(&temp_path)?;
            temp_file.set_len(total_size as u64)?;

            // Write header
            temp_file.write_all(&self.header.to_bytes())?;

            // Write vectors
            temp_file.seek(SeekFrom::Start(vector_offset as u64))?;
            for vector in &self.vectors_mem {
                for &val in vector {
                    temp_file.write_all(&val.to_le_bytes())?;
                }
            }

            // Write metadata
            temp_file.seek(SeekFrom::Start(metadata_offset as u64))?;
            temp_file.write_all(&metadata_bytes)?;

            // Write HNSW index (if present)
            if let Some(ref hnsw_bytes) = self.hnsw_index_bytes {
                temp_file.seek(SeekFrom::Start(hnsw_offset as u64))?;
                temp_file.write_all(hnsw_bytes)?;
            }

            // Fsync temp file before rename
            temp_file.sync_all()?;
        }

        // Drop mmap before rename (required - file handle must be released)
        self.mmap = None;

        // Atomic rename (POSIX guarantees atomicity for same-filesystem rename)
        std::fs::rename(&temp_path, &self.path)?;

        // Fsync directory to ensure rename is durable
        #[cfg(unix)]
        if let Some(parent) = self.path.parent() {
            if let Ok(dir) = File::open(parent) {
                let _ = dir.sync_all();
            }
        }

        let mut opts = OpenOptions::new();
        opts.read(true).write(true);
        configure_open_options(&mut opts);
        self.file = opts.open(&self.path)?;
        lock_exclusive(&self.file)?;

        self.wal.truncate()?;
        self.wal.append(WalEntry::checkpoint(0))?;
        self.wal.sync()?;
        self.mmap = Some(unsafe { MmapMut::map_mut(&self.file)? });

        Ok(())
    }
}

// ============================================================================
// Storage API for VectorStore
// ============================================================================

impl OmenFile {
    /// Store a vector by internal index
    pub fn put_vector(&mut self, id: usize, vector: &[f32]) -> Result<()> {
        let new_len = id + 1;
        if self.vectors_mem.len() < new_len {
            self.vectors_mem.resize_with(new_len, Vec::new);
        }
        self.vectors_mem[id] = vector.to_vec();
        Ok(())
    }

    /// Get a vector by internal index (checks in-memory state first, then the mmap)
    pub fn get_vector(&self, id: usize) -> Result<Option<Vec<f32>>> {
        if id < self.vectors_mem.len() && !self.vectors_mem[id].is_empty() {
            return Ok(Some(self.vectors_mem[id].clone()));
        }

        let Some(ref mmap) = self.mmap else {
            return Ok(None);
        };
        let Some(vec_section) = self.header.get_section(SectionType::Vectors) else {
            return Ok(None);
        };

        let dim = self.header.dimensions as usize;
        if id >= self.header.count as usize {
            return Ok(None);
        }

        let vec_size = dim * 4;
        let start = vec_section.offset as usize + id * vec_size;
        let end = start + vec_size;

        if end <= mmap.len() {
            Ok(Some(read_vector_from_bytes(&mmap[start..end], dim)))
        } else {
            Ok(None)
        }
    }

    /// Store metadata for a vector (as JSON)
    pub fn put_metadata(&mut self, id: usize, metadata: &JsonValue) -> Result<()> {
        let bytes = serde_json::to_vec(metadata)?;
        self.metadata_mem.insert(id as u32, bytes);
        Ok(())
    }

    /// Get metadata for a vector (as JSON)
    pub fn get_metadata(&self, id: usize) -> Result<Option<JsonValue>> {
        self.metadata_mem
            .get(&(id as u32))
            .map(|bytes| serde_json::from_slice(bytes))
            .transpose()
            .map_err(Into::into)
    }

    /// Store string ID to internal index mapping
    pub fn put_id_mapping(&mut self, string_id: &str, index: usize) -> Result<()> {
        self.id_to_index.insert(string_id.to_string(), index as u32);
        self.index_to_id.insert(index as u32, string_id.to_string());
        Ok(())
    }

    /// Get internal index for a string ID
    pub fn get_id_mapping(&self, string_id: &str) -> Result<Option<usize>> {
        Ok(self.id_to_index.get(string_id).map(|&idx| idx as usize))
    }

    /// Get string ID for an internal index (reverse lookup)
    pub fn get_string_id(&self, index: usize) -> Result<Option<String>> {
        Ok(self.index_to_id.get(&(index as u32)).cloned())
    }

    /// Delete string ID mapping
    pub fn delete_id_mapping(&mut self, string_id: &str) -> Result<()> {
        if let Some(&index) = self.id_to_index.get(string_id) {
            self.index_to_id.remove(&index);
        }
        self.id_to_index.remove(string_id);
        Ok(())
    }

    /// Store configuration value
    pub fn put_config(&mut self, key: &str, value: u64) -> Result<()> {
        self.config.insert(key.to_string(), value);
        // Sync dimensions to header
        if key == "dimensions" {
            self.header.dimensions = value as u32;
        }
        Ok(())
    }

    /// Get configuration value
    pub fn get_config(&self, key: &str) -> Result<Option<u64>> {
        Ok(self.config.get(key).copied())
    }

    /// Load all vectors from storage
    pub fn load_all_vectors(&self) -> Result<Vec<(usize, Vec<f32>)>> {
        Ok(self
            .vectors_mem
            .iter()
            .enumerate()
            .filter(|(_, v)| !v.is_empty())
            .map(|(id, v)| (id, v.clone()))
            .collect())
    }

    /// Increment vector count in storage
    pub fn increment_count(&mut self) -> Result<usize> {
        let count = self.config.get("count").copied().unwrap_or(0) as usize;
        let new_count = count + 1;
        self.config.insert("count".to_string(), new_count as u64);
        self.header.count = new_count as u64;
        Ok(new_count)
    }

    /// Get current vector count
    pub fn get_count(&self) -> Result<usize> {
        Ok(self.config.get("count").copied().unwrap_or(0) as usize)
    }

    /// Store quantization mode
    ///
    /// Mode values: 0=none, 1=sq8, 2=rabitq-4, 3=rabitq-2, 4=rabitq-8
    pub fn put_quantization_mode(&mut self, mode: u64) -> Result<()> {
        self.put_config("quantization", mode)
    }

    /// Get quantization mode
    ///
    /// Returns: 0=none, 1=sq8, 2=rabitq-4, 3=rabitq-2, 4=rabitq-8
    pub fn get_quantization_mode(&self) -> Result<Option<u64>> {
        self.get_config("quantization")
    }

    /// Check if store was created with quantization
    pub fn is_quantized(&self) -> Result<bool> {
        Ok(self.get_quantization_mode()?.unwrap_or(0) > 0)
    }

    /// Load all metadata from storage (entries that fail to parse as JSON are skipped)
    pub fn load_all_metadata(&self) -> Result<HashMap<usize, JsonValue>> {
        Ok(self
            .metadata_mem
            .iter()
            .filter_map(|(&id, bytes)| {
                serde_json::from_slice(bytes)
                    .ok()
                    .map(|meta| (id as usize, meta))
            })
            .collect())
    }

    /// Load all ID mappings from storage
    pub fn load_all_id_mappings(&self) -> Result<HashMap<String, usize>> {
        Ok(self
            .id_to_index
            .iter()
            .map(|(id, &idx)| (id.clone(), idx as usize))
            .collect())
    }

    /// Mark a vector as deleted (tombstone)
    pub fn put_deleted(&mut self, id: usize) -> Result<()> {
        self.deleted.insert(id as u32, true);
        Ok(())
    }

    /// Check whether a vector is marked as deleted
    pub fn is_deleted(&self, id: usize) -> Result<bool> {
        Ok(self.deleted.contains_key(&(id as u32)))
    }

    /// Remove deleted marker (for re-insertion)
    pub fn remove_deleted(&mut self, id: usize) -> Result<()> {
        self.deleted.remove(&(id as u32));
        Ok(())
    }

    /// Load all deleted IDs from storage
    pub fn load_all_deleted(&self) -> Result<HashMap<usize, bool>> {
        Ok(self
            .deleted
            .iter()
            .map(|(&id, &v)| (id as usize, v))
            .collect())
    }

    /// Store serialized HNSW index bytes
    ///
    /// The bytes are persisted on the next checkpoint/flush.
    /// `VectorStore` serializes `HNSWIndex` and stores it here.
    pub fn put_hnsw_index(&mut self, bytes: Vec<u8>) {
        self.hnsw_index_bytes = Some(bytes);
    }

    /// Get serialized HNSW index bytes (if present)
    ///
    /// Returns the bytes previously stored by `put_hnsw_index()`,
    /// or loaded from disk on open.
    #[must_use]
    pub fn get_hnsw_index(&self) -> Option<&[u8]> {
        self.hnsw_index_bytes.as_deref()
    }

    /// Check if HNSW index is stored
    #[must_use]
    pub fn has_hnsw_index(&self) -> bool {
        self.hnsw_index_bytes.is_some()
    }

    /// Update HNSW parameters in the header
    ///
    /// These values are persisted to disk on the next checkpoint/flush.
    pub fn set_hnsw_params(&mut self, m: u16, ef_construction: u16, ef_search: u16) {
        self.header.m = m;
        self.header.ef_construction = ef_construction;
        self.header.ef_search = ef_search;
    }

    /// Get storage path
    #[must_use]
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Get reference to the header
    #[must_use]
    pub fn header(&self) -> &OmenHeader {
        &self.header
    }

    /// Flush all pending writes to disk
    pub fn flush(&mut self) -> Result<()> {
        self.checkpoint()?;
        Ok(())
    }

    /// Batch set vectors with metadata and ID mappings
    pub fn put_batch(&mut self, items: Vec<(usize, String, Vec<f32>, JsonValue)>) -> Result<()> {
        if items.is_empty() {
            return Ok(());
        }

        for (idx, string_id, vector, metadata) in items {
            self.put_vector(idx, &vector)?;
            self.put_metadata(idx, &metadata)?;
            self.put_id_mapping(&string_id, idx)?;
        }

        // Update count
        let current_count = self.get_count()?;
        let new_count = self
            .vectors_mem
            .iter()
            .filter(|v| !v.is_empty())
            .count()
            .max(current_count);
        self.config.insert("count".to_string(), new_count as u64);
        self.header.count = new_count as u64;

        Ok(())
    }
}

/// Euclidean (L2) distance between two vectors
fn l2_distance(a: &[f32], b: &[f32]) -> f32 {
    a.iter()
        .zip(b.iter())
        .map(|(x, y)| (x - y).powi(2))
        .sum::<f32>()
        .sqrt()
}

/// Read a length-prefixed UTF-8 string ID from a WAL entry cursor
fn read_string_id(cursor: &mut std::io::Cursor<&[u8]>) -> io::Result<String> {
    let mut len_buf = [0u8; 4];
    cursor.read_exact(&mut len_buf)?;
    let id_len = u32::from_le_bytes(len_buf) as usize;
    let mut id_buf = vec![0u8; id_len];
    cursor.read_exact(&mut id_buf)?;
    String::from_utf8(id_buf).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}

/// Decode up to `dimensions` little-endian f32 values from raw bytes
fn read_vector_from_bytes(bytes: &[u8], dimensions: usize) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .take(dimensions)
        .map(|chunk| f32::from_le_bytes(chunk.try_into().unwrap_or([0; 4])))
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_create_and_insert() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.omen");

        let mut db = OmenFile::create(&db_path, 3).unwrap();
        db.insert("vec1", &[1.0, 2.0, 3.0], None).unwrap();
        db.insert("vec2", &[4.0, 5.0, 6.0], None).unwrap();

        assert_eq!(db.len(), 2);
    }
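
    // Added test sketch (not part of the original suite): exercises the dimension
    // check documented on `insert`, which rejects mismatched vector lengths with an
    // InvalidInput error. Values are arbitrary.
    #[test]
    fn test_insert_dimension_mismatch() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.omen");

        let mut db = OmenFile::create(&db_path, 3).unwrap();
        let err = db.insert("bad", &[1.0, 2.0], None).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidInput);
        assert_eq!(db.len(), 0);
    }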

    #[test]
    fn test_search() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.omen");

        let mut db = OmenFile::create(&db_path, 3).unwrap();
        db.insert("vec1", &[1.0, 0.0, 0.0], None).unwrap();
        db.insert("vec2", &[0.0, 1.0, 0.0], None).unwrap();
        db.insert("vec3", &[0.0, 0.0, 1.0], None).unwrap();

        let results = db.search(&[1.0, 0.0, 0.0], 1);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "vec1");
    }
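
    // Added test sketch (not part of the original suite): shows that `delete`
    // writes a tombstone and that tombstoned vectors are excluded from `search`.
    // The vector values are arbitrary.
    #[test]
    fn test_delete_excludes_from_search() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.omen");

        let mut db = OmenFile::create(&db_path, 3).unwrap();
        db.insert("vec1", &[1.0, 0.0, 0.0], None).unwrap();
        db.insert("vec2", &[0.0, 1.0, 0.0], None).unwrap();

        assert!(db.delete("vec1").unwrap());
        assert!(!db.delete("missing").unwrap());

        let results = db.search(&[1.0, 0.0, 0.0], 2);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].0, "vec2");
    }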

    #[test]
    fn test_checkpoint_and_reopen() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.omen");

        {
            let mut db = OmenFile::create(&db_path, 3).unwrap();
            db.insert("vec1", &[1.0, 2.0, 3.0], None).unwrap();
            db.insert("vec2", &[4.0, 5.0, 6.0], None).unwrap();
            db.checkpoint().unwrap();
        }

        {
            let db = OmenFile::open(&db_path).unwrap();
            assert_eq!(db.len(), 2);
        }
    }
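
    // Added test sketch (not part of the original suite): round-trips serialized
    // HNSW index bytes through a checkpoint, as described on `put_hnsw_index` /
    // `get_hnsw_index`. The byte payload is a placeholder, not a real index.
    #[test]
    fn test_hnsw_index_bytes_roundtrip() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.omen");

        {
            let mut db = OmenFile::create(&db_path, 3).unwrap();
            db.insert("vec1", &[1.0, 2.0, 3.0], None).unwrap();
            db.put_hnsw_index(vec![1, 2, 3, 4]);
            assert!(db.has_hnsw_index());
            db.checkpoint().unwrap();
        }

        {
            let db = OmenFile::open(&db_path).unwrap();
            assert_eq!(db.get_hnsw_index(), Some(&[1u8, 2, 3, 4][..]));
        }
    }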

    #[test]
    fn test_wal_recovery() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.omen");

        {
            let mut db = OmenFile::create(&db_path, 3).unwrap();
            db.insert("vec1", &[1.0, 2.0, 3.0], None).unwrap();
            // Don't checkpoint - data is only in WAL
        }

        {
            let db = OmenFile::open(&db_path).unwrap();
            // Should recover from WAL
            let results = db.search(&[1.0, 2.0, 3.0], 1);
            assert_eq!(results.len(), 1);
            assert_eq!(results[0].0, "vec1");
        }
    }
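
    // Added test sketches (not part of the original suite): exercise the
    // VectorStore-facing storage API (put_vector / put_metadata / put_id_mapping
    // and their getters), the documented `compute_omen_path` behavior, and the
    // quantization-mode config mapping. IDs, metadata values, and file names are
    // arbitrary.
    #[test]
    fn test_storage_api_roundtrip() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.omen");

        let mut db = OmenFile::create(&db_path, 3).unwrap();
        db.put_vector(0, &[1.0, 2.0, 3.0]).unwrap();
        db.put_metadata(0, &serde_json::json!({"tag": "a"})).unwrap();
        db.put_id_mapping("vec1", 0).unwrap();

        assert_eq!(db.get_vector(0).unwrap(), Some(vec![1.0, 2.0, 3.0]));
        assert_eq!(db.get_id_mapping("vec1").unwrap(), Some(0));
        assert_eq!(db.get_string_id(0).unwrap(), Some("vec1".to_string()));
        let meta = db.get_metadata(0).unwrap().unwrap();
        assert_eq!(meta["tag"], "a");
    }

    #[test]
    fn test_compute_omen_path_appends_extension() {
        use std::path::{Path, PathBuf};

        // Multi-dot names keep their full stem; existing .omen paths are unchanged.
        assert_eq!(
            OmenFile::compute_omen_path(Path::new("test.db_64")),
            PathBuf::from("test.db_64.omen")
        );
        assert_eq!(
            OmenFile::compute_omen_path(Path::new("already.omen")),
            PathBuf::from("already.omen")
        );
    }

    #[test]
    fn test_quantization_mode_config() {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.omen");

        let mut db = OmenFile::create(&db_path, 3).unwrap();
        assert!(!db.is_quantized().unwrap());
        db.put_quantization_mode(1).unwrap(); // 1 = sq8 per the documented mapping
        assert_eq!(db.get_quantization_mode().unwrap(), Some(1));
        assert!(db.is_quantized().unwrap());
    }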
}