bgpkit_parser/parser/mrt/
mrt_record.rs

1use super::mrt_header::parse_common_header;
2use crate::bmp::messages::{BmpMessage, BmpMessageBody};
3use crate::error::ParserError;
4use crate::models::*;
5use crate::parser::{
6    parse_bgp4mp, parse_table_dump_message, parse_table_dump_v2_message, ParserErrorWithBytes,
7};
8use crate::utils::convert_timestamp;
9use bytes::{BufMut, Bytes, BytesMut};
10use log::warn;
11use std::convert::TryFrom;
12use std::io::Read;
13use std::net::IpAddr;
14use std::str::FromStr;
15
16/// Raw MRT record containing the common header and unparsed message bytes.
17/// This allows for lazy parsing of the MRT message body.
18#[derive(Debug, Clone)]
19pub struct RawMrtRecord {
20    pub common_header: CommonHeader,
21    pub raw_bytes: Bytes,
22}
23
24impl RawMrtRecord {
25    /// Parse the raw MRT record into a fully parsed MrtRecord.
26    /// This consumes the RawMrtRecord and returns a MrtRecord.
27    pub fn parse(self) -> Result<MrtRecord, ParserError> {
28        let message = parse_mrt_body(
29            self.common_header.entry_type as u16,
30            self.common_header.entry_subtype,
31            self.raw_bytes,
32        )?;
33
34        Ok(MrtRecord {
35            common_header: self.common_header,
36            message,
37        })
38    }
39}
40
41pub fn chunk_mrt_record(input: &mut impl Read) -> Result<RawMrtRecord, ParserErrorWithBytes> {
42    // parse common header
43    let common_header = match parse_common_header(input) {
44        Ok(v) => v,
45        Err(e) => {
46            if let ParserError::EofError(e) = &e {
47                if e.kind() == std::io::ErrorKind::UnexpectedEof {
48                    return Err(ParserErrorWithBytes::from(ParserError::EofExpected));
49                }
50            }
51            return Err(ParserErrorWithBytes {
52                error: e,
53                bytes: None,
54            });
55        }
56    };
57
58    // read the whole message bytes to buffer
59    let mut buffer = BytesMut::zeroed(common_header.length as usize);
60    match input
61        .take(common_header.length as u64)
62        .read_exact(&mut buffer)
63    {
64        Ok(_) => {}
65        Err(e) => {
66            return Err(ParserErrorWithBytes {
67                error: ParserError::IoError(e),
68                bytes: None,
69            })
70        }
71    }
72
73    Ok(RawMrtRecord {
74        common_header,
75        raw_bytes: buffer.freeze(),
76    })
77}
78
79pub fn parse_mrt_record(input: &mut impl Read) -> Result<MrtRecord, ParserErrorWithBytes> {
80    let raw_record = chunk_mrt_record(input)?;
81    match raw_record.parse() {
82        Ok(record) => Ok(record),
83        Err(e) => Err(ParserErrorWithBytes {
84            error: e,
85            bytes: None,
86        }),
87    }
88}
89
90/// Parse MRT message body with given entry type and subtype.
91///
92/// The entry type and subtype are parsed from the common header. The message body is parsed
93/// according to the entry type and subtype. The message body is the remaining bytes after the
94/// common header. The length of the message body is also parsed from the common header.
95pub fn parse_mrt_body(
96    entry_type: u16,
97    entry_subtype: u16,
98    data: Bytes,
99) -> Result<MrtMessage, ParserError> {
100    let etype = EntryType::try_from(entry_type)?;
101
102    let message: MrtMessage = match &etype {
103        EntryType::TABLE_DUMP => {
104            let msg = parse_table_dump_message(entry_subtype, data);
105            match msg {
106                Ok(msg) => MrtMessage::TableDumpMessage(msg),
107                Err(e) => {
108                    return Err(e);
109                }
110            }
111        }
112        EntryType::TABLE_DUMP_V2 => {
113            let msg = parse_table_dump_v2_message(entry_subtype, data);
114            match msg {
115                Ok(msg) => MrtMessage::TableDumpV2Message(msg),
116                Err(e) => {
117                    return Err(e);
118                }
119            }
120        }
121        EntryType::BGP4MP | EntryType::BGP4MP_ET => {
122            let msg = parse_bgp4mp(entry_subtype, data);
123            match msg {
124                Ok(msg) => MrtMessage::Bgp4Mp(msg),
125                Err(e) => {
126                    return Err(e);
127                }
128            }
129        }
130        v => {
131            // deprecated
132            return Err(ParserError::Unsupported(format!(
133                "unsupported MRT type: {v:?}"
134            )));
135        }
136    };
137    Ok(message)
138}
139
140impl MrtRecord {
141    pub fn encode(&self) -> Bytes {
142        let message_bytes = self.message.encode(self.common_header.entry_subtype);
143        let mut new_header = self.common_header;
144        if message_bytes.len() < new_header.length as usize {
145            warn!("message length is less than the length in the header");
146            new_header.length = message_bytes.len() as u32;
147        }
148        let header_bytes = new_header.encode();
149
150        // // debug begins
151        // let parsed_body = parse_mrt_body(
152        //     self.common_header.entry_type as u16,
153        //     self.common_header.entry_subtype,
154        //     message_bytes.clone(),
155        // )
156        // .unwrap();
157        // assert!(self.message == parsed_body);
158        // // debug ends
159
160        let mut bytes = BytesMut::with_capacity(header_bytes.len() + message_bytes.len());
161        bytes.put_slice(&header_bytes);
162        bytes.put_slice(&message_bytes);
163        bytes.freeze()
164    }
165}
166
167impl TryFrom<&BmpMessage> for MrtRecord {
168    type Error = String;
169
170    fn try_from(bmp_message: &BmpMessage) -> Result<Self, Self::Error> {
171        let bgp_message = match &bmp_message.message_body {
172            BmpMessageBody::RouteMonitoring(m) => &m.bgp_message,
173            _ => return Err("unsupported bmp message type".to_string()),
174        };
175        let bmp_header = match &bmp_message.per_peer_header {
176            Some(h) => h,
177            None => return Err("missing per peer header".to_string()),
178        };
179
180        let local_ip = match bmp_header.peer_ip {
181            IpAddr::V4(_) => IpAddr::from_str("0.0.0.0").unwrap(),
182            IpAddr::V6(_) => IpAddr::from_str("::").unwrap(),
183        };
184        let local_asn = match bmp_header.peer_asn.is_four_byte() {
185            true => Asn::new_32bit(0),
186            false => Asn::new_16bit(0),
187        };
188
189        let bgp4mp_message = Bgp4MpMessage {
190            msg_type: Bgp4MpType::MessageAs4, // TODO: check Message or MessageAs4
191            peer_asn: bmp_header.peer_asn,
192            local_asn,
193            interface_index: 0,
194            peer_ip: bmp_header.peer_ip,
195            local_ip,
196            bgp_message: bgp_message.clone(),
197        };
198
199        let mrt_message = MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(bgp4mp_message));
200
201        let (seconds, microseconds) = convert_timestamp(bmp_header.timestamp);
202
203        let subtype = Bgp4MpType::MessageAs4 as u16;
204        let mrt_header = CommonHeader {
205            timestamp: seconds,
206            microsecond_timestamp: Some(microseconds),
207            entry_type: EntryType::BGP4MP_ET,
208            entry_subtype: Bgp4MpType::MessageAs4 as u16,
209            length: mrt_message.encode(subtype).len() as u32,
210        };
211
212        Ok(MrtRecord {
213            common_header: mrt_header,
214            message: mrt_message,
215        })
216    }
217}
218
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bmp::messages::headers::{BmpPeerType, PeerFlags, PerPeerFlags};
    use crate::bmp::messages::{BmpCommonHeader, BmpMsgType, BmpPerPeerHeader, RouteMonitoring};
    use std::net::Ipv4Addr;

    /// A route-monitoring BMP message converts into a `BGP4MP_ET` MRT record.
    #[test]
    fn test_try_from_bmp_message() {
        let per_peer_header = BmpPerPeerHeader {
            peer_asn: Asn::new_32bit(0),
            peer_ip: "10.0.0.1".parse().unwrap(),
            peer_bgp_id: Ipv4Addr::new(10, 0, 0, 2),
            timestamp: 0.0,
            peer_type: BmpPeerType::Global,
            peer_flags: PerPeerFlags::PeerFlags(PeerFlags::empty()),
            peer_distinguisher: 0,
        };
        let common_header = BmpCommonHeader {
            version: 0,
            msg_len: 0,
            msg_type: BmpMsgType::RouteMonitoring,
        };
        let message_body = BmpMessageBody::RouteMonitoring(RouteMonitoring {
            bgp_message: BgpMessage::KeepAlive,
        });
        let bmp_message = BmpMessage {
            common_header,
            per_peer_header: Some(per_peer_header),
            message_body,
        };

        let converted = MrtRecord::try_from(&bmp_message);
        let record = converted.unwrap();
        assert_eq!(record.common_header.entry_type, EntryType::BGP4MP_ET);
    }

    /// Entry type 0 is not a parseable MRT type, so the body parser must fail.
    #[test]
    fn test_parse_mrt_body() {
        // Ten zero bytes: u16 + u16 + u32 + u16.
        let body = BytesMut::zeroed(10).freeze();

        let outcome = parse_mrt_body(0, 0, body);
        assert!(outcome.is_err());
    }
}