1#[allow(dead_code)] use crate::error::ParseError;
6use ddex_core::models::streaming_types::builders::*;
7use ddex_core::models::streaming_types::*;
8use ddex_core::models::IdentifierType;
9use ddex_core::models::{graph::*, versions::ERNVersion};
10use log::warn;
11use quick_xml::{events::{Event, BytesStart}, Reader};
12use std::collections::HashMap;
13use std::io::BufRead;
14use std::time::Instant;
15
16#[derive(Debug, Clone)]
18pub enum AlignedStreamingElement {
19 Header(Box<MessageHeader>),
20 Release(Release),
21 Resource(Resource),
22 Party(Party),
23 EndOfStream,
24}
25
26#[derive(Debug)]
28enum AlignedParserState {
29 Initial,
30 InHeader(Box<MessageHeaderBuilder>),
31 InRelease(Box<ReleaseBuilder>),
32 InResource(Box<ResourceBuilder>),
33 InParty(Box<PartyBuilder>),
34 Complete,
35}
36
37pub struct AlignedStreamingParser<R: BufRead> {
39 reader: Reader<R>,
40 buffer: Vec<u8>,
41 state: AlignedParserState,
42 current_path: Vec<String>,
43 current_depth: usize,
44 text_buffer: String,
45 attributes: HashMap<String, String>,
46 bytes_processed: u64,
47 elements_yielded: usize,
48 start_time: Instant,
49}
50
51impl<R: BufRead> AlignedStreamingParser<R> {
52 pub fn new(reader: R, _version: ERNVersion) -> Self {
53 let mut xml_reader = Reader::from_reader(reader);
54 xml_reader.config_mut().trim_text(true);
55 xml_reader.config_mut().check_end_names = true;
56
57 Self {
58 reader: xml_reader,
59 buffer: Vec::with_capacity(8192),
60 state: AlignedParserState::Initial,
61 current_path: Vec::new(),
62 current_depth: 0,
63 text_buffer: String::new(),
64 attributes: HashMap::new(),
65 bytes_processed: 0,
66 elements_yielded: 0,
67 start_time: Instant::now(),
68 }
69 }
70
71 pub fn parse_next(&mut self) -> Result<Option<AlignedStreamingElement>, ParseError> {
72 loop {
73 self.buffer.clear();
74 let event = self.reader.read_event_into(&mut self.buffer)?;
75 match event {
76 Event::Start(e) => {
77 let name_bytes = e.name();
78 let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
79
80 let mut attributes = HashMap::new();
82 for attr_result in e.attributes() {
83 let attr = attr_result.map_err(|e| ParseError::XmlError(format!("Attribute error: {}", e)))?;
84
85 let key = std::str::from_utf8(attr.key.as_ref())?;
86 let value = std::str::from_utf8(&attr.value)?;
87
88 attributes.insert(key.to_string(), value.to_string());
89 }
90
91 self.attributes = attributes;
93
94 self.handle_start_element_by_name(&name)?;
95 }
96 Event::End(e) => {
97 let name_bytes = e.name();
98 let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
99 if let Some(element) = self.handle_end_element_by_name(&name)? {
100 self.elements_yielded += 1;
101 return Ok(Some(element));
102 }
103 }
104 Event::Text(e) => {
105 let text = std::str::from_utf8(&e)?;
106 self.text_buffer.push_str(text.trim());
107 }
108 Event::Eof => {
109 return Ok(Some(AlignedStreamingElement::EndOfStream));
110 }
111 _ => {
112 }
114 }
115
116 self.bytes_processed = self.reader.buffer_position();
117
118 if self.current_depth > 100 {
120 return Err(ParseError::SecurityViolation {
121 message: "Nesting depth exceeds 100 levels".to_string(),
122 });
123 }
124
125 self.buffer.clear();
126 }
127 }
128
129
130 fn handle_start_element_by_name(&mut self, name: &str) -> Result<(), ParseError> {
131 self.current_path.push(name.to_string());
132 self.current_depth += 1;
133
134 self.text_buffer.clear();
135
136 match (&self.state, name) {
138 (AlignedParserState::Initial, "MessageHeader") => {
139 self.state = AlignedParserState::InHeader(Box::new(MessageHeaderBuilder::new()));
140 }
141 (AlignedParserState::Initial, "Release") => {
142 let reference = self
143 .attributes
144 .get("ReleaseReference")
145 .cloned()
146 .unwrap_or_else(|| format!("REL_{}", self.elements_yielded));
147 self.state =
148 AlignedParserState::InRelease(Box::new(ReleaseBuilder::new(reference)));
149 }
150 (AlignedParserState::Initial, "Resource") => {
151 let reference = self
152 .attributes
153 .get("ResourceReference")
154 .cloned()
155 .unwrap_or_else(|| format!("RES_{}", self.elements_yielded));
156 self.state =
157 AlignedParserState::InResource(Box::new(ResourceBuilder::new(reference)));
158 }
159 (AlignedParserState::Initial, "Party") => {
160 let reference = self.attributes.get("PartyReference").cloned();
161 self.state = AlignedParserState::InParty(Box::new(PartyBuilder::new(reference)));
162 }
163 _ => {
164 }
166 }
167
168 Ok(())
169 }
170
171 fn handle_end_element_by_name(
172 &mut self,
173 name: &str,
174 ) -> Result<Option<AlignedStreamingElement>, ParseError> {
175 let text_content = self.text_buffer.clone();
176
177 let result = match &mut self.state {
178 AlignedParserState::InHeader(builder) => {
179 match name {
180 "MessageId" => {
181 builder.set_message_id(text_content);
182 None
183 }
184 "MessageCreatedDateTime" => {
185 builder.set_created_date_time_from_text(text_content);
186 None
187 }
188 "MessageSender" => {
189 let sender = create_message_sender(
191 text_content.clone(),
192 Some(format!("SENDER_{}", self.elements_yielded)),
193 );
194 builder.set_sender(sender);
195 None
196 }
197 "MessageRecipient" => {
198 let recipient = create_message_recipient(text_content);
199 builder.set_recipient(recipient);
200 None
201 }
202 "MessageHeader" => {
203 let builder =
205 std::mem::replace(&mut self.state, AlignedParserState::Initial);
206 if let AlignedParserState::InHeader(header_builder) = builder {
207 match header_builder.to_core() {
208 Ok(header) => {
209 Some(AlignedStreamingElement::Header(Box::new(header)))
210 }
211 Err(e) => {
212 warn!("Header validation failed, using fallback: {}", e);
213 let header = self.create_fallback_header();
215 Some(AlignedStreamingElement::Header(Box::new(header)))
216 }
217 }
218 } else {
219 None
220 }
221 }
222 _ => None,
223 }
224 }
225 AlignedParserState::InRelease(builder) => {
226 match name {
227 "ReleaseTitle" => {
228 let title = create_localized_string(
229 text_content,
230 self.attributes.get("LanguageCode").cloned(),
231 );
232 builder.add_title(title);
233 None
234 }
235 "Genre" => {
236 let genre = create_genre(text_content, None);
237 builder.add_genre(genre);
238 None
239 }
240 "DisplayArtist" => {
241 let artist = create_artist(text_content, "MainArtist".to_string(), None);
242 builder.add_artist(artist);
243 None
244 }
245 "ReleaseType" => {
246 let release_type = match text_content.as_str() {
247 "Album" => ReleaseType::Album,
248 "Single" => ReleaseType::Single,
249 "EP" => ReleaseType::EP,
250 _ => ReleaseType::Other(text_content),
251 };
252 builder.set_release_type(release_type);
253 None
254 }
255 "Release" => {
256 let builder =
258 std::mem::replace(&mut self.state, AlignedParserState::Initial);
259 if let AlignedParserState::InRelease(release_builder) = builder {
260 match release_builder.to_core() {
261 Ok(release) => Some(AlignedStreamingElement::Release(release)),
262 Err(e) => {
263 warn!("Release validation failed, skipping release: {}", e);
264 None
265 }
266 }
267 } else {
268 None
269 }
270 }
271 _ => None,
272 }
273 }
274 AlignedParserState::InResource(builder) => {
275 match name {
276 "Title" => {
277 let title = create_localized_string(
278 text_content,
279 self.attributes.get("LanguageCode").cloned(),
280 );
281 builder.add_title(title);
282 None
283 }
284 "Duration" => {
285 builder.set_duration_from_text(text_content);
286 None
287 }
288 "ResourceType" => {
289 let resource_type = match text_content.as_str() {
290 "SoundRecording" => ResourceType::SoundRecording,
291 "Video" => ResourceType::Video,
292 "Image" => ResourceType::Image,
293 "Text" => ResourceType::Text,
294 "SheetMusic" => ResourceType::SheetMusic,
295 _ => ResourceType::SoundRecording, };
297 builder.set_resource_type(resource_type);
298 None
299 }
300 "ISRC" => {
301 let identifier = create_identifier(
302 text_content,
303 IdentifierType::ISRC,
304 Some("ISRC".to_string()),
305 );
306 builder.add_identifier(identifier);
307 None
308 }
309 "Resource" => {
310 let builder =
312 std::mem::replace(&mut self.state, AlignedParserState::Initial);
313 if let AlignedParserState::InResource(resource_builder) = builder {
314 match resource_builder.to_core() {
315 Ok(resource) => Some(AlignedStreamingElement::Resource(resource)),
316 Err(e) => {
317 warn!("Resource validation failed, skipping resource: {}", e);
318 None
319 }
320 }
321 } else {
322 None
323 }
324 }
325 _ => None,
326 }
327 }
328 AlignedParserState::InParty(builder) => {
329 match name {
330 "PartyName" => {
331 let name = create_localized_string(
332 text_content,
333 self.attributes.get("LanguageCode").cloned(),
334 );
335 builder.add_name(name);
336 None
337 }
338 "ISNI" => {
339 builder.set_isni(text_content);
340 None
341 }
342 "PartyRole" => {
343 let role = match text_content.as_str() {
344 "Artist" => PartyRole::Artist,
345 "Producer" => PartyRole::Producer,
346 "Composer" => PartyRole::Composer,
347 "Lyricist" => PartyRole::Lyricist,
348 "Publisher" => PartyRole::Publisher,
349 "Performer" => PartyRole::Performer,
350 "Engineer" => PartyRole::Engineer,
351 "Label" => PartyRole::Label,
352 "Distributor" => PartyRole::Distributor,
353 _ => PartyRole::Other(text_content),
354 };
355 builder.add_role(role);
356 None
357 }
358 "Party" => {
359 let builder =
361 std::mem::replace(&mut self.state, AlignedParserState::Initial);
362 if let AlignedParserState::InParty(party_builder) = builder {
363 match party_builder.to_core() {
364 Ok(party) => Some(AlignedStreamingElement::Party(party)),
365 Err(e) => {
366 warn!("Party validation failed, skipping party: {}", e);
367 None
368 }
369 }
370 } else {
371 None
372 }
373 }
374 _ => None,
375 }
376 }
377 _ => None,
378 };
379
380 self.current_depth = self.current_depth.saturating_sub(1);
381 self.current_path.pop();
382 self.text_buffer.clear();
383
384 Ok(result)
385 }
386
387 fn create_fallback_header(&self) -> MessageHeader {
388 MessageHeader {
389 message_id: "FALLBACK_MSG".to_string(),
390 message_type: MessageType::NewReleaseMessage,
391 message_created_date_time: chrono::Utc::now(),
392 message_sender: create_message_sender("Unknown Sender".to_string(), None),
393 message_recipient: create_message_recipient("Unknown Recipient".to_string()),
394 message_control_type: None,
395 message_thread_id: None,
396 attributes: None,
397 extensions: None,
398 comments: None,
399 }
400 }
401
402 fn get_current_location(&self) -> String {
403 format!("aligned_streaming:{}:0", self.bytes_processed)
404 }
405
406 pub fn stats(&self) -> AlignedStats {
407 AlignedStats {
408 bytes_processed: self.bytes_processed,
409 elements_yielded: self.elements_yielded,
410 current_depth: self.current_depth,
411 elapsed: self.start_time.elapsed(),
412 }
413 }
414}
415
416pub struct AlignedStreamIterator<R: BufRead> {
418 parser: AlignedStreamingParser<R>,
419 finished: bool,
420}
421
422impl<R: BufRead> AlignedStreamIterator<R> {
423 pub fn new(reader: R, version: ERNVersion) -> Self {
424 Self {
425 parser: AlignedStreamingParser::new(reader, version),
426 finished: false,
427 }
428 }
429
430 pub fn stats(&self) -> AlignedStats {
431 self.parser.stats()
432 }
433}
434
435impl<R: BufRead> Iterator for AlignedStreamIterator<R> {
436 type Item = Result<AlignedStreamingElement, ParseError>;
437
438 fn next(&mut self) -> Option<Self::Item> {
439 if self.finished {
440 return None;
441 }
442
443 match self.parser.parse_next() {
444 Ok(Some(element)) => {
445 if matches!(element, AlignedStreamingElement::EndOfStream) {
446 self.finished = true;
447 }
448 Some(Ok(element))
449 }
450 Ok(None) => {
451 self.finished = true;
452 None
453 }
454 Err(e) => {
455 self.finished = true;
456 Some(Err(e))
457 }
458 }
459 }
460}
461
/// Point-in-time progress counters of a streaming parse.
#[derive(Debug, Clone)]
pub struct AlignedStats {
    /// Reader position in bytes, as last recorded by the parser.
    pub bytes_processed: u64,
    /// Number of entities handed out so far.
    pub elements_yielded: usize,
    /// Number of currently open XML elements.
    pub current_depth: usize,
    /// Time elapsed since the parser was constructed.
    pub elapsed: std::time::Duration,
}

impl AlignedStats {
    /// Average throughput in MiB per second over `elapsed`.
    ///
    /// Returns `0.0` when no time has elapsed, so the result never divides by zero.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        if seconds <= 0.0 {
            return 0.0;
        }
        let mebibytes = self.bytes_processed as f64 / BYTES_PER_MIB;
        mebibytes / seconds
    }
}
479
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A well-formed message yields exactly one header, release, resource
    /// and party, followed by the end-of-stream marker.
    #[test]
    fn test_aligned_streaming_parser_with_builders() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>UMG-2024-NEW-RELEASE-001</MessageId>
        <MessageCreatedDateTime>2024-03-15T14:30:00Z</MessageCreatedDateTime>
        <MessageSender>Universal Music Group</MessageSender>
        <MessageRecipient>Spotify Technology</MessageRecipient>
    </MessageHeader>
    <Release ReleaseReference="TAYLOR_SWIFT_MIDNIGHTS_DELUXE">
        <ReleaseTitle>Midnights (3am Edition)</ReleaseTitle>
        <Genre>Pop</Genre>
        <ReleaseType>Album</ReleaseType>
        <DisplayArtist>Taylor Swift</DisplayArtist>
    </Release>
    <Resource ResourceReference="ANTI_HERO_TRACK">
        <Title>Anti-Hero</Title>
        <Duration>200</Duration>
        <ResourceType>SoundRecording</ResourceType>
        <ISRC>USUA12204925</ISRC>
    </Resource>
    <Party PartyReference="TAYLOR_SWIFT_ARTIST">
        <PartyName>Taylor Swift</PartyName>
        <PartyRole>Artist</PartyRole>
        <ISNI>0000000368570204</ISNI>
    </Party>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = AlignedStreamIterator::new(cursor, ERNVersion::V4_3);

        let elements: Vec<AlignedStreamingElement> = iterator
            .collect::<Result<Vec<_>, _>>()
            .expect("a well-formed ERN message should parse without errors");

        // Header, Release, Resource, Party, then the EndOfStream marker.
        assert_eq!(elements.len(), 5);
        assert!(matches!(
            elements.last(),
            Some(AlignedStreamingElement::EndOfStream)
        ));

        let has_header = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Header(_)));
        let has_release = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Release(_)));
        let has_resource = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Resource(_)));
        let has_party = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Party(_)));

        assert!(
            has_header,
            "Should parse message header using MessageHeaderBuilder"
        );
        assert!(has_release, "Should parse release using ReleaseBuilder");
        assert!(has_resource, "Should parse resource using ResourceBuilder");
        assert!(has_party, "Should parse party using PartyBuilder");

        for element in &elements {
            match element {
                AlignedStreamingElement::Header(header) => {
                    assert_eq!(header.message_id, "UMG-2024-NEW-RELEASE-001");
                    assert_eq!(
                        header.message_sender.party_name[0].text,
                        "Universal Music Group"
                    );
                }
                AlignedStreamingElement::Release(release) => {
                    assert_eq!(release.release_reference, "TAYLOR_SWIFT_MIDNIGHTS_DELUXE");
                    assert_eq!(release.release_title[0].text, "Midnights (3am Edition)");
                    assert_eq!(release.genre[0].genre_text, "Pop");
                    assert_eq!(release.release_type, Some(ReleaseType::Album));
                }
                AlignedStreamingElement::Resource(resource) => {
                    assert_eq!(resource.resource_reference, "ANTI_HERO_TRACK");
                    assert_eq!(resource.reference_title[0].text, "Anti-Hero");
                    assert_eq!(
                        resource.duration,
                        Some(std::time::Duration::from_secs(200))
                    );
                    assert_eq!(resource.resource_type, ResourceType::SoundRecording);
                }
                AlignedStreamingElement::Party(party) => {
                    assert_eq!(party.party_name[0].text, "Taylor Swift");
                    assert_eq!(party.isni, Some("0000000368570204".to_string()));
                    assert!(party.party_role.contains(&PartyRole::Artist));
                }
                AlignedStreamingElement::EndOfStream => {}
            }
        }
    }

    /// An incomplete release is logged and skipped rather than surfaced as an
    /// error, and the stream still terminates with the end marker.
    #[test]
    fn test_builder_validation() {
        let xml = r#"<?xml version="1.0"?>
<ERNMessage>
    <Release>
        <!-- Missing required fields -->
    </Release>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = AlignedStreamIterator::new(cursor, ERNVersion::V4_3);

        let elements: Vec<_> = iterator.collect();
        assert!(!elements.is_empty());
        assert!(
            elements.iter().all(Result::is_ok),
            "validation failures must not surface as parse errors"
        );
        assert!(matches!(
            elements.last(),
            Some(Ok(AlignedStreamingElement::EndOfStream))
        ));
    }

    /// Builders convert to core models and report missing required data.
    #[test]
    fn test_conversion_traits() {
        let mut builder = ReleaseBuilder::new("FOLKLORE_DELUXE".to_string());
        builder.add_title(create_localized_string(
            "Folklore (Deluxe Version)".to_string(),
            None,
        ));

        let release = builder.to_core().unwrap();
        assert_eq!(release.release_reference, "FOLKLORE_DELUXE");
        assert_eq!(release.release_title[0].text, "Folklore (Deluxe Version)");

        let empty_builder = ReleaseBuilder::default();
        assert!(!empty_builder.is_complete());
        assert!(empty_builder.validate().is_err());
    }
}