1#[allow(dead_code)] use crate::error::{ErrorLocation, ParseError};
6use ddex_core::models::streaming_types::*;
7use ddex_core::models::LocalizedString;
8use ddex_core::models::{graph::*, versions::ERNVersion};
9use quick_xml::{events::Event, Reader};
10use std::collections::HashMap;
11use std::io::BufRead;
12use std::time::Instant;
13
14#[derive(Debug, Clone)]
16pub enum StreamingElement {
17 Header(Box<MessageHeader>),
18 Release(Release),
19 Resource(Resource),
20 Party(Party),
21 EndOfStream,
22}
23
24#[derive(Debug, Clone)]
26enum ParserState {
27 Initial,
28 InHeader(Box<MessageHeaderBuilder>),
29 InRelease(Box<ReleaseBuilder>),
30 InResource(Box<ResourceBuilder>),
31 InParty(Box<PartyBuilder>),
32 Complete,
33}
34
35pub struct ComprehensiveStreamingParser<R: BufRead> {
37 reader: Reader<R>,
38 buffer: Vec<u8>,
39 state: ParserState,
40 current_path: Vec<String>,
41 current_depth: usize,
42 text_buffer: String,
43 attributes: HashMap<String, String>,
44 bytes_processed: u64,
45 elements_yielded: usize,
46 start_time: Instant,
47}
48
49impl<R: BufRead> ComprehensiveStreamingParser<R> {
50 pub fn new(reader: R, _version: ERNVersion) -> Self {
51 let mut xml_reader = Reader::from_reader(reader);
52 xml_reader.config_mut().trim_text(true);
53 xml_reader.config_mut().check_end_names = true;
54
55 Self {
56 reader: xml_reader,
57 buffer: Vec::with_capacity(8192),
58 state: ParserState::Initial,
59 current_path: Vec::new(),
60 current_depth: 0,
61 text_buffer: String::new(),
62 attributes: HashMap::new(),
63 bytes_processed: 0,
64 elements_yielded: 0,
65 start_time: Instant::now(),
66 }
67 }
68
69 pub fn parse_next(&mut self) -> Result<Option<StreamingElement>, ParseError> {
70 loop {
71 self.buffer.clear();
72 let event = self.reader.read_event_into(&mut self.buffer);
73 let bytes_position = self.reader.buffer_position();
74
75 match event {
76 Ok(Event::Start(e)) => {
77 let name_bytes = e.name();
78 let name = std::str::from_utf8(name_bytes.as_ref())?;
79 self.current_path.push(name.to_string());
80 self.current_depth += 1;
81
82 self.attributes.clear();
84 for attr in e.attributes() {
85 let attr = attr?;
86 let key = std::str::from_utf8(attr.key.as_ref())?;
87 let value = std::str::from_utf8(&attr.value)?;
88 self.attributes.insert(key.to_string(), value.to_string());
89 }
90
91 self.text_buffer.clear();
92
93 match (&self.state, name) {
95 (ParserState::Initial, "MessageHeader") => {
96 self.state =
97 ParserState::InHeader(Box::new(MessageHeaderBuilder::new()));
98 }
99 (ParserState::Initial, "Release") => {
100 let reference = self
101 .attributes
102 .get("ReleaseReference")
103 .unwrap_or(&"default".to_string())
104 .clone();
105 let release = ReleaseBuilder::new(reference);
106 self.state = ParserState::InRelease(Box::new(release));
107 }
108 (ParserState::Initial, "Resource") => {
109 let reference = self
110 .attributes
111 .get("ResourceReference")
112 .unwrap_or(&"default".to_string())
113 .clone();
114 let resource = ResourceBuilder::new(reference);
115 self.state = ParserState::InResource(Box::new(resource));
116 }
117 (ParserState::Initial, "Party") => {
118 self.state = ParserState::InParty(Box::new(PartyBuilder::new(None)));
119 }
120 _ => {
121 }
123 }
124 }
125 Ok(Event::End(e)) => {
126 let name_bytes = e.name();
127 let name = std::str::from_utf8(name_bytes.as_ref())?;
128 let text_content = self.text_buffer.clone();
129 let bytes_processed = self.bytes_processed;
130
131 let location = ErrorLocation {
132 line: 0,
133 column: 0,
134 byte_offset: Some(bytes_processed as usize),
135 path: "streaming".to_string(),
136 };
137
138 let result = match &mut self.state {
139 ParserState::InHeader(header) => {
140 match name {
141 "MessageId" => {
142 header.set_message_id(text_content.clone());
143 None
144 }
145 "MessageCreatedDateTime" => {
146 header.set_created_date_time_from_text(text_content.clone());
147 None
148 }
149 "MessageHeader" => {
150 let core_header = header.clone().to_core().map_err(|e| {
152 ParseError::ConversionError {
153 message: format!("Failed to convert header: {:?}", e),
154 location: location.clone(),
155 }
156 })?;
157 self.state = ParserState::Initial;
158 Some(StreamingElement::Header(Box::new(core_header)))
159 }
160 _ => None,
161 }
162 }
163 ParserState::InRelease(release) => {
164 match name {
165 "ReleaseTitle" => {
166 release.add_title(LocalizedString {
167 text: text_content.clone(),
168 language_code: None,
169 script: None,
170 });
171 None
172 }
173 "Genre" => {
174 let genre = Genre {
175 genre_text: text_content.clone(),
176 sub_genre: None,
177 attributes: None,
178 extensions: None,
179 comments: None,
180 };
181 release.add_genre(genre);
182 None
183 }
184 "Release" => {
185 let core_release = release.clone().to_core().map_err(|e| {
187 ParseError::ConversionError {
188 message: format!("Failed to convert release: {:?}", e),
189 location: location.clone(),
190 }
191 })?;
192 self.state = ParserState::Initial;
193 Some(StreamingElement::Release(core_release))
194 }
195 _ => None,
196 }
197 }
198 ParserState::InResource(resource) => {
199 match name {
200 "Title" | "ReferenceTitle" => {
201 resource.add_title(LocalizedString {
202 text: text_content.clone(),
203 language_code: None,
204 script: None,
205 });
206 None
207 }
208 "Duration" => {
209 resource.set_duration_from_text(text_content.clone());
210 None
211 }
212 "Resource" => {
213 let core_resource =
215 resource.clone().to_core().map_err(|e| {
216 ParseError::ConversionError {
217 message: format!(
218 "Failed to convert resource: {:?}",
219 e
220 ),
221 location: location.clone(),
222 }
223 })?;
224 self.state = ParserState::Initial;
225 Some(StreamingElement::Resource(core_resource))
226 }
227 _ => None,
228 }
229 }
230 ParserState::InParty(party) => {
231 match name {
232 "PartyName" => {
233 party.add_name(LocalizedString {
234 text: text_content.clone(),
235 language_code: None,
236 script: None,
237 });
238 None
239 }
240 "Party" => {
241 let core_party = party.clone().to_core().map_err(|e| {
243 ParseError::ConversionError {
244 message: format!("Failed to convert party: {:?}", e),
245 location: location.clone(),
246 }
247 })?;
248 self.state = ParserState::Initial;
249 Some(StreamingElement::Party(core_party))
250 }
251 _ => None,
252 }
253 }
254 _ => None,
255 };
256
257 self.current_depth = self.current_depth.saturating_sub(1);
258 self.current_path.pop();
259 self.text_buffer.clear();
260
261 if let Some(element) = result {
262 self.elements_yielded += 1;
263 return Ok(Some(element));
264 }
265 }
266 Ok(Event::Text(e)) => {
267 let text = std::str::from_utf8(&e)?;
268 self.text_buffer.push_str(text.trim());
269 }
270 Ok(Event::Eof) => {
271 return Ok(Some(StreamingElement::EndOfStream));
272 }
273 Ok(_) => {
274 }
276 Err(e) => {
277 return Err(ParseError::XmlError {
278 message: format!("XML parsing error: {}", e),
279 location: self.get_current_location(),
280 });
281 }
282 }
283
284 self.bytes_processed = bytes_position;
285
286 if self.current_depth > 100 {
288 return Err(ParseError::SecurityViolation {
289 message: "Nesting depth exceeds 100 levels".to_string(),
290 });
291 }
292 }
293 }
294
295 fn get_current_location(&self) -> ErrorLocation {
298 ErrorLocation {
299 line: 0,
300 column: 0,
301 byte_offset: Some(self.bytes_processed as usize),
302 path: "streaming".to_string(),
303 }
304 }
305
306 pub fn stats(&self) -> ComprehensiveStats {
307 ComprehensiveStats {
308 bytes_processed: self.bytes_processed,
309 elements_yielded: self.elements_yielded,
310 current_depth: self.current_depth,
311 elapsed: self.start_time.elapsed(),
312 }
313 }
314}
315
316pub struct ComprehensiveStreamIterator<R: BufRead> {
318 parser: ComprehensiveStreamingParser<R>,
319 finished: bool,
320}
321
322impl<R: BufRead> ComprehensiveStreamIterator<R> {
323 pub fn new(reader: R, version: ERNVersion) -> Self {
324 Self {
325 parser: ComprehensiveStreamingParser::new(reader, version),
326 finished: false,
327 }
328 }
329
330 pub fn stats(&self) -> ComprehensiveStats {
331 self.parser.stats()
332 }
333}
334
335impl<R: BufRead> Iterator for ComprehensiveStreamIterator<R> {
336 type Item = Result<StreamingElement, ParseError>;
337
338 fn next(&mut self) -> Option<Self::Item> {
339 if self.finished {
340 return None;
341 }
342
343 match self.parser.parse_next() {
344 Ok(Some(element)) => {
345 if matches!(element, StreamingElement::EndOfStream) {
346 self.finished = true;
347 }
348 Some(Ok(element))
349 }
350 Ok(None) => {
351 self.finished = true;
352 None
353 }
354 Err(e) => {
355 self.finished = true;
356 Some(Err(e))
357 }
358 }
359 }
360}
361
/// Snapshot of a streaming parser's progress.
#[derive(Debug, Clone)]
pub struct ComprehensiveStats {
    /// Reader byte position recorded by the parser.
    pub bytes_processed: u64,
    /// Number of data elements returned so far.
    pub elements_yielded: usize,
    /// Number of XML elements open at the time of the snapshot.
    pub current_depth: usize,
    /// Time elapsed since the parser was created.
    pub elapsed: std::time::Duration,
}
369
370impl ComprehensiveStats {
371 pub fn throughput_mibs(&self) -> f64 {
372 if self.elapsed.as_secs_f64() > 0.0 {
373 (self.bytes_processed as f64 / (1024.0 * 1024.0)) / self.elapsed.as_secs_f64()
374 } else {
375 0.0
376 }
377 }
378}
379
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A small message with a header, one release and one resource must yield
    /// each of those elements followed by the end-of-stream marker.
    #[test]
    fn test_comprehensive_streaming_parser() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>test-message-1</MessageId>
        <MessageCreatedDateTime>2023-01-01T00:00:00</MessageCreatedDateTime>
    </MessageHeader>
    <Release ReleaseReference="REL001">
        <ReleaseTitle>Test Release</ReleaseTitle>
        <Genre>Rock</Genre>
    </Release>
    <Resource ResourceReference="RES001">
        <Title>Test Resource</Title>
        <Duration>180</Duration>
    </Resource>
</ERNMessage>"#;

        let source = Cursor::new(xml.as_bytes());
        let elements: Result<Vec<_>, _> =
            ComprehensiveStreamIterator::new(source, ERNVersion::V4_3).collect();
        if let Err(ref e) = elements {
            eprintln!("Iterator error: {:?}", e);
        }
        assert!(
            elements.is_ok(),
            "Iterator failed with error: {:?}",
            elements.as_ref().err()
        );

        let elements = elements.unwrap();
        assert!(elements.len() >= 3);

        // One pass over the output, noting which kinds of element appeared.
        let mut has_header = false;
        let mut has_release = false;
        let mut has_resource = false;
        let mut has_end_stream = false;
        for element in &elements {
            match element {
                StreamingElement::Header(_) => has_header = true,
                StreamingElement::Release(_) => has_release = true,
                StreamingElement::Resource(_) => has_resource = true,
                StreamingElement::EndOfStream => has_end_stream = true,
                StreamingElement::Party(_) => {}
            }
        }

        assert!(has_header, "Should parse message header");
        assert!(has_release, "Should parse release");
        assert!(has_resource, "Should parse resource");
        assert!(has_end_stream, "Should have end of stream marker");
    }

    /// A document nested 150 levels deep must be rejected with a security
    /// violation on the first call to `next`.
    #[test]
    fn test_comprehensive_security_limits() {
        let opening: String = (0..150).map(|i| format!("<level{}>", i)).collect();
        let closing: String = (0..150).rev().map(|i| format!("</level{}>", i)).collect();

        let mut xml = String::from(r#"<?xml version="1.0"?>"#);
        xml.push_str(&opening);
        xml.push_str("content");
        xml.push_str(&closing);

        let source = Cursor::new(xml.as_bytes());
        let mut iterator = ComprehensiveStreamIterator::new(source, ERNVersion::V4_3);

        let first = iterator.next();
        assert!(first.is_some());
        assert!(
            matches!(first, Some(Err(ParseError::SecurityViolation { .. }))),
            "Expected security violation"
        );
    }
}