1#[allow(dead_code)] use crate::error::ParseError;
12use crate::streaming::{WorkingStreamingElement, WorkingStreamingStats};
13use ddex_core::models::versions::ERNVersion;
14use std::collections::HashMap;
15use std::io::BufRead;
16use std::time::Instant;
17
/// Scanning parser that locates elements in a byte slice and extracts their fields.
pub struct ZeroCopyParser {
    /// Scratch byte buffer allocated in `new`; within this file only its
    /// capacity is read (by `estimate_memory_usage`).
    buffer: Vec<u8>,
    /// Cache mapping raw byte slices to their decoded `String` form.
    string_cache: StringCache,
    /// Parser state; initialised to `ParserState::Initial` in `new` and not
    /// updated anywhere else in this file.
    state: ParserState,
    /// Statistics updated by every `parse_streaming` call.
    stats: ZeroCopyStats,
    /// ERN version copied into each emitted `MessageHeader` element.
    version: ERNVersion,
}
31
/// Memoises the lossy UTF-8 decoding of byte slices and counts cache hits and misses.
struct StringCache {
    /// Decoded strings keyed by the exact bytes they were decoded from.
    cache: HashMap<Vec<u8>, String>,
    /// Number of `intern` calls answered from the cache.
    hit_count: u64,
    /// Number of `intern` calls that had to decode and insert.
    miss_count: u64,
}

impl StringCache {
    /// Creates an empty cache with room for 1024 entries.
    fn new() -> Self {
        let cache = HashMap::with_capacity(1024);
        Self {
            cache,
            hit_count: 0,
            miss_count: 0,
        }
    }

    /// Returns the string form of `bytes`, decoding (lossily) and caching it on first sight.
    ///
    /// Every call returns an owned `String`; a hit clones the cached value.
    fn intern(&mut self, bytes: &[u8]) -> String {
        match self.cache.get(bytes) {
            Some(existing) => {
                self.hit_count += 1;
                existing.clone()
            }
            None => {
                self.miss_count += 1;
                let decoded = String::from_utf8_lossy(bytes).to_string();
                self.cache.insert(bytes.to_vec(), decoded.clone());
                decoded
            }
        }
    }

    /// Fraction of `intern` calls served from the cache, or `0.0` before any call.
    fn hit_rate(&self) -> f64 {
        let lookups = self.hit_count + self.miss_count;
        if lookups == 0 {
            return 0.0;
        }
        self.hit_count as f64 / lookups as f64
    }
}
68
/// Position of the parser within a message.
///
/// NOTE(review): only `Initial` is constructed in this file; the other
/// variants appear to be reserved for incremental parsing — confirm.
#[derive(Debug, Clone)]
enum ParserState {
    /// Nothing has been parsed yet.
    Initial,
    /// Inside a message header.
    InMessageHeader,
    /// Inside a release identified by `reference`.
    InRelease {
        reference: String,
    },
    /// Inside a resource of kind `resource_type` identified by `reference`.
    InResource {
        resource_type: String,
        reference: String,
    },
    /// Parsing has finished.
    Done,
}
82
/// One element extracted by [`ZeroCopyParser::parse_streaming`], or the end-of-stream marker.
///
/// All textual fields are owned `String`s produced through the parser's string cache
/// or filled with a placeholder when the source field is missing.
#[derive(Debug, Clone)]
pub enum ZeroCopyElement {
    /// Contents of a `MessageHeader` element.
    MessageHeader {
        /// `MessageId` text, or `"unknown"` when absent.
        message_id: String,
        /// `CreatedDateTime` text, or the current UTC time in RFC 3339 form when absent.
        created_date_time: String,
        /// ERN version the parser was created with.
        version: ERNVersion,
    },
    /// Contents of a `Release` element.
    Release {
        /// `ReleaseReference` attribute value, or a generated `REL-<n>` placeholder.
        reference: String,
        /// `TitleText` (or `Title`) text, or `"Untitled Release"`.
        title: String,
        /// `GenreText` text when present.
        genre: Option<String>,
        /// Text of every `<ResourceReference>` child found in the release.
        resource_references: Vec<String>,
    },
    /// Contents of a `SoundRecording` element.
    SoundRecording {
        /// `ResourceReference` attribute value, or a generated `RES-<n>` placeholder.
        reference: String,
        /// `TitleText` text, or `"Untitled Track"`.
        title: String,
        /// `Duration` text when present.
        duration: Option<String>,
        /// `ISRC` text when present.
        isrc: Option<String>,
        /// `CreationDate` text when present.
        creation_date: Option<String>,
    },
    /// Contents of a `Video` element.
    Video {
        /// `ResourceReference` attribute value, or a generated `VID-<n>` placeholder.
        reference: String,
        /// `TitleText` text, or `"Untitled Video"`.
        title: String,
        /// `Duration` text when present.
        duration: Option<String>,
        /// `VideoCodecType` text when present.
        codec: Option<String>,
    },
    /// Contents of an `Image` element.
    Image {
        /// `ResourceReference` attribute value, or a generated `IMG-<n>` placeholder.
        reference: String,
        /// `TitleText` text, or `"Untitled Image"`.
        title: String,
        /// `Width` parsed as `u32`; `None` when absent or unparsable.
        width: Option<u32>,
        /// `Height` parsed as `u32`; `None` when absent or unparsable.
        height: Option<u32>,
        /// `ImageCodecType` text when present.
        format: Option<String>,
    },
    /// Contents of a `Text` element.
    Text {
        /// `ResourceReference` attribute value, or a generated `TXT-<n>` placeholder.
        reference: String,
        /// `TitleText` text, or `"Untitled Text"`.
        title: String,
        /// `LanguageOfPerformance` text, falling back to `LanguageCode`, when present.
        language: Option<String>,
    },
    /// Marker appended after the last parsed element, carrying a snapshot of the parser statistics.
    EndOfStream {
        stats: ZeroCopyStats,
    },
}
126
/// Counters and timing figures maintained by [`ZeroCopyParser`].
#[derive(Debug, Clone)]
pub struct ZeroCopyStats {
    /// Total input bytes handed to `parse_streaming`, summed over all calls.
    pub bytes_processed: u64,
    /// Total elements extracted, summed over all calls.
    pub elements_found: u64,
    /// String-cache hit fraction (hits divided by lookups) as of the last parse.
    pub string_cache_hit_rate: f64,
    /// Wall-clock duration of the most recent `parse_streaming` call.
    pub parse_time: std::time::Duration,
    /// Input size of the most recent call in units of 1024 * 1024 bytes, divided by `parse_time` in seconds.
    pub throughput_mb_per_sec: f64,
    /// Memory estimate computed by `estimate_memory_usage` after the last parse.
    pub memory_used_bytes: usize,
}
136
137impl ZeroCopyParser {
138 pub fn new(version: ERNVersion) -> Self {
139 Self {
140 buffer: Vec::with_capacity(1024 * 1024), string_cache: StringCache::new(),
142 state: ParserState::Initial,
143 stats: ZeroCopyStats {
144 bytes_processed: 0,
145 elements_found: 0,
146 string_cache_hit_rate: 0.0,
147 parse_time: std::time::Duration::default(),
148 throughput_mb_per_sec: 0.0,
149 memory_used_bytes: 0,
150 },
151 version,
152 }
153 }
154
155 pub fn parse_streaming(&mut self, data: &[u8]) -> Result<Vec<ZeroCopyElement>, ParseError> {
157 let start_time = Instant::now();
158 self.stats.bytes_processed += data.len() as u64;
159
160 let mut results = Vec::new();
161
162 let release_positions = self.find_elements_simd(data, b"<Release")?;
164 let sound_recording_positions = self.find_elements_simd(data, b"<SoundRecording")?;
165 let video_positions = self.find_elements_simd(data, b"<Video")?;
166 let image_positions = self.find_elements_simd(data, b"<Image")?;
167 let text_positions = self.find_elements_simd(data, b"<Text")?;
168 let message_header_positions = self.find_elements_simd(data, b"<MessageHeader")?;
169
170 for pos in message_header_positions {
172 if let Some(element) = self.extract_message_header(data, pos)? {
173 results.push(element);
174 self.stats.elements_found += 1;
175 }
176 }
177
178 for pos in release_positions {
180 if let Some(element) = self.extract_release_zero_copy(data, pos)? {
181 results.push(element);
182 self.stats.elements_found += 1;
183 }
184 }
185
186 for pos in sound_recording_positions {
188 if let Some(element) = self.extract_sound_recording_zero_copy(data, pos)? {
189 results.push(element);
190 self.stats.elements_found += 1;
191 }
192 }
193
194 for pos in video_positions {
196 if let Some(element) = self.extract_video_zero_copy(data, pos)? {
197 results.push(element);
198 self.stats.elements_found += 1;
199 }
200 }
201
202 for pos in image_positions {
204 if let Some(element) = self.extract_image_zero_copy(data, pos)? {
205 results.push(element);
206 self.stats.elements_found += 1;
207 }
208 }
209
210 for pos in text_positions {
212 if let Some(element) = self.extract_text_zero_copy(data, pos)? {
213 results.push(element);
214 self.stats.elements_found += 1;
215 }
216 }
217
218 self.stats.parse_time = start_time.elapsed();
220 self.stats.string_cache_hit_rate = self.string_cache.hit_rate();
221 self.stats.throughput_mb_per_sec =
222 (data.len() as f64 / (1024.0 * 1024.0)) / self.stats.parse_time.as_secs_f64();
223 self.stats.memory_used_bytes = self.estimate_memory_usage();
224
225 Ok(results)
226 }
227
228 #[cfg(target_arch = "x86_64")]
230 fn find_elements_simd(&self, data: &[u8], pattern: &[u8]) -> Result<Vec<usize>, ParseError> {
231 use std::arch::x86_64::*;
232
233 let mut positions = Vec::new();
234
235 if pattern.len() == 0 || data.len() < pattern.len() {
236 return Ok(positions);
237 }
238
239 if pattern.len() > 16 {
241 return self.find_elements_fallback(data, pattern);
242 }
243
244 unsafe {
246 let pattern_first = pattern[0];
247 let mut i = 0;
248
249 while i + 16 <= data.len() {
251 let chunk = _mm_loadu_si128(data.as_ptr().add(i) as *const __m128i);
253
254 let pattern_vec = _mm_set1_epi8(pattern_first as i8);
256
257 let matches = _mm_cmpeq_epi8(chunk, pattern_vec);
259
260 let mask = _mm_movemask_epi8(matches) as u16;
262
263 for bit_pos in 0..16 {
265 if (mask & (1 << bit_pos)) != 0 {
266 let pos = i + bit_pos;
267
268 if pos + pattern.len() <= data.len()
270 && data[pos..pos + pattern.len()] == *pattern
271 {
272 positions.push(pos);
273 }
274 }
275 }
276
277 i += 16;
278 }
279
280 while i + pattern.len() <= data.len() {
282 if data[i..i + pattern.len()] == *pattern {
283 positions.push(i);
284 }
285 i += 1;
286 }
287 }
288
289 Ok(positions)
290 }
291
292 #[cfg(not(target_arch = "x86_64"))]
294 fn find_elements_simd(&self, data: &[u8], pattern: &[u8]) -> Result<Vec<usize>, ParseError> {
295 self.find_elements_fallback(data, pattern)
296 }
297
298 fn find_elements_fallback(
299 &self,
300 data: &[u8],
301 pattern: &[u8],
302 ) -> Result<Vec<usize>, ParseError> {
303 let mut positions = Vec::new();
304 let mut start = 0;
305
306 use memchr::memchr;
308
309 while let Some(pos) = memchr(pattern[0], &data[start..]) {
310 let abs_pos = start + pos;
311
312 if abs_pos + pattern.len() <= data.len()
314 && data[abs_pos..abs_pos + pattern.len()] == *pattern
315 {
316 positions.push(abs_pos);
317 }
318
319 start = abs_pos + 1;
320 }
321
322 Ok(positions)
323 }
324
325 fn extract_message_header(
327 &mut self,
328 data: &[u8],
329 start: usize,
330 ) -> Result<Option<ZeroCopyElement>, ParseError> {
331 if let Some(end_pos) = self.find_closing_tag(data, start, b"MessageHeader") {
333 let header_data = &data[start..end_pos];
334
335 let message_id =
337 if let Some(id_data) = self.extract_field_zero_copy(header_data, b"MessageId") {
338 self.string_cache.intern(id_data)
339 } else {
340 "unknown".to_string()
341 };
342
343 let created_date_time = if let Some(dt_data) =
345 self.extract_field_zero_copy(header_data, b"CreatedDateTime")
346 {
347 self.string_cache.intern(dt_data)
348 } else {
349 chrono::Utc::now().to_rfc3339()
350 };
351
352 return Ok(Some(ZeroCopyElement::MessageHeader {
353 message_id,
354 created_date_time,
355 version: self.version,
356 }));
357 }
358
359 Ok(None)
360 }
361
362 fn extract_release_zero_copy(
364 &mut self,
365 data: &[u8],
366 start: usize,
367 ) -> Result<Option<ZeroCopyElement>, ParseError> {
368 if let Some(end_pos) = self.find_closing_tag(data, start, b"Release") {
369 let release_data = &data[start..end_pos];
370
371 let reference = if let Some(ref_data) =
373 self.extract_attribute_zero_copy(release_data, b"ReleaseReference")
374 {
375 self.string_cache.intern(ref_data)
376 } else {
377 format!("REL-{}", self.stats.elements_found)
378 };
379
380 let title = if let Some(title_data) =
382 self.extract_nested_field_zero_copy(release_data, b"TitleText")
383 {
384 self.string_cache.intern(title_data)
385 } else if let Some(title_data) = self.extract_field_zero_copy(release_data, b"Title") {
386 self.string_cache.intern(title_data)
387 } else {
388 "Untitled Release".to_string()
389 };
390
391 let genre = self
393 .extract_nested_field_zero_copy(release_data, b"GenreText")
394 .map(|g| self.string_cache.intern(g));
395
396 let resource_references = self.extract_resource_references_zero_copy(release_data);
398
399 return Ok(Some(ZeroCopyElement::Release {
400 reference,
401 title,
402 genre,
403 resource_references,
404 }));
405 }
406
407 Ok(None)
408 }
409
410 fn extract_sound_recording_zero_copy(
412 &mut self,
413 data: &[u8],
414 start: usize,
415 ) -> Result<Option<ZeroCopyElement>, ParseError> {
416 if let Some(end_pos) = self.find_closing_tag(data, start, b"SoundRecording") {
417 let recording_data = &data[start..end_pos];
418
419 let reference = if let Some(ref_data) =
420 self.extract_attribute_zero_copy(recording_data, b"ResourceReference")
421 {
422 self.string_cache.intern(ref_data)
423 } else {
424 format!("RES-{}", self.stats.elements_found)
425 };
426
427 let title = if let Some(title_data) =
428 self.extract_nested_field_zero_copy(recording_data, b"TitleText")
429 {
430 self.string_cache.intern(title_data)
431 } else {
432 "Untitled Track".to_string()
433 };
434
435 let duration = self
436 .extract_field_zero_copy(recording_data, b"Duration")
437 .map(|d| self.string_cache.intern(d));
438
439 let isrc = self
440 .extract_field_zero_copy(recording_data, b"ISRC")
441 .map(|i| self.string_cache.intern(i));
442
443 let creation_date = self
444 .extract_field_zero_copy(recording_data, b"CreationDate")
445 .map(|cd| self.string_cache.intern(cd));
446
447 return Ok(Some(ZeroCopyElement::SoundRecording {
448 reference,
449 title,
450 duration,
451 isrc,
452 creation_date,
453 }));
454 }
455
456 Ok(None)
457 }
458
459 fn extract_video_zero_copy(
461 &mut self,
462 data: &[u8],
463 start: usize,
464 ) -> Result<Option<ZeroCopyElement>, ParseError> {
465 if let Some(end_pos) = self.find_closing_tag(data, start, b"Video") {
466 let video_data = &data[start..end_pos];
467
468 let reference = if let Some(ref_data) =
469 self.extract_attribute_zero_copy(video_data, b"ResourceReference")
470 {
471 self.string_cache.intern(ref_data)
472 } else {
473 format!("VID-{}", self.stats.elements_found)
474 };
475
476 let title = if let Some(title_data) =
477 self.extract_nested_field_zero_copy(video_data, b"TitleText")
478 {
479 self.string_cache.intern(title_data)
480 } else {
481 "Untitled Video".to_string()
482 };
483
484 let duration = self
485 .extract_field_zero_copy(video_data, b"Duration")
486 .map(|d| self.string_cache.intern(d));
487
488 let codec = self
489 .extract_field_zero_copy(video_data, b"VideoCodecType")
490 .map(|c| self.string_cache.intern(c));
491
492 return Ok(Some(ZeroCopyElement::Video {
493 reference,
494 title,
495 duration,
496 codec,
497 }));
498 }
499
500 Ok(None)
501 }
502
503 fn extract_image_zero_copy(
505 &mut self,
506 data: &[u8],
507 start: usize,
508 ) -> Result<Option<ZeroCopyElement>, ParseError> {
509 if let Some(end_pos) = self.find_closing_tag(data, start, b"Image") {
510 let image_data = &data[start..end_pos];
511
512 let reference = if let Some(ref_data) =
513 self.extract_attribute_zero_copy(image_data, b"ResourceReference")
514 {
515 self.string_cache.intern(ref_data)
516 } else {
517 format!("IMG-{}", self.stats.elements_found)
518 };
519
520 let title = if let Some(title_data) =
521 self.extract_nested_field_zero_copy(image_data, b"TitleText")
522 {
523 self.string_cache.intern(title_data)
524 } else {
525 "Untitled Image".to_string()
526 };
527
528 let width = self
529 .extract_field_zero_copy(image_data, b"Width")
530 .and_then(|w| String::from_utf8_lossy(w).parse().ok());
531
532 let height = self
533 .extract_field_zero_copy(image_data, b"Height")
534 .and_then(|h| String::from_utf8_lossy(h).parse().ok());
535
536 let format = self
537 .extract_field_zero_copy(image_data, b"ImageCodecType")
538 .map(|f| self.string_cache.intern(f));
539
540 return Ok(Some(ZeroCopyElement::Image {
541 reference,
542 title,
543 width,
544 height,
545 format,
546 }));
547 }
548
549 Ok(None)
550 }
551
552 fn extract_text_zero_copy(
554 &mut self,
555 data: &[u8],
556 start: usize,
557 ) -> Result<Option<ZeroCopyElement>, ParseError> {
558 if let Some(end_pos) = self.find_closing_tag(data, start, b"Text") {
559 let text_data = &data[start..end_pos];
560
561 let reference = if let Some(ref_data) =
562 self.extract_attribute_zero_copy(text_data, b"ResourceReference")
563 {
564 self.string_cache.intern(ref_data)
565 } else {
566 format!("TXT-{}", self.stats.elements_found)
567 };
568
569 let title = if let Some(title_data) =
570 self.extract_nested_field_zero_copy(text_data, b"TitleText")
571 {
572 self.string_cache.intern(title_data)
573 } else {
574 "Untitled Text".to_string()
575 };
576
577 let language = self
578 .extract_field_zero_copy(text_data, b"LanguageOfPerformance")
579 .or_else(|| self.extract_field_zero_copy(text_data, b"LanguageCode"))
580 .map(|l| self.string_cache.intern(l));
581
582 return Ok(Some(ZeroCopyElement::Text {
583 reference,
584 title,
585 language,
586 }));
587 }
588
589 Ok(None)
590 }
591
592 fn find_closing_tag(&self, data: &[u8], start: usize, tag_name: &[u8]) -> Option<usize> {
594 let closing_pattern = [b"</", tag_name, b">"].concat();
595
596 let search_start = start + tag_name.len();
598 if let Ok(positions) = self.find_elements_fallback(&data[search_start..], &closing_pattern)
599 {
600 if let Some(pos) = positions.first() {
601 return Some(search_start + pos + closing_pattern.len());
602 }
603 }
604
605 None
606 }
607
608 fn extract_field_zero_copy<'a>(&self, data: &'a [u8], field_name: &[u8]) -> Option<&'a [u8]> {
610 let opening = [b"<", field_name, b">"].concat();
611 let closing = [b"</", field_name, b">"].concat();
612
613 if let Ok(start_positions) = self.find_elements_fallback(data, &opening) {
614 if let Some(&start_pos) = start_positions.first() {
615 let content_start = start_pos + opening.len();
616
617 if let Ok(end_positions) =
618 self.find_elements_fallback(&data[content_start..], &closing)
619 {
620 if let Some(&end_pos) = end_positions.first() {
621 let content_end = content_start + end_pos;
622 return Some(&data[content_start..content_end]);
623 }
624 }
625 }
626 }
627
628 None
629 }
630
631 fn extract_nested_field_zero_copy<'a>(
633 &self,
634 data: &'a [u8],
635 inner_field: &[u8],
636 ) -> Option<&'a [u8]> {
637 if let Some(content) = self.extract_field_zero_copy(data, inner_field) {
639 return Some(content);
640 }
641
642 let parent_tags: &[&[u8]] = &[b"ReferenceTitle", b"Title"];
644
645 for parent in parent_tags {
646 if let Some(parent_content) = self.extract_field_zero_copy(data, parent) {
647 if let Some(inner_content) =
648 self.extract_field_zero_copy(parent_content, inner_field)
649 {
650 return Some(inner_content);
651 }
652 }
653 }
654
655 None
656 }
657
658 fn extract_attribute_zero_copy<'a>(
660 &self,
661 data: &'a [u8],
662 attr_name: &[u8],
663 ) -> Option<&'a [u8]> {
664 let pattern = [attr_name, b"=\""].concat();
665
666 if let Ok(positions) = self.find_elements_fallback(data, &pattern) {
667 if let Some(&pos) = positions.first() {
668 let value_start = pos + pattern.len();
669
670 if let Some(quote_pos) = memchr::memchr(b'"', &data[value_start..]) {
672 let value_end = value_start + quote_pos;
673 return Some(&data[value_start..value_end]);
674 }
675 }
676 }
677
678 None
679 }
680
681 fn extract_resource_references_zero_copy(&mut self, data: &[u8]) -> Vec<String> {
683 let mut references = Vec::new();
684
685 if let Ok(positions) = self.find_elements_fallback(data, b"<ResourceReference>") {
687 for pos in positions {
688 if let Some(ref_data) =
689 self.extract_field_zero_copy(&data[pos..], b"ResourceReference")
690 {
691 references.push(self.string_cache.intern(ref_data));
692 }
693 }
694 }
695
696 references
697 }
698
699 fn find_closing_tag_simple(&self, data: &[u8], start: usize, tag_name: &str) -> Option<usize> {
700 let closing_tag = format!("</{}>", tag_name);
701 let closing_bytes = closing_tag.as_bytes();
702
703 if let Ok(positions) = self.find_elements_fallback(&data[start..], closing_bytes) {
704 if let Some(&pos) = positions.first() {
705 return Some(start + pos + closing_bytes.len());
706 }
707 }
708
709 None
710 }
711
712 fn estimate_memory_usage(&self) -> usize {
713 self.buffer.capacity() +
714 self.string_cache.cache.capacity() * 64 + std::mem::size_of::<Self>()
716 }
717
    /// Returns a shared reference to the parser's current statistics.
    pub fn get_stats(&self) -> &ZeroCopyStats {
        &self.stats
    }
721}
722
/// Iterator adapter that reads a whole `BufRead` source up front, parses it
/// with [`ZeroCopyParser`] and yields the results as `WorkingStreamingElement`s.
pub struct ZeroCopyStreamIterator<R: BufRead> {
    /// Input source; drained in `new` and not read again in this file.
    reader: R,
    /// Parser used for the single parse pass.
    parser: ZeroCopyParser,
    /// Entire input as read from `reader`.
    buffer: Vec<u8>,
    /// Set once the end-of-stream element or an error has been yielded.
    finished: bool,
    /// Parsed elements awaiting delivery.
    elements_queue: Vec<ZeroCopyElement>,
    /// Cursor tracking how many queued elements have been yielded.
    current_index: usize,
    /// Creation time of the iterator, used for `elapsed_time` in `stats`.
    start_time: Instant,
}
733
734impl<R: BufRead> ZeroCopyStreamIterator<R> {
735 pub fn new(mut reader: R, version: ERNVersion) -> Self {
736 let mut buffer = Vec::with_capacity(1024 * 1024); let _ = reader.read_to_end(&mut buffer);
738
739 Self {
740 reader,
741 parser: ZeroCopyParser::new(version),
742 buffer,
743 finished: false,
744 elements_queue: Vec::new(),
745 current_index: 0,
746 start_time: Instant::now(),
747 }
748 }
749
750 pub fn stats(&self) -> WorkingStreamingStats {
751 let zero_copy_stats = self.parser.get_stats();
752 WorkingStreamingStats {
753 bytes_processed: zero_copy_stats.bytes_processed,
754 elements_yielded: zero_copy_stats.elements_found as usize,
755 current_depth: 0,
756 max_depth_reached: 10, current_memory_bytes: zero_copy_stats.memory_used_bytes,
758 max_memory_used_bytes: zero_copy_stats.memory_used_bytes,
759 elapsed_time: self.start_time.elapsed(),
760 throughput_mb_per_sec: zero_copy_stats.throughput_mb_per_sec,
761 }
762 }
763
764 fn convert_to_working_element(element: ZeroCopyElement) -> WorkingStreamingElement {
765 match element {
766 ZeroCopyElement::MessageHeader {
767 message_id,
768 created_date_time,
769 version,
770 } => WorkingStreamingElement::MessageHeader {
771 message_id,
772 created_date_time,
773 version,
774 },
775 ZeroCopyElement::Release {
776 reference,
777 title,
778 resource_references,
779 ..
780 } => WorkingStreamingElement::Release {
781 reference,
782 title,
783 resource_references,
784 },
785 ZeroCopyElement::SoundRecording {
786 reference,
787 title,
788 duration,
789 isrc,
790 ..
791 } => WorkingStreamingElement::SoundRecording {
792 reference,
793 title,
794 duration,
795 isrc,
796 },
797 ZeroCopyElement::Video {
798 reference,
799 title,
800 duration,
801 ..
802 } => WorkingStreamingElement::Video {
803 reference,
804 title,
805 duration,
806 },
807 ZeroCopyElement::Image {
808 reference,
809 title,
810 width,
811 height,
812 ..
813 } => WorkingStreamingElement::Image {
814 reference,
815 title,
816 width,
817 height,
818 },
819 ZeroCopyElement::Text {
820 reference,
821 title,
822 language,
823 } => WorkingStreamingElement::Text {
824 reference,
825 title,
826 language_code: language,
827 },
828 ZeroCopyElement::EndOfStream { stats } => {
829 WorkingStreamingElement::EndOfStream {
830 stats: WorkingStreamingStats {
831 bytes_processed: stats.bytes_processed,
832 elements_yielded: stats.elements_found as usize,
833 current_depth: 0,
834 max_depth_reached: 10, current_memory_bytes: stats.memory_used_bytes,
836 max_memory_used_bytes: stats.memory_used_bytes,
837 elapsed_time: stats.parse_time,
838 throughput_mb_per_sec: stats.throughput_mb_per_sec,
839 },
840 }
841 }
842 }
843 }
844}
845
846impl<R: BufRead> Iterator for ZeroCopyStreamIterator<R> {
847 type Item = Result<WorkingStreamingElement, ParseError>;
848
849 fn next(&mut self) -> Option<Self::Item> {
850 if self.finished {
851 return None;
852 }
853
854 if self.elements_queue.is_empty() && self.current_index == 0 {
856 match self.parser.parse_streaming(&self.buffer) {
857 Ok(mut elements) => {
858 elements.push(ZeroCopyElement::EndOfStream {
860 stats: self.parser.get_stats().clone(),
861 });
862 self.elements_queue = elements;
863 }
864 Err(e) => {
865 self.finished = true;
866 return Some(Err(e));
867 }
868 }
869 }
870
871 if self.current_index < self.elements_queue.len() {
873 let element = self.elements_queue[self.current_index].clone();
874 self.current_index += 1;
875
876 if matches!(element, ZeroCopyElement::EndOfStream { .. }) {
878 self.finished = true;
879 }
880
881 Some(Ok(Self::convert_to_working_element(element)))
882 } else {
883 self.finished = true;
884 None
885 }
886 }
887}
888
/// Owns a complete input buffer together with the parser that processes it.
pub struct ZeroCopyIterator {
    /// Parser used by `parse_all`.
    parser: ZeroCopyParser,
    /// Complete input to parse.
    data: Vec<u8>,
    /// Initialised to `0` in `new`; not read anywhere in this file.
    position: usize,
    /// Requested chunk size, raised to at least 1024 in `new`; not read anywhere in this file.
    chunk_size: usize,
    /// Set to `true` once `parse_all` has completed successfully.
    finished: bool,
}
897
898impl ZeroCopyIterator {
899 pub fn new(data: Vec<u8>, version: ERNVersion, chunk_size: usize) -> Self {
900 Self {
901 parser: ZeroCopyParser::new(version),
902 data,
903 position: 0,
904 chunk_size: chunk_size.max(1024), finished: false,
906 }
907 }
908
909 pub fn parse_all(&mut self) -> Result<Vec<ZeroCopyElement>, ParseError> {
910 let mut all_elements = Vec::new();
911
912 let elements = self.parser.parse_streaming(&self.data)?;
914 all_elements.extend(elements);
915
916 all_elements.push(ZeroCopyElement::EndOfStream {
918 stats: self.parser.get_stats().clone(),
919 });
920
921 self.finished = true;
922 Ok(all_elements)
923 }
924
925 pub fn stats(&self) -> &ZeroCopyStats {
926 self.parser.get_stats()
927 }
928}
929
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses a minimal message and checks that both the header and the
    /// release are reported; the statistics are printed for inspection only.
    #[test]
    fn test_zero_copy_basic_parsing() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
 <MessageHeader>
 <MessageId>ZERO-COPY-TEST</MessageId>
 <CreatedDateTime>2023-01-01T00:00:00Z</CreatedDateTime>
 </MessageHeader>
 <Release ReleaseReference="ZC-REL-001">
 <ReferenceTitle>
 <TitleText>Zero Copy Release</TitleText>
 </ReferenceTitle>
 </Release>
</ern:NewReleaseMessage>"#;

        let mut parser = ZeroCopyParser::new(ERNVersion::V4_3);
        let elements = parser.parse_streaming(xml.as_bytes()).unwrap();

        assert!(!elements.is_empty(), "Should find elements");
        println!("Zero-copy parsing found {} elements", elements.len());

        let has_header = elements
            .iter()
            .any(|e| matches!(e, ZeroCopyElement::MessageHeader { .. }));
        let has_release = elements
            .iter()
            .any(|e| matches!(e, ZeroCopyElement::Release { .. }));

        assert!(has_header, "Should find message header");
        assert!(has_release, "Should find release");

        let stats = parser.get_stats();
        println!(
            "Zero-copy stats: {:.2} MB/s, {}% cache hit rate",
            stats.throughput_mb_per_sec,
            stats.string_cache_hit_rate * 100.0
        );
    }

    /// The pattern search must report every occurrence, in ascending order.
    #[test]
    fn test_simd_pattern_matching() {
        let data = b"<Release><Release><Release>";
        let parser = ZeroCopyParser::new(ERNVersion::V4_3);

        let positions = parser.find_elements_simd(data, b"<Release").unwrap();
        assert_eq!(positions.len(), 3, "Should find 3 occurrences");
        assert_eq!(positions, vec![0, 9, 18]);
    }

    /// Field extraction returns exactly the bytes between the opening and closing tag.
    #[test]
    fn test_zero_copy_field_extraction() {
        let data = b"<Title>Test Title</Title>";
        let parser = ZeroCopyParser::new(ERNVersion::V4_3);

        let content = parser.extract_field_zero_copy(data, b"Title").unwrap();
        assert_eq!(content, b"Test Title");
    }

    /// Attribute extraction returns the value between the double quotes.
    #[test]
    fn test_attribute_extraction() {
        let data = b"<Release ReleaseReference=\"REL-123\">content</Release>";
        let parser = ZeroCopyParser::new(ERNVersion::V4_3);

        let attr_value = parser
            .extract_attribute_zero_copy(data, b"ReleaseReference")
            .unwrap();
        assert_eq!(attr_value, b"REL-123");
    }
}