Skip to main content

bgpkit_parser/parser/mrt/
mrt_record.rs

1use super::mrt_header::parse_common_header_with_bytes;
2use crate::bmp::messages::{BmpMessage, BmpMessageBody};
3use crate::error::ParserError;
4use crate::models::*;
5use crate::parser::{
6    parse_bgp4mp, parse_table_dump_message, parse_table_dump_v2_message, ParserErrorWithBytes,
7};
8use crate::utils::convert_timestamp;
9use bytes::{BufMut, Bytes, BytesMut};
10use log::warn;
11use std::convert::TryFrom;
12use std::fs::File;
13use std::io::{Read, Write};
14use std::net::IpAddr;
15use std::path::Path;
16use std::str::FromStr;
17
18/// Raw MRT record containing the common header and unparsed message bytes.
19/// This allows for lazy parsing of the MRT message body, and provides
20/// utilities for debugging and exporting problematic records.
21#[derive(Debug, Clone)]
22pub struct RawMrtRecord {
23    pub common_header: CommonHeader,
24    /// The raw bytes of the MRT common header (as read from the wire).
25    pub header_bytes: Bytes,
26    /// The raw bytes of the MRT message body (excluding the common header).
27    pub message_bytes: Bytes,
28}
29
30impl RawMrtRecord {
31    /// Parse the raw MRT record into a fully parsed MrtRecord.
32    /// This consumes the RawMrtRecord and returns a MrtRecord.
33    pub fn parse(self) -> Result<MrtRecord, ParserError> {
34        let message = parse_mrt_body(
35            self.common_header.entry_type as u16,
36            self.common_header.entry_subtype,
37            self.message_bytes,
38        )?;
39
40        Ok(MrtRecord {
41            common_header: self.common_header,
42            message,
43        })
44    }
45
46    /// Returns the complete MRT record as raw bytes (header + message body).
47    ///
48    /// This returns the exact bytes as they were read from the wire,
49    /// without any re-encoding. This is useful for debugging problematic
50    /// MRT records by exporting them as-is to a file for further analysis.
51    ///
52    /// # Example
53    /// ```ignore
54    /// let raw_record = parser.into_raw_record_iter().next().unwrap();
55    /// let bytes = raw_record.raw_bytes();
56    /// std::fs::write("record.mrt", &bytes).unwrap();
57    /// ```
58    pub fn raw_bytes(&self) -> Bytes {
59        let mut bytes = BytesMut::with_capacity(self.header_bytes.len() + self.message_bytes.len());
60        bytes.put_slice(&self.header_bytes);
61        bytes.put_slice(&self.message_bytes);
62        bytes.freeze()
63    }
64
65    /// Writes the raw MRT record (header + message body) to a file.
66    ///
67    /// This is useful for extracting problematic MRT records for debugging
68    /// or further analysis with other tools.
69    ///
70    /// # Arguments
71    /// * `path` - The path to write the raw bytes to.
72    ///
73    /// # Example
74    /// ```ignore
75    /// let raw_record = parser.into_raw_record_iter().next().unwrap();
76    /// raw_record.write_raw_bytes("problematic_record.mrt").unwrap();
77    /// ```
78    pub fn write_raw_bytes<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
79        let mut file = File::create(path)?;
80        file.write_all(&self.header_bytes)?;
81        file.write_all(&self.message_bytes)?;
82        Ok(())
83    }
84
85    /// Appends the raw MRT record (header + message body) to a file.
86    ///
87    /// This is useful for collecting multiple problematic records into a single file.
88    ///
89    /// # Arguments
90    /// * `path` - The path to append the raw bytes to.
91    ///
92    /// # Example
93    /// ```ignore
94    /// for raw_record in parser.into_raw_record_iter() {
95    ///     if is_problematic(&raw_record) {
96    ///         raw_record.append_raw_bytes("problematic_records.mrt").unwrap();
97    ///     }
98    /// }
99    /// ```
100    pub fn append_raw_bytes<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
101        let mut file = std::fs::OpenOptions::new()
102            .create(true)
103            .append(true)
104            .open(path)?;
105        file.write_all(&self.header_bytes)?;
106        file.write_all(&self.message_bytes)?;
107        Ok(())
108    }
109
110    /// Returns the total length of the complete MRT record in bytes (header + body).
111    pub fn total_bytes_len(&self) -> usize {
112        self.header_bytes.len() + self.message_bytes.len()
113    }
114}
115
116pub fn chunk_mrt_record(input: &mut impl Read) -> Result<RawMrtRecord, ParserErrorWithBytes> {
117    // parse common header and capture raw bytes
118    let parsed_header = match parse_common_header_with_bytes(input) {
119        Ok(v) => v,
120        Err(e) => {
121            if let ParserError::EofError(e) = &e {
122                if e.kind() == std::io::ErrorKind::UnexpectedEof {
123                    return Err(ParserErrorWithBytes::from(ParserError::EofExpected));
124                }
125            }
126            return Err(ParserErrorWithBytes {
127                error: e,
128                bytes: None,
129            });
130        }
131    };
132
133    let common_header = parsed_header.header;
134    let header_bytes = parsed_header.raw_bytes;
135
136    // Protect against unreasonable allocations from corrupt headers
137    const MAX_MRT_MESSAGE_LEN: u32 = 16 * 1024 * 1024; // 16 MiB upper bound
138    if common_header.length > MAX_MRT_MESSAGE_LEN {
139        return Err(ParserErrorWithBytes::from(ParserError::Unsupported(
140            format!("MRT message too large: {} bytes", common_header.length),
141        )));
142    }
143
144    // read the whole message bytes to buffer
145    let mut buffer = BytesMut::zeroed(common_header.length as usize);
146    match input
147        .take(common_header.length as u64)
148        .read_exact(&mut buffer)
149    {
150        Ok(_) => {}
151        Err(e) => {
152            return Err(ParserErrorWithBytes {
153                error: ParserError::IoError(e),
154                bytes: None,
155            })
156        }
157    }
158
159    Ok(RawMrtRecord {
160        common_header,
161        header_bytes,
162        message_bytes: buffer.freeze(),
163    })
164}
165
166pub fn parse_mrt_record(input: &mut impl Read) -> Result<MrtRecord, ParserErrorWithBytes> {
167    let raw_record = chunk_mrt_record(input)?;
168    match raw_record.parse() {
169        Ok(record) => Ok(record),
170        Err(e) => Err(ParserErrorWithBytes {
171            error: e,
172            bytes: None,
173        }),
174    }
175}
176
177/// Parse MRT message body with given entry type and subtype.
178///
179/// The entry type and subtype are parsed from the common header. The message body is parsed
180/// according to the entry type and subtype. The message body is the remaining bytes after the
181/// common header. The length of the message body is also parsed from the common header.
182pub fn parse_mrt_body(
183    entry_type: u16,
184    entry_subtype: u16,
185    data: Bytes,
186) -> Result<MrtMessage, ParserError> {
187    let etype = EntryType::try_from(entry_type)?;
188
189    let message: MrtMessage = match &etype {
190        EntryType::TABLE_DUMP => {
191            let msg = parse_table_dump_message(entry_subtype, data);
192            match msg {
193                Ok(msg) => MrtMessage::TableDumpMessage(msg),
194                Err(e) => {
195                    return Err(e);
196                }
197            }
198        }
199        EntryType::TABLE_DUMP_V2 => {
200            let msg = parse_table_dump_v2_message(entry_subtype, data);
201            match msg {
202                Ok(msg) => MrtMessage::TableDumpV2Message(msg),
203                Err(e) => {
204                    return Err(e);
205                }
206            }
207        }
208        EntryType::BGP4MP | EntryType::BGP4MP_ET => {
209            let msg = parse_bgp4mp(entry_subtype, data);
210            match msg {
211                Ok(msg) => MrtMessage::Bgp4Mp(msg),
212                Err(e) => {
213                    return Err(e);
214                }
215            }
216        }
217        v => {
218            // deprecated
219            return Err(ParserError::Unsupported(format!(
220                "unsupported MRT type: {v:?}"
221            )));
222        }
223    };
224    Ok(message)
225}
226
227impl MrtRecord {
228    pub fn encode(&self) -> Bytes {
229        let message_bytes = self.message.encode(self.common_header.entry_subtype);
230        let mut new_header = self.common_header;
231        if message_bytes.len() != new_header.length as usize {
232            warn!(
233                "message length {} does not match the length in the header {} (encoding MRT record)",
234                message_bytes.len(),
235                new_header.length
236            );
237        }
238        new_header.length = message_bytes.len() as u32;
239        let header_bytes = new_header.encode();
240
241        // // debug begins
242        // let parsed_body = parse_mrt_body(
243        //     self.common_header.entry_type as u16,
244        //     self.common_header.entry_subtype,
245        //     message_bytes.clone(),
246        // )
247        // .unwrap();
248        // assert!(self.message == parsed_body);
249        // // debug ends
250
251        let mut bytes = BytesMut::with_capacity(header_bytes.len() + message_bytes.len());
252        bytes.put_slice(&header_bytes);
253        bytes.put_slice(&message_bytes);
254        bytes.freeze()
255    }
256}
257
258impl TryFrom<&BmpMessage> for MrtRecord {
259    type Error = String;
260
261    fn try_from(bmp_message: &BmpMessage) -> Result<Self, Self::Error> {
262        let bgp_message = match &bmp_message.message_body {
263            BmpMessageBody::RouteMonitoring(m) => &m.bgp_message,
264            _ => return Err("unsupported bmp message type".to_string()),
265        };
266        let bmp_header = match &bmp_message.per_peer_header {
267            Some(h) => h,
268            None => return Err("missing per peer header".to_string()),
269        };
270
271        let local_ip = match bmp_header.peer_ip {
272            IpAddr::V4(_) => IpAddr::from_str("0.0.0.0").unwrap(),
273            IpAddr::V6(_) => IpAddr::from_str("::").unwrap(),
274        };
275        let local_asn = match bmp_header.peer_asn.is_four_byte() {
276            true => Asn::new_32bit(0),
277            false => Asn::new_16bit(0),
278        };
279
280        let bgp4mp_message = Bgp4MpMessage {
281            msg_type: Bgp4MpType::MessageAs4, // TODO: check Message or MessageAs4
282            peer_asn: bmp_header.peer_asn,
283            local_asn,
284            interface_index: 0,
285            peer_ip: bmp_header.peer_ip,
286            local_ip,
287            bgp_message: bgp_message.clone(),
288        };
289
290        let mrt_message = MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(bgp4mp_message));
291
292        let (seconds, microseconds) = convert_timestamp(bmp_header.timestamp);
293
294        let subtype = Bgp4MpType::MessageAs4 as u16;
295        let mrt_header = CommonHeader {
296            timestamp: seconds,
297            microsecond_timestamp: Some(microseconds),
298            entry_type: EntryType::BGP4MP_ET,
299            entry_subtype: Bgp4MpType::MessageAs4 as u16,
300            length: mrt_message.encode(subtype).len() as u32,
301        };
302
303        Ok(MrtRecord {
304            common_header: mrt_header,
305            message: mrt_message,
306        })
307    }
308}
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313    use crate::bmp::messages::headers::{BmpPeerType, PeerFlags, PerPeerFlags};
314    use crate::bmp::messages::{BmpCommonHeader, BmpMsgType, BmpPerPeerHeader, RouteMonitoring};
315    use std::io::Cursor;
316    use std::net::Ipv4Addr;
317    use tempfile::tempdir;
318
319    #[test]
320    fn test_raw_mrt_record_raw_bytes() {
321        let header = CommonHeader {
322            timestamp: 1609459200,
323            microsecond_timestamp: None,
324            entry_type: EntryType::BGP4MP,
325            entry_subtype: 4,
326            length: 10,
327        };
328        let header_bytes = Bytes::from_static(&[
329            0x5f, 0xee, 0x6a, 0x80, // timestamp
330            0x00, 0x10, // entry type
331            0x00, 0x04, // entry subtype
332            0x00, 0x00, 0x00, 0x0a, // length
333        ]);
334        let message_bytes = Bytes::from_static(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
335
336        let raw_record = RawMrtRecord {
337            common_header: header,
338            header_bytes,
339            message_bytes,
340        };
341
342        let mrt_bytes = raw_record.raw_bytes();
343        // Header is 12 bytes + 10 bytes body = 22 bytes total
344        assert_eq!(mrt_bytes.len(), 22);
345        assert_eq!(raw_record.total_bytes_len(), 22);
346    }
347
348    #[test]
349    fn test_raw_mrt_record_raw_bytes_with_et() {
350        let header = CommonHeader {
351            timestamp: 1609459200,
352            microsecond_timestamp: Some(500000),
353            entry_type: EntryType::BGP4MP_ET,
354            entry_subtype: 4,
355            length: 10,
356        };
357        let header_bytes = Bytes::from_static(&[
358            0x5f, 0xee, 0x6a, 0x80, // timestamp
359            0x00, 0x11, // entry type (BGP4MP_ET = 17)
360            0x00, 0x04, // entry subtype
361            0x00, 0x00, 0x00, 0x0e, // length (10 + 4 for microseconds)
362            0x00, 0x07, 0xa1, 0x20, // microsecond timestamp (500000)
363        ]);
364        let message_bytes = Bytes::from_static(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
365
366        let raw_record = RawMrtRecord {
367            common_header: header,
368            header_bytes,
369            message_bytes,
370        };
371
372        let mrt_bytes = raw_record.raw_bytes();
373        // ET Header is 16 bytes + 10 bytes body = 26 bytes total
374        assert_eq!(mrt_bytes.len(), 26);
375        assert_eq!(raw_record.total_bytes_len(), 26);
376    }
377
378    #[test]
379    fn test_raw_mrt_record_write_to_file() {
380        let dir = tempdir().unwrap();
381        let file_path = dir.path().join("test_record.mrt");
382
383        let header = CommonHeader {
384            timestamp: 1609459200,
385            microsecond_timestamp: None,
386            entry_type: EntryType::BGP4MP,
387            entry_subtype: 4,
388            length: 5,
389        };
390        let header_bytes = Bytes::from_static(&[
391            0x5f, 0xee, 0x6a, 0x80, // timestamp
392            0x00, 0x10, // entry type
393            0x00, 0x04, // entry subtype
394            0x00, 0x00, 0x00, 0x05, // length
395        ]);
396        let message_bytes = Bytes::from_static(&[1, 2, 3, 4, 5]);
397
398        let raw_record = RawMrtRecord {
399            common_header: header,
400            header_bytes,
401            message_bytes,
402        };
403
404        raw_record.write_raw_bytes(&file_path).unwrap();
405
406        let written_bytes = std::fs::read(&file_path).unwrap();
407        assert_eq!(written_bytes.len(), 17); // 12 header + 5 body
408    }
409
410    #[test]
411    fn test_raw_mrt_record_append_to_file() {
412        let dir = tempdir().unwrap();
413        let file_path = dir.path().join("test_records.mrt");
414
415        let header = CommonHeader {
416            timestamp: 1609459200,
417            microsecond_timestamp: None,
418            entry_type: EntryType::BGP4MP,
419            entry_subtype: 4,
420            length: 3,
421        };
422        let header_bytes = Bytes::from_static(&[
423            0x5f, 0xee, 0x6a, 0x80, // timestamp
424            0x00, 0x10, // entry type
425            0x00, 0x04, // entry subtype
426            0x00, 0x00, 0x00, 0x03, // length
427        ]);
428        let message_bytes = Bytes::from_static(&[1, 2, 3]);
429
430        let raw_record = RawMrtRecord {
431            common_header: header,
432            header_bytes,
433            message_bytes,
434        };
435
436        raw_record.append_raw_bytes(&file_path).unwrap();
437        raw_record.append_raw_bytes(&file_path).unwrap();
438
439        let written_bytes = std::fs::read(&file_path).unwrap();
440        assert_eq!(written_bytes.len(), 30); // (12 header + 3 body) * 2
441    }
442
443    #[test]
444    fn test_try_from_bmp_message() {
445        let bmp_message = BmpMessage {
446            common_header: BmpCommonHeader {
447                version: 0,
448                msg_len: 0,
449                msg_type: BmpMsgType::RouteMonitoring,
450            },
451            per_peer_header: Some(BmpPerPeerHeader {
452                peer_asn: Asn::new_32bit(0),
453                peer_ip: IpAddr::from_str("10.0.0.1").unwrap(),
454                peer_bgp_id: Ipv4Addr::from_str("10.0.0.2").unwrap(),
455                timestamp: 0.0,
456                peer_type: BmpPeerType::Global,
457                peer_flags: PerPeerFlags::PeerFlags(PeerFlags::empty()),
458                peer_distinguisher: 0,
459            }),
460            message_body: BmpMessageBody::RouteMonitoring(RouteMonitoring {
461                bgp_message: BgpMessage::KeepAlive,
462            }),
463        };
464
465        let mrt_record = MrtRecord::try_from(&bmp_message).unwrap();
466        assert_eq!(mrt_record.common_header.entry_type, EntryType::BGP4MP_ET);
467    }
468
469    #[test]
470    fn test_parse_mrt_body() {
471        let mut data = BytesMut::new();
472        data.put_u16(0);
473        data.put_u16(0);
474        data.put_u32(0);
475        data.put_u16(0);
476
477        let result = parse_mrt_body(0, 0, data.freeze());
478        assert!(result.is_err());
479    }
480
481    #[test]
482    fn test_mrt_record_encode_updates_header_length() {
483        let record = MrtRecord {
484            common_header: CommonHeader {
485                timestamp: 1609459200,
486                microsecond_timestamp: None,
487                entry_type: EntryType::BGP4MP,
488                entry_subtype: Bgp4MpType::MessageAs4 as u16,
489                length: 0,
490            },
491            message: MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(Bgp4MpMessage {
492                msg_type: Bgp4MpType::MessageAs4,
493                peer_asn: Asn::new_32bit(65000),
494                local_asn: Asn::new_32bit(65001),
495                interface_index: 1,
496                peer_ip: IpAddr::from_str("10.0.0.1").unwrap(),
497                local_ip: IpAddr::from_str("10.0.0.2").unwrap(),
498                bgp_message: BgpMessage::KeepAlive,
499            })),
500        };
501
502        let encoded = record.encode();
503        let mut cursor = Cursor::new(encoded);
504        let parsed = parse_mrt_record(&mut cursor).unwrap();
505        let expected_len = parsed
506            .message
507            .encode(parsed.common_header.entry_subtype)
508            .len() as u32;
509
510        assert_eq!(parsed.common_header.length, expected_len);
511    }
512}