bgpkit_parser/parser/mrt/
mrt_record.rs

1use super::mrt_header::parse_common_header;
2use crate::bmp::messages::{BmpMessage, BmpMessageBody};
3use crate::error::ParserError;
4use crate::models::*;
5use crate::parser::{
6    parse_bgp4mp, parse_table_dump_message, parse_table_dump_v2_message, ParserErrorWithBytes,
7};
8use crate::utils::convert_timestamp;
9use bytes::{BufMut, Bytes, BytesMut};
10use log::warn;
11use std::convert::TryFrom;
12use std::io::Read;
13use std::net::IpAddr;
14use std::str::FromStr;
15
16pub fn parse_mrt_record(input: &mut impl Read) -> Result<MrtRecord, ParserErrorWithBytes> {
17    // parse common header
18    let common_header = match parse_common_header(input) {
19        Ok(v) => v,
20        Err(e) => {
21            if let ParserError::EofError(e) = &e {
22                if e.kind() == std::io::ErrorKind::UnexpectedEof {
23                    return Err(ParserErrorWithBytes::from(ParserError::EofExpected));
24                }
25            }
26            return Err(ParserErrorWithBytes {
27                error: e,
28                bytes: None,
29            });
30        }
31    };
32
33    // read the whole message bytes to buffer
34    let mut buffer = BytesMut::with_capacity(common_header.length as usize);
35    buffer.resize(common_header.length as usize, 0);
36    match input
37        .take(common_header.length as u64)
38        .read_exact(&mut buffer)
39    {
40        Ok(_) => {}
41        Err(e) => {
42            return Err(ParserErrorWithBytes {
43                error: ParserError::IoError(e),
44                bytes: None,
45            })
46        }
47    }
48
49    match parse_mrt_body(
50        common_header.entry_type as u16,
51        common_header.entry_subtype,
52        buffer.freeze(), // freeze the BytesMute to Bytes
53    ) {
54        Ok(message) => Ok(MrtRecord {
55            common_header,
56            message,
57        }),
58        Err(e) => {
59            // TODO: find more efficient way to preserve the bytes during error
60            // let mut total_bytes = vec![];
61            // if common_header.write_header(&mut total_bytes).is_err() {
62            //     unreachable!("Vec<u8> will never produce errors when used as a std::io::Write")
63            // }
64
65            // total_bytes.extend(buffer);
66            // Err(ParserErrorWithBytes {
67            //     error: e,
68            //     bytes: Some(total_bytes),
69            // })
70            Err(ParserErrorWithBytes {
71                error: e,
72                bytes: None,
73            })
74        }
75    }
76}
77
78/// Parse MRT message body with given entry type and subtype.
79///
80/// The entry type and subtype are parsed from the common header. The message body is parsed
81/// according to the entry type and subtype. The message body is the remaining bytes after the
82/// common header. The length of the message body is also parsed from the common header.
83pub fn parse_mrt_body(
84    entry_type: u16,
85    entry_subtype: u16,
86    data: Bytes,
87) -> Result<MrtMessage, ParserError> {
88    let etype = EntryType::try_from(entry_type)?;
89
90    let message: MrtMessage = match &etype {
91        EntryType::TABLE_DUMP => {
92            let msg = parse_table_dump_message(entry_subtype, data);
93            match msg {
94                Ok(msg) => MrtMessage::TableDumpMessage(msg),
95                Err(e) => {
96                    return Err(e);
97                }
98            }
99        }
100        EntryType::TABLE_DUMP_V2 => {
101            let msg = parse_table_dump_v2_message(entry_subtype, data);
102            match msg {
103                Ok(msg) => MrtMessage::TableDumpV2Message(msg),
104                Err(e) => {
105                    return Err(e);
106                }
107            }
108        }
109        EntryType::BGP4MP | EntryType::BGP4MP_ET => {
110            let msg = parse_bgp4mp(entry_subtype, data);
111            match msg {
112                Ok(msg) => MrtMessage::Bgp4Mp(msg),
113                Err(e) => {
114                    return Err(e);
115                }
116            }
117        }
118        v => {
119            // deprecated
120            return Err(ParserError::Unsupported(format!(
121                "unsupported MRT type: {:?}",
122                v
123            )));
124        }
125    };
126    Ok(message)
127}
128
129impl MrtRecord {
130    pub fn encode(&self) -> Bytes {
131        let mut bytes = BytesMut::new();
132        let message_bytes = self.message.encode(self.common_header.entry_subtype);
133        let mut new_header = self.common_header;
134        if message_bytes.len() < new_header.length as usize {
135            warn!("message length is less than the length in the header");
136            new_header.length = message_bytes.len() as u32;
137        }
138        let header_bytes = new_header.encode();
139
140        // // debug begins
141        // let parsed_body = parse_mrt_body(
142        //     self.common_header.entry_type as u16,
143        //     self.common_header.entry_subtype,
144        //     message_bytes.clone(),
145        // )
146        // .unwrap();
147        // assert!(self.message == parsed_body);
148        // // debug ends
149
150        bytes.put_slice(&header_bytes);
151        bytes.put_slice(&message_bytes);
152        bytes.freeze()
153    }
154}
155
156impl TryFrom<&BmpMessage> for MrtRecord {
157    type Error = String;
158
159    fn try_from(bmp_message: &BmpMessage) -> Result<Self, Self::Error> {
160        let bgp_message = match &bmp_message.message_body {
161            BmpMessageBody::RouteMonitoring(m) => &m.bgp_message,
162            _ => return Err("unsupported bmp message type".to_string()),
163        };
164        let bmp_header = match &bmp_message.per_peer_header {
165            Some(h) => h,
166            None => return Err("missing per peer header".to_string()),
167        };
168
169        let local_ip = match bmp_header.peer_ip {
170            IpAddr::V4(_) => IpAddr::from_str("0.0.0.0").unwrap(),
171            IpAddr::V6(_) => IpAddr::from_str("::").unwrap(),
172        };
173        let local_asn = match bmp_header.peer_asn.is_four_byte() {
174            true => Asn::new_32bit(0),
175            false => Asn::new_16bit(0),
176        };
177
178        let bgp4mp_message = Bgp4MpMessage {
179            msg_type: Bgp4MpType::MessageAs4, // TODO: check Message or MessageAs4
180            peer_asn: bmp_header.peer_asn,
181            local_asn,
182            interface_index: 0,
183            peer_ip: bmp_header.peer_ip,
184            local_ip,
185            bgp_message: bgp_message.clone(),
186        };
187
188        let mrt_message = MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(bgp4mp_message));
189
190        let (seconds, microseconds) = convert_timestamp(bmp_header.timestamp);
191
192        let subtype = Bgp4MpType::MessageAs4 as u16;
193        let mrt_header = CommonHeader {
194            timestamp: seconds,
195            microsecond_timestamp: Some(microseconds),
196            entry_type: EntryType::BGP4MP_ET,
197            entry_subtype: Bgp4MpType::MessageAs4 as u16,
198            length: mrt_message.encode(subtype).len() as u32,
199        };
200
201        Ok(MrtRecord {
202            common_header: mrt_header,
203            message: mrt_message,
204        })
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bmp::messages::headers::{BmpPeerType, PeerFlags, PerPeerFlags};
    use crate::bmp::messages::{BmpCommonHeader, BmpMsgType, BmpPerPeerHeader, RouteMonitoring};
    use std::net::Ipv4Addr;

    /// A route-monitoring BMP message carrying a KEEPALIVE converts into a `BGP4MP_ET` record.
    #[test]
    fn test_try_from_bmp_message() {
        let common_header = BmpCommonHeader {
            version: 0,
            msg_len: 0,
            msg_type: BmpMsgType::RouteMonitoring,
        };
        let per_peer_header = BmpPerPeerHeader {
            peer_asn: Asn::new_32bit(0),
            peer_ip: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)),
            peer_bgp_id: Ipv4Addr::new(10, 0, 0, 2),
            timestamp: 0.0,
            peer_type: BmpPeerType::Global,
            peer_flags: PerPeerFlags::PeerFlags(PeerFlags::empty()),
            peer_distinguisher: 0,
        };
        let message_body = BmpMessageBody::RouteMonitoring(RouteMonitoring {
            bgp_message: BgpMessage::KeepAlive,
        });
        let bmp_message = BmpMessage {
            common_header,
            per_peer_header: Some(per_peer_header),
            message_body,
        };

        let mrt_record = MrtRecord::try_from(&bmp_message).unwrap();

        assert_eq!(mrt_record.common_header.entry_type, EntryType::BGP4MP_ET);
    }

    /// Entry type 0 with an all-zero body must be rejected.
    #[test]
    fn test_parse_mrt_body() {
        // Ten zero bytes: the same payload as u16 + u16 + u32 + u16 zeros.
        let mut data = BytesMut::new();
        data.put_slice(&[0u8; 10]);

        let outcome = parse_mrt_body(0, 0, data.freeze());

        assert!(outcome.is_err());
    }
}