memscope_rs/core/threshold_batch_processor.rs

//! Threshold-based batch processor that adapts to operation frequency
//!
//! This module provides a batch processor that automatically switches between
//! direct processing and batching based on the measured operation frequency.
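//!
//! A minimal usage sketch (the import path below is assumed from this file's
//! location and may differ in the actual crate layout):
//!
//! ```ignore
//! use memscope_rs::core::threshold_batch_processor::{BatchConfig, ThresholdBatchProcessor};
//! use std::time::Duration;
//!
//! // The closure receives a single item in direct mode, or a whole batch
//! // once the measured frequency crosses the configured threshold.
//! let processor = ThresholdBatchProcessor::new(
//!     BatchConfig::custom(25, 500, Duration::from_secs(1)),
//!     |items: &[i32]| println!("processing {} item(s)", items.len()),
//! );
//!
//! processor.process(42);
//! processor.flush_batch(); // drain anything still buffered
//! ```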

use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Configuration for batch processing behavior
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Maximum number of items buffered before a batch is flushed
    pub batch_size: usize,
    /// Operations-per-second rate above which batching is enabled
    pub frequency_threshold: u64,
    /// Window over which the operation rate is measured
    pub measurement_window: Duration,
}

impl BatchConfig {
    /// Low frequency configuration (100 ops/sec threshold)
    pub fn low_frequency() -> Self {
        Self {
            batch_size: 10,
            frequency_threshold: 100,
            measurement_window: Duration::from_secs(1),
        }
    }

    /// Medium frequency configuration (500 ops/sec threshold)
    pub fn medium_frequency() -> Self {
        Self {
            batch_size: 25,
            frequency_threshold: 500,
            measurement_window: Duration::from_secs(1),
        }
    }

    /// High frequency configuration (1000 ops/sec threshold)
    pub fn high_frequency() -> Self {
        Self {
            batch_size: 50,
            frequency_threshold: 1000,
            measurement_window: Duration::from_secs(1),
        }
    }

    /// Create custom configuration
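    ///
    /// A minimal sketch (the values are illustrative, not recommendations):
    /// batches of 25 items once the measured rate exceeds 500 ops/sec over
    /// one-second windows.
    ///
    /// ```ignore
    /// let config = BatchConfig::custom(25, 500, Duration::from_secs(1));
    /// ```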
    pub fn custom(
        batch_size: usize,
        frequency_threshold: u64,
        measurement_window: Duration,
    ) -> Self {
        Self {
            batch_size,
            frequency_threshold,
            measurement_window,
        }
    }
}

impl Default for BatchConfig {
    fn default() -> Self {
        Self::medium_frequency()
    }
}

/// Threshold-based batch processor
pub struct ThresholdBatchProcessor<T> {
    config: BatchConfig,
    buffer: Mutex<Vec<T>>,
    #[allow(clippy::type_complexity)]
    processor: Box<dyn Fn(&[T]) + Send + Sync>,

    // Frequency tracking
    operation_count: AtomicU64,
    last_measurement: Mutex<Instant>,
    batching_enabled: AtomicBool,

    // Statistics
    total_operations: AtomicU64,
    batched_operations: AtomicU64,
}

impl<T> ThresholdBatchProcessor<T> {
    /// Create new threshold batch processor
    pub fn new<F>(config: BatchConfig, processor: F) -> Self
    where
        F: Fn(&[T]) + Send + Sync + 'static,
    {
        Self {
            config,
            buffer: Mutex::new(Vec::new()),
            processor: Box::new(processor),
            operation_count: AtomicU64::new(0),
            last_measurement: Mutex::new(Instant::now()),
            batching_enabled: AtomicBool::new(false),
            total_operations: AtomicU64::new(0),
            batched_operations: AtomicU64::new(0),
        }
    }

    /// Create with default medium frequency configuration
    pub fn with_default_config<F>(processor: F) -> Self
    where
        F: Fn(&[T]) + Send + Sync + 'static,
    {
        Self::new(BatchConfig::default(), processor)
    }

    /// Process an item (either directly or via batching)
    pub fn process(&self, item: T) {
        self.total_operations.fetch_add(1, Ordering::Relaxed);
        self.operation_count.fetch_add(1, Ordering::Relaxed);

        // Check if we should update batching mode
        self.update_batching_mode();

        if self.batching_enabled.load(Ordering::Relaxed) {
            self.process_batched(item);
        } else {
            self.process_direct(item);
        }
    }

    /// Process item directly (no batching)
    fn process_direct(&self, item: T) {
        let items = vec![item];
        (self.processor)(&items);
    }

    /// Process item via batching
    fn process_batched(&self, item: T) {
        let should_flush = {
            if let Ok(mut buffer) = self.buffer.try_lock() {
                buffer.push(item);
                buffer.len() >= self.config.batch_size
            } else {
                // If we can't get the lock, process directly to avoid blocking
                self.process_direct(item);
                return;
            }
        };

        if should_flush {
            self.flush_batch();
        }

        self.batched_operations.fetch_add(1, Ordering::Relaxed);
    }

    /// Flush the current batch
    pub fn flush_batch(&self) {
        if let Ok(mut buffer) = self.buffer.try_lock() {
            if !buffer.is_empty() {
                let items = std::mem::take(&mut *buffer);
                drop(buffer); // Release lock before processing
                (self.processor)(&items);
            }
        }
    }

    /// Update batching mode based on current frequency
    fn update_batching_mode(&self) {
        if let Ok(mut last_measurement) = self.last_measurement.try_lock() {
            let now = Instant::now();
            let elapsed = now.duration_since(*last_measurement);

            if elapsed >= self.config.measurement_window {
                let ops_count = self.operation_count.swap(0, Ordering::Relaxed);
                // Compute ops/sec from elapsed milliseconds so sub-second
                // precision is not lost when the window is overshot.
                let elapsed_ms = elapsed.as_millis().max(1) as u64;
                let frequency = ops_count.saturating_mul(1000) / elapsed_ms;

                // Enable batching if frequency exceeds threshold
                let should_batch = frequency > self.config.frequency_threshold;
                self.batching_enabled.store(should_batch, Ordering::Relaxed);

                *last_measurement = now;
            }
        }
    }

    /// Get current frequency (operations per second)
    pub fn current_frequency(&self) -> u64 {
        if let Ok(last_measurement) = self.last_measurement.try_lock() {
            let elapsed = last_measurement.elapsed();
            let ops_count = self.operation_count.load(Ordering::Relaxed);
            let elapsed_ms = elapsed.as_millis().max(1) as u64;
            ops_count.saturating_mul(1000) / elapsed_ms
        } else {
            0
        }
    }

    /// Check if batching is currently enabled
    pub fn is_batching_enabled(&self) -> bool {
        self.batching_enabled.load(Ordering::Relaxed)
    }

    /// Get processing statistics
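    ///
    /// A small sketch of inspecting the returned `ProcessingStats`:
    ///
    /// ```ignore
    /// let stats = processor.stats();
    /// println!(
    ///     "{} of {} operations were batched ({:.0}%)",
    ///     stats.batched_operations,
    ///     stats.total_operations,
    ///     stats.batching_ratio * 100.0,
    /// );
    /// ```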
    pub fn stats(&self) -> ProcessingStats {
        let total = self.total_operations.load(Ordering::Relaxed);
        let batched = self.batched_operations.load(Ordering::Relaxed);

        ProcessingStats {
            total_operations: total,
            batched_operations: batched,
            // Saturate to guard against a concurrent increment landing
            // between the two relaxed loads above.
            direct_operations: total.saturating_sub(batched),
            batching_ratio: if total > 0 {
                batched as f64 / total as f64
            } else {
                0.0
            },
            current_frequency: self.current_frequency(),
            batching_enabled: self.is_batching_enabled(),
        }
    }

    /// Reset all statistics
    pub fn reset_stats(&self) {
        self.total_operations.store(0, Ordering::Relaxed);
        self.batched_operations.store(0, Ordering::Relaxed);
        self.operation_count.store(0, Ordering::Relaxed);

        if let Ok(mut last_measurement) = self.last_measurement.try_lock() {
            *last_measurement = Instant::now();
        }
    }
}

/// Processing statistics
#[derive(Debug, Clone)]
pub struct ProcessingStats {
    pub total_operations: u64,
    pub batched_operations: u64,
    pub direct_operations: u64,
    pub batching_ratio: f64,
    pub current_frequency: u64,
    pub batching_enabled: bool,
}

// Safety: ThresholdBatchProcessor is Send if T is Send
unsafe impl<T: Send> Send for ThresholdBatchProcessor<T> {}

// Safety: ThresholdBatchProcessor is Sync if T is Send
unsafe impl<T: Send> Sync for ThresholdBatchProcessor<T> {}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::safe_operations::SafeLock;
    use std::sync::{Arc, Mutex as StdMutex};
    use std::time::Duration;

    #[test]
    fn test_low_frequency_direct_processing() {
        let processed = Arc::new(StdMutex::new(Vec::new()));
        let processed_clone = processed.clone();

        let config = BatchConfig::custom(5, 100, Duration::from_millis(100));
        let processor = ThresholdBatchProcessor::new(config, move |items: &[i32]| {
            let mut p = processed_clone
                .safe_lock()
                .expect("Failed to acquire lock on processed");
            p.extend_from_slice(items);
        });

        // Ten quick calls stay well below the 100 ops/sec threshold, so the
        // processor remains in direct (non-batched) mode throughout.
        for i in 0..10 {
            processor.process(i);
        }

        processor.flush_batch();

        let stats = processor.stats();
        tracing::info!("Low frequency stats: {stats:?}");

        // Batching should never have been enabled
        assert!(!processor.is_batching_enabled());

        let processed_items = processed
            .safe_lock()
            .expect("Failed to acquire lock on processed");
        assert_eq!(processed_items.len(), 10);
    }

    #[test]
    fn test_high_frequency_batch_processing() {
        let processed = Arc::new(StdMutex::new(Vec::new()));
        let processed_clone = processed.clone();

        let config = BatchConfig::custom(3, 50, Duration::from_millis(100));
        let processor = ThresholdBatchProcessor::new(config, move |items: &[i32]| {
            if let Ok(mut p) = processed_clone.lock() {
                p.extend_from_slice(items);
            }
        });

        // Burst of items before the measurement window has elapsed; these
        // are all processed directly.
        for i in 0..20 {
            processor.process(i);
        }

        // Wait out the 100 ms window so the next call re-evaluates the
        // frequency (roughly 20 ops over ~0.12 s, well above 50 ops/sec).
        std::thread::sleep(Duration::from_millis(120));

        // These calls switch the processor into batching mode and are
        // buffered, flushing whenever the buffer reaches the batch size of 3.
        for i in 20..25 {
            processor.process(i);
        }

        processor.flush_batch();

        let stats = processor.stats();
        tracing::info!("High frequency stats: {stats:?}");

        // Every item must be processed exactly once, directly or in a batch.
        let processed_items = processed
            .safe_lock()
            .expect("Failed to acquire lock on processed");
        assert_eq!(processed_items.len(), 25);
    }

    #[test]
    fn test_config_presets() {
        let low = BatchConfig::low_frequency();
        assert_eq!(low.frequency_threshold, 100);

        let medium = BatchConfig::medium_frequency();
        assert_eq!(medium.frequency_threshold, 500);

        let high = BatchConfig::high_frequency();
        assert_eq!(high.frequency_threshold, 1000);
    }
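
    // Added sketch test: exercises stats() and reset_stats() using only the
    // API defined in this module. With the default 1 s measurement window,
    // five quick calls stay in direct mode, so the counts below follow from
    // process() always incrementing total_operations.
    #[test]
    fn test_stats_and_reset() {
        let processor = ThresholdBatchProcessor::with_default_config(|_items: &[i32]| {});

        for i in 0..5 {
            processor.process(i);
        }

        let stats = processor.stats();
        assert_eq!(stats.total_operations, 5);
        assert_eq!(
            stats.direct_operations + stats.batched_operations,
            stats.total_operations
        );

        processor.reset_stats();
        let stats = processor.stats();
        assert_eq!(stats.total_operations, 0);
        assert_eq!(stats.batched_operations, 0);
    }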
}