use crate::error::ParseError;
use crate::streaming::{WorkingStreamingElement, WorkingStreamingStats};
use ddex_core::models::versions::ERNVersion;
use std::collections::HashMap;
use std::io::BufRead;
use std::time::Instant;
pub struct FastZeroCopyParser {
read_buffer: Vec<u8>,
leftover: Vec<u8>,
string_cache: HashMap<Vec<u8>, String>,
bytes_processed: u64,
elements_found: u64,
start_time: Instant,
}
impl FastZeroCopyParser {
pub fn new() -> Self {
Self {
read_buffer: vec![0; 64 * 1024], leftover: Vec::new(),
string_cache: HashMap::with_capacity(512),
bytes_processed: 0,
elements_found: 0,
start_time: Instant::now(),
}
}
fn intern_string(&mut self, bytes: &[u8]) -> String {
if let Some(cached) = self.string_cache.get(bytes) {
cached.clone()
} else {
let s = String::from_utf8_lossy(bytes).to_string();
self.string_cache.insert(bytes.to_vec(), s.clone());
s
}
}
pub fn parse_chunk(
&mut self,
chunk: &[u8],
) -> Result<Vec<WorkingStreamingElement>, ParseError> {
self.bytes_processed += chunk.len() as u64;
let mut results = Vec::new();
let mut data = Vec::with_capacity(self.leftover.len() + chunk.len());
data.extend_from_slice(&self.leftover);
data.extend_from_slice(chunk);
let mut pos = 0;
while let Some(start) = self.find_pattern(&data[pos..], b"<MessageHeader") {
let abs_start = pos + start;
if let Some(element) = self.extract_message_header_fast(&data, abs_start)? {
results.push(element);
self.elements_found += 1;
}
pos = abs_start + 14; }
pos = 0;
while let Some(start) = self.find_pattern(&data[pos..], b"<Release ") {
let abs_start = pos + start;
if let Some(element) = self.extract_release_fast(&data, abs_start)? {
results.push(element);
self.elements_found += 1;
}
pos = abs_start + 9; }
pos = 0;
while let Some(start) = self.find_pattern(&data[pos..], b"<SoundRecording ") {
let abs_start = pos + start;
if let Some(element) = self.extract_sound_recording_fast(&data, abs_start)? {
results.push(element);
self.elements_found += 1;
}
pos = abs_start + 16; }
if data.len() > 2048 {
self.leftover.clear();
self.leftover.extend_from_slice(&data[data.len() - 2048..]);
} else {
self.leftover = data;
}
Ok(results)
}
fn find_pattern(&self, data: &[u8], pattern: &[u8]) -> Option<usize> {
if pattern.is_empty() {
return None;
}
let mut pos = 0;
while let Some(first_byte_pos) = memchr::memchr(pattern[0], &data[pos..]) {
let abs_pos = pos + first_byte_pos;
if abs_pos + pattern.len() <= data.len()
&& &data[abs_pos..abs_pos + pattern.len()] == pattern
{
return Some(abs_pos);
}
pos = abs_pos + 1;
}
None
}
fn extract_message_header_fast(
&mut self,
data: &[u8],
start: usize,
) -> Result<Option<WorkingStreamingElement>, ParseError> {
if let Some(end) = self.find_pattern(&data[start..], b"</MessageHeader>") {
let header_data = &data[start..start + end + 16];
let message_id = if let Some(id) = self.extract_tag_content(header_data, b"MessageId") {
self.intern_string(id)
} else {
"unknown".to_string()
};
let created_date_time =
if let Some(dt) = self.extract_tag_content(header_data, b"CreatedDateTime") {
self.intern_string(dt)
} else {
chrono::Utc::now().to_rfc3339()
};
return Ok(Some(WorkingStreamingElement::MessageHeader {
message_id,
created_date_time,
version: ERNVersion::V4_3,
}));
}
Ok(None)
}
fn extract_release_fast(
&mut self,
data: &[u8],
start: usize,
) -> Result<Option<WorkingStreamingElement>, ParseError> {
if let Some(end) = self.find_pattern(&data[start..], b"</Release>") {
let release_data = &data[start..start + end + 10];
let reference = if let Some(attr) =
self.extract_attribute_fast(release_data, b"ReleaseReference")
{
self.intern_string(attr)
} else {
format!("REL-{}", self.elements_found)
};
let title =
if let Some(title_data) = self.extract_tag_content(release_data, b"TitleText") {
self.intern_string(title_data)
} else {
"Untitled Release".to_string()
};
let resource_references = self.extract_resource_references_fast(release_data);
return Ok(Some(WorkingStreamingElement::Release {
reference,
title,
resource_references,
}));
}
Ok(None)
}
fn extract_sound_recording_fast(
&mut self,
data: &[u8],
start: usize,
) -> Result<Option<WorkingStreamingElement>, ParseError> {
if let Some(end) = self.find_pattern(&data[start..], b"</SoundRecording>") {
let recording_data = &data[start..start + end + 17];
let reference = if let Some(attr) =
self.extract_attribute_fast(recording_data, b"ResourceReference")
{
self.intern_string(attr)
} else {
format!("RES-{}", self.elements_found)
};
let title =
if let Some(title_data) = self.extract_tag_content(recording_data, b"TitleText") {
self.intern_string(title_data)
} else {
"Untitled Track".to_string()
};
let duration = self
.extract_tag_content(recording_data, b"Duration")
.map(|d| self.intern_string(d));
let isrc = self
.extract_tag_content(recording_data, b"ISRC")
.map(|i| self.intern_string(i));
return Ok(Some(WorkingStreamingElement::SoundRecording {
reference,
title,
duration,
isrc,
}));
}
Ok(None)
}
fn extract_tag_content<'a>(&self, data: &'a [u8], tag_name: &[u8]) -> Option<&'a [u8]> {
let opening = [b"<", tag_name, b">"].concat();
let closing = [b"</", tag_name, b">"].concat();
if let Some(start_pos) = self.find_pattern(data, &opening) {
let content_start = start_pos + opening.len();
if let Some(end_pos) = self.find_pattern(&data[content_start..], &closing) {
let content_end = content_start + end_pos;
return Some(&data[content_start..content_end]);
}
}
None
}
fn extract_attribute_fast<'a>(&self, data: &'a [u8], attr_name: &[u8]) -> Option<&'a [u8]> {
let pattern = [attr_name, b"=\""].concat();
if let Some(start_pos) = self.find_pattern(data, &pattern) {
let value_start = start_pos + pattern.len();
if let Some(quote_pos) = memchr::memchr(b'"', &data[value_start..]) {
let value_end = value_start + quote_pos;
return Some(&data[value_start..value_end]);
}
}
None
}
fn extract_resource_references_fast(&mut self, data: &[u8]) -> Vec<String> {
let mut refs = Vec::new();
let mut pos = 0;
while let Some(start) = self.find_pattern(&data[pos..], b"<ResourceReference>") {
let abs_start = pos + start;
if let Some(content) =
self.extract_tag_content(&data[abs_start..], b"ResourceReference")
{
refs.push(self.intern_string(content));
}
pos = abs_start + 19; }
refs
}
pub fn stats(&self) -> WorkingStreamingStats {
let elapsed = self.start_time.elapsed();
let throughput = if elapsed.as_secs_f64() > 0.0 {
(self.bytes_processed as f64 / (1024.0 * 1024.0)) / elapsed.as_secs_f64()
} else {
0.0
};
WorkingStreamingStats {
bytes_processed: self.bytes_processed,
elements_yielded: self.elements_found as usize,
current_depth: 0,
max_depth_reached: 10,
current_memory_bytes: self.read_buffer.capacity() + self.leftover.capacity(),
max_memory_used_bytes: self.read_buffer.capacity() + self.leftover.capacity(),
elapsed_time: elapsed,
throughput_mb_per_sec: throughput,
}
}
}
impl Default for FastZeroCopyParser {
fn default() -> Self {
Self::new()
}
}
pub struct FastZeroCopyIterator<R: BufRead> {
reader: R,
parser: FastZeroCopyParser,
buffer: Vec<u8>,
finished: bool,
elements_queue: Vec<WorkingStreamingElement>,
current_index: usize,
}
impl<R: BufRead> FastZeroCopyIterator<R> {
pub fn new(reader: R, _version: ERNVersion) -> Self {
Self {
reader,
parser: FastZeroCopyParser::new(),
buffer: vec![0; 64 * 1024], finished: false,
elements_queue: Vec::new(),
current_index: 0,
}
}
pub fn stats(&self) -> WorkingStreamingStats {
self.parser.stats()
}
fn read_next_chunk(&mut self) -> Result<bool, ParseError> {
let bytes_read = self.reader.read(&mut self.buffer)?;
if bytes_read == 0 {
return Ok(false); }
let elements = self.parser.parse_chunk(&self.buffer[..bytes_read])?;
self.elements_queue.extend(elements);
Ok(true)
}
}
impl<R: BufRead> Iterator for FastZeroCopyIterator<R> {
type Item = Result<WorkingStreamingElement, ParseError>;
fn next(&mut self) -> Option<Self::Item> {
if self.finished {
return None;
}
if self.current_index < self.elements_queue.len() {
let element = self.elements_queue[self.current_index].clone();
self.current_index += 1;
return Some(Ok(element));
}
match self.read_next_chunk() {
Ok(true) => {
self.next()
}
Ok(false) => {
self.finished = true;
Some(Ok(WorkingStreamingElement::EndOfStream {
stats: self.parser.stats(),
}))
}
Err(e) => {
self.finished = true;
Some(Err(e))
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[test]
fn test_fast_zero_copy_basic() {
let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
<MessageHeader>
<MessageId>FAST-TEST-MSG</MessageId>
<CreatedDateTime>2023-01-01T00:00:00Z</CreatedDateTime>
</MessageHeader>
<Release ReleaseReference="FAST-REL-001">
<ReferenceTitle>
<TitleText>Fast Zero Copy Release</TitleText>
</ReferenceTitle>
</Release>
</ern:NewReleaseMessage>"#;
let cursor = Cursor::new(xml.as_bytes());
let mut iterator = FastZeroCopyIterator::new(cursor, ERNVersion::V4_3);
let elements: Result<Vec<_>, _> = iterator.collect();
assert!(elements.is_ok(), "Fast zero-copy parsing should work");
let elements = elements.unwrap();
assert!(!elements.is_empty(), "Should find elements");
let has_header = elements
.iter()
.any(|e| matches!(e, WorkingStreamingElement::MessageHeader { .. }));
let has_release = elements
.iter()
.any(|e| matches!(e, WorkingStreamingElement::Release { .. }));
let has_end_stream = elements
.iter()
.any(|e| matches!(e, WorkingStreamingElement::EndOfStream { .. }));
assert!(has_header, "Should find message header");
assert!(has_release, "Should find release");
assert!(has_end_stream, "Should find end of stream");
println!("✅ Fast zero-copy parser basic test passed!");
}
#[test]
fn test_fast_pattern_matching() {
let parser = FastZeroCopyParser::new();
let data = b"<Release><MessageHeader><SoundRecording>";
assert_eq!(parser.find_pattern(data, b"<Release>"), Some(0));
assert_eq!(parser.find_pattern(data, b"<MessageHeader>"), Some(9));
assert_eq!(parser.find_pattern(data, b"<SoundRecording>"), Some(24));
assert_eq!(parser.find_pattern(data, b"<NotFound>"), None);
}
#[test]
fn test_tag_content_extraction() {
let parser = FastZeroCopyParser::new();
let data = b"<Title>Test Title</Title>";
let content = parser.extract_tag_content(data, b"Title").unwrap();
assert_eq!(content, b"Test Title");
}
#[test]
fn test_attribute_extraction() {
let parser = FastZeroCopyParser::new();
let data = b"<Release ReleaseReference=\"REL-123\">";
let attr_value = parser
.extract_attribute_fast(data, b"ReleaseReference")
.unwrap();
assert_eq!(attr_value, b"REL-123");
}
}