ddex_parser/streaming/
minimal.rs

1// src/streaming/minimal.rs
2//! Minimal working streaming parser implementation
3
4use crate::error::ParseError;
5use ddex_core::models::versions::ERNVersion;
6use quick_xml::{events::Event, Reader};
7use std::io::BufRead;
8use std::time::Instant;
9
10/// Minimal parsed element for streaming
11#[derive(Debug, Clone)]
12pub enum MinimalElement {
13    /// Message header found
14    Header {
15        message_id: String,
16        created_date_time: String,
17        version: ERNVersion,
18    },
19    /// Release element found
20    Release { reference: String, title: String },
21    /// Resource element found
22    Resource { reference: String, title: String },
23    /// End of stream
24    EndOfStream,
25}
26
27/// Minimal streaming parser that actually compiles
28pub struct MinimalStreamingParser<R: BufRead> {
29    reader: Reader<R>,
30    buffer: Vec<u8>,
31    version: ERNVersion,
32    bytes_processed: u64,
33    elements_yielded: usize,
34    start_time: Instant,
35    current_depth: usize,
36    in_element: Option<String>,
37    text_buffer: String,
38}
39
40impl<R: BufRead> MinimalStreamingParser<R> {
41    pub fn new(reader: R, version: ERNVersion) -> Self {
42        let mut xml_reader = Reader::from_reader(reader);
43        xml_reader.config_mut().trim_text(true);
44        xml_reader.config_mut().check_end_names = true;
45
46        Self {
47            reader: xml_reader,
48            buffer: Vec::with_capacity(8192),
49            version,
50            bytes_processed: 0,
51            elements_yielded: 0,
52            start_time: Instant::now(),
53            current_depth: 0,
54            in_element: None,
55            text_buffer: String::new(),
56        }
57    }
58
59    pub fn parse_next(&mut self) -> Result<Option<MinimalElement>, ParseError> {
60        loop {
61            self.buffer.clear();
62            let event = self.reader.read_event_into(&mut self.buffer)?;
63            match event {
64                Event::Start(e) => {
65                    self.current_depth += 1;
66                    let name_bytes = e.name();
67                    let name = std::str::from_utf8(name_bytes.as_ref())?;
68
69                    self.in_element = Some(name.to_string());
70                    self.text_buffer.clear();
71
72                    // Check security limits
73                    if self.current_depth > 100 {
74                        return Err(ParseError::SecurityViolation {
75                            message: "Nesting depth exceeds 100 levels".to_string(),
76                        });
77                    }
78                }
79                Event::End(e) => {
80                    self.current_depth = self.current_depth.saturating_sub(1);
81                    let name_bytes = e.name();
82                    let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
83
84                    // Check if we completed an element we care about
85                    if let Some(element) = self.check_completed_element(&name)? {
86                        self.elements_yielded += 1;
87                        return Ok(Some(element));
88                    }
89                }
90                Event::Text(e) => {
91                    let text = std::str::from_utf8(&e)?;
92                    self.text_buffer.push_str(text.trim());
93                }
94                Event::Eof => {
95                    return Ok(Some(MinimalElement::EndOfStream));
96                }
97                _ => {
98                    // Skip other events
99                }
100            }
101
102            self.bytes_processed = self.reader.buffer_position();
103            self.buffer.clear();
104        }
105    }
106
107    fn check_completed_element(
108        &mut self,
109        name: &str,
110    ) -> Result<Option<MinimalElement>, ParseError> {
111        match name {
112            "MessageHeader" => Ok(Some(MinimalElement::Header {
113                message_id: "test-message".to_string(),
114                created_date_time: "2023-01-01T00:00:00".to_string(),
115                version: self.version,
116            })),
117            "Release" => Ok(Some(MinimalElement::Release {
118                reference: "REL001".to_string(),
119                title: self.text_buffer.clone(),
120            })),
121            "Resource" => Ok(Some(MinimalElement::Resource {
122                reference: "RES001".to_string(),
123                title: self.text_buffer.clone(),
124            })),
125            _ => Ok(None),
126        }
127    }
128
129    fn get_location(&self) -> String {
130        format!("streaming at byte offset {}", self.bytes_processed)
131    }
132
133    pub fn stats(&self) -> MinimalStats {
134        MinimalStats {
135            bytes_processed: self.bytes_processed,
136            elements_yielded: self.elements_yielded,
137            current_depth: self.current_depth,
138            elapsed: self.start_time.elapsed(),
139        }
140    }
141}
142
143/// Minimal iterator for streaming
144pub struct MinimalStreamIterator<R: BufRead> {
145    parser: MinimalStreamingParser<R>,
146    finished: bool,
147}
148
149impl<R: BufRead> MinimalStreamIterator<R> {
150    pub fn new(reader: R, version: ERNVersion) -> Self {
151        Self {
152            parser: MinimalStreamingParser::new(reader, version),
153            finished: false,
154        }
155    }
156
157    pub fn stats(&self) -> MinimalStats {
158        self.parser.stats()
159    }
160}
161
162impl<R: BufRead> Iterator for MinimalStreamIterator<R> {
163    type Item = Result<MinimalElement, ParseError>;
164
165    fn next(&mut self) -> Option<Self::Item> {
166        if self.finished {
167            return None;
168        }
169
170        match self.parser.parse_next() {
171            Ok(Some(element)) => {
172                if matches!(element, MinimalElement::EndOfStream) {
173                    self.finished = true;
174                }
175                Some(Ok(element))
176            }
177            Ok(None) => {
178                self.finished = true;
179                None
180            }
181            Err(e) => {
182                self.finished = true;
183                Some(Err(e))
184            }
185        }
186    }
187}
188
/// Progress snapshot of a streaming parse.
#[derive(Debug, Clone)]
pub struct MinimalStats {
    /// Input position, in bytes, recorded by the parser.
    pub bytes_processed: u64,
    /// Number of elements the parser has returned.
    pub elements_yielded: usize,
    /// Element nesting depth at the time of the snapshot.
    pub current_depth: usize,
    /// Time elapsed between parser construction and the snapshot.
    pub elapsed: std::time::Duration,
}

impl MinimalStats {
    /// Average throughput in MiB per second over the elapsed time.
    ///
    /// Returns `0.0` when no time has elapsed, so the result is never the
    /// outcome of a division by zero.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        let mebibytes = self.bytes_processed as f64 / BYTES_PER_MIB;

        if seconds > 0.0 {
            mebibytes / seconds
        } else {
            0.0
        }
    }
}
206
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A small well-formed message must produce exactly a header, the
    /// release with its text as title, and the end-of-stream marker.
    #[test]
    fn test_minimal_streaming_parser() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>test-message-1</MessageId>
    </MessageHeader>
    <Release>Test Release</Release>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = MinimalStreamIterator::new(cursor, ERNVersion::V4_3);

        let elements = iterator
            .collect::<Result<Vec<_>, _>>()
            .expect("well-formed document should parse without errors");

        assert_eq!(elements.len(), 3, "unexpected elements: {:?}", elements);
        assert!(matches!(&elements[0], MinimalElement::Header { .. }));
        assert!(matches!(
            &elements[1],
            MinimalElement::Release { title, .. } if title.as_str() == "Test Release"
        ));
        assert!(matches!(&elements[2], MinimalElement::EndOfStream));
    }

    /// Nesting deeper than the limit must be rejected, and the iterator must
    /// stop after reporting the error.
    #[test]
    fn test_security_limits() {
        // Create deeply nested XML
        let mut xml = String::from(r#"<?xml version="1.0"?>"#);
        for i in 0..150 {
            xml.push_str(&format!("<level{}>", i));
        }
        xml.push_str("content");
        for i in (0..150).rev() {
            xml.push_str(&format!("</level{}>", i));
        }

        let cursor = Cursor::new(xml.as_bytes());
        let mut iterator = MinimalStreamIterator::new(cursor, ERNVersion::V4_3);

        // The very first item must be the security violation.
        match iterator.next() {
            Some(Err(ParseError::SecurityViolation { .. })) => {
                // Expected
            }
            other => panic!("Expected security violation, got {:?}", other),
        }

        // An error terminates the iteration.
        assert!(iterator.next().is_none());
    }
}
263}