1#[allow(dead_code)] use crate::error::{ErrorLocation, ParseError};
6use ddex_core::models::streaming_types::builders::*;
7use ddex_core::models::streaming_types::*;
8use ddex_core::models::IdentifierType;
9use ddex_core::models::{graph::*, versions::ERNVersion};
10use quick_xml::{events::{Event, BytesStart}, Reader};
11use std::collections::HashMap;
12use std::io::BufRead;
13use std::time::Instant;
14
15#[derive(Debug, Clone)]
17pub enum AlignedStreamingElement {
18 Header(Box<MessageHeader>),
19 Release(Release),
20 Resource(Resource),
21 Party(Party),
22 EndOfStream,
23}
24
25#[derive(Debug)]
27enum AlignedParserState {
28 Initial,
29 InHeader(Box<MessageHeaderBuilder>),
30 InRelease(Box<ReleaseBuilder>),
31 InResource(Box<ResourceBuilder>),
32 InParty(Box<PartyBuilder>),
33 Complete,
34}
35
36pub struct AlignedStreamingParser<R: BufRead> {
38 reader: Reader<R>,
39 buffer: Vec<u8>,
40 state: AlignedParserState,
41 current_path: Vec<String>,
42 current_depth: usize,
43 text_buffer: String,
44 attributes: HashMap<String, String>,
45 bytes_processed: u64,
46 elements_yielded: usize,
47 start_time: Instant,
48}
49
50impl<R: BufRead> AlignedStreamingParser<R> {
51 pub fn new(reader: R, _version: ERNVersion) -> Self {
52 let mut xml_reader = Reader::from_reader(reader);
53 xml_reader.config_mut().trim_text(true);
54 xml_reader.config_mut().check_end_names = true;
55
56 Self {
57 reader: xml_reader,
58 buffer: Vec::with_capacity(8192),
59 state: AlignedParserState::Initial,
60 current_path: Vec::new(),
61 current_depth: 0,
62 text_buffer: String::new(),
63 attributes: HashMap::new(),
64 bytes_processed: 0,
65 elements_yielded: 0,
66 start_time: Instant::now(),
67 }
68 }
69
70 pub fn parse_next(&mut self) -> Result<Option<AlignedStreamingElement>, ParseError> {
71 loop {
72 self.buffer.clear();
73 let event = self.reader.read_event_into(&mut self.buffer)?;
74 match event {
75 Event::Start(e) => {
76 let name_bytes = e.name();
77 let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
78
79 let mut attributes = HashMap::new();
81 for attr_result in e.attributes() {
82 let attr = attr_result.map_err(|e| ParseError::XmlError {
83 message: format!("Attribute error: {}", e),
84 location: crate::error::ErrorLocation::default(),
85 })?;
86
87 let key = std::str::from_utf8(attr.key.as_ref())?;
88 let value = std::str::from_utf8(&attr.value)?;
89
90 attributes.insert(key.to_string(), value.to_string());
91 }
92
93 self.attributes = attributes;
95
96 self.handle_start_element_by_name(&name)?;
97 }
98 Event::End(e) => {
99 let name_bytes = e.name();
100 let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
101 if let Some(element) = self.handle_end_element_by_name(&name)? {
102 self.elements_yielded += 1;
103 return Ok(Some(element));
104 }
105 }
106 Event::Text(e) => {
107 let text = std::str::from_utf8(&e)?;
108 self.text_buffer.push_str(text.trim());
109 }
110 Event::Eof => {
111 return Ok(Some(AlignedStreamingElement::EndOfStream));
112 }
113 _ => {
114 }
116 }
117
118 self.bytes_processed = self.reader.buffer_position();
119
120 if self.current_depth > 100 {
122 return Err(ParseError::SecurityViolation {
123 message: "Nesting depth exceeds 100 levels".to_string(),
124 });
125 }
126
127 self.buffer.clear();
128 }
129 }
130
131
132 fn handle_start_element_by_name(&mut self, name: &str) -> Result<(), ParseError> {
133 self.current_path.push(name.to_string());
134 self.current_depth += 1;
135
136 self.text_buffer.clear();
137
138 match (&self.state, name) {
140 (AlignedParserState::Initial, "MessageHeader") => {
141 self.state = AlignedParserState::InHeader(Box::new(MessageHeaderBuilder::new()));
142 }
143 (AlignedParserState::Initial, "Release") => {
144 let reference = self
145 .attributes
146 .get("ReleaseReference")
147 .cloned()
148 .unwrap_or_else(|| format!("REL_{}", self.elements_yielded));
149 self.state =
150 AlignedParserState::InRelease(Box::new(ReleaseBuilder::new(reference)));
151 }
152 (AlignedParserState::Initial, "Resource") => {
153 let reference = self
154 .attributes
155 .get("ResourceReference")
156 .cloned()
157 .unwrap_or_else(|| format!("RES_{}", self.elements_yielded));
158 self.state =
159 AlignedParserState::InResource(Box::new(ResourceBuilder::new(reference)));
160 }
161 (AlignedParserState::Initial, "Party") => {
162 let reference = self.attributes.get("PartyReference").cloned();
163 self.state = AlignedParserState::InParty(Box::new(PartyBuilder::new(reference)));
164 }
165 _ => {
166 }
168 }
169
170 Ok(())
171 }
172
173 fn handle_end_element_by_name(
174 &mut self,
175 name: &str,
176 ) -> Result<Option<AlignedStreamingElement>, ParseError> {
177 let text_content = self.text_buffer.clone();
178
179 let result = match &mut self.state {
180 AlignedParserState::InHeader(builder) => {
181 match name {
182 "MessageId" => {
183 builder.set_message_id(text_content);
184 None
185 }
186 "MessageCreatedDateTime" => {
187 builder.set_created_date_time_from_text(text_content);
188 None
189 }
190 "MessageSender" => {
191 let sender = create_message_sender(
193 text_content.clone(),
194 Some(format!("SENDER_{}", self.elements_yielded)),
195 );
196 builder.set_sender(sender);
197 None
198 }
199 "MessageRecipient" => {
200 let recipient = create_message_recipient(text_content);
201 builder.set_recipient(recipient);
202 None
203 }
204 "MessageHeader" => {
205 let builder =
207 std::mem::replace(&mut self.state, AlignedParserState::Initial);
208 if let AlignedParserState::InHeader(header_builder) = builder {
209 match header_builder.to_core() {
210 Ok(header) => {
211 Some(AlignedStreamingElement::Header(Box::new(header)))
212 }
213 Err(e) => {
214 eprintln!("Warning: Header validation failed: {}", e);
215 let header = self.create_fallback_header();
217 Some(AlignedStreamingElement::Header(Box::new(header)))
218 }
219 }
220 } else {
221 None
222 }
223 }
224 _ => None,
225 }
226 }
227 AlignedParserState::InRelease(builder) => {
228 match name {
229 "ReleaseTitle" => {
230 let title = create_localized_string(
231 text_content,
232 self.attributes.get("LanguageCode").cloned(),
233 );
234 builder.add_title(title);
235 None
236 }
237 "Genre" => {
238 let genre = create_genre(text_content, None);
239 builder.add_genre(genre);
240 None
241 }
242 "DisplayArtist" => {
243 let artist = create_artist(text_content, "MainArtist".to_string(), None);
244 builder.add_artist(artist);
245 None
246 }
247 "ReleaseType" => {
248 let release_type = match text_content.as_str() {
249 "Album" => ReleaseType::Album,
250 "Single" => ReleaseType::Single,
251 "EP" => ReleaseType::EP,
252 _ => ReleaseType::Other(text_content),
253 };
254 builder.set_release_type(release_type);
255 None
256 }
257 "Release" => {
258 let builder =
260 std::mem::replace(&mut self.state, AlignedParserState::Initial);
261 if let AlignedParserState::InRelease(release_builder) = builder {
262 match release_builder.to_core() {
263 Ok(release) => Some(AlignedStreamingElement::Release(release)),
264 Err(e) => {
265 eprintln!("Warning: Release validation failed: {}", e);
266 None
267 }
268 }
269 } else {
270 None
271 }
272 }
273 _ => None,
274 }
275 }
276 AlignedParserState::InResource(builder) => {
277 match name {
278 "Title" => {
279 let title = create_localized_string(
280 text_content,
281 self.attributes.get("LanguageCode").cloned(),
282 );
283 builder.add_title(title);
284 None
285 }
286 "Duration" => {
287 builder.set_duration_from_text(text_content);
288 None
289 }
290 "ResourceType" => {
291 let resource_type = match text_content.as_str() {
292 "SoundRecording" => ResourceType::SoundRecording,
293 "Video" => ResourceType::Video,
294 "Image" => ResourceType::Image,
295 "Text" => ResourceType::Text,
296 "SheetMusic" => ResourceType::SheetMusic,
297 _ => ResourceType::SoundRecording, };
299 builder.set_resource_type(resource_type);
300 None
301 }
302 "ISRC" => {
303 let identifier = create_identifier(
304 text_content,
305 IdentifierType::ISRC,
306 Some("ISRC".to_string()),
307 );
308 builder.add_identifier(identifier);
309 None
310 }
311 "Resource" => {
312 let builder =
314 std::mem::replace(&mut self.state, AlignedParserState::Initial);
315 if let AlignedParserState::InResource(resource_builder) = builder {
316 match resource_builder.to_core() {
317 Ok(resource) => Some(AlignedStreamingElement::Resource(resource)),
318 Err(e) => {
319 eprintln!("Warning: Resource validation failed: {}", e);
320 None
321 }
322 }
323 } else {
324 None
325 }
326 }
327 _ => None,
328 }
329 }
330 AlignedParserState::InParty(builder) => {
331 match name {
332 "PartyName" => {
333 let name = create_localized_string(
334 text_content,
335 self.attributes.get("LanguageCode").cloned(),
336 );
337 builder.add_name(name);
338 None
339 }
340 "ISNI" => {
341 builder.set_isni(text_content);
342 None
343 }
344 "PartyRole" => {
345 let role = match text_content.as_str() {
346 "Artist" => PartyRole::Artist,
347 "Producer" => PartyRole::Producer,
348 "Composer" => PartyRole::Composer,
349 "Lyricist" => PartyRole::Lyricist,
350 "Publisher" => PartyRole::Publisher,
351 "Performer" => PartyRole::Performer,
352 "Engineer" => PartyRole::Engineer,
353 "Label" => PartyRole::Label,
354 "Distributor" => PartyRole::Distributor,
355 _ => PartyRole::Other(text_content),
356 };
357 builder.add_role(role);
358 None
359 }
360 "Party" => {
361 let builder =
363 std::mem::replace(&mut self.state, AlignedParserState::Initial);
364 if let AlignedParserState::InParty(party_builder) = builder {
365 match party_builder.to_core() {
366 Ok(party) => Some(AlignedStreamingElement::Party(party)),
367 Err(e) => {
368 eprintln!("Warning: Party validation failed: {}", e);
369 None
370 }
371 }
372 } else {
373 None
374 }
375 }
376 _ => None,
377 }
378 }
379 _ => None,
380 };
381
382 self.current_depth = self.current_depth.saturating_sub(1);
383 self.current_path.pop();
384 self.text_buffer.clear();
385
386 Ok(result)
387 }
388
389 fn create_fallback_header(&self) -> MessageHeader {
390 MessageHeader {
391 message_id: "FALLBACK_MSG".to_string(),
392 message_type: MessageType::NewReleaseMessage,
393 message_created_date_time: chrono::Utc::now(),
394 message_sender: create_message_sender("Unknown Sender".to_string(), None),
395 message_recipient: create_message_recipient("Unknown Recipient".to_string()),
396 message_control_type: None,
397 message_thread_id: None,
398 attributes: None,
399 extensions: None,
400 comments: None,
401 }
402 }
403
404 fn get_current_location(&self) -> ErrorLocation {
405 ErrorLocation {
406 line: 0,
407 column: 0,
408 byte_offset: Some(self.bytes_processed as usize),
409 path: "aligned_streaming".to_string(),
410 }
411 }
412
413 pub fn stats(&self) -> AlignedStats {
414 AlignedStats {
415 bytes_processed: self.bytes_processed,
416 elements_yielded: self.elements_yielded,
417 current_depth: self.current_depth,
418 elapsed: self.start_time.elapsed(),
419 }
420 }
421}
422
423pub struct AlignedStreamIterator<R: BufRead> {
425 parser: AlignedStreamingParser<R>,
426 finished: bool,
427}
428
429impl<R: BufRead> AlignedStreamIterator<R> {
430 pub fn new(reader: R, version: ERNVersion) -> Self {
431 Self {
432 parser: AlignedStreamingParser::new(reader, version),
433 finished: false,
434 }
435 }
436
437 pub fn stats(&self) -> AlignedStats {
438 self.parser.stats()
439 }
440}
441
442impl<R: BufRead> Iterator for AlignedStreamIterator<R> {
443 type Item = Result<AlignedStreamingElement, ParseError>;
444
445 fn next(&mut self) -> Option<Self::Item> {
446 if self.finished {
447 return None;
448 }
449
450 match self.parser.parse_next() {
451 Ok(Some(element)) => {
452 if matches!(element, AlignedStreamingElement::EndOfStream) {
453 self.finished = true;
454 }
455 Some(Ok(element))
456 }
457 Ok(None) => {
458 self.finished = true;
459 None
460 }
461 Err(e) => {
462 self.finished = true;
463 Some(Err(e))
464 }
465 }
466 }
467}
468
/// Snapshot of an aligned streaming parser's progress.
#[derive(Clone, Debug)]
pub struct AlignedStats {
    /// Reader position recorded by the parser.
    pub bytes_processed: u64,
    /// Number of entities returned to the caller.
    pub elements_yielded: usize,
    /// Number of currently open XML elements.
    pub current_depth: usize,
    /// Time elapsed since the parser was created.
    pub elapsed: std::time::Duration,
}

impl AlignedStats {
    /// Average throughput in MiB per second, or `0.0` when no time has elapsed.
    pub fn throughput_mibs(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        let seconds = self.elapsed.as_secs_f64();
        if seconds <= 0.0 {
            return 0.0;
        }
        self.bytes_processed as f64 / BYTES_PER_MIB / seconds
    }
}
486
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// End-to-end parse of one entity of each kind through the iterator.
    #[test]
    fn test_aligned_streaming_parser_with_builders() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>test-message-1</MessageId>
        <MessageCreatedDateTime>2023-01-01T00:00:00Z</MessageCreatedDateTime>
        <MessageSender>Test Sender</MessageSender>
        <MessageRecipient>Test Recipient</MessageRecipient>
    </MessageHeader>
    <Release ReleaseReference="REL001">
        <ReleaseTitle>Test Release</ReleaseTitle>
        <Genre>Rock</Genre>
        <ReleaseType>Album</ReleaseType>
        <DisplayArtist>Test Artist</DisplayArtist>
    </Release>
    <Resource ResourceReference="RES001">
        <Title>Test Track</Title>
        <Duration>180</Duration>
        <ResourceType>SoundRecording</ResourceType>
        <ISRC>USRC17607839</ISRC>
    </Resource>
    <Party PartyReference="PARTY001">
        <PartyName>Test Party</PartyName>
        <PartyRole>Artist</PartyRole>
        <ISNI>0000000123456789</ISNI>
    </Party>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = AlignedStreamIterator::new(cursor, ERNVersion::V4_3);

        let elements: Result<Vec<_>, _> = iterator.collect();
        assert!(elements.is_ok());

        let elements = elements.unwrap();
        // Header, release, resource and party, followed by the end-of-stream sentinel.
        assert!(elements.len() >= 4);
        assert!(matches!(
            elements.last(),
            Some(AlignedStreamingElement::EndOfStream)
        ));

        let has_header = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Header(_)));
        let has_release = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Release(_)));
        let has_resource = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Resource(_)));
        let has_party = elements
            .iter()
            .any(|e| matches!(e, AlignedStreamingElement::Party(_)));

        assert!(
            has_header,
            "Should parse message header using MessageHeaderBuilder"
        );
        assert!(has_release, "Should parse release using ReleaseBuilder");
        assert!(has_resource, "Should parse resource using ResourceBuilder");
        assert!(has_party, "Should parse party using PartyBuilder");

        for element in &elements {
            match element {
                AlignedStreamingElement::Header(header) => {
                    assert_eq!(header.message_id, "test-message-1");
                    assert_eq!(header.message_sender.party_name[0].text, "Test Sender");
                }
                AlignedStreamingElement::Release(release) => {
                    assert_eq!(release.release_reference, "REL001");
                    assert_eq!(release.release_title[0].text, "Test Release");
                    assert_eq!(release.genre[0].genre_text, "Rock");
                    assert_eq!(release.release_type, Some(ReleaseType::Album));
                }
                AlignedStreamingElement::Resource(resource) => {
                    assert_eq!(resource.resource_reference, "RES001");
                    assert_eq!(resource.reference_title[0].text, "Test Track");
                    assert_eq!(resource.duration, Some(std::time::Duration::from_secs(180)));
                    assert_eq!(resource.resource_type, ResourceType::SoundRecording);
                }
                AlignedStreamingElement::Party(party) => {
                    assert_eq!(party.party_name[0].text, "Test Party");
                    assert_eq!(party.isni, Some("0000000123456789".to_string()));
                    assert!(party.party_role.contains(&PartyRole::Artist));
                }
                _ => {}
            }
        }
    }

    /// A release missing required fields must not abort the stream.
    #[test]
    fn test_builder_validation() {
        let xml = r#"<?xml version="1.0"?>
<ERNMessage>
    <Release>
        <!-- Missing required fields -->
    </Release>
</ERNMessage>"#;

        let cursor = Cursor::new(xml.as_bytes());
        let iterator = AlignedStreamIterator::new(cursor, ERNVersion::V4_3);

        let elements: Vec<_> = iterator.collect();
        assert!(!elements.is_empty());
        assert!(elements.iter().all(|e| e.is_ok()));
        assert!(matches!(
            elements.last(),
            Some(Ok(AlignedStreamingElement::EndOfStream))
        ));
    }

    /// Builder-to-core conversion and validation on incomplete builders.
    #[test]
    fn test_conversion_traits() {
        let mut builder = ReleaseBuilder::new("TEST_REL".to_string());
        builder.add_title(create_localized_string("Test Title".to_string(), None));

        let release = builder.to_core().unwrap();
        assert_eq!(release.release_reference, "TEST_REL");
        assert_eq!(release.release_title[0].text, "Test Title");

        let empty_builder = ReleaseBuilder::default();
        assert!(!empty_builder.is_complete());
        assert!(empty_builder.validate().is_err());
    }
}