scirs2_core/memory_efficient/
prefetch.rs

1//! Smart prefetching for memory-mapped arrays.
2//!
3//! This module provides intelligent prefetching capabilities for memory-mapped arrays,
4//! which can significantly improve performance for workloads with predictable access patterns.
5//! By analyzing access patterns and prefetching blocks that are likely to be needed soon,
6//! the system can reduce latency and improve throughput.
7//!
8//! The prefetching system supports:
9//! - Automatic detection of sequential access patterns
10//! - Recognition of strided access patterns
11//! - Adaptive prefetching based on historical access patterns
12//! - Integration with the block cache system to manage prefetched blocks
13
14use std::collections::{HashSet, VecDeque};
15#[cfg(feature = "memory_compression")]
16use std::sync::{Arc, Mutex};
17use std::time::{Duration, Instant};
18
19#[cfg(feature = "memory_compression")]
20use super::compressed_memmap::CompressedMemMappedArray;
21use crate::error::CoreResult;
22#[cfg(feature = "memory_compression")]
23use crate::error::{CoreError, ErrorContext};
24
25/// Types of access patterns that can be detected and prefetched.
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27pub enum AccessPattern {
28    /// Sequential access (consecutive blocks)
29    Sequential,
30
31    /// Strided access (blocks with a fixed stride)
32    Strided(usize),
33
34    /// Random access (no discernable pattern)
35    Random,
36
37    /// Custom pattern defined by a specific sequence of offsets
38    Custom,
39}
40
41/// Configuration for the prefetching system.
42#[derive(Debug, Clone)]
43pub struct PrefetchConfig {
44    /// Whether prefetching is enabled
45    pub enabled: bool,
46
47    /// Number of blocks to prefetch ahead of the current access
48    pub prefetch_count: usize,
49
50    /// Maximum number of blocks to keep in the prefetch history
51    pub history_size: usize,
52
53    /// Minimum number of accesses needed to detect a pattern
54    pub min_pattern_length: usize,
55
56    /// Whether to prefetch in a background thread
57    pub async_prefetch: bool,
58
59    /// Timeout for prefetch operations (to avoid blocking too long)
60    pub prefetch_timeout: Duration,
61}
62
63impl Default for PrefetchConfig {
64    fn default() -> Self {
65        Self {
66            enabled: true,
67            prefetch_count: 2,
68            history_size: 32,
69            min_pattern_length: 4,
70            async_prefetch: true,
71            prefetch_timeout: Duration::from_millis(100),
72        }
73    }
74}
75
76/// Builder for prefetch configuration.
77#[derive(Debug, Clone, Default)]
78pub struct PrefetchConfigBuilder {
79    config: PrefetchConfig,
80}
81
82impl PrefetchConfigBuilder {
83    /// Create a new prefetch config builder with default settings.
84    pub fn new() -> Self {
85        Self::default()
86    }
87
88    /// Enable or disable prefetching.
89    pub const fn enabled(mut self, enabled: bool) -> Self {
90        self.config.enabled = enabled;
91        self
92    }
93
94    /// Set the number of blocks to prefetch ahead of the current access.
95    pub const fn prefetch_count(mut self, count: usize) -> Self {
96        self.config.prefetch_count = count;
97        self
98    }
99
100    /// Set the maximum number of blocks to keep in the prefetch history.
101    pub const fn history_size(mut self, size: usize) -> Self {
102        self.config.history_size = size;
103        self
104    }
105
106    /// Set the minimum number of accesses needed to detect a pattern.
107    pub const fn min_pattern_length(mut self, length: usize) -> Self {
108        self.config.min_pattern_length = length;
109        self
110    }
111
112    /// Enable or disable asynchronous prefetching.
113    pub const fn async_prefetch(mut self, asyncprefetch: bool) -> Self {
114        self.config.async_prefetch = asyncprefetch;
115        self
116    }
117
118    /// Set the timeout for prefetch operations.
119    pub const fn prefetch_timeout(mut self, timeout: Duration) -> Self {
120        self.config.prefetch_timeout = timeout;
121        self
122    }
123
124    /// Build the prefetch configuration.
125    pub fn build(self) -> PrefetchConfig {
126        self.config
127    }
128}
129
130/// Trait for tracking and predicting access patterns.
131pub trait AccessPatternTracker: std::fmt::Debug {
132    /// Record an access to a block.
133    fn record_access(&mut self, blockidx: usize);
134
135    /// Predict which blocks will be accessed next.
136    fn predict_next_blocks(&self, count: usize) -> Vec<usize>;
137
138    /// Get the current detected access pattern.
139    fn current_pattern(&self) -> AccessPattern;
140
141    /// Clear the access history.
142    fn clear_history(&mut self);
143}
144
145/// Implementation of access pattern tracking.
146#[derive(Debug)]
147pub struct BlockAccessTracker {
148    /// Configuration for the tracker
149    config: PrefetchConfig,
150
151    /// History of accessed blocks
152    history: VecDeque<usize>,
153
154    /// The currently detected pattern
155    current_pattern: AccessPattern,
156
157    /// For strided patterns, the stride value
158    stride: Option<usize>,
159
160    /// Last time the pattern was updated
161    last_update: Instant,
162}
163
164impl BlockAccessTracker {
165    /// Create a new block access tracker with the given configuration.
166    pub fn new(config: PrefetchConfig) -> Self {
167        let history_size = config.history_size;
168        Self {
169            config,
170            history: VecDeque::with_capacity(history_size),
171            current_pattern: AccessPattern::Random,
172            stride: None,
173            last_update: Instant::now(),
174        }
175    }
176
177    /// Detect the access pattern based on the history.
178    fn detect_pattern(&mut self) {
179        if self.history.len() < self.config.min_pattern_length {
180            // Not enough history to detect a pattern
181            self.current_pattern = AccessPattern::Random;
182            return;
183        }
184
185        // Check for sequential access
186        let mut is_sequential = true;
187        let mut prev = *self.history.front().expect("Operation failed");
188
189        for &block_idx in self.history.iter().skip(1) {
190            if block_idx != prev + 1 {
191                is_sequential = false;
192                break;
193            }
194            prev = block_idx;
195        }
196
197        if is_sequential {
198            self.current_pattern = AccessPattern::Sequential;
199            return;
200        }
201
202        // Check for strided access
203        let mut is_strided = true;
204        let stride = self.history.get(1).expect("Operation failed")
205            - self.history.front().expect("Operation failed");
206        prev = *self.history.front().expect("Operation failed");
207
208        for &block_idx in self.history.iter().skip(1) {
209            if block_idx != prev + stride {
210                is_strided = false;
211                break;
212            }
213            prev = block_idx;
214        }
215
216        if is_strided {
217            self.current_pattern = AccessPattern::Strided(stride);
218            self.stride = Some(stride);
219            return;
220        }
221
222        // If no pattern detected, mark as random
223        self.current_pattern = AccessPattern::Random;
224    }
225}
226
227impl AccessPatternTracker for BlockAccessTracker {
228    fn record_access(&mut self, blockidx: usize) {
229        // Add to history and remove oldest if needed
230        self.history.push_back(blockidx);
231
232        if self.history.len() > self.config.history_size {
233            self.history.pop_front();
234        }
235
236        // Update pattern if we have enough history
237        if self.history.len() >= self.config.min_pattern_length {
238            self.detect_pattern();
239        }
240
241        // Update timestamp
242        self.last_update = Instant::now();
243    }
244
245    fn predict_next_blocks(&self, count: usize) -> Vec<usize> {
246        if self.history.is_empty() {
247            return Vec::new();
248        }
249
250        let mut predictions = Vec::with_capacity(count);
251        let latest = *self.history.back().expect("Operation failed");
252
253        match self.current_pattern {
254            AccessPattern::Sequential => {
255                // For sequential access, predict the next 'count' blocks
256                for i in 1..=count {
257                    predictions.push(latest + i);
258                }
259            }
260            AccessPattern::Strided(stride) => {
261                // For strided access, predict the next 'count' blocks with the given stride
262                for i in 1..=count {
263                    predictions.push(latest + stride * i);
264                }
265            }
266            _ => {
267                // For random access, we can't make good predictions
268                // but we could predict nearby blocks as a heuristic
269                if latest > 0 {
270                    predictions.push(latest - 1);
271                }
272                predictions.push(latest + 1);
273
274                // Fill remaining predictions with adjacent blocks
275                let mut offset = 2;
276                while predictions.len() < count {
277                    if latest >= offset {
278                        predictions.push(latest - offset);
279                    }
280                    predictions.push(latest + offset);
281                    offset += 1;
282                }
283
284                // Trim to requested count
285                predictions.truncate(count);
286            }
287        }
288
289        predictions
290    }
291
292    fn current_pattern(&self) -> AccessPattern {
293        self.current_pattern
294    }
295
296    fn clear_history(&mut self) {
297        self.history.clear();
298        self.current_pattern = AccessPattern::Random;
299        self.stride = None;
300    }
301}
302
303/// Shared state for the prefetching system.
304#[derive(Debug)]
305#[allow(dead_code)]
306pub struct PrefetchingState {
307    /// Configuration for prefetching
308    config: PrefetchConfig,
309
310    /// Access pattern tracker
311    tracker: Box<dyn AccessPatternTracker + Send + Sync>,
312
313    /// Set of blocks that are currently being prefetched
314    prefetching: HashSet<usize>,
315
316    /// Set of blocks that have been prefetched and are available in the cache
317    prefetched: HashSet<usize>,
318
319    /// Statistics about prefetching
320    #[allow(dead_code)]
321    stats: PrefetchStats,
322}
323
324/// Statistics about prefetching performance.
325#[derive(Debug, Default, Clone)]
326pub struct PrefetchStats {
327    /// Total number of prefetch operations performed
328    pub prefetch_count: usize,
329
330    /// Number of cache hits on prefetched blocks
331    pub prefetch_hits: usize,
332
333    /// Number of accesses to blocks that were not prefetched
334    pub prefetch_misses: usize,
335
336    /// Hit rate (hits / (hits + misses))
337    pub hit_rate: f64,
338}
339
340impl PrefetchingState {
341    /// Create a new prefetching state with the given configuration.
342    #[allow(dead_code)]
343    pub fn new(config: PrefetchConfig) -> Self {
344        Self {
345            tracker: Box::new(BlockAccessTracker::new(config.clone())),
346            config,
347            prefetching: HashSet::new(),
348            prefetched: HashSet::new(),
349            stats: PrefetchStats::default(),
350        }
351    }
352
353    /// Record an access to a block.
354    #[allow(dead_code)]
355    pub fn idx(&mut self, blockidx: usize) {
356        self.tracker.record_access(blockidx);
357
358        // Update stats if this was a prefetched block
359        if self.prefetched.contains(&blockidx) {
360            self.stats.prefetch_hits += 1;
361            self.prefetched.remove(&blockidx);
362        } else {
363            self.stats.prefetch_misses += 1;
364        }
365
366        // Update hit rate
367        let total = self.stats.prefetch_hits + self.stats.prefetch_misses;
368        if total > 0 {
369            self.stats.hit_rate = self.stats.prefetch_hits as f64 / total as f64;
370        }
371    }
372
373    /// Get the blocks that should be prefetched next.
374    #[allow(dead_code)]
375    pub fn get_blocks_to_prefetch(&self) -> Vec<usize> {
376        if !self.config.enabled {
377            return Vec::new();
378        }
379
380        // Predict next blocks
381        let predicted = self.tracker.predict_next_blocks(self.config.prefetch_count);
382
383        // Filter out blocks that are already prefetched or being prefetched
384        predicted
385            .into_iter()
386            .filter(|&block_idx| {
387                !self.prefetched.contains(&block_idx) && !self.prefetching.contains(&block_idx)
388            })
389            .collect()
390    }
391
392    /// Mark a block as being prefetched.
393    #[allow(dead_code)]
394    pub fn idx_2(&mut self, blockidx: usize) {
395        self.prefetching.insert(blockidx);
396    }
397
398    /// Mark a block as prefetched and available in the cache.
399    #[allow(dead_code)]
400    pub fn idx_3(&mut self, blockidx: usize) {
401        self.prefetching.remove(&blockidx);
402        self.prefetched.insert(blockidx);
403        self.stats.prefetch_count += 1;
404    }
405
406    /// Get the current prefetching statistics.
407    #[allow(dead_code)]
408    pub fn stats(&self) -> PrefetchStats {
409        self.stats.clone()
410    }
411}
412
413/// Trait for memory-mapped arrays that support prefetching.
414pub trait Prefetching {
415    /// Enable prefetching with the given configuration.
416    fn enable_prefetching(&mut self, config: PrefetchConfig) -> CoreResult<()>;
417
418    /// Disable prefetching.
419    fn disable_prefetching(&mut self) -> CoreResult<()>;
420
421    /// Get the current prefetching statistics.
422    fn prefetch_stats(&self) -> CoreResult<PrefetchStats>;
423
424    /// Prefetch a specific block.
425    fn prefetch_block_by_idx_by_idx(&mut self, idx: usize) -> CoreResult<()>;
426
427    /// Prefetch multiple blocks.
428    fn prefetch_indices(&mut self, indices: &[usize]) -> CoreResult<()>;
429
430    /// Clear the prefetching state.
431    fn clear_prefetch_state(&mut self) -> CoreResult<()>;
432}
433
434// Extended CompressedMemMappedArray struct with prefetching support
435#[cfg(feature = "memory_compression")]
436#[derive(Debug)]
437pub struct PrefetchingCompressedArray<A: Clone + Copy + 'static + Send + Sync> {
438    /// The underlying compressed memory-mapped array
439    array: CompressedMemMappedArray<A>,
440
441    /// Prefetching state
442    prefetch_state: Arc<Mutex<PrefetchingState>>,
443
444    /// Prefetching enabled flag
445    prefetching_enabled: bool,
446
447    /// Background prefetching thread handle (if async prefetching is enabled)
448    #[allow(dead_code)] // May be unused if async prefetching is disabled
449    prefetch_thread: Option<std::thread::JoinHandle<()>>,
450
451    /// Channel to send blocks to prefetch
452    #[allow(dead_code)] // May be unused if async prefetching is disabled
453    prefetch_sender: Option<std::sync::mpsc::Sender<PrefetchCommand>>,
454}
455
456/// Commands for the prefetching thread
457#[cfg(feature = "memory_compression")]
458enum PrefetchCommand {
459    /// Prefetch a specific block
460    Prefetch(usize),
461
462    /// Stop the prefetching thread
463    Stop,
464}
465
466#[cfg(feature = "memory_compression")]
467impl<A: Clone + Copy + 'static + Send + Sync> PrefetchingCompressedArray<A> {
468    /// Create a new prefetching compressed array from an existing compressed memory-mapped array.
469    pub fn new(array: CompressedMemMappedArray<A>) -> Self {
470        // Create prefetching state with default config
471        let prefetch_state = Arc::new(Mutex::new(PrefetchingState::new(PrefetchConfig::default())));
472
473        Self {
474            array,
475            prefetch_state,
476            prefetching_enabled: false,
477            prefetch_thread: None,
478            prefetch_sender: None,
479        }
480    }
481
482    /// Create a new prefetching compressed array with the given configuration.
483    pub fn new_with_config(
484        array: CompressedMemMappedArray<A>,
485        config: PrefetchConfig,
486    ) -> CoreResult<Self> {
487        let mut prefetching_array = Self::new(array);
488        prefetching_array.enable_prefetching(config)?;
489        Ok(prefetching_array)
490    }
491
492    /// Start the background prefetching thread.
493    fn start_background_prefetching(
494        &mut self,
495        state: Arc<Mutex<PrefetchingState>>,
496    ) -> CoreResult<()> {
497        // Create channel for sending prefetch commands
498        let (sender, receiver) = std::sync::mpsc::channel();
499        self.prefetch_sender = Some(sender);
500
501        // Clone array and state for the thread
502        let array = self.array.clone();
503        let prefetch_state = state.clone();
504
505        // Get the timeout from the config
506        let timeout = {
507            let guard = self.prefetch_state.lock().map_err(|_| {
508                CoreError::MutexError(ErrorContext::new(
509                    "Failed to lock prefetch _state".to_string(),
510                ))
511            })?;
512            guard.config.prefetch_timeout
513        };
514
515        // Start the thread
516        let thread = std::thread::spawn(move || {
517            // Background thread for prefetching
518            loop {
519                // Get the next command
520                match receiver.recv_timeout(timeout) {
521                    Ok(PrefetchCommand::Prefetch(block_idx)) => {
522                        // Mark the block as being prefetched
523                        {
524                            if let Ok(mut guard) = prefetch_state.lock() {
525                                guard.idx_2(block_idx);
526                            }
527                        }
528
529                        // Attempt to prefetch the block (ignoring errors)
530                        if array.preload_block(block_idx).is_ok() {
531                            // Mark the block as prefetched
532                            if let Ok(mut guard) = prefetch_state.lock() {
533                                guard.idx_3(block_idx);
534                            }
535                        }
536                    }
537                    Ok(PrefetchCommand::Stop) => {
538                        // Stop the thread
539                        break;
540                    }
541                    Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
542                        // Timeout, check if there are new blocks to prefetch
543                        if let Ok(guard) = prefetch_state.lock() {
544                            let blocks = guard.get_blocks_to_prefetch();
545
546                            // If there are blocks to prefetch, we need to drop the lock
547                            // and then prefetch them
548                            if !blocks.is_empty() {
549                                drop(guard);
550
551                                for &block_idx in &blocks {
552                                    // Mark the block as being prefetched
553                                    if let Ok(mut guard) = prefetch_state.lock() {
554                                        guard.idx_2(block_idx);
555                                    }
556
557                                    // Attempt to prefetch the block (ignoring errors)
558                                    if array.preload_block(block_idx).is_ok() {
559                                        // Mark the block as prefetched
560                                        if let Ok(mut guard) = prefetch_state.lock() {
561                                            guard.idx_3(block_idx);
562                                        }
563                                    }
564                                }
565                            }
566                        }
567                    }
568                    Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
569                        // Channel closed, exit thread
570                        break;
571                    }
572                }
573            }
574        });
575
576        self.prefetch_thread = Some(thread);
577        Ok(())
578    }
579
580    /// Stop the background prefetching thread.
581    fn stop_prefetch_thread(&mut self) -> CoreResult<()> {
582        if let Some(sender) = self.prefetch_sender.take() {
583            // Send stop command to the thread
584            sender.send(PrefetchCommand::Stop).map_err(|_| {
585                CoreError::ThreadError(ErrorContext::new("Failed to send stop command".to_string()))
586            })?;
587
588            // Wait for the thread to finish
589            if let Some(thread) = self.prefetch_thread.take() {
590                thread.join().map_err(|_| {
591                    CoreError::ThreadError(ErrorContext::new(
592                        "Failed to join prefetch thread".to_string(),
593                    ))
594                })?;
595            }
596        }
597
598        Ok(())
599    }
600
601    /// Get access to the underlying compressed memory-mapped array.
602    pub const fn inner(&self) -> &CompressedMemMappedArray<A> {
603        &self.array
604    }
605
606    /// Get mutable access to the underlying compressed memory-mapped array.
607    pub fn inner_mut(&mut self) -> &mut CompressedMemMappedArray<A> {
608        &mut self.array
609    }
610
611    /// Request prefetching of a specific block through the background thread.
612    fn request_prefetch(&self, blockidx: usize) -> CoreResult<()> {
613        if let Some(sender) = &self.prefetch_sender {
614            sender
615                .send(PrefetchCommand::Prefetch(blockidx))
616                .map_err(|_| {
617                    CoreError::ThreadError(ErrorContext::new(
618                        "Failed to send prefetch command".to_string(),
619                    ))
620                })?;
621        }
622
623        Ok(())
624    }
625}
626
627#[cfg(feature = "memory_compression")]
628impl<A: Clone + Copy + 'static + Send + Sync> Prefetching for PrefetchingCompressedArray<A> {
629    fn enable_prefetching(&mut self, config: PrefetchConfig) -> CoreResult<()> {
630        // Already enabled with the same config?
631        if self.prefetching_enabled {
632            // Check if we need to update the config
633            let current_config = {
634                let guard = self.prefetch_state.lock().map_err(|_| {
635                    CoreError::MutexError(ErrorContext::new(
636                        "Failed to lock prefetch state".to_string(),
637                    ))
638                })?;
639                guard.config.clone()
640            };
641
642            if current_config.async_prefetch == config.async_prefetch
643                && current_config.prefetch_count == config.prefetch_count
644                && current_config.history_size == config.history_size
645            {
646                // No significant changes, just update the config
647                let mut guard = self.prefetch_state.lock().map_err(|_| {
648                    CoreError::MutexError(ErrorContext::new(
649                        "Failed to lock prefetch state".to_string(),
650                    ))
651                })?;
652                guard.config = config;
653                return Ok(());
654            }
655
656            // Need to stop the current prefetching and restart with new config
657            self.disable_prefetching()?;
658        }
659
660        // Create new prefetching state
661        let prefetch_state = Arc::new(Mutex::new(PrefetchingState::new(config.clone())));
662        self.prefetch_state = prefetch_state.clone();
663
664        // Start background thread if async prefetching is enabled
665        if config.async_prefetch {
666            self.start_background_prefetching(prefetch_state)?;
667        }
668
669        self.prefetching_enabled = true;
670        Ok(())
671    }
672
673    fn disable_prefetching(&mut self) -> CoreResult<()> {
674        if self.prefetching_enabled {
675            // Stop background thread if it's running
676            self.stop_prefetch_thread()?;
677
678            // Clear prefetching state
679            let mut guard = self.prefetch_state.lock().map_err(|_| {
680                CoreError::MutexError(ErrorContext::new(
681                    "Failed to lock prefetch state".to_string(),
682                ))
683            })?;
684
685            // Disable prefetching in the config
686            guard.config.enabled = false;
687
688            self.prefetching_enabled = false;
689        }
690
691        Ok(())
692    }
693
694    fn prefetch_stats(&self) -> CoreResult<PrefetchStats> {
695        let guard = self.prefetch_state.lock().map_err(|_| {
696            CoreError::MutexError(ErrorContext::new(
697                "Failed to lock prefetch state".to_string(),
698            ))
699        })?;
700
701        Ok(guard.stats())
702    }
703
704    fn prefetch_block_by_idx_by_idx(&mut self, blockidx: usize) -> CoreResult<()> {
705        if !self.prefetching_enabled {
706            return Ok(());
707        }
708
709        // Check if the block is already prefetched
710        let should_prefetch = {
711            let guard = self.prefetch_state.lock().map_err(|_| {
712                CoreError::MutexError(ErrorContext::new(
713                    "Failed to lock prefetch state".to_string(),
714                ))
715            })?;
716
717            !guard.prefetched.contains(&blockidx) && !guard.prefetching.contains(&blockidx)
718        };
719
720        if should_prefetch {
721            // Check if we should do sync or async prefetching
722            let is_async = {
723                let guard = self.prefetch_state.lock().map_err(|_| {
724                    CoreError::MutexError(ErrorContext::new(
725                        "Failed to lock prefetch state".to_string(),
726                    ))
727                })?;
728
729                guard.config.async_prefetch
730            };
731
732            if is_async {
733                // Request async prefetching
734                self.request_prefetch(blockidx)?;
735            } else {
736                // Mark the block as being prefetched
737                {
738                    let mut guard = self.prefetch_state.lock().map_err(|_| {
739                        CoreError::MutexError(ErrorContext::new(
740                            "Failed to lock prefetch state".to_string(),
741                        ))
742                    })?;
743
744                    guard.idx_2(blockidx);
745                }
746
747                // Prefetch the block
748                self.array.preload_block(blockidx)?;
749
750                // Mark the block as prefetched
751                let mut guard = self.prefetch_state.lock().map_err(|_| {
752                    CoreError::MutexError(ErrorContext::new(
753                        "Failed to lock prefetch state".to_string(),
754                    ))
755                })?;
756
757                guard.idx_3(blockidx);
758            }
759        }
760
761        Ok(())
762    }
763
764    fn prefetch_indices(&mut self, indices: &[usize]) -> CoreResult<()> {
765        if !self.prefetching_enabled {
766            return Ok(());
767        }
768
769        for &block_idx in indices {
770            self.prefetch_block_by_idx_by_idx(block_idx)?;
771        }
772
773        Ok(())
774    }
775
776    fn clear_prefetch_state(&mut self) -> CoreResult<()> {
777        let mut guard = self.prefetch_state.lock().map_err(|_| {
778            CoreError::MutexError(ErrorContext::new(
779                "Failed to lock prefetch state".to_string(),
780            ))
781        })?;
782
783        guard.prefetched.clear();
784        guard.prefetching.clear();
785        guard.tracker.clear_history();
786
787        Ok(())
788    }
789}
790
791// Extension methods for CompressedMemMappedArray to add prefetching support
792#[cfg(feature = "memory_compression")]
793impl<A: Clone + Copy + 'static + Send + Sync> CompressedMemMappedArray<A> {
794    /// Convert into a prefetching compressed array.
795    pub fn with_prefetching(self) -> PrefetchingCompressedArray<A> {
796        PrefetchingCompressedArray::new(self)
797    }
798
799    /// Convert into a prefetching compressed array with the given configuration.
800    pub fn with_prefetching_config(
801        self,
802        config: PrefetchConfig,
803    ) -> CoreResult<PrefetchingCompressedArray<A>> {
804        PrefetchingCompressedArray::new_with_config(self, config)
805    }
806}
807
808// For transparent pass-through to underlying array methods
809#[cfg(feature = "memory_compression")]
810impl<A> std::ops::Deref for PrefetchingCompressedArray<A>
811where
812    A: Clone + Copy + 'static + Send + Sync,
813{
814    type Target = CompressedMemMappedArray<A>;
815
816    fn deref(&self) -> &Self::Target {
817        &self.array
818    }
819}
820
821// Implement wrapper method for get that records accesses
822#[cfg(feature = "memory_compression")]
823impl<A: Clone + Copy + 'static + Send + Sync> PrefetchingCompressedArray<A> {
824    /// Get a specific element from the array, with prefetching support.
825    pub fn get(&self, indices: &[usize]) -> CoreResult<A> {
826        // Calculate block index from the access
827        let flat_index = self.calculate_flat_index(indices)?;
828        let block_idx = flat_index / self.metadata().block_size;
829
830        // Record the access
831        if self.prefetching_enabled {
832            let mut guard = self.prefetch_state.lock().map_err(|_| {
833                CoreError::MutexError(ErrorContext::new(
834                    "Failed to lock prefetch state".to_string(),
835                ))
836            })?;
837
838            guard.idx(block_idx);
839
840            // Get blocks to prefetch
841            let to_prefetch = guard.get_blocks_to_prefetch();
842
843            // Drop the lock before prefetching
844            drop(guard);
845
846            // Request prefetching of predicted blocks
847            // TODO: Fix mutable reference issue - needs interior mutability or redesign
848            // for &idx in &to_prefetch {
849            //     self.prefetch_block_by_idx_by_idx(idx)?;
850            // }
851        }
852
853        // Get the element from the underlying array
854        self.array.get(indices)
855    }
856
857    /// Calculate the flat index from multidimensional indices.
858    fn calculate_flat_index(&self, indices: &[usize]) -> CoreResult<usize> {
859        // Check that the indices are valid
860        if indices.len() != self.metadata().shape.len() {
861            return Err(CoreError::DimensionError(ErrorContext::new(format!(
862                "Expected {} indices, got {}",
863                self.metadata().shape.len(),
864                indices.len()
865            ))));
866        }
867
868        for (_, &idx) in indices.iter().enumerate() {
869            if idx >= self.metadata().shape[0] {
870                return Err(CoreError::IndexError(ErrorContext::new(format!(
871                    "Index {} out of bounds for dimension {} (max {})",
872                    idx,
873                    0,
874                    self.metadata().shape[0] - 1
875                ))));
876            }
877        }
878
879        // Calculate flat index
880        let mut flat_index = 0;
881        let mut stride = 1;
882        for i in (0..indices.len()).rev() {
883            flat_index += indices[i] * stride;
884            if i > 0 {
885                stride *= self.metadata().shape[i];
886            }
887        }
888
889        Ok(flat_index)
890    }
891
892    /// Slice the array with prefetching support.
893    pub fn slice(
894        &self,
895        ranges: &[(usize, usize)],
896    ) -> CoreResult<crate::ndarray::Array<A, crate::ndarray::IxDyn>> {
897        // Record accesses for the blocks that will be accessed
898        if self.prefetching_enabled {
899            // Determine which blocks will be accessed
900            let blocks = self.calculate_blocks_for_slice(ranges)?;
901
902            // Record accesses and prefetch
903            let mut guard = self.prefetch_state.lock().map_err(|_| {
904                CoreError::MutexError(ErrorContext::new(
905                    "Failed to lock prefetch state".to_string(),
906                ))
907            })?;
908
909            // Record each block access
910            for &block_idx in &blocks {
911                guard.idx(block_idx);
912            }
913
914            // Get blocks to prefetch
915            let to_prefetch = guard.get_blocks_to_prefetch();
916
917            // Drop the lock before prefetching
918            drop(guard);
919
920            // Request prefetching of predicted blocks
921            // TODO: Fix mutable reference issue - needs interior mutability or redesign
922            // for &idx in &to_prefetch {
923            //     self.prefetch_block_by_idx_by_idx(idx)?;
924            // }
925        }
926
927        // Use the underlying array's slice method
928        self.array.slice(ranges)
929    }
930
931    /// Calculate which blocks will be accessed for a slice operation.
932    fn calculate_blocks_for_slice(&self, ranges: &[(usize, usize)]) -> CoreResult<HashSet<usize>> {
933        // Check that the ranges are valid
934        if ranges.len() != self.metadata().shape.len() {
935            return Err(CoreError::DimensionError(ErrorContext::new(format!(
936                "Expected {} ranges, got {}",
937                self.metadata().shape.len(),
938                ranges.len()
939            ))));
940        }
941
942        // Calculate the total number of elements in the slice
943        let mut resultshape = Vec::with_capacity(ranges.len());
944        for (_, &(start, end)) in ranges.iter().enumerate() {
945            if start >= end {
946                return Err(CoreError::ValueError(ErrorContext::new(format!(
947                    "Invalid range for dimension {}: {}..{}",
948                    0, start, end
949                ))));
950            }
951            if end > self.metadata().shape[0] {
952                return Err(CoreError::IndexError(ErrorContext::new(format!(
953                    "Range {}..{} out of bounds for dimension {} (max {})",
954                    start,
955                    end,
956                    0,
957                    self.metadata().shape[0]
958                ))));
959            }
960            resultshape.push(end - start);
961        }
962
963        // Calculate the strides for each dimension
964        let mut strides = Vec::with_capacity(self.metadata().shape.len());
965        let mut stride = 1;
966        for i in (0..self.metadata().shape.len()).rev() {
967            strides.push(stride);
968            if i > 0 {
969                stride *= self.metadata().shape[i];
970            }
971        }
972        strides.reverse();
973
974        // Calculate the blocks that will be accessed
975        let mut blocks = HashSet::new();
976        let block_size = self.metadata().block_size;
977
978        // Calculate the corners of the hypercube
979        let mut corners = Vec::with_capacity(1 << ranges.len());
980        corners.push(vec![0; ranges.len()]);
981
982        for dim in 0..ranges.len() {
983            let mut new_corners = Vec::new();
984            for corner in &corners {
985                let mut corner1 = corner.clone();
986                let mut corner2 = corner.clone();
987                corner1[dim] = 0;
988                corner2[dim] = resultshape[dim] - 1;
989                new_corners.push(corner1);
990                new_corners.push(corner2);
991            }
992            corners = new_corners;
993        }
994
995        // Convert corners to flat indices and blocks
996        for corner in corners {
997            let mut flat_index = 0;
998            for (dim, &offset) in corner.iter().enumerate() {
999                flat_index += (ranges[dim].0 + offset) * strides[dim];
1000            }
1001
1002            let block_idx = flat_index / block_size;
1003            blocks.insert(block_idx);
1004        }
1005
1006        // For large slices, we should also check intermediate blocks along the edges
1007        // This is a simplification, but covers many common cases
1008        if blocks.len() > 1 {
1009            let min_block = *blocks.iter().min().expect("Operation failed");
1010            let max_block = *blocks.iter().max().expect("Operation failed");
1011
1012            // Add all blocks in between
1013            for block_idx in min_block..=max_block {
1014                blocks.insert(block_idx);
1015            }
1016        }
1017
1018        Ok(blocks)
1019    }
1020}
1021
1022#[cfg(test)]
1023mod tests {
1024    use super::*;
1025
1026    #[test]
1027    fn test_access_pattern_detection_sequential() {
1028        let config = PrefetchConfig {
1029            min_pattern_length: 4,
1030            ..Default::default()
1031        };
1032
1033        let mut tracker = BlockAccessTracker::new(config);
1034
1035        // Record sequential access
1036        for i in 0..10 {
1037            tracker.record_access(i);
1038        }
1039
1040        // Check that the pattern was detected correctly
1041        assert_eq!(tracker.current_pattern(), AccessPattern::Sequential);
1042
1043        // Check predictions
1044        let predictions = tracker.predict_next_blocks(3);
1045        assert_eq!(predictions, vec![10, 11, 12]);
1046    }
1047
1048    #[test]
1049    fn test_access_pattern_detection_strided() {
1050        let config = PrefetchConfig {
1051            min_pattern_length: 4,
1052            ..Default::default()
1053        };
1054
1055        let mut tracker = BlockAccessTracker::new(config);
1056
1057        // Record strided access with stride 3
1058        for i in (0..30).step_by(3) {
1059            tracker.record_access(i);
1060        }
1061
1062        // Check that the pattern was detected correctly
1063        assert_eq!(tracker.current_pattern(), AccessPattern::Strided(3));
1064
1065        // Check predictions
1066        let predictions = tracker.predict_next_blocks(3);
1067        assert_eq!(predictions, vec![30, 33, 36]);
1068    }
1069
1070    #[test]
1071    fn test_prefetching_state() {
1072        let config = PrefetchConfig {
1073            prefetch_count: 3,
1074            ..Default::default()
1075        };
1076
1077        let mut state = PrefetchingState::new(config);
1078
1079        // Record sequential access (these will be misses since nothing is prefetched yet)
1080        for i in 0..5 {
1081            state.idx(i);
1082        }
1083
1084        // Get blocks to prefetch
1085        let to_prefetch = state.get_blocks_to_prefetch();
1086        assert_eq!(to_prefetch, vec![5, 6, 7]);
1087
1088        // Mark blocks as being prefetched
1089        for &block in &to_prefetch {
1090            // Mark block as prefetching
1091            state.prefetching.insert(block);
1092        }
1093
1094        // Mark block 5 as prefetched
1095        state.prefetched.insert(5);
1096        state.prefetching.remove(&5);
1097
1098        // Access block 5 (should be a hit)
1099        state.idx(5);
1100
1101        // Check stats
1102        let stats = state.stats();
1103        assert_eq!(stats.prefetch_hits, 1);
1104        assert_eq!(stats.prefetch_misses, 5); // Initial 5 accesses
1105        assert!(stats.hit_rate > 0.0);
1106    }
1107}