1use super::element::HeaderBuilder;
5use super::state::{
6 ParserState, ParsingContext, PartialDeal, PartialMessageHeader, PartialParty, PartialRelease,
7 PartialResource,
8};
9use super::{ParsedElement, StreamingConfig, StreamingProgress};
10use crate::error::ParseError;
11use ddex_core::models::{graph::*, versions::ERNVersion};
12use ddex_core::models::{Identifier, IdentifierType, LocalizedString};
13use quick_xml::{events::Event, Reader};
14use std::io::BufRead;
15use std::time::Instant;
16
17pub struct StreamingDDEXParser<R: BufRead> {
19 reader: Reader<R>,
20 pub(crate) context: ParsingContext,
21 config: StreamingConfig,
22 buffer: Vec<u8>,
23 pub(crate) start_time: Instant,
24 pub(crate) bytes_processed: u64,
25 pub(crate) elements_yielded: usize,
26 pub(crate) current_memory: usize,
27 progress_callback: Option<Box<dyn FnMut(StreamingProgress) + Send>>,
28}
29
30impl<R: BufRead> StreamingDDEXParser<R> {
31 pub fn new(reader: R, version: ERNVersion) -> Self {
33 let mut xml_reader = Reader::from_reader(reader);
34 xml_reader.config_mut().trim_text(true);
35 xml_reader.config_mut().check_end_names = true;
36 xml_reader.config_mut().expand_empty_elements = false;
37
38 Self {
39 reader: xml_reader,
40 context: ParsingContext::new(version),
41 config: StreamingConfig::default(),
42 buffer: Vec::with_capacity(8192),
43 start_time: Instant::now(),
44 bytes_processed: 0,
45 elements_yielded: 0,
46 current_memory: 0,
47 progress_callback: None,
48 }
49 }
50
51 pub fn with_config(reader: R, version: ERNVersion, config: StreamingConfig) -> Self {
53 let mut parser = Self::new(reader, version);
54 let buffer_size = config.buffer_size;
55 parser.config = config;
56 parser.buffer.reserve(buffer_size);
57 parser
58 }
59
60 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
62 where
63 F: FnMut(StreamingProgress) + Send + 'static,
64 {
65 self.progress_callback = Some(Box::new(callback));
66 self
67 }
68
69 pub fn parse_next_element(&mut self) -> Result<Option<ParsedElement>, ParseError> {
71 loop {
72 self.buffer.clear();
73 let event = self.reader.read_event_into(&mut self.buffer)?;
74
75 match event {
77 Event::Start(e) | Event::Empty(e) => {
78 let name_bytes = e.name();
79 let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
80 let mut temp_attributes = std::collections::HashMap::new();
82 for attr_result in e.attributes() {
83 let attr = attr_result?;
84 let key = std::str::from_utf8(attr.key.as_ref())?;
85 let value = std::str::from_utf8(&attr.value)?;
86 temp_attributes.insert(key.to_string(), value.to_string());
87 }
88 self.handle_start_element_by_name_and_attrs(&name, temp_attributes)?;
90 }
91 Event::End(e) => {
92 let name_bytes = e.name();
93 let name = std::str::from_utf8(name_bytes.as_ref())?.to_string();
94 if let Some(element) = self.handle_end_element_by_name(&name)? {
96 self.elements_yielded += 1;
97 self.update_progress();
98 return Ok(Some(element));
99 }
100 }
101 Event::Text(e) => {
102 let text = std::str::from_utf8(&e)?;
103 self.context.add_text(text);
104 }
105 Event::CData(e) => {
106 let text = std::str::from_utf8(&e)?;
107 self.context.add_text(text);
108 }
109 Event::Eof => {
110 if matches!(self.context.state, ParserState::Complete) {
111 return Ok(Some(ParsedElement::EndOfStream));
112 } else {
113 return Ok(None);
114 }
115 }
116 _ => {
117 }
119 }
120
121 self.bytes_processed = self.reader.buffer_position();
122
123 self.check_security_limits()?;
125
126 if self.should_yield_for_memory()? {
128 continue;
129 }
130
131 self.buffer.clear();
132 }
133 }
134
135 fn handle_start_element(
137 &mut self,
138 element: &quick_xml::events::BytesStart,
139 ) -> Result<(), ParseError> {
140 let name_bytes = element.name();
141 let name = std::str::from_utf8(name_bytes.as_ref())?;
142 self.context.push_element(name);
143
144 self.context.attributes.clear();
146 for attr in element.attributes() {
147 let attr = attr?;
148 let key = std::str::from_utf8(attr.key.as_ref())?;
149 let value = std::str::from_utf8(&attr.value)?;
150 self.context
151 .attributes
152 .insert(key.to_string(), value.to_string());
153 }
154
155 self.context.clear_text_buffer();
156
157 match (&self.context.state, name) {
159 (ParserState::Initial, "ERNMessage") => {
160 }
162 (ParserState::Initial, "MessageHeader") => {
163 self.context.state = ParserState::InHeader {
164 header: PartialMessageHeader::default(),
165 depth: self.context.current_depth,
166 };
167 }
168 (ParserState::Initial, "Release") => {
169 let mut release = PartialRelease::default();
170 if let Some(reference) = self.context.attributes.get("ReleaseReference") {
171 release.release_reference = Some(reference.clone());
172 }
173 self.context.state = ParserState::InRelease {
174 release,
175 depth: self.context.current_depth,
176 };
177 }
178 (ParserState::Initial, "Resource") => {
179 let mut resource = PartialResource::default();
180 if let Some(reference) = self.context.attributes.get("ResourceReference") {
181 resource.resource_reference = Some(reference.clone());
182 }
183 self.context.state = ParserState::InResource {
184 resource,
185 depth: self.context.current_depth,
186 };
187 }
188 (ParserState::Initial, "Party") => {
189 let mut party = PartialParty::default();
190 if let Some(reference) = self.context.attributes.get("PartyReference") {
191 party.party_reference = Some(reference.clone());
192 }
193 self.context.state = ParserState::InParty {
194 party,
195 depth: self.context.current_depth,
196 };
197 }
198 (ParserState::Initial, "Deal") => {
199 let mut deal = PartialDeal::default();
200 if let Some(reference) = self.context.attributes.get("DealReference") {
201 deal.deal_reference = Some(reference.clone());
202 }
203 self.context.state = ParserState::InDeal {
204 deal,
205 depth: self.context.current_depth,
206 };
207 }
208 _ => {
209 self.handle_nested_start_element(name)?;
211 }
212 }
213
214 Ok(())
215 }
216
217 fn handle_nested_start_element(&mut self, name: &str) -> Result<(), ParseError> {
219 match &mut self.context.state {
220 ParserState::InHeader { .. } => {
221 match name {
223 "MessageId" => {
224 }
226 "MessageCreatedDateTime" => {
227 }
229 "MessageSender" | "MessageRecipient" => {
230 }
232 _ => {
233 }
235 }
236 }
237 ParserState::InRelease { .. } => {
238 match name {
240 "ReleaseId" | "ReleaseTitle" | "DisplayArtist" | "Genre" => {
241 }
243 _ => {
244 }
246 }
247 }
248 ParserState::InResource { .. } => {
249 match name {
251 "ResourceId" | "Title" | "DisplayArtist" | "Duration" | "Genre" => {
252 }
254 _ => {
255 }
257 }
258 }
259 _ => {
260 if !matches!(self.context.state, ParserState::Skipping { .. }) {
262 self.context.state = ParserState::Skipping {
263 start_depth: self.context.current_depth,
264 current_depth: self.context.current_depth,
265 };
266 }
267 }
268 }
269 Ok(())
270 }
271
272 fn handle_end_element(
274 &mut self,
275 element: &quick_xml::events::BytesEnd,
276 ) -> Result<Option<ParsedElement>, ParseError> {
277 let name_bytes = element.name();
278 let name = std::str::from_utf8(name_bytes.as_ref())?;
279 let text_content = self.context.take_text();
280
281 let result = match std::mem::take(&mut self.context.state) {
283 ParserState::InHeader { mut header, depth } => {
284 let res =
285 self.handle_header_end_element(name, &text_content, &mut header, depth)?;
286 self.context.state = ParserState::InHeader { header, depth };
287 res
288 }
289 ParserState::InRelease { mut release, depth } => {
290 let res =
291 self.handle_release_end_element(name, &text_content, &mut release, depth)?;
292 self.context.state = ParserState::InRelease { release, depth };
293 res
294 }
295 ParserState::InResource {
296 mut resource,
297 depth,
298 } => {
299 let res =
300 self.handle_resource_end_element(name, &text_content, &mut resource, depth)?;
301 self.context.state = ParserState::InResource { resource, depth };
302 res
303 }
304 ParserState::InParty { mut party, depth } => {
305 let res = self.handle_party_end_element(name, &text_content, &mut party, depth)?;
306 self.context.state = ParserState::InParty { party, depth };
307 res
308 }
309 ParserState::InDeal { mut deal, depth } => {
310 let res = self.handle_deal_end_element(name, &text_content, &mut deal, depth)?;
311 self.context.state = ParserState::InDeal { deal, depth };
312 res
313 }
314 ParserState::Skipping {
315 start_depth,
316 current_depth: _,
317 } => {
318 if self.context.current_depth <= start_depth {
319 self.context.state = ParserState::Initial;
320 }
321 None
322 }
323 _ => None,
324 };
325
326 self.context.pop_element();
327 Ok(result)
328 }
329
330 fn handle_header_end_element(
332 &mut self,
333 name: &str,
334 text_content: &str,
335 header: &mut PartialMessageHeader,
336 depth: usize,
337 ) -> Result<Option<ParsedElement>, ParseError> {
338 match name {
339 "MessageId" => {
340 header.message_id = Some(Identifier {
341 id_type: IdentifierType::Proprietary,
342 namespace: None,
343 value: text_content.to_string(),
344 });
345 }
346 "MessageCreatedDateTime" => {
347 header.message_created_date_time = Some(text_content.to_string());
348 }
349 "MessageHeader" if self.context.current_depth == depth => {
350 let sender = header.sender.take().unwrap_or_else(|| MessageSender {
352 party_id: vec![],
353 party_name: vec![LocalizedString {
354 text: "Unknown".to_string(),
355 language_code: None,
356 script: None,
357 }],
359 trading_name: None,
360 attributes: None,
361 extensions: None,
362 comments: None,
363 });
364
365 let element = HeaderBuilder::new()
366 .sender(sender)
367 .message_id(header.message_id.take().unwrap_or_else(|| Identifier {
368 id_type: IdentifierType::Proprietary,
369 namespace: None,
370 value: "unknown".to_string(),
371 }))
372 .created_date_time(header.message_created_date_time.take().unwrap_or_default())
373 .version(self.context.version)
374 .build()?;
375
376 self.context.state = ParserState::Initial;
377 return Ok(Some(element));
378 }
379 _ => {}
380 }
381 Ok(None)
382 }
383
384 fn handle_release_end_element(
386 &mut self,
387 name: &str,
388 text_content: &str,
389 release: &mut PartialRelease,
390 depth: usize,
391 ) -> Result<Option<ParsedElement>, ParseError> {
392 match name {
393 "ReleaseTitle" => {
394 release.release_title.push(LocalizedString {
395 text: text_content.to_string(),
396 language_code: self.context.attributes.get("LanguageCode").cloned(),
397 script: None,
398 });
399 }
400 "Genre" => {
401 release.genre.push(Genre {
402 genre_text: text_content.to_string(),
403 sub_genre: None,
404 attributes: None,
405 extensions: None,
406 comments: None,
407 });
408 }
409 "ReleaseDate" => {
410 }
413 "Release" if self.context.current_depth == depth => {
414 if release.is_complete() {
416 let completed_release = release.clone().into_release();
417 self.context.state = ParserState::Initial;
418 return Ok(Some(ParsedElement::Release(completed_release)));
419 }
420 }
421 _ => {}
422 }
423 Ok(None)
424 }
425
426 fn handle_resource_end_element(
428 &mut self,
429 name: &str,
430 text_content: &str,
431 resource: &mut PartialResource,
432 depth: usize,
433 ) -> Result<Option<ParsedElement>, ParseError> {
434 match name {
435 "Title" => {
436 resource.reference_title.push(LocalizedString {
437 text: text_content.to_string(),
438 language_code: self.context.attributes.get("LanguageCode").cloned(),
439 script: None,
440 });
441 }
442 "Genre" => {
443 }
446 "Duration" => {
447 if let Ok(seconds) = text_content.parse::<u64>() {
448 resource.duration = Some(std::time::Duration::from_secs(seconds));
449 }
450 }
451 "Resource" if self.context.current_depth == depth => {
452 if resource.is_complete() {
454 let completed_resource = resource.clone().into_resource();
455 self.context.state = ParserState::Initial;
456 return Ok(Some(ParsedElement::Resource(completed_resource)));
457 }
458 }
459 _ => {}
460 }
461 Ok(None)
462 }
463
464 fn handle_party_end_element(
466 &mut self,
467 _name: &str,
468 _text_content: &str,
469 _party: &mut PartialParty,
470 _depth: usize,
471 ) -> Result<Option<ParsedElement>, ParseError> {
472 Ok(None)
474 }
475
476 fn handle_deal_end_element(
478 &mut self,
479 _name: &str,
480 _text_content: &str,
481 _deal: &mut PartialDeal,
482 _depth: usize,
483 ) -> Result<Option<ParsedElement>, ParseError> {
484 Ok(None)
486 }
487
488 fn check_security_limits(&self) -> Result<(), ParseError> {
490 if self.context.current_depth > self.config.security.max_element_depth {
492 return Err(ParseError::SecurityViolation {
493 message: format!(
494 "Nesting depth {} exceeds maximum {}",
495 self.context.current_depth, self.config.security.max_element_depth
496 ),
497 });
498 }
499
500 if self.current_memory > self.config.max_memory {
502 return Err(ParseError::SecurityViolation {
503 message: format!(
504 "Memory usage {} exceeds maximum {}",
505 self.current_memory, self.config.max_memory
506 ),
507 });
508 }
509
510 Ok(())
511 }
512
513 fn should_yield_for_memory(&self) -> Result<bool, ParseError> {
515 Ok(self.current_memory > self.config.max_memory / 2)
517 }
518
519 fn update_progress(&mut self) {
521 if self.config.enable_progress && self.bytes_processed % self.config.progress_interval == 0
522 {
523 if let Some(ref mut callback) = self.progress_callback {
524 let progress = StreamingProgress {
525 bytes_processed: self.bytes_processed,
526 elements_parsed: self.elements_yielded,
527 releases_parsed: 0, resources_parsed: 0, parties_parsed: 0,
530 deals_parsed: 0,
531 elapsed: self.start_time.elapsed(),
532 estimated_total_bytes: None,
533 current_depth: self.context.current_depth,
534 memory_usage: self.current_memory,
535 };
536 callback(progress);
537 }
538 }
539 }
540
541 fn get_current_location(&self) -> String {
543 format!("streaming at byte offset {}", self.bytes_processed)
544 }
545}
546
547impl<R: BufRead> std::fmt::Debug for StreamingDDEXParser<R> {
548 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
549 f.debug_struct("StreamingDDEXParser")
550 .field("bytes_processed", &self.bytes_processed)
551 .field("elements_yielded", &self.elements_yielded)
552 .field("current_depth", &self.context.current_depth)
553 .field("current_memory", &self.current_memory)
554 .finish()
555 }
556}
557
558impl<R: BufRead> StreamingDDEXParser<R> {
559 fn handle_start_element_by_name_and_attrs(
561 &mut self,
562 name: &str,
563 attrs: std::collections::HashMap<String, String>,
564 ) -> Result<(), ParseError> {
565 self.context.push_element(name);
566 self.context.attributes = attrs;
567 self.context.clear_text_buffer();
568
569 match (&self.context.state, name) {
571 (ParserState::Initial, "MessageHeader") => {
572 self.context.state = ParserState::InHeader {
573 header: crate::streaming::state::PartialMessageHeader::default(),
574 depth: self.context.current_depth,
575 };
576 }
577 (ParserState::Initial, "Release") => {
578 let _reference = self
579 .context
580 .attributes
581 .get("ReleaseReference")
582 .unwrap_or(&"default".to_string())
583 .clone();
584 self.context.state = ParserState::InRelease {
585 release: crate::streaming::state::PartialRelease::default(),
586 depth: self.context.current_depth,
587 };
588 }
589 _ => {} }
591
592 Ok(())
593 }
594
595 fn handle_end_element_by_name(
597 &mut self,
598 name: &str,
599 ) -> Result<Option<ParsedElement>, ParseError> {
600 let _text_content = self.context.take_text();
601
602 self.context.pop_element();
604
605 match name {
607 "MessageHeader" => {
608 if matches!(self.context.state, ParserState::InHeader { .. }) {
609 self.context.state = ParserState::Initial;
610 return Ok(None); }
613 }
614 "Release" => {
615 if matches!(self.context.state, ParserState::InRelease { .. }) {
616 self.context.state = ParserState::Initial;
617 return Ok(None); }
620 }
621 _ => {
622 }
624 }
625
626 Ok(None)
627 }
628}