// allsource_core/infrastructure/persistence/simd_json.rs

//! SIMD-accelerated JSON parsing for high-throughput event ingestion
//!
//! This module wraps simd-json, which uses SIMD instructions (AVX2, SSE4.2,
//! NEON) to parse JSON roughly 2-3x faster than serde_json.
//!
//! # Performance Characteristics
//! - In-place parsing of mutable buffers, plus zero-copy field access via
//!   [`ZeroCopyJson`] (strings borrow from the input buffer)
//! - SIMD-accelerated structural parsing
//! - Batch processing support
//! - Arena entry point (`SimdJsonParser::parse_with_arena`); the arena is
//!   accepted but not yet used for allocation
12
13use bumpalo::Bump;
14use serde::de::DeserializeOwned;
15use simd_json::prelude::*;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Instant;
18
/// Cumulative counters describing SIMD JSON parsing work.
///
/// Every counter is an independent `AtomicU64` updated with `Relaxed`
/// ordering, so statistics can be recorded through a shared reference.
/// Because the counters are read one at a time, a reader may observe values
/// captured at slightly different moments.
#[derive(Debug, Default)]
pub struct SimdJsonStats {
    /// Sum of input lengths, in bytes, of successfully parsed documents
    pub bytes_parsed: AtomicU64,
    /// Number of documents parsed successfully
    pub documents_parsed: AtomicU64,
    /// Accumulated time spent in successful parses, in nanoseconds
    pub parse_time_ns: AtomicU64,
    /// Number of parse attempts that failed
    pub parse_errors: AtomicU64,
}

impl SimdJsonStats {
    /// Create a tracker with every counter at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Throughput in megabytes (10^6 bytes) per second of recorded parse time.
    ///
    /// Returns `0.0` while no parse time has been recorded.
    pub fn throughput_mbps(&self) -> f64 {
        let megabytes = self.bytes_parsed.load(Ordering::Relaxed) as f64 / 1_000_000.0;
        self.per_second(megabytes)
    }

    /// Documents parsed per second of recorded parse time.
    ///
    /// Returns `0.0` while no parse time has been recorded.
    pub fn docs_per_second(&self) -> f64 {
        let documents = self.documents_parsed.load(Ordering::Relaxed) as f64;
        self.per_second(documents)
    }

    /// Divide `amount` by the recorded parse time expressed in seconds.
    ///
    /// Yields `0.0` instead of dividing by zero when no time is recorded.
    fn per_second(&self, amount: f64) -> f64 {
        let elapsed_ns = self.parse_time_ns.load(Ordering::Relaxed) as f64;
        if elapsed_ns <= 0.0 {
            return 0.0;
        }
        amount / (elapsed_ns / 1_000_000_000.0)
    }

    /// Account for one successful parse of `bytes` input bytes that took
    /// `duration_ns` nanoseconds.
    fn record_parse(&self, bytes: usize, duration_ns: u64) {
        let updates = [
            (&self.bytes_parsed, bytes as u64),
            (&self.documents_parsed, 1),
            (&self.parse_time_ns, duration_ns),
        ];
        for (counter, delta) in updates {
            counter.fetch_add(delta, Ordering::Relaxed);
        }
    }

    /// Account for one failed parse attempt.
    fn record_error(&self) {
        self.parse_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Zero every counter.
    ///
    /// Counters are cleared one after another, so this is not atomic with
    /// respect to concurrent recording.
    pub fn reset(&self) {
        let counters = [
            &self.bytes_parsed,
            &self.documents_parsed,
            &self.parse_time_ns,
            &self.parse_errors,
        ];
        for counter in counters {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
80
81/// High-performance JSON parser using SIMD instructions
82pub struct SimdJsonParser {
83    stats: SimdJsonStats,
84}
85
86impl Default for SimdJsonParser {
87    fn default() -> Self {
88        Self::new()
89    }
90}
91
92impl SimdJsonParser {
93    /// Create a new SIMD JSON parser
94    pub fn new() -> Self {
95        Self {
96            stats: SimdJsonStats::new(),
97        }
98    }
99
100    /// Parse JSON bytes into a typed value using SIMD acceleration
101    ///
102    /// # Arguments
103    /// * `data` - Mutable JSON bytes (simd-json requires mutable access for in-place parsing)
104    ///
105    /// # Returns
106    /// Parsed value or error
107    pub fn parse<T: DeserializeOwned>(&self, data: &mut [u8]) -> Result<T, SimdJsonError> {
108        let start = Instant::now();
109        let len = data.len();
110
111        match simd_json::from_slice(data) {
112            Ok(value) => {
113                let duration = start.elapsed().as_nanos() as u64;
114                self.stats.record_parse(len, duration);
115                Ok(value)
116            }
117            Err(e) => {
118                self.stats.record_error();
119                Err(SimdJsonError::ParseError(e.to_string()))
120            }
121        }
122    }
123
124    /// Parse JSON string into a typed value
125    ///
126    /// Note: This creates a copy of the string for mutation. For zero-copy,
127    /// use `parse` with a mutable byte slice.
128    pub fn parse_str<T: DeserializeOwned>(&self, data: &str) -> Result<T, SimdJsonError> {
129        let mut bytes = data.as_bytes().to_vec();
130        self.parse(&mut bytes)
131    }
132
133    /// Parse multiple JSON documents in a batch
134    ///
135    /// Optimized for processing many small documents efficiently.
136    pub fn parse_batch<T: DeserializeOwned>(
137        &self,
138        documents: &mut [Vec<u8>],
139    ) -> Vec<Result<T, SimdJsonError>> {
140        documents.iter_mut().map(|doc| self.parse(doc)).collect()
141    }
142
143    /// Parse JSON with arena allocation for zero-copy string handling
144    ///
145    /// The arena provides fast bump allocation and all allocations are freed
146    /// together when the arena is dropped.
147    pub fn parse_with_arena<'a, T: DeserializeOwned>(
148        &self,
149        data: &mut [u8],
150        _arena: &'a Bump,
151    ) -> Result<T, SimdJsonError> {
152        // Currently just uses standard parsing, but arena can be used for
153        // custom string interning or buffer management in the future
154        self.parse(data)
155    }
156
157    /// Get parsing statistics
158    pub fn stats(&self) -> &SimdJsonStats {
159        &self.stats
160    }
161
162    /// Reset statistics
163    pub fn reset_stats(&self) {
164        self.stats.reset();
165    }
166}
167
168/// Errors from SIMD JSON parsing
169#[derive(Debug, thiserror::Error)]
170pub enum SimdJsonError {
171    #[error("JSON parse error: {0}")]
172    ParseError(String),
173
174    #[error("Invalid UTF-8 in JSON: {0}")]
175    Utf8Error(#[from] std::str::Utf8Error),
176
177    #[error("Buffer too small for parsing")]
178    BufferTooSmall,
179}
180
181/// Batch event parser optimized for high-throughput ingestion
182pub struct BatchEventParser {
183    parser: SimdJsonParser,
184    /// Pre-allocated buffer for batch processing
185    buffer_pool: Vec<Vec<u8>>,
186    /// Maximum batch size
187    max_batch_size: usize,
188}
189
190impl BatchEventParser {
191    /// Create a new batch parser with specified capacity
192    pub fn new(max_batch_size: usize) -> Self {
193        Self {
194            parser: SimdJsonParser::new(),
195            buffer_pool: Vec::with_capacity(max_batch_size),
196            max_batch_size,
197        }
198    }
199
200    /// Parse a batch of JSON event strings
201    pub fn parse_events<T: DeserializeOwned>(
202        &mut self,
203        events: &[String],
204    ) -> Vec<Result<T, SimdJsonError>> {
205        // Reuse buffers from pool
206        self.buffer_pool.clear();
207        self.buffer_pool
208            .extend(events.iter().map(|e| e.as_bytes().to_vec()));
209
210        self.parser.parse_batch(&mut self.buffer_pool)
211    }
212
213    /// Parse events from byte slices (more efficient - avoids string conversion)
214    pub fn parse_events_bytes<T: DeserializeOwned>(
215        &self,
216        events: &mut [Vec<u8>],
217    ) -> Vec<Result<T, SimdJsonError>> {
218        self.parser.parse_batch(events)
219    }
220
221    /// Get the underlying parser's statistics
222    pub fn stats(&self) -> &SimdJsonStats {
223        self.parser.stats()
224    }
225
226    /// Maximum batch size
227    pub fn max_batch_size(&self) -> usize {
228        self.max_batch_size
229    }
230}
231
232/// Zero-copy JSON value for read-only access
233///
234/// This wraps simd-json's tape-based representation for efficient
235/// field access without full deserialization.
236pub struct ZeroCopyJson<'a> {
237    tape: simd_json::BorrowedValue<'a>,
238}
239
240impl<'a> ZeroCopyJson<'a> {
241    /// Parse JSON into zero-copy representation
242    pub fn parse(data: &'a mut [u8]) -> Result<Self, SimdJsonError> {
243        let tape = simd_json::to_borrowed_value(data)
244            .map_err(|e| SimdJsonError::ParseError(e.to_string()))?;
245        Ok(Self { tape })
246    }
247
248    /// Get a string field without copying
249    pub fn get_str(&self, key: &str) -> Option<&str> {
250        self.tape.get(key).and_then(|v| v.as_str())
251    }
252
253    /// Get a numeric field
254    pub fn get_i64(&self, key: &str) -> Option<i64> {
255        self.tape.get(key).and_then(|v| v.as_i64())
256    }
257
258    /// Get a float field
259    pub fn get_f64(&self, key: &str) -> Option<f64> {
260        self.tape.get(key).and_then(|v| v.as_f64())
261    }
262
263    /// Get a boolean field
264    pub fn get_bool(&self, key: &str) -> Option<bool> {
265        self.tape.get(key).and_then(|v| v.as_bool())
266    }
267
268    /// Check if a field exists
269    pub fn contains_key(&self, key: &str) -> bool {
270        self.tape.get(key).is_some()
271    }
272
273    /// Get the underlying borrowed value
274    pub fn as_value(&self) -> &simd_json::BorrowedValue<'a> {
275        &self.tape
276    }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    #[derive(Debug, Deserialize, PartialEq)]
    struct TestEvent {
        id: String,
        #[serde(rename = "type")]
        event_type: String,
        value: i64,
    }

    /// Build a `TestEvent` for whole-struct equality assertions.
    fn event(id: &str, event_type: &str, value: i64) -> TestEvent {
        TestEvent {
            id: id.to_string(),
            event_type: event_type.to_string(),
            value,
        }
    }

    /// Owned, mutable copy of a JSON literal.
    ///
    /// simd-json parses in place, so every parse needs its own writable buffer.
    fn buf(json: &str) -> Vec<u8> {
        json.as_bytes().to_vec()
    }

    /// Read a statistics counter.
    fn load(counter: &AtomicU64) -> u64 {
        counter.load(Ordering::Relaxed)
    }

    #[test]
    fn test_simd_json_parse() {
        let parser = SimdJsonParser::new();
        let mut json = buf(r#"{"id":"123","type":"test","value":42}"#);

        let result: TestEvent = parser.parse(&mut json).unwrap();
        assert_eq!(result, event("123", "test", 42));
    }

    #[test]
    fn test_simd_json_parse_str() {
        let parser = SimdJsonParser::new();
        let json = r#"{"id":"456","type":"event","value":100}"#;

        let result: TestEvent = parser.parse_str(json).unwrap();
        assert_eq!(result, event("456", "event", 100));
    }

    #[test]
    fn test_batch_parsing() {
        let parser = SimdJsonParser::new();
        let mut docs: Vec<Vec<u8>> = vec![
            buf(r#"{"id":"1","type":"a","value":1}"#),
            buf(r#"{"id":"2","type":"b","value":2}"#),
            buf(r#"{"id":"3","type":"c","value":3}"#),
        ];

        let results: Vec<Result<TestEvent, _>> = parser.parse_batch(&mut docs);
        let events: Vec<TestEvent> = results.into_iter().map(Result::unwrap).collect();
        assert_eq!(
            events,
            vec![event("1", "a", 1), event("2", "b", 2), event("3", "c", 3)]
        );
        assert_eq!(load(&parser.stats().documents_parsed), 3);
    }

    #[test]
    fn test_batch_parsing_isolates_failures() {
        let parser = SimdJsonParser::new();
        let mut docs: Vec<Vec<u8>> = vec![
            buf(r#"{"id":"1","type":"a","value":1}"#),
            buf("not json"),
            buf(r#"{"id":"3","type":"c","value":3}"#),
        ];

        let results: Vec<Result<TestEvent, _>> = parser.parse_batch(&mut docs);
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].as_ref().unwrap(), &event("1", "a", 1));
        assert!(matches!(results[1], Err(SimdJsonError::ParseError(_))));
        assert_eq!(results[2].as_ref().unwrap(), &event("3", "c", 3));
        assert_eq!(load(&parser.stats().documents_parsed), 2);
        assert_eq!(load(&parser.stats().parse_errors), 1);
    }

    #[test]
    fn test_type_mismatch_is_a_parse_error() {
        let parser = SimdJsonParser::new();
        let mut json = buf(r#"{"id":"1","type":"a","value":"not a number"}"#);

        let result: Result<TestEvent, _> = parser.parse(&mut json);
        assert!(matches!(result, Err(SimdJsonError::ParseError(_))));
        assert_eq!(load(&parser.stats().parse_errors), 1);
        assert_eq!(load(&parser.stats().documents_parsed), 0);
    }

    #[test]
    fn test_stats_tracking() {
        let parser = SimdJsonParser::new();
        let mut json = buf(r#"{"id":"test","type":"event","value":0}"#);
        let input_len = json.len() as u64;

        let _: TestEvent = parser.parse(&mut json).unwrap();

        let stats = parser.stats();
        assert_eq!(load(&stats.bytes_parsed), input_len);
        assert_eq!(load(&stats.documents_parsed), 1);
        assert_eq!(load(&stats.parse_errors), 0);
    }

    #[test]
    fn test_reset_stats() {
        let parser = SimdJsonParser::new();
        let _: TestEvent = parser
            .parse_str(r#"{"id":"r","type":"t","value":1}"#)
            .unwrap();
        assert!(parser.parse_str::<TestEvent>("{").is_err());

        parser.reset_stats();

        let stats = parser.stats();
        assert_eq!(load(&stats.bytes_parsed), 0);
        assert_eq!(load(&stats.documents_parsed), 0);
        assert_eq!(load(&stats.parse_time_ns), 0);
        assert_eq!(load(&stats.parse_errors), 0);
        assert_eq!(stats.throughput_mbps(), 0.0);
    }

    #[test]
    fn test_parse_error_tracking() {
        let parser = SimdJsonParser::new();
        let mut invalid = b"not valid json".to_vec();

        let result: Result<TestEvent, _> = parser.parse(&mut invalid);
        assert!(result.is_err());
        assert_eq!(load(&parser.stats().parse_errors), 1);
    }

    #[test]
    fn test_parse_with_arena_matches_parse() {
        let parser = SimdJsonParser::new();
        let arena = Bump::new();
        let mut json = buf(r#"{"id":"z","type":"arena","value":7}"#);

        let result: TestEvent = parser.parse_with_arena(&mut json, &arena).unwrap();
        assert_eq!(result, event("z", "arena", 7));
    }

    #[test]
    fn test_zero_copy_json() {
        let mut json = buf(r#"{"name":"test","count":42,"ratio":0.5,"active":true}"#);

        let zc = ZeroCopyJson::parse(&mut json).unwrap();
        assert_eq!(zc.get_str("name"), Some("test"));
        assert_eq!(zc.get_i64("count"), Some(42));
        assert_eq!(zc.get_f64("ratio"), Some(0.5));
        assert_eq!(zc.get_bool("active"), Some(true));
        // A present key with the wrong type is `None`, not a panic.
        assert_eq!(zc.get_str("count"), None);
        assert_eq!(zc.get_i64("missing"), None);
        assert!(zc.contains_key("name"));
        assert!(!zc.contains_key("missing"));
    }

    #[test]
    fn test_zero_copy_json_rejects_invalid_input() {
        let mut bad = buf("{\"name\":");
        assert!(matches!(
            ZeroCopyJson::parse(&mut bad),
            Err(SimdJsonError::ParseError(_))
        ));
    }

    #[test]
    fn test_batch_event_parser() {
        let mut parser = BatchEventParser::new(100);
        assert_eq!(parser.max_batch_size(), 100);
        let events = vec![
            r#"{"id":"a","type":"x","value":10}"#.to_string(),
            r#"{"id":"b","type":"y","value":20}"#.to_string(),
        ];

        let results: Vec<Result<TestEvent, _>> = parser.parse_events(&events);
        let parsed: Vec<TestEvent> = results.into_iter().map(Result::unwrap).collect();
        assert_eq!(parsed, vec![event("a", "x", 10), event("b", "y", 20)]);
    }

    #[test]
    fn test_batch_event_parser_across_batches() {
        // A max_batch_size smaller than the batch is only a hint.
        let mut parser = BatchEventParser::new(2);
        let first = vec![
            r#"{"id":"long-identifier-1","type":"x","value":1}"#.to_string(),
            r#"{"id":"long-identifier-2","type":"x","value":2}"#.to_string(),
            r#"{"id":"long-identifier-3","type":"x","value":3}"#.to_string(),
        ];
        let results: Vec<Result<TestEvent, _>> = parser.parse_events(&first);
        assert_eq!(results.len(), 3);
        assert!(results.iter().all(Result::is_ok));

        // A smaller batch of shorter documents must not see leftovers from the first.
        let second = vec![r#"{"id":"s","type":"y","value":9}"#.to_string()];
        let results: Vec<Result<TestEvent, _>> = parser.parse_events(&second);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].as_ref().unwrap(), &event("s", "y", 9));
        assert_eq!(load(&parser.stats().documents_parsed), 4);

        assert!(parser.parse_events::<TestEvent>(&[]).is_empty());
    }

    #[test]
    fn test_throughput_calculation() {
        let stats = SimdJsonStats::new();
        stats.bytes_parsed.store(1_000_000, Ordering::Relaxed);
        stats.parse_time_ns.store(1_000_000_000, Ordering::Relaxed); // 1 second
        stats.documents_parsed.store(10_000, Ordering::Relaxed);

        assert!((stats.throughput_mbps() - 1.0).abs() < 0.001);
        assert!((stats.docs_per_second() - 10_000.0).abs() < 1.0);
    }
}