ddex_parser/streaming/
comprehensive.rs

1// src/streaming/comprehensive.rs
2//! Comprehensive streaming DDEX parser using model-aligned types
3
4#[allow(dead_code)] // Experimental streaming parser implementation
5use crate::error::ParseError;
6use ddex_core::models::streaming_types::*;
7use ddex_core::models::LocalizedString;
8use ddex_core::models::{graph::*, versions::ERNVersion};
9use log::error;
10use quick_xml::{events::Event, Reader};
11use std::collections::HashMap;
12use std::io::BufRead;
13use std::time::Instant;
14
15/// Comprehensive streaming element types using model-aligned builders
16#[derive(Debug, Clone)]
17pub enum StreamingElement {
18    Header(Box<MessageHeader>),
19    Release(Release),
20    Resource(Resource),
21    Party(Party),
22    EndOfStream,
23}
24
25/// Parser state using streaming builders
26#[derive(Debug, Clone)]
27enum ParserState {
28    Initial,
29    InHeader(Box<MessageHeaderBuilder>),
30    InRelease(Box<ReleaseBuilder>),
31    InResource(Box<ResourceBuilder>),
32    InParty(Box<PartyBuilder>),
33    Complete,
34}
35
36/// Comprehensive streaming parser
37pub struct ComprehensiveStreamingParser<R: BufRead> {
38    reader: Reader<R>,
39    buffer: Vec<u8>,
40    state: ParserState,
41    current_path: Vec<String>,
42    current_depth: usize,
43    text_buffer: String,
44    attributes: HashMap<String, String>,
45    bytes_processed: u64,
46    elements_yielded: usize,
47    start_time: Instant,
48}
49
50impl<R: BufRead> ComprehensiveStreamingParser<R> {
51    pub fn new(reader: R, _version: ERNVersion) -> Self {
52        let mut xml_reader = Reader::from_reader(reader);
53        xml_reader.config_mut().trim_text(true);
54        xml_reader.config_mut().check_end_names = true;
55
56        Self {
57            reader: xml_reader,
58            buffer: Vec::with_capacity(8192),
59            state: ParserState::Initial,
60            current_path: Vec::new(),
61            current_depth: 0,
62            text_buffer: String::new(),
63            attributes: HashMap::new(),
64            bytes_processed: 0,
65            elements_yielded: 0,
66            start_time: Instant::now(),
67        }
68    }
69
70    pub fn parse_next(&mut self) -> Result<Option<StreamingElement>, ParseError> {
71        loop {
72            self.buffer.clear();
73            let event = self.reader.read_event_into(&mut self.buffer);
74            let bytes_position = self.reader.buffer_position();
75
76            match event {
77                Ok(Event::Start(e)) => {
78                    let name_bytes = e.name();
79                    let name = std::str::from_utf8(name_bytes.as_ref())?;
80                    self.current_path.push(name.to_string());
81                    self.current_depth += 1;
82
83                    // Extract attributes
84                    self.attributes.clear();
85                    for attr in e.attributes() {
86                        let attr = attr?;
87                        let key = std::str::from_utf8(attr.key.as_ref())?;
88                        let value = std::str::from_utf8(&attr.value)?;
89                        self.attributes.insert(key.to_string(), value.to_string());
90                    }
91
92                    self.text_buffer.clear();
93
94                    // State transitions using builders
95                    match (&self.state, name) {
96                        (ParserState::Initial, "MessageHeader") => {
97                            self.state =
98                                ParserState::InHeader(Box::new(MessageHeaderBuilder::new()));
99                        }
100                        (ParserState::Initial, "Release") => {
101                            let reference = self
102                                .attributes
103                                .get("ReleaseReference")
104                                .unwrap_or(&"default".to_string())
105                                .clone();
106                            let release = ReleaseBuilder::new(reference);
107                            self.state = ParserState::InRelease(Box::new(release));
108                        }
109                        (ParserState::Initial, "Resource") => {
110                            let reference = self
111                                .attributes
112                                .get("ResourceReference")
113                                .unwrap_or(&"default".to_string())
114                                .clone();
115                            let resource = ResourceBuilder::new(reference);
116                            self.state = ParserState::InResource(Box::new(resource));
117                        }
118                        (ParserState::Initial, "Party") => {
119                            self.state = ParserState::InParty(Box::new(PartyBuilder::new(None)));
120                        }
121                        _ => {
122                            // Continue in current state
123                        }
124                    }
125                }
126                Ok(Event::End(e)) => {
127                    let name_bytes = e.name();
128                    let name = std::str::from_utf8(name_bytes.as_ref())?;
129                    let text_content = self.text_buffer.clone();
130                    let bytes_processed = self.bytes_processed;
131
132                    let location = format!("streaming at byte offset {}", bytes_processed);
133
134                    let result = match &mut self.state {
135                        ParserState::InHeader(header) => {
136                            match name {
137                                "MessageId" => {
138                                    header.set_message_id(text_content.clone());
139                                    None
140                                }
141                                "MessageCreatedDateTime" => {
142                                    header.set_created_date_time_from_text(text_content.clone());
143                                    None
144                                }
145                                "MessageHeader" => {
146                                    // Complete header - convert using ToCore trait
147                                    let core_header = header.clone().to_core().map_err(|e| {
148                                        ParseError::ConversionError {
149                                            from: "StreamingHeader".to_string(),
150                                            to: "MessageHeader".to_string(),
151                                            message: format!("Failed to convert header at {}: {:?}", location, e),
152                                        }
153                                    })?;
154                                    self.state = ParserState::Initial;
155                                    Some(StreamingElement::Header(Box::new(core_header)))
156                                }
157                                _ => None,
158                            }
159                        }
160                        ParserState::InRelease(release) => {
161                            match name {
162                                "ReleaseTitle" => {
163                                    release.add_title(LocalizedString {
164                                        text: text_content.clone(),
165                                        language_code: None,
166                                        script: None,
167                                    });
168                                    None
169                                }
170                                "Genre" => {
171                                    let genre = Genre {
172                                        genre_text: text_content.clone(),
173                                        sub_genre: None,
174                                        attributes: None,
175                                        extensions: None,
176                                        comments: None,
177                                    };
178                                    release.add_genre(genre);
179                                    None
180                                }
181                                "Release" => {
182                                    // Complete release - convert using ToCore trait
183                                    let core_release = release.clone().to_core().map_err(|e| {
184                                        ParseError::ConversionError {
185                                            from: "StreamingRelease".to_string(),
186                                            to: "Release".to_string(),
187                                            message: format!("Failed to convert release at {}: {:?}", location, e),
188                                        }
189                                    })?;
190                                    self.state = ParserState::Initial;
191                                    Some(StreamingElement::Release(core_release))
192                                }
193                                _ => None,
194                            }
195                        }
196                        ParserState::InResource(resource) => {
197                            match name {
198                                "Title" | "ReferenceTitle" => {
199                                    resource.add_title(LocalizedString {
200                                        text: text_content.clone(),
201                                        language_code: None,
202                                        script: None,
203                                    });
204                                    None
205                                }
206                                "Duration" => {
207                                    resource.set_duration_from_text(text_content.clone());
208                                    None
209                                }
210                                "Resource" => {
211                                    // Complete resource - convert using ToCore trait
212                                    let core_resource =
213                                        resource.clone().to_core().map_err(|e| {
214                                            ParseError::ConversionError {
215                                                from: "StreamingResource".to_string(),
216                                                to: "Resource".to_string(),
217                                                message: format!(
218                                                    "Failed to convert resource at {}: {:?}",
219                                                    location, e
220                                                ),
221                                            }
222                                        })?;
223                                    self.state = ParserState::Initial;
224                                    Some(StreamingElement::Resource(core_resource))
225                                }
226                                _ => None,
227                            }
228                        }
229                        ParserState::InParty(party) => {
230                            match name {
231                                "PartyName" => {
232                                    party.add_name(LocalizedString {
233                                        text: text_content.clone(),
234                                        language_code: None,
235                                        script: None,
236                                    });
237                                    None
238                                }
239                                "Party" => {
240                                    // Complete party - convert using ToCore trait
241                                    let core_party = party.clone().to_core().map_err(|e| {
242                                        ParseError::ConversionError {
243                                            from: "StreamingParty".to_string(),
244                                            to: "Party".to_string(),
245                                            message: format!("Failed to convert party at {}: {:?}", location, e),
246                                        }
247                                    })?;
248                                    self.state = ParserState::Initial;
249                                    Some(StreamingElement::Party(core_party))
250                                }
251                                _ => None,
252                            }
253                        }
254                        _ => None,
255                    };
256
257                    self.current_depth = self.current_depth.saturating_sub(1);
258                    self.current_path.pop();
259                    self.text_buffer.clear();
260
261                    if let Some(element) = result {
262                        self.elements_yielded += 1;
263                        return Ok(Some(element));
264                    }
265                }
266                Ok(Event::Text(e)) => {
267                    let text = std::str::from_utf8(&e)?;
268                    self.text_buffer.push_str(text.trim());
269                }
270                Ok(Event::Eof) => {
271                    return Ok(Some(StreamingElement::EndOfStream));
272                }
273                Ok(_) => {
274                    // Skip other events
275                }
276                Err(e) => {
277                    return Err(ParseError::XmlError(format!("XML parsing error: {}", e)));
278                }
279            }
280
281            self.bytes_processed = bytes_position;
282
283            // Check security limits
284            if self.current_depth > 100 {
285                return Err(ParseError::SecurityViolation {
286                    message: "Nesting depth exceeds 100 levels".to_string(),
287                });
288            }
289        }
290    }
291
292    // Helper methods for error handling
293
294    fn get_current_location(&self) -> String {
295        format!("streaming at byte offset {}", self.bytes_processed)
296    }
297
298    pub fn stats(&self) -> ComprehensiveStats {
299        ComprehensiveStats {
300            bytes_processed: self.bytes_processed,
301            elements_yielded: self.elements_yielded,
302            current_depth: self.current_depth,
303            elapsed: self.start_time.elapsed(),
304        }
305    }
306}
307
308/// Iterator wrapper for comprehensive streaming parser
309pub struct ComprehensiveStreamIterator<R: BufRead> {
310    parser: ComprehensiveStreamingParser<R>,
311    finished: bool,
312}
313
314impl<R: BufRead> ComprehensiveStreamIterator<R> {
315    pub fn new(reader: R, version: ERNVersion) -> Self {
316        Self {
317            parser: ComprehensiveStreamingParser::new(reader, version),
318            finished: false,
319        }
320    }
321
322    pub fn stats(&self) -> ComprehensiveStats {
323        self.parser.stats()
324    }
325}
326
327impl<R: BufRead> Iterator for ComprehensiveStreamIterator<R> {
328    type Item = Result<StreamingElement, ParseError>;
329
330    fn next(&mut self) -> Option<Self::Item> {
331        if self.finished {
332            return None;
333        }
334
335        match self.parser.parse_next() {
336            Ok(Some(element)) => {
337                if matches!(element, StreamingElement::EndOfStream) {
338                    self.finished = true;
339                }
340                Some(Ok(element))
341            }
342            Ok(None) => {
343                self.finished = true;
344                None
345            }
346            Err(e) => {
347                self.finished = true;
348                Some(Err(e))
349            }
350        }
351    }
352}
353
/// Snapshot of parser progress counters.
#[derive(Clone, Debug)]
pub struct ComprehensiveStats {
    /// Reader byte position last recorded by the parser.
    pub bytes_processed: u64,
    /// Number of elements returned so far.
    pub elements_yielded: usize,
    /// Element nesting depth at the time of the snapshot.
    pub current_depth: usize,
    /// Time elapsed since the parser was created.
    pub elapsed: std::time::Duration,
}

impl ComprehensiveStats {
    /// Average throughput in MiB per second, or `0.0` when no time has
    /// elapsed.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        if seconds <= 0.0 {
            return 0.0;
        }

        let mebibytes = self.bytes_processed as f64 / BYTES_PER_MIB;
        mebibytes / seconds
    }
}
371
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Parses a small ERN document and checks that each expected kind of
    /// element is produced.
    #[test]
    fn test_comprehensive_streaming_parser() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>test-message-1</MessageId>
        <MessageCreatedDateTime>2023-01-01T00:00:00</MessageCreatedDateTime>
    </MessageHeader>
    <Release ReleaseReference="REL001">
        <ReleaseTitle>Test Release</ReleaseTitle>
        <Genre>Rock</Genre>
    </Release>
    <Resource ResourceReference="RES001">
        <Title>Test Resource</Title>
        <Duration>180</Duration>
    </Resource>
</ERNMessage>"#;

        let parsed: Result<Vec<_>, _> =
            ComprehensiveStreamIterator::new(Cursor::new(xml.as_bytes()), ERNVersion::V4_3)
                .collect();

        let elements = match parsed {
            Ok(items) => items,
            Err(e) => {
                error!("Iterator failed to collect elements: {:?}", e);
                panic!("Iterator failed with error: {:?}", Some(&e));
            }
        };

        // At least the header, release and resource; the end-of-stream marker
        // is verified separately below.
        assert!(elements.len() >= 3);

        assert!(
            elements
                .iter()
                .any(|e| matches!(e, StreamingElement::Header(_))),
            "Should parse message header"
        );
        assert!(
            elements
                .iter()
                .any(|e| matches!(e, StreamingElement::Release(_))),
            "Should parse release"
        );
        assert!(
            elements
                .iter()
                .any(|e| matches!(e, StreamingElement::Resource(_))),
            "Should parse resource"
        );
        assert!(
            elements
                .iter()
                .any(|e| matches!(e, StreamingElement::EndOfStream)),
            "Should have end of stream marker"
        );
    }

    /// A document nested 150 levels deep must be rejected with a security
    /// violation on the first call to `next`.
    #[test]
    fn test_comprehensive_security_limits() {
        let opening: String = (0..150).map(|i| format!("<level{}>", i)).collect();
        let closing: String = (0..150).rev().map(|i| format!("</level{}>", i)).collect();
        let xml = format!(
            "{}{}content{}",
            r#"<?xml version="1.0"?>"#,
            opening,
            closing
        );

        let mut iterator =
            ComprehensiveStreamIterator::new(Cursor::new(xml.as_bytes()), ERNVersion::V4_3);

        let first = iterator.next();
        assert!(first.is_some());
        assert!(
            matches!(first, Some(Err(ParseError::SecurityViolation { .. }))),
            "Expected security violation"
        );
    }
}