oxirs_vec/
mmap_advanced.rs

1//! Advanced memory mapping features for large datasets
2//!
3//! This module provides advanced memory mapping capabilities including:
4//! - Lazy loading with page-level access
5//! - Smart caching and eviction policies
6//! - NUMA-aware memory allocation
7//! - Swapping policies for memory pressure
8
9use anyhow::{bail, Result};
10use lru::LruCache;
11use memmap2::Mmap;
12use oxirs_core::parallel::*;
13use parking_lot::RwLock;
14use std::collections::{HashMap, VecDeque};
15use std::num::NonZeroUsize;
16use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
17use std::sync::Arc;
18use std::time::Instant;
19
20/// Page size for lazy loading (16KB for better vector alignment)
21const VECTOR_PAGE_SIZE: usize = 16384;
22
23/// Maximum number of pages to keep in memory
24const DEFAULT_MAX_PAGES: usize = 10000;
25
26/// NUMA node information
27#[cfg(target_os = "linux")]
28mod numa {
29    use libc::{c_ulong, c_void};
30
31    extern "C" {
32        fn numa_available() -> i32;
33        fn numa_max_node() -> i32;
34        fn numa_node_of_cpu(cpu: i32) -> i32;
35        fn numa_alloc_onnode(size: usize, node: i32) -> *mut c_void;
36        fn numa_free(ptr: *mut c_void, size: usize);
37        fn mbind(
38            addr: *mut c_void,
39            len: c_ulong,
40            mode: i32,
41            nodemask: *const c_ulong,
42            maxnode: c_ulong,
43            flags: u32,
44        ) -> i32;
45    }
46
47    pub const MPOL_BIND: i32 = 2;
48    pub const MPOL_INTERLEAVE: i32 = 3;
49
50    pub fn is_available() -> bool {
51        unsafe { numa_available() >= 0 }
52    }
53
54    pub fn max_node() -> i32 {
55        unsafe { numa_max_node() }
56    }
57
58    pub fn node_of_cpu(cpu: i32) -> i32 {
59        unsafe { numa_node_of_cpu(cpu) }
60    }
61}
62
63#[cfg(not(target_os = "linux"))]
64mod numa {
65    pub fn is_available() -> bool {
66        false
67    }
68    pub fn max_node() -> i32 {
69        0
70    }
71    pub fn node_of_cpu(_cpu: i32) -> i32 {
72        0
73    }
74}
75
76/// Page access pattern for predictive prefetching
77#[derive(Debug, Clone)]
78struct AccessPattern {
79    page_id: usize,
80    access_time: Instant,
81    access_count: usize,
82}
83
84/// Page cache entry with metadata
85#[derive(Debug)]
86pub struct PageCacheEntry {
87    data: Vec<u8>,
88    page_id: usize,
89    last_access: Instant,
90    access_count: AtomicUsize,
91    dirty: bool,
92    numa_node: i32,
93}
94
95impl PageCacheEntry {
96    /// Get the data slice
97    pub fn data(&self) -> &[u8] {
98        &self.data
99    }
100
101    /// Get the NUMA node
102    pub fn numa_node(&self) -> i32 {
103        self.numa_node
104    }
105}
106
107/// Eviction policy for page cache
108#[derive(Debug, Clone, Copy)]
109pub enum EvictionPolicy {
110    LRU,   // Least Recently Used
111    LFU,   // Least Frequently Used
112    FIFO,  // First In First Out
113    Clock, // Clock algorithm
114    ARC,   // Adaptive Replacement Cache
115}
116
117/// Memory pressure levels
118#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
119pub enum MemoryPressure {
120    Low,
121    Medium,
122    High,
123    Critical,
124}
125
126/// Advanced memory-mapped vector storage
127pub struct AdvancedMemoryMap {
128    /// Base file mapping
129    mmap: Option<Mmap>,
130
131    /// Page cache
132    page_cache: Arc<RwLock<LruCache<usize, Arc<PageCacheEntry>>>>,
133
134    /// Access pattern tracking
135    access_patterns: Arc<RwLock<VecDeque<AccessPattern>>>,
136
137    /// Page access frequency
138    page_frequency: Arc<RwLock<HashMap<usize, usize>>>,
139
140    /// Eviction policy
141    eviction_policy: EvictionPolicy,
142
143    /// Memory statistics
144    total_memory: AtomicUsize,
145    cache_hits: AtomicU64,
146    cache_misses: AtomicU64,
147
148    /// NUMA configuration
149    numa_enabled: bool,
150    numa_nodes: Vec<i32>,
151
152    /// Memory pressure monitor
153    memory_pressure: Arc<RwLock<MemoryPressure>>,
154
155    /// Configuration
156    max_pages: usize,
157    page_size: usize,
158    prefetch_distance: usize,
159}
160
161impl AdvancedMemoryMap {
162    /// Create a new advanced memory map
163    pub fn new(mmap: Option<Mmap>, max_pages: usize) -> Self {
164        let numa_enabled = numa::is_available();
165        let numa_nodes = if numa_enabled {
166            (0..=numa::max_node()).collect()
167        } else {
168            vec![0]
169        };
170
171        let cache_size = NonZeroUsize::new(max_pages).unwrap_or(NonZeroUsize::new(1).unwrap());
172
173        Self {
174            mmap,
175            page_cache: Arc::new(RwLock::new(LruCache::new(cache_size))),
176            access_patterns: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
177            page_frequency: Arc::new(RwLock::new(HashMap::new())),
178            eviction_policy: EvictionPolicy::ARC,
179            total_memory: AtomicUsize::new(0),
180            cache_hits: AtomicU64::new(0),
181            cache_misses: AtomicU64::new(0),
182            numa_enabled,
183            numa_nodes,
184            memory_pressure: Arc::new(RwLock::new(MemoryPressure::Low)),
185            max_pages,
186            page_size: VECTOR_PAGE_SIZE,
187            prefetch_distance: 3,
188        }
189    }
190
191    /// Get a page with lazy loading
192    pub fn get_page(&self, page_id: usize) -> Result<Arc<PageCacheEntry>> {
193        // Check cache first
194        {
195            let mut cache = self.page_cache.write();
196            if let Some(entry) = cache.get(&page_id) {
197                self.cache_hits.fetch_add(1, Ordering::Relaxed);
198                entry.access_count.fetch_add(1, Ordering::Relaxed);
199                self.record_access(page_id);
200                return Ok(Arc::clone(entry));
201            }
202        }
203
204        // Cache miss - load from mmap
205        self.cache_misses.fetch_add(1, Ordering::Relaxed);
206        self.load_page(page_id)
207    }
208
209    /// Load a page from memory-mapped file
210    fn load_page(&self, page_id: usize) -> Result<Arc<PageCacheEntry>> {
211        let mmap = self
212            .mmap
213            .as_ref()
214            .ok_or_else(|| anyhow::anyhow!("No memory mapping available"))?;
215
216        let start = page_id * self.page_size;
217        let end = (start + self.page_size).min(mmap.len());
218
219        if start >= mmap.len() {
220            bail!("Page {} out of bounds", page_id);
221        }
222
223        // Copy page data
224        let page_data = mmap[start..end].to_vec();
225
226        // Determine NUMA node for allocation
227        let numa_node = if self.numa_enabled {
228            let cpu = sched_getcpu();
229            numa::node_of_cpu(cpu)
230        } else {
231            0
232        };
233
234        let entry = Arc::new(PageCacheEntry {
235            data: page_data,
236            page_id,
237            last_access: Instant::now(),
238            access_count: AtomicUsize::new(1),
239            dirty: false,
240            numa_node,
241        });
242
243        // Check memory pressure and evict if needed
244        self.check_memory_pressure();
245        if *self.memory_pressure.read() >= MemoryPressure::High {
246            self.evict_pages(1)?;
247        }
248
249        // Insert into cache
250        {
251            let mut cache = self.page_cache.write();
252            cache.put(page_id, Arc::clone(&entry));
253        }
254
255        self.total_memory
256            .fetch_add(entry.data.len(), Ordering::Relaxed);
257        self.record_access(page_id);
258
259        // Predictive prefetching
260        self.prefetch_pages(page_id);
261
262        Ok(entry)
263    }
264
265    /// Record page access for pattern analysis
266    fn record_access(&self, page_id: usize) {
267        let mut patterns = self.access_patterns.write();
268        patterns.push_back(AccessPattern {
269            page_id,
270            access_time: Instant::now(),
271            access_count: 1,
272        });
273
274        // Keep only recent patterns
275        while patterns.len() > 1000 {
276            patterns.pop_front();
277        }
278
279        // Update frequency map
280        let mut freq = self.page_frequency.write();
281        *freq.entry(page_id).or_insert(0) += 1;
282    }
283
284    /// Predictive prefetching based on access patterns
285    fn prefetch_pages(&self, current_page: usize) {
286        let patterns = self.access_patterns.read();
287        let freq = self.page_frequency.read();
288
289        // Analyze recent access patterns for intelligent prefetching
290        let recent_patterns: Vec<_> = patterns.iter().rev().take(10).collect();
291
292        // Check for sequential access pattern
293        let is_sequential = recent_patterns
294            .windows(2)
295            .all(|w| w[0].page_id > 0 && w[0].page_id == w[1].page_id + 1);
296
297        // Check for strided access pattern
298        let stride = if recent_patterns.len() >= 3 {
299            let diff1 = recent_patterns[0]
300                .page_id
301                .saturating_sub(recent_patterns[1].page_id);
302            let diff2 = recent_patterns[1]
303                .page_id
304                .saturating_sub(recent_patterns[2].page_id);
305            if diff1 == diff2 && diff1 > 0 && diff1 <= 10 {
306                Some(diff1)
307            } else {
308                None
309            }
310        } else {
311            None
312        };
313
314        // Adaptive prefetching based on patterns
315        if is_sequential {
316            // Aggressive sequential prefetching
317            for i in 1..=(self.prefetch_distance * 2) {
318                let prefetch_page = current_page + i;
319                self.async_prefetch(prefetch_page);
320            }
321        } else if let Some(stride) = stride {
322            // Strided prefetching
323            for i in 1..=self.prefetch_distance {
324                let prefetch_page = current_page + (i * stride);
325                self.async_prefetch(prefetch_page);
326            }
327        } else {
328            // Conservative prefetching with frequency-based hints
329            for i in 1..=self.prefetch_distance {
330                let prefetch_page = current_page + i;
331
332                // Check if this page has been accessed frequently
333                let frequency = *freq.get(&prefetch_page).unwrap_or(&0);
334                if frequency > 0 {
335                    self.async_prefetch(prefetch_page);
336                }
337            }
338        }
339
340        // Prefetch frequently accessed pages near current page
341        let nearby_range = current_page.saturating_sub(3)..=(current_page + 3);
342        for page_id in nearby_range {
343            let frequency = *freq.get(&page_id).unwrap_or(&0);
344            if frequency > 2 && page_id != current_page {
345                self.async_prefetch(page_id);
346            }
347        }
348    }
349
350    /// Asynchronous prefetch with throttling
351    pub fn async_prefetch(&self, page_id: usize) {
352        // Check if page is already in cache
353        {
354            let cache = self.page_cache.read();
355            if cache.contains(&page_id) {
356                return;
357            }
358        }
359
360        // Check memory pressure before prefetching
361        if *self.memory_pressure.read() >= MemoryPressure::High {
362            return;
363        }
364
365        let self_clone = self.clone_ref();
366        spawn(move || {
367            let _ = self_clone.get_page(page_id);
368        });
369    }
370
371    /// Check system memory pressure
372    fn check_memory_pressure(&self) {
373        let total_memory = self.total_memory.load(Ordering::Relaxed);
374        let max_memory = self.max_pages * self.page_size;
375
376        let pressure = if total_memory < max_memory / 2 {
377            MemoryPressure::Low
378        } else if total_memory < max_memory * 3 / 4 {
379            MemoryPressure::Medium
380        } else if total_memory < max_memory * 9 / 10 {
381            MemoryPressure::High
382        } else {
383            MemoryPressure::Critical
384        };
385
386        *self.memory_pressure.write() = pressure;
387    }
388
389    /// Evict pages based on eviction policy
390    fn evict_pages(&self, num_pages: usize) -> Result<()> {
391        match self.eviction_policy {
392            EvictionPolicy::LRU => self.evict_lru(num_pages),
393            EvictionPolicy::LFU => self.evict_lfu(num_pages),
394            EvictionPolicy::FIFO => self.evict_fifo(num_pages),
395            EvictionPolicy::Clock => self.evict_clock(num_pages),
396            EvictionPolicy::ARC => self.evict_arc(num_pages),
397        }
398    }
399
400    /// LRU eviction
401    fn evict_lru(&self, num_pages: usize) -> Result<()> {
402        let mut cache = self.page_cache.write();
403
404        // LruCache automatically evicts least recently used
405        for _ in 0..num_pages {
406            if let Some((_, entry)) = cache.pop_lru() {
407                self.total_memory
408                    .fetch_sub(entry.data.len(), Ordering::Relaxed);
409
410                // Write back if dirty
411                if entry.dirty {
412                    // TODO: Implement write-back
413                }
414            }
415        }
416
417        Ok(())
418    }
419
420    /// LFU eviction
421    fn evict_lfu(&self, num_pages: usize) -> Result<()> {
422        let cache = self.page_cache.read();
423        let freq = self.page_frequency.read();
424
425        // Sort pages by frequency
426        let mut pages_by_freq: Vec<(usize, usize)> = cache
427            .iter()
428            .map(|(page_id, _)| (*page_id, *freq.get(page_id).unwrap_or(&0)))
429            .collect();
430        pages_by_freq.sort_by_key(|(_, freq)| *freq);
431
432        // Evict least frequently used
433        drop(cache);
434        drop(freq);
435
436        let mut cache = self.page_cache.write();
437        for (page_id, _) in pages_by_freq.iter().take(num_pages) {
438            if let Some(entry) = cache.pop(page_id) {
439                self.total_memory
440                    .fetch_sub(entry.data.len(), Ordering::Relaxed);
441            }
442        }
443
444        Ok(())
445    }
446
447    /// FIFO eviction (not implemented - uses LRU as fallback)
448    fn evict_fifo(&self, num_pages: usize) -> Result<()> {
449        self.evict_lru(num_pages)
450    }
451
452    /// Clock algorithm eviction (not implemented - uses LRU as fallback)
453    fn evict_clock(&self, num_pages: usize) -> Result<()> {
454        self.evict_lru(num_pages)
455    }
456
457    /// ARC (Adaptive Replacement Cache) eviction
458    fn evict_arc(&self, num_pages: usize) -> Result<()> {
459        // Simplified ARC - combines recency and frequency
460        let cache = self.page_cache.read();
461        let freq = self.page_frequency.read();
462
463        // Score = recency * 0.5 + frequency * 0.5
464        let now = Instant::now();
465        let mut scored_pages: Vec<(usize, f64)> = cache
466            .iter()
467            .map(|(page_id, entry)| {
468                let recency_score =
469                    1.0 / (now.duration_since(entry.last_access).as_secs_f64() + 1.0);
470                let frequency_score = *freq.get(page_id).unwrap_or(&0) as f64;
471                let combined_score = recency_score * 0.5 + frequency_score * 0.5;
472                (*page_id, combined_score)
473            })
474            .collect();
475
476        scored_pages.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());
477
478        drop(cache);
479        drop(freq);
480
481        let mut cache = self.page_cache.write();
482        for (page_id, _) in scored_pages.iter().take(num_pages) {
483            if let Some(entry) = cache.pop(page_id) {
484                self.total_memory
485                    .fetch_sub(entry.data.len(), Ordering::Relaxed);
486            }
487        }
488
489        Ok(())
490    }
491
492    /// Get cache statistics
493    pub fn stats(&self) -> MemoryMapStats {
494        let cache = self.page_cache.read();
495
496        MemoryMapStats {
497            total_pages: cache.len(),
498            total_memory: self.total_memory.load(Ordering::Relaxed),
499            cache_hits: self.cache_hits.load(Ordering::Relaxed),
500            cache_misses: self.cache_misses.load(Ordering::Relaxed),
501            hit_rate: self.calculate_hit_rate(),
502            memory_pressure: *self.memory_pressure.read(),
503            numa_enabled: self.numa_enabled,
504        }
505    }
506
507    fn calculate_hit_rate(&self) -> f64 {
508        let hits = self.cache_hits.load(Ordering::Relaxed) as f64;
509        let misses = self.cache_misses.load(Ordering::Relaxed) as f64;
510        let total = hits + misses;
511        if total > 0.0 {
512            hits / total
513        } else {
514            0.0
515        }
516    }
517
518    fn clone_ref(&self) -> Self {
519        Self {
520            mmap: None, // Don't clone the mmap
521            page_cache: Arc::clone(&self.page_cache),
522            access_patterns: Arc::clone(&self.access_patterns),
523            page_frequency: Arc::clone(&self.page_frequency),
524            eviction_policy: self.eviction_policy,
525            total_memory: AtomicUsize::new(0),
526            cache_hits: AtomicU64::new(0),
527            cache_misses: AtomicU64::new(0),
528            numa_enabled: self.numa_enabled,
529            numa_nodes: self.numa_nodes.clone(),
530            memory_pressure: Arc::clone(&self.memory_pressure),
531            max_pages: self.max_pages,
532            page_size: self.page_size,
533            prefetch_distance: self.prefetch_distance,
534        }
535    }
536}
537
538/// Statistics for memory-mapped storage
539#[derive(Debug, Clone)]
540pub struct MemoryMapStats {
541    pub total_pages: usize,
542    pub total_memory: usize,
543    pub cache_hits: u64,
544    pub cache_misses: u64,
545    pub hit_rate: f64,
546    pub memory_pressure: MemoryPressure,
547    pub numa_enabled: bool,
548}
549
550/// Get current CPU for NUMA operations
551#[cfg(target_os = "linux")]
552fn sched_getcpu() -> i32 {
553    unsafe { libc::sched_getcpu() }
554}
555
556#[cfg(not(target_os = "linux"))]
557fn sched_getcpu() -> i32 {
558    0
559}
560
561/// NUMA-aware vector allocator
562pub struct NumaVectorAllocator {
563    numa_nodes: Vec<i32>,
564    current_node: AtomicUsize,
565}
566
567impl Default for NumaVectorAllocator {
568    fn default() -> Self {
569        Self::new()
570    }
571}
572
573impl NumaVectorAllocator {
574    pub fn new() -> Self {
575        let numa_nodes = if numa::is_available() {
576            (0..=numa::max_node()).collect()
577        } else {
578            vec![0]
579        };
580
581        Self {
582            numa_nodes,
583            current_node: AtomicUsize::new(0),
584        }
585    }
586
587    /// Allocate vector memory on specific NUMA node
588    pub fn allocate_on_node(&self, size: usize, node: Option<i32>) -> Vec<u8> {
589        if !numa::is_available() {
590            return vec![0u8; size];
591        }
592
593        let _target_node = node.unwrap_or_else(|| {
594            // Round-robin allocation across NUMA nodes
595            let idx = self.current_node.fetch_add(1, Ordering::Relaxed) % self.numa_nodes.len();
596            self.numa_nodes[idx]
597        });
598
599        // For now, just use standard allocation
600        // TODO: Implement actual NUMA allocation when libc bindings are available
601        vec![0u8; size]
602    }
603
604    /// Allocate optimized vector with NUMA awareness (specialized for f32 vectors)
605    pub fn allocate_vector_on_node(&self, dimensions: usize, node: Option<i32>) -> Vec<f32> {
606        if !numa::is_available() {
607            // Pre-allocate with optimal alignment for SIMD operations
608            let mut vec = Vec::with_capacity(dimensions);
609            vec.resize(dimensions, 0.0f32);
610            return vec;
611        }
612
613        let _target_node = node.unwrap_or_else(|| {
614            // Use current CPU's NUMA node for better locality
615            self.preferred_node()
616        });
617
618        // For better performance, use aligned allocation
619        let mut vec = Vec::with_capacity(dimensions);
620        vec.resize(dimensions, 0.0f32);
621
622        // TODO: When NUMA bindings are available, use numa_alloc_onnode
623        // and bind the memory to the specific node
624
625        vec
626    }
627
628    /// Get preferred NUMA node for current thread
629    pub fn preferred_node(&self) -> i32 {
630        if numa::is_available() {
631            numa::node_of_cpu(sched_getcpu())
632        } else {
633            0
634        }
635    }
636}
637
638#[cfg(test)]
639mod tests {
640    use super::*;
641
642    #[test]
643    fn test_memory_pressure() {
644        let mmap = AdvancedMemoryMap::new(None, 100);
645
646        assert_eq!(*mmap.memory_pressure.read(), MemoryPressure::Low);
647
648        // Simulate memory usage
649        mmap.total_memory
650            .store(50 * VECTOR_PAGE_SIZE, Ordering::Relaxed);
651        mmap.check_memory_pressure();
652        assert_eq!(*mmap.memory_pressure.read(), MemoryPressure::Medium);
653
654        mmap.total_memory
655            .store(90 * VECTOR_PAGE_SIZE, Ordering::Relaxed);
656        mmap.check_memory_pressure();
657        assert_eq!(*mmap.memory_pressure.read(), MemoryPressure::Critical);
658    }
659
660    #[test]
661    fn test_cache_stats() {
662        let mmap = AdvancedMemoryMap::new(None, 100);
663
664        mmap.cache_hits.store(75, Ordering::Relaxed);
665        mmap.cache_misses.store(25, Ordering::Relaxed);
666
667        let stats = mmap.stats();
668        assert_eq!(stats.cache_hits, 75);
669        assert_eq!(stats.cache_misses, 25);
670        assert_eq!(stats.hit_rate, 0.75);
671    }
672}