1use hedl_core::convert::parse_reference;
48use hedl_core::lex::Tensor;
49use hedl_core::lex::{parse_expression_token, singularize_and_capitalize};
50use hedl_core::{Item, MatrixList, Node, Value};
51use quick_xml::events::Event;
52use quick_xml::Reader;
53use std::collections::BTreeMap;
54use std::io::{BufRead, BufReader, Read};
55
56pub use crate::from_xml::EntityPolicy;
58
59#[derive(Debug, Clone)]
61pub struct StreamConfig {
62 pub buffer_size: usize,
64 pub max_recursion_depth: usize,
66 pub max_batch_size: usize,
68 pub default_type_name: String,
70 pub version: (u32, u32),
72 pub infer_lists: bool,
74
75 pub entity_policy: EntityPolicy,
77
78 pub log_security_events: bool,
80}
81
/// Snapshot of how far an [`XmlStreamingParser`] has progressed.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct StreamPosition {
    /// Bytes consumed so far by the underlying XML reader.
    pub byte_offset: u64,
    /// Number of items successfully yielded so far.
    pub items_parsed: usize,
}
90
91impl Default for StreamConfig {
92 fn default() -> Self {
93 Self {
94 buffer_size: 65536, max_recursion_depth: 100,
96 max_batch_size: 1000,
97 default_type_name: "Item".to_string(),
98 version: (1, 0),
99 infer_lists: true,
100 entity_policy: EntityPolicy::default(),
101 log_security_events: false,
102 }
103 }
104}
105
106pub struct XmlStreamingParser<R: Read> {
111 reader: Reader<BufReader<R>>,
112 config: StreamConfig,
113 root_element_name: String,
114 root_parsed: bool,
115 exhausted: bool,
116 buf: Vec<u8>,
117 position: StreamPosition,
119}
120
121#[derive(Debug, Clone)]
123pub struct StreamItem {
124 pub key: String,
126 pub value: Item,
128}
129
130impl<R: Read> XmlStreamingParser<R> {
131 pub fn new(reader: R, config: StreamConfig) -> Result<Self, String> {
133 let buf_reader = BufReader::with_capacity(config.buffer_size, reader);
134 let xml_reader = Reader::from_reader(buf_reader);
135
136 Ok(XmlStreamingParser {
137 reader: xml_reader,
138 config,
139 root_element_name: String::new(),
140 root_parsed: false,
141 exhausted: false,
142 buf: Vec::with_capacity(8192),
143 position: StreamPosition::default(),
144 })
145 }
146
147 #[inline]
152 pub fn position(&self) -> StreamPosition {
153 StreamPosition {
154 byte_offset: self.reader.buffer_position(),
155 items_parsed: self.position.items_parsed,
156 }
157 }
158
159 #[inline]
161 pub fn bytes_processed(&self) -> u64 {
162 self.reader.buffer_position()
163 }
164
165 #[inline]
167 pub fn items_parsed(&self) -> usize {
168 self.position.items_parsed
169 }
170
171 fn find_root(&mut self) -> Result<bool, String> {
173 loop {
174 self.buf.clear();
175 match self.reader.read_event_into(&mut self.buf) {
176 Ok(Event::DocType(_e)) => {
177 if self.config.log_security_events {
178 eprintln!(
179 "[SECURITY] DTD detected in streaming XML at position {}",
180 self.reader.buffer_position()
181 );
182 }
183
184 if self.config.entity_policy == EntityPolicy::RejectDtd {
185 return Err(format!(
186 "DOCTYPE declaration rejected at position {} (XXE prevention)",
187 self.reader.buffer_position()
188 ));
189 }
190 }
192 Ok(Event::Start(e)) | Ok(Event::Empty(e)) => {
193 self.root_element_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
194 self.root_parsed = true;
195 return Ok(true);
196 }
197 Ok(Event::Eof) => return Ok(false),
198 Err(e) => {
199 return Err(format!(
200 "XML parse error at position {}: {}",
201 self.reader.buffer_position(),
202 e
203 ))
204 }
205 _ => {}
206 }
207 }
208 }
209
210 fn parse_next_root_element(&mut self) -> Result<Option<StreamItem>, String> {
212 loop {
213 self.buf.clear();
214 match self.reader.read_event_into(&mut self.buf) {
215 Ok(Event::Start(e)) => {
216 let raw_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
217 let name = to_hedl_key(&raw_name);
218 let elem_owned = e.to_owned();
219
220 let item = parse_element(&mut self.reader, &elem_owned, &self.config, 1)?;
221 return Ok(Some(StreamItem {
222 key: name,
223 value: item,
224 }));
225 }
226 Ok(Event::Empty(e)) => {
227 let raw_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
228 let name = to_hedl_key(&raw_name);
229 let elem_owned = e.to_owned();
230
231 let item = parse_empty_element(&elem_owned, &self.config)?;
232 return Ok(Some(StreamItem {
233 key: name,
234 value: item,
235 }));
236 }
237 Ok(Event::End(e)) => {
238 let end_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
239 if end_name == self.root_element_name {
240 return Ok(None); }
242 }
243 Ok(Event::Eof) => return Ok(None),
244 Err(e) => {
245 return Err(format!(
246 "XML parse error at position {}: {}",
247 self.reader.buffer_position(),
248 e
249 ))
250 }
251 _ => {}
252 }
253 }
254 }
255}
256
257impl<R: Read> Iterator for XmlStreamingParser<R> {
258 type Item = Result<StreamItem, String>;
259
260 fn next(&mut self) -> Option<Self::Item> {
261 if self.exhausted {
262 return None;
263 }
264
265 if !self.root_parsed {
267 match self.find_root() {
268 Ok(true) => {
269 }
271 Ok(false) => {
272 self.exhausted = true;
273 return None;
274 }
275 Err(e) => {
276 self.exhausted = true;
277 return Some(Err(e));
278 }
279 }
280 }
281
282 match self.parse_next_root_element() {
284 Ok(Some(item)) => {
285 self.position.items_parsed += 1;
286 Some(Ok(item))
287 }
288 Ok(None) => {
289 self.exhausted = true;
290 None
291 }
292 Err(e) => {
293 self.exhausted = true;
294 Some(Err(e))
295 }
296 }
297 }
298}
299
300pub fn from_xml_stream<R: Read>(
325 reader: R,
326 config: &StreamConfig,
327) -> Result<XmlStreamingParser<R>, String> {
328 XmlStreamingParser::new(reader, config.clone())
329}
330
331fn parse_element<R>(
336 reader: &mut Reader<R>,
337 elem: &quick_xml::events::BytesStart<'_>,
338 config: &StreamConfig,
339 depth: usize,
340) -> Result<Item, String>
341where
342 R: BufRead,
343{
344 if depth > config.max_recursion_depth {
345 return Err(format!(
346 "XML recursion depth exceeded (max: {})",
347 config.max_recursion_depth
348 ));
349 }
350
351 let name = String::from_utf8_lossy(elem.name().as_ref()).to_string();
352
353 let mut attributes = BTreeMap::new();
355 let mut is_reference = false;
356 for attr in elem.attributes().flatten() {
357 let raw_key = String::from_utf8_lossy(attr.key.as_ref()).to_string();
358 let value = String::from_utf8_lossy(&attr.value).to_string();
359
360 if raw_key == "__hedl_type__" {
361 if value == "ref" {
362 is_reference = true;
363 }
364 continue;
365 }
366
367 let key = to_hedl_key(&raw_key);
368 attributes.insert(key, value);
369 }
370
371 let mut text_content = String::new();
373 let mut child_elements: BTreeMap<String, Vec<Item>> = BTreeMap::new();
374 let mut has_children = false;
375 let mut buf = Vec::new();
376
377 loop {
378 buf.clear();
379 match reader.read_event_into(&mut buf) {
380 Ok(Event::Start(e)) => {
381 has_children = true;
382 let raw_child_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
383 let child_name = to_hedl_key(&raw_child_name);
384 let elem_owned = e.to_owned();
385 let child_item = parse_element(reader, &elem_owned, config, depth + 1)?;
386
387 child_elements
388 .entry(child_name)
389 .or_default()
390 .push(child_item);
391 }
392 Ok(Event::Empty(e)) => {
393 has_children = true;
394 let raw_child_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
395 let child_name = to_hedl_key(&raw_child_name);
396 let elem_owned = e.to_owned();
397 let child_item = parse_empty_element(&elem_owned, config)?;
398
399 child_elements
400 .entry(child_name)
401 .or_default()
402 .push(child_item);
403 }
404 Ok(Event::Text(e)) => {
405 let content = e
406 .xml_content()
407 .map_err(|e| format!("Text decode error: {}", e))?;
408 text_content.push_str(&content);
409 }
410 Ok(Event::GeneralRef(e)) => {
411 let ref_name = e.decode().map_err(|e| format!("Ref decode error: {}", e))?;
413 let unescaped = match ref_name.as_ref() {
414 "amp" => "&",
415 "lt" => "<",
416 "gt" => ">",
417 "quot" => "\"",
418 "apos" => "'",
419 _ => return Err(format!("Unknown entity reference: {}", ref_name)),
420 };
421 text_content.push_str(unescaped);
422 }
423 Ok(Event::End(e)) => {
424 let end_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
425 if end_name == name {
426 break;
427 }
428 }
429 Ok(Event::Eof) => break,
430 Err(e) => return Err(format!("XML parse error: {}", e)),
431 _ => {}
432 }
433 }
434
435 if has_children {
437 let mut result_children = BTreeMap::new();
438 for (child_name, items) in child_elements {
439 if items.len() > 1 && config.infer_lists {
440 if child_name == "item" && items_are_tensor_elements(&items) {
441 let tensor = items_to_tensor(&items)?;
442 result_children
443 .insert(child_name, Item::Scalar(Value::Tensor(Box::new(tensor))));
444 } else {
445 let list = items_to_matrix_list(&child_name, items, config)?;
446 result_children.insert(child_name, Item::List(list));
447 }
448 } else if let Some(item) = items.into_iter().next() {
449 result_children.insert(child_name, item);
450 }
451 }
452
453 if result_children.len() == 1 {
455 let (child_key, child_item) = result_children.iter().next().expect("len == 1");
457 if let Item::List(list) = child_item {
458 let has_nested_children = list
459 .rows
460 .iter()
461 .any(|node| node.children().map(|c| !c.is_empty()).unwrap_or(false));
462 if !has_nested_children {
463 let parent_singular =
464 singularize_and_capitalize(&to_hedl_key(&name)).to_lowercase();
465 let child_type = singularize_and_capitalize(child_key).to_lowercase();
466 if parent_singular == child_type {
467 return Ok(result_children.into_values().next().expect("len == 1"));
469 }
470 }
471 }
472 }
473
474 Ok(Item::Object(result_children))
475 } else if !text_content.trim().is_empty() {
476 let value = if is_reference {
477 Value::Reference(parse_reference(text_content.trim())?)
478 } else {
479 parse_value_with_config(&text_content, config)?
480 };
481 Ok(Item::Scalar(value))
482 } else if !attributes.is_empty() {
483 let mut obj = BTreeMap::new();
484 for (key, value_str) in attributes {
485 let value = parse_value_with_config(&value_str, config)?;
486 obj.insert(key, Item::Scalar(value));
487 }
488 Ok(Item::Object(obj))
489 } else {
490 Ok(Item::Scalar(Value::Null))
491 }
492}
493
494fn parse_empty_element(
495 elem: &quick_xml::events::BytesStart<'_>,
496 config: &StreamConfig,
497) -> Result<Item, String> {
498 let mut attributes = BTreeMap::new();
499
500 for attr in elem.attributes().flatten() {
501 let raw_key = String::from_utf8_lossy(attr.key.as_ref()).to_string();
502 let key = to_hedl_key(&raw_key);
503 let value = String::from_utf8_lossy(&attr.value).to_string();
504 attributes.insert(key, value);
505 }
506
507 if attributes.is_empty() {
508 Ok(Item::Scalar(Value::Null))
509 } else if attributes.len() == 1 && attributes.contains_key("value") {
510 let value_str = attributes.get("value").expect("key exists");
512 let value = parse_value_with_config(value_str, config)?;
513 Ok(Item::Scalar(value))
514 } else {
515 let mut obj = BTreeMap::new();
516 for (key, value_str) in attributes {
517 let value = parse_value_with_config(&value_str, config)?;
518 obj.insert(key, Item::Scalar(value));
519 }
520 Ok(Item::Object(obj))
521 }
522}
523
524fn parse_value_with_config(s: &str, config: &StreamConfig) -> Result<Value, String> {
525 let trimmed = s.trim();
526
527 if trimmed.contains('&') && trimmed.contains(';') {
529 if config.log_security_events {
530 eprintln!("[SECURITY] Entity reference detected in value: {}", trimmed);
531 }
532
533 if (trimmed.contains("&xxe;")
535 || trimmed.contains("&file;")
536 || trimmed.contains("&passwd;")
537 || trimmed.contains("&secret;"))
538 && config.entity_policy == EntityPolicy::WarnOnEntities
539 {
540 eprintln!(
541 "[WARNING] Suspicious entity reference detected: {}",
542 trimmed
543 );
544 }
545 }
546
547 if trimmed.is_empty() {
548 return Ok(Value::Null);
549 }
550
551 if trimmed.starts_with("$(") && trimmed.ends_with(')') {
552 let expr =
553 parse_expression_token(trimmed).map_err(|e| format!("Invalid expression: {}", e))?;
554 return Ok(Value::Expression(Box::new(expr)));
555 }
556
557 if trimmed == "true" {
558 return Ok(Value::Bool(true));
559 }
560 if trimmed == "false" {
561 return Ok(Value::Bool(false));
562 }
563
564 if let Ok(i) = trimmed.parse::<i64>() {
565 return Ok(Value::Int(i));
566 }
567 if let Ok(f) = trimmed.parse::<f64>() {
568 return Ok(Value::Float(f));
569 }
570
571 Ok(Value::String(trimmed.to_string().into()))
572}
573
/// Test helper: [`parse_value_with_config`] under a default [`StreamConfig`].
#[cfg(test)]
fn parse_value(s: &str) -> Result<Value, String> {
    parse_value_with_config(s, &StreamConfig::default())
}
580
581fn items_to_matrix_list(
582 name: &str,
583 items: Vec<Item>,
584 _config: &StreamConfig,
585) -> Result<MatrixList, String> {
586 let type_name = singularize_and_capitalize(name);
587 let schema = infer_schema(&items)?;
588
589 let mut rows = Vec::new();
590 for (idx, item) in items.into_iter().enumerate() {
591 let node = item_to_node(&type_name, &schema, item, idx)?;
592 rows.push(node);
593 }
594
595 Ok(MatrixList {
596 type_name,
597 schema,
598 rows,
599 count_hint: None,
600 })
601}
602
603fn infer_schema(items: &[Item]) -> Result<Vec<String>, String> {
604 if let Some(Item::Object(first_obj)) = items.first() {
605 let mut keys: Vec<_> = first_obj
606 .iter()
607 .filter(|(_, item)| matches!(item, Item::Scalar(_)))
608 .map(|(k, _)| k.clone())
609 .collect();
610 keys.sort();
611
612 if let Some(pos) = keys.iter().position(|k| k == "id") {
613 keys.remove(pos);
614 keys.insert(0, "id".to_string());
615 } else {
616 keys.insert(0, "id".to_string());
617 }
618
619 Ok(keys)
620 } else {
621 Ok(vec!["id".to_string(), "value".to_string()])
622 }
623}
624
625fn item_to_node(
626 type_name: &str,
627 schema: &[String],
628 item: Item,
629 idx: usize,
630) -> Result<Node, String> {
631 match item {
632 Item::Object(obj) => {
633 let id = obj
634 .get(&schema[0])
635 .and_then(|i| i.as_scalar())
636 .and_then(|v| v.as_str())
637 .map(|s| s.to_string())
638 .unwrap_or_else(|| format!("{}", idx));
639
640 let mut fields = Vec::new();
641 for col in schema {
642 let value = obj
643 .get(col)
644 .and_then(|i| i.as_scalar())
645 .cloned()
646 .unwrap_or(Value::Null);
647 fields.push(value);
648 }
649
650 let mut children: BTreeMap<String, Vec<Node>> = BTreeMap::new();
651 for child_item in obj.values() {
652 if let Item::List(child_list) = child_item {
653 children.insert(child_list.type_name.clone(), child_list.rows.clone());
654 }
655 }
656
657 Ok(Node {
658 type_name: type_name.to_string(),
659 id,
660 fields: fields.into(),
661 children: if children.is_empty() {
662 None
663 } else {
664 Some(Box::new(children))
665 },
666 child_count: 0,
667 })
668 }
669 Item::Scalar(value) => {
670 let id = format!("{}", idx);
671 Ok(Node {
672 type_name: type_name.to_string(),
673 id: id.clone(),
674 fields: vec![Value::String(id.into()), value].into(),
675 children: None,
676 child_count: 0,
677 })
678 }
679 Item::List(_) => Err("Cannot convert nested list to node".to_string()),
680 }
681}
682
/// Converts an XML name to a HEDL key: ASCII uppercase is lowercased.
///
/// * An `_` is inserted before an uppercase letter that follows neither an
///   uppercase letter nor an underscore (so `UserPost` → `user_post`, while
///   acronym runs stay together).
/// * Leading and trailing underscores are dropped.
/// * Underscore runs collapse to one.
/// * All other characters are kept as-is.
fn to_hedl_key(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 4);
    // Classification of the previously examined character (pushed or not).
    let mut last_upper = false;
    let mut last_underscore = false;

    for (pos, ch) in s.chars().enumerate() {
        match ch {
            c if c.is_ascii_uppercase() => {
                let word_boundary = pos > 0 && !last_upper && !last_underscore;
                if word_boundary {
                    out.push('_');
                }
                out.push(c.to_ascii_lowercase());
                last_upper = true;
                last_underscore = false;
            }
            '_' => {
                if !(last_underscore || out.is_empty()) {
                    out.push('_');
                }
                last_upper = false;
                last_underscore = true;
            }
            other => {
                out.push(other);
                last_upper = false;
                last_underscore = false;
            }
        }
    }

    let kept = out.trim_end_matches('_').len();
    out.truncate(kept);
    out
}
718
719fn items_are_tensor_elements(items: &[Item]) -> bool {
720 items.iter().all(|item| match item {
721 Item::Scalar(Value::Int(_)) => true,
722 Item::Scalar(Value::Float(_)) => true,
723 Item::Scalar(Value::Tensor(_)) => true,
724 Item::Object(obj) if obj.len() == 1 => {
725 matches!(obj.get("item"), Some(Item::Scalar(Value::Tensor(_))))
726 }
727 _ => false,
728 })
729}
730
731fn items_to_tensor(items: &[Item]) -> Result<Tensor, String> {
732 let mut tensor_items = Vec::new();
733
734 for item in items {
735 let tensor = match item {
736 Item::Scalar(Value::Int(n)) => Tensor::Scalar(*n as f64),
737 Item::Scalar(Value::Float(f)) => Tensor::Scalar(*f),
738 Item::Scalar(Value::Tensor(t)) => (**t).clone(),
739 Item::Object(obj) if obj.len() == 1 => {
740 if let Some(Item::Scalar(Value::Tensor(t))) = obj.get("item") {
741 (**t).clone()
742 } else {
743 return Err("Cannot convert non-numeric item to tensor".to_string());
744 }
745 }
746 _ => return Err("Cannot convert non-numeric item to tensor".to_string()),
747 };
748 tensor_items.push(tensor);
749 }
750
751 Ok(Tensor::Array(tensor_items))
752}
753
#[cfg(test)]
mod tests {
    //! Unit tests for the pure helpers and configuration defaults of the
    //! streaming XML parser.
    use super::*;

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert_eq!(config.buffer_size, 65536);
        assert_eq!(config.max_recursion_depth, 100);
        assert_eq!(config.max_batch_size, 1000);
        assert_eq!(config.default_type_name, "Item");
        assert_eq!(config.version, (1, 0));
        assert!(config.infer_lists);
        assert_eq!(config.entity_policy, EntityPolicy::default());
        assert!(!config.log_security_events);
    }

    #[test]
    fn test_stream_config_custom() {
        let config = StreamConfig {
            buffer_size: 131072,
            max_recursion_depth: 50,
            max_batch_size: 500,
            default_type_name: "CustomItem".to_string(),
            version: (2, 0),
            infer_lists: false,
            entity_policy: EntityPolicy::RejectDtd,
            log_security_events: true,
        };
        assert_eq!(config.buffer_size, 131072);
        assert_eq!(config.max_recursion_depth, 50);
        assert_eq!(config.max_batch_size, 500);
        assert_eq!(config.default_type_name, "CustomItem");
        assert_eq!(config.version, (2, 0));
        assert!(!config.infer_lists);
        assert_eq!(config.entity_policy, EntityPolicy::RejectDtd);
        assert!(config.log_security_events);
    }

    #[test]
    fn test_stream_position_default_is_zero() {
        assert_eq!(
            StreamPosition::default(),
            StreamPosition {
                byte_offset: 0,
                items_parsed: 0
            }
        );
    }

    #[test]
    fn test_stream_item_construction() {
        let item = StreamItem {
            key: "test".to_string(),
            value: Item::Scalar(Value::String("value".to_string().into())),
        };
        assert_eq!(item.key, "test");
        assert_eq!(
            item.value.as_scalar(),
            Some(&Value::String("value".to_string().into()))
        );
    }

    #[test]
    fn test_parse_value_string() {
        assert_eq!(
            parse_value("hello"),
            Ok(Value::String("hello".to_string().into()))
        );
        // Surrounding whitespace is trimmed before typing.
        assert_eq!(
            parse_value("  hello  "),
            Ok(Value::String("hello".to_string().into()))
        );
    }

    #[test]
    fn test_parse_value_bool() {
        assert_eq!(parse_value("true"), Ok(Value::Bool(true)));
        assert_eq!(parse_value("false"), Ok(Value::Bool(false)));
    }

    #[test]
    fn test_parse_value_int() {
        assert_eq!(parse_value("42"), Ok(Value::Int(42)));
        assert_eq!(parse_value("-7"), Ok(Value::Int(-7)));
        assert_eq!(parse_value("  12  "), Ok(Value::Int(12)));
    }

    #[test]
    fn test_parse_value_float() {
        match parse_value("4.56") {
            Ok(Value::Float(f)) => assert!((f - 4.56).abs() < 1e-9),
            other => panic!("Expected float, got {:?}", other),
        }
        match parse_value("1.5e3") {
            Ok(Value::Float(f)) => assert!((f - 1500.0).abs() < 1e-9),
            other => panic!("Expected float, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_value_null() {
        assert_eq!(parse_value(""), Ok(Value::Null));
        assert_eq!(parse_value(" "), Ok(Value::Null));
    }

    #[test]
    fn test_to_hedl_key_pascal_case() {
        assert_eq!(to_hedl_key("Category"), "category");
        assert_eq!(to_hedl_key("UserPost"), "user_post");
    }

    #[test]
    fn test_to_hedl_key_lowercase() {
        assert_eq!(to_hedl_key("users"), "users");
    }

    #[test]
    fn test_to_hedl_key_underscores() {
        assert_eq!(to_hedl_key("__hedl_type__"), "hedl_type");
        assert_eq!(to_hedl_key("a__b"), "a_b");
        assert_eq!(to_hedl_key("Name_"), "name");
        assert_eq!(to_hedl_key(""), "");
    }

    #[test]
    fn test_infer_schema_from_objects() {
        let items = vec![Item::Object({
            let mut m = BTreeMap::new();
            m.insert(
                "id".to_string(),
                Item::Scalar(Value::String("1".to_string().into())),
            );
            m.insert(
                "name".to_string(),
                Item::Scalar(Value::String("Alice".to_string().into())),
            );
            m
        })];
        let schema = infer_schema(&items).unwrap();
        assert_eq!(schema, vec!["id", "name"]);
    }

    #[test]
    fn test_infer_schema_puts_id_first_and_skips_non_scalars() {
        let items = vec![Item::Object({
            let mut m = BTreeMap::new();
            m.insert("age".to_string(), Item::Scalar(Value::Int(30)));
            m.insert("id".to_string(), Item::Scalar(Value::Int(1)));
            m.insert("tags".to_string(), Item::Object(BTreeMap::new()));
            m
        })];
        assert_eq!(infer_schema(&items).unwrap(), vec!["id", "age"]);
    }

    #[test]
    fn test_infer_schema_from_scalars() {
        let items = vec![Item::Scalar(Value::Int(1)), Item::Scalar(Value::Int(2))];
        assert_eq!(infer_schema(&items).unwrap(), vec!["id", "value"]);
    }

    #[test]
    fn test_items_are_tensor_elements_numeric() {
        let items = vec![
            Item::Scalar(Value::Int(1)),
            Item::Scalar(Value::Float(2.0)),
            Item::Scalar(Value::Int(3)),
        ];
        assert!(items_are_tensor_elements(&items));
    }

    #[test]
    fn test_items_are_tensor_elements_non_numeric() {
        let items = vec![
            Item::Scalar(Value::Int(1)),
            Item::Scalar(Value::String("hello".to_string().into())),
        ];
        assert!(!items_are_tensor_elements(&items));
    }

    #[test]
    fn test_items_to_tensor_flat() {
        let items = vec![
            Item::Scalar(Value::Int(1)),
            Item::Scalar(Value::Float(2.5)),
            Item::Scalar(Value::Int(3)),
        ];
        assert!(matches!(
            items_to_tensor(&items),
            Ok(Tensor::Array(ref v)) if v.len() == 3
        ));
    }

    #[test]
    fn test_items_to_tensor_rejects_strings() {
        let items = vec![Item::Scalar(Value::String("x".to_string().into()))];
        assert!(items_to_tensor(&items).is_err());
    }
}