ddex_parser/streaming/
fast_streaming_parser.rs

1// src/streaming/fast_streaming_parser.rs
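//! Experimental high-throughput DDEX element scanner targeting 280+ MB/s.
//!
//! Despite the "streaming" name, the parser reads the entire input into memory and then
//! locates elements with substring searches over that buffer.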
3
4#[allow(dead_code)] // Experimental high-performance streaming parser
5use crate::error::ParseError;
6use crate::parser::security::SecurityConfig;
7use crate::streaming::{StreamingConfig, StreamingProgress};
8use memchr::memmem;
9use std::io::BufRead;
10use std::time::{Duration, Instant};
11
12/// High-performance streaming parser optimized for 280+ MB/s
13pub struct FastStreamingParser {
14    config: StreamingConfig,
15    // Pre-compiled SIMD-accelerated pattern matchers
16    release_start: memmem::Finder<'static>,
17    release_end: memmem::Finder<'static>,
18    resource_start: memmem::Finder<'static>,
19    resource_end: memmem::Finder<'static>,
20    header_start: memmem::Finder<'static>,
21    header_end: memmem::Finder<'static>,
22    // Additional resource patterns for comprehensive matching
23    sound_recording_start: memmem::Finder<'static>,
24    sound_recording_end: memmem::Finder<'static>,
25    party_start: memmem::Finder<'static>,
26    party_end: memmem::Finder<'static>,
27    deal_start: memmem::Finder<'static>,
28    deal_end: memmem::Finder<'static>,
29}
30
31/// Fast streaming element with minimal allocation
32#[derive(Debug, Clone)]
33pub struct FastStreamingElement {
34    /// Element type (Release, Resource, Party, etc.)
35    pub element_type: FastElementType,
36    /// Raw XML content (zero-copy reference)
37    pub raw_content: Vec<u8>,
38    /// Byte position in original stream
39    pub position: u64,
40    /// Size in bytes
41    pub size: usize,
42    /// Parse timestamp
43    pub parsed_at: Instant,
44}
45
/// Element types for fast classification.
///
/// `Eq` and `Hash` are derived so the type can be used as a map or set key, e.g. for
/// per-type counters.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FastElementType {
    /// A `Release` element
    Release,
    /// A `Resource` element; the scanner in this file also reports `SoundRecording` as this type
    Resource,
    /// A `Party` element
    Party,
    /// A `Deal` element
    Deal,
    /// The `MessageHeader` element
    MessageHeader,
    /// Any other element, identified by name (the scanner in this file never produces it)
    Other(String),
}
56
/// Performance metrics for one parse run.
///
/// `Default` yields an all-zero record.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct FastParsingStats {
    /// Input bytes processed per second, expressed in units of 1024 * 1024 bytes
    pub throughput_mbps: f64,
    /// Elements found per second of elapsed time
    pub elements_per_second: f64,
    /// Number of input bytes read
    pub total_bytes: u64,
    /// Number of elements found
    pub total_elements: usize,
    /// Wall-clock time spent reading and scanning
    pub elapsed: Duration,
    /// Capacity of the input buffer in units of 1024 * 1024 bytes (not a measured peak)
    pub peak_memory_mb: f64,
    /// Mean element size in bytes, or `0.0` when no elements were found
    pub avg_element_size: f64,
}
68
69impl FastStreamingParser {
70    pub fn new(config: StreamingConfig) -> Self {
71        Self {
72            config,
73            // Pre-compile all patterns for SIMD acceleration
74            release_start: memmem::Finder::new(b"<Release"),
75            release_end: memmem::Finder::new(b"</Release>"),
76            resource_start: memmem::Finder::new(b"<Resource"),
77            resource_end: memmem::Finder::new(b"</Resource>"),
78            sound_recording_start: memmem::Finder::new(b"<SoundRecording"),
79            sound_recording_end: memmem::Finder::new(b"</SoundRecording>"),
80            header_start: memmem::Finder::new(b"<MessageHeader"),
81            header_end: memmem::Finder::new(b"</MessageHeader>"),
82            party_start: memmem::Finder::new(b"<Party"),
83            party_end: memmem::Finder::new(b"</Party>"),
84            deal_start: memmem::Finder::new(b"<Deal"),
85            deal_end: memmem::Finder::new(b"</Deal>"),
86        }
87    }
88
89    pub fn parse_streaming<R: BufRead>(
90        &mut self,
91        reader: &mut R,
92        _progress_callback: Option<Box<dyn FnMut(StreamingProgress)>>,
93    ) -> Result<FastStreamingIterator, ParseError> {
94        let start = Instant::now();
95
96        // Read entire buffer at once - critical for performance
97        let mut buffer = Vec::with_capacity(50 * 1024 * 1024); // 50MB initial capacity
98        let bytes_read = reader.read_to_end(&mut buffer)?;
99
100        // Pre-allocate results with generous capacity to avoid reallocation
101        let mut elements = Vec::with_capacity(50000);
102
103        // Scan using SIMD-accelerated pattern matching
104        // Multiple passes for different element types maximize SIMD efficiency
105
106        // Pass 1: Find all releases using SIMD
107        let mut pos = 0;
108        while let Some(offset) = self.release_start.find(&buffer[pos..]) {
109            let start_pos = pos + offset;
110
111            // Find end using SIMD
112            if let Some(end_offset) = self.release_end.find(&buffer[start_pos..]) {
113                let end_pos = start_pos + end_offset + 10; // "</Release>".len()
114
115                elements.push(FastStreamingElement {
116                    element_type: FastElementType::Release,
117                    raw_content: buffer[start_pos..end_pos].to_vec(),
118                    position: start_pos as u64,
119                    size: end_pos - start_pos,
120                    parsed_at: Instant::now(),
121                });
122
123                pos = end_pos;
124            } else {
125                pos = start_pos + 1;
126            }
127        }
128
129        // Pass 2: Find all resources (both Resource and SoundRecording)
130        pos = 0;
131        while let Some(offset) = self.resource_start.find(&buffer[pos..]) {
132            let start_pos = pos + offset;
133
134            if let Some(end_offset) = self.resource_end.find(&buffer[start_pos..]) {
135                let end_pos = start_pos + end_offset + 11; // "</Resource>".len()
136
137                elements.push(FastStreamingElement {
138                    element_type: FastElementType::Resource,
139                    raw_content: buffer[start_pos..end_pos].to_vec(),
140                    position: start_pos as u64,
141                    size: end_pos - start_pos,
142                    parsed_at: Instant::now(),
143                });
144
145                pos = end_pos;
146            } else {
147                pos = start_pos + 1;
148            }
149        }
150
151        // Pass 2b: Find SoundRecording elements
152        pos = 0;
153        while let Some(offset) = self.sound_recording_start.find(&buffer[pos..]) {
154            let start_pos = pos + offset;
155
156            if let Some(end_offset) = self.sound_recording_end.find(&buffer[start_pos..]) {
157                let end_pos = start_pos + end_offset + 17; // "</SoundRecording>".len()
158
159                elements.push(FastStreamingElement {
160                    element_type: FastElementType::Resource,
161                    raw_content: buffer[start_pos..end_pos].to_vec(),
162                    position: start_pos as u64,
163                    size: end_pos - start_pos,
164                    parsed_at: Instant::now(),
165                });
166
167                pos = end_pos;
168            } else {
169                pos = start_pos + 1;
170            }
171        }
172
173        // Pass 3: Find message header
174        if let Some(offset) = self.header_start.find(&buffer) {
175            if let Some(end_offset) = self.header_end.find(&buffer[offset..]) {
176                let end_pos = offset + end_offset + 16; // "</MessageHeader>".len()
177
178                elements.push(FastStreamingElement {
179                    element_type: FastElementType::MessageHeader,
180                    raw_content: buffer[offset..end_pos].to_vec(),
181                    position: offset as u64,
182                    size: end_pos - offset,
183                    parsed_at: Instant::now(),
184                });
185            }
186        }
187
188        // Pass 4: Find parties
189        pos = 0;
190        while let Some(offset) = self.party_start.find(&buffer[pos..]) {
191            let start_pos = pos + offset;
192
193            if let Some(end_offset) = self.party_end.find(&buffer[start_pos..]) {
194                let end_pos = start_pos + end_offset + 8; // "</Party>".len()
195
196                elements.push(FastStreamingElement {
197                    element_type: FastElementType::Party,
198                    raw_content: buffer[start_pos..end_pos].to_vec(),
199                    position: start_pos as u64,
200                    size: end_pos - start_pos,
201                    parsed_at: Instant::now(),
202                });
203
204                pos = end_pos;
205            } else {
206                pos = start_pos + 1;
207            }
208        }
209
210        // Pass 5: Find deals
211        pos = 0;
212        while let Some(offset) = self.deal_start.find(&buffer[pos..]) {
213            let start_pos = pos + offset;
214
215            if let Some(end_offset) = self.deal_end.find(&buffer[start_pos..]) {
216                let end_pos = start_pos + end_offset + 7; // "</Deal>".len()
217
218                elements.push(FastStreamingElement {
219                    element_type: FastElementType::Deal,
220                    raw_content: buffer[start_pos..end_pos].to_vec(),
221                    position: start_pos as u64,
222                    size: end_pos - start_pos,
223                    parsed_at: Instant::now(),
224                });
225
226                pos = end_pos;
227            } else {
228                pos = start_pos + 1;
229            }
230        }
231
232        // Sort elements by position for proper ordering
233        elements.sort_by_key(|e| e.position);
234
235        let elapsed = start.elapsed();
236        let throughput = (bytes_read as f64) / elapsed.as_secs_f64() / (1024.0 * 1024.0);
237
238        let stats = FastParsingStats {
239            throughput_mbps: throughput,
240            elements_per_second: elements.len() as f64 / elapsed.as_secs_f64(),
241            total_bytes: bytes_read as u64,
242            total_elements: elements.len(),
243            elapsed,
244            peak_memory_mb: (buffer.capacity() as f64) / (1024.0 * 1024.0),
245            avg_element_size: if !elements.is_empty() {
246                elements.iter().map(|e| e.size).sum::<usize>() as f64 / elements.len() as f64
247            } else {
248                0.0
249            },
250        };
251
252        Ok(FastStreamingIterator::new(elements, stats))
253    }
254
255    /// Get current parsing statistics
256    pub fn get_stats(&self) -> FastParsingStats {
257        FastParsingStats {
258            throughput_mbps: 0.0,
259            elements_per_second: 0.0,
260            total_bytes: 0,
261            total_elements: 0,
262            elapsed: Duration::from_secs(0),
263            peak_memory_mb: 0.0,
264            avg_element_size: 0.0,
265        }
266    }
267}
268
269/// High-performance streaming iterator
270#[allow(dead_code)]
271pub struct FastStreamingIterator {
272    elements: Vec<FastStreamingElement>,
273    position: usize,
274    stats: FastParsingStats,
275}
276
277#[allow(dead_code)]
278impl FastStreamingIterator {
279    pub fn new(elements: Vec<FastStreamingElement>, mut stats: FastParsingStats) -> Self {
280        // Calculate final statistics
281        stats.total_elements = elements.len();
282        if stats.elapsed.as_secs_f64() > 0.0 {
283            stats.elements_per_second = elements.len() as f64 / stats.elapsed.as_secs_f64();
284        }
285        if !elements.is_empty() {
286            stats.avg_element_size =
287                elements.iter().map(|e| e.size).sum::<usize>() as f64 / elements.len() as f64;
288        }
289
290        Self {
291            elements,
292            position: 0,
293            stats,
294        }
295    }
296
297    /// Get parsing performance statistics
298    pub fn stats(&self) -> &FastParsingStats {
299        &self.stats
300    }
301
302    /// Get all elements of a specific type
303    pub fn filter_by_type(&self, element_type: FastElementType) -> Vec<&FastStreamingElement> {
304        self.elements
305            .iter()
306            .filter(|e| e.element_type == element_type)
307            .collect()
308    }
309
310    /// Get total number of elements
311    pub fn len(&self) -> usize {
312        self.elements.len()
313    }
314
315    /// Check if iterator is empty
316    pub fn is_empty(&self) -> bool {
317        self.elements.is_empty()
318    }
319}
320
321impl Iterator for FastStreamingIterator {
322    type Item = FastStreamingElement;
323
324    fn next(&mut self) -> Option<Self::Item> {
325        if self.position < self.elements.len() {
326            let element = self.elements[self.position].clone();
327            self.position += 1;
328            Some(element)
329        } else {
330            None
331        }
332    }
333
334    fn size_hint(&self) -> (usize, Option<usize>) {
335        let remaining = self.elements.len() - self.position;
336        (remaining, Some(remaining))
337    }
338}
339
340impl ExactSizeIterator for FastStreamingIterator {}
341
342/// Create a fast streaming parser with optimal configuration for performance
343#[allow(dead_code)]
344pub fn create_fast_parser() -> FastStreamingParser {
345    let config = StreamingConfig {
346        security: SecurityConfig::relaxed(), // Use relaxed for maximum performance
347        buffer_size: 64 * 1024,              // 64KB buffer
348        max_memory: 200 * 1024 * 1024,       // 200MB memory limit
349        chunk_size: 512,                     // 512KB chunks for optimal throughput
350        enable_progress: false,              // Disable progress for max speed
351        progress_interval: 0,
352    };
353
354    FastStreamingParser::new(config)
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{BufReader, Cursor};

    /// Parses `xml` from memory with the preset parser and returns the resulting iterator.
    fn parse(xml: &str) -> FastStreamingIterator {
        let mut parser = create_fast_parser();
        let mut reader = BufReader::new(Cursor::new(xml.as_bytes()));
        parser
            .parse_streaming(&mut reader, None)
            .expect("parsing an in-memory document should succeed")
    }

    /// Prints the throughput report when the `performance-debug` feature is enabled.
    #[cfg(feature = "performance-debug")]
    fn report_performance(stats: &FastParsingStats, target_throughput: f64) {
        println!("SIMD Performance test results:");
        println!(
            "  Total bytes: {:.2} MB",
            stats.total_bytes as f64 / (1024.0 * 1024.0)
        );
        println!("  Total elements: {}", stats.total_elements);
        println!("  Elapsed: {:?}", stats.elapsed);
        println!("  Throughput: {:.2} MB/s", stats.throughput_mbps);
        println!("  Elements/sec: {:.2}", stats.elements_per_second);
        println!("  Peak memory: {:.2} MB", stats.peak_memory_mb);
        println!("  Avg element size: {:.2} bytes", stats.avg_element_size);

        if stats.throughput_mbps >= target_throughput {
            println!(
                "✅ Performance target met: {:.2} MB/s >= {:.2} MB/s",
                stats.throughput_mbps, target_throughput
            );
        } else {
            println!(
                "⚠️  Performance below target: {:.2} MB/s < {:.2} MB/s",
                stats.throughput_mbps, target_throughput
            );
        }
    }

    /// No-op stand-in so callers need no `cfg` of their own.
    #[cfg(not(feature = "performance-debug"))]
    fn report_performance(_stats: &FastParsingStats, _target_throughput: f64) {}

    #[test]
    fn test_fast_streaming_parser_creation() {
        let parser = create_fast_parser();
        assert_eq!(parser.config.buffer_size, 64 * 1024);
    }

    #[test]
    fn test_fast_streaming_basic() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
        <ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
            <MessageHeader>
                <MessageId>MSG001</MessageId>
            </MessageHeader>
            <ReleaseList>
                <Release>
                    <ReleaseId>REL001</ReleaseId>
                    <ReleaseReference>R001</ReleaseReference>
                </Release>
                <Release>
                    <ReleaseId>REL002</ReleaseId>
                    <ReleaseReference>R002</ReleaseReference>
                </Release>
            </ReleaseList>
            <ResourceList>
                <SoundRecording>
                    <ResourceReference>A1</ResourceReference>
                    <Duration>PT3M45S</Duration>
                </SoundRecording>
            </ResourceList>
        </ern:NewReleaseMessage>"#;

        let iterator = parse(xml);
        let stats = iterator.stats();

        // One header, two releases and one sound recording.
        assert_eq!(stats.total_elements, 4);
        assert_eq!(stats.total_bytes, xml.len() as u64);
        assert_eq!(
            iterator.filter_by_type(FastElementType::Release).len(),
            2,
            "Should find both releases"
        );

        #[cfg(feature = "performance-debug")]
        println!("SIMD Fast streaming stats: {:#?}", stats);
        #[cfg(feature = "performance-debug")]
        println!("Throughput: {:.2} MB/s", stats.throughput_mbps);
    }

    #[test]
    fn test_performance_target() {
        const RELEASE_COUNT: usize = 5000;
        const RECORDING_COUNT: usize = 3000;

        // Generate a larger XML for more realistic performance testing
        let mut test_xml = String::from(
            r#"<?xml version="1.0" encoding="UTF-8"?>
        <ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
            <MessageHeader>
                <MessageId>PERFORMANCE_TEST</MessageId>
                <MessageThreadId>THREAD001</MessageThreadId>
                <MessageCreatedDateTime>2024-01-01T12:00:00</MessageCreatedDateTime>
            </MessageHeader>
            <ReleaseList>"#,
        );

        // Add many releases for performance testing
        for i in 0..RELEASE_COUNT {
            test_xml.push_str(&format!(
                r#"
                <Release>
                    <ReleaseId>REL{:08}</ReleaseId>
                    <ReleaseReference>R{:08}</ReleaseReference>
                    <Title>
                        <TitleText>Test Release {} - High Performance Streaming Test</TitleText>
                    </Title>
                    <DisplayArtist>Test Artist {}</DisplayArtist>
                    <ReleaseType>Album</ReleaseType>
                    <Genre>Electronic</Genre>
                </Release>"#,
                i,
                i,
                i,
                i % 100
            ));
        }

        test_xml.push_str("</ReleaseList><ResourceList>");

        // Add resources
        for i in 0..RECORDING_COUNT {
            test_xml.push_str(&format!(
                r#"
                <SoundRecording>
                    <ResourceReference>A{:08}</ResourceReference>
                    <Duration>PT3M{:02}S</Duration>
                    <Title>Track {} High Performance Test</Title>
                    <AudioChannelConfiguration>Stereo</AudioChannelConfiguration>
                    <SampleRate>44100</SampleRate>
                    <BitsPerSample>16</BitsPerSample>
                </SoundRecording>"#,
                i,
                i % 60,
                i
            ));
        }

        test_xml.push_str("</ResourceList></ern:NewReleaseMessage>");

        let iterator = parse(&test_xml);
        let stats = iterator.stats();

        // Throughput is reported, not asserted: timing is too noisy to gate CI on.
        let target_throughput = 50.0; // MB/s - conservative target for CI
        report_performance(stats, target_throughput);

        // The parser should handle this efficiently
        assert!(
            stats.total_elements > 8000,
            "Should have found many elements"
        );
        assert!(
            stats.total_bytes > 1024 * 1024,
            "Should have processed > 1MB"
        );
        assert_eq!(
            iterator.filter_by_type(FastElementType::Release).len(),
            RELEASE_COUNT,
            "Should find every release"
        );
        assert_eq!(
            iterator.filter_by_type(FastElementType::Resource).len(),
            RECORDING_COUNT,
            "Should find every sound recording"
        );
        assert_eq!(
            iterator
                .filter_by_type(FastElementType::MessageHeader)
                .len(),
            1,
            "Should find the message header"
        );
    }

    #[test]
    fn test_element_types_detection() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
        <ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
            <MessageHeader><MessageId>TEST</MessageId></MessageHeader>
            <Release><ReleaseId>REL001</ReleaseId></Release>
            <SoundRecording><ResourceReference>A1</ResourceReference></SoundRecording>
            <Party><PartyId>P1</PartyId></Party>
            <Deal><DealId>D1</DealId></Deal>
        </ern:NewReleaseMessage>"#;

        let elements: Vec<_> = parse(xml).collect();
        let count_of = |kind: FastElementType| {
            elements
                .iter()
                .filter(|element| element.element_type == kind)
                .count()
        };

        // Should find all different element types
        let header_count = count_of(FastElementType::MessageHeader);
        let release_count = count_of(FastElementType::Release);
        let resource_count = count_of(FastElementType::Resource);
        let party_count = count_of(FastElementType::Party);
        let deal_count = count_of(FastElementType::Deal);

        #[cfg(feature = "performance-debug")]
        println!("Element type counts:");
        #[cfg(feature = "performance-debug")]
        println!("  Headers: {}", header_count);
        #[cfg(feature = "performance-debug")]
        println!("  Releases: {}", release_count);
        #[cfg(feature = "performance-debug")]
        println!("  Resources: {}", resource_count);
        #[cfg(feature = "performance-debug")]
        println!("  Parties: {}", party_count);
        #[cfg(feature = "performance-debug")]
        println!("  Deals: {}", deal_count);

        assert_eq!(header_count, 1, "Should find message header");
        assert_eq!(release_count, 1, "Should find releases");
        assert_eq!(resource_count, 1, "Should find resources");
        assert_eq!(party_count, 1, "Should find parties");
        assert_eq!(deal_count, 1, "Should find deals");
    }
}