ddex_parser/parser/
stream.rs

1// core/src/parser/stream.rs
2//! Streaming parser for large DDEX files
3
4use crate::error::{ParseError, ErrorLocation};
5use ddex_core::models::graph::{ERNMessage, Release, Resource, Deal, Party, MessageHeader};
6use ddex_core::models::flat::ParsedERNMessage;
7use ddex_core::models::versions::ERNVersion;
8use crate::parser::ParseOptions;
9use crate::transform::flatten::Flattener;
10use quick_xml::events::Event;
11use quick_xml::Reader;
12use std::io::BufRead;
13use std::time::{Duration, Instant};
14
/// Progress information for streaming parsing.
///
/// A snapshot of these values is handed to the callback registered with
/// `StreamingParser::with_progress_callback`.
#[derive(Debug, Clone)]
pub struct ParseProgress {
    /// Reader byte offset captured the last time the parser recorded its position.
    pub bytes_processed: u64,
    /// Value of the parser's release counter when the snapshot was taken.
    pub releases_parsed: usize,
    /// Value of the parser's resource counter when the snapshot was taken.
    pub resources_parsed: usize,
    /// Time elapsed since the parser was constructed.
    pub elapsed: Duration,
    /// Expected total input size, if known. The parser in this file always reports `None`.
    pub estimated_total_bytes: Option<u64>,
}
24
/// Streaming parser for memory-efficient processing.
///
/// Wraps a quick-xml event reader and keeps the counters and limits used while
/// pulling individual elements out of a DDEX message.
pub struct StreamingParser<R: BufRead> {
    /// Event reader over the underlying input.
    reader: Reader<R>,
    /// ERN version supplied by the caller at construction time.
    version: ERNVersion,
    /// Optional callback invoked with a `ParseProgress` snapshot.
    progress_callback: Option<Box<dyn FnMut(ParseProgress) + Send>>,
    /// Instant at which the parser was created; used to compute elapsed time.
    start_time: Instant,
    /// Last recorded reader byte offset.
    bytes_processed: u64,
    /// Number of releases parsed so far.
    releases_parsed: usize,
    /// Resource counter reported in progress snapshots.
    resources_parsed: usize,
    /// Number of releases between cooperative `yield_now` calls.
    chunk_size: usize,
    /// Upper bound, in bytes, for the estimated memory taken by parsed releases.
    max_memory: usize,
    /// Scratch buffer reused for every `read_event_into` call.
    buffer: Vec<u8>,
}
38
39impl<R: BufRead> StreamingParser<R> {
40    pub fn new(reader: R, version: ERNVersion) -> Self {
41        let mut xml_reader = Reader::from_reader(reader);
42        xml_reader.config_mut().trim_text(true);
43        xml_reader.config_mut().check_end_names = true;
44        xml_reader.config_mut().expand_empty_elements = false;
45        
46        Self {
47            reader: xml_reader,
48            version,
49            progress_callback: None,
50            start_time: Instant::now(),
51            bytes_processed: 0,
52            releases_parsed: 0,
53            resources_parsed: 0,
54            chunk_size: 100,
55            max_memory: 100 * 1024 * 1024, // 100MB default
56            buffer: Vec::with_capacity(8192),
57        }
58    }
59    
60    pub fn with_progress_callback<F>(mut self, callback: F) -> Self 
61    where
62        F: FnMut(ParseProgress) + Send + 'static
63    {
64        self.progress_callback = Some(Box::new(callback));
65        self
66    }
67    
68    pub fn with_chunk_size(mut self, size: usize) -> Self {
69        self.chunk_size = size;
70        self
71    }
72    
73    pub fn with_max_memory(mut self, max: usize) -> Self {
74        self.max_memory = max;
75        self
76    }
77    
78    fn update_progress(&mut self) {
79        if let Some(ref mut callback) = self.progress_callback {
80            let progress = ParseProgress {
81                bytes_processed: self.bytes_processed,
82                releases_parsed: self.releases_parsed,
83                resources_parsed: self.resources_parsed,
84                elapsed: self.start_time.elapsed(),
85                estimated_total_bytes: None,
86            };
87            callback(progress);
88        }
89    }
90    
91    fn update_byte_position(&mut self) {
92        self.bytes_processed = self.reader.buffer_position() as u64;
93    }
94    
95    /// Parse the message header
96    pub fn parse_header(&mut self) -> Result<MessageHeader, ParseError> {
97        self.buffer.clear();
98        
99        // Skip to MessageHeader element
100        loop {
101            match self.reader.read_event_into(&mut self.buffer) {
102                Ok(Event::Start(ref e)) if e.name().as_ref() == b"MessageHeader" => {
103                    return self.parse_message_header_element();
104                }
105                Ok(Event::Eof) => {
106                    return Err(ParseError::XmlError {
107                        message: "No MessageHeader found".to_string(),
108                        location: ErrorLocation {
109                            line: 0,
110                            column: 0,
111                            byte_offset: Some(self.reader.buffer_position() as usize),
112                            path: "/".to_string(),
113                        },
114                    });
115                }
116                Err(e) => {
117                    return Err(ParseError::XmlError {
118                        message: e.to_string(),
119                        location: self.get_current_location(),
120                    });
121                }
122                _ => {}
123            }
124            self.buffer.clear();
125        }
126    }
127    
128    fn parse_message_header_element(&mut self) -> Result<MessageHeader, ParseError> {
129        use ddex_core::models::graph::{MessageType, MessageSender, MessageRecipient};
130        
131        let mut message_id = String::new();
132        let message_type = MessageType::NewReleaseMessage;
133        let mut created_date_time = chrono::Utc::now();
134        let mut sender = MessageSender {
135            party_id: Vec::new(),
136            party_name: Vec::new(),
137            trading_name: None,
138            extensions: None,
139            attributes: None,
140            comments: None,
141        };
142        let mut recipient = MessageRecipient {
143            party_id: Vec::new(),
144            party_name: Vec::new(),
145            trading_name: None,
146            extensions: None,
147            attributes: None,
148            comments: None,
149        };
150        
151        self.buffer.clear();
152        loop {
153            match self.reader.read_event_into(&mut self.buffer) {
154                Ok(Event::Start(ref e)) => {
155                    match e.name().as_ref() {
156                        b"MessageId" => {
157                            message_id = self.read_text_element()?;
158                        }
159                        b"MessageCreatedDateTime" => {
160                            let text = self.read_text_element()?;
161                            created_date_time = chrono::DateTime::parse_from_rfc3339(&text)
162                                .map(|dt| dt.with_timezone(&chrono::Utc))
163                                .unwrap_or_else(|_| chrono::Utc::now());
164                        }
165                        b"MessageSender" => {
166                            sender = self.parse_message_sender()?;
167                        }
168                        b"MessageRecipient" => {
169                            recipient = self.parse_message_recipient()?;
170                        }
171                        _ => {
172                            self.skip_element()?;
173                        }
174                    }
175                }
176                Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageHeader" => {
177                    break;
178                }
179                Ok(Event::Eof) => {
180                    return Err(ParseError::XmlError {
181                        message: "Unexpected EOF in MessageHeader".to_string(),
182                        location: self.get_current_location(),
183                    });
184                }
185                Err(e) => {
186                    return Err(ParseError::XmlError {
187                        message: e.to_string(),
188                        location: self.get_current_location(),
189                    });
190                }
191                _ => {}
192            }
193            self.buffer.clear();
194        }
195        
196        Ok(MessageHeader {
197            message_id,
198            message_type,
199            message_created_date_time: created_date_time,
200            message_sender: sender,
201            message_recipient: recipient,
202            message_control_type: None,
203            message_thread_id: None,
204            extensions: None,
205            attributes: None,
206            comments: None,
207        })
208    }
209    
210    fn parse_message_sender(&mut self) -> Result<ddex_core::models::graph::MessageSender, ParseError> {
211        use ddex_core::models::common::{Identifier, LocalizedString};
212        
213        let mut sender = ddex_core::models::graph::MessageSender {
214            party_id: Vec::new(),
215            party_name: Vec::new(),
216            trading_name: None,
217            extensions: None,
218            attributes: None,
219            comments: None,
220        };
221        
222        self.buffer.clear();
223        loop {
224            match self.reader.read_event_into(&mut self.buffer) {
225                Ok(Event::Start(ref e)) => {
226                    match e.name().as_ref() {
227                        b"PartyId" => {
228                            let value = self.read_text_element()?;
229                            sender.party_id.push(Identifier {
230                                id_type: ddex_core::models::common::IdentifierType::Proprietary,
231                                namespace: None,
232                                value,
233                            });
234                        }
235                        b"PartyName" => {
236                            let text = self.read_text_element()?;
237                            sender.party_name.push(LocalizedString::new(text));
238                        }
239                        _ => {
240                            self.skip_element()?;
241                        }
242                    }
243                }
244                Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageSender" => {
245                    break;
246                }
247                _ => {}
248            }
249            self.buffer.clear();
250        }
251        
252        Ok(sender)
253    }
254    
255    fn parse_message_recipient(&mut self) -> Result<ddex_core::models::graph::MessageRecipient, ParseError> {
256        // Similar to parse_message_sender
257        use ddex_core::models::common::{Identifier, LocalizedString};
258        
259        let mut recipient = ddex_core::models::graph::MessageRecipient {
260            party_id: Vec::new(),
261            party_name: Vec::new(),
262            trading_name: None,
263            extensions: None,
264            attributes: None,
265            comments: None,
266        };
267        
268        self.buffer.clear();
269        loop {
270            match self.reader.read_event_into(&mut self.buffer) {
271                Ok(Event::Start(ref e)) => {
272                    match e.name().as_ref() {
273                        b"PartyId" => {
274                            let value = self.read_text_element()?;
275                            recipient.party_id.push(Identifier {
276                                id_type: ddex_core::models::common::IdentifierType::Proprietary,
277                                namespace: None,
278                                value,
279                            });
280                        }
281                        b"PartyName" => {
282                            let text = self.read_text_element()?;
283                            recipient.party_name.push(LocalizedString::new(text));
284                        }
285                        _ => {
286                            self.skip_element()?;
287                        }
288                    }
289                }
290                Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageRecipient" => {
291                    break;
292                }
293                _ => {}
294            }
295            self.buffer.clear();
296        }
297        
298        Ok(recipient)
299    }
300    
    /// Stream releases one at a time for memory efficiency.
    ///
    /// The returned iterator borrows the parser mutably and continues from the
    /// reader's current position. While searching, every element other than
    /// `ReleaseList` (and `Release` inside it) is skipped together with its
    /// children, so the document's root start tag should already have been
    /// consumed (for example by `parse_header`); otherwise the whole root
    /// element is skipped.
    pub fn stream_releases(&mut self) -> ReleaseIterator<'_, R> {
        ReleaseIterator::new(self)
    }
305    
    /// Stream resources one at a time.
    ///
    /// The returned iterator borrows the parser mutably. Its `next` is currently
    /// a placeholder that always returns `None`, so no resources are produced.
    pub fn stream_resources(&mut self) -> ResourceIterator<'_, R> {
        ResourceIterator::new(self)
    }
310    
    /// Stream parties.
    ///
    /// The returned iterator borrows the parser mutably. Its `next` is currently
    /// a placeholder that always returns `None`, so no parties are produced.
    pub fn stream_parties(&mut self) -> PartyIterator<'_, R> {
        PartyIterator::new(self)
    }
315    
    /// Stream deals.
    ///
    /// The returned iterator borrows the parser mutably. Its `next` is currently
    /// a placeholder that always returns `None`, so no deals are produced.
    pub fn stream_deals(&mut self) -> DealIterator<'_, R> {
        DealIterator::new(self)
    }
320    
321    /// Helper to read text content of current element
322    fn read_text_element(&mut self) -> Result<String, ParseError> {
323        let mut text = String::new();
324        self.buffer.clear();
325        
326        loop {
327            let event = self.reader.read_event_into(&mut self.buffer);
328            match event {
329                Ok(Event::Text(e)) => {
330                    // Handle the text decoding separately to avoid borrowing issues
331                    match e.unescape() {
332                        Ok(unescaped) => {
333                            text = unescaped.to_string();
334                        }
335                        Err(err) => {
336                            // Get location after the borrow of buffer is done
337                            let location = self.get_current_location();
338                            return Err(ParseError::XmlError {
339                                message: err.to_string(),
340                                location,
341                            });
342                        }
343                    }
344                }
345                Ok(Event::End(_)) => {
346                    break;
347                }
348                Ok(Event::Eof) => {
349                    let location = self.get_current_location();
350                    return Err(ParseError::XmlError {
351                        message: "Unexpected EOF".to_string(),
352                        location,
353                    });
354                }
355                Err(e) => {
356                    let location = self.get_current_location();
357                    return Err(ParseError::XmlError {
358                        message: e.to_string(),
359                        location,
360                    });
361                }
362                _ => {}
363            }
364            self.buffer.clear();
365        }
366        
367        Ok(text)
368    }
369    
370    /// Skip an element and all its children
371    fn skip_element(&mut self) -> Result<(), ParseError> {
372        let mut depth = 1;
373        self.buffer.clear();
374        
375        while depth > 0 {
376            match self.reader.read_event_into(&mut self.buffer) {
377                Ok(Event::Start(_)) => depth += 1,
378                Ok(Event::End(_)) => depth -= 1,
379                Ok(Event::Eof) => break,
380                Err(e) => {
381                    return Err(ParseError::XmlError {
382                        message: e.to_string(),
383                        location: self.get_current_location(),
384                    });
385                }
386                _ => {}
387            }
388            self.buffer.clear();
389        }
390        
391        Ok(())
392    }
393    
394    fn get_current_location(&self) -> ErrorLocation {
395        ErrorLocation {
396            line: 0, // Would need line tracking
397            column: 0,
398            byte_offset: Some(self.reader.buffer_position() as usize),
399            path: "/NewReleaseMessage".to_string(),
400        }
401    }
402}
403
/// Iterator for streaming releases.
///
/// Yields one `Result<Release, ParseError>` per `Release` element found inside
/// a `ReleaseList`, reading from the borrowed parser as it goes.
pub struct ReleaseIterator<'a, R: BufRead> {
    /// Parser whose reader, scratch buffer and counters are used while iterating.
    parser: &'a mut StreamingParser<R>,
    /// Set once the end of the release list or the end of input has been reached.
    done: bool,
    /// Set once a `ReleaseList` start tag has been seen.
    in_release_list: bool,
}
410
411impl<'a, R: BufRead> ReleaseIterator<'a, R> {
412    fn new(parser: &'a mut StreamingParser<R>) -> Self {
413        Self {
414            parser,
415            done: false,
416            in_release_list: false,
417        }
418    }
419    
420    fn find_next_release(&mut self) -> Result<Option<Release>, ParseError> {
421        loop {
422            self.parser.buffer.clear();
423            match self.parser.reader.read_event_into(&mut self.parser.buffer) {
424                Ok(Event::Start(ref e)) => {
425                    match e.name().as_ref() {
426                        b"ReleaseList" => {
427                            self.in_release_list = true;
428                        }
429                        b"Release" if self.in_release_list => {
430                            return self.parse_release_element();
431                        }
432                        _ => {
433                            self.parser.skip_element()?;
434                        }
435                    }
436                }
437                Ok(Event::End(ref e)) if e.name().as_ref() == b"ReleaseList" => {
438                    self.done = true;
439                    return Ok(None);
440                }
441                Ok(Event::Eof) => {
442                    self.done = true;
443                    return Ok(None);
444                }
445                Err(e) => {
446                    return Err(ParseError::XmlError {
447                        message: e.to_string(),
448                        location: self.parser.get_current_location(),
449                    });
450                }
451                _ => {}
452            }
453        }
454    }
455    
456    fn parse_release_element(&mut self) -> Result<Option<Release>, ParseError> {
457        use ddex_core::models::common::LocalizedString;
458        
459        let mut release = Release {
460            release_reference: String::new(),
461            release_id: Vec::new(),
462            release_title: Vec::new(),
463            release_subtitle: None,
464            release_type: None,
465            genre: Vec::new(),
466            release_resource_reference_list: Vec::new(),
467            display_artist: Vec::new(),
468            party_list: Vec::new(),
469            release_date: Vec::new(),
470            territory_code: Vec::new(),
471            excluded_territory_code: Vec::new(),
472            extensions: None,
473            attributes: None,
474            comments: None,
475        };
476        
477        self.parser.buffer.clear();
478        loop {
479            match self.parser.reader.read_event_into(&mut self.parser.buffer) {
480                Ok(Event::Start(ref e)) => {
481                    match e.name().as_ref() {
482                        b"ReleaseReference" => {
483                            release.release_reference = self.parser.read_text_element()?;
484                        }
485                        b"ReferenceTitle" | b"Title" => {
486                            let text = self.parser.read_text_element()?;
487                            release.release_title.push(LocalizedString::new(text));
488                        }
489                        _ => {
490                            self.parser.skip_element()?;
491                        }
492                    }
493                }
494                Ok(Event::End(ref e)) if e.name().as_ref() == b"Release" => {
495                    break;
496                }
497                _ => {}
498            }
499            self.parser.buffer.clear();
500        }
501        
502        self.parser.releases_parsed += 1;
503        self.parser.update_byte_position();
504        self.parser.update_progress();
505        
506        // Check memory limit
507        let estimated_size = std::mem::size_of::<Release>() * self.parser.releases_parsed;
508        if estimated_size > self.parser.max_memory {
509            return Err(ParseError::SecurityViolation {
510                message: format!("Memory limit exceeded: {} > {}", estimated_size, self.parser.max_memory),
511            });
512        }
513        
514        // Yield control periodically
515        if self.parser.releases_parsed % self.parser.chunk_size == 0 {
516            std::thread::yield_now();
517        }
518        
519        Ok(Some(release))
520    }
521}
522
523impl<'a, R: BufRead> Iterator for ReleaseIterator<'a, R> {
524    type Item = Result<Release, ParseError>;
525    
526    fn next(&mut self) -> Option<Self::Item> {
527        if self.done {
528            return None;
529        }
530        
531        match self.find_next_release() {
532            Ok(Some(release)) => Some(Ok(release)),
533            Ok(None) => None,
534            Err(e) => Some(Err(e)),
535        }
536    }
537}
538
/// Iterator for streaming resources.
///
/// Mirrors `ReleaseIterator` in shape; its `Iterator` implementation is
/// currently a placeholder that yields nothing.
pub struct ResourceIterator<'a, R: BufRead> {
    /// Parser the iterator would read from.
    parser: &'a mut StreamingParser<R>,
    /// Completion flag; not consulted by the placeholder `next`.
    done: bool,
    /// List-tracking flag; not consulted by the placeholder `next`.
    in_resource_list: bool,
}
545
546impl<'a, R: BufRead> ResourceIterator<'a, R> {
547    fn new(parser: &'a mut StreamingParser<R>) -> Self {
548        Self {
549            parser,
550            done: false,
551            in_resource_list: false,
552        }
553    }
554}
555
impl<'a, R: BufRead> Iterator for ResourceIterator<'a, R> {
    type Item = Result<Resource, ParseError>;

    /// Placeholder: always returns `None` without reading from the parser.
    fn next(&mut self) -> Option<Self::Item> {
        // Not implemented yet; intended to follow the same approach as ReleaseIterator.
        None // Placeholder
    }
}
564
/// Iterator for streaming parties.
///
/// Its `Iterator` implementation is currently a placeholder that yields nothing.
pub struct PartyIterator<'a, R: BufRead> {
    /// Parser the iterator would read from.
    parser: &'a mut StreamingParser<R>,
    /// Completion flag; not consulted by the placeholder `next`.
    done: bool,
}
569
570impl<'a, R: BufRead> PartyIterator<'a, R> {
571    fn new(parser: &'a mut StreamingParser<R>) -> Self {
572        Self {
573            parser,
574            done: false,
575        }
576    }
577}
578
impl<'a, R: BufRead> Iterator for PartyIterator<'a, R> {
    type Item = Result<Party, ParseError>;

    /// Placeholder: always returns `None` without reading from the parser.
    fn next(&mut self) -> Option<Self::Item> {
        None // Placeholder
    }
}
586
/// Iterator for streaming deals.
///
/// Its `Iterator` implementation is currently a placeholder that yields nothing.
pub struct DealIterator<'a, R: BufRead> {
    /// Parser the iterator would read from.
    parser: &'a mut StreamingParser<R>,
    /// Completion flag; not consulted by the placeholder `next`.
    done: bool,
}
591
592impl<'a, R: BufRead> DealIterator<'a, R> {
593    fn new(parser: &'a mut StreamingParser<R>) -> Self {
594        Self {
595            parser,
596            done: false,
597        }
598    }
599}
600
impl<'a, R: BufRead> Iterator for DealIterator<'a, R> {
    type Item = Result<Deal, ParseError>;

    /// Placeholder: always returns `None` without reading from the parser.
    fn next(&mut self) -> Option<Self::Item> {
        None // Placeholder
    }
}
608
609/// Parse using streaming for large files
610pub fn parse_streaming<R: BufRead>(
611    reader: R,
612    version: ERNVersion,
613    options: ParseOptions,
614) -> Result<ParsedERNMessage, ParseError> {
615    let mut parser = StreamingParser::new(reader, version)
616        .with_chunk_size(options.chunk_size)
617        .with_max_memory(options.max_memory);
618    
619    // Parse header first
620    let message_header = parser.parse_header()?;
621    
622    // Collect releases in chunks to limit memory
623    let mut releases = Vec::new();
624    let mut resources = Vec::new();
625    let mut parties = Vec::new();
626    let mut deals = Vec::new();
627    
628    // Stream releases
629    for release_result in parser.stream_releases() {
630        let release = release_result?;
631        releases.push(release);
632    }
633    
634    // Stream resources
635    for resource_result in parser.stream_resources() {
636        let resource = resource_result?;
637        resources.push(resource);
638    }
639    
640    // Stream parties
641    for party_result in parser.stream_parties() {
642        let party = party_result?;
643        parties.push(party);
644    }
645    
646    // Stream deals
647    for deal_result in parser.stream_deals() {
648        let deal = deal_result?;
649        deals.push(deal);
650    }
651    
652    // Build ERNMessage
653    let graph = ERNMessage {
654        message_header,
655        parties,
656        resources,
657        releases,
658        deals,
659        version,
660        profile: None,
661        message_audit_trail: None,
662        extensions: None,
663        legacy_extensions: None,
664        comments: None,
665        attributes: None,
666    };
667    
668    // Flatten to developer-friendly model
669    let flat = Flattener::flatten(graph.clone());
670    
671    Ok(ParsedERNMessage { graph, flat, extensions: None })
672}