ddex_parser/streaming/
zero_copy_parser.rs

1//! Zero-copy high-performance streaming parser for DDEX XML
2//!
3//! This implementation targets 280+ MB/s throughput using:
4//! - Zero-copy string handling
5//! - SIMD-accelerated pattern matching
6
7#[allow(dead_code)] // Experimental zero-copy streaming parser
8// - Streaming-native parsing (no DOM)
9// - Memory-efficient buffer management
10// - Specialized DDEX parsing optimizations
11use crate::error::ParseError;
12use crate::streaming::{WorkingStreamingElement, WorkingStreamingStats};
13use ddex_core::models::versions::ERNVersion;
14use std::collections::HashMap;
15use std::io::BufRead;
16use std::time::Instant;
17
/// Streaming parser that scans raw DDEX XML bytes for a fixed set of elements.
///
/// Field values are located as sub-slices of the input and are only turned
/// into owned `String`s when they pass through the string cache.
pub struct ZeroCopyParser {
    /// Scratch buffer, pre-allocated by `new`
    buffer: Vec<u8>,
    /// Cache mapping raw field bytes to their decoded `String`
    string_cache: StringCache,
    /// Parser state (set to `Initial` by `new`)
    state: ParserState,
    /// Statistics refreshed by `parse_streaming`
    stats: ZeroCopyStats,
    /// ERN version copied into every emitted message header
    version: ERNVersion,
}
31
/// Byte-keyed cache of decoded strings that also counts hits and misses.
struct StringCache {
    cache: HashMap<Vec<u8>, String>,
    hit_count: u64,
    miss_count: u64,
}

impl StringCache {
    /// Creates an empty cache with room for 1024 entries and zeroed counters.
    fn new() -> Self {
        Self {
            cache: HashMap::with_capacity(1024),
            hit_count: 0,
            miss_count: 0,
        }
    }

    /// Returns the string form of `bytes`, decoding (lossily, as UTF-8) and
    /// remembering it on first sight and cloning the remembered value afterwards.
    fn intern(&mut self, bytes: &[u8]) -> String {
        match self.cache.get(bytes) {
            Some(existing) => {
                self.hit_count += 1;
                existing.clone()
            }
            None => {
                self.miss_count += 1;
                let decoded = String::from_utf8_lossy(bytes).into_owned();
                self.cache.insert(bytes.to_vec(), decoded.clone());
                decoded
            }
        }
    }

    /// Fraction of `intern` calls answered from the cache, in `0.0..=1.0`.
    ///
    /// Yields `0.0` when `intern` has never been called.
    fn hit_rate(&self) -> f64 {
        let lookups = self.hit_count + self.miss_count;
        if lookups == 0 {
            return 0.0;
        }
        self.hit_count as f64 / lookups as f64
    }
}
68
/// Coarse position of the parser within a DDEX message.
///
/// NOTE(review): in the code visible in this file only `Initial` is ever
/// constructed (by `ZeroCopyParser::new`); the remaining variants look
/// reserved for future use — confirm before relying on them.
#[derive(Debug, Clone)]
enum ParserState {
    /// Nothing has been parsed yet.
    Initial,
    /// Inside the message header.
    InMessageHeader,
    /// Inside a release identified by `reference`.
    InRelease {
        reference: String,
    },
    /// Inside a resource of kind `resource_type` identified by `reference`.
    InResource {
        resource_type: String,
        reference: String,
    },
    /// Parsing has finished.
    Done,
}
82
/// One element produced by [`ZeroCopyParser::parse_streaming`].
///
/// All textual values are owned `String`s obtained from the parser's string
/// cache (or a generated placeholder when the source field is missing).
#[derive(Debug, Clone)]
pub enum ZeroCopyElement {
    /// Message header; `version` is the ERN version the parser was built with.
    MessageHeader {
        message_id: String,
        created_date_time: String,
        version: ERNVersion,
    },
    /// A release together with the resource references found inside it.
    Release {
        reference: String,
        title: String,
        genre: Option<String>,
        resource_references: Vec<String>,
    },
    /// A sound recording resource.
    SoundRecording {
        reference: String,
        title: String,
        duration: Option<String>,
        isrc: Option<String>,
        creation_date: Option<String>,
    },
    /// A video resource.
    Video {
        reference: String,
        title: String,
        duration: Option<String>,
        codec: Option<String>,
    },
    /// An image resource; `width`/`height` are `None` when absent or not a
    /// valid `u32`.
    Image {
        reference: String,
        title: String,
        width: Option<u32>,
        height: Option<u32>,
        format: Option<String>,
    },
    /// A text resource.
    Text {
        reference: String,
        title: String,
        language: Option<String>,
    },
    /// Terminal marker appended by the iterators, carrying a snapshot of the
    /// parser statistics.
    EndOfStream {
        stats: ZeroCopyStats,
    },
}
126
/// Counters and timings maintained by [`ZeroCopyParser::parse_streaming`].
#[derive(Debug, Clone)]
pub struct ZeroCopyStats {
    /// Total input bytes, accumulated over all `parse_streaming` calls.
    pub bytes_processed: u64,
    /// Total elements emitted, accumulated over all `parse_streaming` calls.
    pub elements_found: u64,
    /// String cache hit rate as a fraction in `0.0..=1.0`.
    pub string_cache_hit_rate: f64,
    /// Wall-clock duration of the most recent `parse_streaming` call.
    pub parse_time: std::time::Duration,
    /// Throughput of the most recent call, in MiB (1024 * 1024 bytes) per second.
    pub throughput_mb_per_sec: f64,
    /// Rough estimate of the parser's memory footprint in bytes.
    pub memory_used_bytes: usize,
}
136
137impl ZeroCopyParser {
138    pub fn new(version: ERNVersion) -> Self {
139        Self {
140            buffer: Vec::with_capacity(1024 * 1024), // 1MB buffer
141            string_cache: StringCache::new(),
142            state: ParserState::Initial,
143            stats: ZeroCopyStats {
144                bytes_processed: 0,
145                elements_found: 0,
146                string_cache_hit_rate: 0.0,
147                parse_time: std::time::Duration::default(),
148                throughput_mb_per_sec: 0.0,
149                memory_used_bytes: 0,
150            },
151            version,
152        }
153    }
154
155    /// High-performance streaming parse using SIMD and zero-copy techniques
156    pub fn parse_streaming(&mut self, data: &[u8]) -> Result<Vec<ZeroCopyElement>, ParseError> {
157        let start_time = Instant::now();
158        self.stats.bytes_processed += data.len() as u64;
159
160        let mut results = Vec::new();
161
162        // Use SIMD-accelerated pattern matching to find element boundaries
163        let release_positions = self.find_elements_simd(data, b"<Release")?;
164        let sound_recording_positions = self.find_elements_simd(data, b"<SoundRecording")?;
165        let video_positions = self.find_elements_simd(data, b"<Video")?;
166        let image_positions = self.find_elements_simd(data, b"<Image")?;
167        let text_positions = self.find_elements_simd(data, b"<Text")?;
168        let message_header_positions = self.find_elements_simd(data, b"<MessageHeader")?;
169
170        // Process message headers
171        for pos in message_header_positions {
172            if let Some(element) = self.extract_message_header(data, pos)? {
173                results.push(element);
174                self.stats.elements_found += 1;
175            }
176        }
177
178        // Process releases with zero-copy extraction
179        for pos in release_positions {
180            if let Some(element) = self.extract_release_zero_copy(data, pos)? {
181                results.push(element);
182                self.stats.elements_found += 1;
183            }
184        }
185
186        // Process sound recordings
187        for pos in sound_recording_positions {
188            if let Some(element) = self.extract_sound_recording_zero_copy(data, pos)? {
189                results.push(element);
190                self.stats.elements_found += 1;
191            }
192        }
193
194        // Process videos
195        for pos in video_positions {
196            if let Some(element) = self.extract_video_zero_copy(data, pos)? {
197                results.push(element);
198                self.stats.elements_found += 1;
199            }
200        }
201
202        // Process images
203        for pos in image_positions {
204            if let Some(element) = self.extract_image_zero_copy(data, pos)? {
205                results.push(element);
206                self.stats.elements_found += 1;
207            }
208        }
209
210        // Process text resources
211        for pos in text_positions {
212            if let Some(element) = self.extract_text_zero_copy(data, pos)? {
213                results.push(element);
214                self.stats.elements_found += 1;
215            }
216        }
217
218        // Update statistics
219        self.stats.parse_time = start_time.elapsed();
220        self.stats.string_cache_hit_rate = self.string_cache.hit_rate();
221        self.stats.throughput_mb_per_sec =
222            (data.len() as f64 / (1024.0 * 1024.0)) / self.stats.parse_time.as_secs_f64();
223        self.stats.memory_used_bytes = self.estimate_memory_usage();
224
225        Ok(results)
226    }
227
228    /// SIMD-accelerated element boundary detection
229    #[cfg(target_arch = "x86_64")]
230    fn find_elements_simd(&self, data: &[u8], pattern: &[u8]) -> Result<Vec<usize>, ParseError> {
231        use std::arch::x86_64::*;
232
233        let mut positions = Vec::new();
234
235        if pattern.len() == 0 || data.len() < pattern.len() {
236            return Ok(positions);
237        }
238
239        // For patterns longer than 16 bytes, fall back to memchr
240        if pattern.len() > 16 {
241            return self.find_elements_fallback(data, pattern);
242        }
243
244        // SIMD implementation for x86_64
245        unsafe {
246            let pattern_first = pattern[0];
247            let mut i = 0;
248
249            // Process 16 bytes at a time using SIMD
250            while i + 16 <= data.len() {
251                // Load 16 bytes
252                let chunk = _mm_loadu_si128(data.as_ptr().add(i) as *const __m128i);
253
254                // Create a vector of the first pattern byte
255                let pattern_vec = _mm_set1_epi8(pattern_first as i8);
256
257                // Compare
258                let matches = _mm_cmpeq_epi8(chunk, pattern_vec);
259
260                // Extract match mask
261                let mask = _mm_movemask_epi8(matches) as u16;
262
263                // Check each potential match
264                for bit_pos in 0..16 {
265                    if (mask & (1 << bit_pos)) != 0 {
266                        let pos = i + bit_pos;
267
268                        // Verify the full pattern matches
269                        if pos + pattern.len() <= data.len()
270                            && data[pos..pos + pattern.len()] == *pattern
271                        {
272                            positions.push(pos);
273                        }
274                    }
275                }
276
277                i += 16;
278            }
279
280            // Handle remaining bytes
281            while i + pattern.len() <= data.len() {
282                if data[i..i + pattern.len()] == *pattern {
283                    positions.push(i);
284                }
285                i += 1;
286            }
287        }
288
289        Ok(positions)
290    }
291
    /// Portable stand-in for the SIMD scanner on targets other than x86_64.
    ///
    /// Delegates directly to `find_elements_fallback`.
    #[cfg(not(target_arch = "x86_64"))]
    fn find_elements_simd(&self, data: &[u8], pattern: &[u8]) -> Result<Vec<usize>, ParseError> {
        self.find_elements_fallback(data, pattern)
    }
297
298    fn find_elements_fallback(
299        &self,
300        data: &[u8],
301        pattern: &[u8],
302    ) -> Result<Vec<usize>, ParseError> {
303        let mut positions = Vec::new();
304        let mut start = 0;
305
306        // Use memchr for fast first-byte scanning
307        use memchr::memchr;
308
309        while let Some(pos) = memchr(pattern[0], &data[start..]) {
310            let abs_pos = start + pos;
311
312            // Check if full pattern matches
313            if abs_pos + pattern.len() <= data.len()
314                && data[abs_pos..abs_pos + pattern.len()] == *pattern
315            {
316                positions.push(abs_pos);
317            }
318
319            start = abs_pos + 1;
320        }
321
322        Ok(positions)
323    }
324
325    /// Zero-copy message header extraction
326    fn extract_message_header(
327        &mut self,
328        data: &[u8],
329        start: usize,
330    ) -> Result<Option<ZeroCopyElement>, ParseError> {
331        // Find the end of MessageHeader element
332        if let Some(end_pos) = self.find_closing_tag(data, start, b"MessageHeader") {
333            let header_data = &data[start..end_pos];
334
335            // Extract MessageId with zero-copy
336            let message_id =
337                if let Some(id_data) = self.extract_field_zero_copy(header_data, b"MessageId") {
338                    self.string_cache.intern(id_data)
339                } else {
340                    "unknown".to_string()
341                };
342
343            // Extract CreatedDateTime
344            let created_date_time = if let Some(dt_data) =
345                self.extract_field_zero_copy(header_data, b"CreatedDateTime")
346            {
347                self.string_cache.intern(dt_data)
348            } else {
349                chrono::Utc::now().to_rfc3339()
350            };
351
352            return Ok(Some(ZeroCopyElement::MessageHeader {
353                message_id,
354                created_date_time,
355                version: self.version,
356            }));
357        }
358
359        Ok(None)
360    }
361
362    /// Zero-copy release extraction
363    fn extract_release_zero_copy(
364        &mut self,
365        data: &[u8],
366        start: usize,
367    ) -> Result<Option<ZeroCopyElement>, ParseError> {
368        if let Some(end_pos) = self.find_closing_tag(data, start, b"Release") {
369            let release_data = &data[start..end_pos];
370
371            // Extract ReleaseReference attribute
372            let reference = if let Some(ref_data) =
373                self.extract_attribute_zero_copy(release_data, b"ReleaseReference")
374            {
375                self.string_cache.intern(ref_data)
376            } else {
377                format!("REL-{}", self.stats.elements_found)
378            };
379
380            // Extract title with nested TitleText handling
381            let title = if let Some(title_data) =
382                self.extract_nested_field_zero_copy(release_data, b"TitleText")
383            {
384                self.string_cache.intern(title_data)
385            } else if let Some(title_data) = self.extract_field_zero_copy(release_data, b"Title") {
386                self.string_cache.intern(title_data)
387            } else {
388                "Untitled Release".to_string()
389            };
390
391            // Extract genre
392            let genre = self
393                .extract_nested_field_zero_copy(release_data, b"GenreText")
394                .map(|g| self.string_cache.intern(g));
395
396            // Extract resource references (simplified)
397            let resource_references = self.extract_resource_references_zero_copy(release_data);
398
399            return Ok(Some(ZeroCopyElement::Release {
400                reference,
401                title,
402                genre,
403                resource_references,
404            }));
405        }
406
407        Ok(None)
408    }
409
410    /// Zero-copy sound recording extraction
411    fn extract_sound_recording_zero_copy(
412        &mut self,
413        data: &[u8],
414        start: usize,
415    ) -> Result<Option<ZeroCopyElement>, ParseError> {
416        if let Some(end_pos) = self.find_closing_tag(data, start, b"SoundRecording") {
417            let recording_data = &data[start..end_pos];
418
419            let reference = if let Some(ref_data) =
420                self.extract_attribute_zero_copy(recording_data, b"ResourceReference")
421            {
422                self.string_cache.intern(ref_data)
423            } else {
424                format!("RES-{}", self.stats.elements_found)
425            };
426
427            let title = if let Some(title_data) =
428                self.extract_nested_field_zero_copy(recording_data, b"TitleText")
429            {
430                self.string_cache.intern(title_data)
431            } else {
432                "Untitled Track".to_string()
433            };
434
435            let duration = self
436                .extract_field_zero_copy(recording_data, b"Duration")
437                .map(|d| self.string_cache.intern(d));
438
439            let isrc = self
440                .extract_field_zero_copy(recording_data, b"ISRC")
441                .map(|i| self.string_cache.intern(i));
442
443            let creation_date = self
444                .extract_field_zero_copy(recording_data, b"CreationDate")
445                .map(|cd| self.string_cache.intern(cd));
446
447            return Ok(Some(ZeroCopyElement::SoundRecording {
448                reference,
449                title,
450                duration,
451                isrc,
452                creation_date,
453            }));
454        }
455
456        Ok(None)
457    }
458
459    /// Zero-copy video extraction
460    fn extract_video_zero_copy(
461        &mut self,
462        data: &[u8],
463        start: usize,
464    ) -> Result<Option<ZeroCopyElement>, ParseError> {
465        if let Some(end_pos) = self.find_closing_tag(data, start, b"Video") {
466            let video_data = &data[start..end_pos];
467
468            let reference = if let Some(ref_data) =
469                self.extract_attribute_zero_copy(video_data, b"ResourceReference")
470            {
471                self.string_cache.intern(ref_data)
472            } else {
473                format!("VID-{}", self.stats.elements_found)
474            };
475
476            let title = if let Some(title_data) =
477                self.extract_nested_field_zero_copy(video_data, b"TitleText")
478            {
479                self.string_cache.intern(title_data)
480            } else {
481                "Untitled Video".to_string()
482            };
483
484            let duration = self
485                .extract_field_zero_copy(video_data, b"Duration")
486                .map(|d| self.string_cache.intern(d));
487
488            let codec = self
489                .extract_field_zero_copy(video_data, b"VideoCodecType")
490                .map(|c| self.string_cache.intern(c));
491
492            return Ok(Some(ZeroCopyElement::Video {
493                reference,
494                title,
495                duration,
496                codec,
497            }));
498        }
499
500        Ok(None)
501    }
502
503    /// Zero-copy image extraction
504    fn extract_image_zero_copy(
505        &mut self,
506        data: &[u8],
507        start: usize,
508    ) -> Result<Option<ZeroCopyElement>, ParseError> {
509        if let Some(end_pos) = self.find_closing_tag(data, start, b"Image") {
510            let image_data = &data[start..end_pos];
511
512            let reference = if let Some(ref_data) =
513                self.extract_attribute_zero_copy(image_data, b"ResourceReference")
514            {
515                self.string_cache.intern(ref_data)
516            } else {
517                format!("IMG-{}", self.stats.elements_found)
518            };
519
520            let title = if let Some(title_data) =
521                self.extract_nested_field_zero_copy(image_data, b"TitleText")
522            {
523                self.string_cache.intern(title_data)
524            } else {
525                "Untitled Image".to_string()
526            };
527
528            let width = self
529                .extract_field_zero_copy(image_data, b"Width")
530                .and_then(|w| String::from_utf8_lossy(w).parse().ok());
531
532            let height = self
533                .extract_field_zero_copy(image_data, b"Height")
534                .and_then(|h| String::from_utf8_lossy(h).parse().ok());
535
536            let format = self
537                .extract_field_zero_copy(image_data, b"ImageCodecType")
538                .map(|f| self.string_cache.intern(f));
539
540            return Ok(Some(ZeroCopyElement::Image {
541                reference,
542                title,
543                width,
544                height,
545                format,
546            }));
547        }
548
549        Ok(None)
550    }
551
552    /// Zero-copy text resource extraction
553    fn extract_text_zero_copy(
554        &mut self,
555        data: &[u8],
556        start: usize,
557    ) -> Result<Option<ZeroCopyElement>, ParseError> {
558        if let Some(end_pos) = self.find_closing_tag(data, start, b"Text") {
559            let text_data = &data[start..end_pos];
560
561            let reference = if let Some(ref_data) =
562                self.extract_attribute_zero_copy(text_data, b"ResourceReference")
563            {
564                self.string_cache.intern(ref_data)
565            } else {
566                format!("TXT-{}", self.stats.elements_found)
567            };
568
569            let title = if let Some(title_data) =
570                self.extract_nested_field_zero_copy(text_data, b"TitleText")
571            {
572                self.string_cache.intern(title_data)
573            } else {
574                "Untitled Text".to_string()
575            };
576
577            let language = self
578                .extract_field_zero_copy(text_data, b"LanguageOfPerformance")
579                .or_else(|| self.extract_field_zero_copy(text_data, b"LanguageCode"))
580                .map(|l| self.string_cache.intern(l));
581
582            return Ok(Some(ZeroCopyElement::Text {
583                reference,
584                title,
585                language,
586            }));
587        }
588
589        Ok(None)
590    }
591
592    /// Find closing tag position
593    fn find_closing_tag(&self, data: &[u8], start: usize, tag_name: &[u8]) -> Option<usize> {
594        let closing_pattern = [b"</", tag_name, b">"].concat();
595
596        // Start search after the opening tag
597        let search_start = start + tag_name.len();
598        if let Ok(positions) = self.find_elements_fallback(&data[search_start..], &closing_pattern)
599        {
600            if let Some(pos) = positions.first() {
601                return Some(search_start + pos + closing_pattern.len());
602            }
603        }
604
605        None
606    }
607
608    /// Extract field content with zero-copy
609    fn extract_field_zero_copy<'a>(&self, data: &'a [u8], field_name: &[u8]) -> Option<&'a [u8]> {
610        let opening = [b"<", field_name, b">"].concat();
611        let closing = [b"</", field_name, b">"].concat();
612
613        if let Ok(start_positions) = self.find_elements_fallback(data, &opening) {
614            if let Some(&start_pos) = start_positions.first() {
615                let content_start = start_pos + opening.len();
616
617                if let Ok(end_positions) =
618                    self.find_elements_fallback(&data[content_start..], &closing)
619                {
620                    if let Some(&end_pos) = end_positions.first() {
621                        let content_end = content_start + end_pos;
622                        return Some(&data[content_start..content_end]);
623                    }
624                }
625            }
626        }
627
628        None
629    }
630
631    /// Extract nested field content (e.g., ReferenceTitle/TitleText)
632    fn extract_nested_field_zero_copy<'a>(
633        &self,
634        data: &'a [u8],
635        inner_field: &[u8],
636    ) -> Option<&'a [u8]> {
637        // Look for the inner field directly first
638        if let Some(content) = self.extract_field_zero_copy(data, inner_field) {
639            return Some(content);
640        }
641
642        // Look within common parent elements
643        let parent_tags: &[&[u8]] = &[b"ReferenceTitle", b"Title"];
644
645        for parent in parent_tags {
646            if let Some(parent_content) = self.extract_field_zero_copy(data, parent) {
647                if let Some(inner_content) =
648                    self.extract_field_zero_copy(parent_content, inner_field)
649                {
650                    return Some(inner_content);
651                }
652            }
653        }
654
655        None
656    }
657
658    /// Extract attribute value with zero-copy
659    fn extract_attribute_zero_copy<'a>(
660        &self,
661        data: &'a [u8],
662        attr_name: &[u8],
663    ) -> Option<&'a [u8]> {
664        let pattern = [attr_name, b"=\""].concat();
665
666        if let Ok(positions) = self.find_elements_fallback(data, &pattern) {
667            if let Some(&pos) = positions.first() {
668                let value_start = pos + pattern.len();
669
670                // Find the closing quote
671                if let Some(quote_pos) = memchr::memchr(b'"', &data[value_start..]) {
672                    let value_end = value_start + quote_pos;
673                    return Some(&data[value_start..value_end]);
674                }
675            }
676        }
677
678        None
679    }
680
681    /// Extract resource references (simplified zero-copy version)
682    fn extract_resource_references_zero_copy(&mut self, data: &[u8]) -> Vec<String> {
683        let mut references = Vec::new();
684
685        // Look for ResourceReference elements
686        if let Ok(positions) = self.find_elements_fallback(data, b"<ResourceReference>") {
687            for pos in positions {
688                if let Some(ref_data) =
689                    self.extract_field_zero_copy(&data[pos..], b"ResourceReference")
690                {
691                    references.push(self.string_cache.intern(ref_data));
692                }
693            }
694        }
695
696        references
697    }
698
699    fn find_closing_tag_simple(&self, data: &[u8], start: usize, tag_name: &str) -> Option<usize> {
700        let closing_tag = format!("</{}>", tag_name);
701        let closing_bytes = closing_tag.as_bytes();
702
703        if let Ok(positions) = self.find_elements_fallback(&data[start..], closing_bytes) {
704            if let Some(&pos) = positions.first() {
705                return Some(start + pos + closing_bytes.len());
706            }
707        }
708
709        None
710    }
711
712    fn estimate_memory_usage(&self) -> usize {
713        self.buffer.capacity() +
714        self.string_cache.cache.capacity() * 64 + // Rough estimate
715        std::mem::size_of::<Self>()
716    }
717
    /// Returns the statistics as of the most recent `parse_streaming` call.
    pub fn get_stats(&self) -> &ZeroCopyStats {
        &self.stats
    }
721}
722
/// Iterator adapter that yields `WorkingStreamingElement`s from a reader.
///
/// The whole input is read into memory by `new`; it is parsed in one pass on
/// the first call to `next`, and the results are then handed out one by one.
pub struct ZeroCopyStreamIterator<R: BufRead> {
    /// Source reader, already drained by `new`
    reader: R,
    /// Parser used for the single parsing pass
    parser: ZeroCopyParser,
    /// Entire input read from `reader`
    buffer: Vec<u8>,
    /// Set once the end-of-stream marker or an error has been yielded
    finished: bool,
    /// Parsed elements waiting to be yielded
    elements_queue: Vec<ZeroCopyElement>,
    /// Index of the next element of `elements_queue` to yield
    current_index: usize,
    /// Creation time, used for `elapsed_time` in `stats`
    start_time: Instant,
}
733
734impl<R: BufRead> ZeroCopyStreamIterator<R> {
    /// Creates an iterator by reading all of `reader` into memory up front.
    ///
    /// NOTE(review): the result of `read_to_end` is discarded, so an I/O error
    /// is silently ignored and whatever was read before the failure is later
    /// parsed as if it were the whole input. The current signature cannot
    /// report the error; consider adding a fallible constructor.
    pub fn new(mut reader: R, version: ERNVersion) -> Self {
        let mut buffer = Vec::with_capacity(1024 * 1024); // 1MB buffer
        // Read failures are not propagated (see the note above).
        let _ = reader.read_to_end(&mut buffer);

        Self {
            reader,
            parser: ZeroCopyParser::new(version),
            buffer,
            finished: false,
            elements_queue: Vec::new(),
            current_index: 0,
            // Taken after the read above, so `elapsed_time` excludes it.
            start_time: Instant::now(),
        }
    }
749
750    pub fn stats(&self) -> WorkingStreamingStats {
751        let zero_copy_stats = self.parser.get_stats();
752        WorkingStreamingStats {
753            bytes_processed: zero_copy_stats.bytes_processed,
754            elements_yielded: zero_copy_stats.elements_found as usize,
755            current_depth: 0,
756            max_depth_reached: 10, // Estimated
757            current_memory_bytes: zero_copy_stats.memory_used_bytes,
758            max_memory_used_bytes: zero_copy_stats.memory_used_bytes,
759            elapsed_time: self.start_time.elapsed(),
760            throughput_mb_per_sec: zero_copy_stats.throughput_mb_per_sec,
761        }
762    }
763
    /// Maps a `ZeroCopyElement` onto the `WorkingStreamingElement` variant of
    /// the same name.
    ///
    /// Fields with no counterpart on the target are dropped: `genre`
    /// (release), `creation_date` (sound recording), `codec` (video) and
    /// `format` (image). `Text::language` is passed on as `language_code`.
    fn convert_to_working_element(element: ZeroCopyElement) -> WorkingStreamingElement {
        match element {
            ZeroCopyElement::MessageHeader {
                message_id,
                created_date_time,
                version,
            } => WorkingStreamingElement::MessageHeader {
                message_id,
                created_date_time,
                version,
            },
            ZeroCopyElement::Release {
                reference,
                title,
                resource_references,
                ..
            } => WorkingStreamingElement::Release {
                reference,
                title,
                resource_references,
            },
            ZeroCopyElement::SoundRecording {
                reference,
                title,
                duration,
                isrc,
                ..
            } => WorkingStreamingElement::SoundRecording {
                reference,
                title,
                duration,
                isrc,
            },
            ZeroCopyElement::Video {
                reference,
                title,
                duration,
                ..
            } => WorkingStreamingElement::Video {
                reference,
                title,
                duration,
            },
            ZeroCopyElement::Image {
                reference,
                title,
                width,
                height,
                ..
            } => WorkingStreamingElement::Image {
                reference,
                title,
                width,
                height,
            },
            ZeroCopyElement::Text {
                reference,
                title,
                language,
            } => WorkingStreamingElement::Text {
                reference,
                title,
                language_code: language,
            },
            // The end-of-stream statistics are converted field by field; depth
            // values are placeholders and `elapsed_time` is the parse time.
            ZeroCopyElement::EndOfStream { stats } => {
                WorkingStreamingElement::EndOfStream {
                    stats: WorkingStreamingStats {
                        bytes_processed: stats.bytes_processed,
                        elements_yielded: stats.elements_found as usize,
                        current_depth: 0,
                        max_depth_reached: 10, // Estimated
                        current_memory_bytes: stats.memory_used_bytes,
                        max_memory_used_bytes: stats.memory_used_bytes,
                        elapsed_time: stats.parse_time,
                        throughput_mb_per_sec: stats.throughput_mb_per_sec,
                    },
                }
            }
        }
    }
844}
845
846impl<R: BufRead> Iterator for ZeroCopyStreamIterator<R> {
847    type Item = Result<WorkingStreamingElement, ParseError>;
848
849    fn next(&mut self) -> Option<Self::Item> {
850        if self.finished {
851            return None;
852        }
853
854        // If we haven't processed the data yet, do it now
855        if self.elements_queue.is_empty() && self.current_index == 0 {
856            match self.parser.parse_streaming(&self.buffer) {
857                Ok(mut elements) => {
858                    // Add end-of-stream marker
859                    elements.push(ZeroCopyElement::EndOfStream {
860                        stats: self.parser.get_stats().clone(),
861                    });
862                    self.elements_queue = elements;
863                }
864                Err(e) => {
865                    self.finished = true;
866                    return Some(Err(e));
867                }
868            }
869        }
870
871        // Return next element from queue
872        if self.current_index < self.elements_queue.len() {
873            let element = self.elements_queue[self.current_index].clone();
874            self.current_index += 1;
875
876            // Check if this is the last element
877            if matches!(element, ZeroCopyElement::EndOfStream { .. }) {
878                self.finished = true;
879            }
880
881            Some(Ok(Self::convert_to_working_element(element)))
882        } else {
883            self.finished = true;
884            None
885        }
886    }
887}
888
/// Owns an input buffer and a parser, exposing a one-shot `parse_all`.
///
/// NOTE(review): in the code visible in this file `position` and `chunk_size`
/// are only initialised, never read — confirm whether chunked parsing is
/// still planned.
pub struct ZeroCopyIterator {
    /// Parser used by `parse_all`
    parser: ZeroCopyParser,
    /// Complete input to parse
    data: Vec<u8>,
    /// Initialised to 0 by `new`
    position: usize,
    /// Requested chunk size, clamped to at least 1024 by `new`
    chunk_size: usize,
    /// Set once `parse_all` has completed successfully
    finished: bool,
}
897
898impl ZeroCopyIterator {
899    pub fn new(data: Vec<u8>, version: ERNVersion, chunk_size: usize) -> Self {
900        Self {
901            parser: ZeroCopyParser::new(version),
902            data,
903            position: 0,
904            chunk_size: chunk_size.max(1024), // Minimum 1KB chunks
905            finished: false,
906        }
907    }
908
909    pub fn parse_all(&mut self) -> Result<Vec<ZeroCopyElement>, ParseError> {
910        let mut all_elements = Vec::new();
911
912        // Process the entire data at once for maximum performance
913        let elements = self.parser.parse_streaming(&self.data)?;
914        all_elements.extend(elements);
915
916        // Add end-of-stream marker
917        all_elements.push(ZeroCopyElement::EndOfStream {
918            stats: self.parser.get_stats().clone(),
919        });
920
921        self.finished = true;
922        Ok(all_elements)
923    }
924
925    pub fn stats(&self) -> &ZeroCopyStats {
926        self.parser.get_stats()
927    }
928}
929
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end check: a minimal message with one header and one release
    /// yields both element kinds.
    #[test]
    fn test_zero_copy_basic_parsing() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
    <MessageHeader>
        <MessageId>ZERO-COPY-TEST</MessageId>
        <CreatedDateTime>2023-01-01T00:00:00Z</CreatedDateTime>
    </MessageHeader>
    <Release ReleaseReference="ZC-REL-001">
        <ReferenceTitle>
            <TitleText>Zero Copy Release</TitleText>
        </ReferenceTitle>
    </Release>
</ern:NewReleaseMessage>"#;

        let mut parser = ZeroCopyParser::new(ERNVersion::V4_3);
        let elements = parser.parse_streaming(xml.as_bytes()).unwrap();

        assert!(!elements.is_empty(), "Should find elements");
        println!("Zero-copy parsing found {} elements", elements.len());

        // Verify we found expected elements
        let has_header = elements
            .iter()
            .any(|e| matches!(e, ZeroCopyElement::MessageHeader { .. }));
        let has_release = elements
            .iter()
            .any(|e| matches!(e, ZeroCopyElement::Release { .. }));

        assert!(has_header, "Should find message header");
        assert!(has_release, "Should find release");

        // Informational only: the statistics are printed, not asserted.
        let stats = parser.get_stats();
        println!(
            "Zero-copy stats: {:.2} MB/s, {}% cache hit rate",
            stats.throughput_mb_per_sec,
            stats.string_cache_hit_rate * 100.0
        );
    }

    /// The scanner reports every occurrence, including one that starts past
    /// the first 16-byte chunk of the input.
    #[test]
    fn test_simd_pattern_matching() {
        let data = b"<Release><Release><Release>";
        let parser = ZeroCopyParser::new(ERNVersion::V4_3);

        let positions = parser.find_elements_simd(data, b"<Release").unwrap();
        assert_eq!(positions.len(), 3, "Should find 3 occurrences");
        assert_eq!(positions, vec![0, 9, 18]);
    }

    /// Field extraction returns exactly the bytes between the tags.
    #[test]
    fn test_zero_copy_field_extraction() {
        let data = b"<Title>Test Title</Title>";
        let parser = ZeroCopyParser::new(ERNVersion::V4_3);

        let content = parser.extract_field_zero_copy(data, b"Title").unwrap();
        assert_eq!(content, b"Test Title");
    }

    /// Attribute extraction returns the value without its quotes.
    #[test]
    fn test_attribute_extraction() {
        let data = b"<Release ReleaseReference=\"REL-123\">content</Release>";
        let parser = ZeroCopyParser::new(ERNVersion::V4_3);

        let attr_value = parser
            .extract_attribute_zero_copy(data, b"ReleaseReference")
            .unwrap();
        assert_eq!(attr_value, b"REL-123");
    }
}