ddex_parser/streaming/
aligned_comprehensive.rs

1// src/streaming/aligned_comprehensive.rs
2//! Model-aligned comprehensive streaming parser using builders
3
4#[allow(dead_code)] // Experimental streaming parser implementation
5use crate::error::ParseError;
6use ddex_core::models::streaming_types::builders::*;
7use ddex_core::models::streaming_types::*;
8use ddex_core::models::IdentifierType;
9use ddex_core::models::{graph::*, versions::ERNVersion};
10use quick_xml::{events::{Event, BytesStart}, Reader};
11use std::collections::HashMap;
12use std::io::BufRead;
13use std::time::Instant;
14
15/// Aligned streaming element using proper core types
16#[derive(Debug, Clone)]
17pub enum AlignedStreamingElement {
18    Header(Box<MessageHeader>),
19    Release(Release),
20    Resource(Resource),
21    Party(Party),
22    EndOfStream,
23}
24
25/// Parser state using builder pattern
26#[derive(Debug)]
27enum AlignedParserState {
28    Initial,
29    InHeader(Box<MessageHeaderBuilder>),
30    InRelease(Box<ReleaseBuilder>),
31    InResource(Box<ResourceBuilder>),
32    InParty(Box<PartyBuilder>),
33    Complete,
34}
35
36/// Model-aligned streaming parser
37pub struct AlignedStreamingParser<R: BufRead> {
38    reader: Reader<R>,
39    buffer: Vec<u8>,
40    state: AlignedParserState,
41    current_path: Vec<String>,
42    current_depth: usize,
43    text_buffer: String,
44    attributes: HashMap<String, String>,
45    bytes_processed: u64,
46    elements_yielded: usize,
47    start_time: Instant,
48}
49
50impl<R: BufRead> AlignedStreamingParser<R> {
51    pub fn new(reader: R, _version: ERNVersion) -> Self {
52        let mut xml_reader = Reader::from_reader(reader);
53        xml_reader.config_mut().trim_text(true);
54        xml_reader.config_mut().check_end_names = true;
55
56        Self {
57            reader: xml_reader,
58            buffer: Vec::with_capacity(8192),
59            state: AlignedParserState::Initial,
60            current_path: Vec::new(),
61            current_depth: 0,
62            text_buffer: String::new(),
63            attributes: HashMap::new(),
64            bytes_processed: 0,
65            elements_yielded: 0,
66            start_time: Instant::now(),
67        }
68    }
69
70    pub fn parse_next(&mut self) -> Result<Option<AlignedStreamingElement>, ParseError> {
71        loop {
72            self.buffer.clear();
73            let event = self.reader.read_event_into(&mut self.buffer)?;
74            match event {
75                Event::Start(e) => {
76                    let name_bytes = e.name();
77                    let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
78
79                    // Extract attributes into a temporary structure
80                    let mut attributes = HashMap::new();
81                    for attr_result in e.attributes() {
82                        let attr = attr_result.map_err(|e| ParseError::XmlError(format!("Attribute error: {}", e)))?;
83
84                        let key = std::str::from_utf8(attr.key.as_ref())?;
85                        let value = std::str::from_utf8(&attr.value)?;
86
87                        attributes.insert(key.to_string(), value.to_string());
88                    }
89
90                    // Store the attributes
91                    self.attributes = attributes;
92
93                    self.handle_start_element_by_name(&name)?;
94                }
95                Event::End(e) => {
96                    let name_bytes = e.name();
97                    let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
98                    if let Some(element) = self.handle_end_element_by_name(&name)? {
99                        self.elements_yielded += 1;
100                        return Ok(Some(element));
101                    }
102                }
103                Event::Text(e) => {
104                    let text = std::str::from_utf8(&e)?;
105                    self.text_buffer.push_str(text.trim());
106                }
107                Event::Eof => {
108                    return Ok(Some(AlignedStreamingElement::EndOfStream));
109                }
110                _ => {
111                    // Skip other events
112                }
113            }
114
115            self.bytes_processed = self.reader.buffer_position();
116
117            // Check security limits
118            if self.current_depth > 100 {
119                return Err(ParseError::SecurityViolation {
120                    message: "Nesting depth exceeds 100 levels".to_string(),
121                });
122            }
123
124            self.buffer.clear();
125        }
126    }
127
128
129    fn handle_start_element_by_name(&mut self, name: &str) -> Result<(), ParseError> {
130        self.current_path.push(name.to_string());
131        self.current_depth += 1;
132
133        self.text_buffer.clear();
134
135        // State transitions using builders
136        match (&self.state, name) {
137            (AlignedParserState::Initial, "MessageHeader") => {
138                self.state = AlignedParserState::InHeader(Box::new(MessageHeaderBuilder::new()));
139            }
140            (AlignedParserState::Initial, "Release") => {
141                let reference = self
142                    .attributes
143                    .get("ReleaseReference")
144                    .cloned()
145                    .unwrap_or_else(|| format!("REL_{}", self.elements_yielded));
146                self.state =
147                    AlignedParserState::InRelease(Box::new(ReleaseBuilder::new(reference)));
148            }
149            (AlignedParserState::Initial, "Resource") => {
150                let reference = self
151                    .attributes
152                    .get("ResourceReference")
153                    .cloned()
154                    .unwrap_or_else(|| format!("RES_{}", self.elements_yielded));
155                self.state =
156                    AlignedParserState::InResource(Box::new(ResourceBuilder::new(reference)));
157            }
158            (AlignedParserState::Initial, "Party") => {
159                let reference = self.attributes.get("PartyReference").cloned();
160                self.state = AlignedParserState::InParty(Box::new(PartyBuilder::new(reference)));
161            }
162            _ => {
163                // Continue in current state
164            }
165        }
166
167        Ok(())
168    }
169
170    fn handle_end_element_by_name(
171        &mut self,
172        name: &str,
173    ) -> Result<Option<AlignedStreamingElement>, ParseError> {
174        let text_content = self.text_buffer.clone();
175
176        let result = match &mut self.state {
177            AlignedParserState::InHeader(builder) => {
178                match name {
179                    "MessageId" => {
180                        builder.set_message_id(text_content);
181                        None
182                    }
183                    "MessageCreatedDateTime" => {
184                        builder.set_created_date_time_from_text(text_content);
185                        None
186                    }
187                    "MessageSender" => {
188                        // For simplicity, create a basic sender
189                        let sender = create_message_sender(
190                            text_content.clone(),
191                            Some(format!("SENDER_{}", self.elements_yielded)),
192                        );
193                        builder.set_sender(sender);
194                        None
195                    }
196                    "MessageRecipient" => {
197                        let recipient = create_message_recipient(text_content);
198                        builder.set_recipient(recipient);
199                        None
200                    }
201                    "MessageHeader" => {
202                        // Complete header - use builder to create element
203                        let builder =
204                            std::mem::replace(&mut self.state, AlignedParserState::Initial);
205                        if let AlignedParserState::InHeader(header_builder) = builder {
206                            match header_builder.to_core() {
207                                Ok(header) => {
208                                    Some(AlignedStreamingElement::Header(Box::new(header)))
209                                }
210                                Err(e) => {
211                                    eprintln!("Warning: Header validation failed: {}", e);
212                                    // Create a minimal valid header
213                                    let header = self.create_fallback_header();
214                                    Some(AlignedStreamingElement::Header(Box::new(header)))
215                                }
216                            }
217                        } else {
218                            None
219                        }
220                    }
221                    _ => None,
222                }
223            }
224            AlignedParserState::InRelease(builder) => {
225                match name {
226                    "ReleaseTitle" => {
227                        let title = create_localized_string(
228                            text_content,
229                            self.attributes.get("LanguageCode").cloned(),
230                        );
231                        builder.add_title(title);
232                        None
233                    }
234                    "Genre" => {
235                        let genre = create_genre(text_content, None);
236                        builder.add_genre(genre);
237                        None
238                    }
239                    "DisplayArtist" => {
240                        let artist = create_artist(text_content, "MainArtist".to_string(), None);
241                        builder.add_artist(artist);
242                        None
243                    }
244                    "ReleaseType" => {
245                        let release_type = match text_content.as_str() {
246                            "Album" => ReleaseType::Album,
247                            "Single" => ReleaseType::Single,
248                            "EP" => ReleaseType::EP,
249                            _ => ReleaseType::Other(text_content),
250                        };
251                        builder.set_release_type(release_type);
252                        None
253                    }
254                    "Release" => {
255                        // Complete release - use builder
256                        let builder =
257                            std::mem::replace(&mut self.state, AlignedParserState::Initial);
258                        if let AlignedParserState::InRelease(release_builder) = builder {
259                            match release_builder.to_core() {
260                                Ok(release) => Some(AlignedStreamingElement::Release(release)),
261                                Err(e) => {
262                                    eprintln!("Warning: Release validation failed: {}", e);
263                                    None
264                                }
265                            }
266                        } else {
267                            None
268                        }
269                    }
270                    _ => None,
271                }
272            }
273            AlignedParserState::InResource(builder) => {
274                match name {
275                    "Title" => {
276                        let title = create_localized_string(
277                            text_content,
278                            self.attributes.get("LanguageCode").cloned(),
279                        );
280                        builder.add_title(title);
281                        None
282                    }
283                    "Duration" => {
284                        builder.set_duration_from_text(text_content);
285                        None
286                    }
287                    "ResourceType" => {
288                        let resource_type = match text_content.as_str() {
289                            "SoundRecording" => ResourceType::SoundRecording,
290                            "Video" => ResourceType::Video,
291                            "Image" => ResourceType::Image,
292                            "Text" => ResourceType::Text,
293                            "SheetMusic" => ResourceType::SheetMusic,
294                            _ => ResourceType::SoundRecording, // Default
295                        };
296                        builder.set_resource_type(resource_type);
297                        None
298                    }
299                    "ISRC" => {
300                        let identifier = create_identifier(
301                            text_content,
302                            IdentifierType::ISRC,
303                            Some("ISRC".to_string()),
304                        );
305                        builder.add_identifier(identifier);
306                        None
307                    }
308                    "Resource" => {
309                        // Complete resource - use builder
310                        let builder =
311                            std::mem::replace(&mut self.state, AlignedParserState::Initial);
312                        if let AlignedParserState::InResource(resource_builder) = builder {
313                            match resource_builder.to_core() {
314                                Ok(resource) => Some(AlignedStreamingElement::Resource(resource)),
315                                Err(e) => {
316                                    eprintln!("Warning: Resource validation failed: {}", e);
317                                    None
318                                }
319                            }
320                        } else {
321                            None
322                        }
323                    }
324                    _ => None,
325                }
326            }
327            AlignedParserState::InParty(builder) => {
328                match name {
329                    "PartyName" => {
330                        let name = create_localized_string(
331                            text_content,
332                            self.attributes.get("LanguageCode").cloned(),
333                        );
334                        builder.add_name(name);
335                        None
336                    }
337                    "ISNI" => {
338                        builder.set_isni(text_content);
339                        None
340                    }
341                    "PartyRole" => {
342                        let role = match text_content.as_str() {
343                            "Artist" => PartyRole::Artist,
344                            "Producer" => PartyRole::Producer,
345                            "Composer" => PartyRole::Composer,
346                            "Lyricist" => PartyRole::Lyricist,
347                            "Publisher" => PartyRole::Publisher,
348                            "Performer" => PartyRole::Performer,
349                            "Engineer" => PartyRole::Engineer,
350                            "Label" => PartyRole::Label,
351                            "Distributor" => PartyRole::Distributor,
352                            _ => PartyRole::Other(text_content),
353                        };
354                        builder.add_role(role);
355                        None
356                    }
357                    "Party" => {
358                        // Complete party - use builder
359                        let builder =
360                            std::mem::replace(&mut self.state, AlignedParserState::Initial);
361                        if let AlignedParserState::InParty(party_builder) = builder {
362                            match party_builder.to_core() {
363                                Ok(party) => Some(AlignedStreamingElement::Party(party)),
364                                Err(e) => {
365                                    eprintln!("Warning: Party validation failed: {}", e);
366                                    None
367                                }
368                            }
369                        } else {
370                            None
371                        }
372                    }
373                    _ => None,
374                }
375            }
376            _ => None,
377        };
378
379        self.current_depth = self.current_depth.saturating_sub(1);
380        self.current_path.pop();
381        self.text_buffer.clear();
382
383        Ok(result)
384    }
385
386    fn create_fallback_header(&self) -> MessageHeader {
387        MessageHeader {
388            message_id: "FALLBACK_MSG".to_string(),
389            message_type: MessageType::NewReleaseMessage,
390            message_created_date_time: chrono::Utc::now(),
391            message_sender: create_message_sender("Unknown Sender".to_string(), None),
392            message_recipient: create_message_recipient("Unknown Recipient".to_string()),
393            message_control_type: None,
394            message_thread_id: None,
395            attributes: None,
396            extensions: None,
397            comments: None,
398        }
399    }
400
401    fn get_current_location(&self) -> String {
402        format!("aligned_streaming:{}:0", self.bytes_processed)
403    }
404
405    pub fn stats(&self) -> AlignedStats {
406        AlignedStats {
407            bytes_processed: self.bytes_processed,
408            elements_yielded: self.elements_yielded,
409            current_depth: self.current_depth,
410            elapsed: self.start_time.elapsed(),
411        }
412    }
413}
414
415/// Iterator wrapper for aligned streaming parser
416pub struct AlignedStreamIterator<R: BufRead> {
417    parser: AlignedStreamingParser<R>,
418    finished: bool,
419}
420
421impl<R: BufRead> AlignedStreamIterator<R> {
422    pub fn new(reader: R, version: ERNVersion) -> Self {
423        Self {
424            parser: AlignedStreamingParser::new(reader, version),
425            finished: false,
426        }
427    }
428
429    pub fn stats(&self) -> AlignedStats {
430        self.parser.stats()
431    }
432}
433
434impl<R: BufRead> Iterator for AlignedStreamIterator<R> {
435    type Item = Result<AlignedStreamingElement, ParseError>;
436
437    fn next(&mut self) -> Option<Self::Item> {
438        if self.finished {
439            return None;
440        }
441
442        match self.parser.parse_next() {
443            Ok(Some(element)) => {
444                if matches!(element, AlignedStreamingElement::EndOfStream) {
445                    self.finished = true;
446                }
447                Some(Ok(element))
448            }
449            Ok(None) => {
450                self.finished = true;
451                None
452            }
453            Err(e) => {
454                self.finished = true;
455                Some(Err(e))
456            }
457        }
458    }
459}
460
/// Snapshot of parser progress counters.
#[derive(Debug, Clone)]
pub struct AlignedStats {
    /// Byte offset recorded by the parser.
    pub bytes_processed: u64,
    /// Number of elements handed back to the caller.
    pub elements_yielded: usize,
    /// Number of XML elements open at the time of the snapshot.
    pub current_depth: usize,
    /// Time elapsed since the parser was created.
    pub elapsed: std::time::Duration,
}

impl AlignedStats {
    /// Average throughput in MiB per second, or `0.0` when no time has
    /// elapsed.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        let mebibytes = self.bytes_processed as f64 / BYTES_PER_MIB;

        if seconds > 0.0 {
            mebibytes / seconds
        } else {
            0.0
        }
    }
}
478
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// End-to-end check: a small ERN document yields one header, release,
    /// resource and party, followed by the end-of-stream marker, with the
    /// expected field values.
    #[test]
    fn test_aligned_streaming_parser_with_builders() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>UMG-2024-NEW-RELEASE-001</MessageId>
        <MessageCreatedDateTime>2024-03-15T14:30:00Z</MessageCreatedDateTime>
        <MessageSender>Universal Music Group</MessageSender>
        <MessageRecipient>Spotify Technology</MessageRecipient>
    </MessageHeader>
    <Release ReleaseReference="TAYLOR_SWIFT_MIDNIGHTS_DELUXE">
        <ReleaseTitle>Midnights (3am Edition)</ReleaseTitle>
        <Genre>Pop</Genre>
        <ReleaseType>Album</ReleaseType>
        <DisplayArtist>Taylor Swift</DisplayArtist>
    </Release>
    <Resource ResourceReference="ANTI_HERO_TRACK">
        <Title>Anti-Hero</Title>
        <Duration>200</Duration>
        <ResourceType>SoundRecording</ResourceType>
        <ISRC>USUA12204925</ISRC>
    </Resource>
    <Party PartyReference="TAYLOR_SWIFT_ARTIST">
        <PartyName>Taylor Swift</PartyName>
        <PartyRole>Artist</PartyRole>
        <ISNI>0000000368570204</ISNI>
    </Party>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = AlignedStreamIterator::new(cursor, ERNVersion::V4_3);

        let elements: Result<Vec<_>, _> = iterator.collect();
        assert!(elements.is_ok());

        let elements = elements.unwrap();
        // Header, Release, Resource, Party, EndOfStream
        assert_eq!(elements.len(), 5);

        // Verify proper type construction
        let has_header = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Header(_)));
        let has_release = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Release(_)));
        let has_resource = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Resource(_)));
        let has_party = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Party(_)));

        assert!(
            has_header,
            "Should parse message header using MessageHeaderBuilder"
        );
        assert!(has_release, "Should parse release using ReleaseBuilder");
        assert!(has_resource, "Should parse resource using ResourceBuilder");
        assert!(has_party, "Should parse party using PartyBuilder");

        // Verify field mapping
        for element in &elements {
            match element {
                AlignedStreamingElement::Header(header) => {
                    assert_eq!(header.message_id, "UMG-2024-NEW-RELEASE-001");
                    assert_eq!(header.message_sender.party_name[0].text, "Universal Music Group");
                }
                AlignedStreamingElement::Release(release) => {
                    assert_eq!(release.release_reference, "TAYLOR_SWIFT_MIDNIGHTS_DELUXE");
                    assert_eq!(release.release_title[0].text, "Midnights (3am Edition)");
                    assert_eq!(release.genre[0].genre_text, "Pop");
                    assert_eq!(release.release_type, Some(ReleaseType::Album));
                }
                AlignedStreamingElement::Resource(resource) => {
                    assert_eq!(resource.resource_reference, "ANTI_HERO_TRACK");
                    assert_eq!(resource.reference_title[0].text, "Anti-Hero");
                    assert_eq!(resource.duration, Some(std::time::Duration::from_secs(200)));
                    assert_eq!(resource.resource_type, ResourceType::SoundRecording);
                }
                AlignedStreamingElement::Party(party) => {
                    assert_eq!(party.party_name[0].text, "Taylor Swift");
                    assert_eq!(party.isni, Some("0000000368570204".to_string())); // Real ISNI for Taylor Swift
                    assert!(party.party_role.contains(&PartyRole::Artist));
                }
                _ => {}
            }
        }
    }

    /// A release with no fields must not make parsing fail: the stream still
    /// runs to completion without yielding an error.
    #[test]
    fn test_builder_validation() {
        let xml = r#"<?xml version="1.0"?>
<ERNMessage>
    <Release>
        <!-- Missing required fields -->
    </Release>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = AlignedStreamIterator::new(cursor, ERNVersion::V4_3);

        // Should handle validation gracefully
        let elements: Vec<_> = iterator.collect();

        // Should not crash despite missing required fields: no item is an
        // error and the stream ends with the end-of-stream marker.
        assert!(!elements.is_empty());
        assert!(
            elements.iter().all(|e| e.is_ok()),
            "Validation problems must not surface as parse errors"
        );
        assert!(
            matches!(
                elements.last(),
                Some(Ok(AlignedStreamingElement::EndOfStream))
            ),
            "Stream should terminate with EndOfStream"
        );
    }

    /// Builders convert to core types and report incompleteness.
    #[test]
    fn test_conversion_traits() {
        // Test ToCore trait
        let mut builder = ReleaseBuilder::new("FOLKLORE_DELUXE".to_string());
        builder.add_title(create_localized_string("Folklore (Deluxe Version)".to_string(), None));

        let release = builder.to_core().unwrap();
        assert_eq!(release.release_reference, "FOLKLORE_DELUXE");
        assert_eq!(release.release_title[0].text, "Folklore (Deluxe Version)");

        // Test Validate trait
        let empty_builder = ReleaseBuilder::default();
        assert!(!empty_builder.is_complete());
        assert!(empty_builder.validate().is_err());
    }
}