1use crate::error::{ErrorLocation, ParseError};
5use crate::parser::ParseOptions;
6use crate::transform::flatten::Flattener;
7use crate::utf8_utils;
8use ddex_core::models::flat::ParsedERNMessage;
9use ddex_core::models::graph::{Deal, ERNMessage, MessageHeader, Party, Release, Resource};
10use ddex_core::models::versions::ERNVersion;
11use quick_xml::events::Event;
12use quick_xml::Reader;
13use std::io::BufRead;
14use std::time::{Duration, Instant};
15
/// Snapshot of parsing progress handed to a [`StreamingParser`] progress callback.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ParseProgress {
    /// Input bytes consumed so far, as reported by the XML reader's buffer position.
    pub bytes_processed: u64,
    /// Number of `Release` elements parsed so far.
    pub releases_parsed: usize,
    /// Number of resources parsed so far.
    pub resources_parsed: usize,
    /// Wall-clock time since the parser was created.
    pub elapsed: Duration,
    /// Expected total input size in bytes, when known.
    pub estimated_total_bytes: Option<u64>,
}
25
/// Pull-based ERN parser that walks the XML input event by event with `quick_xml`,
/// exposing the message header and per-section iterators.
// NOTE(review): every field below appears to be read somewhere in this file, so this
// allow may be stale — confirm before removing it.
#[allow(dead_code)]
pub struct StreamingParser<R: BufRead> {
    /// XML event reader over the caller's input.
    reader: Reader<R>,
    /// ERN version supplied at construction; stored but not read in this file.
    _version: ERNVersion,
    /// Optional callback receiving a `ParseProgress` snapshot from `update_progress`.
    progress_callback: Option<Box<dyn FnMut(ParseProgress) + Send>>,
    /// Construction time; `elapsed` in progress reports is measured from here.
    start_time: Instant,
    /// Last reader buffer position recorded by `update_byte_position`.
    bytes_processed: u64,
    /// Number of releases parsed so far.
    releases_parsed: usize,
    /// Number of resources parsed so far (not incremented anywhere in this file).
    resources_parsed: usize,
    /// Releases parsed between cooperative `std::thread::yield_now` calls.
    chunk_size: usize,
    /// Byte budget compared against the estimated size of parsed releases.
    max_memory: usize,
    /// Scratch buffer reused for every `read_event_into` call.
    buffer: Vec<u8>,
    /// Element nesting depth tracked for the depth limit.
    current_depth: usize,
    /// Maximum allowed nesting depth, from `SecurityConfig::max_element_depth`.
    max_depth: usize,
}
44
45impl<R: BufRead> StreamingParser<R> {
46 pub fn new(reader: R, version: ERNVersion) -> Self {
47 Self::new_with_security_config(
48 reader,
49 version,
50 &crate::parser::security::SecurityConfig::default(),
51 )
52 }
53
54 pub fn new_with_security_config(
55 reader: R,
56 version: ERNVersion,
57 security_config: &crate::parser::security::SecurityConfig,
58 ) -> Self {
59 let mut xml_reader = Reader::from_reader(reader);
60 xml_reader.config_mut().trim_text(true);
61 xml_reader.config_mut().check_end_names = true;
62 xml_reader.config_mut().expand_empty_elements = false;
63
64 Self {
65 reader: xml_reader,
66 _version: version,
67 progress_callback: None,
68 start_time: Instant::now(),
69 bytes_processed: 0,
70 releases_parsed: 0,
71 resources_parsed: 0,
72 chunk_size: 100,
73 max_memory: 100 * 1024 * 1024, buffer: Vec::with_capacity(8192),
75 current_depth: 0,
76 max_depth: security_config.max_element_depth,
77 }
78 }
79
80 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
81 where
82 F: FnMut(ParseProgress) + Send + 'static,
83 {
84 self.progress_callback = Some(Box::new(callback));
85 self
86 }
87
88 pub fn with_chunk_size(mut self, size: usize) -> Self {
89 self.chunk_size = size;
90 self
91 }
92
93 pub fn with_max_memory(mut self, max: usize) -> Self {
94 self.max_memory = max;
95 self
96 }
97
98 fn update_progress(&mut self) {
99 if let Some(ref mut callback) = self.progress_callback {
100 let progress = ParseProgress {
101 bytes_processed: self.bytes_processed,
102 releases_parsed: self.releases_parsed,
103 resources_parsed: self.resources_parsed,
104 elapsed: self.start_time.elapsed(),
105 estimated_total_bytes: None,
106 };
107 callback(progress);
108 }
109 }
110
111 fn update_byte_position(&mut self) {
112 self.bytes_processed = self.reader.buffer_position();
113 }
114
115 pub fn parse_header(&mut self) -> Result<MessageHeader, ParseError> {
117 self.buffer.clear();
118
119 loop {
121 match self.reader.read_event_into(&mut self.buffer) {
122 Ok(Event::Start(ref e)) => {
123 self.current_depth += 1;
124
125 if self.current_depth > self.max_depth {
127 return Err(ParseError::DepthLimitExceeded {
128 depth: self.current_depth,
129 max: self.max_depth,
130 });
131 }
132
133 if e.name().as_ref() == b"MessageHeader" {
134 return self.parse_message_header_element();
135 } else {
136 self.skip_element()?;
137 }
138 }
139 Ok(Event::End(_)) => {
140 self.current_depth = self.current_depth.saturating_sub(1);
141 }
142 Ok(Event::Eof) => {
143 return Err(ParseError::XmlError {
144 message: "No MessageHeader found".to_string(),
145 location: ErrorLocation {
146 line: 0,
147 column: 0,
148 byte_offset: Some(self.reader.buffer_position() as usize),
149 path: "/".to_string(),
150 },
151 });
152 }
153 Err(e) => {
154 return Err(ParseError::XmlError {
155 message: e.to_string(),
156 location: self.get_current_location(),
157 });
158 }
159 _ => {}
160 }
161 self.buffer.clear();
162 }
163 }
164
165 fn parse_message_header_element(&mut self) -> Result<MessageHeader, ParseError> {
166 use ddex_core::models::graph::{MessageRecipient, MessageSender, MessageType};
167
168 let mut message_id = String::new();
169 let message_type = MessageType::NewReleaseMessage;
170 let mut created_date_time = chrono::Utc::now();
171 let mut sender = MessageSender {
172 party_id: Vec::new(),
173 party_name: Vec::new(),
174 trading_name: None,
175 extensions: None,
176 attributes: None,
177 comments: None,
178 };
179 let mut recipient = MessageRecipient {
180 party_id: Vec::new(),
181 party_name: Vec::new(),
182 trading_name: None,
183 extensions: None,
184 attributes: None,
185 comments: None,
186 };
187
188 self.buffer.clear();
189 loop {
190 match self.reader.read_event_into(&mut self.buffer) {
191 Ok(Event::Start(ref e)) => match e.name().as_ref() {
192 b"MessageId" => {
193 message_id = self.read_text_element()?;
194 }
195 b"MessageCreatedDateTime" => {
196 let text = self.read_text_element()?;
197 created_date_time = chrono::DateTime::parse_from_rfc3339(&text)
198 .map(|dt| dt.with_timezone(&chrono::Utc))
199 .unwrap_or_else(|_| chrono::Utc::now());
200 }
201 b"MessageSender" => {
202 sender = self.parse_message_sender()?;
203 }
204 b"MessageRecipient" => {
205 recipient = self.parse_message_recipient()?;
206 }
207 _ => {
208 self.skip_element()?;
209 }
210 },
211 Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageHeader" => {
212 break;
213 }
214 Ok(Event::Eof) => {
215 return Err(ParseError::XmlError {
216 message: "Unexpected EOF in MessageHeader".to_string(),
217 location: self.get_current_location(),
218 });
219 }
220 Err(e) => {
221 return Err(ParseError::XmlError {
222 message: e.to_string(),
223 location: self.get_current_location(),
224 });
225 }
226 _ => {}
227 }
228 self.buffer.clear();
229 }
230
231 Ok(MessageHeader {
232 message_id,
233 message_type,
234 message_created_date_time: created_date_time,
235 message_sender: sender,
236 message_recipient: recipient,
237 message_control_type: None,
238 message_thread_id: None,
239 extensions: None,
240 attributes: None,
241 comments: None,
242 })
243 }
244
245 fn parse_message_sender(
246 &mut self,
247 ) -> Result<ddex_core::models::graph::MessageSender, ParseError> {
248 use ddex_core::models::common::{Identifier, LocalizedString};
249
250 let mut sender = ddex_core::models::graph::MessageSender {
251 party_id: Vec::new(),
252 party_name: Vec::new(),
253 trading_name: None,
254 extensions: None,
255 attributes: None,
256 comments: None,
257 };
258
259 self.buffer.clear();
260 loop {
261 match self.reader.read_event_into(&mut self.buffer) {
262 Ok(Event::Start(ref e)) => match e.name().as_ref() {
263 b"PartyId" => {
264 let value = self.read_text_element()?;
265 sender.party_id.push(Identifier {
266 id_type: ddex_core::models::common::IdentifierType::Proprietary,
267 namespace: None,
268 value,
269 });
270 }
271 b"PartyName" => {
272 let text = self.read_text_element()?;
273 sender.party_name.push(LocalizedString::new(text));
274 }
275 _ => {
276 self.skip_element()?;
277 }
278 },
279 Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageSender" => {
280 break;
281 }
282 _ => {}
283 }
284 self.buffer.clear();
285 }
286
287 Ok(sender)
288 }
289
290 fn parse_message_recipient(
291 &mut self,
292 ) -> Result<ddex_core::models::graph::MessageRecipient, ParseError> {
293 use ddex_core::models::common::{Identifier, LocalizedString};
295
296 let mut recipient = ddex_core::models::graph::MessageRecipient {
297 party_id: Vec::new(),
298 party_name: Vec::new(),
299 trading_name: None,
300 extensions: None,
301 attributes: None,
302 comments: None,
303 };
304
305 self.buffer.clear();
306 loop {
307 match self.reader.read_event_into(&mut self.buffer) {
308 Ok(Event::Start(ref e)) => match e.name().as_ref() {
309 b"PartyId" => {
310 let value = self.read_text_element()?;
311 recipient.party_id.push(Identifier {
312 id_type: ddex_core::models::common::IdentifierType::Proprietary,
313 namespace: None,
314 value,
315 });
316 }
317 b"PartyName" => {
318 let text = self.read_text_element()?;
319 recipient.party_name.push(LocalizedString::new(text));
320 }
321 _ => {
322 self.skip_element()?;
323 }
324 },
325 Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageRecipient" => {
326 break;
327 }
328 _ => {}
329 }
330 self.buffer.clear();
331 }
332
333 Ok(recipient)
334 }
335
336 pub fn stream_releases(&mut self) -> ReleaseIterator<'_, R> {
338 ReleaseIterator::new(self)
339 }
340
341 pub fn stream_resources(&mut self) -> ResourceIterator<'_, R> {
343 ResourceIterator::new(self)
344 }
345
346 pub fn stream_parties(&mut self) -> PartyIterator<'_, R> {
348 PartyIterator::new(self)
349 }
350
351 pub fn stream_deals(&mut self) -> DealIterator<'_, R> {
353 DealIterator::new(self)
354 }
355
356 fn read_text_element(&mut self) -> Result<String, ParseError> {
358 let mut text = String::new();
359 self.buffer.clear();
360
361 loop {
362 let event = self.reader.read_event_into(&mut self.buffer);
363 match event {
364 Ok(Event::Text(e)) => {
365 let current_pos = self.reader.buffer_position() as usize;
367 text = utf8_utils::handle_text_node(&e, current_pos)?;
368 }
369 Ok(Event::End(_)) => {
370 break;
371 }
372 Ok(Event::Eof) => {
373 let location = self.get_current_location();
374 return Err(ParseError::XmlError {
375 message: "Unexpected EOF".to_string(),
376 location,
377 });
378 }
379 Err(e) => {
380 let location = self.get_current_location();
381 return Err(ParseError::XmlError {
382 message: e.to_string(),
383 location,
384 });
385 }
386 _ => {}
387 }
388 self.buffer.clear();
389 }
390
391 Ok(text)
392 }
393
394 fn skip_element(&mut self) -> Result<(), ParseError> {
396 let mut local_depth = 1;
397 self.buffer.clear();
398
399 while local_depth > 0 {
400 match self.reader.read_event_into(&mut self.buffer) {
401 Ok(Event::Start(_)) => {
402 local_depth += 1;
403 self.current_depth += 1;
404
405 if self.current_depth > self.max_depth {
407 return Err(ParseError::DepthLimitExceeded {
408 depth: self.current_depth,
409 max: self.max_depth,
410 });
411 }
412 }
413 Ok(Event::End(_)) => {
414 local_depth -= 1;
415 self.current_depth = self.current_depth.saturating_sub(1);
416 }
417 Ok(Event::Eof) => break,
418 Err(e) => {
419 return Err(ParseError::XmlError {
420 message: e.to_string(),
421 location: self.get_current_location(),
422 });
423 }
424 _ => {}
425 }
426 self.buffer.clear();
427 }
428
429 Ok(())
430 }
431
432 fn get_current_location(&self) -> ErrorLocation {
433 ErrorLocation {
434 line: 0, column: 0,
436 byte_offset: Some(self.reader.buffer_position() as usize),
437 path: "/NewReleaseMessage".to_string(),
438 }
439 }
440}
441
442#[allow(dead_code)]
446pub struct ReleaseIterator<'a, R: BufRead> {
447 parser: &'a mut StreamingParser<R>,
448 done: bool,
449 in_release_list: bool,
450}
451
452impl<'a, R: BufRead> ReleaseIterator<'a, R> {
453 fn new(parser: &'a mut StreamingParser<R>) -> Self {
454 Self {
455 parser,
456 done: false,
457 in_release_list: false,
458 }
459 }
460
461 fn find_next_release(&mut self) -> Result<Option<Release>, ParseError> {
462 loop {
463 self.parser.buffer.clear();
464 match self.parser.reader.read_event_into(&mut self.parser.buffer) {
465 Ok(Event::Start(ref e)) => match e.name().as_ref() {
466 b"ReleaseList" => {
467 self.in_release_list = true;
468 }
469 b"Release" if self.in_release_list => {
470 return self.parse_release_element();
471 }
472 _ => {
473 self.parser.skip_element()?;
474 }
475 },
476 Ok(Event::End(ref e)) if e.name().as_ref() == b"ReleaseList" => {
477 self.done = true;
478 return Ok(None);
479 }
480 Ok(Event::Eof) => {
481 self.done = true;
482 return Ok(None);
483 }
484 Err(e) => {
485 return Err(ParseError::XmlError {
486 message: e.to_string(),
487 location: self.parser.get_current_location(),
488 });
489 }
490 _ => {}
491 }
492 }
493 }
494
495 fn parse_release_element(&mut self) -> Result<Option<Release>, ParseError> {
496 use ddex_core::models::common::LocalizedString;
497
498 let mut release = Release {
499 release_reference: String::new(),
500 release_id: Vec::new(),
501 release_title: Vec::new(),
502 release_subtitle: None,
503 release_type: None,
504 genre: Vec::new(),
505 release_resource_reference_list: Vec::new(),
506 display_artist: Vec::new(),
507 party_list: Vec::new(),
508 release_date: Vec::new(),
509 territory_code: Vec::new(),
510 excluded_territory_code: Vec::new(),
511 extensions: None,
512 attributes: None,
513 comments: None,
514 };
515
516 self.parser.buffer.clear();
517 loop {
518 match self.parser.reader.read_event_into(&mut self.parser.buffer) {
519 Ok(Event::Start(ref e)) => match e.name().as_ref() {
520 b"ReleaseReference" => {
521 release.release_reference = self.parser.read_text_element()?;
522 }
523 b"ReferenceTitle" | b"Title" => {
524 let text = self.parser.read_text_element()?;
525 release.release_title.push(LocalizedString::new(text));
526 }
527 _ => {
528 self.parser.skip_element()?;
529 }
530 },
531 Ok(Event::End(ref e)) if e.name().as_ref() == b"Release" => {
532 break;
533 }
534 _ => {}
535 }
536 self.parser.buffer.clear();
537 }
538
539 self.parser.releases_parsed += 1;
540 self.parser.update_byte_position();
541 self.parser.update_progress();
542
543 let estimated_size = std::mem::size_of::<Release>() * self.parser.releases_parsed;
545 if estimated_size > self.parser.max_memory {
546 return Err(ParseError::SecurityViolation {
547 message: format!(
548 "Memory limit exceeded: {} > {}",
549 estimated_size, self.parser.max_memory
550 ),
551 });
552 }
553
554 if self.parser.releases_parsed % self.parser.chunk_size == 0 {
556 std::thread::yield_now();
557 }
558
559 Ok(Some(release))
560 }
561}
562
563impl<'a, R: BufRead> Iterator for ReleaseIterator<'a, R> {
564 type Item = Result<Release, ParseError>;
565
566 fn next(&mut self) -> Option<Self::Item> {
567 if self.done {
568 return None;
569 }
570
571 match self.find_next_release() {
572 Ok(Some(release)) => Some(Ok(release)),
573 Ok(None) => None,
574 Err(e) => Some(Err(e)),
575 }
576 }
577}
578
579pub struct ResourceIterator<'a, R: BufRead> {
581 _parser: &'a mut StreamingParser<R>,
582 _done: bool,
583 _in_resource_list: bool,
584}
585
586impl<'a, R: BufRead> ResourceIterator<'a, R> {
587 fn new(parser: &'a mut StreamingParser<R>) -> Self {
588 Self {
589 _parser: parser,
590 _done: false,
591 _in_resource_list: false,
592 }
593 }
594}
595
596impl<'a, R: BufRead> Iterator for ResourceIterator<'a, R> {
597 type Item = Result<Resource, ParseError>;
598
599 fn next(&mut self) -> Option<Self::Item> {
600 None }
603}
604
605pub struct PartyIterator<'a, R: BufRead> {
606 _parser: &'a mut StreamingParser<R>,
607 _done: bool,
608}
609
610impl<'a, R: BufRead> PartyIterator<'a, R> {
611 fn new(parser: &'a mut StreamingParser<R>) -> Self {
612 Self {
613 _parser: parser,
614 _done: false,
615 }
616 }
617}
618
619impl<'a, R: BufRead> Iterator for PartyIterator<'a, R> {
620 type Item = Result<Party, ParseError>;
621
622 fn next(&mut self) -> Option<Self::Item> {
623 None }
625}
626
627pub struct DealIterator<'a, R: BufRead> {
628 _parser: &'a mut StreamingParser<R>,
629 _done: bool,
630}
631
632impl<'a, R: BufRead> DealIterator<'a, R> {
633 fn new(parser: &'a mut StreamingParser<R>) -> Self {
634 Self {
635 _parser: parser,
636 _done: false,
637 }
638 }
639}
640
641impl<'a, R: BufRead> Iterator for DealIterator<'a, R> {
642 type Item = Result<Deal, ParseError>;
643
644 fn next(&mut self) -> Option<Self::Item> {
645 None }
647}
648
649pub fn parse_streaming<R: BufRead>(
651 reader: R,
652 version: ERNVersion,
653 options: ParseOptions,
654 security_config: &crate::parser::security::SecurityConfig,
655) -> Result<ParsedERNMessage, ParseError> {
656 let mut parser = StreamingParser::new_with_security_config(reader, version, security_config)
657 .with_chunk_size(options.chunk_size)
658 .with_max_memory(options.max_memory);
659
660 let message_header = parser.parse_header()?;
662
663 let mut releases = Vec::new();
665 let mut resources = Vec::new();
666 let mut parties = Vec::new();
667 let mut deals = Vec::new();
668
669 for release_result in parser.stream_releases() {
671 let release = release_result?;
672 releases.push(release);
673 }
674
675 for resource_result in parser.stream_resources() {
677 let resource = resource_result?;
678 resources.push(resource);
679 }
680
681 for party_result in parser.stream_parties() {
683 let party = party_result?;
684 parties.push(party);
685 }
686
687 for deal_result in parser.stream_deals() {
689 let deal = deal_result?;
690 deals.push(deal);
691 }
692
693 let graph = ERNMessage {
695 message_header,
696 parties,
697 resources,
698 releases,
699 deals,
700 version,
701 profile: None,
702 message_audit_trail: None,
703 extensions: None,
704 legacy_extensions: None,
705 comments: None,
706 attributes: None,
707 };
708
709 let flat = Flattener::flatten(graph.clone());
711
712 Ok(ParsedERNMessage {
713 graph,
714 flat,
715 extensions: None,
716 })
717}