1#[allow(dead_code)] use crate::error::ParseError;
6use ddex_core::models::streaming_types::*;
7use ddex_core::models::LocalizedString;
8use ddex_core::models::{graph::*, versions::ERNVersion};
9use log::error;
10use quick_xml::{events::Event, Reader};
11use std::collections::HashMap;
12use std::io::BufRead;
13use std::time::Instant;
14
/// One item produced by [`ComprehensiveStreamingParser::parse_next`].
///
/// The header, release, resource and party variants carry a model that has already been
/// converted from its streaming builder. `EndOfStream` is emitted when the XML reader
/// reaches end of input.
#[derive(Debug, Clone)]
pub enum StreamingElement {
    /// A completed `<MessageHeader>` element (boxed).
    Header(Box<MessageHeader>),
    /// A completed `<Release>` element.
    Release(Release),
    /// A completed `<Resource>` element.
    Resource(Resource),
    /// A completed `<Party>` element.
    Party(Party),
    /// The underlying reader reached end of input.
    EndOfStream,
}
24
/// Internal state machine of [`ComprehensiveStreamingParser`].
///
/// While no recognised top-level element is open, the parser is `Initial`. Inside one,
/// it holds the builder that accumulates that element's child values.
#[derive(Debug, Clone)]
enum ParserState {
    /// Not inside a recognised top-level element.
    Initial,
    /// Inside `<MessageHeader>`.
    InHeader(Box<MessageHeaderBuilder>),
    /// Inside `<Release>`.
    InRelease(Box<ReleaseBuilder>),
    /// Inside `<Resource>`.
    InResource(Box<ResourceBuilder>),
    /// Inside `<Party>`.
    InParty(Box<PartyBuilder>),
    /// Terminal state. NOTE(review): never constructed anywhere in this file.
    Complete,
}
35
/// Pull-based parser that turns an ERN XML stream into [`StreamingElement`]s, one
/// completed top-level element at a time.
pub struct ComprehensiveStreamingParser<R: BufRead> {
    /// Underlying quick-xml event reader.
    reader: Reader<R>,
    /// Scratch buffer that is cleared and reused for every event read.
    buffer: Vec<u8>,
    /// The recognised top-level element currently being accumulated, if any.
    state: ParserState,
    /// Names of the currently open elements. Pushed and popped, but not read elsewhere
    /// in this file.
    current_path: Vec<String>,
    /// Number of currently open elements; checked against the nesting-depth limit.
    current_depth: usize,
    /// Text collected since the most recent start or end tag.
    text_buffer: String,
    /// Attributes of the most recent start tag.
    attributes: HashMap<String, String>,
    /// Reader position as reported by `Reader::buffer_position`.
    bytes_processed: u64,
    /// Number of completed elements returned (the end-of-stream marker is not counted).
    elements_yielded: usize,
    /// Creation time of the parser; used for the elapsed time reported in stats.
    start_time: Instant,
}
49
50impl<R: BufRead> ComprehensiveStreamingParser<R> {
51 pub fn new(reader: R, _version: ERNVersion) -> Self {
52 let mut xml_reader = Reader::from_reader(reader);
53 xml_reader.config_mut().trim_text(true);
54 xml_reader.config_mut().check_end_names = true;
55
56 Self {
57 reader: xml_reader,
58 buffer: Vec::with_capacity(8192),
59 state: ParserState::Initial,
60 current_path: Vec::new(),
61 current_depth: 0,
62 text_buffer: String::new(),
63 attributes: HashMap::new(),
64 bytes_processed: 0,
65 elements_yielded: 0,
66 start_time: Instant::now(),
67 }
68 }
69
70 pub fn parse_next(&mut self) -> Result<Option<StreamingElement>, ParseError> {
71 loop {
72 self.buffer.clear();
73 let event = self.reader.read_event_into(&mut self.buffer);
74 let bytes_position = self.reader.buffer_position();
75
76 match event {
77 Ok(Event::Start(e)) => {
78 let name_bytes = e.name();
79 let name = std::str::from_utf8(name_bytes.as_ref())?;
80 self.current_path.push(name.to_string());
81 self.current_depth += 1;
82
83 self.attributes.clear();
85 for attr in e.attributes() {
86 let attr = attr?;
87 let key = std::str::from_utf8(attr.key.as_ref())?;
88 let value = std::str::from_utf8(&attr.value)?;
89 self.attributes.insert(key.to_string(), value.to_string());
90 }
91
92 self.text_buffer.clear();
93
94 match (&self.state, name) {
96 (ParserState::Initial, "MessageHeader") => {
97 self.state =
98 ParserState::InHeader(Box::new(MessageHeaderBuilder::new()));
99 }
100 (ParserState::Initial, "Release") => {
101 let reference = self
102 .attributes
103 .get("ReleaseReference")
104 .unwrap_or(&"default".to_string())
105 .clone();
106 let release = ReleaseBuilder::new(reference);
107 self.state = ParserState::InRelease(Box::new(release));
108 }
109 (ParserState::Initial, "Resource") => {
110 let reference = self
111 .attributes
112 .get("ResourceReference")
113 .unwrap_or(&"default".to_string())
114 .clone();
115 let resource = ResourceBuilder::new(reference);
116 self.state = ParserState::InResource(Box::new(resource));
117 }
118 (ParserState::Initial, "Party") => {
119 self.state = ParserState::InParty(Box::new(PartyBuilder::new(None)));
120 }
121 _ => {
122 }
124 }
125 }
126 Ok(Event::End(e)) => {
127 let name_bytes = e.name();
128 let name = std::str::from_utf8(name_bytes.as_ref())?;
129 let text_content = self.text_buffer.clone();
130 let bytes_processed = self.bytes_processed;
131
132 let location = format!("streaming at byte offset {}", bytes_processed);
133
134 let result = match &mut self.state {
135 ParserState::InHeader(header) => {
136 match name {
137 "MessageId" => {
138 header.set_message_id(text_content.clone());
139 None
140 }
141 "MessageCreatedDateTime" => {
142 header.set_created_date_time_from_text(text_content.clone());
143 None
144 }
145 "MessageHeader" => {
146 let core_header = header.clone().to_core().map_err(|e| {
148 ParseError::ConversionError {
149 from: "StreamingHeader".to_string(),
150 to: "MessageHeader".to_string(),
151 message: format!("Failed to convert header at {}: {:?}", location, e),
152 }
153 })?;
154 self.state = ParserState::Initial;
155 Some(StreamingElement::Header(Box::new(core_header)))
156 }
157 _ => None,
158 }
159 }
160 ParserState::InRelease(release) => {
161 match name {
162 "ReleaseTitle" => {
163 release.add_title(LocalizedString {
164 text: text_content.clone(),
165 language_code: None,
166 script: None,
167 });
168 None
169 }
170 "Genre" => {
171 let genre = Genre {
172 genre_text: text_content.clone(),
173 sub_genre: None,
174 attributes: None,
175 extensions: None,
176 comments: None,
177 };
178 release.add_genre(genre);
179 None
180 }
181 "Release" => {
182 let core_release = release.clone().to_core().map_err(|e| {
184 ParseError::ConversionError {
185 from: "StreamingRelease".to_string(),
186 to: "Release".to_string(),
187 message: format!("Failed to convert release at {}: {:?}", location, e),
188 }
189 })?;
190 self.state = ParserState::Initial;
191 Some(StreamingElement::Release(core_release))
192 }
193 _ => None,
194 }
195 }
196 ParserState::InResource(resource) => {
197 match name {
198 "Title" | "ReferenceTitle" => {
199 resource.add_title(LocalizedString {
200 text: text_content.clone(),
201 language_code: None,
202 script: None,
203 });
204 None
205 }
206 "Duration" => {
207 resource.set_duration_from_text(text_content.clone());
208 None
209 }
210 "Resource" => {
211 let core_resource =
213 resource.clone().to_core().map_err(|e| {
214 ParseError::ConversionError {
215 from: "StreamingResource".to_string(),
216 to: "Resource".to_string(),
217 message: format!(
218 "Failed to convert resource at {}: {:?}",
219 location, e
220 ),
221 }
222 })?;
223 self.state = ParserState::Initial;
224 Some(StreamingElement::Resource(core_resource))
225 }
226 _ => None,
227 }
228 }
229 ParserState::InParty(party) => {
230 match name {
231 "PartyName" => {
232 party.add_name(LocalizedString {
233 text: text_content.clone(),
234 language_code: None,
235 script: None,
236 });
237 None
238 }
239 "Party" => {
240 let core_party = party.clone().to_core().map_err(|e| {
242 ParseError::ConversionError {
243 from: "StreamingParty".to_string(),
244 to: "Party".to_string(),
245 message: format!("Failed to convert party at {}: {:?}", location, e),
246 }
247 })?;
248 self.state = ParserState::Initial;
249 Some(StreamingElement::Party(core_party))
250 }
251 _ => None,
252 }
253 }
254 _ => None,
255 };
256
257 self.current_depth = self.current_depth.saturating_sub(1);
258 self.current_path.pop();
259 self.text_buffer.clear();
260
261 if let Some(element) = result {
262 self.elements_yielded += 1;
263 return Ok(Some(element));
264 }
265 }
266 Ok(Event::Text(e)) => {
267 let text = std::str::from_utf8(&e)?;
268 self.text_buffer.push_str(text.trim());
269 }
270 Ok(Event::Eof) => {
271 return Ok(Some(StreamingElement::EndOfStream));
272 }
273 Ok(_) => {
274 }
276 Err(e) => {
277 return Err(ParseError::XmlError(format!("XML parsing error: {}", e)));
278 }
279 }
280
281 self.bytes_processed = bytes_position;
282
283 if self.current_depth > 100 {
285 return Err(ParseError::SecurityViolation {
286 message: "Nesting depth exceeds 100 levels".to_string(),
287 });
288 }
289 }
290 }
291
292 fn get_current_location(&self) -> String {
295 format!("streaming at byte offset {}", self.bytes_processed)
296 }
297
298 pub fn stats(&self) -> ComprehensiveStats {
299 ComprehensiveStats {
300 bytes_processed: self.bytes_processed,
301 elements_yielded: self.elements_yielded,
302 current_depth: self.current_depth,
303 elapsed: self.start_time.elapsed(),
304 }
305 }
306}
307
/// [`Iterator`] adapter over [`ComprehensiveStreamingParser`].
///
/// Stops (returns `None`) after yielding the end-of-stream marker or the first error.
pub struct ComprehensiveStreamIterator<R: BufRead> {
    /// Parser that produces the items.
    parser: ComprehensiveStreamingParser<R>,
    /// Set once the end-of-stream marker, `Ok(None)`, or an error has been seen.
    finished: bool,
}
313
314impl<R: BufRead> ComprehensiveStreamIterator<R> {
315 pub fn new(reader: R, version: ERNVersion) -> Self {
316 Self {
317 parser: ComprehensiveStreamingParser::new(reader, version),
318 finished: false,
319 }
320 }
321
322 pub fn stats(&self) -> ComprehensiveStats {
323 self.parser.stats()
324 }
325}
326
327impl<R: BufRead> Iterator for ComprehensiveStreamIterator<R> {
328 type Item = Result<StreamingElement, ParseError>;
329
330 fn next(&mut self) -> Option<Self::Item> {
331 if self.finished {
332 return None;
333 }
334
335 match self.parser.parse_next() {
336 Ok(Some(element)) => {
337 if matches!(element, StreamingElement::EndOfStream) {
338 self.finished = true;
339 }
340 Some(Ok(element))
341 }
342 Ok(None) => {
343 self.finished = true;
344 None
345 }
346 Err(e) => {
347 self.finished = true;
348 Some(Err(e))
349 }
350 }
351 }
352}
353
/// Point-in-time progress counters reported by the streaming parser.
#[derive(Debug, Clone)]
pub struct ComprehensiveStats {
    /// Reader byte position at the time of the snapshot.
    pub bytes_processed: u64,
    /// Number of completed elements returned so far.
    pub elements_yielded: usize,
    /// Number of currently open XML elements.
    pub current_depth: usize,
    /// Time elapsed since the parser was created.
    pub elapsed: std::time::Duration,
}

impl ComprehensiveStats {
    /// Average throughput in mebibytes (2^20 bytes) per second.
    ///
    /// Returns `0.0` when no time has elapsed, avoiding a division by zero.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        if seconds <= 0.0 {
            return 0.0;
        }
        let mebibytes = self.bytes_processed as f64 / BYTES_PER_MIB;
        mebibytes / seconds
    }
}
371
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds an iterator over `xml` using the ERN 4.3 profile.
    fn iterate(xml: &str) -> ComprehensiveStreamIterator<Cursor<&[u8]>> {
        ComprehensiveStreamIterator::new(Cursor::new(xml.as_bytes()), ERNVersion::V4_3)
    }

    /// A minimal ERN message yields a header, a release, a resource and the end marker.
    #[test]
    fn test_comprehensive_streaming_parser() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
  <MessageHeader>
    <MessageId>test-message-1</MessageId>
    <MessageCreatedDateTime>2023-01-01T00:00:00</MessageCreatedDateTime>
  </MessageHeader>
  <Release ReleaseReference="REL001">
    <ReleaseTitle>Test Release</ReleaseTitle>
    <Genre>Rock</Genre>
  </Release>
  <Resource ResourceReference="RES001">
    <Title>Test Resource</Title>
    <Duration>180</Duration>
  </Resource>
</ERNMessage>"#;

        let collected: Result<Vec<StreamingElement>, ParseError> = iterate(xml).collect();
        if let Err(ref err) = collected {
            error!("Iterator failed to collect elements: {:?}", err);
        }
        assert!(
            collected.is_ok(),
            "Iterator failed with error: {:?}",
            collected.as_ref().err()
        );

        let elements = collected.unwrap();
        assert!(elements.len() >= 3);

        let seen = |wanted: fn(&StreamingElement) -> bool| elements.iter().any(|e| wanted(e));
        assert!(
            seen(|e| matches!(e, StreamingElement::Header(_))),
            "Should parse message header"
        );
        assert!(
            seen(|e| matches!(e, StreamingElement::Release(_))),
            "Should parse release"
        );
        assert!(
            seen(|e| matches!(e, StreamingElement::Resource(_))),
            "Should parse resource"
        );
        assert!(
            seen(|e| matches!(e, StreamingElement::EndOfStream)),
            "Should have end of stream marker"
        );
    }

    /// Documents nested deeper than the parser's limit are rejected with a security violation.
    #[test]
    fn test_comprehensive_security_limits() {
        const DEPTH: usize = 150;
        let opening: String = (0..DEPTH).map(|i| format!("<level{}>", i)).collect();
        let closing: String = (0..DEPTH).rev().map(|i| format!("</level{}>", i)).collect();
        let xml = format!("<?xml version=\"1.0\"?>{}content{}", opening, closing);

        let first = iterate(&xml).next();
        assert!(first.is_some());
        assert!(
            matches!(first, Some(Err(ParseError::SecurityViolation { .. }))),
            "Expected security violation"
        );
    }
}