1use super::mrt_header::parse_common_header_with_bytes;
2use crate::bmp::messages::{BmpMessage, BmpMessageBody};
3use crate::error::ParserError;
4use crate::models::*;
5use crate::parser::{
6 parse_bgp4mp, parse_table_dump_message, parse_table_dump_v2_message, ParserErrorWithBytes,
7};
8use crate::utils::convert_timestamp;
9use bytes::{BufMut, Bytes, BytesMut};
10use log::warn;
11use std::convert::TryFrom;
12use std::fs::File;
13use std::io::{Read, Write};
14use std::net::IpAddr;
15use std::path::Path;
16use std::str::FromStr;
17
18#[derive(Debug, Clone)]
22pub struct RawMrtRecord {
23 pub common_header: CommonHeader,
24 pub header_bytes: Bytes,
26 pub message_bytes: Bytes,
28}
29
30impl RawMrtRecord {
31 pub fn parse(self) -> Result<MrtRecord, ParserError> {
34 let message = parse_mrt_body(
35 self.common_header.entry_type as u16,
36 self.common_header.entry_subtype,
37 self.message_bytes,
38 )?;
39
40 Ok(MrtRecord {
41 common_header: self.common_header,
42 message,
43 })
44 }
45
46 pub fn raw_bytes(&self) -> Bytes {
59 let mut bytes = BytesMut::with_capacity(self.header_bytes.len() + self.message_bytes.len());
60 bytes.put_slice(&self.header_bytes);
61 bytes.put_slice(&self.message_bytes);
62 bytes.freeze()
63 }
64
65 pub fn write_raw_bytes<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
79 let mut file = File::create(path)?;
80 file.write_all(&self.header_bytes)?;
81 file.write_all(&self.message_bytes)?;
82 Ok(())
83 }
84
85 pub fn append_raw_bytes<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
101 let mut file = std::fs::OpenOptions::new()
102 .create(true)
103 .append(true)
104 .open(path)?;
105 file.write_all(&self.header_bytes)?;
106 file.write_all(&self.message_bytes)?;
107 Ok(())
108 }
109
110 pub fn total_bytes_len(&self) -> usize {
112 self.header_bytes.len() + self.message_bytes.len()
113 }
114}
115
116pub fn chunk_mrt_record(input: &mut impl Read) -> Result<RawMrtRecord, ParserErrorWithBytes> {
117 let parsed_header = match parse_common_header_with_bytes(input) {
119 Ok(v) => v,
120 Err(e) => {
121 if let ParserError::EofError(e) = &e {
122 if e.kind() == std::io::ErrorKind::UnexpectedEof {
123 return Err(ParserErrorWithBytes::from(ParserError::EofExpected));
124 }
125 }
126 return Err(ParserErrorWithBytes {
127 error: e,
128 bytes: None,
129 });
130 }
131 };
132
133 let common_header = parsed_header.header;
134 let header_bytes = parsed_header.raw_bytes;
135
136 const MAX_MRT_MESSAGE_LEN: u32 = 16 * 1024 * 1024; if common_header.length > MAX_MRT_MESSAGE_LEN {
139 return Err(ParserErrorWithBytes::from(ParserError::Unsupported(
140 format!("MRT message too large: {} bytes", common_header.length),
141 )));
142 }
143
144 let mut buffer = BytesMut::zeroed(common_header.length as usize);
146 match input
147 .take(common_header.length as u64)
148 .read_exact(&mut buffer)
149 {
150 Ok(_) => {}
151 Err(e) => {
152 return Err(ParserErrorWithBytes {
153 error: ParserError::IoError(e),
154 bytes: None,
155 })
156 }
157 }
158
159 Ok(RawMrtRecord {
160 common_header,
161 header_bytes,
162 message_bytes: buffer.freeze(),
163 })
164}
165
166pub fn parse_mrt_record(input: &mut impl Read) -> Result<MrtRecord, ParserErrorWithBytes> {
167 let raw_record = chunk_mrt_record(input)?;
168 match raw_record.parse() {
169 Ok(record) => Ok(record),
170 Err(e) => Err(ParserErrorWithBytes {
171 error: e,
172 bytes: None,
173 }),
174 }
175}
176
177pub fn parse_mrt_body(
183 entry_type: u16,
184 entry_subtype: u16,
185 data: Bytes,
186) -> Result<MrtMessage, ParserError> {
187 let etype = EntryType::try_from(entry_type)?;
188
189 let message: MrtMessage = match &etype {
190 EntryType::TABLE_DUMP => {
191 let msg = parse_table_dump_message(entry_subtype, data);
192 match msg {
193 Ok(msg) => MrtMessage::TableDumpMessage(msg),
194 Err(e) => {
195 return Err(e);
196 }
197 }
198 }
199 EntryType::TABLE_DUMP_V2 => {
200 let msg = parse_table_dump_v2_message(entry_subtype, data);
201 match msg {
202 Ok(msg) => MrtMessage::TableDumpV2Message(msg),
203 Err(e) => {
204 return Err(e);
205 }
206 }
207 }
208 EntryType::BGP4MP | EntryType::BGP4MP_ET => {
209 let msg = parse_bgp4mp(entry_subtype, data);
210 match msg {
211 Ok(msg) => MrtMessage::Bgp4Mp(msg),
212 Err(e) => {
213 return Err(e);
214 }
215 }
216 }
217 v => {
218 return Err(ParserError::Unsupported(format!(
220 "unsupported MRT type: {v:?}"
221 )));
222 }
223 };
224 Ok(message)
225}
226
227impl MrtRecord {
228 pub fn encode(&self) -> Bytes {
229 let message_bytes = self.message.encode(self.common_header.entry_subtype);
230 let mut new_header = self.common_header;
231 if message_bytes.len() < new_header.length as usize {
232 warn!("message length is less than the length in the header");
233 new_header.length = message_bytes.len() as u32;
234 }
235 let header_bytes = new_header.encode();
236
237 let mut bytes = BytesMut::with_capacity(header_bytes.len() + message_bytes.len());
248 bytes.put_slice(&header_bytes);
249 bytes.put_slice(&message_bytes);
250 bytes.freeze()
251 }
252}
253
254impl TryFrom<&BmpMessage> for MrtRecord {
255 type Error = String;
256
257 fn try_from(bmp_message: &BmpMessage) -> Result<Self, Self::Error> {
258 let bgp_message = match &bmp_message.message_body {
259 BmpMessageBody::RouteMonitoring(m) => &m.bgp_message,
260 _ => return Err("unsupported bmp message type".to_string()),
261 };
262 let bmp_header = match &bmp_message.per_peer_header {
263 Some(h) => h,
264 None => return Err("missing per peer header".to_string()),
265 };
266
267 let local_ip = match bmp_header.peer_ip {
268 IpAddr::V4(_) => IpAddr::from_str("0.0.0.0").unwrap(),
269 IpAddr::V6(_) => IpAddr::from_str("::").unwrap(),
270 };
271 let local_asn = match bmp_header.peer_asn.is_four_byte() {
272 true => Asn::new_32bit(0),
273 false => Asn::new_16bit(0),
274 };
275
276 let bgp4mp_message = Bgp4MpMessage {
277 msg_type: Bgp4MpType::MessageAs4, peer_asn: bmp_header.peer_asn,
279 local_asn,
280 interface_index: 0,
281 peer_ip: bmp_header.peer_ip,
282 local_ip,
283 bgp_message: bgp_message.clone(),
284 };
285
286 let mrt_message = MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(bgp4mp_message));
287
288 let (seconds, microseconds) = convert_timestamp(bmp_header.timestamp);
289
290 let subtype = Bgp4MpType::MessageAs4 as u16;
291 let mrt_header = CommonHeader {
292 timestamp: seconds,
293 microsecond_timestamp: Some(microseconds),
294 entry_type: EntryType::BGP4MP_ET,
295 entry_subtype: Bgp4MpType::MessageAs4 as u16,
296 length: mrt_message.encode(subtype).len() as u32,
297 };
298
299 Ok(MrtRecord {
300 common_header: mrt_header,
301 message: mrt_message,
302 })
303 }
304}
305
306#[cfg(test)]
307mod tests {
308 use super::*;
309 use crate::bmp::messages::headers::{BmpPeerType, PeerFlags, PerPeerFlags};
310 use crate::bmp::messages::{BmpCommonHeader, BmpMsgType, BmpPerPeerHeader, RouteMonitoring};
311 use std::net::Ipv4Addr;
312 use tempfile::tempdir;
313
314 #[test]
315 fn test_raw_mrt_record_raw_bytes() {
316 let header = CommonHeader {
317 timestamp: 1609459200,
318 microsecond_timestamp: None,
319 entry_type: EntryType::BGP4MP,
320 entry_subtype: 4,
321 length: 10,
322 };
323 let header_bytes = Bytes::from_static(&[
324 0x5f, 0xee, 0x6a, 0x80, 0x00, 0x10, 0x00, 0x04, 0x00, 0x00, 0x00, 0x0a, ]);
329 let message_bytes = Bytes::from_static(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
330
331 let raw_record = RawMrtRecord {
332 common_header: header,
333 header_bytes,
334 message_bytes,
335 };
336
337 let mrt_bytes = raw_record.raw_bytes();
338 assert_eq!(mrt_bytes.len(), 22);
340 assert_eq!(raw_record.total_bytes_len(), 22);
341 }
342
343 #[test]
344 fn test_raw_mrt_record_raw_bytes_with_et() {
345 let header = CommonHeader {
346 timestamp: 1609459200,
347 microsecond_timestamp: Some(500000),
348 entry_type: EntryType::BGP4MP_ET,
349 entry_subtype: 4,
350 length: 10,
351 };
352 let header_bytes = Bytes::from_static(&[
353 0x5f, 0xee, 0x6a, 0x80, 0x00, 0x11, 0x00, 0x04, 0x00, 0x00, 0x00, 0x0e, 0x00, 0x07, 0xa1, 0x20, ]);
359 let message_bytes = Bytes::from_static(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
360
361 let raw_record = RawMrtRecord {
362 common_header: header,
363 header_bytes,
364 message_bytes,
365 };
366
367 let mrt_bytes = raw_record.raw_bytes();
368 assert_eq!(mrt_bytes.len(), 26);
370 assert_eq!(raw_record.total_bytes_len(), 26);
371 }
372
373 #[test]
374 fn test_raw_mrt_record_write_to_file() {
375 let dir = tempdir().unwrap();
376 let file_path = dir.path().join("test_record.mrt");
377
378 let header = CommonHeader {
379 timestamp: 1609459200,
380 microsecond_timestamp: None,
381 entry_type: EntryType::BGP4MP,
382 entry_subtype: 4,
383 length: 5,
384 };
385 let header_bytes = Bytes::from_static(&[
386 0x5f, 0xee, 0x6a, 0x80, 0x00, 0x10, 0x00, 0x04, 0x00, 0x00, 0x00, 0x05, ]);
391 let message_bytes = Bytes::from_static(&[1, 2, 3, 4, 5]);
392
393 let raw_record = RawMrtRecord {
394 common_header: header,
395 header_bytes,
396 message_bytes,
397 };
398
399 raw_record.write_raw_bytes(&file_path).unwrap();
400
401 let written_bytes = std::fs::read(&file_path).unwrap();
402 assert_eq!(written_bytes.len(), 17); }
404
405 #[test]
406 fn test_raw_mrt_record_append_to_file() {
407 let dir = tempdir().unwrap();
408 let file_path = dir.path().join("test_records.mrt");
409
410 let header = CommonHeader {
411 timestamp: 1609459200,
412 microsecond_timestamp: None,
413 entry_type: EntryType::BGP4MP,
414 entry_subtype: 4,
415 length: 3,
416 };
417 let header_bytes = Bytes::from_static(&[
418 0x5f, 0xee, 0x6a, 0x80, 0x00, 0x10, 0x00, 0x04, 0x00, 0x00, 0x00, 0x03, ]);
423 let message_bytes = Bytes::from_static(&[1, 2, 3]);
424
425 let raw_record = RawMrtRecord {
426 common_header: header,
427 header_bytes,
428 message_bytes,
429 };
430
431 raw_record.append_raw_bytes(&file_path).unwrap();
432 raw_record.append_raw_bytes(&file_path).unwrap();
433
434 let written_bytes = std::fs::read(&file_path).unwrap();
435 assert_eq!(written_bytes.len(), 30); }
437
438 #[test]
439 fn test_try_from_bmp_message() {
440 let bmp_message = BmpMessage {
441 common_header: BmpCommonHeader {
442 version: 0,
443 msg_len: 0,
444 msg_type: BmpMsgType::RouteMonitoring,
445 },
446 per_peer_header: Some(BmpPerPeerHeader {
447 peer_asn: Asn::new_32bit(0),
448 peer_ip: IpAddr::from_str("10.0.0.1").unwrap(),
449 peer_bgp_id: Ipv4Addr::from_str("10.0.0.2").unwrap(),
450 timestamp: 0.0,
451 peer_type: BmpPeerType::Global,
452 peer_flags: PerPeerFlags::PeerFlags(PeerFlags::empty()),
453 peer_distinguisher: 0,
454 }),
455 message_body: BmpMessageBody::RouteMonitoring(RouteMonitoring {
456 bgp_message: BgpMessage::KeepAlive,
457 }),
458 };
459
460 let mrt_record = MrtRecord::try_from(&bmp_message).unwrap();
461 assert_eq!(mrt_record.common_header.entry_type, EntryType::BGP4MP_ET);
462 }
463
464 #[test]
465 fn test_parse_mrt_body() {
466 let mut data = BytesMut::new();
467 data.put_u16(0);
468 data.put_u16(0);
469 data.put_u32(0);
470 data.put_u16(0);
471
472 let result = parse_mrt_body(0, 0, data.freeze());
473 assert!(result.is_err());
474 }
475}