1use super::mrt_header::parse_common_header_with_bytes;
2use crate::bmp::messages::{BmpMessage, BmpMessageBody};
3use crate::error::ParserError;
4use crate::models::*;
5use crate::parser::{
6 parse_bgp4mp, parse_table_dump_message, parse_table_dump_v2_message, ParserErrorWithBytes,
7};
8use crate::utils::convert_timestamp;
9use bytes::{BufMut, Bytes, BytesMut};
10use log::warn;
11use std::convert::TryFrom;
12use std::fs::File;
13use std::io::{Read, Write};
14use std::net::IpAddr;
15use std::path::Path;
16use std::str::FromStr;
17
18#[derive(Debug, Clone)]
22pub struct RawMrtRecord {
23 pub common_header: CommonHeader,
24 pub header_bytes: Bytes,
26 pub message_bytes: Bytes,
28}
29
30impl RawMrtRecord {
31 pub fn parse(self) -> Result<MrtRecord, ParserError> {
34 let message = parse_mrt_body(
35 self.common_header.entry_type as u16,
36 self.common_header.entry_subtype,
37 self.message_bytes,
38 )?;
39
40 Ok(MrtRecord {
41 common_header: self.common_header,
42 message,
43 })
44 }
45
46 pub fn raw_bytes(&self) -> Bytes {
59 let mut bytes = BytesMut::with_capacity(self.header_bytes.len() + self.message_bytes.len());
60 bytes.put_slice(&self.header_bytes);
61 bytes.put_slice(&self.message_bytes);
62 bytes.freeze()
63 }
64
65 pub fn write_raw_bytes<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
79 let mut file = File::create(path)?;
80 file.write_all(&self.header_bytes)?;
81 file.write_all(&self.message_bytes)?;
82 Ok(())
83 }
84
85 pub fn append_raw_bytes<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
101 let mut file = std::fs::OpenOptions::new()
102 .create(true)
103 .append(true)
104 .open(path)?;
105 file.write_all(&self.header_bytes)?;
106 file.write_all(&self.message_bytes)?;
107 Ok(())
108 }
109
110 pub fn total_bytes_len(&self) -> usize {
112 self.header_bytes.len() + self.message_bytes.len()
113 }
114}
115
116pub fn chunk_mrt_record(input: &mut impl Read) -> Result<RawMrtRecord, ParserErrorWithBytes> {
117 let parsed_header = match parse_common_header_with_bytes(input) {
119 Ok(v) => v,
120 Err(e) => {
121 if let ParserError::EofError(e) = &e {
122 if e.kind() == std::io::ErrorKind::UnexpectedEof {
123 return Err(ParserErrorWithBytes::from(ParserError::EofExpected));
124 }
125 }
126 return Err(ParserErrorWithBytes {
127 error: e,
128 bytes: None,
129 });
130 }
131 };
132
133 let common_header = parsed_header.header;
134 let header_bytes = parsed_header.raw_bytes;
135
136 const MAX_MRT_MESSAGE_LEN: u32 = 16 * 1024 * 1024; if common_header.length > MAX_MRT_MESSAGE_LEN {
139 return Err(ParserErrorWithBytes::from(ParserError::Unsupported(
140 format!("MRT message too large: {} bytes", common_header.length),
141 )));
142 }
143
144 let mut buffer = BytesMut::zeroed(common_header.length as usize);
146 match input
147 .take(common_header.length as u64)
148 .read_exact(&mut buffer)
149 {
150 Ok(_) => {}
151 Err(e) => {
152 return Err(ParserErrorWithBytes {
153 error: ParserError::IoError(e),
154 bytes: None,
155 })
156 }
157 }
158
159 Ok(RawMrtRecord {
160 common_header,
161 header_bytes,
162 message_bytes: buffer.freeze(),
163 })
164}
165
166pub fn parse_mrt_record(input: &mut impl Read) -> Result<MrtRecord, ParserErrorWithBytes> {
167 let raw_record = chunk_mrt_record(input)?;
168 match raw_record.parse() {
169 Ok(record) => Ok(record),
170 Err(e) => Err(ParserErrorWithBytes {
171 error: e,
172 bytes: None,
173 }),
174 }
175}
176
177pub fn parse_mrt_body(
183 entry_type: u16,
184 entry_subtype: u16,
185 data: Bytes,
186) -> Result<MrtMessage, ParserError> {
187 let etype = EntryType::try_from(entry_type)?;
188
189 let message: MrtMessage = match &etype {
190 EntryType::TABLE_DUMP => {
191 let msg = parse_table_dump_message(entry_subtype, data);
192 match msg {
193 Ok(msg) => MrtMessage::TableDumpMessage(msg),
194 Err(e) => {
195 return Err(e);
196 }
197 }
198 }
199 EntryType::TABLE_DUMP_V2 => {
200 let msg = parse_table_dump_v2_message(entry_subtype, data);
201 match msg {
202 Ok(msg) => MrtMessage::TableDumpV2Message(msg),
203 Err(e) => {
204 return Err(e);
205 }
206 }
207 }
208 EntryType::BGP4MP | EntryType::BGP4MP_ET => {
209 let msg = parse_bgp4mp(entry_subtype, data);
210 match msg {
211 Ok(msg) => MrtMessage::Bgp4Mp(msg),
212 Err(e) => {
213 return Err(e);
214 }
215 }
216 }
217 v => {
218 return Err(ParserError::Unsupported(format!(
220 "unsupported MRT type: {v:?}"
221 )));
222 }
223 };
224 Ok(message)
225}
226
227impl MrtRecord {
228 pub fn encode(&self) -> Bytes {
229 let message_bytes = self.message.encode(self.common_header.entry_subtype);
230 let mut new_header = self.common_header;
231 if message_bytes.len() != new_header.length as usize {
232 warn!(
233 "message length {} does not match the length in the header {} (encoding MRT record)",
234 message_bytes.len(),
235 new_header.length
236 );
237 }
238 new_header.length = message_bytes.len() as u32;
239 let header_bytes = new_header.encode();
240
241 let mut bytes = BytesMut::with_capacity(header_bytes.len() + message_bytes.len());
252 bytes.put_slice(&header_bytes);
253 bytes.put_slice(&message_bytes);
254 bytes.freeze()
255 }
256}
257
258impl TryFrom<&BmpMessage> for MrtRecord {
259 type Error = String;
260
261 fn try_from(bmp_message: &BmpMessage) -> Result<Self, Self::Error> {
262 let bgp_message = match &bmp_message.message_body {
263 BmpMessageBody::RouteMonitoring(m) => &m.bgp_message,
264 _ => return Err("unsupported bmp message type".to_string()),
265 };
266 let bmp_header = match &bmp_message.per_peer_header {
267 Some(h) => h,
268 None => return Err("missing per peer header".to_string()),
269 };
270
271 let local_ip = match bmp_header.peer_ip {
272 IpAddr::V4(_) => IpAddr::from_str("0.0.0.0").unwrap(),
273 IpAddr::V6(_) => IpAddr::from_str("::").unwrap(),
274 };
275 let local_asn = match bmp_header.peer_asn.is_four_byte() {
276 true => Asn::new_32bit(0),
277 false => Asn::new_16bit(0),
278 };
279
280 let bgp4mp_message = Bgp4MpMessage {
281 msg_type: Bgp4MpType::MessageAs4, peer_asn: bmp_header.peer_asn,
283 local_asn,
284 interface_index: 0,
285 peer_ip: bmp_header.peer_ip,
286 local_ip,
287 bgp_message: bgp_message.clone(),
288 };
289
290 let mrt_message = MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(bgp4mp_message));
291
292 let (seconds, microseconds) = convert_timestamp(bmp_header.timestamp);
293
294 let subtype = Bgp4MpType::MessageAs4 as u16;
295 let mrt_header = CommonHeader {
296 timestamp: seconds,
297 microsecond_timestamp: Some(microseconds),
298 entry_type: EntryType::BGP4MP_ET,
299 entry_subtype: Bgp4MpType::MessageAs4 as u16,
300 length: mrt_message.encode(subtype).len() as u32,
301 };
302
303 Ok(MrtRecord {
304 common_header: mrt_header,
305 message: mrt_message,
306 })
307 }
308}
309
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bmp::messages::headers::{BmpPeerType, PeerFlags, PerPeerFlags};
    use crate::bmp::messages::{BmpCommonHeader, BmpMsgType, BmpPerPeerHeader, RouteMonitoring};
    use std::io::Cursor;
    use std::net::Ipv4Addr;
    use tempfile::tempdir;

    // Plain BGP4MP header (subtype 4, no microsecond part) with the given length.
    fn bgp4mp_header(length: u32) -> CommonHeader {
        CommonHeader {
            timestamp: 1609459200,
            microsecond_timestamp: None,
            entry_type: EntryType::BGP4MP,
            entry_subtype: 4,
            length,
        }
    }

    // Assembles a raw record from a decoded header and static byte fixtures.
    fn raw_record(
        common_header: CommonHeader,
        header: &'static [u8],
        message: &'static [u8],
    ) -> RawMrtRecord {
        RawMrtRecord {
            common_header,
            header_bytes: Bytes::from_static(header),
            message_bytes: Bytes::from_static(message),
        }
    }

    #[test]
    fn test_raw_mrt_record_raw_bytes() {
        let record = raw_record(
            bgp4mp_header(10),
            &[
                0x5f, 0xee, 0x6a, 0x80, 0x00, 0x10, 0x00, 0x04, 0x00, 0x00, 0x00, 0x0a,
            ],
            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        );

        // 12 header bytes + 10 message bytes.
        assert_eq!(record.raw_bytes().len(), 22);
        assert_eq!(record.total_bytes_len(), 22);
    }

    #[test]
    fn test_raw_mrt_record_raw_bytes_with_et() {
        let et_header = CommonHeader {
            timestamp: 1609459200,
            microsecond_timestamp: Some(500000),
            entry_type: EntryType::BGP4MP_ET,
            entry_subtype: 4,
            length: 10,
        };
        let record = raw_record(
            et_header,
            &[
                0x5f, 0xee, 0x6a, 0x80, 0x00, 0x11, 0x00, 0x04, 0x00, 0x00, 0x00, 0x0e, 0x00, 0x07,
                0xa1, 0x20,
            ],
            &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
        );

        // 16 header bytes (including the microsecond field) + 10 message bytes.
        assert_eq!(record.raw_bytes().len(), 26);
        assert_eq!(record.total_bytes_len(), 26);
    }

    #[test]
    fn test_raw_mrt_record_write_to_file() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test_record.mrt");

        let record = raw_record(
            bgp4mp_header(5),
            &[
                0x5f, 0xee, 0x6a, 0x80, 0x00, 0x10, 0x00, 0x04, 0x00, 0x00, 0x00, 0x05,
            ],
            &[1, 2, 3, 4, 5],
        );

        record.write_raw_bytes(&file_path).unwrap();

        // 12 header bytes + 5 message bytes.
        let on_disk = std::fs::read(&file_path).unwrap();
        assert_eq!(on_disk.len(), 17);
    }

    #[test]
    fn test_raw_mrt_record_append_to_file() {
        let dir = tempdir().unwrap();
        let file_path = dir.path().join("test_records.mrt");

        let record = raw_record(
            bgp4mp_header(3),
            &[
                0x5f, 0xee, 0x6a, 0x80, 0x00, 0x10, 0x00, 0x04, 0x00, 0x00, 0x00, 0x03,
            ],
            &[1, 2, 3],
        );

        for _ in 0..2 {
            record.append_raw_bytes(&file_path).unwrap();
        }

        // Two records of 12 + 3 bytes each.
        let on_disk = std::fs::read(&file_path).unwrap();
        assert_eq!(on_disk.len(), 30);
    }

    #[test]
    fn test_try_from_bmp_message() {
        let per_peer_header = BmpPerPeerHeader {
            peer_asn: Asn::new_32bit(0),
            peer_ip: IpAddr::from_str("10.0.0.1").unwrap(),
            peer_bgp_id: Ipv4Addr::from_str("10.0.0.2").unwrap(),
            timestamp: 0.0,
            peer_type: BmpPeerType::Global,
            peer_flags: PerPeerFlags::PeerFlags(PeerFlags::empty()),
            peer_distinguisher: 0,
        };
        let bmp_message = BmpMessage {
            common_header: BmpCommonHeader {
                version: 0,
                msg_len: 0,
                msg_type: BmpMsgType::RouteMonitoring,
            },
            per_peer_header: Some(per_peer_header),
            message_body: BmpMessageBody::RouteMonitoring(RouteMonitoring {
                bgp_message: BgpMessage::KeepAlive,
            }),
        };

        let converted = MrtRecord::try_from(&bmp_message).unwrap();
        assert_eq!(converted.common_header.entry_type, EntryType::BGP4MP_ET);
    }

    #[test]
    fn test_parse_mrt_body() {
        // Ten zero bytes, handed to the parser with entry type 0.
        let body = Bytes::from_static(&[0u8; 10]);

        assert!(parse_mrt_body(0, 0, body).is_err());
    }

    #[test]
    fn test_mrt_record_encode_updates_header_length() {
        let message = MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(Bgp4MpMessage {
            msg_type: Bgp4MpType::MessageAs4,
            peer_asn: Asn::new_32bit(65000),
            local_asn: Asn::new_32bit(65001),
            interface_index: 1,
            peer_ip: IpAddr::from_str("10.0.0.1").unwrap(),
            local_ip: IpAddr::from_str("10.0.0.2").unwrap(),
            bgp_message: BgpMessage::KeepAlive,
        }));
        // The header length is deliberately wrong; encode() must correct it.
        let record = MrtRecord {
            common_header: CommonHeader {
                timestamp: 1609459200,
                microsecond_timestamp: None,
                entry_type: EntryType::BGP4MP,
                entry_subtype: Bgp4MpType::MessageAs4 as u16,
                length: 0,
            },
            message,
        };

        let mut cursor = Cursor::new(record.encode());
        let parsed = parse_mrt_record(&mut cursor).unwrap();

        let expected_len = parsed
            .message
            .encode(parsed.common_header.entry_subtype)
            .len() as u32;
        assert_eq!(parsed.common_header.length, expected_len);
    }
}