1use crate::error::ParserError;
45use crate::models::*;
46use crate::parser::BgpkitParser;
47use crate::Elementor;
48use log::{error, warn};
49use std::io::Read;
50use std::net::IpAddr;
51
/// A single BGP UPDATE message extracted from a BGP4MP MRT record.
///
/// Produced by [`UpdateIterator`] and [`FallibleUpdateIterator`] for every
/// BGP4MP message record whose BGP payload is an UPDATE.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Bgp4MpUpdate {
    /// Record timestamp taken from the MRT common header; the header's
    /// microsecond part, when present, is added as a fraction of a second.
    pub timestamp: f64,
    /// Peer IP address copied from the BGP4MP message.
    pub peer_ip: IpAddr,
    /// Peer AS number copied from the BGP4MP message.
    pub peer_asn: Asn,
    /// The BGP UPDATE payload itself.
    pub message: BgpUpdateMessage,
}
69
/// One TABLE_DUMP_V2 RIB record: a prefix together with all of its RIB entries.
///
/// Produced by [`UpdateIterator`] and [`FallibleUpdateIterator`] from
/// `TableDumpV2Message::RibAfi` records; the fields below are copied from that
/// record unchanged.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TableDumpV2Entry {
    /// Record timestamp taken from the MRT common header; the header's
    /// microsecond part, when present, is added as a fraction of a second.
    pub timestamp: f64,
    /// RIB sub-type of the originating record.
    pub rib_type: TableDumpV2Type,
    /// Sequence number of the originating record.
    pub sequence_number: u32,
    /// Prefix that every entry in `rib_entries` refers to.
    pub prefix: NetworkPrefix,
    /// RIB entries recorded for `prefix`.
    pub rib_entries: Vec<RibEntry>,
}
89
90#[derive(Debug, Clone, PartialEq)]
97#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
98pub enum MrtUpdate {
99 Bgp4MpUpdate(Bgp4MpUpdate),
101 TableDumpV2Entry(TableDumpV2Entry),
103 TableDumpMessage(TableDumpMessage),
105}
106
107impl MrtUpdate {
108 pub fn timestamp(&self) -> f64 {
110 match self {
111 MrtUpdate::Bgp4MpUpdate(u) => u.timestamp,
112 MrtUpdate::TableDumpV2Entry(e) => e.timestamp,
113 MrtUpdate::TableDumpMessage(m) => m.originated_time as f64,
114 }
115 }
116}
117
118pub struct UpdateIterator<R> {
127 parser: BgpkitParser<R>,
128 elementor: Elementor,
129}
130
131impl<R> UpdateIterator<R> {
132 pub(crate) fn new(parser: BgpkitParser<R>) -> Self {
133 UpdateIterator {
134 parser,
135 elementor: Elementor::new(),
136 }
137 }
138}
139
140impl<R: Read> Iterator for UpdateIterator<R> {
141 type Item = MrtUpdate;
142
143 fn next(&mut self) -> Option<MrtUpdate> {
144 loop {
145 let record = match self.parser.next_record() {
146 Ok(record) => record,
147 Err(e) => match e.error {
148 ParserError::TruncatedMsg(err_str) | ParserError::Unsupported(err_str) => {
149 if self.parser.options.show_warnings {
150 warn!("parser warn: {}", err_str);
151 }
152 if self.parser.core_dump {
153 if let Some(bytes) = e.bytes {
154 std::fs::write("mrt_core_dump", bytes)
155 .expect("Unable to write to mrt_core_dump");
156 }
157 }
158 continue;
159 }
160 ParserError::ParseError(err_str) => {
161 error!("parser error: {}", err_str);
162 if self.parser.core_dump {
163 if let Some(bytes) = e.bytes {
164 std::fs::write("mrt_core_dump", bytes)
165 .expect("Unable to write to mrt_core_dump");
166 }
167 return None;
168 }
169 continue;
170 }
171 ParserError::EofExpected => return None,
172 ParserError::IoError(err) | ParserError::EofError(err) => {
173 error!("{:?}", err);
174 if self.parser.core_dump {
175 if let Some(bytes) = e.bytes {
176 std::fs::write("mrt_core_dump", bytes)
177 .expect("Unable to write to mrt_core_dump");
178 }
179 }
180 return None;
181 }
182 #[cfg(feature = "oneio")]
183 ParserError::OneIoError(_) => return None,
184 ParserError::FilterError(_) => return None,
185 },
186 };
187
188 let t = record.common_header.timestamp;
189 let timestamp: f64 = if let Some(micro) = &record.common_header.microsecond_timestamp {
190 let m = (*micro as f64) / 1_000_000.0;
191 t as f64 + m
192 } else {
193 f64::from(t)
194 };
195
196 match record.message {
197 MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(msg)) => {
198 if let BgpMessage::Update(update) = msg.bgp_message {
199 return Some(MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate {
200 timestamp,
201 peer_ip: msg.peer_ip,
202 peer_asn: msg.peer_asn,
203 message: update,
204 }));
205 }
206 continue;
208 }
209 MrtMessage::Bgp4Mp(Bgp4MpEnum::StateChange(_)) => {
210 continue;
212 }
213 MrtMessage::TableDumpV2Message(msg) => {
214 match msg {
215 TableDumpV2Message::PeerIndexTable(p) => {
216 self.elementor.peer_table = Some(p);
218 continue;
219 }
220 TableDumpV2Message::RibAfi(entries) => {
221 return Some(MrtUpdate::TableDumpV2Entry(TableDumpV2Entry {
222 timestamp,
223 rib_type: entries.rib_type,
224 sequence_number: entries.sequence_number,
225 prefix: entries.prefix,
226 rib_entries: entries.rib_entries,
227 }));
228 }
229 TableDumpV2Message::RibGeneric(_) => {
230 continue;
232 }
233 TableDumpV2Message::GeoPeerTable(_) => {
234 continue;
236 }
237 }
238 }
239 MrtMessage::TableDumpMessage(msg) => {
240 return Some(MrtUpdate::TableDumpMessage(msg));
241 }
242 }
243 }
244 }
245}
246
247pub struct FallibleUpdateIterator<R> {
252 parser: BgpkitParser<R>,
253 elementor: Elementor,
254}
255
256impl<R> FallibleUpdateIterator<R> {
257 pub(crate) fn new(parser: BgpkitParser<R>) -> Self {
258 FallibleUpdateIterator {
259 parser,
260 elementor: Elementor::new(),
261 }
262 }
263}
264
265impl<R: Read> Iterator for FallibleUpdateIterator<R> {
266 type Item = Result<MrtUpdate, crate::error::ParserErrorWithBytes>;
267
268 fn next(&mut self) -> Option<Self::Item> {
269 loop {
270 match self.parser.next_record() {
271 Ok(record) => {
272 let t = record.common_header.timestamp;
273 let timestamp: f64 =
274 if let Some(micro) = &record.common_header.microsecond_timestamp {
275 let m = (*micro as f64) / 1_000_000.0;
276 t as f64 + m
277 } else {
278 f64::from(t)
279 };
280
281 match record.message {
282 MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(msg)) => {
283 if let BgpMessage::Update(update) = msg.bgp_message {
284 return Some(Ok(MrtUpdate::Bgp4MpUpdate(Bgp4MpUpdate {
285 timestamp,
286 peer_ip: msg.peer_ip,
287 peer_asn: msg.peer_asn,
288 message: update,
289 })));
290 }
291 continue;
292 }
293 MrtMessage::Bgp4Mp(Bgp4MpEnum::StateChange(_)) => {
294 continue;
295 }
296 MrtMessage::TableDumpV2Message(msg) => match msg {
297 TableDumpV2Message::PeerIndexTable(p) => {
298 self.elementor.peer_table = Some(p);
299 continue;
300 }
301 TableDumpV2Message::RibAfi(entries) => {
302 return Some(Ok(MrtUpdate::TableDumpV2Entry(TableDumpV2Entry {
303 timestamp,
304 rib_type: entries.rib_type,
305 sequence_number: entries.sequence_number,
306 prefix: entries.prefix,
307 rib_entries: entries.rib_entries,
308 })));
309 }
310 TableDumpV2Message::RibGeneric(_) => {
311 continue;
312 }
313 TableDumpV2Message::GeoPeerTable(_) => {
314 continue;
315 }
316 },
317 MrtMessage::TableDumpMessage(msg) => {
318 return Some(Ok(MrtUpdate::TableDumpMessage(msg)));
319 }
320 }
321 }
322 Err(e) if matches!(e.error, ParserError::EofExpected) => {
323 return None;
324 }
325 Err(e) => {
326 return Some(Err(e));
327 }
328 }
329 }
330 }
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Remote UPDATES sample used by the network-backed tests.
    const UPDATES_URL: &str = "https://spaces.bgpkit.org/parser/update-example";
    /// Remote RIB sample used by the network-backed tests.
    const RIB_URL: &str = "https://spaces.bgpkit.org/parser/rib-example-small.bz2";

    /// Builds a `Bgp4MpUpdate` fixture with a fixed peer and an empty message.
    fn sample_bgp4mp(timestamp: f64) -> Bgp4MpUpdate {
        Bgp4MpUpdate {
            timestamp,
            peer_ip: "192.0.2.1".parse().unwrap(),
            peer_asn: Asn::new_32bit(65000),
            message: BgpUpdateMessage::default(),
        }
    }

    /// Builds a `TableDumpV2Entry` fixture with no RIB entries.
    fn sample_rib_entry(
        timestamp: f64,
        rib_type: TableDumpV2Type,
        sequence_number: u32,
        prefix: &str,
    ) -> TableDumpV2Entry {
        TableDumpV2Entry {
            timestamp,
            rib_type,
            sequence_number,
            prefix: prefix.parse().unwrap(),
            rib_entries: vec![],
        }
    }

    /// Counts `(announced, withdrawn)` prefixes of one UPDATE, including the
    /// ones carried in MP_REACH / MP_UNREACH attributes.
    fn count_prefixes(message: &BgpUpdateMessage) -> (usize, usize) {
        let mut announced = message.announced_prefixes.len();
        let mut withdrawn = message.withdrawn_prefixes.len();
        for attr in &message.attributes {
            match attr {
                AttributeValue::MpReachNlri(nlri) => announced += nlri.prefixes.len(),
                AttributeValue::MpUnreachNlri(nlri) => withdrawn += nlri.prefixes.len(),
                _ => {}
            }
        }
        (announced, withdrawn)
    }

    #[test]
    fn test_bgp4mp_update_struct() {
        let update = sample_bgp4mp(1234567890.123456);

        assert_eq!(update.timestamp, 1234567890.123456);
        assert_eq!(update.peer_ip.to_string(), "192.0.2.1");
        assert_eq!(update.peer_asn, Asn::new_32bit(65000));
    }

    #[test]
    fn test_table_dump_v2_entry_struct() {
        let entry = sample_rib_entry(
            1234567890.0,
            TableDumpV2Type::RibIpv4Unicast,
            42,
            "10.0.0.0/8",
        );

        assert_eq!(entry.timestamp, 1234567890.0);
        assert_eq!(entry.rib_type, TableDumpV2Type::RibIpv4Unicast);
        assert_eq!(entry.sequence_number, 42);
        assert_eq!(entry.prefix.to_string(), "10.0.0.0/8");
        assert!(entry.rib_entries.is_empty());
    }

    #[test]
    fn test_mrt_update_timestamp() {
        // BGP4MP: the stored timestamp is returned as-is.
        let bgp4mp = MrtUpdate::Bgp4MpUpdate(sample_bgp4mp(1234567890.5));
        assert_eq!(bgp4mp.timestamp(), 1234567890.5);

        // TABLE_DUMP_V2: the stored timestamp is returned as-is.
        let table_dump_v2 = MrtUpdate::TableDumpV2Entry(sample_rib_entry(
            1234567891.5,
            TableDumpV2Type::RibIpv4Unicast,
            1,
            "10.0.0.0/8",
        ));
        assert_eq!(table_dump_v2.timestamp(), 1234567891.5);

        // Legacy TABLE_DUMP: `originated_time` is reported as the timestamp.
        let table_dump_v1 = MrtUpdate::TableDumpMessage(TableDumpMessage {
            view_number: 0,
            sequence_number: 1,
            prefix: "192.168.0.0/16".parse().unwrap(),
            status: 1,
            originated_time: 1234567892,
            peer_ip: "10.0.0.1".parse().unwrap(),
            peer_asn: Asn::new_32bit(65001),
            attributes: Attributes::default(),
        });
        assert_eq!(table_dump_v1.timestamp(), 1234567892.0);
    }

    #[test]
    fn test_update_iterator_empty() {
        let parser = BgpkitParser::from_reader(Cursor::new(Vec::<u8>::new()));
        let mut updates = UpdateIterator::new(parser);

        assert!(updates.next().is_none());
    }

    #[test]
    fn test_fallible_update_iterator_empty() {
        let parser = BgpkitParser::from_reader(Cursor::new(Vec::<u8>::new()));
        let mut updates = FallibleUpdateIterator::new(parser);

        assert!(updates.next().is_none());
    }

    #[test]
    fn test_bgp4mp_update_clone_and_debug() {
        let update = sample_bgp4mp(1234567890.123456);

        let copy = update.clone();
        assert_eq!(update, copy);

        let rendered = format!("{:?}", update);
        assert!(rendered.contains("Bgp4MpUpdate"));
        assert!(rendered.contains("192.0.2.1"));
    }

    #[test]
    fn test_table_dump_v2_entry_clone_and_debug() {
        let entry = sample_rib_entry(
            1234567890.0,
            TableDumpV2Type::RibIpv4Unicast,
            42,
            "10.0.0.0/8",
        );

        let copy = entry.clone();
        assert_eq!(entry, copy);

        let rendered = format!("{:?}", entry);
        assert!(rendered.contains("TableDumpV2Entry"));
        assert!(rendered.contains("10.0.0.0/8"));
    }

    #[test]
    fn test_mrt_update_clone_and_debug() {
        let update = MrtUpdate::Bgp4MpUpdate(sample_bgp4mp(1234567890.5));

        let copy = update.clone();
        assert_eq!(update, copy);

        let rendered = format!("{:?}", update);
        assert!(rendered.contains("Bgp4MpUpdate"));
    }

    #[test]
    fn test_fallible_update_iterator_with_invalid_data() {
        // Sixteen bytes that the parser is expected to reject; the fallible
        // iterator must surface that as an error item.
        let invalid_data: Vec<u8> = vec![
            0x00, 0x00, 0x00, 0x00, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00,
            0x00, 0x00,
        ];

        let parser = BgpkitParser::from_reader(Cursor::new(invalid_data));
        let mut updates = FallibleUpdateIterator::new(parser);

        let first = updates.next();
        assert!(first.is_some());
        assert!(first.unwrap().is_err());
    }

    #[test]
    fn test_mrt_update_enum_variants() {
        // One value per variant, in declaration order.
        let updates: Vec<MrtUpdate> = vec![
            MrtUpdate::Bgp4MpUpdate(sample_bgp4mp(1.0)),
            MrtUpdate::TableDumpV2Entry(sample_rib_entry(
                2.0,
                TableDumpV2Type::RibIpv6Unicast,
                1,
                "2001:db8::/32",
            )),
            MrtUpdate::TableDumpMessage(TableDumpMessage {
                view_number: 0,
                sequence_number: 1,
                prefix: "10.0.0.0/8".parse().unwrap(),
                status: 1,
                originated_time: 3,
                peer_ip: "10.0.0.1".parse().unwrap(),
                peer_asn: Asn::new_32bit(65001),
                attributes: Attributes::default(),
            }),
        ];

        for (index, update) in updates.iter().enumerate() {
            match update {
                MrtUpdate::Bgp4MpUpdate(_) => assert_eq!(index, 0),
                MrtUpdate::TableDumpV2Entry(_) => assert_eq!(index, 1),
                MrtUpdate::TableDumpMessage(_) => assert_eq!(index, 2),
            }
        }
    }

    #[test]
    #[cfg(feature = "serde")]
    fn test_bgp4mp_update_serde() {
        let update = sample_bgp4mp(1234567890.123456);

        let json = serde_json::to_string(&update).unwrap();
        let decoded: Bgp4MpUpdate = serde_json::from_str(&json).unwrap();
        assert_eq!(update, decoded);
    }

    #[test]
    #[cfg(feature = "serde")]
    fn test_table_dump_v2_entry_serde() {
        let entry = sample_rib_entry(
            1234567890.0,
            TableDumpV2Type::RibIpv4Unicast,
            42,
            "10.0.0.0/8",
        );

        let json = serde_json::to_string(&entry).unwrap();
        let decoded: TableDumpV2Entry = serde_json::from_str(&json).unwrap();
        assert_eq!(entry, decoded);
    }

    #[test]
    #[cfg(feature = "serde")]
    fn test_mrt_update_serde() {
        let update = MrtUpdate::Bgp4MpUpdate(sample_bgp4mp(1234567890.5));

        let json = serde_json::to_string(&update).unwrap();
        let decoded: MrtUpdate = serde_json::from_str(&json).unwrap();
        assert_eq!(update, decoded);
    }

    // The remaining tests download sample files and therefore need network
    // access.

    #[test]
    fn test_update_iterator_with_updates_file() {
        let parser = BgpkitParser::new(UPDATES_URL).unwrap();

        let mut bgp4mp_count = 0;
        let mut total_announced = 0;
        let mut total_withdrawn = 0;

        for update in parser.into_update_iter() {
            match update {
                MrtUpdate::Bgp4MpUpdate(u) => {
                    let (announced, withdrawn) = count_prefixes(&u.message);
                    bgp4mp_count += 1;
                    total_announced += announced;
                    total_withdrawn += withdrawn;
                }
                MrtUpdate::TableDumpV2Entry(_) => {
                    panic!("Should not see TableDumpV2Entry in UPDATES file");
                }
                MrtUpdate::TableDumpMessage(_) => {
                    panic!("Should not see TableDumpMessage in UPDATES file");
                }
            }
        }

        assert!(bgp4mp_count > 0, "Should have parsed some BGP4MP updates");
        assert!(
            total_announced + total_withdrawn > 0,
            "Should have some prefixes"
        );
    }

    #[test]
    fn test_update_iterator_with_rib_file() {
        let parser = BgpkitParser::new(RIB_URL).unwrap();

        let mut rib_entry_count = 0;
        let mut total_rib_entries = 0;

        // The first 100 updates are enough to exercise the RIB path.
        for update in parser.into_update_iter().take(100) {
            match update {
                MrtUpdate::Bgp4MpUpdate(_) => {
                    panic!("Should not see Bgp4MpUpdate in RIB file");
                }
                MrtUpdate::TableDumpV2Entry(entry) => {
                    rib_entry_count += 1;
                    total_rib_entries += entry.rib_entries.len();
                    // Only the very first RIB record may carry sequence number 0.
                    assert!(entry.sequence_number > 0 || rib_entry_count == 1);
                }
                MrtUpdate::TableDumpMessage(_) => {}
            }
        }

        assert!(rib_entry_count > 0, "Should have parsed some RIB entries");
        assert!(
            total_rib_entries > 0,
            "Should have some RIB entries per prefix"
        );
    }

    #[test]
    fn test_fallible_update_iterator_with_updates_file() {
        let parser = BgpkitParser::new(UPDATES_URL).unwrap();

        let mut success_count = 0;
        let mut error_count = 0;

        for result in parser.into_fallible_update_iter() {
            if result.is_ok() {
                success_count += 1;
            } else {
                error_count += 1;
            }
        }

        assert!(
            success_count > 0,
            "Should have parsed some updates successfully"
        );
        assert_eq!(
            error_count, 0,
            "Should have no parsing errors in valid file"
        );
    }

    #[test]
    fn test_update_iter_vs_elem_iter_consistency() {
        // Pass 1: count prefixes through the update iterator.
        let update_parser = BgpkitParser::new(UPDATES_URL).unwrap();
        let mut update_iter_announced = 0;
        let mut update_iter_withdrawn = 0;

        for update in update_parser.into_update_iter() {
            if let MrtUpdate::Bgp4MpUpdate(u) = update {
                let (announced, withdrawn) = count_prefixes(&u.message);
                update_iter_announced += announced;
                update_iter_withdrawn += withdrawn;
            }
        }

        // Pass 2: count per-prefix elements through the element iterator.
        let elem_parser = BgpkitParser::new(UPDATES_URL).unwrap();
        let mut elem_iter_announced = 0;
        let mut elem_iter_withdrawn = 0;

        for elem in elem_parser.into_elem_iter() {
            match elem.elem_type {
                ElemType::ANNOUNCE => elem_iter_announced += 1,
                ElemType::WITHDRAW => elem_iter_withdrawn += 1,
            }
        }

        assert_eq!(
            update_iter_announced, elem_iter_announced,
            "Announced prefix counts should match"
        );
        assert_eq!(
            update_iter_withdrawn, elem_iter_withdrawn,
            "Withdrawn prefix counts should match"
        );
    }
}