
oxirs_vec/mmap_advanced.rs

//! Advanced memory mapping features for large datasets
//!
//! This module provides advanced memory mapping capabilities including:
//! - Lazy loading with page-level access
//! - Smart caching and eviction policies
//! - NUMA-aware memory allocation
//! - Swapping policies for memory pressure
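//!
//! # Example
//!
//! A minimal usage sketch (illustrative only; it assumes this module is
//! reachable as `oxirs_vec::mmap_advanced` and that `vectors.bin` is an
//! existing vector data file — adjust to the actual crate layout):
//!
//! ```no_run
//! use memmap2::Mmap;
//! use oxirs_vec::mmap_advanced::AdvancedMemoryMap;
//! use std::fs::File;
//!
//! fn main() -> anyhow::Result<()> {
//!     let file = File::open("vectors.bin")?;
//!     // Safety: the file must not be truncated or modified while mapped.
//!     let mmap = unsafe { Mmap::map(&file)? };
//!
//!     let store = AdvancedMemoryMap::new(Some(mmap), 1024);
//!     let page = store.get_page(0)?; // lazily loads the first page
//!     println!("loaded {} bytes", page.data().len());
//!     println!("hit rate: {:.2}", store.stats().hit_rate);
//!     Ok(())
//! }
//! ```
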
use anyhow::{bail, Result};
use lru::LruCache;
use memmap2::Mmap;
use oxirs_core::parallel::*;
use parking_lot::RwLock;
use std::collections::{HashMap, VecDeque};
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Page size for lazy loading (16KB for better vector alignment)
const VECTOR_PAGE_SIZE: usize = 16384;
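// Each page covers the half-open byte range
// [page_id * VECTOR_PAGE_SIZE, (page_id + 1) * VECTOR_PAGE_SIZE) of the mapped
// file (clamped to the file length in `load_page`), so the byte at offset
// `off` belongs to page `off / VECTOR_PAGE_SIZE`.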

/// Maximum number of pages to keep in memory
const DEFAULT_MAX_PAGES: usize = 10000;

/// NUMA node information
#[cfg(target_os = "linux")]
mod numa {
    use libc::{c_ulong, c_void};

    extern "C" {
        fn numa_available() -> i32;
        fn numa_max_node() -> i32;
        fn numa_node_of_cpu(cpu: i32) -> i32;
        fn numa_alloc_onnode(size: usize, node: i32) -> *mut c_void;
        fn numa_free(ptr: *mut c_void, size: usize);
        fn mbind(
            addr: *mut c_void,
            len: c_ulong,
            mode: i32,
            nodemask: *const c_ulong,
            maxnode: c_ulong,
            flags: u32,
        ) -> i32;
    }

    pub const MPOL_BIND: i32 = 2;
    pub const MPOL_INTERLEAVE: i32 = 3;

    pub fn is_available() -> bool {
        unsafe { numa_available() >= 0 }
    }

    pub fn max_node() -> i32 {
        unsafe { numa_max_node() }
    }

    pub fn node_of_cpu(cpu: i32) -> i32 {
        unsafe { numa_node_of_cpu(cpu) }
    }
}

#[cfg(not(target_os = "linux"))]
mod numa {
    pub fn is_available() -> bool {
        false
    }
    pub fn max_node() -> i32 {
        0
    }
    pub fn node_of_cpu(_cpu: i32) -> i32 {
        0
    }
}

/// Page access pattern for predictive prefetching
#[derive(Debug, Clone)]
struct AccessPattern {
    page_id: usize,
    access_time: Instant,
    access_count: usize,
}

/// Page cache entry with metadata
#[derive(Debug)]
pub struct PageCacheEntry {
    data: Vec<u8>,
    page_id: usize,
    last_access: Instant,
    access_count: AtomicUsize,
    dirty: bool,
    numa_node: i32,
}

impl PageCacheEntry {
    /// Get the data slice
    pub fn data(&self) -> &[u8] {
        &self.data
    }

    /// Get the NUMA node
    pub fn numa_node(&self) -> i32 {
        self.numa_node
    }
}

/// Eviction policy for page cache
#[derive(Debug, Clone, Copy)]
pub enum EvictionPolicy {
    LRU,   // Least Recently Used
    LFU,   // Least Frequently Used
    FIFO,  // First In First Out
    Clock, // Clock algorithm
    ARC,   // Adaptive Replacement Cache
}

/// Memory pressure levels
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum MemoryPressure {
    Low,
    Medium,
    High,
    Critical,
}

/// Advanced memory-mapped vector storage
pub struct AdvancedMemoryMap {
    /// Base file mapping
    mmap: Option<Mmap>,

    /// Page cache
    page_cache: Arc<RwLock<LruCache<usize, Arc<PageCacheEntry>>>>,

    /// Access pattern tracking
    access_patterns: Arc<RwLock<VecDeque<AccessPattern>>>,

    /// Page access frequency
    page_frequency: Arc<RwLock<HashMap<usize, usize>>>,

    /// Eviction policy
    eviction_policy: EvictionPolicy,

    /// Memory statistics
    total_memory: AtomicUsize,
    cache_hits: AtomicU64,
    cache_misses: AtomicU64,

    /// NUMA configuration
    numa_enabled: bool,
    numa_nodes: Vec<i32>,

    /// Memory pressure monitor
    memory_pressure: Arc<RwLock<MemoryPressure>>,

    /// Configuration
    max_pages: usize,
    page_size: usize,
    prefetch_distance: usize,
}

impl AdvancedMemoryMap {
    /// Create a new advanced memory map
    pub fn new(mmap: Option<Mmap>, max_pages: usize) -> Self {
        let numa_enabled = numa::is_available();
        let numa_nodes = if numa_enabled {
            (0..=numa::max_node()).collect()
        } else {
            vec![0]
        };

        let cache_size = NonZeroUsize::new(max_pages)
            .unwrap_or(NonZeroUsize::new(1).expect("constant 1 is non-zero"));

        Self {
            mmap,
            page_cache: Arc::new(RwLock::new(LruCache::new(cache_size))),
            access_patterns: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
            page_frequency: Arc::new(RwLock::new(HashMap::new())),
            eviction_policy: EvictionPolicy::ARC,
            total_memory: AtomicUsize::new(0),
            cache_hits: AtomicU64::new(0),
            cache_misses: AtomicU64::new(0),
            numa_enabled,
            numa_nodes,
            memory_pressure: Arc::new(RwLock::new(MemoryPressure::Low)),
            max_pages,
            page_size: VECTOR_PAGE_SIZE,
            prefetch_distance: 3,
        }
    }

    /// Get a page with lazy loading
    pub fn get_page(&self, page_id: usize) -> Result<Arc<PageCacheEntry>> {
        // Check cache first
        {
            let mut cache = self.page_cache.write();
            if let Some(entry) = cache.get(&page_id) {
                self.cache_hits.fetch_add(1, Ordering::Relaxed);
                entry.access_count.fetch_add(1, Ordering::Relaxed);
                self.record_access(page_id);
                return Ok(Arc::clone(entry));
            }
        }

        // Cache miss - load from mmap
        self.cache_misses.fetch_add(1, Ordering::Relaxed);
        self.load_page(page_id)
    }

    /// Load a page from memory-mapped file
    fn load_page(&self, page_id: usize) -> Result<Arc<PageCacheEntry>> {
        let mmap = self
            .mmap
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("No memory mapping available"))?;

        let start = page_id * self.page_size;
        let end = (start + self.page_size).min(mmap.len());

        if start >= mmap.len() {
            bail!("Page {} out of bounds", page_id);
        }

        // Copy page data
        let page_data = mmap[start..end].to_vec();

        // Determine NUMA node for allocation
        let numa_node = if self.numa_enabled {
            let cpu = sched_getcpu();
            numa::node_of_cpu(cpu)
        } else {
            0
        };

        let entry = Arc::new(PageCacheEntry {
            data: page_data,
            page_id,
            last_access: Instant::now(),
            access_count: AtomicUsize::new(1),
            dirty: false,
            numa_node,
        });

        // Check memory pressure and evict if needed
        self.check_memory_pressure();
        if *self.memory_pressure.read() >= MemoryPressure::High {
            self.evict_pages(1)?;
        }

        // Insert into cache
        {
            let mut cache = self.page_cache.write();
            cache.put(page_id, Arc::clone(&entry));
        }

        self.total_memory
            .fetch_add(entry.data.len(), Ordering::Relaxed);
        self.record_access(page_id);

        // Predictive prefetching
        self.prefetch_pages(page_id);

        Ok(entry)
    }

    /// Record page access for pattern analysis
    fn record_access(&self, page_id: usize) {
        let mut patterns = self.access_patterns.write();
        patterns.push_back(AccessPattern {
            page_id,
            access_time: Instant::now(),
            access_count: 1,
        });

        // Keep only recent patterns
        while patterns.len() > 1000 {
            patterns.pop_front();
        }

        // Update frequency map
        let mut freq = self.page_frequency.write();
        *freq.entry(page_id).or_insert(0) += 1;
    }

    /// Predictive prefetching based on access patterns
    fn prefetch_pages(&self, current_page: usize) {
        let patterns = self.access_patterns.read();
        let freq = self.page_frequency.read();

        // Analyze recent access patterns for intelligent prefetching
        let recent_patterns: Vec<_> = patterns.iter().rev().take(10).collect();

        // Check for sequential access pattern (require at least two samples,
        // since `windows(2).all(..)` is vacuously true on a shorter history)
        let is_sequential = recent_patterns.len() >= 2
            && recent_patterns
                .windows(2)
                .all(|w| w[0].page_id > 0 && w[0].page_id == w[1].page_id + 1);

        // Check for strided access pattern
        let stride = if recent_patterns.len() >= 3 {
            let diff1 = recent_patterns[0]
                .page_id
                .saturating_sub(recent_patterns[1].page_id);
            let diff2 = recent_patterns[1]
                .page_id
                .saturating_sub(recent_patterns[2].page_id);
            if diff1 == diff2 && diff1 > 0 && diff1 <= 10 {
                Some(diff1)
            } else {
                None
            }
        } else {
            None
        };
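
        // Illustrative example: if the three most recent accesses were pages
        // 10, 7, and 4 (most recent first), then diff1 == diff2 == 3, so the
        // stride is Some(3) and pages current_page + 3, + 6, ... are queued below.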

        // Adaptive prefetching based on patterns
        if is_sequential {
            // Aggressive sequential prefetching
            for i in 1..=(self.prefetch_distance * 2) {
                let prefetch_page = current_page + i;
                self.async_prefetch(prefetch_page);
            }
        } else if let Some(stride) = stride {
            // Strided prefetching
            for i in 1..=self.prefetch_distance {
                let prefetch_page = current_page + (i * stride);
                self.async_prefetch(prefetch_page);
            }
        } else {
            // Conservative prefetching with frequency-based hints
            for i in 1..=self.prefetch_distance {
                let prefetch_page = current_page + i;

                // Check if this page has been accessed frequently
                let frequency = *freq.get(&prefetch_page).unwrap_or(&0);
                if frequency > 0 {
                    self.async_prefetch(prefetch_page);
                }
            }
        }

        // Prefetch frequently accessed pages near current page
        let nearby_range = current_page.saturating_sub(3)..=(current_page + 3);
        for page_id in nearby_range {
            let frequency = *freq.get(&page_id).unwrap_or(&0);
            if frequency > 2 && page_id != current_page {
                self.async_prefetch(page_id);
            }
        }
    }

    /// Asynchronous prefetch with throttling
    pub fn async_prefetch(&self, page_id: usize) {
        // Check if page is already in cache
        {
            let cache = self.page_cache.read();
            if cache.contains(&page_id) {
                return;
            }
        }

        // Check memory pressure before prefetching
        if *self.memory_pressure.read() >= MemoryPressure::High {
            return;
        }

        let self_clone = self.clone_ref();
        spawn(move || {
            let _ = self_clone.get_page(page_id);
        });
    }

    /// Check system memory pressure
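    ///
    /// Pressure is derived from the ratio of cached bytes to the configured
    /// budget (`max_pages * page_size`): below 50% is `Low`, below 75% is
    /// `Medium`, below 90% is `High`, and at or above 90% is `Critical`.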
    fn check_memory_pressure(&self) {
        let total_memory = self.total_memory.load(Ordering::Relaxed);
        let max_memory = self.max_pages * self.page_size;

        let pressure = if total_memory < max_memory / 2 {
            MemoryPressure::Low
        } else if total_memory < max_memory * 3 / 4 {
            MemoryPressure::Medium
        } else if total_memory < max_memory * 9 / 10 {
            MemoryPressure::High
        } else {
            MemoryPressure::Critical
        };

        *self.memory_pressure.write() = pressure;
    }

    /// Evict pages based on eviction policy
    fn evict_pages(&self, num_pages: usize) -> Result<()> {
        match self.eviction_policy {
            EvictionPolicy::LRU => self.evict_lru(num_pages),
            EvictionPolicy::LFU => self.evict_lfu(num_pages),
            EvictionPolicy::FIFO => self.evict_fifo(num_pages),
            EvictionPolicy::Clock => self.evict_clock(num_pages),
            EvictionPolicy::ARC => self.evict_arc(num_pages),
        }
    }

    /// LRU eviction
    fn evict_lru(&self, num_pages: usize) -> Result<()> {
        let mut cache = self.page_cache.write();

        // LruCache automatically evicts least recently used
        for _ in 0..num_pages {
            if let Some((_, entry)) = cache.pop_lru() {
                self.total_memory
                    .fetch_sub(entry.data.len(), Ordering::Relaxed);

                // Write back if dirty
                if entry.dirty {
                    // TODO: Implement write-back
                }
            }
        }

        Ok(())
    }

    /// LFU eviction
    fn evict_lfu(&self, num_pages: usize) -> Result<()> {
        let cache = self.page_cache.read();
        let freq = self.page_frequency.read();

        // Sort pages by frequency
        let mut pages_by_freq: Vec<(usize, usize)> = cache
            .iter()
            .map(|(page_id, _)| (*page_id, *freq.get(page_id).unwrap_or(&0)))
            .collect();
        pages_by_freq.sort_by_key(|(_, freq)| *freq);

        // Evict least frequently used
        drop(cache);
        drop(freq);

        let mut cache = self.page_cache.write();
        for (page_id, _) in pages_by_freq.iter().take(num_pages) {
            if let Some(entry) = cache.pop(page_id) {
                self.total_memory
                    .fetch_sub(entry.data.len(), Ordering::Relaxed);
            }
        }

        Ok(())
    }

    /// FIFO eviction (not implemented - uses LRU as fallback)
    fn evict_fifo(&self, num_pages: usize) -> Result<()> {
        self.evict_lru(num_pages)
    }

    /// Clock algorithm eviction (not implemented - uses LRU as fallback)
    fn evict_clock(&self, num_pages: usize) -> Result<()> {
        self.evict_lru(num_pages)
    }

    /// ARC (Adaptive Replacement Cache) eviction
    fn evict_arc(&self, num_pages: usize) -> Result<()> {
        // Simplified ARC - combines recency and frequency
        let cache = self.page_cache.read();
        let freq = self.page_frequency.read();

        // Score = recency * 0.5 + frequency * 0.5
        let now = Instant::now();
        let mut scored_pages: Vec<(usize, f64)> = cache
            .iter()
            .map(|(page_id, entry)| {
                let recency_score =
                    1.0 / (now.duration_since(entry.last_access).as_secs_f64() + 1.0);
                let frequency_score = *freq.get(page_id).unwrap_or(&0) as f64;
                let combined_score = recency_score * 0.5 + frequency_score * 0.5;
                (*page_id, combined_score)
            })
            .collect();

        scored_pages.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));

        drop(cache);
        drop(freq);

        let mut cache = self.page_cache.write();
        for (page_id, _) in scored_pages.iter().take(num_pages) {
            if let Some(entry) = cache.pop(page_id) {
                self.total_memory
                    .fetch_sub(entry.data.len(), Ordering::Relaxed);
            }
        }

        Ok(())
    }

    /// Get cache statistics
    pub fn stats(&self) -> MemoryMapStats {
        let cache = self.page_cache.read();

        MemoryMapStats {
            total_pages: cache.len(),
            total_memory: self.total_memory.load(Ordering::Relaxed),
            cache_hits: self.cache_hits.load(Ordering::Relaxed),
            cache_misses: self.cache_misses.load(Ordering::Relaxed),
            hit_rate: self.calculate_hit_rate(),
            memory_pressure: *self.memory_pressure.read(),
            numa_enabled: self.numa_enabled,
        }
    }

    fn calculate_hit_rate(&self) -> f64 {
        let hits = self.cache_hits.load(Ordering::Relaxed) as f64;
        let misses = self.cache_misses.load(Ordering::Relaxed) as f64;
        let total = hits + misses;
        if total > 0.0 {
            hits / total
        } else {
            0.0
        }
    }

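    /// Create a lightweight handle that shares the cache, access-pattern, and
    /// pressure state of this map.
    ///
    /// Note: `Mmap` is not `Clone`, so the handle carries no mapping of its
    /// own; its `get_page` can only serve pages that are already cached, which
    /// means the background tasks spawned by `async_prefetch` cannot fault new
    /// pages in from disk. Sharing the mapping (e.g. behind an `Arc<Mmap>`)
    /// would be required for that. The memory and hit/miss counters also start
    /// fresh on the handle.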
    fn clone_ref(&self) -> Self {
        Self {
            mmap: None, // Don't clone the mmap
            page_cache: Arc::clone(&self.page_cache),
            access_patterns: Arc::clone(&self.access_patterns),
            page_frequency: Arc::clone(&self.page_frequency),
            eviction_policy: self.eviction_policy,
            total_memory: AtomicUsize::new(0),
            cache_hits: AtomicU64::new(0),
            cache_misses: AtomicU64::new(0),
            numa_enabled: self.numa_enabled,
            numa_nodes: self.numa_nodes.clone(),
            memory_pressure: Arc::clone(&self.memory_pressure),
            max_pages: self.max_pages,
            page_size: self.page_size,
            prefetch_distance: self.prefetch_distance,
        }
    }
}

/// Statistics for memory-mapped storage
#[derive(Debug, Clone)]
pub struct MemoryMapStats {
    pub total_pages: usize,
    pub total_memory: usize,
    pub cache_hits: u64,
    pub cache_misses: u64,
    pub hit_rate: f64,
    pub memory_pressure: MemoryPressure,
    pub numa_enabled: bool,
}

/// Get current CPU for NUMA operations
#[cfg(target_os = "linux")]
fn sched_getcpu() -> i32 {
    unsafe { libc::sched_getcpu() }
}

#[cfg(not(target_os = "linux"))]
fn sched_getcpu() -> i32 {
    0
}

/// NUMA-aware vector allocator
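///
/// # Example
///
/// A minimal sketch (illustrative; it assumes the allocator is reachable as
/// `oxirs_vec::mmap_advanced::NumaVectorAllocator`):
///
/// ```
/// use oxirs_vec::mmap_advanced::NumaVectorAllocator;
///
/// let alloc = NumaVectorAllocator::new();
/// // Falls back to a plain zeroed allocation when NUMA is unavailable.
/// let embedding: Vec<f32> = alloc.allocate_vector_on_node(768, None);
/// assert_eq!(embedding.len(), 768);
/// ```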
pub struct NumaVectorAllocator {
    numa_nodes: Vec<i32>,
    current_node: AtomicUsize,
}

impl Default for NumaVectorAllocator {
    fn default() -> Self {
        Self::new()
    }
}

impl NumaVectorAllocator {
    pub fn new() -> Self {
        let numa_nodes = if numa::is_available() {
            (0..=numa::max_node()).collect()
        } else {
            vec![0]
        };

        Self {
            numa_nodes,
            current_node: AtomicUsize::new(0),
        }
    }

    /// Allocate vector memory on specific NUMA node
    pub fn allocate_on_node(&self, size: usize, node: Option<i32>) -> Vec<u8> {
        if !numa::is_available() {
            return vec![0u8; size];
        }

        let _target_node = node.unwrap_or_else(|| {
            // Round-robin allocation across NUMA nodes
            let idx = self.current_node.fetch_add(1, Ordering::Relaxed) % self.numa_nodes.len();
            self.numa_nodes[idx]
        });

        // For now, just use standard allocation
        // TODO: Implement actual NUMA allocation when libc bindings are available
        vec![0u8; size]
    }

    /// Allocate optimized vector with NUMA awareness (specialized for f32 vectors)
    pub fn allocate_vector_on_node(&self, dimensions: usize, node: Option<i32>) -> Vec<f32> {
        if !numa::is_available() {
            // Standard zeroed allocation; no NUMA placement is possible here
            return vec![0.0f32; dimensions];
        }

        let _target_node = node.unwrap_or_else(|| {
            // Use current CPU's NUMA node for better locality
            self.preferred_node()
        });

        // Standard zeroed allocation for now; NUMA-local placement is a TODO
        let vec = vec![0.0f32; dimensions];

        // TODO: When NUMA bindings are available, use numa_alloc_onnode
        // and bind the memory to the specific node

        vec
    }

    /// Get preferred NUMA node for current thread
    pub fn preferred_node(&self) -> i32 {
        if numa::is_available() {
            numa::node_of_cpu(sched_getcpu())
        } else {
            0
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memory_pressure() {
        let mmap = AdvancedMemoryMap::new(None, 100);

        assert_eq!(*mmap.memory_pressure.read(), MemoryPressure::Low);

        // Simulate memory usage
        mmap.total_memory
            .store(50 * VECTOR_PAGE_SIZE, Ordering::Relaxed);
        mmap.check_memory_pressure();
        assert_eq!(*mmap.memory_pressure.read(), MemoryPressure::Medium);

        mmap.total_memory
            .store(90 * VECTOR_PAGE_SIZE, Ordering::Relaxed);
        mmap.check_memory_pressure();
        assert_eq!(*mmap.memory_pressure.read(), MemoryPressure::Critical);
    }

    #[test]
    fn test_cache_stats() {
        let mmap = AdvancedMemoryMap::new(None, 100);

        mmap.cache_hits.store(75, Ordering::Relaxed);
        mmap.cache_misses.store(25, Ordering::Relaxed);

        let stats = mmap.stats();
        assert_eq!(stats.cache_hits, 75);
        assert_eq!(stats.cache_misses, 25);
        assert_eq!(stats.hit_rate, 0.75);
    }
}