// File: allsource_core/infrastructure/persistence/batch_processor.rs

1//! High-throughput batch processing pipeline
2//!
3//! This module provides an optimized batch processing system that combines:
4//! - SIMD-accelerated JSON parsing for fast deserialization
5//! - Lock-free sharded queues for parallel ingestion
6//! - Memory pooling for zero-allocation hot paths
7//!
8//! # Performance Target
9//! - 1M+ events/sec sustained throughput
10//! - Sub-millisecond p99 latency
11//! - Linear scalability with CPU cores
12
13use crate::{
14    domain::entities::Event,
15    error::Result,
16    infrastructure::persistence::{
17        lock_free::{LockFreeMetrics, ShardedEventQueue},
18        simd_json::{SimdJsonParser, SimdJsonStats},
19    },
20};
21use bumpalo::Bump;
22use serde::Deserialize;
23use std::{
24    sync::{
25        Arc,
26        atomic::{AtomicU64, AtomicUsize, Ordering},
27    },
28    time::{Duration, Instant},
29};
30
/// Configuration for [`BatchProcessor`].
///
/// Presets are available via [`Default`], [`BatchProcessorConfig::high_throughput`],
/// and [`BatchProcessorConfig::low_latency`].
#[derive(Debug, Clone)]
pub struct BatchProcessorConfig {
    /// Maximum batch size for processing.
    // NOTE(review): not read anywhere in this file — confirm callers consume it.
    pub max_batch_size: usize,
    /// Total capacity for pending events, split across the queue shards.
    pub queue_capacity: usize,
    /// Number of shards for the event queue; more shards reduce contention.
    pub shard_count: usize,
    /// Arena allocation pool size in bytes (see `ArenaBatchBuffer`).
    // NOTE(review): stored but not read by the visible code — confirm usage.
    pub arena_size: usize,
    /// Enable SIMD JSON parsing.
    // NOTE(review): the parser is constructed unconditionally in
    // `BatchProcessor::with_config` — confirm this flag is honored elsewhere.
    pub simd_enabled: bool,
}
45
46impl Default for BatchProcessorConfig {
47    fn default() -> Self {
48        Self {
49            max_batch_size: 10_000,
50            queue_capacity: 1_000_000,
51            shard_count: 16,
52            arena_size: 64 * 1024 * 1024, // 64MB
53            simd_enabled: true,
54        }
55    }
56}
57
58impl BatchProcessorConfig {
59    /// High-throughput configuration for production use
60    pub fn high_throughput() -> Self {
61        Self {
62            max_batch_size: 50_000,
63            queue_capacity: 10_000_000,
64            shard_count: 32,
65            arena_size: 256 * 1024 * 1024, // 256MB
66            simd_enabled: true,
67        }
68    }
69
70    /// Low-latency configuration optimized for quick responses
71    pub fn low_latency() -> Self {
72        Self {
73            max_batch_size: 1_000,
74            queue_capacity: 100_000,
75            shard_count: 8,
76            arena_size: 16 * 1024 * 1024, // 16MB
77            simd_enabled: true,
78        }
79    }
80}
81
/// Wire format for a single ingested event, deserialized from raw JSON
/// by `BatchProcessor::parse_and_queue_event` / `parse_and_queue_bytes`.
#[derive(Debug, Clone, Deserialize)]
pub struct RawEventData {
    /// Event type tag (required in the incoming JSON).
    pub event_type: String,
    /// Identifier of the entity this event belongs to (required).
    pub entity_id: String,
    /// Target stream; falls back to `"default"` when absent from the JSON.
    #[serde(default = "default_stream")]
    pub stream_id: String,
    /// Arbitrary event payload.
    pub data: serde_json::Value,
    /// Optional free-form metadata; `None` when absent.
    #[serde(default)]
    pub metadata: Option<serde_json::Value>,
}
93
/// Fallback stream id used when the incoming JSON omits `stream_id`.
fn default_stream() -> String {
    String::from("default")
}
97
/// Point-in-time snapshot of cumulative batch-processing statistics,
/// produced by [`BatchProcessor::stats`].
#[derive(Debug, Clone)]
pub struct BatchProcessorStats {
    /// Total batches processed since creation (or last `reset_stats`).
    pub batches_processed: u64,
    /// Total events processed.
    pub events_processed: u64,
    /// Total bytes of JSON parsed.
    pub bytes_parsed: u64,
    /// Average batch size (`events_processed / batches_processed`; 0 when no batches).
    pub avg_batch_size: f64,
    /// Events per second over the accumulated processing time.
    pub events_per_sec: f64,
    /// Parse throughput in MB/s (decimal megabytes).
    pub parse_throughput_mbps: f64,
    /// Queue depth at the moment the snapshot was taken.
    pub queue_depth: usize,
    /// Total processing time in nanoseconds.
    pub total_time_ns: u64,
}
118
/// High-throughput batch event processor.
///
/// Combines SIMD JSON parsing with lock-free queues for maximum throughput.
/// Designed to achieve 1M+ events/sec sustained ingestion.
pub struct BatchProcessor {
    // Held for its queue sizing at construction; other fields are not read
    // by the visible code in this file.
    config: BatchProcessorConfig,
    /// SIMD JSON parser.
    json_parser: SimdJsonParser,
    /// Lock-free sharded queue holding successfully parsed events.
    event_queue: ShardedEventQueue,
    /// Lock-free metrics collector, shareable via `metrics()`.
    metrics: Arc<LockFreeMetrics>,
    // Cumulative counters; all updated with relaxed ordering.
    batches_processed: AtomicU64,
    events_processed: AtomicU64,
    bytes_parsed: AtomicU64,
    total_time_ns: AtomicU64,
    /// Current arena pool index for round-robin.
    // NOTE(review): only ever initialized in this file — confirm the
    // round-robin arena selection lives elsewhere or is still planned.
    arena_index: AtomicUsize,
}
139
140impl BatchProcessor {
141    /// Create a new batch processor with default configuration
142    pub fn new() -> Self {
143        Self::with_config(BatchProcessorConfig::default())
144    }
145
146    /// Create a new batch processor with custom configuration
147    pub fn with_config(config: BatchProcessorConfig) -> Self {
148        let event_queue = ShardedEventQueue::with_shards(config.queue_capacity, config.shard_count);
149
150        Self {
151            config,
152            json_parser: SimdJsonParser::new(),
153            event_queue,
154            metrics: Arc::new(LockFreeMetrics::new()),
155            batches_processed: AtomicU64::new(0),
156            events_processed: AtomicU64::new(0),
157            bytes_parsed: AtomicU64::new(0),
158            total_time_ns: AtomicU64::new(0),
159            arena_index: AtomicUsize::new(0),
160        }
161    }
162
163    /// Process a batch of raw JSON event strings
164    ///
165    /// This is the main entry point for high-throughput ingestion.
166    /// Uses SIMD-accelerated parsing and lock-free queuing.
167    ///
168    /// # Arguments
169    /// * `json_events` - Vector of JSON strings to parse and process
170    ///
171    /// # Returns
172    /// BatchResult with success/failure counts
173    pub fn process_batch(&self, json_events: Vec<String>) -> BatchResult {
174        let start = Instant::now();
175        let batch_size = json_events.len();
176
177        let mut success_count = 0;
178        let mut failure_count = 0;
179        let mut bytes_parsed = 0usize;
180
181        for json_str in json_events {
182            bytes_parsed += json_str.len();
183
184            match self.parse_and_queue_event(json_str) {
185                Ok(()) => {
186                    success_count += 1;
187                    self.metrics.record_ingest();
188                }
189                Err(_) => {
190                    failure_count += 1;
191                    self.metrics.record_error();
192                }
193            }
194        }
195
196        let duration = start.elapsed();
197        self.record_batch_stats(batch_size, bytes_parsed, duration);
198
199        BatchResult {
200            success_count,
201            failure_count,
202            duration,
203            events_per_sec: success_count as f64 / duration.as_secs_f64(),
204        }
205    }
206
207    /// Process a batch of raw JSON bytes (more efficient - avoids string conversion)
208    ///
209    /// # Arguments
210    /// * `json_bytes` - Vector of JSON byte slices to parse
211    ///
212    /// # Returns
213    /// BatchResult with success/failure counts
214    pub fn process_batch_bytes(&self, mut json_bytes: Vec<Vec<u8>>) -> BatchResult {
215        let start = Instant::now();
216        let batch_size = json_bytes.len();
217
218        let mut success_count = 0;
219        let mut failure_count = 0;
220        let mut bytes_parsed = 0usize;
221
222        for bytes in &mut json_bytes {
223            bytes_parsed += bytes.len();
224
225            match self.parse_and_queue_bytes(bytes) {
226                Ok(()) => {
227                    success_count += 1;
228                    self.metrics.record_ingest();
229                }
230                Err(_) => {
231                    failure_count += 1;
232                    self.metrics.record_error();
233                }
234            }
235        }
236
237        let duration = start.elapsed();
238        self.record_batch_stats(batch_size, bytes_parsed, duration);
239
240        BatchResult {
241            success_count,
242            failure_count,
243            duration,
244            events_per_sec: success_count as f64 / duration.as_secs_f64(),
245        }
246    }
247
248    /// Process pre-parsed events directly (fastest path)
249    ///
250    /// Use this when events are already parsed (e.g., from Arrow/Parquet).
251    pub fn process_events(&self, events: Vec<Event>) -> BatchResult {
252        let start = Instant::now();
253        let batch_size = events.len();
254
255        let success_count = self.event_queue.try_push_batch(events);
256        let failure_count = batch_size - success_count;
257
258        self.metrics.record_ingest_batch(success_count as u64);
259        if failure_count > 0 {
260            for _ in 0..failure_count {
261                self.metrics.record_error();
262            }
263        }
264
265        let duration = start.elapsed();
266        self.batches_processed.fetch_add(1, Ordering::Relaxed);
267        self.events_processed
268            .fetch_add(success_count as u64, Ordering::Relaxed);
269        self.total_time_ns
270            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
271
272        BatchResult {
273            success_count,
274            failure_count,
275            duration,
276            events_per_sec: success_count as f64 / duration.as_secs_f64(),
277        }
278    }
279
280    /// Parse and queue a single JSON event string
281    fn parse_and_queue_event(&self, json_str: String) -> Result<()> {
282        let raw: RawEventData = self.json_parser.parse_str(&json_str)?;
283
284        let event = Event::from_strings(
285            raw.event_type,
286            raw.entity_id,
287            raw.stream_id,
288            raw.data,
289            raw.metadata,
290        )?;
291
292        self.event_queue.try_push(event)
293    }
294
295    /// Parse and queue from bytes (SIMD path)
296    fn parse_and_queue_bytes(&self, bytes: &mut [u8]) -> Result<()> {
297        let raw: RawEventData = self.json_parser.parse(bytes)?;
298
299        let event = Event::from_strings(
300            raw.event_type,
301            raw.entity_id,
302            raw.stream_id,
303            raw.data,
304            raw.metadata,
305        )?;
306
307        self.event_queue.try_push(event)
308    }
309
310    /// Record batch statistics
311    fn record_batch_stats(&self, batch_size: usize, bytes: usize, duration: Duration) {
312        self.batches_processed.fetch_add(1, Ordering::Relaxed);
313        self.events_processed
314            .fetch_add(batch_size as u64, Ordering::Relaxed);
315        self.bytes_parsed.fetch_add(bytes as u64, Ordering::Relaxed);
316        self.total_time_ns
317            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
318    }
319
320    /// Get a batch of processed events from the queue
321    ///
322    /// # Arguments
323    /// * `max_count` - Maximum number of events to retrieve
324    pub fn get_batch(&self, max_count: usize) -> Vec<Event> {
325        self.event_queue.try_pop_batch(max_count)
326    }
327
328    /// Get a single event from the queue
329    pub fn get_event(&self) -> Option<Event> {
330        self.event_queue.try_pop_any()
331    }
332
333    /// Get current queue depth
334    pub fn queue_depth(&self) -> usize {
335        self.event_queue.len()
336    }
337
338    /// Check if queue is empty
339    pub fn is_queue_empty(&self) -> bool {
340        self.event_queue.is_empty()
341    }
342
343    /// Get processing statistics
344    pub fn stats(&self) -> BatchProcessorStats {
345        let batches = self.batches_processed.load(Ordering::Relaxed);
346        let events = self.events_processed.load(Ordering::Relaxed);
347        let bytes = self.bytes_parsed.load(Ordering::Relaxed);
348        let time_ns = self.total_time_ns.load(Ordering::Relaxed);
349
350        let time_secs = time_ns as f64 / 1_000_000_000.0;
351        let events_per_sec = if time_secs > 0.0 {
352            events as f64 / time_secs
353        } else {
354            0.0
355        };
356        let throughput_mbps = if time_secs > 0.0 {
357            (bytes as f64 / 1_000_000.0) / time_secs
358        } else {
359            0.0
360        };
361
362        BatchProcessorStats {
363            batches_processed: batches,
364            events_processed: events,
365            bytes_parsed: bytes,
366            avg_batch_size: if batches > 0 {
367                events as f64 / batches as f64
368            } else {
369                0.0
370            },
371            events_per_sec,
372            parse_throughput_mbps: throughput_mbps,
373            queue_depth: self.event_queue.len(),
374            total_time_ns: time_ns,
375        }
376    }
377
378    /// Get JSON parser statistics
379    pub fn parser_stats(&self) -> &SimdJsonStats {
380        self.json_parser.stats()
381    }
382
383    /// Get metrics collector
384    pub fn metrics(&self) -> Arc<LockFreeMetrics> {
385        self.metrics.clone()
386    }
387
388    /// Get the event queue for direct access
389    pub fn event_queue(&self) -> &ShardedEventQueue {
390        &self.event_queue
391    }
392
393    /// Reset all statistics
394    pub fn reset_stats(&self) {
395        self.batches_processed.store(0, Ordering::Relaxed);
396        self.events_processed.store(0, Ordering::Relaxed);
397        self.bytes_parsed.store(0, Ordering::Relaxed);
398        self.total_time_ns.store(0, Ordering::Relaxed);
399        self.json_parser.reset_stats();
400        self.metrics.reset();
401    }
402}
403
404impl Default for BatchProcessor {
405    fn default() -> Self {
406        Self::new()
407    }
408}
409
/// Result of a single batch processing operation
/// (returned by the `process_*` methods on `BatchProcessor`).
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// Number of successfully processed events.
    pub success_count: usize,
    /// Number of events that failed to parse or queue.
    pub failure_count: usize,
    /// Wall-clock duration of the batch.
    pub duration: Duration,
    /// Successful events per second for this batch.
    pub events_per_sec: f64,
}
422
423impl BatchResult {
424    /// Get total events (success + failure)
425    pub fn total(&self) -> usize {
426        self.success_count + self.failure_count
427    }
428
429    /// Get success rate (0.0 to 1.0)
430    pub fn success_rate(&self) -> f64 {
431        let total = self.total();
432        if total > 0 {
433            self.success_count as f64 / total as f64
434        } else {
435            1.0
436        }
437    }
438}
439
/// Arena-backed batch buffer for zero-allocation processing.
///
/// Uses bumpalo for fast arena allocation during batch processing.
/// All allocations are freed together when the arena is reset.
pub struct ArenaBatchBuffer {
    // Bump arena backing all `alloc_*` calls.
    arena: Bump,
    // Requested capacity in bytes, recorded at construction; the arena may
    // grow beyond this if allocations exceed it.
    capacity: usize,
}
448
impl ArenaBatchBuffer {
    /// Create a new arena buffer, pre-reserving `capacity_bytes` bytes.
    pub fn new(capacity_bytes: usize) -> Self {
        Self {
            arena: Bump::with_capacity(capacity_bytes),
            capacity: capacity_bytes,
        }
    }

    /// Copy `data` into the arena and return the arena-owned slice.
    /// The returned reference lives as long as the buffer is not reset.
    pub fn alloc_bytes(&self, data: &[u8]) -> &[u8] {
        self.arena.alloc_slice_copy(data)
    }

    /// Copy `s` into the arena and return the arena-owned string slice.
    pub fn alloc_str(&self, s: &str) -> &str {
        self.arena.alloc_str(s)
    }

    /// Bytes currently allocated by the arena.
    // NOTE(review): bumpalo's `allocated_bytes` reports chunk usage and may
    // not drop after `reset` — see the test below; confirm intended semantics.
    pub fn allocated(&self) -> usize {
        self.arena.allocated_bytes()
    }

    /// Reset the arena, freeing all allocations at once.
    /// Requires `&mut self` because it invalidates outstanding references.
    pub fn reset(&mut self) {
        self.arena.reset();
    }

    /// Capacity in bytes requested at construction.
    pub fn capacity(&self) -> usize {
        self.capacity
    }
}
483
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Build a valid event JSON string with a unique entity id.
    fn create_test_json(id: u32) -> String {
        json!({
            "event_type": "test.event",
            "entity_id": format!("entity-{}", id),
            "stream_id": "test-stream",
            "data": {"value": id}
        })
        .to_string()
    }

    #[test]
    fn test_create_batch_processor() {
        let processor = BatchProcessor::new();
        assert!(processor.is_queue_empty());
        assert_eq!(processor.queue_depth(), 0);
    }

    #[test]
    fn test_process_single_batch() {
        let processor = BatchProcessor::new();

        let events: Vec<String> = (0..100).map(create_test_json).collect();
        let result = processor.process_batch(events);

        // All events are valid, so everything lands in the queue.
        assert_eq!(result.success_count, 100);
        assert_eq!(result.failure_count, 0);
        assert_eq!(processor.queue_depth(), 100);
    }

    #[test]
    fn test_process_batch_bytes() {
        let processor = BatchProcessor::new();

        let events: Vec<Vec<u8>> = (0..50).map(|i| create_test_json(i).into_bytes()).collect();
        let result = processor.process_batch_bytes(events);

        assert_eq!(result.success_count, 50);
        assert_eq!(result.failure_count, 0);
    }

    #[test]
    fn test_get_batch() {
        let processor = BatchProcessor::new();

        let events: Vec<String> = (0..100).map(create_test_json).collect();
        processor.process_batch(events);

        // Popping a partial batch leaves the remainder queued.
        let batch = processor.get_batch(30);
        assert_eq!(batch.len(), 30);
        assert_eq!(processor.queue_depth(), 70);
    }

    #[test]
    fn test_stats() {
        let processor = BatchProcessor::new();

        let events: Vec<String> = (0..100).map(create_test_json).collect();
        processor.process_batch(events);

        let stats = processor.stats();
        assert_eq!(stats.batches_processed, 1);
        assert_eq!(stats.events_processed, 100);
        assert!(stats.events_per_sec > 0.0);
    }

    #[test]
    fn test_invalid_json() {
        let processor = BatchProcessor::new();

        // One malformed entry among valid ones: failures are counted but
        // do not abort the batch.
        let events = vec![
            create_test_json(1),
            "invalid json".to_string(),
            create_test_json(3),
        ];
        let result = processor.process_batch(events);

        assert_eq!(result.success_count, 2);
        assert_eq!(result.failure_count, 1);
    }

    #[test]
    fn test_batch_result_metrics() {
        let result = BatchResult {
            success_count: 90,
            failure_count: 10,
            duration: Duration::from_millis(100),
            events_per_sec: 900.0,
        };

        assert_eq!(result.total(), 100);
        assert!((result.success_rate() - 0.9).abs() < 0.001);
    }

    #[test]
    fn test_arena_batch_buffer() {
        let mut buffer = ArenaBatchBuffer::new(1024);

        let s1 = buffer.alloc_str("hello");
        let s2 = buffer.alloc_str("world");

        assert_eq!(s1, "hello");
        assert_eq!(s2, "world");
        let allocated_before = buffer.allocated();
        assert!(allocated_before > 0);

        // Reset makes memory available for reuse (but allocated_bytes may not change)
        buffer.reset();

        // After reset, new allocations should reuse the memory
        let s3 = buffer.alloc_str("test");
        assert_eq!(s3, "test");

        // Verify the buffer is functional after reset
        assert!(buffer.capacity() >= 1024);
    }

    #[test]
    fn test_config_presets() {
        let default = BatchProcessorConfig::default();
        let high_throughput = BatchProcessorConfig::high_throughput();
        let low_latency = BatchProcessorConfig::low_latency();

        assert!(high_throughput.queue_capacity > default.queue_capacity);
        assert!(low_latency.max_batch_size < default.max_batch_size);
    }

    #[test]
    fn test_concurrent_processing() {
        let processor = Arc::new(BatchProcessor::new());

        // Scoped threads let us borrow `processor` without 'static bounds;
        // the scope joins all producers before we assert.
        std::thread::scope(|s| {
            // Multiple producer threads
            for t in 0..4 {
                let proc = processor.clone();
                s.spawn(move || {
                    let events: Vec<String> =
                        (0..100).map(|i| create_test_json(t * 100 + i)).collect();
                    proc.process_batch(events);
                });
            }
        });

        // All 400 events should be in the queue
        assert_eq!(processor.queue_depth(), 400);
    }

    #[test]
    fn test_process_events_direct() {
        let processor = BatchProcessor::new();

        // Pre-parsed events bypass JSON parsing entirely.
        let events: Vec<Event> = (0..50)
            .map(|i| {
                Event::from_strings(
                    "test.event".to_string(),
                    format!("entity-{}", i),
                    "test-stream".to_string(),
                    json!({"value": i}),
                    None,
                )
                .unwrap()
            })
            .collect();

        let result = processor.process_events(events);
        assert_eq!(result.success_count, 50);
        assert_eq!(processor.queue_depth(), 50);
    }
}