1use crate::{
8 error::{Error, Result},
9 frame::Frame,
10 frame::{FrameFlags, FrameHeader},
11 security::{DepthTracker, SecurityValidator},
12 semantic::{NumericDType, SemanticMeta, SemanticType},
13};
14use bytes::Bytes;
15use smallvec::SmallVec;
16use sonic_rs::{JsonContainerTrait, JsonNumberTrait, JsonValueTrait, Value as SonicValue};
17
/// Branch-prediction hint: returns `b` unchanged.
///
/// Stable Rust has no `likely`/`unlikely` intrinsic, so the `true` case is routed
/// through a `#[cold]` function. The optimizer then treats that branch as the
/// rarely taken path (moved out of the hot code layout). The result is always
/// exactly `b`; only code layout is affected.
#[inline(always)]
fn unlikely(b: bool) -> bool {
    /// Empty marker whose only purpose is to carry the `#[cold]` attribute.
    #[cold]
    #[inline]
    fn cold_path() {}

    if b {
        cold_path();
    }
    b
}
23
/// Tuning knobs for `SonicParser`.
#[derive(Debug, Clone)]
pub struct SonicConfig {
    /// Run semantic-type detection on the parsed value.
    /// `SonicParser::parse` only honours this for inputs shorter than 100_000 bytes.
    pub detect_semantics: bool,
    /// Upper bound on input length in bytes; longer inputs are rejected before
    /// UTF-8 decoding or JSON parsing.
    pub max_input_size: usize,
}
32
33impl Default for SonicConfig {
34 fn default() -> Self {
35 Self {
36 detect_semantics: true,
37 max_input_size: 100 * 1024 * 1024, }
39 }
40}
41
42pub struct SonicParser {
44 config: SonicConfig,
45 validator: SecurityValidator,
46 stats: std::cell::RefCell<SonicStats>,
47}
48
/// Counters accumulated by `SonicParser::parse`; a snapshot is returned by
/// `SonicParser::get_stats`. Only successful parses update these fields.
#[derive(Debug, Default, Clone)]
pub struct SonicStats {
    /// Number of successful parses.
    pub total_parses: u64,
    /// Parses completed via `sonic_rs`; currently incremented alongside `total_parses`.
    pub sonic_successes: u64,
    /// Presumably counts fallbacks to a serde-based parser.
    /// NOTE(review): never incremented by `SonicParser` in this module — confirm it is still used.
    pub serde_fallbacks: u64,
    /// Running mean wall-clock duration of a successful parse, in nanoseconds.
    pub avg_parse_time_ns: u64,
    /// Total input bytes across successful parses.
    pub bytes_processed: u64,
}
58
59impl SonicParser {
60 pub fn new() -> Self {
62 Self {
63 config: SonicConfig::default(),
64 validator: SecurityValidator::default(),
65 stats: std::cell::RefCell::new(SonicStats::default()),
66 }
67 }
68
69 pub fn with_config(config: SonicConfig) -> Self {
71 Self {
72 config,
73 validator: SecurityValidator::default(),
74 stats: std::cell::RefCell::new(SonicStats::default()),
75 }
76 }
77
78 pub fn with_security_config(
80 config: SonicConfig,
81 security_config: crate::config::SecurityConfig,
82 ) -> Self {
83 Self {
84 config,
85 validator: SecurityValidator::new(security_config),
86 stats: std::cell::RefCell::new(SonicStats::default()),
87 }
88 }
89
90 pub fn parse(&self, input: &[u8]) -> Result<Frame> {
92 let start_time = std::time::Instant::now();
93
94 self.validator.validate_input_size(input.len())?;
96
97 if unlikely(input.len() > self.config.max_input_size) {
99 return Err(Error::Other(format!("Input too large: {}", input.len())));
100 }
101
102 let json_str = std::str::from_utf8(input)
104 .map_err(|e| Error::Other(format!("Invalid UTF-8 input: {}", e)))?;
105
106 self.pre_validate_json_string(json_str)?;
108
109 let value: SonicValue =
111 sonic_rs::from_str(json_str).map_err(|e| Error::invalid_json(0, e.to_string()))?;
112
113 self.validate_json_structure(&value)?;
115
116 let semantic_type = if self.config.detect_semantics && input.len() < 100_000 {
118 self.detect_semantic_type_sonic(&value)
119 } else {
120 SemanticType::Generic
121 };
122
123 let payload = if input.len() < 4096 {
125 Bytes::copy_from_slice(input)
127 } else {
128 Bytes::from(input.to_vec()) };
131
132 let header = FrameHeader {
134 version: 1,
135 flags: FrameFlags::empty(),
136 sequence: 0,
137 length: input.len() as u32,
138 schema_id: 0,
139 checksum: 0,
140 };
141
142 let semantics = if semantic_type != SemanticType::Generic {
143 Some(SemanticMeta::new(semantic_type))
144 } else {
145 None
146 };
147
148 {
150 let mut stats = self.stats.borrow_mut();
151 stats.total_parses += 1;
152 stats.sonic_successes += 1;
153 stats.bytes_processed += input.len() as u64;
154
155 let elapsed_ns = start_time.elapsed().as_nanos() as u64;
156 stats.avg_parse_time_ns = (stats.avg_parse_time_ns * (stats.total_parses - 1)
157 + elapsed_ns)
158 / stats.total_parses;
159 }
160
161 Ok(Frame {
162 header,
163 payload,
164 semantics,
165 })
166 }
167
168 pub fn get_stats(&self) -> SonicStats {
170 self.stats.borrow().clone()
171 }
172
173 fn pre_validate_json_string(&self, json_str: &str) -> Result<()> {
175 let mut depth = 0;
177 let mut max_depth = 0;
178
179 for ch in json_str.chars() {
180 match ch {
181 '{' | '[' => {
182 depth += 1;
183 max_depth = max_depth.max(depth);
184 self.validator.validate_json_depth(max_depth)?;
185 }
186 '}' | ']' => {
187 depth = depth.saturating_sub(1);
188 }
189 _ => {}
190 }
191 }
192
193 Ok(())
194 }
195
196 fn validate_json_structure(&self, value: &SonicValue) -> Result<()> {
198 let mut depth_tracker = DepthTracker::default();
199 self.validate_json_recursive(value, &mut depth_tracker)
200 }
201
202 fn validate_json_recursive(
204 &self,
205 value: &SonicValue,
206 depth_tracker: &mut DepthTracker,
207 ) -> Result<()> {
208 match value {
209 _ if value.is_object() => {
210 depth_tracker.enter()?;
211
212 if let Some(obj) = value.as_object() {
213 self.validator.validate_object_keys(obj.len())?;
215
216 for (key, val) in obj.iter() {
218 self.validator.validate_string_length(key.len())?;
220 self.validate_json_recursive(val, depth_tracker)?;
221 }
222 }
223
224 depth_tracker.exit();
225 }
226 _ if value.is_array() => {
227 depth_tracker.enter()?;
228
229 if let Some(arr) = value.as_array() {
230 self.validator.validate_array_length(arr.len())?;
232
233 for element in arr.iter() {
235 self.validate_json_recursive(element, depth_tracker)?;
236 }
237 }
238
239 depth_tracker.exit();
240 }
241 _ if value.is_str() => {
242 if let Some(s) = value.as_str() {
243 self.validator.validate_string_length(s.len())?;
244 }
245 }
246 _ => {
247 }
249 }
250
251 Ok(())
252 }
253
254 fn detect_semantic_type_sonic(&self, value: &SonicValue) -> SemanticType {
256 if value.is_array()
257 && let Some(arr) = value.as_array()
258 {
259 return self.analyze_array_semantics_simd(arr);
260 }
261
262 if value.is_object()
263 && let Some(obj) = value.as_object()
264 {
265 return self.analyze_object_semantics_simd(obj);
266 }
267
268 SemanticType::Generic
269 }
270
271 fn analyze_object_semantics_simd(&self, obj: &sonic_rs::Object) -> SemanticType {
273 let scan_result = crate::parser::simd::SimdClassifier::scan_object_keys(obj);
274
275 if scan_result.has_type_field && scan_result.has_coordinates {
277 return SemanticType::Geospatial {
278 coordinate_system: "WGS84".to_string(),
279 geometry_type: obj
280 .get(&"type")
281 .and_then(|v| v.as_str())
282 .unwrap_or("Point")
283 .to_string(),
284 };
285 }
286
287 if scan_result.has_timestamp {
289 let timestamp_field = if obj.contains_key(&"timestamp") {
290 "timestamp"
291 } else {
292 "time"
293 };
294
295 let value_fields: SmallVec<[String; 4]> = obj
297 .iter()
298 .filter_map(|(k, v)| {
299 if k != timestamp_field && v.is_number() {
300 Some(k.to_string())
301 } else {
302 None
303 }
304 })
305 .collect();
306
307 if !value_fields.is_empty() {
308 return SemanticType::TimeSeries {
309 timestamp_field: timestamp_field.to_string(),
310 value_fields,
311 interval_ms: None,
312 };
313 }
314 }
315
316 SemanticType::Generic
317 }
318
319 fn analyze_array_semantics_simd(&self, arr: &sonic_rs::Array) -> SemanticType {
321 let len = arr.len();
322 if len == 0 {
323 return SemanticType::Generic;
324 }
325
326 if crate::parser::simd::SimdClassifier::is_numeric_array(arr) {
328 let dtype = if let Some(first) = arr.first() {
329 if let Some(num) = first.as_number() {
330 if num.is_i64() {
331 NumericDType::I64
332 } else if num.is_u64() {
333 NumericDType::U64
334 } else {
335 NumericDType::F64
336 }
337 } else {
338 NumericDType::F64
339 }
340 } else {
341 NumericDType::F64
342 };
343
344 return SemanticType::NumericArray {
345 dtype,
346 length: Some(len),
347 };
348 }
349
350 if len >= 2 {
352 let mut is_time_series = true;
353
354 for value in arr.iter() {
356 if let Some(obj) = value.as_object() {
357 let scan_result = crate::parser::simd::SimdClassifier::scan_object_keys(obj);
358 if !scan_result.has_timestamp {
359 is_time_series = false;
360 break;
361 }
362 } else {
363 is_time_series = false;
364 break;
365 }
366 }
367
368 if is_time_series {
369 return SemanticType::TimeSeries {
370 timestamp_field: "timestamp".to_string(),
371 value_fields: SmallVec::from_vec(vec!["value".to_string()]),
372 interval_ms: None,
373 };
374 }
375 }
376
377 if len >= 3
379 && arr.iter().all(|v| v.is_object())
380 && let Some(first_obj) = arr.first().and_then(|v| v.as_object())
381 {
382 let first_scan = crate::parser::simd::SimdClassifier::scan_object_keys(first_obj);
383
384 let is_tabular = arr.iter().skip(1).filter_map(|v| v.as_object()).all(|obj| {
386 let scan = crate::parser::simd::SimdClassifier::scan_object_keys(obj);
387 let diff = scan.key_count as i32 - first_scan.key_count as i32;
389 diff.abs() <= (first_scan.key_count as i32 / 5)
390 });
391
392 if is_tabular {
393 let columns: SmallVec<[crate::semantic::ColumnMeta; 16]> = first_obj
395 .iter()
396 .map(|(k, v)| {
397 let column_type = if v.is_number() {
398 crate::semantic::ColumnType::Numeric(NumericDType::F64)
399 } else if v.is_str() {
400 crate::semantic::ColumnType::String
401 } else if v.as_bool().is_some() {
402 crate::semantic::ColumnType::Boolean
403 } else {
404 crate::semantic::ColumnType::Json
405 };
406
407 crate::semantic::ColumnMeta {
408 name: k.to_string(),
409 dtype: column_type,
410 nullable: false,
411 }
412 })
413 .collect();
414
415 return SemanticType::Table {
416 columns: Box::new(columns),
417 row_count: Some(len),
418 };
419 }
420 }
421
422 SemanticType::Generic
423 }
424}
425
426pub struct LazyFrame<'a> {
428 frame: Frame,
429 parser: &'a SonicParser,
430}
431
432impl<'a> LazyFrame<'a> {
433 pub fn frame(&self) -> &Frame {
435 &self.frame
436 }
437}
438
439impl Default for SonicParser {
440 fn default() -> Self {
441 Self::new()
442 }
443}
444
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_sonic_parser_creation() {
        let parser = SonicParser::new();
        assert!(parser.config.detect_semantics);
        assert_eq!(parser.config.max_input_size, 100 * 1024 * 1024);
    }

    #[test]
    fn test_sonic_basic_parsing() {
        let parser = SonicParser::new();
        let json = br#"{"name": "test", "value": 42}"#;

        let frame = parser.parse(json).expect("valid JSON object should parse");
        assert_eq!(frame.header.version, 1);
        assert_eq!(frame.payload.len(), json.len());
    }

    #[test]
    fn test_sonic_numeric_array_detection() {
        let parser = SonicParser::new();
        let json = br#"[1.5, 2.7, 3.14, 4.2, 5.1]"#;

        let frame = parser.parse(json).expect("numeric array should parse");
        let semantics = frame.semantics.expect("Expected semantic metadata");
        assert!(matches!(
            semantics.semantic_type,
            SemanticType::NumericArray { .. }
        ));
    }

    #[test]
    fn test_sonic_time_series_detection() {
        let parser = SonicParser::new();
        let json = br#"[
            {"timestamp": "2023-01-01T00:00:00Z", "value": 1.5},
            {"timestamp": "2023-01-01T00:01:00Z", "value": 2.3}
        ]"#;

        let frame = parser.parse(json).expect("time series should parse");
        let semantics = frame.semantics.expect("Expected semantic metadata");
        assert!(matches!(
            semantics.semantic_type,
            SemanticType::TimeSeries { .. }
        ));
    }

    #[test]
    fn test_sonic_performance_config() {
        let config = SonicConfig {
            detect_semantics: false,
            max_input_size: 1024,
        };

        let parser = SonicParser::with_config(config);
        assert!(!parser.config.detect_semantics);
        assert_eq!(parser.config.max_input_size, 1024);
    }

    #[test]
    fn test_sonic_invalid_utf8_handling() {
        let parser = SonicParser::new();
        let invalid_utf8 = &[0xFF, 0xFE, 0xFD];

        let err = parser
            .parse(invalid_utf8)
            .expect_err("invalid UTF-8 must be rejected");
        assert!(err.to_string().contains("Invalid UTF-8"));
    }

    #[test]
    fn test_sonic_input_size_limit() {
        let config = SonicConfig {
            detect_semantics: true,
            max_input_size: 10,
        };
        let parser = SonicParser::with_config(config);

        // 22 bytes against a 10-byte cap.
        let err = parser
            .parse(b"[1,2,3,4,5,6,7,8,9,10]")
            .expect_err("input over max_input_size must be rejected");
        let error_msg = err.to_string();
        assert!(error_msg.contains("Input size") || error_msg.contains("Input too large"));
    }

    #[test]
    fn test_sonic_json_depth_validation() {
        let parser = SonicParser::new();

        // 65 nested objects: {"a":{"a":...{"a":"value"}...}}
        let json = format!("{}\"value\"{}", "{\"a\":".repeat(65), "}".repeat(65));

        let err = parser
            .parse(json.as_bytes())
            .expect_err("over-deep nesting must be rejected");
        assert!(err.to_string().contains("depth"));
    }

    #[test]
    fn test_sonic_large_string_validation() {
        let parser = SonicParser::new();

        let json = format!("{{\"key\": \"{}\"}}", "a".repeat(11 * 1024 * 1024));

        let err = parser
            .parse(json.as_bytes())
            .expect_err("oversize string must be rejected");
        assert!(err.to_string().contains("String length"));
    }

    #[test]
    fn test_sonic_large_array_validation() {
        let parser = SonicParser::new();

        // A 1,001-element array must pass array-length validation.
        let elements: Vec<String> = (0..1001).map(|i| i.to_string()).collect();
        let json = format!("[{}]", elements.join(","));

        assert!(parser.parse(json.as_bytes()).is_ok());
    }

    #[test]
    fn test_sonic_many_object_keys_validation() {
        let parser = SonicParser::new();

        // An object with 1,000 keys must pass key-count validation.
        let pairs: Vec<String> = (0..1000).map(|i| format!("\"key{}\": {}", i, i)).collect();
        let json = format!("{{{}}}", pairs.join(","));

        assert!(parser.parse(json.as_bytes()).is_ok());
    }
}