1use crate::error::{ParserError, ParserErrorWithBytes};
2use crate::models::*;
3use crate::parser::bgp::attributes::{parse_as_path, parse_nlri, AttributeValidationState};
4use crate::parser::bgp::messages::read_and_validate_bgp_marker;
5use crate::parser::iters::write_mrt_core_dump;
6use crate::parser::mrt::messages::bgp4mp::bgp4mp_message_payload_len;
7use crate::parser::mrt::messages::table_dump_v2::rib_entry_min_len;
8use crate::parser::{chunk_mrt_record, parse_nlri_list, BgpkitParser, Filterable, ReadUtils};
9use bytes::{Buf, Bytes};
10use ipnet::IpNet;
11use log::{error, warn};
12use std::io::Read;
13use std::net::IpAddr;
14use std::sync::Arc;
15
16#[derive(Default)]
17struct RouteAttributes {
18 as_path: Option<Arc<AsPath>>,
19 announced: Vec<NetworkPrefix>,
20 withdrawn: Vec<NetworkPrefix>,
21}
22
23struct RouteAttributeContext<'a> {
24 afi: Option<Afi>,
25 safi: Option<Safi>,
26 prefixes: Option<&'a [NetworkPrefix]>,
27 is_announcement: Option<bool>,
28 has_standard_nlri: bool,
29}
30
31fn merge_as_path(as_path: Option<AsPath>, as4_path: Option<AsPath>) -> Option<Arc<AsPath>> {
32 let path = match (as_path, as4_path) {
33 (None, None) => None,
34 (Some(path), None) | (None, Some(path)) => Some(path),
35 (Some(path), Some(as4_path)) => Some(AsPath::merge_aspath_as4path(&path, &as4_path)),
36 };
37 path.map(Arc::new)
38}
39
40fn parse_route_attributes(
41 mut data: Bytes,
42 asn_len: &AsnLength,
43 add_path: bool,
44 ctx: RouteAttributeContext<'_>,
45) -> Result<RouteAttributes, ParserError> {
46 let mut validation = AttributeValidationState::new();
47 let mut as_path = None;
48 let mut as4_path = None;
49 let mut announced = Vec::new();
50 let mut withdrawn = Vec::new();
51
52 while data.remaining() >= 3 {
53 let flags = AttrFlags::from_bits_retain(data.read_u8()?);
54 let raw_attr_type = data.read_u8()?;
55 let attr_length = if flags.contains(AttrFlags::EXTENDED) {
56 data.read_u16()? as usize
57 } else {
58 data.read_u8()? as usize
59 };
60 let attr_type = AttrType::from(raw_attr_type);
61 let partial = validation.observe_header(raw_attr_type, attr_type, flags, attr_length);
62
63 if data.remaining() < attr_length {
64 warn!(
65 "{:?} attribute encodes a length ({}) that is longer than the remaining attribute data ({}). Skipping remaining attribute data for BGP message",
66 attr_type,
67 attr_length,
68 data.remaining()
69 );
70 break;
71 }
72
73 let attr_data = data.split_to(attr_length);
74 let result = match attr_type {
75 AttrType::AS_PATH => parse_as_path(attr_data, asn_len).map(|path| {
76 as_path = Some(path);
77 }),
78 AttrType::AS4_PATH => parse_as_path(attr_data, &AsnLength::Bits32).map(|path| {
79 as4_path = Some(path);
80 }),
81 AttrType::MP_REACHABLE_NLRI => parse_nlri(
82 attr_data,
83 &ctx.afi,
84 &ctx.safi,
85 &ctx.prefixes,
86 true,
87 add_path,
88 )
89 .map(|attr| {
90 if let AttributeValue::MpReachNlri(nlri) = attr {
91 announced = nlri.prefixes;
92 }
93 }),
94 AttrType::MP_UNREACHABLE_NLRI => parse_nlri(
95 attr_data,
96 &ctx.afi,
97 &ctx.safi,
98 &ctx.prefixes,
99 false,
100 add_path,
101 )
102 .map(|attr| {
103 if let AttributeValue::MpUnreachNlri(nlri) = attr {
104 withdrawn = nlri.prefixes;
105 }
106 }),
107 _ => Ok(()),
108 };
109
110 if let Err(err) = result {
111 validation.observe_parse_error(attr_type, partial, &err);
112 }
113 }
114
115 let is_announcement = ctx
116 .is_announcement
117 .unwrap_or(ctx.has_standard_nlri || validation.has_attr(AttrType::MP_REACHABLE_NLRI));
118 validation.check_mandatory_attributes(is_announcement, ctx.has_standard_nlri);
119 let _warnings = validation.finish();
120 Ok(RouteAttributes {
121 as_path: merge_as_path(as_path, as4_path),
122 announced,
123 withdrawn,
124 })
125}
126
127fn record_timestamp(common_header: &CommonHeader) -> f64 {
128 match common_header.microsecond_timestamp {
129 Some(microseconds) => common_header.timestamp as f64 + microseconds as f64 / 1_000_000.0,
130 None => common_header.timestamp as f64,
131 }
132}
133
134struct RouteUpdateIter {
135 timestamp: f64,
136 peer_ip: IpAddr,
137 peer_asn: Asn,
138 as_path: Option<Arc<AsPath>>,
139 announced:
140 std::iter::Chain<std::vec::IntoIter<NetworkPrefix>, std::vec::IntoIter<NetworkPrefix>>,
141 withdrawn:
142 std::iter::Chain<std::vec::IntoIter<NetworkPrefix>, std::vec::IntoIter<NetworkPrefix>>,
143 in_withdrawn_phase: bool,
144}
145
146impl RouteUpdateIter {
147 fn next_route(&mut self) -> Option<BgpRouteElem> {
148 if !self.in_withdrawn_phase {
149 if let Some(prefix) = self.announced.next() {
150 return Some(BgpRouteElem {
151 timestamp: self.timestamp,
152 elem_type: ElemType::ANNOUNCE,
153 peer_ip: self.peer_ip,
154 peer_asn: self.peer_asn,
155 prefix,
156 as_path: self.as_path.clone(),
157 });
158 }
159 self.in_withdrawn_phase = true;
160 }
161
162 self.withdrawn.next().map(|prefix| BgpRouteElem {
163 timestamp: self.timestamp,
164 elem_type: ElemType::WITHDRAW,
165 peer_ip: self.peer_ip,
166 peer_asn: self.peer_asn,
167 prefix,
168 as_path: None,
169 })
170 }
171}
172
173#[derive(Clone, Default)]
174struct RoutePeerTable {
175 peers: Arc<[Peer]>,
176}
177
178impl RoutePeerTable {
179 fn get_peer_by_id(&self, peer_index: u16) -> Option<Peer> {
180 self.peers.get(peer_index as usize).copied()
181 }
182}
183
184fn parse_route_peer_table(mut data: Bytes) -> Result<RoutePeerTable, ParserError> {
185 let _collector_bgp_id = data.read_u32()?;
186 let view_name_length = data.read_u16()? as usize;
187 data.has_n_remaining(view_name_length)?;
188 data.advance(view_name_length);
189
190 let peer_count = data.read_u16()? as usize;
191 let mut peers = Vec::with_capacity(peer_count);
192 for _ in 0..peer_count {
193 let peer_type = PeerType::from_bits_retain(data.read_u8()?);
194 let afi = if peer_type.contains(PeerType::ADDRESS_FAMILY_IPV6) {
195 Afi::Ipv6
196 } else {
197 Afi::Ipv4
198 };
199 let asn_len = if peer_type.contains(PeerType::AS_SIZE_32BIT) {
200 AsnLength::Bits32
201 } else {
202 AsnLength::Bits16
203 };
204
205 let peer_bgp_id = data.read_ipv4_address()?;
206 let peer_ip = data.read_address(&afi)?;
207 let peer_asn = data.read_asn(asn_len)?;
208 peers.push(Peer {
209 peer_type,
210 peer_bgp_id,
211 peer_ip,
212 peer_asn,
213 });
214 }
215
216 Ok(RoutePeerTable {
217 peers: Arc::from(peers),
218 })
219}
220
221#[derive(Default)]
222enum RouteRecordIter {
223 #[default]
224 Empty,
225 One(Option<BgpRouteElem>),
226 Update(RouteUpdateIter),
227 RibAfi(RouteRibAfiIter),
228}
229
230impl RouteRecordIter {
231 fn next_route(&mut self) -> Result<Option<BgpRouteElem>, ParserError> {
232 match self {
233 RouteRecordIter::Empty => Ok(None),
234 RouteRecordIter::One(route) => Ok(route.take()),
235 RouteRecordIter::Update(iter) => Ok(iter.next_route()),
236 RouteRecordIter::RibAfi(iter) => iter.next_route(),
237 }
238 }
239}
240
241struct RouteRibAfiIter {
242 data: Bytes,
243 peer_table: RoutePeerTable,
244 afi: Afi,
245 safi: Safi,
246 is_add_path: bool,
247 prefix: NetworkPrefix,
248 remaining_entries: u16,
249}
250
251impl RouteRibAfiIter {
252 fn next_route(&mut self) -> Result<Option<BgpRouteElem>, ParserError> {
253 while self.remaining_entries > 0 {
254 if self.data.remaining() < rib_entry_min_len(self.is_add_path) {
255 warn!("early break due to truncated msg while parsing RIB AFI entries");
256 self.remaining_entries = 0;
257 return Ok(None);
258 }
259
260 self.remaining_entries -= 1;
261 let peer_index = self.data.read_u16()?;
262 let originated_time = self.data.read_u32()? as f64;
263 let _path_id = if self.is_add_path {
264 Some(self.data.read_u32()?)
265 } else {
266 None
267 };
268 let attribute_length = self.data.read_u16()? as usize;
269 if self.data.remaining() < attribute_length {
270 warn!(
271 "early break due to truncated attribute payload while parsing RIB AFI entries: expected {} bytes, have {} bytes available",
272 attribute_length,
273 self.data.remaining()
274 );
275 self.remaining_entries = 0;
276 return Ok(None);
277 }
278
279 let prefixes = [self.prefix];
280 let attrs = parse_route_attributes(
281 self.data.split_to(attribute_length),
282 &AsnLength::Bits32,
283 self.is_add_path,
284 RouteAttributeContext {
285 afi: Some(self.afi),
286 safi: Some(self.safi),
287 prefixes: Some(&prefixes),
288 is_announcement: Some(true),
289 has_standard_nlri: self.afi == Afi::Ipv4,
290 },
291 )?;
292 let Some(peer) = self.peer_table.get_peer_by_id(peer_index) else {
293 error!("peer ID {} not found in peer_index table", peer_index);
294 continue;
295 };
296
297 return Ok(Some(BgpRouteElem {
298 timestamp: originated_time,
299 elem_type: ElemType::ANNOUNCE,
300 peer_ip: peer.peer_ip,
301 peer_asn: peer.peer_asn,
302 prefix: self.prefix,
303 as_path: attrs.as_path,
304 }));
305 }
306
307 Ok(None)
308 }
309}
310
311fn parse_bgp_update_routes(
312 mut input: Bytes,
313 add_path: bool,
314 asn_len: &AsnLength,
315 timestamp: f64,
316 peer_ip: IpAddr,
317 peer_asn: Asn,
318) -> Result<RouteUpdateIter, ParserError> {
319 let withdrawn_len = input.read_u16()? as usize;
320 input.has_n_remaining(withdrawn_len)?;
321 let withdrawn_prefixes = parse_nlri_list(input.split_to(withdrawn_len), add_path, &Afi::Ipv4)?;
322
323 let attribute_length = input.read_u16()? as usize;
324 input.has_n_remaining(attribute_length)?;
325 let attribute_bytes = input.split_to(attribute_length);
326 let announced_prefixes = parse_nlri_list(input, add_path, &Afi::Ipv4)?;
327 let attributes = parse_route_attributes(
328 attribute_bytes,
329 asn_len,
330 add_path,
331 RouteAttributeContext {
332 afi: None,
333 safi: None,
334 prefixes: None,
335 is_announcement: None,
336 has_standard_nlri: !announced_prefixes.is_empty(),
337 },
338 )?;
339
340 Ok(RouteUpdateIter {
341 timestamp,
342 peer_ip,
343 peer_asn,
344 as_path: attributes.as_path,
345 announced: announced_prefixes.into_iter().chain(attributes.announced),
346 withdrawn: withdrawn_prefixes.into_iter().chain(attributes.withdrawn),
347 in_withdrawn_phase: false,
348 })
349}
350
351fn parse_bgp_message_routes(
352 mut data: Bytes,
353 add_path: bool,
354 asn_len: &AsnLength,
355 timestamp: f64,
356 peer_ip: IpAddr,
357 peer_asn: Asn,
358) -> Result<RouteRecordIter, ParserError> {
359 let total_size = data.len();
360 data.has_n_remaining(19)?;
361 read_and_validate_bgp_marker(&mut data)?;
362 let length = data.read_u16()?;
363 if !(19..=65_535).contains(&length) {
364 return Err(ParserError::ParseError(format!(
365 "invalid BGP message length {length}"
366 )));
367 }
368
369 let bgp_msg_length = if length as usize > total_size {
370 total_size - 19
371 } else {
372 length as usize - 19
373 };
374 let msg_type = BgpMessageType::try_from(data.read_u8()?)
375 .map_err(|_| ParserError::ParseError("Unknown BGP Message Type".to_string()))?;
376
377 if matches!(msg_type, BgpMessageType::OPEN | BgpMessageType::KEEPALIVE) && length > 4096 {
378 return Err(ParserError::ParseError(format!(
379 "BGP {msg_type:?} message length {length} exceeds maximum allowed 4096 bytes (RFC 8654)"
380 )));
381 }
382
383 if data.remaining() != bgp_msg_length {
384 warn!(
385 "BGP message length {} does not match the actual length {} (parsing BGP message)",
386 bgp_msg_length,
387 data.remaining()
388 );
389 }
390 data.has_n_remaining(bgp_msg_length)?;
391 let msg_data = data.split_to(bgp_msg_length);
392
393 match msg_type {
394 BgpMessageType::UPDATE => Ok(RouteRecordIter::Update(parse_bgp_update_routes(
395 msg_data, add_path, asn_len, timestamp, peer_ip, peer_asn,
396 )?)),
397 BgpMessageType::OPEN | BgpMessageType::NOTIFICATION | BgpMessageType::KEEPALIVE => {
398 Ok(RouteRecordIter::Empty)
399 }
400 }
401}
402
403fn bgp4mp_asn_len_and_add_path(msg_type: Bgp4MpType) -> Option<(AsnLength, bool)> {
404 match msg_type {
405 Bgp4MpType::Message | Bgp4MpType::MessageLocal => Some((AsnLength::Bits16, false)),
406 Bgp4MpType::MessageAs4 | Bgp4MpType::MessageAs4Local => Some((AsnLength::Bits32, false)),
407 Bgp4MpType::MessageAddpath | Bgp4MpType::MessageLocalAddpath => {
408 Some((AsnLength::Bits16, true))
409 }
410 Bgp4MpType::MessageAs4Addpath | Bgp4MpType::MessageLocalAs4Addpath => {
411 Some((AsnLength::Bits32, true))
412 }
413 Bgp4MpType::StateChange | Bgp4MpType::StateChangeAs4 => None,
414 }
415}
416
417fn parse_bgp4mp_routes(
418 sub_type: u16,
419 mut data: Bytes,
420 timestamp: f64,
421) -> Result<RouteRecordIter, ParserError> {
422 let msg_type = Bgp4MpType::try_from(sub_type)?;
423 let Some((asn_len, add_path)) = bgp4mp_asn_len_and_add_path(msg_type) else {
424 return Ok(RouteRecordIter::Empty);
425 };
426
427 let total_size = data.len();
428 let peer_asn = data.read_asn(asn_len)?;
429 let _local_asn = data.read_asn(asn_len)?;
430 let _interface_index = data.read_u16()?;
431 let afi = data.read_afi()?;
432 let peer_ip = data.read_address(&afi)?;
433 let _local_ip = data.read_address(&afi)?;
434
435 let should_read = bgp4mp_message_payload_len(&afi, &asn_len, total_size);
436 if should_read != data.remaining() {
437 return Err(ParserError::TruncatedMsg(format!(
438 "truncated bgp4mp message: should read {} bytes, have {} bytes available",
439 should_read,
440 data.remaining()
441 )));
442 }
443
444 parse_bgp_message_routes(data, add_path, &asn_len, timestamp, peer_ip, peer_asn)
445}
446
447fn table_dump_v2_afi_safi(rib_type: TableDumpV2Type) -> Result<(Afi, Safi), ParserError> {
448 match rib_type {
449 TableDumpV2Type::RibIpv4Unicast | TableDumpV2Type::RibIpv4UnicastAddPath => {
450 Ok((Afi::Ipv4, Safi::Unicast))
451 }
452 TableDumpV2Type::RibIpv4Multicast | TableDumpV2Type::RibIpv4MulticastAddPath => {
453 Ok((Afi::Ipv4, Safi::Multicast))
454 }
455 TableDumpV2Type::RibIpv6Unicast | TableDumpV2Type::RibIpv6UnicastAddPath => {
456 Ok((Afi::Ipv6, Safi::Unicast))
457 }
458 TableDumpV2Type::RibIpv6Multicast | TableDumpV2Type::RibIpv6MulticastAddPath => {
459 Ok((Afi::Ipv6, Safi::Multicast))
460 }
461 _ => Err(ParserError::ParseError(format!(
462 "wrong RIB type for parsing: {rib_type:?}"
463 ))),
464 }
465}
466
467fn is_add_path_rib_type(rib_type: TableDumpV2Type) -> bool {
468 matches!(
469 rib_type,
470 TableDumpV2Type::RibIpv4UnicastAddPath
471 | TableDumpV2Type::RibIpv4MulticastAddPath
472 | TableDumpV2Type::RibIpv6UnicastAddPath
473 | TableDumpV2Type::RibIpv6MulticastAddPath
474 )
475}
476
477fn parse_table_dump_routes(sub_type: u16, mut data: Bytes) -> Result<RouteRecordIter, ParserError> {
478 let afi = match sub_type {
479 1 => Afi::Ipv4,
480 2 => Afi::Ipv6,
481 _ => {
482 return Err(ParserError::ParseError(format!(
483 "Invalid subtype found for TABLE_DUMP (V1) message: {sub_type}"
484 )))
485 }
486 };
487
488 let _view_number = data.read_u16()?;
489 let _sequence_number = data.read_u16()?;
490 let prefix = match &afi {
491 Afi::Ipv4 => data.read_ipv4_prefix().map(IpNet::V4),
492 Afi::Ipv6 => data.read_ipv6_prefix().map(IpNet::V6),
493 Afi::LinkState => unreachable!(),
494 }?;
495 let _status = data.read_u8()?;
496 let originated_time = data.read_u32()? as f64;
497 let peer_ip = data.read_address(&afi)?;
498 let peer_asn = Asn::new_16bit(data.read_u16()?);
499 let attribute_length = data.read_u16()? as usize;
500 data.has_n_remaining(attribute_length)?;
501 let attrs = parse_route_attributes(
502 data.split_to(attribute_length),
503 &AsnLength::Bits16,
504 false,
505 RouteAttributeContext {
506 afi: None,
507 safi: None,
508 prefixes: None,
509 is_announcement: Some(true),
510 has_standard_nlri: afi == Afi::Ipv4,
511 },
512 )?;
513
514 Ok(RouteRecordIter::One(Some(BgpRouteElem {
515 timestamp: originated_time,
516 elem_type: ElemType::ANNOUNCE,
517 peer_ip,
518 peer_asn,
519 prefix: NetworkPrefix::new(prefix, None),
520 as_path: attrs.as_path,
521 })))
522}
523
524fn parse_table_dump_v2_routes(
525 sub_type: u16,
526 mut data: Bytes,
527 peer_table: &mut Option<RoutePeerTable>,
528) -> Result<RouteRecordIter, ParserError> {
529 let v2_type = TableDumpV2Type::try_from(sub_type)?;
530 match v2_type {
531 TableDumpV2Type::PeerIndexTable => {
532 *peer_table = Some(parse_route_peer_table(data)?);
533 Ok(RouteRecordIter::Empty)
534 }
535 TableDumpV2Type::GeoPeerTable => Ok(RouteRecordIter::Empty),
536 TableDumpV2Type::RibGeneric | TableDumpV2Type::RibGenericAddPath => Err(
537 ParserError::Unsupported("TableDumpV2 RibGeneric is not currently supported".into()),
538 ),
539 rib_type => {
540 let (afi, safi) = table_dump_v2_afi_safi(rib_type)?;
541 let is_add_path = is_add_path_rib_type(rib_type);
542 let _sequence_number = data.read_u32()?;
543 let prefix = data.read_nlri_prefix(&afi, false)?;
544 let entry_count = data.read_u16()?;
545 let Some(peer_table) = peer_table.clone() else {
546 return Err(ParserError::ParseError(
547 "peer table not set for TableDumpV2 RIB entries".to_string(),
548 ));
549 };
550
551 Ok(RouteRecordIter::RibAfi(RouteRibAfiIter {
552 data,
553 peer_table,
554 afi,
555 safi,
556 is_add_path,
557 prefix,
558 remaining_entries: entry_count,
559 }))
560 }
561 }
562}
563
564fn parse_raw_record_route_iter(
565 raw_record: crate::RawMrtRecord,
566 peer_table: &mut Option<RoutePeerTable>,
567) -> Result<RouteRecordIter, ParserError> {
568 let timestamp = record_timestamp(&raw_record.common_header);
569 match raw_record.common_header.entry_type {
570 EntryType::TABLE_DUMP => parse_table_dump_routes(
571 raw_record.common_header.entry_subtype,
572 raw_record.message_bytes,
573 ),
574 EntryType::TABLE_DUMP_V2 => parse_table_dump_v2_routes(
575 raw_record.common_header.entry_subtype,
576 raw_record.message_bytes,
577 peer_table,
578 ),
579 EntryType::BGP4MP | EntryType::BGP4MP_ET => parse_bgp4mp_routes(
580 raw_record.common_header.entry_subtype,
581 raw_record.message_bytes,
582 timestamp,
583 ),
584 v => Err(ParserError::Unsupported(format!(
585 "unsupported MRT type: {v:?}"
586 ))),
587 }
588}
589
590pub struct RouteIterator<R> {
591 parser: BgpkitParser<R>,
592 pending_routes: RouteRecordIter,
593 peer_table: Option<RoutePeerTable>,
594}
595
596impl<R> RouteIterator<R> {
597 pub(crate) fn new(parser: BgpkitParser<R>) -> Self {
598 Self {
599 parser,
600 pending_routes: RouteRecordIter::Empty,
601 peer_table: None,
602 }
603 }
604}
605
606impl<R: Read> Iterator for RouteIterator<R> {
607 type Item = BgpRouteElem;
608
609 fn next(&mut self) -> Option<Self::Item> {
610 loop {
611 match self.pending_routes.next_route() {
612 Ok(Some(route)) => {
613 if route.match_filters(&self.parser.filters) {
614 return Some(route);
615 }
616 continue;
617 }
618 Ok(None) => {}
619 Err(err) => {
620 error!("parser error: {}", err);
621 self.pending_routes = RouteRecordIter::Empty;
622 if self.parser.core_dump {
623 return None;
624 }
625 continue;
626 }
627 }
628
629 let raw_record = match chunk_mrt_record(&mut self.parser.reader) {
630 Ok(raw_record) => raw_record,
631 Err(e) => match e.error {
632 ParserError::TruncatedMsg(err_str) | ParserError::Unsupported(err_str) => {
633 if self.parser.options.show_warnings {
634 warn!("parser warn: {}", err_str);
635 }
636 write_mrt_core_dump(self.parser.core_dump, e.bytes);
637 continue;
638 }
639 ParserError::ParseError(err_str) => {
640 error!("parser error: {}", err_str);
641 if self.parser.core_dump {
642 write_mrt_core_dump(true, e.bytes);
643 return None;
644 }
645 continue;
646 }
647 ParserError::EofExpected => return None,
648 ParserError::IoError(err) | ParserError::EofError(err) => {
649 error!("{:?}", err);
650 write_mrt_core_dump(self.parser.core_dump, e.bytes);
651 return None;
652 }
653 #[cfg(feature = "oneio")]
654 ParserError::OneIoError(_) => return None,
655 ParserError::FilterError(_) => return None,
656 ParserError::InvalidLabeledNlriLength
657 | ParserError::TruncatedLabeledNlri
658 | ParserError::TruncatedPrefix
659 | ParserError::MaxLabelStackDepthExceeded
660 | ParserError::PeerMaxLabelsExceeded
661 | ParserError::InvalidPrefix => {
662 if self.parser.options.show_warnings {
663 warn!("parser warn: labeled NLRI parsing error: {:?}", e.error);
664 }
665 continue;
666 }
667 },
668 };
669
670 match parse_raw_record_route_iter(raw_record, &mut self.peer_table) {
671 Ok(routes) => {
672 self.pending_routes = routes;
673 }
674 Err(err) => {
675 error!("parser error: {}", err);
676 if self.parser.core_dump {
677 return None;
678 }
679 continue;
680 }
681 }
682 }
683 }
684}
685
686pub struct FallibleRouteIterator<R> {
687 parser: BgpkitParser<R>,
688 pending_routes: RouteRecordIter,
689 peer_table: Option<RoutePeerTable>,
690}
691
692impl<R> FallibleRouteIterator<R> {
693 pub(crate) fn new(parser: BgpkitParser<R>) -> Self {
694 Self {
695 parser,
696 pending_routes: RouteRecordIter::Empty,
697 peer_table: None,
698 }
699 }
700}
701
702impl<R: Read> Iterator for FallibleRouteIterator<R> {
703 type Item = Result<BgpRouteElem, ParserErrorWithBytes>;
704
705 fn next(&mut self) -> Option<Self::Item> {
706 loop {
707 match self.pending_routes.next_route() {
708 Ok(Some(route)) => {
709 if route.match_filters(&self.parser.filters) {
710 return Some(Ok(route));
711 }
712 continue;
713 }
714 Ok(None) => {}
715 Err(error) => {
716 self.pending_routes = RouteRecordIter::Empty;
717 return Some(Err(ParserErrorWithBytes { error, bytes: None }));
718 }
719 }
720
721 let raw_record = match chunk_mrt_record(&mut self.parser.reader) {
722 Ok(raw_record) => raw_record,
723 Err(e) if matches!(e.error, ParserError::EofExpected) => return None,
724 Err(e) => return Some(Err(e)),
725 };
726
727 match parse_raw_record_route_iter(raw_record, &mut self.peer_table) {
728 Ok(routes) => {
729 self.pending_routes = routes;
730 }
731 Err(error) => return Some(Err(ParserErrorWithBytes { error, bytes: None })),
732 }
733 }
734 }
735}
736
737#[cfg(test)]
738mod tests {
739 use super::*;
740 use crate::parser::iters::write_mrt_core_dump_to_path;
741 use bytes::{BufMut, BytesMut};
742 use std::io::Cursor;
743 use std::net::{Ipv4Addr, Ipv6Addr};
744 use std::str::FromStr;
745
746 fn route_projection(elem: BgpElem) -> BgpRouteElem {
747 BgpRouteElem {
748 timestamp: elem.timestamp,
749 elem_type: elem.elem_type,
750 peer_ip: elem.peer_ip,
751 peer_asn: elem.peer_asn,
752 prefix: elem.prefix,
753 as_path: elem.as_path.map(Arc::new),
754 }
755 }
756
757 fn collect_route_record_iter(
758 mut iter: RouteRecordIter,
759 ) -> Result<Vec<BgpRouteElem>, ParserError> {
760 let mut routes = Vec::new();
761 while let Some(route) = iter.next_route()? {
762 routes.push(route);
763 }
764 Ok(routes)
765 }
766
767 fn route_peer_table_from_peer_index(peer_table: PeerIndexTable) -> RoutePeerTable {
768 let mut peer_ids = peer_table.id_peer_map.keys().copied().collect::<Vec<_>>();
769 peer_ids.sort_unstable();
770 let peers = peer_ids
771 .into_iter()
772 .map(|peer_id| peer_table.id_peer_map[&peer_id])
773 .collect::<Vec<_>>();
774
775 RoutePeerTable {
776 peers: Arc::from(peers),
777 }
778 }
779
780 fn update_record() -> MrtRecord {
781 let mut attributes = Attributes::default();
782 attributes.add_attr(AttributeValue::Origin(Origin::IGP).into());
783 attributes.add_attr(
784 AttributeValue::AsPath {
785 path: AsPath::from_sequence([64500, 64501]),
786 is_as4: false,
787 }
788 .into(),
789 );
790 attributes
791 .add_attr(AttributeValue::NextHop(IpAddr::from_str("192.0.2.254").unwrap()).into());
792
793 MrtRecord {
794 common_header: CommonHeader {
795 timestamp: 1_700_000_000,
796 microsecond_timestamp: None,
797 entry_type: EntryType::BGP4MP,
798 entry_subtype: Bgp4MpType::MessageAs4 as u16,
799 length: 0,
800 },
801 message: MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(Bgp4MpMessage {
802 msg_type: Bgp4MpType::MessageAs4,
803 peer_asn: Asn::new_32bit(64496),
804 local_asn: Asn::new_32bit(64497),
805 interface_index: 0,
806 peer_ip: IpAddr::from_str("192.0.2.1").unwrap(),
807 local_ip: IpAddr::from_str("192.0.2.2").unwrap(),
808 bgp_message: BgpMessage::Update(BgpUpdateMessage {
809 withdrawn_prefixes: vec![NetworkPrefix::from_str("198.51.100.0/24").unwrap()],
810 attributes,
811 announced_prefixes: vec![NetworkPrefix::from_str("203.0.113.0/24").unwrap()],
812 }),
813 })),
814 }
815 }
816
817 fn route_attributes(as_path: impl AsRef<[u32]>) -> Attributes {
818 let mut attributes = Attributes::default();
819 attributes.add_attr(AttributeValue::Origin(Origin::IGP).into());
820 attributes.add_attr(
821 AttributeValue::AsPath {
822 path: AsPath::from_sequence(as_path),
823 is_as4: false,
824 }
825 .into(),
826 );
827 attributes
828 .add_attr(AttributeValue::NextHop(IpAddr::from_str("192.0.2.254").unwrap()).into());
829 attributes
830 }
831
832 fn bgp4mp_record(msg_type: Bgp4MpType, bgp_message: BgpMessage) -> MrtRecord {
833 let asn = if matches!(
834 msg_type,
835 Bgp4MpType::Message
836 | Bgp4MpType::MessageLocal
837 | Bgp4MpType::MessageAddpath
838 | Bgp4MpType::MessageLocalAddpath
839 ) {
840 Asn::new_16bit(64496)
841 } else {
842 Asn::new_32bit(64496)
843 };
844
845 MrtRecord {
846 common_header: CommonHeader {
847 timestamp: 1_700_000_000,
848 microsecond_timestamp: None,
849 entry_type: EntryType::BGP4MP,
850 entry_subtype: msg_type as u16,
851 length: 0,
852 },
853 message: MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(Bgp4MpMessage {
854 msg_type,
855 peer_asn: asn,
856 local_asn: Asn::new_32bit(64497),
857 interface_index: 0,
858 peer_ip: IpAddr::from_str("192.0.2.1").unwrap(),
859 local_ip: IpAddr::from_str("192.0.2.2").unwrap(),
860 bgp_message,
861 })),
862 }
863 }
864
865 fn open_message() -> BgpMessage {
866 BgpMessage::Open(BgpOpenMessage {
867 version: 4,
868 asn: Asn::new_16bit(64496),
869 hold_time: 180,
870 bgp_identifier: Ipv4Addr::new(192, 0, 2, 1),
871 extended_length: false,
872 opt_params: vec![],
873 })
874 }
875
876 fn raw_bgp_message(length: u16, msg_type: BgpMessageType, payload: &[u8]) -> Bytes {
877 raw_bgp_message_with_marker([0xff; 16], length, msg_type, payload)
878 }
879
880 fn raw_bgp_message_with_marker(
881 marker: [u8; 16],
882 length: u16,
883 msg_type: BgpMessageType,
884 payload: &[u8],
885 ) -> Bytes {
886 let mut bytes = BytesMut::new();
887 bytes.put_slice(&marker);
888 bytes.put_u16(length);
889 bytes.put_u8(msg_type as u8);
890 bytes.put_slice(payload);
891 bytes.freeze()
892 }
893
894 fn table_dump_record() -> MrtRecord {
895 let mut attributes = Attributes::default();
896 attributes.add_attr(AttributeValue::Origin(Origin::IGP).into());
897 attributes.add_attr(
898 AttributeValue::AsPath {
899 path: AsPath::from_sequence([64500, 64501]),
900 is_as4: false,
901 }
902 .into(),
903 );
904 attributes
905 .add_attr(AttributeValue::NextHop(IpAddr::from_str("192.0.2.254").unwrap()).into());
906
907 MrtRecord {
908 common_header: CommonHeader {
909 timestamp: 1_700_000_000,
910 microsecond_timestamp: None,
911 entry_type: EntryType::TABLE_DUMP,
912 entry_subtype: 1,
913 length: 0,
914 },
915 message: MrtMessage::TableDumpMessage(TableDumpMessage {
916 view_number: 0,
917 sequence_number: 1,
918 prefix: NetworkPrefix::from_str("203.0.113.0/24").unwrap(),
919 status: 1,
920 originated_time: 1_699_999_998,
921 peer_ip: IpAddr::from_str("192.0.2.20").unwrap(),
922 peer_asn: Asn::new_16bit(64496),
923 attributes,
924 }),
925 }
926 }
927
928 fn table_dump_ipv6_record() -> MrtRecord {
929 let mut attributes = Attributes::default();
930 attributes.add_attr(AttributeValue::Origin(Origin::IGP).into());
931 attributes.add_attr(
932 AttributeValue::AsPath {
933 path: AsPath::from_sequence([64500, 64501]),
934 is_as4: false,
935 }
936 .into(),
937 );
938
939 MrtRecord {
940 common_header: CommonHeader {
941 timestamp: 1_700_000_000,
942 microsecond_timestamp: None,
943 entry_type: EntryType::TABLE_DUMP,
944 entry_subtype: 2,
945 length: 0,
946 },
947 message: MrtMessage::TableDumpMessage(TableDumpMessage {
948 view_number: 0,
949 sequence_number: 1,
950 prefix: NetworkPrefix::from_str("2001:db8::/32").unwrap(),
951 status: 1,
952 originated_time: 1_699_999_998,
953 peer_ip: IpAddr::from_str("2001:db8::20").unwrap(),
954 peer_asn: Asn::new_16bit(64496),
955 attributes,
956 }),
957 }
958 }
959
960 fn table_dump_v2_records_bytes() -> Vec<u8> {
961 let peer = Peer::new(
962 "192.0.2.10".parse().unwrap(),
963 "192.0.2.11".parse().unwrap(),
964 Asn::new_32bit(64496),
965 );
966 let mut peer_table = PeerIndexTable::default();
967 let peer_index = peer_table.add_peer(peer);
968
969 let mut attributes = Attributes::default();
970 attributes.add_attr(AttributeValue::Origin(Origin::IGP).into());
971 attributes.add_attr(
972 AttributeValue::AsPath {
973 path: AsPath::from_sequence([64500, 64501]),
974 is_as4: false,
975 }
976 .into(),
977 );
978 attributes
979 .add_attr(AttributeValue::NextHop(IpAddr::from_str("192.0.2.254").unwrap()).into());
980
981 let pit_record = MrtRecord {
982 common_header: CommonHeader {
983 timestamp: 1_700_000_000,
984 microsecond_timestamp: None,
985 entry_type: EntryType::TABLE_DUMP_V2,
986 entry_subtype: TableDumpV2Type::PeerIndexTable as u16,
987 length: 0,
988 },
989 message: MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(peer_table)),
990 };
991 let rib_record = MrtRecord {
992 common_header: CommonHeader {
993 timestamp: 1_700_000_001,
994 microsecond_timestamp: None,
995 entry_type: EntryType::TABLE_DUMP_V2,
996 entry_subtype: TableDumpV2Type::RibIpv4Unicast as u16,
997 length: 0,
998 },
999 message: MrtMessage::TableDumpV2Message(TableDumpV2Message::RibAfi(RibAfiEntries {
1000 rib_type: TableDumpV2Type::RibIpv4Unicast,
1001 sequence_number: 1,
1002 prefix: NetworkPrefix::from_str("203.0.113.0/24").unwrap(),
1003 rib_entries: vec![RibEntry {
1004 peer_index,
1005 originated_time: 1_699_999_999,
1006 path_id: None,
1007 attributes,
1008 }],
1009 })),
1010 };
1011
1012 let mut bytes = pit_record.encode().to_vec();
1013 bytes.extend_from_slice(&rib_record.encode());
1014 bytes
1015 }
1016
1017 fn table_dump_v2_truncated_attribute_payload() -> (Vec<u8>, Bytes, PeerIndexTable) {
1018 let peer = Peer::new(
1019 "192.0.2.10".parse().unwrap(),
1020 "192.0.2.11".parse().unwrap(),
1021 Asn::new_32bit(64496),
1022 );
1023 let mut peer_table = PeerIndexTable::default();
1024 let peer_index = peer_table.add_peer(peer);
1025
1026 let pit_record = MrtRecord {
1027 common_header: CommonHeader {
1028 timestamp: 1_700_000_000,
1029 microsecond_timestamp: None,
1030 entry_type: EntryType::TABLE_DUMP_V2,
1031 entry_subtype: TableDumpV2Type::PeerIndexTable as u16,
1032 length: 0,
1033 },
1034 message: MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(
1035 peer_table.clone(),
1036 )),
1037 };
1038
1039 let first_entry = RibEntry {
1040 peer_index,
1041 originated_time: 1_699_999_999,
1042 path_id: None,
1043 attributes: route_attributes([64500, 64501]),
1044 };
1045
1046 let mut rib_body = BytesMut::new();
1047 rib_body.put_u32(1);
1048 rib_body.extend(NetworkPrefix::from_str("203.0.113.0/24").unwrap().encode());
1049 rib_body.put_u16(2);
1050 rib_body.extend(first_entry.encode());
1051 rib_body.put_u16(peer_index);
1052 rib_body.put_u32(1_699_999_998);
1053 rib_body.put_u16(32);
1054 rib_body.put_u8(0);
1055
1056 let rib_body = rib_body.freeze();
1057 let rib_header = CommonHeader {
1058 timestamp: 1_700_000_001,
1059 microsecond_timestamp: None,
1060 entry_type: EntryType::TABLE_DUMP_V2,
1061 entry_subtype: TableDumpV2Type::RibIpv4Unicast as u16,
1062 length: rib_body.len() as u32,
1063 };
1064
1065 let mut bytes = pit_record.encode().to_vec();
1066 bytes.extend_from_slice(&rib_header.encode());
1067 bytes.extend_from_slice(&rib_body);
1068
1069 (bytes, rib_body, peer_table)
1070 }
1071
1072 fn assert_filtered_route_projection(bytes: Vec<u8>, filters: &[(&str, &str)]) {
1073 let elem_parser = filters.iter().fold(
1074 BgpkitParser::from_reader(Cursor::new(bytes.clone())),
1075 |parser, (filter_type, filter_value)| {
1076 parser.add_filter(filter_type, filter_value).unwrap()
1077 },
1078 );
1079 let route_parser = filters.iter().fold(
1080 BgpkitParser::from_reader(Cursor::new(bytes)),
1081 |parser, (filter_type, filter_value)| {
1082 parser.add_filter(filter_type, filter_value).unwrap()
1083 },
1084 );
1085
1086 let elem_projection = elem_parser
1087 .into_elem_iter()
1088 .map(route_projection)
1089 .collect::<Vec<_>>();
1090 let routes = route_parser.into_route_iter().collect::<Vec<_>>();
1091
1092 assert_eq!(routes, elem_projection, "filters: {filters:?}");
1093 }
1094
1095 fn assert_route_projection(bytes: Vec<u8>) -> Vec<BgpRouteElem> {
1096 let elem_projection = BgpkitParser::from_reader(Cursor::new(bytes.clone()))
1097 .into_elem_iter()
1098 .map(route_projection)
1099 .collect::<Vec<_>>();
1100 let routes = BgpkitParser::from_reader(Cursor::new(bytes))
1101 .into_route_iter()
1102 .collect::<Vec<_>>();
1103
1104 assert_eq!(routes, elem_projection);
1105 routes
1106 }
1107
/// The record from `update_record` must produce one announcement followed by
/// one withdrawal, and the withdrawal must carry no AS path.
#[test]
fn route_iterator_matches_elem_projection_for_update() {
    let routes = assert_route_projection(update_record().encode().to_vec());

    assert_eq!(routes.len(), 2);
    let (announcement, withdrawal) = (&routes[0], &routes[1]);
    assert_eq!(announcement.elem_type, ElemType::ANNOUNCE);
    assert_eq!(withdrawal.elem_type, ElemType::WITHDRAW);
    assert!(withdrawal.as_path.is_none());
}
1117
/// Two prefixes announced in the same UPDATE must point at the same
/// `Arc`-allocated AS path rather than at two separate copies.
#[test]
fn route_iterator_shares_as_path_for_update_announcements() {
    let update = BgpUpdateMessage {
        withdrawn_prefixes: vec![],
        attributes: route_attributes([64500, 64501]),
        announced_prefixes: vec![
            NetworkPrefix::from_str("203.0.113.0/24").unwrap(),
            NetworkPrefix::from_str("198.51.100.0/24").unwrap(),
        ],
    };
    let record = bgp4mp_record(Bgp4MpType::MessageAs4, BgpMessage::Update(update));
    let bytes = record.encode().to_vec();

    let routes: Vec<_> = BgpkitParser::from_reader(Cursor::new(bytes))
        .into_route_iter()
        .collect();

    assert_eq!(routes.len(), 2);
    let first_path = routes[0].as_path.as_ref().unwrap();
    let second_path = routes[1].as_path.as_ref().unwrap();
    // Pointer equality, not value equality: the allocation itself is shared.
    assert!(Arc::ptr_eq(first_path, second_path));
}
1144
/// `record_timestamp` must fold the header's microsecond field into the
/// fractional part of the returned timestamp.
#[test]
fn route_iterator_uses_microsecond_timestamps() {
    let header = CommonHeader {
        timestamp: 1_700_000_000,
        microsecond_timestamp: Some(123_456),
        entry_type: EntryType::BGP4MP_ET,
        entry_subtype: Bgp4MpType::MessageAs4 as u16,
        length: 0,
    };

    assert_eq!(record_timestamp(&header), 1_700_000_000.123_456);
}
1157
/// An UPDATE whose prefixes appear only in MP_REACH_NLRI / MP_UNREACH_NLRI
/// attributes must yield one announcement and one withdrawal, in that order.
#[test]
fn route_iterator_matches_elem_projection_for_mp_update() {
    let prefix = |text: &str| NetworkPrefix::from_str(text).unwrap();
    let next_hop: IpAddr = "2001:db8::1".parse().unwrap();

    let reach = AttributeValue::MpReachNlri(Nlri::new_reachable(
        prefix("2001:db8::/32"),
        Some(next_hop),
    ));
    let unreach = AttributeValue::MpUnreachNlri(Nlri::new_unreachable(prefix("2001:db8:1::/48")));

    let mut attributes = route_attributes([64500, 64501]);
    attributes.add_attr(reach.into());
    attributes.add_attr(unreach.into());

    // The standard announced/withdrawn lists stay empty on purpose: every
    // prefix travels inside the multiprotocol attributes.
    let update = BgpUpdateMessage {
        withdrawn_prefixes: vec![],
        attributes,
        announced_prefixes: vec![],
    };
    let bytes = bgp4mp_record(Bgp4MpType::MessageAs4, BgpMessage::Update(update))
        .encode()
        .to_vec();

    let routes = assert_route_projection(bytes);
    assert_eq!(routes.len(), 2);

    let (announcement, withdrawal) = (&routes[0], &routes[1]);
    assert_eq!(announcement.elem_type, ElemType::ANNOUNCE);
    assert_eq!(announcement.prefix, prefix("2001:db8::/32"));
    assert_eq!(withdrawal.elem_type, ElemType::WITHDRAW);
    assert_eq!(withdrawal.prefix, prefix("2001:db8:1::/48"));
}
1199
/// OPEN, NOTIFICATION and KEEPALIVE messages carry no routes, so a stream
/// made only of them must produce an empty route list from both iterators.
#[test]
fn route_iterator_matches_elem_projection_for_non_update_bgp4mp_messages() {
    let notification = BgpMessage::Notification(BgpNotificationMessage {
        error: BgpError::Unknown(1, 0),
        data: vec![],
    });

    let records = [
        bgp4mp_record(Bgp4MpType::Message, open_message()),
        bgp4mp_record(Bgp4MpType::MessageAs4, notification),
        bgp4mp_record(Bgp4MpType::MessageAddpath, BgpMessage::KeepAlive),
        bgp4mp_record(Bgp4MpType::MessageAs4Addpath, BgpMessage::KeepAlive),
    ];

    // Concatenate the encoded records into a single MRT byte stream.
    let mut stream = Vec::new();
    for record in records {
        let encoded = record.encode();
        stream.extend_from_slice(&encoded);
    }

    let routes = assert_route_projection(stream);
    assert!(routes.is_empty());
}
1221
/// A `Bgp4MpType::Message` record (as opposed to `MessageAs4`) must report the
/// peer ASN as a 16-bit ASN.
#[test]
fn route_iterator_matches_elem_projection_for_bgp4mp_16bit_update() {
    let update = BgpUpdateMessage {
        withdrawn_prefixes: vec![],
        attributes: route_attributes([64500, 64501]),
        announced_prefixes: vec![NetworkPrefix::from_str("203.0.113.0/24").unwrap()],
    };
    let record = bgp4mp_record(Bgp4MpType::Message, BgpMessage::Update(update));

    let routes = assert_route_projection(record.encode().to_vec());

    assert_eq!(routes.len(), 1);
    assert_eq!(routes[0].peer_asn, Asn::new_16bit(64496));
}
1239
/// Runs a table of filter combinations against the record from
/// `update_record` and checks, for each one, that the route iterator and the
/// projected element iterator agree.
#[test]
fn route_iterator_filters_match_elem_projection_for_update() {
    // One inner slice per case; each case is a list of (type, value) filters.
    const CASES: &[&[(&str, &str)]] = &[
        &[("peer_ip", "192.0.2.1")],
        &[("peer_ip", "192.0.2.99")],
        &[("peer_asn", "64496")],
        &[("type", "a")],
        &[("type", "w")],
        &[("type", "!w")],
        &[("prefix", "203.0.113.0/24")],
        &[("prefix", "198.51.100.0/24")],
        &[("prefix_super", "203.0.113.128/25")],
        &[("origin_asn", "64501")],
        &[("origin_asns", "64496,64501")],
        &[("as_path", "64500 64501$")],
        &[("ip_version", "4")],
        &[("ts_start", "1700000000"), ("ts_end", "1700000000")],
        &[("peer_ip", "192.0.2.1"), ("type", "a")],
    ];

    let bytes = update_record().encode().to_vec();
    CASES
        .iter()
        .for_each(|filters| assert_filtered_route_projection(bytes.clone(), filters));
}
1265
/// When both an AS_PATH and an AS4_PATH attribute are present, the parsed
/// path must be the merged one (here: the 4-byte ASN replaces 23456).
#[test]
fn selective_attribute_parser_merges_as4_path() {
    let legacy_path = AttributeValue::AsPath {
        path: AsPath::from_sequence([23456, 64497]),
        is_as4: false,
    };
    let four_byte_path = AttributeValue::AsPath {
        path: AsPath::from_sequence([65536, 64497]),
        is_as4: true,
    };

    let mut attributes = Attributes::default();
    attributes.add_attr(legacy_path.into());
    attributes.add_attr(four_byte_path.into());

    let ctx = RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(true),
        has_standard_nlri: true,
    };
    let encoded = attributes.encode(AsnLength::Bits16);
    let attrs = parse_route_attributes(encoded, &AsnLength::Bits16, false, ctx).unwrap();

    let merged = attrs.as_path.unwrap();
    assert_eq!(merged.to_u32_vec_opt(false).unwrap(), vec![65536, 64497]);
}
1303
/// With only the attributes built by `route_attributes` (no AS4_PATH), the
/// parsed path must be returned unchanged.
#[test]
fn selective_attribute_parser_handles_as_path_without_as4_path() {
    let ctx = RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(true),
        has_standard_nlri: true,
    };
    let encoded = route_attributes([64500, 64501]).encode(AsnLength::Bits16);

    let attrs = parse_route_attributes(encoded, &AsnLength::Bits16, false, ctx).unwrap();

    let path = attrs.as_path.unwrap();
    assert_eq!(path.to_u32_vec_opt(false).unwrap(), vec![64500, 64501]);
}
1325
/// An AS4_PATH attribute with no accompanying AS_PATH must still be surfaced
/// as the route's AS path.
#[test]
fn selective_attribute_parser_handles_as4_path_without_as_path() {
    let four_byte_path = AttributeValue::AsPath {
        path: AsPath::from_sequence([65536, 64497]),
        is_as4: true,
    };
    let mut attributes = Attributes::default();
    attributes.add_attr(four_byte_path.into());

    let ctx = RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(false),
        has_standard_nlri: false,
    };
    let encoded = attributes.encode(AsnLength::Bits16);
    let attrs = parse_route_attributes(encoded, &AsnLength::Bits16, false, ctx).unwrap();

    let path = attrs.as_path.unwrap();
    assert_eq!(path.to_u32_vec_opt(false).unwrap(), vec![65536, 64497]);
}
1356
/// An empty attribute buffer must parse successfully and yield no AS path.
#[test]
fn selective_attribute_parser_handles_no_as_path() {
    let ctx = RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(false),
        has_standard_nlri: false,
    };

    let attrs = parse_route_attributes(Bytes::new(), &AsnLength::Bits16, false, ctx).unwrap();

    assert!(attrs.as_path.is_none());
}
1375
/// Covers two framing edge cases of the attribute parser:
/// an AS_PATH with the extended-length flag (two-byte length field) and an
/// attribute whose declared length exceeds the bytes that follow.
#[test]
fn selective_attribute_parser_handles_extended_and_truncated_attributes() {
    // Fresh context per call, since the parser takes it by value.
    let bare_context = || RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(false),
        has_standard_nlri: false,
    };

    // Attribute header: flags, type, then a 16-bit length because EXTENDED is set.
    let extended_flags = AttrFlags::TRANSITIVE | AttrFlags::EXTENDED;
    let mut extended = BytesMut::new();
    extended.put_u8(extended_flags.bits());
    extended.put_u8(u8::from(AttrType::AS_PATH));
    extended.put_u16(4);
    // Payload: one path segment of type 2 holding a single 16-bit ASN.
    extended.put_u8(2);
    extended.put_u8(1);
    extended.put_u16(64500);

    let attrs =
        parse_route_attributes(extended.freeze(), &AsnLength::Bits16, false, bare_context())
            .unwrap();
    let path = attrs.as_path.unwrap();
    assert_eq!(path.to_u32_vec_opt(false).unwrap(), vec![64500]);

    // Declares five payload bytes but supplies only one.
    let truncated = Bytes::from_static(&[0x40, 2, 5, 0]);
    let attrs =
        parse_route_attributes(truncated, &AsnLength::Bits16, false, bare_context()).unwrap();
    assert!(attrs.as_path.is_none());
}
1419
/// An AS_PATH attribute with a one-byte payload must not abort parsing; the
/// parser returns successfully and simply reports no AS path.
#[test]
fn selective_attribute_parser_discards_malformed_as_path() {
    let ctx = RouteAttributeContext {
        afi: None,
        safi: None,
        prefixes: None,
        is_announcement: Some(false),
        has_standard_nlri: false,
    };
    // flags = 0x40, type = 2, length = 1, followed by a single payload byte.
    let malformed = Bytes::from_static(&[0x40, 2, 1, 0]);

    let attrs = parse_route_attributes(malformed, &AsnLength::Bits16, false, ctx).unwrap();

    assert!(attrs.as_path.is_none());
}
1438
/// The record from `table_dump_record` must yield exactly one route with the
/// expected timestamp and a 16-bit peer ASN.
#[test]
fn route_iterator_matches_elem_projection_for_table_dump() {
    let routes = assert_route_projection(table_dump_record().encode().to_vec());

    assert_eq!(routes.len(), 1);
    let route = &routes[0];
    assert_eq!(route.timestamp, 1_699_999_998.0);
    assert_eq!(route.peer_asn, Asn::new_16bit(64496));
}
1447
/// The record from `table_dump_ipv6_record` must yield one route carrying the
/// IPv6 prefix and the IPv6 peer address.
#[test]
fn route_iterator_matches_elem_projection_for_table_dump_ipv6() {
    let routes = assert_route_projection(table_dump_ipv6_record().encode().to_vec());
    assert_eq!(routes.len(), 1);

    let expected_prefix = NetworkPrefix::from_str("2001:db8::/32").unwrap();
    assert_eq!(routes[0].prefix, expected_prefix);

    let expected_peer = IpAddr::V6(Ipv6Addr::from_str("2001:db8::20").unwrap());
    assert_eq!(routes[0].peer_ip, expected_peer);
}
1462
/// The byte stream from `table_dump_v2_records_bytes` must yield a single
/// announcement whose AS path is 64500 64501.
#[test]
fn route_iterator_matches_elem_projection_for_table_dump_v2() {
    let routes = assert_route_projection(table_dump_v2_records_bytes());

    assert_eq!(routes.len(), 1);
    let route = &routes[0];
    assert_eq!(route.elem_type, ElemType::ANNOUNCE);

    let as_path = route.as_path.as_ref().unwrap();
    assert_eq!(as_path.to_u32_vec_opt(false), Some(vec![64500, 64501]));
}
1474
/// Encodes a peer index table followed by an IPv6 unicast ADD-PATH RIB record
/// and checks that the single entry is surfaced with its prefix intact.
#[test]
fn route_iterator_matches_elem_projection_for_table_dump_v2_ipv6_addpath() {
    let mut peer_table = PeerIndexTable::default();
    let peer_index = peer_table.add_peer(Peer::new(
        "192.0.2.11".parse().unwrap(),
        "2001:db8::10".parse().unwrap(),
        Asn::new_32bit(64496),
    ));

    // First record: the peer index table the RIB entry refers to.
    let pit_header = CommonHeader {
        timestamp: 1_700_000_000,
        microsecond_timestamp: None,
        entry_type: EntryType::TABLE_DUMP_V2,
        entry_subtype: TableDumpV2Type::PeerIndexTable as u16,
        length: 0,
    };
    let pit_record = MrtRecord {
        common_header: pit_header,
        message: MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(peer_table)),
    };

    // Second record: one ADD-PATH RIB entry (note the `path_id`).
    let rib_header = CommonHeader {
        timestamp: 1_700_000_001,
        microsecond_timestamp: None,
        entry_type: EntryType::TABLE_DUMP_V2,
        entry_subtype: TableDumpV2Type::RibIpv6UnicastAddPath as u16,
        length: 0,
    };
    let entry = RibEntry {
        peer_index,
        originated_time: 1_699_999_999,
        path_id: Some(1234),
        attributes: route_attributes([64500, 64501]),
    };
    let rib = RibAfiEntries {
        rib_type: TableDumpV2Type::RibIpv6UnicastAddPath,
        sequence_number: 1,
        prefix: NetworkPrefix::from_str("2001:db8::/32").unwrap(),
        rib_entries: vec![entry],
    };
    let rib_record = MrtRecord {
        common_header: rib_header,
        message: MrtMessage::TableDumpV2Message(TableDumpV2Message::RibAfi(rib)),
    };

    let mut stream = pit_record.encode().to_vec();
    stream.extend_from_slice(&rib_record.encode());

    let routes = assert_route_projection(stream);
    assert_eq!(routes.len(), 1);

    let expected_prefix = NetworkPrefix::from_str("2001:db8::/32").unwrap();
    assert_eq!(routes[0].prefix, expected_prefix);
}
1525
/// A BGP4MP UPDATE exchanged between IPv6 peers must report the IPv6 peer
/// address on the resulting route, even though the announced prefix is IPv4.
#[test]
fn route_iterator_matches_elem_projection_for_bgp4mp_ipv6_peer_update() {
    let update = BgpUpdateMessage {
        withdrawn_prefixes: vec![],
        attributes: route_attributes([64500, 64501]),
        announced_prefixes: vec![NetworkPrefix::from_str("203.0.113.0/24").unwrap()],
    };
    let message = Bgp4MpMessage {
        msg_type: Bgp4MpType::MessageAs4,
        peer_asn: Asn::new_32bit(64496),
        local_asn: Asn::new_32bit(64497),
        interface_index: 0,
        peer_ip: IpAddr::from_str("2001:db8::1").unwrap(),
        local_ip: IpAddr::from_str("2001:db8::2").unwrap(),
        bgp_message: BgpMessage::Update(update),
    };
    let header = CommonHeader {
        timestamp: 1_700_000_000,
        microsecond_timestamp: None,
        entry_type: EntryType::BGP4MP,
        entry_subtype: Bgp4MpType::MessageAs4 as u16,
        length: 0,
    };
    let record = MrtRecord {
        common_header: header,
        message: MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(message)),
    };

    let routes = assert_route_projection(record.encode().to_vec());
    assert_eq!(routes.len(), 1);

    let expected_peer = IpAddr::V6(Ipv6Addr::from_str("2001:db8::1").unwrap());
    assert_eq!(routes[0].peer_ip, expected_peer);
}
1558
/// Runs a table of filter combinations against the TABLE_DUMP_V2 stream from
/// `table_dump_v2_records_bytes`, checking for each case that the route
/// iterator and the projected element iterator agree.
#[test]
fn route_iterator_filters_match_elem_projection_for_table_dump_v2() {
    // One inner slice per case; each case is a list of (type, value) filters.
    const CASES: &[&[(&str, &str)]] = &[
        &[("peer_ip", "192.0.2.10")],
        &[("peer_asn", "64496")],
        &[("type", "a")],
        &[("type", "w")],
        &[("prefix", "203.0.113.0/24")],
        &[("prefix_sub", "203.0.112.0/23")],
        &[("origin_asn", "64501")],
        &[("as_path", "64500 64501$")],
        &[("ts_start", "1699999999"), ("ts_end", "1699999999")],
        &[("peer_asn", "64496"), ("origin_asn", "64501")],
    ];

    let bytes = table_dump_v2_records_bytes();
    CASES
        .iter()
        .for_each(|filters| assert_filtered_route_projection(bytes.clone(), filters));
}
1579
/// Exercises `parse_bgp_message_routes` on hand-built raw BGP messages.
///
/// The first two inputs must be rejected with an error; the remaining three
/// must parse and yield no routes.
#[test]
fn route_parser_reports_bgp_message_shape_errors() {
    let asn_len = AsnLength::Bits16;
    // Every case shares the same parser arguments; only the raw message varies.
    let parse = |message| {
        parse_bgp_message_routes(
            message,
            false,
            &asn_len,
            1_700_000_000.0,
            "192.0.2.1".parse().unwrap(),
            Asn::new_16bit(64496),
        )
    };

    // NOTE(review): the first argument of `raw_bgp_message` is presumably the
    // declared message length (18 and 4097 sit just outside BGP's 19..=4096
    // range) — confirm against the helper.
    let rejected = [
        raw_bgp_message(18, BgpMessageType::KEEPALIVE, &[]),
        raw_bgp_message(4097, BgpMessageType::OPEN, &[]),
    ];
    for message in rejected {
        assert!(parse(message).is_err());
    }

    let accepted_without_routes = [
        raw_bgp_message(30, BgpMessageType::KEEPALIVE, &[]),
        raw_bgp_message(19, BgpMessageType::KEEPALIVE, &[0]),
        raw_bgp_message_with_marker([0x00; 16], 19, BgpMessageType::KEEPALIVE, &[]),
    ];
    for message in accepted_without_routes {
        let routes = collect_route_record_iter(parse(message).unwrap()).unwrap();
        assert!(routes.is_empty());
    }
}
1643
/// `write_mrt_core_dump_to_path` must leave the target untouched when called
/// with `false` and write the payload verbatim when called with `true`.
#[test]
fn route_core_dump_write_respects_enabled_flag() {
    let scratch = tempfile::tempdir().unwrap();
    let dump_path = scratch.path().join("mrt_core_dump");
    let payload = vec![1, 2, 3];

    // Disabled: no file may appear.
    write_mrt_core_dump_to_path(false, Some(payload.clone()), &dump_path);
    assert!(!dump_path.exists());

    // Enabled: the file holds exactly the payload bytes.
    write_mrt_core_dump_to_path(true, Some(payload.clone()), &dump_path);
    assert_eq!(std::fs::read(&dump_path).unwrap(), payload);
}
1655
/// Walks `parse_table_dump_v2_routes` through four edge cases:
///
/// 1. no peer table has been seen yet: the call returns an error;
/// 2. the entry's peer index is absent from the table: no routes;
/// 3. the body ends right after the entry count: no routes;
/// 4. an ADD-PATH body whose second entry is cut short: only the first
///    entry's route is returned.
#[test]
fn route_parser_handles_table_dump_v2_error_edges() {
    let target_prefix = || NetworkPrefix::from_str("203.0.113.0/24").unwrap();
    let ipv4_unicast = TableDumpV2Type::RibIpv4Unicast as u16;

    let unknown_peer_rib = RibAfiEntries {
        rib_type: TableDumpV2Type::RibIpv4Unicast,
        sequence_number: 1,
        prefix: target_prefix(),
        rib_entries: vec![RibEntry {
            peer_index: 99,
            originated_time: 1_699_999_999,
            path_id: None,
            attributes: route_attributes([64500, 64501]),
        }],
    };

    // Case 1: a RIB record without any peer table is an error.
    let mut missing_table = None;
    let outcome =
        parse_table_dump_v2_routes(ipv4_unicast, unknown_peer_rib.encode(), &mut missing_table);
    assert!(outcome.is_err());

    // Case 2: the table exists but does not contain peer index 99.
    let mut empty_table = Some(RoutePeerTable::default());
    let unresolved = parse_table_dump_v2_routes(
        ipv4_unicast,
        unknown_peer_rib.encode(),
        &mut empty_table,
    )
    .unwrap();
    let routes = collect_route_record_iter(unresolved).unwrap();
    assert!(routes.is_empty());

    // Case 3: sequence number, prefix and an entry count of 1, then nothing.
    let mut header_only = BytesMut::new();
    header_only.put_u32(1);
    header_only.extend(target_prefix().encode());
    header_only.put_u16(1);

    let mut empty_table = Some(RoutePeerTable::default());
    let cut_short =
        parse_table_dump_v2_routes(ipv4_unicast, header_only.freeze(), &mut empty_table).unwrap();
    let routes = collect_route_record_iter(cut_short).unwrap();
    assert!(routes.is_empty());

    // Case 4: two ADD-PATH entries announced, the second one incomplete.
    let mut peer_index_table = PeerIndexTable::default();
    let peer_index = peer_index_table.add_peer(Peer::new(
        "192.0.2.10".parse().unwrap(),
        "192.0.2.11".parse().unwrap(),
        Asn::new_32bit(64496),
    ));

    let complete_entry = RibEntry {
        peer_index,
        originated_time: 1_699_999_999,
        path_id: Some(1234),
        attributes: route_attributes([64500, 64501]),
    };

    let mut add_path_body = BytesMut::new();
    add_path_body.put_u32(1);
    add_path_body.extend(target_prefix().encode());
    add_path_body.put_u16(2);
    add_path_body.extend(complete_entry.encode());
    // Second entry: three fixed fields only, nothing after them.
    add_path_body.put_u16(peer_index);
    add_path_body.put_u32(1_699_999_998);
    add_path_body.put_u32(5678);

    let mut route_peer_table = Some(route_peer_table_from_peer_index(peer_index_table));
    let partial = parse_table_dump_v2_routes(
        TableDumpV2Type::RibIpv4UnicastAddPath as u16,
        add_path_body.freeze(),
        &mut route_peer_table,
    )
    .unwrap();
    let routes = collect_route_record_iter(partial).unwrap();

    assert_eq!(routes.len(), 1);
    assert_eq!(routes[0].prefix, target_prefix());
}
1744
/// When the RIB body from `table_dump_v2_truncated_attribute_payload` is fed
/// to `parse_table_dump_v2_routes` directly, the route from the intact first
/// entry must still be returned with its prefix and AS path.
#[test]
fn route_parser_preserves_table_dump_v2_routes_before_truncated_attribute_payload() {
    let (_, rib_body, peer_index_table) = table_dump_v2_truncated_attribute_payload();
    let mut route_peer_table = Some(route_peer_table_from_peer_index(peer_index_table));

    let parsed = parse_table_dump_v2_routes(
        TableDumpV2Type::RibIpv4Unicast as u16,
        rib_body,
        &mut route_peer_table,
    )
    .unwrap();
    let routes = collect_route_record_iter(parsed).unwrap();

    assert_eq!(routes.len(), 1);
    let survivor = &routes[0];
    assert_eq!(
        survivor.prefix,
        NetworkPrefix::from_str("203.0.113.0/24").unwrap()
    );
    assert_eq!(
        survivor.as_path.as_ref().unwrap().to_u32_vec_opt(false),
        Some(vec![64500, 64501])
    );
}
1770
/// Both the infallible and the fallible route iterator must return the one
/// intact route from the truncated TABLE_DUMP_V2 stream, and the fallible
/// iterator must do so without reporting an error.
#[test]
fn route_iterators_preserve_table_dump_v2_routes_before_truncated_attribute_payload() {
    let (stream, _, _) = table_dump_v2_truncated_attribute_payload();

    let routes: Vec<_> = BgpkitParser::from_reader(Cursor::new(stream.clone()))
        .into_route_iter()
        .collect();
    assert_eq!(routes.len(), 1);
    assert_eq!(
        routes[0].prefix,
        NetworkPrefix::from_str("203.0.113.0/24").unwrap()
    );

    let fallible_routes: Vec<_> = BgpkitParser::from_reader(Cursor::new(stream))
        .into_fallible_route_iter()
        .collect::<Result<_, _>>()
        .unwrap();
    assert_eq!(fallible_routes, routes);
}
1790
1791 fn table_dump_v2_rib_without_peer_table_record() -> MrtRecord {
1792 MrtRecord {
1793 common_header: CommonHeader {
1794 timestamp: 1_700_000_001,
1795 microsecond_timestamp: None,
1796 entry_type: EntryType::TABLE_DUMP_V2,
1797 entry_subtype: TableDumpV2Type::RibIpv4Unicast as u16,
1798 length: 0,
1799 },
1800 message: MrtMessage::TableDumpV2Message(TableDumpV2Message::RibAfi(RibAfiEntries {
1801 rib_type: TableDumpV2Type::RibIpv4Unicast,
1802 sequence_number: 1,
1803 prefix: NetworkPrefix::from_str("203.0.113.0/24").unwrap(),
1804 rib_entries: vec![RibEntry {
1805 peer_index: 0,
1806 originated_time: 1_699_999_999,
1807 path_id: None,
1808 attributes: route_attributes([64500, 64501]),
1809 }],
1810 })),
1811 }
1812 }
1813
/// The infallible route iterator must yield nothing for a RIB record that has
/// no preceding peer index table.
#[test]
fn route_iterator_skips_route_parse_errors() {
    let stream = table_dump_v2_rib_without_peer_table_record()
        .encode()
        .to_vec();

    let routes: Vec<_> = BgpkitParser::from_reader(Cursor::new(stream))
        .into_route_iter()
        .collect();

    assert!(routes.is_empty());
}
1826
/// With a `type = w` filter, the fallible route iterator must return only the
/// withdrawal from the record built by `update_record`.
#[test]
fn fallible_route_iterator_applies_filters_to_cached_routes() {
    let stream = update_record().encode().to_vec();
    let parser = BgpkitParser::from_reader(Cursor::new(stream))
        .add_filter("type", "w")
        .unwrap();

    let routes: Vec<_> = parser
        .into_fallible_route_iter()
        .collect::<Result<_, _>>()
        .unwrap();

    assert_eq!(routes.len(), 1);
    assert_eq!(routes[0].elem_type, ElemType::WITHDRAW);
}
1839
/// The fallible route iterator must return an `Err` as its first item for a
/// RIB record that has no preceding peer index table.
#[test]
fn fallible_route_iterator_returns_route_parse_errors() {
    let stream = table_dump_v2_rib_without_peer_table_record()
        .encode()
        .to_vec();
    let mut fallible = BgpkitParser::from_reader(Cursor::new(stream)).into_fallible_route_iter();

    let first = fallible.next().unwrap();
    assert!(first.is_err());
}
1851
/// On well-formed input the fallible route iterator must yield the same two
/// routes as the infallible one: an announcement, then a withdrawal.
#[test]
fn fallible_route_iterator_yields_routes() {
    let reader = Cursor::new(update_record().encode().to_vec());

    let routes: Vec<_> = BgpkitParser::from_reader(reader)
        .into_fallible_route_iter()
        .collect::<Result<_, _>>()
        .unwrap();

    assert_eq!(routes.len(), 2);
    let (announcement, withdrawal) = (&routes[0], &routes[1]);
    assert_eq!(announcement.elem_type, ElemType::ANNOUNCE);
    assert_eq!(withdrawal.elem_type, ElemType::WITHDRAW);
}
1864
/// A record that cannot be parsed at the MRT level must surface as an `Err`
/// from the fallible route iterator rather than being dropped silently.
#[test]
fn fallible_route_iterator_returns_parse_errors() {
    // NOTE(review): the grouping follows the MRT common header layout
    // (timestamp, type, subtype, length, then payload); 0xFFFF is presumably
    // rejected as an unknown entry type — confirm against the header parser.
    #[rustfmt::skip]
    let invalid_data = vec![
        0x00, 0x00, 0x00, 0x00, // timestamp
        0xFF, 0xFF,             // entry type
        0x00, 0x00,             // entry subtype
        0x00, 0x00, 0x00, 0x04, // length
        0x00, 0x00, 0x00, 0x00, // payload
    ];

    let mut fallible =
        BgpkitParser::from_reader(Cursor::new(invalid_data)).into_fallible_route_iter();

    let first = fallible.next().unwrap();
    assert!(first.is_err());
}
1880}