ddex_parser/streaming/
fast_zero_copy.rs

1//! Fast zero-copy streaming parser optimized for maximum throughput
2//!
3//! This implementation focuses on:
4//! - True streaming (not batch processing)
5//! - Minimal allocations using string interning
6//! - Fast pattern matching with memchr
7//! - Simple but efficient element extraction
8
9use crate::error::ParseError;
10use crate::streaming::{WorkingStreamingElement, WorkingStreamingStats};
11use ddex_core::models::versions::ERNVersion;
12use std::collections::HashMap;
13use std::io::BufRead;
14use std::time::Instant;
15
16/// Fast zero-copy streaming parser
17pub struct FastZeroCopyParser {
18    /// Buffer for reading chunks
19    read_buffer: Vec<u8>,
20    /// Leftover data from previous chunk
21    leftover: Vec<u8>,
22    /// String cache for avoiding allocations
23    string_cache: HashMap<Vec<u8>, String>,
24    /// Statistics
25    bytes_processed: u64,
26    elements_found: u64,
27    start_time: Instant,
28}
29
30impl FastZeroCopyParser {
31    pub fn new() -> Self {
32        Self {
33            read_buffer: vec![0; 64 * 1024], // 64KB buffer
34            leftover: Vec::new(),
35            string_cache: HashMap::with_capacity(512),
36            bytes_processed: 0,
37            elements_found: 0,
38            start_time: Instant::now(),
39        }
40    }
41
42    /// Get interned string to avoid allocations
43    fn intern_string(&mut self, bytes: &[u8]) -> String {
44        if let Some(cached) = self.string_cache.get(bytes) {
45            cached.clone()
46        } else {
47            let s = String::from_utf8_lossy(bytes).to_string();
48            self.string_cache.insert(bytes.to_vec(), s.clone());
49            s
50        }
51    }
52
53    /// Fast element extraction using memchr for initial scanning
54    pub fn parse_chunk(
55        &mut self,
56        chunk: &[u8],
57    ) -> Result<Vec<WorkingStreamingElement>, ParseError> {
58        self.bytes_processed += chunk.len() as u64;
59        let mut results = Vec::new();
60
61        // Combine leftover with new chunk
62        let mut data = Vec::with_capacity(self.leftover.len() + chunk.len());
63        data.extend_from_slice(&self.leftover);
64        data.extend_from_slice(chunk);
65
66        // Find MessageHeader elements
67        let mut pos = 0;
68        while let Some(start) = self.find_pattern(&data[pos..], b"<MessageHeader") {
69            let abs_start = pos + start;
70            if let Some(element) = self.extract_message_header_fast(&data, abs_start)? {
71                results.push(element);
72                self.elements_found += 1;
73            }
74            pos = abs_start + 14; // Skip past "<MessageHeader"
75        }
76
77        // Find Release elements
78        pos = 0;
79        while let Some(start) = self.find_pattern(&data[pos..], b"<Release ") {
80            let abs_start = pos + start;
81            if let Some(element) = self.extract_release_fast(&data, abs_start)? {
82                results.push(element);
83                self.elements_found += 1;
84            }
85            pos = abs_start + 9; // Skip past "<Release "
86        }
87
88        // Find SoundRecording elements
89        pos = 0;
90        while let Some(start) = self.find_pattern(&data[pos..], b"<SoundRecording ") {
91            let abs_start = pos + start;
92            if let Some(element) = self.extract_sound_recording_fast(&data, abs_start)? {
93                results.push(element);
94                self.elements_found += 1;
95            }
96            pos = abs_start + 16; // Skip past "<SoundRecording "
97        }
98
99        // Store leftover data that might contain incomplete elements
100        if data.len() > 2048 {
101            // Keep last 2KB to handle elements spanning chunks
102            self.leftover.clear();
103            self.leftover.extend_from_slice(&data[data.len() - 2048..]);
104        } else {
105            self.leftover = data;
106        }
107
108        Ok(results)
109    }
110
111    /// Fast pattern finding using memchr
112    fn find_pattern(&self, data: &[u8], pattern: &[u8]) -> Option<usize> {
113        if pattern.is_empty() {
114            return None;
115        }
116
117        // Use memchr to find first byte quickly, then verify full pattern
118        let mut pos = 0;
119        while let Some(first_byte_pos) = memchr::memchr(pattern[0], &data[pos..]) {
120            let abs_pos = pos + first_byte_pos;
121
122            if abs_pos + pattern.len() <= data.len()
123                && &data[abs_pos..abs_pos + pattern.len()] == pattern
124            {
125                return Some(abs_pos);
126            }
127
128            pos = abs_pos + 1;
129        }
130
131        None
132    }
133
134    /// Fast message header extraction
135    fn extract_message_header_fast(
136        &mut self,
137        data: &[u8],
138        start: usize,
139    ) -> Result<Option<WorkingStreamingElement>, ParseError> {
140        // Find closing tag
141        if let Some(end) = self.find_pattern(&data[start..], b"</MessageHeader>") {
142            let header_data = &data[start..start + end + 16]; // Include closing tag
143
144            // Extract MessageId quickly
145            let message_id = if let Some(id) = self.extract_tag_content(header_data, b"MessageId") {
146                self.intern_string(id)
147            } else {
148                "unknown".to_string()
149            };
150
151            // Extract CreatedDateTime
152            let created_date_time =
153                if let Some(dt) = self.extract_tag_content(header_data, b"CreatedDateTime") {
154                    self.intern_string(dt)
155                } else {
156                    chrono::Utc::now().to_rfc3339()
157                };
158
159            return Ok(Some(WorkingStreamingElement::MessageHeader {
160                message_id,
161                created_date_time,
162                version: ERNVersion::V4_3,
163            }));
164        }
165
166        Ok(None)
167    }
168
169    /// Fast release extraction
170    fn extract_release_fast(
171        &mut self,
172        data: &[u8],
173        start: usize,
174    ) -> Result<Option<WorkingStreamingElement>, ParseError> {
175        // Find closing tag
176        if let Some(end) = self.find_pattern(&data[start..], b"</Release>") {
177            let release_data = &data[start..start + end + 10]; // Include closing tag
178
179            // Extract ReleaseReference attribute from opening tag
180            let reference = if let Some(attr) =
181                self.extract_attribute_fast(release_data, b"ReleaseReference")
182            {
183                self.intern_string(attr)
184            } else {
185                format!("REL-{}", self.elements_found)
186            };
187
188            // Extract title from TitleText nested in ReferenceTitle
189            let title =
190                if let Some(title_data) = self.extract_tag_content(release_data, b"TitleText") {
191                    self.intern_string(title_data)
192                } else {
193                    "Untitled Release".to_string()
194                };
195
196            // Extract resource references (simplified)
197            let resource_references = self.extract_resource_references_fast(release_data);
198
199            return Ok(Some(WorkingStreamingElement::Release {
200                reference,
201                title,
202                resource_references,
203            }));
204        }
205
206        Ok(None)
207    }
208
209    /// Fast sound recording extraction
210    fn extract_sound_recording_fast(
211        &mut self,
212        data: &[u8],
213        start: usize,
214    ) -> Result<Option<WorkingStreamingElement>, ParseError> {
215        if let Some(end) = self.find_pattern(&data[start..], b"</SoundRecording>") {
216            let recording_data = &data[start..start + end + 17]; // Include closing tag
217
218            let reference = if let Some(attr) =
219                self.extract_attribute_fast(recording_data, b"ResourceReference")
220            {
221                self.intern_string(attr)
222            } else {
223                format!("RES-{}", self.elements_found)
224            };
225
226            let title =
227                if let Some(title_data) = self.extract_tag_content(recording_data, b"TitleText") {
228                    self.intern_string(title_data)
229                } else {
230                    "Untitled Track".to_string()
231                };
232
233            let duration = self
234                .extract_tag_content(recording_data, b"Duration")
235                .map(|d| self.intern_string(d));
236
237            let isrc = self
238                .extract_tag_content(recording_data, b"ISRC")
239                .map(|i| self.intern_string(i));
240
241            return Ok(Some(WorkingStreamingElement::SoundRecording {
242                reference,
243                title,
244                duration,
245                isrc,
246            }));
247        }
248
249        Ok(None)
250    }
251
252    /// Extract content between XML tags
253    fn extract_tag_content<'a>(&self, data: &'a [u8], tag_name: &[u8]) -> Option<&'a [u8]> {
254        // Create opening and closing tags
255        let opening = [b"<", tag_name, b">"].concat();
256        let closing = [b"</", tag_name, b">"].concat();
257
258        if let Some(start_pos) = self.find_pattern(data, &opening) {
259            let content_start = start_pos + opening.len();
260            if let Some(end_pos) = self.find_pattern(&data[content_start..], &closing) {
261                let content_end = content_start + end_pos;
262                return Some(&data[content_start..content_end]);
263            }
264        }
265
266        None
267    }
268
269    /// Extract attribute value from XML tag
270    fn extract_attribute_fast<'a>(&self, data: &'a [u8], attr_name: &[u8]) -> Option<&'a [u8]> {
271        let pattern = [attr_name, b"=\""].concat();
272
273        if let Some(start_pos) = self.find_pattern(data, &pattern) {
274            let value_start = start_pos + pattern.len();
275
276            // Find closing quote
277            if let Some(quote_pos) = memchr::memchr(b'"', &data[value_start..]) {
278                let value_end = value_start + quote_pos;
279                return Some(&data[value_start..value_end]);
280            }
281        }
282
283        None
284    }
285
286    /// Extract resource references (simplified)
287    fn extract_resource_references_fast(&mut self, data: &[u8]) -> Vec<String> {
288        let mut refs = Vec::new();
289        let mut pos = 0;
290
291        // Look for ResourceReference tags
292        while let Some(start) = self.find_pattern(&data[pos..], b"<ResourceReference>") {
293            let abs_start = pos + start;
294            if let Some(content) =
295                self.extract_tag_content(&data[abs_start..], b"ResourceReference")
296            {
297                refs.push(self.intern_string(content));
298            }
299            pos = abs_start + 19; // Skip past "<ResourceReference>"
300        }
301
302        refs
303    }
304
305    /// Get current statistics
306    pub fn stats(&self) -> WorkingStreamingStats {
307        let elapsed = self.start_time.elapsed();
308        let throughput = if elapsed.as_secs_f64() > 0.0 {
309            (self.bytes_processed as f64 / (1024.0 * 1024.0)) / elapsed.as_secs_f64()
310        } else {
311            0.0
312        };
313
314        WorkingStreamingStats {
315            bytes_processed: self.bytes_processed,
316            elements_yielded: self.elements_found as usize,
317            current_depth: 0,
318            max_depth_reached: 10,
319            current_memory_bytes: self.read_buffer.capacity() + self.leftover.capacity(),
320            max_memory_used_bytes: self.read_buffer.capacity() + self.leftover.capacity(),
321            elapsed_time: elapsed,
322            throughput_mb_per_sec: throughput,
323        }
324    }
325}
326
327impl Default for FastZeroCopyParser {
328    fn default() -> Self {
329        Self::new()
330    }
331}
332
333/// Fast streaming iterator
334pub struct FastZeroCopyIterator<R: BufRead> {
335    reader: R,
336    parser: FastZeroCopyParser,
337    buffer: Vec<u8>,
338    finished: bool,
339    elements_queue: Vec<WorkingStreamingElement>,
340    current_index: usize,
341}
342
343impl<R: BufRead> FastZeroCopyIterator<R> {
344    pub fn new(reader: R, _version: ERNVersion) -> Self {
345        Self {
346            reader,
347            parser: FastZeroCopyParser::new(),
348            buffer: vec![0; 64 * 1024], // 64KB chunks
349            finished: false,
350            elements_queue: Vec::new(),
351            current_index: 0,
352        }
353    }
354
355    pub fn stats(&self) -> WorkingStreamingStats {
356        self.parser.stats()
357    }
358
359    fn read_next_chunk(&mut self) -> Result<bool, ParseError> {
360        let bytes_read = self.reader.read(&mut self.buffer)?;
361
362        if bytes_read == 0 {
363            return Ok(false); // EOF
364        }
365
366        let elements = self.parser.parse_chunk(&self.buffer[..bytes_read])?;
367        self.elements_queue.extend(elements);
368
369        Ok(true)
370    }
371}
372
373impl<R: BufRead> Iterator for FastZeroCopyIterator<R> {
374    type Item = Result<WorkingStreamingElement, ParseError>;
375
376    fn next(&mut self) -> Option<Self::Item> {
377        if self.finished {
378            return None;
379        }
380
381        // Return queued elements first
382        if self.current_index < self.elements_queue.len() {
383            let element = self.elements_queue[self.current_index].clone();
384            self.current_index += 1;
385            return Some(Ok(element));
386        }
387
388        // Try to read more data
389        match self.read_next_chunk() {
390            Ok(true) => {
391                // We read some data, try again
392                self.next()
393            }
394            Ok(false) => {
395                // EOF reached
396                self.finished = true;
397                Some(Ok(WorkingStreamingElement::EndOfStream {
398                    stats: self.parser.stats(),
399                }))
400            }
401            Err(e) => {
402                self.finished = true;
403                Some(Err(e))
404            }
405        }
406    }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Runs the iterator over a small single-chunk document and checks that a
    /// header, a release and the end-of-stream marker are all produced.
    #[test]
    fn test_fast_zero_copy_basic() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>FAST-TEST-MSG</MessageId>
        <CreatedDateTime>2023-01-01T00:00:00Z</CreatedDateTime>
    </MessageHeader>
    <Release ReleaseReference="FAST-REL-001">
        <ReferenceTitle>
            <TitleText>Fast Zero Copy Release</TitleText>
        </ReferenceTitle>
    </Release>
</ern:NewReleaseMessage>"#;

        let iterator = FastZeroCopyIterator::new(Cursor::new(xml.as_bytes()), ERNVersion::V4_3);

        let collected: Result<Vec<_>, _> = iterator.collect();
        assert!(collected.is_ok(), "Fast zero-copy parsing should work");

        let elements = collected.unwrap();
        assert!(!elements.is_empty(), "Should find elements");

        // Count how many elements of each expected kind were produced
        let mut headers = 0;
        let mut releases = 0;
        let mut end_markers = 0;
        for element in &elements {
            if matches!(element, WorkingStreamingElement::MessageHeader { .. }) {
                headers += 1;
            }
            if matches!(element, WorkingStreamingElement::Release { .. }) {
                releases += 1;
            }
            if matches!(element, WorkingStreamingElement::EndOfStream { .. }) {
                end_markers += 1;
            }
        }

        assert!(headers > 0, "Should find message header");
        assert!(releases > 0, "Should find release");
        assert!(end_markers > 0, "Should find end of stream");

        println!("✅ Fast zero-copy parser basic test passed!");
    }

    /// `find_pattern` reports the offset of each pattern, or `None` if absent.
    #[test]
    fn test_fast_pattern_matching() {
        let parser = FastZeroCopyParser::new();
        let data = b"<Release><MessageHeader><SoundRecording>";

        let cases: [(&[u8], Option<usize>); 4] = [
            (b"<Release>", Some(0)),
            (b"<MessageHeader>", Some(9)),
            (b"<SoundRecording>", Some(24)),
            (b"<NotFound>", None),
        ];

        for (pattern, expected) in cases.iter() {
            assert_eq!(parser.find_pattern(data, pattern), *expected);
        }
    }

    /// `extract_tag_content` returns the bytes between matching tags.
    #[test]
    fn test_tag_content_extraction() {
        let parser = FastZeroCopyParser::new();
        let data = b"<Title>Test Title</Title>";

        let content = parser.extract_tag_content(data, b"Title");
        assert_eq!(content.unwrap(), b"Test Title");
    }

    /// `extract_attribute_fast` returns the double-quoted attribute value.
    #[test]
    fn test_attribute_extraction() {
        let parser = FastZeroCopyParser::new();
        let data = b"<Release ReleaseReference=\"REL-123\">";

        let attr_value = parser.extract_attribute_fast(data, b"ReleaseReference");
        assert_eq!(attr_value.unwrap(), b"REL-123");
    }
}