Skip to main content

allsource_core/infrastructure/persistence/
simd_json.rs

//! SIMD-accelerated JSON parsing for high-throughput event ingestion
//!
//! This module wraps simd-json, which leverages SIMD instructions
//! (AVX2, SSE4.2, NEON) for ~2-3x faster parsing compared to serde_json.
//!
//! # Performance Characteristics
//! - Zero-copy parsing where possible (see `ZeroCopyJson`)
//! - SIMD-accelerated structural parsing
//! - Batch processing support
//! - Arena-allocation hook (`parse_with_arena`); the arena is accepted but
//!   not yet used by the current implementation
12
13use bumpalo::Bump;
14use serde::de::DeserializeOwned;
15use simd_json::prelude::*;
16use std::{
17    sync::atomic::{AtomicU64, Ordering},
18    time::Instant,
19};
20
/// Counters describing how the SIMD JSON parser has performed so far
///
/// Every field is an atomic, so the counters can be updated through a shared
/// reference.
#[derive(Debug, Default)]
pub struct SimdJsonStats {
    /// Sum of the input sizes, in bytes, of all recorded parses
    pub bytes_parsed: AtomicU64,
    /// Number of parses recorded as successful
    pub documents_parsed: AtomicU64,
    /// Accumulated duration of the recorded parses, in nanoseconds
    pub parse_time_ns: AtomicU64,
    /// Number of parse attempts recorded as failed
    pub parse_errors: AtomicU64,
}

impl SimdJsonStats {
    /// Build a tracker with every counter at zero
    pub fn new() -> Self {
        Default::default()
    }

    /// Throughput in megabytes (10^6 bytes) per second of recorded parse time
    ///
    /// Yields `0.0` while no parse time has been recorded.
    pub fn throughput_mbps(&self) -> f64 {
        let bytes = self.bytes_parsed.load(Ordering::Relaxed) as f64;
        let nanos = self.parse_time_ns.load(Ordering::Relaxed) as f64;
        if nanos <= 0.0 {
            return 0.0;
        }

        let megabytes = bytes / 1_000_000.0;
        let seconds = nanos / 1_000_000_000.0;
        megabytes / seconds
    }

    /// Documents parsed per second of recorded parse time
    ///
    /// Yields `0.0` while no parse time has been recorded.
    pub fn docs_per_second(&self) -> f64 {
        let docs = self.documents_parsed.load(Ordering::Relaxed) as f64;
        let nanos = self.parse_time_ns.load(Ordering::Relaxed) as f64;
        if nanos <= 0.0 {
            return 0.0;
        }

        let seconds = nanos / 1_000_000_000.0;
        docs / seconds
    }

    /// Account for one successful parse of `bytes` input bytes that took
    /// `duration_ns` nanoseconds
    fn record_parse(&self, bytes: usize, duration_ns: u64) {
        let updates = [
            (&self.bytes_parsed, bytes as u64),
            (&self.documents_parsed, 1),
            (&self.parse_time_ns, duration_ns),
        ];
        for (counter, delta) in updates {
            counter.fetch_add(delta, Ordering::Relaxed);
        }
    }

    /// Account for one failed parse attempt
    fn record_error(&self) {
        self.parse_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Set every counter back to zero
    pub fn reset(&self) {
        let counters = [
            &self.bytes_parsed,
            &self.documents_parsed,
            &self.parse_time_ns,
            &self.parse_errors,
        ];
        for counter in counters {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
82
83/// High-performance JSON parser using SIMD instructions
84pub struct SimdJsonParser {
85    stats: SimdJsonStats,
86}
87
88impl Default for SimdJsonParser {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl SimdJsonParser {
95    /// Create a new SIMD JSON parser
96    pub fn new() -> Self {
97        Self {
98            stats: SimdJsonStats::new(),
99        }
100    }
101
102    /// Parse JSON bytes into a typed value using SIMD acceleration
103    ///
104    /// # Arguments
105    /// * `data` - Mutable JSON bytes (simd-json requires mutable access for in-place parsing)
106    ///
107    /// # Returns
108    /// Parsed value or error
109    #[cfg_attr(feature = "hotpath", hotpath::measure)]
110    pub fn parse<T: DeserializeOwned>(&self, data: &mut [u8]) -> Result<T, SimdJsonError> {
111        let start = Instant::now();
112        let len = data.len();
113
114        match simd_json::from_slice(data) {
115            Ok(value) => {
116                let duration = start.elapsed().as_nanos() as u64;
117                self.stats.record_parse(len, duration);
118                Ok(value)
119            }
120            Err(e) => {
121                self.stats.record_error();
122                Err(SimdJsonError::ParseError(e.to_string()))
123            }
124        }
125    }
126
127    /// Parse JSON string into a typed value
128    ///
129    /// Note: This creates a copy of the string for mutation. For zero-copy,
130    /// use `parse` with a mutable byte slice.
131    pub fn parse_str<T: DeserializeOwned>(&self, data: &str) -> Result<T, SimdJsonError> {
132        let mut bytes = data.as_bytes().to_vec();
133        self.parse(&mut bytes)
134    }
135
136    /// Parse multiple JSON documents in a batch
137    ///
138    /// Optimized for processing many small documents efficiently.
139    #[cfg_attr(feature = "hotpath", hotpath::measure)]
140    pub fn parse_batch<T: DeserializeOwned>(
141        &self,
142        documents: &mut [Vec<u8>],
143    ) -> Vec<Result<T, SimdJsonError>> {
144        documents.iter_mut().map(|doc| self.parse(doc)).collect()
145    }
146
147    /// Parse JSON with arena allocation for zero-copy string handling
148    ///
149    /// The arena provides fast bump allocation and all allocations are freed
150    /// together when the arena is dropped.
151    pub fn parse_with_arena<T: DeserializeOwned>(
152        &self,
153        data: &mut [u8],
154        _arena: &Bump,
155    ) -> Result<T, SimdJsonError> {
156        // Currently just uses standard parsing, but arena can be used for
157        // custom string interning or buffer management in the future
158        self.parse(data)
159    }
160
161    /// Get parsing statistics
162    pub fn stats(&self) -> &SimdJsonStats {
163        &self.stats
164    }
165
166    /// Reset statistics
167    pub fn reset_stats(&self) {
168        self.stats.reset();
169    }
170}
171
172/// Errors from SIMD JSON parsing
173#[derive(Debug, thiserror::Error)]
174pub enum SimdJsonError {
175    #[error("JSON parse error: {0}")]
176    ParseError(String),
177
178    #[error("Invalid UTF-8 in JSON: {0}")]
179    Utf8Error(#[from] std::str::Utf8Error),
180
181    #[error("Buffer too small for parsing")]
182    BufferTooSmall,
183}
184
185/// Batch event parser optimized for high-throughput ingestion
186pub struct BatchEventParser {
187    parser: SimdJsonParser,
188    /// Pre-allocated buffer for batch processing
189    buffer_pool: Vec<Vec<u8>>,
190    /// Maximum batch size
191    max_batch_size: usize,
192}
193
194impl BatchEventParser {
195    /// Create a new batch parser with specified capacity
196    pub fn new(max_batch_size: usize) -> Self {
197        Self {
198            parser: SimdJsonParser::new(),
199            buffer_pool: Vec::with_capacity(max_batch_size),
200            max_batch_size,
201        }
202    }
203
204    /// Parse a batch of JSON event strings
205    pub fn parse_events<T: DeserializeOwned>(
206        &mut self,
207        events: &[String],
208    ) -> Vec<Result<T, SimdJsonError>> {
209        // Reuse buffers from pool
210        self.buffer_pool.clear();
211        self.buffer_pool
212            .extend(events.iter().map(|e| e.as_bytes().to_vec()));
213
214        self.parser.parse_batch(&mut self.buffer_pool)
215    }
216
217    /// Parse events from byte slices (more efficient - avoids string conversion)
218    #[cfg_attr(feature = "hotpath", hotpath::measure)]
219    pub fn parse_events_bytes<T: DeserializeOwned>(
220        &self,
221        events: &mut [Vec<u8>],
222    ) -> Vec<Result<T, SimdJsonError>> {
223        self.parser.parse_batch(events)
224    }
225
226    /// Get the underlying parser's statistics
227    pub fn stats(&self) -> &SimdJsonStats {
228        self.parser.stats()
229    }
230
231    /// Maximum batch size
232    pub fn max_batch_size(&self) -> usize {
233        self.max_batch_size
234    }
235}
236
237/// Zero-copy JSON value for read-only access
238///
239/// This wraps simd-json's tape-based representation for efficient
240/// field access without full deserialization.
241pub struct ZeroCopyJson<'a> {
242    tape: simd_json::BorrowedValue<'a>,
243}
244
245impl<'a> ZeroCopyJson<'a> {
246    /// Parse JSON into zero-copy representation
247    pub fn parse(data: &'a mut [u8]) -> Result<Self, SimdJsonError> {
248        let tape = simd_json::to_borrowed_value(data)
249            .map_err(|e| SimdJsonError::ParseError(e.to_string()))?;
250        Ok(Self { tape })
251    }
252
253    /// Get a string field without copying
254    pub fn get_str(&self, key: &str) -> Option<&str> {
255        self.tape.get(key).and_then(|v| v.as_str())
256    }
257
258    /// Get a numeric field
259    pub fn get_i64(&self, key: &str) -> Option<i64> {
260        self.tape
261            .get(key)
262            .and_then(simd_json::prelude::ValueAsScalar::as_i64)
263    }
264
265    /// Get a float field
266    pub fn get_f64(&self, key: &str) -> Option<f64> {
267        self.tape
268            .get(key)
269            .and_then(simd_json::prelude::ValueAsScalar::as_f64)
270    }
271
272    /// Get a boolean field
273    pub fn get_bool(&self, key: &str) -> Option<bool> {
274        self.tape
275            .get(key)
276            .and_then(simd_json::prelude::ValueAsScalar::as_bool)
277    }
278
279    /// Check if a field exists
280    pub fn contains_key(&self, key: &str) -> bool {
281        self.tape.get(key).is_some()
282    }
283
284    /// Get the underlying borrowed value
285    pub fn as_value(&self) -> &simd_json::BorrowedValue<'a> {
286        &self.tape
287    }
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    /// Minimal event shape used by every parsing test below.
    #[derive(Debug, Deserialize, PartialEq)]
    struct TestEvent {
        id: String,
        #[serde(rename = "type")]
        event_type: String,
        value: i64,
    }

    /// Owned, mutable copy of a JSON literal (the parser mutates its input).
    fn bytes(json: &str) -> Vec<u8> {
        json.as_bytes().to_vec()
    }

    #[test]
    fn test_simd_json_parse() {
        let parser = SimdJsonParser::new();
        let mut input = bytes(r#"{"id":"123","type":"test","value":42}"#);

        let event: TestEvent = parser.parse(&mut input).unwrap();

        assert_eq!(event.id, "123");
        assert_eq!(event.event_type, "test");
        assert_eq!(event.value, 42);
    }

    #[test]
    fn test_simd_json_parse_str() {
        let parser = SimdJsonParser::new();

        let event: TestEvent = parser
            .parse_str(r#"{"id":"456","type":"event","value":100}"#)
            .unwrap();

        assert_eq!(event.id, "456");
        assert_eq!(event.event_type, "event");
        assert_eq!(event.value, 100);
    }

    #[test]
    fn test_batch_parsing() {
        let parser = SimdJsonParser::new();
        let mut docs: Vec<Vec<u8>> = [
            r#"{"id":"1","type":"a","value":1}"#,
            r#"{"id":"2","type":"b","value":2}"#,
            r#"{"id":"3","type":"c","value":3}"#,
        ]
        .iter()
        .map(|json| bytes(json))
        .collect();

        let results: Vec<Result<TestEvent, _>> = parser.parse_batch(&mut docs);
        assert_eq!(results.len(), 3);

        // Documents are numbered 1..=3 in input order.
        for (idx, outcome) in results.into_iter().enumerate() {
            let expected = idx + 1;
            let event = outcome.unwrap();
            assert_eq!(event.id, expected.to_string());
            assert_eq!(event.value, expected as i64);
        }
    }

    #[test]
    fn test_stats_tracking() {
        let parser = SimdJsonParser::new();
        let mut input = bytes(r#"{"id":"test","type":"event","value":0}"#);

        let _: TestEvent = parser.parse(&mut input).unwrap();

        let stats = parser.stats();
        assert!(stats.bytes_parsed.load(Ordering::Relaxed) > 0);
        assert_eq!(stats.documents_parsed.load(Ordering::Relaxed), 1);
        assert_eq!(stats.parse_errors.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_parse_error_tracking() {
        let parser = SimdJsonParser::new();
        let mut garbage = b"not valid json".to_vec();

        let outcome: Result<TestEvent, _> = parser.parse(&mut garbage);

        assert!(outcome.is_err());
        assert_eq!(parser.stats().parse_errors.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_zero_copy_json() {
        let mut input = bytes(r#"{"name":"test","count":42,"active":true}"#);

        let view = ZeroCopyJson::parse(&mut input).unwrap();

        assert_eq!(view.get_str("name"), Some("test"));
        assert_eq!(view.get_i64("count"), Some(42));
        assert_eq!(view.get_bool("active"), Some(true));
        assert!(view.contains_key("name"));
        assert!(!view.contains_key("missing"));
    }

    #[test]
    fn test_batch_event_parser() {
        let mut batch_parser = BatchEventParser::new(100);
        let events = vec![
            String::from(r#"{"id":"a","type":"x","value":10}"#),
            String::from(r#"{"id":"b","type":"y","value":20}"#),
        ];

        let results: Vec<Result<TestEvent, _>> = batch_parser.parse_events(&events);
        assert_eq!(results.len(), 2);

        let first = results[0].as_ref().unwrap();
        assert_eq!(first.id, "a");
        assert_eq!(first.value, 10);
    }

    #[test]
    fn test_throughput_calculation() {
        let stats = SimdJsonStats::new();
        // 1 MB and 10k documents over exactly one second.
        stats.bytes_parsed.store(1_000_000, Ordering::Relaxed);
        stats.parse_time_ns.store(1_000_000_000, Ordering::Relaxed);
        stats.documents_parsed.store(10_000, Ordering::Relaxed);

        let mbps = stats.throughput_mbps();
        let dps = stats.docs_per_second();

        assert!((mbps - 1.0).abs() < 0.001);
        assert!((dps - 10_000.0).abs() < 1.0);
    }
}
405}