use anyhow::{bail, Result};
use lru::LruCache;
use memmap2::Mmap;
use oxirs_core::parallel::*;
use parking_lot::RwLock;
use std::collections::{HashMap, VecDeque};
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Size of a cached vector page in bytes (16 KiB).
const VECTOR_PAGE_SIZE: usize = 16384;

/// Default upper bound on the number of resident pages.
const DEFAULT_MAX_PAGES: usize = 10000;
/// Minimal FFI bindings to libnuma for NUMA-aware page placement.
/// Only available on Linux; other platforms use the stub module below.
/// Assumes libnuma is available at link time.
#[cfg(target_os = "linux")]
mod numa {
    #![allow(dead_code)]

    use libc::{c_ulong, c_void};

    #[link(name = "numa")]
    extern "C" {
        fn numa_available() -> i32;
        fn numa_max_node() -> i32;
        fn numa_node_of_cpu(cpu: i32) -> i32;
        fn numa_alloc_onnode(size: usize, node: i32) -> *mut c_void;
        fn numa_free(ptr: *mut c_void, size: usize);
        fn mbind(
            addr: *mut c_void,
            len: c_ulong,
            mode: i32,
            nodemask: *const c_ulong,
            maxnode: c_ulong,
            flags: u32,
        ) -> i32;
    }

    /// Linux memory-policy modes used with `mbind`.
    pub const MPOL_BIND: i32 = 2;
    pub const MPOL_INTERLEAVE: i32 = 3;

    /// Returns true if the kernel exposes a usable NUMA topology.
    /// `numa_available` returns -1 when NUMA is unsupported.
    pub fn is_available() -> bool {
        unsafe { numa_available() >= 0 }
    }

    pub fn max_node() -> i32 {
        unsafe { numa_max_node() }
    }

    pub fn node_of_cpu(cpu: i32) -> i32 {
        unsafe { numa_node_of_cpu(cpu) }
    }
}

/// Stub used on non-Linux platforms: reports NUMA as unavailable.
#[cfg(not(target_os = "linux"))]
mod numa {
    pub fn is_available() -> bool {
        false
    }
    pub fn max_node() -> i32 {
        0
    }
    pub fn node_of_cpu(_cpu: i32) -> i32 {
        0
    }
}

/// A single recorded page access, kept in a sliding window to detect
/// sequential and strided access patterns for prefetching.
#[derive(Debug, Clone)]
struct AccessPattern {
    page_id: usize,
    access_time: Instant,
    access_count: usize,
}

/// A page resident in the cache, together with its access metadata.
#[derive(Debug)]
pub struct PageCacheEntry {
    data: Vec<u8>,
    page_id: usize,
    last_access: Instant,
    access_count: AtomicUsize,
    dirty: bool,
    numa_node: i32,
}

impl PageCacheEntry {
    /// Raw bytes of the cached page.
    pub fn data(&self) -> &[u8] {
        &self.data
    }

    /// NUMA node the page was loaded on.
    pub fn numa_node(&self) -> i32 {
        self.numa_node
    }
}

/// Policy used when pages must be evicted from the cache.
#[derive(Debug, Clone, Copy)]
pub enum EvictionPolicy {
    /// Least recently used.
    LRU,
    /// Least frequently used.
    LFU,
    /// First in, first out.
    FIFO,
    /// Clock (second-chance) approximation.
    Clock,
    /// Adaptive replacement blending recency and frequency.
    ARC,
}

/// Coarse memory-pressure levels derived from cache occupancy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum MemoryPressure {
    Low,
    Medium,
    High,
    Critical,
}

/// Memory-mapped vector storage with an LRU page cache, access-pattern
/// prefetching, NUMA awareness, and pressure-driven eviction.
pub struct AdvancedMemoryMap {
    /// Underlying memory mapping, shared so prefetch workers can read it.
    mmap: Option<Arc<Mmap>>,
    /// Resident pages keyed by page id.
    page_cache: Arc<RwLock<LruCache<usize, Arc<PageCacheEntry>>>>,
    /// Sliding window of recent accesses used for pattern detection.
    access_patterns: Arc<RwLock<VecDeque<AccessPattern>>>,
    /// Per-page access counts for frequency-based eviction and prefetch.
    page_frequency: Arc<RwLock<HashMap<usize, usize>>>,
    eviction_policy: EvictionPolicy,
    /// Counters are shared (`Arc`) so statistics and memory accounting stay
    /// consistent across handles produced by `clone_ref`.
    total_memory: Arc<AtomicUsize>,
    cache_hits: Arc<AtomicU64>,
    cache_misses: Arc<AtomicU64>,
    numa_enabled: bool,
    numa_nodes: Vec<i32>,
    memory_pressure: Arc<RwLock<MemoryPressure>>,
    max_pages: usize,
    page_size: usize,
    prefetch_distance: usize,
}

impl AdvancedMemoryMap {
    /// Creates a new map over `mmap`, caching at most `max_pages` pages.
    pub fn new(mmap: Option<Mmap>, max_pages: usize) -> Self {
        let numa_enabled = numa::is_available();
        let numa_nodes = if numa_enabled {
            (0..=numa::max_node()).collect()
        } else {
            vec![0]
        };

        // LruCache requires a non-zero capacity; clamp a max_pages of 0 to 1.
        let cache_size = NonZeroUsize::new(max_pages)
            .unwrap_or(NonZeroUsize::new(1).expect("constant 1 is non-zero"));

        Self {
            mmap: mmap.map(Arc::new),
            page_cache: Arc::new(RwLock::new(LruCache::new(cache_size))),
            access_patterns: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
            page_frequency: Arc::new(RwLock::new(HashMap::new())),
            eviction_policy: EvictionPolicy::ARC,
            total_memory: Arc::new(AtomicUsize::new(0)),
            cache_hits: Arc::new(AtomicU64::new(0)),
            cache_misses: Arc::new(AtomicU64::new(0)),
            numa_enabled,
            numa_nodes,
            memory_pressure: Arc::new(RwLock::new(MemoryPressure::Low)),
            max_pages,
            page_size: VECTOR_PAGE_SIZE,
            prefetch_distance: 3,
        }
    }

    /// Returns the page from the cache, loading it from the mapping on a miss.
    pub fn get_page(&self, page_id: usize) -> Result<Arc<PageCacheEntry>> {
        // Fast path: the page is already resident.
        {
            let mut cache = self.page_cache.write();
            if let Some(entry) = cache.get(&page_id) {
                self.cache_hits.fetch_add(1, Ordering::Relaxed);
                entry.access_count.fetch_add(1, Ordering::Relaxed);
                let entry = Arc::clone(entry);
                // Release the cache lock before taking the pattern locks:
                // prefetch_pages acquires them in the opposite order.
                drop(cache);
                self.record_access(page_id);
                return Ok(entry);
            }
        }

        // Slow path: load from the memory mapping.
        self.cache_misses.fetch_add(1, Ordering::Relaxed);
        self.load_page(page_id)
    }

    /// Loads a page from the memory mapping into the cache.
    fn load_page(&self, page_id: usize) -> Result<Arc<PageCacheEntry>> {
        let mmap = self
            .mmap
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("No memory mapping available"))?;

        let start = page_id * self.page_size;
        if start >= mmap.len() {
            bail!("Page {} out of bounds", page_id);
        }
        // The final page may be shorter than a full page.
        let end = (start + self.page_size).min(mmap.len());

        let page_data = mmap[start..end].to_vec();

        // Tag the entry with the NUMA node of the CPU doing the load.
        let numa_node = if self.numa_enabled {
            let cpu = sched_getcpu();
            numa::node_of_cpu(cpu)
        } else {
            0
        };

        let entry = Arc::new(PageCacheEntry {
            data: page_data,
            page_id,
            last_access: Instant::now(),
            access_count: AtomicUsize::new(1),
            dirty: false,
            numa_node,
        });

        // Make room before inserting if the cache is under pressure.
        self.check_memory_pressure();
        if *self.memory_pressure.read() >= MemoryPressure::High {
            self.evict_pages(1)?;
        }

        {
            let mut cache = self.page_cache.write();
            // `push` reports any entry it displaced so accounting stays exact.
            if let Some((_, evicted)) = cache.push(page_id, Arc::clone(&entry)) {
                self.total_memory
                    .fetch_sub(evicted.data.len(), Ordering::Relaxed);
            }
        }

        self.total_memory
            .fetch_add(entry.data.len(), Ordering::Relaxed);
        self.record_access(page_id);

        // Speculatively queue pages the access pattern suggests come next.
        self.prefetch_pages(page_id);

        Ok(entry)
    }

    /// Appends an access record and bumps the page's frequency counter.
    fn record_access(&self, page_id: usize) {
        let mut patterns = self.access_patterns.write();
        patterns.push_back(AccessPattern {
            page_id,
            access_time: Instant::now(),
            access_count: 1,
        });

        // Keep only the most recent 1000 accesses.
        while patterns.len() > 1000 {
            patterns.pop_front();
        }

        let mut freq = self.page_frequency.write();
        *freq.entry(page_id).or_insert(0) += 1;
    }

    /// Decides which pages to prefetch based on recent access history.
    fn prefetch_pages(&self, current_page: usize) {
        let patterns = self.access_patterns.read();
        let freq = self.page_frequency.read();

        // Most recent accesses first.
        let recent_patterns: Vec<_> = patterns.iter().rev().take(10).collect();

        // Sequential scan: every access is exactly one page after the one
        // before it (fewer than two samples prove nothing, so require two).
        let is_sequential = recent_patterns.len() >= 2
            && recent_patterns
                .windows(2)
                .all(|w| w[0].page_id > 0 && w[0].page_id == w[1].page_id + 1);

        // Strided scan: a constant small forward gap between accesses.
        let stride = if recent_patterns.len() >= 3 {
            let diff1 = recent_patterns[0]
                .page_id
                .saturating_sub(recent_patterns[1].page_id);
            let diff2 = recent_patterns[1]
                .page_id
                .saturating_sub(recent_patterns[2].page_id);
            if diff1 == diff2 && diff1 > 0 && diff1 <= 10 {
                Some(diff1)
            } else {
                None
            }
        } else {
            None
        };

        if is_sequential {
            // Sequential readers benefit from a deeper prefetch window.
            for i in 1..=(self.prefetch_distance * 2) {
                let prefetch_page = current_page + i;
                self.async_prefetch(prefetch_page);
            }
        } else if let Some(stride) = stride {
            for i in 1..=self.prefetch_distance {
                let prefetch_page = current_page + (i * stride);
                self.async_prefetch(prefetch_page);
            }
        } else {
            // No clear pattern: only prefetch pages that were hot before.
            for i in 1..=self.prefetch_distance {
                let prefetch_page = current_page + i;

                let frequency = *freq.get(&prefetch_page).unwrap_or(&0);
                if frequency > 0 {
                    self.async_prefetch(prefetch_page);
                }
            }
        }

        // Also warm frequently used neighbors on either side.
        let nearby_range = current_page.saturating_sub(3)..=(current_page + 3);
        for page_id in nearby_range {
            let frequency = *freq.get(&page_id).unwrap_or(&0);
            if frequency > 2 && page_id != current_page {
                self.async_prefetch(page_id);
            }
        }
    }

    /// Schedules a background load of `page_id` if it is not already cached
    /// and memory pressure allows it.
    pub fn async_prefetch(&self, page_id: usize) {
        // Skip pages that are already resident.
        {
            let cache = self.page_cache.read();
            if cache.contains(&page_id) {
                return;
            }
        }

        // Don't add speculative pages while the cache is under pressure.
        if *self.memory_pressure.read() >= MemoryPressure::High {
            return;
        }

        let self_clone = self.clone_ref();
        spawn(move || {
            // Prefetch failures (e.g. out-of-range pages) are ignored.
            let _ = self_clone.get_page(page_id);
        });
    }

    /// Rescores memory pressure from current occupancy:
    /// below 50% Low, below 75% Medium, below 90% High, else Critical.
    fn check_memory_pressure(&self) {
        let total_memory = self.total_memory.load(Ordering::Relaxed);
        let max_memory = self.max_pages * self.page_size;

        let pressure = if total_memory < max_memory / 2 {
            MemoryPressure::Low
        } else if total_memory < max_memory * 3 / 4 {
            MemoryPressure::Medium
        } else if total_memory < max_memory * 9 / 10 {
            MemoryPressure::High
        } else {
            MemoryPressure::Critical
        };

        *self.memory_pressure.write() = pressure;
    }

    /// Evicts up to `num_pages` pages using the configured policy.
    fn evict_pages(&self, num_pages: usize) -> Result<()> {
        match self.eviction_policy {
            EvictionPolicy::LRU => self.evict_lru(num_pages),
            EvictionPolicy::LFU => self.evict_lfu(num_pages),
            EvictionPolicy::FIFO => self.evict_fifo(num_pages),
            EvictionPolicy::Clock => self.evict_clock(num_pages),
            EvictionPolicy::ARC => self.evict_arc(num_pages),
        }
    }

    fn evict_lru(&self, num_pages: usize) -> Result<()> {
        let mut cache = self.page_cache.write();

        for _ in 0..num_pages {
            if let Some((_, entry)) = cache.pop_lru() {
                self.total_memory
                    .fetch_sub(entry.data.len(), Ordering::Relaxed);

                if entry.dirty {
                    // Pages are loaded read-only today, so there is nothing
                    // to write back yet; this branch is a placeholder.
                }
            }
        }

        Ok(())
    }

    fn evict_lfu(&self, num_pages: usize) -> Result<()> {
        let cache = self.page_cache.read();
        let freq = self.page_frequency.read();

        // Rank resident pages by how often they have been touched.
        let mut pages_by_freq: Vec<(usize, usize)> = cache
            .iter()
            .map(|(page_id, _)| (*page_id, *freq.get(page_id).unwrap_or(&0)))
            .collect();
        pages_by_freq.sort_by_key(|(_, freq)| *freq);

        // Release the read locks before re-acquiring the cache for writing.
        drop(cache);
        drop(freq);

        let mut cache = self.page_cache.write();
        for (page_id, _) in pages_by_freq.iter().take(num_pages) {
            if let Some(entry) = cache.pop(page_id) {
                self.total_memory
                    .fetch_sub(entry.data.len(), Ordering::Relaxed);
            }
        }

        Ok(())
    }

    /// FIFO is approximated by LRU order for now; without tracking insertion
    /// order separately, the least recently used page is the closest proxy.
    fn evict_fifo(&self, num_pages: usize) -> Result<()> {
        self.evict_lru(num_pages)
    }

    /// Clock (second-chance) is likewise approximated by LRU for now.
    fn evict_clock(&self, num_pages: usize) -> Result<()> {
        self.evict_lru(num_pages)
    }

    /// ARC-like eviction: score pages by a blend of recency and frequency
    /// and evict the lowest-scoring pages first.
    fn evict_arc(&self, num_pages: usize) -> Result<()> {
        let cache = self.page_cache.read();
        let freq = self.page_frequency.read();

        let now = Instant::now();
        let mut scored_pages: Vec<(usize, f64)> = cache
            .iter()
            .map(|(page_id, entry)| {
                // Recency decays with time since last access; frequency is
                // the raw access count. The two are weighted equally.
                let recency_score =
                    1.0 / (now.duration_since(entry.last_access).as_secs_f64() + 1.0);
                let frequency_score = *freq.get(page_id).unwrap_or(&0) as f64;
                let combined_score = recency_score * 0.5 + frequency_score * 0.5;
                (*page_id, combined_score)
            })
            .collect();

        // Lowest score first: the best eviction candidates lead the list.
        scored_pages.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal));

        drop(cache);
        drop(freq);

        let mut cache = self.page_cache.write();
        for (page_id, _) in scored_pages.iter().take(num_pages) {
            if let Some(entry) = cache.pop(page_id) {
                self.total_memory
                    .fetch_sub(entry.data.len(), Ordering::Relaxed);
            }
        }

        Ok(())
    }

    /// Snapshot of cache statistics.
    pub fn stats(&self) -> MemoryMapStats {
        let cache = self.page_cache.read();

        MemoryMapStats {
            total_pages: cache.len(),
            total_memory: self.total_memory.load(Ordering::Relaxed),
            cache_hits: self.cache_hits.load(Ordering::Relaxed),
            cache_misses: self.cache_misses.load(Ordering::Relaxed),
            hit_rate: self.calculate_hit_rate(),
            memory_pressure: *self.memory_pressure.read(),
            numa_enabled: self.numa_enabled,
        }
    }

    /// Fraction of lookups served from the cache; 0.0 before any lookups.
    fn calculate_hit_rate(&self) -> f64 {
        let hits = self.cache_hits.load(Ordering::Relaxed) as f64;
        let misses = self.cache_misses.load(Ordering::Relaxed) as f64;
        let total = hits + misses;
        if total > 0.0 {
            hits / total
        } else {
            0.0
        }
    }

    /// Creates a lightweight handle that shares the mapping, cache, and
    /// counters with this map, for use by background prefetch workers.
    fn clone_ref(&self) -> Self {
        Self {
            mmap: self.mmap.clone(),
            page_cache: Arc::clone(&self.page_cache),
            access_patterns: Arc::clone(&self.access_patterns),
            page_frequency: Arc::clone(&self.page_frequency),
            eviction_policy: self.eviction_policy,
            total_memory: Arc::clone(&self.total_memory),
            cache_hits: Arc::clone(&self.cache_hits),
            cache_misses: Arc::clone(&self.cache_misses),
            numa_enabled: self.numa_enabled,
            numa_nodes: self.numa_nodes.clone(),
            memory_pressure: Arc::clone(&self.memory_pressure),
            max_pages: self.max_pages,
            page_size: self.page_size,
            prefetch_distance: self.prefetch_distance,
        }
    }
}
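
// A minimal end-to-end sketch of the intended API, assuming a real vector
// file is available to map; "vectors.bin" is a hypothetical path and error
// handling is elided for brevity.
#[allow(dead_code)]
fn advanced_memory_map_example() -> Result<()> {
    let file = std::fs::File::open("vectors.bin")?;
    // Safety: the file must not be truncated or modified while mapped.
    let mmap = unsafe { Mmap::map(&file)? };

    let map = AdvancedMemoryMap::new(Some(mmap), 1024);

    // The first access misses and loads from the mapping; repeats hit.
    let page = map.get_page(0)?;
    assert!(!page.data().is_empty());

    let stats = map.stats();
    println!("hit rate: {:.2}", stats.hit_rate);
    Ok(())
}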

/// Point-in-time view of cache and memory statistics.
#[derive(Debug, Clone)]
pub struct MemoryMapStats {
    pub total_pages: usize,
    pub total_memory: usize,
    pub cache_hits: u64,
    pub cache_misses: u64,
    pub hit_rate: f64,
    pub memory_pressure: MemoryPressure,
    pub numa_enabled: bool,
}

/// CPU the calling thread is currently running on (Linux); 0 elsewhere.
#[cfg(target_os = "linux")]
fn sched_getcpu() -> i32 {
    unsafe { libc::sched_getcpu() }
}

#[cfg(not(target_os = "linux"))]
fn sched_getcpu() -> i32 {
    0
}

/// Round-robin allocator intended to place vector buffers on NUMA nodes.
pub struct NumaVectorAllocator {
    numa_nodes: Vec<i32>,
    current_node: AtomicUsize,
}

impl Default for NumaVectorAllocator {
    fn default() -> Self {
        Self::new()
    }
}

impl NumaVectorAllocator {
    pub fn new() -> Self {
        let numa_nodes = if numa::is_available() {
            (0..=numa::max_node()).collect()
        } else {
            vec![0]
        };

        Self {
            numa_nodes,
            current_node: AtomicUsize::new(0),
        }
    }

    /// Allocates `size` zeroed bytes, targeting `node` (or the next node in
    /// round-robin order). Explicit binding via `numa_alloc_onnode`/`mbind`
    /// is not wired up yet, so the buffer currently comes from the global
    /// allocator and relies on the kernel's first-touch placement.
    pub fn allocate_on_node(&self, size: usize, node: Option<i32>) -> Vec<u8> {
        if !numa::is_available() {
            return vec![0u8; size];
        }

        let _target_node = node.unwrap_or_else(|| {
            let idx = self.current_node.fetch_add(1, Ordering::Relaxed) % self.numa_nodes.len();
            self.numa_nodes[idx]
        });

        vec![0u8; size]
    }

    /// Allocates a zeroed `f32` vector of `dimensions` elements; the same
    /// first-touch caveat as `allocate_on_node` applies.
    pub fn allocate_vector_on_node(&self, dimensions: usize, node: Option<i32>) -> Vec<f32> {
        if !numa::is_available() {
            return vec![0.0f32; dimensions];
        }

        let _target_node = node.unwrap_or_else(|| self.preferred_node());

        vec![0.0f32; dimensions]
    }

    /// NUMA node of the CPU the calling thread is currently running on.
    pub fn preferred_node(&self) -> i32 {
        if numa::is_available() {
            numa::node_of_cpu(sched_getcpu())
        } else {
            0
        }
    }
}
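
// A short usage sketch for the allocator. Since explicit node binding is not
// wired up yet (see the comments in `allocate_on_node`), these calls return
// ordinary zeroed buffers of the requested size on every platform.
#[allow(dead_code)]
fn numa_allocator_example() {
    let allocator = NumaVectorAllocator::new();

    // Round-robin node selection when no explicit node is given.
    let raw = allocator.allocate_on_node(4096, None);
    assert_eq!(raw.len(), 4096);

    // Target the calling thread's current node for a 768-dimensional vector.
    let node = allocator.preferred_node();
    let embedding = allocator.allocate_vector_on_node(768, Some(node));
    assert_eq!(embedding.len(), 768);
}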

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memory_pressure() {
        let mmap = AdvancedMemoryMap::new(None, 100);

        assert_eq!(*mmap.memory_pressure.read(), MemoryPressure::Low);

        // 50 of 100 pages: at the Low/Medium boundary, so Medium.
        mmap.total_memory
            .store(50 * VECTOR_PAGE_SIZE, Ordering::Relaxed);
        mmap.check_memory_pressure();
        assert_eq!(*mmap.memory_pressure.read(), MemoryPressure::Medium);

        // 90 of 100 pages: at the High/Critical boundary, so Critical.
        mmap.total_memory
            .store(90 * VECTOR_PAGE_SIZE, Ordering::Relaxed);
        mmap.check_memory_pressure();
        assert_eq!(*mmap.memory_pressure.read(), MemoryPressure::Critical);
    }
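
    // An added sketch of the eviction accounting: popping the LRU page
    // should release its bytes from the shared counter. The entry is built
    // by hand here, which the child test module may do for private fields.
    #[test]
    fn test_evict_lru_releases_memory() {
        let map = AdvancedMemoryMap::new(None, 10);

        let entry = Arc::new(PageCacheEntry {
            data: vec![0u8; 128],
            page_id: 0,
            last_access: Instant::now(),
            access_count: AtomicUsize::new(1),
            dirty: false,
            numa_node: 0,
        });
        map.page_cache.write().put(0, entry);
        map.total_memory.store(128, Ordering::Relaxed);

        map.evict_lru(1).expect("eviction should succeed");
        assert_eq!(map.total_memory.load(Ordering::Relaxed), 0);
        assert_eq!(map.page_cache.read().len(), 0);
    }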

    #[test]
    fn test_cache_stats() {
        let mmap = AdvancedMemoryMap::new(None, 100);

        mmap.cache_hits.store(75, Ordering::Relaxed);
        mmap.cache_misses.store(25, Ordering::Relaxed);

        let stats = mmap.stats();
        assert_eq!(stats.cache_hits, 75);
        assert_eq!(stats.cache_misses, 25);
        assert_eq!(stats.hit_rate, 0.75);
    }
}