allsource_core/infrastructure/persistence/
simd_json.rs1use bumpalo::Bump;
14use serde::de::DeserializeOwned;
15use simd_json::prelude::*;
16use std::{
17 sync::atomic::{AtomicU64, Ordering},
18 time::Instant,
19};
20
/// Counters describing the parsing work done by a [`SimdJsonParser`].
///
/// Every counter is an independent `AtomicU64` accessed with `Ordering::Relaxed`, so a reader
/// running concurrently with a parse may observe one counter updated before another.
#[derive(Debug, Default)]
pub struct SimdJsonStats {
    /// Total bytes of input passed to `record_parse`.
    pub bytes_parsed: AtomicU64,
    /// Number of `record_parse` calls, i.e. documents that parsed successfully.
    pub documents_parsed: AtomicU64,
    /// Sum of the durations, in nanoseconds, passed to `record_parse`.
    pub parse_time_ns: AtomicU64,
    /// Number of `record_error` calls, i.e. documents that failed to parse.
    pub parse_errors: AtomicU64,
}

impl SimdJsonStats {
    /// Creates a fresh set of counters, all zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Parsing throughput in megabytes (10^6 bytes) per second of recorded parse time.
    ///
    /// Returns `0.0` while no parse time has been recorded.
    pub fn throughput_mbps(&self) -> f64 {
        let megabytes = self.bytes_parsed.load(Ordering::Relaxed) as f64 / 1_000_000.0;
        match self.elapsed_secs() {
            Some(secs) => megabytes / secs,
            None => 0.0,
        }
    }

    /// Successfully parsed documents per second of recorded parse time.
    ///
    /// Returns `0.0` while no parse time has been recorded.
    pub fn docs_per_second(&self) -> f64 {
        let docs = self.documents_parsed.load(Ordering::Relaxed) as f64;
        match self.elapsed_secs() {
            Some(secs) => docs / secs,
            None => 0.0,
        }
    }

    /// Recorded parse time in seconds, or `None` when nothing has been recorded yet.
    fn elapsed_secs(&self) -> Option<f64> {
        let nanos = self.parse_time_ns.load(Ordering::Relaxed);
        if nanos == 0 {
            None
        } else {
            Some(nanos as f64 / 1_000_000_000.0)
        }
    }

    /// Accounts for one successfully parsed document of `bytes` bytes that took `duration_ns`.
    fn record_parse(&self, bytes: usize, duration_ns: u64) {
        let byte_count = bytes as u64;
        self.bytes_parsed.fetch_add(byte_count, Ordering::Relaxed);
        self.documents_parsed.fetch_add(1, Ordering::Relaxed);
        self.parse_time_ns.fetch_add(duration_ns, Ordering::Relaxed);
    }

    /// Accounts for one document that failed to parse.
    fn record_error(&self) {
        self.parse_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Sets every counter back to zero.
    ///
    /// The counters are cleared one after another, not as a single atomic snapshot.
    pub fn reset(&self) {
        for counter in [
            &self.bytes_parsed,
            &self.documents_parsed,
            &self.parse_time_ns,
            &self.parse_errors,
        ] {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
82
83pub struct SimdJsonParser {
85 stats: SimdJsonStats,
86}
87
88impl Default for SimdJsonParser {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94impl SimdJsonParser {
95 pub fn new() -> Self {
97 Self {
98 stats: SimdJsonStats::new(),
99 }
100 }
101
102 #[cfg_attr(feature = "hotpath", hotpath::measure)]
110 pub fn parse<T: DeserializeOwned>(&self, data: &mut [u8]) -> Result<T, SimdJsonError> {
111 let start = Instant::now();
112 let len = data.len();
113
114 match simd_json::from_slice(data) {
115 Ok(value) => {
116 let duration = start.elapsed().as_nanos() as u64;
117 self.stats.record_parse(len, duration);
118 Ok(value)
119 }
120 Err(e) => {
121 self.stats.record_error();
122 Err(SimdJsonError::ParseError(e.to_string()))
123 }
124 }
125 }
126
127 pub fn parse_str<T: DeserializeOwned>(&self, data: &str) -> Result<T, SimdJsonError> {
132 let mut bytes = data.as_bytes().to_vec();
133 self.parse(&mut bytes)
134 }
135
136 #[cfg_attr(feature = "hotpath", hotpath::measure)]
140 pub fn parse_batch<T: DeserializeOwned>(
141 &self,
142 documents: &mut [Vec<u8>],
143 ) -> Vec<Result<T, SimdJsonError>> {
144 documents.iter_mut().map(|doc| self.parse(doc)).collect()
145 }
146
147 pub fn parse_with_arena<T: DeserializeOwned>(
152 &self,
153 data: &mut [u8],
154 _arena: &Bump,
155 ) -> Result<T, SimdJsonError> {
156 self.parse(data)
159 }
160
161 pub fn stats(&self) -> &SimdJsonStats {
163 &self.stats
164 }
165
166 pub fn reset_stats(&self) {
168 self.stats.reset();
169 }
170}
171
172#[derive(Debug, thiserror::Error)]
174pub enum SimdJsonError {
175 #[error("JSON parse error: {0}")]
176 ParseError(String),
177
178 #[error("Invalid UTF-8 in JSON: {0}")]
179 Utf8Error(#[from] std::str::Utf8Error),
180
181 #[error("Buffer too small for parsing")]
182 BufferTooSmall,
183}
184
185pub struct BatchEventParser {
187 parser: SimdJsonParser,
188 buffer_pool: Vec<Vec<u8>>,
190 max_batch_size: usize,
192}
193
194impl BatchEventParser {
195 pub fn new(max_batch_size: usize) -> Self {
197 Self {
198 parser: SimdJsonParser::new(),
199 buffer_pool: Vec::with_capacity(max_batch_size),
200 max_batch_size,
201 }
202 }
203
204 pub fn parse_events<T: DeserializeOwned>(
206 &mut self,
207 events: &[String],
208 ) -> Vec<Result<T, SimdJsonError>> {
209 self.buffer_pool.clear();
211 self.buffer_pool
212 .extend(events.iter().map(|e| e.as_bytes().to_vec()));
213
214 self.parser.parse_batch(&mut self.buffer_pool)
215 }
216
217 #[cfg_attr(feature = "hotpath", hotpath::measure)]
219 pub fn parse_events_bytes<T: DeserializeOwned>(
220 &self,
221 events: &mut [Vec<u8>],
222 ) -> Vec<Result<T, SimdJsonError>> {
223 self.parser.parse_batch(events)
224 }
225
226 pub fn stats(&self) -> &SimdJsonStats {
228 self.parser.stats()
229 }
230
231 pub fn max_batch_size(&self) -> usize {
233 self.max_batch_size
234 }
235}
236
237pub struct ZeroCopyJson<'a> {
242 tape: simd_json::BorrowedValue<'a>,
243}
244
245impl<'a> ZeroCopyJson<'a> {
246 pub fn parse(data: &'a mut [u8]) -> Result<Self, SimdJsonError> {
248 let tape = simd_json::to_borrowed_value(data)
249 .map_err(|e| SimdJsonError::ParseError(e.to_string()))?;
250 Ok(Self { tape })
251 }
252
253 pub fn get_str(&self, key: &str) -> Option<&str> {
255 self.tape.get(key).and_then(|v| v.as_str())
256 }
257
258 pub fn get_i64(&self, key: &str) -> Option<i64> {
260 self.tape
261 .get(key)
262 .and_then(simd_json::prelude::ValueAsScalar::as_i64)
263 }
264
265 pub fn get_f64(&self, key: &str) -> Option<f64> {
267 self.tape
268 .get(key)
269 .and_then(simd_json::prelude::ValueAsScalar::as_f64)
270 }
271
272 pub fn get_bool(&self, key: &str) -> Option<bool> {
274 self.tape
275 .get(key)
276 .and_then(simd_json::prelude::ValueAsScalar::as_bool)
277 }
278
279 pub fn contains_key(&self, key: &str) -> bool {
281 self.tape.get(key).is_some()
282 }
283
284 pub fn as_value(&self) -> &simd_json::BorrowedValue<'a> {
286 &self.tape
287 }
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    /// Minimal event shape shared by the tests; `type` is a Rust keyword, hence the rename.
    #[derive(Debug, Deserialize, PartialEq)]
    struct TestEvent {
        id: String,
        #[serde(rename = "type")]
        event_type: String,
        value: i64,
    }

    #[test]
    fn test_simd_json_parse() {
        let parser = SimdJsonParser::new();
        let mut json = r#"{"id":"123","type":"test","value":42}"#.as_bytes().to_vec();

        let result: TestEvent = parser.parse(&mut json).unwrap();
        assert_eq!(result.id, "123");
        assert_eq!(result.event_type, "test");
        assert_eq!(result.value, 42);
    }

    #[test]
    fn test_simd_json_parse_str() {
        let parser = SimdJsonParser::new();
        let json = r#"{"id":"456","type":"event","value":100}"#;

        let result: TestEvent = parser.parse_str(json).unwrap();
        assert_eq!(result.id, "456");
        assert_eq!(result.event_type, "event");
        assert_eq!(result.value, 100);
    }

    #[test]
    fn test_batch_parsing() {
        let parser = SimdJsonParser::new();
        let mut docs: Vec<Vec<u8>> = vec![
            r#"{"id":"1","type":"a","value":1}"#.as_bytes().to_vec(),
            r#"{"id":"2","type":"b","value":2}"#.as_bytes().to_vec(),
            r#"{"id":"3","type":"c","value":3}"#.as_bytes().to_vec(),
        ];

        let results: Vec<Result<TestEvent, _>> = parser.parse_batch(&mut docs);
        assert_eq!(results.len(), 3);

        for (i, result) in results.into_iter().enumerate() {
            let event = result.unwrap();
            assert_eq!(event.id, (i + 1).to_string());
            assert_eq!(event.value, (i + 1) as i64);
        }
    }

    #[test]
    fn test_batch_parsing_reports_individual_errors() {
        let parser = SimdJsonParser::new();
        let mut docs: Vec<Vec<u8>> = vec![
            r#"{"id":"1","type":"a","value":1}"#.as_bytes().to_vec(),
            b"not valid json".to_vec(),
            r#"{"id":"3","type":"c","value":3}"#.as_bytes().to_vec(),
        ];

        let results: Vec<Result<TestEvent, _>> = parser.parse_batch(&mut docs);
        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
        assert_eq!(results[2].as_ref().unwrap().id, "3");

        // A failure in the middle must not stop the remaining documents from being parsed.
        let stats = parser.stats();
        assert_eq!(stats.documents_parsed.load(Ordering::Relaxed), 2);
        assert_eq!(stats.parse_errors.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_parse_with_arena() {
        let parser = SimdJsonParser::new();
        let arena = Bump::new();
        let mut json = r#"{"id":"arena","type":"test","value":5}"#.as_bytes().to_vec();

        let event: TestEvent = parser.parse_with_arena(&mut json, &arena).unwrap();
        assert_eq!(event.id, "arena");
        assert_eq!(event.value, 5);
    }

    #[test]
    fn test_stats_tracking() {
        let parser = SimdJsonParser::new();
        let mut json = r#"{"id":"test","type":"event","value":0}"#.as_bytes().to_vec();

        let _: TestEvent = parser.parse(&mut json).unwrap();

        let stats = parser.stats();
        assert!(stats.bytes_parsed.load(Ordering::Relaxed) > 0);
        assert_eq!(stats.documents_parsed.load(Ordering::Relaxed), 1);
        assert_eq!(stats.parse_errors.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_parse_error_tracking() {
        let parser = SimdJsonParser::new();
        let mut invalid = b"not valid json".to_vec();

        let result: Result<TestEvent, _> = parser.parse(&mut invalid);
        assert!(result.is_err());
        assert_eq!(parser.stats().parse_errors.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_reset_stats() {
        let parser = SimdJsonParser::new();
        let mut valid = r#"{"id":"r","type":"reset","value":1}"#.as_bytes().to_vec();
        let _: TestEvent = parser.parse(&mut valid).unwrap();
        let mut invalid = b"not valid json".to_vec();
        let _: Result<TestEvent, _> = parser.parse(&mut invalid);

        parser.reset_stats();

        let stats = parser.stats();
        assert_eq!(stats.bytes_parsed.load(Ordering::Relaxed), 0);
        assert_eq!(stats.documents_parsed.load(Ordering::Relaxed), 0);
        assert_eq!(stats.parse_time_ns.load(Ordering::Relaxed), 0);
        assert_eq!(stats.parse_errors.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_zero_copy_json() {
        let mut json = r#"{"name":"test","count":42,"active":true}"#.as_bytes().to_vec();

        let zc = ZeroCopyJson::parse(&mut json).unwrap();
        assert_eq!(zc.get_str("name"), Some("test"));
        assert_eq!(zc.get_i64("count"), Some(42));
        assert_eq!(zc.get_bool("active"), Some(true));
        assert!(zc.contains_key("name"));
        assert!(!zc.contains_key("missing"));
    }

    #[test]
    fn test_zero_copy_json_missing_and_mismatched_types() {
        let mut json = r#"{"ratio":1.5,"label":"x","flag":false}"#.as_bytes().to_vec();

        let zc = ZeroCopyJson::parse(&mut json).unwrap();
        assert_eq!(zc.get_f64("ratio"), Some(1.5));
        assert_eq!(zc.get_bool("flag"), Some(false));
        // A wrong value type or a missing key yields None rather than an error.
        assert_eq!(zc.get_str("ratio"), None);
        assert_eq!(zc.get_i64("label"), None);
        assert_eq!(zc.get_str("absent"), None);

        let mut invalid = b"{\"unterminated\":".to_vec();
        assert!(ZeroCopyJson::parse(&mut invalid).is_err());
    }

    #[test]
    fn test_batch_event_parser() {
        let mut parser = BatchEventParser::new(100);
        let events = vec![
            r#"{"id":"a","type":"x","value":10}"#.to_string(),
            r#"{"id":"b","type":"y","value":20}"#.to_string(),
        ];

        let results: Vec<Result<TestEvent, _>> = parser.parse_events(&events);
        assert_eq!(results.len(), 2);

        let first = results[0].as_ref().unwrap();
        assert_eq!(first.id, "a");
        assert_eq!(first.value, 10);
    }

    #[test]
    fn test_batch_event_parser_handles_varying_batch_sizes() {
        let mut parser = BatchEventParser::new(2);

        // A batch larger than the configured size is still parsed in full.
        let large: Vec<String> = (0..3)
            .map(|i| format!(r#"{{"id":"e{i}","type":"t","value":{i}}}"#))
            .collect();
        let results: Vec<Result<TestEvent, _>> = parser.parse_events(&large);
        assert_eq!(results.len(), 3);
        for (i, result) in results.iter().enumerate() {
            let event = result.as_ref().unwrap();
            assert_eq!(event.id, format!("e{i}"));
            assert_eq!(event.value, i as i64);
        }

        // A smaller follow-up batch must yield exactly its own events, not leftovers.
        let small = vec![r#"{"id":"s","type":"t","value":7}"#.to_string()];
        let results: Vec<Result<TestEvent, _>> = parser.parse_events(&small);
        assert_eq!(results.len(), 1);
        assert_eq!(
            results[0].as_ref().unwrap(),
            &TestEvent {
                id: "s".to_string(),
                event_type: "t".to_string(),
                value: 7,
            }
        );

        let empty: Vec<Result<TestEvent, _>> = parser.parse_events(&[]);
        assert!(empty.is_empty());

        assert_eq!(parser.max_batch_size(), 2);
        assert_eq!(parser.stats().documents_parsed.load(Ordering::Relaxed), 4);
    }

    #[test]
    fn test_throughput_calculation() {
        let stats = SimdJsonStats::new();
        stats.bytes_parsed.store(1_000_000, Ordering::Relaxed);
        stats.parse_time_ns.store(1_000_000_000, Ordering::Relaxed);
        stats.documents_parsed.store(10_000, Ordering::Relaxed);

        assert!((stats.throughput_mbps() - 1.0).abs() < 0.001);
        assert!((stats.docs_per_second() - 10_000.0).abs() < 1.0);
    }

    #[test]
    fn test_rates_are_zero_without_elapsed_time() {
        let stats = SimdJsonStats::new();
        stats.bytes_parsed.store(1_000, Ordering::Relaxed);
        stats.documents_parsed.store(10, Ordering::Relaxed);

        assert!(stats.throughput_mbps().abs() < f64::EPSILON);
        assert!(stats.docs_per_second().abs() < f64::EPSILON);
    }
}