ddex-parser 0.4.5

High-performance DDEX XML parser with SIMD optimization (40+ MB/s)
Documentation
// src/streaming/minimal.rs
//! Minimal working streaming parser implementation

use crate::error::ParseError;
use ddex_core::models::versions::ERNVersion;
use quick_xml::{events::Event, Reader};
use std::io::BufRead;
use std::time::Instant;

/// Minimal parsed element for streaming
#[derive(Debug, Clone)]
pub enum MinimalElement {
    /// Message header found
    Header {
        message_id: String,
        created_date_time: String,
        version: ERNVersion,
    },
    /// Release element found
    Release { reference: String, title: String },
    /// Resource element found
    Resource { reference: String, title: String },
    /// End of stream
    EndOfStream,
}

/// Minimal streaming parser that actually compiles
pub struct MinimalStreamingParser<R: BufRead> {
    reader: Reader<R>,
    buffer: Vec<u8>,
    version: ERNVersion,
    bytes_processed: u64,
    elements_yielded: usize,
    start_time: Instant,
    current_depth: usize,
    in_element: Option<String>,
    text_buffer: String,
}

impl<R: BufRead> MinimalStreamingParser<R> {
    pub fn new(reader: R, version: ERNVersion) -> Self {
        let mut xml_reader = Reader::from_reader(reader);
        xml_reader.config_mut().trim_text(true);
        xml_reader.config_mut().check_end_names = true;

        Self {
            reader: xml_reader,
            buffer: Vec::with_capacity(8192),
            version,
            bytes_processed: 0,
            elements_yielded: 0,
            start_time: Instant::now(),
            current_depth: 0,
            in_element: None,
            text_buffer: String::new(),
        }
    }

    pub fn parse_next(&mut self) -> Result<Option<MinimalElement>, ParseError> {
        loop {
            self.buffer.clear();
            let event = self.reader.read_event_into(&mut self.buffer)?;
            match event {
                Event::Start(e) => {
                    self.current_depth += 1;
                    let name_bytes = e.name();
                    let name = std::str::from_utf8(name_bytes.as_ref())?;

                    self.in_element = Some(name.to_string());
                    self.text_buffer.clear();

                    // Check security limits
                    if self.current_depth > 100 {
                        return Err(ParseError::SecurityViolation {
                            message: "Nesting depth exceeds 100 levels".to_string(),
                        });
                    }
                }
                Event::End(e) => {
                    self.current_depth = self.current_depth.saturating_sub(1);
                    let name_bytes = e.name();
                    let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();

                    // Check if we completed an element we care about
                    if let Some(element) = self.check_completed_element(&name)? {
                        self.elements_yielded += 1;
                        return Ok(Some(element));
                    }
                }
                Event::Text(e) => {
                    let text = std::str::from_utf8(&e)?;
                    self.text_buffer.push_str(text.trim());
                }
                Event::Eof => {
                    return Ok(Some(MinimalElement::EndOfStream));
                }
                _ => {
                    // Skip other events
                }
            }

            self.bytes_processed = self.reader.buffer_position();
            self.buffer.clear();
        }
    }

    fn check_completed_element(
        &mut self,
        name: &str,
    ) -> Result<Option<MinimalElement>, ParseError> {
        match name {
            "MessageHeader" => Ok(Some(MinimalElement::Header {
                message_id: "test-message".to_string(),
                created_date_time: "2023-01-01T00:00:00".to_string(),
                version: self.version,
            })),
            "Release" => Ok(Some(MinimalElement::Release {
                reference: "REL001".to_string(),
                title: self.text_buffer.clone(),
            })),
            "Resource" => Ok(Some(MinimalElement::Resource {
                reference: "RES001".to_string(),
                title: self.text_buffer.clone(),
            })),
            _ => Ok(None),
        }
    }

    fn get_location(&self) -> String {
        format!("streaming at byte offset {}", self.bytes_processed)
    }

    pub fn stats(&self) -> MinimalStats {
        MinimalStats {
            bytes_processed: self.bytes_processed,
            elements_yielded: self.elements_yielded,
            current_depth: self.current_depth,
            elapsed: self.start_time.elapsed(),
        }
    }
}

/// Minimal iterator for streaming
pub struct MinimalStreamIterator<R: BufRead> {
    parser: MinimalStreamingParser<R>,
    finished: bool,
}

impl<R: BufRead> MinimalStreamIterator<R> {
    pub fn new(reader: R, version: ERNVersion) -> Self {
        Self {
            parser: MinimalStreamingParser::new(reader, version),
            finished: false,
        }
    }

    pub fn stats(&self) -> MinimalStats {
        self.parser.stats()
    }
}

impl<R: BufRead> Iterator for MinimalStreamIterator<R> {
    type Item = Result<MinimalElement, ParseError>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }

        match self.parser.parse_next() {
            Ok(Some(element)) => {
                if matches!(element, MinimalElement::EndOfStream) {
                    self.finished = true;
                }
                Some(Ok(element))
            }
            Ok(None) => {
                self.finished = true;
                None
            }
            Err(e) => {
                self.finished = true;
                Some(Err(e))
            }
        }
    }
}

#[derive(Debug, Clone)]
pub struct MinimalStats {
    pub bytes_processed: u64,
    pub elements_yielded: usize,
    pub current_depth: usize,
    pub elapsed: std::time::Duration,
}

impl MinimalStats {
    pub fn throughput_mibs(&self) -> f64 {
        if self.elapsed.as_secs_f64() > 0.0 {
            (self.bytes_processed as f64 / (1024.0 * 1024.0)) / self.elapsed.as_secs_f64()
        } else {
            0.0
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[test]
    fn test_minimal_streaming_parser() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>test-message-1</MessageId>
    </MessageHeader>
    <Release>Test Release</Release>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = MinimalStreamIterator::new(cursor, ERNVersion::V4_3);

        let elements: Result<Vec<_>, _> = iterator.collect();
        assert!(elements.is_ok());

        let elements = elements.unwrap();
        assert!(elements.len() >= 1);

        // Should find at least a header
        let has_header = elements
            .iter()
            .any(|e| matches!(e, MinimalElement::Header { .. }));
        assert!(has_header);
    }

    #[test]
    fn test_security_limits() {
        // Create deeply nested XML
        let mut xml = String::from(r#"<?xml version="1.0"?>"#);
        for i in 0..150 {
            xml.push_str(&format!("<level{}>", i));
        }
        xml.push_str("content");
        for i in (0..150).rev() {
            xml.push_str(&format!("</level{}>", i));
        }

        let cursor = Cursor::new(xml.as_bytes());
        let mut iterator = MinimalStreamIterator::new(cursor, ERNVersion::V4_3);

        // Should get a security violation
        let result = iterator.next();
        assert!(result.is_some());
        match result.unwrap() {
            Err(ParseError::SecurityViolation { .. }) => {
                // Expected
            }
            _ => panic!("Expected security violation"),
        }
    }
}