use anyhow::{bail, Result};
use lru::LruCache;
use memmap2::Mmap;
use oxirs_core::parallel::*;
use parking_lot::RwLock;
use std::collections::{HashMap, VecDeque};
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Size of a cached vector page in bytes (16 KiB).
const VECTOR_PAGE_SIZE: usize = 16384;

/// Default cap on resident pages; kept for callers that need a baseline.
#[allow(dead_code)]
const DEFAULT_MAX_PAGES: usize = 10000;

/// Thin FFI bindings to libnuma, compiled on Linux only. These extern
/// declarations assume the final binary links against libnuma (`-lnuma`).
#[cfg(target_os = "linux")]
mod numa {
    use libc::{c_ulong, c_void};

    extern "C" {
        fn numa_available() -> i32;
        fn numa_max_node() -> i32;
        fn numa_node_of_cpu(cpu: i32) -> i32;
        #[allow(dead_code)]
        fn numa_alloc_onnode(size: usize, node: i32) -> *mut c_void;
        #[allow(dead_code)]
        fn numa_free(ptr: *mut c_void, size: usize);
        fn mbind(
            addr: *mut c_void,
            len: c_ulong,
            mode: i32,
            nodemask: *const c_ulong,
            maxnode: c_ulong,
            flags: u32,
        ) -> i32;
    }

    /// `MPOL_BIND`: restrict allocation to the nodes in the mask.
    pub const MPOL_BIND: i32 = 2;
    /// `MPOL_INTERLEAVE`: interleave pages across the nodes in the mask.
    #[allow(dead_code)]
    pub const MPOL_INTERLEAVE: i32 = 3;

    /// True when libnuma reports NUMA support on this machine.
    pub fn is_available() -> bool {
        unsafe { numa_available() >= 0 }
    }

    /// Highest NUMA node id on the system.
    pub fn max_node() -> i32 {
        unsafe { numa_max_node() }
    }

    /// NUMA node that owns the given CPU.
    pub fn node_of_cpu(cpu: i32) -> i32 {
        unsafe { numa_node_of_cpu(cpu) }
    }
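
    /// Hedged sketch (not called elsewhere in this file): bind an existing,
    /// page-aligned region to a single NUMA node via `mbind(2)`. The helper
    /// name is an illustrative assumption, not an established API here.
    ///
    /// # Safety
    /// `ptr` must point to `len` bytes of page-aligned memory owned by the
    /// caller, and `node` must be a valid node id (<= `max_node()`).
    #[allow(dead_code)]
    pub unsafe fn bind_to_node(ptr: *mut c_void, len: usize, node: i32) -> bool {
        let nodemask: c_ulong = 1 << node;
        let maxnode = (std::mem::size_of::<c_ulong>() * 8) as c_ulong;
        mbind(ptr, len as c_ulong, MPOL_BIND, &nodemask, maxnode, 0) == 0
    }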
}

/// Portable stubs so the rest of this file can call `numa::*` unconditionally
/// on platforms without libnuma.
#[cfg(not(target_os = "linux"))]
mod numa {
    pub fn is_available() -> bool {
        false
    }
    pub fn max_node() -> i32 {
        0
    }
    pub fn node_of_cpu(_cpu: i32) -> i32 {
        0
    }
}

/// A single recorded page access, used by the prefetch heuristics.
#[derive(Debug, Clone)]
struct AccessPattern {
    page_id: usize,
    // Recorded for future heuristics; the current detectors only look at
    // `page_id` order.
    #[allow(dead_code)]
    access_time: Instant,
    #[allow(dead_code)]
    access_count: usize,
}

/// A page resident in the in-memory cache.
#[derive(Debug)]
pub struct PageCacheEntry {
    data: Vec<u8>,
    #[allow(dead_code)]
    page_id: usize,
    last_access: Instant,
    access_count: AtomicUsize,
    /// Set when a page diverges from the backing mapping. The cache currently
    /// only serves reads, so this never becomes true.
    dirty: bool,
    numa_node: i32,
}

impl PageCacheEntry {
    /// Raw bytes of the cached page.
    pub fn data(&self) -> &[u8] {
        &self.data
    }

    /// NUMA node the page was loaded on (0 when NUMA is unavailable).
    pub fn numa_node(&self) -> i32 {
        self.numa_node
    }
}

/// Cache eviction policies. `FIFO` and `Clock` currently fall back to the LRU
/// implementation, and `ARC` uses a simplified recency/frequency score.
#[derive(Debug, Clone, Copy)]
pub enum EvictionPolicy {
    LRU,
    LFU,
    FIFO,
    Clock,
    ARC,
}

/// Coarse memory-pressure levels derived from resident bytes relative to the
/// configured budget: Low < 50%, Medium < 75%, High < 90%, else Critical.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum MemoryPressure {
    Low,
    Medium,
    High,
    Critical,
}

/// Memory-mapped vector storage with an LRU page cache, access-pattern-driven
/// prefetching, NUMA awareness, and pressure-based eviction.
pub struct AdvancedMemoryMap {
    /// The underlying mapping, shared with prefetch workers via `Arc` so a
    /// `clone_ref` handle can load pages too. `None` means nothing is mapped.
    mmap: Option<Arc<Mmap>>,

    /// Page cache keyed by page id.
    page_cache: Arc<RwLock<LruCache<usize, Arc<PageCacheEntry>>>>,

    /// Sliding window of recent accesses for sequential/stride detection.
    access_patterns: Arc<RwLock<VecDeque<AccessPattern>>>,

    /// Per-page access counts for frequency-based decisions.
    page_frequency: Arc<RwLock<HashMap<usize, usize>>>,

    /// Active eviction policy.
    eviction_policy: EvictionPolicy,

    /// Shared counters, so prefetch workers cloned via `clone_ref` update the
    /// same accounting as the primary handle.
    total_memory: Arc<AtomicUsize>,
    cache_hits: Arc<AtomicU64>,
    cache_misses: Arc<AtomicU64>,

    numa_enabled: bool,
    numa_nodes: Vec<i32>,

    /// Current pressure level, refreshed on every page load.
    memory_pressure: Arc<RwLock<MemoryPressure>>,

    max_pages: usize,
    page_size: usize,
    prefetch_distance: usize,
}

impl AdvancedMemoryMap {
    /// Create a map over an optional mmap with a cache budget of `max_pages`.
    pub fn new(mmap: Option<Mmap>, max_pages: usize) -> Self {
        let numa_enabled = numa::is_available();
        let numa_nodes = if numa_enabled {
            (0..=numa::max_node()).collect()
        } else {
            vec![0]
        };

        // LruCache requires a non-zero capacity; clamp a zero budget to one.
        let cache_size = NonZeroUsize::new(max_pages).unwrap_or(NonZeroUsize::new(1).unwrap());

        Self {
            mmap: mmap.map(Arc::new),
            page_cache: Arc::new(RwLock::new(LruCache::new(cache_size))),
            access_patterns: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
            page_frequency: Arc::new(RwLock::new(HashMap::new())),
            eviction_policy: EvictionPolicy::ARC,
            total_memory: Arc::new(AtomicUsize::new(0)),
            cache_hits: Arc::new(AtomicU64::new(0)),
            cache_misses: Arc::new(AtomicU64::new(0)),
            numa_enabled,
            numa_nodes,
            memory_pressure: Arc::new(RwLock::new(MemoryPressure::Low)),
            max_pages,
            page_size: VECTOR_PAGE_SIZE,
            prefetch_distance: 3,
        }
    }

    /// Fetch a page, serving it from the cache when resident and loading it
    /// from the mapping otherwise.
    pub fn get_page(&self, page_id: usize) -> Result<Arc<PageCacheEntry>> {
        // Fast path: cache hit. A write lock is needed because LruCache::get
        // updates the recency order.
        {
            let mut cache = self.page_cache.write();
            if let Some(entry) = cache.get(&page_id) {
                self.cache_hits.fetch_add(1, Ordering::Relaxed);
                entry.access_count.fetch_add(1, Ordering::Relaxed);
                self.record_access(page_id);
                return Ok(Arc::clone(entry));
            }
        }

        self.cache_misses.fetch_add(1, Ordering::Relaxed);
        self.load_page(page_id)
    }

    /// Load a page from the mapping, admit it to the cache, and schedule
    /// prefetching based on the observed access pattern.
    fn load_page(&self, page_id: usize) -> Result<Arc<PageCacheEntry>> {
        let mmap = self
            .mmap
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("No memory mapping available"))?;

        let start = page_id * self.page_size;
        let end = (start + self.page_size).min(mmap.len());

        if start >= mmap.len() {
            bail!("Page {} out of bounds", page_id);
        }

        // Copy the page out of the mapping so the cache entry owns its bytes.
        let page_data = mmap[start..end].to_vec();

        // Tag the entry with the NUMA node of the CPU doing the load.
        let numa_node = if self.numa_enabled {
            let cpu = sched_getcpu();
            numa::node_of_cpu(cpu)
        } else {
            0
        };

        let entry = Arc::new(PageCacheEntry {
            data: page_data,
            page_id,
            last_access: Instant::now(),
            access_count: AtomicUsize::new(1),
            dirty: false,
            numa_node,
        });

        // Evict proactively when memory pressure is already high.
        self.check_memory_pressure();
        if *self.memory_pressure.read() >= MemoryPressure::High {
            self.evict_pages(1)?;
        }

        {
            let mut cache = self.page_cache.write();
            // `push` reports any entry the LruCache displaced, so the byte
            // accounting stays in sync when the cache is at capacity.
            if let Some((_, displaced)) = cache.push(page_id, Arc::clone(&entry)) {
                self.total_memory
                    .fetch_sub(displaced.data.len(), Ordering::Relaxed);
            }
        }

        self.total_memory
            .fetch_add(entry.data.len(), Ordering::Relaxed);
        self.record_access(page_id);

        self.prefetch_pages(page_id);

        Ok(entry)
    }

    /// Record an access in the sliding window and bump the page's frequency.
    fn record_access(&self, page_id: usize) {
        let mut patterns = self.access_patterns.write();
        patterns.push_back(AccessPattern {
            page_id,
            access_time: Instant::now(),
            access_count: 1,
        });

        // Keep the window bounded to the 1000 most recent accesses.
        while patterns.len() > 1000 {
            patterns.pop_front();
        }

        let mut freq = self.page_frequency.write();
        *freq.entry(page_id).or_insert(0) += 1;
    }

    /// Heuristic prefetch: detect sequential or strided access in the recent
    /// window and schedule likely-next pages, plus frequently used neighbors.
    fn prefetch_pages(&self, current_page: usize) {
        let patterns = self.access_patterns.read();
        let freq = self.page_frequency.read();

        // Most recent access first.
        let recent_patterns: Vec<_> = patterns.iter().rev().take(10).collect();

        // Sequential means every access is exactly one page past the previous
        // one. Require at least two samples so a near-empty window does not
        // trivially count as sequential.
        let is_sequential = recent_patterns.len() >= 2
            && recent_patterns
                .windows(2)
                .all(|w| w[0].page_id > 0 && w[0].page_id == w[1].page_id + 1);

        // A constant stride across the last three accesses, capped at 10
        // pages to keep the prefetch window bounded.
        let stride = if recent_patterns.len() >= 3 {
            let diff1 = recent_patterns[0]
                .page_id
                .saturating_sub(recent_patterns[1].page_id);
            let diff2 = recent_patterns[1]
                .page_id
                .saturating_sub(recent_patterns[2].page_id);
            if diff1 == diff2 && diff1 > 0 && diff1 <= 10 {
                Some(diff1)
            } else {
                None
            }
        } else {
            None
        };

        if is_sequential {
            // Sequential scans get a deeper prefetch window.
            for i in 1..=(self.prefetch_distance * 2) {
                self.async_prefetch(current_page + i);
            }
        } else if let Some(stride) = stride {
            for i in 1..=self.prefetch_distance {
                self.async_prefetch(current_page + i * stride);
            }
        } else {
            // No detectable pattern: only prefetch pages seen before.
            for i in 1..=self.prefetch_distance {
                let prefetch_page = current_page + i;
                let frequency = *freq.get(&prefetch_page).unwrap_or(&0);
                if frequency > 0 {
                    self.async_prefetch(prefetch_page);
                }
            }
        }

        // Independently of the pattern, pull in hot neighboring pages.
        for page_id in current_page.saturating_sub(3)..=(current_page + 3) {
            let frequency = *freq.get(&page_id).unwrap_or(&0);
            if frequency > 2 && page_id != current_page {
                self.async_prefetch(page_id);
            }
        }
    }

    /// Schedule a background load of `page_id` unless it is already cached
    /// or memory pressure forbids growing the cache.
    pub fn async_prefetch(&self, page_id: usize) {
        {
            let cache = self.page_cache.read();
            if cache.contains(&page_id) {
                return;
            }
        }

        // Do not add speculative pages while under pressure.
        if *self.memory_pressure.read() >= MemoryPressure::High {
            return;
        }

        // The clone shares the mapping, cache, and counters with `self`.
        let self_clone = self.clone_ref();
        spawn(move || {
            let _ = self_clone.get_page(page_id);
        });
    }

    /// Recompute the pressure level from resident bytes vs. the page budget.
    fn check_memory_pressure(&self) {
        let total_memory = self.total_memory.load(Ordering::Relaxed);
        let max_memory = self.max_pages * self.page_size;

        let pressure = if total_memory < max_memory / 2 {
            MemoryPressure::Low
        } else if total_memory < max_memory * 3 / 4 {
            MemoryPressure::Medium
        } else if total_memory < max_memory * 9 / 10 {
            MemoryPressure::High
        } else {
            MemoryPressure::Critical
        };

        *self.memory_pressure.write() = pressure;
    }

    /// Evict `num_pages` pages according to the configured policy.
    fn evict_pages(&self, num_pages: usize) -> Result<()> {
        match self.eviction_policy {
            EvictionPolicy::LRU => self.evict_lru(num_pages),
            EvictionPolicy::LFU => self.evict_lfu(num_pages),
            EvictionPolicy::FIFO => self.evict_fifo(num_pages),
            EvictionPolicy::Clock => self.evict_clock(num_pages),
            EvictionPolicy::ARC => self.evict_arc(num_pages),
        }
    }

    fn evict_lru(&self, num_pages: usize) -> Result<()> {
        let mut cache = self.page_cache.write();

        for _ in 0..num_pages {
            if let Some((_, entry)) = cache.pop_lru() {
                self.total_memory
                    .fetch_sub(entry.data.len(), Ordering::Relaxed);

                if entry.dirty {
                    // Write-back would happen here; this cache currently only
                    // serves reads, so dirty entries cannot occur.
                }
            }
        }

        Ok(())
    }

    /// Evict the `num_pages` least frequently used pages.
    fn evict_lfu(&self, num_pages: usize) -> Result<()> {
        let cache = self.page_cache.read();
        let freq = self.page_frequency.read();

        // Rank resident pages by global access frequency, lowest first.
        let mut pages_by_freq: Vec<(usize, usize)> = cache
            .iter()
            .map(|(page_id, _)| (*page_id, *freq.get(page_id).unwrap_or(&0)))
            .collect();
        pages_by_freq.sort_by_key(|(_, freq)| *freq);

        // Release the read locks before re-acquiring the cache for writing.
        drop(cache);
        drop(freq);

        let mut cache = self.page_cache.write();
        for (page_id, _) in pages_by_freq.iter().take(num_pages) {
            if let Some(entry) = cache.pop(page_id) {
                self.total_memory
                    .fetch_sub(entry.data.len(), Ordering::Relaxed);
            }
        }

        Ok(())
    }

    /// FIFO is approximated by LRU order: insertion order and recency agree
    /// for pages that are never re-accessed.
    fn evict_fifo(&self, num_pages: usize) -> Result<()> {
        self.evict_lru(num_pages)
    }

    /// Clock (second chance) is likewise approximated by LRU for now.
    fn evict_clock(&self, num_pages: usize) -> Result<()> {
        self.evict_lru(num_pages)
    }

    /// Simplified ARC-style eviction: score each page by an equal blend of
    /// recency and frequency, then evict the lowest-scoring pages.
    fn evict_arc(&self, num_pages: usize) -> Result<()> {
        let cache = self.page_cache.read();
        let freq = self.page_frequency.read();

        let now = Instant::now();
        let mut scored_pages: Vec<(usize, f64)> = cache
            .iter()
            .map(|(page_id, entry)| {
                let recency_score =
                    1.0 / (now.duration_since(entry.last_access).as_secs_f64() + 1.0);
                let frequency_score = *freq.get(page_id).unwrap_or(&0) as f64;
                let combined_score = recency_score * 0.5 + frequency_score * 0.5;
                (*page_id, combined_score)
            })
            .collect();

        // total_cmp avoids the panic partial_cmp().unwrap() would hit on NaN.
        scored_pages.sort_by(|a, b| a.1.total_cmp(&b.1));

        drop(cache);
        drop(freq);

        let mut cache = self.page_cache.write();
        for (page_id, _) in scored_pages.iter().take(num_pages) {
            if let Some(entry) = cache.pop(page_id) {
                self.total_memory
                    .fetch_sub(entry.data.len(), Ordering::Relaxed);
            }
        }

        Ok(())
    }

    /// Snapshot of the current cache statistics.
    pub fn stats(&self) -> MemoryMapStats {
        let cache = self.page_cache.read();

        MemoryMapStats {
            total_pages: cache.len(),
            total_memory: self.total_memory.load(Ordering::Relaxed),
            cache_hits: self.cache_hits.load(Ordering::Relaxed),
            cache_misses: self.cache_misses.load(Ordering::Relaxed),
            hit_rate: self.calculate_hit_rate(),
            memory_pressure: *self.memory_pressure.read(),
            numa_enabled: self.numa_enabled,
        }
    }

    fn calculate_hit_rate(&self) -> f64 {
        let hits = self.cache_hits.load(Ordering::Relaxed) as f64;
        let misses = self.cache_misses.load(Ordering::Relaxed) as f64;
        let total = hits + misses;
        if total > 0.0 {
            hits / total
        } else {
            0.0
        }
    }

    /// Cheap handle clone for background workers: every piece of shared state
    /// (mapping, cache, counters, pressure) is reference-counted, so workers
    /// update the same accounting as the primary handle.
    fn clone_ref(&self) -> Self {
        Self {
            mmap: self.mmap.clone(),
            page_cache: Arc::clone(&self.page_cache),
            access_patterns: Arc::clone(&self.access_patterns),
            page_frequency: Arc::clone(&self.page_frequency),
            eviction_policy: self.eviction_policy,
            total_memory: Arc::clone(&self.total_memory),
            cache_hits: Arc::clone(&self.cache_hits),
            cache_misses: Arc::clone(&self.cache_misses),
            numa_enabled: self.numa_enabled,
            numa_nodes: self.numa_nodes.clone(),
            memory_pressure: Arc::clone(&self.memory_pressure),
            max_pages: self.max_pages,
            page_size: self.page_size,
            prefetch_distance: self.prefetch_distance,
        }
    }
}

/// Point-in-time cache statistics returned by [`AdvancedMemoryMap::stats`].
#[derive(Debug, Clone)]
pub struct MemoryMapStats {
    pub total_pages: usize,
    pub total_memory: usize,
    pub cache_hits: u64,
    pub cache_misses: u64,
    pub hit_rate: f64,
    pub memory_pressure: MemoryPressure,
    pub numa_enabled: bool,
}
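
// Hedged usage sketch: how a caller might poll `stats()` and react. The
// function name and the 0.5 hit-rate threshold are illustrative assumptions,
// not part of this module's API.
#[allow(dead_code)]
fn example_monitor(map: &AdvancedMemoryMap) {
    let stats = map.stats();
    if stats.memory_pressure >= MemoryPressure::High || stats.hit_rate < 0.5 {
        // A real caller might throttle ingestion or raise the page budget.
        eprintln!(
            "page cache: {} pages / {} bytes, hit rate {:.2}, pressure {:?}",
            stats.total_pages, stats.total_memory, stats.hit_rate, stats.memory_pressure
        );
    }
}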

/// CPU the calling thread is currently running on (Linux only; 0 elsewhere).
#[cfg(target_os = "linux")]
fn sched_getcpu() -> i32 {
    unsafe { libc::sched_getcpu() }
}

#[cfg(not(target_os = "linux"))]
fn sched_getcpu() -> i32 {
    0
}

/// Round-robin NUMA-aware allocator for vector buffers.
///
/// Note: node selection is computed, but the allocations below are ordinary
/// heap allocations; actual placement is left to the OS's first-touch policy
/// rather than `numa_alloc_onnode`/`mbind`.
pub struct NumaVectorAllocator {
    numa_nodes: Vec<i32>,
    current_node: AtomicUsize,
}

impl Default for NumaVectorAllocator {
    fn default() -> Self {
        Self::new()
    }
}

impl NumaVectorAllocator {
    pub fn new() -> Self {
        let numa_nodes = if numa::is_available() {
            (0..=numa::max_node()).collect()
        } else {
            vec![0]
        };

        Self {
            numa_nodes,
            current_node: AtomicUsize::new(0),
        }
    }

    /// Allocate `size` zeroed bytes, nominally on `node` (or on the next
    /// node in round-robin order when `node` is `None`).
    pub fn allocate_on_node(&self, size: usize, node: Option<i32>) -> Vec<u8> {
        if !numa::is_available() {
            return vec![0u8; size];
        }

        let _target_node = node.unwrap_or_else(|| {
            // Rotate through the nodes so successive allocations spread out.
            let idx = self.current_node.fetch_add(1, Ordering::Relaxed) % self.numa_nodes.len();
            self.numa_nodes[idx]
        });

        // Placeholder: ordinary allocation (see the struct-level note).
        vec![0u8; size]
    }

    /// Allocate a zeroed `f32` vector of `dimensions`, nominally on `node`.
    pub fn allocate_vector_on_node(&self, dimensions: usize, node: Option<i32>) -> Vec<f32> {
        if !numa::is_available() {
            return vec![0.0f32; dimensions];
        }

        let _target_node = node.unwrap_or_else(|| self.preferred_node());

        // Placeholder: ordinary allocation (see the struct-level note).
        vec![0.0f32; dimensions]
    }

    /// NUMA node of the CPU the calling thread is currently on.
    pub fn preferred_node(&self) -> i32 {
        if numa::is_available() {
            numa::node_of_cpu(sched_getcpu())
        } else {
            0
        }
    }
}
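
// Hedged usage sketch: spreading embedding buffers across nodes. The
// dimension count (768) is an illustrative assumption.
#[allow(dead_code)]
fn example_allocate_embeddings() {
    let alloc = NumaVectorAllocator::new();
    // With `None`, successive calls rotate through the available nodes.
    let a = alloc.allocate_vector_on_node(768, None);
    // An explicit node states the intended placement (advisory for now).
    let b = alloc.allocate_vector_on_node(768, Some(alloc.preferred_node()));
    debug_assert_eq!(a.len(), b.len());
}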

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memory_pressure() {
        let mmap = AdvancedMemoryMap::new(None, 100);

        assert_eq!(*mmap.memory_pressure.read(), MemoryPressure::Low);

        // 50 of 100 pages resident sits exactly on the Medium threshold.
        mmap.total_memory
            .store(50 * VECTOR_PAGE_SIZE, Ordering::Relaxed);
        mmap.check_memory_pressure();
        assert_eq!(*mmap.memory_pressure.read(), MemoryPressure::Medium);

        // 90 of 100 pages resident sits exactly on the Critical threshold.
        mmap.total_memory
            .store(90 * VECTOR_PAGE_SIZE, Ordering::Relaxed);
        mmap.check_memory_pressure();
        assert_eq!(*mmap.memory_pressure.read(), MemoryPressure::Critical);
    }

    #[test]
    fn test_cache_stats() {
        let mmap = AdvancedMemoryMap::new(None, 100);

        mmap.cache_hits.store(75, Ordering::Relaxed);
        mmap.cache_misses.store(25, Ordering::Relaxed);

        let stats = mmap.stats();
        assert_eq!(stats.cache_hits, 75);
        assert_eq!(stats.cache_misses, 25);
        assert_eq!(stats.hit_rate, 0.75);
    }
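
    // Added test sketch: the hit rate must be 0.0 (not NaN) before any access.
    #[test]
    fn test_hit_rate_with_no_accesses() {
        let mmap = AdvancedMemoryMap::new(None, 100);
        assert_eq!(mmap.stats().hit_rate, 0.0);
    }

    // Added test sketch: round-trip one page through a real mapping. The
    // temp-file name is an illustrative assumption.
    #[test]
    fn test_page_round_trip() {
        use std::io::Write;

        let path = std::env::temp_dir().join("oxirs_advanced_mmap_round_trip.bin");
        {
            let mut file = std::fs::File::create(&path).unwrap();
            file.write_all(&vec![7u8; VECTOR_PAGE_SIZE]).unwrap();
        }
        let file = std::fs::File::open(&path).unwrap();
        let mmap = unsafe { Mmap::map(&file).unwrap() };

        let map = AdvancedMemoryMap::new(Some(mmap), 8);
        let page = map.get_page(0).unwrap();
        assert_eq!(page.data().len(), VECTOR_PAGE_SIZE);
        assert!(page.data().iter().all(|&b| b == 7));

        // A second read of the same page must be a cache hit.
        let _ = map.get_page(0).unwrap();
        assert!(map.stats().cache_hits >= 1);

        let _ = std::fs::remove_file(&path);
    }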
}