ddex_parser/streaming/
minimal.rs

// src/streaming/minimal.rs
//! Minimal streaming parser for DDEX ERN messages, built on `quick_xml`.

4use crate::error::{ErrorLocation, ParseError};
5use ddex_core::models::versions::ERNVersion;
6use quick_xml::{events::Event, Reader};
7use std::io::BufRead;
8use std::time::Instant;
9
10/// Minimal parsed element for streaming
11#[derive(Debug, Clone)]
12pub enum MinimalElement {
13    /// Message header found
14    Header {
15        message_id: String,
16        created_date_time: String,
17        version: ERNVersion,
18    },
19    /// Release element found
20    Release { reference: String, title: String },
21    /// Resource element found
22    Resource { reference: String, title: String },
23    /// End of stream
24    EndOfStream,
25}
26
27/// Minimal streaming parser that actually compiles
28pub struct MinimalStreamingParser<R: BufRead> {
29    reader: Reader<R>,
30    buffer: Vec<u8>,
31    version: ERNVersion,
32    bytes_processed: u64,
33    elements_yielded: usize,
34    start_time: Instant,
35    current_depth: usize,
36    in_element: Option<String>,
37    text_buffer: String,
38}
39
40impl<R: BufRead> MinimalStreamingParser<R> {
41    pub fn new(reader: R, version: ERNVersion) -> Self {
42        let mut xml_reader = Reader::from_reader(reader);
43        xml_reader.config_mut().trim_text(true);
44        xml_reader.config_mut().check_end_names = true;
45
46        Self {
47            reader: xml_reader,
48            buffer: Vec::with_capacity(8192),
49            version,
50            bytes_processed: 0,
51            elements_yielded: 0,
52            start_time: Instant::now(),
53            current_depth: 0,
54            in_element: None,
55            text_buffer: String::new(),
56        }
57    }
58
59    pub fn parse_next(&mut self) -> Result<Option<MinimalElement>, ParseError> {
60        loop {
61            self.buffer.clear();
62            let event = self.reader.read_event_into(&mut self.buffer)?;
63            match event {
64                Event::Start(e) => {
65                    self.current_depth += 1;
66                    let name_bytes = e.name();
67                    let name = std::str::from_utf8(name_bytes.as_ref())?;
68
69                    self.in_element = Some(name.to_string());
70                    self.text_buffer.clear();
71
72                    // Check security limits
73                    if self.current_depth > 100 {
74                        return Err(ParseError::SecurityViolation {
75                            message: "Nesting depth exceeds 100 levels".to_string(),
76                        });
77                    }
78                }
79                Event::End(e) => {
80                    self.current_depth = self.current_depth.saturating_sub(1);
81                    let name_bytes = e.name();
82                    let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
83
84                    // Check if we completed an element we care about
85                    if let Some(element) = self.check_completed_element(&name)? {
86                        self.elements_yielded += 1;
87                        return Ok(Some(element));
88                    }
89                }
90                Event::Text(e) => {
91                    let text = std::str::from_utf8(&e)?;
92                    self.text_buffer.push_str(text.trim());
93                }
94                Event::Eof => {
95                    return Ok(Some(MinimalElement::EndOfStream));
96                }
97                _ => {
98                    // Skip other events
99                }
100            }
101
102            self.bytes_processed = self.reader.buffer_position();
103            self.buffer.clear();
104        }
105    }
106
107    fn check_completed_element(
108        &mut self,
109        name: &str,
110    ) -> Result<Option<MinimalElement>, ParseError> {
111        match name {
112            "MessageHeader" => Ok(Some(MinimalElement::Header {
113                message_id: "test-message".to_string(),
114                created_date_time: "2023-01-01T00:00:00".to_string(),
115                version: self.version,
116            })),
117            "Release" => Ok(Some(MinimalElement::Release {
118                reference: "REL001".to_string(),
119                title: self.text_buffer.clone(),
120            })),
121            "Resource" => Ok(Some(MinimalElement::Resource {
122                reference: "RES001".to_string(),
123                title: self.text_buffer.clone(),
124            })),
125            _ => Ok(None),
126        }
127    }
128
129    fn get_location(&self) -> ErrorLocation {
130        ErrorLocation {
131            line: 0,
132            column: 0,
133            byte_offset: Some(self.bytes_processed as usize),
134            path: "streaming".to_string(),
135        }
136    }
137
138    pub fn stats(&self) -> MinimalStats {
139        MinimalStats {
140            bytes_processed: self.bytes_processed,
141            elements_yielded: self.elements_yielded,
142            current_depth: self.current_depth,
143            elapsed: self.start_time.elapsed(),
144        }
145    }
146}
147
148/// Minimal iterator for streaming
149pub struct MinimalStreamIterator<R: BufRead> {
150    parser: MinimalStreamingParser<R>,
151    finished: bool,
152}
153
154impl<R: BufRead> MinimalStreamIterator<R> {
155    pub fn new(reader: R, version: ERNVersion) -> Self {
156        Self {
157            parser: MinimalStreamingParser::new(reader, version),
158            finished: false,
159        }
160    }
161
162    pub fn stats(&self) -> MinimalStats {
163        self.parser.stats()
164    }
165}
166
167impl<R: BufRead> Iterator for MinimalStreamIterator<R> {
168    type Item = Result<MinimalElement, ParseError>;
169
170    fn next(&mut self) -> Option<Self::Item> {
171        if self.finished {
172            return None;
173        }
174
175        match self.parser.parse_next() {
176            Ok(Some(element)) => {
177                if matches!(element, MinimalElement::EndOfStream) {
178                    self.finished = true;
179                }
180                Some(Ok(element))
181            }
182            Ok(None) => {
183                self.finished = true;
184                None
185            }
186            Err(e) => {
187                self.finished = true;
188                Some(Err(e))
189            }
190        }
191    }
192}
193
/// Point-in-time progress counters for a streaming parse.
#[derive(Debug, Clone)]
pub struct MinimalStats {
    /// Bytes of input consumed, as reported by the XML reader.
    pub bytes_processed: u64,
    /// Elements returned so far (the end-of-stream marker is not counted).
    pub elements_yielded: usize,
    /// Element nesting depth at snapshot time.
    pub current_depth: usize,
    /// Time elapsed since the parser was created.
    pub elapsed: std::time::Duration,
}

impl MinimalStats {
    /// Average throughput in MiB per second (1 MiB = 1,048,576 bytes).
    ///
    /// Returns `0.0` when no time has elapsed, avoiding a division by zero.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        if seconds <= 0.0 {
            return 0.0;
        }
        self.bytes_processed as f64 / BYTES_PER_MIB / seconds
    }
}
211
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Parses a small ERN 4.3 message end to end. It checks that the header,
    /// the release (with its text as title) and the end-of-stream marker are
    /// reported in order.
    #[test]
    fn test_minimal_streaming_parser() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>test-message-1</MessageId>
    </MessageHeader>
    <Release>Test Release</Release>
</ERNMessage>"#;

        let iterator = MinimalStreamIterator::new(Cursor::new(xml.as_bytes()), ERNVersion::V4_3);
        let elements = iterator
            .collect::<Result<Vec<_>, _>>()
            .expect("well-formed input should parse without errors");

        match elements.as_slice() {
            [MinimalElement::Header { .. }, MinimalElement::Release { title, .. }, MinimalElement::EndOfStream] =>
            {
                assert_eq!(title, "Test Release");
            }
            other => panic!("unexpected element sequence: {other:?}"),
        }
    }

    /// Nesting beyond the depth limit must abort with a security violation,
    /// after which the iterator yields nothing further.
    #[test]
    fn test_security_limits() {
        let opening: String = (0..150).map(|i| format!("<level{i}>")).collect();
        let closing: String = (0..150).rev().map(|i| format!("</level{i}>")).collect();
        let xml = format!(r#"<?xml version="1.0"?>{opening}content{closing}"#);

        let mut iterator =
            MinimalStreamIterator::new(Cursor::new(xml.as_bytes()), ERNVersion::V4_3);

        let first = iterator
            .next()
            .expect("iterator should yield the violation as its first item");
        assert!(
            matches!(first, Err(ParseError::SecurityViolation { .. })),
            "expected security violation, got {first:?}"
        );
        assert!(iterator.next().is_none(), "iterator must stop after an error");
    }
}