ddex_parser/parser/
stream.rs

1// core/src/parser/stream.rs
2//! Streaming parser for large DDEX files
3
4use crate::error::ParseError;
5use crate::parser::ParseOptions;
6use crate::transform::flatten::Flattener;
7use crate::utf8_utils;
8use ddex_core::models::flat::ParsedERNMessage;
9use ddex_core::models::graph::{Deal, ERNMessage, MessageHeader, Party, Release, Resource};
10use ddex_core::models::versions::ERNVersion;
11use quick_xml::events::Event;
12use quick_xml::Reader;
13use std::io::BufRead;
14use std::time::{Duration, Instant};
15
/// Progress information for streaming parsing.
///
/// A snapshot handed to the callback registered with
/// [`StreamingParser::with_progress_callback`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseProgress {
    /// Byte position of the XML reader when the snapshot was taken.
    pub bytes_processed: u64,
    /// Number of releases parsed so far.
    pub releases_parsed: usize,
    /// Number of resources parsed so far.
    pub resources_parsed: usize,
    /// Time elapsed since the parser was created.
    pub elapsed: Duration,
    /// Total input size, when known.
    ///
    /// `StreamingParser` currently always reports `None`.
    pub estimated_total_bytes: Option<u64>,
}
25
26/// Streaming parser for memory-efficient processing
27///
28/// Part of the public streaming API for parsing large DDEX files efficiently.
29#[allow(dead_code)]
30pub struct StreamingParser<R: BufRead> {
31    reader: Reader<R>,
32    _version: ERNVersion,
33    progress_callback: Option<Box<dyn FnMut(ParseProgress) + Send>>,
34    start_time: Instant,
35    bytes_processed: u64,
36    releases_parsed: usize,
37    resources_parsed: usize,
38    chunk_size: usize,
39    max_memory: usize,
40    buffer: Vec<u8>,
41    current_depth: usize,
42    max_depth: usize,
43}
44
45impl<R: BufRead> StreamingParser<R> {
46    pub fn new(reader: R, version: ERNVersion) -> Self {
47        Self::new_with_security_config(
48            reader,
49            version,
50            &crate::parser::security::SecurityConfig::default(),
51        )
52    }
53
54    pub fn new_with_security_config(
55        reader: R,
56        version: ERNVersion,
57        security_config: &crate::parser::security::SecurityConfig,
58    ) -> Self {
59        let mut xml_reader = Reader::from_reader(reader);
60        xml_reader.config_mut().trim_text(true);
61        xml_reader.config_mut().check_end_names = true;
62        xml_reader.config_mut().expand_empty_elements = false;
63
64        Self {
65            reader: xml_reader,
66            _version: version,
67            progress_callback: None,
68            start_time: Instant::now(),
69            bytes_processed: 0,
70            releases_parsed: 0,
71            resources_parsed: 0,
72            chunk_size: 100,
73            max_memory: 100 * 1024 * 1024, // 100MB default
74            buffer: Vec::with_capacity(8192),
75            current_depth: 0,
76            max_depth: security_config.max_element_depth,
77        }
78    }
79
80    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
81    where
82        F: FnMut(ParseProgress) + Send + 'static,
83    {
84        self.progress_callback = Some(Box::new(callback));
85        self
86    }
87
88    pub fn with_chunk_size(mut self, size: usize) -> Self {
89        self.chunk_size = size;
90        self
91    }
92
93    pub fn with_max_memory(mut self, max: usize) -> Self {
94        self.max_memory = max;
95        self
96    }
97
98    fn update_progress(&mut self) {
99        if let Some(ref mut callback) = self.progress_callback {
100            let progress = ParseProgress {
101                bytes_processed: self.bytes_processed,
102                releases_parsed: self.releases_parsed,
103                resources_parsed: self.resources_parsed,
104                elapsed: self.start_time.elapsed(),
105                estimated_total_bytes: None,
106            };
107            callback(progress);
108        }
109    }
110
111    fn update_byte_position(&mut self) {
112        self.bytes_processed = self.reader.buffer_position();
113    }
114
115    /// Parse the message header
116    pub fn parse_header(&mut self) -> Result<MessageHeader, ParseError> {
117        self.buffer.clear();
118
119        // Skip to MessageHeader element
120        loop {
121            match self.reader.read_event_into(&mut self.buffer) {
122                Ok(Event::Start(ref e)) => {
123                    self.current_depth += 1;
124
125                    // Check depth limit
126                    if self.current_depth > self.max_depth {
127                        return Err(ParseError::DepthLimitExceeded {
128                            depth: self.current_depth,
129                            limit: self.max_depth,
130                        });
131                    }
132
133                    if e.name().as_ref() == b"MessageHeader" {
134                        return self.parse_message_header_element();
135                    } else {
136                        self.skip_element()?;
137                    }
138                }
139                Ok(Event::End(_)) => {
140                    self.current_depth = self.current_depth.saturating_sub(1);
141                }
142                Ok(Event::Eof) => {
143                    return Err(ParseError::XmlError("No MessageHeader found".to_string()));
144                }
145                Err(e) => {
146                    return Err(ParseError::XmlError(e.to_string()));
147                }
148                _ => {}
149            }
150            self.buffer.clear();
151        }
152    }
153
154    fn parse_message_header_element(&mut self) -> Result<MessageHeader, ParseError> {
155        use ddex_core::models::graph::{MessageRecipient, MessageSender, MessageType};
156
157        let mut message_id = String::new();
158        let message_type = MessageType::NewReleaseMessage;
159        let mut created_date_time = chrono::Utc::now();
160        let mut sender = MessageSender {
161            party_id: Vec::new(),
162            party_name: Vec::new(),
163            trading_name: None,
164            extensions: None,
165            attributes: None,
166            comments: None,
167        };
168        let mut recipient = MessageRecipient {
169            party_id: Vec::new(),
170            party_name: Vec::new(),
171            trading_name: None,
172            extensions: None,
173            attributes: None,
174            comments: None,
175        };
176
177        self.buffer.clear();
178        loop {
179            match self.reader.read_event_into(&mut self.buffer) {
180                Ok(Event::Start(ref e)) => match e.name().as_ref() {
181                    b"MessageId" => {
182                        message_id = self.read_text_element()?;
183                    }
184                    b"MessageCreatedDateTime" => {
185                        let text = self.read_text_element()?;
186                        created_date_time = chrono::DateTime::parse_from_rfc3339(&text)
187                            .map(|dt| dt.with_timezone(&chrono::Utc))
188                            .unwrap_or_else(|_| chrono::Utc::now());
189                    }
190                    b"MessageSender" => {
191                        sender = self.parse_message_sender()?;
192                    }
193                    b"MessageRecipient" => {
194                        recipient = self.parse_message_recipient()?;
195                    }
196                    _ => {
197                        self.skip_element()?;
198                    }
199                },
200                Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageHeader" => {
201                    break;
202                }
203                Ok(Event::Eof) => {
204                    return Err(ParseError::XmlError("Unexpected EOF in MessageHeader".to_string()));
205                }
206                Err(e) => {
207                    return Err(ParseError::XmlError(format!("XML error at {}: {}",
208                        self.get_current_location(), e)));
209                }
210                _ => {}
211            }
212            self.buffer.clear();
213        }
214
215        Ok(MessageHeader {
216            message_id,
217            message_type,
218            message_created_date_time: created_date_time,
219            message_sender: sender,
220            message_recipient: recipient,
221            message_control_type: None,
222            message_thread_id: None,
223            extensions: None,
224            attributes: None,
225            comments: None,
226        })
227    }
228
229    fn parse_message_sender(
230        &mut self,
231    ) -> Result<ddex_core::models::graph::MessageSender, ParseError> {
232        use ddex_core::models::common::{Identifier, LocalizedString};
233
234        let mut sender = ddex_core::models::graph::MessageSender {
235            party_id: Vec::new(),
236            party_name: Vec::new(),
237            trading_name: None,
238            extensions: None,
239            attributes: None,
240            comments: None,
241        };
242
243        self.buffer.clear();
244        loop {
245            match self.reader.read_event_into(&mut self.buffer) {
246                Ok(Event::Start(ref e)) => match e.name().as_ref() {
247                    b"PartyId" => {
248                        let value = self.read_text_element()?;
249                        sender.party_id.push(Identifier {
250                            id_type: ddex_core::models::common::IdentifierType::Proprietary,
251                            namespace: None,
252                            value,
253                        });
254                    }
255                    b"PartyName" => {
256                        let text = self.read_text_element()?;
257                        sender.party_name.push(LocalizedString::new(text));
258                    }
259                    _ => {
260                        self.skip_element()?;
261                    }
262                },
263                Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageSender" => {
264                    break;
265                }
266                _ => {}
267            }
268            self.buffer.clear();
269        }
270
271        Ok(sender)
272    }
273
274    fn parse_message_recipient(
275        &mut self,
276    ) -> Result<ddex_core::models::graph::MessageRecipient, ParseError> {
277        // Similar to parse_message_sender
278        use ddex_core::models::common::{Identifier, LocalizedString};
279
280        let mut recipient = ddex_core::models::graph::MessageRecipient {
281            party_id: Vec::new(),
282            party_name: Vec::new(),
283            trading_name: None,
284            extensions: None,
285            attributes: None,
286            comments: None,
287        };
288
289        self.buffer.clear();
290        loop {
291            match self.reader.read_event_into(&mut self.buffer) {
292                Ok(Event::Start(ref e)) => match e.name().as_ref() {
293                    b"PartyId" => {
294                        let value = self.read_text_element()?;
295                        recipient.party_id.push(Identifier {
296                            id_type: ddex_core::models::common::IdentifierType::Proprietary,
297                            namespace: None,
298                            value,
299                        });
300                    }
301                    b"PartyName" => {
302                        let text = self.read_text_element()?;
303                        recipient.party_name.push(LocalizedString::new(text));
304                    }
305                    _ => {
306                        self.skip_element()?;
307                    }
308                },
309                Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageRecipient" => {
310                    break;
311                }
312                _ => {}
313            }
314            self.buffer.clear();
315        }
316
317        Ok(recipient)
318    }
319
320    /// Stream releases one at a time for memory efficiency
321    pub fn stream_releases(&mut self) -> ReleaseIterator<'_, R> {
322        ReleaseIterator::new(self)
323    }
324
325    /// Stream resources one at a time
326    pub fn stream_resources(&mut self) -> ResourceIterator<'_, R> {
327        ResourceIterator::new(self)
328    }
329
330    /// Stream parties
331    pub fn stream_parties(&mut self) -> PartyIterator<'_, R> {
332        PartyIterator::new(self)
333    }
334
335    /// Stream deals
336    pub fn stream_deals(&mut self) -> DealIterator<'_, R> {
337        DealIterator::new(self)
338    }
339
340    /// Helper to read text content of current element
341    fn read_text_element(&mut self) -> Result<String, ParseError> {
342        let mut text = String::new();
343        self.buffer.clear();
344
345        loop {
346            let event = self.reader.read_event_into(&mut self.buffer);
347            match event {
348                Ok(Event::Text(e)) => {
349                    // Use proper UTF-8 handling from utf8_utils
350                    let current_pos = self.reader.buffer_position() as usize;
351                    text = utf8_utils::handle_text_node(&e, current_pos)?;
352                }
353                Ok(Event::End(_)) => {
354                    break;
355                }
356                Ok(Event::Eof) => {
357                    let location = self.get_current_location();
358                    return Err(ParseError::XmlError("Unexpected EOF".to_string()));
359                }
360                Err(e) => {
361                    let location = self.get_current_location();
362                    return Err(ParseError::XmlError(format!("XML error at {}: {}", location, e)));
363                }
364                _ => {}
365            }
366            self.buffer.clear();
367        }
368
369        Ok(text)
370    }
371
372    /// Skip an element and all its children
373    fn skip_element(&mut self) -> Result<(), ParseError> {
374        let mut local_depth = 1;
375        self.buffer.clear();
376
377        while local_depth > 0 {
378            match self.reader.read_event_into(&mut self.buffer) {
379                Ok(Event::Start(_)) => {
380                    local_depth += 1;
381                    self.current_depth += 1;
382
383                    // Check depth limit
384                    if self.current_depth > self.max_depth {
385                        return Err(ParseError::DepthLimitExceeded {
386                            depth: self.current_depth,
387                            limit: self.max_depth,
388                        });
389                    }
390                }
391                Ok(Event::End(_)) => {
392                    local_depth -= 1;
393                    self.current_depth = self.current_depth.saturating_sub(1);
394                }
395                Ok(Event::Eof) => break,
396                Err(e) => {
397                    return Err(ParseError::XmlError( e.to_string()));
398                }
399                _ => {}
400            }
401            self.buffer.clear();
402        }
403
404        Ok(())
405    }
406
407    fn get_current_location(&self) -> String {
408        format!("byte offset {} in /NewReleaseMessage", self.reader.buffer_position())
409    }
410}
411
412/// Iterator for streaming releases
413///
414/// Part of the public streaming API.
415#[allow(dead_code)]
416pub struct ReleaseIterator<'a, R: BufRead> {
417    parser: &'a mut StreamingParser<R>,
418    done: bool,
419    in_release_list: bool,
420}
421
422impl<'a, R: BufRead> ReleaseIterator<'a, R> {
423    fn new(parser: &'a mut StreamingParser<R>) -> Self {
424        Self {
425            parser,
426            done: false,
427            in_release_list: false,
428        }
429    }
430
431    fn find_next_release(&mut self) -> Result<Option<Release>, ParseError> {
432        loop {
433            self.parser.buffer.clear();
434            match self.parser.reader.read_event_into(&mut self.parser.buffer) {
435                Ok(Event::Start(ref e)) => match e.name().as_ref() {
436                    b"ReleaseList" => {
437                        self.in_release_list = true;
438                    }
439                    b"Release" if self.in_release_list => {
440                        return self.parse_release_element();
441                    }
442                    _ => {
443                        self.parser.skip_element()?;
444                    }
445                },
446                Ok(Event::End(ref e)) if e.name().as_ref() == b"ReleaseList" => {
447                    self.done = true;
448                    return Ok(None);
449                }
450                Ok(Event::Eof) => {
451                    self.done = true;
452                    return Ok(None);
453                }
454                Err(e) => {
455                    return Err(ParseError::XmlError( e.to_string()));
456                }
457                _ => {}
458            }
459        }
460    }
461
462    fn parse_release_element(&mut self) -> Result<Option<Release>, ParseError> {
463        use ddex_core::models::common::LocalizedString;
464
465        let mut release = Release {
466            release_reference: String::new(),
467            release_id: Vec::new(),
468            release_title: Vec::new(),
469            release_subtitle: None,
470            release_type: None,
471            genre: Vec::new(),
472            release_resource_reference_list: Vec::new(),
473            display_artist: Vec::new(),
474            party_list: Vec::new(),
475            release_date: Vec::new(),
476            territory_code: Vec::new(),
477            excluded_territory_code: Vec::new(),
478            extensions: None,
479            attributes: None,
480            comments: None,
481        };
482
483        self.parser.buffer.clear();
484        loop {
485            match self.parser.reader.read_event_into(&mut self.parser.buffer) {
486                Ok(Event::Start(ref e)) => match e.name().as_ref() {
487                    b"ReleaseReference" => {
488                        release.release_reference = self.parser.read_text_element()?;
489                    }
490                    b"ReferenceTitle" | b"Title" => {
491                        let text = self.parser.read_text_element()?;
492                        release.release_title.push(LocalizedString::new(text));
493                    }
494                    _ => {
495                        self.parser.skip_element()?;
496                    }
497                },
498                Ok(Event::End(ref e)) if e.name().as_ref() == b"Release" => {
499                    break;
500                }
501                _ => {}
502            }
503            self.parser.buffer.clear();
504        }
505
506        self.parser.releases_parsed += 1;
507        self.parser.update_byte_position();
508        self.parser.update_progress();
509
510        // Check memory limit
511        let estimated_size = std::mem::size_of::<Release>() * self.parser.releases_parsed;
512        if estimated_size > self.parser.max_memory {
513            return Err(ParseError::SecurityViolation {
514                message: format!(
515                    "Memory limit exceeded: {} > {}",
516                    estimated_size, self.parser.max_memory
517                ),
518            });
519        }
520
521        // Yield control periodically
522        if self.parser.releases_parsed % self.parser.chunk_size == 0 {
523            std::thread::yield_now();
524        }
525
526        Ok(Some(release))
527    }
528}
529
530impl<'a, R: BufRead> Iterator for ReleaseIterator<'a, R> {
531    type Item = Result<Release, ParseError>;
532
533    fn next(&mut self) -> Option<Self::Item> {
534        if self.done {
535            return None;
536        }
537
538        match self.find_next_release() {
539            Ok(Some(release)) => Some(Ok(release)),
540            Ok(None) => None,
541            Err(e) => Some(Err(e)),
542        }
543    }
544}
545
546// Similar iterators for other types
547pub struct ResourceIterator<'a, R: BufRead> {
548    _parser: &'a mut StreamingParser<R>,
549    _done: bool,
550    _in_resource_list: bool,
551}
552
553impl<'a, R: BufRead> ResourceIterator<'a, R> {
554    fn new(parser: &'a mut StreamingParser<R>) -> Self {
555        Self {
556            _parser: parser,
557            _done: false,
558            _in_resource_list: false,
559        }
560    }
561}
562
563impl<'a, R: BufRead> Iterator for ResourceIterator<'a, R> {
564    type Item = Result<Resource, ParseError>;
565
566    fn next(&mut self) -> Option<Self::Item> {
567        // Similar implementation to ReleaseIterator
568        None // Placeholder
569    }
570}
571
572pub struct PartyIterator<'a, R: BufRead> {
573    _parser: &'a mut StreamingParser<R>,
574    _done: bool,
575}
576
577impl<'a, R: BufRead> PartyIterator<'a, R> {
578    fn new(parser: &'a mut StreamingParser<R>) -> Self {
579        Self {
580            _parser: parser,
581            _done: false,
582        }
583    }
584}
585
586impl<'a, R: BufRead> Iterator for PartyIterator<'a, R> {
587    type Item = Result<Party, ParseError>;
588
589    fn next(&mut self) -> Option<Self::Item> {
590        None // Placeholder
591    }
592}
593
594pub struct DealIterator<'a, R: BufRead> {
595    _parser: &'a mut StreamingParser<R>,
596    _done: bool,
597}
598
599impl<'a, R: BufRead> DealIterator<'a, R> {
600    fn new(parser: &'a mut StreamingParser<R>) -> Self {
601        Self {
602            _parser: parser,
603            _done: false,
604        }
605    }
606}
607
608impl<'a, R: BufRead> Iterator for DealIterator<'a, R> {
609    type Item = Result<Deal, ParseError>;
610
611    fn next(&mut self) -> Option<Self::Item> {
612        None // Placeholder
613    }
614}
615
616/// Parse using streaming for large files
617pub fn parse_streaming<R: BufRead>(
618    reader: R,
619    version: ERNVersion,
620    options: ParseOptions,
621    security_config: &crate::parser::security::SecurityConfig,
622) -> Result<ParsedERNMessage, ParseError> {
623    let mut parser = StreamingParser::new_with_security_config(reader, version, security_config)
624        .with_chunk_size(options.chunk_size)
625        .with_max_memory(options.max_memory);
626
627    // Parse header first
628    let message_header = parser.parse_header()?;
629
630    // Collect releases in chunks to limit memory
631    let mut releases = Vec::new();
632    let mut resources = Vec::new();
633    let mut parties = Vec::new();
634    let mut deals = Vec::new();
635
636    // Stream releases
637    for release_result in parser.stream_releases() {
638        let release = release_result?;
639        releases.push(release);
640    }
641
642    // Stream resources
643    for resource_result in parser.stream_resources() {
644        let resource = resource_result?;
645        resources.push(resource);
646    }
647
648    // Stream parties
649    for party_result in parser.stream_parties() {
650        let party = party_result?;
651        parties.push(party);
652    }
653
654    // Stream deals
655    for deal_result in parser.stream_deals() {
656        let deal = deal_result?;
657        deals.push(deal);
658    }
659
660    // Build ERNMessage
661    let graph = ERNMessage {
662        message_header,
663        parties,
664        resources,
665        releases,
666        deals,
667        version,
668        profile: None,
669        message_audit_trail: None,
670        extensions: None,
671        legacy_extensions: None,
672        comments: None,
673        attributes: None,
674    };
675
676    // Flatten to developer-friendly model
677    let flat = Flattener::flatten(graph.clone());
678
679    Ok(ParsedERNMessage {
680        graph,
681        flat: flat?,
682        extensions: None,
683    })
684}