Skip to main content

bgpkit_parser/parser/iters/
route.rs

1use crate::error::{ParserError, ParserErrorWithBytes};
2use crate::models::*;
3use crate::parser::bgp::attributes::{parse_as_path, parse_nlri, AttributeValidationState};
4use crate::parser::bgp::messages::read_and_validate_bgp_marker;
5use crate::parser::iters::write_mrt_core_dump;
6use crate::parser::mrt::messages::bgp4mp::bgp4mp_message_payload_len;
7use crate::parser::mrt::messages::table_dump_v2::rib_entry_min_len;
8use crate::parser::{chunk_mrt_record, parse_nlri_list, BgpkitParser, Filterable, ReadUtils};
9use bytes::{Buf, Bytes};
10use ipnet::IpNet;
11use log::{error, warn};
12use std::io::Read;
13use std::net::IpAddr;
14use std::sync::Arc;
15
/// Route-relevant data collected from one block of BGP path attributes.
#[derive(Default)]
struct RouteAttributes {
    /// AS path after combining `AS_PATH` and `AS4_PATH`; `None` when neither was parsed.
    as_path: Option<Arc<AsPath>>,
    /// Prefixes carried by an `MP_REACHABLE_NLRI` attribute.
    announced: Vec<NetworkPrefix>,
    /// Prefixes carried by an `MP_UNREACHABLE_NLRI` attribute.
    withdrawn: Vec<NetworkPrefix>,
}
22
/// Caller-supplied context that steers how a path-attribute block is parsed.
struct RouteAttributeContext<'a> {
    /// Address family forwarded to the multiprotocol NLRI parser, if known.
    afi: Option<Afi>,
    /// Subsequent address family forwarded to the multiprotocol NLRI parser, if known.
    safi: Option<Safi>,
    /// Prefixes forwarded to the multiprotocol NLRI parser, if the caller already has them.
    prefixes: Option<&'a [NetworkPrefix]>,
    /// Explicit announcement flag; when `None` it is derived from the parsed attributes.
    is_announcement: Option<bool>,
    /// Whether the enclosing message carries NLRI outside the multiprotocol attributes.
    has_standard_nlri: bool,
}
30
31fn merge_as_path(as_path: Option<AsPath>, as4_path: Option<AsPath>) -> Option<Arc<AsPath>> {
32    let path = match (as_path, as4_path) {
33        (None, None) => None,
34        (Some(path), None) | (None, Some(path)) => Some(path),
35        (Some(path), Some(as4_path)) => Some(AsPath::merge_aspath_as4path(&path, &as4_path)),
36    };
37    path.map(Arc::new)
38}
39
40fn parse_route_attributes(
41    mut data: Bytes,
42    asn_len: &AsnLength,
43    add_path: bool,
44    ctx: RouteAttributeContext<'_>,
45) -> Result<RouteAttributes, ParserError> {
46    let mut validation = AttributeValidationState::new();
47    let mut as_path = None;
48    let mut as4_path = None;
49    let mut announced = Vec::new();
50    let mut withdrawn = Vec::new();
51
52    while data.remaining() >= 3 {
53        let flags = AttrFlags::from_bits_retain(data.read_u8()?);
54        let raw_attr_type = data.read_u8()?;
55        let attr_length = if flags.contains(AttrFlags::EXTENDED) {
56            data.read_u16()? as usize
57        } else {
58            data.read_u8()? as usize
59        };
60        let attr_type = AttrType::from(raw_attr_type);
61        let partial = validation.observe_header(raw_attr_type, attr_type, flags, attr_length);
62
63        if data.remaining() < attr_length {
64            warn!(
65                "{:?} attribute encodes a length ({}) that is longer than the remaining attribute data ({}). Skipping remaining attribute data for BGP message",
66                attr_type,
67                attr_length,
68                data.remaining()
69            );
70            break;
71        }
72
73        let attr_data = data.split_to(attr_length);
74        let result = match attr_type {
75            AttrType::AS_PATH => parse_as_path(attr_data, asn_len).map(|path| {
76                as_path = Some(path);
77            }),
78            AttrType::AS4_PATH => parse_as_path(attr_data, &AsnLength::Bits32).map(|path| {
79                as4_path = Some(path);
80            }),
81            AttrType::MP_REACHABLE_NLRI => parse_nlri(
82                attr_data,
83                &ctx.afi,
84                &ctx.safi,
85                &ctx.prefixes,
86                true,
87                add_path,
88            )
89            .map(|attr| {
90                if let AttributeValue::MpReachNlri(nlri) = attr {
91                    announced = nlri.prefixes;
92                }
93            }),
94            AttrType::MP_UNREACHABLE_NLRI => parse_nlri(
95                attr_data,
96                &ctx.afi,
97                &ctx.safi,
98                &ctx.prefixes,
99                false,
100                add_path,
101            )
102            .map(|attr| {
103                if let AttributeValue::MpUnreachNlri(nlri) = attr {
104                    withdrawn = nlri.prefixes;
105                }
106            }),
107            _ => Ok(()),
108        };
109
110        if let Err(err) = result {
111            validation.observe_parse_error(attr_type, partial, &err);
112        }
113    }
114
115    let is_announcement = ctx
116        .is_announcement
117        .unwrap_or(ctx.has_standard_nlri || validation.has_attr(AttrType::MP_REACHABLE_NLRI));
118    validation.check_mandatory_attributes(is_announcement, ctx.has_standard_nlri);
119    let _warnings = validation.finish();
120    Ok(RouteAttributes {
121        as_path: merge_as_path(as_path, as4_path),
122        announced,
123        withdrawn,
124    })
125}
126
127fn record_timestamp(common_header: &CommonHeader) -> f64 {
128    match common_header.microsecond_timestamp {
129        Some(microseconds) => common_header.timestamp as f64 + microseconds as f64 / 1_000_000.0,
130        None => common_header.timestamp as f64,
131    }
132}
133
/// Lazily yields the route elements of a single BGP UPDATE message.
struct RouteUpdateIter {
    /// Timestamp copied into every yielded element.
    timestamp: f64,
    /// Peer address copied into every yielded element.
    peer_ip: IpAddr,
    /// Peer AS number copied into every yielded element.
    peer_asn: Asn,
    /// AS path shared by all announced prefixes of the message.
    as_path: Option<Arc<AsPath>>,
    /// Announced prefixes: two prefix lists chained into one iterator.
    announced:
        std::iter::Chain<std::vec::IntoIter<NetworkPrefix>, std::vec::IntoIter<NetworkPrefix>>,
    /// Withdrawn prefixes: two prefix lists chained into one iterator.
    withdrawn:
        std::iter::Chain<std::vec::IntoIter<NetworkPrefix>, std::vec::IntoIter<NetworkPrefix>>,
    /// Set once the announced prefixes are exhausted; only withdrawals follow.
    in_withdrawn_phase: bool,
}
145
146impl RouteUpdateIter {
147    fn next_route(&mut self) -> Option<BgpRouteElem> {
148        if !self.in_withdrawn_phase {
149            if let Some(prefix) = self.announced.next() {
150                return Some(BgpRouteElem {
151                    timestamp: self.timestamp,
152                    elem_type: ElemType::ANNOUNCE,
153                    peer_ip: self.peer_ip,
154                    peer_asn: self.peer_asn,
155                    prefix,
156                    as_path: self.as_path.clone(),
157                });
158            }
159            self.in_withdrawn_phase = true;
160        }
161
162        self.withdrawn.next().map(|prefix| BgpRouteElem {
163            timestamp: self.timestamp,
164            elem_type: ElemType::WITHDRAW,
165            peer_ip: self.peer_ip,
166            peer_asn: self.peer_asn,
167            prefix,
168            as_path: None,
169        })
170    }
171}
172
173#[derive(Clone, Default)]
174struct RoutePeerTable {
175    peers: Arc<[Peer]>,
176}
177
178impl RoutePeerTable {
179    fn get_peer_by_id(&self, peer_index: u16) -> Option<Peer> {
180        self.peers.get(peer_index as usize).copied()
181    }
182}
183
184fn parse_route_peer_table(mut data: Bytes) -> Result<RoutePeerTable, ParserError> {
185    let _collector_bgp_id = data.read_u32()?;
186    let view_name_length = data.read_u16()? as usize;
187    data.has_n_remaining(view_name_length)?;
188    data.advance(view_name_length);
189
190    let peer_count = data.read_u16()? as usize;
191    let mut peers = Vec::with_capacity(peer_count);
192    for _ in 0..peer_count {
193        let peer_type = PeerType::from_bits_retain(data.read_u8()?);
194        let afi = if peer_type.contains(PeerType::ADDRESS_FAMILY_IPV6) {
195            Afi::Ipv6
196        } else {
197            Afi::Ipv4
198        };
199        let asn_len = if peer_type.contains(PeerType::AS_SIZE_32BIT) {
200            AsnLength::Bits32
201        } else {
202            AsnLength::Bits16
203        };
204
205        let peer_bgp_id = data.read_ipv4_address()?;
206        let peer_ip = data.read_address(&afi)?;
207        let peer_asn = data.read_asn(asn_len)?;
208        peers.push(Peer {
209            peer_type,
210            peer_bgp_id,
211            peer_ip,
212            peer_asn,
213        });
214    }
215
216    Ok(RoutePeerTable {
217        peers: Arc::from(peers),
218    })
219}
220
221#[derive(Default)]
222enum RouteRecordIter {
223    #[default]
224    Empty,
225    One(Option<BgpRouteElem>),
226    Update(RouteUpdateIter),
227    RibAfi(RouteRibAfiIter),
228}
229
230impl RouteRecordIter {
231    fn next_route(&mut self) -> Result<Option<BgpRouteElem>, ParserError> {
232        match self {
233            RouteRecordIter::Empty => Ok(None),
234            RouteRecordIter::One(route) => Ok(route.take()),
235            RouteRecordIter::Update(iter) => Ok(iter.next_route()),
236            RouteRecordIter::RibAfi(iter) => iter.next_route(),
237        }
238    }
239}
240
/// Lazily decodes the entries of one AFI/SAFI-specific RIB record.
struct RouteRibAfiIter {
    /// Undecoded remainder of the record, positioned at the next entry.
    data: Bytes,
    /// Peer table used to resolve each entry's peer index.
    peer_table: RoutePeerTable,
    /// Address family of the record.
    afi: Afi,
    /// Subsequent address family of the record.
    safi: Safi,
    /// Whether each entry carries a path identifier.
    is_add_path: bool,
    /// Prefix shared by every entry of the record.
    prefix: NetworkPrefix,
    /// Number of entries announced by the record that have not been read yet.
    remaining_entries: u16,
}
250
251impl RouteRibAfiIter {
252    fn next_route(&mut self) -> Result<Option<BgpRouteElem>, ParserError> {
253        while self.remaining_entries > 0 {
254            if self.data.remaining() < rib_entry_min_len(self.is_add_path) {
255                warn!("early break due to truncated msg while parsing RIB AFI entries");
256                self.remaining_entries = 0;
257                return Ok(None);
258            }
259
260            self.remaining_entries -= 1;
261            let peer_index = self.data.read_u16()?;
262            let originated_time = self.data.read_u32()? as f64;
263            let _path_id = if self.is_add_path {
264                Some(self.data.read_u32()?)
265            } else {
266                None
267            };
268            let attribute_length = self.data.read_u16()? as usize;
269            if self.data.remaining() < attribute_length {
270                warn!(
271                    "early break due to truncated attribute payload while parsing RIB AFI entries: expected {} bytes, have {} bytes available",
272                    attribute_length,
273                    self.data.remaining()
274                );
275                self.remaining_entries = 0;
276                return Ok(None);
277            }
278
279            let prefixes = [self.prefix];
280            let attrs = parse_route_attributes(
281                self.data.split_to(attribute_length),
282                &AsnLength::Bits32,
283                self.is_add_path,
284                RouteAttributeContext {
285                    afi: Some(self.afi),
286                    safi: Some(self.safi),
287                    prefixes: Some(&prefixes),
288                    is_announcement: Some(true),
289                    has_standard_nlri: self.afi == Afi::Ipv4,
290                },
291            )?;
292            let Some(peer) = self.peer_table.get_peer_by_id(peer_index) else {
293                error!("peer ID {} not found in peer_index table", peer_index);
294                continue;
295            };
296
297            return Ok(Some(BgpRouteElem {
298                timestamp: originated_time,
299                elem_type: ElemType::ANNOUNCE,
300                peer_ip: peer.peer_ip,
301                peer_asn: peer.peer_asn,
302                prefix: self.prefix,
303                as_path: attrs.as_path,
304            }));
305        }
306
307        Ok(None)
308    }
309}
310
311fn parse_bgp_update_routes(
312    mut input: Bytes,
313    add_path: bool,
314    asn_len: &AsnLength,
315    timestamp: f64,
316    peer_ip: IpAddr,
317    peer_asn: Asn,
318) -> Result<RouteUpdateIter, ParserError> {
319    let withdrawn_len = input.read_u16()? as usize;
320    input.has_n_remaining(withdrawn_len)?;
321    let withdrawn_prefixes = parse_nlri_list(input.split_to(withdrawn_len), add_path, &Afi::Ipv4)?;
322
323    let attribute_length = input.read_u16()? as usize;
324    input.has_n_remaining(attribute_length)?;
325    let attribute_bytes = input.split_to(attribute_length);
326    let announced_prefixes = parse_nlri_list(input, add_path, &Afi::Ipv4)?;
327    let attributes = parse_route_attributes(
328        attribute_bytes,
329        asn_len,
330        add_path,
331        RouteAttributeContext {
332            afi: None,
333            safi: None,
334            prefixes: None,
335            is_announcement: None,
336            has_standard_nlri: !announced_prefixes.is_empty(),
337        },
338    )?;
339
340    Ok(RouteUpdateIter {
341        timestamp,
342        peer_ip,
343        peer_asn,
344        as_path: attributes.as_path,
345        announced: announced_prefixes.into_iter().chain(attributes.announced),
346        withdrawn: withdrawn_prefixes.into_iter().chain(attributes.withdrawn),
347        in_withdrawn_phase: false,
348    })
349}
350
351fn parse_bgp_message_routes(
352    mut data: Bytes,
353    add_path: bool,
354    asn_len: &AsnLength,
355    timestamp: f64,
356    peer_ip: IpAddr,
357    peer_asn: Asn,
358) -> Result<RouteRecordIter, ParserError> {
359    let total_size = data.len();
360    data.has_n_remaining(19)?;
361    read_and_validate_bgp_marker(&mut data)?;
362    let length = data.read_u16()?;
363    if !(19..=65_535).contains(&length) {
364        return Err(ParserError::ParseError(format!(
365            "invalid BGP message length {length}"
366        )));
367    }
368
369    let bgp_msg_length = if length as usize > total_size {
370        total_size - 19
371    } else {
372        length as usize - 19
373    };
374    let msg_type = BgpMessageType::try_from(data.read_u8()?)
375        .map_err(|_| ParserError::ParseError("Unknown BGP Message Type".to_string()))?;
376
377    if matches!(msg_type, BgpMessageType::OPEN | BgpMessageType::KEEPALIVE) && length > 4096 {
378        return Err(ParserError::ParseError(format!(
379            "BGP {msg_type:?} message length {length} exceeds maximum allowed 4096 bytes (RFC 8654)"
380        )));
381    }
382
383    if data.remaining() != bgp_msg_length {
384        warn!(
385            "BGP message length {} does not match the actual length {} (parsing BGP message)",
386            bgp_msg_length,
387            data.remaining()
388        );
389    }
390    data.has_n_remaining(bgp_msg_length)?;
391    let msg_data = data.split_to(bgp_msg_length);
392
393    match msg_type {
394        BgpMessageType::UPDATE => Ok(RouteRecordIter::Update(parse_bgp_update_routes(
395            msg_data, add_path, asn_len, timestamp, peer_ip, peer_asn,
396        )?)),
397        BgpMessageType::OPEN | BgpMessageType::NOTIFICATION | BgpMessageType::KEEPALIVE => {
398            Ok(RouteRecordIter::Empty)
399        }
400    }
401}
402
403fn bgp4mp_asn_len_and_add_path(msg_type: Bgp4MpType) -> Option<(AsnLength, bool)> {
404    match msg_type {
405        Bgp4MpType::Message | Bgp4MpType::MessageLocal => Some((AsnLength::Bits16, false)),
406        Bgp4MpType::MessageAs4 | Bgp4MpType::MessageAs4Local => Some((AsnLength::Bits32, false)),
407        Bgp4MpType::MessageAddpath | Bgp4MpType::MessageLocalAddpath => {
408            Some((AsnLength::Bits16, true))
409        }
410        Bgp4MpType::MessageAs4Addpath | Bgp4MpType::MessageLocalAs4Addpath => {
411            Some((AsnLength::Bits32, true))
412        }
413        Bgp4MpType::StateChange | Bgp4MpType::StateChangeAs4 => None,
414    }
415}
416
417fn parse_bgp4mp_routes(
418    sub_type: u16,
419    mut data: Bytes,
420    timestamp: f64,
421) -> Result<RouteRecordIter, ParserError> {
422    let msg_type = Bgp4MpType::try_from(sub_type)?;
423    let Some((asn_len, add_path)) = bgp4mp_asn_len_and_add_path(msg_type) else {
424        return Ok(RouteRecordIter::Empty);
425    };
426
427    let total_size = data.len();
428    let peer_asn = data.read_asn(asn_len)?;
429    let _local_asn = data.read_asn(asn_len)?;
430    let _interface_index = data.read_u16()?;
431    let afi = data.read_afi()?;
432    let peer_ip = data.read_address(&afi)?;
433    let _local_ip = data.read_address(&afi)?;
434
435    let should_read = bgp4mp_message_payload_len(&afi, &asn_len, total_size);
436    if should_read != data.remaining() {
437        return Err(ParserError::TruncatedMsg(format!(
438            "truncated bgp4mp message: should read {} bytes, have {} bytes available",
439            should_read,
440            data.remaining()
441        )));
442    }
443
444    parse_bgp_message_routes(data, add_path, &asn_len, timestamp, peer_ip, peer_asn)
445}
446
447fn table_dump_v2_afi_safi(rib_type: TableDumpV2Type) -> Result<(Afi, Safi), ParserError> {
448    match rib_type {
449        TableDumpV2Type::RibIpv4Unicast | TableDumpV2Type::RibIpv4UnicastAddPath => {
450            Ok((Afi::Ipv4, Safi::Unicast))
451        }
452        TableDumpV2Type::RibIpv4Multicast | TableDumpV2Type::RibIpv4MulticastAddPath => {
453            Ok((Afi::Ipv4, Safi::Multicast))
454        }
455        TableDumpV2Type::RibIpv6Unicast | TableDumpV2Type::RibIpv6UnicastAddPath => {
456            Ok((Afi::Ipv6, Safi::Unicast))
457        }
458        TableDumpV2Type::RibIpv6Multicast | TableDumpV2Type::RibIpv6MulticastAddPath => {
459            Ok((Afi::Ipv6, Safi::Multicast))
460        }
461        _ => Err(ParserError::ParseError(format!(
462            "wrong RIB type for parsing: {rib_type:?}"
463        ))),
464    }
465}
466
467fn is_add_path_rib_type(rib_type: TableDumpV2Type) -> bool {
468    matches!(
469        rib_type,
470        TableDumpV2Type::RibIpv4UnicastAddPath
471            | TableDumpV2Type::RibIpv4MulticastAddPath
472            | TableDumpV2Type::RibIpv6UnicastAddPath
473            | TableDumpV2Type::RibIpv6MulticastAddPath
474    )
475}
476
477fn parse_table_dump_routes(sub_type: u16, mut data: Bytes) -> Result<RouteRecordIter, ParserError> {
478    let afi = match sub_type {
479        1 => Afi::Ipv4,
480        2 => Afi::Ipv6,
481        _ => {
482            return Err(ParserError::ParseError(format!(
483                "Invalid subtype found for TABLE_DUMP (V1) message: {sub_type}"
484            )))
485        }
486    };
487
488    let _view_number = data.read_u16()?;
489    let _sequence_number = data.read_u16()?;
490    let prefix = match &afi {
491        Afi::Ipv4 => data.read_ipv4_prefix().map(IpNet::V4),
492        Afi::Ipv6 => data.read_ipv6_prefix().map(IpNet::V6),
493        Afi::LinkState => unreachable!(),
494    }?;
495    let _status = data.read_u8()?;
496    let originated_time = data.read_u32()? as f64;
497    let peer_ip = data.read_address(&afi)?;
498    let peer_asn = Asn::new_16bit(data.read_u16()?);
499    let attribute_length = data.read_u16()? as usize;
500    data.has_n_remaining(attribute_length)?;
501    let attrs = parse_route_attributes(
502        data.split_to(attribute_length),
503        &AsnLength::Bits16,
504        false,
505        RouteAttributeContext {
506            afi: None,
507            safi: None,
508            prefixes: None,
509            is_announcement: Some(true),
510            has_standard_nlri: afi == Afi::Ipv4,
511        },
512    )?;
513
514    Ok(RouteRecordIter::One(Some(BgpRouteElem {
515        timestamp: originated_time,
516        elem_type: ElemType::ANNOUNCE,
517        peer_ip,
518        peer_asn,
519        prefix: NetworkPrefix::new(prefix, None),
520        as_path: attrs.as_path,
521    })))
522}
523
524fn parse_table_dump_v2_routes(
525    sub_type: u16,
526    mut data: Bytes,
527    peer_table: &mut Option<RoutePeerTable>,
528) -> Result<RouteRecordIter, ParserError> {
529    let v2_type = TableDumpV2Type::try_from(sub_type)?;
530    match v2_type {
531        TableDumpV2Type::PeerIndexTable => {
532            *peer_table = Some(parse_route_peer_table(data)?);
533            Ok(RouteRecordIter::Empty)
534        }
535        TableDumpV2Type::GeoPeerTable => Ok(RouteRecordIter::Empty),
536        TableDumpV2Type::RibGeneric | TableDumpV2Type::RibGenericAddPath => Err(
537            ParserError::Unsupported("TableDumpV2 RibGeneric is not currently supported".into()),
538        ),
539        rib_type => {
540            let (afi, safi) = table_dump_v2_afi_safi(rib_type)?;
541            let is_add_path = is_add_path_rib_type(rib_type);
542            let _sequence_number = data.read_u32()?;
543            let prefix = data.read_nlri_prefix(&afi, false)?;
544            let entry_count = data.read_u16()?;
545            let Some(peer_table) = peer_table.clone() else {
546                return Err(ParserError::ParseError(
547                    "peer table not set for TableDumpV2 RIB entries".to_string(),
548                ));
549            };
550
551            Ok(RouteRecordIter::RibAfi(RouteRibAfiIter {
552                data,
553                peer_table,
554                afi,
555                safi,
556                is_add_path,
557                prefix,
558                remaining_entries: entry_count,
559            }))
560        }
561    }
562}
563
564fn parse_raw_record_route_iter(
565    raw_record: crate::RawMrtRecord,
566    peer_table: &mut Option<RoutePeerTable>,
567) -> Result<RouteRecordIter, ParserError> {
568    let timestamp = record_timestamp(&raw_record.common_header);
569    match raw_record.common_header.entry_type {
570        EntryType::TABLE_DUMP => parse_table_dump_routes(
571            raw_record.common_header.entry_subtype,
572            raw_record.message_bytes,
573        ),
574        EntryType::TABLE_DUMP_V2 => parse_table_dump_v2_routes(
575            raw_record.common_header.entry_subtype,
576            raw_record.message_bytes,
577            peer_table,
578        ),
579        EntryType::BGP4MP | EntryType::BGP4MP_ET => parse_bgp4mp_routes(
580            raw_record.common_header.entry_subtype,
581            raw_record.message_bytes,
582            timestamp,
583        ),
584        v => Err(ParserError::Unsupported(format!(
585            "unsupported MRT type: {v:?}"
586        ))),
587    }
588}
589
590pub struct RouteIterator<R> {
591    parser: BgpkitParser<R>,
592    pending_routes: RouteRecordIter,
593    peer_table: Option<RoutePeerTable>,
594}
595
596impl<R> RouteIterator<R> {
597    pub(crate) fn new(parser: BgpkitParser<R>) -> Self {
598        Self {
599            parser,
600            pending_routes: RouteRecordIter::Empty,
601            peer_table: None,
602        }
603    }
604}
605
606impl<R: Read> Iterator for RouteIterator<R> {
607    type Item = BgpRouteElem;
608
609    fn next(&mut self) -> Option<Self::Item> {
610        loop {
611            match self.pending_routes.next_route() {
612                Ok(Some(route)) => {
613                    if route.match_filters(&self.parser.filters) {
614                        return Some(route);
615                    }
616                    continue;
617                }
618                Ok(None) => {}
619                Err(err) => {
620                    error!("parser error: {}", err);
621                    self.pending_routes = RouteRecordIter::Empty;
622                    if self.parser.core_dump {
623                        return None;
624                    }
625                    continue;
626                }
627            }
628
629            let raw_record = match chunk_mrt_record(&mut self.parser.reader) {
630                Ok(raw_record) => raw_record,
631                Err(e) => match e.error {
632                    ParserError::TruncatedMsg(err_str) | ParserError::Unsupported(err_str) => {
633                        if self.parser.options.show_warnings {
634                            warn!("parser warn: {}", err_str);
635                        }
636                        write_mrt_core_dump(self.parser.core_dump, e.bytes);
637                        continue;
638                    }
639                    ParserError::ParseError(err_str) => {
640                        error!("parser error: {}", err_str);
641                        if self.parser.core_dump {
642                            write_mrt_core_dump(true, e.bytes);
643                            return None;
644                        }
645                        continue;
646                    }
647                    ParserError::EofExpected => return None,
648                    ParserError::IoError(err) | ParserError::EofError(err) => {
649                        error!("{:?}", err);
650                        write_mrt_core_dump(self.parser.core_dump, e.bytes);
651                        return None;
652                    }
653                    #[cfg(feature = "oneio")]
654                    ParserError::OneIoError(_) => return None,
655                    ParserError::FilterError(_) => return None,
656                    ParserError::InvalidLabeledNlriLength
657                    | ParserError::TruncatedLabeledNlri
658                    | ParserError::TruncatedPrefix
659                    | ParserError::MaxLabelStackDepthExceeded
660                    | ParserError::PeerMaxLabelsExceeded
661                    | ParserError::InvalidPrefix => {
662                        if self.parser.options.show_warnings {
663                            warn!("parser warn: labeled NLRI parsing error: {:?}", e.error);
664                        }
665                        continue;
666                    }
667                },
668            };
669
670            match parse_raw_record_route_iter(raw_record, &mut self.peer_table) {
671                Ok(routes) => {
672                    self.pending_routes = routes;
673                }
674                Err(err) => {
675                    error!("parser error: {}", err);
676                    if self.parser.core_dump {
677                        return None;
678                    }
679                    continue;
680                }
681            }
682        }
683    }
684}
685
686pub struct FallibleRouteIterator<R> {
687    parser: BgpkitParser<R>,
688    pending_routes: RouteRecordIter,
689    peer_table: Option<RoutePeerTable>,
690}
691
692impl<R> FallibleRouteIterator<R> {
693    pub(crate) fn new(parser: BgpkitParser<R>) -> Self {
694        Self {
695            parser,
696            pending_routes: RouteRecordIter::Empty,
697            peer_table: None,
698        }
699    }
700}
701
702impl<R: Read> Iterator for FallibleRouteIterator<R> {
703    type Item = Result<BgpRouteElem, ParserErrorWithBytes>;
704
705    fn next(&mut self) -> Option<Self::Item> {
706        loop {
707            match self.pending_routes.next_route() {
708                Ok(Some(route)) => {
709                    if route.match_filters(&self.parser.filters) {
710                        return Some(Ok(route));
711                    }
712                    continue;
713                }
714                Ok(None) => {}
715                Err(error) => {
716                    self.pending_routes = RouteRecordIter::Empty;
717                    return Some(Err(ParserErrorWithBytes { error, bytes: None }));
718                }
719            }
720
721            let raw_record = match chunk_mrt_record(&mut self.parser.reader) {
722                Ok(raw_record) => raw_record,
723                Err(e) if matches!(e.error, ParserError::EofExpected) => return None,
724                Err(e) => return Some(Err(e)),
725            };
726
727            match parse_raw_record_route_iter(raw_record, &mut self.peer_table) {
728                Ok(routes) => {
729                    self.pending_routes = routes;
730                }
731                Err(error) => return Some(Err(ParserErrorWithBytes { error, bytes: None })),
732            }
733        }
734    }
735}
736
737#[cfg(test)]
738mod tests {
739    use super::*;
740    use crate::parser::iters::write_mrt_core_dump_to_path;
741    use bytes::{BufMut, BytesMut};
742    use std::io::Cursor;
743    use std::net::{Ipv4Addr, Ipv6Addr};
744    use std::str::FromStr;
745
746    fn route_projection(elem: BgpElem) -> BgpRouteElem {
747        BgpRouteElem {
748            timestamp: elem.timestamp,
749            elem_type: elem.elem_type,
750            peer_ip: elem.peer_ip,
751            peer_asn: elem.peer_asn,
752            prefix: elem.prefix,
753            as_path: elem.as_path.map(Arc::new),
754        }
755    }
756
757    fn collect_route_record_iter(
758        mut iter: RouteRecordIter,
759    ) -> Result<Vec<BgpRouteElem>, ParserError> {
760        let mut routes = Vec::new();
761        while let Some(route) = iter.next_route()? {
762            routes.push(route);
763        }
764        Ok(routes)
765    }
766
767    fn route_peer_table_from_peer_index(peer_table: PeerIndexTable) -> RoutePeerTable {
768        let mut peer_ids = peer_table.id_peer_map.keys().copied().collect::<Vec<_>>();
769        peer_ids.sort_unstable();
770        let peers = peer_ids
771            .into_iter()
772            .map(|peer_id| peer_table.id_peer_map[&peer_id])
773            .collect::<Vec<_>>();
774
775        RoutePeerTable {
776            peers: Arc::from(peers),
777        }
778    }
779
780    fn update_record() -> MrtRecord {
781        let mut attributes = Attributes::default();
782        attributes.add_attr(AttributeValue::Origin(Origin::IGP).into());
783        attributes.add_attr(
784            AttributeValue::AsPath {
785                path: AsPath::from_sequence([64500, 64501]),
786                is_as4: false,
787            }
788            .into(),
789        );
790        attributes
791            .add_attr(AttributeValue::NextHop(IpAddr::from_str("192.0.2.254").unwrap()).into());
792
793        MrtRecord {
794            common_header: CommonHeader {
795                timestamp: 1_700_000_000,
796                microsecond_timestamp: None,
797                entry_type: EntryType::BGP4MP,
798                entry_subtype: Bgp4MpType::MessageAs4 as u16,
799                length: 0,
800            },
801            message: MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(Bgp4MpMessage {
802                msg_type: Bgp4MpType::MessageAs4,
803                peer_asn: Asn::new_32bit(64496),
804                local_asn: Asn::new_32bit(64497),
805                interface_index: 0,
806                peer_ip: IpAddr::from_str("192.0.2.1").unwrap(),
807                local_ip: IpAddr::from_str("192.0.2.2").unwrap(),
808                bgp_message: BgpMessage::Update(BgpUpdateMessage {
809                    withdrawn_prefixes: vec![NetworkPrefix::from_str("198.51.100.0/24").unwrap()],
810                    attributes,
811                    announced_prefixes: vec![NetworkPrefix::from_str("203.0.113.0/24").unwrap()],
812                }),
813            })),
814        }
815    }
816
817    fn route_attributes(as_path: impl AsRef<[u32]>) -> Attributes {
818        let mut attributes = Attributes::default();
819        attributes.add_attr(AttributeValue::Origin(Origin::IGP).into());
820        attributes.add_attr(
821            AttributeValue::AsPath {
822                path: AsPath::from_sequence(as_path),
823                is_as4: false,
824            }
825            .into(),
826        );
827        attributes
828            .add_attr(AttributeValue::NextHop(IpAddr::from_str("192.0.2.254").unwrap()).into());
829        attributes
830    }
831
832    fn bgp4mp_record(msg_type: Bgp4MpType, bgp_message: BgpMessage) -> MrtRecord {
833        let asn = if matches!(
834            msg_type,
835            Bgp4MpType::Message
836                | Bgp4MpType::MessageLocal
837                | Bgp4MpType::MessageAddpath
838                | Bgp4MpType::MessageLocalAddpath
839        ) {
840            Asn::new_16bit(64496)
841        } else {
842            Asn::new_32bit(64496)
843        };
844
845        MrtRecord {
846            common_header: CommonHeader {
847                timestamp: 1_700_000_000,
848                microsecond_timestamp: None,
849                entry_type: EntryType::BGP4MP,
850                entry_subtype: msg_type as u16,
851                length: 0,
852            },
853            message: MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(Bgp4MpMessage {
854                msg_type,
855                peer_asn: asn,
856                local_asn: Asn::new_32bit(64497),
857                interface_index: 0,
858                peer_ip: IpAddr::from_str("192.0.2.1").unwrap(),
859                local_ip: IpAddr::from_str("192.0.2.2").unwrap(),
860                bgp_message,
861            })),
862        }
863    }
864
865    fn open_message() -> BgpMessage {
866        BgpMessage::Open(BgpOpenMessage {
867            version: 4,
868            asn: Asn::new_16bit(64496),
869            hold_time: 180,
870            bgp_identifier: Ipv4Addr::new(192, 0, 2, 1),
871            extended_length: false,
872            opt_params: vec![],
873        })
874    }
875
876    fn raw_bgp_message(length: u16, msg_type: BgpMessageType, payload: &[u8]) -> Bytes {
877        raw_bgp_message_with_marker([0xff; 16], length, msg_type, payload)
878    }
879
880    fn raw_bgp_message_with_marker(
881        marker: [u8; 16],
882        length: u16,
883        msg_type: BgpMessageType,
884        payload: &[u8],
885    ) -> Bytes {
886        let mut bytes = BytesMut::new();
887        bytes.put_slice(&marker);
888        bytes.put_u16(length);
889        bytes.put_u8(msg_type as u8);
890        bytes.put_slice(payload);
891        bytes.freeze()
892    }
893
894    fn table_dump_record() -> MrtRecord {
895        let mut attributes = Attributes::default();
896        attributes.add_attr(AttributeValue::Origin(Origin::IGP).into());
897        attributes.add_attr(
898            AttributeValue::AsPath {
899                path: AsPath::from_sequence([64500, 64501]),
900                is_as4: false,
901            }
902            .into(),
903        );
904        attributes
905            .add_attr(AttributeValue::NextHop(IpAddr::from_str("192.0.2.254").unwrap()).into());
906
907        MrtRecord {
908            common_header: CommonHeader {
909                timestamp: 1_700_000_000,
910                microsecond_timestamp: None,
911                entry_type: EntryType::TABLE_DUMP,
912                entry_subtype: 1,
913                length: 0,
914            },
915            message: MrtMessage::TableDumpMessage(TableDumpMessage {
916                view_number: 0,
917                sequence_number: 1,
918                prefix: NetworkPrefix::from_str("203.0.113.0/24").unwrap(),
919                status: 1,
920                originated_time: 1_699_999_998,
921                peer_ip: IpAddr::from_str("192.0.2.20").unwrap(),
922                peer_asn: Asn::new_16bit(64496),
923                attributes,
924            }),
925        }
926    }
927
928    fn table_dump_ipv6_record() -> MrtRecord {
929        let mut attributes = Attributes::default();
930        attributes.add_attr(AttributeValue::Origin(Origin::IGP).into());
931        attributes.add_attr(
932            AttributeValue::AsPath {
933                path: AsPath::from_sequence([64500, 64501]),
934                is_as4: false,
935            }
936            .into(),
937        );
938
939        MrtRecord {
940            common_header: CommonHeader {
941                timestamp: 1_700_000_000,
942                microsecond_timestamp: None,
943                entry_type: EntryType::TABLE_DUMP,
944                entry_subtype: 2,
945                length: 0,
946            },
947            message: MrtMessage::TableDumpMessage(TableDumpMessage {
948                view_number: 0,
949                sequence_number: 1,
950                prefix: NetworkPrefix::from_str("2001:db8::/32").unwrap(),
951                status: 1,
952                originated_time: 1_699_999_998,
953                peer_ip: IpAddr::from_str("2001:db8::20").unwrap(),
954                peer_asn: Asn::new_16bit(64496),
955                attributes,
956            }),
957        }
958    }
959
960    fn table_dump_v2_records_bytes() -> Vec<u8> {
961        let peer = Peer::new(
962            "192.0.2.10".parse().unwrap(),
963            "192.0.2.11".parse().unwrap(),
964            Asn::new_32bit(64496),
965        );
966        let mut peer_table = PeerIndexTable::default();
967        let peer_index = peer_table.add_peer(peer);
968
969        let mut attributes = Attributes::default();
970        attributes.add_attr(AttributeValue::Origin(Origin::IGP).into());
971        attributes.add_attr(
972            AttributeValue::AsPath {
973                path: AsPath::from_sequence([64500, 64501]),
974                is_as4: false,
975            }
976            .into(),
977        );
978        attributes
979            .add_attr(AttributeValue::NextHop(IpAddr::from_str("192.0.2.254").unwrap()).into());
980
981        let pit_record = MrtRecord {
982            common_header: CommonHeader {
983                timestamp: 1_700_000_000,
984                microsecond_timestamp: None,
985                entry_type: EntryType::TABLE_DUMP_V2,
986                entry_subtype: TableDumpV2Type::PeerIndexTable as u16,
987                length: 0,
988            },
989            message: MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(peer_table)),
990        };
991        let rib_record = MrtRecord {
992            common_header: CommonHeader {
993                timestamp: 1_700_000_001,
994                microsecond_timestamp: None,
995                entry_type: EntryType::TABLE_DUMP_V2,
996                entry_subtype: TableDumpV2Type::RibIpv4Unicast as u16,
997                length: 0,
998            },
999            message: MrtMessage::TableDumpV2Message(TableDumpV2Message::RibAfi(RibAfiEntries {
1000                rib_type: TableDumpV2Type::RibIpv4Unicast,
1001                sequence_number: 1,
1002                prefix: NetworkPrefix::from_str("203.0.113.0/24").unwrap(),
1003                rib_entries: vec![RibEntry {
1004                    peer_index,
1005                    originated_time: 1_699_999_999,
1006                    path_id: None,
1007                    attributes,
1008                }],
1009            })),
1010        };
1011
1012        let mut bytes = pit_record.encode().to_vec();
1013        bytes.extend_from_slice(&rib_record.encode());
1014        bytes
1015    }
1016
1017    fn table_dump_v2_truncated_attribute_payload() -> (Vec<u8>, Bytes, PeerIndexTable) {
1018        let peer = Peer::new(
1019            "192.0.2.10".parse().unwrap(),
1020            "192.0.2.11".parse().unwrap(),
1021            Asn::new_32bit(64496),
1022        );
1023        let mut peer_table = PeerIndexTable::default();
1024        let peer_index = peer_table.add_peer(peer);
1025
1026        let pit_record = MrtRecord {
1027            common_header: CommonHeader {
1028                timestamp: 1_700_000_000,
1029                microsecond_timestamp: None,
1030                entry_type: EntryType::TABLE_DUMP_V2,
1031                entry_subtype: TableDumpV2Type::PeerIndexTable as u16,
1032                length: 0,
1033            },
1034            message: MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(
1035                peer_table.clone(),
1036            )),
1037        };
1038
1039        let first_entry = RibEntry {
1040            peer_index,
1041            originated_time: 1_699_999_999,
1042            path_id: None,
1043            attributes: route_attributes([64500, 64501]),
1044        };
1045
1046        let mut rib_body = BytesMut::new();
1047        rib_body.put_u32(1);
1048        rib_body.extend(NetworkPrefix::from_str("203.0.113.0/24").unwrap().encode());
1049        rib_body.put_u16(2);
1050        rib_body.extend(first_entry.encode());
1051        rib_body.put_u16(peer_index);
1052        rib_body.put_u32(1_699_999_998);
1053        rib_body.put_u16(32);
1054        rib_body.put_u8(0);
1055
1056        let rib_body = rib_body.freeze();
1057        let rib_header = CommonHeader {
1058            timestamp: 1_700_000_001,
1059            microsecond_timestamp: None,
1060            entry_type: EntryType::TABLE_DUMP_V2,
1061            entry_subtype: TableDumpV2Type::RibIpv4Unicast as u16,
1062            length: rib_body.len() as u32,
1063        };
1064
1065        let mut bytes = pit_record.encode().to_vec();
1066        bytes.extend_from_slice(&rib_header.encode());
1067        bytes.extend_from_slice(&rib_body);
1068
1069        (bytes, rib_body, peer_table)
1070    }
1071
1072    fn assert_filtered_route_projection(bytes: Vec<u8>, filters: &[(&str, &str)]) {
1073        let elem_parser = filters.iter().fold(
1074            BgpkitParser::from_reader(Cursor::new(bytes.clone())),
1075            |parser, (filter_type, filter_value)| {
1076                parser.add_filter(filter_type, filter_value).unwrap()
1077            },
1078        );
1079        let route_parser = filters.iter().fold(
1080            BgpkitParser::from_reader(Cursor::new(bytes)),
1081            |parser, (filter_type, filter_value)| {
1082                parser.add_filter(filter_type, filter_value).unwrap()
1083            },
1084        );
1085
1086        let elem_projection = elem_parser
1087            .into_elem_iter()
1088            .map(route_projection)
1089            .collect::<Vec<_>>();
1090        let routes = route_parser.into_route_iter().collect::<Vec<_>>();
1091
1092        assert_eq!(routes, elem_projection, "filters: {filters:?}");
1093    }
1094
1095    fn assert_route_projection(bytes: Vec<u8>) -> Vec<BgpRouteElem> {
1096        let elem_projection = BgpkitParser::from_reader(Cursor::new(bytes.clone()))
1097            .into_elem_iter()
1098            .map(route_projection)
1099            .collect::<Vec<_>>();
1100        let routes = BgpkitParser::from_reader(Cursor::new(bytes))
1101            .into_route_iter()
1102            .collect::<Vec<_>>();
1103
1104        assert_eq!(routes, elem_projection);
1105        routes
1106    }
1107
#[test]
fn route_iterator_matches_elem_projection_for_update() {
    // The fixture update yields an announcement followed by a withdrawal.
    let encoded = update_record().encode().to_vec();
    let routes = assert_route_projection(encoded);
    assert_eq!(routes.len(), 2);

    let (announce, withdraw) = (&routes[0], &routes[1]);
    assert_eq!(announce.elem_type, ElemType::ANNOUNCE);
    assert_eq!(withdraw.elem_type, ElemType::WITHDRAW);
    // Withdrawals carry no AS path.
    assert!(withdraw.as_path.is_none());
}
1117
#[test]
fn route_iterator_shares_as_path_for_update_announcements() {
    // Two prefixes announced by one UPDATE should point at the same AS path
    // allocation rather than at two equal copies.
    let update = BgpUpdateMessage {
        withdrawn_prefixes: Vec::new(),
        attributes: route_attributes([64500, 64501]),
        announced_prefixes: ["203.0.113.0/24", "198.51.100.0/24"]
            .iter()
            .map(|prefix| NetworkPrefix::from_str(prefix).unwrap())
            .collect(),
    };
    let record = bgp4mp_record(Bgp4MpType::MessageAs4, BgpMessage::Update(update));
    let encoded = record.encode().to_vec();

    let routes: Vec<_> = BgpkitParser::from_reader(Cursor::new(encoded))
        .into_route_iter()
        .collect();

    assert_eq!(routes.len(), 2);
    let first_path = routes[0].as_path.as_ref().unwrap();
    let second_path = routes[1].as_path.as_ref().unwrap();
    assert!(Arc::ptr_eq(first_path, second_path));
}
1144
#[test]
fn route_iterator_uses_microsecond_timestamps() {
    // A header with a microsecond component must produce a fractional timestamp.
    let header = CommonHeader {
        timestamp: 1_700_000_000,
        microsecond_timestamp: Some(123_456),
        entry_type: EntryType::BGP4MP_ET,
        entry_subtype: Bgp4MpType::MessageAs4 as u16,
        length: 0,
    };

    let timestamp = record_timestamp(&header);

    assert_eq!(timestamp, 1_700_000_000.123_456);
}
1157
#[test]
fn route_iterator_matches_elem_projection_for_mp_update() {
    // All prefixes travel in MP_REACH / MP_UNREACH attributes; the classic
    // announced and withdrawn lists stay empty.
    let reachable = Nlri::new_reachable(
        NetworkPrefix::from_str("2001:db8::/32").unwrap(),
        Some("2001:db8::1".parse::<IpAddr>().unwrap()),
    );
    let unreachable = Nlri::new_unreachable(NetworkPrefix::from_str("2001:db8:1::/48").unwrap());

    let mut attrs = route_attributes([64500, 64501]);
    attrs.add_attr(AttributeValue::MpReachNlri(reachable).into());
    attrs.add_attr(AttributeValue::MpUnreachNlri(unreachable).into());

    let update = BgpUpdateMessage {
        withdrawn_prefixes: Vec::new(),
        attributes: attrs,
        announced_prefixes: Vec::new(),
    };
    let encoded = bgp4mp_record(Bgp4MpType::MessageAs4, BgpMessage::Update(update))
        .encode()
        .to_vec();

    let routes = assert_route_projection(encoded);
    assert_eq!(routes.len(), 2);

    let (announce, withdraw) = (&routes[0], &routes[1]);
    assert_eq!(announce.elem_type, ElemType::ANNOUNCE);
    assert_eq!(
        announce.prefix,
        NetworkPrefix::from_str("2001:db8::/32").unwrap()
    );
    assert_eq!(withdraw.elem_type, ElemType::WITHDRAW);
    assert_eq!(
        withdraw.prefix,
        NetworkPrefix::from_str("2001:db8:1::/48").unwrap()
    );
}
1199
#[test]
fn route_iterator_matches_elem_projection_for_non_update_bgp4mp_messages() {
    // OPEN, NOTIFICATION and KEEPALIVE messages carry no routes, whichever
    // BGP4MP subtype wraps them.
    let notification = BgpMessage::Notification(BgpNotificationMessage {
        error: BgpError::Unknown(1, 0),
        data: Vec::new(),
    });
    let records = vec![
        bgp4mp_record(Bgp4MpType::Message, open_message()),
        bgp4mp_record(Bgp4MpType::MessageAs4, notification),
        bgp4mp_record(Bgp4MpType::MessageAddpath, BgpMessage::KeepAlive),
        bgp4mp_record(Bgp4MpType::MessageAs4Addpath, BgpMessage::KeepAlive),
    ];

    // Concatenate the encoded records into one MRT stream.
    let stream = records.into_iter().fold(Vec::new(), |mut acc, record| {
        acc.extend_from_slice(&record.encode());
        acc
    });

    let routes = assert_route_projection(stream);
    assert!(routes.is_empty());
}
1221
#[test]
fn route_iterator_matches_elem_projection_for_bgp4mp_16bit_update() {
    // The plain `Message` subtype makes the fixture use a 16-bit peer ASN,
    // which the route must report back.
    let update = BgpUpdateMessage {
        withdrawn_prefixes: Vec::new(),
        attributes: route_attributes([64500, 64501]),
        announced_prefixes: vec![NetworkPrefix::from_str("203.0.113.0/24").unwrap()],
    };
    let record = bgp4mp_record(Bgp4MpType::Message, BgpMessage::Update(update));

    let routes = assert_route_projection(record.encode().to_vec());

    assert_eq!(routes.len(), 1);
    assert_eq!(routes[0].peer_asn, Asn::new_16bit(64496));
}
1239
#[test]
fn route_iterator_filters_match_elem_projection_for_update() {
    // Each row is one filter set; the route iterator must agree with the
    // projected element iterator for every row, matching or not.
    let cases: &[&[(&str, &str)]] = &[
        &[("peer_ip", "192.0.2.1")],
        &[("peer_ip", "192.0.2.99")],
        &[("peer_asn", "64496")],
        &[("type", "a")],
        &[("type", "w")],
        &[("type", "!w")],
        &[("prefix", "203.0.113.0/24")],
        &[("prefix", "198.51.100.0/24")],
        &[("prefix_super", "203.0.113.128/25")],
        &[("origin_asn", "64501")],
        &[("origin_asns", "64496,64501")],
        &[("as_path", "64500 64501$")],
        &[("ip_version", "4")],
        &[("ts_start", "1700000000"), ("ts_end", "1700000000")],
        &[("peer_ip", "192.0.2.1"), ("type", "a")],
    ];
    let encoded = update_record().encode().to_vec();

    cases
        .iter()
        .for_each(|filters| assert_filtered_route_projection(encoded.clone(), filters));
}
1265
#[test]
fn selective_attribute_parser_merges_as4_path() {
    // With both an AS_PATH (holding the 23456 placeholder) and an AS4_PATH
    // present, the parsed path must be the merged 4-byte view.
    let two_octet_path = AttributeValue::AsPath {
        path: AsPath::from_sequence([23456, 64497]),
        is_as4: false,
    };
    let four_octet_path = AttributeValue::AsPath {
        path: AsPath::from_sequence([65536, 64497]),
        is_as4: true,
    };
    let mut attrs = Attributes::default();
    attrs.add_attr(two_octet_path.into());
    attrs.add_attr(four_octet_path.into());

    let ctx = RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(true),
        has_standard_nlri: true,
    };
    let parsed = parse_route_attributes(
        attrs.encode(AsnLength::Bits16),
        &AsnLength::Bits16,
        false,
        ctx,
    )
    .unwrap();

    let merged = parsed.as_path.unwrap().to_u32_vec_opt(false).unwrap();
    assert_eq!(merged, vec![65536, 64497]);
}
1303
#[test]
fn selective_attribute_parser_handles_as_path_without_as4_path() {
    // Only a regular AS_PATH is present, so it is returned unchanged.
    let encoded = route_attributes([64500, 64501]).encode(AsnLength::Bits16);
    let ctx = RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(true),
        has_standard_nlri: true,
    };

    let parsed = parse_route_attributes(encoded, &AsnLength::Bits16, false, ctx).unwrap();

    let path = parsed.as_path.unwrap().to_u32_vec_opt(false).unwrap();
    assert_eq!(path, vec![64500, 64501]);
}
1325
#[test]
fn selective_attribute_parser_handles_as4_path_without_as_path() {
    // Only an AS4_PATH is present; it must still surface as the route's path.
    let four_octet_path = AttributeValue::AsPath {
        path: AsPath::from_sequence([65536, 64497]),
        is_as4: true,
    };
    let mut attrs = Attributes::default();
    attrs.add_attr(four_octet_path.into());

    let ctx = RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(false),
        has_standard_nlri: false,
    };
    let parsed = parse_route_attributes(
        attrs.encode(AsnLength::Bits16),
        &AsnLength::Bits16,
        false,
        ctx,
    )
    .unwrap();

    let path = parsed.as_path.unwrap().to_u32_vec_opt(false).unwrap();
    assert_eq!(path, vec![65536, 64497]);
}
1356
#[test]
fn selective_attribute_parser_handles_no_as_path() {
    // An empty attribute buffer parses successfully and yields no AS path.
    let ctx = RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(false),
        has_standard_nlri: false,
    };

    let parsed = parse_route_attributes(Bytes::new(), &AsnLength::Bits16, false, ctx).unwrap();

    assert!(parsed.as_path.is_none());
}
1375
#[test]
fn selective_attribute_parser_handles_extended_and_truncated_attributes() {
    // Case 1: an AS_PATH attribute with the EXTENDED flag set, so its length
    // is written as a u16. The four value bytes are 2, 1 and the u16 64500.
    let extended_flags = (AttrFlags::TRANSITIVE | AttrFlags::EXTENDED).bits();
    let mut wire = BytesMut::new();
    wire.put_u8(extended_flags);
    wire.put_u8(u8::from(AttrType::AS_PATH));
    wire.put_u16(4);
    wire.put_u8(2);
    wire.put_u8(1);
    wire.put_u16(64500);

    let extended_ctx = RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(false),
        has_standard_nlri: false,
    };
    let parsed =
        parse_route_attributes(wire.freeze(), &AsnLength::Bits16, false, extended_ctx).unwrap();
    let path = parsed.as_path.unwrap().to_u32_vec_opt(false).unwrap();
    assert_eq!(path, vec![64500]);

    // Case 2: the attribute declares five value bytes but only one follows.
    // Parsing still succeeds and simply produces no AS path.
    let truncated_ctx = RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(false),
        has_standard_nlri: false,
    };
    let parsed = parse_route_attributes(
        Bytes::from_static(&[0x40, 2, 5, 0]),
        &AsnLength::Bits16,
        false,
        truncated_ctx,
    )
    .unwrap();
    assert!(parsed.as_path.is_none());
}
1419
#[test]
fn selective_attribute_parser_discards_malformed_as_path() {
    // A one-byte AS_PATH value is fully present in the buffer; the test
    // expects it to be dropped without failing the whole parse.
    let wire = Bytes::from_static(&[0x40, 2, 1, 0]);
    let ctx = RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(false),
        has_standard_nlri: false,
    };

    let parsed = parse_route_attributes(wire, &AsnLength::Bits16, false, ctx).unwrap();

    assert!(parsed.as_path.is_none());
}
1438
#[test]
fn route_iterator_matches_elem_projection_for_table_dump() {
    let encoded = table_dump_record().encode().to_vec();
    let routes = assert_route_projection(encoded);
    assert_eq!(routes.len(), 1);

    // The route reports the entry's originated time (…998), not the MRT
    // header timestamp the fixture sets.
    let route = &routes[0];
    assert_eq!(route.timestamp, 1_699_999_998.0);
    assert_eq!(route.peer_asn, Asn::new_16bit(64496));
}
1447
#[test]
fn route_iterator_matches_elem_projection_for_table_dump_ipv6() {
    // The IPv6 TABLE_DUMP fixture must keep its IPv6 prefix and peer address.
    let expected_prefix = NetworkPrefix::from_str("2001:db8::/32").unwrap();
    let expected_peer = IpAddr::V6(Ipv6Addr::from_str("2001:db8::20").unwrap());

    let encoded = table_dump_ipv6_record().encode().to_vec();
    let routes = assert_route_projection(encoded);

    assert_eq!(routes.len(), 1);
    assert_eq!(routes[0].prefix, expected_prefix);
    assert_eq!(routes[0].peer_ip, expected_peer);
}
1462
#[test]
fn route_iterator_matches_elem_projection_for_table_dump_v2() {
    // A RIB entry surfaces as a single announcement carrying its AS path.
    let routes = assert_route_projection(table_dump_v2_records_bytes());
    assert_eq!(routes.len(), 1);

    let route = &routes[0];
    assert_eq!(route.elem_type, ElemType::ANNOUNCE);
    assert_eq!(
        route.as_path.as_ref().unwrap().to_u32_vec_opt(false),
        Some(vec![64500, 64501])
    );
}
1474
#[test]
fn route_iterator_matches_elem_projection_for_table_dump_v2_ipv6_addpath() {
    // IPv6 unicast RIB using the add-path subtype, with a path id on the entry.
    let mut peer_table = PeerIndexTable::default();
    let peer_index = peer_table.add_peer(Peer::new(
        "192.0.2.11".parse().unwrap(),
        "2001:db8::10".parse().unwrap(),
        Asn::new_32bit(64496),
    ));

    let add_path_entry = RibEntry {
        peer_index,
        originated_time: 1_699_999_999,
        path_id: Some(1234),
        attributes: route_attributes([64500, 64501]),
    };

    let pit_record = MrtRecord {
        common_header: CommonHeader {
            timestamp: 1_700_000_000,
            microsecond_timestamp: None,
            entry_type: EntryType::TABLE_DUMP_V2,
            entry_subtype: TableDumpV2Type::PeerIndexTable as u16,
            length: 0,
        },
        message: MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(peer_table)),
    };
    let rib_record = MrtRecord {
        common_header: CommonHeader {
            timestamp: 1_700_000_001,
            microsecond_timestamp: None,
            entry_type: EntryType::TABLE_DUMP_V2,
            entry_subtype: TableDumpV2Type::RibIpv6UnicastAddPath as u16,
            length: 0,
        },
        message: MrtMessage::TableDumpV2Message(TableDumpV2Message::RibAfi(RibAfiEntries {
            rib_type: TableDumpV2Type::RibIpv6UnicastAddPath,
            sequence_number: 1,
            prefix: NetworkPrefix::from_str("2001:db8::/32").unwrap(),
            rib_entries: vec![add_path_entry],
        })),
    };

    let mut stream = Vec::new();
    stream.extend_from_slice(&pit_record.encode());
    stream.extend_from_slice(&rib_record.encode());

    let routes = assert_route_projection(stream);
    assert_eq!(routes.len(), 1);
    assert_eq!(
        routes[0].prefix,
        NetworkPrefix::from_str("2001:db8::/32").unwrap()
    );
}
1525
#[test]
fn route_iterator_matches_elem_projection_for_bgp4mp_ipv6_peer_update() {
    // An IPv4 announcement received over an IPv6 peering session: the peer
    // address on the route must stay IPv6.
    let update = BgpUpdateMessage {
        withdrawn_prefixes: Vec::new(),
        attributes: route_attributes([64500, 64501]),
        announced_prefixes: vec![NetworkPrefix::from_str("203.0.113.0/24").unwrap()],
    };
    let payload = Bgp4MpMessage {
        msg_type: Bgp4MpType::MessageAs4,
        peer_asn: Asn::new_32bit(64496),
        local_asn: Asn::new_32bit(64497),
        interface_index: 0,
        peer_ip: "2001:db8::1".parse::<IpAddr>().unwrap(),
        local_ip: "2001:db8::2".parse::<IpAddr>().unwrap(),
        bgp_message: BgpMessage::Update(update),
    };
    let record = MrtRecord {
        common_header: CommonHeader {
            timestamp: 1_700_000_000,
            microsecond_timestamp: None,
            entry_type: EntryType::BGP4MP,
            entry_subtype: Bgp4MpType::MessageAs4 as u16,
            length: 0,
        },
        message: MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(payload)),
    };

    let routes = assert_route_projection(record.encode().to_vec());

    assert_eq!(routes.len(), 1);
    let expected_peer = IpAddr::V6(Ipv6Addr::from_str("2001:db8::1").unwrap());
    assert_eq!(routes[0].peer_ip, expected_peer);
}
1558
#[test]
fn route_iterator_filters_match_elem_projection_for_table_dump_v2() {
    // Each row is one filter set applied to the TABLE_DUMP_V2 fixture; both
    // iterators must agree on the result for every row.
    let cases: &[&[(&str, &str)]] = &[
        &[("peer_ip", "192.0.2.10")],
        &[("peer_asn", "64496")],
        &[("type", "a")],
        &[("type", "w")],
        &[("prefix", "203.0.113.0/24")],
        &[("prefix_sub", "203.0.112.0/23")],
        &[("origin_asn", "64501")],
        &[("as_path", "64500 64501$")],
        &[("ts_start", "1699999999"), ("ts_end", "1699999999")],
        &[("peer_asn", "64496"), ("origin_asn", "64501")],
    ];
    let encoded = table_dump_v2_records_bytes();

    cases
        .iter()
        .for_each(|filters| assert_filtered_route_projection(encoded.clone(), filters));
}
1579
#[test]
fn route_parser_reports_bgp_message_shape_errors() {
    // Every case shares the same parser arguments; only the raw message varies.
    let parse = |message: Bytes| {
        parse_bgp_message_routes(
            message,
            false,
            &AsnLength::Bits16,
            1_700_000_000.0,
            "192.0.2.1".parse().unwrap(),
            Asn::new_16bit(64496),
        )
    };

    // Declared lengths of 18 and 4097 are rejected outright.
    assert!(parse(raw_bgp_message(18, BgpMessageType::KEEPALIVE, &[])).is_err());
    assert!(parse(raw_bgp_message(4097, BgpMessageType::OPEN, &[])).is_err());

    // These shapes are accepted but produce no routes: a declared length
    // larger than the bytes supplied, a payload byte beyond the declared
    // length, and an all-zero marker.
    let tolerated = [
        raw_bgp_message(30, BgpMessageType::KEEPALIVE, &[]),
        raw_bgp_message(19, BgpMessageType::KEEPALIVE, &[0]),
        raw_bgp_message_with_marker([0x00; 16], 19, BgpMessageType::KEEPALIVE, &[]),
    ];
    for message in tolerated {
        let routes = collect_route_record_iter(parse(message).unwrap()).unwrap();
        assert!(routes.is_empty());
    }
}
1643
#[test]
fn route_core_dump_write_respects_enabled_flag() {
    let scratch = tempfile::tempdir().unwrap();
    let dump_path = scratch.path().join("mrt_core_dump");
    let payload: Vec<u8> = vec![1, 2, 3];

    // Disabled: nothing may be written.
    write_mrt_core_dump_to_path(false, Some(payload.clone()), &dump_path);
    assert!(!dump_path.exists());

    // Enabled: the bytes land in the file unchanged.
    write_mrt_core_dump_to_path(true, Some(payload.clone()), &dump_path);
    assert_eq!(std::fs::read(&dump_path).unwrap(), payload);
}
1655
#[test]
fn route_parser_handles_table_dump_v2_error_edges() {
    let ipv4_unicast = TableDumpV2Type::RibIpv4Unicast as u16;

    // A RIB whose only entry refers to peer index 99.
    let unknown_peer_rib = RibAfiEntries {
        rib_type: TableDumpV2Type::RibIpv4Unicast,
        sequence_number: 1,
        prefix: NetworkPrefix::from_str("203.0.113.0/24").unwrap(),
        rib_entries: vec![RibEntry {
            peer_index: 99,
            originated_time: 1_699_999_999,
            path_id: None,
            attributes: route_attributes([64500, 64501]),
        }],
    };

    // Edge 1: with no peer table at all, parsing is an error.
    let mut missing_table = None;
    assert!(
        parse_table_dump_v2_routes(ipv4_unicast, unknown_peer_rib.encode(), &mut missing_table)
            .is_err()
    );

    // Edge 2: with an empty peer table the entry cannot be resolved; parsing
    // succeeds and yields no routes.
    let mut empty_table = Some(RoutePeerTable::default());
    let parsed =
        parse_table_dump_v2_routes(ipv4_unicast, unknown_peer_rib.encode(), &mut empty_table)
            .unwrap();
    let routes = collect_route_record_iter(parsed).unwrap();
    assert!(routes.is_empty());

    // Edge 3: the body announces one entry but ends right after the count.
    let mut count_only = BytesMut::new();
    count_only.put_u32(1);
    count_only.extend(NetworkPrefix::from_str("203.0.113.0/24").unwrap().encode());
    count_only.put_u16(1);
    let mut empty_table = Some(RoutePeerTable::default());
    let parsed =
        parse_table_dump_v2_routes(ipv4_unicast, count_only.freeze(), &mut empty_table).unwrap();
    let routes = collect_route_record_iter(parsed).unwrap();
    assert!(routes.is_empty());

    // Edge 4: add-path RIB with one complete entry followed by a second entry
    // that stops after its path id. Only the first entry should be returned.
    let mut peer_index_table = PeerIndexTable::default();
    let peer_index = peer_index_table.add_peer(Peer::new(
        "192.0.2.10".parse().unwrap(),
        "192.0.2.11".parse().unwrap(),
        Asn::new_32bit(64496),
    ));
    let complete_entry = RibEntry {
        peer_index,
        originated_time: 1_699_999_999,
        path_id: Some(1234),
        attributes: route_attributes([64500, 64501]),
    };

    let mut cut_short = BytesMut::new();
    cut_short.put_u32(1);
    cut_short.extend(NetworkPrefix::from_str("203.0.113.0/24").unwrap().encode());
    cut_short.put_u16(2);
    cut_short.extend(complete_entry.encode());
    cut_short.put_u16(peer_index);
    cut_short.put_u32(1_699_999_998);
    cut_short.put_u32(5678);

    let mut resolved_table = Some(route_peer_table_from_peer_index(peer_index_table));
    let parsed = parse_table_dump_v2_routes(
        TableDumpV2Type::RibIpv4UnicastAddPath as u16,
        cut_short.freeze(),
        &mut resolved_table,
    )
    .unwrap();
    let routes = collect_route_record_iter(parsed).unwrap();
    assert_eq!(routes.len(), 1);
    assert_eq!(
        routes[0].prefix,
        NetworkPrefix::from_str("203.0.113.0/24").unwrap()
    );
}
1744
/// Feeds the RIB body produced by `table_dump_v2_truncated_attribute_payload`
/// straight into `parse_table_dump_v2_routes` and expects exactly one route
/// to come back: prefix 203.0.113.0/24 with AS path 64500 64501.
#[test]
fn route_parser_preserves_table_dump_v2_routes_before_truncated_attribute_payload() {
    let (_record_bytes, rib_body, peer_index_table) = table_dump_v2_truncated_attribute_payload();
    let mut route_peer_table = Some(route_peer_table_from_peer_index(peer_index_table));

    let record_iter = parse_table_dump_v2_routes(
        TableDumpV2Type::RibIpv4Unicast as u16,
        rib_body,
        &mut route_peer_table,
    )
    .unwrap();
    let routes = collect_route_record_iter(record_iter).unwrap();

    assert_eq!(routes.len(), 1);

    let expected_prefix = NetworkPrefix::from_str("203.0.113.0/24").unwrap();
    assert_eq!(routes[0].prefix, expected_prefix);

    let as_path = routes[0].as_path.as_ref().unwrap();
    assert_eq!(as_path.to_u32_vec_opt(false), Some(vec![64500, 64501]));
}
1770
/// Runs the full encoded stream from `table_dump_v2_truncated_attribute_payload`
/// through both the plain and the fallible route iterators. The plain iterator
/// must yield one route for 203.0.113.0/24, and the fallible iterator must
/// collect without error into the very same list.
#[test]
fn route_iterators_preserve_table_dump_v2_routes_before_truncated_attribute_payload() {
    let (bytes, _rib_body, _peer_table) = table_dump_v2_truncated_attribute_payload();

    // The stream is consumed twice, so the first parser reads from a copy.
    let plain_parser = BgpkitParser::from_reader(Cursor::new(bytes.clone()));
    let routes: Vec<_> = plain_parser.into_route_iter().collect();
    assert_eq!(routes.len(), 1);

    let expected_prefix = NetworkPrefix::from_str("203.0.113.0/24").unwrap();
    assert_eq!(routes[0].prefix, expected_prefix);

    let fallible_parser = BgpkitParser::from_reader(Cursor::new(bytes));
    let fallible_routes: Vec<_> = fallible_parser
        .into_fallible_route_iter()
        .collect::<Result<_, _>>()
        .unwrap();
    assert_eq!(fallible_routes, routes);
}
1790
1791    fn table_dump_v2_rib_without_peer_table_record() -> MrtRecord {
1792        MrtRecord {
1793            common_header: CommonHeader {
1794                timestamp: 1_700_000_001,
1795                microsecond_timestamp: None,
1796                entry_type: EntryType::TABLE_DUMP_V2,
1797                entry_subtype: TableDumpV2Type::RibIpv4Unicast as u16,
1798                length: 0,
1799            },
1800            message: MrtMessage::TableDumpV2Message(TableDumpV2Message::RibAfi(RibAfiEntries {
1801                rib_type: TableDumpV2Type::RibIpv4Unicast,
1802                sequence_number: 1,
1803                prefix: NetworkPrefix::from_str("203.0.113.0/24").unwrap(),
1804                rib_entries: vec![RibEntry {
1805                    peer_index: 0,
1806                    originated_time: 1_699_999_999,
1807                    path_id: None,
1808                    attributes: route_attributes([64500, 64501]),
1809                }],
1810            })),
1811        }
1812    }
1813
/// The non-fallible route iterator must produce nothing for a stream holding
/// only the record from `table_dump_v2_rib_without_peer_table_record`.
#[test]
fn route_iterator_skips_route_parse_errors() {
    let encoded = table_dump_v2_rib_without_peer_table_record()
        .encode()
        .to_vec();
    let parser = BgpkitParser::from_reader(Cursor::new(encoded));

    let routes: Vec<_> = parser.into_route_iter().collect();

    assert!(routes.is_empty());
}
1826
/// With a `type = w` filter installed, the fallible route iterator over the
/// encoded `update_record()` must return exactly one route, and that route
/// must be a withdrawal.
#[test]
fn fallible_route_iterator_applies_filters_to_cached_routes() {
    let encoded = update_record().encode().to_vec();
    let filtered_parser = BgpkitParser::from_reader(Cursor::new(encoded))
        .add_filter("type", "w")
        .unwrap();

    let routes: Vec<_> = filtered_parser
        .into_fallible_route_iter()
        .collect::<Result<_, _>>()
        .unwrap();

    assert_eq!(routes.len(), 1);
    assert_eq!(routes[0].elem_type, ElemType::WITHDRAW);
}
1839
/// The fallible route iterator must report an error as its first item for a
/// stream holding only the record from
/// `table_dump_v2_rib_without_peer_table_record`.
#[test]
fn fallible_route_iterator_returns_route_parse_errors() {
    let encoded = table_dump_v2_rib_without_peer_table_record()
        .encode()
        .to_vec();
    let parser = BgpkitParser::from_reader(Cursor::new(encoded));
    let mut fallible_iter = parser.into_fallible_route_iter();

    // An item must exist, and it must be the error.
    assert!(fallible_iter.next().unwrap().is_err());
}
1851
/// The fallible route iterator over the encoded `update_record()` must
/// collect cleanly into two routes: an announcement followed by a withdrawal.
#[test]
fn fallible_route_iterator_yields_routes() {
    let encoded = update_record().encode().to_vec();
    let parser = BgpkitParser::from_reader(Cursor::new(encoded));

    let routes: Vec<_> = parser
        .into_fallible_route_iter()
        .collect::<Result<_, _>>()
        .unwrap();

    assert_eq!(routes.len(), 2);
    assert_eq!(routes[0].elem_type, ElemType::ANNOUNCE);
    assert_eq!(routes[1].elem_type, ElemType::WITHDRAW);
}
1864
/// A hand-written 16-byte record whose type field is `0xFFFF` must make the
/// fallible route iterator return an error as its first item.
#[test]
fn fallible_route_iterator_returns_parse_errors() {
    #[rustfmt::skip]
    let invalid_data: Vec<u8> = vec![
        0x00, 0x00, 0x00, 0x00, // timestamp
        0xFF, 0xFF,             // invalid type
        0x00, 0x00,             // subtype
        0x00, 0x00, 0x00, 0x04, // length
        0x00, 0x00, 0x00, 0x00, // dummy data
    ];

    let parser = BgpkitParser::from_reader(Cursor::new(invalid_data));
    let mut fallible_iter = parser.into_fallible_route_iter();

    assert!(fallible_iter.next().unwrap().is_err());
}
1880}