use bgpkit_parser::bmp::messages::{
PeerDownNotification, PeerDownReason, PeerUpNotification, RouteMonitoring,
};
use bgpkit_parser::models::{
Asn, Attributes, BgpMessage, BgpOpenMessage, BgpUpdateMessage, NetworkPrefix,
};
use chrono::DateTime;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::str::FromStr;
use tokio::sync::mpsc::channel;
use risotto_lib::processor::{peer_down_notification, peer_up_notification, route_monitoring};
use risotto_lib::state::new_state;
use risotto_lib::state_store::memory::MemoryStore;
use risotto_lib::update::{Update, UpdateMetadata};
fn default_open_message() -> BgpOpenMessage {
BgpOpenMessage {
version: 0,
asn: Asn::from(0),
hold_time: 0,
sender_ip: Ipv4Addr::from_str("0.0.0.0").unwrap(),
extended_length: false,
opt_params: vec![],
}
}
#[tokio::test]
async fn test_peer_up_notification() {
let (tx, mut rx) = channel(100);
let store = MemoryStore::new();
let state = new_state(store);
let metadata = UpdateMetadata {
time_bmp_header_ns: 0,
router_socket: SocketAddr::from_str("192.0.1.0:179").unwrap(),
peer_addr: IpAddr::from_str("192.0.2.0").unwrap(),
peer_bgp_id: Ipv4Addr::from_str("192.0.2.0").unwrap(),
peer_asn: 65000,
is_post_policy: false,
is_adj_rib_out: false,
};
let body = PeerUpNotification {
local_addr: IpAddr::from_str("192.0.2.0").unwrap(),
local_port: 10000,
remote_port: 10001,
sent_open: BgpMessage::Open(default_open_message()),
received_open: BgpMessage::Open(default_open_message()),
tlvs: vec![],
};
peer_up_notification(Some(state), tx, metadata, body)
.await
.unwrap();
assert!(rx.try_recv().is_err());
}
#[tokio::test]
async fn test_route_monitoring() {
let mut tests = vec![];
tests.push((
UpdateMetadata {
time_bmp_header_ns: 0,
router_socket: SocketAddr::from_str("192.0.1.0:179").unwrap(),
peer_addr: IpAddr::from_str("192.0.2.0").unwrap(),
peer_bgp_id: Ipv4Addr::from_str("192.0.2.0").unwrap(),
peer_asn: 65000,
is_post_policy: false,
is_adj_rib_out: false,
},
RouteMonitoring {
bgp_message: BgpMessage::Update(BgpUpdateMessage {
announced_prefixes: vec![
(NetworkPrefix {
prefix: "10.0.1.0/24".parse().unwrap(),
path_id: 0,
}),
],
withdrawn_prefixes: vec![],
attributes: Attributes::default(),
}),
},
vec![Update {
time_received_ns: DateTime::from_timestamp(0, 0).unwrap(),
time_bmp_header_ns: DateTime::from_timestamp(0, 0).unwrap(),
router_addr: IpAddr::from_str("::ffff:192.0.1.0").unwrap(),
router_port: 179,
peer_addr: IpAddr::from_str("::ffff:192.0.2.0").unwrap(),
peer_bgp_id: Ipv4Addr::from_str("192.0.2.0").unwrap(),
peer_asn: 65000,
prefix_addr: IpAddr::from_str("::ffff:10.0.1.0").unwrap(),
prefix_len: 24,
is_post_policy: false,
is_adj_rib_out: false,
announced: true,
next_hop: None,
origin: "INCOMPLETE".to_string(),
path: vec![],
local_preference: None,
med: None,
communities: vec![],
synthetic: false,
}],
));
for (metadata, body, expects) in tests {
let (tx, mut rx) = channel(100);
let store = MemoryStore::new();
let state = new_state(store);
route_monitoring(Some(state), tx, metadata, body)
.await
.unwrap();
for expect in expects.iter() {
let mut update = rx.recv().await.unwrap();
update.time_received_ns = expect.time_received_ns;
assert_eq!(update, expect.clone());
}
}
}
#[tokio::test]
async fn test_peer_down_notification() {
let (tx, mut rx) = channel(100);
let store = MemoryStore::new();
let state = new_state(store);
let metadata = UpdateMetadata {
time_bmp_header_ns: 0,
router_socket: SocketAddr::from_str("192.0.1.0:179").unwrap(),
peer_addr: IpAddr::from_str("192.0.2.0").unwrap(),
peer_bgp_id: Ipv4Addr::from_str("192.0.2.0").unwrap(),
peer_asn: 65000,
is_post_policy: false,
is_adj_rib_out: false,
};
let body = PeerDownNotification {
reason: PeerDownReason::PeerDeConfigured,
data: None,
};
peer_down_notification(Some(state), tx, metadata, body)
.await
.unwrap();
assert!(rx.try_recv().is_err());
}