ddex_parser/streaming/
minimal.rs1use crate::error::{ErrorLocation, ParseError};
5use ddex_core::models::versions::ERNVersion;
6use quick_xml::{events::Event, Reader};
7use std::io::BufRead;
8use std::time::Instant;
9
10#[derive(Debug, Clone)]
12pub enum MinimalElement {
13 Header {
15 message_id: String,
16 created_date_time: String,
17 version: ERNVersion,
18 },
19 Release { reference: String, title: String },
21 Resource { reference: String, title: String },
23 EndOfStream,
25}
26
27pub struct MinimalStreamingParser<R: BufRead> {
29 reader: Reader<R>,
30 buffer: Vec<u8>,
31 version: ERNVersion,
32 bytes_processed: u64,
33 elements_yielded: usize,
34 start_time: Instant,
35 current_depth: usize,
36 in_element: Option<String>,
37 text_buffer: String,
38}
39
40impl<R: BufRead> MinimalStreamingParser<R> {
41 pub fn new(reader: R, version: ERNVersion) -> Self {
42 let mut xml_reader = Reader::from_reader(reader);
43 xml_reader.config_mut().trim_text(true);
44 xml_reader.config_mut().check_end_names = true;
45
46 Self {
47 reader: xml_reader,
48 buffer: Vec::with_capacity(8192),
49 version,
50 bytes_processed: 0,
51 elements_yielded: 0,
52 start_time: Instant::now(),
53 current_depth: 0,
54 in_element: None,
55 text_buffer: String::new(),
56 }
57 }
58
59 pub fn parse_next(&mut self) -> Result<Option<MinimalElement>, ParseError> {
60 loop {
61 self.buffer.clear();
62 let event = self.reader.read_event_into(&mut self.buffer)?;
63 match event {
64 Event::Start(e) => {
65 self.current_depth += 1;
66 let name_bytes = e.name();
67 let name = std::str::from_utf8(name_bytes.as_ref())?;
68
69 self.in_element = Some(name.to_string());
70 self.text_buffer.clear();
71
72 if self.current_depth > 100 {
74 return Err(ParseError::SecurityViolation {
75 message: "Nesting depth exceeds 100 levels".to_string(),
76 });
77 }
78 }
79 Event::End(e) => {
80 self.current_depth = self.current_depth.saturating_sub(1);
81 let name_bytes = e.name();
82 let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
83
84 if let Some(element) = self.check_completed_element(&name)? {
86 self.elements_yielded += 1;
87 return Ok(Some(element));
88 }
89 }
90 Event::Text(e) => {
91 let text = std::str::from_utf8(&e)?;
92 self.text_buffer.push_str(text.trim());
93 }
94 Event::Eof => {
95 return Ok(Some(MinimalElement::EndOfStream));
96 }
97 _ => {
98 }
100 }
101
102 self.bytes_processed = self.reader.buffer_position();
103 self.buffer.clear();
104 }
105 }
106
107 fn check_completed_element(
108 &mut self,
109 name: &str,
110 ) -> Result<Option<MinimalElement>, ParseError> {
111 match name {
112 "MessageHeader" => Ok(Some(MinimalElement::Header {
113 message_id: "test-message".to_string(),
114 created_date_time: "2023-01-01T00:00:00".to_string(),
115 version: self.version,
116 })),
117 "Release" => Ok(Some(MinimalElement::Release {
118 reference: "REL001".to_string(),
119 title: self.text_buffer.clone(),
120 })),
121 "Resource" => Ok(Some(MinimalElement::Resource {
122 reference: "RES001".to_string(),
123 title: self.text_buffer.clone(),
124 })),
125 _ => Ok(None),
126 }
127 }
128
129 fn get_location(&self) -> ErrorLocation {
130 ErrorLocation {
131 line: 0,
132 column: 0,
133 byte_offset: Some(self.bytes_processed as usize),
134 path: "streaming".to_string(),
135 }
136 }
137
138 pub fn stats(&self) -> MinimalStats {
139 MinimalStats {
140 bytes_processed: self.bytes_processed,
141 elements_yielded: self.elements_yielded,
142 current_depth: self.current_depth,
143 elapsed: self.start_time.elapsed(),
144 }
145 }
146}
147
148pub struct MinimalStreamIterator<R: BufRead> {
150 parser: MinimalStreamingParser<R>,
151 finished: bool,
152}
153
154impl<R: BufRead> MinimalStreamIterator<R> {
155 pub fn new(reader: R, version: ERNVersion) -> Self {
156 Self {
157 parser: MinimalStreamingParser::new(reader, version),
158 finished: false,
159 }
160 }
161
162 pub fn stats(&self) -> MinimalStats {
163 self.parser.stats()
164 }
165}
166
167impl<R: BufRead> Iterator for MinimalStreamIterator<R> {
168 type Item = Result<MinimalElement, ParseError>;
169
170 fn next(&mut self) -> Option<Self::Item> {
171 if self.finished {
172 return None;
173 }
174
175 match self.parser.parse_next() {
176 Ok(Some(element)) => {
177 if matches!(element, MinimalElement::EndOfStream) {
178 self.finished = true;
179 }
180 Some(Ok(element))
181 }
182 Ok(None) => {
183 self.finished = true;
184 None
185 }
186 Err(e) => {
187 self.finished = true;
188 Some(Err(e))
189 }
190 }
191 }
192}
193
/// Snapshot of a parser's progress counters.
#[derive(Debug, Clone)]
pub struct MinimalStats {
    /// Byte position recorded by the parser.
    pub bytes_processed: u64,
    /// Number of elements returned so far.
    pub elements_yielded: usize,
    /// Element nesting depth at the time of the snapshot.
    pub current_depth: usize,
    /// Time elapsed since the parser was created.
    pub elapsed: std::time::Duration,
}

impl MinimalStats {
    /// Average throughput in MiB per second.
    ///
    /// Returns `0.0` when no time has elapsed, so the division is never by
    /// zero.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        if seconds <= 0.0 {
            return 0.0;
        }

        (self.bytes_processed as f64 / BYTES_PER_MIB) / seconds
    }
}
211
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A small well-formed message parses without error and yields a header.
    #[test]
    fn test_minimal_streaming_parser() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
 <MessageHeader>
 <MessageId>test-message-1</MessageId>
 </MessageHeader>
 <Release>Test Release</Release>
</ERNMessage>"#;

        let parsed: Result<Vec<_>, _> =
            MinimalStreamIterator::new(Cursor::new(xml.as_bytes()), ERNVersion::V4_3).collect();
        assert!(parsed.is_ok());

        let elements = parsed.unwrap();
        assert!(!elements.is_empty());

        // At least one of the yielded elements must be the message header.
        assert!(elements
            .iter()
            .any(|e| matches!(e, MinimalElement::Header { .. })));
    }

    /// Nesting 150 levels deep must trip the parser's depth limit.
    #[test]
    fn test_security_limits() {
        const DEPTH: usize = 150;

        let opening: String = (0..DEPTH).map(|i| format!("<level{}>", i)).collect();
        let closing: String = (0..DEPTH)
            .rev()
            .map(|i| format!("</level{}>", i))
            .collect();

        let mut xml = String::from(r#"<?xml version="1.0"?>"#);
        xml.push_str(&opening);
        xml.push_str("content");
        xml.push_str(&closing);

        let mut iterator =
            MinimalStreamIterator::new(Cursor::new(xml.as_bytes()), ERNVersion::V4_3);

        // The very first item must already be the violation, because no
        // recognized element closes before the limit is reached.
        let first = iterator.next();
        assert!(first.is_some());
        if !matches!(first.unwrap(), Err(ParseError::SecurityViolation { .. })) {
            panic!("Expected security violation");
        }
    }
}