ddex_parser/streaming/
iterator.rs

1// src/streaming/iterator.rs
2//! Iterator implementation for streaming DDEX parser
3
4use super::{ParsedElement, StreamingConfig, StreamingDDEXParser, StreamingProgress};
5use crate::error::ParseError;
6use ddex_core::models::versions::ERNVersion;
7use std::io::BufRead;
8
9/// Iterator wrapper for streaming DDEX parser
10pub struct DDEXStreamIterator<R: BufRead> {
11    parser: StreamingDDEXParser<R>,
12    finished: bool,
13    error_state: Option<ParseError>,
14}
15
16impl<R: BufRead> DDEXStreamIterator<R> {
17    /// Create new iterator from reader
18    pub fn new(reader: R, version: ERNVersion) -> Self {
19        Self {
20            parser: StreamingDDEXParser::new(reader, version),
21            finished: false,
22            error_state: None,
23        }
24    }
25
26    /// Create with custom configuration
27    pub fn with_config(reader: R, version: ERNVersion, config: StreamingConfig) -> Self {
28        Self {
29            parser: StreamingDDEXParser::with_config(reader, version, config),
30            finished: false,
31            error_state: None,
32        }
33    }
34
35    /// Add progress callback
36    pub fn with_progress_callback<F>(mut self, callback: F) -> Self
37    where
38        F: FnMut(StreamingProgress) + Send + 'static,
39    {
40        self.parser = self.parser.with_progress_callback(callback);
41        self
42    }
43
44    /// Get current parsing statistics
45    pub fn stats(&self) -> IteratorStats {
46        IteratorStats {
47            bytes_processed: self.parser.bytes_processed,
48            elements_yielded: self.parser.elements_yielded,
49            current_depth: self.parser.context.current_depth,
50            memory_usage: self.parser.current_memory,
51            elapsed: self.parser.start_time.elapsed(),
52            is_finished: self.finished,
53            has_error: self.error_state.is_some(),
54        }
55    }
56
57    /// Check if iterator has encountered an error
58    pub fn has_error(&self) -> bool {
59        self.error_state.is_some()
60    }
61
62    /// Get the last error if any
63    pub fn last_error(&self) -> Option<&ParseError> {
64        self.error_state.as_ref()
65    }
66
67    /// Reset error state (use with caution)
68    pub fn clear_error(&mut self) {
69        self.error_state = None;
70    }
71
72    /// Try to recover from a specific error type
73    pub fn try_recover(&mut self) -> Result<(), ParseError> {
74        if let Some(ref error) = self.error_state {
75            match error {
76                ParseError::XmlError { .. } => {
77                    // For XML errors, we might be able to skip to next element
78                    self.clear_error();
79                    Ok(())
80                }
81                ParseError::SecurityViolation { .. } => {
82                    // Security violations should not be recoverable
83                    Err(error.clone())
84                }
85                _ => {
86                    // Other errors might be recoverable
87                    self.clear_error();
88                    Ok(())
89                }
90            }
91        } else {
92            Ok(())
93        }
94    }
95
96    /// Consume iterator and collect all elements
97    pub fn collect_all(self) -> Result<Vec<ParsedElement>, ParseError> {
98        let mut elements = Vec::new();
99        for result in self {
100            match result {
101                Ok(element) => {
102                    if matches!(element, ParsedElement::EndOfStream) {
103                        break;
104                    }
105                    elements.push(element);
106                }
107                Err(e) => return Err(e),
108            }
109        }
110        Ok(elements)
111    }
112
113    /// Collect only specific element types
114    pub fn collect_releases(self) -> Result<Vec<ddex_core::models::graph::Release>, ParseError> {
115        let mut releases = Vec::new();
116        for result in self {
117            match result {
118                Ok(ParsedElement::Release(release)) => {
119                    releases.push(release);
120                }
121                Ok(ParsedElement::EndOfStream) => break,
122                Ok(_) => continue, // Skip other element types
123                Err(e) => return Err(e),
124            }
125        }
126        Ok(releases)
127    }
128
129    /// Collect only resources
130    pub fn collect_resources(self) -> Result<Vec<ddex_core::models::graph::Resource>, ParseError> {
131        let mut resources = Vec::new();
132        for result in self {
133            match result {
134                Ok(ParsedElement::Resource(resource)) => {
135                    resources.push(resource);
136                }
137                Ok(ParsedElement::EndOfStream) => break,
138                Ok(_) => continue,
139                Err(e) => return Err(e),
140            }
141        }
142        Ok(resources)
143    }
144
145    /// Skip to next element of specific type
146    pub fn skip_to_next_release(
147        &mut self,
148    ) -> Result<Option<ddex_core::models::graph::Release>, ParseError> {
149        for result in self {
150            match result {
151                Ok(ParsedElement::Release(release)) => {
152                    return Ok(Some(release));
153                }
154                Ok(ParsedElement::EndOfStream) => {
155                    return Ok(None);
156                }
157                Ok(_) => continue,
158                Err(e) => return Err(e),
159            }
160        }
161        Ok(None)
162    }
163}
164
165impl<R: BufRead> Iterator for DDEXStreamIterator<R> {
166    type Item = Result<ParsedElement, ParseError>;
167
168    fn next(&mut self) -> Option<Self::Item> {
169        if self.finished || self.error_state.is_some() {
170            return None;
171        }
172
173        match self.parser.parse_next_element() {
174            Ok(Some(element)) => {
175                if matches!(element, ParsedElement::EndOfStream) {
176                    self.finished = true;
177                }
178                Some(Ok(element))
179            }
180            Ok(None) => {
181                self.finished = true;
182                None
183            }
184            Err(e) => {
185                self.error_state = Some(e.clone());
186                self.finished = true;
187                Some(Err(e))
188            }
189        }
190    }
191}
192
193impl<R: BufRead> std::fmt::Debug for DDEXStreamIterator<R> {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        f.debug_struct("DDEXStreamIterator")
196            .field("finished", &self.finished)
197            .field("has_error", &self.error_state.is_some())
198            .field("parser", &self.parser)
199            .finish()
200    }
201}
202
/// Point-in-time snapshot of an iterator's progress counters and flags.
#[derive(Debug, Clone)]
pub struct IteratorStats {
    /// Number of input bytes processed so far.
    pub bytes_processed: u64,
    /// Number of elements yielded so far.
    pub elements_yielded: usize,
    /// Current nesting depth reported by the parser.
    pub current_depth: usize,
    /// Current memory usage in bytes.
    pub memory_usage: usize,
    /// Time elapsed since parsing started.
    pub elapsed: std::time::Duration,
    /// Whether the iterator has finished.
    pub is_finished: bool,
    /// Whether the iterator holds a recorded error.
    pub has_error: bool,
}

impl IteratorStats {
    /// Number of bytes in one mebibyte, as a float for rate computations.
    const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

    /// Divides `quantity` by the elapsed time in seconds, or returns `0.0`
    /// when no time has elapsed (avoids a division by zero).
    fn per_second(&self, quantity: f64) -> f64 {
        let seconds = self.elapsed.as_secs_f64();
        if seconds > 0.0 {
            quantity / seconds
        } else {
            0.0
        }
    }

    /// Processing rate in bytes per second (`0.0` when no time has elapsed).
    pub fn bytes_per_second(&self) -> f64 {
        self.per_second(self.bytes_processed as f64)
    }

    /// Element rate per second (`0.0` when no time has elapsed).
    pub fn elements_per_second(&self) -> f64 {
        self.per_second(self.elements_yielded as f64)
    }

    /// Memory usage divided by 1024 * 1024, i.e. expressed in mebibytes
    /// (the method name says "mb").
    pub fn memory_usage_mb(&self) -> f64 {
        self.memory_usage as f64 / Self::BYTES_PER_MIB
    }

    /// Throughput in MiB per second (`0.0` when no time has elapsed).
    pub fn throughput_mibs(&self) -> f64 {
        self.per_second(self.bytes_processed as f64 / Self::BYTES_PER_MIB)
    }
}
248
249/// Filtered iterator for specific element types
250pub struct FilteredDDEXIterator<R: BufRead, F>
251where
252    F: Fn(&ParsedElement) -> bool,
253{
254    inner: DDEXStreamIterator<R>,
255    filter: F,
256}
257
258impl<R: BufRead, F> FilteredDDEXIterator<R, F>
259where
260    F: Fn(&ParsedElement) -> bool,
261{
262    /// Create new filtered iterator
263    pub fn new(inner: DDEXStreamIterator<R>, filter: F) -> Self {
264        Self { inner, filter }
265    }
266}
267
268impl<R: BufRead, F> Iterator for FilteredDDEXIterator<R, F>
269where
270    F: Fn(&ParsedElement) -> bool,
271{
272    type Item = Result<ParsedElement, ParseError>;
273
274    fn next(&mut self) -> Option<Self::Item> {
275        loop {
276            match self.inner.next() {
277                Some(Ok(element)) => {
278                    if (self.filter)(&element) || matches!(element, ParsedElement::EndOfStream) {
279                        return Some(Ok(element));
280                    }
281                    // Continue to next element
282                }
283                Some(Err(e)) => return Some(Err(e)),
284                None => return None,
285            }
286        }
287    }
288}
289
290/// Convenience functions for creating filtered iterators
291impl<R: BufRead> DDEXStreamIterator<R> {
292    /// Filter to only releases
293    pub fn releases_only(self) -> FilteredDDEXIterator<R, impl Fn(&ParsedElement) -> bool> {
294        FilteredDDEXIterator::new(self, |element| matches!(element, ParsedElement::Release(_)))
295    }
296
297    /// Filter to only resources
298    pub fn resources_only(self) -> FilteredDDEXIterator<R, impl Fn(&ParsedElement) -> bool> {
299        FilteredDDEXIterator::new(self, |element| {
300            matches!(element, ParsedElement::Resource(_))
301        })
302    }
303
304    /// Filter to only headers
305    pub fn headers_only(self) -> FilteredDDEXIterator<R, impl Fn(&ParsedElement) -> bool> {
306        FilteredDDEXIterator::new(self, |element| {
307            matches!(element, ParsedElement::Header { .. })
308        })
309    }
310
311    /// Filter with custom predicate
312    pub fn filter<F>(self, filter: F) -> FilteredDDEXIterator<R, F>
313    where
314        F: Fn(&ParsedElement) -> bool,
315    {
316        FilteredDDEXIterator::new(self, filter)
317    }
318}
319
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;
    use std::time::Duration;

    /// One mebibyte, the unit the stats fixtures are expressed in.
    const MIB: usize = 1024 * 1024;

    /// With one second elapsed, every derived rate equals its raw counter
    /// (scaled to MiB where applicable).
    #[test]
    fn test_iterator_stats() {
        let snapshot = IteratorStats {
            bytes_processed: MIB as u64, // 1MB
            elements_yielded: 10,
            current_depth: 5,
            memory_usage: 2 * MIB, // 2MB
            elapsed: Duration::from_secs(1),
            is_finished: false,
            has_error: false,
        };

        assert_eq!(snapshot.bytes_per_second(), 1024.0 * 1024.0);
        assert_eq!(snapshot.elements_per_second(), 10.0);
        assert_eq!(snapshot.memory_usage_mb(), 2.0);
        assert_eq!(snapshot.throughput_mibs(), 1.0);
    }

    /// A freshly built iterator is neither finished nor in an error state.
    #[test]
    fn test_iterator_creation() {
        let document = "<ERNMessage></ERNMessage>";
        let reader = Cursor::new(document.as_bytes());
        let iterator = DDEXStreamIterator::new(reader, ERNVersion::V4_3);

        assert!(!iterator.finished);
        assert!(!iterator.has_error());
    }
}
352}