use crate::error::ParseError;
use ddex_core::models::versions::ERNVersion;
use quick_xml::{events::Event, Reader};
use std::io::BufRead;
use std::time::Instant;
#[derive(Debug, Clone)]
pub enum WorkingStreamingElement {
MessageHeader {
message_id: String,
created_date_time: String,
version: ERNVersion,
},
Release {
reference: String,
title: String,
resource_references: Vec<String>,
},
SoundRecording {
reference: String,
title: String,
duration: Option<String>,
isrc: Option<String>,
},
Video {
reference: String,
title: String,
duration: Option<String>,
},
Image {
reference: String,
title: String,
width: Option<u32>,
height: Option<u32>,
},
Text {
reference: String,
title: String,
language_code: Option<String>,
},
EndOfStream { stats: WorkingStreamingStats },
}
pub struct WorkingStreamingParser<R: BufRead> {
reader: Reader<R>,
buffer: Vec<u8>,
version: ERNVersion,
current_element: Vec<String>,
current_depth: usize,
text_buffer: String,
in_message_header: bool,
in_release: bool,
in_resource: bool,
current_resource_type: Option<String>,
current_attributes: std::collections::HashMap<String, String>,
current_fields: std::collections::HashMap<String, String>,
release_attributes: std::collections::HashMap<String, String>,
resource_attributes: std::collections::HashMap<String, String>,
bytes_processed: u64,
elements_yielded: usize,
start_time: Instant,
max_memory_used: usize,
current_memory: usize,
}
impl<R: BufRead> WorkingStreamingParser<R> {
pub fn new(reader: R, version: ERNVersion) -> Self {
let mut xml_reader = Reader::from_reader(reader);
xml_reader.config_mut().trim_text(true);
xml_reader.config_mut().check_end_names = true;
xml_reader.config_mut().check_comments = false;
xml_reader.config_mut().expand_empty_elements = false;
Self {
reader: xml_reader,
buffer: Vec::with_capacity(8192),
version,
current_element: Vec::new(),
current_depth: 0,
text_buffer: String::new(),
in_message_header: false,
in_release: false,
in_resource: false,
current_resource_type: None,
current_attributes: std::collections::HashMap::new(),
current_fields: std::collections::HashMap::new(),
release_attributes: std::collections::HashMap::new(),
resource_attributes: std::collections::HashMap::new(),
bytes_processed: 0,
elements_yielded: 0,
start_time: Instant::now(),
max_memory_used: 0,
current_memory: 0,
}
}
pub fn feed_chunk(
&mut self,
chunk: &[u8],
) -> Result<Option<WorkingStreamingElement>, ParseError> {
self.bytes_processed += chunk.len() as u64;
self.update_memory_usage();
if self.current_memory > 100 * 1024 * 1024 {
return Err(ParseError::SecurityViolation {
message: "Memory usage exceeds 100MB limit".to_string(),
});
}
self.parse_next()
}
pub fn parse_next(&mut self) -> Result<Option<WorkingStreamingElement>, ParseError> {
loop {
self.buffer.clear();
let event = self.reader.read_event_into(&mut self.buffer)?;
match event {
Event::Start(e) => {
let name = std::str::from_utf8(e.name().as_ref())?.to_string();
let mut attributes = std::collections::HashMap::new();
for attr_result in e.attributes() {
let attr = attr_result?;
let key = std::str::from_utf8(attr.key.as_ref())?;
let value = std::str::from_utf8(&attr.value)?;
attributes.insert(key.to_string(), value.to_string());
}
self.handle_start_element_with_attrs(&name, attributes)?;
}
Event::End(e) => {
let name = std::str::from_utf8(e.name().as_ref())?.to_string();
if let Some(element) = self.handle_end_element(&name)? {
self.elements_yielded += 1;
return Ok(Some(element));
}
}
Event::Text(e) => {
let text = std::str::from_utf8(&e)?;
if !text.trim().is_empty() {
self.text_buffer.push_str(text.trim());
}
}
Event::CData(e) => {
let text = std::str::from_utf8(&e)?;
self.text_buffer.push_str(text);
}
Event::Eof => {
return Ok(Some(WorkingStreamingElement::EndOfStream {
stats: self.get_stats(),
}));
}
_ => {
}
}
self.bytes_processed = self.reader.buffer_position();
}
}
fn handle_start_element_with_attrs(
&mut self,
name: &str,
attributes: std::collections::HashMap<String, String>,
) -> Result<(), ParseError> {
self.current_element.push(name.to_string());
self.current_depth += 1;
if self.current_depth > 100 {
return Err(ParseError::SecurityViolation {
message: "XML nesting depth exceeds 100 levels".to_string(),
});
}
self.current_attributes = attributes;
self.text_buffer.clear();
match name {
"MessageHeader" => {
self.in_message_header = true;
}
"Release" => {
self.in_release = true;
self.current_fields.clear();
self.release_attributes = self.current_attributes.clone();
}
"SoundRecording" | "Video" | "Image" | "Text" => {
self.in_resource = true;
self.current_resource_type = Some(name.to_string());
self.current_fields.clear();
self.resource_attributes = self.current_attributes.clone();
}
_ => {}
}
Ok(())
}
fn handle_end_element(
&mut self,
name: &str,
) -> Result<Option<WorkingStreamingElement>, ParseError> {
self.current_depth = self.current_depth.saturating_sub(1);
self.current_element.pop();
let text_content = self.text_buffer.clone();
if !text_content.is_empty() {
self.current_fields.insert(name.to_string(), text_content);
}
let result = match name {
"MessageHeader" => {
self.in_message_header = false;
Some(WorkingStreamingElement::MessageHeader {
message_id: self
.current_fields
.get("MessageId")
.unwrap_or(&"unknown".to_string())
.clone(),
created_date_time: self
.current_fields
.get("CreatedDateTime")
.unwrap_or(&chrono::Utc::now().to_rfc3339())
.clone(),
version: self.version,
})
}
"Release" => {
self.in_release = false;
let reference = self
.release_attributes
.get("ReleaseReference")
.or_else(|| self.current_fields.get("ReleaseReference"))
.unwrap_or(&format!("REL-{}", self.elements_yielded))
.clone();
let title = self
.current_fields
.get("TitleText")
.or_else(|| self.current_fields.get("Title"))
.or_else(|| self.current_fields.get("ReferenceTitle"))
.unwrap_or(&"Untitled Release".to_string())
.clone();
Some(WorkingStreamingElement::Release {
reference,
title,
resource_references: self.extract_resource_references(),
})
}
"SoundRecording" => {
if self.in_resource {
self.in_resource = false;
self.current_resource_type = None;
Some(WorkingStreamingElement::SoundRecording {
reference: self.get_resource_reference(),
title: self.get_resource_title(),
duration: self.current_fields.get("Duration").cloned(),
isrc: self.current_fields.get("ISRC").cloned(),
})
} else {
None
}
}
"Video" => {
if self.in_resource {
self.in_resource = false;
self.current_resource_type = None;
Some(WorkingStreamingElement::Video {
reference: self.get_resource_reference(),
title: self.get_resource_title(),
duration: self.current_fields.get("Duration").cloned(),
})
} else {
None
}
}
"Image" => {
if self.in_resource {
self.in_resource = false;
self.current_resource_type = None;
Some(WorkingStreamingElement::Image {
reference: self.get_resource_reference(),
title: self.get_resource_title(),
width: self
.current_fields
.get("Width")
.and_then(|w| w.parse().ok()),
height: self
.current_fields
.get("Height")
.and_then(|h| h.parse().ok()),
})
} else {
None
}
}
"Text" => {
if self.in_resource {
self.in_resource = false;
self.current_resource_type = None;
Some(WorkingStreamingElement::Text {
reference: self.get_resource_reference(),
title: self.get_resource_title(),
language_code: self
.current_fields
.get("LanguageOfPerformance")
.or_else(|| self.current_fields.get("LanguageCode"))
.cloned(),
})
} else {
None
}
}
_ => None,
};
self.text_buffer.clear();
Ok(result)
}
fn get_resource_reference(&self) -> String {
self.resource_attributes
.get("ResourceReference")
.or_else(|| self.current_fields.get("ResourceReference"))
.unwrap_or(&format!("RES-{}", self.elements_yielded))
.clone()
}
fn get_resource_title(&self) -> String {
self.current_fields
.get("TitleText")
.or_else(|| self.current_fields.get("Title"))
.or_else(|| self.current_fields.get("ReferenceTitle"))
.unwrap_or(&"Untitled Resource".to_string())
.clone()
}
fn extract_resource_references(&self) -> Vec<String> {
vec![]
}
fn update_memory_usage(&mut self) {
let estimated_memory = self.buffer.capacity()
+ self.current_element.iter().map(|s| s.len()).sum::<usize>()
+ self.text_buffer.capacity()
+ self
.current_attributes
.iter()
.map(|(k, v)| k.len() + v.len())
.sum::<usize>()
+ self
.current_fields
.iter()
.map(|(k, v)| k.len() + v.len())
.sum::<usize>()
+ 1024;
self.current_memory = estimated_memory;
self.max_memory_used = self.max_memory_used.max(estimated_memory);
}
pub fn get_stats(&self) -> WorkingStreamingStats {
WorkingStreamingStats {
bytes_processed: self.bytes_processed,
elements_yielded: self.elements_yielded,
current_depth: self.current_depth,
max_depth_reached: self.current_element.len(),
current_memory_bytes: self.current_memory,
max_memory_used_bytes: self.max_memory_used,
elapsed_time: self.start_time.elapsed(),
throughput_mb_per_sec: if self.start_time.elapsed().as_secs_f64() > 0.0 {
(self.bytes_processed as f64 / (1024.0 * 1024.0))
/ self.start_time.elapsed().as_secs_f64()
} else {
0.0
},
}
}
}
#[derive(Debug, Clone)]
pub struct WorkingStreamingStats {
pub bytes_processed: u64,
pub elements_yielded: usize,
pub current_depth: usize,
pub max_depth_reached: usize,
pub current_memory_bytes: usize,
pub max_memory_used_bytes: usize,
pub elapsed_time: std::time::Duration,
pub throughput_mb_per_sec: f64,
}
impl WorkingStreamingStats {
pub fn is_memory_bounded(&self) -> bool {
self.max_memory_used_bytes < 10 * 1024 * 1024
}
pub fn memory_efficiency(&self) -> f64 {
if self.max_memory_used_bytes > 0 {
(self.bytes_processed as f64 / (1024.0 * 1024.0))
/ (self.max_memory_used_bytes as f64 / (1024.0 * 1024.0))
} else {
0.0
}
}
}
pub struct WorkingStreamIterator<R: BufRead> {
parser: WorkingStreamingParser<R>,
finished: bool,
}
impl<R: BufRead> WorkingStreamIterator<R> {
pub fn new(reader: R, version: ERNVersion) -> Self {
Self {
parser: WorkingStreamingParser::new(reader, version),
finished: false,
}
}
pub fn stats(&self) -> WorkingStreamingStats {
self.parser.get_stats()
}
pub fn is_finished(&self) -> bool {
self.finished
}
}
impl<R: BufRead> Iterator for WorkingStreamIterator<R> {
type Item = Result<WorkingStreamingElement, ParseError>;
fn next(&mut self) -> Option<Self::Item> {
if self.finished {
return None;
}
match self.parser.parse_next() {
Ok(Some(element)) => {
if matches!(element, WorkingStreamingElement::EndOfStream { .. }) {
self.finished = true;
}
Some(Ok(element))
}
Ok(None) => {
self.next()
}
Err(e) => {
self.finished = true;
Some(Err(e))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[test]
fn test_working_streaming_basic() {
let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
<MessageHeader>
<MessageId>MSG-001</MessageId>
<CreatedDateTime>2023-01-01T00:00:00Z</CreatedDateTime>
</MessageHeader>
<Release ReleaseReference="REL-001">
<Title>Test Release</Title>
</Release>
<SoundRecording ResourceReference="RES-001">
<Title>Test Track</Title>
<Duration>PT3M45S</Duration>
<ISRC>USRC17607839</ISRC>
</SoundRecording>
</ern:NewReleaseMessage>"#;
let cursor = Cursor::new(xml.as_bytes());
let iterator = WorkingStreamIterator::new(cursor, ERNVersion::V4_3);
let elements: Result<Vec<_>, _> = iterator.collect();
assert!(elements.is_ok(), "Parsing should succeed");
let elements = elements.unwrap();
assert!(
elements.len() >= 3,
"Should find header, release, and sound recording"
);
let has_header = elements
.iter()
.any(|e| matches!(e, WorkingStreamingElement::MessageHeader { .. }));
let has_release = elements
.iter()
.any(|e| matches!(e, WorkingStreamingElement::Release { .. }));
let has_sound = elements
.iter()
.any(|e| matches!(e, WorkingStreamingElement::SoundRecording { .. }));
assert!(has_header, "Should find MessageHeader");
assert!(has_release, "Should find Release");
assert!(has_sound, "Should find SoundRecording");
}
#[test]
fn test_memory_bounded() {
let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
<MessageHeader>
<MessageId>MSG-MEMORY-TEST</MessageId>
</MessageHeader>
</ern:NewReleaseMessage>"#;
let cursor = Cursor::new(xml.as_bytes());
let mut iterator = WorkingStreamIterator::new(cursor, ERNVersion::V4_3);
let _: Vec<_> = iterator.by_ref().collect();
let stats = iterator.stats();
assert!(
stats.is_memory_bounded(),
"Memory usage should be bounded under 10MB, got {} bytes",
stats.max_memory_used_bytes
);
}
#[test]
fn test_security_depth_limit() {
let mut xml = String::from(r#"<?xml version="1.0"?>"#);
for i in 0..150 {
xml.push_str(&format!("<level{}>", i));
}
xml.push_str("content");
for i in (0..150).rev() {
xml.push_str(&format!("</level{}>", i));
}
let cursor = Cursor::new(xml.as_bytes());
let mut iterator = WorkingStreamIterator::new(cursor, ERNVersion::V4_3);
let result = iterator.next();
assert!(result.is_some());
match result.unwrap() {
Err(ParseError::SecurityViolation { .. }) => {
}
_ => panic!("Expected security violation for deep nesting"),
}
}
}