1#[allow(dead_code)] use crate::error::ParseError;
6use ddex_core::models::streaming_types::builders::*;
7use ddex_core::models::streaming_types::*;
8use ddex_core::models::IdentifierType;
9use ddex_core::models::{graph::*, versions::ERNVersion};
10use quick_xml::{events::{Event, BytesStart}, Reader};
11use std::collections::HashMap;
12use std::io::BufRead;
13use std::time::Instant;
14
15#[derive(Debug, Clone)]
17pub enum AlignedStreamingElement {
18 Header(Box<MessageHeader>),
19 Release(Release),
20 Resource(Resource),
21 Party(Party),
22 EndOfStream,
23}
24
25#[derive(Debug)]
27enum AlignedParserState {
28 Initial,
29 InHeader(Box<MessageHeaderBuilder>),
30 InRelease(Box<ReleaseBuilder>),
31 InResource(Box<ResourceBuilder>),
32 InParty(Box<PartyBuilder>),
33 Complete,
34}
35
36pub struct AlignedStreamingParser<R: BufRead> {
38 reader: Reader<R>,
39 buffer: Vec<u8>,
40 state: AlignedParserState,
41 current_path: Vec<String>,
42 current_depth: usize,
43 text_buffer: String,
44 attributes: HashMap<String, String>,
45 bytes_processed: u64,
46 elements_yielded: usize,
47 start_time: Instant,
48}
49
50impl<R: BufRead> AlignedStreamingParser<R> {
51 pub fn new(reader: R, _version: ERNVersion) -> Self {
52 let mut xml_reader = Reader::from_reader(reader);
53 xml_reader.config_mut().trim_text(true);
54 xml_reader.config_mut().check_end_names = true;
55
56 Self {
57 reader: xml_reader,
58 buffer: Vec::with_capacity(8192),
59 state: AlignedParserState::Initial,
60 current_path: Vec::new(),
61 current_depth: 0,
62 text_buffer: String::new(),
63 attributes: HashMap::new(),
64 bytes_processed: 0,
65 elements_yielded: 0,
66 start_time: Instant::now(),
67 }
68 }
69
70 pub fn parse_next(&mut self) -> Result<Option<AlignedStreamingElement>, ParseError> {
71 loop {
72 self.buffer.clear();
73 let event = self.reader.read_event_into(&mut self.buffer)?;
74 match event {
75 Event::Start(e) => {
76 let name_bytes = e.name();
77 let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
78
79 let mut attributes = HashMap::new();
81 for attr_result in e.attributes() {
82 let attr = attr_result.map_err(|e| ParseError::XmlError(format!("Attribute error: {}", e)))?;
83
84 let key = std::str::from_utf8(attr.key.as_ref())?;
85 let value = std::str::from_utf8(&attr.value)?;
86
87 attributes.insert(key.to_string(), value.to_string());
88 }
89
90 self.attributes = attributes;
92
93 self.handle_start_element_by_name(&name)?;
94 }
95 Event::End(e) => {
96 let name_bytes = e.name();
97 let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
98 if let Some(element) = self.handle_end_element_by_name(&name)? {
99 self.elements_yielded += 1;
100 return Ok(Some(element));
101 }
102 }
103 Event::Text(e) => {
104 let text = std::str::from_utf8(&e)?;
105 self.text_buffer.push_str(text.trim());
106 }
107 Event::Eof => {
108 return Ok(Some(AlignedStreamingElement::EndOfStream));
109 }
110 _ => {
111 }
113 }
114
115 self.bytes_processed = self.reader.buffer_position();
116
117 if self.current_depth > 100 {
119 return Err(ParseError::SecurityViolation {
120 message: "Nesting depth exceeds 100 levels".to_string(),
121 });
122 }
123
124 self.buffer.clear();
125 }
126 }
127
128
129 fn handle_start_element_by_name(&mut self, name: &str) -> Result<(), ParseError> {
130 self.current_path.push(name.to_string());
131 self.current_depth += 1;
132
133 self.text_buffer.clear();
134
135 match (&self.state, name) {
137 (AlignedParserState::Initial, "MessageHeader") => {
138 self.state = AlignedParserState::InHeader(Box::new(MessageHeaderBuilder::new()));
139 }
140 (AlignedParserState::Initial, "Release") => {
141 let reference = self
142 .attributes
143 .get("ReleaseReference")
144 .cloned()
145 .unwrap_or_else(|| format!("REL_{}", self.elements_yielded));
146 self.state =
147 AlignedParserState::InRelease(Box::new(ReleaseBuilder::new(reference)));
148 }
149 (AlignedParserState::Initial, "Resource") => {
150 let reference = self
151 .attributes
152 .get("ResourceReference")
153 .cloned()
154 .unwrap_or_else(|| format!("RES_{}", self.elements_yielded));
155 self.state =
156 AlignedParserState::InResource(Box::new(ResourceBuilder::new(reference)));
157 }
158 (AlignedParserState::Initial, "Party") => {
159 let reference = self.attributes.get("PartyReference").cloned();
160 self.state = AlignedParserState::InParty(Box::new(PartyBuilder::new(reference)));
161 }
162 _ => {
163 }
165 }
166
167 Ok(())
168 }
169
170 fn handle_end_element_by_name(
171 &mut self,
172 name: &str,
173 ) -> Result<Option<AlignedStreamingElement>, ParseError> {
174 let text_content = self.text_buffer.clone();
175
176 let result = match &mut self.state {
177 AlignedParserState::InHeader(builder) => {
178 match name {
179 "MessageId" => {
180 builder.set_message_id(text_content);
181 None
182 }
183 "MessageCreatedDateTime" => {
184 builder.set_created_date_time_from_text(text_content);
185 None
186 }
187 "MessageSender" => {
188 let sender = create_message_sender(
190 text_content.clone(),
191 Some(format!("SENDER_{}", self.elements_yielded)),
192 );
193 builder.set_sender(sender);
194 None
195 }
196 "MessageRecipient" => {
197 let recipient = create_message_recipient(text_content);
198 builder.set_recipient(recipient);
199 None
200 }
201 "MessageHeader" => {
202 let builder =
204 std::mem::replace(&mut self.state, AlignedParserState::Initial);
205 if let AlignedParserState::InHeader(header_builder) = builder {
206 match header_builder.to_core() {
207 Ok(header) => {
208 Some(AlignedStreamingElement::Header(Box::new(header)))
209 }
210 Err(e) => {
211 eprintln!("Warning: Header validation failed: {}", e);
212 let header = self.create_fallback_header();
214 Some(AlignedStreamingElement::Header(Box::new(header)))
215 }
216 }
217 } else {
218 None
219 }
220 }
221 _ => None,
222 }
223 }
224 AlignedParserState::InRelease(builder) => {
225 match name {
226 "ReleaseTitle" => {
227 let title = create_localized_string(
228 text_content,
229 self.attributes.get("LanguageCode").cloned(),
230 );
231 builder.add_title(title);
232 None
233 }
234 "Genre" => {
235 let genre = create_genre(text_content, None);
236 builder.add_genre(genre);
237 None
238 }
239 "DisplayArtist" => {
240 let artist = create_artist(text_content, "MainArtist".to_string(), None);
241 builder.add_artist(artist);
242 None
243 }
244 "ReleaseType" => {
245 let release_type = match text_content.as_str() {
246 "Album" => ReleaseType::Album,
247 "Single" => ReleaseType::Single,
248 "EP" => ReleaseType::EP,
249 _ => ReleaseType::Other(text_content),
250 };
251 builder.set_release_type(release_type);
252 None
253 }
254 "Release" => {
255 let builder =
257 std::mem::replace(&mut self.state, AlignedParserState::Initial);
258 if let AlignedParserState::InRelease(release_builder) = builder {
259 match release_builder.to_core() {
260 Ok(release) => Some(AlignedStreamingElement::Release(release)),
261 Err(e) => {
262 eprintln!("Warning: Release validation failed: {}", e);
263 None
264 }
265 }
266 } else {
267 None
268 }
269 }
270 _ => None,
271 }
272 }
273 AlignedParserState::InResource(builder) => {
274 match name {
275 "Title" => {
276 let title = create_localized_string(
277 text_content,
278 self.attributes.get("LanguageCode").cloned(),
279 );
280 builder.add_title(title);
281 None
282 }
283 "Duration" => {
284 builder.set_duration_from_text(text_content);
285 None
286 }
287 "ResourceType" => {
288 let resource_type = match text_content.as_str() {
289 "SoundRecording" => ResourceType::SoundRecording,
290 "Video" => ResourceType::Video,
291 "Image" => ResourceType::Image,
292 "Text" => ResourceType::Text,
293 "SheetMusic" => ResourceType::SheetMusic,
294 _ => ResourceType::SoundRecording, };
296 builder.set_resource_type(resource_type);
297 None
298 }
299 "ISRC" => {
300 let identifier = create_identifier(
301 text_content,
302 IdentifierType::ISRC,
303 Some("ISRC".to_string()),
304 );
305 builder.add_identifier(identifier);
306 None
307 }
308 "Resource" => {
309 let builder =
311 std::mem::replace(&mut self.state, AlignedParserState::Initial);
312 if let AlignedParserState::InResource(resource_builder) = builder {
313 match resource_builder.to_core() {
314 Ok(resource) => Some(AlignedStreamingElement::Resource(resource)),
315 Err(e) => {
316 eprintln!("Warning: Resource validation failed: {}", e);
317 None
318 }
319 }
320 } else {
321 None
322 }
323 }
324 _ => None,
325 }
326 }
327 AlignedParserState::InParty(builder) => {
328 match name {
329 "PartyName" => {
330 let name = create_localized_string(
331 text_content,
332 self.attributes.get("LanguageCode").cloned(),
333 );
334 builder.add_name(name);
335 None
336 }
337 "ISNI" => {
338 builder.set_isni(text_content);
339 None
340 }
341 "PartyRole" => {
342 let role = match text_content.as_str() {
343 "Artist" => PartyRole::Artist,
344 "Producer" => PartyRole::Producer,
345 "Composer" => PartyRole::Composer,
346 "Lyricist" => PartyRole::Lyricist,
347 "Publisher" => PartyRole::Publisher,
348 "Performer" => PartyRole::Performer,
349 "Engineer" => PartyRole::Engineer,
350 "Label" => PartyRole::Label,
351 "Distributor" => PartyRole::Distributor,
352 _ => PartyRole::Other(text_content),
353 };
354 builder.add_role(role);
355 None
356 }
357 "Party" => {
358 let builder =
360 std::mem::replace(&mut self.state, AlignedParserState::Initial);
361 if let AlignedParserState::InParty(party_builder) = builder {
362 match party_builder.to_core() {
363 Ok(party) => Some(AlignedStreamingElement::Party(party)),
364 Err(e) => {
365 eprintln!("Warning: Party validation failed: {}", e);
366 None
367 }
368 }
369 } else {
370 None
371 }
372 }
373 _ => None,
374 }
375 }
376 _ => None,
377 };
378
379 self.current_depth = self.current_depth.saturating_sub(1);
380 self.current_path.pop();
381 self.text_buffer.clear();
382
383 Ok(result)
384 }
385
386 fn create_fallback_header(&self) -> MessageHeader {
387 MessageHeader {
388 message_id: "FALLBACK_MSG".to_string(),
389 message_type: MessageType::NewReleaseMessage,
390 message_created_date_time: chrono::Utc::now(),
391 message_sender: create_message_sender("Unknown Sender".to_string(), None),
392 message_recipient: create_message_recipient("Unknown Recipient".to_string()),
393 message_control_type: None,
394 message_thread_id: None,
395 attributes: None,
396 extensions: None,
397 comments: None,
398 }
399 }
400
401 fn get_current_location(&self) -> String {
402 format!("aligned_streaming:{}:0", self.bytes_processed)
403 }
404
405 pub fn stats(&self) -> AlignedStats {
406 AlignedStats {
407 bytes_processed: self.bytes_processed,
408 elements_yielded: self.elements_yielded,
409 current_depth: self.current_depth,
410 elapsed: self.start_time.elapsed(),
411 }
412 }
413}
414
415pub struct AlignedStreamIterator<R: BufRead> {
417 parser: AlignedStreamingParser<R>,
418 finished: bool,
419}
420
421impl<R: BufRead> AlignedStreamIterator<R> {
422 pub fn new(reader: R, version: ERNVersion) -> Self {
423 Self {
424 parser: AlignedStreamingParser::new(reader, version),
425 finished: false,
426 }
427 }
428
429 pub fn stats(&self) -> AlignedStats {
430 self.parser.stats()
431 }
432}
433
434impl<R: BufRead> Iterator for AlignedStreamIterator<R> {
435 type Item = Result<AlignedStreamingElement, ParseError>;
436
437 fn next(&mut self) -> Option<Self::Item> {
438 if self.finished {
439 return None;
440 }
441
442 match self.parser.parse_next() {
443 Ok(Some(element)) => {
444 if matches!(element, AlignedStreamingElement::EndOfStream) {
445 self.finished = true;
446 }
447 Some(Ok(element))
448 }
449 Ok(None) => {
450 self.finished = true;
451 None
452 }
453 Err(e) => {
454 self.finished = true;
455 Some(Err(e))
456 }
457 }
458 }
459}
460
/// Snapshot of a streaming parser's progress.
#[derive(Debug, Clone)]
pub struct AlignedStats {
    /// Bytes consumed from the input, as reported by the XML reader.
    pub bytes_processed: u64,
    /// Number of entities returned so far.
    pub elements_yielded: usize,
    /// Number of elements open at the time of the snapshot.
    pub current_depth: usize,
    /// Time since the parser was created.
    pub elapsed: std::time::Duration,
}

impl AlignedStats {
    /// Average throughput in MiB per second; `0.0` when no time has elapsed.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        if seconds <= 0.0 {
            return 0.0;
        }
        let mebibytes = self.bytes_processed as f64 / BYTES_PER_MIB;
        mebibytes / seconds
    }
}
478
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Every top-level entity of a small ERN 4.3 message is built through its builder and
    /// carries the field values from the XML.
    #[test]
    fn test_aligned_streaming_parser_with_builders() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>UMG-2024-NEW-RELEASE-001</MessageId>
        <MessageCreatedDateTime>2024-03-15T14:30:00Z</MessageCreatedDateTime>
        <MessageSender>Universal Music Group</MessageSender>
        <MessageRecipient>Spotify Technology</MessageRecipient>
    </MessageHeader>
    <Release ReleaseReference="TAYLOR_SWIFT_MIDNIGHTS_DELUXE">
        <ReleaseTitle>Midnights (3am Edition)</ReleaseTitle>
        <Genre>Pop</Genre>
        <ReleaseType>Album</ReleaseType>
        <DisplayArtist>Taylor Swift</DisplayArtist>
    </Release>
    <Resource ResourceReference="ANTI_HERO_TRACK">
        <Title>Anti-Hero</Title>
        <Duration>200</Duration>
        <ResourceType>SoundRecording</ResourceType>
        <ISRC>USUA12204925</ISRC>
    </Resource>
    <Party PartyReference="TAYLOR_SWIFT_ARTIST">
        <PartyName>Taylor Swift</PartyName>
        <PartyRole>Artist</PartyRole>
        <ISNI>0000000368570204</ISNI>
    </Party>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = AlignedStreamIterator::new(cursor, ERNVersion::V4_3);

        // `expect` (rather than `is_ok` + `unwrap`) shows the parse error on failure.
        let elements = iterator
            .collect::<Result<Vec<_>, _>>()
            .expect("a well-formed message should parse without errors");

        // Header, release, resource and party, followed by the EndOfStream marker.
        assert!(elements.len() >= 4);

        let has_header = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Header(_)));
        let has_release = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Release(_)));
        let has_resource = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Resource(_)));
        let has_party = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Party(_)));

        assert!(
            has_header,
            "Should parse message header using MessageHeaderBuilder"
        );
        assert!(has_release, "Should parse release using ReleaseBuilder");
        assert!(has_resource, "Should parse resource using ResourceBuilder");
        assert!(has_party, "Should parse party using PartyBuilder");

        for element in &elements {
            match element {
                AlignedStreamingElement::Header(header) => {
                    assert_eq!(header.message_id, "UMG-2024-NEW-RELEASE-001");
                    assert_eq!(header.message_sender.party_name[0].text, "Universal Music Group");
                }
                AlignedStreamingElement::Release(release) => {
                    assert_eq!(release.release_reference, "TAYLOR_SWIFT_MIDNIGHTS_DELUXE");
                    assert_eq!(release.release_title[0].text, "Midnights (3am Edition)");
                    assert_eq!(release.genre[0].genre_text, "Pop");
                    assert_eq!(release.release_type, Some(ReleaseType::Album));
                }
                AlignedStreamingElement::Resource(resource) => {
                    assert_eq!(resource.resource_reference, "ANTI_HERO_TRACK");
                    assert_eq!(resource.reference_title[0].text, "Anti-Hero");
                    assert_eq!(resource.duration, Some(std::time::Duration::from_secs(200)));
                    assert_eq!(resource.resource_type, ResourceType::SoundRecording);
                }
                AlignedStreamingElement::Party(party) => {
                    assert_eq!(party.party_name[0].text, "Taylor Swift");
                    assert_eq!(party.isni, Some("0000000368570204".to_string()));
                    assert!(party.party_role.contains(&PartyRole::Artist));
                }
                AlignedStreamingElement::EndOfStream => {}
            }
        }
    }

    /// An incomplete release must not abort the stream: parsing runs to completion
    /// without errors and ends with the EndOfStream marker.
    #[test]
    fn test_builder_validation() {
        let xml = r#"<?xml version="1.0"?>
<ERNMessage>
    <Release>
        <!-- Missing required fields -->
    </Release>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = AlignedStreamIterator::new(cursor, ERNVersion::V4_3);

        let elements: Vec<_> = iterator.collect();
        assert!(!elements.is_empty());
        assert!(elements.iter().all(Result::is_ok));
        assert!(matches!(
            elements.last(),
            Some(Ok(AlignedStreamingElement::EndOfStream))
        ));
    }

    /// Builders convert to core models when complete and report incompleteness otherwise.
    #[test]
    fn test_conversion_traits() {
        let mut builder = ReleaseBuilder::new("FOLKLORE_DELUXE".to_string());
        builder.add_title(create_localized_string(
            "Folklore (Deluxe Version)".to_string(),
            None,
        ));

        let release = builder.to_core().unwrap();
        assert_eq!(release.release_reference, "FOLKLORE_DELUXE");
        assert_eq!(release.release_title[0].text, "Folklore (Deluxe Version)");

        let empty_builder = ReleaseBuilder::default();
        assert!(!empty_builder.is_complete());
        assert!(empty_builder.validate().is_err());
    }
}