ddex_parser/streaming/
minimal.rs1use crate::error::ParseError;
5use ddex_core::models::versions::ERNVersion;
6use quick_xml::{events::Event, Reader};
7use std::io::BufRead;
8use std::time::Instant;
9
10#[derive(Debug, Clone)]
12pub enum MinimalElement {
13 Header {
15 message_id: String,
16 created_date_time: String,
17 version: ERNVersion,
18 },
19 Release { reference: String, title: String },
21 Resource { reference: String, title: String },
23 EndOfStream,
25}
26
27pub struct MinimalStreamingParser<R: BufRead> {
29 reader: Reader<R>,
30 buffer: Vec<u8>,
31 version: ERNVersion,
32 bytes_processed: u64,
33 elements_yielded: usize,
34 start_time: Instant,
35 current_depth: usize,
36 in_element: Option<String>,
37 text_buffer: String,
38}
39
40impl<R: BufRead> MinimalStreamingParser<R> {
41 pub fn new(reader: R, version: ERNVersion) -> Self {
42 let mut xml_reader = Reader::from_reader(reader);
43 xml_reader.config_mut().trim_text(true);
44 xml_reader.config_mut().check_end_names = true;
45
46 Self {
47 reader: xml_reader,
48 buffer: Vec::with_capacity(8192),
49 version,
50 bytes_processed: 0,
51 elements_yielded: 0,
52 start_time: Instant::now(),
53 current_depth: 0,
54 in_element: None,
55 text_buffer: String::new(),
56 }
57 }
58
59 pub fn parse_next(&mut self) -> Result<Option<MinimalElement>, ParseError> {
60 loop {
61 self.buffer.clear();
62 let event = self.reader.read_event_into(&mut self.buffer)?;
63 match event {
64 Event::Start(e) => {
65 self.current_depth += 1;
66 let name_bytes = e.name();
67 let name = std::str::from_utf8(name_bytes.as_ref())?;
68
69 self.in_element = Some(name.to_string());
70 self.text_buffer.clear();
71
72 if self.current_depth > 100 {
74 return Err(ParseError::SecurityViolation {
75 message: "Nesting depth exceeds 100 levels".to_string(),
76 });
77 }
78 }
79 Event::End(e) => {
80 self.current_depth = self.current_depth.saturating_sub(1);
81 let name_bytes = e.name();
82 let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
83
84 if let Some(element) = self.check_completed_element(&name)? {
86 self.elements_yielded += 1;
87 return Ok(Some(element));
88 }
89 }
90 Event::Text(e) => {
91 let text = std::str::from_utf8(&e)?;
92 self.text_buffer.push_str(text.trim());
93 }
94 Event::Eof => {
95 return Ok(Some(MinimalElement::EndOfStream));
96 }
97 _ => {
98 }
100 }
101
102 self.bytes_processed = self.reader.buffer_position();
103 self.buffer.clear();
104 }
105 }
106
107 fn check_completed_element(
108 &mut self,
109 name: &str,
110 ) -> Result<Option<MinimalElement>, ParseError> {
111 match name {
112 "MessageHeader" => Ok(Some(MinimalElement::Header {
113 message_id: "test-message".to_string(),
114 created_date_time: "2023-01-01T00:00:00".to_string(),
115 version: self.version,
116 })),
117 "Release" => Ok(Some(MinimalElement::Release {
118 reference: "REL001".to_string(),
119 title: self.text_buffer.clone(),
120 })),
121 "Resource" => Ok(Some(MinimalElement::Resource {
122 reference: "RES001".to_string(),
123 title: self.text_buffer.clone(),
124 })),
125 _ => Ok(None),
126 }
127 }
128
129 fn get_location(&self) -> String {
130 format!("streaming at byte offset {}", self.bytes_processed)
131 }
132
133 pub fn stats(&self) -> MinimalStats {
134 MinimalStats {
135 bytes_processed: self.bytes_processed,
136 elements_yielded: self.elements_yielded,
137 current_depth: self.current_depth,
138 elapsed: self.start_time.elapsed(),
139 }
140 }
141}
142
143pub struct MinimalStreamIterator<R: BufRead> {
145 parser: MinimalStreamingParser<R>,
146 finished: bool,
147}
148
149impl<R: BufRead> MinimalStreamIterator<R> {
150 pub fn new(reader: R, version: ERNVersion) -> Self {
151 Self {
152 parser: MinimalStreamingParser::new(reader, version),
153 finished: false,
154 }
155 }
156
157 pub fn stats(&self) -> MinimalStats {
158 self.parser.stats()
159 }
160}
161
162impl<R: BufRead> Iterator for MinimalStreamIterator<R> {
163 type Item = Result<MinimalElement, ParseError>;
164
165 fn next(&mut self) -> Option<Self::Item> {
166 if self.finished {
167 return None;
168 }
169
170 match self.parser.parse_next() {
171 Ok(Some(element)) => {
172 if matches!(element, MinimalElement::EndOfStream) {
173 self.finished = true;
174 }
175 Some(Ok(element))
176 }
177 Ok(None) => {
178 self.finished = true;
179 None
180 }
181 Err(e) => {
182 self.finished = true;
183 Some(Err(e))
184 }
185 }
186 }
187}
188
189#[derive(Debug, Clone)]
190pub struct MinimalStats {
191 pub bytes_processed: u64,
192 pub elements_yielded: usize,
193 pub current_depth: usize,
194 pub elapsed: std::time::Duration,
195}
196
197impl MinimalStats {
198 pub fn throughput_mibs(&self) -> f64 {
199 if self.elapsed.as_secs_f64() > 0.0 {
200 (self.bytes_processed as f64 / (1024.0 * 1024.0)) / self.elapsed.as_secs_f64()
201 } else {
202 0.0
203 }
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210 use std::io::Cursor;
211
212 #[test]
213 fn test_minimal_streaming_parser() {
214 let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
215<ERNMessage xmlns="http://ddex.net/xml/ern/43">
216 <MessageHeader>
217 <MessageId>test-message-1</MessageId>
218 </MessageHeader>
219 <Release>Test Release</Release>
220</ERNMessage>"#;
221
222 let cursor = Cursor::new(xml.as_bytes());
223 let iterator = MinimalStreamIterator::new(cursor, ERNVersion::V4_3);
224
225 let elements: Result<Vec<_>, _> = iterator.collect();
226 assert!(elements.is_ok());
227
228 let elements = elements.unwrap();
229 assert!(elements.len() >= 1);
230
231 let has_header = elements
233 .iter()
234 .any(|e| matches!(e, MinimalElement::Header { .. }));
235 assert!(has_header);
236 }
237
238 #[test]
239 fn test_security_limits() {
240 let mut xml = String::from(r#"<?xml version="1.0"?>"#);
242 for i in 0..150 {
243 xml.push_str(&format!("<level{}>", i));
244 }
245 xml.push_str("content");
246 for i in (0..150).rev() {
247 xml.push_str(&format!("</level{}>", i));
248 }
249
250 let cursor = Cursor::new(xml.as_bytes());
251 let mut iterator = MinimalStreamIterator::new(cursor, ERNVersion::V4_3);
252
253 let result = iterator.next();
255 assert!(result.is_some());
256 match result.unwrap() {
257 Err(ParseError::SecurityViolation { .. }) => {
258 }
260 _ => panic!("Expected security violation"),
261 }
262 }
263}