oxirs_stream/performance_optimizer/batching.rs

//! Adaptive batching for performance optimization
//!
//! This module provides adaptive batching capabilities to optimize throughput
//! and latency by dynamically adjusting batch sizes based on system performance.
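//!
//! # Example
//!
//! A minimal usage sketch. The `oxirs_stream` import paths and the
//! `BatchConfig::default()` values are assumptions rather than verified
//! public API, so the snippet is marked `ignore`:
//!
//! ```ignore
//! use std::time::Duration;
//! use oxirs_stream::performance_optimizer::batching::AdaptiveBatcher;
//! use oxirs_stream::performance_optimizer::config::BatchConfig;
//! use oxirs_stream::StreamEvent;
//!
//! async fn run(events: Vec<StreamEvent>) -> anyhow::Result<()> {
//!     let batcher = AdaptiveBatcher::new(BatchConfig::default(), Duration::from_millis(100));
//!     for event in events {
//!         // `Some(batch)` means this event triggered a size- or time-based flush.
//!         if let Some(batch) = batcher.add_event(event).await? {
//!             println!("flushed {} events", batch.len());
//!         }
//!     }
//!     // Drain whatever is still buffered at shutdown.
//!     let remainder = batcher.flush().await?;
//!     println!("final batch: {} events", remainder.len());
//!     Ok(())
//! }
//! ```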

use super::config::BatchConfig;
use crate::StreamEvent;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info};

/// Batch size predictor using historical performance data
pub struct BatchSizePredictor {
    config: BatchConfig,
    performance_history: Arc<RwLock<VecDeque<BatchPerformancePoint>>>,
    current_batch_size: AtomicUsize,
    stats: Arc<BatchingStats>,
}

/// Performance data point for batch processing
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchPerformancePoint {
    /// Batch size used
    pub batch_size: usize,
    /// Processing latency in milliseconds
    pub latency_ms: u64,
    /// Throughput in events per second
    pub throughput_eps: f64,
    /// CPU utilization percentage
    pub cpu_usage: f64,
    /// Memory utilization percentage
    pub memory_usage: f64,
    /// Timestamp of measurement (as seconds since Unix epoch)
    pub timestamp: u64,
}

/// Batching statistics
#[derive(Debug)]
pub struct BatchingStats {
    /// Total batches processed
    pub total_batches: AtomicU64,
    /// Total events processed
    pub total_events: AtomicU64,
    /// Average batch size
    pub average_batch_size: AtomicUsize,
    /// Current batch size
    pub current_batch_size: AtomicUsize,
    /// Peak batch size
    pub peak_batch_size: AtomicUsize,
    /// Total processing time in milliseconds
    pub total_processing_time_ms: AtomicU64,
    /// Average processing time per batch in milliseconds
    pub average_processing_time_ms: AtomicU64,
    /// Throughput (events per second)
    pub throughput_eps: AtomicU64,
    /// Peak throughput (events per second)
    pub peak_throughput_eps: AtomicU64,
    /// Number of adjustments made
    pub adjustments_made: AtomicU64,
    /// Last adjustment timestamp (seconds since Unix epoch)
    pub last_adjustment: AtomicU64,
}

impl Default for BatchingStats {
    fn default() -> Self {
        Self {
            total_batches: AtomicU64::new(0),
            total_events: AtomicU64::new(0),
            average_batch_size: AtomicUsize::new(0),
            current_batch_size: AtomicUsize::new(0),
            peak_batch_size: AtomicUsize::new(0),
            total_processing_time_ms: AtomicU64::new(0),
            average_processing_time_ms: AtomicU64::new(0),
            throughput_eps: AtomicU64::new(0),
            peak_throughput_eps: AtomicU64::new(0),
            adjustments_made: AtomicU64::new(0),
            last_adjustment: AtomicU64::new(0),
        }
    }
}

impl Default for BatchSizePredictor {
    fn default() -> Self {
        Self::new(BatchConfig::default())
    }
}

impl BatchSizePredictor {
    /// Create a new batch size predictor
    pub fn new(config: BatchConfig) -> Self {
        Self {
            current_batch_size: AtomicUsize::new(config.initial_batch_size),
            config,
            performance_history: Arc::new(RwLock::new(VecDeque::new())),
            stats: Arc::new(BatchingStats::default()),
        }
    }

    /// Record performance data for a batch
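    ///
    /// Only the most recent 1,000 points are retained; older points are
    /// dropped to bound memory use.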
    pub async fn record_performance(&self, performance: BatchPerformancePoint) -> Result<()> {
        let mut history = self.performance_history.write().await;

        // Keep only recent history to avoid unbounded memory growth
        if history.len() >= 1000 {
            history.pop_front();
        }

        history.push_back(performance.clone());

        // Update statistics
        self.stats.total_batches.fetch_add(1, Ordering::Relaxed);
        self.stats
            .total_events
            .fetch_add(performance.batch_size as u64, Ordering::Relaxed);

        let processing_time_ms = performance.latency_ms;
        self.stats
            .total_processing_time_ms
            .fetch_add(processing_time_ms, Ordering::Relaxed);

        // Update average and peak batch size
        let total_batches = self.stats.total_batches.load(Ordering::Relaxed);
        let total_events = self.stats.total_events.load(Ordering::Relaxed);
        let avg_batch_size = total_events.checked_div(total_batches).unwrap_or(0) as usize;
        self.stats
            .average_batch_size
            .store(avg_batch_size, Ordering::Relaxed);
        self.stats
            .peak_batch_size
            .fetch_max(performance.batch_size, Ordering::Relaxed);

        // Update average processing time
        let total_time = self.stats.total_processing_time_ms.load(Ordering::Relaxed);
        let avg_time = total_time.checked_div(total_batches).unwrap_or(0);
        self.stats
            .average_processing_time_ms
            .store(avg_time, Ordering::Relaxed);

        // Update throughput; fetch_max avoids losing a concurrent peak update
        let throughput = performance.throughput_eps as u64;
        self.stats
            .throughput_eps
            .store(throughput, Ordering::Relaxed);
        self.stats
            .peak_throughput_eps
            .fetch_max(throughput, Ordering::Relaxed);

        debug!("Recorded batch performance: {:?}", performance);
        Ok(())
    }

    /// Predict optimal batch size based on historical data
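    ///
    /// The adjustment rule is multiplicative: if the average latency of the
    /// last ten recorded batches exceeds `target_latency_ms` plus
    /// `latency_tolerance_ms`, the current size is divided by
    /// `adjustment_factor` (floored at `min_batch_size`); if it falls below
    /// the target minus the tolerance, the size is multiplied by
    /// `adjustment_factor` (capped at `max_batch_size`); otherwise it is kept.
    /// As a worked example with hypothetical settings (target 10 ms,
    /// tolerance 2 ms, factor 1.5): an average latency of 15 ms exceeds
    /// 10 + 2 = 12 ms, so a batch size of 300 shrinks to 300 / 1.5 = 200.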
    pub async fn predict_batch_size(&self) -> Result<usize> {
        let history = self.performance_history.read().await;

        if history.is_empty() {
            return Ok(self.config.initial_batch_size);
        }

        // Average latency over the ten most recent batches
        let recent_performance: Vec<_> = history.iter().rev().take(10).collect();
        let avg_latency: f64 = recent_performance
            .iter()
            .map(|p| p.latency_ms as f64)
            .sum::<f64>()
            / recent_performance.len() as f64;

        let current_size = self.current_batch_size.load(Ordering::Relaxed);
        let target_latency = self.config.target_latency_ms as f64;
        let tolerance = self.config.latency_tolerance_ms as f64;

        let new_size = if avg_latency > target_latency + tolerance {
            // Latency too high: decrease batch size
            ((current_size as f64) / self.config.adjustment_factor)
                .max(self.config.min_batch_size as f64) as usize
        } else if avg_latency < target_latency - tolerance {
            // Latency well below target: try a larger batch size
            ((current_size as f64) * self.config.adjustment_factor)
                .min(self.config.max_batch_size as f64) as usize
        } else {
            // Latency within tolerance: keep the current size
            current_size
        };

        if new_size != current_size {
            self.current_batch_size.store(new_size, Ordering::Relaxed);
            self.stats.adjustments_made.fetch_add(1, Ordering::Relaxed);
            self.stats.last_adjustment.store(
                std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .expect("SystemTime should be after UNIX_EPOCH")
                    .as_secs(),
                Ordering::Relaxed,
            );

            info!(
                "Adjusted batch size from {} to {} (avg_latency: {:.2}ms, target: {}ms)",
                current_size, new_size, avg_latency, target_latency
            );
        }

        Ok(new_size)
    }

    /// Get current batch size
    pub fn current_batch_size(&self) -> usize {
        self.current_batch_size.load(Ordering::Relaxed)
    }

    /// Get batching statistics
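    ///
    /// Each field is loaded independently with `Ordering::Relaxed`, so the
    /// returned snapshot is not guaranteed to be consistent across fields
    /// under concurrent updates.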
    pub fn stats(&self) -> BatchingStats {
        BatchingStats {
            total_batches: AtomicU64::new(self.stats.total_batches.load(Ordering::Relaxed)),
            total_events: AtomicU64::new(self.stats.total_events.load(Ordering::Relaxed)),
            average_batch_size: AtomicUsize::new(
                self.stats.average_batch_size.load(Ordering::Relaxed),
            ),
            current_batch_size: AtomicUsize::new(self.current_batch_size.load(Ordering::Relaxed)),
            peak_batch_size: AtomicUsize::new(self.stats.peak_batch_size.load(Ordering::Relaxed)),
            total_processing_time_ms: AtomicU64::new(
                self.stats.total_processing_time_ms.load(Ordering::Relaxed),
            ),
            average_processing_time_ms: AtomicU64::new(
                self.stats
                    .average_processing_time_ms
                    .load(Ordering::Relaxed),
            ),
            throughput_eps: AtomicU64::new(self.stats.throughput_eps.load(Ordering::Relaxed)),
            peak_throughput_eps: AtomicU64::new(
                self.stats.peak_throughput_eps.load(Ordering::Relaxed),
            ),
            adjustments_made: AtomicU64::new(self.stats.adjustments_made.load(Ordering::Relaxed)),
            last_adjustment: AtomicU64::new(self.stats.last_adjustment.load(Ordering::Relaxed)),
        }
    }
}

/// Adaptive batcher for events
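///
/// Events accumulate in an internal buffer and are flushed when the buffer
/// reaches the size predicted by [`BatchSizePredictor`] or when
/// `flush_interval` has elapsed since the last flush. Note that both checks
/// run only when an event arrives, so an idle buffer is not flushed until the
/// next event or an explicit [`flush`](Self::flush).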
pub struct AdaptiveBatcher {
    predictor: BatchSizePredictor,
    buffer: Arc<RwLock<Vec<StreamEvent>>>,
    last_flush: Arc<RwLock<Instant>>,
    flush_interval: Duration,
}

impl AdaptiveBatcher {
    /// Create a new adaptive batcher
    pub fn new(config: BatchConfig, flush_interval: Duration) -> Self {
        Self {
            predictor: BatchSizePredictor::new(config),
            buffer: Arc::new(RwLock::new(Vec::new())),
            last_flush: Arc::new(RwLock::new(Instant::now())),
            flush_interval,
        }
    }

    /// Add an event to the batch
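    ///
    /// Returns `Ok(Some(batch))` when this event triggers a flush (by size or
    /// elapsed time) and `Ok(None)` when the event was only buffered.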
    pub async fn add_event(&self, event: StreamEvent) -> Result<Option<Vec<StreamEvent>>> {
        let mut buffer = self.buffer.write().await;
        buffer.push(event);

        let target_size = self.predictor.predict_batch_size().await?;

        // Check if we should flush based on size or time
        let should_flush_size = buffer.len() >= target_size;
        let should_flush_time = {
            let last_flush = self.last_flush.read().await;
            last_flush.elapsed() >= self.flush_interval
        };

        if should_flush_size || should_flush_time {
            let batch: Vec<StreamEvent> = buffer.drain(..).collect();
            let mut last_flush = self.last_flush.write().await;
            *last_flush = Instant::now();

            debug!(
                "Flushed batch of {} events (size: {}, time: {})",
                batch.len(),
                should_flush_size,
                should_flush_time
            );

            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }

    /// Force flush the current batch
    pub async fn flush(&self) -> Result<Vec<StreamEvent>> {
        let mut buffer = self.buffer.write().await;
        let batch: Vec<StreamEvent> = buffer.drain(..).collect();

        let mut last_flush = self.last_flush.write().await;
        *last_flush = Instant::now();

        debug!("Force flushed batch of {} events", batch.len());
        Ok(batch)
    }

    /// Record performance for the last batch
    pub async fn record_batch_performance(&self, performance: BatchPerformancePoint) -> Result<()> {
        self.predictor.record_performance(performance).await
    }

    /// Get current buffer size
    pub async fn buffer_size(&self) -> usize {
        self.buffer.read().await.len()
    }

    /// Get batching statistics
    pub fn stats(&self) -> BatchingStats {
        self.predictor.stats()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    #[tokio::test]
    async fn test_batch_size_predictor() {
        let config = BatchConfig::default();
        let predictor = BatchSizePredictor::new(config);

        assert_eq!(predictor.current_batch_size(), 100);

        let batch_size = predictor.predict_batch_size().await.unwrap();
        assert_eq!(batch_size, 100);
    }

    #[tokio::test]
    async fn test_adaptive_batcher() {
        let config = BatchConfig::default();
        let batcher = AdaptiveBatcher::new(config, Duration::from_millis(100));

        let event = StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        };

        let result = batcher.add_event(event).await.unwrap();
        assert!(result.is_none());

        assert_eq!(batcher.buffer_size().await, 1);
    }

    #[tokio::test]
    async fn test_batch_flush() {
        let config = BatchConfig::default();
        let batcher = AdaptiveBatcher::new(config, Duration::from_millis(100));

        let event = StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        };

        batcher.add_event(event).await.unwrap();

        let batch = batcher.flush().await.unwrap();
        assert_eq!(batch.len(), 1);

        assert_eq!(batcher.buffer_size().await, 0);
    }

    #[tokio::test]
    async fn test_performance_recording() {
        let config = BatchConfig::default();
        let predictor = BatchSizePredictor::new(config);

        let performance = BatchPerformancePoint {
            batch_size: 100,
            latency_ms: 5,
            throughput_eps: 20000.0,
            cpu_usage: 50.0,
            memory_usage: 30.0,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs(),
        };

        predictor.record_performance(performance).await.unwrap();

        let stats = predictor.stats();
        assert_eq!(stats.total_batches.load(Ordering::Relaxed), 1);
        assert_eq!(stats.total_events.load(Ordering::Relaxed), 100);
    }
}
395}