ddex_parser/streaming/
aligned_comprehensive.rs

1// src/streaming/aligned_comprehensive.rs
2//! Model-aligned comprehensive streaming parser using builders
3
4#[allow(dead_code)] // Experimental streaming parser implementation
5use crate::error::ParseError;
6use ddex_core::models::streaming_types::builders::*;
7use ddex_core::models::streaming_types::*;
8use ddex_core::models::IdentifierType;
9use ddex_core::models::{graph::*, versions::ERNVersion};
10use log::warn;
11use quick_xml::{events::{Event, BytesStart}, Reader};
12use std::collections::HashMap;
13use std::io::BufRead;
14use std::time::Instant;
15
16/// Aligned streaming element using proper core types
17#[derive(Debug, Clone)]
18pub enum AlignedStreamingElement {
19    Header(Box<MessageHeader>),
20    Release(Release),
21    Resource(Resource),
22    Party(Party),
23    EndOfStream,
24}
25
26/// Parser state using builder pattern
27#[derive(Debug)]
28enum AlignedParserState {
29    Initial,
30    InHeader(Box<MessageHeaderBuilder>),
31    InRelease(Box<ReleaseBuilder>),
32    InResource(Box<ResourceBuilder>),
33    InParty(Box<PartyBuilder>),
34    Complete,
35}
36
37/// Model-aligned streaming parser
38pub struct AlignedStreamingParser<R: BufRead> {
39    reader: Reader<R>,
40    buffer: Vec<u8>,
41    state: AlignedParserState,
42    current_path: Vec<String>,
43    current_depth: usize,
44    text_buffer: String,
45    attributes: HashMap<String, String>,
46    bytes_processed: u64,
47    elements_yielded: usize,
48    start_time: Instant,
49}
50
51impl<R: BufRead> AlignedStreamingParser<R> {
52    pub fn new(reader: R, _version: ERNVersion) -> Self {
53        let mut xml_reader = Reader::from_reader(reader);
54        xml_reader.config_mut().trim_text(true);
55        xml_reader.config_mut().check_end_names = true;
56
57        Self {
58            reader: xml_reader,
59            buffer: Vec::with_capacity(8192),
60            state: AlignedParserState::Initial,
61            current_path: Vec::new(),
62            current_depth: 0,
63            text_buffer: String::new(),
64            attributes: HashMap::new(),
65            bytes_processed: 0,
66            elements_yielded: 0,
67            start_time: Instant::now(),
68        }
69    }
70
71    pub fn parse_next(&mut self) -> Result<Option<AlignedStreamingElement>, ParseError> {
72        loop {
73            self.buffer.clear();
74            let event = self.reader.read_event_into(&mut self.buffer)?;
75            match event {
76                Event::Start(e) => {
77                    let name_bytes = e.name();
78                    let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
79
80                    // Extract attributes into a temporary structure
81                    let mut attributes = HashMap::new();
82                    for attr_result in e.attributes() {
83                        let attr = attr_result.map_err(|e| ParseError::XmlError(format!("Attribute error: {}", e)))?;
84
85                        let key = std::str::from_utf8(attr.key.as_ref())?;
86                        let value = std::str::from_utf8(&attr.value)?;
87
88                        attributes.insert(key.to_string(), value.to_string());
89                    }
90
91                    // Store the attributes
92                    self.attributes = attributes;
93
94                    self.handle_start_element_by_name(&name)?;
95                }
96                Event::End(e) => {
97                    let name_bytes = e.name();
98                    let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
99                    if let Some(element) = self.handle_end_element_by_name(&name)? {
100                        self.elements_yielded += 1;
101                        return Ok(Some(element));
102                    }
103                }
104                Event::Text(e) => {
105                    let text = std::str::from_utf8(&e)?;
106                    self.text_buffer.push_str(text.trim());
107                }
108                Event::Eof => {
109                    return Ok(Some(AlignedStreamingElement::EndOfStream));
110                }
111                _ => {
112                    // Skip other events
113                }
114            }
115
116            self.bytes_processed = self.reader.buffer_position();
117
118            // Check security limits
119            if self.current_depth > 100 {
120                return Err(ParseError::SecurityViolation {
121                    message: "Nesting depth exceeds 100 levels".to_string(),
122                });
123            }
124
125            self.buffer.clear();
126        }
127    }
128
129
130    fn handle_start_element_by_name(&mut self, name: &str) -> Result<(), ParseError> {
131        self.current_path.push(name.to_string());
132        self.current_depth += 1;
133
134        self.text_buffer.clear();
135
136        // State transitions using builders
137        match (&self.state, name) {
138            (AlignedParserState::Initial, "MessageHeader") => {
139                self.state = AlignedParserState::InHeader(Box::new(MessageHeaderBuilder::new()));
140            }
141            (AlignedParserState::Initial, "Release") => {
142                let reference = self
143                    .attributes
144                    .get("ReleaseReference")
145                    .cloned()
146                    .unwrap_or_else(|| format!("REL_{}", self.elements_yielded));
147                self.state =
148                    AlignedParserState::InRelease(Box::new(ReleaseBuilder::new(reference)));
149            }
150            (AlignedParserState::Initial, "Resource") => {
151                let reference = self
152                    .attributes
153                    .get("ResourceReference")
154                    .cloned()
155                    .unwrap_or_else(|| format!("RES_{}", self.elements_yielded));
156                self.state =
157                    AlignedParserState::InResource(Box::new(ResourceBuilder::new(reference)));
158            }
159            (AlignedParserState::Initial, "Party") => {
160                let reference = self.attributes.get("PartyReference").cloned();
161                self.state = AlignedParserState::InParty(Box::new(PartyBuilder::new(reference)));
162            }
163            _ => {
164                // Continue in current state
165            }
166        }
167
168        Ok(())
169    }
170
171    fn handle_end_element_by_name(
172        &mut self,
173        name: &str,
174    ) -> Result<Option<AlignedStreamingElement>, ParseError> {
175        let text_content = self.text_buffer.clone();
176
177        let result = match &mut self.state {
178            AlignedParserState::InHeader(builder) => {
179                match name {
180                    "MessageId" => {
181                        builder.set_message_id(text_content);
182                        None
183                    }
184                    "MessageCreatedDateTime" => {
185                        builder.set_created_date_time_from_text(text_content);
186                        None
187                    }
188                    "MessageSender" => {
189                        // For simplicity, create a basic sender
190                        let sender = create_message_sender(
191                            text_content.clone(),
192                            Some(format!("SENDER_{}", self.elements_yielded)),
193                        );
194                        builder.set_sender(sender);
195                        None
196                    }
197                    "MessageRecipient" => {
198                        let recipient = create_message_recipient(text_content);
199                        builder.set_recipient(recipient);
200                        None
201                    }
202                    "MessageHeader" => {
203                        // Complete header - use builder to create element
204                        let builder =
205                            std::mem::replace(&mut self.state, AlignedParserState::Initial);
206                        if let AlignedParserState::InHeader(header_builder) = builder {
207                            match header_builder.to_core() {
208                                Ok(header) => {
209                                    Some(AlignedStreamingElement::Header(Box::new(header)))
210                                }
211                                Err(e) => {
212                                    warn!("Header validation failed, using fallback: {}", e);
213                                    // Create a minimal valid header
214                                    let header = self.create_fallback_header();
215                                    Some(AlignedStreamingElement::Header(Box::new(header)))
216                                }
217                            }
218                        } else {
219                            None
220                        }
221                    }
222                    _ => None,
223                }
224            }
225            AlignedParserState::InRelease(builder) => {
226                match name {
227                    "ReleaseTitle" => {
228                        let title = create_localized_string(
229                            text_content,
230                            self.attributes.get("LanguageCode").cloned(),
231                        );
232                        builder.add_title(title);
233                        None
234                    }
235                    "Genre" => {
236                        let genre = create_genre(text_content, None);
237                        builder.add_genre(genre);
238                        None
239                    }
240                    "DisplayArtist" => {
241                        let artist = create_artist(text_content, "MainArtist".to_string(), None);
242                        builder.add_artist(artist);
243                        None
244                    }
245                    "ReleaseType" => {
246                        let release_type = match text_content.as_str() {
247                            "Album" => ReleaseType::Album,
248                            "Single" => ReleaseType::Single,
249                            "EP" => ReleaseType::EP,
250                            _ => ReleaseType::Other(text_content),
251                        };
252                        builder.set_release_type(release_type);
253                        None
254                    }
255                    "Release" => {
256                        // Complete release - use builder
257                        let builder =
258                            std::mem::replace(&mut self.state, AlignedParserState::Initial);
259                        if let AlignedParserState::InRelease(release_builder) = builder {
260                            match release_builder.to_core() {
261                                Ok(release) => Some(AlignedStreamingElement::Release(release)),
262                                Err(e) => {
263                                    warn!("Release validation failed, skipping release: {}", e);
264                                    None
265                                }
266                            }
267                        } else {
268                            None
269                        }
270                    }
271                    _ => None,
272                }
273            }
274            AlignedParserState::InResource(builder) => {
275                match name {
276                    "Title" => {
277                        let title = create_localized_string(
278                            text_content,
279                            self.attributes.get("LanguageCode").cloned(),
280                        );
281                        builder.add_title(title);
282                        None
283                    }
284                    "Duration" => {
285                        builder.set_duration_from_text(text_content);
286                        None
287                    }
288                    "ResourceType" => {
289                        let resource_type = match text_content.as_str() {
290                            "SoundRecording" => ResourceType::SoundRecording,
291                            "Video" => ResourceType::Video,
292                            "Image" => ResourceType::Image,
293                            "Text" => ResourceType::Text,
294                            "SheetMusic" => ResourceType::SheetMusic,
295                            _ => ResourceType::SoundRecording, // Default
296                        };
297                        builder.set_resource_type(resource_type);
298                        None
299                    }
300                    "ISRC" => {
301                        let identifier = create_identifier(
302                            text_content,
303                            IdentifierType::ISRC,
304                            Some("ISRC".to_string()),
305                        );
306                        builder.add_identifier(identifier);
307                        None
308                    }
309                    "Resource" => {
310                        // Complete resource - use builder
311                        let builder =
312                            std::mem::replace(&mut self.state, AlignedParserState::Initial);
313                        if let AlignedParserState::InResource(resource_builder) = builder {
314                            match resource_builder.to_core() {
315                                Ok(resource) => Some(AlignedStreamingElement::Resource(resource)),
316                                Err(e) => {
317                                    warn!("Resource validation failed, skipping resource: {}", e);
318                                    None
319                                }
320                            }
321                        } else {
322                            None
323                        }
324                    }
325                    _ => None,
326                }
327            }
328            AlignedParserState::InParty(builder) => {
329                match name {
330                    "PartyName" => {
331                        let name = create_localized_string(
332                            text_content,
333                            self.attributes.get("LanguageCode").cloned(),
334                        );
335                        builder.add_name(name);
336                        None
337                    }
338                    "ISNI" => {
339                        builder.set_isni(text_content);
340                        None
341                    }
342                    "PartyRole" => {
343                        let role = match text_content.as_str() {
344                            "Artist" => PartyRole::Artist,
345                            "Producer" => PartyRole::Producer,
346                            "Composer" => PartyRole::Composer,
347                            "Lyricist" => PartyRole::Lyricist,
348                            "Publisher" => PartyRole::Publisher,
349                            "Performer" => PartyRole::Performer,
350                            "Engineer" => PartyRole::Engineer,
351                            "Label" => PartyRole::Label,
352                            "Distributor" => PartyRole::Distributor,
353                            _ => PartyRole::Other(text_content),
354                        };
355                        builder.add_role(role);
356                        None
357                    }
358                    "Party" => {
359                        // Complete party - use builder
360                        let builder =
361                            std::mem::replace(&mut self.state, AlignedParserState::Initial);
362                        if let AlignedParserState::InParty(party_builder) = builder {
363                            match party_builder.to_core() {
364                                Ok(party) => Some(AlignedStreamingElement::Party(party)),
365                                Err(e) => {
366                                    warn!("Party validation failed, skipping party: {}", e);
367                                    None
368                                }
369                            }
370                        } else {
371                            None
372                        }
373                    }
374                    _ => None,
375                }
376            }
377            _ => None,
378        };
379
380        self.current_depth = self.current_depth.saturating_sub(1);
381        self.current_path.pop();
382        self.text_buffer.clear();
383
384        Ok(result)
385    }
386
387    fn create_fallback_header(&self) -> MessageHeader {
388        MessageHeader {
389            message_id: "FALLBACK_MSG".to_string(),
390            message_type: MessageType::NewReleaseMessage,
391            message_created_date_time: chrono::Utc::now(),
392            message_sender: create_message_sender("Unknown Sender".to_string(), None),
393            message_recipient: create_message_recipient("Unknown Recipient".to_string()),
394            message_control_type: None,
395            message_thread_id: None,
396            attributes: None,
397            extensions: None,
398            comments: None,
399        }
400    }
401
402    fn get_current_location(&self) -> String {
403        format!("aligned_streaming:{}:0", self.bytes_processed)
404    }
405
406    pub fn stats(&self) -> AlignedStats {
407        AlignedStats {
408            bytes_processed: self.bytes_processed,
409            elements_yielded: self.elements_yielded,
410            current_depth: self.current_depth,
411            elapsed: self.start_time.elapsed(),
412        }
413    }
414}
415
416/// Iterator wrapper for aligned streaming parser
417pub struct AlignedStreamIterator<R: BufRead> {
418    parser: AlignedStreamingParser<R>,
419    finished: bool,
420}
421
422impl<R: BufRead> AlignedStreamIterator<R> {
423    pub fn new(reader: R, version: ERNVersion) -> Self {
424        Self {
425            parser: AlignedStreamingParser::new(reader, version),
426            finished: false,
427        }
428    }
429
430    pub fn stats(&self) -> AlignedStats {
431        self.parser.stats()
432    }
433}
434
435impl<R: BufRead> Iterator for AlignedStreamIterator<R> {
436    type Item = Result<AlignedStreamingElement, ParseError>;
437
438    fn next(&mut self) -> Option<Self::Item> {
439        if self.finished {
440            return None;
441        }
442
443        match self.parser.parse_next() {
444            Ok(Some(element)) => {
445                if matches!(element, AlignedStreamingElement::EndOfStream) {
446                    self.finished = true;
447                }
448                Some(Ok(element))
449            }
450            Ok(None) => {
451                self.finished = true;
452                None
453            }
454            Err(e) => {
455                self.finished = true;
456                Some(Err(e))
457            }
458        }
459    }
460}
461
/// Point-in-time counters reported by the aligned streaming parser.
#[derive(Debug, Clone)]
pub struct AlignedStats {
    /// Reader byte offset recorded by the parser.
    pub bytes_processed: u64,
    /// Number of elements the parser has returned.
    pub elements_yielded: usize,
    /// Number of XML elements open at the time of the snapshot.
    pub current_depth: usize,
    /// Time elapsed since the parser was created.
    pub elapsed: std::time::Duration,
}

impl AlignedStats {
    /// Average throughput in MiB per second (`bytes_processed` over
    /// `elapsed`), or `0.0` when no time has elapsed.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        if seconds <= 0.0 {
            return 0.0;
        }

        let mebibytes = self.bytes_processed as f64 / BYTES_PER_MIB;
        mebibytes / seconds
    }
}
479
480#[cfg(test)]
481mod tests {
482    use super::*;
483    use std::io::Cursor;
484
485    #[test]
486    fn test_aligned_streaming_parser_with_builders() {
487        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
488<ERNMessage xmlns="http://ddex.net/xml/ern/43">
489    <MessageHeader>
490        <MessageId>UMG-2024-NEW-RELEASE-001</MessageId>
491        <MessageCreatedDateTime>2024-03-15T14:30:00Z</MessageCreatedDateTime>
492        <MessageSender>Universal Music Group</MessageSender>
493        <MessageRecipient>Spotify Technology</MessageRecipient>
494    </MessageHeader>
495    <Release ReleaseReference="TAYLOR_SWIFT_MIDNIGHTS_DELUXE">
496        <ReleaseTitle>Midnights (3am Edition)</ReleaseTitle>
497        <Genre>Pop</Genre>
498        <ReleaseType>Album</ReleaseType>
499        <DisplayArtist>Taylor Swift</DisplayArtist>
500    </Release>
501    <Resource ResourceReference="ANTI_HERO_TRACK">
502        <Title>Anti-Hero</Title>
503        <Duration>200</Duration>
504        <ResourceType>SoundRecording</ResourceType>
505        <ISRC>USUA12204925</ISRC>
506    </Resource>
507    <Party PartyReference="TAYLOR_SWIFT_ARTIST">
508        <PartyName>Taylor Swift</PartyName>
509        <PartyRole>Artist</PartyRole>
510        <ISNI>0000000368570204</ISNI>
511    </Party>
512</ERNMessage>"#;
513
514        let cursor = Cursor::new(xml.as_bytes());
515        let iterator = AlignedStreamIterator::new(cursor, ERNVersion::V4_3);
516
517        let elements: Result<Vec<_>, _> = iterator.collect();
518        assert!(elements.is_ok());
519
520        let elements = elements.unwrap();
521        assert!(elements.len() >= 4); // Header, Release, Resource, Party, EndOfStream
522
523        // Verify proper type construction
524        let has_header = elements
525            .iter()
526            .any(|e| matches!(e, AlignedStreamingElement::Header(_)));
527        let has_release = elements
528            .iter()
529            .any(|e| matches!(e, AlignedStreamingElement::Release(_)));
530        let has_resource = elements
531            .iter()
532            .any(|e| matches!(e, AlignedStreamingElement::Resource(_)));
533        let has_party = elements
534            .iter()
535            .any(|e| matches!(e, AlignedStreamingElement::Party(_)));
536
537        assert!(
538            has_header,
539            "Should parse message header using MessageHeaderBuilder"
540        );
541        assert!(has_release, "Should parse release using ReleaseBuilder");
542        assert!(has_resource, "Should parse resource using ResourceBuilder");
543        assert!(has_party, "Should parse party using PartyBuilder");
544
545        // Verify field mapping
546        for element in &elements {
547            match element {
548                AlignedStreamingElement::Header(header) => {
549                    assert_eq!(header.message_id, "UMG-2024-NEW-RELEASE-001");
550                    assert_eq!(header.message_sender.party_name[0].text, "Universal Music Group");
551                }
552                AlignedStreamingElement::Release(release) => {
553                    assert_eq!(release.release_reference, "TAYLOR_SWIFT_MIDNIGHTS_DELUXE");
554                    assert_eq!(release.release_title[0].text, "Midnights (3am Edition)");
555                    assert_eq!(release.genre[0].genre_text, "Pop");
556                    assert_eq!(release.release_type, Some(ReleaseType::Album));
557                }
558                AlignedStreamingElement::Resource(resource) => {
559                    assert_eq!(resource.resource_reference, "ANTI_HERO_TRACK");
560                    assert_eq!(resource.reference_title[0].text, "Anti-Hero");
561                    assert_eq!(resource.duration, Some(std::time::Duration::from_secs(200)));
562                    assert_eq!(resource.resource_type, ResourceType::SoundRecording);
563                }
564                AlignedStreamingElement::Party(party) => {
565                    assert_eq!(party.party_name[0].text, "Taylor Swift");
566                    assert_eq!(party.isni, Some("0000000368570204".to_string())); // Real ISNI for Taylor Swift
567                    assert!(party.party_role.contains(&PartyRole::Artist));
568                }
569                _ => {}
570            }
571        }
572    }
573
574    #[test]
575    fn test_builder_validation() {
576        let xml = r#"<?xml version="1.0"?>
577<ERNMessage>
578    <Release>
579        <!-- Missing required fields -->
580    </Release>
581</ERNMessage>"#;
582
583        let cursor = Cursor::new(xml.as_bytes());
584        let mut iterator = AlignedStreamIterator::new(cursor, ERNVersion::V4_3);
585
586        // Should handle validation gracefully
587        let elements: Vec<_> = iterator.collect();
588        // Should not crash despite missing required fields
589        assert!(!elements.is_empty());
590    }
591
592    #[test]
593    fn test_conversion_traits() {
594        // Test ToCore trait
595        let mut builder = ReleaseBuilder::new("FOLKLORE_DELUXE".to_string());
596        builder.add_title(create_localized_string("Folklore (Deluxe Version)".to_string(), None));
597
598        let release = builder.to_core().unwrap();
599        assert_eq!(release.release_reference, "FOLKLORE_DELUXE");
600        assert_eq!(release.release_title[0].text, "Folklore (Deluxe Version)");
601
602        // Test Validate trait
603        let empty_builder = ReleaseBuilder::default();
604        assert!(!empty_builder.is_complete());
605        assert!(empty_builder.validate().is_err());
606    }
607}