oxirs_stream/performance_optimizer/batching.rs

//! Adaptive, latency-aware batching for the stream performance optimizer.

use super::config::BatchConfig;
use crate::StreamEvent;
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, info};

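/// Predicts the batch size that keeps batch latency near the configured
/// target, adjusting multiplicatively over recent performance observations.
///
/// A minimal sketch of the record/predict cycle (`point` stands in for a
/// caller-measured [`BatchPerformancePoint`]):
///
/// ```ignore
/// let predictor = BatchSizePredictor::new(BatchConfig::default());
/// predictor.record_performance(point).await?;
/// let size = predictor.predict_batch_size().await?;
/// ```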
pub struct BatchSizePredictor {
    config: BatchConfig,
    performance_history: Arc<RwLock<VecDeque<BatchPerformancePoint>>>,
    current_batch_size: AtomicUsize,
    stats: Arc<BatchingStats>,
}

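/// A single observation of how one batch performed; used as training
/// input for [`BatchSizePredictor`].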
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BatchPerformancePoint {
    /// Number of events in the batch.
    pub batch_size: usize,
    /// End-to-end processing latency of the batch, in milliseconds.
    pub latency_ms: u64,
    /// Observed throughput, in events per second.
    pub throughput_eps: f64,
    /// CPU usage while the batch was processed.
    pub cpu_usage: f64,
    /// Memory usage while the batch was processed.
    pub memory_usage: f64,
    /// Unix timestamp (seconds) at which the batch completed.
    pub timestamp: u64,
}

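/// Running counters describing batching behavior; read via
/// [`BatchSizePredictor::stats`] snapshots.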
#[derive(Debug, Default)]
pub struct BatchingStats {
    /// Total number of batches recorded.
    pub total_batches: AtomicU64,
    /// Total number of events across all batches.
    pub total_events: AtomicU64,
    /// Running average batch size, in events.
    pub average_batch_size: AtomicUsize,
    /// Batch size currently in use.
    pub current_batch_size: AtomicUsize,
    /// Largest batch size observed.
    pub peak_batch_size: AtomicUsize,
    /// Cumulative processing time, in milliseconds.
    pub total_processing_time_ms: AtomicU64,
    /// Running average processing time per batch, in milliseconds.
    pub average_processing_time_ms: AtomicU64,
    /// Most recent throughput, in events per second.
    pub throughput_eps: AtomicU64,
    /// Highest throughput observed, in events per second.
    pub peak_throughput_eps: AtomicU64,
    /// Number of batch size adjustments made.
    pub adjustments_made: AtomicU64,
    /// Unix timestamp (seconds) of the last adjustment.
    pub last_adjustment: AtomicU64,
}
impl Default for BatchSizePredictor {
    fn default() -> Self {
        Self::new(BatchConfig::default())
    }
}

impl BatchSizePredictor {
    pub fn new(config: BatchConfig) -> Self {
        Self {
            current_batch_size: AtomicUsize::new(config.initial_batch_size),
            config,
            performance_history: Arc::new(RwLock::new(VecDeque::new())),
            stats: Arc::new(BatchingStats::default()),
        }
    }

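    /// Records one batch observation and updates the running statistics.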
    pub async fn record_performance(&self, performance: BatchPerformancePoint) -> Result<()> {
        let mut history = self.performance_history.write().await;

        // Keep a bounded window of the most recent observations.
        if history.len() >= 1000 {
            history.pop_front();
        }

        history.push_back(performance.clone());

        self.stats.total_batches.fetch_add(1, Ordering::Relaxed);
        self.stats
            .total_events
            .fetch_add(performance.batch_size as u64, Ordering::Relaxed);

        let processing_time_ms = performance.latency_ms;
        self.stats
            .total_processing_time_ms
            .fetch_add(processing_time_ms, Ordering::Relaxed);

        // Recompute running averages from the cumulative counters.
        let total_batches = self.stats.total_batches.load(Ordering::Relaxed);
        let total_time = self.stats.total_processing_time_ms.load(Ordering::Relaxed);
        let avg_time = if total_batches > 0 {
            total_time / total_batches
        } else {
            0
        };
        self.stats
            .average_processing_time_ms
            .store(avg_time, Ordering::Relaxed);

        let total_events = self.stats.total_events.load(Ordering::Relaxed);
        let avg_batch_size = if total_batches > 0 {
            (total_events / total_batches) as usize
        } else {
            0
        };
        self.stats
            .average_batch_size
            .store(avg_batch_size, Ordering::Relaxed);

        // Track peaks for batch size and throughput.
        let peak_batch = self.stats.peak_batch_size.load(Ordering::Relaxed);
        if performance.batch_size > peak_batch {
            self.stats
                .peak_batch_size
                .store(performance.batch_size, Ordering::Relaxed);
        }

        let throughput = performance.throughput_eps as u64;
        self.stats
            .throughput_eps
            .store(throughput, Ordering::Relaxed);

        let peak_throughput = self.stats.peak_throughput_eps.load(Ordering::Relaxed);
        if throughput > peak_throughput {
            self.stats
                .peak_throughput_eps
                .store(throughput, Ordering::Relaxed);
        }

        debug!("Recorded batch performance: {:?}", performance);
        Ok(())
    }

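    /// Predicts the next batch size: shrinks when recent average latency
    /// overshoots the target, grows when there is headroom, and holds
    /// steady inside the tolerance band.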
    pub async fn predict_batch_size(&self) -> Result<usize> {
        let history = self.performance_history.read().await;

        if history.is_empty() {
            return Ok(self.config.initial_batch_size);
        }

        // Average latency over the ten most recent observations.
        let recent_performance: Vec<_> = history.iter().rev().take(10).collect();
        let avg_latency: f64 = recent_performance
            .iter()
            .map(|p| p.latency_ms as f64)
            .sum::<f64>()
            / recent_performance.len() as f64;

        let current_size = self.current_batch_size.load(Ordering::Relaxed);
        let target_latency = self.config.target_latency_ms as f64;
        let tolerance = self.config.latency_tolerance_ms as f64;

        // Multiplicative decrease above the tolerance band, multiplicative
        // increase below it, clamped to the configured size limits.
        let new_size = if avg_latency > target_latency + tolerance {
            ((current_size as f64) / self.config.adjustment_factor)
                .max(self.config.min_batch_size as f64) as usize
        } else if avg_latency < target_latency - tolerance {
            ((current_size as f64) * self.config.adjustment_factor)
                .min(self.config.max_batch_size as f64) as usize
        } else {
            current_size
        };

        if new_size != current_size {
            self.current_batch_size.store(new_size, Ordering::Relaxed);
            self.stats.adjustments_made.fetch_add(1, Ordering::Relaxed);
            self.stats.last_adjustment.store(
                std::time::SystemTime::now()
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap()
                    .as_secs(),
                Ordering::Relaxed,
            );

            info!(
                "Adjusted batch size from {} to {} (avg_latency: {:.2}ms, target: {}ms)",
                current_size, new_size, avg_latency, target_latency
            );
        }

        Ok(new_size)
    }

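    /// Returns the batch size the predictor currently recommends.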
    pub fn current_batch_size(&self) -> usize {
        self.current_batch_size.load(Ordering::Relaxed)
    }

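    /// Returns a point-in-time snapshot of the batching statistics.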
    pub fn stats(&self) -> BatchingStats {
        // Atomics are not `Clone`, so copy each counter into a fresh snapshot.
        BatchingStats {
            total_batches: AtomicU64::new(self.stats.total_batches.load(Ordering::Relaxed)),
            total_events: AtomicU64::new(self.stats.total_events.load(Ordering::Relaxed)),
            average_batch_size: AtomicUsize::new(
                self.stats.average_batch_size.load(Ordering::Relaxed),
            ),
            current_batch_size: AtomicUsize::new(self.current_batch_size.load(Ordering::Relaxed)),
            peak_batch_size: AtomicUsize::new(self.stats.peak_batch_size.load(Ordering::Relaxed)),
            total_processing_time_ms: AtomicU64::new(
                self.stats.total_processing_time_ms.load(Ordering::Relaxed),
            ),
            average_processing_time_ms: AtomicU64::new(
                self.stats
                    .average_processing_time_ms
                    .load(Ordering::Relaxed),
            ),
            throughput_eps: AtomicU64::new(self.stats.throughput_eps.load(Ordering::Relaxed)),
            peak_throughput_eps: AtomicU64::new(
                self.stats.peak_throughput_eps.load(Ordering::Relaxed),
            ),
            adjustments_made: AtomicU64::new(self.stats.adjustments_made.load(Ordering::Relaxed)),
            last_adjustment: AtomicU64::new(self.stats.last_adjustment.load(Ordering::Relaxed)),
        }
    }
}

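/// Buffers incoming events and emits them as batches, sizing each batch
/// with a [`BatchSizePredictor`] and flushing on a timer as a fallback.
///
/// A minimal usage sketch (assumes a Tokio runtime; `handle_batch` stands
/// in for a caller-supplied consumer):
///
/// ```ignore
/// let batcher = AdaptiveBatcher::new(BatchConfig::default(), Duration::from_millis(100));
/// if let Some(batch) = batcher.add_event(event).await? {
///     handle_batch(batch).await;
/// }
/// ```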
pub struct AdaptiveBatcher {
    predictor: BatchSizePredictor,
    buffer: Arc<RwLock<Vec<StreamEvent>>>,
    last_flush: Arc<RwLock<Instant>>,
    flush_interval: Duration,
}

impl AdaptiveBatcher {
    pub fn new(config: BatchConfig, flush_interval: Duration) -> Self {
        Self {
            predictor: BatchSizePredictor::new(config),
            buffer: Arc::new(RwLock::new(Vec::new())),
            last_flush: Arc::new(RwLock::new(Instant::now())),
            flush_interval,
        }
    }

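    /// Buffers an event, returning a batch once the buffer reaches the
    /// predicted size or the flush interval has elapsed, and `None` otherwise.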
    pub async fn add_event(&self, event: StreamEvent) -> Result<Option<Vec<StreamEvent>>> {
        let mut buffer = self.buffer.write().await;
        buffer.push(event);

        let target_size = self.predictor.predict_batch_size().await?;

        // Flush on whichever trigger fires first: size or elapsed time.
        let should_flush_size = buffer.len() >= target_size;
        let should_flush_time = {
            let last_flush = self.last_flush.read().await;
            last_flush.elapsed() >= self.flush_interval
        };

        if should_flush_size || should_flush_time {
            let batch: Vec<StreamEvent> = buffer.drain(..).collect();
            let mut last_flush = self.last_flush.write().await;
            *last_flush = Instant::now();

            debug!(
                "Flushed batch of {} events (size: {}, time: {})",
                batch.len(),
                should_flush_size,
                should_flush_time
            );

            Ok(Some(batch))
        } else {
            Ok(None)
        }
    }

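    /// Drains and returns whatever is buffered, regardless of size.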
    pub async fn flush(&self) -> Result<Vec<StreamEvent>> {
        let mut buffer = self.buffer.write().await;
        let batch: Vec<StreamEvent> = buffer.drain(..).collect();

        let mut last_flush = self.last_flush.write().await;
        *last_flush = Instant::now();

        debug!("Force flushed batch of {} events", batch.len());
        Ok(batch)
    }

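    /// Forwards a performance observation to the underlying predictor.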
    pub async fn record_batch_performance(&self, performance: BatchPerformancePoint) -> Result<()> {
        self.predictor.record_performance(performance).await
    }

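    /// Returns the number of events currently buffered.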
    pub async fn buffer_size(&self) -> usize {
        self.buffer.read().await.len()
    }

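    /// Returns a snapshot of the underlying predictor's statistics.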
    pub fn stats(&self) -> BatchingStats {
        self.predictor.stats()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventMetadata;

    #[tokio::test]
    async fn test_batch_size_predictor() {
        let config = BatchConfig::default();
        let predictor = BatchSizePredictor::new(config);

        // The default config starts at an initial batch size of 100.
        assert_eq!(predictor.current_batch_size(), 100);

        // With no history, the prediction falls back to the initial size.
        let batch_size = predictor.predict_batch_size().await.unwrap();
        assert_eq!(batch_size, 100);
    }

    #[tokio::test]
    async fn test_adaptive_batcher() {
        let config = BatchConfig::default();
        let batcher = AdaptiveBatcher::new(config, Duration::from_millis(100));

        let event = StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        };

        // A single event should be buffered, not flushed.
        let result = batcher.add_event(event).await.unwrap();
        assert!(result.is_none());

        assert_eq!(batcher.buffer_size().await, 1);
    }

    #[tokio::test]
    async fn test_batch_flush() {
        let config = BatchConfig::default();
        let batcher = AdaptiveBatcher::new(config, Duration::from_millis(100));

        let event = StreamEvent::TripleAdded {
            subject: "test".to_string(),
            predicate: "test".to_string(),
            object: "test".to_string(),
            graph: None,
            metadata: EventMetadata::default(),
        };

        batcher.add_event(event).await.unwrap();

        // A forced flush returns the buffered event and empties the buffer.
        let batch = batcher.flush().await.unwrap();
        assert_eq!(batch.len(), 1);

        assert_eq!(batcher.buffer_size().await, 0);
    }

    #[tokio::test]
    async fn test_performance_recording() {
        let config = BatchConfig::default();
        let predictor = BatchSizePredictor::new(config);

        let performance = BatchPerformancePoint {
            batch_size: 100,
            latency_ms: 5,
            throughput_eps: 20000.0,
            cpu_usage: 50.0,
            memory_usage: 30.0,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_secs(),
        };

        predictor.record_performance(performance).await.unwrap();

        let stats = predictor.stats();
        assert_eq!(stats.total_batches.load(Ordering::Relaxed), 1);
        assert_eq!(stats.total_events.load(Ordering::Relaxed), 100);
    }
}