// allsource_core/infrastructure/persistence/batch_processor.rs
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

use bumpalo::Bump;
use serde::Deserialize;

use crate::domain::entities::Event;
use crate::error::{AllSourceError, Result};
use crate::infrastructure::persistence::lock_free::{LockFreeMetrics, ShardedEventQueue};
use crate::infrastructure::persistence::simd_json::{SimdJsonParser, SimdJsonStats};

/// Tuning knobs for a [`BatchProcessor`].
///
/// Three presets are provided: [`Default`], [`high_throughput`](Self::high_throughput)
/// and [`low_latency`](Self::low_latency).
#[derive(Clone, Debug)]
pub struct BatchProcessorConfig {
    /// Intended upper bound on events per batch.
    /// NOTE(review): not consulted by the processor code visible here.
    pub max_batch_size: usize,
    /// Total capacity handed to the sharded event queue.
    pub queue_capacity: usize,
    /// Number of shards in the event queue.
    pub shard_count: usize,
    /// Arena size in bytes.
    /// NOTE(review): not consulted by the processor code visible here.
    pub arena_size: usize,
    /// Whether SIMD JSON parsing is requested.
    /// NOTE(review): not consulted by the processor code visible here.
    pub simd_enabled: bool,
}

impl Default for BatchProcessorConfig {
    /// Balanced preset: 10k-event batches, 1M queue slots, 16 shards, 64 MiB arena.
    fn default() -> Self {
        Self {
            max_batch_size: 10_000,
            queue_capacity: 1_000_000,
            shard_count: 16,
            arena_size: 64 * Self::MIB,
            simd_enabled: true,
        }
    }
}

impl BatchProcessorConfig {
    /// Bytes in one mebibyte; used to express arena sizes.
    const MIB: usize = 1024 * 1024;

    /// Larger batches, queue, shard count and arena than the default preset.
    pub fn high_throughput() -> Self {
        Self {
            max_batch_size: 50_000,
            queue_capacity: 10_000_000,
            shard_count: 32,
            arena_size: 256 * Self::MIB,
            ..Self::default()
        }
    }

    /// Smaller batches, queue, shard count and arena than the default preset.
    pub fn low_latency() -> Self {
        Self {
            max_batch_size: 1_000,
            queue_capacity: 100_000,
            shard_count: 8,
            arena_size: 16 * Self::MIB,
            ..Self::default()
        }
    }
}

74#[derive(Debug, Clone, Deserialize)]
76pub struct RawEventData {
77 pub event_type: String,
78 pub entity_id: String,
79 #[serde(default = "default_stream")]
80 pub stream_id: String,
81 pub data: serde_json::Value,
82 #[serde(default)]
83 pub metadata: Option<serde_json::Value>,
84}
85
86fn default_stream() -> String {
87 "default".to_string()
88}
89
/// Point-in-time snapshot of a processor's aggregate counters.
#[derive(Clone, Debug)]
pub struct BatchProcessorStats {
    /// Batches recorded since creation or the last stats reset.
    pub batches_processed: u64,
    /// Events counted across those batches.
    pub events_processed: u64,
    /// Input bytes handed to the JSON parser across those batches.
    pub bytes_parsed: u64,
    /// `events_processed / batches_processed`, or 0.0 when no batch was recorded.
    pub avg_batch_size: f64,
    /// `events_processed` divided by the accumulated processing time in seconds.
    pub events_per_sec: f64,
    /// Parsed megabytes (10^6 bytes) per second of accumulated processing time.
    pub parse_throughput_mbps: f64,
    /// Events waiting in the queue when the snapshot was taken.
    pub queue_depth: usize,
    /// Accumulated processing time in nanoseconds.
    pub total_time_ns: u64,
}

111pub struct BatchProcessor {
116 config: BatchProcessorConfig,
117 json_parser: SimdJsonParser,
119 event_queue: ShardedEventQueue,
121 metrics: Arc<LockFreeMetrics>,
123 batches_processed: AtomicU64,
125 events_processed: AtomicU64,
126 bytes_parsed: AtomicU64,
127 total_time_ns: AtomicU64,
128 arena_index: AtomicUsize,
130}
131
132impl BatchProcessor {
133 pub fn new() -> Self {
135 Self::with_config(BatchProcessorConfig::default())
136 }
137
138 pub fn with_config(config: BatchProcessorConfig) -> Self {
140 let event_queue = ShardedEventQueue::with_shards(config.queue_capacity, config.shard_count);
141
142 Self {
143 config,
144 json_parser: SimdJsonParser::new(),
145 event_queue,
146 metrics: Arc::new(LockFreeMetrics::new()),
147 batches_processed: AtomicU64::new(0),
148 events_processed: AtomicU64::new(0),
149 bytes_parsed: AtomicU64::new(0),
150 total_time_ns: AtomicU64::new(0),
151 arena_index: AtomicUsize::new(0),
152 }
153 }
154
155 pub fn process_batch(&self, json_events: Vec<String>) -> BatchResult {
166 let start = Instant::now();
167 let batch_size = json_events.len();
168
169 let mut success_count = 0;
170 let mut failure_count = 0;
171 let mut bytes_parsed = 0usize;
172
173 for json_str in json_events {
174 bytes_parsed += json_str.len();
175
176 match self.parse_and_queue_event(json_str) {
177 Ok(()) => {
178 success_count += 1;
179 self.metrics.record_ingest();
180 }
181 Err(_) => {
182 failure_count += 1;
183 self.metrics.record_error();
184 }
185 }
186 }
187
188 let duration = start.elapsed();
189 self.record_batch_stats(batch_size, bytes_parsed, duration);
190
191 BatchResult {
192 success_count,
193 failure_count,
194 duration,
195 events_per_sec: success_count as f64 / duration.as_secs_f64(),
196 }
197 }
198
199 pub fn process_batch_bytes(&self, mut json_bytes: Vec<Vec<u8>>) -> BatchResult {
207 let start = Instant::now();
208 let batch_size = json_bytes.len();
209
210 let mut success_count = 0;
211 let mut failure_count = 0;
212 let mut bytes_parsed = 0usize;
213
214 for bytes in &mut json_bytes {
215 bytes_parsed += bytes.len();
216
217 match self.parse_and_queue_bytes(bytes) {
218 Ok(()) => {
219 success_count += 1;
220 self.metrics.record_ingest();
221 }
222 Err(_) => {
223 failure_count += 1;
224 self.metrics.record_error();
225 }
226 }
227 }
228
229 let duration = start.elapsed();
230 self.record_batch_stats(batch_size, bytes_parsed, duration);
231
232 BatchResult {
233 success_count,
234 failure_count,
235 duration,
236 events_per_sec: success_count as f64 / duration.as_secs_f64(),
237 }
238 }
239
240 pub fn process_events(&self, events: Vec<Event>) -> BatchResult {
244 let start = Instant::now();
245 let batch_size = events.len();
246
247 let success_count = self.event_queue.try_push_batch(events);
248 let failure_count = batch_size - success_count;
249
250 self.metrics.record_ingest_batch(success_count as u64);
251 if failure_count > 0 {
252 for _ in 0..failure_count {
253 self.metrics.record_error();
254 }
255 }
256
257 let duration = start.elapsed();
258 self.batches_processed.fetch_add(1, Ordering::Relaxed);
259 self.events_processed
260 .fetch_add(success_count as u64, Ordering::Relaxed);
261 self.total_time_ns
262 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
263
264 BatchResult {
265 success_count,
266 failure_count,
267 duration,
268 events_per_sec: success_count as f64 / duration.as_secs_f64(),
269 }
270 }
271
272 fn parse_and_queue_event(&self, json_str: String) -> Result<()> {
274 let raw: RawEventData = self.json_parser.parse_str(&json_str)?;
275
276 let event = Event::from_strings(
277 raw.event_type,
278 raw.entity_id,
279 raw.stream_id,
280 raw.data,
281 raw.metadata,
282 )?;
283
284 self.event_queue.try_push(event)
285 }
286
287 fn parse_and_queue_bytes(&self, bytes: &mut [u8]) -> Result<()> {
289 let raw: RawEventData = self.json_parser.parse(bytes)?;
290
291 let event = Event::from_strings(
292 raw.event_type,
293 raw.entity_id,
294 raw.stream_id,
295 raw.data,
296 raw.metadata,
297 )?;
298
299 self.event_queue.try_push(event)
300 }
301
302 fn record_batch_stats(&self, batch_size: usize, bytes: usize, duration: Duration) {
304 self.batches_processed.fetch_add(1, Ordering::Relaxed);
305 self.events_processed
306 .fetch_add(batch_size as u64, Ordering::Relaxed);
307 self.bytes_parsed.fetch_add(bytes as u64, Ordering::Relaxed);
308 self.total_time_ns
309 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
310 }
311
312 pub fn get_batch(&self, max_count: usize) -> Vec<Event> {
317 self.event_queue.try_pop_batch(max_count)
318 }
319
320 pub fn get_event(&self) -> Option<Event> {
322 self.event_queue.try_pop_any()
323 }
324
325 pub fn queue_depth(&self) -> usize {
327 self.event_queue.len()
328 }
329
330 pub fn is_queue_empty(&self) -> bool {
332 self.event_queue.is_empty()
333 }
334
335 pub fn stats(&self) -> BatchProcessorStats {
337 let batches = self.batches_processed.load(Ordering::Relaxed);
338 let events = self.events_processed.load(Ordering::Relaxed);
339 let bytes = self.bytes_parsed.load(Ordering::Relaxed);
340 let time_ns = self.total_time_ns.load(Ordering::Relaxed);
341
342 let time_secs = time_ns as f64 / 1_000_000_000.0;
343 let events_per_sec = if time_secs > 0.0 {
344 events as f64 / time_secs
345 } else {
346 0.0
347 };
348 let throughput_mbps = if time_secs > 0.0 {
349 (bytes as f64 / 1_000_000.0) / time_secs
350 } else {
351 0.0
352 };
353
354 BatchProcessorStats {
355 batches_processed: batches,
356 events_processed: events,
357 bytes_parsed: bytes,
358 avg_batch_size: if batches > 0 {
359 events as f64 / batches as f64
360 } else {
361 0.0
362 },
363 events_per_sec,
364 parse_throughput_mbps: throughput_mbps,
365 queue_depth: self.event_queue.len(),
366 total_time_ns: time_ns,
367 }
368 }
369
370 pub fn parser_stats(&self) -> &SimdJsonStats {
372 self.json_parser.stats()
373 }
374
375 pub fn metrics(&self) -> Arc<LockFreeMetrics> {
377 self.metrics.clone()
378 }
379
380 pub fn event_queue(&self) -> &ShardedEventQueue {
382 &self.event_queue
383 }
384
385 pub fn reset_stats(&self) {
387 self.batches_processed.store(0, Ordering::Relaxed);
388 self.events_processed.store(0, Ordering::Relaxed);
389 self.bytes_parsed.store(0, Ordering::Relaxed);
390 self.total_time_ns.store(0, Ordering::Relaxed);
391 self.json_parser.reset_stats();
392 self.metrics.reset();
393 }
394}
395
396impl Default for BatchProcessor {
397 fn default() -> Self {
398 Self::new()
399 }
400}
401
/// Outcome of processing one batch.
#[derive(Clone, Debug)]
pub struct BatchResult {
    /// Events that were successfully enqueued.
    pub success_count: usize,
    /// Events that were not enqueued.
    pub failure_count: usize,
    /// Wall-clock time spent on the batch.
    pub duration: Duration,
    /// Successful events per second over `duration`.
    pub events_per_sec: f64,
}

impl BatchResult {
    /// Number of events in the batch: successes plus failures.
    pub fn total(&self) -> usize {
        let BatchResult {
            success_count,
            failure_count,
            ..
        } = self;
        success_count + failure_count
    }

    /// Fraction of events that succeeded, in `[0, 1]`.
    ///
    /// An empty batch counts as fully successful (1.0).
    pub fn success_rate(&self) -> f64 {
        match self.total() {
            0 => 1.0,
            total => self.success_count as f64 / total as f64,
        }
    }
}

432pub struct ArenaBatchBuffer {
437 arena: Bump,
438 capacity: usize,
439}
440
441impl ArenaBatchBuffer {
442 pub fn new(capacity_bytes: usize) -> Self {
444 Self {
445 arena: Bump::with_capacity(capacity_bytes),
446 capacity: capacity_bytes,
447 }
448 }
449
450 pub fn alloc_bytes(&self, data: &[u8]) -> &[u8] {
452 self.arena.alloc_slice_copy(data)
453 }
454
455 pub fn alloc_str(&self, s: &str) -> &str {
457 self.arena.alloc_str(s)
458 }
459
460 pub fn allocated(&self) -> usize {
462 self.arena.allocated_bytes()
463 }
464
465 pub fn reset(&mut self) {
467 self.arena.reset();
468 }
469
470 pub fn capacity(&self) -> usize {
472 self.capacity
473 }
474}
475
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a valid event JSON document whose entity id and payload derive from `id`.
    fn create_test_json(id: u32) -> String {
        json!({
            "event_type": "test.event",
            "entity_id": format!("entity-{}", id),
            "stream_id": "test-stream",
            "data": {"value": id}
        })
        .to_string()
    }

    #[test]
    fn test_create_batch_processor() {
        let processor = BatchProcessor::new();
        assert!(processor.is_queue_empty());
        assert_eq!(processor.queue_depth(), 0);
    }

    #[test]
    fn test_process_single_batch() {
        let processor = BatchProcessor::new();

        let events: Vec<String> = (0..100).map(create_test_json).collect();
        let result = processor.process_batch(events);

        assert_eq!(result.success_count, 100);
        assert_eq!(result.failure_count, 0);
        assert_eq!(processor.queue_depth(), 100);
    }

    #[test]
    fn test_empty_batch() {
        let processor = BatchProcessor::new();

        let result = processor.process_batch(Vec::new());

        assert_eq!(result.success_count, 0);
        assert_eq!(result.failure_count, 0);
        assert_eq!(result.total(), 0);
        assert!(processor.is_queue_empty());
    }

    #[test]
    fn test_process_batch_bytes() {
        let processor = BatchProcessor::new();

        let events: Vec<Vec<u8>> = (0..50)
            .map(|i| create_test_json(i).into_bytes())
            .collect();
        let result = processor.process_batch_bytes(events);

        assert_eq!(result.success_count, 50);
        assert_eq!(result.failure_count, 0);
        assert_eq!(processor.queue_depth(), 50);
    }

    #[test]
    fn test_invalid_bytes() {
        let processor = BatchProcessor::new();

        let events = vec![create_test_json(1).into_bytes(), b"invalid json".to_vec()];
        let result = processor.process_batch_bytes(events);

        assert_eq!(result.success_count, 1);
        assert_eq!(result.failure_count, 1);
        assert_eq!(processor.queue_depth(), 1);
    }

    #[test]
    fn test_get_batch() {
        let processor = BatchProcessor::new();

        let events: Vec<String> = (0..100).map(create_test_json).collect();
        processor.process_batch(events);

        let batch = processor.get_batch(30);
        assert_eq!(batch.len(), 30);
        assert_eq!(processor.queue_depth(), 70);
    }

    #[test]
    fn test_stats() {
        let processor = BatchProcessor::new();

        let events: Vec<String> = (0..100).map(create_test_json).collect();
        processor.process_batch(events);

        let stats = processor.stats();
        assert_eq!(stats.batches_processed, 1);
        assert_eq!(stats.events_processed, 100);
        assert!(stats.events_per_sec > 0.0);
    }

    #[test]
    fn test_reset_stats_keeps_queue() {
        let processor = BatchProcessor::new();
        processor.process_batch((0..10).map(create_test_json).collect());

        processor.reset_stats();

        let stats = processor.stats();
        assert_eq!(stats.batches_processed, 0);
        assert_eq!(stats.events_processed, 0);
        assert_eq!(stats.bytes_parsed, 0);
        assert_eq!(stats.events_per_sec, 0.0);
        assert_eq!(processor.queue_depth(), 10);
    }

    #[test]
    fn test_invalid_json() {
        let processor = BatchProcessor::new();

        let events = vec![
            create_test_json(1),
            "invalid json".to_string(),
            create_test_json(3),
        ];
        let result = processor.process_batch(events);

        assert_eq!(result.success_count, 2);
        assert_eq!(result.failure_count, 1);
    }

    #[test]
    fn test_batch_result_metrics() {
        let result = BatchResult {
            success_count: 90,
            failure_count: 10,
            duration: Duration::from_millis(100),
            events_per_sec: 900.0,
        };

        assert_eq!(result.total(), 100);
        assert!((result.success_rate() - 0.9).abs() < 0.001);
    }

    #[test]
    fn test_arena_batch_buffer() {
        let mut buffer = ArenaBatchBuffer::new(1024);

        let s1 = buffer.alloc_str("hello");
        let s2 = buffer.alloc_str("world");

        assert_eq!(s1, "hello");
        assert_eq!(s2, "world");
        let allocated_before = buffer.allocated();
        assert!(allocated_before > 0);

        buffer.reset();

        let s3 = buffer.alloc_str("test");
        assert_eq!(s3, "test");

        assert!(buffer.capacity() >= 1024);
    }

    #[test]
    fn test_config_presets() {
        let default = BatchProcessorConfig::default();
        let high_throughput = BatchProcessorConfig::high_throughput();
        let low_latency = BatchProcessorConfig::low_latency();

        assert!(high_throughput.queue_capacity > default.queue_capacity);
        assert!(low_latency.max_batch_size < default.max_batch_size);
    }

    #[test]
    fn test_concurrent_processing() {
        let processor = BatchProcessor::new();

        // Scoped threads may borrow `processor` directly; no `Arc` is needed.
        std::thread::scope(|s| {
            for t in 0..4u32 {
                let processor = &processor;
                s.spawn(move || {
                    let events: Vec<String> = (0..100)
                        .map(|i| create_test_json(t * 100 + i))
                        .collect();
                    processor.process_batch(events);
                });
            }
        });

        assert_eq!(processor.queue_depth(), 400);
    }

    #[test]
    fn test_process_events_direct() {
        let processor = BatchProcessor::new();

        let events: Vec<Event> = (0..50)
            .map(|i| {
                Event::from_strings(
                    "test.event".to_string(),
                    format!("entity-{}", i),
                    "test-stream".to_string(),
                    json!({"value": i}),
                    None,
                )
                .unwrap()
            })
            .collect();

        let result = processor.process_events(events);
        assert_eq!(result.success_count, 50);
        assert_eq!(result.failure_count, 0);
        assert_eq!(processor.queue_depth(), 50);
    }
}