use bumpalo::Bump;
use serde::de::DeserializeOwned;
use simd_json::prelude::*;
use std::{
sync::atomic::{AtomicU64, Ordering},
time::Instant,
};
/// Cumulative, thread-safe parsing metrics.
///
/// All counters use relaxed atomics: they are monotonically increasing
/// tallies read for reporting, not synchronization points.
#[derive(Debug, Default)]
pub struct SimdJsonStats {
    // Total input bytes handed to the parser on successful parses.
    pub bytes_parsed: AtomicU64,
    // Count of documents parsed successfully.
    pub documents_parsed: AtomicU64,
    // Total wall-clock time spent in successful parses, in nanoseconds.
    pub parse_time_ns: AtomicU64,
    // Count of parse attempts that returned an error.
    pub parse_errors: AtomicU64,
}
impl SimdJsonStats {
    /// Creates a zeroed stats instance.
    pub fn new() -> Self {
        Self::default()
    }

    /// Parse throughput in (decimal) megabytes per second.
    ///
    /// Returns 0.0 when no parse time has been recorded yet, so callers
    /// never see a division by zero.
    pub fn throughput_mbps(&self) -> f64 {
        let megabytes = self.bytes_parsed.load(Ordering::Relaxed) as f64 / 1_000_000.0;
        let seconds = self.parse_time_ns.load(Ordering::Relaxed) as f64 / 1_000_000_000.0;
        if seconds > 0.0 {
            megabytes / seconds
        } else {
            0.0
        }
    }

    /// Documents parsed per second, or 0.0 when no time has been recorded.
    pub fn docs_per_second(&self) -> f64 {
        let documents = self.documents_parsed.load(Ordering::Relaxed) as f64;
        let seconds = self.parse_time_ns.load(Ordering::Relaxed) as f64 / 1_000_000_000.0;
        if seconds > 0.0 {
            documents / seconds
        } else {
            0.0
        }
    }

    /// Accumulates one successful parse into the counters.
    fn record_parse(&self, byte_count: usize, elapsed_ns: u64) {
        self.documents_parsed.fetch_add(1, Ordering::Relaxed);
        self.bytes_parsed.fetch_add(byte_count as u64, Ordering::Relaxed);
        self.parse_time_ns.fetch_add(elapsed_ns, Ordering::Relaxed);
    }

    /// Counts one failed parse attempt.
    fn record_error(&self) {
        self.parse_errors.fetch_add(1, Ordering::Relaxed);
    }

    /// Zeroes every counter.
    pub fn reset(&self) {
        for counter in [
            &self.bytes_parsed,
            &self.documents_parsed,
            &self.parse_time_ns,
            &self.parse_errors,
        ] {
            counter.store(0, Ordering::Relaxed);
        }
    }
}
/// Stateless-by-value SIMD JSON parser that tracks aggregate statistics
/// across every call to its `parse*` methods.
pub struct SimdJsonParser {
    // Shared counters updated on every parse attempt.
    stats: SimdJsonStats,
}
impl Default for SimdJsonParser {
fn default() -> Self {
Self::new()
}
}
impl SimdJsonParser {
    /// Creates a parser with zeroed statistics.
    pub fn new() -> Self {
        Self {
            stats: SimdJsonStats::new(),
        }
    }

    /// Parses one JSON document in place (simd-json mutates its input
    /// buffer) and deserializes it into `T`.
    ///
    /// On success the byte count and elapsed time are folded into the
    /// stats; on failure only the error counter is bumped.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn parse<T: DeserializeOwned>(&self, data: &mut [u8]) -> Result<T, SimdJsonError> {
        // Capture the length up front: simd-json takes the buffer mutably.
        let byte_len = data.len();
        let started = Instant::now();
        let outcome: Result<T, _> = simd_json::from_slice(data);
        match outcome {
            Ok(value) => {
                self.stats
                    .record_parse(byte_len, started.elapsed().as_nanos() as u64);
                Ok(value)
            }
            Err(err) => {
                self.stats.record_error();
                Err(SimdJsonError::ParseError(err.to_string()))
            }
        }
    }

    /// Parses from a `&str` by copying it into a mutable scratch buffer,
    /// since simd-json requires mutable input.
    pub fn parse_str<T: DeserializeOwned>(&self, data: &str) -> Result<T, SimdJsonError> {
        let mut scratch = Vec::from(data.as_bytes());
        self.parse(&mut scratch)
    }

    /// Parses each document in order, returning one result per input.
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn parse_batch<T: DeserializeOwned>(
        &self,
        documents: &mut [Vec<u8>],
    ) -> Vec<Result<T, SimdJsonError>> {
        let mut results = Vec::with_capacity(documents.len());
        for document in documents.iter_mut() {
            results.push(self.parse(document));
        }
        results
    }

    /// Arena-aware variant of [`Self::parse`].
    ///
    /// NOTE(review): the arena is currently unused — deserialization does
    /// not allocate from it. Kept for interface stability.
    pub fn parse_with_arena<T: DeserializeOwned>(
        &self,
        data: &mut [u8],
        _arena: &Bump,
    ) -> Result<T, SimdJsonError> {
        self.parse(data)
    }

    /// Borrowed view of the accumulated statistics.
    pub fn stats(&self) -> &SimdJsonStats {
        &self.stats
    }

    /// Zeroes the accumulated statistics.
    pub fn reset_stats(&self) {
        self.stats.reset();
    }
}
/// Errors produced by the parsing helpers in this module.
#[derive(Debug, thiserror::Error)]
pub enum SimdJsonError {
    // simd-json failure, stringified so the error type owns its payload.
    #[error("JSON parse error: {0}")]
    ParseError(String),
    // Propagated UTF-8 decoding failure.
    #[error("Invalid UTF-8 in JSON: {0}")]
    Utf8Error(#[from] std::str::Utf8Error),
    // NOTE(review): not constructed anywhere in this file — presumably
    // reserved for callers elsewhere; confirm before removing.
    #[error("Buffer too small for parsing")]
    BufferTooSmall,
}
/// Batch-oriented wrapper around [`SimdJsonParser`] that keeps a pool of
/// byte buffers for copying string events into mutable storage.
pub struct BatchEventParser {
    // Underlying parser; owns the shared statistics.
    parser: SimdJsonParser,
    // Scratch buffers holding mutable copies of incoming events.
    buffer_pool: Vec<Vec<u8>>,
    // Capacity hint used when constructing the pool.
    max_batch_size: usize,
}
impl BatchEventParser {
    /// Creates a batch parser; `max_batch_size` pre-sizes the buffer pool.
    pub fn new(max_batch_size: usize) -> Self {
        Self {
            parser: SimdJsonParser::new(),
            buffer_pool: Vec::with_capacity(max_batch_size),
            max_batch_size,
        }
    }

    /// Parses a slice of JSON strings, returning one result per event.
    ///
    /// simd-json needs mutable input, so each string is copied into a
    /// pooled buffer. Previously the pool was cleared and a fresh
    /// `Vec<u8>` allocated per event on every call, so the "pool" never
    /// reused anything; buffers retained from earlier calls are now
    /// refilled in place and only the shortfall allocates.
    pub fn parse_events<T: DeserializeOwned>(
        &mut self,
        events: &[String],
    ) -> Vec<Result<T, SimdJsonError>> {
        // Drop surplus buffers so pool length never exceeds the batch.
        self.buffer_pool.truncate(events.len());
        let reused = self.buffer_pool.len();
        // Refill retained buffers in place, reusing their allocations.
        for (buf, event) in self.buffer_pool.iter_mut().zip(events) {
            buf.clear();
            buf.extend_from_slice(event.as_bytes());
        }
        // Grow the pool with fresh buffers for the remaining events.
        self.buffer_pool
            .extend(events[reused..].iter().map(|e| e.as_bytes().to_vec()));
        self.parser.parse_batch(&mut self.buffer_pool)
    }

    /// Parses events that are already mutable byte buffers (no copying).
    #[cfg_attr(feature = "hotpath", hotpath::measure)]
    pub fn parse_events_bytes<T: DeserializeOwned>(
        &self,
        events: &mut [Vec<u8>],
    ) -> Vec<Result<T, SimdJsonError>> {
        self.parser.parse_batch(events)
    }

    /// Statistics accumulated by the underlying parser.
    pub fn stats(&self) -> &SimdJsonStats {
        self.parser.stats()
    }

    /// The capacity hint this parser was constructed with.
    ///
    /// NOTE(review): this value is not enforced as a limit anywhere in
    /// this file — confirm intended semantics with callers.
    pub fn max_batch_size(&self) -> usize {
        self.max_batch_size
    }
}
/// Borrowed view over a parsed JSON document: scalar accessors hand back
/// references into the original input buffer where possible.
pub struct ZeroCopyJson<'a> {
    // Root of the borrowed DOM produced by simd-json.
    root: simd_json::BorrowedValue<'a>,
}

impl<'a> ZeroCopyJson<'a> {
    /// Parses `data` in place into a borrowed value tree.
    pub fn parse(data: &'a mut [u8]) -> Result<Self, SimdJsonError> {
        match simd_json::to_borrowed_value(data) {
            Ok(root) => Ok(Self { root }),
            Err(err) => Err(SimdJsonError::ParseError(err.to_string())),
        }
    }

    /// Top-level field as a string slice, if present and a string.
    pub fn get_str(&self, key: &str) -> Option<&str> {
        self.root.get(key).and_then(|v| v.as_str())
    }

    /// Top-level field as a signed integer, if present and numeric.
    pub fn get_i64(&self, key: &str) -> Option<i64> {
        self.root.get(key).and_then(|v| v.as_i64())
    }

    /// Top-level field as a float, if present and numeric.
    pub fn get_f64(&self, key: &str) -> Option<f64> {
        self.root.get(key).and_then(|v| v.as_f64())
    }

    /// Top-level field as a boolean, if present and a bool.
    pub fn get_bool(&self, key: &str) -> Option<bool> {
        self.root.get(key).and_then(|v| v.as_bool())
    }

    /// Whether a top-level field with this key exists.
    pub fn contains_key(&self, key: &str) -> bool {
        self.root.get(key).is_some()
    }

    /// Raw access to the underlying borrowed value.
    pub fn as_value(&self) -> &simd_json::BorrowedValue<'a> {
        &self.root
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Deserialize;

    #[derive(Debug, Deserialize, PartialEq)]
    struct TestEvent {
        id: String,
        #[serde(rename = "type")]
        event_type: String,
        value: i64,
    }

    /// Copies a JSON literal into the mutable buffer simd-json requires.
    fn owned_bytes(json: &str) -> Vec<u8> {
        json.as_bytes().to_vec()
    }

    #[test]
    fn test_simd_json_parse() {
        let parser = SimdJsonParser::new();
        let mut buf = owned_bytes(r#"{"id":"123","type":"test","value":42}"#);
        let event: TestEvent = parser.parse(&mut buf).unwrap();
        assert_eq!(event.id, "123");
        assert_eq!(event.event_type, "test");
        assert_eq!(event.value, 42);
    }

    #[test]
    fn test_simd_json_parse_str() {
        let parser = SimdJsonParser::new();
        let event: TestEvent = parser
            .parse_str(r#"{"id":"456","type":"event","value":100}"#)
            .unwrap();
        assert_eq!(event.id, "456");
        assert_eq!(event.event_type, "event");
        assert_eq!(event.value, 100);
    }

    #[test]
    fn test_batch_parsing() {
        let parser = SimdJsonParser::new();
        let mut docs: Vec<Vec<u8>> = vec![
            owned_bytes(r#"{"id":"1","type":"a","value":1}"#),
            owned_bytes(r#"{"id":"2","type":"b","value":2}"#),
            owned_bytes(r#"{"id":"3","type":"c","value":3}"#),
        ];
        let results: Vec<Result<TestEvent, _>> = parser.parse_batch(&mut docs);
        assert_eq!(results.len(), 3);
        for (idx, res) in results.into_iter().enumerate() {
            let event = res.unwrap();
            assert_eq!(event.id, (idx + 1).to_string());
            assert_eq!(event.value, (idx + 1) as i64);
        }
    }

    #[test]
    fn test_stats_tracking() {
        let parser = SimdJsonParser::new();
        let mut buf = owned_bytes(r#"{"id":"test","type":"event","value":0}"#);
        let _: TestEvent = parser.parse(&mut buf).unwrap();
        let stats = parser.stats();
        assert!(stats.bytes_parsed.load(Ordering::Relaxed) > 0);
        assert_eq!(stats.documents_parsed.load(Ordering::Relaxed), 1);
        assert_eq!(stats.parse_errors.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_parse_error_tracking() {
        let parser = SimdJsonParser::new();
        let mut garbage = b"not valid json".to_vec();
        let outcome: Result<TestEvent, _> = parser.parse(&mut garbage);
        assert!(outcome.is_err());
        assert_eq!(parser.stats().parse_errors.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_zero_copy_json() {
        let mut buf = owned_bytes(r#"{"name":"test","count":42,"active":true}"#);
        let doc = ZeroCopyJson::parse(&mut buf).unwrap();
        assert_eq!(doc.get_str("name"), Some("test"));
        assert_eq!(doc.get_i64("count"), Some(42));
        assert_eq!(doc.get_bool("active"), Some(true));
        assert!(doc.contains_key("name"));
        assert!(!doc.contains_key("missing"));
    }

    #[test]
    fn test_batch_event_parser() {
        let mut batch = BatchEventParser::new(100);
        let events = vec![
            r#"{"id":"a","type":"x","value":10}"#.to_string(),
            r#"{"id":"b","type":"y","value":20}"#.to_string(),
        ];
        let results: Vec<Result<TestEvent, _>> = batch.parse_events(&events);
        assert_eq!(results.len(), 2);
        let first = results[0].as_ref().unwrap();
        assert_eq!(first.id, "a");
        assert_eq!(first.value, 10);
    }

    #[test]
    fn test_throughput_calculation() {
        let stats = SimdJsonStats::new();
        stats.bytes_parsed.store(1_000_000, Ordering::Relaxed);
        stats.parse_time_ns.store(1_000_000_000, Ordering::Relaxed);
        stats.documents_parsed.store(10_000, Ordering::Relaxed);
        assert!((stats.throughput_mbps() - 1.0).abs() < 0.001);
        assert!((stats.docs_per_second() - 10_000.0).abs() < 1.0);
    }
}