1#[allow(dead_code)] use crate::error::ParseError;
6use crate::parser::security::SecurityConfig;
7use crate::streaming::{StreamingConfig, StreamingProgress};
8use memchr::memmem;
9use std::io::BufRead;
10use std::time::{Duration, Instant};
11
12pub struct FastStreamingParser {
14 config: StreamingConfig,
15 release_start: memmem::Finder<'static>,
17 release_end: memmem::Finder<'static>,
18 resource_start: memmem::Finder<'static>,
19 resource_end: memmem::Finder<'static>,
20 header_start: memmem::Finder<'static>,
21 header_end: memmem::Finder<'static>,
22 sound_recording_start: memmem::Finder<'static>,
24 sound_recording_end: memmem::Finder<'static>,
25 party_start: memmem::Finder<'static>,
26 party_end: memmem::Finder<'static>,
27 deal_start: memmem::Finder<'static>,
28 deal_end: memmem::Finder<'static>,
29}
30
/// One element captured by [`FastStreamingParser::parse_streaming`].
#[derive(Debug, Clone)]
pub struct FastStreamingElement {
    /// Which kind of tag this element was matched as.
    pub element_type: FastElementType,
    /// Bytes copied out of the input buffer, starting at the start tag's `<`
    /// and ending after the element's closing `>`.
    pub raw_content: Vec<u8>,
    /// Byte offset of the start tag's `<` within the input.
    pub position: u64,
    /// Number of bytes in `raw_content`.
    pub size: usize,
    /// Moment the element was recorded during the scan.
    pub parsed_at: Instant,
}
45
/// Kind of element captured by [`FastStreamingParser`].
#[derive(Debug, Clone, PartialEq)]
pub enum FastElementType {
    /// A `<MessageHeader>` element.
    MessageHeader,
    /// A `<Release>` element.
    Release,
    /// A `<Resource>` element; the parser also reports `<SoundRecording>`
    /// elements under this kind.
    Resource,
    /// A `<Party>` element.
    Party,
    /// A `<Deal>` element.
    Deal,
    /// Catch-all carrying a free-form label; the scanner in this file never
    /// produces it.
    Other(String),
}
56
/// Timing and size figures describing one `parse_streaming` run.
#[derive(Debug, Clone)]
pub struct FastParsingStats {
    /// Input consumed per second, in MiB (1024 * 1024 bytes).
    pub throughput_mbps: f64,
    /// Extracted elements per second of `elapsed`.
    pub elements_per_second: f64,
    /// Number of bytes read from the input.
    pub total_bytes: u64,
    /// Number of elements extracted.
    pub total_elements: usize,
    /// Time from the start of `parse_streaming` (reading included) until
    /// extraction and sorting finished.
    pub elapsed: Duration,
    /// Capacity of the input buffer in MiB; the per-element copies are not
    /// counted, so this is an approximation rather than a true peak.
    pub peak_memory_mb: f64,
    /// Mean `size` of the extracted elements, or 0.0 when none were found.
    pub avg_element_size: f64,
}
68
69impl FastStreamingParser {
70 pub fn new(config: StreamingConfig) -> Self {
71 Self {
72 config,
73 release_start: memmem::Finder::new(b"<Release"),
75 release_end: memmem::Finder::new(b"</Release>"),
76 resource_start: memmem::Finder::new(b"<Resource"),
77 resource_end: memmem::Finder::new(b"</Resource>"),
78 sound_recording_start: memmem::Finder::new(b"<SoundRecording"),
79 sound_recording_end: memmem::Finder::new(b"</SoundRecording>"),
80 header_start: memmem::Finder::new(b"<MessageHeader"),
81 header_end: memmem::Finder::new(b"</MessageHeader>"),
82 party_start: memmem::Finder::new(b"<Party"),
83 party_end: memmem::Finder::new(b"</Party>"),
84 deal_start: memmem::Finder::new(b"<Deal"),
85 deal_end: memmem::Finder::new(b"</Deal>"),
86 }
87 }
88
89 pub fn parse_streaming<R: BufRead>(
90 &mut self,
91 reader: &mut R,
92 _progress_callback: Option<Box<dyn FnMut(StreamingProgress)>>,
93 ) -> Result<FastStreamingIterator, ParseError> {
94 let start = Instant::now();
95
96 let mut buffer = Vec::with_capacity(50 * 1024 * 1024); let bytes_read = reader.read_to_end(&mut buffer)?;
99
100 let mut elements = Vec::with_capacity(50000);
102
103 let mut pos = 0;
108 while let Some(offset) = self.release_start.find(&buffer[pos..]) {
109 let start_pos = pos + offset;
110
111 if let Some(end_offset) = self.release_end.find(&buffer[start_pos..]) {
113 let end_pos = start_pos + end_offset + 10; elements.push(FastStreamingElement {
116 element_type: FastElementType::Release,
117 raw_content: buffer[start_pos..end_pos].to_vec(),
118 position: start_pos as u64,
119 size: end_pos - start_pos,
120 parsed_at: Instant::now(),
121 });
122
123 pos = end_pos;
124 } else {
125 pos = start_pos + 1;
126 }
127 }
128
129 pos = 0;
131 while let Some(offset) = self.resource_start.find(&buffer[pos..]) {
132 let start_pos = pos + offset;
133
134 if let Some(end_offset) = self.resource_end.find(&buffer[start_pos..]) {
135 let end_pos = start_pos + end_offset + 11; elements.push(FastStreamingElement {
138 element_type: FastElementType::Resource,
139 raw_content: buffer[start_pos..end_pos].to_vec(),
140 position: start_pos as u64,
141 size: end_pos - start_pos,
142 parsed_at: Instant::now(),
143 });
144
145 pos = end_pos;
146 } else {
147 pos = start_pos + 1;
148 }
149 }
150
151 pos = 0;
153 while let Some(offset) = self.sound_recording_start.find(&buffer[pos..]) {
154 let start_pos = pos + offset;
155
156 if let Some(end_offset) = self.sound_recording_end.find(&buffer[start_pos..]) {
157 let end_pos = start_pos + end_offset + 17; elements.push(FastStreamingElement {
160 element_type: FastElementType::Resource,
161 raw_content: buffer[start_pos..end_pos].to_vec(),
162 position: start_pos as u64,
163 size: end_pos - start_pos,
164 parsed_at: Instant::now(),
165 });
166
167 pos = end_pos;
168 } else {
169 pos = start_pos + 1;
170 }
171 }
172
173 if let Some(offset) = self.header_start.find(&buffer) {
175 if let Some(end_offset) = self.header_end.find(&buffer[offset..]) {
176 let end_pos = offset + end_offset + 16; elements.push(FastStreamingElement {
179 element_type: FastElementType::MessageHeader,
180 raw_content: buffer[offset..end_pos].to_vec(),
181 position: offset as u64,
182 size: end_pos - offset,
183 parsed_at: Instant::now(),
184 });
185 }
186 }
187
188 pos = 0;
190 while let Some(offset) = self.party_start.find(&buffer[pos..]) {
191 let start_pos = pos + offset;
192
193 if let Some(end_offset) = self.party_end.find(&buffer[start_pos..]) {
194 let end_pos = start_pos + end_offset + 8; elements.push(FastStreamingElement {
197 element_type: FastElementType::Party,
198 raw_content: buffer[start_pos..end_pos].to_vec(),
199 position: start_pos as u64,
200 size: end_pos - start_pos,
201 parsed_at: Instant::now(),
202 });
203
204 pos = end_pos;
205 } else {
206 pos = start_pos + 1;
207 }
208 }
209
210 pos = 0;
212 while let Some(offset) = self.deal_start.find(&buffer[pos..]) {
213 let start_pos = pos + offset;
214
215 if let Some(end_offset) = self.deal_end.find(&buffer[start_pos..]) {
216 let end_pos = start_pos + end_offset + 7; elements.push(FastStreamingElement {
219 element_type: FastElementType::Deal,
220 raw_content: buffer[start_pos..end_pos].to_vec(),
221 position: start_pos as u64,
222 size: end_pos - start_pos,
223 parsed_at: Instant::now(),
224 });
225
226 pos = end_pos;
227 } else {
228 pos = start_pos + 1;
229 }
230 }
231
232 elements.sort_by_key(|e| e.position);
234
235 let elapsed = start.elapsed();
236 let throughput = (bytes_read as f64) / elapsed.as_secs_f64() / (1024.0 * 1024.0);
237
238 let stats = FastParsingStats {
239 throughput_mbps: throughput,
240 elements_per_second: elements.len() as f64 / elapsed.as_secs_f64(),
241 total_bytes: bytes_read as u64,
242 total_elements: elements.len(),
243 elapsed,
244 peak_memory_mb: (buffer.capacity() as f64) / (1024.0 * 1024.0),
245 avg_element_size: if !elements.is_empty() {
246 elements.iter().map(|e| e.size).sum::<usize>() as f64 / elements.len() as f64
247 } else {
248 0.0
249 },
250 };
251
252 Ok(FastStreamingIterator::new(elements, stats))
253 }
254
255 pub fn get_stats(&self) -> FastParsingStats {
257 FastParsingStats {
258 throughput_mbps: 0.0,
259 elements_per_second: 0.0,
260 total_bytes: 0,
261 total_elements: 0,
262 elapsed: Duration::from_secs(0),
263 peak_memory_mb: 0.0,
264 avg_element_size: 0.0,
265 }
266 }
267}
268
269#[allow(dead_code)]
271pub struct FastStreamingIterator {
272 elements: Vec<FastStreamingElement>,
273 position: usize,
274 stats: FastParsingStats,
275}
276
277#[allow(dead_code)]
278impl FastStreamingIterator {
279 pub fn new(elements: Vec<FastStreamingElement>, mut stats: FastParsingStats) -> Self {
280 stats.total_elements = elements.len();
282 if stats.elapsed.as_secs_f64() > 0.0 {
283 stats.elements_per_second = elements.len() as f64 / stats.elapsed.as_secs_f64();
284 }
285 if !elements.is_empty() {
286 stats.avg_element_size =
287 elements.iter().map(|e| e.size).sum::<usize>() as f64 / elements.len() as f64;
288 }
289
290 Self {
291 elements,
292 position: 0,
293 stats,
294 }
295 }
296
297 pub fn stats(&self) -> &FastParsingStats {
299 &self.stats
300 }
301
302 pub fn filter_by_type(&self, element_type: FastElementType) -> Vec<&FastStreamingElement> {
304 self.elements
305 .iter()
306 .filter(|e| e.element_type == element_type)
307 .collect()
308 }
309
310 pub fn len(&self) -> usize {
312 self.elements.len()
313 }
314
315 pub fn is_empty(&self) -> bool {
317 self.elements.is_empty()
318 }
319}
320
321impl Iterator for FastStreamingIterator {
322 type Item = FastStreamingElement;
323
324 fn next(&mut self) -> Option<Self::Item> {
325 if self.position < self.elements.len() {
326 let element = self.elements[self.position].clone();
327 self.position += 1;
328 Some(element)
329 } else {
330 None
331 }
332 }
333
334 fn size_hint(&self) -> (usize, Option<usize>) {
335 let remaining = self.elements.len() - self.position;
336 (remaining, Some(remaining))
337 }
338}
339
340impl ExactSizeIterator for FastStreamingIterator {}
341
342#[allow(dead_code)]
344pub fn create_fast_parser() -> FastStreamingParser {
345 let config = StreamingConfig {
346 security: SecurityConfig::relaxed(), buffer_size: 64 * 1024, max_memory: 200 * 1024 * 1024, chunk_size: 512, enable_progress: false, progress_interval: 0,
352 };
353
354 FastStreamingParser::new(config)
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{BufReader, Cursor};

    /// Parses `xml` with a parser from [`create_fast_parser`], failing the test
    /// with the underlying error if parsing does not succeed.
    fn parse(xml: &str) -> FastStreamingIterator {
        let mut parser = create_fast_parser();
        let mut reader = BufReader::new(Cursor::new(xml.as_bytes()));
        parser
            .parse_streaming(&mut reader, None)
            .expect("parse_streaming should succeed on in-memory input")
    }

    /// Counts the elements in `elements` whose type equals `kind`.
    fn count_of(elements: &[FastStreamingElement], kind: &FastElementType) -> usize {
        elements.iter().filter(|e| &e.element_type == kind).count()
    }

    #[test]
    fn test_fast_streaming_parser_creation() {
        let parser = create_fast_parser();
        assert_eq!(parser.config.buffer_size, 64 * 1024);
    }

    #[test]
    fn test_fast_streaming_basic() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>MSG001</MessageId>
    </MessageHeader>
    <ReleaseList>
        <Release>
            <ReleaseId>REL001</ReleaseId>
            <ReleaseReference>R001</ReleaseReference>
        </Release>
        <Release>
            <ReleaseId>REL002</ReleaseId>
            <ReleaseReference>R002</ReleaseReference>
        </Release>
    </ReleaseList>
    <ResourceList>
        <SoundRecording>
            <ResourceReference>A1</ResourceReference>
            <Duration>PT3M45S</Duration>
        </SoundRecording>
    </ResourceList>
</ern:NewReleaseMessage>"#;

        let iterator = parse(xml);
        let stats = iterator.stats();

        // MessageHeader + two Releases + one SoundRecording.
        assert_eq!(stats.total_elements, 4);
        assert_eq!(stats.total_bytes, xml.len() as u64);

        #[cfg(feature = "performance-debug")]
        {
            println!("SIMD Fast streaming stats: {:#?}", stats);
            println!("Throughput: {:.2} MB/s", stats.throughput_mbps);
        }
    }

    #[test]
    fn test_performance_target() {
        let mut test_xml = String::from(
            r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>PERFORMANCE_TEST</MessageId>
        <MessageThreadId>THREAD001</MessageThreadId>
        <MessageCreatedDateTime>2024-01-01T12:00:00</MessageCreatedDateTime>
    </MessageHeader>
    <ReleaseList>"#,
        );

        for i in 0..5000 {
            test_xml.push_str(&format!(
                r#"
        <Release>
            <ReleaseId>REL{:08}</ReleaseId>
            <ReleaseReference>R{:08}</ReleaseReference>
            <Title>
                <TitleText>Test Release {} - High Performance Streaming Test</TitleText>
            </Title>
            <DisplayArtist>Test Artist {}</DisplayArtist>
            <ReleaseType>Album</ReleaseType>
            <Genre>Electronic</Genre>
        </Release>"#,
                i,
                i,
                i,
                i % 100
            ));
        }

        test_xml.push_str("</ReleaseList><ResourceList>");

        for i in 0..3000 {
            test_xml.push_str(&format!(
                r#"
        <SoundRecording>
            <ResourceReference>A{:08}</ResourceReference>
            <Duration>PT3M{:02}S</Duration>
            <Title>Track {} High Performance Test</Title>
            <AudioChannelConfiguration>Stereo</AudioChannelConfiguration>
            <SampleRate>44100</SampleRate>
            <BitsPerSample>16</BitsPerSample>
        </SoundRecording>"#,
                i,
                i % 60,
                i
            ));
        }

        test_xml.push_str("</ResourceList></ern:NewReleaseMessage>");

        // Wall-clock timing is only needed for the optional debug report.
        #[cfg(feature = "performance-debug")]
        let started = Instant::now();
        let iterator = parse(&test_xml);
        #[cfg(feature = "performance-debug")]
        let elapsed = started.elapsed();

        let stats = iterator.stats();

        #[cfg(feature = "performance-debug")]
        {
            let target_throughput = 50.0;
            println!("SIMD Performance test results:");
            println!(
                "  Total bytes: {:.2} MB",
                stats.total_bytes as f64 / (1024.0 * 1024.0)
            );
            println!("  Total elements: {}", stats.total_elements);
            println!("  Elapsed: {:?}", elapsed);
            println!("  Throughput: {:.2} MB/s", stats.throughput_mbps);
            println!("  Elements/sec: {:.2}", stats.elements_per_second);
            println!("  Peak memory: {:.2} MB", stats.peak_memory_mb);
            println!("  Avg element size: {:.2} bytes", stats.avg_element_size);

            if stats.throughput_mbps >= target_throughput {
                println!(
                    "✅ Performance target met: {:.2} MB/s >= {:.2} MB/s",
                    stats.throughput_mbps, target_throughput
                );
            } else {
                println!(
                    "⚠️ Performance below target: {:.2} MB/s < {:.2} MB/s",
                    stats.throughput_mbps, target_throughput
                );
            }
        }

        // 5000 Releases + 3000 SoundRecordings + the MessageHeader.
        assert_eq!(stats.total_elements, 8001, "Should have found many elements");
        assert_eq!(stats.total_bytes, test_xml.len() as u64);
        assert!(
            stats.total_bytes > 1024 * 1024,
            "Should have processed > 1MB"
        );
    }

    #[test]
    fn test_element_types_detection() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
    <MessageHeader><MessageId>TEST</MessageId></MessageHeader>
    <Release><ReleaseId>REL001</ReleaseId></Release>
    <SoundRecording><ResourceReference>A1</ResourceReference></SoundRecording>
    <Party><PartyId>P1</PartyId></Party>
    <Deal><DealId>D1</DealId></Deal>
</ern:NewReleaseMessage>"#;

        let elements: Vec<FastStreamingElement> = parse(xml).collect();

        let header_count = count_of(&elements, &FastElementType::MessageHeader);
        let release_count = count_of(&elements, &FastElementType::Release);
        let resource_count = count_of(&elements, &FastElementType::Resource);
        let party_count = count_of(&elements, &FastElementType::Party);
        let deal_count = count_of(&elements, &FastElementType::Deal);

        #[cfg(feature = "performance-debug")]
        {
            println!("Element type counts:");
            println!("  Headers: {}", header_count);
            println!("  Releases: {}", release_count);
            println!("  Resources: {}", resource_count);
            println!("  Parties: {}", party_count);
            println!("  Deals: {}", deal_count);
        }

        // The document contains exactly one element of each kind.
        assert_eq!(header_count, 1, "Should find message header");
        assert_eq!(release_count, 1, "Should find releases");
        assert_eq!(resource_count, 1, "Should find resources");
        assert_eq!(party_count, 1, "Should find parties");
        assert_eq!(deal_count, 1, "Should find deals");
    }
}