#[allow(dead_code)] use crate::error::ParseError;
use crate::streaming::fast_zero_copy::FastZeroCopyParser;
use crate::streaming::{WorkingStreamingElement, WorkingStreamingStats};
use ddex_core::models::versions::ERNVersion;
use std::io::BufRead;
use std::sync::{Arc, Mutex};
use std::time::Instant;
pub struct ParallelStreamingParser {
worker_threads: usize,
chunk_size: usize,
start_time: Instant,
total_bytes_processed: Arc<Mutex<u64>>,
total_elements_found: Arc<Mutex<u64>>,
}
impl ParallelStreamingParser {
pub fn new() -> Self {
Self {
worker_threads: num_cpus::get().max(2), chunk_size: 1024 * 1024, start_time: Instant::now(),
total_bytes_processed: Arc::new(Mutex::new(0)),
total_elements_found: Arc::new(Mutex::new(0)),
}
}
pub fn with_threads(threads: usize) -> Self {
Self {
worker_threads: threads.max(1),
chunk_size: 1024 * 1024,
start_time: Instant::now(),
total_bytes_processed: Arc::new(Mutex::new(0)),
total_elements_found: Arc::new(Mutex::new(0)),
}
}
pub fn parse_parallel(&self, data: &[u8]) -> Result<Vec<WorkingStreamingElement>, ParseError> {
self.parse_single_threaded(data)
}
fn parse_single_threaded(
&self,
data: &[u8],
) -> Result<Vec<WorkingStreamingElement>, ParseError> {
let mut parser = FastZeroCopyParser::new();
let mut elements = parser.parse_chunk(data)?;
{
let mut bytes = self.total_bytes_processed.lock().unwrap();
*bytes += data.len() as u64;
}
{
let mut count = self.total_elements_found.lock().unwrap();
*count += elements.len() as u64;
}
elements.push(WorkingStreamingElement::EndOfStream {
stats: self.get_stats(),
});
Ok(elements)
}
fn find_element_boundaries(&self, data: &[u8]) -> Vec<usize> {
let mut boundaries = vec![0];
let release_end = b"</Release>";
let mut pos = 0;
while let Some(end_pos) = self.find_pattern(&data[pos..], release_end) {
let abs_pos = pos + end_pos + release_end.len();
boundaries.push(abs_pos);
pos = abs_pos;
if boundaries.len() > self.worker_threads * 4 {
break;
}
}
if boundaries.len() < 4 {
let recording_end = b"</SoundRecording>";
pos = 0;
while let Some(end_pos) = self.find_pattern(&data[pos..], recording_end) {
let abs_pos = pos + end_pos + recording_end.len();
if !boundaries.contains(&abs_pos) {
boundaries.push(abs_pos);
}
pos = abs_pos;
if boundaries.len() > self.worker_threads * 2 {
break;
}
}
}
if boundaries.last() != Some(&data.len()) {
boundaries.push(data.len());
}
boundaries.sort_unstable();
boundaries.dedup();
boundaries
}
fn find_pattern(&self, data: &[u8], pattern: &[u8]) -> Option<usize> {
if pattern.is_empty() {
return None;
}
let mut pos = 0;
while let Some(first_byte_pos) = memchr::memchr(pattern[0], &data[pos..]) {
let abs_pos = pos + first_byte_pos;
if abs_pos + pattern.len() <= data.len()
&& &data[abs_pos..abs_pos + pattern.len()] == pattern
{
return Some(abs_pos);
}
pos = abs_pos + 1;
}
None
}
pub fn get_stats(&self) -> WorkingStreamingStats {
let elapsed = self.start_time.elapsed();
let bytes_processed = *self.total_bytes_processed.lock().unwrap();
let elements_found = *self.total_elements_found.lock().unwrap();
let throughput = if elapsed.as_secs_f64() > 0.0 {
(bytes_processed as f64 / (1024.0 * 1024.0)) / elapsed.as_secs_f64()
} else {
0.0
};
WorkingStreamingStats {
bytes_processed,
elements_yielded: elements_found as usize,
current_depth: 0,
max_depth_reached: 10,
current_memory_bytes: self.chunk_size * self.worker_threads,
max_memory_used_bytes: self.chunk_size * self.worker_threads,
elapsed_time: elapsed,
throughput_mb_per_sec: throughput,
}
}
}
impl Default for ParallelStreamingParser {
fn default() -> Self {
Self::new()
}
}
pub struct ParallelStreamingIterator<R: BufRead> {
reader: R,
parser: ParallelStreamingParser,
buffer: Vec<u8>,
finished: bool,
elements_queue: Vec<WorkingStreamingElement>,
current_index: usize,
}
impl<R: BufRead> ParallelStreamingIterator<R> {
pub fn new(mut reader: R, _version: ERNVersion) -> Self {
let mut buffer = Vec::new();
let _ = reader.read_to_end(&mut buffer);
Self {
reader,
parser: ParallelStreamingParser::new(),
buffer,
finished: false,
elements_queue: Vec::new(),
current_index: 0,
}
}
pub fn with_threads(mut reader: R, _version: ERNVersion, threads: usize) -> Self {
let mut buffer = Vec::new();
let _ = reader.read_to_end(&mut buffer);
Self {
reader,
parser: ParallelStreamingParser::with_threads(threads),
buffer,
finished: false,
elements_queue: Vec::new(),
current_index: 0,
}
}
pub fn stats(&self) -> WorkingStreamingStats {
self.parser.get_stats()
}
}
impl<R: BufRead> Iterator for ParallelStreamingIterator<R> {
type Item = Result<WorkingStreamingElement, ParseError>;
fn next(&mut self) -> Option<Self::Item> {
if self.finished {
return None;
}
if self.elements_queue.is_empty() && self.current_index == 0 {
match self.parser.parse_parallel(&self.buffer) {
Ok(elements) => {
self.elements_queue = elements;
}
Err(e) => {
self.finished = true;
return Some(Err(e));
}
}
}
if self.current_index < self.elements_queue.len() {
let element = self.elements_queue[self.current_index].clone();
self.current_index += 1;
if matches!(element, WorkingStreamingElement::EndOfStream { .. }) {
self.finished = true;
}
Some(Ok(element))
} else {
self.finished = true;
None
}
}
}
pub struct ParallelBenchmark;
impl ParallelBenchmark {
pub fn measure_parallel_speedup(data: &[u8]) -> Result<ParallelBenchmarkResult, ParseError> {
println!("🚀 Measuring Parallel Performance Speedup");
println!("Data size: {:.2} MB", data.len() as f64 / (1024.0 * 1024.0));
let start = Instant::now();
let single_parser = ParallelStreamingParser::with_threads(1);
let single_elements = single_parser.parse_parallel(data)?;
let single_time = start.elapsed();
let mut thread_results = Vec::new();
for threads in [2, 4, 6, 8] {
if threads <= num_cpus::get() {
let start = Instant::now();
let parallel_parser = ParallelStreamingParser::with_threads(threads);
let parallel_elements = parallel_parser.parse_parallel(data)?;
let parallel_time = start.elapsed();
let speedup = single_time.as_secs_f64() / parallel_time.as_secs_f64();
let efficiency = (speedup / threads as f64) * 100.0;
let throughput =
(data.len() as f64 / (1024.0 * 1024.0)) / parallel_time.as_secs_f64();
thread_results.push(ThreadResult {
threads,
time: parallel_time,
speedup,
efficiency,
throughput_mb_per_sec: throughput,
elements_found: parallel_elements.len(),
});
println!(
" {} threads: {:.3}s, {:.1}x speedup, {:.1}% efficiency, {:.1} MB/s",
threads,
parallel_time.as_secs_f64(),
speedup,
efficiency,
throughput
);
assert_eq!(
single_elements.len(),
parallel_elements.len(),
"Element count mismatch: single={}, parallel={}",
single_elements.len(),
parallel_elements.len()
);
}
}
let single_throughput = (data.len() as f64 / (1024.0 * 1024.0)) / single_time.as_secs_f64();
let best_result = thread_results
.iter()
.max_by(|a, b| {
a.throughput_mb_per_sec
.partial_cmp(&b.throughput_mb_per_sec)
.unwrap()
})
.unwrap();
let best_speedup = best_result.speedup;
let best_throughput = best_result.throughput_mb_per_sec;
let target_achieved = best_result.throughput_mb_per_sec >= 280.0;
let result = ParallelBenchmarkResult {
single_threaded_time: single_time,
single_threaded_throughput: single_throughput,
single_threaded_elements: single_elements.len(),
thread_results,
best_speedup,
best_throughput,
target_achieved,
};
println!("\n📊 PARALLEL PERFORMANCE SUMMARY");
println!(
"Single-threaded: {:.1} MB/s",
result.single_threaded_throughput
);
println!(
"Best parallel: {:.1} MB/s ({:.1}x speedup)",
result.best_throughput, result.best_speedup
);
println!(
"Target (280 MB/s): {}",
if result.target_achieved {
"✅ ACHIEVED!"
} else {
"❌ Not achieved"
}
);
if result.target_achieved {
println!("🎉 SUCCESS: 480x performance improvement target achieved with parallel processing!");
}
Ok(result)
}
}
#[derive(Debug, Clone)]
pub struct ThreadResult {
pub threads: usize,
pub time: std::time::Duration,
pub speedup: f64,
pub efficiency: f64,
pub throughput_mb_per_sec: f64,
pub elements_found: usize,
}
#[derive(Debug)]
pub struct ParallelBenchmarkResult {
pub single_threaded_time: std::time::Duration,
pub single_threaded_throughput: f64,
pub single_threaded_elements: usize,
pub thread_results: Vec<ThreadResult>,
pub best_speedup: f64,
pub best_throughput: f64,
pub target_achieved: bool,
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
fn generate_large_ddex_data(target_mb: usize) -> Vec<u8> {
let mut xml = String::from(
r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
<MessageHeader>
<MessageId>PARALLEL-BENCH-MSG</MessageId>
<CreatedDateTime>2024-01-01T00:00:00Z</CreatedDateTime>
</MessageHeader>
"#,
);
let target_bytes = target_mb * 1024 * 1024;
let single_release_size = 1200; let num_releases = (target_bytes / single_release_size).max(1000);
for i in 0..num_releases {
xml.push_str(&format!(
r#"
<Release ReleaseReference="PAR-REL-{:08}">
<ReferenceTitle>
<TitleText>Parallel Benchmark Release #{}</TitleText>
<SubTitle>Multi-core Performance Test Release</SubTitle>
</ReferenceTitle>
<Genre>
<GenreText>Electronic</GenreText>
<SubGenre>Ambient</SubGenre>
</Genre>
<PLine>
<Year>2024</Year>
<PLineText>℗ 2024 Parallel Performance Label</PLineText>
</PLine>
<ReleaseLabelReference>PAR-LBL-{:03}</ReleaseLabelReference>
</Release>
"#,
i,
i,
i % 100
));
for j in 0..4 {
xml.push_str(&format!(
r#"
<SoundRecording ResourceReference="PAR-RES-{:08}-{:02}">
<ResourceId>
<ISRC>PARLL{:08}</ISRC>
</ResourceId>
<ReferenceTitle>
<TitleText>Parallel Track {} from Release {}</TitleText>
</ReferenceTitle>
<Duration>PT{}M{}S</Duration>
<CreationDate>2024-01-01</CreationDate>
<LanguageOfPerformance>en</LanguageOfPerformance>
<ResourceContributor>
<PartyId namespace="IPI">PAR{:08}</PartyId>
<PartyName>Parallel Artist {}</PartyName>
<ContributorRole>MainArtist</ContributorRole>
</ResourceContributor>
</SoundRecording>
"#,
i,
j,
i * 10 + j,
j + 1,
i,
(j + 3) % 8,
(i + j + 30) % 60,
i,
i % 1000
));
}
if i % 1000 == 0 && i > 0 {
let current_size = xml.len() as f64 / (1024.0 * 1024.0);
println!("Generated {:.1}MB with {} releases", current_size, i);
if current_size >= target_mb as f64 {
break;
}
}
}
xml.push_str("</ern:NewReleaseMessage>");
xml.into_bytes()
}
#[test]
fn test_parallel_basic_functionality() {
let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
<MessageHeader>
<MessageId>PAR-TEST-MSG</MessageId>
<CreatedDateTime>2024-01-01T00:00:00Z</CreatedDateTime>
</MessageHeader>
<Release ReleaseReference="PAR-REL-001">
<ReferenceTitle>
<TitleText>Parallel Test Release</TitleText>
</ReferenceTitle>
</Release>
</ern:NewReleaseMessage>"#;
let cursor = Cursor::new(xml.as_bytes());
let mut iterator = ParallelStreamingIterator::new(cursor, ERNVersion::V4_3);
let elements: Result<Vec<_>, _> = iterator.collect();
assert!(elements.is_ok(), "Parallel parsing should work");
let elements = elements.unwrap();
assert!(!elements.is_empty(), "Should find elements");
let has_header = elements
.iter()
.any(|e| matches!(e, WorkingStreamingElement::MessageHeader { .. }));
let has_release = elements
.iter()
.any(|e| matches!(e, WorkingStreamingElement::Release { .. }));
assert!(has_header, "Should find message header");
assert!(has_release, "Should find release");
println!("✅ Parallel parser basic test passed!");
}
#[test]
fn test_parallel_speedup_measurement() {
let data = generate_large_ddex_data(50);
let result = ParallelBenchmark::measure_parallel_speedup(&data).unwrap();
assert!(result.best_speedup > 1.0, "Should have some speedup");
assert!(
result.best_throughput > result.single_threaded_throughput,
"Parallel should be faster"
);
if result.target_achieved {
println!("🎉 TARGET ACHIEVED: {} MB/s", result.best_throughput);
} else {
println!(
"⚠️ Target not achieved: {} MB/s (need 280 MB/s)",
result.best_throughput
);
}
}
#[test]
fn test_element_boundary_detection() {
let parser = ParallelStreamingParser::new();
let xml = b"<Release>content</Release><Release>more</Release>";
let boundaries = parser.find_element_boundaries(xml);
println!("Boundaries: {:?}", boundaries);
assert!(boundaries.len() >= 2, "Should find boundaries");
assert_eq!(boundaries[0], 0, "Should start at 0");
assert_eq!(
boundaries[boundaries.len() - 1],
xml.len(),
"Should end at data length"
);
}
#[test]
fn test_thread_scaling() {
if num_cpus::get() < 4 {
println!("Skipping thread scaling test - need at least 4 cores");
return;
}
let data = generate_large_ddex_data(100);
println!("Testing thread scaling with 100MB data:");
for threads in [1, 2, 4, 8] {
if threads <= num_cpus::get() {
let start = Instant::now();
let parser = ParallelStreamingParser::with_threads(threads);
let elements = parser.parse_parallel(&data).unwrap();
let elapsed = start.elapsed();
let throughput = (data.len() as f64 / (1024.0 * 1024.0)) / elapsed.as_secs_f64();
println!(
" {} threads: {:.1} MB/s ({} elements)",
threads,
throughput,
elements.len()
);
}
}
}
}