1use crate::error::ParseError;
10use ddex_core::models::versions::ERNVersion;
11use quick_xml::{events::Event, Reader};
12use std::io::BufRead;
13use std::time::Instant;
14
15#[derive(Debug, Clone)]
17pub enum WorkingStreamingElement {
18 MessageHeader {
20 message_id: String,
21 created_date_time: String,
22 version: ERNVersion,
23 },
24 Release {
26 reference: String,
27 title: String,
28 resource_references: Vec<String>,
29 },
30 SoundRecording {
32 reference: String,
33 title: String,
34 duration: Option<String>,
35 isrc: Option<String>,
36 },
37 Video {
39 reference: String,
40 title: String,
41 duration: Option<String>,
42 },
43 Image {
45 reference: String,
46 title: String,
47 width: Option<u32>,
48 height: Option<u32>,
49 },
50 Text {
52 reference: String,
53 title: String,
54 language_code: Option<String>,
55 },
56 EndOfStream { stats: WorkingStreamingStats },
58}
59
60pub struct WorkingStreamingParser<R: BufRead> {
62 reader: Reader<R>,
63 buffer: Vec<u8>,
64 version: ERNVersion,
65
66 current_element: Vec<String>,
68 current_depth: usize,
69 text_buffer: String,
70
71 in_message_header: bool,
73 in_release: bool,
74 in_resource: bool,
75 current_resource_type: Option<String>,
76
77 current_attributes: std::collections::HashMap<String, String>,
79 current_fields: std::collections::HashMap<String, String>,
80
81 release_attributes: std::collections::HashMap<String, String>,
83 resource_attributes: std::collections::HashMap<String, String>,
84
85 bytes_processed: u64,
87 elements_yielded: usize,
88 start_time: Instant,
89 max_memory_used: usize,
90 current_memory: usize,
91}
92
93impl<R: BufRead> WorkingStreamingParser<R> {
94 pub fn new(reader: R, version: ERNVersion) -> Self {
96 let mut xml_reader = Reader::from_reader(reader);
97 xml_reader.config_mut().trim_text(true);
98 xml_reader.config_mut().check_end_names = true;
99 xml_reader.config_mut().check_comments = false;
100 xml_reader.config_mut().expand_empty_elements = false;
101
102 Self {
103 reader: xml_reader,
104 buffer: Vec::with_capacity(8192),
105 version,
106 current_element: Vec::new(),
107 current_depth: 0,
108 text_buffer: String::new(),
109 in_message_header: false,
110 in_release: false,
111 in_resource: false,
112 current_resource_type: None,
113 current_attributes: std::collections::HashMap::new(),
114 current_fields: std::collections::HashMap::new(),
115 release_attributes: std::collections::HashMap::new(),
116 resource_attributes: std::collections::HashMap::new(),
117 bytes_processed: 0,
118 elements_yielded: 0,
119 start_time: Instant::now(),
120 max_memory_used: 0,
121 current_memory: 0,
122 }
123 }
124
125 pub fn feed_chunk(
127 &mut self,
128 chunk: &[u8],
129 ) -> Result<Option<WorkingStreamingElement>, ParseError> {
130 self.bytes_processed += chunk.len() as u64;
131 self.update_memory_usage();
132
133 if self.current_memory > 100 * 1024 * 1024 {
135 return Err(ParseError::SecurityViolation {
137 message: "Memory usage exceeds 100MB limit".to_string(),
138 });
139 }
140
141 self.parse_next()
142 }
143
144 pub fn parse_next(&mut self) -> Result<Option<WorkingStreamingElement>, ParseError> {
146 loop {
147 self.buffer.clear();
148 let event = self.reader.read_event_into(&mut self.buffer)?;
149
150 match event {
151 Event::Start(e) => {
152 let name = std::str::from_utf8(e.name().as_ref())?.to_string();
153
154 let mut attributes = std::collections::HashMap::new();
156 for attr_result in e.attributes() {
157 let attr = attr_result?;
158 let key = std::str::from_utf8(attr.key.as_ref())?;
159 let value = std::str::from_utf8(&attr.value)?;
160 attributes.insert(key.to_string(), value.to_string());
161 }
162
163 self.handle_start_element_with_attrs(&name, attributes)?;
164 }
165 Event::End(e) => {
166 let name = std::str::from_utf8(e.name().as_ref())?.to_string();
167 if let Some(element) = self.handle_end_element(&name)? {
168 self.elements_yielded += 1;
169 return Ok(Some(element));
170 }
171 }
172 Event::Text(e) => {
173 let text = std::str::from_utf8(&e)?;
174 if !text.trim().is_empty() {
175 self.text_buffer.push_str(text.trim());
176 }
177 }
178 Event::CData(e) => {
179 let text = std::str::from_utf8(&e)?;
180 self.text_buffer.push_str(text);
181 }
182 Event::Eof => {
183 return Ok(Some(WorkingStreamingElement::EndOfStream {
184 stats: self.get_stats(),
185 }));
186 }
187 _ => {
188 }
190 }
191
192 self.bytes_processed = self.reader.buffer_position();
193 }
194 }
195
196 fn handle_start_element_with_attrs(
198 &mut self,
199 name: &str,
200 attributes: std::collections::HashMap<String, String>,
201 ) -> Result<(), ParseError> {
202 self.current_element.push(name.to_string());
203 self.current_depth += 1;
204
205 if self.current_depth > 100 {
207 return Err(ParseError::SecurityViolation {
208 message: "XML nesting depth exceeds 100 levels".to_string(),
209 });
210 }
211
212 self.current_attributes = attributes;
214
215 self.text_buffer.clear();
217
218 match name {
220 "MessageHeader" => {
221 self.in_message_header = true;
222 }
223 "Release" => {
224 self.in_release = true;
225 self.current_fields.clear();
226 self.release_attributes = self.current_attributes.clone();
228 }
229 "SoundRecording" | "Video" | "Image" | "Text" => {
230 self.in_resource = true;
231 self.current_resource_type = Some(name.to_string());
232 self.current_fields.clear();
233 self.resource_attributes = self.current_attributes.clone();
235 }
236 _ => {}
237 }
238
239 Ok(())
240 }
241
242 fn handle_end_element(
244 &mut self,
245 name: &str,
246 ) -> Result<Option<WorkingStreamingElement>, ParseError> {
247 self.current_depth = self.current_depth.saturating_sub(1);
248 self.current_element.pop();
249
250 let text_content = self.text_buffer.clone();
252 if !text_content.is_empty() {
253 self.current_fields.insert(name.to_string(), text_content);
254 }
255
256 let result = match name {
258 "MessageHeader" => {
259 self.in_message_header = false;
260 Some(WorkingStreamingElement::MessageHeader {
261 message_id: self
262 .current_fields
263 .get("MessageId")
264 .unwrap_or(&"unknown".to_string())
265 .clone(),
266 created_date_time: self
267 .current_fields
268 .get("CreatedDateTime")
269 .unwrap_or(&chrono::Utc::now().to_rfc3339())
270 .clone(),
271 version: self.version,
272 })
273 }
274 "Release" => {
275 self.in_release = false;
276 let reference = self
277 .release_attributes
278 .get("ReleaseReference")
279 .or_else(|| self.current_fields.get("ReleaseReference"))
280 .unwrap_or(&format!("REL-{}", self.elements_yielded))
281 .clone();
282 let title = self
283 .current_fields
284 .get("TitleText")
285 .or_else(|| self.current_fields.get("Title"))
286 .or_else(|| self.current_fields.get("ReferenceTitle"))
287 .unwrap_or(&"Untitled Release".to_string())
288 .clone();
289 Some(WorkingStreamingElement::Release {
290 reference,
291 title,
292 resource_references: self.extract_resource_references(),
293 })
294 }
295 "SoundRecording" => {
296 if self.in_resource {
297 self.in_resource = false;
298 self.current_resource_type = None;
299 Some(WorkingStreamingElement::SoundRecording {
300 reference: self.get_resource_reference(),
301 title: self.get_resource_title(),
302 duration: self.current_fields.get("Duration").cloned(),
303 isrc: self.current_fields.get("ISRC").cloned(),
304 })
305 } else {
306 None
307 }
308 }
309 "Video" => {
310 if self.in_resource {
311 self.in_resource = false;
312 self.current_resource_type = None;
313 Some(WorkingStreamingElement::Video {
314 reference: self.get_resource_reference(),
315 title: self.get_resource_title(),
316 duration: self.current_fields.get("Duration").cloned(),
317 })
318 } else {
319 None
320 }
321 }
322 "Image" => {
323 if self.in_resource {
324 self.in_resource = false;
325 self.current_resource_type = None;
326 Some(WorkingStreamingElement::Image {
327 reference: self.get_resource_reference(),
328 title: self.get_resource_title(),
329 width: self
330 .current_fields
331 .get("Width")
332 .and_then(|w| w.parse().ok()),
333 height: self
334 .current_fields
335 .get("Height")
336 .and_then(|h| h.parse().ok()),
337 })
338 } else {
339 None
340 }
341 }
342 "Text" => {
343 if self.in_resource {
344 self.in_resource = false;
345 self.current_resource_type = None;
346 Some(WorkingStreamingElement::Text {
347 reference: self.get_resource_reference(),
348 title: self.get_resource_title(),
349 language_code: self
350 .current_fields
351 .get("LanguageOfPerformance")
352 .or_else(|| self.current_fields.get("LanguageCode"))
353 .cloned(),
354 })
355 } else {
356 None
357 }
358 }
359 _ => None,
360 };
361
362 self.text_buffer.clear();
364
365 Ok(result)
366 }
367
368 fn get_resource_reference(&self) -> String {
370 self.resource_attributes
371 .get("ResourceReference")
372 .or_else(|| self.current_fields.get("ResourceReference"))
373 .unwrap_or(&format!("RES-{}", self.elements_yielded))
374 .clone()
375 }
376
377 fn get_resource_title(&self) -> String {
379 self.current_fields
380 .get("TitleText")
381 .or_else(|| self.current_fields.get("Title"))
382 .or_else(|| self.current_fields.get("ReferenceTitle"))
383 .unwrap_or(&"Untitled Resource".to_string())
384 .clone()
385 }
386
387 fn extract_resource_references(&self) -> Vec<String> {
389 vec![]
392 }
393
394 fn update_memory_usage(&mut self) {
396 let estimated_memory = self.buffer.capacity()
397 + self.current_element.iter().map(|s| s.len()).sum::<usize>()
398 + self.text_buffer.capacity()
399 + self
400 .current_attributes
401 .iter()
402 .map(|(k, v)| k.len() + v.len())
403 .sum::<usize>()
404 + self
405 .current_fields
406 .iter()
407 .map(|(k, v)| k.len() + v.len())
408 .sum::<usize>()
409 + 1024; self.current_memory = estimated_memory;
412 self.max_memory_used = self.max_memory_used.max(estimated_memory);
413 }
414
415 pub fn get_stats(&self) -> WorkingStreamingStats {
417 WorkingStreamingStats {
418 bytes_processed: self.bytes_processed,
419 elements_yielded: self.elements_yielded,
420 current_depth: self.current_depth,
421 max_depth_reached: self.current_element.len(),
422 current_memory_bytes: self.current_memory,
423 max_memory_used_bytes: self.max_memory_used,
424 elapsed_time: self.start_time.elapsed(),
425 throughput_mb_per_sec: if self.start_time.elapsed().as_secs_f64() > 0.0 {
426 (self.bytes_processed as f64 / (1024.0 * 1024.0))
427 / self.start_time.elapsed().as_secs_f64()
428 } else {
429 0.0
430 },
431 }
432 }
433}
434
/// Snapshot of the counters a streaming parser maintains while it runs.
#[derive(Debug, Clone)]
pub struct WorkingStreamingStats {
    /// Number of input bytes accounted for so far.
    pub bytes_processed: u64,
    /// Number of elements handed out so far.
    pub elements_yielded: usize,
    /// Number of elements open at the time of the snapshot.
    pub current_depth: usize,
    /// Nesting-depth figure reported by the parser that took the snapshot.
    pub max_depth_reached: usize,
    /// Most recent memory estimate, in bytes.
    pub current_memory_bytes: usize,
    /// Largest memory estimate recorded, in bytes.
    pub max_memory_used_bytes: usize,
    /// Time elapsed when the snapshot was taken.
    pub elapsed_time: std::time::Duration,
    /// Bytes processed per second, expressed in MiB/s.
    pub throughput_mb_per_sec: f64,
}

impl WorkingStreamingStats {
    /// `true` while the recorded peak memory is strictly below 10 MiB.
    pub fn is_memory_bounded(&self) -> bool {
        const BOUND_BYTES: usize = 10 * 1024 * 1024;
        self.max_memory_used_bytes < BOUND_BYTES
    }

    /// Ratio of processed input to peak memory (both converted to MiB).
    ///
    /// Returns `0.0` when no memory usage has been recorded, which also
    /// avoids a division by zero.
    pub fn memory_efficiency(&self) -> f64 {
        const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

        if self.max_memory_used_bytes == 0 {
            return 0.0;
        }

        let processed_mib = self.bytes_processed as f64 / BYTES_PER_MIB;
        let peak_mib = self.max_memory_used_bytes as f64 / BYTES_PER_MIB;
        processed_mib / peak_mib
    }
}
464
465pub struct WorkingStreamIterator<R: BufRead> {
467 parser: WorkingStreamingParser<R>,
468 finished: bool,
469}
470
471impl<R: BufRead> WorkingStreamIterator<R> {
472 pub fn new(reader: R, version: ERNVersion) -> Self {
473 Self {
474 parser: WorkingStreamingParser::new(reader, version),
475 finished: false,
476 }
477 }
478
479 pub fn stats(&self) -> WorkingStreamingStats {
481 self.parser.get_stats()
482 }
483
484 pub fn is_finished(&self) -> bool {
486 self.finished
487 }
488}
489
490impl<R: BufRead> Iterator for WorkingStreamIterator<R> {
491 type Item = Result<WorkingStreamingElement, ParseError>;
492
493 fn next(&mut self) -> Option<Self::Item> {
494 if self.finished {
495 return None;
496 }
497
498 match self.parser.parse_next() {
499 Ok(Some(element)) => {
500 if matches!(element, WorkingStreamingElement::EndOfStream { .. }) {
501 self.finished = true;
502 }
503 Some(Ok(element))
504 }
505 Ok(None) => {
506 self.next()
508 }
509 Err(e) => {
510 self.finished = true;
511 Some(Err(e))
512 }
513 }
514 }
515}
516
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Builds an ERN 4.3 iterator over an in-memory document.
    fn iterator_over(xml: &str) -> WorkingStreamIterator<Cursor<&[u8]>> {
        WorkingStreamIterator::new(Cursor::new(xml.as_bytes()), ERNVersion::V4_3)
    }

    /// A small document with one header, one release and one sound recording
    /// must produce each of those elements.
    #[test]
    fn test_working_streaming_basic() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>MSG-001</MessageId>
        <CreatedDateTime>2023-01-01T00:00:00Z</CreatedDateTime>
    </MessageHeader>
    <Release ReleaseReference="REL-001">
        <Title>Test Release</Title>
    </Release>
    <SoundRecording ResourceReference="RES-001">
        <Title>Test Track</Title>
        <Duration>PT3M45S</Duration>
        <ISRC>USRC17607839</ISRC>
    </SoundRecording>
</ern:NewReleaseMessage>"#;

        let parsed = iterator_over(xml).collect::<Result<Vec<_>, _>>();
        assert!(parsed.is_ok(), "Parsing should succeed");

        let elements = parsed.unwrap();
        assert!(
            elements.len() >= 3,
            "Should find header, release, and sound recording"
        );

        let mut has_header = false;
        let mut has_release = false;
        let mut has_sound = false;
        for element in &elements {
            match element {
                WorkingStreamingElement::MessageHeader { .. } => has_header = true,
                WorkingStreamingElement::Release { .. } => has_release = true,
                WorkingStreamingElement::SoundRecording { .. } => has_sound = true,
                _ => {}
            }
        }

        assert!(has_header, "Should find MessageHeader");
        assert!(has_release, "Should find Release");
        assert!(has_sound, "Should find SoundRecording");
    }

    /// After draining a tiny document the recorded peak memory must stay
    /// under the 10MB bound.
    #[test]
    fn test_memory_bounded() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>MSG-MEMORY-TEST</MessageId>
    </MessageHeader>
</ern:NewReleaseMessage>"#;

        let mut iterator = iterator_over(xml);

        // Drain by reference so the iterator is still available for `stats()`.
        let _: Vec<_> = iterator.by_ref().collect();

        let stats = iterator.stats();
        assert!(
            stats.is_memory_bounded(),
            "Memory usage should be bounded under 10MB, got {} bytes",
            stats.max_memory_used_bytes
        );
    }

    /// 150 nested elements exceed the 100-level limit, so the very first item
    /// must be a security violation.
    #[test]
    fn test_security_depth_limit() {
        let opening: String = (0..150).map(|i| format!("<level{}>", i)).collect();
        let closing: String = (0..150).rev().map(|i| format!("</level{}>", i)).collect();

        let mut xml = String::from(r#"<?xml version="1.0"?>"#);
        xml.push_str(&opening);
        xml.push_str("content");
        xml.push_str(&closing);

        let mut iterator = iterator_over(&xml);

        let first = iterator.next();
        assert!(first.is_some());
        match first.unwrap() {
            Err(ParseError::SecurityViolation { .. }) => {}
            _ => panic!("Expected security violation for deep nesting"),
        }
    }
}