// source_core/infrastructure/persistence/simd_json.rs
use bumpalo::Bump;
use serde::de::DeserializeOwned;
use simd_json::prelude::*;
use std::{
    sync::atomic::{AtomicU64, Ordering},
    time::Instant,
};

/// Atomic counters tracking SIMD JSON parsing activity.
///
/// All counters use relaxed ordering: they are independent statistics, not
/// synchronization points.
#[derive(Debug, Default)]
pub struct SimdJsonStats {
    /// Total bytes handed to the parser across successful parses.
    pub bytes_parsed: AtomicU64,
    /// Number of documents successfully parsed.
    pub documents_parsed: AtomicU64,
    /// Cumulative wall-clock time spent parsing, in nanoseconds.
    pub parse_time_ns: AtomicU64,
    /// Number of parse attempts that failed.
    pub parse_errors: AtomicU64,
}

impl SimdJsonStats {
    /// Creates a stats block with all counters at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Average parse throughput in megabytes per second.
    ///
    /// Returns 0.0 before any time has been recorded.
    pub fn throughput_mbps(&self) -> f64 {
        let elapsed_ns = self.parse_time_ns.load(Ordering::Relaxed) as f64;
        if elapsed_ns <= 0.0 {
            return 0.0;
        }
        let megabytes = self.bytes_parsed.load(Ordering::Relaxed) as f64 / 1_000_000.0;
        megabytes / (elapsed_ns / 1_000_000_000.0)
    }

    /// Average number of documents parsed per second.
    ///
    /// Returns 0.0 before any time has been recorded.
    pub fn docs_per_second(&self) -> f64 {
        let elapsed_ns = self.parse_time_ns.load(Ordering::Relaxed) as f64;
        if elapsed_ns <= 0.0 {
            return 0.0;
        }
        let documents = self.documents_parsed.load(Ordering::Relaxed) as f64;
        documents / (elapsed_ns / 1_000_000_000.0)
    }

    /// Folds one successful parse (byte count + duration) into the counters.
    fn record_parse(&self, bytes: usize, duration_ns: u64) {
        self.bytes_parsed.fetch_add(bytes as u64, Ordering::Relaxed);
        self.documents_parsed.fetch_add(1, Ordering::Relaxed);
        self.parse_time_ns.fetch_add(duration_ns, Ordering::Relaxed);
    }

    /// Counts one failed parse attempt.
    fn record_error(&self) {
        self.parse_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Zeroes every counter.
    pub fn reset(&self) {
        for counter in [
            &self.bytes_parsed,
            &self.documents_parsed,
            &self.parse_time_ns,
            &self.parse_errors,
        ] {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
82
83pub struct SimdJsonParser {
85 stats: SimdJsonStats,
86}
87
88impl Default for SimdJsonParser {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94impl SimdJsonParser {
95 pub fn new() -> Self {
97 Self {
98 stats: SimdJsonStats::new(),
99 }
100 }
101
102 pub fn parse<T: DeserializeOwned>(&self, data: &mut [u8]) -> Result<T, SimdJsonError> {
110 let start = Instant::now();
111 let len = data.len();
112
113 match simd_json::from_slice(data) {
114 Ok(value) => {
115 let duration = start.elapsed().as_nanos() as u64;
116 self.stats.record_parse(len, duration);
117 Ok(value)
118 }
119 Err(e) => {
120 self.stats.record_error();
121 Err(SimdJsonError::ParseError(e.to_string()))
122 }
123 }
124 }
125
126 pub fn parse_str<T: DeserializeOwned>(&self, data: &str) -> Result<T, SimdJsonError> {
131 let mut bytes = data.as_bytes().to_vec();
132 self.parse(&mut bytes)
133 }
134
135 pub fn parse_batch<T: DeserializeOwned>(
139 &self,
140 documents: &mut [Vec<u8>],
141 ) -> Vec<Result<T, SimdJsonError>> {
142 documents.iter_mut().map(|doc| self.parse(doc)).collect()
143 }
144
145 pub fn parse_with_arena<T: DeserializeOwned>(
150 &self,
151 data: &mut [u8],
152 _arena: &Bump,
153 ) -> Result<T, SimdJsonError> {
154 self.parse(data)
157 }
158
159 pub fn stats(&self) -> &SimdJsonStats {
161 &self.stats
162 }
163
164 pub fn reset_stats(&self) {
166 self.stats.reset();
167 }
168}
169
/// Errors produced by the SIMD JSON parsing layer.
#[derive(Debug, thiserror::Error)]
pub enum SimdJsonError {
    /// The input was not valid JSON for the target type; carries the
    /// underlying `simd-json` error message as a string.
    #[error("JSON parse error: {0}")]
    ParseError(String),

    /// Input bytes were not valid UTF-8.
    // NOTE(review): not constructed anywhere in this file — presumably used
    // by callers elsewhere; confirm before removing.
    #[error("Invalid UTF-8 in JSON: {0}")]
    Utf8Error(#[from] std::str::Utf8Error),

    /// A caller-supplied buffer was too small for parsing.
    // NOTE(review): also not constructed in this file — verify usage.
    #[error("Buffer too small for parsing")]
    BufferTooSmall,
}
182
183pub struct BatchEventParser {
185 parser: SimdJsonParser,
186 buffer_pool: Vec<Vec<u8>>,
188 max_batch_size: usize,
190}
191
192impl BatchEventParser {
193 pub fn new(max_batch_size: usize) -> Self {
195 Self {
196 parser: SimdJsonParser::new(),
197 buffer_pool: Vec::with_capacity(max_batch_size),
198 max_batch_size,
199 }
200 }
201
202 pub fn parse_events<T: DeserializeOwned>(
204 &mut self,
205 events: &[String],
206 ) -> Vec<Result<T, SimdJsonError>> {
207 self.buffer_pool.clear();
209 self.buffer_pool
210 .extend(events.iter().map(|e| e.as_bytes().to_vec()));
211
212 self.parser.parse_batch(&mut self.buffer_pool)
213 }
214
215 pub fn parse_events_bytes<T: DeserializeOwned>(
217 &self,
218 events: &mut [Vec<u8>],
219 ) -> Vec<Result<T, SimdJsonError>> {
220 self.parser.parse_batch(events)
221 }
222
223 pub fn stats(&self) -> &SimdJsonStats {
225 self.parser.stats()
226 }
227
228 pub fn max_batch_size(&self) -> usize {
230 self.max_batch_size
231 }
232}
233
/// Borrowed (zero-copy) view over a parsed JSON document.
///
/// String values returned by the accessors borrow directly from the caller's
/// input buffer via `'a`, so field access performs no allocation.
pub struct ZeroCopyJson<'a> {
    // Parsed value tree borrowing from the caller's byte buffer.
    tape: simd_json::BorrowedValue<'a>,
}

impl<'a> ZeroCopyJson<'a> {
    /// Parses `data` in place (simd-json mutates the buffer) and retains a
    /// borrowed value tree over it for the lifetime of the buffer.
    ///
    /// # Errors
    /// Returns [`SimdJsonError::ParseError`] when the bytes are not valid JSON.
    pub fn parse(data: &'a mut [u8]) -> Result<Self, SimdJsonError> {
        let tape = simd_json::to_borrowed_value(data)
            .map_err(|e| SimdJsonError::ParseError(e.to_string()))?;
        Ok(Self { tape })
    }

    /// Top-level string field; `None` if the key is absent or not a string.
    pub fn get_str(&self, key: &str) -> Option<&str> {
        self.tape.get(key).and_then(|v| v.as_str())
    }

    /// Top-level integer field; `None` if absent or not representable as i64.
    pub fn get_i64(&self, key: &str) -> Option<i64> {
        self.tape.get(key).and_then(|v| v.as_i64())
    }

    /// Top-level float field; `None` if absent or not representable as f64.
    pub fn get_f64(&self, key: &str) -> Option<f64> {
        self.tape.get(key).and_then(|v| v.as_f64())
    }

    /// Top-level boolean field; `None` if absent or not a boolean.
    pub fn get_bool(&self, key: &str) -> Option<bool> {
        self.tape.get(key).and_then(|v| v.as_bool())
    }

    /// Whether a top-level key exists (regardless of its value's type).
    pub fn contains_key(&self, key: &str) -> bool {
        self.tape.get(key).is_some()
    }

    /// Direct access to the underlying borrowed value for ad-hoc traversal.
    pub fn as_value(&self) -> &simd_json::BorrowedValue<'a> {
        &self.tape
    }
}
280
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    /// Minimal event shape shared by the tests below.
    #[derive(Debug, Deserialize, PartialEq)]
    struct TestEvent {
        id: String,
        #[serde(rename = "type")]
        event_type: String,
        value: i64,
    }

    /// Copies a JSON literal into the mutable byte buffer simd-json requires.
    fn owned_bytes(json: &str) -> Vec<u8> {
        json.as_bytes().to_vec()
    }

    #[test]
    fn test_simd_json_parse() {
        let parser = SimdJsonParser::new();
        let mut buf = owned_bytes(r#"{"id":"123","type":"test","value":42}"#);

        let event: TestEvent = parser.parse(&mut buf).unwrap();
        assert_eq!(event.id, "123");
        assert_eq!(event.event_type, "test");
        assert_eq!(event.value, 42);
    }

    #[test]
    fn test_simd_json_parse_str() {
        let parser = SimdJsonParser::new();

        let event: TestEvent = parser
            .parse_str(r#"{"id":"456","type":"event","value":100}"#)
            .unwrap();
        assert_eq!(event.id, "456");
        assert_eq!(event.event_type, "event");
        assert_eq!(event.value, 100);
    }

    #[test]
    fn test_batch_parsing() {
        let parser = SimdJsonParser::new();
        let mut docs = vec![
            owned_bytes(r#"{"id":"1","type":"a","value":1}"#),
            owned_bytes(r#"{"id":"2","type":"b","value":2}"#),
            owned_bytes(r#"{"id":"3","type":"c","value":3}"#),
        ];

        let parsed: Vec<Result<TestEvent, _>> = parser.parse_batch(&mut docs);
        assert_eq!(parsed.len(), 3);

        for (idx, item) in parsed.into_iter().enumerate() {
            let event = item.unwrap();
            assert_eq!(event.id, (idx + 1).to_string());
            assert_eq!(event.value, (idx + 1) as i64);
        }
    }

    #[test]
    fn test_stats_tracking() {
        let parser = SimdJsonParser::new();
        let mut buf = owned_bytes(r#"{"id":"test","type":"event","value":0}"#);

        let _: TestEvent = parser.parse(&mut buf).unwrap();

        let stats = parser.stats();
        assert!(stats.bytes_parsed.load(Ordering::Relaxed) > 0);
        assert_eq!(stats.documents_parsed.load(Ordering::Relaxed), 1);
        assert_eq!(stats.parse_errors.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_parse_error_tracking() {
        let parser = SimdJsonParser::new();
        let mut garbage = b"not valid json".to_vec();

        let outcome: Result<TestEvent, _> = parser.parse(&mut garbage);
        assert!(outcome.is_err());
        assert_eq!(parser.stats().parse_errors.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_zero_copy_json() {
        let mut buf = owned_bytes(r#"{"name":"test","count":42,"active":true}"#);

        let doc = ZeroCopyJson::parse(&mut buf).unwrap();
        assert_eq!(doc.get_str("name"), Some("test"));
        assert_eq!(doc.get_i64("count"), Some(42));
        assert_eq!(doc.get_bool("active"), Some(true));
        assert!(doc.contains_key("name"));
        assert!(!doc.contains_key("missing"));
    }

    #[test]
    fn test_batch_event_parser() {
        let mut batch = BatchEventParser::new(100);
        let events = vec![
            r#"{"id":"a","type":"x","value":10}"#.to_string(),
            r#"{"id":"b","type":"y","value":20}"#.to_string(),
        ];

        let parsed: Vec<Result<TestEvent, _>> = batch.parse_events(&events);
        assert_eq!(parsed.len(), 2);

        let first = parsed[0].as_ref().unwrap();
        assert_eq!(first.id, "a");
        assert_eq!(first.value, 10);
    }

    #[test]
    fn test_throughput_calculation() {
        let stats = SimdJsonStats::new();
        stats.bytes_parsed.store(1_000_000, Ordering::Relaxed);
        stats.parse_time_ns.store(1_000_000_000, Ordering::Relaxed);
        stats.documents_parsed.store(10_000, Ordering::Relaxed);

        assert!((stats.throughput_mbps() - 1.0).abs() < 0.001);
        assert!((stats.docs_per_second() - 10_000.0).abs() < 1.0);
    }
}