1use crate::error::ParseError;
10use crate::streaming::{WorkingStreamingElement, WorkingStreamingStats};
11use ddex_core::models::versions::ERNVersion;
12use std::collections::HashMap;
13use std::io::BufRead;
14use std::time::Instant;
15
/// Byte-level scanner that pulls DDEX elements out of XML supplied in chunks.
///
/// The parser searches the raw bytes for known tag patterns instead of
/// building a full XML tree, and keeps running counters for statistics.
pub struct FastZeroCopyParser {
    /// Scratch buffer allocated at construction; its capacity is reported in
    /// the memory figures returned by `stats`.
    read_buffer: Vec<u8>,
    /// Bytes carried over from earlier `parse_chunk` calls so that data
    /// straddling a chunk boundary can still be examined.
    leftover: Vec<u8>,
    /// Maps raw byte slices to their already-decoded `String` form.
    string_cache: HashMap<Vec<u8>, String>,
    /// Total number of input bytes handed to the parser so far.
    bytes_processed: u64,
    /// Total number of elements the parser has produced so far.
    elements_found: u64,
    /// Moment the parser was created; basis for elapsed time and throughput.
    start_time: Instant,
}
29
30impl FastZeroCopyParser {
31 pub fn new() -> Self {
32 Self {
33 read_buffer: vec![0; 64 * 1024], leftover: Vec::new(),
35 string_cache: HashMap::with_capacity(512),
36 bytes_processed: 0,
37 elements_found: 0,
38 start_time: Instant::now(),
39 }
40 }
41
42 fn intern_string(&mut self, bytes: &[u8]) -> String {
44 if let Some(cached) = self.string_cache.get(bytes) {
45 cached.clone()
46 } else {
47 let s = String::from_utf8_lossy(bytes).to_string();
48 self.string_cache.insert(bytes.to_vec(), s.clone());
49 s
50 }
51 }
52
53 pub fn parse_chunk(
55 &mut self,
56 chunk: &[u8],
57 ) -> Result<Vec<WorkingStreamingElement>, ParseError> {
58 self.bytes_processed += chunk.len() as u64;
59 let mut results = Vec::new();
60
61 let mut data = Vec::with_capacity(self.leftover.len() + chunk.len());
63 data.extend_from_slice(&self.leftover);
64 data.extend_from_slice(chunk);
65
66 let mut pos = 0;
68 while let Some(start) = self.find_pattern(&data[pos..], b"<MessageHeader") {
69 let abs_start = pos + start;
70 if let Some(element) = self.extract_message_header_fast(&data, abs_start)? {
71 results.push(element);
72 self.elements_found += 1;
73 }
74 pos = abs_start + 14; }
76
77 pos = 0;
79 while let Some(start) = self.find_pattern(&data[pos..], b"<Release ") {
80 let abs_start = pos + start;
81 if let Some(element) = self.extract_release_fast(&data, abs_start)? {
82 results.push(element);
83 self.elements_found += 1;
84 }
85 pos = abs_start + 9; }
87
88 pos = 0;
90 while let Some(start) = self.find_pattern(&data[pos..], b"<SoundRecording ") {
91 let abs_start = pos + start;
92 if let Some(element) = self.extract_sound_recording_fast(&data, abs_start)? {
93 results.push(element);
94 self.elements_found += 1;
95 }
96 pos = abs_start + 16; }
98
99 if data.len() > 2048 {
101 self.leftover.clear();
103 self.leftover.extend_from_slice(&data[data.len() - 2048..]);
104 } else {
105 self.leftover = data;
106 }
107
108 Ok(results)
109 }
110
111 fn find_pattern(&self, data: &[u8], pattern: &[u8]) -> Option<usize> {
113 if pattern.is_empty() {
114 return None;
115 }
116
117 let mut pos = 0;
119 while let Some(first_byte_pos) = memchr::memchr(pattern[0], &data[pos..]) {
120 let abs_pos = pos + first_byte_pos;
121
122 if abs_pos + pattern.len() <= data.len()
123 && &data[abs_pos..abs_pos + pattern.len()] == pattern
124 {
125 return Some(abs_pos);
126 }
127
128 pos = abs_pos + 1;
129 }
130
131 None
132 }
133
134 fn extract_message_header_fast(
136 &mut self,
137 data: &[u8],
138 start: usize,
139 ) -> Result<Option<WorkingStreamingElement>, ParseError> {
140 if let Some(end) = self.find_pattern(&data[start..], b"</MessageHeader>") {
142 let header_data = &data[start..start + end + 16]; let message_id = if let Some(id) = self.extract_tag_content(header_data, b"MessageId") {
146 self.intern_string(id)
147 } else {
148 "unknown".to_string()
149 };
150
151 let created_date_time =
153 if let Some(dt) = self.extract_tag_content(header_data, b"CreatedDateTime") {
154 self.intern_string(dt)
155 } else {
156 chrono::Utc::now().to_rfc3339()
157 };
158
159 return Ok(Some(WorkingStreamingElement::MessageHeader {
160 message_id,
161 created_date_time,
162 version: ERNVersion::V4_3,
163 }));
164 }
165
166 Ok(None)
167 }
168
169 fn extract_release_fast(
171 &mut self,
172 data: &[u8],
173 start: usize,
174 ) -> Result<Option<WorkingStreamingElement>, ParseError> {
175 if let Some(end) = self.find_pattern(&data[start..], b"</Release>") {
177 let release_data = &data[start..start + end + 10]; let reference = if let Some(attr) =
181 self.extract_attribute_fast(release_data, b"ReleaseReference")
182 {
183 self.intern_string(attr)
184 } else {
185 format!("REL-{}", self.elements_found)
186 };
187
188 let title =
190 if let Some(title_data) = self.extract_tag_content(release_data, b"TitleText") {
191 self.intern_string(title_data)
192 } else {
193 "Untitled Release".to_string()
194 };
195
196 let resource_references = self.extract_resource_references_fast(release_data);
198
199 return Ok(Some(WorkingStreamingElement::Release {
200 reference,
201 title,
202 resource_references,
203 }));
204 }
205
206 Ok(None)
207 }
208
209 fn extract_sound_recording_fast(
211 &mut self,
212 data: &[u8],
213 start: usize,
214 ) -> Result<Option<WorkingStreamingElement>, ParseError> {
215 if let Some(end) = self.find_pattern(&data[start..], b"</SoundRecording>") {
216 let recording_data = &data[start..start + end + 17]; let reference = if let Some(attr) =
219 self.extract_attribute_fast(recording_data, b"ResourceReference")
220 {
221 self.intern_string(attr)
222 } else {
223 format!("RES-{}", self.elements_found)
224 };
225
226 let title =
227 if let Some(title_data) = self.extract_tag_content(recording_data, b"TitleText") {
228 self.intern_string(title_data)
229 } else {
230 "Untitled Track".to_string()
231 };
232
233 let duration = self
234 .extract_tag_content(recording_data, b"Duration")
235 .map(|d| self.intern_string(d));
236
237 let isrc = self
238 .extract_tag_content(recording_data, b"ISRC")
239 .map(|i| self.intern_string(i));
240
241 return Ok(Some(WorkingStreamingElement::SoundRecording {
242 reference,
243 title,
244 duration,
245 isrc,
246 }));
247 }
248
249 Ok(None)
250 }
251
252 fn extract_tag_content<'a>(&self, data: &'a [u8], tag_name: &[u8]) -> Option<&'a [u8]> {
254 let opening = [b"<", tag_name, b">"].concat();
256 let closing = [b"</", tag_name, b">"].concat();
257
258 if let Some(start_pos) = self.find_pattern(data, &opening) {
259 let content_start = start_pos + opening.len();
260 if let Some(end_pos) = self.find_pattern(&data[content_start..], &closing) {
261 let content_end = content_start + end_pos;
262 return Some(&data[content_start..content_end]);
263 }
264 }
265
266 None
267 }
268
269 fn extract_attribute_fast<'a>(&self, data: &'a [u8], attr_name: &[u8]) -> Option<&'a [u8]> {
271 let pattern = [attr_name, b"=\""].concat();
272
273 if let Some(start_pos) = self.find_pattern(data, &pattern) {
274 let value_start = start_pos + pattern.len();
275
276 if let Some(quote_pos) = memchr::memchr(b'"', &data[value_start..]) {
278 let value_end = value_start + quote_pos;
279 return Some(&data[value_start..value_end]);
280 }
281 }
282
283 None
284 }
285
286 fn extract_resource_references_fast(&mut self, data: &[u8]) -> Vec<String> {
288 let mut refs = Vec::new();
289 let mut pos = 0;
290
291 while let Some(start) = self.find_pattern(&data[pos..], b"<ResourceReference>") {
293 let abs_start = pos + start;
294 if let Some(content) =
295 self.extract_tag_content(&data[abs_start..], b"ResourceReference")
296 {
297 refs.push(self.intern_string(content));
298 }
299 pos = abs_start + 19; }
301
302 refs
303 }
304
305 pub fn stats(&self) -> WorkingStreamingStats {
307 let elapsed = self.start_time.elapsed();
308 let throughput = if elapsed.as_secs_f64() > 0.0 {
309 (self.bytes_processed as f64 / (1024.0 * 1024.0)) / elapsed.as_secs_f64()
310 } else {
311 0.0
312 };
313
314 WorkingStreamingStats {
315 bytes_processed: self.bytes_processed,
316 elements_yielded: self.elements_found as usize,
317 current_depth: 0,
318 max_depth_reached: 10,
319 current_memory_bytes: self.read_buffer.capacity() + self.leftover.capacity(),
320 max_memory_used_bytes: self.read_buffer.capacity() + self.leftover.capacity(),
321 elapsed_time: elapsed,
322 throughput_mb_per_sec: throughput,
323 }
324 }
325}
326
327impl Default for FastZeroCopyParser {
328 fn default() -> Self {
329 Self::new()
330 }
331}
332
333pub struct FastZeroCopyIterator<R: BufRead> {
335 reader: R,
336 parser: FastZeroCopyParser,
337 buffer: Vec<u8>,
338 finished: bool,
339 elements_queue: Vec<WorkingStreamingElement>,
340 current_index: usize,
341}
342
343impl<R: BufRead> FastZeroCopyIterator<R> {
344 pub fn new(reader: R, _version: ERNVersion) -> Self {
345 Self {
346 reader,
347 parser: FastZeroCopyParser::new(),
348 buffer: vec![0; 64 * 1024], finished: false,
350 elements_queue: Vec::new(),
351 current_index: 0,
352 }
353 }
354
355 pub fn stats(&self) -> WorkingStreamingStats {
356 self.parser.stats()
357 }
358
359 fn read_next_chunk(&mut self) -> Result<bool, ParseError> {
360 let bytes_read = self.reader.read(&mut self.buffer)?;
361
362 if bytes_read == 0 {
363 return Ok(false); }
365
366 let elements = self.parser.parse_chunk(&self.buffer[..bytes_read])?;
367 self.elements_queue.extend(elements);
368
369 Ok(true)
370 }
371}
372
373impl<R: BufRead> Iterator for FastZeroCopyIterator<R> {
374 type Item = Result<WorkingStreamingElement, ParseError>;
375
376 fn next(&mut self) -> Option<Self::Item> {
377 if self.finished {
378 return None;
379 }
380
381 if self.current_index < self.elements_queue.len() {
383 let element = self.elements_queue[self.current_index].clone();
384 self.current_index += 1;
385 return Some(Ok(element));
386 }
387
388 match self.read_next_chunk() {
390 Ok(true) => {
391 self.next()
393 }
394 Ok(false) => {
395 self.finished = true;
397 Some(Ok(WorkingStreamingElement::EndOfStream {
398 stats: self.parser.stats(),
399 }))
400 }
401 Err(e) => {
402 self.finished = true;
403 Some(Err(e))
404 }
405 }
406 }
407}
408
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Runs the iterator over a small message and checks that a header, a
    /// release and the end-of-stream marker are all produced.
    #[test]
    fn test_fast_zero_copy_basic() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ern:NewReleaseMessage xmlns:ern="http://ddex.net/xml/ern/43">
 <MessageHeader>
 <MessageId>FAST-TEST-MSG</MessageId>
 <CreatedDateTime>2023-01-01T00:00:00Z</CreatedDateTime>
 </MessageHeader>
 <Release ReleaseReference="FAST-REL-001">
 <ReferenceTitle>
 <TitleText>Fast Zero Copy Release</TitleText>
 </ReferenceTitle>
 </Release>
</ern:NewReleaseMessage>"#;

        let iterator = FastZeroCopyIterator::new(Cursor::new(xml.as_bytes()), ERNVersion::V4_3);

        let collected: Result<Vec<_>, _> = iterator.collect();
        assert!(collected.is_ok(), "Fast zero-copy parsing should work");

        let elements = collected.unwrap();
        assert!(!elements.is_empty(), "Should find elements");

        let mut has_header = false;
        let mut has_release = false;
        let mut has_end_stream = false;
        for element in &elements {
            has_header |= matches!(element, WorkingStreamingElement::MessageHeader { .. });
            has_release |= matches!(element, WorkingStreamingElement::Release { .. });
            has_end_stream |= matches!(element, WorkingStreamingElement::EndOfStream { .. });
        }

        assert!(has_header, "Should find message header");
        assert!(has_release, "Should find release");
        assert!(has_end_stream, "Should find end of stream");

        println!("✅ Fast zero-copy parser basic test passed!");
    }

    /// Checks the offsets reported for several patterns and the miss case.
    #[test]
    fn test_fast_pattern_matching() {
        let parser = FastZeroCopyParser::new();
        let data = b"<Release><MessageHeader><SoundRecording>";

        let found_release = parser.find_pattern(data, b"<Release>");
        let found_header = parser.find_pattern(data, b"<MessageHeader>");
        let found_recording = parser.find_pattern(data, b"<SoundRecording>");
        let found_missing = parser.find_pattern(data, b"<NotFound>");

        assert_eq!(found_release, Some(0));
        assert_eq!(found_header, Some(9));
        assert_eq!(found_recording, Some(24));
        assert_eq!(found_missing, None);
    }

    /// Checks that the text between an opening and closing tag is returned.
    #[test]
    fn test_tag_content_extraction() {
        let parser = FastZeroCopyParser::new();
        let data = b"<Title>Test Title</Title>";

        let content = parser.extract_tag_content(data, b"Title").unwrap();
        assert_eq!(content, b"Test Title");
    }

    /// Checks that a double-quoted attribute value is returned without quotes.
    #[test]
    fn test_attribute_extraction() {
        let parser = FastZeroCopyParser::new();
        let data = b"<Release ReleaseReference=\"REL-123\">";

        let attr_value = parser
            .extract_attribute_fast(data, b"ReleaseReference")
            .unwrap();
        assert_eq!(attr_value, b"REL-123");
    }
}