1use crate::error::{ParseError, ErrorLocation};
5use ddex_core::models::graph::{ERNMessage, Release, Resource, Deal, Party, MessageHeader};
6use ddex_core::models::flat::ParsedERNMessage;
7use ddex_core::models::versions::ERNVersion;
8use crate::parser::ParseOptions;
9use crate::transform::flatten::Flattener;
10use quick_xml::events::Event;
11use quick_xml::Reader;
12use std::io::BufRead;
13use std::time::{Duration, Instant};
14
/// Snapshot of streaming-parse progress handed to the progress callback.
///
/// Also derives `PartialEq`/`Eq`/`Default` so snapshots can be compared and
/// constructed conveniently (e.g. in tests or when diffing successive reports).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ParseProgress {
    /// Byte offset the XML reader had reached when the snapshot was taken.
    pub bytes_processed: u64,
    /// Number of `<Release>` elements parsed so far.
    pub releases_parsed: usize,
    /// Number of resources parsed so far.
    pub resources_parsed: usize,
    /// Time elapsed since the parser was created (measured with `Instant`).
    pub elapsed: Duration,
    /// Total input size, if known. The streaming parser currently always
    /// reports `None` here.
    pub estimated_total_bytes: Option<u64>,
}
24
25pub struct StreamingParser<R: BufRead> {
27 reader: Reader<R>,
28 version: ERNVersion,
29 progress_callback: Option<Box<dyn FnMut(ParseProgress) + Send>>,
30 start_time: Instant,
31 bytes_processed: u64,
32 releases_parsed: usize,
33 resources_parsed: usize,
34 chunk_size: usize,
35 max_memory: usize,
36 buffer: Vec<u8>,
37}
38
39impl<R: BufRead> StreamingParser<R> {
40 pub fn new(reader: R, version: ERNVersion) -> Self {
41 let mut xml_reader = Reader::from_reader(reader);
42 xml_reader.config_mut().trim_text(true);
43 xml_reader.config_mut().check_end_names = true;
44 xml_reader.config_mut().expand_empty_elements = false;
45
46 Self {
47 reader: xml_reader,
48 version,
49 progress_callback: None,
50 start_time: Instant::now(),
51 bytes_processed: 0,
52 releases_parsed: 0,
53 resources_parsed: 0,
54 chunk_size: 100,
55 max_memory: 100 * 1024 * 1024, buffer: Vec::with_capacity(8192),
57 }
58 }
59
60 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
61 where
62 F: FnMut(ParseProgress) + Send + 'static
63 {
64 self.progress_callback = Some(Box::new(callback));
65 self
66 }
67
68 pub fn with_chunk_size(mut self, size: usize) -> Self {
69 self.chunk_size = size;
70 self
71 }
72
73 pub fn with_max_memory(mut self, max: usize) -> Self {
74 self.max_memory = max;
75 self
76 }
77
78 fn update_progress(&mut self) {
79 if let Some(ref mut callback) = self.progress_callback {
80 let progress = ParseProgress {
81 bytes_processed: self.bytes_processed,
82 releases_parsed: self.releases_parsed,
83 resources_parsed: self.resources_parsed,
84 elapsed: self.start_time.elapsed(),
85 estimated_total_bytes: None,
86 };
87 callback(progress);
88 }
89 }
90
91 fn update_byte_position(&mut self) {
92 self.bytes_processed = self.reader.buffer_position() as u64;
93 }
94
95 pub fn parse_header(&mut self) -> Result<MessageHeader, ParseError> {
97 self.buffer.clear();
98
99 loop {
101 match self.reader.read_event_into(&mut self.buffer) {
102 Ok(Event::Start(ref e)) if e.name().as_ref() == b"MessageHeader" => {
103 return self.parse_message_header_element();
104 }
105 Ok(Event::Eof) => {
106 return Err(ParseError::XmlError {
107 message: "No MessageHeader found".to_string(),
108 location: ErrorLocation {
109 line: 0,
110 column: 0,
111 byte_offset: Some(self.reader.buffer_position() as usize),
112 path: "/".to_string(),
113 },
114 });
115 }
116 Err(e) => {
117 return Err(ParseError::XmlError {
118 message: e.to_string(),
119 location: self.get_current_location(),
120 });
121 }
122 _ => {}
123 }
124 self.buffer.clear();
125 }
126 }
127
128 fn parse_message_header_element(&mut self) -> Result<MessageHeader, ParseError> {
129 use ddex_core::models::graph::{MessageType, MessageSender, MessageRecipient};
130
131 let mut message_id = String::new();
132 let message_type = MessageType::NewReleaseMessage;
133 let mut created_date_time = chrono::Utc::now();
134 let mut sender = MessageSender {
135 party_id: Vec::new(),
136 party_name: Vec::new(),
137 trading_name: None,
138 extensions: None,
139 attributes: None,
140 comments: None,
141 };
142 let mut recipient = MessageRecipient {
143 party_id: Vec::new(),
144 party_name: Vec::new(),
145 trading_name: None,
146 extensions: None,
147 attributes: None,
148 comments: None,
149 };
150
151 self.buffer.clear();
152 loop {
153 match self.reader.read_event_into(&mut self.buffer) {
154 Ok(Event::Start(ref e)) => {
155 match e.name().as_ref() {
156 b"MessageId" => {
157 message_id = self.read_text_element()?;
158 }
159 b"MessageCreatedDateTime" => {
160 let text = self.read_text_element()?;
161 created_date_time = chrono::DateTime::parse_from_rfc3339(&text)
162 .map(|dt| dt.with_timezone(&chrono::Utc))
163 .unwrap_or_else(|_| chrono::Utc::now());
164 }
165 b"MessageSender" => {
166 sender = self.parse_message_sender()?;
167 }
168 b"MessageRecipient" => {
169 recipient = self.parse_message_recipient()?;
170 }
171 _ => {
172 self.skip_element()?;
173 }
174 }
175 }
176 Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageHeader" => {
177 break;
178 }
179 Ok(Event::Eof) => {
180 return Err(ParseError::XmlError {
181 message: "Unexpected EOF in MessageHeader".to_string(),
182 location: self.get_current_location(),
183 });
184 }
185 Err(e) => {
186 return Err(ParseError::XmlError {
187 message: e.to_string(),
188 location: self.get_current_location(),
189 });
190 }
191 _ => {}
192 }
193 self.buffer.clear();
194 }
195
196 Ok(MessageHeader {
197 message_id,
198 message_type,
199 message_created_date_time: created_date_time,
200 message_sender: sender,
201 message_recipient: recipient,
202 message_control_type: None,
203 message_thread_id: None,
204 extensions: None,
205 attributes: None,
206 comments: None,
207 })
208 }
209
210 fn parse_message_sender(&mut self) -> Result<ddex_core::models::graph::MessageSender, ParseError> {
211 use ddex_core::models::common::{Identifier, LocalizedString};
212
213 let mut sender = ddex_core::models::graph::MessageSender {
214 party_id: Vec::new(),
215 party_name: Vec::new(),
216 trading_name: None,
217 extensions: None,
218 attributes: None,
219 comments: None,
220 };
221
222 self.buffer.clear();
223 loop {
224 match self.reader.read_event_into(&mut self.buffer) {
225 Ok(Event::Start(ref e)) => {
226 match e.name().as_ref() {
227 b"PartyId" => {
228 let value = self.read_text_element()?;
229 sender.party_id.push(Identifier {
230 id_type: ddex_core::models::common::IdentifierType::Proprietary,
231 namespace: None,
232 value,
233 });
234 }
235 b"PartyName" => {
236 let text = self.read_text_element()?;
237 sender.party_name.push(LocalizedString::new(text));
238 }
239 _ => {
240 self.skip_element()?;
241 }
242 }
243 }
244 Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageSender" => {
245 break;
246 }
247 _ => {}
248 }
249 self.buffer.clear();
250 }
251
252 Ok(sender)
253 }
254
255 fn parse_message_recipient(&mut self) -> Result<ddex_core::models::graph::MessageRecipient, ParseError> {
256 use ddex_core::models::common::{Identifier, LocalizedString};
258
259 let mut recipient = ddex_core::models::graph::MessageRecipient {
260 party_id: Vec::new(),
261 party_name: Vec::new(),
262 trading_name: None,
263 extensions: None,
264 attributes: None,
265 comments: None,
266 };
267
268 self.buffer.clear();
269 loop {
270 match self.reader.read_event_into(&mut self.buffer) {
271 Ok(Event::Start(ref e)) => {
272 match e.name().as_ref() {
273 b"PartyId" => {
274 let value = self.read_text_element()?;
275 recipient.party_id.push(Identifier {
276 id_type: ddex_core::models::common::IdentifierType::Proprietary,
277 namespace: None,
278 value,
279 });
280 }
281 b"PartyName" => {
282 let text = self.read_text_element()?;
283 recipient.party_name.push(LocalizedString::new(text));
284 }
285 _ => {
286 self.skip_element()?;
287 }
288 }
289 }
290 Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageRecipient" => {
291 break;
292 }
293 _ => {}
294 }
295 self.buffer.clear();
296 }
297
298 Ok(recipient)
299 }
300
301 pub fn stream_releases(&mut self) -> ReleaseIterator<'_, R> {
303 ReleaseIterator::new(self)
304 }
305
306 pub fn stream_resources(&mut self) -> ResourceIterator<'_, R> {
308 ResourceIterator::new(self)
309 }
310
311 pub fn stream_parties(&mut self) -> PartyIterator<'_, R> {
313 PartyIterator::new(self)
314 }
315
316 pub fn stream_deals(&mut self) -> DealIterator<'_, R> {
318 DealIterator::new(self)
319 }
320
321 fn read_text_element(&mut self) -> Result<String, ParseError> {
323 let mut text = String::new();
324 self.buffer.clear();
325
326 loop {
327 let event = self.reader.read_event_into(&mut self.buffer);
328 match event {
329 Ok(Event::Text(e)) => {
330 match e.unescape() {
332 Ok(unescaped) => {
333 text = unescaped.to_string();
334 }
335 Err(err) => {
336 let location = self.get_current_location();
338 return Err(ParseError::XmlError {
339 message: err.to_string(),
340 location,
341 });
342 }
343 }
344 }
345 Ok(Event::End(_)) => {
346 break;
347 }
348 Ok(Event::Eof) => {
349 let location = self.get_current_location();
350 return Err(ParseError::XmlError {
351 message: "Unexpected EOF".to_string(),
352 location,
353 });
354 }
355 Err(e) => {
356 let location = self.get_current_location();
357 return Err(ParseError::XmlError {
358 message: e.to_string(),
359 location,
360 });
361 }
362 _ => {}
363 }
364 self.buffer.clear();
365 }
366
367 Ok(text)
368 }
369
370 fn skip_element(&mut self) -> Result<(), ParseError> {
372 let mut depth = 1;
373 self.buffer.clear();
374
375 while depth > 0 {
376 match self.reader.read_event_into(&mut self.buffer) {
377 Ok(Event::Start(_)) => depth += 1,
378 Ok(Event::End(_)) => depth -= 1,
379 Ok(Event::Eof) => break,
380 Err(e) => {
381 return Err(ParseError::XmlError {
382 message: e.to_string(),
383 location: self.get_current_location(),
384 });
385 }
386 _ => {}
387 }
388 self.buffer.clear();
389 }
390
391 Ok(())
392 }
393
394 fn get_current_location(&self) -> ErrorLocation {
395 ErrorLocation {
396 line: 0, column: 0,
398 byte_offset: Some(self.reader.buffer_position() as usize),
399 path: "/NewReleaseMessage".to_string(),
400 }
401 }
402}
403
404pub struct ReleaseIterator<'a, R: BufRead> {
406 parser: &'a mut StreamingParser<R>,
407 done: bool,
408 in_release_list: bool,
409}
410
411impl<'a, R: BufRead> ReleaseIterator<'a, R> {
412 fn new(parser: &'a mut StreamingParser<R>) -> Self {
413 Self {
414 parser,
415 done: false,
416 in_release_list: false,
417 }
418 }
419
420 fn find_next_release(&mut self) -> Result<Option<Release>, ParseError> {
421 loop {
422 self.parser.buffer.clear();
423 match self.parser.reader.read_event_into(&mut self.parser.buffer) {
424 Ok(Event::Start(ref e)) => {
425 match e.name().as_ref() {
426 b"ReleaseList" => {
427 self.in_release_list = true;
428 }
429 b"Release" if self.in_release_list => {
430 return self.parse_release_element();
431 }
432 _ => {
433 self.parser.skip_element()?;
434 }
435 }
436 }
437 Ok(Event::End(ref e)) if e.name().as_ref() == b"ReleaseList" => {
438 self.done = true;
439 return Ok(None);
440 }
441 Ok(Event::Eof) => {
442 self.done = true;
443 return Ok(None);
444 }
445 Err(e) => {
446 return Err(ParseError::XmlError {
447 message: e.to_string(),
448 location: self.parser.get_current_location(),
449 });
450 }
451 _ => {}
452 }
453 }
454 }
455
456 fn parse_release_element(&mut self) -> Result<Option<Release>, ParseError> {
457 use ddex_core::models::common::LocalizedString;
458
459 let mut release = Release {
460 release_reference: String::new(),
461 release_id: Vec::new(),
462 release_title: Vec::new(),
463 release_subtitle: None,
464 release_type: None,
465 genre: Vec::new(),
466 release_resource_reference_list: Vec::new(),
467 display_artist: Vec::new(),
468 party_list: Vec::new(),
469 release_date: Vec::new(),
470 territory_code: Vec::new(),
471 excluded_territory_code: Vec::new(),
472 extensions: None,
473 attributes: None,
474 comments: None,
475 };
476
477 self.parser.buffer.clear();
478 loop {
479 match self.parser.reader.read_event_into(&mut self.parser.buffer) {
480 Ok(Event::Start(ref e)) => {
481 match e.name().as_ref() {
482 b"ReleaseReference" => {
483 release.release_reference = self.parser.read_text_element()?;
484 }
485 b"ReferenceTitle" | b"Title" => {
486 let text = self.parser.read_text_element()?;
487 release.release_title.push(LocalizedString::new(text));
488 }
489 _ => {
490 self.parser.skip_element()?;
491 }
492 }
493 }
494 Ok(Event::End(ref e)) if e.name().as_ref() == b"Release" => {
495 break;
496 }
497 _ => {}
498 }
499 self.parser.buffer.clear();
500 }
501
502 self.parser.releases_parsed += 1;
503 self.parser.update_byte_position();
504 self.parser.update_progress();
505
506 let estimated_size = std::mem::size_of::<Release>() * self.parser.releases_parsed;
508 if estimated_size > self.parser.max_memory {
509 return Err(ParseError::SecurityViolation {
510 message: format!("Memory limit exceeded: {} > {}", estimated_size, self.parser.max_memory),
511 });
512 }
513
514 if self.parser.releases_parsed % self.parser.chunk_size == 0 {
516 std::thread::yield_now();
517 }
518
519 Ok(Some(release))
520 }
521}
522
523impl<'a, R: BufRead> Iterator for ReleaseIterator<'a, R> {
524 type Item = Result<Release, ParseError>;
525
526 fn next(&mut self) -> Option<Self::Item> {
527 if self.done {
528 return None;
529 }
530
531 match self.find_next_release() {
532 Ok(Some(release)) => Some(Ok(release)),
533 Ok(None) => None,
534 Err(e) => Some(Err(e)),
535 }
536 }
537}
538
539pub struct ResourceIterator<'a, R: BufRead> {
541 parser: &'a mut StreamingParser<R>,
542 done: bool,
543 in_resource_list: bool,
544}
545
546impl<'a, R: BufRead> ResourceIterator<'a, R> {
547 fn new(parser: &'a mut StreamingParser<R>) -> Self {
548 Self {
549 parser,
550 done: false,
551 in_resource_list: false,
552 }
553 }
554}
555
556impl<'a, R: BufRead> Iterator for ResourceIterator<'a, R> {
557 type Item = Result<Resource, ParseError>;
558
559 fn next(&mut self) -> Option<Self::Item> {
560 None }
563}
564
565pub struct PartyIterator<'a, R: BufRead> {
566 parser: &'a mut StreamingParser<R>,
567 done: bool,
568}
569
570impl<'a, R: BufRead> PartyIterator<'a, R> {
571 fn new(parser: &'a mut StreamingParser<R>) -> Self {
572 Self {
573 parser,
574 done: false,
575 }
576 }
577}
578
579impl<'a, R: BufRead> Iterator for PartyIterator<'a, R> {
580 type Item = Result<Party, ParseError>;
581
582 fn next(&mut self) -> Option<Self::Item> {
583 None }
585}
586
587pub struct DealIterator<'a, R: BufRead> {
588 parser: &'a mut StreamingParser<R>,
589 done: bool,
590}
591
592impl<'a, R: BufRead> DealIterator<'a, R> {
593 fn new(parser: &'a mut StreamingParser<R>) -> Self {
594 Self {
595 parser,
596 done: false,
597 }
598 }
599}
600
601impl<'a, R: BufRead> Iterator for DealIterator<'a, R> {
602 type Item = Result<Deal, ParseError>;
603
604 fn next(&mut self) -> Option<Self::Item> {
605 None }
607}
608
609pub fn parse_streaming<R: BufRead>(
611 reader: R,
612 version: ERNVersion,
613 options: ParseOptions,
614) -> Result<ParsedERNMessage, ParseError> {
615 let mut parser = StreamingParser::new(reader, version)
616 .with_chunk_size(options.chunk_size)
617 .with_max_memory(options.max_memory);
618
619 let message_header = parser.parse_header()?;
621
622 let mut releases = Vec::new();
624 let mut resources = Vec::new();
625 let mut parties = Vec::new();
626 let mut deals = Vec::new();
627
628 for release_result in parser.stream_releases() {
630 let release = release_result?;
631 releases.push(release);
632 }
633
634 for resource_result in parser.stream_resources() {
636 let resource = resource_result?;
637 resources.push(resource);
638 }
639
640 for party_result in parser.stream_parties() {
642 let party = party_result?;
643 parties.push(party);
644 }
645
646 for deal_result in parser.stream_deals() {
648 let deal = deal_result?;
649 deals.push(deal);
650 }
651
652 let graph = ERNMessage {
654 message_header,
655 parties,
656 resources,
657 releases,
658 deals,
659 version,
660 profile: None,
661 message_audit_trail: None,
662 extensions: None,
663 legacy_extensions: None,
664 comments: None,
665 attributes: None,
666 };
667
668 let flat = Flattener::flatten(graph.clone());
670
671 Ok(ParsedERNMessage { graph, flat, extensions: None })
672}