1#[allow(dead_code)] use crate::error::ParseError;
6use ddex_core::models::streaming_types::*;
7use ddex_core::models::LocalizedString;
8use ddex_core::models::{graph::*, versions::ERNVersion};
9use quick_xml::{events::Event, Reader};
10use std::collections::HashMap;
11use std::io::BufRead;
12use std::time::Instant;
13
/// One item produced by the streaming parser.
///
/// Each payload variant carries a fully converted core model value; the
/// final variant is a marker rather than data.
#[derive(Debug, Clone)]
pub enum StreamingElement {
    /// A completed message header (boxed).
    Header(Box<MessageHeader>),
    /// A completed release record.
    Release(Release),
    /// A completed resource record.
    Resource(Resource),
    /// A completed party record.
    Party(Party),
    /// Marker emitted when the underlying XML reader reports end of input.
    EndOfStream,
}
23
/// Which record, if any, the parser is currently assembling.
///
/// The builder-carrying variants hold the partially populated builder for the
/// record whose start tag has been seen but whose end tag has not.
#[derive(Debug, Clone)]
enum ParserState {
    /// Not inside any tracked record; the only state from which a new record is opened.
    Initial,
    /// Inside a `MessageHeader` element.
    InHeader(Box<MessageHeaderBuilder>),
    /// Inside a `Release` element.
    InRelease(Box<ReleaseBuilder>),
    /// Inside a `Resource` element.
    InResource(Box<ResourceBuilder>),
    /// Inside a `Party` element.
    InParty(Box<PartyBuilder>),
    /// NOTE(review): not assigned anywhere in the visible code; presumably
    /// intended to mark a fully consumed stream — confirm before relying on it.
    Complete,
}
34
/// Pull-style XML parser that yields one [`StreamingElement`] per call to
/// `parse_next`, keeping only the record currently being built in memory.
pub struct ComprehensiveStreamingParser<R: BufRead> {
    /// Underlying quick-xml reader.
    reader: Reader<R>,
    /// Scratch buffer handed to the reader for every event; cleared before each read.
    buffer: Vec<u8>,
    /// Record currently being assembled, if any.
    state: ParserState,
    /// Names of the elements that are currently open, outermost first.
    current_path: Vec<String>,
    /// Number of currently open elements; checked against the nesting limit.
    current_depth: usize,
    /// Trimmed character data collected since the most recent start or end tag.
    text_buffer: String,
    /// Attributes of the most recently opened element.
    attributes: HashMap<String, String>,
    /// Reader buffer position recorded while processing events.
    bytes_processed: u64,
    /// Count of record elements returned so far (the end-of-stream marker is not counted).
    elements_yielded: usize,
    /// Moment the parser was constructed; used to report elapsed time in stats.
    start_time: Instant,
}
48
49impl<R: BufRead> ComprehensiveStreamingParser<R> {
50 pub fn new(reader: R, _version: ERNVersion) -> Self {
51 let mut xml_reader = Reader::from_reader(reader);
52 xml_reader.config_mut().trim_text(true);
53 xml_reader.config_mut().check_end_names = true;
54
55 Self {
56 reader: xml_reader,
57 buffer: Vec::with_capacity(8192),
58 state: ParserState::Initial,
59 current_path: Vec::new(),
60 current_depth: 0,
61 text_buffer: String::new(),
62 attributes: HashMap::new(),
63 bytes_processed: 0,
64 elements_yielded: 0,
65 start_time: Instant::now(),
66 }
67 }
68
69 pub fn parse_next(&mut self) -> Result<Option<StreamingElement>, ParseError> {
70 loop {
71 self.buffer.clear();
72 let event = self.reader.read_event_into(&mut self.buffer);
73 let bytes_position = self.reader.buffer_position();
74
75 match event {
76 Ok(Event::Start(e)) => {
77 let name_bytes = e.name();
78 let name = std::str::from_utf8(name_bytes.as_ref())?;
79 self.current_path.push(name.to_string());
80 self.current_depth += 1;
81
82 self.attributes.clear();
84 for attr in e.attributes() {
85 let attr = attr?;
86 let key = std::str::from_utf8(attr.key.as_ref())?;
87 let value = std::str::from_utf8(&attr.value)?;
88 self.attributes.insert(key.to_string(), value.to_string());
89 }
90
91 self.text_buffer.clear();
92
93 match (&self.state, name) {
95 (ParserState::Initial, "MessageHeader") => {
96 self.state =
97 ParserState::InHeader(Box::new(MessageHeaderBuilder::new()));
98 }
99 (ParserState::Initial, "Release") => {
100 let reference = self
101 .attributes
102 .get("ReleaseReference")
103 .unwrap_or(&"default".to_string())
104 .clone();
105 let release = ReleaseBuilder::new(reference);
106 self.state = ParserState::InRelease(Box::new(release));
107 }
108 (ParserState::Initial, "Resource") => {
109 let reference = self
110 .attributes
111 .get("ResourceReference")
112 .unwrap_or(&"default".to_string())
113 .clone();
114 let resource = ResourceBuilder::new(reference);
115 self.state = ParserState::InResource(Box::new(resource));
116 }
117 (ParserState::Initial, "Party") => {
118 self.state = ParserState::InParty(Box::new(PartyBuilder::new(None)));
119 }
120 _ => {
121 }
123 }
124 }
125 Ok(Event::End(e)) => {
126 let name_bytes = e.name();
127 let name = std::str::from_utf8(name_bytes.as_ref())?;
128 let text_content = self.text_buffer.clone();
129 let bytes_processed = self.bytes_processed;
130
131 let location = format!("streaming at byte offset {}", bytes_processed);
132
133 let result = match &mut self.state {
134 ParserState::InHeader(header) => {
135 match name {
136 "MessageId" => {
137 header.set_message_id(text_content.clone());
138 None
139 }
140 "MessageCreatedDateTime" => {
141 header.set_created_date_time_from_text(text_content.clone());
142 None
143 }
144 "MessageHeader" => {
145 let core_header = header.clone().to_core().map_err(|e| {
147 ParseError::ConversionError {
148 from: "StreamingHeader".to_string(),
149 to: "MessageHeader".to_string(),
150 message: format!("Failed to convert header at {}: {:?}", location, e),
151 }
152 })?;
153 self.state = ParserState::Initial;
154 Some(StreamingElement::Header(Box::new(core_header)))
155 }
156 _ => None,
157 }
158 }
159 ParserState::InRelease(release) => {
160 match name {
161 "ReleaseTitle" => {
162 release.add_title(LocalizedString {
163 text: text_content.clone(),
164 language_code: None,
165 script: None,
166 });
167 None
168 }
169 "Genre" => {
170 let genre = Genre {
171 genre_text: text_content.clone(),
172 sub_genre: None,
173 attributes: None,
174 extensions: None,
175 comments: None,
176 };
177 release.add_genre(genre);
178 None
179 }
180 "Release" => {
181 let core_release = release.clone().to_core().map_err(|e| {
183 ParseError::ConversionError {
184 from: "StreamingRelease".to_string(),
185 to: "Release".to_string(),
186 message: format!("Failed to convert release at {}: {:?}", location, e),
187 }
188 })?;
189 self.state = ParserState::Initial;
190 Some(StreamingElement::Release(core_release))
191 }
192 _ => None,
193 }
194 }
195 ParserState::InResource(resource) => {
196 match name {
197 "Title" | "ReferenceTitle" => {
198 resource.add_title(LocalizedString {
199 text: text_content.clone(),
200 language_code: None,
201 script: None,
202 });
203 None
204 }
205 "Duration" => {
206 resource.set_duration_from_text(text_content.clone());
207 None
208 }
209 "Resource" => {
210 let core_resource =
212 resource.clone().to_core().map_err(|e| {
213 ParseError::ConversionError {
214 from: "StreamingResource".to_string(),
215 to: "Resource".to_string(),
216 message: format!(
217 "Failed to convert resource at {}: {:?}",
218 location, e
219 ),
220 }
221 })?;
222 self.state = ParserState::Initial;
223 Some(StreamingElement::Resource(core_resource))
224 }
225 _ => None,
226 }
227 }
228 ParserState::InParty(party) => {
229 match name {
230 "PartyName" => {
231 party.add_name(LocalizedString {
232 text: text_content.clone(),
233 language_code: None,
234 script: None,
235 });
236 None
237 }
238 "Party" => {
239 let core_party = party.clone().to_core().map_err(|e| {
241 ParseError::ConversionError {
242 from: "StreamingParty".to_string(),
243 to: "Party".to_string(),
244 message: format!("Failed to convert party at {}: {:?}", location, e),
245 }
246 })?;
247 self.state = ParserState::Initial;
248 Some(StreamingElement::Party(core_party))
249 }
250 _ => None,
251 }
252 }
253 _ => None,
254 };
255
256 self.current_depth = self.current_depth.saturating_sub(1);
257 self.current_path.pop();
258 self.text_buffer.clear();
259
260 if let Some(element) = result {
261 self.elements_yielded += 1;
262 return Ok(Some(element));
263 }
264 }
265 Ok(Event::Text(e)) => {
266 let text = std::str::from_utf8(&e)?;
267 self.text_buffer.push_str(text.trim());
268 }
269 Ok(Event::Eof) => {
270 return Ok(Some(StreamingElement::EndOfStream));
271 }
272 Ok(_) => {
273 }
275 Err(e) => {
276 return Err(ParseError::XmlError(format!("XML parsing error: {}", e)));
277 }
278 }
279
280 self.bytes_processed = bytes_position;
281
282 if self.current_depth > 100 {
284 return Err(ParseError::SecurityViolation {
285 message: "Nesting depth exceeds 100 levels".to_string(),
286 });
287 }
288 }
289 }
290
291 fn get_current_location(&self) -> String {
294 format!("streaming at byte offset {}", self.bytes_processed)
295 }
296
297 pub fn stats(&self) -> ComprehensiveStats {
298 ComprehensiveStats {
299 bytes_processed: self.bytes_processed,
300 elements_yielded: self.elements_yielded,
301 current_depth: self.current_depth,
302 elapsed: self.start_time.elapsed(),
303 }
304 }
305}
306
/// Iterator adapter over [`ComprehensiveStreamingParser`] that yields each
/// parsed element (or error) and stops after the end-of-stream marker or the
/// first error.
pub struct ComprehensiveStreamIterator<R: BufRead> {
    /// Parser that produces the elements.
    parser: ComprehensiveStreamingParser<R>,
    /// Set once iteration must stop; later calls to `next` return `None`.
    finished: bool,
}
312
313impl<R: BufRead> ComprehensiveStreamIterator<R> {
314 pub fn new(reader: R, version: ERNVersion) -> Self {
315 Self {
316 parser: ComprehensiveStreamingParser::new(reader, version),
317 finished: false,
318 }
319 }
320
321 pub fn stats(&self) -> ComprehensiveStats {
322 self.parser.stats()
323 }
324}
325
326impl<R: BufRead> Iterator for ComprehensiveStreamIterator<R> {
327 type Item = Result<StreamingElement, ParseError>;
328
329 fn next(&mut self) -> Option<Self::Item> {
330 if self.finished {
331 return None;
332 }
333
334 match self.parser.parse_next() {
335 Ok(Some(element)) => {
336 if matches!(element, StreamingElement::EndOfStream) {
337 self.finished = true;
338 }
339 Some(Ok(element))
340 }
341 Ok(None) => {
342 self.finished = true;
343 None
344 }
345 Err(e) => {
346 self.finished = true;
347 Some(Err(e))
348 }
349 }
350 }
351}
352
/// Snapshot of a streaming parser's progress counters.
#[derive(Debug, Clone)]
pub struct ComprehensiveStats {
    /// Byte offset reached in the input.
    pub bytes_processed: u64,
    /// Number of record elements returned so far.
    pub elements_yielded: usize,
    /// Number of XML elements open at the time of the snapshot.
    pub current_depth: usize,
    /// Time elapsed when the snapshot was taken.
    pub elapsed: std::time::Duration,
}

impl ComprehensiveStats {
    /// Average throughput in MiB per second, or `0.0` when no time has elapsed.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        if seconds > 0.0 {
            let mebibytes = self.bytes_processed as f64 / BYTES_PER_MIB;
            mebibytes / seconds
        } else {
            0.0
        }
    }
}
370
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Parses a small message end to end and checks that a header, a release,
    /// a resource and the end-of-stream marker are all produced.
    #[test]
    fn test_comprehensive_streaming_parser() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>test-message-1</MessageId>
        <MessageCreatedDateTime>2023-01-01T00:00:00</MessageCreatedDateTime>
    </MessageHeader>
    <Release ReleaseReference="REL001">
        <ReleaseTitle>Test Release</ReleaseTitle>
        <Genre>Rock</Genre>
    </Release>
    <Resource ResourceReference="RES001">
        <Title>Test Resource</Title>
        <Duration>180</Duration>
    </Resource>
</ERNMessage>"#;

        let source = Cursor::new(xml.as_bytes());
        let collected: Result<Vec<_>, _> =
            ComprehensiveStreamIterator::new(source, ERNVersion::V4_3).collect();

        if let Err(ref e) = collected {
            eprintln!("Iterator error: {:?}", e);
        }
        assert!(
            collected.is_ok(),
            "Iterator failed with error: {:?}",
            collected.as_ref().err()
        );

        let elements = collected.unwrap();
        assert!(elements.len() >= 3);

        // Single pass over the output, noting which kinds of element appeared.
        let mut saw_header = false;
        let mut saw_release = false;
        let mut saw_resource = false;
        let mut saw_end_stream = false;
        for element in &elements {
            match element {
                StreamingElement::Header(_) => saw_header = true,
                StreamingElement::Release(_) => saw_release = true,
                StreamingElement::Resource(_) => saw_resource = true,
                StreamingElement::EndOfStream => saw_end_stream = true,
                StreamingElement::Party(_) => {}
            }
        }

        assert!(saw_header, "Should parse message header");
        assert!(saw_release, "Should parse release");
        assert!(saw_resource, "Should parse resource");
        assert!(saw_end_stream, "Should have end of stream marker");
    }

    /// Feeds a document nested 150 levels deep and expects the first item to
    /// be a security violation.
    #[test]
    fn test_comprehensive_security_limits() {
        let mut xml = String::from(r#"<?xml version="1.0"?>"#);
        xml.extend((0..150).map(|i| format!("<level{}>", i)));
        xml.push_str("content");
        xml.extend((0..150).rev().map(|i| format!("</level{}>", i)));

        let source = Cursor::new(xml.as_bytes());
        let mut iterator = ComprehensiveStreamIterator::new(source, ERNVersion::V4_3);

        let first = iterator.next();
        assert!(first.is_some());
        if !matches!(first, Some(Err(ParseError::SecurityViolation { .. }))) {
            panic!("Expected security violation");
        }
    }
}