allsource_core/infrastructure/persistence/
batch_processor.rs

1//! High-throughput batch processing pipeline
2//!
3//! This module provides an optimized batch processing system that combines:
4//! - SIMD-accelerated JSON parsing for fast deserialization
5//! - Lock-free sharded queues for parallel ingestion
6//! - Memory pooling for zero-allocation hot paths
7//!
8//! # Performance Target
9//! - 1M+ events/sec sustained throughput
10//! - Sub-millisecond p99 latency
11//! - Linear scalability with CPU cores
12
13use crate::domain::entities::Event;
14use crate::error::{AllSourceError, Result};
15use crate::infrastructure::persistence::lock_free::{LockFreeMetrics, ShardedEventQueue};
16use crate::infrastructure::persistence::simd_json::{SimdJsonParser, SimdJsonStats};
17use bumpalo::Bump;
18use serde::Deserialize;
19use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
/// Tunable settings for the batch processor.
#[derive(Debug, Clone)]
pub struct BatchProcessorConfig {
    /// Upper bound on the number of events in one batch
    pub max_batch_size: usize,
    /// Capacity of the queue that holds pending events
    pub queue_capacity: usize,
    /// How many shards the event queue is split into
    pub shard_count: usize,
    /// Size of the arena allocation pool, in bytes
    pub arena_size: usize,
    /// Whether SIMD JSON parsing is enabled
    pub simd_enabled: bool,
}

impl Default for BatchProcessorConfig {
    /// Baseline preset: 10k-event batches, a 1M-event queue over 16 shards,
    /// a 64 MiB arena, and SIMD parsing switched on.
    fn default() -> Self {
        BatchProcessorConfig {
            max_batch_size: 10_000,
            queue_capacity: 1_000_000,
            shard_count: 16,
            arena_size: 64 * Self::MIB,
            simd_enabled: true,
        }
    }
}

impl BatchProcessorConfig {
    /// Number of bytes in one mebibyte; arena sizes are expressed in these.
    const MIB: usize = 1 << 20;

    /// Preset with larger batches, queue, shard count and arena than the
    /// default, intended for production ingestion.
    pub fn high_throughput() -> Self {
        BatchProcessorConfig {
            max_batch_size: 50_000,
            queue_capacity: 10_000_000,
            shard_count: 32,
            arena_size: 256 * Self::MIB,
            simd_enabled: true,
        }
    }

    /// Preset with smaller batches, queue, shard count and arena than the
    /// default, intended for quick responses.
    pub fn low_latency() -> Self {
        BatchProcessorConfig {
            max_batch_size: 1_000,
            queue_capacity: 100_000,
            shard_count: 8,
            arena_size: 16 * Self::MIB,
            simd_enabled: true,
        }
    }
}
73
/// Raw event data for batch parsing
///
/// Deserialization target for one incoming JSON event. The parsed fields are
/// handed to `Event::from_strings` in this order: `event_type`, `entity_id`,
/// `stream_id`, `data`, `metadata`.
#[derive(Debug, Clone, Deserialize)]
pub struct RawEventData {
    /// Event type string (required in the input)
    pub event_type: String,
    /// Entity identifier string (required in the input)
    pub entity_id: String,
    /// Stream identifier; falls back to `default_stream()` ("default") when
    /// the field is absent from the input
    #[serde(default = "default_stream")]
    pub stream_id: String,
    /// Event payload as an arbitrary JSON value (required in the input)
    pub data: serde_json::Value,
    /// Optional metadata as an arbitrary JSON value; `None` when absent
    #[serde(default)]
    pub metadata: Option<serde_json::Value>,
}
85
/// Fallback value for `stream_id` when the field is missing from the input.
fn default_stream() -> String {
    String::from("default")
}
89
/// Statistics for batch processing
///
/// A point-in-time snapshot assembled by `BatchProcessor::stats()` from the
/// processor's atomic counters; the rate fields are derived from the counters
/// at the moment the snapshot is taken.
#[derive(Debug, Clone)]
pub struct BatchProcessorStats {
    /// Total batches processed
    pub batches_processed: u64,
    /// Total events processed
    pub events_processed: u64,
    /// Total bytes parsed
    pub bytes_parsed: u64,
    /// Average batch size (`events_processed / batches_processed`, or 0.0
    /// when no batch has been recorded)
    pub avg_batch_size: f64,
    /// Events per second (`events_processed` over the accumulated processing
    /// time, or 0.0 when that time is zero)
    pub events_per_sec: f64,
    /// Parse throughput in MB/s (1 MB = 1_000_000 bytes; 0.0 when the
    /// accumulated processing time is zero)
    pub parse_throughput_mbps: f64,
    /// Current queue depth
    pub queue_depth: usize,
    /// Total processing time in nanoseconds
    pub total_time_ns: u64,
}
110
/// High-throughput batch event processor
///
/// Combines SIMD JSON parsing with lock-free queues for maximum throughput.
/// Designed to achieve 1M+ events/sec sustained ingestion.
///
/// All statistics are held in atomics, so every method of the processor
/// takes `&self`.
pub struct BatchProcessor {
    /// Configuration the processor was built with.
    /// NOTE(review): stored by `with_config` but not read again in the code
    /// visible in this file — confirm whether it is still needed.
    config: BatchProcessorConfig,
    /// SIMD JSON parser
    json_parser: SimdJsonParser,
    /// Lock-free sharded queue for processed events
    event_queue: ShardedEventQueue,
    /// Lock-free metrics
    metrics: Arc<LockFreeMetrics>,
    /// Batch processing statistics
    /// Number of batches recorded so far
    batches_processed: AtomicU64,
    /// Number of events recorded so far
    events_processed: AtomicU64,
    /// Number of input bytes handed to the parser so far
    bytes_parsed: AtomicU64,
    /// Accumulated per-batch wall-clock time, in nanoseconds
    total_time_ns: AtomicU64,
    /// Current arena pool index for round-robin
    /// NOTE(review): initialised to 0 and never updated in the code visible
    /// in this file — confirm whether arena pooling is wired up elsewhere.
    arena_index: AtomicUsize,
}
131
132impl BatchProcessor {
133    /// Create a new batch processor with default configuration
134    pub fn new() -> Self {
135        Self::with_config(BatchProcessorConfig::default())
136    }
137
138    /// Create a new batch processor with custom configuration
139    pub fn with_config(config: BatchProcessorConfig) -> Self {
140        let event_queue = ShardedEventQueue::with_shards(config.queue_capacity, config.shard_count);
141
142        Self {
143            config,
144            json_parser: SimdJsonParser::new(),
145            event_queue,
146            metrics: Arc::new(LockFreeMetrics::new()),
147            batches_processed: AtomicU64::new(0),
148            events_processed: AtomicU64::new(0),
149            bytes_parsed: AtomicU64::new(0),
150            total_time_ns: AtomicU64::new(0),
151            arena_index: AtomicUsize::new(0),
152        }
153    }
154
155    /// Process a batch of raw JSON event strings
156    ///
157    /// This is the main entry point for high-throughput ingestion.
158    /// Uses SIMD-accelerated parsing and lock-free queuing.
159    ///
160    /// # Arguments
161    /// * `json_events` - Vector of JSON strings to parse and process
162    ///
163    /// # Returns
164    /// BatchResult with success/failure counts
165    pub fn process_batch(&self, json_events: Vec<String>) -> BatchResult {
166        let start = Instant::now();
167        let batch_size = json_events.len();
168
169        let mut success_count = 0;
170        let mut failure_count = 0;
171        let mut bytes_parsed = 0usize;
172
173        for json_str in json_events {
174            bytes_parsed += json_str.len();
175
176            match self.parse_and_queue_event(json_str) {
177                Ok(()) => {
178                    success_count += 1;
179                    self.metrics.record_ingest();
180                }
181                Err(_) => {
182                    failure_count += 1;
183                    self.metrics.record_error();
184                }
185            }
186        }
187
188        let duration = start.elapsed();
189        self.record_batch_stats(batch_size, bytes_parsed, duration);
190
191        BatchResult {
192            success_count,
193            failure_count,
194            duration,
195            events_per_sec: success_count as f64 / duration.as_secs_f64(),
196        }
197    }
198
199    /// Process a batch of raw JSON bytes (more efficient - avoids string conversion)
200    ///
201    /// # Arguments
202    /// * `json_bytes` - Vector of JSON byte slices to parse
203    ///
204    /// # Returns
205    /// BatchResult with success/failure counts
206    pub fn process_batch_bytes(&self, mut json_bytes: Vec<Vec<u8>>) -> BatchResult {
207        let start = Instant::now();
208        let batch_size = json_bytes.len();
209
210        let mut success_count = 0;
211        let mut failure_count = 0;
212        let mut bytes_parsed = 0usize;
213
214        for bytes in &mut json_bytes {
215            bytes_parsed += bytes.len();
216
217            match self.parse_and_queue_bytes(bytes) {
218                Ok(()) => {
219                    success_count += 1;
220                    self.metrics.record_ingest();
221                }
222                Err(_) => {
223                    failure_count += 1;
224                    self.metrics.record_error();
225                }
226            }
227        }
228
229        let duration = start.elapsed();
230        self.record_batch_stats(batch_size, bytes_parsed, duration);
231
232        BatchResult {
233            success_count,
234            failure_count,
235            duration,
236            events_per_sec: success_count as f64 / duration.as_secs_f64(),
237        }
238    }
239
240    /// Process pre-parsed events directly (fastest path)
241    ///
242    /// Use this when events are already parsed (e.g., from Arrow/Parquet).
243    pub fn process_events(&self, events: Vec<Event>) -> BatchResult {
244        let start = Instant::now();
245        let batch_size = events.len();
246
247        let success_count = self.event_queue.try_push_batch(events);
248        let failure_count = batch_size - success_count;
249
250        self.metrics.record_ingest_batch(success_count as u64);
251        if failure_count > 0 {
252            for _ in 0..failure_count {
253                self.metrics.record_error();
254            }
255        }
256
257        let duration = start.elapsed();
258        self.batches_processed.fetch_add(1, Ordering::Relaxed);
259        self.events_processed
260            .fetch_add(success_count as u64, Ordering::Relaxed);
261        self.total_time_ns
262            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
263
264        BatchResult {
265            success_count,
266            failure_count,
267            duration,
268            events_per_sec: success_count as f64 / duration.as_secs_f64(),
269        }
270    }
271
272    /// Parse and queue a single JSON event string
273    fn parse_and_queue_event(&self, json_str: String) -> Result<()> {
274        let raw: RawEventData = self.json_parser.parse_str(&json_str)?;
275
276        let event = Event::from_strings(
277            raw.event_type,
278            raw.entity_id,
279            raw.stream_id,
280            raw.data,
281            raw.metadata,
282        )?;
283
284        self.event_queue.try_push(event)
285    }
286
287    /// Parse and queue from bytes (SIMD path)
288    fn parse_and_queue_bytes(&self, bytes: &mut [u8]) -> Result<()> {
289        let raw: RawEventData = self.json_parser.parse(bytes)?;
290
291        let event = Event::from_strings(
292            raw.event_type,
293            raw.entity_id,
294            raw.stream_id,
295            raw.data,
296            raw.metadata,
297        )?;
298
299        self.event_queue.try_push(event)
300    }
301
302    /// Record batch statistics
303    fn record_batch_stats(&self, batch_size: usize, bytes: usize, duration: Duration) {
304        self.batches_processed.fetch_add(1, Ordering::Relaxed);
305        self.events_processed
306            .fetch_add(batch_size as u64, Ordering::Relaxed);
307        self.bytes_parsed.fetch_add(bytes as u64, Ordering::Relaxed);
308        self.total_time_ns
309            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
310    }
311
312    /// Get a batch of processed events from the queue
313    ///
314    /// # Arguments
315    /// * `max_count` - Maximum number of events to retrieve
316    pub fn get_batch(&self, max_count: usize) -> Vec<Event> {
317        self.event_queue.try_pop_batch(max_count)
318    }
319
320    /// Get a single event from the queue
321    pub fn get_event(&self) -> Option<Event> {
322        self.event_queue.try_pop_any()
323    }
324
325    /// Get current queue depth
326    pub fn queue_depth(&self) -> usize {
327        self.event_queue.len()
328    }
329
330    /// Check if queue is empty
331    pub fn is_queue_empty(&self) -> bool {
332        self.event_queue.is_empty()
333    }
334
335    /// Get processing statistics
336    pub fn stats(&self) -> BatchProcessorStats {
337        let batches = self.batches_processed.load(Ordering::Relaxed);
338        let events = self.events_processed.load(Ordering::Relaxed);
339        let bytes = self.bytes_parsed.load(Ordering::Relaxed);
340        let time_ns = self.total_time_ns.load(Ordering::Relaxed);
341
342        let time_secs = time_ns as f64 / 1_000_000_000.0;
343        let events_per_sec = if time_secs > 0.0 {
344            events as f64 / time_secs
345        } else {
346            0.0
347        };
348        let throughput_mbps = if time_secs > 0.0 {
349            (bytes as f64 / 1_000_000.0) / time_secs
350        } else {
351            0.0
352        };
353
354        BatchProcessorStats {
355            batches_processed: batches,
356            events_processed: events,
357            bytes_parsed: bytes,
358            avg_batch_size: if batches > 0 {
359                events as f64 / batches as f64
360            } else {
361                0.0
362            },
363            events_per_sec,
364            parse_throughput_mbps: throughput_mbps,
365            queue_depth: self.event_queue.len(),
366            total_time_ns: time_ns,
367        }
368    }
369
370    /// Get JSON parser statistics
371    pub fn parser_stats(&self) -> &SimdJsonStats {
372        self.json_parser.stats()
373    }
374
375    /// Get metrics collector
376    pub fn metrics(&self) -> Arc<LockFreeMetrics> {
377        self.metrics.clone()
378    }
379
380    /// Get the event queue for direct access
381    pub fn event_queue(&self) -> &ShardedEventQueue {
382        &self.event_queue
383    }
384
385    /// Reset all statistics
386    pub fn reset_stats(&self) {
387        self.batches_processed.store(0, Ordering::Relaxed);
388        self.events_processed.store(0, Ordering::Relaxed);
389        self.bytes_parsed.store(0, Ordering::Relaxed);
390        self.total_time_ns.store(0, Ordering::Relaxed);
391        self.json_parser.reset_stats();
392        self.metrics.reset();
393    }
394}
395
396impl Default for BatchProcessor {
397    fn default() -> Self {
398        Self::new()
399    }
400}
401
/// Outcome of one batch processing call.
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// How many events were processed successfully
    pub success_count: usize,
    /// How many events failed
    pub failure_count: usize,
    /// Wall-clock time spent on the batch
    pub duration: Duration,
    /// Throughput for the batch, in events per second
    pub events_per_sec: f64,
}

impl BatchResult {
    /// Number of events in the batch, successful and failed combined.
    pub fn total(&self) -> usize {
        self.failure_count + self.success_count
    }

    /// Fraction of events that succeeded, in the range 0.0 to 1.0.
    ///
    /// An empty batch reports 1.0.
    pub fn success_rate(&self) -> f64 {
        match self.total() {
            0 => 1.0,
            all => self.success_count as f64 / all as f64,
        }
    }
}
431
432/// Arena-backed batch buffer for zero-allocation processing
433///
434/// Uses bumpalo for fast arena allocation during batch processing.
435/// All allocations are freed together when the arena is reset.
436pub struct ArenaBatchBuffer {
437    arena: Bump,
438    capacity: usize,
439}
440
441impl ArenaBatchBuffer {
442    /// Create a new arena buffer with specified capacity
443    pub fn new(capacity_bytes: usize) -> Self {
444        Self {
445            arena: Bump::with_capacity(capacity_bytes),
446            capacity: capacity_bytes,
447        }
448    }
449
450    /// Allocate a byte slice in the arena
451    pub fn alloc_bytes(&self, data: &[u8]) -> &[u8] {
452        self.arena.alloc_slice_copy(data)
453    }
454
455    /// Allocate a string in the arena
456    pub fn alloc_str(&self, s: &str) -> &str {
457        self.arena.alloc_str(s)
458    }
459
460    /// Get current arena allocation
461    pub fn allocated(&self) -> usize {
462        self.arena.allocated_bytes()
463    }
464
465    /// Reset the arena (frees all allocations)
466    pub fn reset(&mut self) {
467        self.arena.reset();
468    }
469
470    /// Get capacity
471    pub fn capacity(&self) -> usize {
472        self.capacity
473    }
474}
475
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Serialized JSON for one well-formed test event keyed by `id`.
    fn create_test_json(id: u32) -> String {
        let payload = json!({
            "event_type": "test.event",
            "entity_id": format!("entity-{}", id),
            "stream_id": "test-stream",
            "data": {"value": id}
        });
        payload.to_string()
    }

    /// A fresh processor starts with an empty queue.
    #[test]
    fn test_create_batch_processor() {
        let fresh = BatchProcessor::new();
        assert_eq!(fresh.queue_depth(), 0);
        assert!(fresh.is_queue_empty());
    }

    /// Every valid JSON string in a batch ends up queued.
    #[test]
    fn test_process_single_batch() {
        let processor = BatchProcessor::new();
        let payloads: Vec<String> = (0..100).map(create_test_json).collect();

        let outcome = processor.process_batch(payloads);

        assert_eq!(outcome.success_count, 100);
        assert_eq!(outcome.failure_count, 0);
        assert_eq!(processor.queue_depth(), 100);
    }

    /// The byte-buffer entry point accepts the same payloads.
    #[test]
    fn test_process_batch_bytes() {
        let processor = BatchProcessor::new();
        let payloads: Vec<Vec<u8>> = (0..50)
            .map(create_test_json)
            .map(String::into_bytes)
            .collect();

        let outcome = processor.process_batch_bytes(payloads);

        assert_eq!(outcome.success_count, 50);
        assert_eq!(outcome.failure_count, 0);
    }

    /// Popping a batch removes exactly that many events from the queue.
    #[test]
    fn test_get_batch() {
        let processor = BatchProcessor::new();
        let payloads: Vec<String> = (0..100).map(create_test_json).collect();
        processor.process_batch(payloads);

        let drained = processor.get_batch(30);

        assert_eq!(drained.len(), 30);
        assert_eq!(processor.queue_depth(), 70);
    }

    /// Counters reflect a single processed batch.
    #[test]
    fn test_stats() {
        let processor = BatchProcessor::new();
        let payloads: Vec<String> = (0..100).map(create_test_json).collect();
        processor.process_batch(payloads);

        let snapshot = processor.stats();

        assert_eq!(snapshot.batches_processed, 1);
        assert_eq!(snapshot.events_processed, 100);
        assert!(snapshot.events_per_sec > 0.0);
    }

    /// A malformed entry is counted as a failure without aborting the batch.
    #[test]
    fn test_invalid_json() {
        let processor = BatchProcessor::new();
        let payloads = vec![
            create_test_json(1),
            "invalid json".to_string(),
            create_test_json(3),
        ];

        let outcome = processor.process_batch(payloads);

        assert_eq!(outcome.success_count, 2);
        assert_eq!(outcome.failure_count, 1);
    }

    /// Derived figures on a hand-built result.
    #[test]
    fn test_batch_result_metrics() {
        let outcome = BatchResult {
            success_count: 90,
            failure_count: 10,
            duration: Duration::from_millis(100),
            events_per_sec: 900.0,
        };

        assert_eq!(outcome.total(), 100);
        assert!((outcome.success_rate() - 0.9).abs() < 0.001);
    }

    /// The arena buffer hands back copies and stays usable after a reset.
    #[test]
    fn test_arena_batch_buffer() {
        let mut arena_buf = ArenaBatchBuffer::new(1024);

        let first = arena_buf.alloc_str("hello");
        let second = arena_buf.alloc_str("world");

        assert_eq!(first, "hello");
        assert_eq!(second, "world");
        let allocated_before = arena_buf.allocated();
        assert!(allocated_before > 0);

        // Reset makes memory available for reuse (but allocated_bytes may not change)
        arena_buf.reset();

        // After reset, new allocations should reuse the memory
        let third = arena_buf.alloc_str("test");
        assert_eq!(third, "test");

        // Verify the buffer is functional after reset
        assert!(arena_buf.capacity() >= 1024);
    }

    /// Presets are ordered relative to the default as their names suggest.
    #[test]
    fn test_config_presets() {
        let baseline = BatchProcessorConfig::default();
        let throughput = BatchProcessorConfig::high_throughput();
        let latency = BatchProcessorConfig::low_latency();

        assert!(throughput.queue_capacity > baseline.queue_capacity);
        assert!(latency.max_batch_size < baseline.max_batch_size);
    }

    /// Four producer threads sharing one processor queue all their events.
    #[test]
    fn test_concurrent_processing() {
        let processor = Arc::new(BatchProcessor::new());

        std::thread::scope(|scope| {
            // Multiple producer threads
            for worker in 0..4 {
                let shared = Arc::clone(&processor);
                scope.spawn(move || {
                    let payloads: Vec<String> = (0..100)
                        .map(|offset| create_test_json(worker * 100 + offset))
                        .collect();
                    shared.process_batch(payloads);
                });
            }
        });

        // All 400 events should be in the queue
        assert_eq!(processor.queue_depth(), 400);
    }

    /// Already-built events can be queued without going through the parser.
    #[test]
    fn test_process_events_direct() {
        let processor = BatchProcessor::new();

        let mut prepared: Vec<Event> = Vec::with_capacity(50);
        for i in 0..50 {
            let event = Event::from_strings(
                "test.event".to_string(),
                format!("entity-{}", i),
                "test-stream".to_string(),
                json!({"value": i}),
                None,
            )
            .unwrap();
            prepared.push(event);
        }

        let outcome = processor.process_events(prepared);

        assert_eq!(outcome.success_count, 50);
        assert_eq!(processor.queue_depth(), 50);
    }
}