// allsource_core/infrastructure/persistence/simd_json.rs

//! SIMD-accelerated JSON parsing for high-throughput event ingestion.
//!
//! This module wraps `simd-json`, which leverages SIMD instructions
//! (AVX2, SSE4.2, NEON) for ~2-3x faster parsing compared to `serde_json`.
//!
//! # Performance Characteristics
//! - In-place parsing of mutable buffers; zero-copy field access where
//!   possible (see `ZeroCopyJson`)
//! - SIMD-accelerated structural parsing
//! - Batch processing support
//! - Arena allocation hook (`parse_with_arena`; the arena is accepted but
//!   not yet used by the implementation)

use bumpalo::Bump;
use serde::de::DeserializeOwned;
use simd_json::prelude::*;
use std::{
    sync::atomic::{AtomicU64, Ordering},
    time::Instant,
};

/// Statistics for SIMD JSON parsing performance.
///
/// Every counter is an [`AtomicU64`] accessed with `Ordering::Relaxed`, so the
/// tracker can be updated and read through a shared reference. The counters
/// are updated independently of one another; a reader may therefore observe
/// a snapshot in which, for example, the byte count already includes a
/// document that the document count does not.
#[derive(Debug, Default)]
pub struct SimdJsonStats {
    /// Total bytes parsed (successful parses only)
    pub bytes_parsed: AtomicU64,
    /// Total documents parsed (successful parses only)
    pub documents_parsed: AtomicU64,
    /// Total parsing time in nanoseconds (successful parses only)
    pub parse_time_ns: AtomicU64,
    /// Parse errors encountered
    pub parse_errors: AtomicU64,
}

impl SimdJsonStats {
    /// Nanoseconds in one second, used to convert the accumulated parse time.
    const NANOS_PER_SEC: f64 = 1_000_000_000.0;
    /// Bytes in one (decimal) megabyte, used by [`Self::throughput_mbps`].
    const BYTES_PER_MB: f64 = 1_000_000.0;

    /// Create new stats tracker with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Accumulated parse time in seconds, or `None` while no time has been
    /// recorded (which would otherwise cause a division by zero).
    fn elapsed_secs(&self) -> Option<f64> {
        let nanos = self.parse_time_ns.load(Ordering::Relaxed) as f64;
        (nanos > 0.0).then(|| nanos / Self::NANOS_PER_SEC)
    }

    /// Get throughput in MB/s (decimal megabytes per second of parse time).
    ///
    /// Returns `0.0` when no parse time has been recorded yet.
    pub fn throughput_mbps(&self) -> f64 {
        let bytes = self.bytes_parsed.load(Ordering::Relaxed) as f64;
        self.elapsed_secs()
            .map_or(0.0, |secs| (bytes / Self::BYTES_PER_MB) / secs)
    }

    /// Get documents per second of parse time.
    ///
    /// Returns `0.0` when no parse time has been recorded yet.
    pub fn docs_per_second(&self) -> f64 {
        let docs = self.documents_parsed.load(Ordering::Relaxed) as f64;
        self.elapsed_secs().map_or(0.0, |secs| docs / secs)
    }

    /// Record a successful parse of `bytes` input bytes that took
    /// `duration_ns` nanoseconds.
    fn record_parse(&self, bytes: usize, duration_ns: u64) {
        self.bytes_parsed.fetch_add(bytes as u64, Ordering::Relaxed);
        self.documents_parsed.fetch_add(1, Ordering::Relaxed);
        self.parse_time_ns.fetch_add(duration_ns, Ordering::Relaxed);
    }

    /// Record a parse error.
    fn record_error(&self) {
        self.parse_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Reset all statistics to zero.
    ///
    /// The four stores are separate atomic operations, so a parse recorded
    /// concurrently with a reset may be only partially cleared.
    pub fn reset(&self) {
        self.bytes_parsed.store(0, Ordering::Relaxed);
        self.documents_parsed.store(0, Ordering::Relaxed);
        self.parse_time_ns.store(0, Ordering::Relaxed);
        self.parse_errors.store(0, Ordering::Relaxed);
    }
}

83/// High-performance JSON parser using SIMD instructions
84pub struct SimdJsonParser {
85    stats: SimdJsonStats,
86}
87
88impl Default for SimdJsonParser {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl SimdJsonParser {
95    /// Create a new SIMD JSON parser
96    pub fn new() -> Self {
97        Self {
98            stats: SimdJsonStats::new(),
99        }
100    }
101
102    /// Parse JSON bytes into a typed value using SIMD acceleration
103    ///
104    /// # Arguments
105    /// * `data` - Mutable JSON bytes (simd-json requires mutable access for in-place parsing)
106    ///
107    /// # Returns
108    /// Parsed value or error
109    pub fn parse<T: DeserializeOwned>(&self, data: &mut [u8]) -> Result<T, SimdJsonError> {
110        let start = Instant::now();
111        let len = data.len();
112
113        match simd_json::from_slice(data) {
114            Ok(value) => {
115                let duration = start.elapsed().as_nanos() as u64;
116                self.stats.record_parse(len, duration);
117                Ok(value)
118            }
119            Err(e) => {
120                self.stats.record_error();
121                Err(SimdJsonError::ParseError(e.to_string()))
122            }
123        }
124    }
125
126    /// Parse JSON string into a typed value
127    ///
128    /// Note: This creates a copy of the string for mutation. For zero-copy,
129    /// use `parse` with a mutable byte slice.
130    pub fn parse_str<T: DeserializeOwned>(&self, data: &str) -> Result<T, SimdJsonError> {
131        let mut bytes = data.as_bytes().to_vec();
132        self.parse(&mut bytes)
133    }
134
135    /// Parse multiple JSON documents in a batch
136    ///
137    /// Optimized for processing many small documents efficiently.
138    pub fn parse_batch<T: DeserializeOwned>(
139        &self,
140        documents: &mut [Vec<u8>],
141    ) -> Vec<Result<T, SimdJsonError>> {
142        documents.iter_mut().map(|doc| self.parse(doc)).collect()
143    }
144
145    /// Parse JSON with arena allocation for zero-copy string handling
146    ///
147    /// The arena provides fast bump allocation and all allocations are freed
148    /// together when the arena is dropped.
149    pub fn parse_with_arena<T: DeserializeOwned>(
150        &self,
151        data: &mut [u8],
152        _arena: &Bump,
153    ) -> Result<T, SimdJsonError> {
154        // Currently just uses standard parsing, but arena can be used for
155        // custom string interning or buffer management in the future
156        self.parse(data)
157    }
158
159    /// Get parsing statistics
160    pub fn stats(&self) -> &SimdJsonStats {
161        &self.stats
162    }
163
164    /// Reset statistics
165    pub fn reset_stats(&self) {
166        self.stats.reset();
167    }
168}
169
170/// Errors from SIMD JSON parsing
171#[derive(Debug, thiserror::Error)]
172pub enum SimdJsonError {
173    #[error("JSON parse error: {0}")]
174    ParseError(String),
175
176    #[error("Invalid UTF-8 in JSON: {0}")]
177    Utf8Error(#[from] std::str::Utf8Error),
178
179    #[error("Buffer too small for parsing")]
180    BufferTooSmall,
181}
182
183/// Batch event parser optimized for high-throughput ingestion
184pub struct BatchEventParser {
185    parser: SimdJsonParser,
186    /// Pre-allocated buffer for batch processing
187    buffer_pool: Vec<Vec<u8>>,
188    /// Maximum batch size
189    max_batch_size: usize,
190}
191
192impl BatchEventParser {
193    /// Create a new batch parser with specified capacity
194    pub fn new(max_batch_size: usize) -> Self {
195        Self {
196            parser: SimdJsonParser::new(),
197            buffer_pool: Vec::with_capacity(max_batch_size),
198            max_batch_size,
199        }
200    }
201
202    /// Parse a batch of JSON event strings
203    pub fn parse_events<T: DeserializeOwned>(
204        &mut self,
205        events: &[String],
206    ) -> Vec<Result<T, SimdJsonError>> {
207        // Reuse buffers from pool
208        self.buffer_pool.clear();
209        self.buffer_pool
210            .extend(events.iter().map(|e| e.as_bytes().to_vec()));
211
212        self.parser.parse_batch(&mut self.buffer_pool)
213    }
214
215    /// Parse events from byte slices (more efficient - avoids string conversion)
216    pub fn parse_events_bytes<T: DeserializeOwned>(
217        &self,
218        events: &mut [Vec<u8>],
219    ) -> Vec<Result<T, SimdJsonError>> {
220        self.parser.parse_batch(events)
221    }
222
223    /// Get the underlying parser's statistics
224    pub fn stats(&self) -> &SimdJsonStats {
225        self.parser.stats()
226    }
227
228    /// Maximum batch size
229    pub fn max_batch_size(&self) -> usize {
230        self.max_batch_size
231    }
232}
233
234/// Zero-copy JSON value for read-only access
235///
236/// This wraps simd-json's tape-based representation for efficient
237/// field access without full deserialization.
238pub struct ZeroCopyJson<'a> {
239    tape: simd_json::BorrowedValue<'a>,
240}
241
242impl<'a> ZeroCopyJson<'a> {
243    /// Parse JSON into zero-copy representation
244    pub fn parse(data: &'a mut [u8]) -> Result<Self, SimdJsonError> {
245        let tape = simd_json::to_borrowed_value(data)
246            .map_err(|e| SimdJsonError::ParseError(e.to_string()))?;
247        Ok(Self { tape })
248    }
249
250    /// Get a string field without copying
251    pub fn get_str(&self, key: &str) -> Option<&str> {
252        self.tape.get(key).and_then(|v| v.as_str())
253    }
254
255    /// Get a numeric field
256    pub fn get_i64(&self, key: &str) -> Option<i64> {
257        self.tape.get(key).and_then(|v| v.as_i64())
258    }
259
260    /// Get a float field
261    pub fn get_f64(&self, key: &str) -> Option<f64> {
262        self.tape.get(key).and_then(|v| v.as_f64())
263    }
264
265    /// Get a boolean field
266    pub fn get_bool(&self, key: &str) -> Option<bool> {
267        self.tape.get(key).and_then(|v| v.as_bool())
268    }
269
270    /// Check if a field exists
271    pub fn contains_key(&self, key: &str) -> bool {
272        self.tape.get(key).is_some()
273    }
274
275    /// Get the underlying borrowed value
276    pub fn as_value(&self) -> &simd_json::BorrowedValue<'a> {
277        &self.tape
278    }
279}
280
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    /// Minimal event shape used by the typed-parsing tests.
    #[derive(Debug, Deserialize, PartialEq)]
    struct TestEvent {
        id: String,
        #[serde(rename = "type")]
        event_type: String,
        value: i64,
    }

    #[test]
    fn test_simd_json_parse() {
        let parser = SimdJsonParser::new();
        let mut json = r#"{"id":"123","type":"test","value":42}"#.as_bytes().to_vec();

        let result: TestEvent = parser.parse(&mut json).unwrap();
        assert_eq!(result.id, "123");
        assert_eq!(result.event_type, "test");
        assert_eq!(result.value, 42);
    }

    #[test]
    fn test_simd_json_parse_str() {
        let parser = SimdJsonParser::new();
        let json = r#"{"id":"456","type":"event","value":100}"#;

        let result: TestEvent = parser.parse_str(json).unwrap();
        assert_eq!(result.id, "456");
        assert_eq!(result.event_type, "event");
        assert_eq!(result.value, 100);
    }

    #[test]
    fn test_batch_parsing() {
        let parser = SimdJsonParser::new();
        let mut docs: Vec<Vec<u8>> = vec![
            r#"{"id":"1","type":"a","value":1}"#.as_bytes().to_vec(),
            r#"{"id":"2","type":"b","value":2}"#.as_bytes().to_vec(),
            r#"{"id":"3","type":"c","value":3}"#.as_bytes().to_vec(),
        ];

        let results: Vec<Result<TestEvent, _>> = parser.parse_batch(&mut docs);
        assert_eq!(results.len(), 3);

        for (i, result) in results.into_iter().enumerate() {
            let event = result.unwrap();
            assert_eq!(event.id, (i + 1).to_string());
            assert_eq!(event.value, (i + 1) as i64);
        }
    }

    #[test]
    fn test_stats_tracking() {
        let parser = SimdJsonParser::new();
        let mut json = r#"{"id":"test","type":"event","value":0}"#.as_bytes().to_vec();

        let _: TestEvent = parser.parse(&mut json).unwrap();

        let stats = parser.stats();
        assert!(stats.bytes_parsed.load(Ordering::Relaxed) > 0);
        assert_eq!(stats.documents_parsed.load(Ordering::Relaxed), 1);
        assert_eq!(stats.parse_errors.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_parse_error_tracking() {
        let parser = SimdJsonParser::new();
        let mut invalid = b"not valid json".to_vec();

        let result: Result<TestEvent, _> = parser.parse(&mut invalid);
        assert!(result.is_err());
        assert_eq!(parser.stats().parse_errors.load(Ordering::Relaxed), 1);
        // A failed parse must not be counted as parsed work.
        assert_eq!(parser.stats().documents_parsed.load(Ordering::Relaxed), 0);
        assert_eq!(parser.stats().bytes_parsed.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_zero_copy_json() {
        let mut json = r#"{"name":"test","count":42,"active":true}"#.as_bytes().to_vec();

        let zc = ZeroCopyJson::parse(&mut json).unwrap();
        assert_eq!(zc.get_str("name"), Some("test"));
        assert_eq!(zc.get_i64("count"), Some(42));
        assert_eq!(zc.get_bool("active"), Some(true));
        assert!(zc.contains_key("name"));
        assert!(!zc.contains_key("missing"));
    }

    #[test]
    fn test_zero_copy_json_float_and_type_mismatch() {
        let mut json = r#"{"ratio":1.5,"name":"test"}"#.as_bytes().to_vec();

        let zc = ZeroCopyJson::parse(&mut json).unwrap();
        assert_eq!(zc.get_f64("ratio"), Some(1.5));
        // Present key, wrong type: the typed getters report None.
        assert_eq!(zc.get_i64("name"), None);
        assert_eq!(zc.get_bool("name"), None);
        assert_eq!(zc.get_str("missing"), None);
    }

    #[test]
    fn test_batch_event_parser() {
        let mut parser = BatchEventParser::new(100);
        let events = vec![
            r#"{"id":"a","type":"x","value":10}"#.to_string(),
            r#"{"id":"b","type":"y","value":20}"#.to_string(),
        ];

        let results: Vec<Result<TestEvent, _>> = parser.parse_events(&events);
        assert_eq!(results.len(), 2);

        let first = results[0].as_ref().unwrap();
        assert_eq!(first.id, "a");
        assert_eq!(first.value, 10);
    }

    #[test]
    fn test_batch_event_parser_repeated_batches() {
        let mut parser = BatchEventParser::new(2);
        let big = vec![
            r#"{"id":"a","type":"x","value":1}"#.to_string(),
            r#"{"id":"b","type":"y","value":2}"#.to_string(),
            r#"{"id":"c","type":"z","value":3}"#.to_string(),
        ];
        let small = vec![r#"{"id":"d","type":"w","value":4}"#.to_string()];

        let first: Vec<Result<TestEvent, _>> = parser.parse_events(&big);
        assert_eq!(first.len(), 3);
        assert!(first.iter().all(|r| r.is_ok()));

        // A later, smaller batch must yield exactly its own events.
        let second: Vec<Result<TestEvent, _>> = parser.parse_events(&small);
        assert_eq!(second.len(), 1);
        assert_eq!(second[0].as_ref().unwrap().id, "d");
        assert_eq!(parser.stats().documents_parsed.load(Ordering::Relaxed), 4);
    }

    #[test]
    fn test_throughput_calculation() {
        let stats = SimdJsonStats::new();
        stats.bytes_parsed.store(1_000_000, Ordering::Relaxed);
        stats.parse_time_ns.store(1_000_000_000, Ordering::Relaxed); // 1 second
        stats.documents_parsed.store(10_000, Ordering::Relaxed);

        assert!((stats.throughput_mbps() - 1.0).abs() < 0.001);
        assert!((stats.docs_per_second() - 10_000.0).abs() < 1.0);
    }
}