ddex_parser/parser/
stream.rs

1// core/src/parser/stream.rs
2//! Streaming parser for large DDEX files
3
4use crate::error::{ParseError, ErrorLocation};
5use ddex_core::models::graph::{ERNMessage, Release, Resource, Deal, Party, MessageHeader};
6use ddex_core::models::flat::ParsedERNMessage;
7use ddex_core::models::versions::ERNVersion;
8use crate::parser::ParseOptions;
9use crate::transform::flatten::Flattener;
10use quick_xml::events::Event;
11use quick_xml::Reader;
12use std::io::BufRead;
13use std::time::{Duration, Instant};
14
/// Progress information for streaming parsing
///
/// A by-value snapshot of the parser's counters, passed to the callback
/// registered with `StreamingParser::with_progress_callback`.
#[derive(Debug, Clone)]
pub struct ParseProgress {
    /// Reader byte position captured the last time the parser refreshed it.
    pub bytes_processed: u64,
    /// Number of releases parsed so far.
    pub releases_parsed: usize,
    /// Resource counter; nothing in the visible code increments it yet.
    pub resources_parsed: usize,
    /// Time elapsed since the parser was constructed.
    pub elapsed: Duration,
    /// Total input size if known; the parser in this file always reports `None`.
    pub estimated_total_bytes: Option<u64>,
}
24
/// Streaming parser for memory-efficient processing
///
/// Wraps a `quick_xml::Reader` and keeps the counters that feed
/// `ParseProgress` snapshots.
pub struct StreamingParser<R: BufRead> {
    /// Underlying XML event reader.
    reader: Reader<R>,
    /// ERN version supplied at construction.
    version: ERNVersion,
    /// Optional observer invoked with a progress snapshot.
    progress_callback: Option<Box<dyn FnMut(ParseProgress) + Send>>,
    /// Instant at which the parser was created; basis for `elapsed`.
    start_time: Instant,
    /// Last recorded reader byte position.
    bytes_processed: u64,
    /// Count of releases parsed so far.
    releases_parsed: usize,
    /// Count of resources parsed so far (not updated in the visible code).
    resources_parsed: usize,
    /// Number of releases between calls to `std::thread::yield_now`.
    chunk_size: usize,
    /// Ceiling, in bytes, for the estimated memory used by parsed releases.
    max_memory: usize,
    /// Scratch buffer reused for every `read_event_into` call.
    buffer: Vec<u8>,
}
38
39impl<R: BufRead> StreamingParser<R> {
40    pub fn new(reader: R, version: ERNVersion) -> Self {
41        let mut xml_reader = Reader::from_reader(reader);
42        xml_reader.config_mut().trim_text(true);
43        xml_reader.config_mut().check_end_names = true;
44        xml_reader.config_mut().expand_empty_elements = false;
45        
46        Self {
47            reader: xml_reader,
48            version,
49            progress_callback: None,
50            start_time: Instant::now(),
51            bytes_processed: 0,
52            releases_parsed: 0,
53            resources_parsed: 0,
54            chunk_size: 100,
55            max_memory: 100 * 1024 * 1024, // 100MB default
56            buffer: Vec::with_capacity(8192),
57        }
58    }
59    
60    pub fn with_progress_callback<F>(mut self, callback: F) -> Self 
61    where
62        F: FnMut(ParseProgress) + Send + 'static
63    {
64        self.progress_callback = Some(Box::new(callback));
65        self
66    }
67    
68    pub fn with_chunk_size(mut self, size: usize) -> Self {
69        self.chunk_size = size;
70        self
71    }
72    
73    pub fn with_max_memory(mut self, max: usize) -> Self {
74        self.max_memory = max;
75        self
76    }
77    
78    fn update_progress(&mut self) {
79        if let Some(ref mut callback) = self.progress_callback {
80            let progress = ParseProgress {
81                bytes_processed: self.bytes_processed,
82                releases_parsed: self.releases_parsed,
83                resources_parsed: self.resources_parsed,
84                elapsed: self.start_time.elapsed(),
85                estimated_total_bytes: None,
86            };
87            callback(progress);
88        }
89    }
90    
91    fn update_byte_position(&mut self) {
92        self.bytes_processed = self.reader.buffer_position() as u64;
93    }
94    
95    /// Parse the message header
96    pub fn parse_header(&mut self) -> Result<MessageHeader, ParseError> {
97        self.buffer.clear();
98        
99        // Skip to MessageHeader element
100        loop {
101            match self.reader.read_event_into(&mut self.buffer) {
102                Ok(Event::Start(ref e)) if e.name().as_ref() == b"MessageHeader" => {
103                    return self.parse_message_header_element();
104                }
105                Ok(Event::Eof) => {
106                    return Err(ParseError::XmlError {
107                        message: "No MessageHeader found".to_string(),
108                        location: ErrorLocation {
109                            line: 0,
110                            column: 0,
111                            byte_offset: Some(self.reader.buffer_position() as usize),
112                            path: "/".to_string(),
113                        },
114                    });
115                }
116                Err(e) => {
117                    return Err(ParseError::XmlError {
118                        message: e.to_string(),
119                        location: self.get_current_location(),
120                    });
121                }
122                _ => {}
123            }
124            self.buffer.clear();
125        }
126    }
127    
128    fn parse_message_header_element(&mut self) -> Result<MessageHeader, ParseError> {
129        use ddex_core::models::graph::{MessageType, MessageSender, MessageRecipient};
130        
131        let mut message_id = String::new();
132        let message_type = MessageType::NewReleaseMessage;
133        let mut created_date_time = chrono::Utc::now();
134        let mut sender = MessageSender {
135            party_id: Vec::new(),
136            party_name: Vec::new(),
137            trading_name: None,
138        };
139        let mut recipient = MessageRecipient {
140            party_id: Vec::new(),
141            party_name: Vec::new(),
142            trading_name: None,
143        };
144        
145        self.buffer.clear();
146        loop {
147            match self.reader.read_event_into(&mut self.buffer) {
148                Ok(Event::Start(ref e)) => {
149                    match e.name().as_ref() {
150                        b"MessageId" => {
151                            message_id = self.read_text_element()?;
152                        }
153                        b"MessageCreatedDateTime" => {
154                            let text = self.read_text_element()?;
155                            created_date_time = chrono::DateTime::parse_from_rfc3339(&text)
156                                .map(|dt| dt.with_timezone(&chrono::Utc))
157                                .unwrap_or_else(|_| chrono::Utc::now());
158                        }
159                        b"MessageSender" => {
160                            sender = self.parse_message_sender()?;
161                        }
162                        b"MessageRecipient" => {
163                            recipient = self.parse_message_recipient()?;
164                        }
165                        _ => {
166                            self.skip_element()?;
167                        }
168                    }
169                }
170                Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageHeader" => {
171                    break;
172                }
173                Ok(Event::Eof) => {
174                    return Err(ParseError::XmlError {
175                        message: "Unexpected EOF in MessageHeader".to_string(),
176                        location: self.get_current_location(),
177                    });
178                }
179                Err(e) => {
180                    return Err(ParseError::XmlError {
181                        message: e.to_string(),
182                        location: self.get_current_location(),
183                    });
184                }
185                _ => {}
186            }
187            self.buffer.clear();
188        }
189        
190        Ok(MessageHeader {
191            message_id,
192            message_type,
193            message_created_date_time: created_date_time,
194            message_sender: sender,
195            message_recipient: recipient,
196            message_control_type: None,
197            message_thread_id: None,
198        })
199    }
200    
201    fn parse_message_sender(&mut self) -> Result<ddex_core::models::graph::MessageSender, ParseError> {
202        use ddex_core::models::common::{Identifier, LocalizedString};
203        
204        let mut sender = ddex_core::models::graph::MessageSender {
205            party_id: Vec::new(),
206            party_name: Vec::new(),
207            trading_name: None,
208        };
209        
210        self.buffer.clear();
211        loop {
212            match self.reader.read_event_into(&mut self.buffer) {
213                Ok(Event::Start(ref e)) => {
214                    match e.name().as_ref() {
215                        b"PartyId" => {
216                            let value = self.read_text_element()?;
217                            sender.party_id.push(Identifier {
218                                id_type: ddex_core::models::common::IdentifierType::Proprietary,
219                                namespace: None,
220                                value,
221                            });
222                        }
223                        b"PartyName" => {
224                            let text = self.read_text_element()?;
225                            sender.party_name.push(LocalizedString::new(text));
226                        }
227                        _ => {
228                            self.skip_element()?;
229                        }
230                    }
231                }
232                Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageSender" => {
233                    break;
234                }
235                _ => {}
236            }
237            self.buffer.clear();
238        }
239        
240        Ok(sender)
241    }
242    
243    fn parse_message_recipient(&mut self) -> Result<ddex_core::models::graph::MessageRecipient, ParseError> {
244        // Similar to parse_message_sender
245        use ddex_core::models::common::{Identifier, LocalizedString};
246        
247        let mut recipient = ddex_core::models::graph::MessageRecipient {
248            party_id: Vec::new(),
249            party_name: Vec::new(),
250            trading_name: None,
251        };
252        
253        self.buffer.clear();
254        loop {
255            match self.reader.read_event_into(&mut self.buffer) {
256                Ok(Event::Start(ref e)) => {
257                    match e.name().as_ref() {
258                        b"PartyId" => {
259                            let value = self.read_text_element()?;
260                            recipient.party_id.push(Identifier {
261                                id_type: ddex_core::models::common::IdentifierType::Proprietary,
262                                namespace: None,
263                                value,
264                            });
265                        }
266                        b"PartyName" => {
267                            let text = self.read_text_element()?;
268                            recipient.party_name.push(LocalizedString::new(text));
269                        }
270                        _ => {
271                            self.skip_element()?;
272                        }
273                    }
274                }
275                Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageRecipient" => {
276                    break;
277                }
278                _ => {}
279            }
280            self.buffer.clear();
281        }
282        
283        Ok(recipient)
284    }
285    
    /// Stream releases one at a time for memory efficiency
    ///
    /// The returned iterator mutably borrows this parser and continues
    /// reading from the reader's current position.
    pub fn stream_releases(&mut self) -> ReleaseIterator<'_, R> {
        ReleaseIterator::new(self)
    }
290    
    /// Stream resources one at a time
    ///
    /// The returned iterator mutably borrows this parser.
    /// NOTE(review): `ResourceIterator::next` is a placeholder in this file
    /// and currently yields no items.
    pub fn stream_resources(&mut self) -> ResourceIterator<'_, R> {
        ResourceIterator::new(self)
    }
295    
    /// Stream parties
    ///
    /// The returned iterator mutably borrows this parser.
    /// NOTE(review): `PartyIterator::next` is a placeholder in this file
    /// and currently yields no items.
    pub fn stream_parties(&mut self) -> PartyIterator<'_, R> {
        PartyIterator::new(self)
    }
300    
    /// Stream deals
    ///
    /// The returned iterator mutably borrows this parser.
    /// NOTE(review): `DealIterator::next` is a placeholder in this file
    /// and currently yields no items.
    pub fn stream_deals(&mut self) -> DealIterator<'_, R> {
        DealIterator::new(self)
    }
305    
306    /// Helper to read text content of current element
307    fn read_text_element(&mut self) -> Result<String, ParseError> {
308        let mut text = String::new();
309        self.buffer.clear();
310        
311        loop {
312            let event = self.reader.read_event_into(&mut self.buffer);
313            match event {
314                Ok(Event::Text(e)) => {
315                    // Handle the text decoding separately to avoid borrowing issues
316                    match e.unescape() {
317                        Ok(unescaped) => {
318                            text = unescaped.to_string();
319                        }
320                        Err(err) => {
321                            // Get location after the borrow of buffer is done
322                            let location = self.get_current_location();
323                            return Err(ParseError::XmlError {
324                                message: err.to_string(),
325                                location,
326                            });
327                        }
328                    }
329                }
330                Ok(Event::End(_)) => {
331                    break;
332                }
333                Ok(Event::Eof) => {
334                    let location = self.get_current_location();
335                    return Err(ParseError::XmlError {
336                        message: "Unexpected EOF".to_string(),
337                        location,
338                    });
339                }
340                Err(e) => {
341                    let location = self.get_current_location();
342                    return Err(ParseError::XmlError {
343                        message: e.to_string(),
344                        location,
345                    });
346                }
347                _ => {}
348            }
349            self.buffer.clear();
350        }
351        
352        Ok(text)
353    }
354    
355    /// Skip an element and all its children
356    fn skip_element(&mut self) -> Result<(), ParseError> {
357        let mut depth = 1;
358        self.buffer.clear();
359        
360        while depth > 0 {
361            match self.reader.read_event_into(&mut self.buffer) {
362                Ok(Event::Start(_)) => depth += 1,
363                Ok(Event::End(_)) => depth -= 1,
364                Ok(Event::Eof) => break,
365                Err(e) => {
366                    return Err(ParseError::XmlError {
367                        message: e.to_string(),
368                        location: self.get_current_location(),
369                    });
370                }
371                _ => {}
372            }
373            self.buffer.clear();
374        }
375        
376        Ok(())
377    }
378    
379    fn get_current_location(&self) -> ErrorLocation {
380        ErrorLocation {
381            line: 0, // Would need line tracking
382            column: 0,
383            byte_offset: Some(self.reader.buffer_position() as usize),
384            path: "/NewReleaseMessage".to_string(),
385        }
386    }
387}
388
/// Iterator for streaming releases
///
/// Yields `Result<Release, ParseError>` items read from the borrowed parser.
pub struct ReleaseIterator<'a, R: BufRead> {
    /// Parser whose reader, buffer and counters are driven by this iterator.
    parser: &'a mut StreamingParser<R>,
    /// Set once the end of `ReleaseList` or the end of input has been reached.
    done: bool,
    /// True after a `ReleaseList` start tag has been seen.
    in_release_list: bool,
}
395
396impl<'a, R: BufRead> ReleaseIterator<'a, R> {
    /// Wraps `parser` in an iterator that has not yet entered a `ReleaseList`.
    fn new(parser: &'a mut StreamingParser<R>) -> Self {
        Self {
            parser,
            done: false,
            in_release_list: false,
        }
    }
404    
405    fn find_next_release(&mut self) -> Result<Option<Release>, ParseError> {
406        loop {
407            self.parser.buffer.clear();
408            match self.parser.reader.read_event_into(&mut self.parser.buffer) {
409                Ok(Event::Start(ref e)) => {
410                    match e.name().as_ref() {
411                        b"ReleaseList" => {
412                            self.in_release_list = true;
413                        }
414                        b"Release" if self.in_release_list => {
415                            return self.parse_release_element();
416                        }
417                        _ => {
418                            self.parser.skip_element()?;
419                        }
420                    }
421                }
422                Ok(Event::End(ref e)) if e.name().as_ref() == b"ReleaseList" => {
423                    self.done = true;
424                    return Ok(None);
425                }
426                Ok(Event::Eof) => {
427                    self.done = true;
428                    return Ok(None);
429                }
430                Err(e) => {
431                    return Err(ParseError::XmlError {
432                        message: e.to_string(),
433                        location: self.parser.get_current_location(),
434                    });
435                }
436                _ => {}
437            }
438        }
439    }
440    
441    fn parse_release_element(&mut self) -> Result<Option<Release>, ParseError> {
442        use ddex_core::models::common::LocalizedString;
443        
444        let mut release = Release {
445            release_reference: String::new(),
446            release_id: Vec::new(),
447            release_title: Vec::new(),
448            release_subtitle: None,
449            release_type: None,
450            genre: Vec::new(),
451            release_resource_reference_list: Vec::new(),
452            display_artist: Vec::new(),
453            party_list: Vec::new(),
454            release_date: Vec::new(),
455            territory_code: Vec::new(),
456            excluded_territory_code: Vec::new(),
457        };
458        
459        self.parser.buffer.clear();
460        loop {
461            match self.parser.reader.read_event_into(&mut self.parser.buffer) {
462                Ok(Event::Start(ref e)) => {
463                    match e.name().as_ref() {
464                        b"ReleaseReference" => {
465                            release.release_reference = self.parser.read_text_element()?;
466                        }
467                        b"ReferenceTitle" | b"Title" => {
468                            let text = self.parser.read_text_element()?;
469                            release.release_title.push(LocalizedString::new(text));
470                        }
471                        _ => {
472                            self.parser.skip_element()?;
473                        }
474                    }
475                }
476                Ok(Event::End(ref e)) if e.name().as_ref() == b"Release" => {
477                    break;
478                }
479                _ => {}
480            }
481            self.parser.buffer.clear();
482        }
483        
484        self.parser.releases_parsed += 1;
485        self.parser.update_byte_position();
486        self.parser.update_progress();
487        
488        // Check memory limit
489        let estimated_size = std::mem::size_of::<Release>() * self.parser.releases_parsed;
490        if estimated_size > self.parser.max_memory {
491            return Err(ParseError::SecurityViolation {
492                message: format!("Memory limit exceeded: {} > {}", estimated_size, self.parser.max_memory),
493            });
494        }
495        
496        // Yield control periodically
497        if self.parser.releases_parsed % self.parser.chunk_size == 0 {
498            std::thread::yield_now();
499        }
500        
501        Ok(Some(release))
502    }
503}
504
505impl<'a, R: BufRead> Iterator for ReleaseIterator<'a, R> {
506    type Item = Result<Release, ParseError>;
507    
508    fn next(&mut self) -> Option<Self::Item> {
509        if self.done {
510            return None;
511        }
512        
513        match self.find_next_release() {
514            Ok(Some(release)) => Some(Ok(release)),
515            Ok(None) => None,
516            Err(e) => Some(Err(e)),
517        }
518    }
519}
520
521// Similar iterators for other types
/// Iterator intended to stream resources from a borrowed parser.
///
/// NOTE(review): its `Iterator` implementation in this file is a placeholder
/// and does not read any of these fields yet.
pub struct ResourceIterator<'a, R: BufRead> {
    /// Parser this iterator would read from.
    parser: &'a mut StreamingParser<R>,
    /// Completion flag; initialised to `false`.
    done: bool,
    /// List-tracking flag; initialised to `false`.
    in_resource_list: bool,
}
527
impl<'a, R: BufRead> ResourceIterator<'a, R> {
    /// Wraps `parser` with both state flags cleared.
    fn new(parser: &'a mut StreamingParser<R>) -> Self {
        Self {
            parser,
            done: false,
            in_resource_list: false,
        }
    }
}
537
/// Placeholder implementation: always returns `None`, so no resources are
/// ever yielded and the parser is not advanced.
impl<'a, R: BufRead> Iterator for ResourceIterator<'a, R> {
    type Item = Result<Resource, ParseError>;
    
    fn next(&mut self) -> Option<Self::Item> {
        // Similar implementation to ReleaseIterator
        None // Placeholder
    }
}
546
/// Iterator intended to stream parties from a borrowed parser.
///
/// NOTE(review): its `Iterator` implementation in this file is a placeholder
/// and does not read any of these fields yet.
pub struct PartyIterator<'a, R: BufRead> {
    /// Parser this iterator would read from.
    parser: &'a mut StreamingParser<R>,
    /// Completion flag; initialised to `false`.
    done: bool,
}
551
impl<'a, R: BufRead> PartyIterator<'a, R> {
    /// Wraps `parser` with the completion flag cleared.
    fn new(parser: &'a mut StreamingParser<R>) -> Self {
        Self {
            parser,
            done: false,
        }
    }
}
560
/// Placeholder implementation: always returns `None`, so no parties are
/// ever yielded and the parser is not advanced.
impl<'a, R: BufRead> Iterator for PartyIterator<'a, R> {
    type Item = Result<Party, ParseError>;
    
    fn next(&mut self) -> Option<Self::Item> {
        None // Placeholder
    }
}
568
/// Iterator intended to stream deals from a borrowed parser.
///
/// NOTE(review): its `Iterator` implementation in this file is a placeholder
/// and does not read any of these fields yet.
pub struct DealIterator<'a, R: BufRead> {
    /// Parser this iterator would read from.
    parser: &'a mut StreamingParser<R>,
    /// Completion flag; initialised to `false`.
    done: bool,
}
573
impl<'a, R: BufRead> DealIterator<'a, R> {
    /// Wraps `parser` with the completion flag cleared.
    fn new(parser: &'a mut StreamingParser<R>) -> Self {
        Self {
            parser,
            done: false,
        }
    }
}
582
/// Placeholder implementation: always returns `None`, so no deals are
/// ever yielded and the parser is not advanced.
impl<'a, R: BufRead> Iterator for DealIterator<'a, R> {
    type Item = Result<Deal, ParseError>;
    
    fn next(&mut self) -> Option<Self::Item> {
        None // Placeholder
    }
}
590
591/// Parse using streaming for large files
592pub fn parse_streaming<R: BufRead>(
593    reader: R,
594    version: ERNVersion,
595    options: ParseOptions,
596) -> Result<ParsedERNMessage, ParseError> {
597    let mut parser = StreamingParser::new(reader, version)
598        .with_chunk_size(options.chunk_size)
599        .with_max_memory(options.max_memory);
600    
601    // Parse header first
602    let message_header = parser.parse_header()?;
603    
604    // Collect releases in chunks to limit memory
605    let mut releases = Vec::new();
606    let mut resources = Vec::new();
607    let mut parties = Vec::new();
608    let mut deals = Vec::new();
609    
610    // Stream releases
611    for release_result in parser.stream_releases() {
612        let release = release_result?;
613        releases.push(release);
614    }
615    
616    // Stream resources
617    for resource_result in parser.stream_resources() {
618        let resource = resource_result?;
619        resources.push(resource);
620    }
621    
622    // Stream parties
623    for party_result in parser.stream_parties() {
624        let party = party_result?;
625        parties.push(party);
626    }
627    
628    // Stream deals
629    for deal_result in parser.stream_deals() {
630        let deal = deal_result?;
631        deals.push(deal);
632    }
633    
634    // Build ERNMessage
635    let graph = ERNMessage {
636        message_header,
637        parties,
638        resources,
639        releases,
640        deals,
641        version,
642        profile: None,
643        message_audit_trail: None,
644        extensions: None,
645        comments: None,
646    };
647    
648    // Flatten to developer-friendly model
649    let flat = Flattener::flatten(graph.clone());
650    
651    Ok(ParsedERNMessage { graph, flat })
652}