1use hedl_core::convert::parse_reference;
48use hedl_core::lex::Tensor;
49use hedl_core::lex::{parse_expression_token, singularize_and_capitalize};
50use hedl_core::{Item, MatrixList, Node, Value};
51use quick_xml::events::Event;
52use quick_xml::Reader;
53use std::collections::BTreeMap;
54use std::io::{BufRead, BufReader, Read};
55
56pub use crate::from_xml::EntityPolicy;
58
59#[derive(Debug, Clone)]
61pub struct StreamConfig {
62 pub buffer_size: usize,
64 pub max_recursion_depth: usize,
66 pub max_batch_size: usize,
68 pub default_type_name: String,
70 pub version: (u32, u32),
72 pub infer_lists: bool,
74
75 pub entity_policy: EntityPolicy,
77
78 pub log_security_events: bool,
80}
81
/// Snapshot of how far an [`XmlStreamingParser`] has progressed through its input.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct StreamPosition {
    /// Byte position reported by the underlying XML reader.
    pub byte_offset: u64,
    /// Number of top-level items successfully yielded so far.
    pub items_parsed: usize,
}
90
91impl Default for StreamConfig {
92 fn default() -> Self {
93 Self {
94 buffer_size: 65536, max_recursion_depth: 100,
96 max_batch_size: 1000,
97 default_type_name: "Item".to_string(),
98 version: (1, 0),
99 infer_lists: true,
100 entity_policy: EntityPolicy::default(),
101 log_security_events: false,
102 }
103 }
104}
105
106pub struct XmlStreamingParser<R: Read> {
111 reader: Reader<BufReader<R>>,
112 config: StreamConfig,
113 root_element_name: String,
114 root_parsed: bool,
115 exhausted: bool,
116 buf: Vec<u8>,
117 position: StreamPosition,
119}
120
121#[derive(Debug, Clone)]
123pub struct StreamItem {
124 pub key: String,
126 pub value: Item,
128}
129
130impl<R: Read> XmlStreamingParser<R> {
131 pub fn new(reader: R, config: StreamConfig) -> Result<Self, String> {
133 let buf_reader = BufReader::with_capacity(config.buffer_size, reader);
134 let xml_reader = Reader::from_reader(buf_reader);
135
136 Ok(XmlStreamingParser {
137 reader: xml_reader,
138 config,
139 root_element_name: String::new(),
140 root_parsed: false,
141 exhausted: false,
142 buf: Vec::with_capacity(8192),
143 position: StreamPosition::default(),
144 })
145 }
146
147 #[inline]
152 pub fn position(&self) -> StreamPosition {
153 StreamPosition {
154 byte_offset: self.reader.buffer_position(),
155 items_parsed: self.position.items_parsed,
156 }
157 }
158
159 #[inline]
161 pub fn bytes_processed(&self) -> u64 {
162 self.reader.buffer_position()
163 }
164
165 #[inline]
167 pub fn items_parsed(&self) -> usize {
168 self.position.items_parsed
169 }
170
171 fn find_root(&mut self) -> Result<bool, String> {
173 loop {
174 self.buf.clear();
175 match self.reader.read_event_into(&mut self.buf) {
176 Ok(Event::DocType(_e)) => {
177 if self.config.log_security_events {
178 eprintln!(
179 "[SECURITY] DTD detected in streaming XML at position {}",
180 self.reader.buffer_position()
181 );
182 }
183
184 if self.config.entity_policy == EntityPolicy::RejectDtd {
185 return Err(format!(
186 "DOCTYPE declaration rejected at position {} (XXE prevention)",
187 self.reader.buffer_position()
188 ));
189 }
190 }
192 Ok(Event::Start(e)) | Ok(Event::Empty(e)) => {
193 self.root_element_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
194 self.root_parsed = true;
195 return Ok(true);
196 }
197 Ok(Event::Eof) => return Ok(false),
198 Err(e) => {
199 return Err(format!(
200 "XML parse error at position {}: {}",
201 self.reader.buffer_position(),
202 e
203 ))
204 }
205 _ => {}
206 }
207 }
208 }
209
210 fn parse_next_root_element(&mut self) -> Result<Option<StreamItem>, String> {
212 loop {
213 self.buf.clear();
214 match self.reader.read_event_into(&mut self.buf) {
215 Ok(Event::Start(e)) => {
216 let raw_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
217 let name = to_hedl_key(&raw_name);
218 let elem_owned = e.to_owned();
219
220 let item = parse_element(&mut self.reader, &elem_owned, &self.config, 1)?;
221 return Ok(Some(StreamItem {
222 key: name,
223 value: item,
224 }));
225 }
226 Ok(Event::Empty(e)) => {
227 let raw_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
228 let name = to_hedl_key(&raw_name);
229 let elem_owned = e.to_owned();
230
231 let item = parse_empty_element(&elem_owned, &self.config)?;
232 return Ok(Some(StreamItem {
233 key: name,
234 value: item,
235 }));
236 }
237 Ok(Event::End(e)) => {
238 let end_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
239 if end_name == self.root_element_name {
240 return Ok(None); }
242 }
243 Ok(Event::Eof) => return Ok(None),
244 Err(e) => {
245 return Err(format!(
246 "XML parse error at position {}: {}",
247 self.reader.buffer_position(),
248 e
249 ))
250 }
251 _ => {}
252 }
253 }
254 }
255}
256
257impl<R: Read> Iterator for XmlStreamingParser<R> {
258 type Item = Result<StreamItem, String>;
259
260 fn next(&mut self) -> Option<Self::Item> {
261 if self.exhausted {
262 return None;
263 }
264
265 if !self.root_parsed {
267 match self.find_root() {
268 Ok(true) => {
269 }
271 Ok(false) => {
272 self.exhausted = true;
273 return None;
274 }
275 Err(e) => {
276 self.exhausted = true;
277 return Some(Err(e));
278 }
279 }
280 }
281
282 match self.parse_next_root_element() {
284 Ok(Some(item)) => {
285 self.position.items_parsed += 1;
286 Some(Ok(item))
287 }
288 Ok(None) => {
289 self.exhausted = true;
290 None
291 }
292 Err(e) => {
293 self.exhausted = true;
294 Some(Err(e))
295 }
296 }
297 }
298}
299
300pub fn from_xml_stream<R: Read>(
325 reader: R,
326 config: &StreamConfig,
327) -> Result<XmlStreamingParser<R>, String> {
328 XmlStreamingParser::new(reader, config.clone())
329}
330
331fn parse_element<R>(
336 reader: &mut Reader<R>,
337 elem: &quick_xml::events::BytesStart<'_>,
338 config: &StreamConfig,
339 depth: usize,
340) -> Result<Item, String>
341where
342 R: BufRead,
343{
344 if depth > config.max_recursion_depth {
345 return Err(format!(
346 "XML recursion depth exceeded (max: {})",
347 config.max_recursion_depth
348 ));
349 }
350
351 let name = String::from_utf8_lossy(elem.name().as_ref()).to_string();
352
353 let mut attributes = BTreeMap::new();
355 let mut is_reference = false;
356 for attr in elem.attributes().flatten() {
357 let raw_key = String::from_utf8_lossy(attr.key.as_ref()).to_string();
358 let value = String::from_utf8_lossy(&attr.value).to_string();
359
360 if raw_key == "__hedl_type__" {
361 if value == "ref" {
362 is_reference = true;
363 }
364 continue;
365 }
366
367 let key = to_hedl_key(&raw_key);
368 attributes.insert(key, value);
369 }
370
371 let mut text_content = String::new();
373 let mut child_elements: BTreeMap<String, Vec<Item>> = BTreeMap::new();
374 let mut has_children = false;
375 let mut buf = Vec::new();
376
377 loop {
378 buf.clear();
379 match reader.read_event_into(&mut buf) {
380 Ok(Event::Start(e)) => {
381 has_children = true;
382 let raw_child_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
383 let child_name = to_hedl_key(&raw_child_name);
384 let elem_owned = e.to_owned();
385 let child_item = parse_element(reader, &elem_owned, config, depth + 1)?;
386
387 child_elements
388 .entry(child_name)
389 .or_default()
390 .push(child_item);
391 }
392 Ok(Event::Empty(e)) => {
393 has_children = true;
394 let raw_child_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
395 let child_name = to_hedl_key(&raw_child_name);
396 let elem_owned = e.to_owned();
397 let child_item = parse_empty_element(&elem_owned, config)?;
398
399 child_elements
400 .entry(child_name)
401 .or_default()
402 .push(child_item);
403 }
404 Ok(Event::Text(e)) => {
405 let content = e
406 .xml_content()
407 .map_err(|e| format!("Text decode error: {}", e))?;
408 text_content.push_str(&content);
409 }
410 Ok(Event::GeneralRef(e)) => {
411 let ref_name = e.decode().map_err(|e| format!("Ref decode error: {}", e))?;
413 let unescaped = match ref_name.as_ref() {
414 "amp" => "&",
415 "lt" => "<",
416 "gt" => ">",
417 "quot" => "\"",
418 "apos" => "'",
419 _ => return Err(format!("Unknown entity reference: {}", ref_name)),
420 };
421 text_content.push_str(unescaped);
422 }
423 Ok(Event::End(e)) => {
424 let end_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
425 if end_name == name {
426 break;
427 }
428 }
429 Ok(Event::Eof) => break,
430 Err(e) => return Err(format!("XML parse error: {}", e)),
431 _ => {}
432 }
433 }
434
435 if has_children {
437 let mut result_children = BTreeMap::new();
438 for (child_name, items) in child_elements {
439 if items.len() > 1 && config.infer_lists {
440 if child_name == "item" && items_are_tensor_elements(&items) {
441 let tensor = items_to_tensor(&items)?;
442 result_children
443 .insert(child_name, Item::Scalar(Value::Tensor(Box::new(tensor))));
444 } else {
445 let list = items_to_matrix_list(&child_name, items, config)?;
446 result_children.insert(child_name, Item::List(list));
447 }
448 } else if let Some(item) = items.into_iter().next() {
449 result_children.insert(child_name, item);
450 }
451 }
452
453 if result_children.len() == 1 {
455 let (child_key, child_item) = result_children.iter().next().unwrap();
456 if let Item::List(list) = child_item {
457 let has_nested_children = list
458 .rows
459 .iter()
460 .any(|node| node.children().map(|c| !c.is_empty()).unwrap_or(false));
461 if !has_nested_children {
462 let parent_singular =
463 singularize_and_capitalize(&to_hedl_key(&name)).to_lowercase();
464 let child_type = singularize_and_capitalize(child_key).to_lowercase();
465 if parent_singular == child_type {
466 return Ok(result_children.into_values().next().unwrap());
467 }
468 }
469 }
470 }
471
472 Ok(Item::Object(result_children))
473 } else if !text_content.trim().is_empty() {
474 let value = if is_reference {
475 Value::Reference(parse_reference(text_content.trim())?)
476 } else {
477 parse_value_with_config(&text_content, config)?
478 };
479 Ok(Item::Scalar(value))
480 } else if !attributes.is_empty() {
481 let mut obj = BTreeMap::new();
482 for (key, value_str) in attributes {
483 let value = parse_value_with_config(&value_str, config)?;
484 obj.insert(key, Item::Scalar(value));
485 }
486 Ok(Item::Object(obj))
487 } else {
488 Ok(Item::Scalar(Value::Null))
489 }
490}
491
492fn parse_empty_element(
493 elem: &quick_xml::events::BytesStart<'_>,
494 config: &StreamConfig,
495) -> Result<Item, String> {
496 let mut attributes = BTreeMap::new();
497
498 for attr in elem.attributes().flatten() {
499 let raw_key = String::from_utf8_lossy(attr.key.as_ref()).to_string();
500 let key = to_hedl_key(&raw_key);
501 let value = String::from_utf8_lossy(&attr.value).to_string();
502 attributes.insert(key, value);
503 }
504
505 if attributes.is_empty() {
506 Ok(Item::Scalar(Value::Null))
507 } else if attributes.len() == 1 && attributes.contains_key("value") {
508 let value_str = attributes.get("value").unwrap();
509 let value = parse_value_with_config(value_str, config)?;
510 Ok(Item::Scalar(value))
511 } else {
512 let mut obj = BTreeMap::new();
513 for (key, value_str) in attributes {
514 let value = parse_value_with_config(&value_str, config)?;
515 obj.insert(key, Item::Scalar(value));
516 }
517 Ok(Item::Object(obj))
518 }
519}
520
521fn parse_value_with_config(s: &str, config: &StreamConfig) -> Result<Value, String> {
522 let trimmed = s.trim();
523
524 if trimmed.contains('&') && trimmed.contains(';') {
526 if config.log_security_events {
527 eprintln!("[SECURITY] Entity reference detected in value: {}", trimmed);
528 }
529
530 if (trimmed.contains("&xxe;")
532 || trimmed.contains("&file;")
533 || trimmed.contains("&passwd;")
534 || trimmed.contains("&secret;"))
535 && config.entity_policy == EntityPolicy::WarnOnEntities
536 {
537 eprintln!(
538 "[WARNING] Suspicious entity reference detected: {}",
539 trimmed
540 );
541 }
542 }
543
544 if trimmed.is_empty() {
545 return Ok(Value::Null);
546 }
547
548 if trimmed.starts_with("$(") && trimmed.ends_with(')') {
549 let expr =
550 parse_expression_token(trimmed).map_err(|e| format!("Invalid expression: {}", e))?;
551 return Ok(Value::Expression(Box::new(expr)));
552 }
553
554 if trimmed == "true" {
555 return Ok(Value::Bool(true));
556 }
557 if trimmed == "false" {
558 return Ok(Value::Bool(false));
559 }
560
561 if let Ok(i) = trimmed.parse::<i64>() {
562 return Ok(Value::Int(i));
563 }
564 if let Ok(f) = trimmed.parse::<f64>() {
565 return Ok(Value::Float(f));
566 }
567
568 Ok(Value::String(trimmed.to_string().into()))
569}
570
571#[allow(dead_code)]
572fn parse_value(s: &str) -> Result<Value, String> {
573 let config = StreamConfig::default();
575 parse_value_with_config(s, &config)
576}
577
578fn items_to_matrix_list(
579 name: &str,
580 items: Vec<Item>,
581 _config: &StreamConfig,
582) -> Result<MatrixList, String> {
583 let type_name = singularize_and_capitalize(name);
584 let schema = infer_schema(&items)?;
585
586 let mut rows = Vec::new();
587 for (idx, item) in items.into_iter().enumerate() {
588 let node = item_to_node(&type_name, &schema, item, idx)?;
589 rows.push(node);
590 }
591
592 Ok(MatrixList {
593 type_name,
594 schema,
595 rows,
596 count_hint: None,
597 })
598}
599
600fn infer_schema(items: &[Item]) -> Result<Vec<String>, String> {
601 if let Some(Item::Object(first_obj)) = items.first() {
602 let mut keys: Vec<_> = first_obj
603 .iter()
604 .filter(|(_, item)| matches!(item, Item::Scalar(_)))
605 .map(|(k, _)| k.clone())
606 .collect();
607 keys.sort();
608
609 if let Some(pos) = keys.iter().position(|k| k == "id") {
610 keys.remove(pos);
611 keys.insert(0, "id".to_string());
612 } else {
613 keys.insert(0, "id".to_string());
614 }
615
616 Ok(keys)
617 } else {
618 Ok(vec!["id".to_string(), "value".to_string()])
619 }
620}
621
622fn item_to_node(
623 type_name: &str,
624 schema: &[String],
625 item: Item,
626 idx: usize,
627) -> Result<Node, String> {
628 match item {
629 Item::Object(obj) => {
630 let id = obj
631 .get(&schema[0])
632 .and_then(|i| i.as_scalar())
633 .and_then(|v| v.as_str())
634 .map(|s| s.to_string())
635 .unwrap_or_else(|| format!("{}", idx));
636
637 let mut fields = Vec::new();
638 for col in schema {
639 let value = obj
640 .get(col)
641 .and_then(|i| i.as_scalar())
642 .cloned()
643 .unwrap_or(Value::Null);
644 fields.push(value);
645 }
646
647 let mut children: BTreeMap<String, Vec<Node>> = BTreeMap::new();
648 for child_item in obj.values() {
649 if let Item::List(child_list) = child_item {
650 children.insert(child_list.type_name.clone(), child_list.rows.clone());
651 }
652 }
653
654 Ok(Node {
655 type_name: type_name.to_string(),
656 id,
657 fields: fields.into(),
658 children: if children.is_empty() {
659 None
660 } else {
661 Some(Box::new(children))
662 },
663 child_count: 0,
664 })
665 }
666 Item::Scalar(value) => {
667 let id = format!("{}", idx);
668 Ok(Node {
669 type_name: type_name.to_string(),
670 id: id.clone(),
671 fields: vec![Value::String(id.into()), value].into(),
672 children: None,
673 child_count: 0,
674 })
675 }
676 Item::List(_) => Err("Cannot convert nested list to node".to_string()),
677 }
678}
679
/// Converts an XML name to a snake_case HEDL key.
///
/// * An ASCII capital that follows a non-capital, non-underscore character starts a new
///   `_`-separated word, and every capital is lowercased. Runs of capitals stay glued together,
///   so `XMLHttp` becomes `xmlhttp`.
/// * Runs of underscores collapse to one.
/// * Leading and trailing underscores are dropped.
/// * Other characters pass through unchanged.
fn to_hedl_key(s: &str) -> String {
    /// Class of the most recently consumed input character.
    #[derive(Clone, Copy, PartialEq)]
    enum Last {
        Upper,
        Underscore,
        Other,
    }

    let mut key = String::with_capacity(s.len() + 4);
    let mut last = Last::Other;

    for ch in s.chars() {
        last = if ch.is_ascii_uppercase() {
            // A capital right after an ordinary (already emitted) character opens a new word.
            if last == Last::Other && !key.is_empty() {
                key.push('_');
            }
            key.push(ch.to_ascii_lowercase());
            Last::Upper
        } else if ch == '_' {
            // Collapse underscore runs and never start the key with one.
            if last != Last::Underscore && !key.is_empty() {
                key.push('_');
            }
            Last::Underscore
        } else {
            key.push(ch);
            Last::Other
        };
    }

    let kept = key.trim_end_matches('_').len();
    key.truncate(kept);
    key
}
715
716fn items_are_tensor_elements(items: &[Item]) -> bool {
717 items.iter().all(|item| match item {
718 Item::Scalar(Value::Int(_)) => true,
719 Item::Scalar(Value::Float(_)) => true,
720 Item::Scalar(Value::Tensor(_)) => true,
721 Item::Object(obj) if obj.len() == 1 => {
722 matches!(obj.get("item"), Some(Item::Scalar(Value::Tensor(_))))
723 }
724 _ => false,
725 })
726}
727
728fn items_to_tensor(items: &[Item]) -> Result<Tensor, String> {
729 let mut tensor_items = Vec::new();
730
731 for item in items {
732 let tensor = match item {
733 Item::Scalar(Value::Int(n)) => Tensor::Scalar(*n as f64),
734 Item::Scalar(Value::Float(f)) => Tensor::Scalar(*f),
735 Item::Scalar(Value::Tensor(t)) => (**t).clone(),
736 Item::Object(obj) if obj.len() == 1 => {
737 if let Some(Item::Scalar(Value::Tensor(t))) = obj.get("item") {
738 (**t).clone()
739 } else {
740 return Err("Cannot convert non-numeric item to tensor".to_string());
741 }
742 }
743 _ => return Err("Cannot convert non-numeric item to tensor".to_string()),
744 };
745 tensor_items.push(tensor);
746 }
747
748 Ok(Tensor::Array(tensor_items))
749}
750
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Value::String` from a string slice.
    fn str_value(s: &str) -> Value {
        Value::String(s.to_string().into())
    }

    #[test]
    fn test_stream_config_default() {
        let cfg = StreamConfig::default();
        assert_eq!(
            (cfg.buffer_size, cfg.max_recursion_depth, cfg.max_batch_size),
            (65536, 100, 1000)
        );
        assert_eq!(cfg.default_type_name, "Item");
        assert_eq!(cfg.version, (1, 0));
        assert!(cfg.infer_lists);
    }

    #[test]
    fn test_stream_config_custom() {
        let cfg = StreamConfig {
            buffer_size: 128 * 1024,
            max_recursion_depth: 50,
            max_batch_size: 500,
            default_type_name: String::from("CustomItem"),
            version: (2, 0),
            infer_lists: false,
            entity_policy: EntityPolicy::RejectDtd,
            log_security_events: true,
        };
        assert_eq!(
            (cfg.buffer_size, cfg.max_recursion_depth, cfg.max_batch_size),
            (131072, 50, 500)
        );
        assert_eq!(cfg.default_type_name, "CustomItem");
        assert_eq!(cfg.version, (2, 0));
        assert!(!cfg.infer_lists);
    }

    #[test]
    fn test_stream_item_construction() {
        let entry = StreamItem {
            key: String::from("test"),
            value: Item::Scalar(str_value("value")),
        };
        assert_eq!(entry.key, "test");
        assert_eq!(entry.value.as_scalar(), Some(&str_value("value")));
    }

    #[test]
    fn test_parse_value_string() {
        assert_eq!(parse_value("hello"), Ok(str_value("hello")));
    }

    #[test]
    fn test_parse_value_bool() {
        for &(input, expected) in &[("true", true), ("false", false)] {
            assert_eq!(parse_value(input), Ok(Value::Bool(expected)));
        }
    }

    #[test]
    fn test_parse_value_int() {
        assert_eq!(parse_value("42"), Ok(Value::Int(42)));
    }

    #[test]
    #[allow(clippy::approx_constant)] // 3.14 is ordinary test data, not an approximation of PI
    fn test_parse_value_float() {
        let parsed = parse_value("3.14");
        assert!(
            matches!(parsed, Ok(Value::Float(f)) if (f - 3.14).abs() < 0.001),
            "Expected float"
        );
    }

    #[test]
    fn test_parse_value_null() {
        for blank in &["", "  "] {
            assert_eq!(parse_value(blank), Ok(Value::Null));
        }
    }

    #[test]
    fn test_to_hedl_key_pascal_case() {
        for &(raw, expected) in &[("Category", "category"), ("UserPost", "user_post")] {
            assert_eq!(to_hedl_key(raw), expected);
        }
    }

    #[test]
    fn test_to_hedl_key_lowercase() {
        assert_eq!(to_hedl_key("users"), "users");
    }

    #[test]
    fn test_infer_schema_from_objects() {
        let row: BTreeMap<String, Item> = [("id", "1"), ("name", "Alice")]
            .iter()
            .map(|&(key, text)| (key.to_string(), Item::Scalar(str_value(text))))
            .collect();
        let schema = infer_schema(&[Item::Object(row)]).unwrap();
        for column in &["id", "name"] {
            assert!(schema.iter().any(|c| c == column));
        }
    }

    #[test]
    fn test_items_are_tensor_elements_numeric() {
        let numbers = [
            Item::Scalar(Value::Int(1)),
            Item::Scalar(Value::Float(2.0)),
            Item::Scalar(Value::Int(3)),
        ];
        assert!(items_are_tensor_elements(&numbers));
    }

    #[test]
    fn test_items_are_tensor_elements_non_numeric() {
        let mixed = [Item::Scalar(Value::Int(1)), Item::Scalar(str_value("hello"))];
        assert!(!items_are_tensor_elements(&mixed));
    }
}