1use crate::error::{ParseError, ErrorLocation};
5use ddex_core::models::graph::{ERNMessage, Release, Resource, Deal, Party, MessageHeader};
6use ddex_core::models::flat::ParsedERNMessage;
7use ddex_core::models::versions::ERNVersion;
8use crate::parser::ParseOptions;
9use crate::transform::flatten::Flattener;
10use quick_xml::events::Event;
11use quick_xml::Reader;
12use std::io::BufRead;
13use std::time::{Duration, Instant};
14
/// Snapshot of streaming-parse progress handed to a progress callback.
///
/// Derives `PartialEq`/`Eq` so snapshots can be compared (e.g. in tests or to
/// suppress duplicate notifications); every field type supports equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParseProgress {
    /// Reader position, in bytes, when the snapshot was taken.
    pub bytes_processed: u64,
    /// Number of releases parsed so far.
    pub releases_parsed: usize,
    /// Number of resources parsed so far.
    pub resources_parsed: usize,
    /// Wall-clock time since the parser was created.
    pub elapsed: Duration,
    /// Total input size, when known.
    pub estimated_total_bytes: Option<u64>,
}
24
/// Pull-based ERN parser that reads XML events from a buffered source and
/// hands out top-level entities through the `stream_*` iterators.
pub struct StreamingParser<R: BufRead> {
    /// Underlying quick-xml event reader.
    reader: Reader<R>,
    /// ERN version supplied at construction.
    version: ERNVersion,
    /// Optional observer that receives `ParseProgress` snapshots.
    progress_callback: Option<Box<dyn FnMut(ParseProgress) + Send>>,
    /// Creation time; `ParseProgress::elapsed` is measured from here.
    start_time: Instant,
    /// Last recorded reader byte position.
    bytes_processed: u64,
    /// Count of releases parsed so far.
    releases_parsed: usize,
    /// Count of resources parsed so far.
    resources_parsed: usize,
    /// Number of releases between cooperative `std::thread::yield_now` calls.
    chunk_size: usize,
    /// Budget, in bytes, for the estimated size of parsed releases.
    max_memory: usize,
    /// Scratch buffer reused for every `read_event_into` call.
    buffer: Vec<u8>,
}
38
39impl<R: BufRead> StreamingParser<R> {
40 pub fn new(reader: R, version: ERNVersion) -> Self {
41 let mut xml_reader = Reader::from_reader(reader);
42 xml_reader.config_mut().trim_text(true);
43 xml_reader.config_mut().check_end_names = true;
44 xml_reader.config_mut().expand_empty_elements = false;
45
46 Self {
47 reader: xml_reader,
48 version,
49 progress_callback: None,
50 start_time: Instant::now(),
51 bytes_processed: 0,
52 releases_parsed: 0,
53 resources_parsed: 0,
54 chunk_size: 100,
55 max_memory: 100 * 1024 * 1024, buffer: Vec::with_capacity(8192),
57 }
58 }
59
60 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
61 where
62 F: FnMut(ParseProgress) + Send + 'static
63 {
64 self.progress_callback = Some(Box::new(callback));
65 self
66 }
67
68 pub fn with_chunk_size(mut self, size: usize) -> Self {
69 self.chunk_size = size;
70 self
71 }
72
73 pub fn with_max_memory(mut self, max: usize) -> Self {
74 self.max_memory = max;
75 self
76 }
77
78 fn update_progress(&mut self) {
79 if let Some(ref mut callback) = self.progress_callback {
80 let progress = ParseProgress {
81 bytes_processed: self.bytes_processed,
82 releases_parsed: self.releases_parsed,
83 resources_parsed: self.resources_parsed,
84 elapsed: self.start_time.elapsed(),
85 estimated_total_bytes: None,
86 };
87 callback(progress);
88 }
89 }
90
91 fn update_byte_position(&mut self) {
92 self.bytes_processed = self.reader.buffer_position() as u64;
93 }
94
95 pub fn parse_header(&mut self) -> Result<MessageHeader, ParseError> {
97 self.buffer.clear();
98
99 loop {
101 match self.reader.read_event_into(&mut self.buffer) {
102 Ok(Event::Start(ref e)) if e.name().as_ref() == b"MessageHeader" => {
103 return self.parse_message_header_element();
104 }
105 Ok(Event::Eof) => {
106 return Err(ParseError::XmlError {
107 message: "No MessageHeader found".to_string(),
108 location: ErrorLocation {
109 line: 0,
110 column: 0,
111 byte_offset: Some(self.reader.buffer_position() as usize),
112 path: "/".to_string(),
113 },
114 });
115 }
116 Err(e) => {
117 return Err(ParseError::XmlError {
118 message: e.to_string(),
119 location: self.get_current_location(),
120 });
121 }
122 _ => {}
123 }
124 self.buffer.clear();
125 }
126 }
127
128 fn parse_message_header_element(&mut self) -> Result<MessageHeader, ParseError> {
129 use ddex_core::models::graph::{MessageType, MessageSender, MessageRecipient};
130
131 let mut message_id = String::new();
132 let message_type = MessageType::NewReleaseMessage;
133 let mut created_date_time = chrono::Utc::now();
134 let mut sender = MessageSender {
135 party_id: Vec::new(),
136 party_name: Vec::new(),
137 trading_name: None,
138 };
139 let mut recipient = MessageRecipient {
140 party_id: Vec::new(),
141 party_name: Vec::new(),
142 trading_name: None,
143 };
144
145 self.buffer.clear();
146 loop {
147 match self.reader.read_event_into(&mut self.buffer) {
148 Ok(Event::Start(ref e)) => {
149 match e.name().as_ref() {
150 b"MessageId" => {
151 message_id = self.read_text_element()?;
152 }
153 b"MessageCreatedDateTime" => {
154 let text = self.read_text_element()?;
155 created_date_time = chrono::DateTime::parse_from_rfc3339(&text)
156 .map(|dt| dt.with_timezone(&chrono::Utc))
157 .unwrap_or_else(|_| chrono::Utc::now());
158 }
159 b"MessageSender" => {
160 sender = self.parse_message_sender()?;
161 }
162 b"MessageRecipient" => {
163 recipient = self.parse_message_recipient()?;
164 }
165 _ => {
166 self.skip_element()?;
167 }
168 }
169 }
170 Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageHeader" => {
171 break;
172 }
173 Ok(Event::Eof) => {
174 return Err(ParseError::XmlError {
175 message: "Unexpected EOF in MessageHeader".to_string(),
176 location: self.get_current_location(),
177 });
178 }
179 Err(e) => {
180 return Err(ParseError::XmlError {
181 message: e.to_string(),
182 location: self.get_current_location(),
183 });
184 }
185 _ => {}
186 }
187 self.buffer.clear();
188 }
189
190 Ok(MessageHeader {
191 message_id,
192 message_type,
193 message_created_date_time: created_date_time,
194 message_sender: sender,
195 message_recipient: recipient,
196 message_control_type: None,
197 message_thread_id: None,
198 })
199 }
200
201 fn parse_message_sender(&mut self) -> Result<ddex_core::models::graph::MessageSender, ParseError> {
202 use ddex_core::models::common::{Identifier, LocalizedString};
203
204 let mut sender = ddex_core::models::graph::MessageSender {
205 party_id: Vec::new(),
206 party_name: Vec::new(),
207 trading_name: None,
208 };
209
210 self.buffer.clear();
211 loop {
212 match self.reader.read_event_into(&mut self.buffer) {
213 Ok(Event::Start(ref e)) => {
214 match e.name().as_ref() {
215 b"PartyId" => {
216 let value = self.read_text_element()?;
217 sender.party_id.push(Identifier {
218 id_type: ddex_core::models::common::IdentifierType::Proprietary,
219 namespace: None,
220 value,
221 });
222 }
223 b"PartyName" => {
224 let text = self.read_text_element()?;
225 sender.party_name.push(LocalizedString::new(text));
226 }
227 _ => {
228 self.skip_element()?;
229 }
230 }
231 }
232 Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageSender" => {
233 break;
234 }
235 _ => {}
236 }
237 self.buffer.clear();
238 }
239
240 Ok(sender)
241 }
242
243 fn parse_message_recipient(&mut self) -> Result<ddex_core::models::graph::MessageRecipient, ParseError> {
244 use ddex_core::models::common::{Identifier, LocalizedString};
246
247 let mut recipient = ddex_core::models::graph::MessageRecipient {
248 party_id: Vec::new(),
249 party_name: Vec::new(),
250 trading_name: None,
251 };
252
253 self.buffer.clear();
254 loop {
255 match self.reader.read_event_into(&mut self.buffer) {
256 Ok(Event::Start(ref e)) => {
257 match e.name().as_ref() {
258 b"PartyId" => {
259 let value = self.read_text_element()?;
260 recipient.party_id.push(Identifier {
261 id_type: ddex_core::models::common::IdentifierType::Proprietary,
262 namespace: None,
263 value,
264 });
265 }
266 b"PartyName" => {
267 let text = self.read_text_element()?;
268 recipient.party_name.push(LocalizedString::new(text));
269 }
270 _ => {
271 self.skip_element()?;
272 }
273 }
274 }
275 Ok(Event::End(ref e)) if e.name().as_ref() == b"MessageRecipient" => {
276 break;
277 }
278 _ => {}
279 }
280 self.buffer.clear();
281 }
282
283 Ok(recipient)
284 }
285
286 pub fn stream_releases(&mut self) -> ReleaseIterator<'_, R> {
288 ReleaseIterator::new(self)
289 }
290
291 pub fn stream_resources(&mut self) -> ResourceIterator<'_, R> {
293 ResourceIterator::new(self)
294 }
295
296 pub fn stream_parties(&mut self) -> PartyIterator<'_, R> {
298 PartyIterator::new(self)
299 }
300
301 pub fn stream_deals(&mut self) -> DealIterator<'_, R> {
303 DealIterator::new(self)
304 }
305
306 fn read_text_element(&mut self) -> Result<String, ParseError> {
308 let mut text = String::new();
309 self.buffer.clear();
310
311 loop {
312 let event = self.reader.read_event_into(&mut self.buffer);
313 match event {
314 Ok(Event::Text(e)) => {
315 match e.unescape() {
317 Ok(unescaped) => {
318 text = unescaped.to_string();
319 }
320 Err(err) => {
321 let location = self.get_current_location();
323 return Err(ParseError::XmlError {
324 message: err.to_string(),
325 location,
326 });
327 }
328 }
329 }
330 Ok(Event::End(_)) => {
331 break;
332 }
333 Ok(Event::Eof) => {
334 let location = self.get_current_location();
335 return Err(ParseError::XmlError {
336 message: "Unexpected EOF".to_string(),
337 location,
338 });
339 }
340 Err(e) => {
341 let location = self.get_current_location();
342 return Err(ParseError::XmlError {
343 message: e.to_string(),
344 location,
345 });
346 }
347 _ => {}
348 }
349 self.buffer.clear();
350 }
351
352 Ok(text)
353 }
354
355 fn skip_element(&mut self) -> Result<(), ParseError> {
357 let mut depth = 1;
358 self.buffer.clear();
359
360 while depth > 0 {
361 match self.reader.read_event_into(&mut self.buffer) {
362 Ok(Event::Start(_)) => depth += 1,
363 Ok(Event::End(_)) => depth -= 1,
364 Ok(Event::Eof) => break,
365 Err(e) => {
366 return Err(ParseError::XmlError {
367 message: e.to_string(),
368 location: self.get_current_location(),
369 });
370 }
371 _ => {}
372 }
373 self.buffer.clear();
374 }
375
376 Ok(())
377 }
378
379 fn get_current_location(&self) -> ErrorLocation {
380 ErrorLocation {
381 line: 0, column: 0,
383 byte_offset: Some(self.reader.buffer_position() as usize),
384 path: "/NewReleaseMessage".to_string(),
385 }
386 }
387}
388
389pub struct ReleaseIterator<'a, R: BufRead> {
391 parser: &'a mut StreamingParser<R>,
392 done: bool,
393 in_release_list: bool,
394}
395
396impl<'a, R: BufRead> ReleaseIterator<'a, R> {
397 fn new(parser: &'a mut StreamingParser<R>) -> Self {
398 Self {
399 parser,
400 done: false,
401 in_release_list: false,
402 }
403 }
404
405 fn find_next_release(&mut self) -> Result<Option<Release>, ParseError> {
406 loop {
407 self.parser.buffer.clear();
408 match self.parser.reader.read_event_into(&mut self.parser.buffer) {
409 Ok(Event::Start(ref e)) => {
410 match e.name().as_ref() {
411 b"ReleaseList" => {
412 self.in_release_list = true;
413 }
414 b"Release" if self.in_release_list => {
415 return self.parse_release_element();
416 }
417 _ => {
418 self.parser.skip_element()?;
419 }
420 }
421 }
422 Ok(Event::End(ref e)) if e.name().as_ref() == b"ReleaseList" => {
423 self.done = true;
424 return Ok(None);
425 }
426 Ok(Event::Eof) => {
427 self.done = true;
428 return Ok(None);
429 }
430 Err(e) => {
431 return Err(ParseError::XmlError {
432 message: e.to_string(),
433 location: self.parser.get_current_location(),
434 });
435 }
436 _ => {}
437 }
438 }
439 }
440
441 fn parse_release_element(&mut self) -> Result<Option<Release>, ParseError> {
442 use ddex_core::models::common::LocalizedString;
443
444 let mut release = Release {
445 release_reference: String::new(),
446 release_id: Vec::new(),
447 release_title: Vec::new(),
448 release_subtitle: None,
449 release_type: None,
450 genre: Vec::new(),
451 release_resource_reference_list: Vec::new(),
452 display_artist: Vec::new(),
453 party_list: Vec::new(),
454 release_date: Vec::new(),
455 territory_code: Vec::new(),
456 excluded_territory_code: Vec::new(),
457 };
458
459 self.parser.buffer.clear();
460 loop {
461 match self.parser.reader.read_event_into(&mut self.parser.buffer) {
462 Ok(Event::Start(ref e)) => {
463 match e.name().as_ref() {
464 b"ReleaseReference" => {
465 release.release_reference = self.parser.read_text_element()?;
466 }
467 b"ReferenceTitle" | b"Title" => {
468 let text = self.parser.read_text_element()?;
469 release.release_title.push(LocalizedString::new(text));
470 }
471 _ => {
472 self.parser.skip_element()?;
473 }
474 }
475 }
476 Ok(Event::End(ref e)) if e.name().as_ref() == b"Release" => {
477 break;
478 }
479 _ => {}
480 }
481 self.parser.buffer.clear();
482 }
483
484 self.parser.releases_parsed += 1;
485 self.parser.update_byte_position();
486 self.parser.update_progress();
487
488 let estimated_size = std::mem::size_of::<Release>() * self.parser.releases_parsed;
490 if estimated_size > self.parser.max_memory {
491 return Err(ParseError::SecurityViolation {
492 message: format!("Memory limit exceeded: {} > {}", estimated_size, self.parser.max_memory),
493 });
494 }
495
496 if self.parser.releases_parsed % self.parser.chunk_size == 0 {
498 std::thread::yield_now();
499 }
500
501 Ok(Some(release))
502 }
503}
504
505impl<'a, R: BufRead> Iterator for ReleaseIterator<'a, R> {
506 type Item = Result<Release, ParseError>;
507
508 fn next(&mut self) -> Option<Self::Item> {
509 if self.done {
510 return None;
511 }
512
513 match self.find_next_release() {
514 Ok(Some(release)) => Some(Ok(release)),
515 Ok(None) => None,
516 Err(e) => Some(Err(e)),
517 }
518 }
519}
520
521pub struct ResourceIterator<'a, R: BufRead> {
523 parser: &'a mut StreamingParser<R>,
524 done: bool,
525 in_resource_list: bool,
526}
527
528impl<'a, R: BufRead> ResourceIterator<'a, R> {
529 fn new(parser: &'a mut StreamingParser<R>) -> Self {
530 Self {
531 parser,
532 done: false,
533 in_resource_list: false,
534 }
535 }
536}
537
538impl<'a, R: BufRead> Iterator for ResourceIterator<'a, R> {
539 type Item = Result<Resource, ParseError>;
540
541 fn next(&mut self) -> Option<Self::Item> {
542 None }
545}
546
547pub struct PartyIterator<'a, R: BufRead> {
548 parser: &'a mut StreamingParser<R>,
549 done: bool,
550}
551
552impl<'a, R: BufRead> PartyIterator<'a, R> {
553 fn new(parser: &'a mut StreamingParser<R>) -> Self {
554 Self {
555 parser,
556 done: false,
557 }
558 }
559}
560
561impl<'a, R: BufRead> Iterator for PartyIterator<'a, R> {
562 type Item = Result<Party, ParseError>;
563
564 fn next(&mut self) -> Option<Self::Item> {
565 None }
567}
568
569pub struct DealIterator<'a, R: BufRead> {
570 parser: &'a mut StreamingParser<R>,
571 done: bool,
572}
573
574impl<'a, R: BufRead> DealIterator<'a, R> {
575 fn new(parser: &'a mut StreamingParser<R>) -> Self {
576 Self {
577 parser,
578 done: false,
579 }
580 }
581}
582
583impl<'a, R: BufRead> Iterator for DealIterator<'a, R> {
584 type Item = Result<Deal, ParseError>;
585
586 fn next(&mut self) -> Option<Self::Item> {
587 None }
589}
590
591pub fn parse_streaming<R: BufRead>(
593 reader: R,
594 version: ERNVersion,
595 options: ParseOptions,
596) -> Result<ParsedERNMessage, ParseError> {
597 let mut parser = StreamingParser::new(reader, version)
598 .with_chunk_size(options.chunk_size)
599 .with_max_memory(options.max_memory);
600
601 let message_header = parser.parse_header()?;
603
604 let mut releases = Vec::new();
606 let mut resources = Vec::new();
607 let mut parties = Vec::new();
608 let mut deals = Vec::new();
609
610 for release_result in parser.stream_releases() {
612 let release = release_result?;
613 releases.push(release);
614 }
615
616 for resource_result in parser.stream_resources() {
618 let resource = resource_result?;
619 resources.push(resource);
620 }
621
622 for party_result in parser.stream_parties() {
624 let party = party_result?;
625 parties.push(party);
626 }
627
628 for deal_result in parser.stream_deals() {
630 let deal = deal_result?;
631 deals.push(deal);
632 }
633
634 let graph = ERNMessage {
636 message_header,
637 parties,
638 resources,
639 releases,
640 deals,
641 version,
642 profile: None,
643 message_audit_trail: None,
644 extensions: None,
645 comments: None,
646 };
647
648 let flat = Flattener::flatten(graph.clone());
650
651 Ok(ParsedERNMessage { graph, flat })
652}