1use hedl_core::convert::parse_reference;
48use hedl_core::{Item, MatrixList, Node, Value};
49use hedl_core::lex::{parse_expression_token, singularize_and_capitalize};
50use hedl_core::lex::Tensor;
51use quick_xml::events::Event;
52use quick_xml::Reader;
53use std::collections::BTreeMap;
54use std::io::{BufRead, BufReader, Read};
55
/// Tuning options for [`XmlStreamingParser`].
#[derive(Clone, Debug)]
pub struct StreamConfig {
    /// Capacity, in bytes, of the `BufReader` wrapped around the input.
    pub buffer_size: usize,
    /// Deepest element nesting accepted before parsing fails; direct children
    /// of the root element are depth 1.
    pub max_recursion_depth: usize,
    /// Batch size limit. Not read by the parsing code in this module
    /// (NOTE(review): confirm where it is consumed).
    pub max_batch_size: usize,
    /// Fallback type name. Not read by the parsing code in this module
    /// (NOTE(review): confirm where it is consumed).
    pub default_type_name: String,
    /// Format version, presumably `(major, minor)`; not read in this module.
    pub version: (u32, u32),
    /// When `true`, repeated sibling elements are turned into lists/tensors.
    pub infer_lists: bool,
}
72
73impl Default for StreamConfig {
74 fn default() -> Self {
75 Self {
76 buffer_size: 65536, max_recursion_depth: 100,
78 max_batch_size: 1000,
79 default_type_name: "Item".to_string(),
80 version: (1, 0),
81 infer_lists: true,
82 }
83 }
84}
85
86pub struct XmlStreamingParser<R: Read> {
91 reader: Reader<BufReader<R>>,
92 config: StreamConfig,
93 root_element_name: String,
94 root_parsed: bool,
95 exhausted: bool,
96 buf: Vec<u8>,
97}
98
99#[derive(Debug, Clone)]
101pub struct StreamItem {
102 pub key: String,
104 pub value: Item,
106}
107
108impl<R: Read> XmlStreamingParser<R> {
109 pub fn new(reader: R, config: StreamConfig) -> Result<Self, String> {
111 let buf_reader = BufReader::with_capacity(config.buffer_size, reader);
112 let xml_reader = Reader::from_reader(buf_reader);
113 Ok(XmlStreamingParser {
114 reader: xml_reader,
115 config,
116 root_element_name: String::new(),
117 root_parsed: false,
118 exhausted: false,
119 buf: Vec::with_capacity(8192),
120 })
121 }
122
123 fn find_root(&mut self) -> Result<bool, String> {
125 loop {
126 self.buf.clear();
127 match self.reader.read_event_into(&mut self.buf) {
128 Ok(Event::Start(e)) | Ok(Event::Empty(e)) => {
129 self.root_element_name =
130 String::from_utf8_lossy(e.name().as_ref()).to_string();
131 self.root_parsed = true;
132 return Ok(true);
133 }
134 Ok(Event::Eof) => return Ok(false),
135 Err(e) => {
136 return Err(format!(
137 "XML parse error at position {}: {}",
138 self.reader.buffer_position(),
139 e
140 ))
141 }
142 _ => {}
143 }
144 }
145 }
146
147 fn parse_next_root_element(&mut self) -> Result<Option<StreamItem>, String> {
149 loop {
150 self.buf.clear();
151 match self.reader.read_event_into(&mut self.buf) {
152 Ok(Event::Start(e)) => {
153 let raw_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
154 let name = to_hedl_key(&raw_name);
155 let elem_owned = e.to_owned();
156
157 let item = parse_element(&mut self.reader, &elem_owned, &self.config, 1)?;
158 return Ok(Some(StreamItem {
159 key: name,
160 value: item,
161 }));
162 }
163 Ok(Event::Empty(e)) => {
164 let raw_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
165 let name = to_hedl_key(&raw_name);
166 let elem_owned = e.to_owned();
167
168 let item = parse_empty_element(&elem_owned)?;
169 return Ok(Some(StreamItem {
170 key: name,
171 value: item,
172 }));
173 }
174 Ok(Event::End(e)) => {
175 let end_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
176 if end_name == self.root_element_name {
177 return Ok(None); }
179 }
180 Ok(Event::Eof) => return Ok(None),
181 Err(e) => {
182 return Err(format!(
183 "XML parse error at position {}: {}",
184 self.reader.buffer_position(),
185 e
186 ))
187 }
188 _ => {}
189 }
190 }
191 }
192}
193
194impl<R: Read> Iterator for XmlStreamingParser<R> {
195 type Item = Result<StreamItem, String>;
196
197 fn next(&mut self) -> Option<Self::Item> {
198 if self.exhausted {
199 return None;
200 }
201
202 if !self.root_parsed {
204 match self.find_root() {
205 Ok(true) => {
206 }
208 Ok(false) => {
209 self.exhausted = true;
210 return None;
211 }
212 Err(e) => {
213 self.exhausted = true;
214 return Some(Err(e));
215 }
216 }
217 }
218
219 match self.parse_next_root_element() {
221 Ok(Some(item)) => Some(Ok(item)),
222 Ok(None) => {
223 self.exhausted = true;
224 None
225 }
226 Err(e) => {
227 self.exhausted = true;
228 Some(Err(e))
229 }
230 }
231 }
232}
233
234pub fn from_xml_stream<R: Read>(
259 reader: R,
260 config: &StreamConfig,
261) -> Result<XmlStreamingParser<R>, String> {
262 XmlStreamingParser::new(reader, config.clone())
263}
264
265fn parse_element<R>(
270 reader: &mut Reader<R>,
271 elem: &quick_xml::events::BytesStart,
272 config: &StreamConfig,
273 depth: usize,
274) -> Result<Item, String>
275where
276 R: BufRead,
277{
278 if depth > config.max_recursion_depth {
279 return Err(format!(
280 "XML recursion depth exceeded (max: {})",
281 config.max_recursion_depth
282 ));
283 }
284
285 let name = String::from_utf8_lossy(elem.name().as_ref()).to_string();
286
287 let mut attributes = BTreeMap::new();
289 let mut is_reference = false;
290 for attr in elem.attributes().flatten() {
291 let raw_key = String::from_utf8_lossy(attr.key.as_ref()).to_string();
292 let value = String::from_utf8_lossy(&attr.value).to_string();
293
294 if raw_key == "__hedl_type__" {
295 if value == "ref" {
296 is_reference = true;
297 }
298 continue;
299 }
300
301 let key = to_hedl_key(&raw_key);
302 attributes.insert(key, value);
303 }
304
305 let mut text_content = String::new();
307 let mut child_elements: BTreeMap<String, Vec<Item>> = BTreeMap::new();
308 let mut has_children = false;
309 let mut buf = Vec::new();
310
311 loop {
312 buf.clear();
313 match reader.read_event_into(&mut buf) {
314 Ok(Event::Start(e)) => {
315 has_children = true;
316 let raw_child_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
317 let child_name = to_hedl_key(&raw_child_name);
318 let elem_owned = e.to_owned();
319 let child_item = parse_element(reader, &elem_owned, config, depth + 1)?;
320
321 child_elements
322 .entry(child_name)
323 .or_default()
324 .push(child_item);
325 }
326 Ok(Event::Empty(e)) => {
327 has_children = true;
328 let raw_child_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
329 let child_name = to_hedl_key(&raw_child_name);
330 let elem_owned = e.to_owned();
331 let child_item = parse_empty_element(&elem_owned)?;
332
333 child_elements
334 .entry(child_name)
335 .or_default()
336 .push(child_item);
337 }
338 Ok(Event::Text(e)) => {
339 text_content.push_str(
340 &e.unescape()
341 .map_err(|e| format!("Text unescape error: {}", e))?,
342 );
343 }
344 Ok(Event::End(e)) => {
345 let end_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
346 if end_name == name {
347 break;
348 }
349 }
350 Ok(Event::Eof) => break,
351 Err(e) => return Err(format!("XML parse error: {}", e)),
352 _ => {}
353 }
354 }
355
356 if has_children {
358 let mut result_children = BTreeMap::new();
359 for (child_name, items) in child_elements {
360 if items.len() > 1 && config.infer_lists {
361 if child_name == "item" && items_are_tensor_elements(&items) {
362 let tensor = items_to_tensor(&items)?;
363 result_children.insert(child_name, Item::Scalar(Value::Tensor(tensor)));
364 } else {
365 let list = items_to_matrix_list(&child_name, items, config)?;
366 result_children.insert(child_name, Item::List(list));
367 }
368 } else if let Some(item) = items.into_iter().next() {
369 result_children.insert(child_name, item);
370 }
371 }
372
373 if result_children.len() == 1 {
375 let (child_key, child_item) = result_children.iter().next().unwrap();
376 if let Item::List(list) = child_item {
377 let has_nested_children = list.rows.iter().any(|node| !node.children.is_empty());
378 if !has_nested_children {
379 let parent_singular =
380 singularize_and_capitalize(&to_hedl_key(&name)).to_lowercase();
381 let child_type = singularize_and_capitalize(child_key).to_lowercase();
382 if parent_singular == child_type {
383 return Ok(result_children.into_values().next().unwrap());
384 }
385 }
386 }
387 }
388
389 Ok(Item::Object(result_children))
390 } else if !text_content.trim().is_empty() {
391 let value = if is_reference {
392 Value::Reference(parse_reference(text_content.trim())?)
393 } else {
394 parse_value(&text_content)?
395 };
396 Ok(Item::Scalar(value))
397 } else if !attributes.is_empty() {
398 let mut obj = BTreeMap::new();
399 for (key, value_str) in attributes {
400 let value = parse_value(&value_str)?;
401 obj.insert(key, Item::Scalar(value));
402 }
403 Ok(Item::Object(obj))
404 } else {
405 Ok(Item::Scalar(Value::Null))
406 }
407}
408
409fn parse_empty_element(elem: &quick_xml::events::BytesStart) -> Result<Item, String> {
410 let mut attributes = BTreeMap::new();
411
412 for attr in elem.attributes().flatten() {
413 let raw_key = String::from_utf8_lossy(attr.key.as_ref()).to_string();
414 let key = to_hedl_key(&raw_key);
415 let value = String::from_utf8_lossy(&attr.value).to_string();
416 attributes.insert(key, value);
417 }
418
419 if attributes.is_empty() {
420 Ok(Item::Scalar(Value::Null))
421 } else if attributes.len() == 1 && attributes.contains_key("value") {
422 let value_str = attributes.get("value").unwrap();
423 let value = parse_value(value_str)?;
424 Ok(Item::Scalar(value))
425 } else {
426 let mut obj = BTreeMap::new();
427 for (key, value_str) in attributes {
428 let value = parse_value(&value_str)?;
429 obj.insert(key, Item::Scalar(value));
430 }
431 Ok(Item::Object(obj))
432 }
433}
434
435fn parse_value(s: &str) -> Result<Value, String> {
436 let trimmed = s.trim();
437
438 if trimmed.is_empty() {
439 return Ok(Value::Null);
440 }
441
442 if trimmed.starts_with("$(") && trimmed.ends_with(')') {
443 let expr =
444 parse_expression_token(trimmed).map_err(|e| format!("Invalid expression: {}", e))?;
445 return Ok(Value::Expression(expr));
446 }
447
448 if trimmed == "true" {
449 return Ok(Value::Bool(true));
450 }
451 if trimmed == "false" {
452 return Ok(Value::Bool(false));
453 }
454
455 if let Ok(i) = trimmed.parse::<i64>() {
456 return Ok(Value::Int(i));
457 }
458 if let Ok(f) = trimmed.parse::<f64>() {
459 return Ok(Value::Float(f));
460 }
461
462 Ok(Value::String(trimmed.to_string()))
463}
464
465fn items_to_matrix_list(
466 name: &str,
467 items: Vec<Item>,
468 _config: &StreamConfig,
469) -> Result<MatrixList, String> {
470 let type_name = singularize_and_capitalize(name);
471 let schema = infer_schema(&items)?;
472
473 let mut rows = Vec::new();
474 for (idx, item) in items.into_iter().enumerate() {
475 let node = item_to_node(&type_name, &schema, item, idx)?;
476 rows.push(node);
477 }
478
479 Ok(MatrixList {
480 type_name,
481 schema,
482 rows,
483 count_hint: None,
484 })
485}
486
487fn infer_schema(items: &[Item]) -> Result<Vec<String>, String> {
488 if let Some(Item::Object(first_obj)) = items.first() {
489 let mut keys: Vec<_> = first_obj
490 .iter()
491 .filter(|(_, item)| matches!(item, Item::Scalar(_)))
492 .map(|(k, _)| k.clone())
493 .collect();
494 keys.sort();
495
496 if let Some(pos) = keys.iter().position(|k| k == "id") {
497 keys.remove(pos);
498 keys.insert(0, "id".to_string());
499 } else {
500 keys.insert(0, "id".to_string());
501 }
502
503 Ok(keys)
504 } else {
505 Ok(vec!["id".to_string(), "value".to_string()])
506 }
507}
508
509fn item_to_node(
510 type_name: &str,
511 schema: &[String],
512 item: Item,
513 idx: usize,
514) -> Result<Node, String> {
515 match item {
516 Item::Object(obj) => {
517 let id = obj
518 .get(&schema[0])
519 .and_then(|i| i.as_scalar())
520 .and_then(|v| v.as_str())
521 .map(|s| s.to_string())
522 .unwrap_or_else(|| format!("{}", idx));
523
524 let mut fields = Vec::new();
525 for col in schema {
526 let value = obj
527 .get(col)
528 .and_then(|i| i.as_scalar())
529 .cloned()
530 .unwrap_or(Value::Null);
531 fields.push(value);
532 }
533
534 let mut children: BTreeMap<String, Vec<Node>> = BTreeMap::new();
535 for child_item in obj.values() {
536 if let Item::List(child_list) = child_item {
537 children.insert(child_list.type_name.clone(), child_list.rows.clone());
538 }
539 }
540
541 Ok(Node {
542 type_name: type_name.to_string(),
543 id,
544 fields,
545 children,
546 child_count: None,
547 })
548 }
549 Item::Scalar(value) => {
550 let id = format!("{}", idx);
551 Ok(Node {
552 type_name: type_name.to_string(),
553 id: id.clone(),
554 fields: vec![Value::String(id), value],
555 children: BTreeMap::new(),
556 child_count: None,
557 })
558 }
559 Item::List(_) => Err("Cannot convert nested list to node".to_string()),
560 }
561}
562
/// Converts an XML element or attribute name into a snake_case HEDL key.
///
/// The conversion works as follows:
/// - An underscore is inserted before an ASCII capital that follows a
///   non-capital, so runs of capitals such as `ID` stay together.
/// - ASCII capitals are lowered.
/// - Runs of underscores collapse to one.
/// - Leading and trailing underscores are removed.
///
/// Other characters are kept as-is.
fn to_hedl_key(s: &str) -> String {
    // Appends a separator unless the key already ends with one; this both
    // collapses `__` runs and avoids doubling up before a capital.
    fn push_separator(key: &mut String) {
        if !key.ends_with('_') {
            key.push('_');
        }
    }

    let mut key = String::with_capacity(s.len() + 4);
    let mut after_upper = false;

    for (pos, ch) in s.chars().enumerate() {
        let upper = ch.is_ascii_uppercase();
        if upper {
            if pos > 0 && !after_upper {
                push_separator(&mut key);
            }
            key.push(ch.to_ascii_lowercase());
        } else if ch == '_' {
            push_separator(&mut key);
        } else {
            key.push(ch);
        }
        after_upper = upper;
    }

    key.trim_matches('_').to_string()
}
586
587fn items_are_tensor_elements(items: &[Item]) -> bool {
588 items.iter().all(|item| {
589 match item {
590 Item::Scalar(Value::Int(_)) => true,
591 Item::Scalar(Value::Float(_)) => true,
592 Item::Scalar(Value::Tensor(_)) => true,
593 Item::Object(obj) if obj.len() == 1 => {
594 matches!(obj.get("item"), Some(Item::Scalar(Value::Tensor(_))))
595 }
596 _ => false,
597 }
598 })
599}
600
601fn items_to_tensor(items: &[Item]) -> Result<Tensor, String> {
602 let mut tensor_items = Vec::new();
603
604 for item in items {
605 let tensor = match item {
606 Item::Scalar(Value::Int(n)) => Tensor::Scalar(*n as f64),
607 Item::Scalar(Value::Float(f)) => Tensor::Scalar(*f),
608 Item::Scalar(Value::Tensor(t)) => t.clone(),
609 Item::Object(obj) if obj.len() == 1 => {
610 if let Some(Item::Scalar(Value::Tensor(t))) = obj.get("item") {
611 t.clone()
612 } else {
613 return Err("Cannot convert non-numeric item to tensor".to_string());
614 }
615 }
616 _ => return Err("Cannot convert non-numeric item to tensor".to_string()),
617 };
618 tensor_items.push(tensor);
619 }
620
621 Ok(Tensor::Array(tensor_items))
622}
623
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert_eq!(config.buffer_size, 65536);
        assert_eq!(config.max_recursion_depth, 100);
        assert_eq!(config.max_batch_size, 1000);
        assert_eq!(config.default_type_name, "Item");
        assert_eq!(config.version, (1, 0));
        assert!(config.infer_lists);
    }

    #[test]
    fn test_stream_config_custom() {
        let config = StreamConfig {
            buffer_size: 131072,
            max_recursion_depth: 50,
            max_batch_size: 500,
            default_type_name: "CustomItem".to_string(),
            version: (2, 0),
            infer_lists: false,
        };
        assert_eq!(config.buffer_size, 131072);
        assert_eq!(config.max_recursion_depth, 50);
        assert_eq!(config.max_batch_size, 500);
        assert_eq!(config.default_type_name, "CustomItem");
        assert_eq!(config.version, (2, 0));
        assert!(!config.infer_lists);
    }

    #[test]
    fn test_stream_item_construction() {
        let item = StreamItem {
            key: "test".to_string(),
            value: Item::Scalar(Value::String("value".to_string())),
        };
        assert_eq!(item.key, "test");
        assert_eq!(
            item.value.as_scalar(),
            Some(&Value::String("value".to_string()))
        );
    }

    #[test]
    fn test_parse_value_string() {
        assert_eq!(parse_value("hello"), Ok(Value::String("hello".to_string())));
    }

    #[test]
    fn test_parse_value_bool() {
        assert_eq!(parse_value("true"), Ok(Value::Bool(true)));
        assert_eq!(parse_value("false"), Ok(Value::Bool(false)));
    }

    #[test]
    fn test_parse_value_int() {
        assert_eq!(parse_value("42"), Ok(Value::Int(42)));
        assert_eq!(parse_value("-7"), Ok(Value::Int(-7)));
        assert_eq!(parse_value("  42 "), Ok(Value::Int(42)));
    }

    #[test]
    fn test_parse_value_float() {
        // 2.5 is exactly representable; a literal such as 3.14 trips clippy's
        // deny-by-default `approx_constant` lint because it resembles PI.
        match parse_value("2.5") {
            Ok(Value::Float(f)) => assert!((f - 2.5).abs() < f64::EPSILON),
            other => panic!("Expected float, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_value_null() {
        assert_eq!(parse_value(""), Ok(Value::Null));
        assert_eq!(parse_value(" "), Ok(Value::Null));
    }

    #[test]
    fn test_to_hedl_key_pascal_case() {
        assert_eq!(to_hedl_key("Category"), "category");
        assert_eq!(to_hedl_key("UserPost"), "user_post");
    }

    #[test]
    fn test_to_hedl_key_lowercase() {
        assert_eq!(to_hedl_key("users"), "users");
    }

    #[test]
    fn test_to_hedl_key_acronyms_and_underscores() {
        assert_eq!(to_hedl_key("userID"), "user_id");
        assert_eq!(to_hedl_key("__hedl_type__"), "hedl_type");
        assert_eq!(to_hedl_key("a___b"), "a_b");
    }

    #[test]
    fn test_infer_schema_from_objects() {
        let items = vec![Item::Object({
            let mut m = BTreeMap::new();
            m.insert("id".to_string(), Item::Scalar(Value::String("1".to_string())));
            m.insert("name".to_string(), Item::Scalar(Value::String("Alice".to_string())));
            m
        })];
        let schema = infer_schema(&items).unwrap();
        assert_eq!(schema, vec!["id".to_string(), "name".to_string()]);
    }

    #[test]
    fn test_items_are_tensor_elements_numeric() {
        let items = vec![
            Item::Scalar(Value::Int(1)),
            Item::Scalar(Value::Float(2.0)),
            Item::Scalar(Value::Int(3)),
        ];
        assert!(items_are_tensor_elements(&items));
    }

    #[test]
    fn test_items_are_tensor_elements_non_numeric() {
        let items = vec![
            Item::Scalar(Value::Int(1)),
            Item::Scalar(Value::String("hello".to_string())),
        ];
        assert!(!items_are_tensor_elements(&items));
    }
}