1use crate::{
8 error::{Error, Result},
9 frame::Frame,
10 frame::{FrameFlags, FrameHeader},
11 security::{DepthTracker, SecurityValidator},
12 semantic::{NumericDType, SemanticMeta, SemanticType},
13};
14use bytes::Bytes;
15use smallvec::SmallVec;
16use sonic_rs::{JsonContainerTrait, JsonNumberTrait, JsonValueTrait, Value as SonicValue};
17
/// Branch hint: signals that `b` is expected to be `false` and returns `b` unchanged.
///
/// When `b` is `true`, an empty `#[cold]` function is invoked first. Calling a
/// cold function is the stable-Rust way to suggest that a path is rarely taken.
// `inline(always)` is deliberate: the hint is only useful when this wrapper is
// folded into the caller's branch.
#[inline(always)]
fn unlikely(b: bool) -> bool {
    #[cold]
    #[inline]
    fn cold_path() {}

    if b {
        cold_path();
    }
    b
}
23
/// Settings that control how `SonicParser` treats its input.
#[derive(Debug, Clone)]
pub struct SonicConfig {
    /// Enables semantic type detection while parsing.
    pub detect_semantics: bool,
    /// Largest accepted input, in bytes; longer inputs are rejected by `parse`.
    pub max_input_size: usize,
}

impl Default for SonicConfig {
    /// Semantic detection on, input capped at 100 MiB.
    fn default() -> Self {
        const DEFAULT_MAX_INPUT_SIZE: usize = 100 * 1024 * 1024;

        SonicConfig {
            detect_semantics: true,
            max_input_size: DEFAULT_MAX_INPUT_SIZE,
        }
    }
}
41
/// JSON parser that validates its input and wraps the raw bytes in a `Frame`.
pub struct SonicParser {
    /// Parser settings (semantic detection toggle and input size cap).
    config: SonicConfig,
    /// Performs the size, depth, key-count and length checks used while parsing.
    validator: SecurityValidator,
    /// Running statistics; held in a `RefCell` so `&self` methods can update them.
    stats: std::cell::RefCell<SonicStats>,
}
48
/// Counters describing the parses a `SonicParser` has completed.
#[derive(Debug, Default, Clone)]
pub struct SonicStats {
    /// Number of recorded parses; bumped once per successful `parse` call.
    pub total_parses: u64,
    /// Bumped together with `total_parses` on every successful `parse` call.
    pub sonic_successes: u64,
    /// Never written in the visible code — presumably counts serde fallbacks; TODO confirm.
    pub serde_fallbacks: u64,
    /// Running mean of the parse duration, in nanoseconds.
    pub avg_parse_time_ns: u64,
    /// Sum of the input lengths (bytes) of all recorded parses.
    pub bytes_processed: u64,
}
58
59impl SonicParser {
60 pub fn new() -> Self {
62 Self {
63 config: SonicConfig::default(),
64 validator: SecurityValidator::default(),
65 stats: std::cell::RefCell::new(SonicStats::default()),
66 }
67 }
68
69 pub fn with_config(config: SonicConfig) -> Self {
71 Self {
72 config,
73 validator: SecurityValidator::default(),
74 stats: std::cell::RefCell::new(SonicStats::default()),
75 }
76 }
77
78 pub fn with_security_config(
80 config: SonicConfig,
81 security_config: crate::config::SecurityConfig,
82 ) -> Self {
83 Self {
84 config,
85 validator: SecurityValidator::new(security_config),
86 stats: std::cell::RefCell::new(SonicStats::default()),
87 }
88 }
89
90 pub fn parse(&self, input: &[u8]) -> Result<Frame> {
92 let start_time = std::time::Instant::now();
93
94 self.validator.validate_input_size(input.len())?;
96
97 if unlikely(input.len() > self.config.max_input_size) {
99 return Err(Error::Other(format!("Input too large: {}", input.len())));
100 }
101
102 let json_str = std::str::from_utf8(input)
104 .map_err(|e| Error::Other(format!("Invalid UTF-8 input: {}", e)))?;
105
106 self.pre_validate_json_string(json_str)?;
108
109 let value: SonicValue =
111 sonic_rs::from_str(json_str).map_err(|e| Error::invalid_json(0, e.to_string()))?;
112
113 self.validate_json_structure(&value)?;
115
116 let semantic_type = if self.config.detect_semantics && input.len() < 100_000 {
118 self.detect_semantic_type_sonic(&value)
119 } else {
120 SemanticType::Generic
121 };
122
123 let payload = if input.len() < 4096 {
125 Bytes::copy_from_slice(input)
127 } else {
128 Bytes::from(input.to_vec()) };
131
132 let header = FrameHeader {
134 version: 1,
135 flags: FrameFlags::empty(),
136 sequence: 0,
137 length: input.len() as u32,
138 schema_id: 0,
139 checksum: 0,
140 };
141
142 let semantics = if semantic_type != SemanticType::Generic {
143 Some(SemanticMeta::new(semantic_type))
144 } else {
145 None
146 };
147
148 {
150 let mut stats = self.stats.borrow_mut();
151 stats.total_parses += 1;
152 stats.sonic_successes += 1;
153 stats.bytes_processed += input.len() as u64;
154
155 let elapsed_ns = start_time.elapsed().as_nanos() as u64;
156 stats.avg_parse_time_ns = (stats.avg_parse_time_ns * (stats.total_parses - 1)
157 + elapsed_ns)
158 / stats.total_parses;
159 }
160
161 Ok(Frame {
162 header,
163 payload,
164 semantics,
165 })
166 }
167
168 pub fn get_stats(&self) -> SonicStats {
170 self.stats.borrow().clone()
171 }
172
173 fn pre_validate_json_string(&self, json_str: &str) -> Result<()> {
175 let mut depth = 0;
177 let mut max_depth = 0;
178
179 for ch in json_str.chars() {
180 match ch {
181 '{' | '[' => {
182 depth += 1;
183 max_depth = max_depth.max(depth);
184 self.validator.validate_json_depth(max_depth)?;
185 }
186 '}' | ']' => {
187 depth = depth.saturating_sub(1);
188 }
189 _ => {}
190 }
191 }
192
193 Ok(())
194 }
195
196 fn validate_json_structure(&self, value: &SonicValue) -> Result<()> {
198 let mut depth_tracker = DepthTracker::default();
199 self.validate_json_recursive(value, &mut depth_tracker)
200 }
201
202 fn validate_json_recursive(
204 &self,
205 value: &SonicValue,
206 depth_tracker: &mut DepthTracker,
207 ) -> Result<()> {
208 match value {
209 _ if value.is_object() => {
210 depth_tracker.enter()?;
211
212 if let Some(obj) = value.as_object() {
213 self.validator.validate_object_keys(obj.len())?;
215
216 for (key, val) in obj.iter() {
218 self.validator.validate_string_length(key.len())?;
220 self.validate_json_recursive(val, depth_tracker)?;
221 }
222 }
223
224 depth_tracker.exit();
225 }
226 _ if value.is_array() => {
227 depth_tracker.enter()?;
228
229 if let Some(arr) = value.as_array() {
230 self.validator.validate_array_length(arr.len())?;
232
233 for element in arr.iter() {
235 self.validate_json_recursive(element, depth_tracker)?;
236 }
237 }
238
239 depth_tracker.exit();
240 }
241 _ if value.is_str() => {
242 if let Some(s) = value.as_str() {
243 self.validator.validate_string_length(s.len())?;
244 }
245 }
246 _ => {
247 }
249 }
250
251 Ok(())
252 }
253
254 fn detect_semantic_type_sonic(&self, value: &SonicValue) -> SemanticType {
256 if value.is_array()
257 && let Some(arr) = value.as_array()
258 {
259 return self.analyze_array_semantics_simd(arr);
260 }
261
262 if value.is_object()
263 && let Some(obj) = value.as_object()
264 {
265 return self.analyze_object_semantics_simd(obj);
266 }
267
268 SemanticType::Generic
269 }
270
271 fn analyze_object_semantics_simd(&self, obj: &sonic_rs::Object) -> SemanticType {
273 let scan_result = crate::parser::simd::SimdClassifier::scan_object_keys(obj);
274
275 if scan_result.has_type_field && scan_result.has_coordinates {
277 return SemanticType::Geospatial {
278 coordinate_system: "WGS84".to_string(),
279 geometry_type: obj
280 .get(&"type")
281 .and_then(|v| v.as_str())
282 .unwrap_or("Point")
283 .to_string(),
284 };
285 }
286
287 if scan_result.has_timestamp {
289 let timestamp_field = if obj.contains_key(&"timestamp") {
290 "timestamp"
291 } else {
292 "time"
293 };
294
295 let value_fields: SmallVec<[String; 4]> = obj
297 .iter()
298 .filter_map(|(k, v)| {
299 if k != timestamp_field && v.is_number() {
300 Some(k.to_string())
301 } else {
302 None
303 }
304 })
305 .collect();
306
307 if !value_fields.is_empty() {
308 return SemanticType::TimeSeries {
309 timestamp_field: timestamp_field.to_string(),
310 value_fields,
311 interval_ms: None,
312 };
313 }
314 }
315
316 SemanticType::Generic
317 }
318
319 fn analyze_array_semantics_simd(&self, arr: &sonic_rs::Array) -> SemanticType {
321 let len = arr.len();
322 if len == 0 {
323 return SemanticType::Generic;
324 }
325
326 if crate::parser::simd::SimdClassifier::is_numeric_array(arr) {
328 let dtype = if let Some(first) = arr.first() {
329 if let Some(num) = first.as_number() {
330 if num.is_i64() {
331 NumericDType::I64
332 } else if num.is_u64() {
333 NumericDType::U64
334 } else {
335 NumericDType::F64
336 }
337 } else {
338 NumericDType::F64
339 }
340 } else {
341 NumericDType::F64
342 };
343
344 return SemanticType::NumericArray {
345 dtype,
346 length: Some(len),
347 };
348 }
349
350 if len >= 2 {
352 let mut is_time_series = true;
353
354 for value in arr.iter() {
356 if let Some(obj) = value.as_object() {
357 let scan_result = crate::parser::simd::SimdClassifier::scan_object_keys(obj);
358 if !scan_result.has_timestamp {
359 is_time_series = false;
360 break;
361 }
362 } else {
363 is_time_series = false;
364 break;
365 }
366 }
367
368 if is_time_series {
369 return SemanticType::TimeSeries {
370 timestamp_field: "timestamp".to_string(),
371 value_fields: SmallVec::from_vec(vec!["value".to_string()]),
372 interval_ms: None,
373 };
374 }
375 }
376
377 if len >= 3
379 && arr.iter().all(|v| v.is_object())
380 && let Some(first_obj) = arr.first().and_then(|v| v.as_object())
381 {
382 let first_scan = crate::parser::simd::SimdClassifier::scan_object_keys(first_obj);
383
384 let is_tabular = arr.iter().skip(1).filter_map(|v| v.as_object()).all(|obj| {
386 let scan = crate::parser::simd::SimdClassifier::scan_object_keys(obj);
387 let diff = scan.key_count as i32 - first_scan.key_count as i32;
389 diff.abs() <= (first_scan.key_count as i32 / 5)
390 });
391
392 if is_tabular {
393 let columns: SmallVec<[crate::semantic::ColumnMeta; 16]> = first_obj
395 .iter()
396 .map(|(k, v)| {
397 let column_type = if v.is_number() {
398 crate::semantic::ColumnType::Numeric(NumericDType::F64)
399 } else if v.is_str() {
400 crate::semantic::ColumnType::String
401 } else if v.as_bool().is_some() {
402 crate::semantic::ColumnType::Boolean
403 } else {
404 crate::semantic::ColumnType::Json
405 };
406
407 crate::semantic::ColumnMeta {
408 name: k.to_string(),
409 dtype: column_type,
410 nullable: false,
411 }
412 })
413 .collect();
414
415 return SemanticType::Table {
416 columns: Box::new(columns),
417 row_count: Some(len),
418 };
419 }
420 }
421
422 SemanticType::Generic
423 }
424}
425
426pub struct LazyFrame<'a> {
428 frame: Frame,
429 #[allow(dead_code)] parser: &'a SonicParser,
431}
432
433impl<'a> LazyFrame<'a> {
434 pub fn frame(&self) -> &Frame {
436 &self.frame
437 }
438}
439
440impl Default for SonicParser {
441 fn default() -> Self {
442 Self::new()
443 }
444}
445
#[cfg(test)]
mod tests {
    use super::*;

    /// A default parser enables semantic detection and caps input at 100 MiB.
    #[test]
    fn test_sonic_parser_creation() {
        let parser = SonicParser::new();
        assert!(parser.config.detect_semantics);
        assert_eq!(parser.config.max_input_size, 100 * 1024 * 1024);
    }

    /// A small object parses and the frame carries the original bytes.
    #[test]
    fn test_sonic_basic_parsing() {
        let parser = SonicParser::new();
        let json = br#"{"name": "test", "value": 42}"#;

        let result = parser.parse(json);
        assert!(result.is_ok());

        let frame = result.unwrap();
        assert_eq!(frame.header.version, 1);
        assert_eq!(frame.payload.len(), json.len());
    }

    /// An array of floats is tagged as a numeric array.
    #[test]
    fn test_sonic_numeric_array_detection() {
        let parser = SonicParser::new();
        let json = br#"[1.5, 2.7, 3.14, 4.2, 5.1]"#;

        let frame = parser.parse(json).unwrap();
        let semantics = frame.semantics.expect("Expected semantic metadata");
        assert!(matches!(
            semantics.semantic_type,
            SemanticType::NumericArray { .. }
        ));
    }

    /// An array of timestamped objects is tagged as a time series.
    #[test]
    fn test_sonic_time_series_detection() {
        let parser = SonicParser::new();
        let json = br#"[
            {"timestamp": "2023-01-01T00:00:00Z", "value": 1.5},
            {"timestamp": "2023-01-01T00:01:00Z", "value": 2.3}
        ]"#;

        let frame = parser.parse(json).unwrap();
        let semantics = frame.semantics.expect("Expected semantic metadata");
        assert!(matches!(
            semantics.semantic_type,
            SemanticType::TimeSeries { .. }
        ));
    }

    /// A custom configuration is stored verbatim.
    #[test]
    fn test_sonic_performance_config() {
        let config = SonicConfig {
            detect_semantics: false,
            max_input_size: 1024,
        };

        let parser = SonicParser::with_config(config);
        assert!(!parser.config.detect_semantics);
        assert_eq!(parser.config.max_input_size, 1024);
    }

    /// Bytes that are not valid UTF-8 produce a descriptive error.
    #[test]
    fn test_sonic_invalid_utf8_handling() {
        let parser = SonicParser::new();
        let invalid_utf8 = &[0xFF, 0xFE, 0xFD];

        let result = parser.parse(invalid_utf8);
        assert!(result.is_err());

        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Invalid UTF-8"));
    }

    /// Input longer than `max_input_size` is rejected.
    #[test]
    fn test_sonic_input_size_limit() {
        let config = SonicConfig {
            detect_semantics: true,
            max_input_size: 10,
        };
        let parser = SonicParser::with_config(config);

        let large_json = b"[1,2,3,4,5,6,7,8,9,10]";
        let result = parser.parse(large_json);

        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Input size") || error_msg.contains("Input too large"));
    }

    /// 65 nested objects are rejected with a depth error.
    #[test]
    fn test_sonic_json_depth_validation() {
        let parser = SonicParser::new();

        let nesting = 65;
        let mut json = "{\"a\":".repeat(nesting);
        json.push_str("\"value\"");
        json.push_str(&"}".repeat(nesting));

        let result = parser.parse(json.as_bytes());
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("depth"));
    }

    /// An 11 MiB string value is rejected with a string-length error.
    #[test]
    fn test_sonic_large_string_validation() {
        let parser = SonicParser::new();

        let large_string = "a".repeat(11 * 1024 * 1024);
        let json = format!("{{\"key\": \"{}\"}}", large_string);

        let result = parser.parse(json.as_bytes());
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("String length"));
    }

    /// A 1001-element array is accepted.
    #[test]
    fn test_sonic_large_array_validation() {
        let parser = SonicParser::new();

        let elements: Vec<String> = (0..1001).map(|i| i.to_string()).collect();
        let json = format!("[{}]", elements.join(","));

        let result = parser.parse(json.as_bytes());
        assert!(result.is_ok());
    }

    /// An object with 1000 keys is accepted.
    #[test]
    fn test_sonic_many_object_keys_validation() {
        let parser = SonicParser::new();

        let members: Vec<String> = (0..1000)
            .map(|i| format!("\"key{}\": {}", i, i))
            .collect();
        let json = format!("{{{}}}", members.join(","));

        let result = parser.parse(json.as_bytes());
        assert!(result.is_ok());
    }
}