use super::element::HeaderBuilder;
use super::state::{
ParserState, ParsingContext, PartialDeal, PartialMessageHeader, PartialParty, PartialRelease,
PartialResource,
};
use super::{ParsedElement, StreamingConfig, StreamingProgress};
use crate::error::ParseError;
use ddex_core::models::{graph::*, versions::ERNVersion};
use ddex_core::models::{Identifier, IdentifierType, LocalizedString};
use quick_xml::{events::Event, Reader};
use std::io::BufRead;
use std::time::Instant;
/// Pull-based streaming parser that turns a DDEX ERN XML stream into
/// [`ParsedElement`]s, one `parse_next_element` call at a time.
pub struct StreamingDDEXParser<R: BufRead> {
/// quick-xml event reader over the caller-supplied input.
reader: Reader<R>,
/// Element stack, current attributes/text and the parser state machine.
pub(crate) context: ParsingContext,
/// Options and limits: element-depth and memory limits, progress reporting, buffer size.
config: StreamingConfig,
/// Scratch buffer handed to `read_event_into`; cleared before every event.
buffer: Vec<u8>,
/// Creation time of the parser; reported as `StreamingProgress::elapsed`.
pub(crate) start_time: Instant,
/// Reader byte position, refreshed from `Reader::buffer_position` while parsing.
pub(crate) bytes_processed: u64,
/// Number of completed elements yielded (the end-of-stream marker is not counted).
pub(crate) elements_yielded: usize,
/// Memory figure compared against `config.max_memory`.
/// NOTE(review): nothing in this file updates it after construction, so it stays 0 — confirm.
pub(crate) current_memory: usize,
/// Optional observer invoked by `update_progress`.
progress_callback: Option<Box<dyn FnMut(StreamingProgress) + Send>>,
}
impl<R: BufRead> StreamingDDEXParser<R> {
/// Creates a parser over `reader` for ERN `version`, using `StreamingConfig::default()`.
///
/// The XML reader is configured to trim whitespace around text, to verify that
/// end tags match their start tags, and to report self-closing tags as
/// `Event::Empty` rather than expanding them into start/end pairs.
pub fn new(reader: R, version: ERNVersion) -> Self {
    let mut xml_reader = Reader::from_reader(reader);
    {
        let reader_config = xml_reader.config_mut();
        reader_config.trim_text(true);
        reader_config.check_end_names = true;
        reader_config.expand_empty_elements = false;
    }

    let context = ParsingContext::new(version);
    Self {
        reader: xml_reader,
        context,
        config: StreamingConfig::default(),
        buffer: Vec::with_capacity(8192),
        start_time: Instant::now(),
        bytes_processed: 0,
        elements_yielded: 0,
        current_memory: 0,
        progress_callback: None,
    }
}
/// Creates a parser like [`Self::new`] but with a caller-supplied configuration.
///
/// The scratch buffer is grown so it can hold at least `config.buffer_size`
/// bytes before the configuration is installed.
pub fn with_config(reader: R, version: ERNVersion, config: StreamingConfig) -> Self {
    let mut parser = Self::new(reader, version);
    parser.buffer.reserve(config.buffer_size);
    parser.config = config;
    parser
}
/// Installs `callback` to receive [`StreamingProgress`] snapshots, replacing any
/// previously installed one. Consumes and returns the parser for chaining.
pub fn with_progress_callback<F>(self, callback: F) -> Self
where
    F: FnMut(StreamingProgress) + Send + 'static,
{
    let boxed: Box<dyn FnMut(StreamingProgress) + Send> = Box::new(callback);
    let mut parser = self;
    parser.progress_callback = Some(boxed);
    parser
}
/// Reads XML events until an element is completed and returns it.
///
/// Returns `Ok(Some(element))` when end-of-element handling yields an element,
/// `Ok(Some(ParsedElement::EndOfStream))` at end of input if the parser state is
/// `Complete`, and `Ok(None)` at end of input otherwise.
///
/// # Errors
/// Propagates XML reader errors, invalid UTF-8 in names, attributes or text,
/// errors from the start/end handlers, and the `SecurityViolation`s raised by
/// `check_security_limits`.
pub fn parse_next_element(&mut self) -> Result<Option<ParsedElement>, ParseError> {
    loop {
        self.buffer.clear();
        let event = self.reader.read_event_into(&mut self.buffer)?;
        // `expand_empty_elements` is disabled, so `<Tag/>` arrives as a single
        // `Event::Empty` with no matching `Event::End`. It must be closed here,
        // otherwise every self-closing tag would leave `current_depth` one level
        // too deep and eventually trip the `max_element_depth` limit.
        let self_closing = matches!(event, Event::Empty(_));
        match event {
            Event::Start(e) | Event::Empty(e) => {
                // Copy name and attributes out first: `e` borrows `self.buffer`,
                // and the handlers need `&mut self`.
                let name_bytes = e.name();
                let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
                let mut attributes = std::collections::HashMap::new();
                for attr_result in e.attributes() {
                    let attr = attr_result?;
                    let key = std::str::from_utf8(attr.key.as_ref())?;
                    let value = std::str::from_utf8(&attr.value)?;
                    attributes.insert(key.to_string(), value.to_string());
                }
                self.handle_start_element_by_name_and_attrs(&name, attributes)?;
                if self_closing {
                    if let Some(element) = self.close_element_and_record(&name)? {
                        return Ok(Some(element));
                    }
                }
            }
            Event::End(e) => {
                let name_bytes = e.name();
                let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
                if let Some(element) = self.close_element_and_record(&name)? {
                    return Ok(Some(element));
                }
            }
            Event::Text(e) => {
                let text = std::str::from_utf8(&e)?;
                self.context.add_text(text);
            }
            Event::CData(e) => {
                let text = std::str::from_utf8(&e)?;
                self.context.add_text(text);
            }
            Event::Eof => {
                if matches!(self.context.state, ParserState::Complete) {
                    return Ok(Some(ParsedElement::EndOfStream));
                }
                return Ok(None);
            }
            // Declarations, comments, processing instructions, DOCTYPE: ignored.
            _ => {}
        }
        self.bytes_processed = self.reader.buffer_position();
        self.check_security_limits()?;
    }
}

/// Runs end-of-element handling for `name`. If that produces an element, this
/// counts it and reports progress before handing it back.
fn close_element_and_record(&mut self, name: &str) -> Result<Option<ParsedElement>, ParseError> {
    let finished = self.handle_end_element_by_name(name)?;
    if finished.is_some() {
        self.elements_yielded += 1;
        self.update_progress();
    }
    Ok(finished)
}
/// Handles an opening tag given as a raw quick-xml `BytesStart`.
///
/// Steps:
/// - Pushes the element onto the context stack.
/// - Replaces the stored attributes with this element's attributes.
/// - Clears the text buffer.
///
/// In the `Initial` state:
/// - `MessageHeader`, `Release`, `Resource`, `Party` and `Deal` each open a new
///   partial entity at the current depth, copying its `*Reference` attribute when
///   present.
/// - `ERNMessage` is accepted without a state change.
///
/// Every other case is delegated to `handle_nested_start_element`.
fn handle_start_element(
    &mut self,
    element: &quick_xml::events::BytesStart,
) -> Result<(), ParseError> {
    let qname = element.name();
    let tag = std::str::from_utf8(qname.as_ref())?;
    self.context.push_element(tag);

    self.context.attributes.clear();
    for attribute in element.attributes() {
        let attribute = attribute?;
        let key = std::str::from_utf8(attribute.key.as_ref())?;
        let value = std::str::from_utf8(&attribute.value)?;
        self.context
            .attributes
            .insert(key.to_string(), value.to_string());
    }
    self.context.clear_text_buffer();

    if !matches!(self.context.state, ParserState::Initial) {
        return self.handle_nested_start_element(tag);
    }

    let depth = self.context.current_depth;
    let next_state = match tag {
        "ERNMessage" => return Ok(()),
        "MessageHeader" => ParserState::InHeader {
            header: PartialMessageHeader::default(),
            depth,
        },
        "Release" => {
            let mut release = PartialRelease::default();
            if let Some(reference) = self.context.attributes.get("ReleaseReference") {
                release.release_reference = Some(reference.clone());
            }
            ParserState::InRelease { release, depth }
        }
        "Resource" => {
            let mut resource = PartialResource::default();
            if let Some(reference) = self.context.attributes.get("ResourceReference") {
                resource.resource_reference = Some(reference.clone());
            }
            ParserState::InResource { resource, depth }
        }
        "Party" => {
            let mut party = PartialParty::default();
            if let Some(reference) = self.context.attributes.get("PartyReference") {
                party.party_reference = Some(reference.clone());
            }
            ParserState::InParty { party, depth }
        }
        "Deal" => {
            let mut deal = PartialDeal::default();
            if let Some(reference) = self.context.attributes.get("DealReference") {
                deal.deal_reference = Some(reference.clone());
            }
            ParserState::InDeal { deal, depth }
        }
        other => return self.handle_nested_start_element(other),
    };
    self.context.state = next_state;
    Ok(())
}
/// Handles an opening tag that did not start a new top-level entity.
///
/// - While a header, release, resource, party or deal is being collected, the
///   state is kept. The child's content is consumed later by the matching
///   end-element handler.
/// - While already skipping, nothing changes.
/// - In any other state (e.g. `Initial` with an unrecognised element), the parser
///   starts skipping the subtree rooted at the current depth.
///
/// The element name is currently not consulted.
fn handle_nested_start_element(&mut self, _name: &str) -> Result<(), ParseError> {
    // Party and deal children must keep their entity state too. Falling through
    // to `Skipping` would silently discard the partially built party or deal.
    let keeps_state = matches!(
        self.context.state,
        ParserState::InHeader { .. }
            | ParserState::InRelease { .. }
            | ParserState::InResource { .. }
            | ParserState::InParty { .. }
            | ParserState::InDeal { .. }
            | ParserState::Skipping { .. }
    );
    if !keeps_state {
        let depth = self.context.current_depth;
        self.context.state = ParserState::Skipping {
            start_depth: depth,
            current_depth: depth,
        };
    }
    Ok(())
}
/// Handles a closing tag given as a raw quick-xml `BytesEnd`.
///
/// Steps:
/// - Takes the text collected since the last start tag.
/// - Dispatches to the end handler for the current entity state.
/// - Pops the element stack last, so handlers still see the closing element's depth.
///
/// State after the call:
/// - Returns to `Initial` when a handler yields an element, or when the closing
///   tag is the root of the current entity (an entity still incomplete at that
///   point is discarded).
/// - Returns to `Initial` when the closing tag ends the skipped subtree.
/// - Otherwise the previous state is put back unchanged.
///
/// Returns the element produced by an entity handler, if any.
fn handle_end_element(
    &mut self,
    element: &quick_xml::events::BytesEnd,
) -> Result<Option<ParsedElement>, ParseError> {
    let name_bytes = element.name();
    let name = std::str::from_utf8(name_bytes.as_ref())?;
    let text_content = self.context.take_text();
    // Depth of the element being closed (it has not been popped yet).
    let closing_depth = self.context.current_depth;

    // The state is moved out so handlers can mutate the partial entity while
    // also borrowing `self`; every arm below must store a state back. Restoring
    // unconditionally would overwrite the `Initial` a handler sets on completion.
    let result = match std::mem::take(&mut self.context.state) {
        ParserState::InHeader { mut header, depth } => {
            let res = self.handle_header_end_element(name, &text_content, &mut header, depth)?;
            self.context.state = if res.is_some() || closing_depth == depth {
                ParserState::Initial
            } else {
                ParserState::InHeader { header, depth }
            };
            res
        }
        ParserState::InRelease { mut release, depth } => {
            let res = self.handle_release_end_element(name, &text_content, &mut release, depth)?;
            self.context.state = if res.is_some() || closing_depth == depth {
                ParserState::Initial
            } else {
                ParserState::InRelease { release, depth }
            };
            res
        }
        ParserState::InResource {
            mut resource,
            depth,
        } => {
            let res =
                self.handle_resource_end_element(name, &text_content, &mut resource, depth)?;
            self.context.state = if res.is_some() || closing_depth == depth {
                ParserState::Initial
            } else {
                ParserState::InResource { resource, depth }
            };
            res
        }
        ParserState::InParty { mut party, depth } => {
            let res = self.handle_party_end_element(name, &text_content, &mut party, depth)?;
            self.context.state = if res.is_some() || closing_depth == depth {
                ParserState::Initial
            } else {
                ParserState::InParty { party, depth }
            };
            res
        }
        ParserState::InDeal { mut deal, depth } => {
            let res = self.handle_deal_end_element(name, &text_content, &mut deal, depth)?;
            self.context.state = if res.is_some() || closing_depth == depth {
                ParserState::Initial
            } else {
                ParserState::InDeal { deal, depth }
            };
            res
        }
        ParserState::Skipping {
            start_depth,
            current_depth,
        } => {
            // Keep skipping until the element that started the skip closes.
            self.context.state = if closing_depth <= start_depth {
                ParserState::Initial
            } else {
                ParserState::Skipping {
                    start_depth,
                    current_depth,
                }
            };
            None
        }
        other => {
            // `Initial`, `Complete`, ...: nothing to finish; keep the state as-is.
            self.context.state = other;
            None
        }
    };
    self.context.pop_element();
    Ok(result)
}
/// Applies the closing tag `name` to the partially built message header.
///
/// `MessageId` and `MessageCreatedDateTime` store the collected text. When the
/// `MessageHeader` element that opened this state closes (same `depth`):
/// - The header is assembled with `HeaderBuilder`.
/// - A missing sender becomes a placeholder named "Unknown".
/// - A missing message id becomes the proprietary id "unknown".
/// - A missing creation time becomes an empty string.
/// - The parser state returns to `Initial`.
fn handle_header_end_element(
    &mut self,
    name: &str,
    text_content: &str,
    header: &mut PartialMessageHeader,
    depth: usize,
) -> Result<Option<ParsedElement>, ParseError> {
    if name == "MessageId" {
        header.message_id = Some(Identifier {
            id_type: IdentifierType::Proprietary,
            namespace: None,
            value: text_content.to_string(),
        });
        return Ok(None);
    }
    if name == "MessageCreatedDateTime" {
        header.message_created_date_time = Some(text_content.to_string());
        return Ok(None);
    }
    let closes_header = name == "MessageHeader" && self.context.current_depth == depth;
    if !closes_header {
        return Ok(None);
    }

    let placeholder_sender = || MessageSender {
        party_id: vec![],
        party_name: vec![LocalizedString {
            text: "Unknown".to_string(),
            language_code: None,
            script: None,
        }],
        trading_name: None,
        attributes: None,
        extensions: None,
        comments: None,
    };
    let placeholder_id = || Identifier {
        id_type: IdentifierType::Proprietary,
        namespace: None,
        value: "unknown".to_string(),
    };

    let sender = header.sender.take().unwrap_or_else(placeholder_sender);
    let message_id = header.message_id.take().unwrap_or_else(placeholder_id);
    let created = header.message_created_date_time.take().unwrap_or_default();

    let built = HeaderBuilder::new()
        .sender(sender)
        .message_id(message_id)
        .created_date_time(created)
        .version(self.context.version)
        .build()?;
    self.context.state = ParserState::Initial;
    Ok(Some(built))
}
/// Applies the closing tag `name` to the partially built release.
///
/// - `ReleaseTitle` appends a localized title.
/// - `Genre` appends a genre with only `genre_text` set.
/// - `ReleaseDate` is recognised but not captured yet.
/// - When the `Release` element that opened this state closes (same `depth`) and
///   the partial release reports `is_complete()`, the release is built, the state
///   returns to `Initial`, and `ParsedElement::Release` is returned.
fn handle_release_end_element(
    &mut self,
    name: &str,
    text_content: &str,
    release: &mut PartialRelease,
    depth: usize,
) -> Result<Option<ParsedElement>, ParseError> {
    match name {
        "ReleaseTitle" => {
            // `attributes` holds those of the most recently opened element, which
            // is this one when it contains only text.
            release.release_title.push(LocalizedString {
                text: text_content.to_string(),
                language_code: self.context.attributes.get("LanguageCode").cloned(),
                script: None,
            });
        }
        "Genre" => {
            release.genre.push(Genre {
                genre_text: text_content.to_string(),
                sub_genre: None,
                attributes: None,
                extensions: None,
                comments: None,
            });
        }
        "ReleaseDate" => {}
        "Release" if self.context.current_depth == depth => {
            if release.is_complete() {
                // Move the accumulated data out instead of deep-cloning it; the
                // caller's partial release is left as `PartialRelease::default()`.
                let completed_release = std::mem::take(release).into_release();
                self.context.state = ParserState::Initial;
                return Ok(Some(ParsedElement::Release(completed_release)));
            }
        }
        _ => {}
    }
    Ok(None)
}
/// Applies the closing tag `name` to the partially built resource.
///
/// - `Title` appends a localized reference title.
/// - `Genre` is recognised but not captured yet.
/// - `Duration` accepts whole seconds or an ISO 8601 / `xs:duration` value such
///   as `PT3M45S`. Unparseable values leave the stored duration untouched.
/// - When the `Resource` element that opened this state closes (same `depth`) and
///   the partial resource reports `is_complete()`, the resource is built, the state
///   returns to `Initial`, and `ParsedElement::Resource` is returned.
fn handle_resource_end_element(
    &mut self,
    name: &str,
    text_content: &str,
    resource: &mut PartialResource,
    depth: usize,
) -> Result<Option<ParsedElement>, ParseError> {
    /// Parses a non-empty run of ASCII digits (no sign) as `u64`.
    fn unsigned(digits: &str) -> Option<u64> {
        if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        digits.parse().ok()
    }

    /// Parses a duration in one of two forms:
    /// - Whole seconds (`"225"`, the only form accepted previously).
    /// - ISO 8601 / `xs:duration` limited to days, hours, minutes and (optionally
    ///   fractional) seconds, e.g. `"PT3M45S"`, `"PT1H2M"`, `"P1DT30S"`, `"PT12.5S"`.
    ///
    /// Year and month components are rejected because their length in seconds
    /// depends on the calendar.
    fn parse_duration(text: &str) -> Option<std::time::Duration> {
        if let Ok(seconds) = text.parse::<u64>() {
            return Some(std::time::Duration::from_secs(seconds));
        }
        let body = text.trim().strip_prefix('P')?;
        let (date_part, time_part) = match body.split_once('T') {
            Some((date, time)) => (date, Some(time)),
            None => (body, None),
        };
        let mut secs: u64 = 0;
        let mut nanos: u32 = 0;
        let mut has_component = false;

        if !date_part.is_empty() {
            let days = unsigned(date_part.strip_suffix('D')?)?;
            secs = secs.checked_add(days.checked_mul(86_400)?)?;
            has_component = true;
        }

        if let Some(mut rest) = time_part {
            // A `T` designator must be followed by at least one time component.
            if rest.is_empty() {
                return None;
            }
            let mut previous_rank = 0;
            while !rest.is_empty() {
                let unit_at = rest.find(|c: char| c.is_ascii_alphabetic())?;
                let number = &rest[..unit_at];
                let unit = rest.as_bytes()[unit_at];
                rest = &rest[unit_at + 1..];
                // Components must appear in H, M, S order, each at most once.
                let rank = match unit {
                    b'H' => 1,
                    b'M' => 2,
                    b'S' => 3,
                    _ => return None,
                };
                if rank <= previous_rank {
                    return None;
                }
                previous_rank = rank;
                match unit {
                    b'H' => secs = secs.checked_add(unsigned(number)?.checked_mul(3_600)?)?,
                    b'M' => secs = secs.checked_add(unsigned(number)?.checked_mul(60)?)?,
                    _ => {
                        let (whole, fraction) = number.split_once('.').unwrap_or((number, ""));
                        if whole.is_empty() && fraction.is_empty() {
                            return None;
                        }
                        let whole_secs = if whole.is_empty() { 0 } else { unsigned(whole)? };
                        if !fraction.bytes().all(|b| b.is_ascii_digit()) {
                            return None;
                        }
                        // Nanosecond precision: first nine fractional digits, zero-padded.
                        nanos = fraction
                            .bytes()
                            .chain(std::iter::repeat(b'0'))
                            .take(9)
                            .fold(0u32, |acc, digit| acc * 10 + u32::from(digit - b'0'));
                        secs = secs.checked_add(whole_secs)?;
                    }
                }
                has_component = true;
            }
        }

        if has_component {
            Some(std::time::Duration::new(secs, nanos))
        } else {
            None
        }
    }

    match name {
        "Title" => {
            // `attributes` holds those of the most recently opened element, which
            // is this one when it contains only text.
            resource.reference_title.push(LocalizedString {
                text: text_content.to_string(),
                language_code: self.context.attributes.get("LanguageCode").cloned(),
                script: None,
            });
        }
        "Genre" => {}
        "Duration" => {
            if let Some(duration) = parse_duration(text_content) {
                resource.duration = Some(duration);
            }
        }
        "Resource" if self.context.current_depth == depth => {
            if resource.is_complete() {
                // Move the accumulated data out instead of deep-cloning it; the
                // caller's partial resource is left as `PartialResource::default()`.
                let completed_resource = std::mem::take(resource).into_resource();
                self.context.state = ParserState::Initial;
                return Ok(Some(ParsedElement::Resource(completed_resource)));
            }
        }
        _ => {}
    }
    Ok(None)
}
/// End-of-element hook for content inside a `Party`.
///
/// Parties are not assembled yet: every closing tag is accepted, the partial
/// party is left untouched, and nothing is yielded.
fn handle_party_end_element(
    &mut self,
    _name: &str,
    _text_content: &str,
    _party: &mut PartialParty,
    _depth: usize,
) -> Result<Option<ParsedElement>, ParseError> {
    let emitted: Option<ParsedElement> = None;
    Ok(emitted)
}

/// End-of-element hook for content inside a `Deal`.
///
/// Deals are not assembled yet: every closing tag is accepted, the partial deal
/// is left untouched, and nothing is yielded.
fn handle_deal_end_element(
    &mut self,
    _name: &str,
    _text_content: &str,
    _deal: &mut PartialDeal,
    _depth: usize,
) -> Result<Option<ParsedElement>, ParseError> {
    let emitted: Option<ParsedElement> = None;
    Ok(emitted)
}
/// Enforces the configured security limits.
///
/// # Errors
/// Returns `ParseError::SecurityViolation` in two cases, checked in this order:
/// - The current element depth exceeds `config.security.max_element_depth`.
/// - `current_memory` exceeds `config.max_memory`.
fn check_security_limits(&self) -> Result<(), ParseError> {
    let depth = self.context.current_depth;
    let depth_limit = self.config.security.max_element_depth;
    if depth > depth_limit {
        let message = format!("Nesting depth {} exceeds maximum {}", depth, depth_limit);
        return Err(ParseError::SecurityViolation { message });
    }

    let memory = self.current_memory;
    let memory_limit = self.config.max_memory;
    if memory > memory_limit {
        let message = format!("Memory usage {} exceeds maximum {}", memory, memory_limit);
        return Err(ParseError::SecurityViolation { message });
    }

    Ok(())
}
/// Reports whether `current_memory` has passed half of `config.max_memory`.
///
/// Never returns an error; the `Result` return type is kept for callers.
fn should_yield_for_memory(&self) -> Result<bool, ParseError> {
    let soft_limit = self.config.max_memory / 2;
    let over_soft_limit = self.current_memory > soft_limit;
    Ok(over_soft_limit)
}
/// Sends a [`StreamingProgress`] snapshot to the installed callback.
///
/// Only fires when progress reporting is enabled and `bytes_processed` is an
/// exact multiple of `config.progress_interval`. An interval of `0` means
/// "report on every call" instead of panicking on a division by zero.
/// Per-kind counters (`releases_parsed`, ...) are not tracked and are reported as 0.
///
/// NOTE(review): `bytes_processed` is a reader position, so exact multiples are
/// hit only occasionally. A "crossed the next interval" check would need the last
/// reported position stored on the parser.
fn update_progress(&mut self) {
    let interval = self.config.progress_interval;
    // Integer `%` by zero panics, so a zero interval short-circuits to "due".
    let on_interval = interval == 0 || self.bytes_processed % interval == 0;
    if !(self.config.enable_progress && on_interval) {
        return;
    }
    if let Some(callback) = self.progress_callback.as_mut() {
        let progress = StreamingProgress {
            bytes_processed: self.bytes_processed,
            elements_parsed: self.elements_yielded,
            releases_parsed: 0,
            resources_parsed: 0,
            parties_parsed: 0,
            deals_parsed: 0,
            elapsed: self.start_time.elapsed(),
            estimated_total_bytes: None,
            current_depth: self.context.current_depth,
            memory_usage: self.current_memory,
        };
        callback(progress);
    }
}
/// Describes the current input position (last recorded byte offset) for diagnostics.
fn get_current_location(&self) -> String {
    let offset = self.bytes_processed;
    format!("streaming at byte offset {}", offset)
}
}
// Debug output shows only progress counters; the reader, buffer, configuration
// and callback are omitted.
impl<R: BufRead> std::fmt::Debug for StreamingDDEXParser<R> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("StreamingDDEXParser");
        out.field("bytes_processed", &self.bytes_processed);
        out.field("elements_yielded", &self.elements_yielded);
        out.field("current_depth", &self.context.current_depth);
        out.field("current_memory", &self.current_memory);
        out.finish()
    }
}
impl<R: BufRead> StreamingDDEXParser<R> {
fn handle_start_element_by_name_and_attrs(
&mut self,
name: &str,
attrs: std::collections::HashMap<String, String>,
) -> Result<(), ParseError> {
self.context.push_element(name);
self.context.attributes = attrs;
self.context.clear_text_buffer();
match (&self.context.state, name) {
(ParserState::Initial, "MessageHeader") => {
self.context.state = ParserState::InHeader {
header: crate::streaming::state::PartialMessageHeader::default(),
depth: self.context.current_depth,
};
}
(ParserState::Initial, "Release") => {
let _reference = self
.context
.attributes
.get("ReleaseReference")
.unwrap_or(&"default".to_string())
.clone();
self.context.state = ParserState::InRelease {
release: crate::streaming::state::PartialRelease::default(),
depth: self.context.current_depth,
};
}
_ => {} }
Ok(())
}
/// Handles a closing tag by name.
///
/// Discards the collected text and pops the element stack. When a
/// `MessageHeader` or `Release` closes while the matching state is active, the
/// state returns to `Initial`.
///
/// NOTE(review): this never yields an element; the entity-specific end handlers
/// are not consulted on this path.
fn handle_end_element_by_name(
    &mut self,
    name: &str,
) -> Result<Option<ParsedElement>, ParseError> {
    let _discarded_text = self.context.take_text();
    self.context.pop_element();

    let leaves_entity = match name {
        "MessageHeader" => matches!(self.context.state, ParserState::InHeader { .. }),
        "Release" => matches!(self.context.state, ParserState::InRelease { .. }),
        _ => false,
    };
    if leaves_entity {
        self.context.state = ParserState::Initial;
    }
    Ok(None)
}
}