1use crate::error::ParseError;
5use crate::parser::security::SecurityConfig;
6use ddex_core::models::graph::{
7 MessageHeader, MessageRecipient, MessageSender, MessageType, Release, ReleaseType,
8};
9use ddex_core::models::versions::ERNVersion;
10use ddex_core::models::{Identifier, IdentifierType, LocalizedString};
11use quick_xml::{events::Event, Reader};
12use std::collections::HashMap;
13use std::io::BufRead;
14
/// Streaming parser for ERN messages that carry several `Release` elements.
///
/// A parser is built with `new` / `with_security_config`, optionally tuned with
/// the builder-style `detailed_parsing` and `max_releases` methods, and then
/// driven through `count_releases` or `parse_releases`.
#[derive(Debug, Clone)]
pub struct MultiReleaseParser {
    /// ERN version the parser was created for; it is embedded in the placeholder
    /// identifiers the parser generates.
    version: ERNVersion,
    /// Security limits; `max_element_depth` bounds how deeply elements may nest.
    security_config: SecurityConfig,
    /// When `true`, `parse_releases` also produces a message header.
    detailed_parsing: bool,
    /// Upper bound on the number of releases counted/parsed; `0` disables the limit.
    max_releases: usize,
    /// Statistics written by `count_releases` and `parse_releases`.
    stats: MultiReleaseStats,
}
29
/// Counters and timing information recorded while counting or parsing releases.
#[derive(Debug, Clone, Default)]
pub struct MultiReleaseStats {
    /// Number of releases found (set at the end of a counting or parsing run).
    pub total_releases_found: usize,
    /// Number of releases that were collected by `parse_releases`.
    pub releases_parsed: usize,
    /// Releases whose `IsMainRelease` attribute was `true`.
    pub main_releases: usize,
    /// Releases whose `IsMainRelease` attribute was absent or not `true`.
    pub secondary_releases: usize,
    /// Number of start and self-closing element events seen.
    pub elements_processed: usize,
    /// Reader byte position when the run finished.
    pub bytes_processed: usize,
    /// Wall-clock time spent in the run.
    pub parse_duration: std::time::Duration,
    /// Number of `ReleaseList` start elements seen by `parse_releases`.
    pub release_list_count: usize,
}
42
/// Outcome of a `MultiReleaseParser::parse_releases` run.
#[derive(Debug, Clone)]
pub struct MultiReleaseResult {
    /// Releases collected from the message, in document order.
    pub releases: Vec<Release>,
    /// Snapshot of the parser statistics taken when parsing finished.
    pub stats: MultiReleaseStats,
    /// Message header; `None` when detailed parsing is disabled or no
    /// `MessageHeader` element was encountered.
    pub message_header: Option<MessageHeader>,
    /// Copy of `stats.releases_parsed` at the time the result was built.
    pub release_count: usize,
    /// References extracted from the releases that were closed by an end tag.
    pub release_references: Vec<String>,
}
57
/// Working state for the `Release` element that is currently being parsed.
#[derive(Debug, Clone)]
// Several fields are stored but not read back by the code visible in this file.
#[allow(dead_code)]
struct ReleaseContext {
    /// Release being filled in as text content is encountered.
    release: Release,
    /// Element depth at which the `Release` start tag was seen.
    depth: usize,
    /// Stack of element names, starting with the `Release` element itself.
    current_element_path: Vec<String>,
    /// Attributes of the `Release` element, plus child attributes stored under
    /// `"<element>:<attribute>"` keys.
    attributes: HashMap<String, String>,
    /// Parsed `IsMainRelease` flag; `None` when the attribute is absent.
    is_main_release: Option<bool>,
    /// Reader byte position recorded when the `Release` start tag was read.
    position: usize,
}
69
70impl MultiReleaseParser {
71 pub fn new(version: ERNVersion) -> Self {
73 Self {
74 version,
75 security_config: SecurityConfig::default(),
76 detailed_parsing: true,
77 max_releases: 0,
78 stats: MultiReleaseStats::default(),
79 }
80 }
81
82 pub fn with_security_config(version: ERNVersion, security_config: SecurityConfig) -> Self {
84 Self {
85 version,
86 security_config,
87 detailed_parsing: true,
88 max_releases: 0,
89 stats: MultiReleaseStats::default(),
90 }
91 }
92
93 pub fn detailed_parsing(mut self, enabled: bool) -> Self {
95 self.detailed_parsing = enabled;
96 self
97 }
98
99 pub fn max_releases(mut self, max: usize) -> Self {
101 self.max_releases = max;
102 self
103 }
104
105 pub fn count_releases<R: BufRead>(&mut self, reader: R) -> Result<usize, ParseError> {
107 let start_time = std::time::Instant::now();
108 let mut xml_reader = Reader::from_reader(reader);
109 xml_reader.config_mut().trim_text(false); let mut buf = Vec::new();
112 let mut release_count = 0;
113 let mut depth = 0;
114 let mut elements_processed = 0;
115
116 loop {
117 match xml_reader.read_event_into(&mut buf) {
118 Ok(Event::Start(ref e)) => {
119 elements_processed += 1;
120 depth += 1;
121
122 if depth > self.security_config.max_element_depth {
124 return Err(ParseError::DepthLimitExceeded {
125 depth,
126 max: self.security_config.max_element_depth,
127 });
128 }
129
130 let element_name = self.extract_element_name(e.name().as_ref())?;
131 if element_name == "Release" || element_name.ends_with(":Release") {
132 release_count += 1;
133
134 if self.max_releases > 0 && release_count >= self.max_releases {
136 break;
137 }
138 }
139 }
140 Ok(Event::End(_)) => {
141 depth = depth.saturating_sub(1);
142 }
143 Ok(Event::Empty(ref e)) => {
144 elements_processed += 1;
145 let element_name = self.extract_element_name(e.name().as_ref())?;
146 if element_name == "Release" || element_name.ends_with(":Release") {
147 release_count += 1;
148
149 if self.max_releases > 0 && release_count >= self.max_releases {
150 break;
151 }
152 }
153 }
154 Ok(Event::Eof) => break,
155 Err(e) => {
156 return Err(ParseError::XmlError {
157 message: format!("XML parsing error: {}", e),
158 location: crate::error::ErrorLocation {
159 line: 0,
160 column: 0,
161 byte_offset: Some(xml_reader.buffer_position() as usize),
162 path: "multi_release_counter".to_string(),
163 },
164 });
165 }
166 _ => {} }
168 buf.clear();
169 }
170
171 self.stats.total_releases_found = release_count;
173 self.stats.elements_processed = elements_processed;
174 self.stats.bytes_processed = xml_reader.buffer_position() as usize;
175 self.stats.parse_duration = start_time.elapsed();
176
177 Ok(release_count)
178 }
179
180 pub fn parse_releases<R: BufRead>(
182 &mut self,
183 reader: R,
184 ) -> Result<MultiReleaseResult, ParseError> {
185 let start_time = std::time::Instant::now();
186 let mut xml_reader = Reader::from_reader(reader);
187 xml_reader.config_mut().trim_text(true);
188 xml_reader.config_mut().check_end_names = true;
189
190 let mut releases = Vec::new();
191 let mut buf = Vec::new();
192 let mut current_context: Option<ReleaseContext> = None;
193 let mut depth = 0;
194 let mut elements_processed = 0;
195 let mut release_references = Vec::new();
196 let mut message_header: Option<MessageHeader> = None;
197 let mut in_release_list = false;
198 let mut release_list_count = 0;
199
200 loop {
201 match xml_reader.read_event_into(&mut buf) {
202 Ok(Event::Start(ref e)) => {
203 elements_processed += 1;
204 depth += 1;
205
206 if depth > self.security_config.max_element_depth {
208 return Err(ParseError::DepthLimitExceeded {
209 depth,
210 max: self.security_config.max_element_depth,
211 });
212 }
213
214 let element_name = self.extract_element_name(e.name().as_ref())?;
215
216 let mut attributes = HashMap::new();
218 for attr in e.attributes().flatten() {
219 let key = String::from_utf8_lossy(attr.key.as_ref()).to_string();
220 let value = String::from_utf8_lossy(&attr.value).to_string();
221 attributes.insert(key, value);
222 }
223
224 match element_name.as_str() {
225 "ReleaseList" | "ern:ReleaseList" => {
226 in_release_list = true;
227 release_list_count += 1;
228 }
229 "Release" | "ern:Release" if in_release_list => {
230 let is_main = attributes
232 .get("IsMainRelease")
233 .or_else(|| attributes.get("isMainRelease"))
234 .map(|v| v.to_lowercase() == "true");
235
236 let release = self.create_default_release();
237 current_context = Some(ReleaseContext {
238 release,
239 depth,
240 current_element_path: vec![element_name.clone()],
241 attributes: attributes.clone(),
242 is_main_release: is_main,
243 position: xml_reader.buffer_position() as usize,
244 });
245
246 if is_main.unwrap_or(false) {
247 self.stats.main_releases += 1;
248 } else {
249 self.stats.secondary_releases += 1;
250 }
251 }
252 "MessageHeader" | "ern:MessageHeader" if message_header.is_none() => {
253 if self.detailed_parsing {
255 message_header =
256 Some(self.parse_message_header(&mut xml_reader, &mut buf)?);
257 }
258 }
259 _ => {
260 if let Some(ref mut context) = current_context {
262 context.current_element_path.push(element_name.clone());
263 self.process_release_element(
264 context,
265 &element_name,
266 &attributes,
267 &mut xml_reader,
268 &mut buf,
269 )?;
270 }
271 }
272 }
273 }
274 Ok(Event::End(ref e)) => {
275 depth = depth.saturating_sub(1);
276 let element_name = self.extract_element_name(e.name().as_ref())?;
277
278 match element_name.as_str() {
279 "ReleaseList" | "ern:ReleaseList" => {
280 in_release_list = false;
281 }
282 "Release" | "ern:Release" => {
283 if let Some(context) = current_context.take() {
284 if let Some(reference) =
286 self.extract_release_reference(&context.release)
287 {
288 release_references.push(reference);
289 }
290
291 releases.push(context.release);
292 self.stats.releases_parsed += 1;
293
294 if self.max_releases > 0 && releases.len() >= self.max_releases {
296 break;
297 }
298 }
299 }
300 _ => {
301 if let Some(ref mut context) = current_context {
302 context.current_element_path.pop();
303 }
304 }
305 }
306 }
307 Ok(Event::Empty(ref e)) => {
308 elements_processed += 1;
309 let element_name = self.extract_element_name(e.name().as_ref())?;
310
311 if (element_name == "Release" || element_name.ends_with(":Release"))
313 && in_release_list
314 {
315 let mut attributes = HashMap::new();
316 for attr in e.attributes().flatten() {
317 let key = String::from_utf8_lossy(attr.key.as_ref()).to_string();
318 let value = String::from_utf8_lossy(&attr.value).to_string();
319 attributes.insert(key, value);
320 }
321
322 let is_main = attributes
323 .get("IsMainRelease")
324 .or_else(|| attributes.get("isMainRelease"))
325 .map(|v| v.to_lowercase() == "true");
326
327 let release = self.create_default_release();
328 releases.push(release);
329
330 if is_main.unwrap_or(false) {
331 self.stats.main_releases += 1;
332 } else {
333 self.stats.secondary_releases += 1;
334 }
335
336 self.stats.releases_parsed += 1;
337
338 if self.max_releases > 0 && releases.len() >= self.max_releases {
339 break;
340 }
341 }
342 }
343 Ok(Event::Text(ref e)) => {
344 if let Some(ref mut context) = current_context {
345 let current_pos = xml_reader.buffer_position() as usize;
347 let text = crate::utf8_utils::handle_text_node(e, current_pos)?
348 .trim()
349 .to_string();
350
351 if !text.is_empty() {
352 self.process_release_text_content(context, &text)?;
353 }
354 }
355 }
356 Ok(Event::Eof) => break,
357 Err(e) => {
358 return Err(ParseError::XmlError {
359 message: format!("XML parsing error: {}", e),
360 location: crate::error::ErrorLocation {
361 line: 0,
362 column: 0,
363 byte_offset: Some(xml_reader.buffer_position() as usize),
364 path: "multi_release_parser".to_string(),
365 },
366 });
367 }
368 _ => {} }
370 buf.clear();
371 }
372
373 self.stats.total_releases_found = self.stats.releases_parsed;
375 self.stats.elements_processed = elements_processed;
376 self.stats.bytes_processed = xml_reader.buffer_position() as usize;
377 self.stats.parse_duration = start_time.elapsed();
378 self.stats.release_list_count = release_list_count;
379
380 Ok(MultiReleaseResult {
381 releases,
382 stats: self.stats.clone(),
383 message_header,
384 release_count: self.stats.releases_parsed,
385 release_references,
386 })
387 }
388
389 fn extract_element_name(&self, qname: &[u8]) -> Result<String, ParseError> {
391 let name_str = std::str::from_utf8(qname).map_err(|_| ParseError::Io {
392 message: "Invalid UTF-8 in element name".to_string(),
393 })?;
394 Ok(name_str.to_string())
395 }
396
397 fn create_default_release(&self) -> Release {
399 Release {
400 release_reference: format!("REL_{:?}_{}", self.version, chrono::Utc::now().timestamp()),
401 release_id: Vec::new(),
402 release_title: vec![LocalizedString::new("Untitled Release".to_string())],
403 release_subtitle: None,
404 release_type: None,
405 genre: Vec::new(),
406 release_resource_reference_list: Vec::new(),
407 display_artist: Vec::new(),
408 party_list: Vec::new(),
409 release_date: Vec::new(),
410 territory_code: Vec::new(),
411 excluded_territory_code: Vec::new(),
412 extensions: None,
413 attributes: None,
414 comments: None,
415 }
416 }
417
418 fn parse_message_header<R: BufRead>(
420 &self,
421 _reader: &mut Reader<R>,
422 _buf: &mut [u8],
423 ) -> Result<MessageHeader, ParseError> {
424 Ok(MessageHeader {
426 message_id: format!("MSG_{:?}", self.version),
427 message_type: MessageType::NewReleaseMessage,
428 message_created_date_time: chrono::Utc::now(),
429 message_sender: MessageSender {
430 party_id: Vec::new(),
431 party_name: Vec::new(),
432 trading_name: None,
433 extensions: None,
434 attributes: None,
435 comments: None,
436 },
437 message_recipient: MessageRecipient {
438 party_id: Vec::new(),
439 party_name: Vec::new(),
440 trading_name: None,
441 extensions: None,
442 attributes: None,
443 comments: None,
444 },
445 message_control_type: None,
446 message_thread_id: Some("MULTI_RELEASE_THREAD".to_string()),
447 extensions: None,
448 attributes: None,
449 comments: None,
450 })
451 }
452
453 fn process_release_element(
455 &self,
456 context: &mut ReleaseContext,
457 element_name: &str,
458 attributes: &HashMap<String, String>,
459 _reader: &mut Reader<impl BufRead>,
460 _buf: &mut [u8],
461 ) -> Result<(), ParseError> {
462 match element_name {
464 "ReleaseReference" | "ern:ReleaseReference" => {
465 }
467 "ReleaseId" | "ern:ReleaseId" => {
468 }
470 "ReferenceTitle" | "ern:ReferenceTitle" => {
471 }
473 "TitleText" | "ern:TitleText" => {
474 }
476 "ReleaseType" | "ern:ReleaseType" => {
477 }
479 _ => {
480 }
482 }
483
484 for (key, value) in attributes {
486 context
487 .attributes
488 .insert(format!("{}:{}", element_name, key), value.clone());
489 }
490
491 Ok(())
492 }
493
494 fn process_release_text_content(
496 &self,
497 context: &mut ReleaseContext,
498 text: &str,
499 ) -> Result<(), ParseError> {
500 let current_path = context.current_element_path.join("/");
501
502 if current_path.contains("ReleaseReference") {
503 context.release.release_reference = text.to_string();
504 } else if current_path.contains("ReleaseId") {
505 context.release.release_id.push(Identifier {
507 id_type: IdentifierType::Proprietary,
508 namespace: None,
509 value: text.to_string(),
510 });
511 } else if current_path.contains("TitleText") {
512 if !context.release.release_title.is_empty() {
514 context.release.release_title[0] = LocalizedString::new(text.to_string());
515 } else {
516 context
517 .release
518 .release_title
519 .push(LocalizedString::new(text.to_string()));
520 }
521 } else if current_path.contains("ReleaseType") {
522 context.release.release_type = Some(match text {
523 "Album" => ReleaseType::Album,
524 "Single" => ReleaseType::Single,
525 "EP" => ReleaseType::EP,
526 "Compilation" => ReleaseType::Compilation,
527 other => ReleaseType::Other(other.to_string()),
528 });
529 }
530
531 Ok(())
532 }
533
534 fn extract_release_reference(&self, release: &Release) -> Option<String> {
536 if !release.release_reference.is_empty() {
537 Some(release.release_reference.clone())
538 } else if !release.release_id.is_empty() {
539 Some(release.release_id[0].value.clone())
540 } else {
541 None
542 }
543 }
544}
545
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    // Counting sees all three releases and records basic statistics.
    #[test]
    fn test_release_counting() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
        <ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
            <ern:MessageHeader>
                <ern:MessageId>MSG001</ern:MessageId>
            </ern:MessageHeader>
            <ern:ReleaseList>
                <ern:Release IsMainRelease="true">
                    <ern:ReleaseReference>REL001</ern:ReleaseReference>
                    <ern:ReferenceTitle>
                        <ern:TitleText>Album One</ern:TitleText>
                    </ern:ReferenceTitle>
                </ern:Release>
                <ern:Release IsMainRelease="false">
                    <ern:ReleaseReference>REL002</ern:ReleaseReference>
                    <ern:ReferenceTitle>
                        <ern:TitleText>Album Two</ern:TitleText>
                    </ern:ReferenceTitle>
                </ern:Release>
                <ern:Release>
                    <ern:ReleaseReference>REL003</ern:ReleaseReference>
                    <ern:ReferenceTitle>
                        <ern:TitleText>Album Three</ern:TitleText>
                    </ern:ReferenceTitle>
                </ern:Release>
            </ern:ReleaseList>
        </ern:NewReleaseMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let mut parser = MultiReleaseParser::new(ERNVersion::V4_3);

        let count = parser
            .count_releases(cursor)
            .expect("Should count releases");

        assert_eq!(count, 3);
        assert_eq!(parser.stats.total_releases_found, 3);
        assert!(parser.stats.elements_processed > 0);
        assert!(parser.stats.bytes_processed > 0);
    }

    // Full parsing extracts reference, title and type for each release and
    // classifies main vs. secondary releases.
    #[test]
    fn test_multi_release_parsing() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
        <ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
            <ern:MessageHeader>
                <ern:MessageId>MSG001</ern:MessageId>
                <ern:MessageSender>
                    <ern:PartyName>Test Label</ern:PartyName>
                </ern:MessageSender>
                <ern:MessageRecipient>
                    <ern:PartyName>Test Recipient</ern:PartyName>
                </ern:MessageRecipient>
                <ern:MessageCreatedDateTime>2024-01-15T10:30:00Z</ern:MessageCreatedDateTime>
            </ern:MessageHeader>
            <ern:ReleaseList>
                <ern:Release IsMainRelease="true">
                    <ern:ReleaseReference>MAIN_RELEASE_001</ern:ReleaseReference>
                    <ern:ReleaseId Namespace="GRid">A1-123456789-1234567890-A</ern:ReleaseId>
                    <ern:ReferenceTitle>
                        <ern:TitleText>My Main Album</ern:TitleText>
                    </ern:ReferenceTitle>
                    <ern:ReleaseType>Album</ern:ReleaseType>
                </ern:Release>
                <ern:Release IsMainRelease="false">
                    <ern:ReleaseReference>SECONDARY_RELEASE_002</ern:ReleaseReference>
                    <ern:ReleaseId>REL_SEC_002</ern:ReleaseId>
                    <ern:ReferenceTitle>
                        <ern:TitleText>Bonus Tracks</ern:TitleText>
                    </ern:ReferenceTitle>
                    <ern:ReleaseType>EP</ern:ReleaseType>
                </ern:Release>
            </ern:ReleaseList>
        </ern:NewReleaseMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let mut parser = MultiReleaseParser::new(ERNVersion::V4_3).detailed_parsing(true);

        let result = parser
            .parse_releases(cursor)
            .expect("Should parse releases");

        assert_eq!(result.releases.len(), 2);
        assert_eq!(result.release_count, 2);
        assert_eq!(result.stats.main_releases, 1);
        assert_eq!(result.stats.secondary_releases, 1);

        // First release in document order is the main one.
        let main_release = &result.releases[0];
        assert_eq!(main_release.release_reference, "MAIN_RELEASE_001");
        assert_eq!(main_release.release_title[0].text, "My Main Album");
        assert_eq!(
            main_release.release_type.as_ref().unwrap(),
            &ReleaseType::Album
        );

        let secondary_release = &result.releases[1];
        assert_eq!(secondary_release.release_reference, "SECONDARY_RELEASE_002");
        assert_eq!(secondary_release.release_title[0].text, "Bonus Tracks");
        assert_eq!(
            secondary_release.release_type.as_ref().unwrap(),
            &ReleaseType::EP
        );

        // Both references are reported alongside the releases.
        assert_eq!(result.release_references.len(), 2);
        assert!(result
            .release_references
            .contains(&"MAIN_RELEASE_001".to_string()));
        assert!(result
            .release_references
            .contains(&"SECONDARY_RELEASE_002".to_string()));

        println!("Multi-release parsing stats: {:#?}", result.stats);
    }

    // Parsing stops after `max_releases` releases even when more are present.
    #[test]
    fn test_max_releases_limit() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
        <ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
            <ern:ReleaseList>
                <ern:Release><ern:ReleaseReference>REL001</ern:ReleaseReference></ern:Release>
                <ern:Release><ern:ReleaseReference>REL002</ern:ReleaseReference></ern:Release>
                <ern:Release><ern:ReleaseReference>REL003</ern:ReleaseReference></ern:Release>
                <ern:Release><ern:ReleaseReference>REL004</ern:ReleaseReference></ern:Release>
                <ern:Release><ern:ReleaseReference>REL005</ern:ReleaseReference></ern:Release>
            </ern:ReleaseList>
        </ern:NewReleaseMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let mut parser = MultiReleaseParser::new(ERNVersion::V4_3).max_releases(3);

        let result = parser
            .parse_releases(cursor)
            .expect("Should parse with limit");

        assert_eq!(result.releases.len(), 3);
        assert_eq!(result.release_count, 3);
        assert_eq!(result.stats.releases_parsed, 3);
    }

    // Self-closing `Release` elements are collected like regular ones; a
    // missing `IsMainRelease` attribute counts as secondary.
    #[test]
    fn test_empty_and_self_closing_releases() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
        <ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
            <ern:ReleaseList>
                <ern:Release/>
                <ern:Release IsMainRelease="true"/>
                <ern:Release>
                    <ern:ReleaseReference>REL003</ern:ReleaseReference>
                </ern:Release>
            </ern:ReleaseList>
        </ern:NewReleaseMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let mut parser = MultiReleaseParser::new(ERNVersion::V4_3);

        let result = parser
            .parse_releases(cursor)
            .expect("Should parse empty releases");

        assert_eq!(result.releases.len(), 3);
        assert_eq!(result.stats.main_releases, 1);
        assert_eq!(result.stats.secondary_releases, 2);
    }

    // Smoke test on a generated 1000-release document: counts all of them,
    // then parses only the first 100 and prints rough throughput figures.
    #[test]
    fn test_performance_with_many_releases() {
        let mut xml = String::from(
            r#"<?xml version="1.0" encoding="UTF-8"?>
            <ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
            <ern:ReleaseList>"#,
        );

        // Only the first generated release is flagged as the main release.
        for i in 0..1000 {
            xml.push_str(&format!(
                r#"
                <ern:Release IsMainRelease="{}">
                    <ern:ReleaseReference>REL{:06}</ern:ReleaseReference>
                    <ern:ReferenceTitle>
                        <ern:TitleText>Release {}</ern:TitleText>
                    </ern:ReferenceTitle>
                </ern:Release>"#,
                i == 0,
                i,
                i
            ));
        }
        xml.push_str("</ern:ReleaseList></ern:NewReleaseMessage>");

        let cursor = Cursor::new(xml.as_bytes());
        let mut parser = MultiReleaseParser::new(ERNVersion::V4_3);

        let start = std::time::Instant::now();
        let count = parser
            .count_releases(cursor)
            .expect("Should count many releases");
        let count_duration = start.elapsed();

        assert_eq!(count, 1000);

        // A second, independent parser is used for the limited full parse.
        let cursor2 = Cursor::new(xml.as_bytes());
        let mut parser2 = MultiReleaseParser::new(ERNVersion::V4_3)
            .detailed_parsing(true)
            .max_releases(100);
        let start2 = std::time::Instant::now();
        let result = parser2
            .parse_releases(cursor2)
            .expect("Should parse many releases");
        let parse_duration = start2.elapsed();

        assert_eq!(result.releases.len(), 100);
        assert_eq!(result.stats.main_releases, 1);
        assert_eq!(result.stats.secondary_releases, 99);

        println!("Performance test results:");
        println!("  Count 1000 releases: {:?}", count_duration);
        println!("  Parse 100 releases: {:?}", parse_duration);
        println!(
            "  Count throughput: {:.0} releases/sec",
            1000.0 / count_duration.as_secs_f64()
        );
        println!(
            "  Parse throughput: {:.0} releases/sec",
            100.0 / parse_duration.as_secs_f64()
        );
    }
}