bgpkit_parser/parser/mrt/
mrt_record.rs

1use super::mrt_header::parse_common_header_with_bytes;
2use crate::bmp::messages::{BmpMessage, BmpMessageBody};
3use crate::error::ParserError;
4use crate::models::*;
5use crate::parser::{
6    parse_bgp4mp, parse_table_dump_message, parse_table_dump_v2_message, ParserErrorWithBytes,
7};
8use crate::utils::convert_timestamp;
9use bytes::{BufMut, Bytes, BytesMut};
10use log::warn;
11use std::convert::TryFrom;
12use std::fs::File;
13use std::io::{Read, Write};
14use std::net::IpAddr;
15use std::path::Path;
16use std::str::FromStr;
17
18/// Raw MRT record containing the common header and unparsed message bytes.
19/// This allows for lazy parsing of the MRT message body, and provides
20/// utilities for debugging and exporting problematic records.
21#[derive(Debug, Clone)]
22pub struct RawMrtRecord {
23    pub common_header: CommonHeader,
24    /// The raw bytes of the MRT common header (as read from the wire).
25    pub header_bytes: Bytes,
26    /// The raw bytes of the MRT message body (excluding the common header).
27    pub message_bytes: Bytes,
28}
29
30impl RawMrtRecord {
31    /// Parse the raw MRT record into a fully parsed MrtRecord.
32    /// This consumes the RawMrtRecord and returns a MrtRecord.
33    pub fn parse(self) -> Result<MrtRecord, ParserError> {
34        let message = parse_mrt_body(
35            self.common_header.entry_type as u16,
36            self.common_header.entry_subtype,
37            self.message_bytes,
38        )?;
39
40        Ok(MrtRecord {
41            common_header: self.common_header,
42            message,
43        })
44    }
45
46    /// Returns the complete MRT record as raw bytes (header + message body).
47    ///
48    /// This returns the exact bytes as they were read from the wire,
49    /// without any re-encoding. This is useful for debugging problematic
50    /// MRT records by exporting them as-is to a file for further analysis.
51    ///
52    /// # Example
53    /// ```ignore
54    /// let raw_record = parser.into_raw_record_iter().next().unwrap();
55    /// let bytes = raw_record.raw_bytes();
56    /// std::fs::write("record.mrt", &bytes).unwrap();
57    /// ```
58    pub fn raw_bytes(&self) -> Bytes {
59        let mut bytes = BytesMut::with_capacity(self.header_bytes.len() + self.message_bytes.len());
60        bytes.put_slice(&self.header_bytes);
61        bytes.put_slice(&self.message_bytes);
62        bytes.freeze()
63    }
64
65    /// Writes the raw MRT record (header + message body) to a file.
66    ///
67    /// This is useful for extracting problematic MRT records for debugging
68    /// or further analysis with other tools.
69    ///
70    /// # Arguments
71    /// * `path` - The path to write the raw bytes to.
72    ///
73    /// # Example
74    /// ```ignore
75    /// let raw_record = parser.into_raw_record_iter().next().unwrap();
76    /// raw_record.write_raw_bytes("problematic_record.mrt").unwrap();
77    /// ```
78    pub fn write_raw_bytes<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
79        let mut file = File::create(path)?;
80        file.write_all(&self.header_bytes)?;
81        file.write_all(&self.message_bytes)?;
82        Ok(())
83    }
84
85    /// Appends the raw MRT record (header + message body) to a file.
86    ///
87    /// This is useful for collecting multiple problematic records into a single file.
88    ///
89    /// # Arguments
90    /// * `path` - The path to append the raw bytes to.
91    ///
92    /// # Example
93    /// ```ignore
94    /// for raw_record in parser.into_raw_record_iter() {
95    ///     if is_problematic(&raw_record) {
96    ///         raw_record.append_raw_bytes("problematic_records.mrt").unwrap();
97    ///     }
98    /// }
99    /// ```
100    pub fn append_raw_bytes<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
101        let mut file = std::fs::OpenOptions::new()
102            .create(true)
103            .append(true)
104            .open(path)?;
105        file.write_all(&self.header_bytes)?;
106        file.write_all(&self.message_bytes)?;
107        Ok(())
108    }
109
110    /// Returns the total length of the complete MRT record in bytes (header + body).
111    pub fn total_bytes_len(&self) -> usize {
112        self.header_bytes.len() + self.message_bytes.len()
113    }
114}
115
116pub fn chunk_mrt_record(input: &mut impl Read) -> Result<RawMrtRecord, ParserErrorWithBytes> {
117    // parse common header and capture raw bytes
118    let parsed_header = match parse_common_header_with_bytes(input) {
119        Ok(v) => v,
120        Err(e) => {
121            if let ParserError::EofError(e) = &e {
122                if e.kind() == std::io::ErrorKind::UnexpectedEof {
123                    return Err(ParserErrorWithBytes::from(ParserError::EofExpected));
124                }
125            }
126            return Err(ParserErrorWithBytes {
127                error: e,
128                bytes: None,
129            });
130        }
131    };
132
133    let common_header = parsed_header.header;
134    let header_bytes = parsed_header.raw_bytes;
135
136    // Protect against unreasonable allocations from corrupt headers
137    const MAX_MRT_MESSAGE_LEN: u32 = 16 * 1024 * 1024; // 16 MiB upper bound
138    if common_header.length > MAX_MRT_MESSAGE_LEN {
139        return Err(ParserErrorWithBytes::from(ParserError::Unsupported(
140            format!("MRT message too large: {} bytes", common_header.length),
141        )));
142    }
143
144    // read the whole message bytes to buffer
145    let mut buffer = BytesMut::zeroed(common_header.length as usize);
146    match input
147        .take(common_header.length as u64)
148        .read_exact(&mut buffer)
149    {
150        Ok(_) => {}
151        Err(e) => {
152            return Err(ParserErrorWithBytes {
153                error: ParserError::IoError(e),
154                bytes: None,
155            })
156        }
157    }
158
159    Ok(RawMrtRecord {
160        common_header,
161        header_bytes,
162        message_bytes: buffer.freeze(),
163    })
164}
165
166pub fn parse_mrt_record(input: &mut impl Read) -> Result<MrtRecord, ParserErrorWithBytes> {
167    let raw_record = chunk_mrt_record(input)?;
168    match raw_record.parse() {
169        Ok(record) => Ok(record),
170        Err(e) => Err(ParserErrorWithBytes {
171            error: e,
172            bytes: None,
173        }),
174    }
175}
176
177/// Parse MRT message body with given entry type and subtype.
178///
179/// The entry type and subtype are parsed from the common header. The message body is parsed
180/// according to the entry type and subtype. The message body is the remaining bytes after the
181/// common header. The length of the message body is also parsed from the common header.
182pub fn parse_mrt_body(
183    entry_type: u16,
184    entry_subtype: u16,
185    data: Bytes,
186) -> Result<MrtMessage, ParserError> {
187    let etype = EntryType::try_from(entry_type)?;
188
189    let message: MrtMessage = match &etype {
190        EntryType::TABLE_DUMP => {
191            let msg = parse_table_dump_message(entry_subtype, data);
192            match msg {
193                Ok(msg) => MrtMessage::TableDumpMessage(msg),
194                Err(e) => {
195                    return Err(e);
196                }
197            }
198        }
199        EntryType::TABLE_DUMP_V2 => {
200            let msg = parse_table_dump_v2_message(entry_subtype, data);
201            match msg {
202                Ok(msg) => MrtMessage::TableDumpV2Message(msg),
203                Err(e) => {
204                    return Err(e);
205                }
206            }
207        }
208        EntryType::BGP4MP | EntryType::BGP4MP_ET => {
209            let msg = parse_bgp4mp(entry_subtype, data);
210            match msg {
211                Ok(msg) => MrtMessage::Bgp4Mp(msg),
212                Err(e) => {
213                    return Err(e);
214                }
215            }
216        }
217        v => {
218            // deprecated
219            return Err(ParserError::Unsupported(format!(
220                "unsupported MRT type: {v:?}"
221            )));
222        }
223    };
224    Ok(message)
225}
226
227impl MrtRecord {
228    pub fn encode(&self) -> Bytes {
229        let message_bytes = self.message.encode(self.common_header.entry_subtype);
230        let mut new_header = self.common_header;
231        if message_bytes.len() < new_header.length as usize {
232            warn!("message length is less than the length in the header");
233            new_header.length = message_bytes.len() as u32;
234        }
235        let header_bytes = new_header.encode();
236
237        // // debug begins
238        // let parsed_body = parse_mrt_body(
239        //     self.common_header.entry_type as u16,
240        //     self.common_header.entry_subtype,
241        //     message_bytes.clone(),
242        // )
243        // .unwrap();
244        // assert!(self.message == parsed_body);
245        // // debug ends
246
247        let mut bytes = BytesMut::with_capacity(header_bytes.len() + message_bytes.len());
248        bytes.put_slice(&header_bytes);
249        bytes.put_slice(&message_bytes);
250        bytes.freeze()
251    }
252}
253
254impl TryFrom<&BmpMessage> for MrtRecord {
255    type Error = String;
256
257    fn try_from(bmp_message: &BmpMessage) -> Result<Self, Self::Error> {
258        let bgp_message = match &bmp_message.message_body {
259            BmpMessageBody::RouteMonitoring(m) => &m.bgp_message,
260            _ => return Err("unsupported bmp message type".to_string()),
261        };
262        let bmp_header = match &bmp_message.per_peer_header {
263            Some(h) => h,
264            None => return Err("missing per peer header".to_string()),
265        };
266
267        let local_ip = match bmp_header.peer_ip {
268            IpAddr::V4(_) => IpAddr::from_str("0.0.0.0").unwrap(),
269            IpAddr::V6(_) => IpAddr::from_str("::").unwrap(),
270        };
271        let local_asn = match bmp_header.peer_asn.is_four_byte() {
272            true => Asn::new_32bit(0),
273            false => Asn::new_16bit(0),
274        };
275
276        let bgp4mp_message = Bgp4MpMessage {
277            msg_type: Bgp4MpType::MessageAs4, // TODO: check Message or MessageAs4
278            peer_asn: bmp_header.peer_asn,
279            local_asn,
280            interface_index: 0,
281            peer_ip: bmp_header.peer_ip,
282            local_ip,
283            bgp_message: bgp_message.clone(),
284        };
285
286        let mrt_message = MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(bgp4mp_message));
287
288        let (seconds, microseconds) = convert_timestamp(bmp_header.timestamp);
289
290        let subtype = Bgp4MpType::MessageAs4 as u16;
291        let mrt_header = CommonHeader {
292            timestamp: seconds,
293            microsecond_timestamp: Some(microseconds),
294            entry_type: EntryType::BGP4MP_ET,
295            entry_subtype: Bgp4MpType::MessageAs4 as u16,
296            length: mrt_message.encode(subtype).len() as u32,
297        };
298
299        Ok(MrtRecord {
300            common_header: mrt_header,
301            message: mrt_message,
302        })
303    }
304}
305
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bmp::messages::headers::{BmpPeerType, PeerFlags, PerPeerFlags};
    use crate::bmp::messages::{BmpCommonHeader, BmpMsgType, BmpPerPeerHeader, RouteMonitoring};
    use std::net::Ipv4Addr;
    use tempfile::tempdir;

    /// Builds a BMP route-monitoring message wrapping a BGP KeepAlive, with the
    /// given (optional) per-peer header.
    fn keepalive_route_monitoring(per_peer_header: Option<BmpPerPeerHeader>) -> BmpMessage {
        BmpMessage {
            common_header: BmpCommonHeader {
                version: 0,
                msg_len: 0,
                msg_type: BmpMsgType::RouteMonitoring,
            },
            per_peer_header,
            message_body: BmpMessageBody::RouteMonitoring(RouteMonitoring {
                bgp_message: BgpMessage::KeepAlive,
            }),
        }
    }

    #[test]
    fn test_raw_mrt_record_raw_bytes() {
        let header = CommonHeader {
            timestamp: 1609459200,
            microsecond_timestamp: None,
            entry_type: EntryType::BGP4MP,
            entry_subtype: 4,
            length: 10,
        };
        let header_bytes = Bytes::from_static(&[
            0x5f, 0xee, 0x6a, 0x80, // timestamp
            0x00, 0x10, // entry type
            0x00, 0x04, // entry subtype
            0x00, 0x00, 0x00, 0x0a, // length
        ]);
        let message_bytes = Bytes::from_static(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);

        let raw_record = RawMrtRecord {
            common_header: header,
            header_bytes: header_bytes.clone(),
            message_bytes: message_bytes.clone(),
        };

        let mrt_bytes = raw_record.raw_bytes();
        // Header is 12 bytes + 10 bytes body = 22 bytes total
        assert_eq!(mrt_bytes.len(), 22);
        assert_eq!(raw_record.total_bytes_len(), 22);
        // Header must come first, followed by the untouched body.
        assert_eq!(&mrt_bytes[..12], &header_bytes[..]);
        assert_eq!(&mrt_bytes[12..], &message_bytes[..]);
    }

    #[test]
    fn test_raw_mrt_record_raw_bytes_with_et() {
        let header = CommonHeader {
            timestamp: 1609459200,
            microsecond_timestamp: Some(500000),
            entry_type: EntryType::BGP4MP_ET,
            entry_subtype: 4,
            length: 10,
        };
        let header_bytes = Bytes::from_static(&[
            0x5f, 0xee, 0x6a, 0x80, // timestamp
            0x00, 0x11, // entry type (BGP4MP_ET = 17)
            0x00, 0x04, // entry subtype
            0x00, 0x00, 0x00, 0x0e, // length (10 + 4 for microseconds)
            0x00, 0x07, 0xa1, 0x20, // microsecond timestamp (500000)
        ]);
        let message_bytes = Bytes::from_static(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);

        let raw_record = RawMrtRecord {
            common_header: header,
            header_bytes: header_bytes.clone(),
            message_bytes: message_bytes.clone(),
        };

        let mrt_bytes = raw_record.raw_bytes();
        // ET Header is 16 bytes + 10 bytes body = 26 bytes total
        assert_eq!(mrt_bytes.len(), 26);
        assert_eq!(raw_record.total_bytes_len(), 26);
        // Header must come first, followed by the untouched body.
        assert_eq!(&mrt_bytes[..16], &header_bytes[..]);
        assert_eq!(&mrt_bytes[16..], &message_bytes[..]);
    }

    #[test]
    fn test_raw_mrt_record_write_to_file() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test_record.mrt");

        let header = CommonHeader {
            timestamp: 1609459200,
            microsecond_timestamp: None,
            entry_type: EntryType::BGP4MP,
            entry_subtype: 4,
            length: 5,
        };
        let header_bytes = Bytes::from_static(&[
            0x5f, 0xee, 0x6a, 0x80, // timestamp
            0x00, 0x10, // entry type
            0x00, 0x04, // entry subtype
            0x00, 0x00, 0x00, 0x05, // length
        ]);
        let message_bytes = Bytes::from_static(&[1, 2, 3, 4, 5]);

        let raw_record = RawMrtRecord {
            common_header: header,
            header_bytes: header_bytes.clone(),
            message_bytes: message_bytes.clone(),
        };

        raw_record.write_raw_bytes(&file_path).unwrap();

        let written_bytes = std::fs::read(&file_path).unwrap();
        assert_eq!(written_bytes.len(), 17); // 12 header + 5 body

        // The file must contain the header followed by the body, byte for byte.
        let expected = [&header_bytes[..], &message_bytes[..]].concat();
        assert_eq!(written_bytes, expected);
    }

    #[test]
    fn test_raw_mrt_record_append_to_file() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test_records.mrt");

        let header = CommonHeader {
            timestamp: 1609459200,
            microsecond_timestamp: None,
            entry_type: EntryType::BGP4MP,
            entry_subtype: 4,
            length: 3,
        };
        let header_bytes = Bytes::from_static(&[
            0x5f, 0xee, 0x6a, 0x80, // timestamp
            0x00, 0x10, // entry type
            0x00, 0x04, // entry subtype
            0x00, 0x00, 0x00, 0x03, // length
        ]);
        let message_bytes = Bytes::from_static(&[1, 2, 3]);

        let raw_record = RawMrtRecord {
            common_header: header,
            header_bytes: header_bytes.clone(),
            message_bytes: message_bytes.clone(),
        };

        raw_record.append_raw_bytes(&file_path).unwrap();
        raw_record.append_raw_bytes(&file_path).unwrap();

        let written_bytes = std::fs::read(&file_path).unwrap();
        assert_eq!(written_bytes.len(), 30); // (12 header + 3 body) * 2

        // Two complete records back to back, each as header followed by body.
        let one_record = [&header_bytes[..], &message_bytes[..]].concat();
        assert_eq!(written_bytes, one_record.repeat(2));
    }

    #[test]
    fn test_try_from_bmp_message() {
        let bmp_message = keepalive_route_monitoring(Some(BmpPerPeerHeader {
            peer_asn: Asn::new_32bit(0),
            peer_ip: IpAddr::from_str("10.0.0.1").unwrap(),
            peer_bgp_id: Ipv4Addr::from_str("10.0.0.2").unwrap(),
            timestamp: 0.0,
            peer_type: BmpPeerType::Global,
            peer_flags: PerPeerFlags::PeerFlags(PeerFlags::empty()),
            peer_distinguisher: 0,
        }));

        let mrt_record = MrtRecord::try_from(&bmp_message).unwrap();
        assert_eq!(mrt_record.common_header.entry_type, EntryType::BGP4MP_ET);

        // The header must describe the message it carries.
        let subtype = Bgp4MpType::MessageAs4 as u16;
        assert_eq!(mrt_record.common_header.entry_subtype, subtype);
        assert_eq!(
            mrt_record.common_header.length as usize,
            mrt_record.message.encode(subtype).len()
        );
    }

    #[test]
    fn test_try_from_bmp_message_without_per_peer_header() {
        let bmp_message = keepalive_route_monitoring(None);

        let error = MrtRecord::try_from(&bmp_message).err();
        assert_eq!(error.as_deref(), Some("missing per peer header"));
    }

    #[test]
    fn test_parse_mrt_body() {
        let mut data = BytesMut::new();
        data.put_u16(0);
        data.put_u16(0);
        data.put_u32(0);
        data.put_u16(0);

        // Entry type 0 is expected to be rejected.
        let result = parse_mrt_body(0, 0, data.freeze());
        assert!(result.is_err());
    }
}