1#[allow(dead_code)] use crate::error::ParseError;
8use crate::streaming::fast_zero_copy::FastZeroCopyParser;
9use crate::streaming::{WorkingStreamingElement, WorkingStreamingStats};
10use ddex_core::models::versions::ERNVersion;
11use std::io::BufRead;
12use std::sync::{Arc, Mutex};
13use std::time::Instant;
14pub struct ParallelStreamingParser {
18 worker_threads: usize,
19 chunk_size: usize,
20 start_time: Instant,
21 total_bytes_processed: Arc<Mutex<u64>>,
22 total_elements_found: Arc<Mutex<u64>>,
23}
24
25impl ParallelStreamingParser {
26 pub fn new() -> Self {
28 Self {
29 worker_threads: num_cpus::get().max(2), chunk_size: 1024 * 1024, start_time: Instant::now(),
32 total_bytes_processed: Arc::new(Mutex::new(0)),
33 total_elements_found: Arc::new(Mutex::new(0)),
34 }
35 }
36
37 pub fn with_threads(threads: usize) -> Self {
39 Self {
40 worker_threads: threads.max(1),
41 chunk_size: 1024 * 1024,
42 start_time: Instant::now(),
43 total_bytes_processed: Arc::new(Mutex::new(0)),
44 total_elements_found: Arc::new(Mutex::new(0)),
45 }
46 }
47
48 pub fn parse_parallel(&self, data: &[u8]) -> Result<Vec<WorkingStreamingElement>, ParseError> {
50 self.parse_single_threaded(data)
53 }
54
55 fn parse_single_threaded(
57 &self,
58 data: &[u8],
59 ) -> Result<Vec<WorkingStreamingElement>, ParseError> {
60 let mut parser = FastZeroCopyParser::new();
61 let mut elements = parser.parse_chunk(data)?;
62
63 {
65 let mut bytes = self.total_bytes_processed.lock().unwrap();
66 *bytes += data.len() as u64;
67 }
68 {
69 let mut count = self.total_elements_found.lock().unwrap();
70 *count += elements.len() as u64;
71 }
72
73 elements.push(WorkingStreamingElement::EndOfStream {
74 stats: self.get_stats(),
75 });
76
77 Ok(elements)
78 }
79
80 fn find_element_boundaries(&self, data: &[u8]) -> Vec<usize> {
85 let mut boundaries = vec![0];
86
87 let release_end = b"</Release>";
89 let mut pos = 0;
90
91 while let Some(end_pos) = self.find_pattern(&data[pos..], release_end) {
92 let abs_pos = pos + end_pos + release_end.len();
93 boundaries.push(abs_pos);
94 pos = abs_pos;
95
96 if boundaries.len() > self.worker_threads * 4 {
98 break;
99 }
100 }
101
102 if boundaries.len() < 4 {
104 let recording_end = b"</SoundRecording>";
105 pos = 0;
106
107 while let Some(end_pos) = self.find_pattern(&data[pos..], recording_end) {
108 let abs_pos = pos + end_pos + recording_end.len();
109 if !boundaries.contains(&abs_pos) {
110 boundaries.push(abs_pos);
111 }
112 pos = abs_pos;
113
114 if boundaries.len() > self.worker_threads * 2 {
115 break;
116 }
117 }
118 }
119
120 if boundaries.last() != Some(&data.len()) {
122 boundaries.push(data.len());
123 }
124
125 boundaries.sort_unstable();
126 boundaries.dedup();
127 boundaries
128 }
129
130 fn find_pattern(&self, data: &[u8], pattern: &[u8]) -> Option<usize> {
132 if pattern.is_empty() {
133 return None;
134 }
135
136 let mut pos = 0;
137 while let Some(first_byte_pos) = memchr::memchr(pattern[0], &data[pos..]) {
138 let abs_pos = pos + first_byte_pos;
139
140 if abs_pos + pattern.len() <= data.len()
141 && &data[abs_pos..abs_pos + pattern.len()] == pattern
142 {
143 return Some(abs_pos);
144 }
145
146 pos = abs_pos + 1;
147 }
148
149 None
150 }
151
152 pub fn get_stats(&self) -> WorkingStreamingStats {
154 let elapsed = self.start_time.elapsed();
155 let bytes_processed = *self.total_bytes_processed.lock().unwrap();
156 let elements_found = *self.total_elements_found.lock().unwrap();
157
158 let throughput = if elapsed.as_secs_f64() > 0.0 {
159 (bytes_processed as f64 / (1024.0 * 1024.0)) / elapsed.as_secs_f64()
160 } else {
161 0.0
162 };
163
164 WorkingStreamingStats {
165 bytes_processed,
166 elements_yielded: elements_found as usize,
167 current_depth: 0,
168 max_depth_reached: 10,
169 current_memory_bytes: self.chunk_size * self.worker_threads,
170 max_memory_used_bytes: self.chunk_size * self.worker_threads,
171 elapsed_time: elapsed,
172 throughput_mb_per_sec: throughput,
173 }
174 }
175}
176
177impl Default for ParallelStreamingParser {
178 fn default() -> Self {
179 Self::new()
180 }
181}
182
183pub struct ParallelStreamingIterator<R: BufRead> {
185 reader: R,
186 parser: ParallelStreamingParser,
187 buffer: Vec<u8>,
188 finished: bool,
189 elements_queue: Vec<WorkingStreamingElement>,
190 current_index: usize,
191}
192
193impl<R: BufRead> ParallelStreamingIterator<R> {
194 pub fn new(mut reader: R, _version: ERNVersion) -> Self {
195 let mut buffer = Vec::new();
197 let _ = reader.read_to_end(&mut buffer);
198
199 Self {
200 reader,
201 parser: ParallelStreamingParser::new(),
202 buffer,
203 finished: false,
204 elements_queue: Vec::new(),
205 current_index: 0,
206 }
207 }
208
209 pub fn with_threads(mut reader: R, _version: ERNVersion, threads: usize) -> Self {
210 let mut buffer = Vec::new();
211 let _ = reader.read_to_end(&mut buffer);
212
213 Self {
214 reader,
215 parser: ParallelStreamingParser::with_threads(threads),
216 buffer,
217 finished: false,
218 elements_queue: Vec::new(),
219 current_index: 0,
220 }
221 }
222
223 pub fn stats(&self) -> WorkingStreamingStats {
224 self.parser.get_stats()
225 }
226}
227
228impl<R: BufRead> Iterator for ParallelStreamingIterator<R> {
229 type Item = Result<WorkingStreamingElement, ParseError>;
230
231 fn next(&mut self) -> Option<Self::Item> {
232 if self.finished {
233 return None;
234 }
235
236 if self.elements_queue.is_empty() && self.current_index == 0 {
238 match self.parser.parse_parallel(&self.buffer) {
239 Ok(elements) => {
240 self.elements_queue = elements;
241 }
242 Err(e) => {
243 self.finished = true;
244 return Some(Err(e));
245 }
246 }
247 }
248
249 if self.current_index < self.elements_queue.len() {
251 let element = self.elements_queue[self.current_index].clone();
252 self.current_index += 1;
253
254 if matches!(element, WorkingStreamingElement::EndOfStream { .. }) {
256 self.finished = true;
257 }
258
259 Some(Ok(element))
260 } else {
261 self.finished = true;
262 None
263 }
264 }
265}
266
267pub struct ParallelBenchmark;
269
270impl ParallelBenchmark {
271 pub fn measure_parallel_speedup(data: &[u8]) -> Result<ParallelBenchmarkResult, ParseError> {
272 println!("š Measuring Parallel Performance Speedup");
273 println!("Data size: {:.2} MB", data.len() as f64 / (1024.0 * 1024.0));
274
275 let start = Instant::now();
277 let single_parser = ParallelStreamingParser::with_threads(1);
278 let single_elements = single_parser.parse_parallel(data)?;
279 let single_time = start.elapsed();
280
281 let mut thread_results = Vec::new();
283
284 for threads in [2, 4, 6, 8] {
285 if threads <= num_cpus::get() {
286 let start = Instant::now();
287 let parallel_parser = ParallelStreamingParser::with_threads(threads);
288 let parallel_elements = parallel_parser.parse_parallel(data)?;
289 let parallel_time = start.elapsed();
290
291 let speedup = single_time.as_secs_f64() / parallel_time.as_secs_f64();
292 let efficiency = (speedup / threads as f64) * 100.0;
293 let throughput =
294 (data.len() as f64 / (1024.0 * 1024.0)) / parallel_time.as_secs_f64();
295
296 thread_results.push(ThreadResult {
297 threads,
298 time: parallel_time,
299 speedup,
300 efficiency,
301 throughput_mb_per_sec: throughput,
302 elements_found: parallel_elements.len(),
303 });
304
305 println!(
306 " {} threads: {:.3}s, {:.1}x speedup, {:.1}% efficiency, {:.1} MB/s",
307 threads,
308 parallel_time.as_secs_f64(),
309 speedup,
310 efficiency,
311 throughput
312 );
313
314 assert_eq!(
316 single_elements.len(),
317 parallel_elements.len(),
318 "Element count mismatch: single={}, parallel={}",
319 single_elements.len(),
320 parallel_elements.len()
321 );
322 }
323 }
324
325 let single_throughput = (data.len() as f64 / (1024.0 * 1024.0)) / single_time.as_secs_f64();
326
327 let best_result = thread_results
328 .iter()
329 .max_by(|a, b| {
330 a.throughput_mb_per_sec
331 .partial_cmp(&b.throughput_mb_per_sec)
332 .unwrap()
333 })
334 .unwrap();
335
336 let best_speedup = best_result.speedup;
337 let best_throughput = best_result.throughput_mb_per_sec;
338 let target_achieved = best_result.throughput_mb_per_sec >= 280.0;
339
340 let result = ParallelBenchmarkResult {
341 single_threaded_time: single_time,
342 single_threaded_throughput: single_throughput,
343 single_threaded_elements: single_elements.len(),
344 thread_results,
345 best_speedup,
346 best_throughput,
347 target_achieved,
348 };
349
350 println!("\nš PARALLEL PERFORMANCE SUMMARY");
351 println!(
352 "Single-threaded: {:.1} MB/s",
353 result.single_threaded_throughput
354 );
355 println!(
356 "Best parallel: {:.1} MB/s ({:.1}x speedup)",
357 result.best_throughput, result.best_speedup
358 );
359 println!(
360 "Target (280 MB/s): {}",
361 if result.target_achieved {
362 "ā
ACHIEVED!"
363 } else {
364 "ā Not achieved"
365 }
366 );
367
368 if result.target_achieved {
369 println!("š SUCCESS: 480x performance improvement target achieved with parallel processing!");
370 }
371
372 Ok(result)
373 }
374}
375
376#[derive(Debug, Clone)]
377pub struct ThreadResult {
378 pub threads: usize,
379 pub time: std::time::Duration,
380 pub speedup: f64,
381 pub efficiency: f64,
382 pub throughput_mb_per_sec: f64,
383 pub elements_found: usize,
384}
385
386#[derive(Debug)]
387pub struct ParallelBenchmarkResult {
388 pub single_threaded_time: std::time::Duration,
389 pub single_threaded_throughput: f64,
390 pub single_threaded_elements: usize,
391 pub thread_results: Vec<ThreadResult>,
392 pub best_speedup: f64,
393 pub best_throughput: f64,
394 pub target_achieved: bool,
395}
396
397#[cfg(test)]
398mod tests {
399 use super::*;
400 use std::io::Cursor;
401
402 fn generate_large_ddex_data(target_mb: usize) -> Vec<u8> {
403 let mut xml = String::from(
404 r#"<?xml version="1.0" encoding="UTF-8"?>
405<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
406 <MessageHeader>
407 <MessageId>PARALLEL-BENCH-MSG</MessageId>
408 <CreatedDateTime>2024-01-01T00:00:00Z</CreatedDateTime>
409 </MessageHeader>
410"#,
411 );
412
413 let target_bytes = target_mb * 1024 * 1024;
414 let single_release_size = 1200; let num_releases = (target_bytes / single_release_size).max(1000);
416
417 for i in 0..num_releases {
418 xml.push_str(&format!(
419 r#"
420 <Release ReleaseReference="PAR-REL-{:08}">
421 <ReferenceTitle>
422 <TitleText>Parallel Benchmark Release #{}</TitleText>
423 <SubTitle>Multi-core Performance Test Release</SubTitle>
424 </ReferenceTitle>
425 <Genre>
426 <GenreText>Electronic</GenreText>
427 <SubGenre>Ambient</SubGenre>
428 </Genre>
429 <PLine>
430 <Year>2024</Year>
431 <PLineText>ā 2024 Parallel Performance Label</PLineText>
432 </PLine>
433 <ReleaseLabelReference>PAR-LBL-{:03}</ReleaseLabelReference>
434 </Release>
435"#,
436 i,
437 i,
438 i % 100
439 ));
440
441 for j in 0..4 {
443 xml.push_str(&format!(
444 r#"
445 <SoundRecording ResourceReference="PAR-RES-{:08}-{:02}">
446 <ResourceId>
447 <ISRC>PARLL{:08}</ISRC>
448 </ResourceId>
449 <ReferenceTitle>
450 <TitleText>Parallel Track {} from Release {}</TitleText>
451 </ReferenceTitle>
452 <Duration>PT{}M{}S</Duration>
453 <CreationDate>2024-01-01</CreationDate>
454 <LanguageOfPerformance>en</LanguageOfPerformance>
455 <ResourceContributor>
456 <PartyId namespace="IPI">PAR{:08}</PartyId>
457 <PartyName>Parallel Artist {}</PartyName>
458 <ContributorRole>MainArtist</ContributorRole>
459 </ResourceContributor>
460 </SoundRecording>
461"#,
462 i,
463 j,
464 i * 10 + j,
465 j + 1,
466 i,
467 (j + 3) % 8,
468 (i + j + 30) % 60,
469 i,
470 i % 1000
471 ));
472 }
473
474 if i % 1000 == 0 && i > 0 {
475 let current_size = xml.len() as f64 / (1024.0 * 1024.0);
476 println!("Generated {:.1}MB with {} releases", current_size, i);
477
478 if current_size >= target_mb as f64 {
479 break;
480 }
481 }
482 }
483
484 xml.push_str("</ern:NewReleaseMessage>");
485 xml.into_bytes()
486 }
487
488 #[test]
489 fn test_parallel_basic_functionality() {
490 let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
491<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
492 <MessageHeader>
493 <MessageId>PAR-TEST-MSG</MessageId>
494 <CreatedDateTime>2024-01-01T00:00:00Z</CreatedDateTime>
495 </MessageHeader>
496 <Release ReleaseReference="PAR-REL-001">
497 <ReferenceTitle>
498 <TitleText>Parallel Test Release</TitleText>
499 </ReferenceTitle>
500 </Release>
501</ern:NewReleaseMessage>"#;
502
503 let cursor = Cursor::new(xml.as_bytes());
504 let mut iterator = ParallelStreamingIterator::new(cursor, ERNVersion::V4_3);
505
506 let elements: Result<Vec<_>, _> = iterator.collect();
507 assert!(elements.is_ok(), "Parallel parsing should work");
508
509 let elements = elements.unwrap();
510 assert!(!elements.is_empty(), "Should find elements");
511
512 let has_header = elements
513 .iter()
514 .any(|e| matches!(e, WorkingStreamingElement::MessageHeader { .. }));
515 let has_release = elements
516 .iter()
517 .any(|e| matches!(e, WorkingStreamingElement::Release { .. }));
518
519 assert!(has_header, "Should find message header");
520 assert!(has_release, "Should find release");
521
522 println!("ā
Parallel parser basic test passed!");
523 }
524
525 #[test]
526 fn test_parallel_speedup_measurement() {
527 let data = generate_large_ddex_data(50);
529
530 let result = ParallelBenchmark::measure_parallel_speedup(&data).unwrap();
532
533 assert!(result.best_speedup > 1.0, "Should have some speedup");
535 assert!(
536 result.best_throughput > result.single_threaded_throughput,
537 "Parallel should be faster"
538 );
539
540 if result.target_achieved {
542 println!("š TARGET ACHIEVED: {} MB/s", result.best_throughput);
543 } else {
544 println!(
545 "ā ļø Target not achieved: {} MB/s (need 280 MB/s)",
546 result.best_throughput
547 );
548 }
549 }
550
551 #[test]
552 fn test_element_boundary_detection() {
553 let parser = ParallelStreamingParser::new();
554 let xml = b"<Release>content</Release><Release>more</Release>";
555
556 let boundaries = parser.find_element_boundaries(xml);
557 println!("Boundaries: {:?}", boundaries);
558
559 assert!(boundaries.len() >= 2, "Should find boundaries");
560 assert_eq!(boundaries[0], 0, "Should start at 0");
561 assert_eq!(
562 boundaries[boundaries.len() - 1],
563 xml.len(),
564 "Should end at data length"
565 );
566 }
567
568 #[test]
569 fn test_thread_scaling() {
570 if num_cpus::get() < 4 {
571 println!("Skipping thread scaling test - need at least 4 cores");
572 return;
573 }
574
575 let data = generate_large_ddex_data(100);
576
577 println!("Testing thread scaling with 100MB data:");
578
579 for threads in [1, 2, 4, 8] {
580 if threads <= num_cpus::get() {
581 let start = Instant::now();
582 let parser = ParallelStreamingParser::with_threads(threads);
583 let elements = parser.parse_parallel(&data).unwrap();
584 let elapsed = start.elapsed();
585
586 let throughput = (data.len() as f64 / (1024.0 * 1024.0)) / elapsed.as_secs_f64();
587
588 println!(
589 " {} threads: {:.1} MB/s ({} elements)",
590 threads,
591 throughput,
592 elements.len()
593 );
594 }
595 }
596 }
597}