allsource_core/infrastructure/persistence/
simd_json.rs1use bumpalo::Bump;
14use serde::de::DeserializeOwned;
15use simd_json::prelude::*;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Instant;
18
/// Running counters describing parse activity.
///
/// Each counter is an [`AtomicU64`] that is read and written with
/// [`Ordering::Relaxed`], so the counters can be updated through a shared
/// reference. The fields are updated one at a time; a reader may observe a
/// combination of values taken from different moments.
#[derive(Debug, Default)]
pub struct SimdJsonStats {
    /// Sum of the input lengths, in bytes, of every recorded parse.
    pub bytes_parsed: AtomicU64,
    /// Number of recorded parses.
    pub documents_parsed: AtomicU64,
    /// Sum of the recorded parse durations, in nanoseconds.
    pub parse_time_ns: AtomicU64,
    /// Number of recorded parse errors.
    pub parse_errors: AtomicU64,
}

impl SimdJsonStats {
    /// Creates a statistics block with every counter at zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Accumulated parse time in seconds, or `None` while no time has been
    /// recorded (which would otherwise make the rate computations divide by
    /// zero).
    fn elapsed_secs(&self) -> Option<f64> {
        match self.parse_time_ns.load(Ordering::Relaxed) {
            0 => None,
            nanos => Some(nanos as f64 / 1_000_000_000.0),
        }
    }

    /// Average throughput in megabytes (10^6 bytes) per second of recorded
    /// parse time.
    ///
    /// Returns `0.0` while the accumulated parse time is zero.
    pub fn throughput_mbps(&self) -> f64 {
        let megabytes = self.bytes_parsed.load(Ordering::Relaxed) as f64 / 1_000_000.0;
        self.elapsed_secs().map_or(0.0, |secs| megabytes / secs)
    }

    /// Average number of documents parsed per second of recorded parse time.
    ///
    /// Returns `0.0` while the accumulated parse time is zero.
    pub fn docs_per_second(&self) -> f64 {
        let documents = self.documents_parsed.load(Ordering::Relaxed) as f64;
        self.elapsed_secs().map_or(0.0, |secs| documents / secs)
    }

    /// Accounts for one parse of `bytes` input bytes that took `duration_ns`
    /// nanoseconds.
    fn record_parse(&self, bytes: usize, duration_ns: u64) {
        self.bytes_parsed.fetch_add(bytes as u64, Ordering::Relaxed);
        self.documents_parsed.fetch_add(1, Ordering::Relaxed);
        self.parse_time_ns.fetch_add(duration_ns, Ordering::Relaxed);
    }

    /// Accounts for one failed parse.
    fn record_error(&self) {
        self.parse_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Sets every counter back to zero.
    ///
    /// The counters are cleared one after another, not as a single atomic
    /// step.
    pub fn reset(&self) {
        let counters = [
            &self.bytes_parsed,
            &self.documents_parsed,
            &self.parse_time_ns,
            &self.parse_errors,
        ];
        for counter in counters {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
80
81pub struct SimdJsonParser {
83 stats: SimdJsonStats,
84}
85
86impl Default for SimdJsonParser {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl SimdJsonParser {
93 pub fn new() -> Self {
95 Self {
96 stats: SimdJsonStats::new(),
97 }
98 }
99
100 pub fn parse<T: DeserializeOwned>(&self, data: &mut [u8]) -> Result<T, SimdJsonError> {
108 let start = Instant::now();
109 let len = data.len();
110
111 match simd_json::from_slice(data) {
112 Ok(value) => {
113 let duration = start.elapsed().as_nanos() as u64;
114 self.stats.record_parse(len, duration);
115 Ok(value)
116 }
117 Err(e) => {
118 self.stats.record_error();
119 Err(SimdJsonError::ParseError(e.to_string()))
120 }
121 }
122 }
123
124 pub fn parse_str<T: DeserializeOwned>(&self, data: &str) -> Result<T, SimdJsonError> {
129 let mut bytes = data.as_bytes().to_vec();
130 self.parse(&mut bytes)
131 }
132
133 pub fn parse_batch<T: DeserializeOwned>(
137 &self,
138 documents: &mut [Vec<u8>],
139 ) -> Vec<Result<T, SimdJsonError>> {
140 documents.iter_mut().map(|doc| self.parse(doc)).collect()
141 }
142
143 pub fn parse_with_arena<'a, T: DeserializeOwned>(
148 &self,
149 data: &mut [u8],
150 _arena: &'a Bump,
151 ) -> Result<T, SimdJsonError> {
152 self.parse(data)
155 }
156
157 pub fn stats(&self) -> &SimdJsonStats {
159 &self.stats
160 }
161
162 pub fn reset_stats(&self) {
164 self.stats.reset();
165 }
166}
167
168#[derive(Debug, thiserror::Error)]
170pub enum SimdJsonError {
171 #[error("JSON parse error: {0}")]
172 ParseError(String),
173
174 #[error("Invalid UTF-8 in JSON: {0}")]
175 Utf8Error(#[from] std::str::Utf8Error),
176
177 #[error("Buffer too small for parsing")]
178 BufferTooSmall,
179}
180
181pub struct BatchEventParser {
183 parser: SimdJsonParser,
184 buffer_pool: Vec<Vec<u8>>,
186 max_batch_size: usize,
188}
189
190impl BatchEventParser {
191 pub fn new(max_batch_size: usize) -> Self {
193 Self {
194 parser: SimdJsonParser::new(),
195 buffer_pool: Vec::with_capacity(max_batch_size),
196 max_batch_size,
197 }
198 }
199
200 pub fn parse_events<T: DeserializeOwned>(
202 &mut self,
203 events: &[String],
204 ) -> Vec<Result<T, SimdJsonError>> {
205 self.buffer_pool.clear();
207 self.buffer_pool
208 .extend(events.iter().map(|e| e.as_bytes().to_vec()));
209
210 self.parser.parse_batch(&mut self.buffer_pool)
211 }
212
213 pub fn parse_events_bytes<T: DeserializeOwned>(
215 &self,
216 events: &mut [Vec<u8>],
217 ) -> Vec<Result<T, SimdJsonError>> {
218 self.parser.parse_batch(events)
219 }
220
221 pub fn stats(&self) -> &SimdJsonStats {
223 self.parser.stats()
224 }
225
226 pub fn max_batch_size(&self) -> usize {
228 self.max_batch_size
229 }
230}
231
232pub struct ZeroCopyJson<'a> {
237 tape: simd_json::BorrowedValue<'a>,
238}
239
240impl<'a> ZeroCopyJson<'a> {
241 pub fn parse(data: &'a mut [u8]) -> Result<Self, SimdJsonError> {
243 let tape = simd_json::to_borrowed_value(data)
244 .map_err(|e| SimdJsonError::ParseError(e.to_string()))?;
245 Ok(Self { tape })
246 }
247
248 pub fn get_str(&self, key: &str) -> Option<&str> {
250 self.tape.get(key).and_then(|v| v.as_str())
251 }
252
253 pub fn get_i64(&self, key: &str) -> Option<i64> {
255 self.tape.get(key).and_then(|v| v.as_i64())
256 }
257
258 pub fn get_f64(&self, key: &str) -> Option<f64> {
260 self.tape.get(key).and_then(|v| v.as_f64())
261 }
262
263 pub fn get_bool(&self, key: &str) -> Option<bool> {
265 self.tape.get(key).and_then(|v| v.as_bool())
266 }
267
268 pub fn contains_key(&self, key: &str) -> bool {
270 self.tape.get(key).is_some()
271 }
272
273 pub fn as_value(&self) -> &simd_json::BorrowedValue<'a> {
275 &self.tape
276 }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    /// Minimal event shape used as the deserialisation target in these tests.
    #[derive(Debug, Deserialize, PartialEq)]
    struct TestEvent {
        id: String,
        #[serde(rename = "type")]
        event_type: String,
        value: i64,
    }

    /// Builds the event a test expects to get back from the parser.
    fn expected_event(id: &str, event_type: &str, value: i64) -> TestEvent {
        TestEvent {
            id: id.to_string(),
            event_type: event_type.to_string(),
            value,
        }
    }

    // A mutable byte buffer round-trips into the typed event.
    #[test]
    fn test_simd_json_parse() {
        let parser = SimdJsonParser::new();
        let mut payload = br#"{"id":"123","type":"test","value":42}"#.to_vec();

        let parsed: TestEvent = parser.parse(&mut payload).unwrap();
        assert_eq!(parsed, expected_event("123", "test", 42));
    }

    // The string entry point yields the same typed event.
    #[test]
    fn test_simd_json_parse_str() {
        let parser = SimdJsonParser::new();
        let text = r#"{"id":"456","type":"event","value":100}"#;

        let parsed: TestEvent = parser.parse_str(text).unwrap();
        assert_eq!(parsed, expected_event("456", "event", 100));
    }

    // Batch parsing returns one result per document, in input order.
    #[test]
    fn test_batch_parsing() {
        let parser = SimdJsonParser::new();
        let mut documents: Vec<Vec<u8>> = vec![
            br#"{"id":"1","type":"a","value":1}"#.to_vec(),
            br#"{"id":"2","type":"b","value":2}"#.to_vec(),
            br#"{"id":"3","type":"c","value":3}"#.to_vec(),
        ];

        let outcomes: Vec<Result<TestEvent, _>> = parser.parse_batch(&mut documents);
        assert_eq!(outcomes.len(), 3);

        for (ordinal, outcome) in (1_i64..).zip(outcomes) {
            let event = outcome.unwrap();
            assert_eq!(event.id, ordinal.to_string());
            assert_eq!(event.value, ordinal);
        }
    }

    // A successful parse bumps the byte and document counters only.
    #[test]
    fn test_stats_tracking() {
        let parser = SimdJsonParser::new();
        let mut payload = br#"{"id":"test","type":"event","value":0}"#.to_vec();

        let _: TestEvent = parser.parse(&mut payload).unwrap();

        let counters = parser.stats();
        assert!(counters.bytes_parsed.load(Ordering::Relaxed) > 0);
        assert_eq!(counters.documents_parsed.load(Ordering::Relaxed), 1);
        assert_eq!(counters.parse_errors.load(Ordering::Relaxed), 0);
    }

    // Malformed input is reported as an error and counted.
    #[test]
    fn test_parse_error_tracking() {
        let parser = SimdJsonParser::new();
        let mut garbage = b"not valid json".to_vec();

        let outcome: Result<TestEvent, _> = parser.parse(&mut garbage);
        assert!(outcome.is_err());

        let error_count = parser.stats().parse_errors.load(Ordering::Relaxed);
        assert_eq!(error_count, 1);
    }

    // Typed accessors on the borrowed document return the stored values.
    #[test]
    fn test_zero_copy_json() {
        let mut payload = br#"{"name":"test","count":42,"active":true}"#.to_vec();

        let document = ZeroCopyJson::parse(&mut payload).unwrap();
        assert_eq!(document.get_str("name"), Some("test"));
        assert_eq!(document.get_i64("count"), Some(42));
        assert_eq!(document.get_bool("active"), Some(true));
        assert!(document.contains_key("name"));
        assert!(!document.contains_key("missing"));
    }

    // String events go through the batch parser and come back in order.
    #[test]
    fn test_batch_event_parser() {
        let mut batch_parser = BatchEventParser::new(100);
        let events = vec![
            String::from(r#"{"id":"a","type":"x","value":10}"#),
            String::from(r#"{"id":"b","type":"y","value":20}"#),
        ];

        let outcomes: Vec<Result<TestEvent, _>> = batch_parser.parse_events(&events);
        assert_eq!(outcomes.len(), 2);

        let head = outcomes[0].as_ref().unwrap();
        assert_eq!(head.id, "a");
        assert_eq!(head.value, 10);
    }

    // 1 MB and 10 000 documents over one second give 1 MB/s and 10 000 docs/s.
    #[test]
    fn test_throughput_calculation() {
        let stats = SimdJsonStats::new();
        stats.bytes_parsed.store(1_000_000, Ordering::Relaxed);
        stats.parse_time_ns.store(1_000_000_000, Ordering::Relaxed);
        stats.documents_parsed.store(10_000, Ordering::Relaxed);

        let throughput_error = (stats.throughput_mbps() - 1.0).abs();
        let rate_error = (stats.docs_per_second() - 10_000.0).abs();
        assert!(throughput_error < 0.001);
        assert!(rate_error < 1.0);
    }
}