// src/streaming/parser.rs
//! Core streaming DDEX parser implementation
use super::{StreamingConfig, StreamingProgress, ParsedElement};
use super::state::{ParsingContext, ParserState, PartialRelease, PartialResource, PartialParty, PartialDeal, PartialMessageHeader};
use super::element::HeaderBuilder;
use crate::error::{ParseError, ErrorLocation};
use ddex_core::models::{graph::*, versions::ERNVersion};
use ddex_core::models::{Identifier, LocalizedString, IdentifierType};
use quick_xml::{Reader, events::Event};
use std::io::BufRead;
use std::time::Instant;
use std::collections::HashMap;
/// Boxed progress callback held by the parser; it receives one `StreamingProgress`
/// snapshot per report.
type ProgressCallback = Box<dyn FnMut(StreamingProgress) + Send>;

/// High-performance streaming DDEX parser.
///
/// Pulls XML events from a buffered reader one at a time and drives a state machine
/// (kept in `context`) that assembles headers, releases and resources into
/// `ParsedElement`s as their closing tags are reached.
pub struct StreamingDDEXParser<R: BufRead> {
    /// Pull-based quick-xml reader over the input.
    reader: Reader<R>,
    /// Element stack, current depth, text/attribute buffers and parser state.
    pub(crate) context: ParsingContext,
    /// Security limits and progress-reporting settings.
    config: StreamingConfig,
    /// Scratch buffer that XML events are decoded into.
    buffer: Vec<u8>,
    /// Moment the parser was created; used for elapsed-time reporting.
    pub(crate) start_time: Instant,
    /// Byte position reported by the XML reader at the last update.
    pub(crate) bytes_processed: u64,
    /// Number of elements handed back to the caller so far.
    pub(crate) elements_yielded: usize,
    /// Memory-usage figure compared against `config.max_memory` by the limit checks.
    pub(crate) current_memory: usize,
    /// Optional progress callback.
    progress_callback: Option<ProgressCallback>,
}
impl<R: BufRead> StreamingDDEXParser<R> {
    /// Create a new streaming parser reading from `reader`, for ERN `version`.
    ///
    /// The XML reader trims whitespace around text and checks that end tags match their
    /// start tags. It does not expand `<Foo/>` into separate start/end events;
    /// [`Self::parse_next_element`] closes such elements itself. Uses
    /// `StreamingConfig::default()` and an 8 KiB initial scratch buffer.
    pub fn new(reader: R, version: ERNVersion) -> Self {
        let mut xml_reader = Reader::from_reader(reader);
        xml_reader.config_mut().trim_text(true);
        xml_reader.config_mut().check_end_names = true;
        xml_reader.config_mut().expand_empty_elements = false;
        Self {
            reader: xml_reader,
            context: ParsingContext::new(version),
            config: StreamingConfig::default(),
            buffer: Vec::with_capacity(8192),
            start_time: Instant::now(),
            bytes_processed: 0,
            elements_yielded: 0,
            current_memory: 0,
            progress_callback: None,
        }
    }

    /// Create a parser with a custom configuration.
    ///
    /// Same as [`Self::new`], then installs `config` and reserves `config.buffer_size`
    /// additional bytes in the scratch buffer.
    pub fn with_config(reader: R, version: ERNVersion, config: StreamingConfig) -> Self {
        let mut parser = Self::new(reader, version);
        parser.buffer.reserve(config.buffer_size);
        parser.config = config;
        parser
    }

    /// Install a callback that receives `StreamingProgress` snapshots.
    ///
    /// The callback only runs when `config.enable_progress` is set; see
    /// `update_progress` for when reports are produced.
    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
    where
        F: FnMut(StreamingProgress) + Send + 'static,
    {
        self.progress_callback = Some(Box::new(callback));
        self
    }

    /// Parse the next complete element from the stream.
    ///
    /// Reads XML events until a message header, release or resource is finished and
    /// returns it as `Ok(Some(..))`. At end of input it returns
    /// `Ok(Some(ParsedElement::EndOfStream))` when the state machine is in
    /// `ParserState::Complete`, and `Ok(None)` otherwise.
    ///
    /// # Errors
    ///
    /// Returns an error for malformed XML, non-UTF-8 names/attributes/text, a failing
    /// header build, or a violated security limit (see `check_security_limits`).
    pub fn parse_next_element(&mut self) -> Result<Option<ParsedElement>, ParseError> {
        // Events borrow from the scratch buffer while the handlers below need `&mut self`,
        // so the buffer is moved out for the duration of the loop. It is put back (emptied,
        // capacity kept) on both the success and the error path.
        let mut buf = std::mem::take(&mut self.buffer);
        let outcome = self.next_element_from_events(&mut buf);
        buf.clear();
        self.buffer = buf;
        outcome
    }

    /// Event loop behind [`Self::parse_next_element`], decoding events into `buf`.
    fn next_element_from_events(
        &mut self,
        buf: &mut Vec<u8>,
    ) -> Result<Option<ParsedElement>, ParseError> {
        loop {
            buf.clear();
            let event = self.reader.read_event_into(buf)?;
            // Update before dispatch so progress reported for a yielded element is current.
            self.bytes_processed = self.reader.buffer_position();

            match event {
                Event::Start(e) => {
                    self.handle_start_element(&e)?;
                }
                Event::Empty(e) => {
                    // With `expand_empty_elements` disabled, `<Foo/>` arrives as a single
                    // event and no `End` follows. Close it here so depth tracking stays
                    // balanced and entities opened by an empty tag are finished.
                    self.handle_start_element(&e)?;
                    self.check_security_limits()?;
                    if let Some(element) = self.handle_end_element(&e.to_end())? {
                        return Ok(Some(self.finish_yield(element)));
                    }
                }
                Event::End(e) => {
                    if let Some(element) = self.handle_end_element(&e)? {
                        return Ok(Some(self.finish_yield(element)));
                    }
                }
                Event::Text(e) => {
                    // NOTE(review): text is taken verbatim; entity references such as
                    // `&amp;` are not unescaped here. Confirm against the quick-xml version
                    // in use.
                    let text = std::str::from_utf8(&e)?;
                    self.context.add_text(text);
                }
                Event::CData(e) => {
                    let text = std::str::from_utf8(&e)?;
                    self.context.add_text(text);
                }
                Event::Eof => {
                    let finished = matches!(self.context.state, ParserState::Complete);
                    return Ok(if finished {
                        Some(ParsedElement::EndOfStream)
                    } else {
                        None
                    });
                }
                _ => {
                    // Skip other events (comments, processing instructions, etc.)
                }
            }

            self.check_security_limits()?;
            // NOTE(review): yielding under memory pressure is not implemented. The check
            // is kept so it can surface errors, but it does not alter control flow.
            self.should_yield_for_memory()?;
        }
    }

    /// Count a yielded element and report progress before handing it back.
    fn finish_yield(&mut self, element: ParsedElement) -> ParsedElement {
        self.elements_yielded += 1;
        self.update_progress();
        element
    }

    /// Handle a start-element (or empty-element) event.
    ///
    /// Pushes the element onto the context's element stack and replaces
    /// `context.attributes` with this element's attributes. It then clears the text
    /// buffer and drives the state machine.
    ///
    /// In `ParserState::Initial`:
    /// * `MessageHeader`, `Release`, `Resource`, `Party` and `Deal` open a new partial
    ///   entity at the current depth, capturing its `*Reference` attribute when present.
    /// * `ERNMessage` leaves the state untouched.
    /// * Anything else goes to `handle_nested_start_element`, as does every tag seen in
    ///   any other state.
    ///
    /// NOTE(review): names are matched exactly as written, including any namespace
    /// prefix. Container elements such as `ReleaseList` are not special-cased, so they
    /// are skipped with their contents. Confirm this matches the expected input.
    fn handle_start_element(
        &mut self,
        element: &quick_xml::events::BytesStart,
    ) -> Result<(), ParseError> {
        let name_bytes = element.name();
        let name = std::str::from_utf8(name_bytes.as_ref())?;
        self.context.push_element(name);

        self.context.attributes.clear();
        for attr in element.attributes() {
            let attr = attr?;
            let key = std::str::from_utf8(attr.key.as_ref())?;
            let value = std::str::from_utf8(&attr.value)?;
            self.context
                .attributes
                .insert(key.to_string(), value.to_string());
        }
        self.context.clear_text_buffer();

        if !matches!(self.context.state, ParserState::Initial) {
            return self.handle_nested_start_element(name);
        }

        let depth = self.context.current_depth;
        let attributes = &self.context.attributes;
        let next_state = match name {
            // Root element: stay in the initial state.
            "ERNMessage" => return Ok(()),
            "MessageHeader" => ParserState::InHeader {
                header: PartialMessageHeader::default(),
                depth,
            },
            "Release" => {
                let mut release = PartialRelease::default();
                if let Some(reference) = attributes.get("ReleaseReference") {
                    release.release_reference = Some(reference.clone());
                }
                ParserState::InRelease { release, depth }
            }
            "Resource" => {
                let mut resource = PartialResource::default();
                if let Some(reference) = attributes.get("ResourceReference") {
                    resource.resource_reference = Some(reference.clone());
                }
                ParserState::InResource { resource, depth }
            }
            "Party" => {
                let mut party = PartialParty::default();
                if let Some(reference) = attributes.get("PartyReference") {
                    party.party_reference = Some(reference.clone());
                }
                ParserState::InParty { party, depth }
            }
            "Deal" => {
                let mut deal = PartialDeal::default();
                if let Some(reference) = attributes.get("DealReference") {
                    deal.deal_reference = Some(reference.clone());
                }
                ParserState::InDeal { deal, depth }
            }
            _ => return self.handle_nested_start_element(name),
        };
        self.context.state = next_state;
        Ok(())
    }

    /// Handle a start tag that does not open a new top-level entity.
    ///
    /// Inside a header, release, resource, party or deal, the state is left as-is.
    /// Fields such as `MessageId`, `ReleaseTitle` or `Duration` are captured when their
    /// end tag arrives. The state is also unchanged while already skipping. In any other
    /// state, including `Initial`, the parser starts skipping the subtree rooted at the
    /// current depth.
    fn handle_nested_start_element(&mut self, _name: &str) -> Result<(), ParseError> {
        let keeps_state = matches!(
            self.context.state,
            ParserState::InHeader { .. }
                | ParserState::InRelease { .. }
                | ParserState::InResource { .. }
                | ParserState::InParty { .. }
                | ParserState::InDeal { .. }
                | ParserState::Skipping { .. }
        );
        if !keeps_state {
            let depth = self.context.current_depth;
            self.context.state = ParserState::Skipping {
                start_depth: depth,
                current_depth: depth,
            };
        }
        Ok(())
    }

    /// Handle an end-element event by routing it to the handler for the current state.
    ///
    /// The state is moved out of `self.context.state` while its handler runs, so the
    /// handler can mutate the partial entity while also borrowing `self`. The state
    /// afterwards is decided here:
    /// * When `name` closes the entity that opened the state (same tag, same depth),
    ///   the parser returns to `ParserState::Initial` whether or not an element was
    ///   yielded, so an incomplete entity cannot leak into the next one.
    /// * Otherwise the in-progress state is written back.
    /// * While skipping, the parser returns to `Initial` once the skipped element closes.
    /// * Any other state is preserved unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if the tag name is not valid UTF-8 or a state handler fails.
    fn handle_end_element(
        &mut self,
        element: &quick_xml::events::BytesEnd,
    ) -> Result<Option<ParsedElement>, ParseError> {
        let name_bytes = element.name();
        let name = std::str::from_utf8(name_bytes.as_ref())?;
        let text_content = self.context.take_text();
        // Depth of the element being closed; `pop_element` only runs after dispatch.
        let closing_depth = self.context.current_depth;
        let closes = |tag: &str, depth: usize| name == tag && closing_depth == depth;

        let result = match std::mem::take(&mut self.context.state) {
            ParserState::InHeader { mut header, depth } => {
                let res =
                    self.handle_header_end_element(name, &text_content, &mut header, depth)?;
                self.context.state = if closes("MessageHeader", depth) {
                    ParserState::Initial
                } else {
                    ParserState::InHeader { header, depth }
                };
                res
            }
            ParserState::InRelease { mut release, depth } => {
                let res =
                    self.handle_release_end_element(name, &text_content, &mut release, depth)?;
                self.context.state = if closes("Release", depth) {
                    ParserState::Initial
                } else {
                    ParserState::InRelease { release, depth }
                };
                res
            }
            ParserState::InResource { mut resource, depth } => {
                let res = self.handle_resource_end_element(
                    name,
                    &text_content,
                    &mut resource,
                    depth,
                )?;
                self.context.state = if closes("Resource", depth) {
                    ParserState::Initial
                } else {
                    ParserState::InResource { resource, depth }
                };
                res
            }
            ParserState::InParty { mut party, depth } => {
                let res = self.handle_party_end_element(name, &text_content, &mut party, depth)?;
                self.context.state = if closes("Party", depth) {
                    ParserState::Initial
                } else {
                    ParserState::InParty { party, depth }
                };
                res
            }
            ParserState::InDeal { mut deal, depth } => {
                let res = self.handle_deal_end_element(name, &text_content, &mut deal, depth)?;
                self.context.state = if closes("Deal", depth) {
                    ParserState::Initial
                } else {
                    ParserState::InDeal { deal, depth }
                };
                res
            }
            ParserState::Skipping {
                start_depth,
                current_depth,
            } => {
                self.context.state = if closing_depth <= start_depth {
                    ParserState::Initial
                } else {
                    ParserState::Skipping {
                        start_depth,
                        current_depth,
                    }
                };
                None
            }
            other => {
                self.context.state = other;
                None
            }
        };
        self.context.pop_element();
        Ok(result)
    }

    /// Handle an end tag inside `<MessageHeader>`.
    ///
    /// `MessageId` and `MessageCreatedDateTime` store their text on `header`. The
    /// header's own closing tag at `depth` builds the header element via
    /// `HeaderBuilder`, filling missing values with placeholders. Returns `Ok(None)`
    /// otherwise.
    ///
    /// NOTE(review): nothing in this parser populates `header.sender`, so the
    /// placeholder sender is always used for now.
    fn handle_header_end_element(
        &mut self,
        name: &str,
        text_content: &str,
        header: &mut PartialMessageHeader,
        depth: usize,
    ) -> Result<Option<ParsedElement>, ParseError> {
        match name {
            "MessageId" => {
                header.message_id = Some(Identifier {
                    id_type: IdentifierType::Proprietary,
                    namespace: None,
                    value: text_content.to_string(),
                });
            }
            "MessageCreatedDateTime" => {
                header.message_created_date_time = Some(text_content.to_string());
            }
            "MessageHeader" if self.context.current_depth == depth => {
                let sender = header.sender.take().unwrap_or_else(|| MessageSender {
                    party_id: vec![],
                    party_name: vec![LocalizedString {
                        text: "Unknown".to_string(),
                        language_code: None,
                        script: None,
                    }],
                    trading_name: None,
                    attributes: None,
                    extensions: None,
                    comments: None,
                });
                let message_id = header.message_id.take().unwrap_or_else(|| Identifier {
                    id_type: IdentifierType::Proprietary,
                    namespace: None,
                    value: "unknown".to_string(),
                });
                let element = HeaderBuilder::new()
                    .sender(sender)
                    .message_id(message_id)
                    .created_date_time(
                        header.message_created_date_time.take().unwrap_or_default(),
                    )
                    .version(self.context.version)
                    .build()?;
                return Ok(Some(element));
            }
            _ => {}
        }
        Ok(None)
    }

    /// Handle an end tag inside `<Release>`.
    ///
    /// * `ReleaseTitle` appends a title whose language comes from the `LanguageCode`
    ///   attribute currently in `context.attributes`.
    /// * `Genre` appends a genre.
    /// * `ReleaseDate` is recognised but not parsed yet.
    /// * The release's own closing tag at `depth` yields the release if it reports
    ///   itself complete.
    ///
    /// NOTE(review): `context.attributes` holds the attributes of the most recent start
    /// tag, so `LanguageCode` is only picked up when `ReleaseTitle` has no child elements.
    fn handle_release_end_element(
        &mut self,
        name: &str,
        text_content: &str,
        release: &mut PartialRelease,
        depth: usize,
    ) -> Result<Option<ParsedElement>, ParseError> {
        match name {
            "ReleaseTitle" => {
                release.release_title.push(LocalizedString {
                    text: text_content.to_string(),
                    language_code: self.context.attributes.get("LanguageCode").cloned(),
                    script: None,
                });
            }
            "Genre" => {
                release.genre.push(Genre {
                    genre_text: text_content.to_string(),
                    sub_genre: None,
                    attributes: None,
                    extensions: None,
                    comments: None,
                });
            }
            "ReleaseDate" => {
                // TODO: Parse date properly into ReleaseEvent
            }
            "Release" if self.context.current_depth == depth && release.is_complete() => {
                // The partial is finished; move its data out instead of deep-cloning it.
                let completed_release = std::mem::take(release).into_release();
                return Ok(Some(ParsedElement::Release(completed_release)));
            }
            _ => {}
        }
        Ok(None)
    }

    /// Handle an end tag inside `<Resource>`.
    ///
    /// * `Title` appends a reference title; its language comes from `LanguageCode`, as
    ///   for release titles.
    /// * `Duration` is parsed with [`Self::parse_ddex_duration`]; unparseable values
    ///   are ignored.
    /// * The resource's own closing tag at `depth` yields the resource if it reports
    ///   itself complete.
    fn handle_resource_end_element(
        &mut self,
        name: &str,
        text_content: &str,
        resource: &mut PartialResource,
        depth: usize,
    ) -> Result<Option<ParsedElement>, ParseError> {
        match name {
            "Title" => {
                resource.reference_title.push(LocalizedString {
                    text: text_content.to_string(),
                    language_code: self.context.attributes.get("LanguageCode").cloned(),
                    script: None,
                });
            }
            "Genre" => {
                // Note: Resource in current model doesn't have genre field
            }
            "Duration" => {
                if let Some(duration) = Self::parse_ddex_duration(text_content) {
                    resource.duration = Some(duration);
                }
            }
            "Resource" if self.context.current_depth == depth && resource.is_complete() => {
                let completed_resource = std::mem::take(resource).into_resource();
                return Ok(Some(ParsedElement::Resource(completed_resource)));
            }
            _ => {}
        }
        Ok(None)
    }

    /// Parse a DDEX `Duration` value.
    ///
    /// Accepts the ISO 8601 / `xs:duration` form (e.g. `PT3M25S`, `PT1H2M3.5S`,
    /// `P1DT2H`) as well as a bare count of whole seconds. Year and month components are
    /// rejected because they have no fixed length. Returns `None` for anything
    /// malformed or out of range.
    fn parse_ddex_duration(text: &str) -> Option<std::time::Duration> {
        let text = text.trim();
        if let Ok(seconds) = text.parse::<u64>() {
            return Some(std::time::Duration::from_secs(seconds));
        }
        let body = text.strip_prefix('P')?;

        let mut total_seconds = 0.0_f64;
        let mut number = String::new();
        let mut in_time_part = false;
        let mut saw_component = false;
        for ch in body.chars() {
            match ch {
                '0'..='9' | '.' => number.push(ch),
                'T' if !in_time_part && number.is_empty() => in_time_part = true,
                unit => {
                    let value: f64 = number.parse().ok()?;
                    number.clear();
                    let scale = match (in_time_part, unit) {
                        (false, 'W') => 604_800.0,
                        (false, 'D') => 86_400.0,
                        (true, 'H') => 3_600.0,
                        (true, 'M') => 60.0,
                        (true, 'S') => 1.0,
                        _ => return None,
                    };
                    total_seconds += value * scale;
                    saw_component = true;
                }
            }
        }
        // Digits without a unit, or a bare "P"/"PT", are malformed.
        if !number.is_empty() || !saw_component {
            return None;
        }
        // `from_secs_f64` panics on values it cannot represent.
        if !total_seconds.is_finite() || total_seconds >= u64::MAX as f64 {
            return None;
        }
        Some(std::time::Duration::from_secs_f64(total_seconds))
    }

    /// Handle an end tag inside `<Party>` (placeholder: nothing is parsed or yielded).
    fn handle_party_end_element(
        &mut self,
        _name: &str,
        _text_content: &str,
        _party: &mut PartialParty,
        _depth: usize,
    ) -> Result<Option<ParsedElement>, ParseError> {
        // TODO: Implement party parsing
        Ok(None)
    }

    /// Handle an end tag inside `<Deal>` (placeholder: nothing is parsed or yielded).
    fn handle_deal_end_element(
        &mut self,
        _name: &str,
        _text_content: &str,
        _deal: &mut PartialDeal,
        _depth: usize,
    ) -> Result<Option<ParsedElement>, ParseError> {
        // TODO: Implement deal parsing
        Ok(None)
    }

    /// Check the configured security limits.
    ///
    /// # Errors
    ///
    /// Returns `ParseError::SecurityViolation` when the current nesting depth exceeds
    /// `config.security.max_element_depth`, or when `current_memory` exceeds
    /// `config.max_memory`.
    fn check_security_limits(&self) -> Result<(), ParseError> {
        if self.context.current_depth > self.config.security.max_element_depth {
            return Err(ParseError::SecurityViolation {
                message: format!(
                    "Nesting depth {} exceeds maximum {}",
                    self.context.current_depth, self.config.security.max_element_depth
                ),
            });
        }
        if self.current_memory > self.config.max_memory {
            return Err(ParseError::SecurityViolation {
                message: format!(
                    "Memory usage {} exceeds maximum {}",
                    self.current_memory, self.config.max_memory
                ),
            });
        }
        Ok(())
    }

    /// Report whether `current_memory` is above half of `config.max_memory`.
    fn should_yield_for_memory(&self) -> Result<bool, ParseError> {
        Ok(self.current_memory > self.config.max_memory / 2)
    }

    /// Invoke the progress callback when progress reporting is due.
    ///
    /// Reporting requires `config.enable_progress`. A report is due when
    /// `bytes_processed` is an exact multiple of `config.progress_interval`; an interval
    /// of `0` means "report every time" (it used to panic with a remainder by zero).
    ///
    /// NOTE(review): an exact-multiple test fires irregularly for arbitrary offsets.
    /// Tracking the last reported offset would be more reliable.
    fn update_progress(&mut self) {
        if !self.config.enable_progress {
            return;
        }
        let interval = self.config.progress_interval;
        if interval != 0 && self.bytes_processed % interval != 0 {
            return;
        }
        if let Some(callback) = self.progress_callback.as_mut() {
            let progress = StreamingProgress {
                bytes_processed: self.bytes_processed,
                elements_parsed: self.elements_yielded,
                releases_parsed: 0,  // TODO: Track separately
                resources_parsed: 0, // TODO: Track separately
                parties_parsed: 0,
                deals_parsed: 0,
                elapsed: self.start_time.elapsed(),
                estimated_total_bytes: None,
                current_depth: self.context.current_depth,
                memory_usage: self.current_memory,
            };
            callback(progress);
        }
    }

    /// Build an `ErrorLocation` for the parser's current position.
    ///
    /// Only the byte offset is tracked; line and column are `0`. The offset is `None`
    /// when `bytes_processed` does not fit in `usize` (32-bit targets), instead of
    /// silently wrapping.
    #[allow(dead_code)] // not called yet; kept for upcoming error-location reporting
    fn get_current_location(&self) -> ErrorLocation {
        ErrorLocation {
            line: 0,   // TODO: Track line numbers
            column: 0, // TODO: Track column numbers
            byte_offset: usize::try_from(self.bytes_processed).ok(),
            path: "streaming".to_string(),
        }
    }
}
impl<R: BufRead> std::fmt::Debug for StreamingDDEXParser<R> {
    /// Prints only the progress counters and current depth.
    ///
    /// The reader, buffers, configuration and callback are left out; a boxed closure
    /// has no `Debug` impl.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = formatter.debug_struct("StreamingDDEXParser");
        out.field("bytes_processed", &self.bytes_processed);
        out.field("elements_yielded", &self.elements_yielded);
        out.field("current_depth", &self.context.current_depth);
        out.field("current_memory", &self.current_memory);
        out.finish()
    }
}