ddex_parser/streaming/
comprehensive.rs

// src/streaming/comprehensive.rs
//! Comprehensive streaming DDEX parser using model-aligned types
3
4#[allow(dead_code)] // Experimental streaming parser implementation
5use crate::error::ParseError;
6use ddex_core::models::streaming_types::*;
7use ddex_core::models::LocalizedString;
8use ddex_core::models::{graph::*, versions::ERNVersion};
9use quick_xml::{events::Event, Reader};
10use std::collections::HashMap;
11use std::io::BufRead;
12use std::time::Instant;
13
14/// Comprehensive streaming element types using model-aligned builders
15#[derive(Debug, Clone)]
16pub enum StreamingElement {
17    Header(Box<MessageHeader>),
18    Release(Release),
19    Resource(Resource),
20    Party(Party),
21    EndOfStream,
22}
23
24/// Parser state using streaming builders
25#[derive(Debug, Clone)]
26enum ParserState {
27    Initial,
28    InHeader(Box<MessageHeaderBuilder>),
29    InRelease(Box<ReleaseBuilder>),
30    InResource(Box<ResourceBuilder>),
31    InParty(Box<PartyBuilder>),
32    Complete,
33}
34
35/// Comprehensive streaming parser
36pub struct ComprehensiveStreamingParser<R: BufRead> {
37    reader: Reader<R>,
38    buffer: Vec<u8>,
39    state: ParserState,
40    current_path: Vec<String>,
41    current_depth: usize,
42    text_buffer: String,
43    attributes: HashMap<String, String>,
44    bytes_processed: u64,
45    elements_yielded: usize,
46    start_time: Instant,
47}
48
49impl<R: BufRead> ComprehensiveStreamingParser<R> {
50    pub fn new(reader: R, _version: ERNVersion) -> Self {
51        let mut xml_reader = Reader::from_reader(reader);
52        xml_reader.config_mut().trim_text(true);
53        xml_reader.config_mut().check_end_names = true;
54
55        Self {
56            reader: xml_reader,
57            buffer: Vec::with_capacity(8192),
58            state: ParserState::Initial,
59            current_path: Vec::new(),
60            current_depth: 0,
61            text_buffer: String::new(),
62            attributes: HashMap::new(),
63            bytes_processed: 0,
64            elements_yielded: 0,
65            start_time: Instant::now(),
66        }
67    }
68
69    pub fn parse_next(&mut self) -> Result<Option<StreamingElement>, ParseError> {
70        loop {
71            self.buffer.clear();
72            let event = self.reader.read_event_into(&mut self.buffer);
73            let bytes_position = self.reader.buffer_position();
74
75            match event {
76                Ok(Event::Start(e)) => {
77                    let name_bytes = e.name();
78                    let name = std::str::from_utf8(name_bytes.as_ref())?;
79                    self.current_path.push(name.to_string());
80                    self.current_depth += 1;
81
82                    // Extract attributes
83                    self.attributes.clear();
84                    for attr in e.attributes() {
85                        let attr = attr?;
86                        let key = std::str::from_utf8(attr.key.as_ref())?;
87                        let value = std::str::from_utf8(&attr.value)?;
88                        self.attributes.insert(key.to_string(), value.to_string());
89                    }
90
91                    self.text_buffer.clear();
92
93                    // State transitions using builders
94                    match (&self.state, name) {
95                        (ParserState::Initial, "MessageHeader") => {
96                            self.state =
97                                ParserState::InHeader(Box::new(MessageHeaderBuilder::new()));
98                        }
99                        (ParserState::Initial, "Release") => {
100                            let reference = self
101                                .attributes
102                                .get("ReleaseReference")
103                                .unwrap_or(&"default".to_string())
104                                .clone();
105                            let release = ReleaseBuilder::new(reference);
106                            self.state = ParserState::InRelease(Box::new(release));
107                        }
108                        (ParserState::Initial, "Resource") => {
109                            let reference = self
110                                .attributes
111                                .get("ResourceReference")
112                                .unwrap_or(&"default".to_string())
113                                .clone();
114                            let resource = ResourceBuilder::new(reference);
115                            self.state = ParserState::InResource(Box::new(resource));
116                        }
117                        (ParserState::Initial, "Party") => {
118                            self.state = ParserState::InParty(Box::new(PartyBuilder::new(None)));
119                        }
120                        _ => {
121                            // Continue in current state
122                        }
123                    }
124                }
125                Ok(Event::End(e)) => {
126                    let name_bytes = e.name();
127                    let name = std::str::from_utf8(name_bytes.as_ref())?;
128                    let text_content = self.text_buffer.clone();
129                    let bytes_processed = self.bytes_processed;
130
131                    let location = format!("streaming at byte offset {}", bytes_processed);
132
133                    let result = match &mut self.state {
134                        ParserState::InHeader(header) => {
135                            match name {
136                                "MessageId" => {
137                                    header.set_message_id(text_content.clone());
138                                    None
139                                }
140                                "MessageCreatedDateTime" => {
141                                    header.set_created_date_time_from_text(text_content.clone());
142                                    None
143                                }
144                                "MessageHeader" => {
145                                    // Complete header - convert using ToCore trait
146                                    let core_header = header.clone().to_core().map_err(|e| {
147                                        ParseError::ConversionError {
148                                            from: "StreamingHeader".to_string(),
149                                            to: "MessageHeader".to_string(),
150                                            message: format!("Failed to convert header at {}: {:?}", location, e),
151                                        }
152                                    })?;
153                                    self.state = ParserState::Initial;
154                                    Some(StreamingElement::Header(Box::new(core_header)))
155                                }
156                                _ => None,
157                            }
158                        }
159                        ParserState::InRelease(release) => {
160                            match name {
161                                "ReleaseTitle" => {
162                                    release.add_title(LocalizedString {
163                                        text: text_content.clone(),
164                                        language_code: None,
165                                        script: None,
166                                    });
167                                    None
168                                }
169                                "Genre" => {
170                                    let genre = Genre {
171                                        genre_text: text_content.clone(),
172                                        sub_genre: None,
173                                        attributes: None,
174                                        extensions: None,
175                                        comments: None,
176                                    };
177                                    release.add_genre(genre);
178                                    None
179                                }
180                                "Release" => {
181                                    // Complete release - convert using ToCore trait
182                                    let core_release = release.clone().to_core().map_err(|e| {
183                                        ParseError::ConversionError {
184                                            from: "StreamingRelease".to_string(),
185                                            to: "Release".to_string(),
186                                            message: format!("Failed to convert release at {}: {:?}", location, e),
187                                        }
188                                    })?;
189                                    self.state = ParserState::Initial;
190                                    Some(StreamingElement::Release(core_release))
191                                }
192                                _ => None,
193                            }
194                        }
195                        ParserState::InResource(resource) => {
196                            match name {
197                                "Title" | "ReferenceTitle" => {
198                                    resource.add_title(LocalizedString {
199                                        text: text_content.clone(),
200                                        language_code: None,
201                                        script: None,
202                                    });
203                                    None
204                                }
205                                "Duration" => {
206                                    resource.set_duration_from_text(text_content.clone());
207                                    None
208                                }
209                                "Resource" => {
210                                    // Complete resource - convert using ToCore trait
211                                    let core_resource =
212                                        resource.clone().to_core().map_err(|e| {
213                                            ParseError::ConversionError {
214                                                from: "StreamingResource".to_string(),
215                                                to: "Resource".to_string(),
216                                                message: format!(
217                                                    "Failed to convert resource at {}: {:?}",
218                                                    location, e
219                                                ),
220                                            }
221                                        })?;
222                                    self.state = ParserState::Initial;
223                                    Some(StreamingElement::Resource(core_resource))
224                                }
225                                _ => None,
226                            }
227                        }
228                        ParserState::InParty(party) => {
229                            match name {
230                                "PartyName" => {
231                                    party.add_name(LocalizedString {
232                                        text: text_content.clone(),
233                                        language_code: None,
234                                        script: None,
235                                    });
236                                    None
237                                }
238                                "Party" => {
239                                    // Complete party - convert using ToCore trait
240                                    let core_party = party.clone().to_core().map_err(|e| {
241                                        ParseError::ConversionError {
242                                            from: "StreamingParty".to_string(),
243                                            to: "Party".to_string(),
244                                            message: format!("Failed to convert party at {}: {:?}", location, e),
245                                        }
246                                    })?;
247                                    self.state = ParserState::Initial;
248                                    Some(StreamingElement::Party(core_party))
249                                }
250                                _ => None,
251                            }
252                        }
253                        _ => None,
254                    };
255
256                    self.current_depth = self.current_depth.saturating_sub(1);
257                    self.current_path.pop();
258                    self.text_buffer.clear();
259
260                    if let Some(element) = result {
261                        self.elements_yielded += 1;
262                        return Ok(Some(element));
263                    }
264                }
265                Ok(Event::Text(e)) => {
266                    let text = std::str::from_utf8(&e)?;
267                    self.text_buffer.push_str(text.trim());
268                }
269                Ok(Event::Eof) => {
270                    return Ok(Some(StreamingElement::EndOfStream));
271                }
272                Ok(_) => {
273                    // Skip other events
274                }
275                Err(e) => {
276                    return Err(ParseError::XmlError(format!("XML parsing error: {}", e)));
277                }
278            }
279
280            self.bytes_processed = bytes_position;
281
282            // Check security limits
283            if self.current_depth > 100 {
284                return Err(ParseError::SecurityViolation {
285                    message: "Nesting depth exceeds 100 levels".to_string(),
286                });
287            }
288        }
289    }
290
291    // Helper methods for error handling
292
293    fn get_current_location(&self) -> String {
294        format!("streaming at byte offset {}", self.bytes_processed)
295    }
296
297    pub fn stats(&self) -> ComprehensiveStats {
298        ComprehensiveStats {
299            bytes_processed: self.bytes_processed,
300            elements_yielded: self.elements_yielded,
301            current_depth: self.current_depth,
302            elapsed: self.start_time.elapsed(),
303        }
304    }
305}
306
307/// Iterator wrapper for comprehensive streaming parser
308pub struct ComprehensiveStreamIterator<R: BufRead> {
309    parser: ComprehensiveStreamingParser<R>,
310    finished: bool,
311}
312
313impl<R: BufRead> ComprehensiveStreamIterator<R> {
314    pub fn new(reader: R, version: ERNVersion) -> Self {
315        Self {
316            parser: ComprehensiveStreamingParser::new(reader, version),
317            finished: false,
318        }
319    }
320
321    pub fn stats(&self) -> ComprehensiveStats {
322        self.parser.stats()
323    }
324}
325
326impl<R: BufRead> Iterator for ComprehensiveStreamIterator<R> {
327    type Item = Result<StreamingElement, ParseError>;
328
329    fn next(&mut self) -> Option<Self::Item> {
330        if self.finished {
331            return None;
332        }
333
334        match self.parser.parse_next() {
335            Ok(Some(element)) => {
336                if matches!(element, StreamingElement::EndOfStream) {
337                    self.finished = true;
338                }
339                Some(Ok(element))
340            }
341            Ok(None) => {
342                self.finished = true;
343                None
344            }
345            Err(e) => {
346                self.finished = true;
347                Some(Err(e))
348            }
349        }
350    }
351}
352
/// Snapshot of a streaming parser's progress counters.
#[derive(Debug, Clone)]
pub struct ComprehensiveStats {
    /// Byte position the parser had recorded when the snapshot was taken.
    pub bytes_processed: u64,
    /// Number of elements yielded so far.
    pub elements_yielded: usize,
    /// Element nesting depth at the time of the snapshot.
    pub current_depth: usize,
    /// Time span covered by the snapshot.
    pub elapsed: std::time::Duration,
}

impl ComprehensiveStats {
    /// Average throughput in MiB per second over `elapsed`.
    ///
    /// Returns `0.0` when no time has elapsed, avoiding a division by zero.
    pub fn throughput_mibs(&self) -> f64 {
        let seconds = self.elapsed.as_secs_f64();
        if seconds > 0.0 {
            let mebibytes = self.bytes_processed as f64 / (1024.0 * 1024.0);
            mebibytes / seconds
        } else {
            0.0
        }
    }
}
370
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A small well-formed message must yield header, release, resource and
    /// finally the end-of-stream marker.
    #[test]
    fn test_comprehensive_streaming_parser() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>test-message-1</MessageId>
        <MessageCreatedDateTime>2023-01-01T00:00:00</MessageCreatedDateTime>
    </MessageHeader>
    <Release ReleaseReference="REL001">
        <ReleaseTitle>Test Release</ReleaseTitle>
        <Genre>Rock</Genre>
    </Release>
    <Resource ResourceReference="RES001">
        <Title>Test Resource</Title>
        <Duration>180</Duration>
    </Resource>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = ComprehensiveStreamIterator::new(cursor, ERNVersion::V4_3);

        let elements: Result<Vec<_>, _> = iterator.collect();
        assert!(
            elements.is_ok(),
            "Iterator failed with error: {:?}",
            elements.as_ref().err()
        );

        let elements = elements.unwrap();
        // Header, Release, Resource, EndOfStream
        assert!(
            elements.len() >= 4,
            "Expected at least 4 elements, got {}",
            elements.len()
        );

        // Check we got the expected elements
        let has_header = elements
            .iter()
            .any(|e| matches!(e, StreamingElement::Header(_)));
        let has_release = elements
            .iter()
            .any(|e| matches!(e, StreamingElement::Release(_)));
        let has_resource = elements
            .iter()
            .any(|e| matches!(e, StreamingElement::Resource(_)));

        assert!(has_header, "Should parse message header");
        assert!(has_release, "Should parse release");
        assert!(has_resource, "Should parse resource");

        // The iterator stops right after the marker, so it must come last.
        assert!(
            matches!(elements.last(), Some(StreamingElement::EndOfStream)),
            "Should have end of stream marker as the final element"
        );
    }

    /// Nesting deeper than the parser's limit must be rejected, and the
    /// iterator must stop after reporting the error.
    #[test]
    fn test_comprehensive_security_limits() {
        // Create deeply nested XML
        let mut xml = String::from(r#"<?xml version="1.0"?>"#);
        for i in 0..150 {
            xml.push_str(&format!("<level{}>", i));
        }
        xml.push_str("content");
        for i in (0..150).rev() {
            xml.push_str(&format!("</level{}>", i));
        }

        let cursor = Cursor::new(xml.as_bytes());
        let mut iterator = ComprehensiveStreamIterator::new(cursor, ERNVersion::V4_3);

        // None of the `level*` elements is a yielded type, so the very first
        // item must be the security violation.
        match iterator.next() {
            Some(Err(ParseError::SecurityViolation { .. })) => {
                // Expected
            }
            other => panic!("Expected security violation, got {:?}", other),
        }

        // An error terminates the iteration.
        assert!(
            iterator.next().is_none(),
            "Iterator should be exhausted after an error"
        );
    }
}