1use super::{ParsedElement, StreamingConfig, StreamingDDEXParser, StreamingProgress};
5use crate::error::ParseError;
6use ddex_core::models::versions::ERNVersion;
7use std::io::BufRead;
8
9pub struct DDEXStreamIterator<R: BufRead> {
11 parser: StreamingDDEXParser<R>,
12 finished: bool,
13 error_state: Option<ParseError>,
14}
15
16impl<R: BufRead> DDEXStreamIterator<R> {
17 pub fn new(reader: R, version: ERNVersion) -> Self {
19 Self {
20 parser: StreamingDDEXParser::new(reader, version),
21 finished: false,
22 error_state: None,
23 }
24 }
25
26 pub fn with_config(reader: R, version: ERNVersion, config: StreamingConfig) -> Self {
28 Self {
29 parser: StreamingDDEXParser::with_config(reader, version, config),
30 finished: false,
31 error_state: None,
32 }
33 }
34
35 pub fn with_progress_callback<F>(mut self, callback: F) -> Self
37 where
38 F: FnMut(StreamingProgress) + Send + 'static,
39 {
40 self.parser = self.parser.with_progress_callback(callback);
41 self
42 }
43
44 pub fn stats(&self) -> IteratorStats {
46 IteratorStats {
47 bytes_processed: self.parser.bytes_processed,
48 elements_yielded: self.parser.elements_yielded,
49 current_depth: self.parser.context.current_depth,
50 memory_usage: self.parser.current_memory,
51 elapsed: self.parser.start_time.elapsed(),
52 is_finished: self.finished,
53 has_error: self.error_state.is_some(),
54 }
55 }
56
57 pub fn has_error(&self) -> bool {
59 self.error_state.is_some()
60 }
61
62 pub fn last_error(&self) -> Option<&ParseError> {
64 self.error_state.as_ref()
65 }
66
67 pub fn clear_error(&mut self) {
69 self.error_state = None;
70 }
71
72 pub fn try_recover(&mut self) -> Result<(), ParseError> {
74 if let Some(ref error) = self.error_state {
75 match error {
76 ParseError::XmlError { .. } => {
77 self.clear_error();
79 Ok(())
80 }
81 ParseError::SecurityViolation { .. } => {
82 Err(error.clone())
84 }
85 _ => {
86 self.clear_error();
88 Ok(())
89 }
90 }
91 } else {
92 Ok(())
93 }
94 }
95
96 pub fn collect_all(self) -> Result<Vec<ParsedElement>, ParseError> {
98 let mut elements = Vec::new();
99 for result in self {
100 match result {
101 Ok(element) => {
102 if matches!(element, ParsedElement::EndOfStream) {
103 break;
104 }
105 elements.push(element);
106 }
107 Err(e) => return Err(e),
108 }
109 }
110 Ok(elements)
111 }
112
113 pub fn collect_releases(self) -> Result<Vec<ddex_core::models::graph::Release>, ParseError> {
115 let mut releases = Vec::new();
116 for result in self {
117 match result {
118 Ok(ParsedElement::Release(release)) => {
119 releases.push(release);
120 }
121 Ok(ParsedElement::EndOfStream) => break,
122 Ok(_) => continue, Err(e) => return Err(e),
124 }
125 }
126 Ok(releases)
127 }
128
129 pub fn collect_resources(self) -> Result<Vec<ddex_core::models::graph::Resource>, ParseError> {
131 let mut resources = Vec::new();
132 for result in self {
133 match result {
134 Ok(ParsedElement::Resource(resource)) => {
135 resources.push(resource);
136 }
137 Ok(ParsedElement::EndOfStream) => break,
138 Ok(_) => continue,
139 Err(e) => return Err(e),
140 }
141 }
142 Ok(resources)
143 }
144
145 pub fn skip_to_next_release(
147 &mut self,
148 ) -> Result<Option<ddex_core::models::graph::Release>, ParseError> {
149 for result in self {
150 match result {
151 Ok(ParsedElement::Release(release)) => {
152 return Ok(Some(release));
153 }
154 Ok(ParsedElement::EndOfStream) => {
155 return Ok(None);
156 }
157 Ok(_) => continue,
158 Err(e) => return Err(e),
159 }
160 }
161 Ok(None)
162 }
163}
164
165impl<R: BufRead> Iterator for DDEXStreamIterator<R> {
166 type Item = Result<ParsedElement, ParseError>;
167
168 fn next(&mut self) -> Option<Self::Item> {
169 if self.finished || self.error_state.is_some() {
170 return None;
171 }
172
173 match self.parser.parse_next_element() {
174 Ok(Some(element)) => {
175 if matches!(element, ParsedElement::EndOfStream) {
176 self.finished = true;
177 }
178 Some(Ok(element))
179 }
180 Ok(None) => {
181 self.finished = true;
182 None
183 }
184 Err(e) => {
185 self.error_state = Some(e.clone());
186 self.finished = true;
187 Some(Err(e))
188 }
189 }
190 }
191}
192
193impl<R: BufRead> std::fmt::Debug for DDEXStreamIterator<R> {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 f.debug_struct("DDEXStreamIterator")
196 .field("finished", &self.finished)
197 .field("has_error", &self.error_state.is_some())
198 .field("parser", &self.parser)
199 .finish()
200 }
201}
202
/// Point-in-time statistics describing a streaming iteration.
#[derive(Debug, Clone)]
pub struct IteratorStats {
    /// Number of bytes processed so far.
    pub bytes_processed: u64,
    /// Number of elements yielded so far.
    pub elements_yielded: usize,
    /// Current nesting depth.
    pub current_depth: usize,
    /// Current memory usage; treated as a byte count by
    /// [`IteratorStats::memory_usage_mb`].
    pub memory_usage: usize,
    /// Time elapsed when the snapshot was taken.
    pub elapsed: std::time::Duration,
    /// Whether the iterator had finished.
    pub is_finished: bool,
    /// Whether the iterator had an error stored.
    pub has_error: bool,
}

impl IteratorStats {
    /// Number of bytes in one mebibyte.
    const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

    /// Divides `amount` by the elapsed time in seconds, or returns `0.0`
    /// when no time has elapsed.
    fn rate_per_second(&self, amount: f64) -> f64 {
        let seconds = self.elapsed.as_secs_f64();
        if seconds > 0.0 {
            amount / seconds
        } else {
            0.0
        }
    }

    /// Average bytes processed per second (`0.0` if no time has elapsed).
    pub fn bytes_per_second(&self) -> f64 {
        self.rate_per_second(self.bytes_processed as f64)
    }

    /// Average elements yielded per second (`0.0` if no time has elapsed).
    pub fn elements_per_second(&self) -> f64 {
        self.rate_per_second(self.elements_yielded as f64)
    }

    /// Memory usage divided by 1024 * 1024, i.e. expressed in mebibytes.
    pub fn memory_usage_mb(&self) -> f64 {
        self.memory_usage as f64 / Self::BYTES_PER_MIB
    }

    /// Average throughput in MiB per second (`0.0` if no time has elapsed).
    pub fn throughput_mibs(&self) -> f64 {
        // Convert to MiB first, then divide by seconds.
        self.rate_per_second(self.bytes_processed as f64 / Self::BYTES_PER_MIB)
    }
}
248
249pub struct FilteredDDEXIterator<R: BufRead, F>
251where
252 F: Fn(&ParsedElement) -> bool,
253{
254 inner: DDEXStreamIterator<R>,
255 filter: F,
256}
257
258impl<R: BufRead, F> FilteredDDEXIterator<R, F>
259where
260 F: Fn(&ParsedElement) -> bool,
261{
262 pub fn new(inner: DDEXStreamIterator<R>, filter: F) -> Self {
264 Self { inner, filter }
265 }
266}
267
268impl<R: BufRead, F> Iterator for FilteredDDEXIterator<R, F>
269where
270 F: Fn(&ParsedElement) -> bool,
271{
272 type Item = Result<ParsedElement, ParseError>;
273
274 fn next(&mut self) -> Option<Self::Item> {
275 loop {
276 match self.inner.next() {
277 Some(Ok(element)) => {
278 if (self.filter)(&element) || matches!(element, ParsedElement::EndOfStream) {
279 return Some(Ok(element));
280 }
281 }
283 Some(Err(e)) => return Some(Err(e)),
284 None => return None,
285 }
286 }
287 }
288}
289
290impl<R: BufRead> DDEXStreamIterator<R> {
292 pub fn releases_only(self) -> FilteredDDEXIterator<R, impl Fn(&ParsedElement) -> bool> {
294 FilteredDDEXIterator::new(self, |element| matches!(element, ParsedElement::Release(_)))
295 }
296
297 pub fn resources_only(self) -> FilteredDDEXIterator<R, impl Fn(&ParsedElement) -> bool> {
299 FilteredDDEXIterator::new(self, |element| {
300 matches!(element, ParsedElement::Resource(_))
301 })
302 }
303
304 pub fn headers_only(self) -> FilteredDDEXIterator<R, impl Fn(&ParsedElement) -> bool> {
306 FilteredDDEXIterator::new(self, |element| {
307 matches!(element, ParsedElement::Header { .. })
308 })
309 }
310
311 pub fn filter<F>(self, filter: F) -> FilteredDDEXIterator<R, F>
313 where
314 F: Fn(&ParsedElement) -> bool,
315 {
316 FilteredDDEXIterator::new(self, filter)
317 }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// The derived rates follow directly from the raw counters when exactly
    /// one second has elapsed.
    #[test]
    fn test_iterator_stats() {
        let snapshot = IteratorStats {
            bytes_processed: 1024 * 1024,
            elements_yielded: 10,
            current_depth: 5,
            memory_usage: 2 * 1024 * 1024,
            elapsed: std::time::Duration::from_secs(1),
            is_finished: false,
            has_error: false,
        };

        assert_eq!(snapshot.bytes_per_second(), 1024.0 * 1024.0);
        assert_eq!(snapshot.elements_per_second(), 10.0);
        assert_eq!(snapshot.memory_usage_mb(), 2.0);
        assert_eq!(snapshot.throughput_mibs(), 1.0);
    }

    /// A freshly built iterator is neither finished nor in an error state.
    #[test]
    fn test_iterator_creation() {
        let reader = Cursor::new("<ERNMessage></ERNMessage>".as_bytes());
        let fresh = DDEXStreamIterator::new(reader, ERNVersion::V4_3);

        assert!(!fresh.finished);
        assert!(!fresh.has_error());
    }
}