1use crate::error::ParserError;
45use crate::models::*;
46use crate::parser::BgpkitParser;
47use crate::Elementor;
48use log::{error, warn};
49use std::io::Read;
50use std::net::IpAddr;
51
52#[derive(Debug, Clone, PartialEq)]
58#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
59pub struct Bgp4MpUpdate {
60 pub timestamp: f64,
62 pub peer_ip: IpAddr,
64 pub peer_asn: Asn,
66 pub message: BgpUpdateMessage,
68}
69
70#[derive(Debug, Clone, PartialEq)]
75#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
76pub struct TableDumpV2Entry {
77 pub timestamp: f64,
79 pub rib_type: TableDumpV2Type,
81 pub sequence_number: u32,
83 pub prefix: NetworkPrefix,
85 pub rib_entries: Vec<RibEntry>,
88}
89
90#[derive(Debug, Clone, PartialEq)]
97#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
98pub enum MrtUpdate {
99 Bgp4MpUpdate(Bgp4MpUpdate),
101 TableDumpV2Entry(TableDumpV2Entry),
103 TableDumpMessage(TableDumpMessage),
105}
106
107impl MrtUpdate {
108 pub fn timestamp(&self) -> f64 {
110 match self {
111 MrtUpdate::Bgp4MpUpdate(u) => u.timestamp,
112 MrtUpdate::TableDumpV2Entry(e) => e.timestamp,
113 MrtUpdate::TableDumpMessage(m) => m.originated_time as f64,
114 }
115 }
116}
117
118pub struct UpdateIterator<R> {
127 parser: BgpkitParser<R>,
128 elementor: Elementor,
129}
130
131impl<R> UpdateIterator<R> {
132 pub(crate) fn new(parser: BgpkitParser<R>) -> Self {
133 UpdateIterator {
134 parser,
135 elementor: Elementor::new(),
136 }
137 }
138}
139
140impl<R: Read> Iterator for UpdateIterator<R> {
141 type Item = MrtUpdate;
142
143 fn next(&mut self) -> Option<MrtUpdate> {
144 loop {
145 let record = match self.parser.next_record() {
146 Ok(record) => record,
147 Err(e) => match e.error {
148 ParserError::TruncatedMsg(err_str) | ParserError::Unsupported(err_str) => {
149 if self.parser.options.show_warnings {
150 warn!("parser warn: {}", err_str);
151 }
152 if self.parser.core_dump {
153 if let Some(bytes) = e.bytes {
154 std::fs::write("mrt_core_dump", bytes)
155 .expect("Unable to write to mrt_core_dump");
156 }
157 }
158 continue;
159 }
160 ParserError::ParseError(err_str) => {
161 error!("parser error: {}", err_str);
162 if self.parser.core_dump {
163 if let Some(bytes) = e.bytes {
164 std::fs::write("mrt_core_dump", bytes)
165 .expect("Unable to write to mrt_core_dump");
166 }
167 return None;
168 }
169 continue;
170 }
171 ParserError::EofExpected => return None,
172 ParserError::IoError(err) | ParserError::EofError(err) => {
173 error!("{:?}", err);
174 if self.parser.core_dump {
175 if let Some(bytes) = e.bytes {
176 std::fs::write("mrt_core_dump", bytes)
177 .expect("Unable to write to mrt_core_dump");
178 }
179 }
180 return None;
181 }
182 #[cfg(feature = "oneio")]
183 ParserError::OneIoError(_) => return None,
184 ParserError::FilterError(_) => return None,
185 ParserError::InvalidLabeledNlriLength
187 | ParserError::TruncatedLabeledNlri
188 | ParserError::TruncatedPrefix
189 | ParserError::MaxLabelStackDepthExceeded
190 | ParserError::PeerMaxLabelsExceeded
191 | ParserError::InvalidPrefix => {
192 if self.parser.options.show_warnings {
193 warn!("parser warn: labeled NLRI parsing error: {:?}", e.error);
194 }
195 continue;
196 }
197 },
198 };
199
200 let t = record.common_header.timestamp;
201 let timestamp: f64 = if let Some(micro) = &record.common_header.microsecond_timestamp {
202 let m = (*micro as f64) / 1_000_000.0;
203 t as f64 + m
204 } else {
205 f64::from(t)
206 };
207
208 match record.message {
209 MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(msg)) => {
210 if let BgpMessage::Update(update) = msg.bgp_message {
211 return Some(MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate {
212 timestamp,
213 peer_ip: msg.peer_ip,
214 peer_asn: msg.peer_asn,
215 message: update,
216 }));
217 }
218 continue;
220 }
221 MrtMessage::Bgp4Mp(Bgp4MpEnum::StateChange(_)) => {
222 continue;
224 }
225 MrtMessage::TableDumpV2Message(msg) => {
226 match msg {
227 TableDumpV2Message::PeerIndexTable(p) => {
228 self.elementor.peer_table = Some(p);
230 continue;
231 }
232 TableDumpV2Message::RibAfi(entries) => {
233 return Some(MrtUpdate::TableDumpV2Entry(TableDumpV2Entry {
234 timestamp,
235 rib_type: entries.rib_type,
236 sequence_number: entries.sequence_number,
237 prefix: entries.prefix,
238 rib_entries: entries.rib_entries,
239 }));
240 }
241 TableDumpV2Message::RibGeneric(_) => {
242 continue;
244 }
245 TableDumpV2Message::GeoPeerTable(_) => {
246 continue;
248 }
249 }
250 }
251 MrtMessage::TableDumpMessage(msg) => {
252 return Some(MrtUpdate::TableDumpMessage(msg));
253 }
254 }
255 }
256 }
257}
258
259pub struct FallibleUpdateIterator<R> {
264 parser: BgpkitParser<R>,
265 elementor: Elementor,
266}
267
268impl<R> FallibleUpdateIterator<R> {
269 pub(crate) fn new(parser: BgpkitParser<R>) -> Self {
270 FallibleUpdateIterator {
271 parser,
272 elementor: Elementor::new(),
273 }
274 }
275}
276
277impl<R: Read> Iterator for FallibleUpdateIterator<R> {
278 type Item = Result<MrtUpdate, crate::error::ParserErrorWithBytes>;
279
280 fn next(&mut self) -> Option<Self::Item> {
281 loop {
282 match self.parser.next_record() {
283 Ok(record) => {
284 let t = record.common_header.timestamp;
285 let timestamp: f64 =
286 if let Some(micro) = &record.common_header.microsecond_timestamp {
287 let m = (*micro as f64) / 1_000_000.0;
288 t as f64 + m
289 } else {
290 f64::from(t)
291 };
292
293 match record.message {
294 MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(msg)) => {
295 if let BgpMessage::Update(update) = msg.bgp_message {
296 return Some(Ok(MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate {
297 timestamp,
298 peer_ip: msg.peer_ip,
299 peer_asn: msg.peer_asn,
300 message: update,
301 })));
302 }
303 continue;
304 }
305 MrtMessage::Bgp4Mp(Bgp4MpEnum::StateChange(_)) => {
306 continue;
307 }
308 MrtMessage::TableDumpV2Message(msg) => match msg {
309 TableDumpV2Message::PeerIndexTable(p) => {
310 self.elementor.peer_table = Some(p);
311 continue;
312 }
313 TableDumpV2Message::RibAfi(entries) => {
314 return Some(Ok(MrtUpdate::TableDumpV2Entry(TableDumpV2Entry {
315 timestamp,
316 rib_type: entries.rib_type,
317 sequence_number: entries.sequence_number,
318 prefix: entries.prefix,
319 rib_entries: entries.rib_entries,
320 })));
321 }
322 TableDumpV2Message::RibGeneric(_) => {
323 continue;
324 }
325 TableDumpV2Message::GeoPeerTable(_) => {
326 continue;
327 }
328 },
329 MrtMessage::TableDumpMessage(msg) => {
330 return Some(Ok(MrtUpdate::TableDumpMessage(msg)));
331 }
332 }
333 }
334 Err(e) if matches!(e.error, ParserError::EofExpected) => {
335 return None;
336 }
337 Err(e) => {
338 return Some(Err(e));
339 }
340 }
341 }
342 }
343}
344
345#[cfg(test)]
346mod tests {
347 use super::*;
348 use std::io::Cursor;
349
350 #[test]
351 fn test_bgp4mp_update_struct() {
352 let update = Bgp4MpUpdate {
353 timestamp: 1234567890.123456,
354 peer_ip: "192.0.2.1".parse().unwrap(),
355 peer_asn: Asn::new_32bit(65000),
356 message: BgpUpdateMessage::default(),
357 };
358
359 assert_eq!(update.timestamp, 1234567890.123456);
360 assert_eq!(update.peer_ip.to_string(), "192.0.2.1");
361 assert_eq!(update.peer_asn, Asn::new_32bit(65000));
362 }
363
364 #[test]
365 fn test_table_dump_v2_entry_struct() {
366 let entry = TableDumpV2Entry {
367 timestamp: 1234567890.0,
368 rib_type: TableDumpV2Type::RibIpv4Unicast,
369 sequence_number: 42,
370 prefix: "10.0.0.0/8".parse().unwrap(),
371 rib_entries: vec![],
372 };
373
374 assert_eq!(entry.timestamp, 1234567890.0);
375 assert_eq!(entry.rib_type, TableDumpV2Type::RibIpv4Unicast);
376 assert_eq!(entry.sequence_number, 42);
377 assert_eq!(entry.prefix.to_string(), "10.0.0.0/8");
378 assert!(entry.rib_entries.is_empty());
379 }
380
381 #[test]
382 fn test_mrt_update_timestamp() {
383 let bgp4mp = MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate {
385 timestamp: 1234567890.5,
386 peer_ip: "192.0.2.1".parse().unwrap(),
387 peer_asn: Asn::new_32bit(65000),
388 message: BgpUpdateMessage::default(),
389 });
390 assert_eq!(bgp4mp.timestamp(), 1234567890.5);
391
392 let table_dump_v2 = MrtUpdate::TableDumpV2Entry(TableDumpV2Entry {
394 timestamp: 1234567891.5,
395 rib_type: TableDumpV2Type::RibIpv4Unicast,
396 sequence_number: 1,
397 prefix: "10.0.0.0/8".parse().unwrap(),
398 rib_entries: vec![],
399 });
400 assert_eq!(table_dump_v2.timestamp(), 1234567891.5);
401
402 let table_dump_v1 = MrtUpdate::TableDumpMessage(TableDumpMessage {
404 view_number: 0,
405 sequence_number: 1,
406 prefix: "192.168.0.0/16".parse().unwrap(),
407 status: 1,
408 originated_time: 1234567892,
409 peer_ip: "10.0.0.1".parse().unwrap(),
410 peer_asn: Asn::new_32bit(65001),
411 attributes: Attributes::default(),
412 });
413 assert_eq!(table_dump_v1.timestamp(), 1234567892.0);
414 }
415
416 #[test]
417 fn test_update_iterator_empty() {
418 let cursor = Cursor::new(vec![]);
419 let parser = BgpkitParser::from_reader(cursor);
420 let mut iter = UpdateIterator::new(parser);
421
422 assert!(iter.next().is_none());
423 }
424
425 #[test]
426 fn test_fallible_update_iterator_empty() {
427 let cursor = Cursor::new(vec![]);
428 let parser = BgpkitParser::from_reader(cursor);
429 let mut iter = FallibleUpdateIterator::new(parser);
430
431 assert!(iter.next().is_none());
432 }
433
434 #[test]
435 fn test_bgp4mp_update_clone_and_debug() {
436 let update = Bgp4MpUpdate {
437 timestamp: 1234567890.123456,
438 peer_ip: "192.0.2.1".parse().unwrap(),
439 peer_asn: Asn::new_32bit(65000),
440 message: BgpUpdateMessage::default(),
441 };
442
443 let cloned = update.clone();
445 assert_eq!(update, cloned);
446
447 let debug_str = format!("{:?}", update);
449 assert!(debug_str.contains("Bgp4MpUpdate"));
450 assert!(debug_str.contains("192.0.2.1"));
451 }
452
453 #[test]
454 fn test_table_dump_v2_entry_clone_and_debug() {
455 let entry = TableDumpV2Entry {
456 timestamp: 1234567890.0,
457 rib_type: TableDumpV2Type::RibIpv4Unicast,
458 sequence_number: 42,
459 prefix: "10.0.0.0/8".parse().unwrap(),
460 rib_entries: vec![],
461 };
462
463 let cloned = entry.clone();
465 assert_eq!(entry, cloned);
466
467 let debug_str = format!("{:?}", entry);
469 assert!(debug_str.contains("TableDumpV2Entry"));
470 assert!(debug_str.contains("10.0.0.0/8"));
471 }
472
473 #[test]
474 fn test_mrt_update_clone_and_debug() {
475 let update = MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate {
476 timestamp: 1234567890.5,
477 peer_ip: "192.0.2.1".parse().unwrap(),
478 peer_asn: Asn::new_32bit(65000),
479 message: BgpUpdateMessage::default(),
480 });
481
482 let cloned = update.clone();
484 assert_eq!(update, cloned);
485
486 let debug_str = format!("{:?}", update);
488 assert!(debug_str.contains("Bgp4MpUpdate"));
489 }
490
491 #[test]
492 fn test_fallible_update_iterator_with_invalid_data() {
493 let invalid_data = vec![
495 0x00, 0x00, 0x00, 0x00, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, ];
501
502 let cursor = Cursor::new(invalid_data);
503 let parser = BgpkitParser::from_reader(cursor);
504 let mut iter = FallibleUpdateIterator::new(parser);
505
506 let result = iter.next();
508 assert!(result.is_some());
509 assert!(result.unwrap().is_err());
510 }
511
512 #[test]
513 fn test_mrt_update_enum_variants() {
514 let updates: Vec<MrtUpdate> = vec![
516 MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate {
517 timestamp: 1.0,
518 peer_ip: "192.0.2.1".parse().unwrap(),
519 peer_asn: Asn::new_32bit(65000),
520 message: BgpUpdateMessage::default(),
521 }),
522 MrtUpdate::TableDumpV2Entry(TableDumpV2Entry {
523 timestamp: 2.0,
524 rib_type: TableDumpV2Type::RibIpv6Unicast,
525 sequence_number: 1,
526 prefix: "2001:db8::/32".parse().unwrap(),
527 rib_entries: vec![],
528 }),
529 MrtUpdate::TableDumpMessage(TableDumpMessage {
530 view_number: 0,
531 sequence_number: 1,
532 prefix: "10.0.0.0/8".parse().unwrap(),
533 status: 1,
534 originated_time: 3,
535 peer_ip: "10.0.0.1".parse().unwrap(),
536 peer_asn: Asn::new_32bit(65001),
537 attributes: Attributes::default(),
538 }),
539 ];
540
541 for (i, update) in updates.iter().enumerate() {
542 match update {
543 MrtUpdate::Bgp4MpUpdate(_) => assert_eq!(i, 0),
544 MrtUpdate::TableDumpV2Entry(_) => assert_eq!(i, 1),
545 MrtUpdate::TableDumpMessage(_) => assert_eq!(i, 2),
546 }
547 }
548 }
549
550 #[test]
551 #[cfg(feature = "serde")]
552 fn test_bgp4mp_update_serde() {
553 let update = Bgp4MpUpdate {
554 timestamp: 1234567890.123456,
555 peer_ip: "192.0.2.1".parse().unwrap(),
556 peer_asn: Asn::new_32bit(65000),
557 message: BgpUpdateMessage::default(),
558 };
559
560 let serialized = serde_json::to_string(&update).unwrap();
561 let deserialized: Bgp4MpUpdate = serde_json::from_str(&serialized).unwrap();
562 assert_eq!(update, deserialized);
563 }
564
565 #[test]
566 #[cfg(feature = "serde")]
567 fn test_table_dump_v2_entry_serde() {
568 let entry = TableDumpV2Entry {
569 timestamp: 1234567890.0,
570 rib_type: TableDumpV2Type::RibIpv4Unicast,
571 sequence_number: 42,
572 prefix: "10.0.0.0/8".parse().unwrap(),
573 rib_entries: vec![],
574 };
575
576 let serialized = serde_json::to_string(&entry).unwrap();
577 let deserialized: TableDumpV2Entry = serde_json::from_str(&serialized).unwrap();
578 assert_eq!(entry, deserialized);
579 }
580
581 #[test]
582 #[cfg(feature = "serde")]
583 fn test_mrt_update_serde() {
584 let update = MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate {
585 timestamp: 1234567890.5,
586 peer_ip: "192.0.2.1".parse().unwrap(),
587 peer_asn: Asn::new_32bit(65000),
588 message: BgpUpdateMessage::default(),
589 });
590
591 let serialized = serde_json::to_string(&update).unwrap();
592 let deserialized: MrtUpdate = serde_json::from_str(&serialized).unwrap();
593 assert_eq!(update, deserialized);
594 }
595
596 #[test]
598 fn test_update_iterator_with_updates_file() {
599 let url = "https://spaces.bgpkit.org/parser/update-example";
600 let parser = BgpkitParser::new(url).unwrap();
601
602 let mut bgp4mp_count = 0;
603 let mut total_announced = 0;
604 let mut total_withdrawn = 0;
605
606 for update in parser.into_update_iter() {
607 match update {
608 MrtUpdate::Bgp4MpUpdate(u) => {
609 bgp4mp_count += 1;
610 total_announced += u.message.announced_prefixes.len();
611 total_withdrawn += u.message.withdrawn_prefixes.len();
612 for attr in &u.message.attributes {
614 match attr {
615 AttributeValue::MpReachNlri(nlri) => {
616 total_announced += nlri.prefixes.len();
617 }
618 AttributeValue::MpUnreachNlri(nlri) => {
619 total_withdrawn += nlri.prefixes.len();
620 }
621 _ => {}
622 }
623 }
624 }
625 MrtUpdate::TableDumpV2Entry(_) => {
626 panic!("Should not see TableDumpV2Entry in UPDATES file");
627 }
628 MrtUpdate::TableDumpMessage(_) => {
629 panic!("Should not see TableDumpMessage in UPDATES file");
630 }
631 }
632 }
633
634 assert!(bgp4mp_count > 0, "Should have parsed some BGP4MP updates");
636 assert!(
637 total_announced + total_withdrawn > 0,
638 "Should have some prefixes"
639 );
640 }
641
642 #[test]
644 fn test_update_iterator_with_rib_file() {
645 let url = "https://spaces.bgpkit.org/parser/rib-example-small.bz2";
646 let parser = BgpkitParser::new(url).unwrap();
647
648 let mut rib_entry_count = 0;
649 let mut total_rib_entries = 0;
650
651 for update in parser.into_update_iter().take(100) {
652 match update {
653 MrtUpdate::Bgp4MpUpdate(_) => {
654 panic!("Should not see Bgp4MpUpdate in RIB file");
655 }
656 MrtUpdate::TableDumpV2Entry(e) => {
657 rib_entry_count += 1;
658 total_rib_entries += e.rib_entries.len();
659 assert!(e.sequence_number > 0 || rib_entry_count == 1);
661 }
662 MrtUpdate::TableDumpMessage(_) => {
663 }
665 }
666 }
667
668 assert!(rib_entry_count > 0, "Should have parsed some RIB entries");
670 assert!(
671 total_rib_entries > 0,
672 "Should have some RIB entries per prefix"
673 );
674 }
675
676 #[test]
678 fn test_fallible_update_iterator_with_updates_file() {
679 let url = "https://spaces.bgpkit.org/parser/update-example";
680 let parser = BgpkitParser::new(url).unwrap();
681
682 let mut success_count = 0;
683 let mut error_count = 0;
684
685 for result in parser.into_fallible_update_iter() {
686 match result {
687 Ok(_) => success_count += 1,
688 Err(_) => error_count += 1,
689 }
690 }
691
692 assert!(
693 success_count > 0,
694 "Should have parsed some updates successfully"
695 );
696 assert_eq!(
698 error_count, 0,
699 "Should have no parsing errors in valid file"
700 );
701 }
702
703 #[test]
705 fn test_update_iter_vs_elem_iter_consistency() {
706 let url = "https://spaces.bgpkit.org/parser/update-example";
707
708 let parser1 = BgpkitParser::new(url).unwrap();
710 let mut update_iter_announced = 0;
711 let mut update_iter_withdrawn = 0;
712
713 for update in parser1.into_update_iter() {
714 if let MrtUpdate::Bgp4MpUpdate(u) = update {
715 update_iter_announced += u.message.announced_prefixes.len();
716 update_iter_withdrawn += u.message.withdrawn_prefixes.len();
717 for attr in &u.message.attributes {
718 match attr {
719 AttributeValue::MpReachNlri(nlri) => {
720 update_iter_announced += nlri.prefixes.len();
721 }
722 AttributeValue::MpUnreachNlri(nlri) => {
723 update_iter_withdrawn += nlri.prefixes.len();
724 }
725 _ => {}
726 }
727 }
728 }
729 }
730
731 let parser2 = BgpkitParser::new(url).unwrap();
733 let mut elem_iter_announced = 0;
734 let mut elem_iter_withdrawn = 0;
735
736 for elem in parser2.into_elem_iter() {
737 match elem.elem_type {
738 ElemType::ANNOUNCE => elem_iter_announced += 1,
739 ElemType::WITHDRAW => elem_iter_withdrawn += 1,
740 }
741 }
742
743 assert_eq!(
745 update_iter_announced, elem_iter_announced,
746 "Announced prefix counts should match"
747 );
748 assert_eq!(
749 update_iter_withdrawn, elem_iter_withdrawn,
750 "Withdrawn prefix counts should match"
751 );
752 }
753}