// oxirs_stream/performance_optimizer/batching.rs

1//! Adaptive batching for performance optimization
2//!
3//! This module provides adaptive batching capabilities to optimize throughput
4//! and latency by dynamically adjusting batch sizes based on system performance.
5
6use super::config::BatchConfig;
7use crate::StreamEvent;
8use anyhow::Result;
9use serde::{Deserialize, Serialize};
10use std::collections::VecDeque;
11use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14use tokio::sync::RwLock;
15use tracing::{debug, info};
16
/// Batch size predictor using historical performance data
///
/// Tracks recent per-batch performance samples and adaptively adjusts the
/// recommended batch size to keep processing latency near the configured
/// target (see `predict_batch_size`).
pub struct BatchSizePredictor {
    // Tunable batching parameters (initial/min/max size, target latency,
    // tolerance, adjustment factor).
    config: BatchConfig,
    // Recent performance samples; capped at 1000 entries in
    // `record_performance` to bound memory use.
    performance_history: Arc<RwLock<VecDeque<BatchPerformancePoint>>>,
    // Currently recommended batch size; updated by `predict_batch_size`.
    current_batch_size: AtomicUsize,
    // Aggregated counters, snapshotted for callers via `stats()`.
    stats: Arc<BatchingStats>,
}
24
/// Performance data point for batch processing
///
/// A serializable snapshot of how a single batch performed, fed to
/// `BatchSizePredictor::record_performance` to drive size adaptation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchPerformancePoint {
    /// Batch size used
    pub batch_size: usize,
    /// Processing latency in milliseconds
    pub latency_ms: u64,
    /// Throughput in events per second
    pub throughput_eps: f64,
    /// CPU utilization percentage
    pub cpu_usage: f64,
    /// Memory utilization percentage
    pub memory_usage: f64,
    /// Timestamp of measurement (as seconds since Unix epoch)
    pub timestamp: u64,
}
41
/// Batching statistics
///
/// All fields are atomics so the stats block can be shared via `Arc` and
/// updated lock-free from the batching hot path.
#[derive(Debug)]
pub struct BatchingStats {
    /// Total batches processed
    pub total_batches: AtomicU64,
    /// Total events processed
    pub total_events: AtomicU64,
    /// Average batch size (events per batch)
    pub average_batch_size: AtomicUsize,
    /// Current batch size
    pub current_batch_size: AtomicUsize,
    /// Peak batch size
    pub peak_batch_size: AtomicUsize,
    /// Total processing time
    pub total_processing_time_ms: AtomicU64,
    /// Average processing time per batch
    pub average_processing_time_ms: AtomicU64,
    /// Throughput (events per second)
    pub throughput_eps: AtomicU64,
    /// Peak throughput
    pub peak_throughput_eps: AtomicU64,
    /// Number of adjustments made
    pub adjustments_made: AtomicU64,
    /// Last adjustment timestamp (seconds since Unix epoch)
    pub last_adjustment: AtomicU64,
}
68
69impl Default for BatchingStats {
70    fn default() -> Self {
71        Self {
72            total_batches: AtomicU64::new(0),
73            total_events: AtomicU64::new(0),
74            average_batch_size: AtomicUsize::new(0),
75            current_batch_size: AtomicUsize::new(0),
76            peak_batch_size: AtomicUsize::new(0),
77            total_processing_time_ms: AtomicU64::new(0),
78            average_processing_time_ms: AtomicU64::new(0),
79            throughput_eps: AtomicU64::new(0),
80            peak_throughput_eps: AtomicU64::new(0),
81            adjustments_made: AtomicU64::new(0),
82            last_adjustment: AtomicU64::new(0),
83        }
84    }
85}
86
87impl Default for BatchSizePredictor {
88    fn default() -> Self {
89        Self::new(BatchConfig::default())
90    }
91}
92
93impl BatchSizePredictor {
94    /// Create a new batch size predictor
95    pub fn new(config: BatchConfig) -> Self {
96        Self {
97            current_batch_size: AtomicUsize::new(config.initial_batch_size),
98            config,
99            performance_history: Arc::new(RwLock::new(VecDeque::new())),
100            stats: Arc::new(BatchingStats::default()),
101        }
102    }
103
104    /// Record performance data for a batch
105    pub async fn record_performance(&self, performance: BatchPerformancePoint) -> Result<()> {
106        let mut history = self.performance_history.write().await;
107
108        // Keep only recent history to avoid memory growth
109        if history.len() >= 1000 {
110            history.pop_front();
111        }
112
113        history.push_back(performance.clone());
114
115        // Update statistics
116        self.stats.total_batches.fetch_add(1, Ordering::Relaxed);
117        self.stats
118            .total_events
119            .fetch_add(performance.batch_size as u64, Ordering::Relaxed);
120
121        let processing_time_ms = performance.latency_ms;
122        self.stats
123            .total_processing_time_ms
124            .fetch_add(processing_time_ms, Ordering::Relaxed);
125
126        // Update average processing time
127        let total_batches = self.stats.total_batches.load(Ordering::Relaxed);
128        let total_time = self.stats.total_processing_time_ms.load(Ordering::Relaxed);
129        let avg_time = if total_batches > 0 {
130            total_time / total_batches
131        } else {
132            0
133        };
134        self.stats
135            .average_processing_time_ms
136            .store(avg_time, Ordering::Relaxed);
137
138        // Update throughput
139        let throughput = performance.throughput_eps as u64;
140        self.stats
141            .throughput_eps
142            .store(throughput, Ordering::Relaxed);
143
144        let peak_throughput = self.stats.peak_throughput_eps.load(Ordering::Relaxed);
145        if throughput > peak_throughput {
146            self.stats
147                .peak_throughput_eps
148                .store(throughput, Ordering::Relaxed);
149        }
150
151        debug!("Recorded batch performance: {:?}", performance);
152        Ok(())
153    }
154
155    /// Predict optimal batch size based on historical data
156    pub async fn predict_batch_size(&self) -> Result<usize> {
157        let history = self.performance_history.read().await;
158
159        if history.is_empty() {
160            return Ok(self.config.initial_batch_size);
161        }
162
163        // Simple adaptive algorithm
164        let recent_performance: Vec<_> = history.iter().rev().take(10).collect();
165        let avg_latency: f64 = recent_performance
166            .iter()
167            .map(|p| p.latency_ms as f64)
168            .sum::<f64>()
169            / recent_performance.len() as f64;
170
171        let current_size = self.current_batch_size.load(Ordering::Relaxed);
172        let target_latency = self.config.target_latency_ms as f64;
173        let tolerance = self.config.latency_tolerance_ms as f64;
174
175        let new_size = if avg_latency > target_latency + tolerance {
176            // Latency too high, decrease batch size
177            ((current_size as f64) / self.config.adjustment_factor)
178                .max(self.config.min_batch_size as f64) as usize
179        } else if avg_latency < target_latency - tolerance {
180            // Latency acceptable, try to increase batch size
181            ((current_size as f64) * self.config.adjustment_factor)
182                .min(self.config.max_batch_size as f64) as usize
183        } else {
184            // Latency is within tolerance, keep current size
185            current_size
186        };
187
188        if new_size != current_size {
189            self.current_batch_size.store(new_size, Ordering::Relaxed);
190            self.stats.adjustments_made.fetch_add(1, Ordering::Relaxed);
191            self.stats.last_adjustment.store(
192                std::time::SystemTime::now()
193                    .duration_since(std::time::UNIX_EPOCH)
194                    .unwrap()
195                    .as_secs(),
196                Ordering::Relaxed,
197            );
198
199            info!(
200                "Adjusted batch size from {} to {} (avg_latency: {:.2}ms, target: {}ms)",
201                current_size, new_size, avg_latency, target_latency
202            );
203        }
204
205        Ok(new_size)
206    }
207
208    /// Get current batch size
209    pub fn current_batch_size(&self) -> usize {
210        self.current_batch_size.load(Ordering::Relaxed)
211    }
212
213    /// Get batching statistics
214    pub fn stats(&self) -> BatchingStats {
215        BatchingStats {
216            total_batches: AtomicU64::new(self.stats.total_batches.load(Ordering::Relaxed)),
217            total_events: AtomicU64::new(self.stats.total_events.load(Ordering::Relaxed)),
218            average_batch_size: AtomicUsize::new(
219                self.stats.average_batch_size.load(Ordering::Relaxed),
220            ),
221            current_batch_size: AtomicUsize::new(self.current_batch_size.load(Ordering::Relaxed)),
222            peak_batch_size: AtomicUsize::new(self.stats.peak_batch_size.load(Ordering::Relaxed)),
223            total_processing_time_ms: AtomicU64::new(
224                self.stats.total_processing_time_ms.load(Ordering::Relaxed),
225            ),
226            average_processing_time_ms: AtomicU64::new(
227                self.stats
228                    .average_processing_time_ms
229                    .load(Ordering::Relaxed),
230            ),
231            throughput_eps: AtomicU64::new(self.stats.throughput_eps.load(Ordering::Relaxed)),
232            peak_throughput_eps: AtomicU64::new(
233                self.stats.peak_throughput_eps.load(Ordering::Relaxed),
234            ),
235            adjustments_made: AtomicU64::new(self.stats.adjustments_made.load(Ordering::Relaxed)),
236            last_adjustment: AtomicU64::new(self.stats.last_adjustment.load(Ordering::Relaxed)),
237        }
238    }
239}
240
/// Adaptive batcher for events
///
/// Buffers incoming `StreamEvent`s and flushes them as a batch when either
/// the predictor's recommended size is reached or `flush_interval` has
/// elapsed since the last flush.
pub struct AdaptiveBatcher {
    // Drives the size-based flush threshold from observed performance.
    predictor: BatchSizePredictor,
    // Events accumulated since the last flush.
    buffer: Arc<RwLock<Vec<StreamEvent>>>,
    // When the buffer was last drained; basis for the time-based flush.
    last_flush: Arc<RwLock<Instant>>,
    // Maximum time events may sit in the buffer before a flush is forced.
    flush_interval: Duration,
}
248
249impl AdaptiveBatcher {
250    /// Create a new adaptive batcher
251    pub fn new(config: BatchConfig, flush_interval: Duration) -> Self {
252        Self {
253            predictor: BatchSizePredictor::new(config),
254            buffer: Arc::new(RwLock::new(Vec::new())),
255            last_flush: Arc::new(RwLock::new(Instant::now())),
256            flush_interval,
257        }
258    }
259
260    /// Add an event to the batch
261    pub async fn add_event(&self, event: StreamEvent) -> Result<Option<Vec<StreamEvent>>> {
262        let mut buffer = self.buffer.write().await;
263        buffer.push(event);
264
265        let target_size = self.predictor.predict_batch_size().await?;
266
267        // Check if we should flush based on size or time
268        let should_flush_size = buffer.len() >= target_size;
269        let should_flush_time = {
270            let last_flush = self.last_flush.read().await;
271            last_flush.elapsed() >= self.flush_interval
272        };
273
274        if should_flush_size || should_flush_time {
275            let batch: Vec<StreamEvent> = buffer.drain(..).collect();
276            let mut last_flush = self.last_flush.write().await;
277            *last_flush = Instant::now();
278
279            debug!(
280                "Flushed batch of {} events (size: {}, time: {})",
281                batch.len(),
282                should_flush_size,
283                should_flush_time
284            );
285
286            Ok(Some(batch))
287        } else {
288            Ok(None)
289        }
290    }
291
292    /// Force flush the current batch
293    pub async fn flush(&self) -> Result<Vec<StreamEvent>> {
294        let mut buffer = self.buffer.write().await;
295        let batch: Vec<StreamEvent> = buffer.drain(..).collect();
296
297        let mut last_flush = self.last_flush.write().await;
298        *last_flush = Instant::now();
299
300        debug!("Force flushed batch of {} events", batch.len());
301        Ok(batch)
302    }
303
304    /// Record performance for the last batch
305    pub async fn record_batch_performance(&self, performance: BatchPerformancePoint) -> Result<()> {
306        self.predictor.record_performance(performance).await
307    }
308
309    /// Get current buffer size
310    pub async fn buffer_size(&self) -> usize {
311        self.buffer.read().await.len()
312    }
313
314    /// Get batching statistics
315    pub fn stats(&self) -> BatchingStats {
316        self.predictor.stats()
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    // With no history recorded, the predictor must report and predict the
    // configured initial batch size (100 for the default config).
    #[tokio::test]
    async fn test_batch_size_predictor() {
        let config = BatchConfig::default();
        let predictor = BatchSizePredictor::new(config);

        assert_eq!(predictor.current_batch_size(), 100);

        let batch_size = predictor.predict_batch_size().await.unwrap();
        assert_eq!(batch_size, 100);
    }

    // A single event must be buffered (no flush) when it neither reaches the
    // target size nor trips the flush interval.
    #[tokio::test]
    async fn test_adaptive_batcher() {
        let config = BatchConfig::default();
        let batcher = AdaptiveBatcher::new(config, Duration::from_millis(100));

        let event = StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        };

        let result = batcher.add_event(event).await.unwrap();
        assert!(result.is_none());

        assert_eq!(batcher.buffer_size().await, 1);
    }

    // Force-flush must return the buffered events and leave the buffer empty.
    #[tokio::test]
    async fn test_batch_flush() {
        let config = BatchConfig::default();
        let batcher = AdaptiveBatcher::new(config, Duration::from_millis(100));

        let event = StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        };

        batcher.add_event(event).await.unwrap();

        let batch = batcher.flush().await.unwrap();
        assert_eq!(batch.len(), 1);

        assert_eq!(batcher.buffer_size().await, 0);
    }

    // Recording one performance point must bump total_batches by 1 and
    // total_events by the batch size.
    #[tokio::test]
    async fn test_performance_recording() {
        let config = BatchConfig::default();
        let predictor = BatchSizePredictor::new(config);

        let performance = BatchPerformancePoint {
            batch_size: 100,
            latency_ms: 5,
            throughput_eps: 20000.0,
            cpu_usage: 50.0,
            memory_usage: 30.0,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs(),
        };

        predictor.record_performance(performance).await.unwrap();

        let stats = predictor.stats();
        assert_eq!(stats.total_batches.load(Ordering::Relaxed), 1);
        assert_eq!(stats.total_events.load(Ordering::Relaxed), 100);
    }
}
399}