scirs2_core/memory_efficient/adaptive_feedback.rs

//! Adaptive feedback system for chunk size optimization.
//!
//! This module provides a feedback-based system that learns optimal chunk sizes
//! by tracking performance metrics and adjusting parameters over time.
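//!
//! # Example
//!
//! A minimal usage sketch; the import path below is assumed from this file's
//! location in the crate:
//!
//! ```ignore
//! use scirs2_core::memory_efficient::adaptive_feedback::{
//!     ChunkSizePredictor, PerformanceMetrics,
//! };
//! use std::time::Duration;
//!
//! let mut predictor = ChunkSizePredictor::new(1024, 64, 65536);
//! // Record an observation: 1024 elements in 10 ms, 90% cache hits, 1 MiB used.
//! predictor.record(PerformanceMetrics::new(1024, Duration::from_millis(10), 0.9, 1 << 20));
//! // Ask for the chunk size to try next.
//! let next_chunk = predictor.predict();
//! ```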

use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Default timestamp used when deserializing: `Instant` has no serde support,
/// so the `timestamp` field is skipped on serialization and reset to
/// `Instant::now()` on deserialization.
fn default_instant() -> Instant {
    Instant::now()
}

/// Performance metrics for a chunking operation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Chunk size used (in elements)
    pub chunk_size: usize,

    /// Time taken to process the chunk
    pub duration: Duration,

    /// Throughput (elements per second)
    pub throughput: f64,

    /// Cache hit rate (0.0 to 1.0)
    pub cache_hit_rate: f64,

    /// Memory usage during processing (bytes)
    pub memory_usage: u64,

    /// Timestamp when the metric was recorded
    #[serde(skip, default = "default_instant")]
    pub timestamp: Instant,
}
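
// Illustrative serialization round-trip (a sketch, assuming `serde_json` is
// available as a dependency):
//
//     let m = PerformanceMetrics::new(1024, Duration::from_millis(10), 0.9, 1 << 20);
//     let json = serde_json::to_string(&m).unwrap();
//     // `timestamp` is absent from `json`; deserializing resets it to now.
//     let back: PerformanceMetrics = serde_json::from_str(&json).unwrap();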

impl PerformanceMetrics {
    /// Create new performance metrics
    pub fn new(
        chunk_size: usize,
        duration: Duration,
        cache_hit_rate: f64,
        memory_usage: u64,
    ) -> Self {
        // Guard against division by zero for zero-duration measurements
        let throughput = if duration.as_secs_f64() > 0.0 {
            chunk_size as f64 / duration.as_secs_f64()
        } else {
            0.0
        };

        Self {
            chunk_size,
            duration,
            throughput,
            cache_hit_rate,
            memory_usage,
            timestamp: Instant::now(),
        }
    }

    /// Calculate a performance score (higher is better)
    pub fn score(&self) -> f64 {
        // Weighted combination of cache hit rate and log-compressed throughput.
        // log10 damps large throughput values, but the term is unbounded above,
        // so very fast chunks can still outweigh the bounded cache term.
        let normalized_throughput = self.throughput.log10().max(0.0);
        let cache_weight = 0.3;
        let throughput_weight = 0.7;

        (cache_weight * self.cache_hit_rate) + (throughput_weight * normalized_throughput)
    }
}
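
// Worked example of `score` (illustrative): a chunk processed at
// 10_000 elements/s with a cache hit rate of 0.9 scores
//     normalized_throughput = log10(10_000) = 4.0
//     score = 0.3 * 0.9 + 0.7 * 4.0 = 3.07
// so the throughput term contributes most of the score here.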

/// Chunk size predictor with historical tracking
#[derive(Debug, Clone)]
pub struct ChunkSizePredictor {
    /// Historical performance metrics (limited to the last `max_history` entries)
    history: VecDeque<PerformanceMetrics>,

    /// Maximum history size
    max_history: usize,

    /// Current best chunk size
    best_chunk_size: usize,

    /// Current best score
    best_score: f64,

    /// Moving average window size
    window_size: usize,

    /// Minimum chunk size to consider
    min_chunk_size: usize,

    /// Maximum chunk size to consider
    max_chunk_size: usize,

    /// Learning rate for adjustments (0.0 to 1.0)
    learning_rate: f64,
}

impl ChunkSizePredictor {
    /// Create a new chunk size predictor
    pub fn new(initial_chunk_size: usize, min_chunk_size: usize, max_chunk_size: usize) -> Self {
        Self {
            history: VecDeque::new(),
            max_history: 100,
            best_chunk_size: initial_chunk_size,
            best_score: 0.0,
            window_size: 10,
            min_chunk_size,
            max_chunk_size,
            learning_rate: 0.1,
        }
    }

    /// Create a new predictor with custom configuration
    pub fn with_config(
        initial_chunk_size: usize,
        min_chunk_size: usize,
        max_chunk_size: usize,
        max_history: usize,
        window_size: usize,
        learning_rate: f64,
    ) -> Self {
        Self {
            history: VecDeque::new(),
            max_history,
            best_chunk_size: initial_chunk_size,
            best_score: 0.0,
            window_size,
            min_chunk_size,
            max_chunk_size,
            learning_rate,
        }
    }

    /// Record a performance measurement
    pub fn record(&mut self, metrics: PerformanceMetrics) {
        let score = metrics.score();

        // Update the best chunk size if this measurement performs better
        if score > self.best_score {
            self.best_score = score;
            self.best_chunk_size = metrics.chunk_size;
        }

        // Add to history
        self.history.push_back(metrics);

        // Limit history size
        while self.history.len() > self.max_history {
            self.history.pop_front();
        }
    }

    /// Predict optimal chunk size based on historical data
    pub fn predict(&self) -> usize {
        if self.history.is_empty() {
            return self.best_chunk_size;
        }

        // Use a score-weighted moving average over the most recent window;
        // `recent_metrics` is non-empty because the history is non-empty.
        let recent_metrics: Vec<&PerformanceMetrics> =
            self.history.iter().rev().take(self.window_size).collect();

        // Calculate the weighted average chunk size
        let total_score: f64 = recent_metrics.iter().map(|m| m.score()).sum();

        if total_score == 0.0 {
            return self.best_chunk_size;
        }

        let weighted_chunk_size: f64 = recent_metrics
            .iter()
            .map(|m| m.chunk_size as f64 * m.score())
            .sum::<f64>()
            / total_score;

        // Apply the learning rate to blend with the current best
        let predicted_size = (self.best_chunk_size as f64 * (1.0 - self.learning_rate))
            + (weighted_chunk_size * self.learning_rate);

        // Clamp to the configured min/max bounds
        predicted_size
            .round()
            .clamp(self.min_chunk_size as f64, self.max_chunk_size as f64) as usize
    }
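
    // Worked example of `predict` (illustrative): with two recent metrics,
    // chunk sizes 1000 and 2000 scoring 1.0 and 3.0,
    //     weighted_chunk_size = (1000 * 1.0 + 2000 * 3.0) / 4.0 = 1750
    // and with best_chunk_size = 1000 and learning_rate = 0.1,
    //     predicted = 1000 * 0.9 + 1750 * 0.1 = 1075
    // so the prediction drifts toward recent high scorers, damped by the
    // learning rate.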

    /// Get the current best chunk size
    pub fn best_chunk_size(&self) -> usize {
        self.best_chunk_size
    }

    /// Get the average throughput over recent operations
    pub fn average_throughput(&self) -> f64 {
        if self.history.is_empty() {
            return 0.0;
        }

        let recent: Vec<&PerformanceMetrics> =
            self.history.iter().rev().take(self.window_size).collect();

        recent.iter().map(|m| m.throughput).sum::<f64>() / recent.len() as f64
    }

    /// Get the average cache hit rate over recent operations
    pub fn average_cache_hit_rate(&self) -> f64 {
        if self.history.is_empty() {
            return 0.0;
        }

        let recent: Vec<&PerformanceMetrics> =
            self.history.iter().rev().take(self.window_size).collect();

        recent.iter().map(|m| m.cache_hit_rate).sum::<f64>() / recent.len() as f64
    }

    /// Clear the recorded history and best score; the best chunk size is
    /// kept so `predict` still has a sensible starting point.
    pub fn reset(&mut self) {
        self.history.clear();
        self.best_score = 0.0;
    }

    /// Get the number of recorded metrics
    pub fn history_size(&self) -> usize {
        self.history.len()
    }
}

/// Thread-safe wrapper for chunk size predictor
pub type SharedPredictor = Arc<Mutex<ChunkSizePredictor>>;

/// Create a new shared predictor
#[allow(dead_code)]
pub fn create_shared_predictor(
    initial_chunk_size: usize,
    min_chunk_size: usize,
    max_chunk_size: usize,
) -> SharedPredictor {
    Arc::new(Mutex::new(ChunkSizePredictor::new(
        initial_chunk_size,
        min_chunk_size,
        max_chunk_size,
    )))
}
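
// Illustrative sketch of sharing the predictor across threads (the chunk
// sizes below are placeholder values):
//
//     let predictor = create_shared_predictor(1024, 64, 65536);
//     let worker = Arc::clone(&predictor);
//     std::thread::spawn(move || {
//         let mut p = worker.lock().expect("Lock failed");
//         p.record(PerformanceMetrics::new(
//             1024,
//             Duration::from_millis(5),
//             0.9,
//             1 << 20,
//         ));
//     });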

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_performance_metrics_creation() {
        let metrics = PerformanceMetrics::new(1000, Duration::from_secs(1), 0.8, 1024 * 1024);

        assert_eq!(metrics.chunk_size, 1000);
        assert_eq!(metrics.duration, Duration::from_secs(1));
        assert!((metrics.throughput - 1000.0).abs() < 1e-6);
        assert_eq!(metrics.cache_hit_rate, 0.8);
        assert_eq!(metrics.memory_usage, 1024 * 1024);
    }

    #[test]
    fn test_performance_score() {
        let metrics1 = PerformanceMetrics::new(1000, Duration::from_millis(100), 0.9, 1024 * 1024);

        let metrics2 = PerformanceMetrics::new(1000, Duration::from_millis(100), 0.5, 1024 * 1024);

        // A higher cache hit rate should result in a higher score
        assert!(metrics1.score() > metrics2.score());
    }

    #[test]
    fn test_predictor_creation() {
        let predictor = ChunkSizePredictor::new(1000, 100, 10000);

        assert_eq!(predictor.best_chunk_size(), 1000);
        assert_eq!(predictor.history_size(), 0);
    }

    #[test]
    fn test_predictor_record_and_predict() {
        let mut predictor = ChunkSizePredictor::new(1000, 100, 10000);

        // Record some metrics with varying performance
        let metrics1 = PerformanceMetrics::new(1000, Duration::from_millis(100), 0.8, 1024 * 1024);
        predictor.record(metrics1);

        let metrics2 = PerformanceMetrics::new(1500, Duration::from_millis(80), 0.85, 1024 * 1024);
        predictor.record(metrics2);

        let metrics3 = PerformanceMetrics::new(2000, Duration::from_millis(90), 0.9, 1024 * 1024);
        predictor.record(metrics3);

        assert_eq!(predictor.history_size(), 3);

        // Predict should return a chunk size within bounds
        let predicted = predictor.predict();
        assert!(predicted >= 100);
        assert!(predicted <= 10000);
    }

    #[test]
    fn test_predictor_best_tracking() {
        let mut predictor = ChunkSizePredictor::new(1000, 100, 10000);

        // Record a high-performing configuration
        let good_metrics =
            PerformanceMetrics::new(2000, Duration::from_millis(50), 0.95, 1024 * 1024);
        predictor.record(good_metrics);

        // Record a lower-performing configuration
        let bad_metrics =
            PerformanceMetrics::new(1000, Duration::from_millis(200), 0.5, 1024 * 1024);
        predictor.record(bad_metrics);

        // Best should still be the high-performing one
        assert_eq!(predictor.best_chunk_size(), 2000);
    }

    #[test]
    fn test_predictor_history_limit() {
        let mut predictor = ChunkSizePredictor::with_config(
            1000,  // initial_chunk_size
            100,   // min_chunk_size
            10000, // max_chunk_size
            5,     // max_history
            3,     // window_size
            0.1,   // learning_rate
        );

        // Record more than max_history entries
        for i in 0..10 {
            let metrics = PerformanceMetrics::new(
                1000 + i * 100,
                Duration::from_millis(100),
                0.8,
                1024 * 1024,
            );
            predictor.record(metrics);
        }

        // The history should be capped at max_history
        assert_eq!(predictor.history_size(), 5);
    }

    #[test]
    fn test_predictor_averages() {
        let mut predictor = ChunkSizePredictor::new(1000, 100, 10000);

        let metrics1 = PerformanceMetrics::new(1000, Duration::from_millis(100), 0.8, 1024 * 1024);
        predictor.record(metrics1);

        let metrics2 = PerformanceMetrics::new(1000, Duration::from_millis(100), 0.6, 1024 * 1024);
        predictor.record(metrics2);

        let avg_cache_hit = predictor.average_cache_hit_rate();
        assert!((avg_cache_hit - 0.7).abs() < 1e-6);

        assert!(predictor.average_throughput() > 0.0);
    }

    #[test]
    fn test_predictor_reset() {
        let mut predictor = ChunkSizePredictor::new(1000, 100, 10000);

        let metrics = PerformanceMetrics::new(2000, Duration::from_millis(100), 0.8, 1024 * 1024);
        predictor.record(metrics);

        assert_eq!(predictor.history_size(), 1);

        predictor.reset();

        assert_eq!(predictor.history_size(), 0);
    }

    #[test]
    fn test_shared_predictor() {
        let predictor = create_shared_predictor(1000, 100, 10000);

        // Should be able to lock and use it
        {
            let mut p = predictor.lock().expect("Lock failed");
            let metrics =
                PerformanceMetrics::new(1500, Duration::from_millis(100), 0.8, 1024 * 1024);
            p.record(metrics);
        }

        // Verify the record was stored
        {
            let p = predictor.lock().expect("Lock failed");
            assert_eq!(p.history_size(), 1);
        }
    }
}