oxirs_vec/
mmap_index.rs

//! Memory-mapped vector index for efficient disk-based storage
//!
//! This module provides a disk-based vector index using memory-mapped files for
//! efficient access to large vector datasets that don't fit in memory.
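//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; it assumes this module is exported
//! as `oxirs_vec::mmap_index` and that `IndexConfig::default()` is appropriate
//! for the data at hand):
//!
//! ```no_run
//! use oxirs_vec::index::IndexConfig;
//! use oxirs_vec::mmap_index::MemoryMappedVectorIndex;
//! use oxirs_vec::{Vector, VectorIndex};
//!
//! let mut index = MemoryMappedVectorIndex::new("vectors.idx", IndexConfig::default())
//!     .expect("create index");
//! index
//!     .insert("http://example.org/a".to_string(), Vector::new(vec![1.0, 2.0, 3.0]))
//!     .expect("insert vector");
//!
//! // k-nearest-neighbour search returns (URI, distance) pairs
//! let hits = index
//!     .search_knn(&Vector::new(vec![1.0, 2.0, 3.0]), 1)
//!     .expect("search");
//! println!("{hits:?}");
//! ```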

use crate::mmap_advanced::{AdvancedMemoryMap, MemoryMapStats, NumaVectorAllocator};
use crate::{
    index::{DistanceMetric, IndexConfig, SearchResult},
    Vector, VectorIndex,
};
use anyhow::{bail, Context, Result};
use blake3::Hasher;
use memmap2::{Mmap, MmapOptions};
use oxirs_core::parallel::*;
use parking_lot::{Mutex, RwLock};
use std::collections::{BinaryHeap, HashMap};
use std::fs::{File, OpenOptions};
use std::io::{Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;

/// Magic number for file format identification
const MAGIC: &[u8; 8] = b"OXIRSVEC";

/// Current file format version
const VERSION: u32 = 1;

/// Default page size for memory mapping (4KB)
const PAGE_SIZE: usize = 4096;

/// Vector page size for advanced memory mapping (16KB for better vector alignment)
const VECTOR_PAGE_SIZE: usize = 16384;

/// Header size (must be page-aligned)
const HEADER_SIZE: usize = PAGE_SIZE;

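// On-disk layout produced by this module:
//
//   [0, HEADER_SIZE)    FileHeader, zero-padded to a full page
//   [data_offset, ..)   vectors stored back to back as little-endian f32 values
//                       (`vector_size` bytes per vector)
//   [uri_offset, ..)    URI table: one record per vector, encoded as a
//                       little-endian u32 length followed by the UTF-8 bytes
//
// The header checksum is a BLAKE3 hash over the non-reserved header fields.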
/// File header structure
#[repr(C)]
#[derive(Debug, Clone, Copy)]
struct FileHeader {
    magic: [u8; 8],
    version: u32,
    flags: u32,
    vector_count: u64,
    dimensions: u32,
    vector_size: u32, // Size of each vector in bytes
    data_offset: u64,
    index_offset: u64,
    uri_offset: u64,
    checksum: [u8; 32],
    reserved: [u8; 3968], // Reserved; the header region is zero-padded to PAGE_SIZE on disk
}

impl FileHeader {
    fn new(dimensions: u32) -> Self {
        let vector_size = dimensions * std::mem::size_of::<f32>() as u32;
        Self {
            magic: *MAGIC,
            version: VERSION,
            flags: 0,
            vector_count: 0,
            dimensions,
            vector_size,
            data_offset: HEADER_SIZE as u64,
            index_offset: 0,
            uri_offset: 0,
            checksum: [0; 32],
            reserved: [0; 3968],
        }
    }

    fn validate(&self) -> Result<()> {
        if self.magic != *MAGIC {
            bail!("Invalid magic number");
        }
        if self.version != VERSION {
            bail!("Unsupported version: {}", self.version);
        }
        Ok(())
    }

    fn compute_checksum(&mut self) {
        let mut hasher = Hasher::new();
        hasher.update(&self.magic);
        hasher.update(&self.version.to_le_bytes());
        hasher.update(&self.flags.to_le_bytes());
        hasher.update(&self.vector_count.to_le_bytes());
        hasher.update(&self.dimensions.to_le_bytes());
        hasher.update(&self.vector_size.to_le_bytes());
        hasher.update(&self.data_offset.to_le_bytes());
        hasher.update(&self.index_offset.to_le_bytes());
        hasher.update(&self.uri_offset.to_le_bytes());
        self.checksum = *hasher.finalize().as_bytes();
    }
}

/// Memory-mapped vector index for large datasets
pub struct MemoryMappedVectorIndex {
    config: IndexConfig,
    path: PathBuf,
    header: Arc<RwLock<FileHeader>>,
    data_file: Arc<Mutex<File>>,
    data_mmap: Arc<RwLock<Option<Mmap>>>,
    uri_map: Arc<RwLock<HashMap<String, u64>>>, // URI to vector ID
    uri_store: Arc<RwLock<Vec<String>>>,        // Vector ID to URI
    write_buffer: Arc<Mutex<Vec<(String, Vector)>>>,
    buffer_size: usize,

    // Advanced memory mapping features
    advanced_mmap: Option<Arc<AdvancedMemoryMap>>,
    numa_allocator: Arc<NumaVectorAllocator>,
    enable_lazy_loading: bool,
}

impl MemoryMappedVectorIndex {
    /// Create a new memory-mapped vector index
    pub fn new<P: AsRef<Path>>(path: P, config: IndexConfig) -> Result<Self> {
        let path = path.as_ref().to_path_buf();

        // Create or open the data file
        let data_file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)
            .context("Failed to open data file")?;

        // Initialize or load header
        let header = if data_file.metadata()?.len() == 0 {
            // New file, write header
            let header = FileHeader::new(0);
            data_file.set_len(HEADER_SIZE as u64)?;
            let mut header_bytes = vec![0u8; HEADER_SIZE];
            unsafe {
                std::ptr::copy_nonoverlapping(
                    &header as *const _ as *const u8,
                    header_bytes.as_mut_ptr(),
                    std::mem::size_of::<FileHeader>(),
                );
            }
            (&data_file).write_all(&header_bytes)?;
            header
        } else {
            // Existing file, read header
            let mmap = unsafe { MmapOptions::new().map(&data_file)? };
            let header = unsafe { std::ptr::read(mmap.as_ptr() as *const FileHeader) };
            header.validate()?;
            header
        };

        Ok(Self {
            config,
            path,
            header: Arc::new(RwLock::new(header)),
            data_file: Arc::new(Mutex::new(data_file)),
            data_mmap: Arc::new(RwLock::new(None)),
            uri_map: Arc::new(RwLock::new(HashMap::new())),
            uri_store: Arc::new(RwLock::new(Vec::new())),
            write_buffer: Arc::new(Mutex::new(Vec::new())),
            buffer_size: 1000, // Buffer 1000 vectors before flushing
            advanced_mmap: None,
            numa_allocator: Arc::new(NumaVectorAllocator::new()),
            enable_lazy_loading: true,
        })
    }

    /// Load an existing memory-mapped index
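    ///
    /// A minimal reopening sketch (illustrative only; it assumes this module is
    /// exported as `oxirs_vec::mmap_index` and that `vectors.idx` was written and
    /// flushed by an earlier run):
    ///
    /// ```no_run
    /// use oxirs_vec::index::IndexConfig;
    /// use oxirs_vec::mmap_index::MemoryMappedVectorIndex;
    ///
    /// let index = MemoryMappedVectorIndex::load("vectors.idx", IndexConfig::default())
    ///     .expect("load index");
    /// println!("{} vectors on disk", index.stats().vector_count);
    /// ```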
    pub fn load<P: AsRef<Path>>(path: P, config: IndexConfig) -> Result<Self> {
        let path = path.as_ref().to_path_buf();

        // Open existing file without truncation
        let data_file = OpenOptions::new()
            .read(true)
            .write(true)
            .open(&path)
            .context("Failed to open existing data file")?;

        // Read and validate header
        let mmap = unsafe { MmapOptions::new().map(&data_file)? };
        let header = unsafe { std::ptr::read(mmap.as_ptr() as *const FileHeader) };
        header.validate()?;

        let mut index = Self {
            config,
            path,
            header: Arc::new(RwLock::new(header)),
            data_file: Arc::new(Mutex::new(data_file)),
            data_mmap: Arc::new(RwLock::new(None)),
            uri_map: Arc::new(RwLock::new(HashMap::new())),
            uri_store: Arc::new(RwLock::new(Vec::new())),
            write_buffer: Arc::new(Mutex::new(Vec::new())),
            buffer_size: 1000,
            advanced_mmap: None,
            numa_allocator: Arc::new(NumaVectorAllocator::new()),
            enable_lazy_loading: true,
        };

        index.reload_mmap()?;
        index.load_uri_mappings()?;
        Ok(index)
    }

    /// Reload memory mapping with optimized configuration
    fn reload_mmap(&mut self) -> Result<()> {
        let file = self.data_file.lock();
        let file_len = file.metadata()?.len();

        if file_len > HEADER_SIZE as u64 {
            // Create optimized memory mapping with proper options
            let mmap = unsafe {
                MmapOptions::new()
                    .huge(Some(21)) // Use huge pages (2MB) for better performance
                    .populate() // Pre-populate pages to reduce page faults
                    .map(&*file)?
            };

            // Create advanced memory map if lazy loading is enabled
            if self.enable_lazy_loading {
                // Calculate optimal page count based on file size
                let optimal_pages =
                    ((file_len as usize / VECTOR_PAGE_SIZE) / 10).clamp(1000, 50000);

                // Create advanced memory mapping with cloned mmap
                let cloned_mmap = unsafe { MmapOptions::new().map(&*file)? };
                let advanced = AdvancedMemoryMap::new(Some(cloned_mmap), optimal_pages);
                self.advanced_mmap = Some(Arc::new(advanced));
            }

            *self.data_mmap.write() = Some(mmap);
        }

        Ok(())
    }

    /// Load URI mappings from disk
    fn load_uri_mappings(&self) -> Result<()> {
        let header = self.header.read();
        let uri_offset = header.uri_offset as usize;

        if uri_offset > 0 {
            if let Some(ref mmap) = *self.data_mmap.read() {
                // Parse URI mappings from memory-mapped region
                let uri_data = &mmap[uri_offset..];
                let mut offset = 0;
                let mut uri_map = self.uri_map.write();
                let mut uri_store = self.uri_store.write();

                for id in 0..header.vector_count {
                    if offset + 4 > uri_data.len() {
                        break;
                    }

                    let uri_len = u32::from_le_bytes([
                        uri_data[offset],
                        uri_data[offset + 1],
                        uri_data[offset + 2],
                        uri_data[offset + 3],
                    ]) as usize;
                    offset += 4;

                    if offset + uri_len > uri_data.len() {
                        break;
                    }

                    let uri =
                        String::from_utf8_lossy(&uri_data[offset..offset + uri_len]).into_owned();
                    offset += uri_len;

                    uri_map.insert(uri.clone(), id);
                    uri_store.push(uri);
                }
            }
        }

        Ok(())
    }

    /// Flush write buffer to disk with optimized batch operations
    fn flush_buffer(&self) -> Result<()> {
        let mut buffer = self.write_buffer.lock();
        if buffer.is_empty() {
            return Ok(());
        }

        let mut file = self.data_file.lock();
        let mut header = self.header.write();

        // Calculate required space
        let vectors_to_write = buffer.len();

        // Pre-validate all vectors and calculate total size
        let mut total_vector_data_size = 0;
        for (_, vector) in buffer.iter() {
            if header.dimensions == 0 {
                header.dimensions = vector.dimensions as u32;
                header.vector_size = vector.dimensions as u32 * std::mem::size_of::<f32>() as u32;
                total_vector_data_size = vectors_to_write * header.vector_size as usize;
            } else if vector.dimensions != header.dimensions as usize {
                bail!(
                    "Vector dimensions ({}) don't match index dimensions ({})",
                    vector.dimensions,
                    header.dimensions
                );
            } else {
                total_vector_data_size = vectors_to_write * header.vector_size as usize;
            }
        }

        // Extend file if needed
        let current_data_end =
            header.data_offset + (header.vector_count * header.vector_size as u64);
        let new_data_end = current_data_end + total_vector_data_size as u64;

        file.set_len(new_data_end)?;
        file.seek(SeekFrom::Start(current_data_end))?;

        // Prepare batch write buffer for better I/O performance
        let mut batch_write_buffer = Vec::with_capacity(total_vector_data_size);
        let mut uri_updates = Vec::with_capacity(vectors_to_write);
        let mut uri_map = self.uri_map.write();
        let mut uri_store = self.uri_store.write();

        // Batch prepare all data in memory first
        for (uri, vector) in buffer.drain(..) {
            // Convert vector to bytes
            let vector_f32 = vector.as_f32();
            let vector_bytes: Vec<u8> = vector_f32.iter().flat_map(|&f| f.to_le_bytes()).collect();
            batch_write_buffer.extend_from_slice(&vector_bytes);

            // Prepare URI updates
            let vector_id = header.vector_count + uri_updates.len() as u64;
            uri_updates.push((uri, vector_id));
        }

        // Single large write operation for much better performance
        file.write_all(&batch_write_buffer)?;

        // Update all URI mappings after successful write
        for (uri, vector_id) in uri_updates {
            uri_map.insert(uri.clone(), vector_id);
            uri_store.push(uri);
        }
        header.vector_count += vectors_to_write as u64;

        // Update header with optimized single write
        header.compute_checksum();
        file.seek(SeekFrom::Start(0))?;
        let header_bytes = unsafe {
            std::slice::from_raw_parts(
                &*header as *const _ as *const u8,
                std::mem::size_of::<FileHeader>(),
            )
        };
        file.write_all(header_bytes)?;

        // Use fsync for better durability control
        file.sync_all()?;

        // Reload memory mapping with optimizations
        drop(file);
        drop(header);
        drop(uri_map);
        drop(uri_store);

        // Reload with advanced memory mapping if enabled
        let file = self.data_file.lock();
        let file_len = file.metadata()?.len();
        if file_len > HEADER_SIZE as u64 {
            // Use optimized mmap options for better performance
            let mmap = unsafe {
                MmapOptions::new()
                    .populate() // Pre-populate pages
                    .map(&*file)?
            };
            *self.data_mmap.write() = Some(mmap);

            // Update advanced mmap if it exists
            if let Some(ref advanced_mmap) = self.advanced_mmap {
                // Trigger a prefetch of recently written pages
                let start_page = (current_data_end as usize) / VECTOR_PAGE_SIZE;
                let end_page = (new_data_end as usize) / VECTOR_PAGE_SIZE;

                for page_id in start_page..=end_page.min(start_page + 10) {
                    advanced_mmap.async_prefetch(page_id);
                }
            }
        }

        Ok(())
    }

    /// Get vector by ID from memory-mapped region with optimized loading
    fn get_vector_by_id(&self, id: u64) -> Result<Option<Vector>> {
        let header = self.header.read();

        if id >= header.vector_count {
            return Ok(None);
        }

        // Try advanced memory mapping first for better performance
        if let Some(ref advanced_mmap) = self.advanced_mmap {
            let offset = header.data_offset as usize + (id as usize * header.vector_size as usize);
            let page_id = offset / VECTOR_PAGE_SIZE;

            if let Ok(page_entry) = advanced_mmap.get_page(page_id) {
                let page_offset = offset % VECTOR_PAGE_SIZE;
                let vector_end = page_offset + header.vector_size as usize;

                if vector_end <= page_entry.data().len() {
                    // Use NUMA-optimized vector allocation
                    let numa_node = page_entry.numa_node();
                    let values = self
                        .numa_allocator
                        .allocate_vector_on_node(header.dimensions as usize, Some(numa_node));

                    // Optimized SIMD-friendly vector parsing
                    return Ok(Some(self.parse_vector_optimized(
                        &page_entry.data()[page_offset..vector_end],
                        header.dimensions as usize,
                        values,
                    )?));
                }
            }
        }

        // Fallback to direct memory mapping
        if let Some(ref mmap) = *self.data_mmap.read() {
            let offset = header.data_offset as usize + (id as usize * header.vector_size as usize);
            let end = offset + header.vector_size as usize;

            if end <= mmap.len() {
                let vector_bytes = &mmap[offset..end];
                let values = self
                    .numa_allocator
                    .allocate_vector_on_node(header.dimensions as usize, None);

                return Ok(Some(self.parse_vector_optimized(
                    vector_bytes,
                    header.dimensions as usize,
                    values,
                )?));
            }
        }

        Ok(None)
    }

    /// Optimized vector parsing with SIMD acceleration where possible
    fn parse_vector_optimized(
        &self,
        bytes: &[u8],
        dimensions: usize,
        mut values: Vec<f32>,
    ) -> Result<Vector> {
        values.clear();
        values.reserve_exact(dimensions);

        // Use chunked parsing for better cache locality
        for chunk in bytes.chunks_exact(4) {
            if values.len() >= dimensions {
                break;
            }
            let float_val = f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
            values.push(float_val);
        }

        Ok(Vector::new(values))
    }

    /// Search using brute force with memory-mapped vectors
    fn search_mmap(&self, query: &Vector, k: usize) -> Result<Vec<SearchResult>> {
        let header = self.header.read();
        let distance_metric = self.config.distance_metric;

        if header.vector_count == 0 {
            return Ok(Vec::new());
        }

        // Check if we should use parallel search
        if self.config.parallel && header.vector_count > 1000 {
            self.search_mmap_parallel(query, k, distance_metric)
        } else {
            self.search_mmap_sequential(query, k, distance_metric)
        }
    }

    /// Sequential search through memory-mapped vectors
    fn search_mmap_sequential(
        &self,
        query: &Vector,
        k: usize,
        distance_metric: DistanceMetric,
    ) -> Result<Vec<SearchResult>> {
        let header = self.header.read();
        let uri_store = self.uri_store.read();
        let mut heap = BinaryHeap::new();

        for id in 0..header.vector_count {
            if let Some(vector) = self.get_vector_by_id(id)? {
                let distance = distance_metric.distance_vectors(query, &vector);

                if heap.len() < k {
                    heap.push(std::cmp::Reverse(SearchResult {
                        uri: uri_store
                            .get(id as usize)
                            .cloned()
                            .unwrap_or_else(|| format!("vector_{id}")),
                        distance,
                        score: 1.0 - distance, // Convert distance to similarity score
                        metadata: None,
                    }));
                } else if let Some(std::cmp::Reverse(worst)) = heap.peek() {
                    if distance < worst.distance {
                        heap.pop();
                        heap.push(std::cmp::Reverse(SearchResult {
                            uri: uri_store
                                .get(id as usize)
                                .cloned()
                                .unwrap_or_else(|| format!("vector_{id}")),
                            distance,
                            score: 1.0 - distance, // Convert distance to similarity score
                            metadata: None,
                        }));
                    }
                }
            }
        }

        let mut results: Vec<SearchResult> = heap.into_iter().map(|r| r.0).collect();
        results.sort_by(|a, b| {
            a.distance
                .partial_cmp(&b.distance)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        Ok(results)
    }

    /// Parallel search through memory-mapped vectors
    fn search_mmap_parallel(
        &self,
        query: &Vector,
        k: usize,
        distance_metric: DistanceMetric,
    ) -> Result<Vec<SearchResult>> {
        let header = self.header.read();
        let uri_store = self.uri_store.read();
        let vector_count = header.vector_count;
        let chunk_size = (vector_count / num_threads() as u64).max(100);

        // Process chunks in parallel
        let partial_results: Vec<Vec<SearchResult>> = (0..vector_count)
            .step_by(chunk_size as usize)
            .collect::<Vec<_>>()
            .par_iter()
            .map(|&start_id| {
                let end_id = (start_id + chunk_size).min(vector_count);
                let mut local_heap = BinaryHeap::new();

                for id in start_id..end_id {
                    if let Ok(Some(vector)) = self.get_vector_by_id(id) {
                        let distance = distance_metric.distance_vectors(query, &vector);

                        if local_heap.len() < k {
                            local_heap.push(std::cmp::Reverse(SearchResult {
                                uri: uri_store
                                    .get(id as usize)
                                    .cloned()
                                    .unwrap_or_else(|| format!("vector_{id}")),
                                distance,
                                score: 1.0 - distance, // Convert distance to similarity score
                                metadata: None,
                            }));
                        } else if let Some(std::cmp::Reverse(worst)) = local_heap.peek() {
                            if distance < worst.distance {
                                local_heap.pop();
                                local_heap.push(std::cmp::Reverse(SearchResult {
                                    uri: uri_store
                                        .get(id as usize)
                                        .cloned()
                                        .unwrap_or_else(|| format!("vector_{id}")),
                                    distance,
                                    score: 1.0 - distance, // Convert distance to similarity score
                                    metadata: None,
                                }));
                            }
                        }
                    }
                }

                local_heap
                    .into_sorted_vec()
                    .into_iter()
                    .map(|r| r.0)
                    .collect()
            })
            .collect();

        // Merge results from all chunks
        let mut final_heap = BinaryHeap::new();
        for partial in partial_results {
            for result in partial {
                if final_heap.len() < k {
                    final_heap.push(std::cmp::Reverse(result));
                } else if let Some(std::cmp::Reverse(worst)) = final_heap.peek() {
                    if result.distance < worst.distance {
                        final_heap.pop();
                        final_heap.push(std::cmp::Reverse(result));
                    }
                }
            }
        }

        let mut results: Vec<SearchResult> = final_heap.into_iter().map(|r| r.0).collect();
        results.sort_by(|a, b| {
            a.distance
                .partial_cmp(&b.distance)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        Ok(results)
    }

    /// Save URI mappings to disk
    pub fn save_uri_mappings(&self) -> Result<()> {
        let mut file = self.data_file.lock();
        let mut header = self.header.write();
        let uri_store = self.uri_store.read();

        // Calculate size needed for URI data
        let mut uri_data_size = 0;
        for uri in uri_store.iter() {
            uri_data_size += 4 + uri.len(); // 4 bytes for length + URI bytes
        }

        // Set URI offset after vector data
        let data_end = header.data_offset + (header.vector_count * header.vector_size as u64);
        header.uri_offset = data_end;

        // Extend file and write URI data
        file.set_len(data_end + uri_data_size as u64)?;
        file.seek(SeekFrom::Start(header.uri_offset))?;

        for uri in uri_store.iter() {
            let len_bytes = (uri.len() as u32).to_le_bytes();
            file.write_all(&len_bytes)?;
            file.write_all(uri.as_bytes())?;
        }

        // Update header
        header.compute_checksum();
        file.seek(SeekFrom::Start(0))?;
        let header_bytes = unsafe {
            std::slice::from_raw_parts(
                &*header as *const _ as *const u8,
                std::mem::size_of::<FileHeader>(),
            )
        };
        file.write_all(header_bytes)?;
        file.sync_all()?;

        Ok(())
    }

    /// Compact the index file by removing deleted entries
    pub fn compact(&self) -> Result<()> {
        // This would rewrite the file removing any gaps
        // For now, we don't support deletion, so nothing to compact
        Ok(())
    }

    /// Get index statistics
    pub fn stats(&self) -> MemoryMappedIndexStats {
        let header = self.header.read();
        let file_size = self
            .data_file
            .lock()
            .metadata()
            .map(|m| m.len())
            .unwrap_or(0);

        MemoryMappedIndexStats {
            vector_count: header.vector_count,
            dimensions: header.dimensions,
            file_size,
            memory_usage: self.estimate_memory_usage(),
        }
    }

    fn estimate_memory_usage(&self) -> usize {
        let uri_map_size = self.uri_map.read().len()
            * (std::mem::size_of::<String>() + std::mem::size_of::<u64>());
        let uri_store_size = self
            .uri_store
            .read()
            .iter()
            .map(|s| s.capacity())
            .sum::<usize>();
        let buffer_size = self.write_buffer.lock().len()
            * (std::mem::size_of::<String>() + std::mem::size_of::<Vector>());

        uri_map_size + uri_store_size + buffer_size + HEADER_SIZE
    }

    /// Enable or disable lazy loading
    pub fn set_lazy_loading(&mut self, enabled: bool) {
        self.enable_lazy_loading = enabled;
    }

    /// Get advanced memory mapping statistics
    pub fn advanced_stats(&self) -> Option<MemoryMapStats> {
        self.advanced_mmap.as_ref().map(|mmap| mmap.stats())
    }

    /// Configure NUMA allocation preferences
    pub fn configure_numa(&mut self, numa_enabled: bool) {
        if numa_enabled {
            self.numa_allocator = Arc::new(NumaVectorAllocator::new());
        }
    }
}

impl VectorIndex for MemoryMappedVectorIndex {
    fn insert(&mut self, uri: String, vector: Vector) -> Result<()> {
        // Add to write buffer
        self.write_buffer.lock().push((uri, vector));

        // Flush if buffer is full
        if self.write_buffer.lock().len() >= self.buffer_size {
            self.flush_buffer()?;
        }

        Ok(())
    }

    fn search_knn(&self, query: &Vector, k: usize) -> Result<Vec<(String, f32)>> {
        // Flush any pending writes
        if !self.write_buffer.lock().is_empty() {
            self.flush_buffer()?;
        }

        let results = self.search_mmap(query, k)?;
        Ok(results.into_iter().map(|r| (r.uri, r.distance)).collect())
    }

    fn search_threshold(&self, query: &Vector, threshold: f32) -> Result<Vec<(String, f32)>> {
        // Flush any pending writes
        if !self.write_buffer.lock().is_empty() {
            self.flush_buffer()?;
        }

        let header = self.header.read();
        let uri_store = self.uri_store.read();
        let distance_metric = self.config.distance_metric;
        let mut results = Vec::new();

        for id in 0..header.vector_count {
            if let Some(vector) = self.get_vector_by_id(id)? {
                let distance = distance_metric.distance_vectors(query, &vector);
                if distance <= threshold {
                    let uri = uri_store
                        .get(id as usize)
                        .cloned()
                        .unwrap_or_else(|| format!("vector_{id}"));
                    results.push((uri, distance));
                }
            }
        }

        results.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));
        Ok(results)
    }

    fn get_vector(&self, _uri: &str) -> Option<&Vector> {
        // Memory-mapped index doesn't store vectors in memory
        // We would need to read from disk, which doesn't fit the API
        // that returns a reference. Return None for now.
        None
    }
}

impl Drop for MemoryMappedVectorIndex {
    fn drop(&mut self) {
        // Flush any remaining vectors
        if let Err(e) = self.flush_buffer() {
            eprintln!("Error flushing buffer on drop: {e}");
        }
        // Save URI mappings
        if let Err(e) = self.save_uri_mappings() {
            eprintln!("Error saving URI mappings on drop: {e}");
        }
    }
}

/// Statistics for memory-mapped index
#[derive(Debug, Clone)]
pub struct MemoryMappedIndexStats {
    pub vector_count: u64,
    pub dimensions: u32,
    pub file_size: u64,
    pub memory_usage: usize,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    #[test]
    fn test_memory_mapped_index_basic() -> Result<()> {
        let dir = tempdir()?;
        let path = dir.path().join("test_vectors.idx");

        let config = IndexConfig::default();
        let mut index = MemoryMappedVectorIndex::new(&path, config)?;

        // Insert some vectors
        let v1 = Vector::new(vec![1.0, 2.0, 3.0]);
        let v2 = Vector::new(vec![4.0, 5.0, 6.0]);
        let v3 = Vector::new(vec![7.0, 8.0, 9.0]);

        index.insert("vec1".to_string(), v1.clone())?;
        index.insert("vec2".to_string(), v2.clone())?;
        index.insert("vec3".to_string(), v3.clone())?;

        // Force flush
        index.flush_buffer()?;

        // Search
        let query = Vector::new(vec![3.0, 4.0, 5.0]);
        let results = index.search_knn(&query, 2)?;

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].0, "vec2");

        Ok(())
    }

    #[test]
    fn test_memory_mapped_index_persistence() -> Result<()> {
        let dir = tempdir()?;
        let path = dir.path().join("test_persist.idx");

        // Create and populate index
        {
            let config = IndexConfig::default();
            let mut index = MemoryMappedVectorIndex::new(&path, config)?;

            for i in 0..10 {
                let vec = Vector::new(vec![i as f32, (i + 1) as f32, (i + 2) as f32]);
                index.insert(format!("vec{i}"), vec)?;
            }

            // Explicitly flush the buffer to ensure data is persisted
            index.flush_buffer()?;
        }

        // Load existing index
        {
            let config = IndexConfig::default();
            let index = MemoryMappedVectorIndex::load(&path, config)?;

            let stats = index.stats();
            assert_eq!(stats.vector_count, 10);
            assert_eq!(stats.dimensions, 3);

            let query = Vector::new(vec![5.0, 6.0, 7.0]);
            let results = index.search_knn(&query, 3)?;

            assert_eq!(results.len(), 3);
            assert_eq!(results[0].0, "vec5");
        }

        Ok(())
    }
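
    // Additional sketch (not part of the original suite): with a threshold of
    // f32::MAX every stored vector should be reported, whichever distance metric
    // `IndexConfig::default()` selects.
    #[test]
    fn test_memory_mapped_index_threshold() -> Result<()> {
        let dir = tempdir()?;
        let path = dir.path().join("test_threshold.idx");

        let config = IndexConfig::default();
        let mut index = MemoryMappedVectorIndex::new(&path, config)?;

        index.insert("vec1".to_string(), Vector::new(vec![1.0, 0.0, 0.0]))?;
        index.insert("vec2".to_string(), Vector::new(vec![0.0, 1.0, 0.0]))?;
        index.flush_buffer()?;

        let query = Vector::new(vec![1.0, 0.0, 0.0]);
        let results = index.search_threshold(&query, f32::MAX)?;

        assert_eq!(results.len(), 2);
        Ok(())
    }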
}