Skip to main content

hedl_xml/
streaming.rs

1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Streaming XML parser for handling large documents
19//!
20//! This module provides memory-efficient streaming parsing for multi-gigabyte XML files.
21//! Instead of loading the entire document into memory, items are yielded incrementally.
22//!
23//! # Features
24//!
25//! - Memory-efficient: Process files larger than available RAM
26//! - Incremental: Yields items as they're parsed
27//! - Configurable: Adjustable buffer sizes and recursion limits
28//! - Type-safe: Returns `Result` for error handling
29//!
30//! # Examples
31//!
32//! ```text
33//! use std::fs::File;
34//! use hedl_xml::streaming::{from_xml_stream, StreamConfig};
35//!
36//! let file = File::open("large.xml")?;
37//! let config = StreamConfig::default();
38//!
39//! for result in from_xml_stream(file, &config)? {
40//!     match result {
41//!         Ok(item) => println!("Processed: {:?}", item),
42//!         Err(e) => eprintln!("Error: {}", e),
43//!     }
44//! }
45//! ```
46
47use hedl_core::convert::parse_reference;
48use hedl_core::lex::Tensor;
49use hedl_core::lex::{parse_expression_token, singularize_and_capitalize};
50use hedl_core::{Item, MatrixList, Node, Value};
51use quick_xml::events::Event;
52use quick_xml::Reader;
53use std::collections::BTreeMap;
54use std::io::{BufRead, BufReader, Read};
55
56// Re-export EntityPolicy from from_xml module
57pub use crate::from_xml::EntityPolicy;
58
59/// Configuration for streaming XML parsing
60#[derive(Debug, Clone)]
61pub struct StreamConfig {
62    /// Buffer size for reading chunks (default: 64KB)
63    pub buffer_size: usize,
64    /// Maximum recursion depth (default: 100)
65    pub max_recursion_depth: usize,
66    /// Maximum list size before yielding (default: 1000)
67    pub max_batch_size: usize,
68    /// Default type name for inferred lists
69    pub default_type_name: String,
70    /// HEDL version
71    pub version: (u32, u32),
72    /// Try to infer list structures from repeated elements
73    pub infer_lists: bool,
74
75    /// Entity handling policy (XXE prevention)
76    pub entity_policy: EntityPolicy,
77
78    /// Enable security event logging
79    pub log_security_events: bool,
80}
81
82/// Position information for progress tracking and error reporting
83#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
84pub struct StreamPosition {
85    /// Byte offset from start of stream
86    pub byte_offset: u64,
87    /// Number of items parsed so far
88    pub items_parsed: usize,
89}
90
91impl Default for StreamConfig {
92    fn default() -> Self {
93        Self {
94            buffer_size: 65536, // 64KB
95            max_recursion_depth: 100,
96            max_batch_size: 1000,
97            default_type_name: "Item".to_string(),
98            version: (1, 0),
99            infer_lists: true,
100            entity_policy: EntityPolicy::default(),
101            log_security_events: false,
102        }
103    }
104}
105
106/// A streaming XML parser that yields items incrementally
107///
108/// This iterator yields `StreamItem` results as the XML is parsed.
109/// Memory usage is bounded by the `buffer_size` configuration.
110pub struct XmlStreamingParser<R: Read> {
111    reader: Reader<BufReader<R>>,
112    config: StreamConfig,
113    root_element_name: String,
114    root_parsed: bool,
115    exhausted: bool,
116    buf: Vec<u8>,
117    /// Current position in the stream for progress tracking
118    position: StreamPosition,
119}
120
121/// An item yielded by the streaming parser
122#[derive(Debug, Clone)]
123pub struct StreamItem {
124    /// Key/field name in the HEDL document
125    pub key: String,
126    /// The parsed value (scalar, object, or list)
127    pub value: Item,
128}
129
130impl<R: Read> XmlStreamingParser<R> {
131    /// Create a new streaming parser
132    pub fn new(reader: R, config: StreamConfig) -> Result<Self, String> {
133        let buf_reader = BufReader::with_capacity(config.buffer_size, reader);
134        let xml_reader = Reader::from_reader(buf_reader);
135
136        Ok(XmlStreamingParser {
137            reader: xml_reader,
138            config,
139            root_element_name: String::new(),
140            root_parsed: false,
141            exhausted: false,
142            buf: Vec::with_capacity(8192),
143            position: StreamPosition::default(),
144        })
145    }
146
147    /// Get the current stream position for progress tracking
148    ///
149    /// Returns the byte offset and number of items parsed so far.
150    /// Useful for progress bars and error reporting.
151    #[inline]
152    pub fn position(&self) -> StreamPosition {
153        StreamPosition {
154            byte_offset: self.reader.buffer_position(),
155            items_parsed: self.position.items_parsed,
156        }
157    }
158
159    /// Get the number of bytes processed so far
160    #[inline]
161    pub fn bytes_processed(&self) -> u64 {
162        self.reader.buffer_position()
163    }
164
165    /// Get the number of items parsed so far
166    #[inline]
167    pub fn items_parsed(&self) -> usize {
168        self.position.items_parsed
169    }
170
171    /// Internal method to find and parse the root element
172    fn find_root(&mut self) -> Result<bool, String> {
173        loop {
174            self.buf.clear();
175            match self.reader.read_event_into(&mut self.buf) {
176                Ok(Event::DocType(_e)) => {
177                    if self.config.log_security_events {
178                        eprintln!(
179                            "[SECURITY] DTD detected in streaming XML at position {}",
180                            self.reader.buffer_position()
181                        );
182                    }
183
184                    if self.config.entity_policy == EntityPolicy::RejectDtd {
185                        return Err(format!(
186                            "DOCTYPE declaration rejected at position {} (XXE prevention)",
187                            self.reader.buffer_position()
188                        ));
189                    }
190                    // Continue for other policies
191                }
192                Ok(Event::Start(e)) | Ok(Event::Empty(e)) => {
193                    self.root_element_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
194                    self.root_parsed = true;
195                    return Ok(true);
196                }
197                Ok(Event::Eof) => return Ok(false),
198                Err(e) => {
199                    return Err(format!(
200                        "XML parse error at position {}: {}",
201                        self.reader.buffer_position(),
202                        e
203                    ))
204                }
205                _ => {}
206            }
207        }
208    }
209
210    /// Parse the next element at the root level
211    fn parse_next_root_element(&mut self) -> Result<Option<StreamItem>, String> {
212        loop {
213            self.buf.clear();
214            match self.reader.read_event_into(&mut self.buf) {
215                Ok(Event::Start(e)) => {
216                    let raw_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
217                    let name = to_hedl_key(&raw_name);
218                    let elem_owned = e.to_owned();
219
220                    let item = parse_element(&mut self.reader, &elem_owned, &self.config, 1)?;
221                    return Ok(Some(StreamItem {
222                        key: name,
223                        value: item,
224                    }));
225                }
226                Ok(Event::Empty(e)) => {
227                    let raw_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
228                    let name = to_hedl_key(&raw_name);
229                    let elem_owned = e.to_owned();
230
231                    let item = parse_empty_element(&elem_owned, &self.config)?;
232                    return Ok(Some(StreamItem {
233                        key: name,
234                        value: item,
235                    }));
236                }
237                Ok(Event::End(e)) => {
238                    let end_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
239                    if end_name == self.root_element_name {
240                        return Ok(None); // End of root element
241                    }
242                }
243                Ok(Event::Eof) => return Ok(None),
244                Err(e) => {
245                    return Err(format!(
246                        "XML parse error at position {}: {}",
247                        self.reader.buffer_position(),
248                        e
249                    ))
250                }
251                _ => {}
252            }
253        }
254    }
255}
256
257impl<R: Read> Iterator for XmlStreamingParser<R> {
258    type Item = Result<StreamItem, String>;
259
260    fn next(&mut self) -> Option<Self::Item> {
261        if self.exhausted {
262            return None;
263        }
264
265        // If we haven't found the root element yet, do that first
266        if !self.root_parsed {
267            match self.find_root() {
268                Ok(true) => {
269                    // Root element found, continue to next element
270                }
271                Ok(false) => {
272                    self.exhausted = true;
273                    return None;
274                }
275                Err(e) => {
276                    self.exhausted = true;
277                    return Some(Err(e));
278                }
279            }
280        }
281
282        // Try to parse the next element
283        match self.parse_next_root_element() {
284            Ok(Some(item)) => {
285                self.position.items_parsed += 1;
286                Some(Ok(item))
287            }
288            Ok(None) => {
289                self.exhausted = true;
290                None
291            }
292            Err(e) => {
293                self.exhausted = true;
294                Some(Err(e))
295            }
296        }
297    }
298}
299
300/// Create a streaming XML parser from a reader
301///
302/// Returns an iterator that yields `Result<StreamItem, String>` as items are parsed.
303/// This is memory-efficient for multi-gigabyte XML files.
304///
305/// # Examples
306///
307/// ```no_run
308/// use std::fs::File;
309/// use hedl_xml::streaming::{from_xml_stream, StreamConfig};
310///
311/// let file = File::open("data.xml")?;
312/// let config = StreamConfig::default();
313///
314/// let mut count = 0;
315/// for result in from_xml_stream(file, &config)? {
316///     match result {
317///         Ok(_item) => count += 1,
318///         Err(e) => eprintln!("Parse error: {}", e),
319///     }
320/// }
321/// println!("Processed {} items", count);
322/// # Ok::<(), Box<dyn std::error::Error>>(())
323/// ```
324pub fn from_xml_stream<R: Read>(
325    reader: R,
326    config: &StreamConfig,
327) -> Result<XmlStreamingParser<R>, String> {
328    XmlStreamingParser::new(reader, config.clone())
329}
330
331// ============================================================================
332// Parsing functions (shared with from_xml module)
333// ============================================================================
334
335fn parse_element<R>(
336    reader: &mut Reader<R>,
337    elem: &quick_xml::events::BytesStart<'_>,
338    config: &StreamConfig,
339    depth: usize,
340) -> Result<Item, String>
341where
342    R: BufRead,
343{
344    if depth > config.max_recursion_depth {
345        return Err(format!(
346            "XML recursion depth exceeded (max: {})",
347            config.max_recursion_depth
348        ));
349    }
350
351    let name = String::from_utf8_lossy(elem.name().as_ref()).to_string();
352
353    // Extract attributes
354    let mut attributes = BTreeMap::new();
355    let mut is_reference = false;
356    for attr in elem.attributes().flatten() {
357        let raw_key = String::from_utf8_lossy(attr.key.as_ref()).to_string();
358        let value = String::from_utf8_lossy(&attr.value).to_string();
359
360        if raw_key == "__hedl_type__" {
361            if value == "ref" {
362                is_reference = true;
363            }
364            continue;
365        }
366
367        let key = to_hedl_key(&raw_key);
368        attributes.insert(key, value);
369    }
370
371    // Parse content
372    let mut text_content = String::new();
373    let mut child_elements: BTreeMap<String, Vec<Item>> = BTreeMap::new();
374    let mut has_children = false;
375    let mut buf = Vec::new();
376
377    loop {
378        buf.clear();
379        match reader.read_event_into(&mut buf) {
380            Ok(Event::Start(e)) => {
381                has_children = true;
382                let raw_child_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
383                let child_name = to_hedl_key(&raw_child_name);
384                let elem_owned = e.to_owned();
385                let child_item = parse_element(reader, &elem_owned, config, depth + 1)?;
386
387                child_elements
388                    .entry(child_name)
389                    .or_default()
390                    .push(child_item);
391            }
392            Ok(Event::Empty(e)) => {
393                has_children = true;
394                let raw_child_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
395                let child_name = to_hedl_key(&raw_child_name);
396                let elem_owned = e.to_owned();
397                let child_item = parse_empty_element(&elem_owned, config)?;
398
399                child_elements
400                    .entry(child_name)
401                    .or_default()
402                    .push(child_item);
403            }
404            Ok(Event::Text(e)) => {
405                let content = e
406                    .xml_content()
407                    .map_err(|e| format!("Text decode error: {}", e))?;
408                text_content.push_str(&content);
409            }
410            Ok(Event::GeneralRef(e)) => {
411                // Handle entity references (quick-xml 0.38+ reports these as separate events)
412                let ref_name = e.decode().map_err(|e| format!("Ref decode error: {}", e))?;
413                let unescaped = match ref_name.as_ref() {
414                    "amp" => "&",
415                    "lt" => "<",
416                    "gt" => ">",
417                    "quot" => "\"",
418                    "apos" => "'",
419                    _ => return Err(format!("Unknown entity reference: {}", ref_name)),
420                };
421                text_content.push_str(unescaped);
422            }
423            Ok(Event::End(e)) => {
424                let end_name = String::from_utf8_lossy(e.name().as_ref()).to_string();
425                if end_name == name {
426                    break;
427                }
428            }
429            Ok(Event::Eof) => break,
430            Err(e) => return Err(format!("XML parse error: {}", e)),
431            _ => {}
432        }
433    }
434
435    // Determine item type
436    if has_children {
437        let mut result_children = BTreeMap::new();
438        for (child_name, items) in child_elements {
439            if items.len() > 1 && config.infer_lists {
440                if child_name == "item" && items_are_tensor_elements(&items) {
441                    let tensor = items_to_tensor(&items)?;
442                    result_children
443                        .insert(child_name, Item::Scalar(Value::Tensor(Box::new(tensor))));
444                } else {
445                    let list = items_to_matrix_list(&child_name, items, config)?;
446                    result_children.insert(child_name, Item::List(list));
447                }
448            } else if let Some(item) = items.into_iter().next() {
449                result_children.insert(child_name, item);
450            }
451        }
452
453        // Check for flattening
454        if result_children.len() == 1 {
455            // SAFETY: len() == 1 guarantees at least one element
456            let (child_key, child_item) = result_children.iter().next().expect("len == 1");
457            if let Item::List(list) = child_item {
458                let has_nested_children = list
459                    .rows
460                    .iter()
461                    .any(|node| node.children().map(|c| !c.is_empty()).unwrap_or(false));
462                if !has_nested_children {
463                    let parent_singular =
464                        singularize_and_capitalize(&to_hedl_key(&name)).to_lowercase();
465                    let child_type = singularize_and_capitalize(child_key).to_lowercase();
466                    if parent_singular == child_type {
467                        // SAFETY: len() == 1 guarantees at least one element
468                        return Ok(result_children.into_values().next().expect("len == 1"));
469                    }
470                }
471            }
472        }
473
474        Ok(Item::Object(result_children))
475    } else if !text_content.trim().is_empty() {
476        let value = if is_reference {
477            Value::Reference(parse_reference(text_content.trim())?)
478        } else {
479            parse_value_with_config(&text_content, config)?
480        };
481        Ok(Item::Scalar(value))
482    } else if !attributes.is_empty() {
483        let mut obj = BTreeMap::new();
484        for (key, value_str) in attributes {
485            let value = parse_value_with_config(&value_str, config)?;
486            obj.insert(key, Item::Scalar(value));
487        }
488        Ok(Item::Object(obj))
489    } else {
490        Ok(Item::Scalar(Value::Null))
491    }
492}
493
494fn parse_empty_element(
495    elem: &quick_xml::events::BytesStart<'_>,
496    config: &StreamConfig,
497) -> Result<Item, String> {
498    let mut attributes = BTreeMap::new();
499
500    for attr in elem.attributes().flatten() {
501        let raw_key = String::from_utf8_lossy(attr.key.as_ref()).to_string();
502        let key = to_hedl_key(&raw_key);
503        let value = String::from_utf8_lossy(&attr.value).to_string();
504        attributes.insert(key, value);
505    }
506
507    if attributes.is_empty() {
508        Ok(Item::Scalar(Value::Null))
509    } else if attributes.len() == 1 && attributes.contains_key("value") {
510        // SAFETY: contains_key("value") guarantees get() succeeds
511        let value_str = attributes.get("value").expect("key exists");
512        let value = parse_value_with_config(value_str, config)?;
513        Ok(Item::Scalar(value))
514    } else {
515        let mut obj = BTreeMap::new();
516        for (key, value_str) in attributes {
517            let value = parse_value_with_config(&value_str, config)?;
518            obj.insert(key, Item::Scalar(value));
519        }
520        Ok(Item::Object(obj))
521    }
522}
523
524fn parse_value_with_config(s: &str, config: &StreamConfig) -> Result<Value, String> {
525    let trimmed = s.trim();
526
527    // Detect entity references (&entity;)
528    if trimmed.contains('&') && trimmed.contains(';') {
529        if config.log_security_events {
530            eprintln!("[SECURITY] Entity reference detected in value: {}", trimmed);
531        }
532
533        // Check for potentially malicious entity patterns
534        if (trimmed.contains("&xxe;")
535            || trimmed.contains("&file;")
536            || trimmed.contains("&passwd;")
537            || trimmed.contains("&secret;"))
538            && config.entity_policy == EntityPolicy::WarnOnEntities
539        {
540            eprintln!(
541                "[WARNING] Suspicious entity reference detected: {}",
542                trimmed
543            );
544        }
545    }
546
547    if trimmed.is_empty() {
548        return Ok(Value::Null);
549    }
550
551    if trimmed.starts_with("$(") && trimmed.ends_with(')') {
552        let expr =
553            parse_expression_token(trimmed).map_err(|e| format!("Invalid expression: {}", e))?;
554        return Ok(Value::Expression(Box::new(expr)));
555    }
556
557    if trimmed == "true" {
558        return Ok(Value::Bool(true));
559    }
560    if trimmed == "false" {
561        return Ok(Value::Bool(false));
562    }
563
564    if let Ok(i) = trimmed.parse::<i64>() {
565        return Ok(Value::Int(i));
566    }
567    if let Ok(f) = trimmed.parse::<f64>() {
568        return Ok(Value::Float(f));
569    }
570
571    Ok(Value::String(trimmed.to_string().into()))
572}
573
574#[cfg(test)]
575fn parse_value(s: &str) -> Result<Value, String> {
576    // Legacy function for backward compatibility
577    let config = StreamConfig::default();
578    parse_value_with_config(s, &config)
579}
580
581fn items_to_matrix_list(
582    name: &str,
583    items: Vec<Item>,
584    _config: &StreamConfig,
585) -> Result<MatrixList, String> {
586    let type_name = singularize_and_capitalize(name);
587    let schema = infer_schema(&items)?;
588
589    let mut rows = Vec::new();
590    for (idx, item) in items.into_iter().enumerate() {
591        let node = item_to_node(&type_name, &schema, item, idx)?;
592        rows.push(node);
593    }
594
595    Ok(MatrixList {
596        type_name,
597        schema,
598        rows,
599        count_hint: None,
600    })
601}
602
603fn infer_schema(items: &[Item]) -> Result<Vec<String>, String> {
604    if let Some(Item::Object(first_obj)) = items.first() {
605        let mut keys: Vec<_> = first_obj
606            .iter()
607            .filter(|(_, item)| matches!(item, Item::Scalar(_)))
608            .map(|(k, _)| k.clone())
609            .collect();
610        keys.sort();
611
612        if let Some(pos) = keys.iter().position(|k| k == "id") {
613            keys.remove(pos);
614            keys.insert(0, "id".to_string());
615        } else {
616            keys.insert(0, "id".to_string());
617        }
618
619        Ok(keys)
620    } else {
621        Ok(vec!["id".to_string(), "value".to_string()])
622    }
623}
624
625fn item_to_node(
626    type_name: &str,
627    schema: &[String],
628    item: Item,
629    idx: usize,
630) -> Result<Node, String> {
631    match item {
632        Item::Object(obj) => {
633            let id = obj
634                .get(&schema[0])
635                .and_then(|i| i.as_scalar())
636                .and_then(|v| v.as_str())
637                .map(|s| s.to_string())
638                .unwrap_or_else(|| format!("{}", idx));
639
640            let mut fields = Vec::new();
641            for col in schema {
642                let value = obj
643                    .get(col)
644                    .and_then(|i| i.as_scalar())
645                    .cloned()
646                    .unwrap_or(Value::Null);
647                fields.push(value);
648            }
649
650            let mut children: BTreeMap<String, Vec<Node>> = BTreeMap::new();
651            for child_item in obj.values() {
652                if let Item::List(child_list) = child_item {
653                    children.insert(child_list.type_name.clone(), child_list.rows.clone());
654                }
655            }
656
657            Ok(Node {
658                type_name: type_name.to_string(),
659                id,
660                fields: fields.into(),
661                children: if children.is_empty() {
662                    None
663                } else {
664                    Some(Box::new(children))
665                },
666                child_count: 0,
667            })
668        }
669        Item::Scalar(value) => {
670            let id = format!("{}", idx);
671            Ok(Node {
672                type_name: type_name.to_string(),
673                id: id.clone(),
674                fields: vec![Value::String(id.into()), value].into(),
675                children: None,
676                child_count: 0,
677            })
678        }
679        Item::List(_) => Err("Cannot convert nested list to node".to_string()),
680    }
681}
682
683fn to_hedl_key(s: &str) -> String {
684    let mut result = String::with_capacity(s.len() + 4); // Pre-allocate with small buffer
685    let mut prev_was_upper = false;
686    let mut prev_was_underscore = false;
687
688    for (i, c) in s.chars().enumerate() {
689        if c.is_ascii_uppercase() {
690            // Insert underscore before uppercase if not at start, not after underscore, and previous wasn't uppercase
691            if i > 0 && !prev_was_upper && !prev_was_underscore {
692                result.push('_');
693            }
694            result.push(c.to_ascii_lowercase());
695            prev_was_upper = true;
696            prev_was_underscore = false;
697        } else if c == '_' {
698            // Only add underscore if previous character wasn't also underscore
699            if !prev_was_underscore && !result.is_empty() {
700                result.push(c);
701            }
702            prev_was_underscore = true;
703            prev_was_upper = false;
704        } else {
705            result.push(c);
706            prev_was_upper = false;
707            prev_was_underscore = false;
708        }
709    }
710
711    // Trim trailing underscores (already handled leading via !result.is_empty() check)
712    while result.ends_with('_') {
713        result.pop();
714    }
715
716    result
717}
718
719fn items_are_tensor_elements(items: &[Item]) -> bool {
720    items.iter().all(|item| match item {
721        Item::Scalar(Value::Int(_)) => true,
722        Item::Scalar(Value::Float(_)) => true,
723        Item::Scalar(Value::Tensor(_)) => true,
724        Item::Object(obj) if obj.len() == 1 => {
725            matches!(obj.get("item"), Some(Item::Scalar(Value::Tensor(_))))
726        }
727        _ => false,
728    })
729}
730
731fn items_to_tensor(items: &[Item]) -> Result<Tensor, String> {
732    let mut tensor_items = Vec::new();
733
734    for item in items {
735        let tensor = match item {
736            Item::Scalar(Value::Int(n)) => Tensor::Scalar(*n as f64),
737            Item::Scalar(Value::Float(f)) => Tensor::Scalar(*f),
738            Item::Scalar(Value::Tensor(t)) => (**t).clone(),
739            Item::Object(obj) if obj.len() == 1 => {
740                if let Some(Item::Scalar(Value::Tensor(t))) = obj.get("item") {
741                    (**t).clone()
742                } else {
743                    return Err("Cannot convert non-numeric item to tensor".to_string());
744                }
745            }
746            _ => return Err("Cannot convert non-numeric item to tensor".to_string()),
747        };
748        tensor_items.push(tensor);
749    }
750
751    Ok(Tensor::Array(tensor_items))
752}
753
754#[cfg(test)]
755mod tests {
756    use super::*;
757
758    #[test]
759    fn test_stream_config_default() {
760        let config = StreamConfig::default();
761        assert_eq!(config.buffer_size, 65536);
762        assert_eq!(config.max_recursion_depth, 100);
763        assert_eq!(config.max_batch_size, 1000);
764        assert_eq!(config.default_type_name, "Item");
765        assert_eq!(config.version, (1, 0));
766        assert!(config.infer_lists);
767    }
768
769    #[test]
770    fn test_stream_config_custom() {
771        let config = StreamConfig {
772            buffer_size: 131072,
773            max_recursion_depth: 50,
774            max_batch_size: 500,
775            default_type_name: "CustomItem".to_string(),
776            version: (2, 0),
777            infer_lists: false,
778            entity_policy: EntityPolicy::RejectDtd,
779            log_security_events: true,
780        };
781        assert_eq!(config.buffer_size, 131072);
782        assert_eq!(config.max_recursion_depth, 50);
783        assert_eq!(config.max_batch_size, 500);
784        assert_eq!(config.default_type_name, "CustomItem");
785        assert_eq!(config.version, (2, 0));
786        assert!(!config.infer_lists);
787    }
788
789    #[test]
790    fn test_stream_item_construction() {
791        let item = StreamItem {
792            key: "test".to_string(),
793            value: Item::Scalar(Value::String("value".to_string().into())),
794        };
795        assert_eq!(item.key, "test");
796        assert_eq!(
797            item.value.as_scalar(),
798            Some(&Value::String("value".to_string().into()))
799        );
800    }
801
802    #[test]
803    fn test_parse_value_string() {
804        assert_eq!(
805            parse_value("hello"),
806            Ok(Value::String("hello".to_string().into()))
807        );
808    }
809
810    #[test]
811    fn test_parse_value_bool() {
812        assert_eq!(parse_value("true"), Ok(Value::Bool(true)));
813        assert_eq!(parse_value("false"), Ok(Value::Bool(false)));
814    }
815
816    #[test]
817    fn test_parse_value_int() {
818        assert_eq!(parse_value("42"), Ok(Value::Int(42)));
819    }
820
821    #[test]
822    fn test_parse_value_float() {
823        match parse_value("4.56") {
824            Ok(Value::Float(f)) => assert!((f - 4.56).abs() < 0.001),
825            _ => panic!("Expected float"),
826        }
827    }
828
829    #[test]
830    fn test_parse_value_null() {
831        assert_eq!(parse_value(""), Ok(Value::Null));
832        assert_eq!(parse_value("   "), Ok(Value::Null));
833    }
834
835    #[test]
836    fn test_to_hedl_key_pascal_case() {
837        assert_eq!(to_hedl_key("Category"), "category");
838        assert_eq!(to_hedl_key("UserPost"), "user_post");
839    }
840
841    #[test]
842    fn test_to_hedl_key_lowercase() {
843        assert_eq!(to_hedl_key("users"), "users");
844    }
845
846    #[test]
847    fn test_infer_schema_from_objects() {
848        let items = vec![Item::Object({
849            let mut m = BTreeMap::new();
850            m.insert(
851                "id".to_string(),
852                Item::Scalar(Value::String("1".to_string().into())),
853            );
854            m.insert(
855                "name".to_string(),
856                Item::Scalar(Value::String("Alice".to_string().into())),
857            );
858            m
859        })];
860        let schema = infer_schema(&items).unwrap();
861        assert!(schema.contains(&"id".to_string()));
862        assert!(schema.contains(&"name".to_string()));
863    }
864
865    #[test]
866    fn test_items_are_tensor_elements_numeric() {
867        let items = vec![
868            Item::Scalar(Value::Int(1)),
869            Item::Scalar(Value::Float(2.0)),
870            Item::Scalar(Value::Int(3)),
871        ];
872        assert!(items_are_tensor_elements(&items));
873    }
874
875    #[test]
876    fn test_items_are_tensor_elements_non_numeric() {
877        let items = vec![
878            Item::Scalar(Value::Int(1)),
879            Item::Scalar(Value::String("hello".to_string().into())),
880        ];
881        assert!(!items_are_tensor_elements(&items));
882    }
883}