//! memscope_rs/core/threshold_batch_processor.rs — adaptive batching:
//! items are forwarded directly at low load and buffered into batches
//! once the measured operation frequency crosses a configurable threshold.
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7use std::sync::Mutex;
8use std::time::{Duration, Instant};
9
/// Tuning knobs for the threshold batch processor: how large a batch may
/// grow before it is flushed, and how high the operation rate must climb
/// before batching is enabled at all.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Number of buffered items that triggers an automatic flush.
    pub batch_size: usize,
    /// Operations-per-second rate above which batching is switched on.
    pub frequency_threshold: u64,
    /// Length of the window over which the operation rate is measured.
    pub measurement_window: Duration,
}

impl BatchConfig {
    /// Preset for low-throughput workloads: 10-item batches, batching
    /// engages above 100 ops/s, measured over one-second windows.
    pub fn low_frequency() -> Self {
        Self::custom(10, 100, Duration::from_secs(1))
    }

    /// Preset for moderate workloads: 25-item batches, 500 ops/s threshold,
    /// one-second measurement windows.
    pub fn medium_frequency() -> Self {
        Self::custom(25, 500, Duration::from_secs(1))
    }

    /// Preset for high-throughput workloads: 50-item batches, 1000 ops/s
    /// threshold, one-second measurement windows.
    pub fn high_frequency() -> Self {
        Self::custom(50, 1000, Duration::from_secs(1))
    }

    /// Build a configuration from explicit values; the presets above all
    /// delegate here.
    pub fn custom(
        batch_size: usize,
        frequency_threshold: u64,
        measurement_window: Duration,
    ) -> Self {
        Self {
            batch_size,
            frequency_threshold,
            measurement_window,
        }
    }
}

impl Default for BatchConfig {
    /// The medium-frequency preset is the default configuration.
    fn default() -> Self {
        Self::medium_frequency()
    }
}
65
/// Adaptive processor that forwards items one at a time at low load and
/// switches to buffered batch delivery once the measured operation
/// frequency exceeds `config.frequency_threshold`.
pub struct ThresholdBatchProcessor<T> {
    /// Thresholds and measurement window controlling when batching engages.
    config: BatchConfig,
    /// Items waiting to be delivered as a batch (batching mode only).
    buffer: Mutex<Vec<T>>,
    /// Callback invoked with a one-element slice in direct mode, or with
    /// up to `config.batch_size` items when a batch is flushed.
    #[allow(clippy::type_complexity)]
    processor: Box<dyn Fn(&[T]) + Send + Sync>,

    /// Operations observed in the current measurement window; swapped to 0
    /// each time the window rolls over.
    operation_count: AtomicU64,
    /// Start instant of the current measurement window.
    last_measurement: Mutex<Instant>,
    /// Whether batching is currently active; re-evaluated once per window.
    batching_enabled: AtomicBool,

    /// Lifetime count of all items processed (for stats).
    total_operations: AtomicU64,
    /// Lifetime count of items routed through the batch path (for stats).
    batched_operations: AtomicU64,
}
82
impl<T> ThresholdBatchProcessor<T> {
    /// Create a processor with an explicit configuration.
    ///
    /// `processor` receives a one-element slice per item in direct mode,
    /// and a slice of up to `config.batch_size` items when a batch flushes.
    pub fn new<F>(config: BatchConfig, processor: F) -> Self
    where
        F: Fn(&[T]) + Send + Sync + 'static,
    {
        Self {
            config,
            buffer: Mutex::new(Vec::new()),
            processor: Box::new(processor),
            operation_count: AtomicU64::new(0),
            last_measurement: Mutex::new(Instant::now()),
            // Start in direct mode; batching engages only after a full
            // measurement window shows the frequency threshold exceeded.
            batching_enabled: AtomicBool::new(false),
            total_operations: AtomicU64::new(0),
            batched_operations: AtomicU64::new(0),
        }
    }

    /// Convenience constructor using `BatchConfig::default()`.
    pub fn with_default_config<F>(processor: F) -> Self
    where
        F: Fn(&[T]) + Send + Sync + 'static,
    {
        Self::new(BatchConfig::default(), processor)
    }

    /// Process one item, routing it through the direct or batched path
    /// depending on the current batching mode.
    pub fn process(&self, item: T) {
        self.total_operations.fetch_add(1, Ordering::Relaxed);
        self.operation_count.fetch_add(1, Ordering::Relaxed);

        // May flip `batching_enabled` if a measurement window has elapsed.
        self.update_batching_mode();

        if self.batching_enabled.load(Ordering::Relaxed) {
            self.process_batched(item);
        } else {
            self.process_direct(item);
        }
    }

    /// Deliver a single item to the callback immediately, as a 1-slice.
    fn process_direct(&self, item: T) {
        let items = vec![item];
        (self.processor)(&items);
    }

    /// Append the item to the batch buffer, flushing when it reaches
    /// `config.batch_size`. If the buffer lock is contended, falls back to
    /// direct delivery (that item then counts as direct, not batched).
    fn process_batched(&self, item: T) {
        let should_flush = {
            if let Ok(mut buffer) = self.buffer.try_lock() {
                buffer.push(item);
                buffer.len() >= self.config.batch_size
            } else {
                // Contended: deliver immediately rather than block or drop.
                self.process_direct(item);
                return;
            }
        };

        // Lock is released here before flushing, so flush_batch can
        // re-acquire it (another thread may flush first; that's benign).
        if should_flush {
            self.flush_batch();
        }

        self.batched_operations.fetch_add(1, Ordering::Relaxed);
    }

    /// Drain the buffer and hand its contents to the callback.
    ///
    /// Best-effort: uses `try_lock`, so a flush is silently skipped if the
    /// buffer is contended. The lock is dropped before invoking the
    /// callback so the callback never runs while the buffer is held.
    pub fn flush_batch(&self) {
        if let Ok(mut buffer) = self.buffer.try_lock() {
            if !buffer.is_empty() {
                let items = std::mem::take(&mut *buffer);
                drop(buffer);
                (self.processor)(&items);
            }
        }
    }

    /// Re-evaluate batching mode once per measurement window: compute the
    /// ops/sec rate over the elapsed window and enable batching iff it
    /// exceeds `config.frequency_threshold`. Skipped entirely (no-op) if
    /// another thread holds the measurement lock.
    fn update_batching_mode(&self) {
        if let Ok(mut last_measurement) = self.last_measurement.try_lock() {
            let now = Instant::now();
            let elapsed = now.duration_since(*last_measurement);

            if elapsed >= self.config.measurement_window {
                // Reset the window counter; `swap` keeps count+reset atomic.
                let ops_count = self.operation_count.swap(0, Ordering::Relaxed);
                // Integer ops/sec; sub-second windows are computed in
                // milliseconds (max(1) avoids division by zero).
                let frequency = if elapsed.as_secs() > 0 {
                    ops_count / elapsed.as_secs()
                } else {
                    ops_count * 1000 / elapsed.as_millis().max(1) as u64
                };

                let should_batch = frequency > self.config.frequency_threshold;
                self.batching_enabled.store(should_batch, Ordering::Relaxed);

                *last_measurement = now;
            }
        }
    }

    /// Current ops/sec estimate for the in-progress window.
    /// Returns 0 when the measurement lock is contended.
    pub fn current_frequency(&self) -> u64 {
        if let Ok(last_measurement) = self.last_measurement.try_lock() {
            let elapsed = last_measurement.elapsed();
            let ops_count = self.operation_count.load(Ordering::Relaxed);

            if elapsed.as_secs() > 0 {
                ops_count / elapsed.as_secs()
            } else {
                ops_count * 1000 / elapsed.as_millis().max(1) as u64
            }
        } else {
            0
        }
    }

    /// Whether batching is currently active.
    pub fn is_batching_enabled(&self) -> bool {
        self.batching_enabled.load(Ordering::Relaxed)
    }

    /// Snapshot of lifetime counters and current mode.
    ///
    /// NOTE(review): the two counters are read with separate relaxed loads,
    /// so under concurrent `process` calls the snapshot may be slightly
    /// inconsistent; `batched <= total` still holds because every batched
    /// item also incremented `total` first.
    pub fn stats(&self) -> ProcessingStats {
        let total = self.total_operations.load(Ordering::Relaxed);
        let batched = self.batched_operations.load(Ordering::Relaxed);

        ProcessingStats {
            total_operations: total,
            batched_operations: batched,
            direct_operations: total - batched,
            batching_ratio: if total > 0 {
                batched as f64 / total as f64
            } else {
                0.0
            },
            current_frequency: self.current_frequency(),
            batching_enabled: self.is_batching_enabled(),
        }
    }

    /// Zero all counters and restart the measurement window. Does not
    /// change the current batching mode or drop buffered items.
    pub fn reset_stats(&self) {
        self.total_operations.store(0, Ordering::Relaxed);
        self.batched_operations.store(0, Ordering::Relaxed);
        self.operation_count.store(0, Ordering::Relaxed);

        // Best-effort: window restart is skipped if the lock is contended.
        if let Ok(mut last_measurement) = self.last_measurement.try_lock() {
            *last_measurement = Instant::now();
        }
    }
}
235
/// Point-in-time snapshot of a processor's counters and mode, as returned
/// by `ThresholdBatchProcessor::stats`.
#[derive(Debug, Clone)]
pub struct ProcessingStats {
    /// Lifetime count of all items processed.
    pub total_operations: u64,
    /// Lifetime count of items routed through the batch path.
    pub batched_operations: u64,
    /// Lifetime count of items delivered directly (total - batched).
    pub direct_operations: u64,
    /// Fraction of operations that were batched (0.0 when total is 0).
    pub batching_ratio: f64,
    /// Ops/sec estimate for the in-progress measurement window.
    pub current_frequency: u64,
    /// Whether batching was active when the snapshot was taken.
    pub batching_enabled: bool,
}
246
// `ThresholdBatchProcessor<T>` is `Send` and `Sync` automatically whenever
// `T: Send`: every field is itself thread-safe under that bound —
// `Mutex<Vec<T>>` and `Mutex<Instant>` are `Send + Sync` for `Send`
// contents, `AtomicU64`/`AtomicBool` are always `Send + Sync`, and the
// callback is stored as `Box<dyn Fn(&[T]) + Send + Sync>`. The previous
// hand-written `unsafe impl<T: Send> Send`/`Sync` blocks duplicated exactly
// the bounds the compiler already derives, so they have been removed: the
// auto traits provide the same guarantees to callers without any `unsafe`.
252
#[cfg(test)]
mod tests {
    use crate::core::safe_operations::SafeLock;

    use super::*;
    use std::sync::{Arc, Mutex as StdMutex};
    use std::time::Duration;

    /// With few operations, batching should never engage and every item is
    /// delivered (directly or via the final flush).
    #[test]
    fn test_low_frequency_direct_processing() {
        // Collect everything the processor callback receives.
        let processed = Arc::new(StdMutex::new(Vec::new()));
        let processed_clone = processed.clone();

        // batch_size 5, threshold 100 ops/s, 100 ms measurement window.
        let config = BatchConfig::custom(5, 100, Duration::from_millis(100));
        let processor = ThresholdBatchProcessor::new(config, move |items: &[i32]| {
            let mut p = processed_clone
                .safe_lock()
                .expect("Failed to acquire lock on processed");
            p.extend_from_slice(items);
        });

        for i in 0..10 {
            processor.process(i);
        }

        // Flush any buffered remainder so the count below is exact.
        processor.flush_batch();

        let stats = processor.stats();
        tracing::info!("Low frequency stats: {stats:?}");

        // NOTE(review): this asserts batching never engaged; the 10 ops
        // complete well inside the first 100 ms window, so the mode is
        // never re-evaluated — but this is timing-dependent on slow CI.
        assert!(!processor.is_batching_enabled());

        // All 10 items must have reached the callback exactly once.
        let processed_items = processed
            .safe_lock()
            .expect("Failed to acquire lock on processed");
        assert_eq!(processed_items.len(), 10);
    }

    /// Regardless of mode switches, no item may be lost or duplicated:
    /// 25 processed items must yield 25 delivered items after a flush.
    #[test]
    fn test_high_frequency_batch_processing() {
        let processed = Arc::new(StdMutex::new(Vec::new()));
        let processed_clone = processed.clone();

        // Small batches (3) and a low threshold (50 ops/s) to make
        // batching likely to engage during the test.
        let config = BatchConfig::custom(3, 50, Duration::from_millis(100));
        let processor = ThresholdBatchProcessor::new(config, move |items: &[i32]| {
            // NOTE(review): uses plain `.lock()` here but `safe_lock()` in
            // the other test — consider unifying on `safe_lock()`.
            if let Ok(mut p) = processed_clone.lock() {
                p.extend_from_slice(items);
            }
        });

        for i in 0..20 {
            processor.process(i);
        }

        for i in 20..25 {
            processor.process(i);
        }

        // Drain whatever is still buffered before counting.
        processor.flush_batch();

        let stats = processor.stats();
        tracing::info!("High frequency stats: {stats:?}");

        // Item conservation: every processed item was delivered.
        let processed_items = processed
            .safe_lock()
            .expect("Failed to acquire lock on processed");
        assert_eq!(processed_items.len(), 25);
    }

    /// The three presets expose their documented frequency thresholds.
    #[test]
    fn test_config_presets() {
        let low = BatchConfig::low_frequency();
        assert_eq!(low.frequency_threshold, 100);

        let medium = BatchConfig::medium_frequency();
        assert_eq!(medium.frequency_threshold, 500);

        let high = BatchConfig::high_frequency();
        assert_eq!(high.frequency_threshold, 1000);
    }
}
341}