1use crate::error::ParseError;
5use crate::parser::ParseOptions;
6use crate::transform::flatten::Flattener;
7use crate::utf8_utils;
8use ddex_core::models::flat::ParsedERNMessage;
9use ddex_core::models::graph::{Deal, ERNMessage, MessageHeader, Party, Release, Resource};
10use ddex_core::models::versions::ERNVersion;
11use quick_xml::events::Event;
12use quick_xml::Reader;
13use std::io::BufRead;
14use std::time::{Duration, Instant};
15
/// Snapshot of parser progress that is passed to the registered progress callback.
#[derive(Debug, Clone)]
pub struct ParseProgress {
    /// Reader byte offset recorded the last time the parser updated its position.
    pub bytes_processed: u64,
    /// Number of `Release` elements parsed so far.
    pub releases_parsed: usize,
    /// Number of resources parsed so far.
    pub resources_parsed: usize,
    /// Time elapsed since the parser was constructed.
    pub elapsed: Duration,
    /// Estimated total input size in bytes, when known.
    pub estimated_total_bytes: Option<u64>,
}
25
26#[allow(dead_code)]
30pub struct StreamingParser<R: BufRead> {
31 reader: Reader<R>,
32 _version: ERNVersion,
33 progress_callback: Option<Box<dyn FnMut(ParseProgress) + Send>>,
34 start_time: Instant,
35 bytes_processed: u64,
36 releases_parsed: usize,
37 resources_parsed: usize,
38 chunk_size: usize,
39 max_memory: usize,
40 buffer: Vec<u8>,
41 current_depth: usize,
42 max_depth: usize,
43}
44
45impl<R: BufRead> StreamingParser<R> {
46 pub fn new(reader: R, version: ERNVersion) -> Self {
47 Self::new_with_security_config(
48 reader,
49 version,
50 &crate::parser::security::SecurityConfig::default(),
51 )
52 }
53
54 pub fn new_with_security_config(
55 reader: R,
56 version: ERNVersion,
57 security_config: &crate::parser::security::SecurityConfig,
58 ) -> Self {
59 let mut xml_reader = Reader::from_reader(reader);
60 xml_reader.config_mut().trim_text(true);
61 xml_reader.config_mut().check_end_names = true;
62 xml_reader.config_mut().expand_empty_elements = false;
63
64 Self {
65 reader: xml_reader,
66 _version: version,
67 progress_callback: None,
68 start_time: Instant::now(),
69 bytes_processed: 0,
70 releases_parsed: 0,
71 resources_parsed: 0,
72 chunk_size: 100,
73 max_memory: 100 * 1024 * 1024, buffer: Vec::with_capacity(8192),
75 current_depth: 0,
76 max_depth: security_config.max_element_depth,
77 }
78 }
79
80 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
81 where
82 F: FnMut(ParseProgress) + Send + 'static,
83 {
84 self.progress_callback = Some(Box::new(callback));
85 self
86 }
87
88 pub fn with_chunk_size(mut self, size: usize) -> Self {
89 self.chunk_size = size;
90 self
91 }
92
93 pub fn with_max_memory(mut self, max: usize) -> Self {
94 self.max_memory = max;
95 self
96 }
97
98 fn update_progress(&mut self) {
99 if let Some(ref mut callback) = self.progress_callback {
100 let progress = ParseProgress {
101 bytes_processed: self.bytes_processed,
102 releases_parsed: self.releases_parsed,
103 resources_parsed: self.resources_parsed,
104 elapsed: self.start_time.elapsed(),
105 estimated_total_bytes: None,
106 };
107 callback(progress);
108 }
109 }
110
111 fn update_byte_position(&mut self) {
112 self.bytes_processed = self.reader.buffer_position();
113 }
114
115 pub fn parse_header(&mut self) -> Result<MessageHeader, ParseError> {
117 self.buffer.clear();
118
119 loop {
121 match self.reader.read_event_into(&mut self.buffer) {
122 Ok(Event::Start(ref e)) => {
123 self.current_depth += 1;
124
125 if self.current_depth > self.max_depth {
127 return Err(ParseError::DepthLimitExceeded {
128 depth: self.current_depth,
129 limit: self.max_depth,
130 });
131 }
132
133 if e.name().as_ref() == b"MessageHeader" {
134 return self.parse_message_header_element();
135 } else {
136 self.skip_element()?;
137 }
138 }
139 Ok(Event::End(_)) => {
140 self.current_depth = self.current_depth.saturating_sub(1);
141 }
142 Ok(Event::Eof) => {
143 return Err(ParseError::XmlError("No MessageHeader found".to_string()));
144 }
145 Err(e) => {
146 return Err(ParseError::XmlError(e.to_string()));
147 }
148 _ => {}
149 }
150 self.buffer.clear();
151 }
152 }
153
154 fn parse_message_header_element(&mut self) -> Result<MessageHeader, ParseError> {
155 use ddex_core::models::graph::{MessageRecipient, MessageSender, MessageType};
156
157 let mut message_id = String::new();
158 let message_type = MessageType::NewReleaseMessage;
159 let mut created_date_time = chrono::Utc::now();
160 let mut sender = MessageSender {
161 party_id: Vec::new(),
162 party_name: Vec::new(),
163 trading_name: None,
164 extensions: None,
165 attributes: None,
166 comments: None,
167 };
168 let mut recipient = MessageRecipient {
169 party_id: Vec::new(),
170 party_name: Vec::new(),
171 trading_name: None,
172 extensions: None,
173 attributes: None,
174 comments: None,
175 };
176
177 self.buffer.clear();
178 loop {
179 match self.reader.read_event_into(&mut self.buffer) {
180 Ok(Event::Start(ref e)) => match e.name().as_ref() {
181 b"MessageId" => {
182 message_id = self.read_text_element()?;
183 }
184 b"MessageCreatedDateTime" => {
185 let text = self.read_text_element()?;
186 created_date_time = chrono::DateTime::parse_from_rfc3339(&text)
187 .map(|dt| dt.with_timezone(&chrono::Utc))
188 .unwrap_or_else(|_| chrono::Utc::now());
189 }
190 b"MessageSender" => {
191 sender = self.parse_message_sender()?;
192 }
193 b"MessageRecipient" => {
194 recipient = self.parse_message_recipient()?;
195 }
196 _ => {
197 self.skip_element()?;
198 }
199 },
200 Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageHeader" => {
201 break;
202 }
203 Ok(Event::Eof) => {
204 return Err(ParseError::XmlError("Unexpected EOF in MessageHeader".to_string()));
205 }
206 Err(e) => {
207 return Err(ParseError::XmlError(format!("XML error at {}: {}",
208 self.get_current_location(), e)));
209 }
210 _ => {}
211 }
212 self.buffer.clear();
213 }
214
215 Ok(MessageHeader {
216 message_id,
217 message_type,
218 message_created_date_time: created_date_time,
219 message_sender: sender,
220 message_recipient: recipient,
221 message_control_type: None,
222 message_thread_id: None,
223 extensions: None,
224 attributes: None,
225 comments: None,
226 })
227 }
228
229 fn parse_message_sender(
230 &mut self,
231 ) -> Result<ddex_core::models::graph::MessageSender, ParseError> {
232 use ddex_core::models::common::{Identifier, LocalizedString};
233
234 let mut sender = ddex_core::models::graph::MessageSender {
235 party_id: Vec::new(),
236 party_name: Vec::new(),
237 trading_name: None,
238 extensions: None,
239 attributes: None,
240 comments: None,
241 };
242
243 self.buffer.clear();
244 loop {
245 match self.reader.read_event_into(&mut self.buffer) {
246 Ok(Event::Start(ref e)) => match e.name().as_ref() {
247 b"PartyId" => {
248 let value = self.read_text_element()?;
249 sender.party_id.push(Identifier {
250 id_type: ddex_core::models::common::IdentifierType::Proprietary,
251 namespace: None,
252 value,
253 });
254 }
255 b"PartyName" => {
256 let text = self.read_text_element()?;
257 sender.party_name.push(LocalizedString::new(text));
258 }
259 _ => {
260 self.skip_element()?;
261 }
262 },
263 Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageSender" => {
264 break;
265 }
266 _ => {}
267 }
268 self.buffer.clear();
269 }
270
271 Ok(sender)
272 }
273
274 fn parse_message_recipient(
275 &mut self,
276 ) -> Result<ddex_core::models::graph::MessageRecipient, ParseError> {
277 use ddex_core::models::common::{Identifier, LocalizedString};
279
280 let mut recipient = ddex_core::models::graph::MessageRecipient {
281 party_id: Vec::new(),
282 party_name: Vec::new(),
283 trading_name: None,
284 extensions: None,
285 attributes: None,
286 comments: None,
287 };
288
289 self.buffer.clear();
290 loop {
291 match self.reader.read_event_into(&mut self.buffer) {
292 Ok(Event::Start(ref e)) => match e.name().as_ref() {
293 b"PartyId" => {
294 let value = self.read_text_element()?;
295 recipient.party_id.push(Identifier {
296 id_type: ddex_core::models::common::IdentifierType::Proprietary,
297 namespace: None,
298 value,
299 });
300 }
301 b"PartyName" => {
302 let text = self.read_text_element()?;
303 recipient.party_name.push(LocalizedString::new(text));
304 }
305 _ => {
306 self.skip_element()?;
307 }
308 },
309 Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageRecipient" => {
310 break;
311 }
312 _ => {}
313 }
314 self.buffer.clear();
315 }
316
317 Ok(recipient)
318 }
319
320 pub fn stream_releases(&mut self) -> ReleaseIterator<'_, R> {
322 ReleaseIterator::new(self)
323 }
324
325 pub fn stream_resources(&mut self) -> ResourceIterator<'_, R> {
327 ResourceIterator::new(self)
328 }
329
330 pub fn stream_parties(&mut self) -> PartyIterator<'_, R> {
332 PartyIterator::new(self)
333 }
334
335 pub fn stream_deals(&mut self) -> DealIterator<'_, R> {
337 DealIterator::new(self)
338 }
339
340 fn read_text_element(&mut self) -> Result<String, ParseError> {
342 let mut text = String::new();
343 self.buffer.clear();
344
345 loop {
346 let event = self.reader.read_event_into(&mut self.buffer);
347 match event {
348 Ok(Event::Text(e)) => {
349 let current_pos = self.reader.buffer_position() as usize;
351 text = utf8_utils::handle_text_node(&e, current_pos)?;
352 }
353 Ok(Event::End(_)) => {
354 break;
355 }
356 Ok(Event::Eof) => {
357 let location = self.get_current_location();
358 return Err(ParseError::XmlError("Unexpected EOF".to_string()));
359 }
360 Err(e) => {
361 let location = self.get_current_location();
362 return Err(ParseError::XmlError(format!("XML error at {}: {}", location, e)));
363 }
364 _ => {}
365 }
366 self.buffer.clear();
367 }
368
369 Ok(text)
370 }
371
372 fn skip_element(&mut self) -> Result<(), ParseError> {
374 let mut local_depth = 1;
375 self.buffer.clear();
376
377 while local_depth > 0 {
378 match self.reader.read_event_into(&mut self.buffer) {
379 Ok(Event::Start(_)) => {
380 local_depth += 1;
381 self.current_depth += 1;
382
383 if self.current_depth > self.max_depth {
385 return Err(ParseError::DepthLimitExceeded {
386 depth: self.current_depth,
387 limit: self.max_depth,
388 });
389 }
390 }
391 Ok(Event::End(_)) => {
392 local_depth -= 1;
393 self.current_depth = self.current_depth.saturating_sub(1);
394 }
395 Ok(Event::Eof) => break,
396 Err(e) => {
397 return Err(ParseError::XmlError( e.to_string()));
398 }
399 _ => {}
400 }
401 self.buffer.clear();
402 }
403
404 Ok(())
405 }
406
407 fn get_current_location(&self) -> String {
408 format!("byte offset {} in /NewReleaseMessage", self.reader.buffer_position())
409 }
410}
411
412#[allow(dead_code)]
416pub struct ReleaseIterator<'a, R: BufRead> {
417 parser: &'a mut StreamingParser<R>,
418 done: bool,
419 in_release_list: bool,
420}
421
422impl<'a, R: BufRead> ReleaseIterator<'a, R> {
423 fn new(parser: &'a mut StreamingParser<R>) -> Self {
424 Self {
425 parser,
426 done: false,
427 in_release_list: false,
428 }
429 }
430
431 fn find_next_release(&mut self) -> Result<Option<Release>, ParseError> {
432 loop {
433 self.parser.buffer.clear();
434 match self.parser.reader.read_event_into(&mut self.parser.buffer) {
435 Ok(Event::Start(ref e)) => match e.name().as_ref() {
436 b"ReleaseList" => {
437 self.in_release_list = true;
438 }
439 b"Release" if self.in_release_list => {
440 return self.parse_release_element();
441 }
442 _ => {
443 self.parser.skip_element()?;
444 }
445 },
446 Ok(Event::End(ref e)) if e.name().as_ref() == b"ReleaseList" => {
447 self.done = true;
448 return Ok(None);
449 }
450 Ok(Event::Eof) => {
451 self.done = true;
452 return Ok(None);
453 }
454 Err(e) => {
455 return Err(ParseError::XmlError( e.to_string()));
456 }
457 _ => {}
458 }
459 }
460 }
461
462 fn parse_release_element(&mut self) -> Result<Option<Release>, ParseError> {
463 use ddex_core::models::common::LocalizedString;
464
465 let mut release = Release {
466 release_reference: String::new(),
467 release_id: Vec::new(),
468 release_title: Vec::new(),
469 release_subtitle: None,
470 release_type: None,
471 genre: Vec::new(),
472 release_resource_reference_list: Vec::new(),
473 display_artist: Vec::new(),
474 party_list: Vec::new(),
475 release_date: Vec::new(),
476 territory_code: Vec::new(),
477 excluded_territory_code: Vec::new(),
478 extensions: None,
479 attributes: None,
480 comments: None,
481 };
482
483 self.parser.buffer.clear();
484 loop {
485 match self.parser.reader.read_event_into(&mut self.parser.buffer) {
486 Ok(Event::Start(ref e)) => match e.name().as_ref() {
487 b"ReleaseReference" => {
488 release.release_reference = self.parser.read_text_element()?;
489 }
490 b"ReferenceTitle" | b"Title" => {
491 let text = self.parser.read_text_element()?;
492 release.release_title.push(LocalizedString::new(text));
493 }
494 _ => {
495 self.parser.skip_element()?;
496 }
497 },
498 Ok(Event::End(ref e)) if e.name().as_ref() == b"Release" => {
499 break;
500 }
501 _ => {}
502 }
503 self.parser.buffer.clear();
504 }
505
506 self.parser.releases_parsed += 1;
507 self.parser.update_byte_position();
508 self.parser.update_progress();
509
510 let estimated_size = std::mem::size_of::<Release>() * self.parser.releases_parsed;
512 if estimated_size > self.parser.max_memory {
513 return Err(ParseError::SecurityViolation {
514 message: format!(
515 "Memory limit exceeded: {} > {}",
516 estimated_size, self.parser.max_memory
517 ),
518 });
519 }
520
521 if self.parser.releases_parsed % self.parser.chunk_size == 0 {
523 std::thread::yield_now();
524 }
525
526 Ok(Some(release))
527 }
528}
529
530impl<'a, R: BufRead> Iterator for ReleaseIterator<'a, R> {
531 type Item = Result<Release, ParseError>;
532
533 fn next(&mut self) -> Option<Self::Item> {
534 if self.done {
535 return None;
536 }
537
538 match self.find_next_release() {
539 Ok(Some(release)) => Some(Ok(release)),
540 Ok(None) => None,
541 Err(e) => Some(Err(e)),
542 }
543 }
544}
545
546pub struct ResourceIterator<'a, R: BufRead> {
548 _parser: &'a mut StreamingParser<R>,
549 _done: bool,
550 _in_resource_list: bool,
551}
552
553impl<'a, R: BufRead> ResourceIterator<'a, R> {
554 fn new(parser: &'a mut StreamingParser<R>) -> Self {
555 Self {
556 _parser: parser,
557 _done: false,
558 _in_resource_list: false,
559 }
560 }
561}
562
563impl<'a, R: BufRead> Iterator for ResourceIterator<'a, R> {
564 type Item = Result<Resource, ParseError>;
565
566 fn next(&mut self) -> Option<Self::Item> {
567 None }
570}
571
572pub struct PartyIterator<'a, R: BufRead> {
573 _parser: &'a mut StreamingParser<R>,
574 _done: bool,
575}
576
577impl<'a, R: BufRead> PartyIterator<'a, R> {
578 fn new(parser: &'a mut StreamingParser<R>) -> Self {
579 Self {
580 _parser: parser,
581 _done: false,
582 }
583 }
584}
585
586impl<'a, R: BufRead> Iterator for PartyIterator<'a, R> {
587 type Item = Result<Party, ParseError>;
588
589 fn next(&mut self) -> Option<Self::Item> {
590 None }
592}
593
594pub struct DealIterator<'a, R: BufRead> {
595 _parser: &'a mut StreamingParser<R>,
596 _done: bool,
597}
598
599impl<'a, R: BufRead> DealIterator<'a, R> {
600 fn new(parser: &'a mut StreamingParser<R>) -> Self {
601 Self {
602 _parser: parser,
603 _done: false,
604 }
605 }
606}
607
608impl<'a, R: BufRead> Iterator for DealIterator<'a, R> {
609 type Item = Result<Deal, ParseError>;
610
611 fn next(&mut self) -> Option<Self::Item> {
612 None }
614}
615
616pub fn parse_streaming<R: BufRead>(
618 reader: R,
619 version: ERNVersion,
620 options: ParseOptions,
621 security_config: &crate::parser::security::SecurityConfig,
622) -> Result<ParsedERNMessage, ParseError> {
623 let mut parser = StreamingParser::new_with_security_config(reader, version, security_config)
624 .with_chunk_size(options.chunk_size)
625 .with_max_memory(options.max_memory);
626
627 let message_header = parser.parse_header()?;
629
630 let mut releases = Vec::new();
632 let mut resources = Vec::new();
633 let mut parties = Vec::new();
634 let mut deals = Vec::new();
635
636 for release_result in parser.stream_releases() {
638 let release = release_result?;
639 releases.push(release);
640 }
641
642 for resource_result in parser.stream_resources() {
644 let resource = resource_result?;
645 resources.push(resource);
646 }
647
648 for party_result in parser.stream_parties() {
650 let party = party_result?;
651 parties.push(party);
652 }
653
654 for deal_result in parser.stream_deals() {
656 let deal = deal_result?;
657 deals.push(deal);
658 }
659
660 let graph = ERNMessage {
662 message_header,
663 parties,
664 resources,
665 releases,
666 deals,
667 version,
668 profile: None,
669 message_audit_trail: None,
670 extensions: None,
671 legacy_extensions: None,
672 comments: None,
673 attributes: None,
674 };
675
676 let flat = Flattener::flatten(graph.clone());
678
679 Ok(ParsedERNMessage {
680 graph,
681 flat: flat?,
682 extensions: None,
683 })
684}