bgpkit_parser/parser/
iters.rs

1/*!
2Provides parser iterator implementation.
3*/
4use crate::error::ParserError;
5use crate::models::*;
6use crate::parser::BgpkitParser;
7use crate::{Elementor, Filterable};
8use log::{error, warn};
9use std::io::Read;
10
11/// Use [ElemIterator] as the default iterator to return [BgpElem]s instead of [MrtRecord]s.
12impl<R: Read> IntoIterator for BgpkitParser<R> {
13    type Item = BgpElem;
14    type IntoIter = ElemIterator<R>;
15
16    fn into_iter(self) -> Self::IntoIter {
17        ElemIterator::new(self)
18    }
19}
20
21impl<R> BgpkitParser<R> {
22    pub fn into_record_iter(self) -> RecordIterator<R> {
23        RecordIterator::new(self)
24    }
25    pub fn into_elem_iter(self) -> ElemIterator<R> {
26        ElemIterator::new(self)
27    }
28}
29
30/*********
31MrtRecord Iterator
32**********/
33
34pub struct RecordIterator<R> {
35    pub parser: BgpkitParser<R>,
36    pub count: u64,
37    elementor: Elementor,
38}
39
40impl<R> RecordIterator<R> {
41    fn new(parser: BgpkitParser<R>) -> Self {
42        RecordIterator {
43            parser,
44            count: 0,
45            elementor: Elementor::new(),
46        }
47    }
48}
49
50impl<R: Read> Iterator for RecordIterator<R> {
51    type Item = MrtRecord;
52
53    fn next(&mut self) -> Option<MrtRecord> {
54        self.count += 1;
55        loop {
56            return match self.parser.next_record() {
57                Ok(v) => {
58                    // if None, the reaches EoF.
59                    let filters = &self.parser.filters;
60                    if filters.is_empty() {
61                        Some(v)
62                    } else {
63                        if let MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(
64                            _,
65                        )) = &v.message
66                        {
67                            let _ = self.elementor.record_to_elems(v.clone());
68                            return Some(v);
69                        }
70                        let elems = self.elementor.record_to_elems(v.clone());
71                        if elems.iter().any(|e| e.match_filters(&self.parser.filters)) {
72                            Some(v)
73                        } else {
74                            continue;
75                        }
76                    }
77                }
78                Err(e) => {
79                    match e.error {
80                        ParserError::TruncatedMsg(err_str) | ParserError::Unsupported(err_str) => {
81                            if self.parser.options.show_warnings {
82                                warn!("parser warn: {}", err_str);
83                            }
84                            if let Some(bytes) = e.bytes {
85                                std::fs::write("mrt_core_dump", bytes)
86                                    .expect("Unable to write to mrt_core_dump");
87                            }
88                            continue;
89                        }
90                        ParserError::ParseError(err_str) => {
91                            error!("parser error: {}", err_str);
92                            if self.parser.core_dump {
93                                if let Some(bytes) = e.bytes {
94                                    std::fs::write("mrt_core_dump", bytes)
95                                        .expect("Unable to write to mrt_core_dump");
96                                }
97                                None
98                            } else {
99                                continue;
100                            }
101                        }
102                        ParserError::EofExpected => {
103                            // normal end of file
104                            None
105                        }
106                        ParserError::IoError(err) | ParserError::EofError(err) => {
107                            // when reaching IO error, stop iterating
108                            error!("{:?}", err);
109                            if self.parser.core_dump {
110                                if let Some(bytes) = e.bytes {
111                                    std::fs::write("mrt_core_dump", bytes)
112                                        .expect("Unable to write to mrt_core_dump");
113                                }
114                            }
115                            None
116                        }
117                        #[cfg(feature = "oneio")]
118                        ParserError::OneIoError(_) => None,
119                        ParserError::FilterError(_) => {
120                            // this should not happen at this stage
121                            None
122                        }
123                    }
124                }
125            };
126        }
127    }
128}
129
130/*********
131BgpElem Iterator
132**********/
133
134pub struct ElemIterator<R> {
135    cache_elems: Vec<BgpElem>,
136    record_iter: RecordIterator<R>,
137    elementor: Elementor,
138    count: u64,
139}
140
141impl<R> ElemIterator<R> {
142    fn new(parser: BgpkitParser<R>) -> Self {
143        ElemIterator {
144            record_iter: RecordIterator::new(parser),
145            count: 0,
146            cache_elems: vec![],
147            elementor: Elementor::new(),
148        }
149    }
150}
151
152impl<R: Read> Iterator for ElemIterator<R> {
153    type Item = BgpElem;
154
155    fn next(&mut self) -> Option<BgpElem> {
156        self.count += 1;
157
158        loop {
159            if self.cache_elems.is_empty() {
160                // refill cache elems
161                loop {
162                    match self.record_iter.next() {
163                        None => {
164                            // no more records
165                            return None;
166                        }
167                        Some(r) => {
168                            let mut elems = self.elementor.record_to_elems(r);
169                            if elems.is_empty() {
170                                // somehow this record does not contain any elems, continue to parse next record
171                                continue;
172                            } else {
173                                elems.reverse();
174                                self.cache_elems = elems;
175                                break;
176                            }
177                        }
178                    }
179                }
180                // when reaching here, the `self.cache_elems` has been refilled with some more elems
181            }
182
183            // popping cached elems. note that the original elems order is preseved by reversing the
184            // vector before putting it on to cache_elems.
185            let elem = self.cache_elems.pop();
186            match elem {
187                None => return None,
188                Some(e) => match e.match_filters(&self.record_iter.parser.filters) {
189                    true => return Some(e),
190                    false => continue,
191                },
192            }
193        }
194    }
195}