// allsource_core/infrastructure/persistence/batch_processor.rs
//! High-throughput batch processing pipeline
//!
//! This module provides an optimized batch processing system that combines:
//! - SIMD-accelerated JSON parsing for fast deserialization
//! - Lock-free sharded queues for parallel ingestion
//! - Memory pooling for zero-allocation hot paths
//!
//! # Performance Target
//! - 1M+ events/sec sustained throughput
//! - Sub-millisecond p99 latency
//! - Linear scalability with CPU cores

use crate::{
    domain::entities::Event,
    error::Result,
    infrastructure::persistence::{
        lock_free::{LockFreeMetrics, ShardedEventQueue},
        simd_json::{SimdJsonParser, SimdJsonStats},
    },
};
use bumpalo::Bump;
use serde::Deserialize;
use std::{
    sync::{
        Arc,
        atomic::{AtomicU64, AtomicUsize, Ordering},
    },
    time::{Duration, Instant},
};

/// Tunable knobs controlling the batch-processing pipeline.
#[derive(Debug, Clone)]
pub struct BatchProcessorConfig {
    /// Upper bound on how many events one batch may contain.
    pub max_batch_size: usize,
    /// Total capacity of the pending-event queue across all shards.
    pub queue_capacity: usize,
    /// Number of shards the event queue is split into.
    pub shard_count: usize,
    /// Size of the arena allocation pool, in bytes.
    pub arena_size: usize,
    /// Whether SIMD-accelerated JSON parsing is enabled.
    pub simd_enabled: bool,
}

46impl Default for BatchProcessorConfig {
47    fn default() -> Self {
48        Self {
49            max_batch_size: 10_000,
50            queue_capacity: 1_000_000,
51            shard_count: 16,
52            arena_size: 64 * 1024 * 1024, // 64MB
53            simd_enabled: true,
54        }
55    }
56}
57
58impl BatchProcessorConfig {
59    /// High-throughput configuration for production use
60    pub fn high_throughput() -> Self {
61        Self {
62            max_batch_size: 50_000,
63            queue_capacity: 10_000_000,
64            shard_count: 32,
65            arena_size: 256 * 1024 * 1024, // 256MB
66            simd_enabled: true,
67        }
68    }
69
70    /// Low-latency configuration optimized for quick responses
71    pub fn low_latency() -> Self {
72        Self {
73            max_batch_size: 1_000,
74            queue_capacity: 100_000,
75            shard_count: 8,
76            arena_size: 16 * 1024 * 1024, // 16MB
77            simd_enabled: true,
78        }
79    }
80}
81
82/// Raw event data for batch parsing
83#[derive(Debug, Clone, Deserialize)]
84pub struct RawEventData {
85    pub event_type: String,
86    pub entity_id: String,
87    #[serde(default = "default_stream")]
88    pub stream_id: String,
89    pub data: serde_json::Value,
90    #[serde(default)]
91    pub metadata: Option<serde_json::Value>,
92}
93
/// Serde default for [`RawEventData::stream_id`]: the literal `"default"` stream.
fn default_stream() -> String {
    String::from("default")
}

/// Aggregated counters and derived rates for batch processing.
#[derive(Debug, Clone)]
pub struct BatchProcessorStats {
    /// Total number of batches processed so far.
    pub batches_processed: u64,
    /// Total number of events processed so far.
    pub events_processed: u64,
    /// Total number of bytes handed to the parser.
    pub bytes_parsed: u64,
    /// Mean events per batch (0.0 when no batch has run yet).
    pub avg_batch_size: f64,
    /// Derived throughput in events per second.
    pub events_per_sec: f64,
    /// Derived parse throughput in megabytes per second.
    pub parse_throughput_mbps: f64,
    /// Number of events currently sitting in the queue.
    pub queue_depth: usize,
    /// Cumulative processing time, in nanoseconds.
    pub total_time_ns: u64,
}

119/// High-throughput batch event processor
120///
121/// Combines SIMD JSON parsing with lock-free queues for maximum throughput.
122/// Designed to achieve 1M+ events/sec sustained ingestion.
123pub struct BatchProcessor {
124    config: BatchProcessorConfig,
125    /// SIMD JSON parser
126    json_parser: SimdJsonParser,
127    /// Lock-free sharded queue for processed events
128    event_queue: ShardedEventQueue,
129    /// Lock-free metrics
130    metrics: Arc<LockFreeMetrics>,
131    /// Batch processing statistics
132    batches_processed: AtomicU64,
133    events_processed: AtomicU64,
134    bytes_parsed: AtomicU64,
135    total_time_ns: AtomicU64,
136    /// Current arena pool index for round-robin
137    arena_index: AtomicUsize,
138}
139
140impl BatchProcessor {
141    /// Create a new batch processor with default configuration
142    pub fn new() -> Self {
143        Self::with_config(BatchProcessorConfig::default())
144    }
145
146    /// Create a new batch processor with custom configuration
147    pub fn with_config(config: BatchProcessorConfig) -> Self {
148        let event_queue = ShardedEventQueue::with_shards(config.queue_capacity, config.shard_count);
149
150        Self {
151            config,
152            json_parser: SimdJsonParser::new(),
153            event_queue,
154            metrics: Arc::new(LockFreeMetrics::new()),
155            batches_processed: AtomicU64::new(0),
156            events_processed: AtomicU64::new(0),
157            bytes_parsed: AtomicU64::new(0),
158            total_time_ns: AtomicU64::new(0),
159            arena_index: AtomicUsize::new(0),
160        }
161    }
162
163    /// Process a batch of raw JSON event strings
164    ///
165    /// This is the main entry point for high-throughput ingestion.
166    /// Uses SIMD-accelerated parsing and lock-free queuing.
167    ///
168    /// # Arguments
169    /// * `json_events` - Vector of JSON strings to parse and process
170    ///
171    /// # Returns
172    /// BatchResult with success/failure counts
173    #[cfg_attr(feature = "hotpath", hotpath::measure)]
174    pub fn process_batch(&self, json_events: Vec<String>) -> BatchResult {
175        let start = Instant::now();
176        let batch_size = json_events.len();
177
178        let mut success_count = 0;
179        let mut failure_count = 0;
180        let mut bytes_parsed = 0usize;
181
182        for json_str in json_events {
183            bytes_parsed += json_str.len();
184
185            if let Ok(()) = self.parse_and_queue_event(&json_str) {
186                success_count += 1;
187                self.metrics.record_ingest();
188            } else {
189                failure_count += 1;
190                self.metrics.record_error();
191            }
192        }
193
194        let duration = start.elapsed();
195        self.record_batch_stats(batch_size, bytes_parsed, duration);
196
197        BatchResult {
198            success_count,
199            failure_count,
200            duration,
201            events_per_sec: success_count as f64 / duration.as_secs_f64(),
202        }
203    }
204
205    /// Process a batch of raw JSON bytes (more efficient - avoids string conversion)
206    ///
207    /// # Arguments
208    /// * `json_bytes` - Vector of JSON byte slices to parse
209    ///
210    /// # Returns
211    /// BatchResult with success/failure counts
212    #[cfg_attr(feature = "hotpath", hotpath::measure)]
213    pub fn process_batch_bytes(&self, mut json_bytes: Vec<Vec<u8>>) -> BatchResult {
214        let start = Instant::now();
215        let batch_size = json_bytes.len();
216
217        let mut success_count = 0;
218        let mut failure_count = 0;
219        let mut bytes_parsed = 0usize;
220
221        for bytes in &mut json_bytes {
222            bytes_parsed += bytes.len();
223
224            if let Ok(()) = self.parse_and_queue_bytes(bytes) {
225                success_count += 1;
226                self.metrics.record_ingest();
227            } else {
228                failure_count += 1;
229                self.metrics.record_error();
230            }
231        }
232
233        let duration = start.elapsed();
234        self.record_batch_stats(batch_size, bytes_parsed, duration);
235
236        BatchResult {
237            success_count,
238            failure_count,
239            duration,
240            events_per_sec: success_count as f64 / duration.as_secs_f64(),
241        }
242    }
243
244    /// Process pre-parsed events directly (fastest path)
245    ///
246    /// Use this when events are already parsed (e.g., from Arrow/Parquet).
247    pub fn process_events(&self, events: Vec<Event>) -> BatchResult {
248        let start = Instant::now();
249        let batch_size = events.len();
250
251        let success_count = self.event_queue.try_push_batch(events);
252        let failure_count = batch_size - success_count;
253
254        self.metrics.record_ingest_batch(success_count as u64);
255        if failure_count > 0 {
256            for _ in 0..failure_count {
257                self.metrics.record_error();
258            }
259        }
260
261        let duration = start.elapsed();
262        self.batches_processed.fetch_add(1, Ordering::Relaxed);
263        self.events_processed
264            .fetch_add(success_count as u64, Ordering::Relaxed);
265        self.total_time_ns
266            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
267
268        BatchResult {
269            success_count,
270            failure_count,
271            duration,
272            events_per_sec: success_count as f64 / duration.as_secs_f64(),
273        }
274    }
275
276    /// Parse and queue a single JSON event string
277    fn parse_and_queue_event(&self, json_str: &str) -> Result<()> {
278        let raw: RawEventData = self.json_parser.parse_str(json_str)?;
279
280        let event = Event::from_strings(
281            raw.event_type,
282            raw.entity_id,
283            raw.stream_id,
284            raw.data,
285            raw.metadata,
286        )?;
287
288        self.event_queue.try_push(event)
289    }
290
291    /// Parse and queue from bytes (SIMD path)
292    fn parse_and_queue_bytes(&self, bytes: &mut [u8]) -> Result<()> {
293        let raw: RawEventData = self.json_parser.parse(bytes)?;
294
295        let event = Event::from_strings(
296            raw.event_type,
297            raw.entity_id,
298            raw.stream_id,
299            raw.data,
300            raw.metadata,
301        )?;
302
303        self.event_queue.try_push(event)
304    }
305
306    /// Record batch statistics
307    fn record_batch_stats(&self, batch_size: usize, bytes: usize, duration: Duration) {
308        self.batches_processed.fetch_add(1, Ordering::Relaxed);
309        self.events_processed
310            .fetch_add(batch_size as u64, Ordering::Relaxed);
311        self.bytes_parsed.fetch_add(bytes as u64, Ordering::Relaxed);
312        self.total_time_ns
313            .fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
314    }
315
316    /// Get a batch of processed events from the queue
317    ///
318    /// # Arguments
319    /// * `max_count` - Maximum number of events to retrieve
320    #[cfg_attr(feature = "hotpath", hotpath::measure)]
321    pub fn get_batch(&self, max_count: usize) -> Vec<Event> {
322        self.event_queue.try_pop_batch(max_count)
323    }
324
325    /// Get a single event from the queue
326    pub fn get_event(&self) -> Option<Event> {
327        self.event_queue.try_pop_any()
328    }
329
330    /// Get current queue depth
331    pub fn queue_depth(&self) -> usize {
332        self.event_queue.len()
333    }
334
335    /// Check if queue is empty
336    pub fn is_queue_empty(&self) -> bool {
337        self.event_queue.is_empty()
338    }
339
340    /// Get processing statistics
341    pub fn stats(&self) -> BatchProcessorStats {
342        let batches = self.batches_processed.load(Ordering::Relaxed);
343        let events = self.events_processed.load(Ordering::Relaxed);
344        let bytes = self.bytes_parsed.load(Ordering::Relaxed);
345        let time_ns = self.total_time_ns.load(Ordering::Relaxed);
346
347        let time_secs = time_ns as f64 / 1_000_000_000.0;
348        let events_per_sec = if time_secs > 0.0 {
349            events as f64 / time_secs
350        } else {
351            0.0
352        };
353        let throughput_mbps = if time_secs > 0.0 {
354            (bytes as f64 / 1_000_000.0) / time_secs
355        } else {
356            0.0
357        };
358
359        BatchProcessorStats {
360            batches_processed: batches,
361            events_processed: events,
362            bytes_parsed: bytes,
363            avg_batch_size: if batches > 0 {
364                events as f64 / batches as f64
365            } else {
366                0.0
367            },
368            events_per_sec,
369            parse_throughput_mbps: throughput_mbps,
370            queue_depth: self.event_queue.len(),
371            total_time_ns: time_ns,
372        }
373    }
374
375    /// Get JSON parser statistics
376    pub fn parser_stats(&self) -> &SimdJsonStats {
377        self.json_parser.stats()
378    }
379
380    /// Get metrics collector
381    pub fn metrics(&self) -> Arc<LockFreeMetrics> {
382        self.metrics.clone()
383    }
384
385    /// Get the event queue for direct access
386    pub fn event_queue(&self) -> &ShardedEventQueue {
387        &self.event_queue
388    }
389
390    /// Reset all statistics
391    pub fn reset_stats(&self) {
392        self.batches_processed.store(0, Ordering::Relaxed);
393        self.events_processed.store(0, Ordering::Relaxed);
394        self.bytes_parsed.store(0, Ordering::Relaxed);
395        self.total_time_ns.store(0, Ordering::Relaxed);
396        self.json_parser.reset_stats();
397        self.metrics.reset();
398    }
399}
400
401impl Default for BatchProcessor {
402    fn default() -> Self {
403        Self::new()
404    }
405}
406
/// Outcome of a single batch-processing call.
#[derive(Debug, Clone)]
pub struct BatchResult {
    /// How many events were processed successfully.
    pub success_count: usize,
    /// How many events failed to parse or enqueue.
    pub failure_count: usize,
    /// Wall-clock time spent on the batch.
    pub duration: Duration,
    /// Successful events per second over `duration`.
    pub events_per_sec: f64,
}

420impl BatchResult {
421    /// Get total events (success + failure)
422    pub fn total(&self) -> usize {
423        self.success_count + self.failure_count
424    }
425
426    /// Get success rate (0.0 to 1.0)
427    pub fn success_rate(&self) -> f64 {
428        let total = self.total();
429        if total > 0 {
430            self.success_count as f64 / total as f64
431        } else {
432            1.0
433        }
434    }
435}
436
437/// Arena-backed batch buffer for zero-allocation processing
438///
439/// Uses bumpalo for fast arena allocation during batch processing.
440/// All allocations are freed together when the arena is reset.
441pub struct ArenaBatchBuffer {
442    arena: Bump,
443    capacity: usize,
444}
445
446impl ArenaBatchBuffer {
447    /// Create a new arena buffer with specified capacity
448    pub fn new(capacity_bytes: usize) -> Self {
449        Self {
450            arena: Bump::with_capacity(capacity_bytes),
451            capacity: capacity_bytes,
452        }
453    }
454
455    /// Allocate a byte slice in the arena
456    pub fn alloc_bytes(&self, data: &[u8]) -> &[u8] {
457        self.arena.alloc_slice_copy(data)
458    }
459
460    /// Allocate a string in the arena
461    pub fn alloc_str(&self, s: &str) -> &str {
462        self.arena.alloc_str(s)
463    }
464
465    /// Get current arena allocation
466    pub fn allocated(&self) -> usize {
467        self.arena.allocated_bytes()
468    }
469
470    /// Reset the arena (frees all allocations)
471    pub fn reset(&mut self) {
472        self.arena.reset();
473    }
474
475    /// Get capacity
476    pub fn capacity(&self) -> usize {
477        self.capacity
478    }
479}
480
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build a minimal valid event JSON string with a numeric payload.
    fn create_test_json(id: u32) -> String {
        json!({
            "event_type": "test.event",
            "entity_id": format!("entity-{}", id),
            "stream_id": "test-stream",
            "data": {"value": id}
        })
        .to_string()
    }

    #[test]
    fn test_create_batch_processor() {
        let processor = BatchProcessor::new();

        // A fresh processor starts with an empty queue.
        assert!(processor.is_queue_empty());
        assert_eq!(processor.queue_depth(), 0);
    }

    #[test]
    fn test_process_single_batch() {
        let processor = BatchProcessor::new();
        let batch: Vec<String> = (0..100).map(create_test_json).collect();

        let result = processor.process_batch(batch);

        assert_eq!(result.success_count, 100);
        assert_eq!(result.failure_count, 0);
        assert_eq!(processor.queue_depth(), 100);
    }

    #[test]
    fn test_process_batch_bytes() {
        let processor = BatchProcessor::new();
        let batch: Vec<Vec<u8>> = (0..50)
            .map(|i| create_test_json(i).into_bytes())
            .collect();

        let result = processor.process_batch_bytes(batch);

        assert_eq!(result.success_count, 50);
        assert_eq!(result.failure_count, 0);
    }

    #[test]
    fn test_get_batch() {
        let processor = BatchProcessor::new();
        processor.process_batch((0..100).map(create_test_json).collect());

        // Popping 30 leaves 70 behind.
        let popped = processor.get_batch(30);
        assert_eq!(popped.len(), 30);
        assert_eq!(processor.queue_depth(), 70);
    }

    #[test]
    fn test_stats() {
        let processor = BatchProcessor::new();
        processor.process_batch((0..100).map(create_test_json).collect());

        let stats = processor.stats();
        assert_eq!(stats.batches_processed, 1);
        assert_eq!(stats.events_processed, 100);
        assert!(stats.events_per_sec > 0.0);
    }

    #[test]
    fn test_invalid_json() {
        let processor = BatchProcessor::new();

        // One malformed entry between two valid ones.
        let batch = vec![
            create_test_json(1),
            "invalid json".to_string(),
            create_test_json(3),
        ];
        let result = processor.process_batch(batch);

        assert_eq!(result.success_count, 2);
        assert_eq!(result.failure_count, 1);
    }

    #[test]
    fn test_batch_result_metrics() {
        let result = BatchResult {
            success_count: 90,
            failure_count: 10,
            duration: Duration::from_millis(100),
            events_per_sec: 900.0,
        };

        assert_eq!(result.total(), 100);
        assert!((result.success_rate() - 0.9).abs() < 0.001);
    }

    #[test]
    fn test_arena_batch_buffer() {
        let mut buffer = ArenaBatchBuffer::new(1024);

        let hello = buffer.alloc_str("hello");
        let world = buffer.alloc_str("world");
        assert_eq!(hello, "hello");
        assert_eq!(world, "world");
        assert!(buffer.allocated() > 0);

        // Reset makes memory available for reuse (allocated_bytes may not change).
        buffer.reset();

        // After reset, new allocations should reuse the memory.
        assert_eq!(buffer.alloc_str("test"), "test");

        // The buffer remains functional and keeps its capacity after reset.
        assert!(buffer.capacity() >= 1024);
    }

    #[test]
    fn test_config_presets() {
        let default = BatchProcessorConfig::default();
        let high_throughput = BatchProcessorConfig::high_throughput();
        let low_latency = BatchProcessorConfig::low_latency();

        assert!(high_throughput.queue_capacity > default.queue_capacity);
        assert!(low_latency.max_batch_size < default.max_batch_size);
    }

    #[test]
    fn test_concurrent_processing() {
        let processor = Arc::new(BatchProcessor::new());

        // Four producer threads, 100 events each.
        std::thread::scope(|scope| {
            for t in 0..4 {
                let shared = processor.clone();
                scope.spawn(move || {
                    let batch: Vec<String> =
                        (0..100).map(|i| create_test_json(t * 100 + i)).collect();
                    shared.process_batch(batch);
                });
            }
        });

        // All 400 events should have landed in the queue.
        assert_eq!(processor.queue_depth(), 400);
    }

    #[test]
    fn test_process_events_direct() {
        let processor = BatchProcessor::new();

        let events: Vec<Event> = (0..50)
            .map(|i| {
                Event::from_strings(
                    "test.event".to_string(),
                    format!("entity-{i}"),
                    "test-stream".to_string(),
                    json!({"value": i}),
                    None,
                )
                .unwrap()
            })
            .collect();

        let result = processor.process_events(events);
        assert_eq!(result.success_count, 50);
        assert_eq!(processor.queue_depth(), 50);
    }
}