allsource_core/infrastructure/persistence/
simd_json.rs1use bumpalo::Bump;
14use serde::de::DeserializeOwned;
15use simd_json::prelude::*;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Instant;
18
/// Running counters describing the work performed by a SIMD JSON parser.
///
/// Every counter is an [`AtomicU64`] accessed with [`Ordering::Relaxed`], so
/// the counters can be updated through a shared reference. Each field is
/// loaded and stored independently of the others.
#[derive(Debug, Default)]
pub struct SimdJsonStats {
    /// Sum of the input lengths, in bytes, passed to `record_parse`.
    pub bytes_parsed: AtomicU64,
    /// Number of times `record_parse` has been called.
    pub documents_parsed: AtomicU64,
    /// Sum of the durations, in nanoseconds, passed to `record_parse`.
    pub parse_time_ns: AtomicU64,
    /// Number of times `record_error` has been called.
    pub parse_errors: AtomicU64,
}

impl SimdJsonStats {
    /// Creates a statistics block with every counter at zero.
    pub fn new() -> Self {
        Self {
            bytes_parsed: AtomicU64::new(0),
            documents_parsed: AtomicU64::new(0),
            parse_time_ns: AtomicU64::new(0),
            parse_errors: AtomicU64::new(0),
        }
    }

    /// Returns the parse throughput in megabytes (10^6 bytes) per second.
    ///
    /// Yields `0.0` while no parse time has been recorded.
    pub fn throughput_mbps(&self) -> f64 {
        let total_bytes = self.bytes_parsed.load(Ordering::Relaxed) as f64;
        match self.elapsed_seconds() {
            Some(seconds) => (total_bytes / 1_000_000.0) / seconds,
            None => 0.0,
        }
    }

    /// Returns the number of documents parsed per second of recorded parse time.
    ///
    /// Yields `0.0` while no parse time has been recorded.
    pub fn docs_per_second(&self) -> f64 {
        let total_docs = self.documents_parsed.load(Ordering::Relaxed) as f64;
        match self.elapsed_seconds() {
            Some(seconds) => total_docs / seconds,
            None => 0.0,
        }
    }

    /// Accumulated parse time in seconds, or `None` when it is still zero.
    fn elapsed_seconds(&self) -> Option<f64> {
        let nanos = self.parse_time_ns.load(Ordering::Relaxed) as f64;
        if nanos > 0.0 {
            Some(nanos / 1_000_000_000.0)
        } else {
            None
        }
    }

    /// Accounts for one successful parse of `bytes` input bytes that took
    /// `duration_ns` nanoseconds.
    fn record_parse(&self, bytes: usize, duration_ns: u64) {
        let byte_count = bytes as u64;
        self.bytes_parsed.fetch_add(byte_count, Ordering::Relaxed);
        self.documents_parsed.fetch_add(1, Ordering::Relaxed);
        self.parse_time_ns.fetch_add(duration_ns, Ordering::Relaxed);
    }

    /// Accounts for one failed parse.
    fn record_error(&self) {
        self.parse_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Sets every counter back to zero, one counter at a time.
    pub fn reset(&self) {
        let counters = [
            &self.bytes_parsed,
            &self.documents_parsed,
            &self.parse_time_ns,
            &self.parse_errors,
        ];
        for counter in counters {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
80
81pub struct SimdJsonParser {
83 stats: SimdJsonStats,
84}
85
86impl Default for SimdJsonParser {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl SimdJsonParser {
93 pub fn new() -> Self {
95 Self {
96 stats: SimdJsonStats::new(),
97 }
98 }
99
100 pub fn parse<T: DeserializeOwned>(&self, data: &mut [u8]) -> Result<T, SimdJsonError> {
108 let start = Instant::now();
109 let len = data.len();
110
111 match simd_json::from_slice(data) {
112 Ok(value) => {
113 let duration = start.elapsed().as_nanos() as u64;
114 self.stats.record_parse(len, duration);
115 Ok(value)
116 }
117 Err(e) => {
118 self.stats.record_error();
119 Err(SimdJsonError::ParseError(e.to_string()))
120 }
121 }
122 }
123
124 pub fn parse_str<T: DeserializeOwned>(&self, data: &str) -> Result<T, SimdJsonError> {
129 let mut bytes = data.as_bytes().to_vec();
130 self.parse(&mut bytes)
131 }
132
133 pub fn parse_batch<T: DeserializeOwned>(
137 &self,
138 documents: &mut [Vec<u8>],
139 ) -> Vec<Result<T, SimdJsonError>> {
140 documents.iter_mut().map(|doc| self.parse(doc)).collect()
141 }
142
143 pub fn parse_with_arena<'a, T: DeserializeOwned>(
148 &self,
149 data: &mut [u8],
150 _arena: &'a Bump,
151 ) -> Result<T, SimdJsonError> {
152 self.parse(data)
155 }
156
157 pub fn stats(&self) -> &SimdJsonStats {
159 &self.stats
160 }
161
162 pub fn reset_stats(&self) {
164 self.stats.reset();
165 }
166}
167
168#[derive(Debug, thiserror::Error)]
170pub enum SimdJsonError {
171 #[error("JSON parse error: {0}")]
172 ParseError(String),
173
174 #[error("Invalid UTF-8 in JSON: {0}")]
175 Utf8Error(#[from] std::str::Utf8Error),
176
177 #[error("Buffer too small for parsing")]
178 BufferTooSmall,
179}
180
181pub struct BatchEventParser {
183 parser: SimdJsonParser,
184 buffer_pool: Vec<Vec<u8>>,
186 max_batch_size: usize,
188}
189
190impl BatchEventParser {
191 pub fn new(max_batch_size: usize) -> Self {
193 Self {
194 parser: SimdJsonParser::new(),
195 buffer_pool: Vec::with_capacity(max_batch_size),
196 max_batch_size,
197 }
198 }
199
200 pub fn parse_events<T: DeserializeOwned>(
202 &mut self,
203 events: &[String],
204 ) -> Vec<Result<T, SimdJsonError>> {
205 self.buffer_pool.clear();
207 self.buffer_pool
208 .extend(events.iter().map(|e| e.as_bytes().to_vec()));
209
210 self.parser.parse_batch(&mut self.buffer_pool)
211 }
212
213 pub fn parse_events_bytes<T: DeserializeOwned>(
215 &self,
216 events: &mut [Vec<u8>],
217 ) -> Vec<Result<T, SimdJsonError>> {
218 self.parser.parse_batch(events)
219 }
220
221 pub fn stats(&self) -> &SimdJsonStats {
223 self.parser.stats()
224 }
225
226 pub fn max_batch_size(&self) -> usize {
228 self.max_batch_size
229 }
230}
231
232pub struct ZeroCopyJson<'a> {
237 tape: simd_json::BorrowedValue<'a>,
238}
239
240impl<'a> ZeroCopyJson<'a> {
241 pub fn parse(data: &'a mut [u8]) -> Result<Self, SimdJsonError> {
243 let tape = simd_json::to_borrowed_value(data)
244 .map_err(|e| SimdJsonError::ParseError(e.to_string()))?;
245 Ok(Self { tape })
246 }
247
248 pub fn get_str(&self, key: &str) -> Option<&str> {
250 self.tape.get(key).and_then(|v| v.as_str())
251 }
252
253 pub fn get_i64(&self, key: &str) -> Option<i64> {
255 self.tape.get(key).and_then(|v| v.as_i64())
256 }
257
258 pub fn get_f64(&self, key: &str) -> Option<f64> {
260 self.tape.get(key).and_then(|v| v.as_f64())
261 }
262
263 pub fn get_bool(&self, key: &str) -> Option<bool> {
265 self.tape.get(key).and_then(|v| v.as_bool())
266 }
267
268 pub fn contains_key(&self, key: &str) -> bool {
270 self.tape.get(key).is_some()
271 }
272
273 pub fn as_value(&self) -> &simd_json::BorrowedValue<'a> {
275 &self.tape
276 }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    /// Minimal event shape used as the deserialization target in these tests.
    #[derive(Debug, Deserialize, PartialEq)]
    struct TestEvent {
        id: String,
        #[serde(rename = "type")]
        event_type: String,
        value: i64,
    }

    /// Copies a JSON literal into an owned byte buffer the parser can mutate.
    fn json_bytes(src: &str) -> Vec<u8> {
        src.as_bytes().to_vec()
    }

    #[test]
    fn test_simd_json_parse() {
        let parser = SimdJsonParser::new();
        let mut json = json_bytes(r#"{"id":"123","type":"test","value":42}"#);

        let result: TestEvent = parser.parse(&mut json).unwrap();
        assert_eq!(result.id, "123");
        assert_eq!(result.event_type, "test");
        assert_eq!(result.value, 42);
    }

    #[test]
    fn test_simd_json_parse_str() {
        let parser = SimdJsonParser::new();

        let result: TestEvent = parser
            .parse_str(r#"{"id":"456","type":"event","value":100}"#)
            .unwrap();
        assert_eq!(result.id, "456");
        assert_eq!(result.event_type, "event");
        assert_eq!(result.value, 100);
    }

    #[test]
    fn test_batch_parsing() {
        let parser = SimdJsonParser::new();
        let mut docs: Vec<Vec<u8>> = [
            r#"{"id":"1","type":"a","value":1}"#,
            r#"{"id":"2","type":"b","value":2}"#,
            r#"{"id":"3","type":"c","value":3}"#,
        ]
        .iter()
        .map(|src| json_bytes(src))
        .collect();

        let results: Vec<Result<TestEvent, _>> = parser.parse_batch(&mut docs);
        assert_eq!(results.len(), 3);

        // Documents are numbered from 1 in both `id` and `value`.
        for (expected, result) in (1_i64..).zip(results) {
            let event = result.unwrap();
            assert_eq!(event.id, expected.to_string());
            assert_eq!(event.value, expected);
        }
    }

    #[test]
    fn test_stats_tracking() {
        let parser = SimdJsonParser::new();
        let mut json = json_bytes(r#"{"id":"test","type":"event","value":0}"#);

        let _: TestEvent = parser.parse(&mut json).unwrap();

        let stats = parser.stats();
        assert!(stats.bytes_parsed.load(Ordering::Relaxed) > 0);
        assert_eq!(stats.documents_parsed.load(Ordering::Relaxed), 1);
        assert_eq!(stats.parse_errors.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_parse_error_tracking() {
        let parser = SimdJsonParser::new();
        let mut invalid = json_bytes("not valid json");

        let result: Result<TestEvent, _> = parser.parse(&mut invalid);
        assert!(result.is_err());
        assert_eq!(parser.stats().parse_errors.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_zero_copy_json() {
        let mut json = json_bytes(r#"{"name":"test","count":42,"active":true}"#);

        let zc = ZeroCopyJson::parse(&mut json).unwrap();
        assert_eq!(zc.get_str("name"), Some("test"));
        assert_eq!(zc.get_i64("count"), Some(42));
        assert_eq!(zc.get_bool("active"), Some(true));
        assert!(zc.contains_key("name"));
        assert!(!zc.contains_key("missing"));
    }

    #[test]
    fn test_batch_event_parser() {
        let mut parser = BatchEventParser::new(100);
        let events: Vec<String> = vec![
            String::from(r#"{"id":"a","type":"x","value":10}"#),
            String::from(r#"{"id":"b","type":"y","value":20}"#),
        ];

        let results: Vec<Result<TestEvent, _>> = parser.parse_events(&events);
        assert_eq!(results.len(), 2);

        let first = results[0].as_ref().unwrap();
        assert_eq!(first.id, "a");
        assert_eq!(first.value, 10);
    }

    #[test]
    fn test_throughput_calculation() {
        let stats = SimdJsonStats::new();
        // 1 MB and 10,000 documents over exactly one second.
        stats.bytes_parsed.store(1_000_000, Ordering::Relaxed);
        stats.parse_time_ns.store(1_000_000_000, Ordering::Relaxed);
        stats.documents_parsed.store(10_000, Ordering::Relaxed);

        let mb_per_sec = stats.throughput_mbps();
        let docs_per_sec = stats.docs_per_second();
        assert!((mb_per_sec - 1.0).abs() < 0.001);
        assert!((docs_per_sec - 10_000.0).abs() < 1.0);
    }
}