ddex_parser/parser/
stream.rs

1// core/src/parser/stream.rs
2//! Streaming parser for large DDEX files
3
4use crate::error::{ErrorLocation, ParseError};
5use crate::parser::ParseOptions;
6use crate::transform::flatten::Flattener;
7use crate::utf8_utils;
8use ddex_core::models::flat::ParsedERNMessage;
9use ddex_core::models::graph::{Deal, ERNMessage, MessageHeader, Party, Release, Resource};
10use ddex_core::models::versions::ERNVersion;
11use quick_xml::events::Event;
12use quick_xml::Reader;
13use std::io::BufRead;
14use std::time::{Duration, Instant};
15
/// Snapshot of streaming-parse progress, handed to the progress callback
/// registered on a [`StreamingParser`].
#[derive(Clone, Debug)]
pub struct ParseProgress {
    /// Byte position reported by the XML reader when the snapshot was taken.
    pub bytes_processed: u64,
    /// Number of releases parsed so far.
    pub releases_parsed: usize,
    /// Number of resources parsed so far.
    pub resources_parsed: usize,
    /// Wall-clock time since the parser was constructed.
    pub elapsed: std::time::Duration,
    /// Total input size, when known (the streaming parser currently reports `None`).
    pub estimated_total_bytes: Option<u64>,
}
25
26/// Streaming parser for memory-efficient processing
27///
28/// Part of the public streaming API for parsing large DDEX files efficiently.
29#[allow(dead_code)]
30pub struct StreamingParser<R: BufRead> {
31    reader: Reader<R>,
32    _version: ERNVersion,
33    progress_callback: Option<Box<dyn FnMut(ParseProgress) + Send>>,
34    start_time: Instant,
35    bytes_processed: u64,
36    releases_parsed: usize,
37    resources_parsed: usize,
38    chunk_size: usize,
39    max_memory: usize,
40    buffer: Vec<u8>,
41    current_depth: usize,
42    max_depth: usize,
43}
44
45impl<R: BufRead> StreamingParser<R> {
46    pub fn new(reader: R, version: ERNVersion) -> Self {
47        Self::new_with_security_config(
48            reader,
49            version,
50            &crate::parser::security::SecurityConfig::default(),
51        )
52    }
53
54    pub fn new_with_security_config(
55        reader: R,
56        version: ERNVersion,
57        security_config: &crate::parser::security::SecurityConfig,
58    ) -> Self {
59        let mut xml_reader = Reader::from_reader(reader);
60        xml_reader.config_mut().trim_text(true);
61        xml_reader.config_mut().check_end_names = true;
62        xml_reader.config_mut().expand_empty_elements = false;
63
64        Self {
65            reader: xml_reader,
66            _version: version,
67            progress_callback: None,
68            start_time: Instant::now(),
69            bytes_processed: 0,
70            releases_parsed: 0,
71            resources_parsed: 0,
72            chunk_size: 100,
73            max_memory: 100 * 1024 * 1024, // 100MB default
74            buffer: Vec::with_capacity(8192),
75            current_depth: 0,
76            max_depth: security_config.max_element_depth,
77        }
78    }
79
80    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
81    where
82        F: FnMut(ParseProgress) + Send + 'static,
83    {
84        self.progress_callback = Some(Box::new(callback));
85        self
86    }
87
88    pub fn with_chunk_size(mut self, size: usize) -> Self {
89        self.chunk_size = size;
90        self
91    }
92
93    pub fn with_max_memory(mut self, max: usize) -> Self {
94        self.max_memory = max;
95        self
96    }
97
98    fn update_progress(&mut self) {
99        if let Some(ref mut callback) = self.progress_callback {
100            let progress = ParseProgress {
101                bytes_processed: self.bytes_processed,
102                releases_parsed: self.releases_parsed,
103                resources_parsed: self.resources_parsed,
104                elapsed: self.start_time.elapsed(),
105                estimated_total_bytes: None,
106            };
107            callback(progress);
108        }
109    }
110
111    fn update_byte_position(&mut self) {
112        self.bytes_processed = self.reader.buffer_position();
113    }
114
115    /// Parse the message header
116    pub fn parse_header(&mut self) -> Result<MessageHeader, ParseError> {
117        self.buffer.clear();
118
119        // Skip to MessageHeader element
120        loop {
121            match self.reader.read_event_into(&mut self.buffer) {
122                Ok(Event::Start(ref e)) => {
123                    self.current_depth += 1;
124
125                    // Check depth limit
126                    if self.current_depth > self.max_depth {
127                        return Err(ParseError::DepthLimitExceeded {
128                            depth: self.current_depth,
129                            max: self.max_depth,
130                        });
131                    }
132
133                    if e.name().as_ref() == b"MessageHeader" {
134                        return self.parse_message_header_element();
135                    } else {
136                        self.skip_element()?;
137                    }
138                }
139                Ok(Event::End(_)) => {
140                    self.current_depth = self.current_depth.saturating_sub(1);
141                }
142                Ok(Event::Eof) => {
143                    return Err(ParseError::XmlError {
144                        message: "No MessageHeader found".to_string(),
145                        location: ErrorLocation {
146                            line: 0,
147                            column: 0,
148                            byte_offset: Some(self.reader.buffer_position() as usize),
149                            path: "/".to_string(),
150                        },
151                    });
152                }
153                Err(e) => {
154                    return Err(ParseError::XmlError {
155                        message: e.to_string(),
156                        location: self.get_current_location(),
157                    });
158                }
159                _ => {}
160            }
161            self.buffer.clear();
162        }
163    }
164
165    fn parse_message_header_element(&mut self) -> Result<MessageHeader, ParseError> {
166        use ddex_core::models::graph::{MessageRecipient, MessageSender, MessageType};
167
168        let mut message_id = String::new();
169        let message_type = MessageType::NewReleaseMessage;
170        let mut created_date_time = chrono::Utc::now();
171        let mut sender = MessageSender {
172            party_id: Vec::new(),
173            party_name: Vec::new(),
174            trading_name: None,
175            extensions: None,
176            attributes: None,
177            comments: None,
178        };
179        let mut recipient = MessageRecipient {
180            party_id: Vec::new(),
181            party_name: Vec::new(),
182            trading_name: None,
183            extensions: None,
184            attributes: None,
185            comments: None,
186        };
187
188        self.buffer.clear();
189        loop {
190            match self.reader.read_event_into(&mut self.buffer) {
191                Ok(Event::Start(ref e)) => match e.name().as_ref() {
192                    b"MessageId" => {
193                        message_id = self.read_text_element()?;
194                    }
195                    b"MessageCreatedDateTime" => {
196                        let text = self.read_text_element()?;
197                        created_date_time = chrono::DateTime::parse_from_rfc3339(&text)
198                            .map(|dt| dt.with_timezone(&chrono::Utc))
199                            .unwrap_or_else(|_| chrono::Utc::now());
200                    }
201                    b"MessageSender" => {
202                        sender = self.parse_message_sender()?;
203                    }
204                    b"MessageRecipient" => {
205                        recipient = self.parse_message_recipient()?;
206                    }
207                    _ => {
208                        self.skip_element()?;
209                    }
210                },
211                Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageHeader" => {
212                    break;
213                }
214                Ok(Event::Eof) => {
215                    return Err(ParseError::XmlError {
216                        message: "Unexpected EOF in MessageHeader".to_string(),
217                        location: self.get_current_location(),
218                    });
219                }
220                Err(e) => {
221                    return Err(ParseError::XmlError {
222                        message: e.to_string(),
223                        location: self.get_current_location(),
224                    });
225                }
226                _ => {}
227            }
228            self.buffer.clear();
229        }
230
231        Ok(MessageHeader {
232            message_id,
233            message_type,
234            message_created_date_time: created_date_time,
235            message_sender: sender,
236            message_recipient: recipient,
237            message_control_type: None,
238            message_thread_id: None,
239            extensions: None,
240            attributes: None,
241            comments: None,
242        })
243    }
244
245    fn parse_message_sender(
246        &mut self,
247    ) -> Result<ddex_core::models::graph::MessageSender, ParseError> {
248        use ddex_core::models::common::{Identifier, LocalizedString};
249
250        let mut sender = ddex_core::models::graph::MessageSender {
251            party_id: Vec::new(),
252            party_name: Vec::new(),
253            trading_name: None,
254            extensions: None,
255            attributes: None,
256            comments: None,
257        };
258
259        self.buffer.clear();
260        loop {
261            match self.reader.read_event_into(&mut self.buffer) {
262                Ok(Event::Start(ref e)) => match e.name().as_ref() {
263                    b"PartyId" => {
264                        let value = self.read_text_element()?;
265                        sender.party_id.push(Identifier {
266                            id_type: ddex_core::models::common::IdentifierType::Proprietary,
267                            namespace: None,
268                            value,
269                        });
270                    }
271                    b"PartyName" => {
272                        let text = self.read_text_element()?;
273                        sender.party_name.push(LocalizedString::new(text));
274                    }
275                    _ => {
276                        self.skip_element()?;
277                    }
278                },
279                Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageSender" => {
280                    break;
281                }
282                _ => {}
283            }
284            self.buffer.clear();
285        }
286
287        Ok(sender)
288    }
289
290    fn parse_message_recipient(
291        &mut self,
292    ) -> Result<ddex_core::models::graph::MessageRecipient, ParseError> {
293        // Similar to parse_message_sender
294        use ddex_core::models::common::{Identifier, LocalizedString};
295
296        let mut recipient = ddex_core::models::graph::MessageRecipient {
297            party_id: Vec::new(),
298            party_name: Vec::new(),
299            trading_name: None,
300            extensions: None,
301            attributes: None,
302            comments: None,
303        };
304
305        self.buffer.clear();
306        loop {
307            match self.reader.read_event_into(&mut self.buffer) {
308                Ok(Event::Start(ref e)) => match e.name().as_ref() {
309                    b"PartyId" => {
310                        let value = self.read_text_element()?;
311                        recipient.party_id.push(Identifier {
312                            id_type: ddex_core::models::common::IdentifierType::Proprietary,
313                            namespace: None,
314                            value,
315                        });
316                    }
317                    b"PartyName" => {
318                        let text = self.read_text_element()?;
319                        recipient.party_name.push(LocalizedString::new(text));
320                    }
321                    _ => {
322                        self.skip_element()?;
323                    }
324                },
325                Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageRecipient" => {
326                    break;
327                }
328                _ => {}
329            }
330            self.buffer.clear();
331        }
332
333        Ok(recipient)
334    }
335
336    /// Stream releases one at a time for memory efficiency
337    pub fn stream_releases(&mut self) -> ReleaseIterator<'_, R> {
338        ReleaseIterator::new(self)
339    }
340
341    /// Stream resources one at a time
342    pub fn stream_resources(&mut self) -> ResourceIterator<'_, R> {
343        ResourceIterator::new(self)
344    }
345
346    /// Stream parties
347    pub fn stream_parties(&mut self) -> PartyIterator<'_, R> {
348        PartyIterator::new(self)
349    }
350
351    /// Stream deals
352    pub fn stream_deals(&mut self) -> DealIterator<'_, R> {
353        DealIterator::new(self)
354    }
355
356    /// Helper to read text content of current element
357    fn read_text_element(&mut self) -> Result<String, ParseError> {
358        let mut text = String::new();
359        self.buffer.clear();
360
361        loop {
362            let event = self.reader.read_event_into(&mut self.buffer);
363            match event {
364                Ok(Event::Text(e)) => {
365                    // Use proper UTF-8 handling from utf8_utils
366                    let current_pos = self.reader.buffer_position() as usize;
367                    text = utf8_utils::handle_text_node(&e, current_pos)?;
368                }
369                Ok(Event::End(_)) => {
370                    break;
371                }
372                Ok(Event::Eof) => {
373                    let location = self.get_current_location();
374                    return Err(ParseError::XmlError {
375                        message: "Unexpected EOF".to_string(),
376                        location,
377                    });
378                }
379                Err(e) => {
380                    let location = self.get_current_location();
381                    return Err(ParseError::XmlError {
382                        message: e.to_string(),
383                        location,
384                    });
385                }
386                _ => {}
387            }
388            self.buffer.clear();
389        }
390
391        Ok(text)
392    }
393
394    /// Skip an element and all its children
395    fn skip_element(&mut self) -> Result<(), ParseError> {
396        let mut local_depth = 1;
397        self.buffer.clear();
398
399        while local_depth > 0 {
400            match self.reader.read_event_into(&mut self.buffer) {
401                Ok(Event::Start(_)) => {
402                    local_depth += 1;
403                    self.current_depth += 1;
404
405                    // Check depth limit
406                    if self.current_depth > self.max_depth {
407                        return Err(ParseError::DepthLimitExceeded {
408                            depth: self.current_depth,
409                            max: self.max_depth,
410                        });
411                    }
412                }
413                Ok(Event::End(_)) => {
414                    local_depth -= 1;
415                    self.current_depth = self.current_depth.saturating_sub(1);
416                }
417                Ok(Event::Eof) => break,
418                Err(e) => {
419                    return Err(ParseError::XmlError {
420                        message: e.to_string(),
421                        location: self.get_current_location(),
422                    });
423                }
424                _ => {}
425            }
426            self.buffer.clear();
427        }
428
429        Ok(())
430    }
431
432    fn get_current_location(&self) -> ErrorLocation {
433        ErrorLocation {
434            line: 0, // Would need line tracking
435            column: 0,
436            byte_offset: Some(self.reader.buffer_position() as usize),
437            path: "/NewReleaseMessage".to_string(),
438        }
439    }
440}
441
442/// Iterator for streaming releases
443///
444/// Part of the public streaming API.
445#[allow(dead_code)]
446pub struct ReleaseIterator<'a, R: BufRead> {
447    parser: &'a mut StreamingParser<R>,
448    done: bool,
449    in_release_list: bool,
450}
451
452impl<'a, R: BufRead> ReleaseIterator<'a, R> {
453    fn new(parser: &'a mut StreamingParser<R>) -> Self {
454        Self {
455            parser,
456            done: false,
457            in_release_list: false,
458        }
459    }
460
461    fn find_next_release(&mut self) -> Result<Option<Release>, ParseError> {
462        loop {
463            self.parser.buffer.clear();
464            match self.parser.reader.read_event_into(&mut self.parser.buffer) {
465                Ok(Event::Start(ref e)) => match e.name().as_ref() {
466                    b"ReleaseList" => {
467                        self.in_release_list = true;
468                    }
469                    b"Release" if self.in_release_list => {
470                        return self.parse_release_element();
471                    }
472                    _ => {
473                        self.parser.skip_element()?;
474                    }
475                },
476                Ok(Event::End(ref e)) if e.name().as_ref() == b"ReleaseList" => {
477                    self.done = true;
478                    return Ok(None);
479                }
480                Ok(Event::Eof) => {
481                    self.done = true;
482                    return Ok(None);
483                }
484                Err(e) => {
485                    return Err(ParseError::XmlError {
486                        message: e.to_string(),
487                        location: self.parser.get_current_location(),
488                    });
489                }
490                _ => {}
491            }
492        }
493    }
494
495    fn parse_release_element(&mut self) -> Result<Option<Release>, ParseError> {
496        use ddex_core::models::common::LocalizedString;
497
498        let mut release = Release {
499            release_reference: String::new(),
500            release_id: Vec::new(),
501            release_title: Vec::new(),
502            release_subtitle: None,
503            release_type: None,
504            genre: Vec::new(),
505            release_resource_reference_list: Vec::new(),
506            display_artist: Vec::new(),
507            party_list: Vec::new(),
508            release_date: Vec::new(),
509            territory_code: Vec::new(),
510            excluded_territory_code: Vec::new(),
511            extensions: None,
512            attributes: None,
513            comments: None,
514        };
515
516        self.parser.buffer.clear();
517        loop {
518            match self.parser.reader.read_event_into(&mut self.parser.buffer) {
519                Ok(Event::Start(ref e)) => match e.name().as_ref() {
520                    b"ReleaseReference" => {
521                        release.release_reference = self.parser.read_text_element()?;
522                    }
523                    b"ReferenceTitle" | b"Title" => {
524                        let text = self.parser.read_text_element()?;
525                        release.release_title.push(LocalizedString::new(text));
526                    }
527                    _ => {
528                        self.parser.skip_element()?;
529                    }
530                },
531                Ok(Event::End(ref e)) if e.name().as_ref() == b"Release" => {
532                    break;
533                }
534                _ => {}
535            }
536            self.parser.buffer.clear();
537        }
538
539        self.parser.releases_parsed += 1;
540        self.parser.update_byte_position();
541        self.parser.update_progress();
542
543        // Check memory limit
544        let estimated_size = std::mem::size_of::<Release>() * self.parser.releases_parsed;
545        if estimated_size > self.parser.max_memory {
546            return Err(ParseError::SecurityViolation {
547                message: format!(
548                    "Memory limit exceeded: {} > {}",
549                    estimated_size, self.parser.max_memory
550                ),
551            });
552        }
553
554        // Yield control periodically
555        if self.parser.releases_parsed % self.parser.chunk_size == 0 {
556            std::thread::yield_now();
557        }
558
559        Ok(Some(release))
560    }
561}
562
563impl<'a, R: BufRead> Iterator for ReleaseIterator<'a, R> {
564    type Item = Result<Release, ParseError>;
565
566    fn next(&mut self) -> Option<Self::Item> {
567        if self.done {
568            return None;
569        }
570
571        match self.find_next_release() {
572            Ok(Some(release)) => Some(Ok(release)),
573            Ok(None) => None,
574            Err(e) => Some(Err(e)),
575        }
576    }
577}
578
579// Similar iterators for other types
580pub struct ResourceIterator<'a, R: BufRead> {
581    _parser: &'a mut StreamingParser<R>,
582    _done: bool,
583    _in_resource_list: bool,
584}
585
586impl<'a, R: BufRead> ResourceIterator<'a, R> {
587    fn new(parser: &'a mut StreamingParser<R>) -> Self {
588        Self {
589            _parser: parser,
590            _done: false,
591            _in_resource_list: false,
592        }
593    }
594}
595
596impl<'a, R: BufRead> Iterator for ResourceIterator<'a, R> {
597    type Item = Result<Resource, ParseError>;
598
599    fn next(&mut self) -> Option<Self::Item> {
600        // Similar implementation to ReleaseIterator
601        None // Placeholder
602    }
603}
604
605pub struct PartyIterator<'a, R: BufRead> {
606    _parser: &'a mut StreamingParser<R>,
607    _done: bool,
608}
609
610impl<'a, R: BufRead> PartyIterator<'a, R> {
611    fn new(parser: &'a mut StreamingParser<R>) -> Self {
612        Self {
613            _parser: parser,
614            _done: false,
615        }
616    }
617}
618
619impl<'a, R: BufRead> Iterator for PartyIterator<'a, R> {
620    type Item = Result<Party, ParseError>;
621
622    fn next(&mut self) -> Option<Self::Item> {
623        None // Placeholder
624    }
625}
626
627pub struct DealIterator<'a, R: BufRead> {
628    _parser: &'a mut StreamingParser<R>,
629    _done: bool,
630}
631
632impl<'a, R: BufRead> DealIterator<'a, R> {
633    fn new(parser: &'a mut StreamingParser<R>) -> Self {
634        Self {
635            _parser: parser,
636            _done: false,
637        }
638    }
639}
640
641impl<'a, R: BufRead> Iterator for DealIterator<'a, R> {
642    type Item = Result<Deal, ParseError>;
643
644    fn next(&mut self) -> Option<Self::Item> {
645        None // Placeholder
646    }
647}
648
649/// Parse using streaming for large files
650pub fn parse_streaming<R: BufRead>(
651    reader: R,
652    version: ERNVersion,
653    options: ParseOptions,
654    security_config: &crate::parser::security::SecurityConfig,
655) -> Result<ParsedERNMessage, ParseError> {
656    let mut parser = StreamingParser::new_with_security_config(reader, version, security_config)
657        .with_chunk_size(options.chunk_size)
658        .with_max_memory(options.max_memory);
659
660    // Parse header first
661    let message_header = parser.parse_header()?;
662
663    // Collect releases in chunks to limit memory
664    let mut releases = Vec::new();
665    let mut resources = Vec::new();
666    let mut parties = Vec::new();
667    let mut deals = Vec::new();
668
669    // Stream releases
670    for release_result in parser.stream_releases() {
671        let release = release_result?;
672        releases.push(release);
673    }
674
675    // Stream resources
676    for resource_result in parser.stream_resources() {
677        let resource = resource_result?;
678        resources.push(resource);
679    }
680
681    // Stream parties
682    for party_result in parser.stream_parties() {
683        let party = party_result?;
684        parties.push(party);
685    }
686
687    // Stream deals
688    for deal_result in parser.stream_deals() {
689        let deal = deal_result?;
690        deals.push(deal);
691    }
692
693    // Build ERNMessage
694    let graph = ERNMessage {
695        message_header,
696        parties,
697        resources,
698        releases,
699        deals,
700        version,
701        profile: None,
702        message_audit_trail: None,
703        extensions: None,
704        legacy_extensions: None,
705        comments: None,
706        attributes: None,
707    };
708
709    // Flatten to developer-friendly model
710    let flat = Flattener::flatten(graph.clone());
711
712    Ok(ParsedERNMessage {
713        graph,
714        flat,
715        extensions: None,
716    })
717}