hedl_xml/
streaming.rs

1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Streaming XML parser for handling large documents
19//!
20//! This module provides memory-efficient streaming parsing for multi-gigabyte XML files.
//! Instead of loading the entire document into memory, each direct child of the root element is parsed and yielded as a separate item.
22//!
23//! # Features
24//!
//! - Memory-efficient: Process files larger than available RAM, provided each top-level child element fits in memory
26//! - Incremental: Yields items as they're parsed
27//! - Configurable: Adjustable buffer sizes and recursion limits
28//! - Type-safe: Returns `Result` for error handling
29//!
30//! # Examples
31//!
32//! ```text
33//! use std::fs::File;
34//! use hedl_xml::streaming::{from_xml_stream, StreamConfig};
35//!
36//! let file = File::open("large.xml")?;
37//! let config = StreamConfig::default();
38//!
39//! for result in from_xml_stream(file, &config)? {
40//!     match result {
41//!         Ok(item) => println!("Processed: {:?}", item),
42//!         Err(e) => eprintln!("Error: {}", e),
43//!     }
44//! }
45//! ```
46
47use hedl_core::convert::parse_reference;
48use hedl_core::{Item, MatrixList, Node, Value};
49use hedl_core::lex::{parse_expression_token, singularize_and_capitalize};
50use hedl_core::lex::Tensor;
51use quick_xml::events::Event;
52use quick_xml::Reader;
53use std::collections::BTreeMap;
54use std::io::{BufRead, BufReader, Read};
55
/// Configuration for streaming XML parsing.
///
/// Construct with [`StreamConfig::default`] and override individual fields
/// with struct-update syntax, e.g.
/// `StreamConfig { max_recursion_depth: 32, ..StreamConfig::default() }`.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Capacity in bytes of the `BufReader` wrapped around the input
    /// (default: 64 KiB). This sizes the read buffer only; it does not cap
    /// the memory used to hold a parsed top-level element.
    pub buffer_size: usize,
    /// Maximum element nesting depth below the document root (default: 100).
    ///
    /// Direct children of the root are at depth 1. Parsing fails when an
    /// element with content is nested deeper than this.
    pub max_recursion_depth: usize,
    /// Batch-size setting (default: 1000).
    ///
    /// NOTE(review): not read by the streaming parser in this module. Items
    /// are yielded one top-level element at a time regardless of this value.
    pub max_batch_size: usize,
    /// Default type name for inferred lists (default: `"Item"`).
    ///
    /// NOTE(review): not read in this module. Inferred list type names are
    /// derived from the repeated element's name instead.
    pub default_type_name: String,
    /// HEDL version as `(major, minor)` (default: `(1, 0)`).
    ///
    /// NOTE(review): not read in this module.
    pub version: (u32, u32),
    /// Whether repeated sibling elements are combined into a list
    /// (default: `true`). When `false`, only the first of several same-named
    /// siblings is kept.
    pub infer_lists: bool,
}

impl Default for StreamConfig {
    /// Returns the documented defaults: 64 KiB buffer, depth limit 100,
    /// batch size 1000, type name `"Item"`, version 1.0, list inference on.
    fn default() -> Self {
        Self {
            buffer_size: 64 * 1024,
            max_recursion_depth: 100,
            max_batch_size: 1000,
            default_type_name: String::from("Item"),
            version: (1, 0),
            infer_lists: true,
        }
    }
}
85
86/// A streaming XML parser that yields items incrementally
87///
88/// This iterator yields `StreamItem` results as the XML is parsed.
89/// Memory usage is bounded by the `buffer_size` configuration.
90pub struct XmlStreamingParser<R: Read> {
91    reader: Reader<BufReader<R>>,
92    config: StreamConfig,
93    root_element_name: String,
94    root_parsed: bool,
95    exhausted: bool,
96    buf: Vec<u8>,
97}
98
99/// An item yielded by the streaming parser
100#[derive(Debug, Clone)]
101pub struct StreamItem {
102    /// Key/field name in the HEDL document
103    pub key: String,
104    /// The parsed value (scalar, object, or list)
105    pub value: Item,
106}
107
108impl<R: Read> XmlStreamingParser<R> {
109    /// Create a new streaming parser
110    pub fn new(reader: R, config: StreamConfig) -> Result<Self, String> {
111        let buf_reader = BufReader::with_capacity(config.buffer_size, reader);
112        let xml_reader = Reader::from_reader(buf_reader);
113        Ok(XmlStreamingParser {
114            reader: xml_reader,
115            config,
116            root_element_name: String::new(),
117            root_parsed: false,
118            exhausted: false,
119            buf: Vec::with_capacity(8192),
120        })
121    }
122
123    /// Internal method to find and parse the root element
124    fn find_root(&mut self) -> Result<bool, String> {
125        loop {
126            self.buf.clear();
127            match self.reader.read_event_into(&mut self.buf) {
128                Ok(Event::Start(e)) | Ok(Event::Empty(e)) => {
129                    self.root_element_name =
130                        String::from_utf8_lossy(e.name().as_ref()).to_string();
131                    self.root_parsed = true;
132                    return Ok(true);
133                }
134                Ok(Event::Eof) => return Ok(false),
135                Err(e) => {
136                    return Err(format!(
137                        "XML parse error at position {}: {}",
138                        self.reader.buffer_position(),
139                        e
140                    ))
141                }
142                _ => {}
143            }
144        }
145    }
146
147    /// Parse the next element at the root level
148    fn parse_next_root_element(&mut self) -> Result<Option<StreamItem>, String> {
149        loop {
150            self.buf.clear();
151            match self.reader.read_event_into(&mut self.buf) {
152                Ok(Event::Start(e)) => {
153                    let raw_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
154                    let name = to_hedl_key(&raw_name);
155                    let elem_owned = e.to_owned();
156
157                    let item = parse_element(&mut self.reader, &elem_owned, &self.config, 1)?;
158                    return Ok(Some(StreamItem {
159                        key: name,
160                        value: item,
161                    }));
162                }
163                Ok(Event::Empty(e)) => {
164                    let raw_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
165                    let name = to_hedl_key(&raw_name);
166                    let elem_owned = e.to_owned();
167
168                    let item = parse_empty_element(&elem_owned)?;
169                    return Ok(Some(StreamItem {
170                        key: name,
171                        value: item,
172                    }));
173                }
174                Ok(Event::End(e)) => {
175                    let end_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
176                    if end_name == self.root_element_name {
177                        return Ok(None); // End of root element
178                    }
179                }
180                Ok(Event::Eof) => return Ok(None),
181                Err(e) => {
182                    return Err(format!(
183                        "XML parse error at position {}: {}",
184                        self.reader.buffer_position(),
185                        e
186                    ))
187                }
188                _ => {}
189            }
190        }
191    }
192}
193
194impl<R: Read> Iterator for XmlStreamingParser<R> {
195    type Item = Result<StreamItem, String>;
196
197    fn next(&mut self) -> Option<Self::Item> {
198        if self.exhausted {
199            return None;
200        }
201
202        // If we haven't found the root element yet, do that first
203        if !self.root_parsed {
204            match self.find_root() {
205                Ok(true) => {
206                    // Root element found, continue to next element
207                }
208                Ok(false) => {
209                    self.exhausted = true;
210                    return None;
211                }
212                Err(e) => {
213                    self.exhausted = true;
214                    return Some(Err(e));
215                }
216            }
217        }
218
219        // Try to parse the next element
220        match self.parse_next_root_element() {
221            Ok(Some(item)) => Some(Ok(item)),
222            Ok(None) => {
223                self.exhausted = true;
224                None
225            }
226            Err(e) => {
227                self.exhausted = true;
228                Some(Err(e))
229            }
230        }
231    }
232}
233
234/// Create a streaming XML parser from a reader
235///
236/// Returns an iterator that yields `Result<StreamItem, String>` as items are parsed.
237/// This is memory-efficient for multi-gigabyte XML files.
238///
239/// # Examples
240///
241/// ```no_run
242/// use std::fs::File;
243/// use hedl_xml::streaming::{from_xml_stream, StreamConfig};
244///
245/// let file = File::open("data.xml")?;
246/// let config = StreamConfig::default();
247///
248/// let mut count = 0;
249/// for result in from_xml_stream(file, &config)? {
250///     match result {
251///         Ok(_item) => count += 1,
252///         Err(e) => eprintln!("Parse error: {}", e),
253///     }
254/// }
255/// println!("Processed {} items", count);
256/// # Ok::<(), Box<dyn std::error::Error>>(())
257/// ```
258pub fn from_xml_stream<R: Read>(
259    reader: R,
260    config: &StreamConfig,
261) -> Result<XmlStreamingParser<R>, String> {
262    XmlStreamingParser::new(reader, config.clone())
263}
264
265// ============================================================================
266// Parsing functions (shared with from_xml.rs)
267// ============================================================================
268
269fn parse_element<R>(
270    reader: &mut Reader<R>,
271    elem: &quick_xml::events::BytesStart,
272    config: &StreamConfig,
273    depth: usize,
274) -> Result<Item, String>
275where
276    R: BufRead,
277{
278    if depth > config.max_recursion_depth {
279        return Err(format!(
280            "XML recursion depth exceeded (max: {})",
281            config.max_recursion_depth
282        ));
283    }
284
285    let name = String::from_utf8_lossy(elem.name().as_ref()).to_string();
286
287    // Extract attributes
288    let mut attributes = BTreeMap::new();
289    let mut is_reference = false;
290    for attr in elem.attributes().flatten() {
291        let raw_key = String::from_utf8_lossy(attr.key.as_ref()).to_string();
292        let value = String::from_utf8_lossy(&attr.value).to_string();
293
294        if raw_key == "__hedl_type__" {
295            if value == "ref" {
296                is_reference = true;
297            }
298            continue;
299        }
300
301        let key = to_hedl_key(&raw_key);
302        attributes.insert(key, value);
303    }
304
305    // Parse content
306    let mut text_content = String::new();
307    let mut child_elements: BTreeMap<String, Vec<Item>> = BTreeMap::new();
308    let mut has_children = false;
309    let mut buf = Vec::new();
310
311    loop {
312        buf.clear();
313        match reader.read_event_into(&mut buf) {
314            Ok(Event::Start(e)) => {
315                has_children = true;
316                let raw_child_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
317                let child_name = to_hedl_key(&raw_child_name);
318                let elem_owned = e.to_owned();
319                let child_item = parse_element(reader, &elem_owned, config, depth + 1)?;
320
321                child_elements
322                    .entry(child_name)
323                    .or_default()
324                    .push(child_item);
325            }
326            Ok(Event::Empty(e)) => {
327                has_children = true;
328                let raw_child_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
329                let child_name = to_hedl_key(&raw_child_name);
330                let elem_owned = e.to_owned();
331                let child_item = parse_empty_element(&elem_owned)?;
332
333                child_elements
334                    .entry(child_name)
335                    .or_default()
336                    .push(child_item);
337            }
338            Ok(Event::Text(e)) => {
339                text_content.push_str(
340                    &e.unescape()
341                        .map_err(|e| format!("Text unescape error: {}", e))?,
342                );
343            }
344            Ok(Event::End(e)) => {
345                let end_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
346                if end_name == name {
347                    break;
348                }
349            }
350            Ok(Event::Eof) => break,
351            Err(e) => return Err(format!("XML parse error: {}", e)),
352            _ => {}
353        }
354    }
355
356    // Determine item type
357    if has_children {
358        let mut result_children = BTreeMap::new();
359        for (child_name, items) in child_elements {
360            if items.len() > 1 && config.infer_lists {
361                if child_name == "item" && items_are_tensor_elements(&items) {
362                    let tensor = items_to_tensor(&items)?;
363                    result_children.insert(child_name, Item::Scalar(Value::Tensor(tensor)));
364                } else {
365                    let list = items_to_matrix_list(&child_name, items, config)?;
366                    result_children.insert(child_name, Item::List(list));
367                }
368            } else if let Some(item) = items.into_iter().next() {
369                result_children.insert(child_name, item);
370            }
371        }
372
373        // Check for flattening
374        if result_children.len() == 1 {
375            let (child_key, child_item) = result_children.iter().next().unwrap();
376            if let Item::List(list) = child_item {
377                let has_nested_children = list.rows.iter().any(|node| !node.children.is_empty());
378                if !has_nested_children {
379                    let parent_singular =
380                        singularize_and_capitalize(&to_hedl_key(&name)).to_lowercase();
381                    let child_type = singularize_and_capitalize(child_key).to_lowercase();
382                    if parent_singular == child_type {
383                        return Ok(result_children.into_values().next().unwrap());
384                    }
385                }
386            }
387        }
388
389        Ok(Item::Object(result_children))
390    } else if !text_content.trim().is_empty() {
391        let value = if is_reference {
392            Value::Reference(parse_reference(text_content.trim())?)
393        } else {
394            parse_value(&text_content)?
395        };
396        Ok(Item::Scalar(value))
397    } else if !attributes.is_empty() {
398        let mut obj = BTreeMap::new();
399        for (key, value_str) in attributes {
400            let value = parse_value(&value_str)?;
401            obj.insert(key, Item::Scalar(value));
402        }
403        Ok(Item::Object(obj))
404    } else {
405        Ok(Item::Scalar(Value::Null))
406    }
407}
408
409fn parse_empty_element(elem: &quick_xml::events::BytesStart) -> Result<Item, String> {
410    let mut attributes = BTreeMap::new();
411
412    for attr in elem.attributes().flatten() {
413        let raw_key = String::from_utf8_lossy(attr.key.as_ref()).to_string();
414        let key = to_hedl_key(&raw_key);
415        let value = String::from_utf8_lossy(&attr.value).to_string();
416        attributes.insert(key, value);
417    }
418
419    if attributes.is_empty() {
420        Ok(Item::Scalar(Value::Null))
421    } else if attributes.len() == 1 && attributes.contains_key("value") {
422        let value_str = attributes.get("value").unwrap();
423        let value = parse_value(value_str)?;
424        Ok(Item::Scalar(value))
425    } else {
426        let mut obj = BTreeMap::new();
427        for (key, value_str) in attributes {
428            let value = parse_value(&value_str)?;
429            obj.insert(key, Item::Scalar(value));
430        }
431        Ok(Item::Object(obj))
432    }
433}
434
435fn parse_value(s: &str) -> Result<Value, String> {
436    let trimmed = s.trim();
437
438    if trimmed.is_empty() {
439        return Ok(Value::Null);
440    }
441
442    if trimmed.starts_with("$(") && trimmed.ends_with(')') {
443        let expr =
444            parse_expression_token(trimmed).map_err(|e| format!("Invalid expression: {}", e))?;
445        return Ok(Value::Expression(expr));
446    }
447
448    if trimmed == "true" {
449        return Ok(Value::Bool(true));
450    }
451    if trimmed == "false" {
452        return Ok(Value::Bool(false));
453    }
454
455    if let Ok(i) = trimmed.parse::<i64>() {
456        return Ok(Value::Int(i));
457    }
458    if let Ok(f) = trimmed.parse::<f64>() {
459        return Ok(Value::Float(f));
460    }
461
462    Ok(Value::String(trimmed.to_string()))
463}
464
465fn items_to_matrix_list(
466    name: &str,
467    items: Vec<Item>,
468    _config: &StreamConfig,
469) -> Result<MatrixList, String> {
470    let type_name = singularize_and_capitalize(name);
471    let schema = infer_schema(&items)?;
472
473    let mut rows = Vec::new();
474    for (idx, item) in items.into_iter().enumerate() {
475        let node = item_to_node(&type_name, &schema, item, idx)?;
476        rows.push(node);
477    }
478
479    Ok(MatrixList {
480        type_name,
481        schema,
482        rows,
483        count_hint: None,
484    })
485}
486
487fn infer_schema(items: &[Item]) -> Result<Vec<String>, String> {
488    if let Some(Item::Object(first_obj)) = items.first() {
489        let mut keys: Vec<_> = first_obj
490            .iter()
491            .filter(|(_, item)| matches!(item, Item::Scalar(_)))
492            .map(|(k, _)| k.clone())
493            .collect();
494        keys.sort();
495
496        if let Some(pos) = keys.iter().position(|k| k == "id") {
497            keys.remove(pos);
498            keys.insert(0, "id".to_string());
499        } else {
500            keys.insert(0, "id".to_string());
501        }
502
503        Ok(keys)
504    } else {
505        Ok(vec!["id".to_string(), "value".to_string()])
506    }
507}
508
509fn item_to_node(
510    type_name: &str,
511    schema: &[String],
512    item: Item,
513    idx: usize,
514) -> Result<Node, String> {
515    match item {
516        Item::Object(obj) => {
517            let id = obj
518                .get(&schema[0])
519                .and_then(|i| i.as_scalar())
520                .and_then(|v| v.as_str())
521                .map(|s| s.to_string())
522                .unwrap_or_else(|| format!("{}", idx));
523
524            let mut fields = Vec::new();
525            for col in schema {
526                let value = obj
527                    .get(col)
528                    .and_then(|i| i.as_scalar())
529                    .cloned()
530                    .unwrap_or(Value::Null);
531                fields.push(value);
532            }
533
534            let mut children: BTreeMap<String, Vec<Node>> = BTreeMap::new();
535            for child_item in obj.values() {
536                if let Item::List(child_list) = child_item {
537                    children.insert(child_list.type_name.clone(), child_list.rows.clone());
538                }
539            }
540
541            Ok(Node {
542                type_name: type_name.to_string(),
543                id,
544                fields,
545                children,
546                child_count: None,
547            })
548        }
549        Item::Scalar(value) => {
550            let id = format!("{}", idx);
551            Ok(Node {
552                type_name: type_name.to_string(),
553                id: id.clone(),
554                fields: vec![Value::String(id), value],
555                children: BTreeMap::new(),
556                child_count: None,
557            })
558        }
559        Item::List(_) => Err("Cannot convert nested list to node".to_string()),
560    }
561}
562
/// Convert an XML element or attribute name into a HEDL key.
///
/// - ASCII uppercase letters are lowercased.
/// - An `_` is inserted before an uppercase letter that follows a
///   non-uppercase character. So `UserPost` becomes `user_post`, while an
///   uppercase run such as `XML` stays together as `xml`.
/// - Runs of underscores collapse to a single `_`.
/// - Leading and trailing underscores are removed.
/// - All other characters pass through unchanged.
fn to_hedl_key(s: &str) -> String {
    let mut key = String::with_capacity(s.len() + 4);
    let mut after_upper = false;

    for (pos, ch) in s.chars().enumerate() {
        let is_upper = ch.is_ascii_uppercase();

        // Word boundary: uppercase after a non-uppercase character.
        if is_upper && pos > 0 && !after_upper && !key.ends_with('_') {
            key.push('_');
        }

        match ch {
            // Collapse underscore runs as they are produced.
            '_' if key.ends_with('_') => {}
            c if c.is_ascii_uppercase() => key.push(c.to_ascii_lowercase()),
            c => key.push(c),
        }

        after_upper = is_upper;
    }

    key.trim_matches('_').to_string()
}
586
587fn items_are_tensor_elements(items: &[Item]) -> bool {
588    items.iter().all(|item| {
589        match item {
590            Item::Scalar(Value::Int(_)) => true,
591            Item::Scalar(Value::Float(_)) => true,
592            Item::Scalar(Value::Tensor(_)) => true,
593            Item::Object(obj) if obj.len() == 1 => {
594                matches!(obj.get("item"), Some(Item::Scalar(Value::Tensor(_))))
595            }
596            _ => false,
597        }
598    })
599}
600
601fn items_to_tensor(items: &[Item]) -> Result<Tensor, String> {
602    let mut tensor_items = Vec::new();
603
604    for item in items {
605        let tensor = match item {
606            Item::Scalar(Value::Int(n)) => Tensor::Scalar(*n as f64),
607            Item::Scalar(Value::Float(f)) => Tensor::Scalar(*f),
608            Item::Scalar(Value::Tensor(t)) => t.clone(),
609            Item::Object(obj) if obj.len() == 1 => {
610                if let Some(Item::Scalar(Value::Tensor(t))) = obj.get("item") {
611                    t.clone()
612                } else {
613                    return Err("Cannot convert non-numeric item to tensor".to_string());
614                }
615            }
616            _ => return Err("Cannot convert non-numeric item to tensor".to_string()),
617        };
618        tensor_items.push(tensor);
619    }
620
621    Ok(Tensor::Array(tensor_items))
622}
623
#[cfg(test)]
mod tests {
    use super::*;

    /// Stream `xml` with `config` and collect every yielded item.
    fn stream_all(xml: &str, config: &StreamConfig) -> Result<Vec<StreamItem>, String> {
        from_xml_stream(xml.as_bytes(), config)?.collect()
    }

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert_eq!(config.buffer_size, 65536);
        assert_eq!(config.max_recursion_depth, 100);
        assert_eq!(config.max_batch_size, 1000);
        assert_eq!(config.default_type_name, "Item");
        assert_eq!(config.version, (1, 0));
        assert!(config.infer_lists);
    }

    #[test]
    fn test_stream_config_custom() {
        let config = StreamConfig {
            buffer_size: 131072,
            max_recursion_depth: 50,
            max_batch_size: 500,
            default_type_name: "CustomItem".to_string(),
            version: (2, 0),
            infer_lists: false,
        };
        assert_eq!(config.buffer_size, 131072);
        assert_eq!(config.max_recursion_depth, 50);
        assert_eq!(config.max_batch_size, 500);
        assert_eq!(config.default_type_name, "CustomItem");
        assert_eq!(config.version, (2, 0));
        assert!(!config.infer_lists);
    }

    #[test]
    fn test_stream_item_construction() {
        let item = StreamItem {
            key: "test".to_string(),
            value: Item::Scalar(Value::String("value".to_string())),
        };
        assert_eq!(item.key, "test");
        assert_eq!(
            item.value.as_scalar(),
            Some(&Value::String("value".to_string()))
        );
    }

    #[test]
    fn test_parse_value_string() {
        assert_eq!(parse_value("hello"), Ok(Value::String("hello".to_string())));
    }

    #[test]
    fn test_parse_value_bool() {
        assert_eq!(parse_value("true"), Ok(Value::Bool(true)));
        assert_eq!(parse_value("false"), Ok(Value::Bool(false)));
    }

    #[test]
    fn test_parse_value_int() {
        assert_eq!(parse_value("42"), Ok(Value::Int(42)));
    }

    #[test]
    fn test_parse_value_float() {
        // 2.5 rather than 3.14: clippy's deny-by-default `approx_constant`
        // lint rejects float literals that approximate `PI`.
        match parse_value("2.5") {
            Ok(Value::Float(f)) => assert!((f - 2.5).abs() < f64::EPSILON),
            other => panic!("Expected float, got {:?}", other),
        }
    }

    #[test]
    fn test_parse_value_null() {
        assert_eq!(parse_value(""), Ok(Value::Null));
        assert_eq!(parse_value("   "), Ok(Value::Null));
    }

    #[test]
    fn test_to_hedl_key_pascal_case() {
        assert_eq!(to_hedl_key("Category"), "category");
        assert_eq!(to_hedl_key("UserPost"), "user_post");
    }

    #[test]
    fn test_to_hedl_key_lowercase() {
        assert_eq!(to_hedl_key("users"), "users");
    }

    #[test]
    fn test_to_hedl_key_collapses_and_trims_underscores() {
        assert_eq!(to_hedl_key("__hedl_type__"), "hedl_type");
        assert_eq!(to_hedl_key("user__Name"), "user_name");
    }

    #[test]
    fn test_to_hedl_key_uppercase_run() {
        assert_eq!(to_hedl_key("XMLParser"), "xmlparser");
    }

    #[test]
    fn test_infer_schema_from_objects() {
        let items = vec![Item::Object({
            let mut m = BTreeMap::new();
            m.insert("id".to_string(), Item::Scalar(Value::String("1".to_string())));
            m.insert("name".to_string(), Item::Scalar(Value::String("Alice".to_string())));
            m
        })];
        let schema = infer_schema(&items).unwrap();
        assert_eq!(schema.first().map(String::as_str), Some("id"));
        assert!(schema.contains(&"name".to_string()));
    }

    #[test]
    fn test_items_are_tensor_elements_numeric() {
        let items = vec![
            Item::Scalar(Value::Int(1)),
            Item::Scalar(Value::Float(2.0)),
            Item::Scalar(Value::Int(3)),
        ];
        assert!(items_are_tensor_elements(&items));
    }

    #[test]
    fn test_items_are_tensor_elements_non_numeric() {
        let items = vec![
            Item::Scalar(Value::Int(1)),
            Item::Scalar(Value::String("hello".to_string())),
        ];
        assert!(!items_are_tensor_elements(&items));
    }

    #[test]
    fn test_stream_yields_root_children_in_order() {
        let items = stream_all(
            "<root><name>Alice</name><age>30</age></root>",
            &StreamConfig::default(),
        )
        .unwrap();
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].key, "name");
        assert_eq!(
            items[0].value.as_scalar(),
            Some(&Value::String("Alice".to_string()))
        );
        assert_eq!(items[1].key, "age");
        assert_eq!(items[1].value.as_scalar(), Some(&Value::Int(30)));
    }

    #[test]
    fn test_stream_empty_inputs_yield_nothing() {
        assert!(stream_all("", &StreamConfig::default()).unwrap().is_empty());
        assert!(stream_all("<root/>", &StreamConfig::default()).unwrap().is_empty());
    }

    #[test]
    fn test_stream_recursion_limit() {
        let config = StreamConfig {
            max_recursion_depth: 1,
            ..StreamConfig::default()
        };
        let err = stream_all("<root><a><b>x</b></a></root>", &config).unwrap_err();
        assert!(err.contains("recursion depth exceeded"), "{}", err);
    }
}