ddex_parser/streaming/comprehensive.rs

// src/streaming/comprehensive.rs
//! Comprehensive streaming DDEX parser using model-aligned types

4#[allow(dead_code)] // Experimental streaming parser implementation
5use crate::error::{ErrorLocation, ParseError};
6use ddex_core::models::streaming_types::*;
7use ddex_core::models::LocalizedString;
8use ddex_core::models::{graph::*, versions::ERNVersion};
9use quick_xml::{events::Event, Reader};
10use std::collections::HashMap;
11use std::io::BufRead;
12use std::time::Instant;
13
14/// Comprehensive streaming element types using model-aligned builders
15#[derive(Debug, Clone)]
16pub enum StreamingElement {
17    Header(Box<MessageHeader>),
18    Release(Release),
19    Resource(Resource),
20    Party(Party),
21    EndOfStream,
22}
23
24/// Parser state using streaming builders
25#[derive(Debug, Clone)]
26enum ParserState {
27    Initial,
28    InHeader(Box<MessageHeaderBuilder>),
29    InRelease(Box<ReleaseBuilder>),
30    InResource(Box<ResourceBuilder>),
31    InParty(Box<PartyBuilder>),
32    Complete,
33}
34
35/// Comprehensive streaming parser
36pub struct ComprehensiveStreamingParser<R: BufRead> {
37    reader: Reader<R>,
38    buffer: Vec<u8>,
39    state: ParserState,
40    current_path: Vec<String>,
41    current_depth: usize,
42    text_buffer: String,
43    attributes: HashMap<String, String>,
44    bytes_processed: u64,
45    elements_yielded: usize,
46    start_time: Instant,
47}
48
49impl<R: BufRead> ComprehensiveStreamingParser<R> {
50    pub fn new(reader: R, _version: ERNVersion) -> Self {
51        let mut xml_reader = Reader::from_reader(reader);
52        xml_reader.config_mut().trim_text(true);
53        xml_reader.config_mut().check_end_names = true;
54
55        Self {
56            reader: xml_reader,
57            buffer: Vec::with_capacity(8192),
58            state: ParserState::Initial,
59            current_path: Vec::new(),
60            current_depth: 0,
61            text_buffer: String::new(),
62            attributes: HashMap::new(),
63            bytes_processed: 0,
64            elements_yielded: 0,
65            start_time: Instant::now(),
66        }
67    }
68
69    pub fn parse_next(&mut self) -> Result<Option<StreamingElement>, ParseError> {
70        loop {
71            self.buffer.clear();
72            let event = self.reader.read_event_into(&mut self.buffer);
73            let bytes_position = self.reader.buffer_position();
74
75            match event {
76                Ok(Event::Start(e)) => {
77                    let name_bytes = e.name();
78                    let name = std::str::from_utf8(name_bytes.as_ref())?;
79                    self.current_path.push(name.to_string());
80                    self.current_depth += 1;
81
82                    // Extract attributes
83                    self.attributes.clear();
84                    for attr in e.attributes() {
85                        let attr = attr?;
86                        let key = std::str::from_utf8(attr.key.as_ref())?;
87                        let value = std::str::from_utf8(&attr.value)?;
88                        self.attributes.insert(key.to_string(), value.to_string());
89                    }
90
91                    self.text_buffer.clear();
92
93                    // State transitions using builders
94                    match (&self.state, name) {
95                        (ParserState::Initial, "MessageHeader") => {
96                            self.state =
97                                ParserState::InHeader(Box::new(MessageHeaderBuilder::new()));
98                        }
99                        (ParserState::Initial, "Release") => {
100                            let reference = self
101                                .attributes
102                                .get("ReleaseReference")
103                                .unwrap_or(&"default".to_string())
104                                .clone();
105                            let release = ReleaseBuilder::new(reference);
106                            self.state = ParserState::InRelease(Box::new(release));
107                        }
108                        (ParserState::Initial, "Resource") => {
109                            let reference = self
110                                .attributes
111                                .get("ResourceReference")
112                                .unwrap_or(&"default".to_string())
113                                .clone();
114                            let resource = ResourceBuilder::new(reference);
115                            self.state = ParserState::InResource(Box::new(resource));
116                        }
117                        (ParserState::Initial, "Party") => {
118                            self.state = ParserState::InParty(Box::new(PartyBuilder::new(None)));
119                        }
120                        _ => {
121                            // Continue in current state
122                        }
123                    }
124                }
125                Ok(Event::End(e)) => {
126                    let name_bytes = e.name();
127                    let name = std::str::from_utf8(name_bytes.as_ref())?;
128                    let text_content = self.text_buffer.clone();
129                    let bytes_processed = self.bytes_processed;
130
131                    let location = ErrorLocation {
132                        line: 0,
133                        column: 0,
134                        byte_offset: Some(bytes_processed as usize),
135                        path: "streaming".to_string(),
136                    };
137
138                    let result = match &mut self.state {
139                        ParserState::InHeader(header) => {
140                            match name {
141                                "MessageId" => {
142                                    header.set_message_id(text_content.clone());
143                                    None
144                                }
145                                "MessageCreatedDateTime" => {
146                                    header.set_created_date_time_from_text(text_content.clone());
147                                    None
148                                }
149                                "MessageHeader" => {
150                                    // Complete header - convert using ToCore trait
151                                    let core_header = header.clone().to_core().map_err(|e| {
152                                        ParseError::ConversionError {
153                                            message: format!("Failed to convert header: {:?}", e),
154                                            location: location.clone(),
155                                        }
156                                    })?;
157                                    self.state = ParserState::Initial;
158                                    Some(StreamingElement::Header(Box::new(core_header)))
159                                }
160                                _ => None,
161                            }
162                        }
163                        ParserState::InRelease(release) => {
164                            match name {
165                                "ReleaseTitle" => {
166                                    release.add_title(LocalizedString {
167                                        text: text_content.clone(),
168                                        language_code: None,
169                                        script: None,
170                                    });
171                                    None
172                                }
173                                "Genre" => {
174                                    let genre = Genre {
175                                        genre_text: text_content.clone(),
176                                        sub_genre: None,
177                                        attributes: None,
178                                        extensions: None,
179                                        comments: None,
180                                    };
181                                    release.add_genre(genre);
182                                    None
183                                }
184                                "Release" => {
185                                    // Complete release - convert using ToCore trait
186                                    let core_release = release.clone().to_core().map_err(|e| {
187                                        ParseError::ConversionError {
188                                            message: format!("Failed to convert release: {:?}", e),
189                                            location: location.clone(),
190                                        }
191                                    })?;
192                                    self.state = ParserState::Initial;
193                                    Some(StreamingElement::Release(core_release))
194                                }
195                                _ => None,
196                            }
197                        }
198                        ParserState::InResource(resource) => {
199                            match name {
200                                "Title" | "ReferenceTitle" => {
201                                    resource.add_title(LocalizedString {
202                                        text: text_content.clone(),
203                                        language_code: None,
204                                        script: None,
205                                    });
206                                    None
207                                }
208                                "Duration" => {
209                                    resource.set_duration_from_text(text_content.clone());
210                                    None
211                                }
212                                "Resource" => {
213                                    // Complete resource - convert using ToCore trait
214                                    let core_resource =
215                                        resource.clone().to_core().map_err(|e| {
216                                            ParseError::ConversionError {
217                                                message: format!(
218                                                    "Failed to convert resource: {:?}",
219                                                    e
220                                                ),
221                                                location: location.clone(),
222                                            }
223                                        })?;
224                                    self.state = ParserState::Initial;
225                                    Some(StreamingElement::Resource(core_resource))
226                                }
227                                _ => None,
228                            }
229                        }
230                        ParserState::InParty(party) => {
231                            match name {
232                                "PartyName" => {
233                                    party.add_name(LocalizedString {
234                                        text: text_content.clone(),
235                                        language_code: None,
236                                        script: None,
237                                    });
238                                    None
239                                }
240                                "Party" => {
241                                    // Complete party - convert using ToCore trait
242                                    let core_party = party.clone().to_core().map_err(|e| {
243                                        ParseError::ConversionError {
244                                            message: format!("Failed to convert party: {:?}", e),
245                                            location: location.clone(),
246                                        }
247                                    })?;
248                                    self.state = ParserState::Initial;
249                                    Some(StreamingElement::Party(core_party))
250                                }
251                                _ => None,
252                            }
253                        }
254                        _ => None,
255                    };
256
257                    self.current_depth = self.current_depth.saturating_sub(1);
258                    self.current_path.pop();
259                    self.text_buffer.clear();
260
261                    if let Some(element) = result {
262                        self.elements_yielded += 1;
263                        return Ok(Some(element));
264                    }
265                }
266                Ok(Event::Text(e)) => {
267                    let text = std::str::from_utf8(&e)?;
268                    self.text_buffer.push_str(text.trim());
269                }
270                Ok(Event::Eof) => {
271                    return Ok(Some(StreamingElement::EndOfStream));
272                }
273                Ok(_) => {
274                    // Skip other events
275                }
276                Err(e) => {
277                    return Err(ParseError::XmlError {
278                        message: format!("XML parsing error: {}", e),
279                        location: self.get_current_location(),
280                    });
281                }
282            }
283
284            self.bytes_processed = bytes_position;
285
286            // Check security limits
287            if self.current_depth > 100 {
288                return Err(ParseError::SecurityViolation {
289                    message: "Nesting depth exceeds 100 levels".to_string(),
290                });
291            }
292        }
293    }
294
295    // Helper methods for error handling
296
297    fn get_current_location(&self) -> ErrorLocation {
298        ErrorLocation {
299            line: 0,
300            column: 0,
301            byte_offset: Some(self.bytes_processed as usize),
302            path: "streaming".to_string(),
303        }
304    }
305
306    pub fn stats(&self) -> ComprehensiveStats {
307        ComprehensiveStats {
308            bytes_processed: self.bytes_processed,
309            elements_yielded: self.elements_yielded,
310            current_depth: self.current_depth,
311            elapsed: self.start_time.elapsed(),
312        }
313    }
314}
315
316/// Iterator wrapper for comprehensive streaming parser
317pub struct ComprehensiveStreamIterator<R: BufRead> {
318    parser: ComprehensiveStreamingParser<R>,
319    finished: bool,
320}
321
322impl<R: BufRead> ComprehensiveStreamIterator<R> {
323    pub fn new(reader: R, version: ERNVersion) -> Self {
324        Self {
325            parser: ComprehensiveStreamingParser::new(reader, version),
326            finished: false,
327        }
328    }
329
330    pub fn stats(&self) -> ComprehensiveStats {
331        self.parser.stats()
332    }
333}
334
335impl<R: BufRead> Iterator for ComprehensiveStreamIterator<R> {
336    type Item = Result<StreamingElement, ParseError>;
337
338    fn next(&mut self) -> Option<Self::Item> {
339        if self.finished {
340            return None;
341        }
342
343        match self.parser.parse_next() {
344            Ok(Some(element)) => {
345                if matches!(element, StreamingElement::EndOfStream) {
346                    self.finished = true;
347                }
348                Some(Ok(element))
349            }
350            Ok(None) => {
351                self.finished = true;
352                None
353            }
354            Err(e) => {
355                self.finished = true;
356                Some(Err(e))
357            }
358        }
359    }
360}
361
/// Progress snapshot taken from a comprehensive streaming parser.
#[derive(Debug, Clone)]
pub struct ComprehensiveStats {
    /// Reader byte position recorded by the parser.
    pub bytes_processed: u64,
    /// Elements yielded so far (the end-of-stream marker is not counted).
    pub elements_yielded: usize,
    /// Element nesting depth at the time of the snapshot.
    pub current_depth: usize,
    /// Time elapsed since the parser was created.
    pub elapsed: std::time::Duration,
}

impl ComprehensiveStats {
    /// Average throughput in MiB per second (1 MiB = 1024 * 1024 bytes).
    ///
    /// Returns `0.0` when no time has elapsed, avoiding a division by zero.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        if seconds <= 0.0 {
            return 0.0;
        }
        self.bytes_processed as f64 / BYTES_PER_MIB / seconds
    }
}
379
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A small ERN 4.3 message must stream out its header, release and
    /// resource in document order, followed by the end-of-stream marker.
    #[test]
    fn test_comprehensive_streaming_parser() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>test-message-1</MessageId>
        <MessageCreatedDateTime>2023-01-01T00:00:00</MessageCreatedDateTime>
    </MessageHeader>
    <Release ReleaseReference="REL001">
        <ReleaseTitle>Test Release</ReleaseTitle>
        <Genre>Rock</Genre>
    </Release>
    <Resource ResourceReference="RES001">
        <Title>Test Resource</Title>
        <Duration>180</Duration>
    </Resource>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = ComprehensiveStreamIterator::new(cursor, ERNVersion::V4_3);

        let collected: Result<Vec<_>, _> = iterator.collect();
        let elements =
            collected.unwrap_or_else(|e| panic!("Iterator failed with error: {:?}", e));

        // Pin the exact sequence: Header, Release, Resource, EndOfStream.
        // A looser length check would miss a dropped or duplicated element.
        assert_eq!(elements.len(), 4, "unexpected elements: {:?}", elements);
        assert!(
            matches!(elements[0], StreamingElement::Header(_)),
            "Should parse message header"
        );
        assert!(
            matches!(elements[1], StreamingElement::Release(_)),
            "Should parse release"
        );
        assert!(
            matches!(elements[2], StreamingElement::Resource(_)),
            "Should parse resource"
        );
        assert!(
            matches!(elements[3], StreamingElement::EndOfStream),
            "Should have end of stream marker"
        );
    }

    /// Input nested 150 levels deep must be rejected with a security
    /// violation, after which the iterator yields nothing more.
    #[test]
    fn test_comprehensive_security_limits() {
        let mut xml = String::from(r#"<?xml version="1.0"?>"#);
        for i in 0..150 {
            xml.push_str(&format!("<level{}>", i));
        }
        xml.push_str("content");
        for i in (0..150).rev() {
            xml.push_str(&format!("</level{}>", i));
        }

        let cursor = Cursor::new(xml.as_bytes());
        let mut iterator = ComprehensiveStreamIterator::new(cursor, ERNVersion::V4_3);

        match iterator.next() {
            Some(Err(ParseError::SecurityViolation { .. })) => {
                // Expected
            }
            other => panic!("Expected security violation, got {:?}", other),
        }
        assert!(
            iterator.next().is_none(),
            "iterator should stop after an error"
        );
    }
}