bgpkit_parser/parser/mrt/
mrt_record.rs1use super::mrt_header::parse_common_header;
2use crate::bmp::messages::{BmpMessage, BmpMessageBody};
3use crate::error::ParserError;
4use crate::models::*;
5use crate::parser::{
6 parse_bgp4mp, parse_table_dump_message, parse_table_dump_v2_message, ParserErrorWithBytes,
7};
8use crate::utils::convert_timestamp;
9use bytes::{BufMut, Bytes, BytesMut};
10use log::warn;
11use std::convert::TryFrom;
12use std::io::Read;
13use std::net::IpAddr;
14use std::str::FromStr;
15
16#[derive(Debug, Clone)]
19pub struct RawMrtRecord {
20 pub common_header: CommonHeader,
21 pub raw_bytes: Bytes,
22}
23
24impl RawMrtRecord {
25 pub fn parse(self) -> Result<MrtRecord, ParserError> {
28 let message = parse_mrt_body(
29 self.common_header.entry_type as u16,
30 self.common_header.entry_subtype,
31 self.raw_bytes,
32 )?;
33
34 Ok(MrtRecord {
35 common_header: self.common_header,
36 message,
37 })
38 }
39}
40
41pub fn chunk_mrt_record(input: &mut impl Read) -> Result<RawMrtRecord, ParserErrorWithBytes> {
42 let common_header = match parse_common_header(input) {
44 Ok(v) => v,
45 Err(e) => {
46 if let ParserError::EofError(e) = &e {
47 if e.kind() == std::io::ErrorKind::UnexpectedEof {
48 return Err(ParserErrorWithBytes::from(ParserError::EofExpected));
49 }
50 }
51 return Err(ParserErrorWithBytes {
52 error: e,
53 bytes: None,
54 });
55 }
56 };
57
58 let mut buffer = BytesMut::zeroed(common_header.length as usize);
60 match input
61 .take(common_header.length as u64)
62 .read_exact(&mut buffer)
63 {
64 Ok(_) => {}
65 Err(e) => {
66 return Err(ParserErrorWithBytes {
67 error: ParserError::IoError(e),
68 bytes: None,
69 })
70 }
71 }
72
73 Ok(RawMrtRecord {
74 common_header,
75 raw_bytes: buffer.freeze(),
76 })
77}
78
79pub fn parse_mrt_record(input: &mut impl Read) -> Result<MrtRecord, ParserErrorWithBytes> {
80 let raw_record = chunk_mrt_record(input)?;
81 match raw_record.parse() {
82 Ok(record) => Ok(record),
83 Err(e) => Err(ParserErrorWithBytes {
84 error: e,
85 bytes: None,
86 }),
87 }
88}
89
90pub fn parse_mrt_body(
96 entry_type: u16,
97 entry_subtype: u16,
98 data: Bytes,
99) -> Result<MrtMessage, ParserError> {
100 let etype = EntryType::try_from(entry_type)?;
101
102 let message: MrtMessage = match &etype {
103 EntryType::TABLE_DUMP => {
104 let msg = parse_table_dump_message(entry_subtype, data);
105 match msg {
106 Ok(msg) => MrtMessage::TableDumpMessage(msg),
107 Err(e) => {
108 return Err(e);
109 }
110 }
111 }
112 EntryType::TABLE_DUMP_V2 => {
113 let msg = parse_table_dump_v2_message(entry_subtype, data);
114 match msg {
115 Ok(msg) => MrtMessage::TableDumpV2Message(msg),
116 Err(e) => {
117 return Err(e);
118 }
119 }
120 }
121 EntryType::BGP4MP | EntryType::BGP4MP_ET => {
122 let msg = parse_bgp4mp(entry_subtype, data);
123 match msg {
124 Ok(msg) => MrtMessage::Bgp4Mp(msg),
125 Err(e) => {
126 return Err(e);
127 }
128 }
129 }
130 v => {
131 return Err(ParserError::Unsupported(format!(
133 "unsupported MRT type: {v:?}"
134 )));
135 }
136 };
137 Ok(message)
138}
139
140impl MrtRecord {
141 pub fn encode(&self) -> Bytes {
142 let message_bytes = self.message.encode(self.common_header.entry_subtype);
143 let mut new_header = self.common_header;
144 if message_bytes.len() < new_header.length as usize {
145 warn!("message length is less than the length in the header");
146 new_header.length = message_bytes.len() as u32;
147 }
148 let header_bytes = new_header.encode();
149
150 let mut bytes = BytesMut::with_capacity(header_bytes.len() + message_bytes.len());
161 bytes.put_slice(&header_bytes);
162 bytes.put_slice(&message_bytes);
163 bytes.freeze()
164 }
165}
166
167impl TryFrom<&BmpMessage> for MrtRecord {
168 type Error = String;
169
170 fn try_from(bmp_message: &BmpMessage) -> Result<Self, Self::Error> {
171 let bgp_message = match &bmp_message.message_body {
172 BmpMessageBody::RouteMonitoring(m) => &m.bgp_message,
173 _ => return Err("unsupported bmp message type".to_string()),
174 };
175 let bmp_header = match &bmp_message.per_peer_header {
176 Some(h) => h,
177 None => return Err("missing per peer header".to_string()),
178 };
179
180 let local_ip = match bmp_header.peer_ip {
181 IpAddr::V4(_) => IpAddr::from_str("0.0.0.0").unwrap(),
182 IpAddr::V6(_) => IpAddr::from_str("::").unwrap(),
183 };
184 let local_asn = match bmp_header.peer_asn.is_four_byte() {
185 true => Asn::new_32bit(0),
186 false => Asn::new_16bit(0),
187 };
188
189 let bgp4mp_message = Bgp4MpMessage {
190 msg_type: Bgp4MpType::MessageAs4, peer_asn: bmp_header.peer_asn,
192 local_asn,
193 interface_index: 0,
194 peer_ip: bmp_header.peer_ip,
195 local_ip,
196 bgp_message: bgp_message.clone(),
197 };
198
199 let mrt_message = MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(bgp4mp_message));
200
201 let (seconds, microseconds) = convert_timestamp(bmp_header.timestamp);
202
203 let subtype = Bgp4MpType::MessageAs4 as u16;
204 let mrt_header = CommonHeader {
205 timestamp: seconds,
206 microsecond_timestamp: Some(microseconds),
207 entry_type: EntryType::BGP4MP_ET,
208 entry_subtype: Bgp4MpType::MessageAs4 as u16,
209 length: mrt_message.encode(subtype).len() as u32,
210 };
211
212 Ok(MrtRecord {
213 common_header: mrt_header,
214 message: mrt_message,
215 })
216 }
217}
218
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bmp::messages::headers::{BmpPeerType, PeerFlags, PerPeerFlags};
    use crate::bmp::messages::{BmpCommonHeader, BmpMsgType, BmpPerPeerHeader, RouteMonitoring};
    use std::net::Ipv4Addr;

    /// A route monitoring BMP message with a per-peer header converts into a
    /// `BGP4MP_ET` MRT record.
    #[test]
    fn test_try_from_bmp_message() {
        let common_header = BmpCommonHeader {
            version: 0,
            msg_len: 0,
            msg_type: BmpMsgType::RouteMonitoring,
        };
        let per_peer_header = BmpPerPeerHeader {
            peer_asn: Asn::new_32bit(0),
            peer_ip: IpAddr::from_str("10.0.0.1").unwrap(),
            peer_bgp_id: Ipv4Addr::from_str("10.0.0.2").unwrap(),
            timestamp: 0.0,
            peer_type: BmpPeerType::Global,
            peer_flags: PerPeerFlags::PeerFlags(PeerFlags::empty()),
            peer_distinguisher: 0,
        };
        let message_body = BmpMessageBody::RouteMonitoring(RouteMonitoring {
            bgp_message: BgpMessage::KeepAlive,
        });

        let bmp_message = BmpMessage {
            common_header,
            per_peer_header: Some(per_peer_header),
            message_body,
        };

        let converted = MrtRecord::try_from(&bmp_message).unwrap();
        assert_eq!(converted.common_header.entry_type, EntryType::BGP4MP_ET);
    }

    /// Entry type 0 with a ten-byte all-zero body must be rejected.
    #[test]
    fn test_parse_mrt_body() {
        let payload = BytesMut::zeroed(10).freeze();

        let outcome = parse_mrt_body(0, 0, payload);
        assert!(outcome.is_err());
    }
}