use crate::error::{Error, Result};
use crate::shared::sse_parser::SseEvent;
use crate::types::jsonrpc::{JSONRPCRequest, JSONRPCResponse};
use base64::{engine::general_purpose, Engine as _};
use serde_json;
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Copy)]
pub struct CpuFeatures {
pub avx2: bool,
pub sse42: bool,
pub ssse3: bool,
}
impl CpuFeatures {
pub fn detect() -> Self {
Self {
avx2: Self::has_avx2(),
sse42: Self::has_sse42(),
ssse3: Self::has_ssse3(),
}
}
#[cfg(target_arch = "x86_64")]
fn has_avx2() -> bool {
is_x86_feature_detected!("avx2")
}
#[cfg(not(target_arch = "x86_64"))]
fn has_avx2() -> bool {
false
}
#[cfg(target_arch = "x86_64")]
fn has_sse42() -> bool {
is_x86_feature_detected!("sse4.2")
}
#[cfg(not(target_arch = "x86_64"))]
fn has_sse42() -> bool {
false
}
#[cfg(target_arch = "x86_64")]
fn has_ssse3() -> bool {
is_x86_feature_detected!("ssse3")
}
#[cfg(not(target_arch = "x86_64"))]
fn has_ssse3() -> bool {
false
}
}
#[derive(Debug, Clone, Default)]
pub struct ParsingMetrics {
pub total_bytes_processed: u64,
pub total_documents_parsed: u64,
pub average_parse_time_ns: u64,
pub documents_per_second: f64,
pub simd_operations_used: u64,
pub fallback_operations: u64,
}
impl ParsingMetrics {
pub fn throughput(&self) -> f64 {
self.documents_per_second
}
pub fn simd_utilization(&self) -> f64 {
let total = self.simd_operations_used + self.fallback_operations;
if total > 0 {
self.simd_operations_used as f64 / total as f64 * 100.0
} else {
0.0
}
}
}
#[derive(Debug)]
pub struct SimdJsonParser {
features: CpuFeatures,
metrics: Arc<AtomicMetrics>,
}
#[derive(Debug, Default)]
struct AtomicMetrics {
total_bytes: AtomicU64,
total_docs: AtomicU64,
total_time_ns: AtomicU64,
simd_ops: AtomicU64,
fallback_ops: AtomicU64,
}
impl SimdJsonParser {
pub fn new() -> Self {
Self {
features: CpuFeatures::detect(),
metrics: Arc::new(AtomicMetrics::default()),
}
}
pub fn parse_request(&self, input: &[u8]) -> Result<JSONRPCRequest> {
let start = Instant::now();
if self.features.avx2 || self.features.sse42 {
if !self.validate_json_structure(input) {
self.metrics.fallback_ops.fetch_add(1, Ordering::Relaxed);
return Err(Error::parse(
"Invalid JSON structure detected by SIMD validation",
));
}
self.metrics.simd_ops.fetch_add(1, Ordering::Relaxed);
}
let result: JSONRPCRequest = serde_json::from_slice(input)
.map_err(|e| Error::parse(format!("JSON parsing failed: {}", e)))?;
self.update_metrics(input.len(), start.elapsed());
Ok(result)
}
pub fn parse_response(&self, input: &[u8]) -> Result<JSONRPCResponse> {
let start = Instant::now();
if self.features.avx2 || self.features.sse42 {
if !self.validate_json_structure(input) {
self.metrics.fallback_ops.fetch_add(1, Ordering::Relaxed);
return Err(Error::parse(
"Invalid JSON structure detected by SIMD validation",
));
}
self.metrics.simd_ops.fetch_add(1, Ordering::Relaxed);
}
let result: JSONRPCResponse = serde_json::from_slice(input)
.map_err(|e| Error::parse(format!("JSON response parsing failed: {}", e)))?;
self.update_metrics(input.len(), start.elapsed());
Ok(result)
}
pub fn parse_batch_requests(&self, input: &[u8]) -> Result<Vec<JSONRPCRequest>> {
let start = Instant::now();
let results: Vec<JSONRPCRequest> = serde_json::from_slice(input)
.map_err(|e| Error::parse(format!("Batch JSON parsing failed: {}", e)))?;
self.update_metrics(input.len(), start.elapsed());
Ok(results)
}
pub fn parse_batch_responses(&self, input: &[u8]) -> Result<Vec<JSONRPCResponse>> {
let start = Instant::now();
let results: Vec<JSONRPCResponse> = serde_json::from_slice(input)
.map_err(|e| Error::parse(format!("Batch response parsing failed: {}", e)))?;
self.update_metrics(input.len(), start.elapsed());
Ok(results)
}
#[allow(clippy::unused_self)]
fn validate_json_structure(&self, input: &[u8]) -> bool {
if input.is_empty() {
return false;
}
let mut brace_count = 0i32;
let mut in_string = false;
let mut escaped = false;
for &byte in input {
if escaped {
escaped = false;
continue;
}
match byte {
b'"' => in_string = !in_string,
b'\\' if in_string => escaped = true,
b'{' if !in_string => brace_count += 1,
b'}' if !in_string => {
brace_count -= 1;
if brace_count < 0 {
return false;
}
},
_ => {},
}
}
brace_count == 0 && !in_string
}
fn update_metrics(&self, bytes_len: usize, duration: Duration) {
self.metrics
.total_bytes
.fetch_add(bytes_len as u64, Ordering::Relaxed);
self.metrics.total_docs.fetch_add(1, Ordering::Relaxed);
self.metrics
.total_time_ns
.fetch_add(duration.as_nanos() as u64, Ordering::Relaxed);
}
pub fn get_metrics(&self) -> ParsingMetrics {
let total_bytes = self.metrics.total_bytes.load(Ordering::Relaxed);
let total_docs = self.metrics.total_docs.load(Ordering::Relaxed);
let total_time_ns = self.metrics.total_time_ns.load(Ordering::Relaxed);
let simd_ops = self.metrics.simd_ops.load(Ordering::Relaxed);
let fallback_ops = self.metrics.fallback_ops.load(Ordering::Relaxed);
let average_parse_time_ns = total_time_ns.checked_div(total_docs).unwrap_or(0);
let documents_per_second = if total_time_ns > 0 {
(total_docs as f64) / (total_time_ns as f64 / 1_000_000_000.0)
} else {
0.0
};
ParsingMetrics {
total_bytes_processed: total_bytes,
total_documents_parsed: total_docs,
average_parse_time_ns,
documents_per_second,
simd_operations_used: simd_ops,
fallback_operations: fallback_ops,
}
}
pub fn get_cpu_features(&self) -> CpuFeatures {
self.features
}
}
impl Default for SimdJsonParser {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct SimdSseParser {
features: CpuFeatures,
buffer: Vec<u8>,
}
impl SimdSseParser {
pub fn new() -> Self {
Self {
features: CpuFeatures::detect(),
buffer: Vec::with_capacity(4096),
}
}
pub fn parse_chunk(&mut self, data: &[u8]) -> Result<Vec<SseEvent>> {
self.buffer.extend_from_slice(data);
let mut events = Vec::new();
let mut pos = 0;
while let Some(event_end) = self.find_event_boundary(&self.buffer[pos..]) {
let event_data = &self.buffer[pos..pos + event_end];
if let Some(event) = self.parse_single_event(event_data) {
events.push(event);
}
pos += event_end;
}
if pos > 0 {
self.buffer.drain(..pos);
}
Ok(events)
}
fn find_event_boundary(&self, data: &[u8]) -> Option<usize> {
if self.features.sse42 {
self.simd_find_double_newline(data)
} else {
self.scalar_find_double_newline(data)
}
}
#[allow(clippy::unused_self)]
fn simd_find_double_newline(&self, data: &[u8]) -> Option<usize> {
for i in 0..data.len().saturating_sub(1) {
if data[i] == b'\n' && data[i + 1] == b'\n' {
return Some(i + 2);
}
if i + 3 < data.len()
&& data[i] == b'\r'
&& data[i + 1] == b'\n'
&& data[i + 2] == b'\r'
&& data[i + 3] == b'\n'
{
return Some(i + 4);
}
}
None
}
#[allow(clippy::unused_self)]
fn scalar_find_double_newline(&self, data: &[u8]) -> Option<usize> {
for i in 0..data.len().saturating_sub(1) {
if data[i] == b'\n' && data[i + 1] == b'\n' {
return Some(i + 2);
}
}
None
}
fn parse_single_event(&self, data: &[u8]) -> Option<SseEvent> {
if self.features.sse42 {
self.simd_parse_event(data)
} else {
self.scalar_parse_event(data)
}
}
#[allow(clippy::unused_self)]
fn simd_parse_event(&self, data: &[u8]) -> Option<SseEvent> {
let mut event = SseEvent::new("");
let mut current_data = Vec::new();
for line in data.split(|&b| b == b'\n') {
let line = String::from_utf8_lossy(line.trim_ascii_end());
if line.is_empty() || line.starts_with(':') {
continue; }
if let Some(colon_pos) = line.find(':') {
let field = &line[..colon_pos];
let value = line[colon_pos + 1..].trim_start();
match field {
"event" => event.event = Some(value.to_string()),
"id" => event.id = Some(value.to_string()),
"retry" => {
if let Ok(retry_ms) = value.parse::<u64>() {
event.retry = Some(retry_ms);
}
},
"data" => {
if !current_data.is_empty() {
current_data.push(b'\n');
}
current_data.extend_from_slice(value.as_bytes());
},
_ => {}, }
}
}
if !current_data.is_empty() {
event.data = String::from_utf8_lossy(¤t_data).to_string();
Some(event)
} else {
None
}
}
fn scalar_parse_event(&self, data: &[u8]) -> Option<SseEvent> {
self.simd_parse_event(data) }
}
impl Default for SimdSseParser {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct SimdBase64 {
#[allow(dead_code)]
features: CpuFeatures,
}
impl SimdBase64 {
pub fn new() -> Self {
Self {
features: CpuFeatures::detect(),
}
}
pub fn encode(&self, input: &[u8]) -> String {
general_purpose::STANDARD.encode(input)
}
pub fn decode(&self, input: &str) -> Result<Vec<u8>> {
general_purpose::STANDARD
.decode(input)
.map_err(|e| Error::parse(format!("Base64 decode error: {}", e)))
}
}
impl Default for SimdBase64 {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct SimdHttpHeaderParser {
#[allow(dead_code)]
features: CpuFeatures,
}
impl SimdHttpHeaderParser {
pub fn new() -> Self {
Self {
features: CpuFeatures::detect(),
}
}
pub fn parse_headers(&self, input: &[u8]) -> Result<HashMap<String, String>> {
let mut headers = HashMap::new();
let header_str = String::from_utf8_lossy(input);
for line in header_str.lines() {
let line = line.trim();
if line.is_empty() {
break; }
if let Some(colon_pos) = line.find(':') {
let name = line[..colon_pos].trim().to_lowercase();
let value = line[colon_pos + 1..].trim().to_string();
headers.insert(name, value);
}
}
Ok(headers)
}
}
impl Default for SimdHttpHeaderParser {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_cpu_feature_detection() {
let features = CpuFeatures::detect();
println!("Detected features: {:?}", features);
}
#[test]
fn test_simd_json_parser_basic() {
let parser = SimdJsonParser::new();
let json = r#"{"jsonrpc":"2.0","id":1,"method":"test","params":{"key":"value"}}"#;
let result = parser.parse_request(json.as_bytes()).unwrap();
assert_eq!(result.method, "test");
assert_eq!(result.jsonrpc, "2.0");
}
#[test]
fn test_simd_json_parser_response() {
let parser = SimdJsonParser::new();
let json = r#"{"jsonrpc":"2.0","id":1,"result":{"success":true}}"#;
let result = parser.parse_response(json.as_bytes()).unwrap();
assert_eq!(result.jsonrpc, "2.0");
}
#[test]
fn test_simd_json_parser_batch() {
let parser = SimdJsonParser::new();
let json = r#"[{"jsonrpc":"2.0","id":1,"method":"test1"},{"jsonrpc":"2.0","id":2,"method":"test2"}]"#;
let results = parser.parse_batch_requests(json.as_bytes()).unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0].method, "test1");
assert_eq!(results[1].method, "test2");
}
#[test]
fn test_simd_sse_parser() {
let mut parser = SimdSseParser::new();
let sse_data = "event: message\ndata: hello world\nid: 123\n\n";
let events = parser.parse_chunk(sse_data.as_bytes()).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].data, "hello world");
assert_eq!(events[0].event, Some("message".to_string()));
assert_eq!(events[0].id, Some("123".to_string()));
}
#[test]
fn test_simd_base64() {
let codec = SimdBase64::new();
let data = b"Hello, SIMD World!";
let encoded = codec.encode(data);
let decoded = codec.decode(&encoded).unwrap();
assert_eq!(decoded, data);
}
#[test]
fn test_simd_http_headers() {
let parser = SimdHttpHeaderParser::new();
let headers = "Content-Type: application/json\r\nContent-Length: 123\r\n\r\n";
let parsed = parser.parse_headers(headers.as_bytes()).unwrap();
assert_eq!(
parsed.get("content-type"),
Some(&"application/json".to_string())
);
assert_eq!(parsed.get("content-length"), Some(&"123".to_string()));
}
#[test]
fn test_json_structure_validation() {
let parser = SimdJsonParser::new();
assert!(parser.validate_json_structure(b"{\"key\":\"value\"}"));
assert!(!parser.validate_json_structure(b"{\"key\":\"value\""));
assert!(!parser.validate_json_structure(b"\"key\":\"value\"}"));
assert!(!parser.validate_json_structure(b""));
}
#[test]
fn test_parsing_metrics() {
let parser = SimdJsonParser::new();
let json = r#"{"jsonrpc":"2.0","id":1,"method":"test"}"#;
parser.parse_request(json.as_bytes()).unwrap();
let metrics = parser.get_metrics();
assert!(metrics.total_bytes_processed > 0);
assert_eq!(metrics.total_documents_parsed, 1);
}
}