oxirs_stream/performance_optimizer/batching.rs

use super::config::BatchConfig;
use crate::StreamEvent;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info};

/// Predicts the batch size to use next from a rolling history of observed
/// batch performance.
pub struct BatchSizePredictor {
    config: BatchConfig,
    performance_history: Arc<RwLock<VecDeque<BatchPerformancePoint>>>,
    current_batch_size: AtomicUsize,
    stats: Arc<BatchingStats>,
}

/// A single observation of how one batch performed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchPerformancePoint {
    /// Number of events in the batch.
    pub batch_size: usize,
    /// Processing latency for the batch, in milliseconds.
    pub latency_ms: u64,
    /// Observed throughput, in events per second.
    pub throughput_eps: f64,
    /// CPU usage while the batch was processed, as a percentage.
    pub cpu_usage: f64,
    /// Memory usage while the batch was processed, as a percentage.
    pub memory_usage: f64,
    /// Unix timestamp (seconds) at which the observation was taken.
    pub timestamp: u64,
}

/// Aggregate batching statistics, maintained with atomics so they can be
/// updated without locking.
#[derive(Debug)]
pub struct BatchingStats {
    /// Total number of batches processed.
    pub total_batches: AtomicU64,
    /// Total number of events across all batches.
    pub total_events: AtomicU64,
    /// Running average batch size, in events.
    pub average_batch_size: AtomicUsize,
    /// Batch size currently in use.
    pub current_batch_size: AtomicUsize,
    /// Largest batch size observed so far.
    pub peak_batch_size: AtomicUsize,
    /// Total processing time across all batches, in milliseconds.
    pub total_processing_time_ms: AtomicU64,
    /// Average processing time per batch, in milliseconds.
    pub average_processing_time_ms: AtomicU64,
    /// Most recently observed throughput, in events per second.
    pub throughput_eps: AtomicU64,
    /// Peak throughput observed so far, in events per second.
    pub peak_throughput_eps: AtomicU64,
    /// Number of batch size adjustments made.
    pub adjustments_made: AtomicU64,
    /// Unix timestamp (seconds) of the most recent adjustment.
    pub last_adjustment: AtomicU64,
}

impl Default for BatchingStats {
    fn default() -> Self {
        Self {
            total_batches: AtomicU64::new(0),
            total_events: AtomicU64::new(0),
            average_batch_size: AtomicUsize::new(0),
            current_batch_size: AtomicUsize::new(0),
            peak_batch_size: AtomicUsize::new(0),
            total_processing_time_ms: AtomicU64::new(0),
            average_processing_time_ms: AtomicU64::new(0),
            throughput_eps: AtomicU64::new(0),
            peak_throughput_eps: AtomicU64::new(0),
            adjustments_made: AtomicU64::new(0),
            last_adjustment: AtomicU64::new(0),
        }
    }
}

impl Default for BatchSizePredictor {
    fn default() -> Self {
        Self::new(BatchConfig::default())
    }
}

impl BatchSizePredictor {
    pub fn new(config: BatchConfig) -> Self {
        // Read the initial size before `config` is moved into the struct.
        Self {
            current_batch_size: AtomicUsize::new(config.initial_batch_size),
            config,
            performance_history: Arc::new(RwLock::new(VecDeque::new())),
            stats: Arc::new(BatchingStats::default()),
        }
    }

    /// Records a batch performance observation and updates the aggregate
    /// statistics derived from it.
    pub async fn record_performance(&self, performance: BatchPerformancePoint) -> Result<()> {
        let mut history = self.performance_history.write().await;

        // Bound the history so memory use stays constant.
        if history.len() >= 1000 {
            history.pop_front();
        }

        history.push_back(performance.clone());

        // Update the raw counters.
        self.stats.total_batches.fetch_add(1, Ordering::Relaxed);
        self.stats
            .total_events
            .fetch_add(performance.batch_size as u64, Ordering::Relaxed);

        let processing_time_ms = performance.latency_ms;
        self.stats
            .total_processing_time_ms
            .fetch_add(processing_time_ms, Ordering::Relaxed);

        // Recompute the running averages.
        let total_batches = self.stats.total_batches.load(Ordering::Relaxed);
        let total_time = self.stats.total_processing_time_ms.load(Ordering::Relaxed);
        let avg_time = total_time.checked_div(total_batches).unwrap_or(0);
        self.stats
            .average_processing_time_ms
            .store(avg_time, Ordering::Relaxed);

        // Keep the batch size gauges in sync with what was just observed.
        let total_events = self.stats.total_events.load(Ordering::Relaxed);
        let avg_batch = total_events.checked_div(total_batches).unwrap_or(0) as usize;
        self.stats
            .average_batch_size
            .store(avg_batch, Ordering::Relaxed);

        if performance.batch_size > self.stats.peak_batch_size.load(Ordering::Relaxed) {
            self.stats
                .peak_batch_size
                .store(performance.batch_size, Ordering::Relaxed);
        }

        // Track the latest and peak throughput.
        let throughput = performance.throughput_eps as u64;
        self.stats
            .throughput_eps
            .store(throughput, Ordering::Relaxed);

        let peak_throughput = self.stats.peak_throughput_eps.load(Ordering::Relaxed);
        if throughput > peak_throughput {
            self.stats
                .peak_throughput_eps
                .store(throughput, Ordering::Relaxed);
        }

        debug!("Recorded batch performance: {:?}", performance);
        Ok(())
    }

    /// Predicts the batch size to use next, adjusting the current size when
    /// recent average latency drifts outside the configured tolerance band
    /// around the target.
    pub async fn predict_batch_size(&self) -> Result<usize> {
        let history = self.performance_history.read().await;

        // Without history there is nothing to adapt to; use the configured start.
        if history.is_empty() {
            return Ok(self.config.initial_batch_size);
        }

        // Average the latency of the most recent (up to ten) observations.
        let recent_performance: Vec<_> = history.iter().rev().take(10).collect();
        let avg_latency: f64 = recent_performance
            .iter()
            .map(|p| p.latency_ms as f64)
            .sum::<f64>()
            / recent_performance.len() as f64;

        let current_size = self.current_batch_size.load(Ordering::Relaxed);
        let target_latency = self.config.target_latency_ms as f64;
        let tolerance = self.config.latency_tolerance_ms as f64;

        let new_size = if avg_latency > target_latency + tolerance {
            // Latency too high: shrink the batch, but never below the minimum.
            ((current_size as f64) / self.config.adjustment_factor)
                .max(self.config.min_batch_size as f64) as usize
        } else if avg_latency < target_latency - tolerance {
            // Latency comfortably low: grow the batch, but never above the maximum.
            ((current_size as f64) * self.config.adjustment_factor)
                .min(self.config.max_batch_size as f64) as usize
        } else {
            // Within tolerance: keep the current size.
            current_size
        };

        if new_size != current_size {
            self.current_batch_size.store(new_size, Ordering::Relaxed);
            self.stats.adjustments_made.fetch_add(1, Ordering::Relaxed);
            self.stats.last_adjustment.store(
                std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .expect("SystemTime should be after UNIX_EPOCH")
                    .as_secs(),
                Ordering::Relaxed,
            );

            info!(
                "Adjusted batch size from {} to {} (avg_latency: {:.2}ms, target: {}ms)",
                current_size, new_size, avg_latency, target_latency
            );
        }

        Ok(new_size)
    }

    /// Returns the batch size currently in use.
    pub fn current_batch_size(&self) -> usize {
        self.current_batch_size.load(Ordering::Relaxed)
    }

    /// Returns a point-in-time snapshot of the batching statistics.
    pub fn stats(&self) -> BatchingStats {
        // Atomics are not Clone, so copy each counter into a fresh struct.
        BatchingStats {
            total_batches: AtomicU64::new(self.stats.total_batches.load(Ordering::Relaxed)),
            total_events: AtomicU64::new(self.stats.total_events.load(Ordering::Relaxed)),
            average_batch_size: AtomicUsize::new(
                self.stats.average_batch_size.load(Ordering::Relaxed),
            ),
            current_batch_size: AtomicUsize::new(self.current_batch_size.load(Ordering::Relaxed)),
            peak_batch_size: AtomicUsize::new(self.stats.peak_batch_size.load(Ordering::Relaxed)),
            total_processing_time_ms: AtomicU64::new(
                self.stats.total_processing_time_ms.load(Ordering::Relaxed),
            ),
            average_processing_time_ms: AtomicU64::new(
                self.stats
                    .average_processing_time_ms
                    .load(Ordering::Relaxed),
            ),
            throughput_eps: AtomicU64::new(self.stats.throughput_eps.load(Ordering::Relaxed)),
            peak_throughput_eps: AtomicU64::new(
                self.stats.peak_throughput_eps.load(Ordering::Relaxed),
            ),
            adjustments_made: AtomicU64::new(self.stats.adjustments_made.load(Ordering::Relaxed)),
            last_adjustment: AtomicU64::new(self.stats.last_adjustment.load(Ordering::Relaxed)),
        }
    }
}
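
// Worked example of the adjustment rule above (illustrative numbers, not the
// crate's defaults): with target_latency_ms = 10, latency_tolerance_ms = 2,
// and adjustment_factor = 1.5, an average latency of 18 ms shrinks a
// 300-event batch to max(300 / 1.5, min_batch_size) = 200 events, while an
// average latency of 6 ms grows it to min(300 * 1.5, max_batch_size) = 450.
// Any average between 8 ms and 12 ms leaves the size unchanged.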

/// Buffers incoming stream events and emits batches sized by a
/// `BatchSizePredictor`, flushing on size or on a time interval.
pub struct AdaptiveBatcher {
    predictor: BatchSizePredictor,
    buffer: Arc<RwLock<Vec<StreamEvent>>>,
    last_flush: Arc<RwLock<Instant>>,
    flush_interval: Duration,
}

impl AdaptiveBatcher {
    /// Creates a batcher with the given batching configuration and the maximum
    /// time a buffered event may wait before being flushed.
    pub fn new(config: BatchConfig, flush_interval: Duration) -> Self {
        Self {
            predictor: BatchSizePredictor::new(config),
            buffer: Arc::new(RwLock::new(Vec::new())),
            last_flush: Arc::new(RwLock::new(Instant::now())),
            flush_interval,
        }
    }

    /// Adds an event to the buffer, returning a batch when either the
    /// predicted batch size is reached or the flush interval has elapsed.
    pub async fn add_event(&self, event: StreamEvent) -> Result<Option<Vec<StreamEvent>>> {
        let mut buffer = self.buffer.write().await;
        buffer.push(event);

        let target_size = self.predictor.predict_batch_size().await?;

        // Flush on size or on elapsed time, whichever trips first.
        let should_flush_size = buffer.len() >= target_size;
        let should_flush_time = {
            let last_flush = self.last_flush.read().await;
            last_flush.elapsed() >= self.flush_interval
        };

        if should_flush_size || should_flush_time {
            let batch: Vec<StreamEvent> = buffer.drain(..).collect();
            let mut last_flush = self.last_flush.write().await;
            *last_flush = Instant::now();

            debug!(
                "Flushed batch of {} events (size: {}, time: {})",
                batch.len(),
                should_flush_size,
                should_flush_time
            );

            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }

    /// Flushes the buffer immediately, regardless of size or elapsed time.
    pub async fn flush(&self) -> Result<Vec<StreamEvent>> {
        let mut buffer = self.buffer.write().await;
        let batch: Vec<StreamEvent> = buffer.drain(..).collect();

        let mut last_flush = self.last_flush.write().await;
        *last_flush = Instant::now();

        debug!("Force flushed batch of {} events", batch.len());
        Ok(batch)
    }

    /// Forwards a performance observation to the underlying predictor.
    pub async fn record_batch_performance(&self, performance: BatchPerformancePoint) -> Result<()> {
        self.predictor.record_performance(performance).await
    }

    /// Returns the number of events currently buffered.
    pub async fn buffer_size(&self) -> usize {
        self.buffer.read().await.len()
    }

    /// Returns a snapshot of the underlying batching statistics.
    pub fn stats(&self) -> BatchingStats {
        self.predictor.stats()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    #[tokio::test]
    async fn test_batch_size_predictor() {
        let config = BatchConfig::default();
        let predictor = BatchSizePredictor::new(config);

        assert_eq!(predictor.current_batch_size(), 100);

        let batch_size = predictor.predict_batch_size().await.unwrap();
        assert_eq!(batch_size, 100);
    }
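
    // A hedged sketch of the adaptive loop. It assumes only that the default
    // target latency is far below 60 s and that adjustment_factor > 1 (the
    // convention the divide-to-shrink logic implies), so it asserts the
    // direction and bounds of the change rather than exact values.
    #[tokio::test]
    async fn test_batch_size_shrinks_under_high_latency() {
        let config = BatchConfig::default();
        let min_size = config.min_batch_size;
        let initial_size = config.initial_batch_size;
        let predictor = BatchSizePredictor::new(config);

        for _ in 0..10 {
            let point = BatchPerformancePoint {
                batch_size: predictor.current_batch_size(),
                latency_ms: 60_000, // far above any plausible target latency
                throughput_eps: 10.0,
                cpu_usage: 95.0,
                memory_usage: 80.0,
                timestamp: 0,
            };
            predictor.record_performance(point).await.unwrap();
            predictor.predict_batch_size().await.unwrap();
        }

        let predicted = predictor.current_batch_size();
        assert!(predicted <= initial_size);
        assert!(predicted >= min_size);
    }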

    #[tokio::test]
    async fn test_adaptive_batcher() {
        let config = BatchConfig::default();
        let batcher = AdaptiveBatcher::new(config, Duration::from_millis(100));

        let event = StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        };

        let result = batcher.add_event(event).await.unwrap();
        assert!(result.is_none());

        assert_eq!(batcher.buffer_size().await, 1);
    }
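
    // A sketch of the size-driven flush path. It assumes the default initial
    // batch size of 100 (the value asserted in test_batch_size_predictor) and
    // uses a one-hour flush interval so that only the size trigger can fire.
    #[tokio::test]
    async fn test_size_based_flush() {
        let batcher = AdaptiveBatcher::new(BatchConfig::default(), Duration::from_secs(3600));

        let mut flushed = None;
        for _ in 0..100 {
            let event = StreamEvent::TripleAdded {
                subject: "test".to_string(),
                predicate: "test".to_string(),
                object: "test".to_string(),
                graph: None,
                metadata: EventMetadata::default(),
            };
            if let Some(batch) = batcher.add_event(event).await.unwrap() {
                flushed = Some(batch);
            }
        }

        let batch = flushed.expect("the 100th event should trigger a size-based flush");
        assert_eq!(batch.len(), 100);
        assert_eq!(batcher.buffer_size().await, 0);
    }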

    #[tokio::test]
    async fn test_batch_flush() {
        let config = BatchConfig::default();
        let batcher = AdaptiveBatcher::new(config, Duration::from_millis(100));

        let event = StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        };

        batcher.add_event(event).await.unwrap();

        let batch = batcher.flush().await.unwrap();
        assert_eq!(batch.len(), 1);

        assert_eq!(batcher.buffer_size().await, 0);
    }
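
    // A sketch of the time-driven flush path: with a very short interval, a
    // later add_event call should flush even though the buffer is far below
    // the predicted batch size.
    #[tokio::test]
    async fn test_time_based_flush() {
        let batcher = AdaptiveBatcher::new(BatchConfig::default(), Duration::from_millis(1));

        let make_event = || StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        };

        // The first event may or may not flush depending on elapsed time.
        let _ = batcher.add_event(make_event()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(5)).await;

        // The interval has certainly elapsed since the last flush, so this
        // event must come back as part of a batch.
        let batch = batcher.add_event(make_event()).await.unwrap();
        assert!(batch.is_some());
    }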

    #[tokio::test]
    async fn test_performance_recording() {
        let config = BatchConfig::default();
        let predictor = BatchSizePredictor::new(config);

        let performance = BatchPerformancePoint {
            batch_size: 100,
            latency_ms: 5,
            throughput_eps: 20000.0,
            cpu_usage: 50.0,
            memory_usage: 30.0,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs(),
        };

        predictor.record_performance(performance).await.unwrap();

        let stats = predictor.stats();
        assert_eq!(stats.total_batches.load(Ordering::Relaxed), 1);
        assert_eq!(stats.total_events.load(Ordering::Relaxed), 100);
    }
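
    // Verifies the latest/peak split in the throughput gauges maintained by
    // record_performance: throughput_eps holds the most recent value, while
    // peak_throughput_eps holds the maximum seen so far.
    #[tokio::test]
    async fn test_peak_throughput_tracking() {
        let predictor = BatchSizePredictor::new(BatchConfig::default());

        for &throughput in &[1_000.0, 5_000.0, 2_000.0] {
            let point = BatchPerformancePoint {
                batch_size: 10,
                latency_ms: 1,
                throughput_eps: throughput,
                cpu_usage: 10.0,
                memory_usage: 10.0,
                timestamp: 0,
            };
            predictor.record_performance(point).await.unwrap();
        }

        let stats = predictor.stats();
        assert_eq!(stats.throughput_eps.load(Ordering::Relaxed), 2_000);
        assert_eq!(stats.peak_throughput_eps.load(Ordering::Relaxed), 5_000);
    }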
}