Skip to main content

bgpkit_parser/parser/mrt/
mrt_elem.rs

1#![allow(unused)]
2//! This module handles converting MRT records into individual per-prefix BGP elements.
3//!
//! Each MRT record may contain reachability information for multiple prefixes. This module breaks
//! down MRT records into their corresponding BGP elements, allowing users to process BGP
//! information more conveniently on a per-prefix basis.
7use crate::models::*;
8use crate::parser::bgp::messages::parse_bgp_update_message;
9use crate::ParserError;
10use crate::ParserError::ParseError;
11use itertools::Itertools;
12use log::{error, warn};
13use std::collections::HashMap;
14use std::fmt::{Display, Formatter};
15use std::net::{IpAddr, Ipv4Addr};
16
/// Converts [`MrtRecord`]s into per-prefix [`BgpElem`]s.
///
/// The only state it carries is the peer index table, which is needed to resolve the peer of
/// each TableDumpV2 RIB entry.
#[derive(Default, Debug, Clone)]
pub struct Elementor {
    /// Peer index table used to look up the peer of TableDumpV2 RIB entries.
    ///
    /// `None` until it is set through [`Elementor::set_peer_table`],
    /// [`Elementor::with_peer_table`], or by passing a `PeerIndexTable` record to
    /// [`Elementor::record_to_elems`].
    pub peer_table: Option<PeerIndexTable>,
}
21
/// Error returned by [`Elementor::record_to_elems_iter`].
///
/// Each variant has a fixed, human-readable message available through its `Display`
/// implementation.
#[derive(Debug)]
pub enum ElemError {
    /// The record contains a [`PeerIndexTable`]. The contained table can be
    /// passed to [`Elementor::with_peer_table`] to create an initialized elementor.
    ///
    /// The table is handed back boxed, so the caller receives ownership of it.
    UnexpectedPeerIndexTable(Box<PeerIndexTable>),
    /// A peer table is required for processing TableDumpV2 RIB entries,
    /// but none has been set on this elementor.
    MissingPeerTable,
    /// The record contains a [`RibGenericEntries`] which is not yet supported.
    UnsupportedRibGeneric,
}
34
35impl Display for ElemError {
36    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
37        match self {
38            ElemError::UnexpectedPeerIndexTable(_) => {
39                write!(f, "unexpected PeerIndexTable record")
40            }
41            ElemError::MissingPeerTable => {
42                write!(
43                    f,
44                    "peer table not set; call set_peer_table or use with_peer_table first"
45                )
46            }
47            ElemError::UnsupportedRibGeneric => {
48                write!(f, "RibGenericEntries not yet supported")
49            }
50        }
51    }
52}
53
// Marker impl: `ElemError` wraps no underlying error, so the trait's default methods are used.
impl std::error::Error for ElemError {}
55
/// Extracts the payload of a single-field `Attribute` variant.
///
/// `get_attr_value!(Variant, expr)` expands to `Some(x)` when `expr` matches
/// `Attribute::Variant(x)`, and to `None` otherwise.
// NOTE(review): no invocation of this macro is visible in this file, and it matches on
// `Attribute::...` while the rest of the file matches on `AttributeValue::...` — confirm that
// it is still needed.
macro_rules! get_attr_value {
    ($a:tt, $b:expr) => {
        if let Attribute::$a(x) = $b {
            Some(x)
        } else {
            None
        }
    };
}
66
67#[allow(clippy::type_complexity)]
68fn get_relevant_attributes(
69    attributes: Attributes,
70) -> (
71    Option<AsPath>,
72    Option<AsPath>,
73    Option<Origin>,
74    Option<IpAddr>,
75    Option<u32>,
76    Option<u32>,
77    Option<Vec<MetaCommunity>>,
78    bool,
79    Option<(Asn, BgpIdentifier)>,
80    Option<Nlri>,
81    Option<Nlri>,
82    Option<Asn>,
83    Option<Vec<AttrRaw>>,
84    Option<Vec<AttrRaw>>,
85) {
86    let mut as_path = None;
87    let mut as4_path = None;
88    let mut origin = None;
89    let mut next_hop = None;
90    let mut local_pref = Some(0);
91    let mut med = Some(0);
92    let mut atomic = false;
93    let mut aggregator = None;
94    let mut announced = None;
95    let mut withdrawn = None;
96    let mut otc = None;
97    let mut unknown = vec![];
98    let mut deprecated = vec![];
99
100    let mut communities_vec: Vec<MetaCommunity> = vec![];
101
102    for attr in attributes {
103        match attr {
104            AttributeValue::Origin(v) => origin = Some(v),
105            AttributeValue::AsPath {
106                path,
107                is_as4: false,
108            } => as_path = Some(path),
109            AttributeValue::AsPath { path, is_as4: true } => as4_path = Some(path),
110            AttributeValue::NextHop(v) => next_hop = Some(v),
111            AttributeValue::MultiExitDiscriminator(v) => med = Some(v),
112            AttributeValue::LocalPreference(v) => local_pref = Some(v),
113            AttributeValue::AtomicAggregate => atomic = true,
114            AttributeValue::Communities(v) => communities_vec.extend(
115                v.into_iter()
116                    .map(MetaCommunity::Plain)
117                    .collect::<Vec<MetaCommunity>>(),
118            ),
119            AttributeValue::ExtendedCommunities(v) => communities_vec.extend(
120                v.into_iter()
121                    .map(MetaCommunity::Extended)
122                    .collect::<Vec<MetaCommunity>>(),
123            ),
124            AttributeValue::Ipv6AddressSpecificExtendedCommunities(v) => communities_vec.extend(
125                v.into_iter()
126                    .map(MetaCommunity::Ipv6Extended)
127                    .collect::<Vec<MetaCommunity>>(),
128            ),
129            AttributeValue::LargeCommunities(v) => communities_vec.extend(
130                v.into_iter()
131                    .map(MetaCommunity::Large)
132                    .collect::<Vec<MetaCommunity>>(),
133            ),
134            AttributeValue::Aggregator { asn, id, .. } => aggregator = Some((asn, id)),
135            AttributeValue::MpReachNlri(nlri) => announced = Some(nlri),
136            AttributeValue::MpUnreachNlri(nlri) => withdrawn = Some(nlri),
137            AttributeValue::OnlyToCustomer(o) => otc = Some(o),
138
139            AttributeValue::Unknown(t) => {
140                unknown.push(t);
141            }
142            AttributeValue::Deprecated(t) => {
143                deprecated.push(t);
144            }
145
146            AttributeValue::OriginatorId(_)
147            | AttributeValue::Clusters(_)
148            | AttributeValue::Development(_)
149            | AttributeValue::LinkState(_)
150            | AttributeValue::TunnelEncapsulation(_)
151            | AttributeValue::Aigp(_)
152            | AttributeValue::AttrSet(_) => {}
153        };
154    }
155
156    let communities = match !communities_vec.is_empty() {
157        true => Some(communities_vec),
158        false => None,
159    };
160
161    // If the next_hop is not set, we try to get it from the announced NLRI.
162    let next_hop = next_hop.or_else(|| {
163        announced.as_ref().and_then(|v| {
164            v.next_hop.as_ref().map(|h| match h {
165                NextHopAddress::Ipv4(v) => IpAddr::from(*v),
166                NextHopAddress::Ipv6(v) => IpAddr::from(*v),
167                NextHopAddress::Ipv6LinkLocal(v, _) => IpAddr::from(*v),
168                // RFC 8950: VPN next hops - return the IPv6 address part
169                NextHopAddress::VpnIpv6(_, v) => IpAddr::from(*v),
170                NextHopAddress::VpnIpv6LinkLocal(_, v, _, _) => IpAddr::from(*v),
171            })
172        })
173    });
174
175    (
176        as_path,
177        as4_path,
178        origin,
179        next_hop,
180        local_pref,
181        med,
182        communities,
183        atomic,
184        aggregator,
185        announced,
186        withdrawn,
187        otc,
188        if unknown.is_empty() {
189            None
190        } else {
191            Some(unknown)
192        },
193        if deprecated.is_empty() {
194            None
195        } else {
196            Some(deprecated)
197        },
198    )
199}
200
201fn rib_entry_to_elem(prefix: NetworkPrefix, peer: &Peer, entry: RibEntry) -> BgpElem {
202    let (
203        as_path,
204        as4_path,
205        origin,
206        next_hop,
207        local_pref,
208        med,
209        communities,
210        atomic,
211        aggregator,
212        announced,
213        _withdrawn,
214        only_to_customer,
215        unknown,
216        deprecated,
217    ) = get_relevant_attributes(entry.attributes);
218
219    let path = match (as_path, as4_path) {
220        (None, None) => None,
221        (Some(v), None) => Some(v),
222        (None, Some(v)) => Some(v),
223        (Some(v1), Some(v2)) => Some(AsPath::merge_aspath_as4path(&v1, &v2)),
224    };
225
226    let next_hop = match next_hop {
227        Some(v) => Some(v),
228        None => announced.and_then(|v| {
229            v.next_hop.map(|h| match h {
230                NextHopAddress::Ipv4(v) => IpAddr::from(v),
231                NextHopAddress::Ipv6(v) => IpAddr::from(v),
232                NextHopAddress::Ipv6LinkLocal(v, _) => IpAddr::from(v),
233                NextHopAddress::VpnIpv6(_, v) => IpAddr::from(v),
234                NextHopAddress::VpnIpv6LinkLocal(_, v, _, _) => IpAddr::from(v),
235            })
236        }),
237    };
238
239    let origin_asns = path
240        .as_ref()
241        .map(|as_path| as_path.iter_origins().collect());
242
243    BgpElem {
244        timestamp: entry.originated_time as f64,
245        elem_type: ElemType::ANNOUNCE,
246        peer_ip: peer.peer_ip,
247        peer_asn: peer.peer_asn,
248        peer_bgp_id: Some(peer.peer_bgp_id),
249        prefix,
250        next_hop,
251        as_path: path,
252        origin,
253        origin_asns,
254        local_pref,
255        med,
256        communities,
257        atomic,
258        aggr_asn: aggregator.map(|v| v.0),
259        aggr_ip: aggregator.map(|v| v.1),
260        only_to_customer,
261        unknown,
262        deprecated,
263    }
264}
265
/// Iterator over [`BgpElem`]s produced from a single [`MrtRecord`],
/// without requiring a mutable reference to the [`Elementor`].
///
/// This avoids allocating a `Vec` for the common RIB table dump case
/// by lazily converting each [`RibEntry`] into a [`BgpElem`] on demand.
///
/// Values of this type are created by [`Elementor::record_to_elems_iter`]; the variants are
/// hidden from the documentation.
pub enum RecordElemIter<'a> {
    // Yields nothing.
    #[doc(hidden)]
    Empty,
    // Yields the contained element once (if any), then nothing.
    #[doc(hidden)]
    TableDump(Option<BgpElem>),
    // Yields one element per remaining RIB entry, resolving each entry's peer through
    // `peer_table`; all elements share `prefix`.
    #[doc(hidden)]
    RibAfi {
        peer_table: &'a PeerIndexTable,
        prefix: NetworkPrefix,
        entries: std::vec::IntoIter<RibEntry>,
    },
    // Delegates to the iterator over the elements of a BGP UPDATE message.
    #[doc(hidden)]
    Bgp4Mp(BgpUpdateElemIter),
}
285
286impl Iterator for RecordElemIter<'_> {
287    type Item = BgpElem;
288
289    fn next(&mut self) -> Option<BgpElem> {
290        match self {
291            RecordElemIter::Empty => None,
292            RecordElemIter::TableDump(elem) => elem.take(),
293            RecordElemIter::Bgp4Mp(iter) => iter.next(),
294            RecordElemIter::RibAfi {
295                peer_table,
296                prefix,
297                entries,
298            } => {
299                let entry = entries.next()?;
300                let pid = entry.peer_index;
301                match peer_table.get_peer_by_id(&pid) {
302                    Some(peer) => Some(rib_entry_to_elem(*prefix, peer, entry)),
303                    None => {
304                        error!("peer ID {} not found in peer_index table", pid);
305                        *self = RecordElemIter::Empty;
306                        None
307                    }
308                }
309            }
310        }
311    }
312
313    fn size_hint(&self) -> (usize, Option<usize>) {
314        match self {
315            RecordElemIter::Empty => (0, Some(0)),
316            RecordElemIter::TableDump(elem) => {
317                let n = elem.is_some() as usize;
318                (n, Some(n))
319            }
320            RecordElemIter::Bgp4Mp(iter) => iter.size_hint(),
321            RecordElemIter::RibAfi { entries, .. } => {
322                let len = entries.len();
323                (len, Some(len))
324            }
325        }
326    }
327}
328
/// Iterator over [`BgpElem`]s produced from a [`BgpUpdateMessage`],
/// avoiding allocation by lazily yielding elements from announced and
/// withdrawn prefixes in two phases.
///
/// All announcements are yielded first, followed by all withdrawals.
pub struct BgpUpdateElemIter {
    // Copied into every yielded element (announcements and withdrawals alike).
    timestamp: f64,
    peer_ip: IpAddr,
    peer_asn: Asn,
    peer_bgp_id: Option<BgpIdentifier>,
    // Only copied into announcements; withdrawals always carry `None`.
    only_to_customer: Option<Asn>,
    // Announce-specific shared attributes
    // (cloned or copied into each announcement; withdrawals leave them unset).
    path: Option<AsPath>,
    origin_asns: Option<Vec<Asn>>,
    origin: Option<Origin>,
    next_hop: Option<IpAddr>,
    local_pref: Option<u32>,
    med: Option<u32>,
    communities: Option<Vec<MetaCommunity>>,
    atomic: bool,
    aggr_asn: Option<Asn>,
    aggr_ip: Option<BgpIdentifier>,
    unknown: Option<Vec<AttrRaw>>,
    deprecated: Option<Vec<AttrRaw>>,
    // Prefix iterators (two chained sources each)
    announced:
        std::iter::Chain<std::vec::IntoIter<NetworkPrefix>, std::vec::IntoIter<NetworkPrefix>>,
    withdrawn:
        std::iter::Chain<std::vec::IntoIter<NetworkPrefix>, std::vec::IntoIter<NetworkPrefix>>,
    // Set once `announced` has been exhausted; from then on only `withdrawn` is polled.
    in_withdrawn_phase: bool,
}
358
359impl Iterator for BgpUpdateElemIter {
360    type Item = BgpElem;
361
362    fn next(&mut self) -> Option<BgpElem> {
363        if !self.in_withdrawn_phase {
364            if let Some(prefix) = self.announced.next() {
365                return Some(BgpElem {
366                    timestamp: self.timestamp,
367                    elem_type: ElemType::ANNOUNCE,
368                    peer_ip: self.peer_ip,
369                    peer_asn: self.peer_asn,
370                    peer_bgp_id: self.peer_bgp_id,
371                    prefix,
372                    next_hop: self.next_hop,
373                    as_path: self.path.clone(),
374                    origin: self.origin,
375                    origin_asns: self.origin_asns.clone(),
376                    local_pref: self.local_pref,
377                    med: self.med,
378                    communities: self.communities.clone(),
379                    atomic: self.atomic,
380                    aggr_asn: self.aggr_asn,
381                    aggr_ip: self.aggr_ip,
382                    only_to_customer: self.only_to_customer,
383                    unknown: self.unknown.clone(),
384                    deprecated: self.deprecated.clone(),
385                });
386            }
387            self.in_withdrawn_phase = true;
388        }
389
390        self.withdrawn.next().map(|prefix| BgpElem {
391            timestamp: self.timestamp,
392            elem_type: ElemType::WITHDRAW,
393            peer_ip: self.peer_ip,
394            peer_asn: self.peer_asn,
395            peer_bgp_id: self.peer_bgp_id,
396            prefix,
397            next_hop: None,
398            as_path: None,
399            origin: None,
400            origin_asns: None,
401            local_pref: None,
402            med: None,
403            communities: None,
404            atomic: false,
405            aggr_asn: None,
406            aggr_ip: None,
407            only_to_customer: None,
408            unknown: None,
409            deprecated: None,
410        })
411    }
412
413    fn size_hint(&self) -> (usize, Option<usize>) {
414        let (ann_lo, ann_hi) = if self.in_withdrawn_phase {
415            (0, Some(0))
416        } else {
417            self.announced.size_hint()
418        };
419        let (wd_lo, wd_hi) = self.withdrawn.size_hint();
420        (ann_lo + wd_lo, ann_hi.and_then(|a| wd_hi.map(|w| a + w)))
421    }
422}
423
424impl Elementor {
425    pub fn new() -> Elementor {
426        Self::default()
427    }
428
429    /// Sets the peer index table for the elementor.
430    ///
431    /// This method takes an MRT record and extracts the peer index table from it if the record contains one.
432    /// The peer index table is required for processing TableDumpV2 records, as it contains the mapping between
433    /// peer indices and their corresponding IP addresses and ASNs.
434    ///
435    /// # Arguments
436    ///
437    /// * `record` - An MRT record that should contain a peer index table
438    ///
439    /// # Returns
440    ///
441    /// * `Ok(())` - If the peer table was successfully extracted and set
442    /// * `Err(ParserError)` - If the record does not contain a peer index table
443    ///
444    /// # Example
445    ///
446    /// ```no_run
447    /// use bgpkit_parser::{BgpkitParser, Elementor};
448    ///
449    /// let mut parser = BgpkitParser::new("rib.dump.bz2").unwrap();
450    /// let mut elementor = Elementor::new();
451    ///
452    /// // Get the first record which should be the peer index table
453    /// if let Ok(record) = parser.next_record() {
454    ///     elementor.set_peer_table(record).unwrap();
455    /// }
456    /// ```
457    pub fn set_peer_table(&mut self, record: MrtRecord) -> Result<(), ParserError> {
458        if let MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(p)) =
459            record.message
460        {
461            self.peer_table = Some(p);
462            Ok(())
463        } else {
464            Err(ParseError("peer_table is not a PeerIndexTable".to_string()))
465        }
466    }
467
468    /// Creates an [`Elementor`] with the given [`PeerIndexTable`] already set.
469    pub fn with_peer_table(peer_table: PeerIndexTable) -> Elementor {
470        Elementor {
471            peer_table: Some(peer_table),
472        }
473    }
474
475    /// Convert a [`MrtRecord`] into an iterator of [`BgpElem`]s without
476    /// requiring `&mut self`.
477    ///
478    /// Unlike [`record_to_elems`](Elementor::record_to_elems), this method:
479    /// - Takes `&self` instead of `&mut self`, since the peer table must
480    ///   already be set via [`set_peer_table`](Elementor::set_peer_table) or
481    ///   [`with_peer_table`](Elementor::with_peer_table).
482    /// - Returns an error if the record contains a [`PeerIndexTable`] (which
483    ///   would require mutation).
484    /// - Returns a lazy [`RecordElemIter`] instead of collecting into a `Vec`,
485    ///   avoiding allocation for the common RIB table dump case.
486    ///
487    /// # Errors
488    ///
489    /// - [`ElemError::UnexpectedPeerIndexTable`] if the record is a PeerIndexTable message.
490    /// - [`ElemError::MissingPeerTable`] if the record requires a peer table but none is set.
491    pub fn record_to_elems_iter(&self, record: MrtRecord) -> Result<RecordElemIter<'_>, ElemError> {
492        let timestamp = {
493            let t = record.common_header.timestamp;
494            if let Some(micro) = &record.common_header.microsecond_timestamp {
495                let m = (*micro as f64) / 1000000.0;
496                t as f64 + m
497            } else {
498                f64::from(t)
499            }
500        };
501
502        match record.message {
503            MrtMessage::TableDumpMessage(msg) => {
504                let (
505                    as_path,
506                    _as4_path,
507                    origin,
508                    next_hop,
509                    local_pref,
510                    med,
511                    communities,
512                    atomic,
513                    aggregator,
514                    _announced,
515                    _withdrawn,
516                    only_to_customer,
517                    unknown,
518                    deprecated,
519                ) = get_relevant_attributes(msg.attributes);
520
521                let origin_asns = as_path
522                    .as_ref()
523                    .map(|as_path| as_path.iter_origins().collect());
524
525                Ok(RecordElemIter::TableDump(Some(BgpElem {
526                    timestamp: msg.originated_time as f64,
527                    elem_type: ElemType::ANNOUNCE,
528                    peer_ip: msg.peer_ip,
529                    peer_asn: msg.peer_asn,
530                    peer_bgp_id: None,
531                    prefix: msg.prefix,
532                    next_hop,
533                    as_path,
534                    origin,
535                    origin_asns,
536                    local_pref,
537                    med,
538                    communities,
539                    atomic,
540                    aggr_asn: aggregator.map(|v| v.0),
541                    aggr_ip: aggregator.map(|v| v.1),
542                    only_to_customer,
543                    unknown,
544                    deprecated,
545                })))
546            }
547
548            MrtMessage::TableDumpV2Message(msg) => match msg {
549                TableDumpV2Message::PeerIndexTable(p) => {
550                    Err(ElemError::UnexpectedPeerIndexTable(Box::new(p)))
551                }
552                TableDumpV2Message::RibAfi(t) => {
553                    let peer_table = self
554                        .peer_table
555                        .as_ref()
556                        .ok_or(ElemError::MissingPeerTable)?;
557                    Ok(RecordElemIter::RibAfi {
558                        peer_table,
559                        prefix: t.prefix,
560                        entries: t.rib_entries.into_iter(),
561                    })
562                }
563                TableDumpV2Message::RibGeneric(_) => Err(ElemError::UnsupportedRibGeneric),
564                TableDumpV2Message::GeoPeerTable(_) => Ok(RecordElemIter::Empty),
565            },
566
567            MrtMessage::Bgp4Mp(msg) => match msg {
568                Bgp4MpEnum::StateChange(_) => Ok(RecordElemIter::Empty),
569                Bgp4MpEnum::Message(v) => {
570                    match Elementor::bgp_to_elems_iter(
571                        v.bgp_message,
572                        timestamp,
573                        &v.peer_ip,
574                        &v.peer_asn,
575                    ) {
576                        Some(iter) => Ok(RecordElemIter::Bgp4Mp(iter)),
577                        None => Ok(RecordElemIter::Empty),
578                    }
579                }
580            },
581        }
582    }
583
584    /// Convert a [BgpMessage] to a vector of [BgpElem]s.
585    ///
586    /// A [BgpMessage] may include `Update`, `Open`, `Notification` or `KeepAlive` messages,
587    /// and only `Update` message contains [BgpElem]s.
588    pub fn bgp_to_elems(
589        msg: BgpMessage,
590        timestamp: f64,
591        peer_ip: &IpAddr,
592        peer_asn: &Asn,
593    ) -> Vec<BgpElem> {
594        Elementor::bgp_to_elems_iter(msg, timestamp, peer_ip, peer_asn)
595            .map(|iter| iter.collect())
596            .unwrap_or_default()
597    }
598
599    /// Convert a [BgpMessage] into an iterator of [BgpElem]s.
600    ///
601    /// Returns `None` for non-Update messages (Open, Notification, KeepAlive).
602    pub fn bgp_to_elems_iter(
603        msg: BgpMessage,
604        timestamp: f64,
605        peer_ip: &IpAddr,
606        peer_asn: &Asn,
607    ) -> Option<BgpUpdateElemIter> {
608        match msg {
609            BgpMessage::Update(msg) => Some(Elementor::bgp_update_to_elems_iter(
610                msg, timestamp, peer_ip, peer_asn,
611            )),
612            BgpMessage::Open(_) | BgpMessage::Notification(_) | BgpMessage::KeepAlive => None,
613        }
614    }
615
616    /// Convert a [BgpUpdateMessage] to a vector of [BgpElem]s.
617    pub fn bgp_update_to_elems(
618        msg: BgpUpdateMessage,
619        timestamp: f64,
620        peer_ip: &IpAddr,
621        peer_asn: &Asn,
622    ) -> Vec<BgpElem> {
623        Elementor::bgp_update_to_elems_iter(msg, timestamp, peer_ip, peer_asn).collect()
624    }
625
626    /// Convert a [BgpUpdateMessage] into a [`BgpUpdateElemIter`] that lazily
627    /// yields [BgpElem]s without allocating a `Vec`.
628    pub fn bgp_update_to_elems_iter(
629        msg: BgpUpdateMessage,
630        timestamp: f64,
631        peer_ip: &IpAddr,
632        peer_asn: &Asn,
633    ) -> BgpUpdateElemIter {
634        let (
635            as_path,
636            as4_path,
637            origin,
638            next_hop,
639            local_pref,
640            med,
641            communities,
642            atomic,
643            aggregator,
644            announced,
645            withdrawn,
646            only_to_customer,
647            unknown,
648            deprecated,
649        ) = get_relevant_attributes(msg.attributes);
650
651        let path = match (as_path, as4_path) {
652            (None, None) => None,
653            (Some(v), None) => Some(v),
654            (None, Some(v)) => Some(v),
655            (Some(v1), Some(v2)) => Some(AsPath::merge_aspath_as4path(&v1, &v2)),
656        };
657
658        let origin_asns = path
659            .as_ref()
660            .map(|as_path| as_path.iter_origins().collect());
661
662        let nlri_announced = announced.map(|n| n.prefixes).unwrap_or_default();
663        let nlri_withdrawn = withdrawn.map(|n| n.prefixes).unwrap_or_default();
664
665        BgpUpdateElemIter {
666            timestamp,
667            peer_ip: *peer_ip,
668            peer_asn: *peer_asn,
669            peer_bgp_id: None,
670            only_to_customer,
671            path,
672            origin_asns,
673            origin,
674            next_hop,
675            local_pref,
676            med,
677            communities,
678            atomic,
679            aggr_asn: aggregator.as_ref().map(|v| v.0),
680            aggr_ip: aggregator.as_ref().map(|v| v.1),
681            unknown,
682            deprecated,
683            announced: msg.announced_prefixes.into_iter().chain(nlri_announced),
684            withdrawn: msg.withdrawn_prefixes.into_iter().chain(nlri_withdrawn),
685            in_withdrawn_phase: false,
686        }
687    }
688
689    /// Convert a [MrtRecord] to a vector of [BgpElem]s.
690    ///
691    /// If the record is a [`PeerIndexTable`], it is consumed to set the internal
692    /// peer table. Errors are logged.
693    ///
694    /// For a non-mutating, lazy alternative, see
695    /// [`record_to_elems_iter`](Elementor::record_to_elems_iter).
696    pub fn record_to_elems(&mut self, record: MrtRecord) -> Vec<BgpElem> {
697        match record.message {
698            MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(_)) => {
699                self.set_peer_table(record);
700                vec![]
701            }
702            _ => match self.record_to_elems_iter(record) {
703                Ok(iter) => iter.collect(),
704                Err(e) => {
705                    error!("{}", e);
706                    vec![]
707                }
708            },
709        }
710    }
711}
712
/// Formats the contained value with its `Display` implementation, or returns an empty string
/// when the option is `None`.
#[inline(always)]
pub fn option_to_string<T>(o: &Option<T>) -> String
where
    T: Display,
{
    o.as_ref()
        .map_or_else(String::new, ToString::to_string)
}
724
725impl From<&BgpElem> for Attributes {
726    fn from(value: &BgpElem) -> Self {
727        let mut values = Vec::<AttributeValue>::new();
728        let mut attributes = Attributes::default();
729        let prefix = value.prefix;
730
731        if value.elem_type == ElemType::WITHDRAW {
732            values.push(AttributeValue::MpUnreachNlri(Nlri::new_unreachable(prefix)));
733            attributes.extend(values);
734            return attributes;
735        }
736
737        values.push(AttributeValue::MpReachNlri(Nlri::new_reachable(
738            prefix,
739            value.next_hop,
740        )));
741
742        if let Some(v) = value.next_hop {
743            values.push(AttributeValue::NextHop(v));
744        }
745
746        if let Some(v) = value.as_path.as_ref() {
747            let is_as4 = match v.get_origin_opt() {
748                None => true,
749                Some(asn) => asn.is_four_byte(),
750            };
751            values.push(AttributeValue::AsPath {
752                path: v.clone(),
753                is_as4,
754            });
755        }
756
757        if let Some(v) = value.origin {
758            values.push(AttributeValue::Origin(v));
759        }
760
761        if let Some(v) = value.local_pref {
762            values.push(AttributeValue::LocalPreference(v));
763        }
764
765        if let Some(v) = value.med {
766            values.push(AttributeValue::MultiExitDiscriminator(v));
767        }
768
769        if let Some(v) = value.communities.as_ref() {
770            let mut communites = vec![];
771            let mut extended_communities = vec![];
772            let mut ipv6_extended_communities = vec![];
773            let mut large_communities = vec![];
774            for c in v {
775                match c {
776                    MetaCommunity::Plain(v) => communites.push(*v),
777                    MetaCommunity::Extended(v) => extended_communities.push(*v),
778                    MetaCommunity::Large(v) => large_communities.push(*v),
779                    MetaCommunity::Ipv6Extended(v) => ipv6_extended_communities.push(*v),
780                }
781            }
782            if !communites.is_empty() {
783                values.push(AttributeValue::Communities(communites));
784            }
785            if !extended_communities.is_empty() {
786                values.push(AttributeValue::ExtendedCommunities(extended_communities));
787            }
788            if !large_communities.is_empty() {
789                values.push(AttributeValue::LargeCommunities(large_communities));
790            }
791            if !ipv6_extended_communities.is_empty() {
792                values.push(AttributeValue::Ipv6AddressSpecificExtendedCommunities(
793                    ipv6_extended_communities,
794                ));
795            }
796        }
797
798        if let Some(v) = value.aggr_asn {
799            let aggregator_id = match value.aggr_ip {
800                Some(v) => v,
801                None => Ipv4Addr::UNSPECIFIED,
802            };
803            values.push(AttributeValue::Aggregator {
804                asn: v,
805                id: aggregator_id,
806                is_as4: v.is_four_byte(),
807            });
808        }
809
810        if let Some(v) = value.only_to_customer {
811            values.push(AttributeValue::OnlyToCustomer(v));
812        }
813
814        if let Some(v) = value.unknown.as_ref() {
815            for t in v {
816                values.push(AttributeValue::Unknown(t.clone()));
817            }
818        }
819
820        if let Some(v) = value.deprecated.as_ref() {
821            for t in v {
822                values.push(AttributeValue::Deprecated(t.clone()));
823            }
824        }
825
826        attributes.extend(values);
827        attributes
828    }
829}
830
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BgpkitParser;
    use std::net::{Ipv4Addr, Ipv6Addr};
    use std::str::FromStr;

    /// `option_to_string` renders `Some` through `Display` and `None` as an empty string.
    #[test]
    fn test_option_to_string() {
        let o1 = Some(1);
        let o2: Option<u32> = None;
        assert_eq!(option_to_string(&o1), "1");
        assert_eq!(option_to_string(&o2), "");
    }

    /// Converts the first record(s) of a TableDump, a TableDumpV2 and a BGP4MP file.
    ///
    /// The input files are referenced by `https://data.ris.ripe.net` URLs.
    #[test]
    fn test_record_to_elems() {
        let url_table_dump_v1 = "https://data.ris.ripe.net/rrc00/2003.01/bview.20030101.0000.gz";
        let url_table_dump_v2 = "https://data.ris.ripe.net/rrc00/2023.01/bview.20230101.0000.gz";
        let url_bgp4mp = "https://data.ris.ripe.net/rrc00/2021.10/updates.20211001.0000.gz";

        // TableDump (v1): the first record is expected to yield exactly one element.
        let mut elementor = Elementor::new();
        let parser = BgpkitParser::new(url_table_dump_v1).unwrap();
        let mut record_iter = parser.into_record_iter();
        let record = record_iter.next().unwrap();
        let elems = elementor.record_to_elems(record);
        assert_eq!(elems.len(), 1);

        // TableDumpV2: the first record is fed in as the peer index table, after which the
        // second record must produce elements.
        let parser = BgpkitParser::new(url_table_dump_v2).unwrap();
        let mut record_iter = parser.into_record_iter();
        let peer_index_table = record_iter.next().unwrap();
        let _elems = elementor.record_to_elems(peer_index_table);
        let record = record_iter.next().unwrap();
        let elems = elementor.record_to_elems(record);
        assert!(!elems.is_empty());

        // BGP4MP: the first record of the updates file must produce elements.
        let parser = BgpkitParser::new(url_bgp4mp).unwrap();
        let mut record_iter = parser.into_record_iter();
        let record = record_iter.next().unwrap();
        let elems = elementor.record_to_elems(record);
        assert!(!elems.is_empty());
    }

    /// Smoke test: converting a fully populated element into `Attributes` must not panic, both
    /// as an announcement and as a withdrawal. The resulting attributes are not inspected.
    #[test]
    fn test_attributes_from_bgp_elem() {
        let mut elem = BgpElem {
            timestamp: 0.0,
            elem_type: ElemType::ANNOUNCE,
            peer_ip: IpAddr::from_str("10.0.0.1").unwrap(),
            peer_asn: Asn::new_32bit(65000),
            peer_bgp_id: None,
            prefix: NetworkPrefix::from_str("10.0.1.0/24").unwrap(),
            next_hop: Some(IpAddr::from_str("10.0.0.2").unwrap()),
            as_path: Some(AsPath::from_sequence([65000, 65001, 65002])),
            origin: Some(Origin::EGP),
            origin_asns: Some(vec![Asn::new_32bit(65000)]),
            local_pref: Some(100),
            med: Some(200),
            // One community of each flavour, so every community branch is exercised.
            communities: Some(vec![
                MetaCommunity::Plain(Community::NoAdvertise),
                MetaCommunity::Extended(ExtendedCommunity::Raw([0, 0, 0, 0, 0, 0, 0, 0])),
                MetaCommunity::Large(LargeCommunity {
                    global_admin: 0,
                    local_data: [0, 0],
                }),
                MetaCommunity::Ipv6Extended(Ipv6AddrExtCommunity {
                    community_type: ExtendedCommunityType::TransitiveTwoOctetAs,
                    subtype: 0,
                    global_admin: Ipv6Addr::from_str("2001:db8::").unwrap(),
                    local_admin: [0, 0],
                }),
            ]),
            atomic: false,
            aggr_asn: Some(Asn::new_32bit(65000)),
            aggr_ip: Some(Ipv4Addr::from_str("10.2.0.0").unwrap()),
            only_to_customer: Some(Asn::new_32bit(65000)),
            unknown: Some(vec![AttrRaw {
                attr_type: AttrType::RESERVED,
                bytes: vec![],
            }]),
            deprecated: Some(vec![AttrRaw {
                attr_type: AttrType::RESERVED,
                bytes: vec![],
            }]),
        };

        let _attributes = Attributes::from(&elem);
        // Same element again, this time as a withdrawal.
        elem.elem_type = ElemType::WITHDRAW;
        let _attributes = Attributes::from(&elem);
    }
921
922    #[test]
923    fn test_get_relevant_attributes() {
924        let attributes = vec![
925            AttributeValue::Origin(Origin::IGP),
926            AttributeValue::AsPath {
927                path: AsPath::from_sequence([65000, 65001, 65002]),
928                is_as4: true,
929            },
930            AttributeValue::NextHop(IpAddr::from_str("10.0.0.1").unwrap()),
931            AttributeValue::MultiExitDiscriminator(100),
932            AttributeValue::LocalPreference(200),
933            AttributeValue::AtomicAggregate,
934            AttributeValue::Aggregator {
935                asn: Asn::new_32bit(65000),
936                id: Ipv4Addr::from_str("10.0.0.1").unwrap(),
937                is_as4: false,
938            },
939            AttributeValue::Communities(vec![Community::NoExport]),
940            AttributeValue::ExtendedCommunities(vec![ExtendedCommunity::Raw([
941                0, 0, 0, 0, 0, 0, 0, 0,
942            ])]),
943            AttributeValue::LargeCommunities(vec![LargeCommunity {
944                global_admin: 0,
945                local_data: [0, 0],
946            }]),
947            AttributeValue::Ipv6AddressSpecificExtendedCommunities(vec![Ipv6AddrExtCommunity {
948                community_type: ExtendedCommunityType::TransitiveTwoOctetAs,
949                subtype: 0,
950                global_admin: Ipv6Addr::from_str("2001:db8::").unwrap(),
951                local_admin: [0, 0],
952            }]),
953            AttributeValue::MpReachNlri(Nlri::new_reachable(
954                NetworkPrefix::from_str("10.0.0.0/24").unwrap(),
955                Some(IpAddr::from_str("10.0.0.1").unwrap()),
956            )),
957            AttributeValue::MpUnreachNlri(Nlri::new_unreachable(
958                NetworkPrefix::from_str("10.0.0.0/24").unwrap(),
959            )),
960            AttributeValue::OnlyToCustomer(Asn::new_32bit(65000)),
961            AttributeValue::Unknown(AttrRaw {
962                attr_type: AttrType::RESERVED,
963                bytes: vec![],
964            }),
965            AttributeValue::Deprecated(AttrRaw {
966                attr_type: AttrType::RESERVED,
967                bytes: vec![],
968            }),
969        ]
970        .into_iter()
971        .map(Attribute::from)
972        .collect::<Vec<Attribute>>();
973
974        let attributes = Attributes::from(attributes);
975
976        let (
977            _as_path,
978            _as4_path, // Table dump v1 does not have 4-byte AS number
979            _origin,
980            _next_hop,
981            _local_pref,
982            _med,
983            _communities,
984            _atomic,
985            _aggregator,
986            _announced,
987            _withdrawn,
988            _only_to_customer,
989            _unknown,
990            _deprecated,
991        ) = get_relevant_attributes(attributes);
992    }
993
994    #[test]
995    fn test_next_hop_from_nlri() {
996        let attributes = vec![AttributeValue::NextHop(
997            IpAddr::from_str("10.0.0.1").unwrap(),
998        )]
999        .into_iter()
1000        .map(Attribute::from)
1001        .collect::<Vec<Attribute>>();
1002
1003        let attributes = Attributes::from(attributes);
1004
1005        let (
1006            _as_path,
1007            _as4_path, // Table dump v1 does not have 4-byte AS number
1008            _origin,
1009            next_hop,
1010            _local_pref,
1011            _med,
1012            _communities,
1013            _atomic,
1014            _aggregator,
1015            _announced,
1016            _withdrawn,
1017            _only_to_customer,
1018            _unknown,
1019            _deprecated,
1020        ) = get_relevant_attributes(attributes);
1021
1022        assert_eq!(next_hop, Some(IpAddr::from_str("10.0.0.1").unwrap()));
1023
1024        let attributes = vec![AttributeValue::MpReachNlri(Nlri::new_reachable(
1025            NetworkPrefix::from_str("10.0.0.0/24").unwrap(),
1026            Some(IpAddr::from_str("10.0.0.2").unwrap()),
1027        ))]
1028        .into_iter()
1029        .map(Attribute::from)
1030        .collect::<Vec<Attribute>>();
1031
1032        let attributes = Attributes::from(attributes);
1033
1034        let (
1035            _as_path,
1036            _as4_path, // Table dump v1 does not have 4-byte AS number
1037            _origin,
1038            next_hop,
1039            _local_pref,
1040            _med,
1041            _communities,
1042            _atomic,
1043            _aggregator,
1044            _announced,
1045            _withdrawn,
1046            _only_to_customer,
1047            _unknown,
1048            _deprecated,
1049        ) = get_relevant_attributes(attributes);
1050
1051        assert_eq!(next_hop, Some(IpAddr::from_str("10.0.0.2").unwrap()));
1052    }
1053
1054    #[test]
1055    fn test_record_to_elems_iter_equivalence_tabledumpv2_small() {
1056        // rib-example-small.bz2 is a TableDumpV2 file (starts with PeerIndexTable)
1057        let url = "https://spaces.bgpkit.org/parser/rib-example-small.bz2";
1058
1059        let mut elementor = Elementor::new();
1060        let parser = BgpkitParser::new(url).unwrap();
1061        let mut record_iter = parser.into_record_iter();
1062
1063        // Skip the PeerIndexTable
1064        let peer_index_table = record_iter.next().unwrap();
1065        let _ = elementor.record_to_elems(peer_index_table);
1066
1067        // Process the first RIB entry
1068        let record = record_iter.next().unwrap();
1069        let elems_vec = elementor.record_to_elems(record.clone());
1070        let elems_iter: Vec<BgpElem> = elementor.record_to_elems_iter(record).unwrap().collect();
1071        assert_eq!(elems_vec, elems_iter);
1072        assert!(!elems_vec.is_empty());
1073    }
1074
1075    #[test]
1076    fn test_record_to_elems_iter_equivalence_bgp4mp() {
1077        let url = "https://spaces.bgpkit.org/parser/update-example.gz";
1078
1079        let mut elementor = Elementor::new();
1080        let parser = BgpkitParser::new(url).unwrap();
1081        let mut record_iter = parser.into_record_iter();
1082        let record = record_iter.next().unwrap();
1083
1084        let elems_vec = elementor.record_to_elems(record.clone());
1085        let elems_iter: Vec<BgpElem> = elementor.record_to_elems_iter(record).unwrap().collect();
1086        assert_eq!(elems_vec, elems_iter);
1087        assert!(!elems_vec.is_empty());
1088    }
1089
1090    #[test]
1091    #[ignore = "requires large RIB file download"]
1092    fn test_record_to_elems_iter_equivalence_tabledumpv2() {
1093        let url = "https://data.ris.ripe.net/rrc00/2023.01/bview.20230101.0000.gz";
1094
1095        let mut elementor = Elementor::new();
1096        let parser = BgpkitParser::new(url).unwrap();
1097        let mut record_iter = parser.into_record_iter();
1098
1099        let peer_index_table = record_iter.next().unwrap();
1100        let _ = elementor.record_to_elems(peer_index_table);
1101
1102        let record = record_iter.next().unwrap();
1103        let elems_vec = elementor.record_to_elems(record.clone());
1104        let elems_iter: Vec<BgpElem> = elementor.record_to_elems_iter(record).unwrap().collect();
1105        assert_eq!(elems_vec, elems_iter);
1106        assert!(!elems_vec.is_empty());
1107    }
1108
1109    #[test]
1110    fn test_record_to_elems_iter_tabledumpv2_with_peer_table() {
1111        let url = "https://spaces.bgpkit.org/parser/rib-example-small.bz2";
1112
1113        let parser = BgpkitParser::new(url).unwrap();
1114        let mut record_iter = parser.into_record_iter();
1115
1116        let peer_index_table = record_iter.next().unwrap();
1117        let mut elementor = Elementor::with_peer_table(
1118            if let MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(pit)) =
1119                peer_index_table.message
1120            {
1121                pit
1122            } else {
1123                panic!("Expected PeerIndexTable");
1124            },
1125        );
1126
1127        let record = record_iter.next().unwrap();
1128        let elems_vec = elementor.record_to_elems(record.clone());
1129        let elems_iter: Vec<BgpElem> = elementor.record_to_elems_iter(record).unwrap().collect();
1130        assert_eq!(elems_vec, elems_iter);
1131        assert!(!elems_vec.is_empty());
1132    }
1133
1134    #[test]
1135    fn test_record_to_elems_iter_error_unexpected_peer_index_table() {
1136        let url = "https://spaces.bgpkit.org/parser/rib-example-small.bz2";
1137
1138        let elementor = Elementor::new();
1139        let parser = BgpkitParser::new(url).unwrap();
1140        let mut record_iter = parser.into_record_iter();
1141        let record = record_iter.next().unwrap();
1142
1143        let result = elementor.record_to_elems_iter(record);
1144        assert!(matches!(
1145            result,
1146            Err(ElemError::UnexpectedPeerIndexTable(_))
1147        ));
1148    }
1149
1150    #[test]
1151    fn test_record_to_elems_iter_error_missing_peer_table() {
1152        // rib-example-small.bz2 is a TableDumpV2 file (starts with PeerIndexTable)
1153        let url = "https://spaces.bgpkit.org/parser/rib-example-small.bz2";
1154
1155        let elementor = Elementor::new();
1156        let parser = BgpkitParser::new(url).unwrap();
1157        let mut record_iter = parser.into_record_iter();
1158
1159        // Skip the PeerIndexTable without consuming it via record_to_elems
1160        // which would set the peer table in the elementor
1161        let _peer_index_table = record_iter.next().unwrap();
1162
1163        // Now try to process a RIB entry without having set the peer table
1164        let record = record_iter.next().unwrap();
1165        let result = elementor.record_to_elems_iter(record);
1166        assert!(matches!(result, Err(ElemError::MissingPeerTable)));
1167    }
1168
1169    #[test]
1170    fn test_bgp_to_elems_iter_equivalence() {
1171        let timestamp = 0.0;
1172        let peer_ip = IpAddr::from_str("10.0.0.1").unwrap();
1173        let peer_asn = Asn::new_32bit(65000);
1174
1175        let attributes = vec![
1176            AttributeValue::Origin(Origin::IGP),
1177            AttributeValue::AsPath {
1178                path: AsPath::from_sequence([65000, 65001, 65002]),
1179                is_as4: false,
1180            },
1181            AttributeValue::NextHop(peer_ip),
1182        ]
1183        .into_iter()
1184        .map(Attribute::from)
1185        .collect::<Vec<Attribute>>();
1186        let attributes = Attributes::from(attributes);
1187
1188        let announced_prefixes = vec![NetworkPrefix::from_str("10.0.0.0/24").unwrap()];
1189
1190        let bgp_message = BgpMessage::Update(BgpUpdateMessage {
1191            attributes,
1192            announced_prefixes,
1193            withdrawn_prefixes: vec![],
1194        });
1195
1196        let elems_vec =
1197            Elementor::bgp_to_elems(bgp_message.clone(), timestamp, &peer_ip, &peer_asn);
1198        let elems_iter: Vec<BgpElem> =
1199            Elementor::bgp_to_elems_iter(bgp_message, timestamp, &peer_ip, &peer_asn)
1200                .unwrap()
1201                .collect();
1202        assert_eq!(elems_vec, elems_iter);
1203        assert_eq!(elems_vec.len(), 1);
1204    }
1205
1206    #[test]
1207    fn test_bgp_to_elems_iter_non_update_messages() {
1208        use std::net::Ipv4Addr;
1209
1210        let timestamp = 0.0;
1211        let peer_ip = IpAddr::from_str("10.0.0.1").unwrap();
1212        let peer_asn = Asn::new_32bit(65000);
1213
1214        let open_msg = BgpOpenMessage {
1215            version: 4,
1216            asn: Asn::new_32bit(1),
1217            hold_time: 180,
1218            bgp_identifier: Ipv4Addr::new(192, 0, 2, 1),
1219            extended_length: false,
1220            opt_params: vec![],
1221        };
1222        assert!(Elementor::bgp_to_elems_iter(
1223            BgpMessage::Open(open_msg),
1224            timestamp,
1225            &peer_ip,
1226            &peer_asn
1227        )
1228        .is_none());
1229
1230        let notification_msg = BgpNotificationMessage {
1231            error: BgpError::Unknown(0, 0),
1232            data: vec![],
1233        };
1234        assert!(Elementor::bgp_to_elems_iter(
1235            BgpMessage::Notification(notification_msg),
1236            timestamp,
1237            &peer_ip,
1238            &peer_asn
1239        )
1240        .is_none());
1241
1242        assert!(Elementor::bgp_to_elems_iter(
1243            BgpMessage::KeepAlive,
1244            timestamp,
1245            &peer_ip,
1246            &peer_asn
1247        )
1248        .is_none());
1249    }
1250
1251    #[test]
1252    fn test_bgp_update_to_elems_iter_equivalence() {
1253        let timestamp = 0.0;
1254        let peer_ip = IpAddr::from_str("10.0.0.1").unwrap();
1255        let peer_asn = Asn::new_32bit(65000);
1256
1257        let attributes = vec![
1258            AttributeValue::Origin(Origin::IGP),
1259            AttributeValue::AsPath {
1260                path: AsPath::from_sequence([65000, 65001, 65002]),
1261                is_as4: false,
1262            },
1263            AttributeValue::NextHop(peer_ip),
1264        ]
1265        .into_iter()
1266        .map(Attribute::from)
1267        .collect::<Vec<Attribute>>();
1268        let attributes = Attributes::from(attributes);
1269
1270        let announced_prefixes = vec![NetworkPrefix::from_str("10.0.0.0/24").unwrap()];
1271        let withdrawn_prefixes = vec![NetworkPrefix::from_str("10.0.1.0/24").unwrap()];
1272
1273        let update = BgpUpdateMessage {
1274            attributes,
1275            announced_prefixes,
1276            withdrawn_prefixes,
1277        };
1278
1279        let elems_vec =
1280            Elementor::bgp_update_to_elems(update.clone(), timestamp, &peer_ip, &peer_asn);
1281        let elems_iter: Vec<BgpElem> =
1282            Elementor::bgp_update_to_elems_iter(update, timestamp, &peer_ip, &peer_asn).collect();
1283        assert_eq!(elems_vec, elems_iter);
1284        assert_eq!(elems_vec.len(), 2);
1285    }
1286
1287    #[test]
1288    fn test_record_elem_iter_size_hint() {
1289        use std::collections::HashMap;
1290
1291        let peer_table = PeerIndexTable {
1292            collector_bgp_id: BgpIdentifier::from_str("10.0.0.1").unwrap(),
1293            view_name: "".to_string(),
1294            id_peer_map: HashMap::new(),
1295            peer_ip_id_map: HashMap::new(),
1296        };
1297
1298        let entries: Vec<RibEntry> = vec![];
1299        let iter = RecordElemIter::RibAfi {
1300            peer_table: &peer_table,
1301            prefix: NetworkPrefix::from_str("10.0.0.0/24").unwrap(),
1302            entries: entries.into_iter(),
1303        };
1304        assert_eq!(iter.size_hint(), (0, Some(0)));
1305
1306        let entries: Vec<RibEntry> = (0..5)
1307            .map(|i| RibEntry {
1308                peer_index: i as u16,
1309                originated_time: 0,
1310                path_id: None,
1311                attributes: Attributes::default(),
1312            })
1313            .collect();
1314        let iter = RecordElemIter::RibAfi {
1315            peer_table: &peer_table,
1316            prefix: NetworkPrefix::from_str("10.0.0.0/24").unwrap(),
1317            entries: entries.into_iter(),
1318        };
1319        assert_eq!(iter.size_hint(), (5, Some(5)));
1320    }
1321}