1use crate::{
14 domain::entities::Event,
15 error::Result,
16 infrastructure::persistence::{
17 lock_free::{LockFreeMetrics, ShardedEventQueue},
18 simd_json::{SimdJsonParser, SimdJsonStats},
19 },
20};
21use bumpalo::Bump;
22use serde::Deserialize;
23use std::{
24 sync::{
25 Arc,
26 atomic::{AtomicU64, AtomicUsize, Ordering},
27 },
28 time::{Duration, Instant},
29};
30
/// Tuning knobs for a [`BatchProcessor`].
///
/// `queue_capacity` and `shard_count` size the sharded event queue at
/// construction time; the remaining fields are carried on the processor.
#[derive(Debug, Clone)]
pub struct BatchProcessorConfig {
    // NOTE(review): not read anywhere in this file — presumably an upper
    // bound callers pass to `get_batch`; confirm against call sites.
    pub max_batch_size: usize,
    // Total event capacity of the sharded queue.
    pub queue_capacity: usize,
    // Number of independent queue shards (reduces contention).
    pub shard_count: usize,
    // NOTE(review): not read in this file — likely the byte size for an
    // `ArenaBatchBuffer`; confirm before relying on it.
    pub arena_size: usize,
    // NOTE(review): not read in this file — SIMD parsing appears to be
    // unconditional via `SimdJsonParser`; confirm this flag is still wired up.
    pub simd_enabled: bool,
}
45
46impl Default for BatchProcessorConfig {
47 fn default() -> Self {
48 Self {
49 max_batch_size: 10_000,
50 queue_capacity: 1_000_000,
51 shard_count: 16,
52 arena_size: 64 * 1024 * 1024, simd_enabled: true,
54 }
55 }
56}
57
58impl BatchProcessorConfig {
59 pub fn high_throughput() -> Self {
61 Self {
62 max_batch_size: 50_000,
63 queue_capacity: 10_000_000,
64 shard_count: 32,
65 arena_size: 256 * 1024 * 1024, simd_enabled: true,
67 }
68 }
69
70 pub fn low_latency() -> Self {
72 Self {
73 max_batch_size: 1_000,
74 queue_capacity: 100_000,
75 shard_count: 8,
76 arena_size: 16 * 1024 * 1024, simd_enabled: true,
78 }
79 }
80}
81
/// Wire-format shape of an incoming event, deserialized from JSON before
/// being converted into a domain [`Event`] via `Event::from_strings`.
#[derive(Debug, Clone, Deserialize)]
pub struct RawEventData {
    // Event type discriminator, e.g. "test.event".
    pub event_type: String,
    // Identifier of the entity the event belongs to.
    pub entity_id: String,
    // Target stream; falls back to "default" when absent from the JSON.
    #[serde(default = "default_stream")]
    pub stream_id: String,
    // Arbitrary event payload.
    pub data: serde_json::Value,
    // Optional free-form metadata; `None` when the field is omitted.
    #[serde(default)]
    pub metadata: Option<serde_json::Value>,
}
93
/// Fallback stream id used when incoming JSON omits `stream_id`.
fn default_stream() -> String {
    String::from("default")
}
97
/// Snapshot of cumulative processor counters, built by [`BatchProcessor::stats`].
#[derive(Debug, Clone)]
pub struct BatchProcessorStats {
    // Number of batches handled since construction (or the last reset).
    pub batches_processed: u64,
    // Total events counted across those batches.
    pub events_processed: u64,
    // Total JSON bytes fed to the parser.
    pub bytes_parsed: u64,
    // events_processed / batches_processed, or 0.0 if no batches yet.
    pub avg_batch_size: f64,
    // Events per second over the accumulated processing time.
    pub events_per_sec: f64,
    // Parse throughput in megabytes per second.
    pub parse_throughput_mbps: f64,
    // Current number of events waiting in the queue at snapshot time.
    pub queue_depth: usize,
    // Accumulated wall-clock processing time in nanoseconds.
    pub total_time_ns: u64,
}
118
/// Lock-free batch ingestion pipeline: parses JSON events (optionally via
/// SIMD) and pushes them onto a sharded queue, tracking throughput counters
/// with relaxed atomics.
pub struct BatchProcessor {
    // Configuration supplied at construction; retained for reference.
    config: BatchProcessorConfig,
    // JSON parser used by parse_and_queue_event / parse_and_queue_bytes.
    json_parser: SimdJsonParser,
    // Destination queue for successfully parsed events.
    event_queue: ShardedEventQueue,
    // Shared ingest/error metrics, cloneable via `metrics()`.
    metrics: Arc<LockFreeMetrics>,
    // Cumulative counters below use Ordering::Relaxed — they are
    // independent statistics, not synchronization points.
    batches_processed: AtomicU64,
    events_processed: AtomicU64,
    bytes_parsed: AtomicU64,
    total_time_ns: AtomicU64,
    // NOTE(review): initialized to 0 and never read or written elsewhere in
    // this file — confirm it is still needed.
    arena_index: AtomicUsize,
}
139
140impl BatchProcessor {
141 pub fn new() -> Self {
143 Self::with_config(BatchProcessorConfig::default())
144 }
145
146 pub fn with_config(config: BatchProcessorConfig) -> Self {
148 let event_queue = ShardedEventQueue::with_shards(config.queue_capacity, config.shard_count);
149
150 Self {
151 config,
152 json_parser: SimdJsonParser::new(),
153 event_queue,
154 metrics: Arc::new(LockFreeMetrics::new()),
155 batches_processed: AtomicU64::new(0),
156 events_processed: AtomicU64::new(0),
157 bytes_parsed: AtomicU64::new(0),
158 total_time_ns: AtomicU64::new(0),
159 arena_index: AtomicUsize::new(0),
160 }
161 }
162
163 pub fn process_batch(&self, json_events: Vec<String>) -> BatchResult {
174 let start = Instant::now();
175 let batch_size = json_events.len();
176
177 let mut success_count = 0;
178 let mut failure_count = 0;
179 let mut bytes_parsed = 0usize;
180
181 for json_str in json_events {
182 bytes_parsed += json_str.len();
183
184 match self.parse_and_queue_event(json_str) {
185 Ok(()) => {
186 success_count += 1;
187 self.metrics.record_ingest();
188 }
189 Err(_) => {
190 failure_count += 1;
191 self.metrics.record_error();
192 }
193 }
194 }
195
196 let duration = start.elapsed();
197 self.record_batch_stats(batch_size, bytes_parsed, duration);
198
199 BatchResult {
200 success_count,
201 failure_count,
202 duration,
203 events_per_sec: success_count as f64 / duration.as_secs_f64(),
204 }
205 }
206
207 pub fn process_batch_bytes(&self, mut json_bytes: Vec<Vec<u8>>) -> BatchResult {
215 let start = Instant::now();
216 let batch_size = json_bytes.len();
217
218 let mut success_count = 0;
219 let mut failure_count = 0;
220 let mut bytes_parsed = 0usize;
221
222 for bytes in &mut json_bytes {
223 bytes_parsed += bytes.len();
224
225 match self.parse_and_queue_bytes(bytes) {
226 Ok(()) => {
227 success_count += 1;
228 self.metrics.record_ingest();
229 }
230 Err(_) => {
231 failure_count += 1;
232 self.metrics.record_error();
233 }
234 }
235 }
236
237 let duration = start.elapsed();
238 self.record_batch_stats(batch_size, bytes_parsed, duration);
239
240 BatchResult {
241 success_count,
242 failure_count,
243 duration,
244 events_per_sec: success_count as f64 / duration.as_secs_f64(),
245 }
246 }
247
248 pub fn process_events(&self, events: Vec<Event>) -> BatchResult {
252 let start = Instant::now();
253 let batch_size = events.len();
254
255 let success_count = self.event_queue.try_push_batch(events);
256 let failure_count = batch_size - success_count;
257
258 self.metrics.record_ingest_batch(success_count as u64);
259 if failure_count > 0 {
260 for _ in 0..failure_count {
261 self.metrics.record_error();
262 }
263 }
264
265 let duration = start.elapsed();
266 self.batches_processed.fetch_add(1, Ordering::Relaxed);
267 self.events_processed
268 .fetch_add(success_count as u64, Ordering::Relaxed);
269 self.total_time_ns
270 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
271
272 BatchResult {
273 success_count,
274 failure_count,
275 duration,
276 events_per_sec: success_count as f64 / duration.as_secs_f64(),
277 }
278 }
279
280 fn parse_and_queue_event(&self, json_str: String) -> Result<()> {
282 let raw: RawEventData = self.json_parser.parse_str(&json_str)?;
283
284 let event = Event::from_strings(
285 raw.event_type,
286 raw.entity_id,
287 raw.stream_id,
288 raw.data,
289 raw.metadata,
290 )?;
291
292 self.event_queue.try_push(event)
293 }
294
295 fn parse_and_queue_bytes(&self, bytes: &mut [u8]) -> Result<()> {
297 let raw: RawEventData = self.json_parser.parse(bytes)?;
298
299 let event = Event::from_strings(
300 raw.event_type,
301 raw.entity_id,
302 raw.stream_id,
303 raw.data,
304 raw.metadata,
305 )?;
306
307 self.event_queue.try_push(event)
308 }
309
310 fn record_batch_stats(&self, batch_size: usize, bytes: usize, duration: Duration) {
312 self.batches_processed.fetch_add(1, Ordering::Relaxed);
313 self.events_processed
314 .fetch_add(batch_size as u64, Ordering::Relaxed);
315 self.bytes_parsed.fetch_add(bytes as u64, Ordering::Relaxed);
316 self.total_time_ns
317 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
318 }
319
320 pub fn get_batch(&self, max_count: usize) -> Vec<Event> {
325 self.event_queue.try_pop_batch(max_count)
326 }
327
328 pub fn get_event(&self) -> Option<Event> {
330 self.event_queue.try_pop_any()
331 }
332
333 pub fn queue_depth(&self) -> usize {
335 self.event_queue.len()
336 }
337
338 pub fn is_queue_empty(&self) -> bool {
340 self.event_queue.is_empty()
341 }
342
343 pub fn stats(&self) -> BatchProcessorStats {
345 let batches = self.batches_processed.load(Ordering::Relaxed);
346 let events = self.events_processed.load(Ordering::Relaxed);
347 let bytes = self.bytes_parsed.load(Ordering::Relaxed);
348 let time_ns = self.total_time_ns.load(Ordering::Relaxed);
349
350 let time_secs = time_ns as f64 / 1_000_000_000.0;
351 let events_per_sec = if time_secs > 0.0 {
352 events as f64 / time_secs
353 } else {
354 0.0
355 };
356 let throughput_mbps = if time_secs > 0.0 {
357 (bytes as f64 / 1_000_000.0) / time_secs
358 } else {
359 0.0
360 };
361
362 BatchProcessorStats {
363 batches_processed: batches,
364 events_processed: events,
365 bytes_parsed: bytes,
366 avg_batch_size: if batches > 0 {
367 events as f64 / batches as f64
368 } else {
369 0.0
370 },
371 events_per_sec,
372 parse_throughput_mbps: throughput_mbps,
373 queue_depth: self.event_queue.len(),
374 total_time_ns: time_ns,
375 }
376 }
377
378 pub fn parser_stats(&self) -> &SimdJsonStats {
380 self.json_parser.stats()
381 }
382
383 pub fn metrics(&self) -> Arc<LockFreeMetrics> {
385 self.metrics.clone()
386 }
387
388 pub fn event_queue(&self) -> &ShardedEventQueue {
390 &self.event_queue
391 }
392
393 pub fn reset_stats(&self) {
395 self.batches_processed.store(0, Ordering::Relaxed);
396 self.events_processed.store(0, Ordering::Relaxed);
397 self.bytes_parsed.store(0, Ordering::Relaxed);
398 self.total_time_ns.store(0, Ordering::Relaxed);
399 self.json_parser.reset_stats();
400 self.metrics.reset();
401 }
402}
403
404impl Default for BatchProcessor {
405 fn default() -> Self {
406 Self::new()
407 }
408}
409
/// Outcome of processing a single batch.
#[derive(Debug, Clone)]
pub struct BatchResult {
    // Events successfully parsed (where applicable) and enqueued.
    pub success_count: usize,
    // Events that failed to parse or did not fit in the queue.
    pub failure_count: usize,
    // Wall-clock time spent processing the batch.
    pub duration: Duration,
    // Successful events per second over `duration`.
    pub events_per_sec: f64,
}
422
423impl BatchResult {
424 pub fn total(&self) -> usize {
426 self.success_count + self.failure_count
427 }
428
429 pub fn success_rate(&self) -> f64 {
431 let total = self.total();
432 if total > 0 {
433 self.success_count as f64 / total as f64
434 } else {
435 1.0
436 }
437 }
438}
439
/// Bump-allocated scratch buffer for batch parsing: allocations are O(1)
/// pointer bumps and are all freed at once by `reset()`.
pub struct ArenaBatchBuffer {
    // Underlying bump arena; allocations borrow from it until reset.
    arena: Bump,
    // Requested capacity in bytes, recorded at construction.
    capacity: usize,
}
448
449impl ArenaBatchBuffer {
450 pub fn new(capacity_bytes: usize) -> Self {
452 Self {
453 arena: Bump::with_capacity(capacity_bytes),
454 capacity: capacity_bytes,
455 }
456 }
457
458 pub fn alloc_bytes(&self, data: &[u8]) -> &[u8] {
460 self.arena.alloc_slice_copy(data)
461 }
462
463 pub fn alloc_str(&self, s: &str) -> &str {
465 self.arena.alloc_str(s)
466 }
467
468 pub fn allocated(&self) -> usize {
470 self.arena.allocated_bytes()
471 }
472
473 pub fn reset(&mut self) {
475 self.arena.reset();
476 }
477
478 pub fn capacity(&self) -> usize {
480 self.capacity
481 }
482}
483
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Builds a well-formed event JSON string with a unique entity id.
    fn create_test_json(id: u32) -> String {
        json!({
            "event_type": "test.event",
            "entity_id": format!("entity-{}", id),
            "stream_id": "test-stream",
            "data": {"value": id}
        })
        .to_string()
    }

    // A fresh processor starts with an empty queue.
    #[test]
    fn test_create_batch_processor() {
        let processor = BatchProcessor::new();
        assert!(processor.is_queue_empty());
        assert_eq!(processor.queue_depth(), 0);
    }

    // All well-formed events in a batch are parsed and enqueued.
    #[test]
    fn test_process_single_batch() {
        let processor = BatchProcessor::new();

        let events: Vec<String> = (0..100).map(create_test_json).collect();
        let result = processor.process_batch(events);

        assert_eq!(result.success_count, 100);
        assert_eq!(result.failure_count, 0);
        assert_eq!(processor.queue_depth(), 100);
    }

    // The byte-buffer path behaves like the string path.
    #[test]
    fn test_process_batch_bytes() {
        let processor = BatchProcessor::new();

        let events: Vec<Vec<u8>> = (0..50).map(|i| create_test_json(i).into_bytes()).collect();
        let result = processor.process_batch_bytes(events);

        assert_eq!(result.success_count, 50);
        assert_eq!(result.failure_count, 0);
    }

    // get_batch drains exactly the requested count, leaving the rest queued.
    #[test]
    fn test_get_batch() {
        let processor = BatchProcessor::new();

        let events: Vec<String> = (0..100).map(create_test_json).collect();
        processor.process_batch(events);

        let batch = processor.get_batch(30);
        assert_eq!(batch.len(), 30);
        assert_eq!(processor.queue_depth(), 70);
    }

    // One batch of 100 events is reflected in the cumulative stats snapshot.
    #[test]
    fn test_stats() {
        let processor = BatchProcessor::new();

        let events: Vec<String> = (0..100).map(create_test_json).collect();
        processor.process_batch(events);

        let stats = processor.stats();
        assert_eq!(stats.batches_processed, 1);
        assert_eq!(stats.events_processed, 100);
        assert!(stats.events_per_sec > 0.0);
    }

    // A malformed element is counted as a failure without aborting the batch.
    #[test]
    fn test_invalid_json() {
        let processor = BatchProcessor::new();

        let events = vec![
            create_test_json(1),
            "invalid json".to_string(),
            create_test_json(3),
        ];
        let result = processor.process_batch(events);

        assert_eq!(result.success_count, 2);
        assert_eq!(result.failure_count, 1);
    }

    // total() and success_rate() derive correctly from the raw counts.
    #[test]
    fn test_batch_result_metrics() {
        let result = BatchResult {
            success_count: 90,
            failure_count: 10,
            duration: Duration::from_millis(100),
            events_per_sec: 900.0,
        };

        assert_eq!(result.total(), 100);
        assert!((result.success_rate() - 0.9).abs() < 0.001);
    }

    // Arena allocations survive until reset(), after which the buffer is
    // reusable; capacity reflects the construction-time request.
    #[test]
    fn test_arena_batch_buffer() {
        let mut buffer = ArenaBatchBuffer::new(1024);

        let s1 = buffer.alloc_str("hello");
        let s2 = buffer.alloc_str("world");

        assert_eq!(s1, "hello");
        assert_eq!(s2, "world");
        let allocated_before = buffer.allocated();
        assert!(allocated_before > 0);

        buffer.reset();

        let s3 = buffer.alloc_str("test");
        assert_eq!(s3, "test");

        assert!(buffer.capacity() >= 1024);
    }

    // The presets scale relative to the defaults in the expected directions.
    #[test]
    fn test_config_presets() {
        let default = BatchProcessorConfig::default();
        let high_throughput = BatchProcessorConfig::high_throughput();
        let low_latency = BatchProcessorConfig::low_latency();

        assert!(high_throughput.queue_capacity > default.queue_capacity);
        assert!(low_latency.max_batch_size < default.max_batch_size);
    }

    // Four threads each pushing 100 events all land in the queue; the scope
    // guarantees every thread has finished before the final assertion.
    #[test]
    fn test_concurrent_processing() {
        let processor = Arc::new(BatchProcessor::new());

        std::thread::scope(|s| {
            for t in 0..4 {
                let proc = processor.clone();
                s.spawn(move || {
                    let events: Vec<String> =
                        (0..100).map(|i| create_test_json(t * 100 + i)).collect();
                    proc.process_batch(events);
                });
            }
        });

        assert_eq!(processor.queue_depth(), 400);
    }

    // Pre-built domain events bypass JSON parsing and enqueue directly.
    #[test]
    fn test_process_events_direct() {
        let processor = BatchProcessor::new();

        let events: Vec<Event> = (0..50)
            .map(|i| {
                Event::from_strings(
                    "test.event".to_string(),
                    format!("entity-{}", i),
                    "test-stream".to_string(),
                    json!({"value": i}),
                    None,
                )
                .unwrap()
            })
            .collect();

        let result = processor.process_events(events);
        assert_eq!(result.success_count, 50);
        assert_eq!(processor.queue_depth(), 50);
    }
}