1use crate::{
14 domain::entities::Event,
15 error::Result,
16 infrastructure::persistence::{
17 lock_free::{LockFreeMetrics, ShardedEventQueue},
18 simd_json::{SimdJsonParser, SimdJsonStats},
19 },
20};
21use bumpalo::Bump;
22use serde::Deserialize;
23use std::{
24 sync::{
25 Arc,
26 atomic::{AtomicU64, AtomicUsize, Ordering},
27 },
28 time::{Duration, Instant},
29};
30
/// Tuning knobs for the batch ingestion pipeline.
#[derive(Debug, Clone)]
pub struct BatchProcessorConfig {
    /// Maximum number of events accepted in a single batch.
    pub max_batch_size: usize,
    /// Total capacity of the sharded event queue.
    pub queue_capacity: usize,
    /// Number of queue shards used to spread producer contention.
    pub shard_count: usize,
    /// Arena allocator capacity, in bytes.
    pub arena_size: usize,
    /// Whether SIMD-accelerated JSON parsing is enabled.
    pub simd_enabled: bool,
}

impl Default for BatchProcessorConfig {
    /// Balanced defaults: 10k-event batches, a 1M-slot queue split across
    /// 16 shards, a 64 MiB arena, and SIMD parsing on.
    fn default() -> Self {
        Self {
            max_batch_size: 10_000,
            queue_capacity: 1_000_000,
            shard_count: 16,
            arena_size: 64 * 1024 * 1024,
            simd_enabled: true,
        }
    }
}

impl BatchProcessorConfig {
    /// Preset favoring raw throughput: bigger batches, queue, and arena,
    /// with twice the default shard count.
    pub fn high_throughput() -> Self {
        Self {
            max_batch_size: 50_000,
            queue_capacity: 10_000_000,
            shard_count: 32,
            arena_size: 256 * 1024 * 1024,
            ..Self::default()
        }
    }

    /// Preset favoring low latency: small batches, a small queue, and a
    /// reduced shard count.
    pub fn low_latency() -> Self {
        Self {
            max_batch_size: 1_000,
            queue_capacity: 100_000,
            shard_count: 8,
            arena_size: 16 * 1024 * 1024,
            ..Self::default()
        }
    }
}
81
/// Wire format for an incoming event, deserialized from client-supplied JSON.
#[derive(Debug, Clone, Deserialize)]
pub struct RawEventData {
    /// Event type tag, e.g. "test.event" (see the tests below).
    pub event_type: String,
    /// Identifier of the entity this event belongs to.
    pub entity_id: String,
    /// Target stream; falls back to "default" when the field is absent.
    #[serde(default = "default_stream")]
    pub stream_id: String,
    /// Arbitrary event payload.
    pub data: serde_json::Value,
    /// Optional free-form metadata; `None` when the field is absent.
    #[serde(default)]
    pub metadata: Option<serde_json::Value>,
}
93
/// Serde default for [`RawEventData::stream_id`].
fn default_stream() -> String {
    String::from("default")
}
97
/// Point-in-time snapshot of the processor's cumulative throughput counters.
#[derive(Debug, Clone)]
pub struct BatchProcessorStats {
    /// Batches handled since creation (or the last `reset_stats`).
    pub batches_processed: u64,
    /// Events counted across all batches.
    pub events_processed: u64,
    /// JSON bytes fed to the parser.
    pub bytes_parsed: u64,
    /// Mean events per batch; 0.0 when no batch has been processed.
    pub avg_batch_size: f64,
    /// Events per second over the accumulated processing time.
    pub events_per_sec: f64,
    /// Parse throughput in megabytes per second.
    pub parse_throughput_mbps: f64,
    /// Current number of events waiting in the queue.
    pub queue_depth: usize,
    /// Total wall-clock time spent in `process_*`, in nanoseconds.
    pub total_time_ns: u64,
}
118
/// Batch ingestion pipeline: parses raw JSON payloads into [`Event`]s and
/// pushes them onto a sharded queue, tracking throughput with relaxed atomics.
pub struct BatchProcessor {
    // Runtime configuration (queue sizing, batch limits, feature toggles).
    config: BatchProcessorConfig,
    // JSON parser; accumulates its own stats, exposed via `parser_stats()`.
    json_parser: SimdJsonParser,
    // Sharded queue holding parsed events until a consumer drains them.
    event_queue: ShardedEventQueue,
    // Shared ingest/error counters, handed out via `metrics()`.
    metrics: Arc<LockFreeMetrics>,
    // Total batches processed across all `process_*` entry points.
    batches_processed: AtomicU64,
    // Total events counted across batches.
    events_processed: AtomicU64,
    // Total JSON bytes fed to the parser.
    bytes_parsed: AtomicU64,
    // Cumulative wall-clock time spent in `process_*`, in nanoseconds.
    total_time_ns: AtomicU64,
    // NOTE(review): never read in this file — presumably intended for
    // rotating arena buffers; confirm before relying on it or removing it.
    arena_index: AtomicUsize,
}
139
140impl BatchProcessor {
141 pub fn new() -> Self {
143 Self::with_config(BatchProcessorConfig::default())
144 }
145
146 pub fn with_config(config: BatchProcessorConfig) -> Self {
148 let event_queue = ShardedEventQueue::with_shards(config.queue_capacity, config.shard_count);
149
150 Self {
151 config,
152 json_parser: SimdJsonParser::new(),
153 event_queue,
154 metrics: Arc::new(LockFreeMetrics::new()),
155 batches_processed: AtomicU64::new(0),
156 events_processed: AtomicU64::new(0),
157 bytes_parsed: AtomicU64::new(0),
158 total_time_ns: AtomicU64::new(0),
159 arena_index: AtomicUsize::new(0),
160 }
161 }
162
163 #[cfg_attr(feature = "hotpath", hotpath::measure)]
174 pub fn process_batch(&self, json_events: Vec<String>) -> BatchResult {
175 let start = Instant::now();
176 let batch_size = json_events.len();
177
178 let mut success_count = 0;
179 let mut failure_count = 0;
180 let mut bytes_parsed = 0usize;
181
182 for json_str in json_events {
183 bytes_parsed += json_str.len();
184
185 if let Ok(()) = self.parse_and_queue_event(&json_str) {
186 success_count += 1;
187 self.metrics.record_ingest();
188 } else {
189 failure_count += 1;
190 self.metrics.record_error();
191 }
192 }
193
194 let duration = start.elapsed();
195 self.record_batch_stats(batch_size, bytes_parsed, duration);
196
197 BatchResult {
198 success_count,
199 failure_count,
200 duration,
201 events_per_sec: success_count as f64 / duration.as_secs_f64(),
202 }
203 }
204
205 #[cfg_attr(feature = "hotpath", hotpath::measure)]
213 pub fn process_batch_bytes(&self, mut json_bytes: Vec<Vec<u8>>) -> BatchResult {
214 let start = Instant::now();
215 let batch_size = json_bytes.len();
216
217 let mut success_count = 0;
218 let mut failure_count = 0;
219 let mut bytes_parsed = 0usize;
220
221 for bytes in &mut json_bytes {
222 bytes_parsed += bytes.len();
223
224 if let Ok(()) = self.parse_and_queue_bytes(bytes) {
225 success_count += 1;
226 self.metrics.record_ingest();
227 } else {
228 failure_count += 1;
229 self.metrics.record_error();
230 }
231 }
232
233 let duration = start.elapsed();
234 self.record_batch_stats(batch_size, bytes_parsed, duration);
235
236 BatchResult {
237 success_count,
238 failure_count,
239 duration,
240 events_per_sec: success_count as f64 / duration.as_secs_f64(),
241 }
242 }
243
244 pub fn process_events(&self, events: Vec<Event>) -> BatchResult {
248 let start = Instant::now();
249 let batch_size = events.len();
250
251 let success_count = self.event_queue.try_push_batch(events);
252 let failure_count = batch_size - success_count;
253
254 self.metrics.record_ingest_batch(success_count as u64);
255 if failure_count > 0 {
256 for _ in 0..failure_count {
257 self.metrics.record_error();
258 }
259 }
260
261 let duration = start.elapsed();
262 self.batches_processed.fetch_add(1, Ordering::Relaxed);
263 self.events_processed
264 .fetch_add(success_count as u64, Ordering::Relaxed);
265 self.total_time_ns
266 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
267
268 BatchResult {
269 success_count,
270 failure_count,
271 duration,
272 events_per_sec: success_count as f64 / duration.as_secs_f64(),
273 }
274 }
275
276 fn parse_and_queue_event(&self, json_str: &str) -> Result<()> {
278 let raw: RawEventData = self.json_parser.parse_str(json_str)?;
279
280 let event = Event::from_strings(
281 raw.event_type,
282 raw.entity_id,
283 raw.stream_id,
284 raw.data,
285 raw.metadata,
286 )?;
287
288 self.event_queue.try_push(event)
289 }
290
291 fn parse_and_queue_bytes(&self, bytes: &mut [u8]) -> Result<()> {
293 let raw: RawEventData = self.json_parser.parse(bytes)?;
294
295 let event = Event::from_strings(
296 raw.event_type,
297 raw.entity_id,
298 raw.stream_id,
299 raw.data,
300 raw.metadata,
301 )?;
302
303 self.event_queue.try_push(event)
304 }
305
306 fn record_batch_stats(&self, batch_size: usize, bytes: usize, duration: Duration) {
308 self.batches_processed.fetch_add(1, Ordering::Relaxed);
309 self.events_processed
310 .fetch_add(batch_size as u64, Ordering::Relaxed);
311 self.bytes_parsed.fetch_add(bytes as u64, Ordering::Relaxed);
312 self.total_time_ns
313 .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
314 }
315
316 #[cfg_attr(feature = "hotpath", hotpath::measure)]
321 pub fn get_batch(&self, max_count: usize) -> Vec<Event> {
322 self.event_queue.try_pop_batch(max_count)
323 }
324
325 pub fn get_event(&self) -> Option<Event> {
327 self.event_queue.try_pop_any()
328 }
329
330 pub fn queue_depth(&self) -> usize {
332 self.event_queue.len()
333 }
334
335 pub fn is_queue_empty(&self) -> bool {
337 self.event_queue.is_empty()
338 }
339
340 pub fn stats(&self) -> BatchProcessorStats {
342 let batches = self.batches_processed.load(Ordering::Relaxed);
343 let events = self.events_processed.load(Ordering::Relaxed);
344 let bytes = self.bytes_parsed.load(Ordering::Relaxed);
345 let time_ns = self.total_time_ns.load(Ordering::Relaxed);
346
347 let time_secs = time_ns as f64 / 1_000_000_000.0;
348 let events_per_sec = if time_secs > 0.0 {
349 events as f64 / time_secs
350 } else {
351 0.0
352 };
353 let throughput_mbps = if time_secs > 0.0 {
354 (bytes as f64 / 1_000_000.0) / time_secs
355 } else {
356 0.0
357 };
358
359 BatchProcessorStats {
360 batches_processed: batches,
361 events_processed: events,
362 bytes_parsed: bytes,
363 avg_batch_size: if batches > 0 {
364 events as f64 / batches as f64
365 } else {
366 0.0
367 },
368 events_per_sec,
369 parse_throughput_mbps: throughput_mbps,
370 queue_depth: self.event_queue.len(),
371 total_time_ns: time_ns,
372 }
373 }
374
375 pub fn parser_stats(&self) -> &SimdJsonStats {
377 self.json_parser.stats()
378 }
379
380 pub fn metrics(&self) -> Arc<LockFreeMetrics> {
382 self.metrics.clone()
383 }
384
385 pub fn event_queue(&self) -> &ShardedEventQueue {
387 &self.event_queue
388 }
389
390 pub fn reset_stats(&self) {
392 self.batches_processed.store(0, Ordering::Relaxed);
393 self.events_processed.store(0, Ordering::Relaxed);
394 self.bytes_parsed.store(0, Ordering::Relaxed);
395 self.total_time_ns.store(0, Ordering::Relaxed);
396 self.json_parser.reset_stats();
397 self.metrics.reset();
398 }
399}
400
401impl Default for BatchProcessor {
402 fn default() -> Self {
403 Self::new()
404 }
405}
406
/// Outcome of a single `process_*` call.
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// Events successfully parsed and queued.
    pub success_count: usize,
    /// Events that failed to parse or did not fit in the queue.
    pub failure_count: usize,
    /// Wall-clock time taken to process the batch.
    pub duration: Duration,
    /// Successful events per second over `duration`.
    pub events_per_sec: f64,
}

impl BatchResult {
    /// Total number of events attempted (successes plus failures).
    pub fn total(&self) -> usize {
        self.failure_count + self.success_count
    }

    /// Fraction of attempted events that succeeded, in `[0.0, 1.0]`.
    /// An empty batch counts as fully successful (1.0).
    pub fn success_rate(&self) -> f64 {
        match self.total() {
            0 => 1.0,
            total => self.success_count as f64 / total as f64,
        }
    }
}
436
437pub struct ArenaBatchBuffer {
442 arena: Bump,
443 capacity: usize,
444}
445
446impl ArenaBatchBuffer {
447 pub fn new(capacity_bytes: usize) -> Self {
449 Self {
450 arena: Bump::with_capacity(capacity_bytes),
451 capacity: capacity_bytes,
452 }
453 }
454
455 pub fn alloc_bytes(&self, data: &[u8]) -> &[u8] {
457 self.arena.alloc_slice_copy(data)
458 }
459
460 pub fn alloc_str(&self, s: &str) -> &str {
462 self.arena.alloc_str(s)
463 }
464
465 pub fn allocated(&self) -> usize {
467 self.arena.allocated_bytes()
468 }
469
470 pub fn reset(&mut self) {
472 self.arena.reset();
473 }
474
475 pub fn capacity(&self) -> usize {
477 self.capacity
478 }
479}
480
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a minimal valid event payload with a unique entity id.
    fn create_test_json(id: u32) -> String {
        json!({
            "event_type": "test.event",
            "entity_id": format!("entity-{}", id),
            "stream_id": "test-stream",
            "data": {"value": id}
        })
        .to_string()
    }

    // A freshly built processor starts with an empty queue.
    #[test]
    fn test_create_batch_processor() {
        let processor = BatchProcessor::new();
        assert!(processor.is_queue_empty());
        assert_eq!(processor.queue_depth(), 0);
    }

    // All valid events in a batch are parsed and queued.
    #[test]
    fn test_process_single_batch() {
        let processor = BatchProcessor::new();

        let events: Vec<String> = (0..100).map(create_test_json).collect();
        let result = processor.process_batch(events);

        assert_eq!(result.success_count, 100);
        assert_eq!(result.failure_count, 0);
        assert_eq!(processor.queue_depth(), 100);
    }

    // The byte-slice entry point accepts the same payloads as the string one.
    #[test]
    fn test_process_batch_bytes() {
        let processor = BatchProcessor::new();

        let events: Vec<Vec<u8>> = (0..50).map(|i| create_test_json(i).into_bytes()).collect();
        let result = processor.process_batch_bytes(events);

        assert_eq!(result.success_count, 50);
        assert_eq!(result.failure_count, 0);
    }

    // get_batch pops at most the requested count and leaves the rest queued.
    #[test]
    fn test_get_batch() {
        let processor = BatchProcessor::new();

        let events: Vec<String> = (0..100).map(create_test_json).collect();
        processor.process_batch(events);

        let batch = processor.get_batch(30);
        assert_eq!(batch.len(), 30);
        assert_eq!(processor.queue_depth(), 70);
    }

    // Cumulative counters reflect one fully successful batch.
    #[test]
    fn test_stats() {
        let processor = BatchProcessor::new();

        let events: Vec<String> = (0..100).map(create_test_json).collect();
        processor.process_batch(events);

        let stats = processor.stats();
        assert_eq!(stats.batches_processed, 1);
        assert_eq!(stats.events_processed, 100);
        assert!(stats.events_per_sec > 0.0);
    }

    // A malformed payload is counted as a failure without aborting the batch.
    #[test]
    fn test_invalid_json() {
        let processor = BatchProcessor::new();

        let events = vec![
            create_test_json(1),
            "invalid json".to_string(),
            create_test_json(3),
        ];
        let result = processor.process_batch(events);

        assert_eq!(result.success_count, 2);
        assert_eq!(result.failure_count, 1);
    }

    // BatchResult derives total and success_rate from its counts.
    #[test]
    fn test_batch_result_metrics() {
        let result = BatchResult {
            success_count: 90,
            failure_count: 10,
            duration: Duration::from_millis(100),
            events_per_sec: 900.0,
        };

        assert_eq!(result.total(), 100);
        assert!((result.success_rate() - 0.9).abs() < 0.001);
    }

    // The arena hands back equal copies, and reset() allows reuse.
    #[test]
    fn test_arena_batch_buffer() {
        let mut buffer = ArenaBatchBuffer::new(1024);

        let s1 = buffer.alloc_str("hello");
        let s2 = buffer.alloc_str("world");

        assert_eq!(s1, "hello");
        assert_eq!(s2, "world");
        let allocated_before = buffer.allocated();
        assert!(allocated_before > 0);

        buffer.reset();

        let s3 = buffer.alloc_str("test");
        assert_eq!(s3, "test");

        assert!(buffer.capacity() >= 1024);
    }

    // Presets scale capacity/batch size in the expected directions.
    #[test]
    fn test_config_presets() {
        let default = BatchProcessorConfig::default();
        let high_throughput = BatchProcessorConfig::high_throughput();
        let low_latency = BatchProcessorConfig::low_latency();

        assert!(high_throughput.queue_capacity > default.queue_capacity);
        assert!(low_latency.max_batch_size < default.max_batch_size);
    }

    // Four threads ingesting concurrently lose no events
    // (scoped threads join before the final assertion).
    #[test]
    fn test_concurrent_processing() {
        let processor = Arc::new(BatchProcessor::new());

        std::thread::scope(|s| {
            for t in 0..4 {
                let proc = processor.clone();
                s.spawn(move || {
                    let events: Vec<String> =
                        (0..100).map(|i| create_test_json(t * 100 + i)).collect();
                    proc.process_batch(events);
                });
            }
        });

        assert_eq!(processor.queue_depth(), 400);
    }

    // Pre-built events can be queued directly, skipping JSON parsing.
    #[test]
    fn test_process_events_direct() {
        let processor = BatchProcessor::new();

        let events: Vec<Event> = (0..50)
            .map(|i| {
                Event::from_strings(
                    "test.event".to_string(),
                    format!("entity-{i}"),
                    "test-stream".to_string(),
                    json!({"value": i}),
                    None,
                )
                .unwrap()
            })
            .collect();

        let result = processor.process_events(events);
        assert_eq!(result.success_count, 50);
        assert_eq!(processor.queue_depth(), 50);
    }
}