// ddex_parser: src/streaming/aligned_comprehensive.rs
//! Model-aligned comprehensive streaming parser using builders

4#[allow(dead_code)] // Experimental streaming parser implementation
5use crate::error::{ErrorLocation, ParseError};
6use ddex_core::models::streaming_types::builders::*;
7use ddex_core::models::streaming_types::*;
8use ddex_core::models::IdentifierType;
9use ddex_core::models::{graph::*, versions::ERNVersion};
10use quick_xml::{events::{Event, BytesStart}, Reader};
11use std::collections::HashMap;
12use std::io::BufRead;
13use std::time::Instant;
14
15/// Aligned streaming element using proper core types
16#[derive(Debug, Clone)]
17pub enum AlignedStreamingElement {
18    Header(Box<MessageHeader>),
19    Release(Release),
20    Resource(Resource),
21    Party(Party),
22    EndOfStream,
23}
24
25/// Parser state using builder pattern
26#[derive(Debug)]
27enum AlignedParserState {
28    Initial,
29    InHeader(Box<MessageHeaderBuilder>),
30    InRelease(Box<ReleaseBuilder>),
31    InResource(Box<ResourceBuilder>),
32    InParty(Box<PartyBuilder>),
33    Complete,
34}
35
36/// Model-aligned streaming parser
37pub struct AlignedStreamingParser<R: BufRead> {
38    reader: Reader<R>,
39    buffer: Vec<u8>,
40    state: AlignedParserState,
41    current_path: Vec<String>,
42    current_depth: usize,
43    text_buffer: String,
44    attributes: HashMap<String, String>,
45    bytes_processed: u64,
46    elements_yielded: usize,
47    start_time: Instant,
48}
49
50impl<R: BufRead> AlignedStreamingParser<R> {
51    pub fn new(reader: R, _version: ERNVersion) -> Self {
52        let mut xml_reader = Reader::from_reader(reader);
53        xml_reader.config_mut().trim_text(true);
54        xml_reader.config_mut().check_end_names = true;
55
56        Self {
57            reader: xml_reader,
58            buffer: Vec::with_capacity(8192),
59            state: AlignedParserState::Initial,
60            current_path: Vec::new(),
61            current_depth: 0,
62            text_buffer: String::new(),
63            attributes: HashMap::new(),
64            bytes_processed: 0,
65            elements_yielded: 0,
66            start_time: Instant::now(),
67        }
68    }
69
70    pub fn parse_next(&mut self) -> Result<Option<AlignedStreamingElement>, ParseError> {
71        loop {
72            self.buffer.clear();
73            let event = self.reader.read_event_into(&mut self.buffer)?;
74            match event {
75                Event::Start(e) => {
76                    let name_bytes = e.name();
77                    let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
78
79                    // Extract attributes into a temporary structure
80                    let mut attributes = HashMap::new();
81                    for attr_result in e.attributes() {
82                        let attr = attr_result.map_err(|e| ParseError::XmlError {
83                            message: format!("Attribute error: {}", e),
84                            location: crate::error::ErrorLocation::default(),
85                        })?;
86
87                        let key = std::str::from_utf8(attr.key.as_ref())?;
88                        let value = std::str::from_utf8(&attr.value)?;
89
90                        attributes.insert(key.to_string(), value.to_string());
91                    }
92
93                    // Store the attributes
94                    self.attributes = attributes;
95
96                    self.handle_start_element_by_name(&name)?;
97                }
98                Event::End(e) => {
99                    let name_bytes = e.name();
100                    let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
101                    if let Some(element) = self.handle_end_element_by_name(&name)? {
102                        self.elements_yielded += 1;
103                        return Ok(Some(element));
104                    }
105                }
106                Event::Text(e) => {
107                    let text = std::str::from_utf8(&e)?;
108                    self.text_buffer.push_str(text.trim());
109                }
110                Event::Eof => {
111                    return Ok(Some(AlignedStreamingElement::EndOfStream));
112                }
113                _ => {
114                    // Skip other events
115                }
116            }
117
118            self.bytes_processed = self.reader.buffer_position();
119
120            // Check security limits
121            if self.current_depth > 100 {
122                return Err(ParseError::SecurityViolation {
123                    message: "Nesting depth exceeds 100 levels".to_string(),
124                });
125            }
126
127            self.buffer.clear();
128        }
129    }
130
131
132    fn handle_start_element_by_name(&mut self, name: &str) -> Result<(), ParseError> {
133        self.current_path.push(name.to_string());
134        self.current_depth += 1;
135
136        self.text_buffer.clear();
137
138        // State transitions using builders
139        match (&self.state, name) {
140            (AlignedParserState::Initial, "MessageHeader") => {
141                self.state = AlignedParserState::InHeader(Box::new(MessageHeaderBuilder::new()));
142            }
143            (AlignedParserState::Initial, "Release") => {
144                let reference = self
145                    .attributes
146                    .get("ReleaseReference")
147                    .cloned()
148                    .unwrap_or_else(|| format!("REL_{}", self.elements_yielded));
149                self.state =
150                    AlignedParserState::InRelease(Box::new(ReleaseBuilder::new(reference)));
151            }
152            (AlignedParserState::Initial, "Resource") => {
153                let reference = self
154                    .attributes
155                    .get("ResourceReference")
156                    .cloned()
157                    .unwrap_or_else(|| format!("RES_{}", self.elements_yielded));
158                self.state =
159                    AlignedParserState::InResource(Box::new(ResourceBuilder::new(reference)));
160            }
161            (AlignedParserState::Initial, "Party") => {
162                let reference = self.attributes.get("PartyReference").cloned();
163                self.state = AlignedParserState::InParty(Box::new(PartyBuilder::new(reference)));
164            }
165            _ => {
166                // Continue in current state
167            }
168        }
169
170        Ok(())
171    }
172
173    fn handle_end_element_by_name(
174        &mut self,
175        name: &str,
176    ) -> Result<Option<AlignedStreamingElement>, ParseError> {
177        let text_content = self.text_buffer.clone();
178
179        let result = match &mut self.state {
180            AlignedParserState::InHeader(builder) => {
181                match name {
182                    "MessageId" => {
183                        builder.set_message_id(text_content);
184                        None
185                    }
186                    "MessageCreatedDateTime" => {
187                        builder.set_created_date_time_from_text(text_content);
188                        None
189                    }
190                    "MessageSender" => {
191                        // For simplicity, create a basic sender
192                        let sender = create_message_sender(
193                            text_content.clone(),
194                            Some(format!("SENDER_{}", self.elements_yielded)),
195                        );
196                        builder.set_sender(sender);
197                        None
198                    }
199                    "MessageRecipient" => {
200                        let recipient = create_message_recipient(text_content);
201                        builder.set_recipient(recipient);
202                        None
203                    }
204                    "MessageHeader" => {
205                        // Complete header - use builder to create element
206                        let builder =
207                            std::mem::replace(&mut self.state, AlignedParserState::Initial);
208                        if let AlignedParserState::InHeader(header_builder) = builder {
209                            match header_builder.to_core() {
210                                Ok(header) => {
211                                    Some(AlignedStreamingElement::Header(Box::new(header)))
212                                }
213                                Err(e) => {
214                                    eprintln!("Warning: Header validation failed: {}", e);
215                                    // Create a minimal valid header
216                                    let header = self.create_fallback_header();
217                                    Some(AlignedStreamingElement::Header(Box::new(header)))
218                                }
219                            }
220                        } else {
221                            None
222                        }
223                    }
224                    _ => None,
225                }
226            }
227            AlignedParserState::InRelease(builder) => {
228                match name {
229                    "ReleaseTitle" => {
230                        let title = create_localized_string(
231                            text_content,
232                            self.attributes.get("LanguageCode").cloned(),
233                        );
234                        builder.add_title(title);
235                        None
236                    }
237                    "Genre" => {
238                        let genre = create_genre(text_content, None);
239                        builder.add_genre(genre);
240                        None
241                    }
242                    "DisplayArtist" => {
243                        let artist = create_artist(text_content, "MainArtist".to_string(), None);
244                        builder.add_artist(artist);
245                        None
246                    }
247                    "ReleaseType" => {
248                        let release_type = match text_content.as_str() {
249                            "Album" => ReleaseType::Album,
250                            "Single" => ReleaseType::Single,
251                            "EP" => ReleaseType::EP,
252                            _ => ReleaseType::Other(text_content),
253                        };
254                        builder.set_release_type(release_type);
255                        None
256                    }
257                    "Release" => {
258                        // Complete release - use builder
259                        let builder =
260                            std::mem::replace(&mut self.state, AlignedParserState::Initial);
261                        if let AlignedParserState::InRelease(release_builder) = builder {
262                            match release_builder.to_core() {
263                                Ok(release) => Some(AlignedStreamingElement::Release(release)),
264                                Err(e) => {
265                                    eprintln!("Warning: Release validation failed: {}", e);
266                                    None
267                                }
268                            }
269                        } else {
270                            None
271                        }
272                    }
273                    _ => None,
274                }
275            }
276            AlignedParserState::InResource(builder) => {
277                match name {
278                    "Title" => {
279                        let title = create_localized_string(
280                            text_content,
281                            self.attributes.get("LanguageCode").cloned(),
282                        );
283                        builder.add_title(title);
284                        None
285                    }
286                    "Duration" => {
287                        builder.set_duration_from_text(text_content);
288                        None
289                    }
290                    "ResourceType" => {
291                        let resource_type = match text_content.as_str() {
292                            "SoundRecording" => ResourceType::SoundRecording,
293                            "Video" => ResourceType::Video,
294                            "Image" => ResourceType::Image,
295                            "Text" => ResourceType::Text,
296                            "SheetMusic" => ResourceType::SheetMusic,
297                            _ => ResourceType::SoundRecording, // Default
298                        };
299                        builder.set_resource_type(resource_type);
300                        None
301                    }
302                    "ISRC" => {
303                        let identifier = create_identifier(
304                            text_content,
305                            IdentifierType::ISRC,
306                            Some("ISRC".to_string()),
307                        );
308                        builder.add_identifier(identifier);
309                        None
310                    }
311                    "Resource" => {
312                        // Complete resource - use builder
313                        let builder =
314                            std::mem::replace(&mut self.state, AlignedParserState::Initial);
315                        if let AlignedParserState::InResource(resource_builder) = builder {
316                            match resource_builder.to_core() {
317                                Ok(resource) => Some(AlignedStreamingElement::Resource(resource)),
318                                Err(e) => {
319                                    eprintln!("Warning: Resource validation failed: {}", e);
320                                    None
321                                }
322                            }
323                        } else {
324                            None
325                        }
326                    }
327                    _ => None,
328                }
329            }
330            AlignedParserState::InParty(builder) => {
331                match name {
332                    "PartyName" => {
333                        let name = create_localized_string(
334                            text_content,
335                            self.attributes.get("LanguageCode").cloned(),
336                        );
337                        builder.add_name(name);
338                        None
339                    }
340                    "ISNI" => {
341                        builder.set_isni(text_content);
342                        None
343                    }
344                    "PartyRole" => {
345                        let role = match text_content.as_str() {
346                            "Artist" => PartyRole::Artist,
347                            "Producer" => PartyRole::Producer,
348                            "Composer" => PartyRole::Composer,
349                            "Lyricist" => PartyRole::Lyricist,
350                            "Publisher" => PartyRole::Publisher,
351                            "Performer" => PartyRole::Performer,
352                            "Engineer" => PartyRole::Engineer,
353                            "Label" => PartyRole::Label,
354                            "Distributor" => PartyRole::Distributor,
355                            _ => PartyRole::Other(text_content),
356                        };
357                        builder.add_role(role);
358                        None
359                    }
360                    "Party" => {
361                        // Complete party - use builder
362                        let builder =
363                            std::mem::replace(&mut self.state, AlignedParserState::Initial);
364                        if let AlignedParserState::InParty(party_builder) = builder {
365                            match party_builder.to_core() {
366                                Ok(party) => Some(AlignedStreamingElement::Party(party)),
367                                Err(e) => {
368                                    eprintln!("Warning: Party validation failed: {}", e);
369                                    None
370                                }
371                            }
372                        } else {
373                            None
374                        }
375                    }
376                    _ => None,
377                }
378            }
379            _ => None,
380        };
381
382        self.current_depth = self.current_depth.saturating_sub(1);
383        self.current_path.pop();
384        self.text_buffer.clear();
385
386        Ok(result)
387    }
388
389    fn create_fallback_header(&self) -> MessageHeader {
390        MessageHeader {
391            message_id: "FALLBACK_MSG".to_string(),
392            message_type: MessageType::NewReleaseMessage,
393            message_created_date_time: chrono::Utc::now(),
394            message_sender: create_message_sender("Unknown Sender".to_string(), None),
395            message_recipient: create_message_recipient("Unknown Recipient".to_string()),
396            message_control_type: None,
397            message_thread_id: None,
398            attributes: None,
399            extensions: None,
400            comments: None,
401        }
402    }
403
404    fn get_current_location(&self) -> ErrorLocation {
405        ErrorLocation {
406            line: 0,
407            column: 0,
408            byte_offset: Some(self.bytes_processed as usize),
409            path: "aligned_streaming".to_string(),
410        }
411    }
412
413    pub fn stats(&self) -> AlignedStats {
414        AlignedStats {
415            bytes_processed: self.bytes_processed,
416            elements_yielded: self.elements_yielded,
417            current_depth: self.current_depth,
418            elapsed: self.start_time.elapsed(),
419        }
420    }
421}
422
423/// Iterator wrapper for aligned streaming parser
424pub struct AlignedStreamIterator<R: BufRead> {
425    parser: AlignedStreamingParser<R>,
426    finished: bool,
427}
428
429impl<R: BufRead> AlignedStreamIterator<R> {
430    pub fn new(reader: R, version: ERNVersion) -> Self {
431        Self {
432            parser: AlignedStreamingParser::new(reader, version),
433            finished: false,
434        }
435    }
436
437    pub fn stats(&self) -> AlignedStats {
438        self.parser.stats()
439    }
440}
441
442impl<R: BufRead> Iterator for AlignedStreamIterator<R> {
443    type Item = Result<AlignedStreamingElement, ParseError>;
444
445    fn next(&mut self) -> Option<Self::Item> {
446        if self.finished {
447            return None;
448        }
449
450        match self.parser.parse_next() {
451            Ok(Some(element)) => {
452                if matches!(element, AlignedStreamingElement::EndOfStream) {
453                    self.finished = true;
454                }
455                Some(Ok(element))
456            }
457            Ok(None) => {
458                self.finished = true;
459                None
460            }
461            Err(e) => {
462                self.finished = true;
463                Some(Err(e))
464            }
465        }
466    }
467}
468
/// Progress counters reported by [`AlignedStreamingParser::stats`].
#[derive(Clone, Debug)]
pub struct AlignedStats {
    /// Bytes consumed from the input, as recorded by the parser.
    pub bytes_processed: u64,
    /// Completed elements returned so far.
    pub elements_yielded: usize,
    /// Element nesting depth at the time of the snapshot.
    pub current_depth: usize,
    /// Time elapsed since the parser was created.
    pub elapsed: std::time::Duration,
}

impl AlignedStats {
    /// Average throughput in MiB per second (`bytes / 2^20 / seconds`).
    ///
    /// Returns `0.0` when no time has elapsed, avoiding a division by zero.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        if seconds <= 0.0 {
            return 0.0;
        }
        self.bytes_processed as f64 / BYTES_PER_MIB / seconds
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// End-to-end parse of a small ERN document covering all four element kinds.
    #[test]
    fn test_aligned_streaming_parser_with_builders() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>test-message-1</MessageId>
        <MessageCreatedDateTime>2023-01-01T00:00:00Z</MessageCreatedDateTime>
        <MessageSender>Test Sender</MessageSender>
        <MessageRecipient>Test Recipient</MessageRecipient>
    </MessageHeader>
    <Release ReleaseReference="REL001">
        <ReleaseTitle>Test Release</ReleaseTitle>
        <Genre>Rock</Genre>
        <ReleaseType>Album</ReleaseType>
        <DisplayArtist>Test Artist</DisplayArtist>
    </Release>
    <Resource ResourceReference="RES001">
        <Title>Test Track</Title>
        <Duration>180</Duration>
        <ResourceType>SoundRecording</ResourceType>
        <ISRC>USRC17607839</ISRC>
    </Resource>
    <Party PartyReference="PARTY001">
        <PartyName>Test Party</PartyName>
        <PartyRole>Artist</PartyRole>
        <ISNI>0000000123456789</ISNI>
    </Party>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = AlignedStreamIterator::new(cursor, ERNVersion::V4_3);

        let elements: Result<Vec<_>, _> = iterator.collect();
        assert!(elements.is_ok());

        let elements = elements.unwrap();
        assert!(elements.len() >= 5); // Header, Release, Resource, Party, EndOfStream

        // Verify proper type construction
        let has_header = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Header(_)));
        let has_release = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Release(_)));
        let has_resource = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Resource(_)));
        let has_party = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Party(_)));

        assert!(
            has_header,
            "Should parse message header using MessageHeaderBuilder"
        );
        assert!(has_release, "Should parse release using ReleaseBuilder");
        assert!(has_resource, "Should parse resource using ResourceBuilder");
        assert!(has_party, "Should parse party using PartyBuilder");

        // Verify field mapping
        for element in &elements {
            match element {
                AlignedStreamingElement::Header(header) => {
                    assert_eq!(header.message_id, "test-message-1");
                    assert_eq!(header.message_sender.party_name[0].text, "Test Sender");
                }
                AlignedStreamingElement::Release(release) => {
                    assert_eq!(release.release_reference, "REL001");
                    assert_eq!(release.release_title[0].text, "Test Release");
                    assert_eq!(release.genre[0].genre_text, "Rock");
                    assert_eq!(release.release_type, Some(ReleaseType::Album));
                }
                AlignedStreamingElement::Resource(resource) => {
                    assert_eq!(resource.resource_reference, "RES001");
                    assert_eq!(resource.reference_title[0].text, "Test Track");
                    assert_eq!(resource.duration, Some(std::time::Duration::from_secs(180)));
                    assert_eq!(resource.resource_type, ResourceType::SoundRecording);
                }
                AlignedStreamingElement::Party(party) => {
                    assert_eq!(party.party_name[0].text, "Test Party");
                    assert_eq!(party.isni, Some("0000000123456789".to_string()));
                    assert!(party.party_role.contains(&PartyRole::Artist));
                }
                _ => {}
            }
        }
    }

    /// A release with no required fields must not abort the stream.
    #[test]
    fn test_builder_validation() {
        let xml = r#"<?xml version="1.0"?>
<ERNMessage>
    <Release>
        <!-- Missing required fields -->
    </Release>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = AlignedStreamIterator::new(cursor, ERNVersion::V4_3);

        // Should handle validation gracefully
        let elements: Vec<_> = iterator.collect();
        // Should not crash despite missing required fields
        assert!(!elements.is_empty());
    }

    /// Exercises the builder conversion and validation traits directly.
    #[test]
    fn test_conversion_traits() {
        // Test ToCore trait
        let mut builder = ReleaseBuilder::new("TEST_REL".to_string());
        builder.add_title(create_localized_string("Test Title".to_string(), None));

        let release = builder.to_core().unwrap();
        assert_eq!(release.release_reference, "TEST_REL");
        assert_eq!(release.release_title[0].text, "Test Title");

        // Test Validate trait
        let empty_builder = ReleaseBuilder::default();
        assert!(!empty_builder.is_complete());
        assert!(empty_builder.validate().is_err());
    }
}