use crate::bgpsvc::*;
use chrono::prelude::*;
use std::net::{IpAddr, Ipv4Addr};
use std::sync::Arc;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::*;
use zettabgp::prelude::*;
pub struct BgpPeer<'a, H: BgpUpdateHandler> {
pub params: BgpSessionParams,
peersock: tokio::net::TcpStream,
keepalive_sent: DateTime<Local>,
sessionid: BgpSessionId,
update_handler: &'a H,
}
impl<'a, H: BgpUpdateHandler> BgpPeer<'a, H> {
pub fn new(
pars: BgpSessionParams,
stream: tokio::net::TcpStream,
handler: &'a H,
) -> BgpPeer<'a, H> {
let peerip = stream.peer_addr().unwrap().ip();
let mut ret = BgpPeer::<H> {
params: pars,
peersock: stream,
keepalive_sent: Local::now(),
update_handler: handler,
sessionid: 0,
};
ret.params.peer_mode = if peerip.is_ipv4() {
BgpTransportMode::IPv4
} else {
BgpTransportMode::IPv6
};
ret
}
async fn read_socket(&mut self, buf: &mut [u8]) -> Result<(), BgpError> {
match self.peersock.read_exact(buf).await {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}
async fn write_socket(&mut self, buf: &[u8]) -> Result<(), BgpError> {
match self.peersock.write_all(buf).await {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}
async fn recv_message_head(&mut self) -> Result<(BgpMessageType, usize), BgpError> {
let mut buf = [0u8; 19];
self.read_socket(&mut buf).await?;
self.params.decode_message_head(&buf)
}
fn get_message_body_ref(buf: &mut [u8]) -> Result<&mut [u8], BgpError> {
if buf.len() < 19 {
return Err(BgpError::insufficient_buffer_size());
}
Ok(&mut buf[19..])
}
async fn send_message_buf(
&mut self,
buf: &mut [u8],
messagetype: BgpMessageType,
messagelen: usize,
) -> Result<(), BgpError> {
let blen = self
.params
.prepare_message_buf(buf, messagetype, messagelen)?;
self.write_socket(&buf[0..blen]).await
}
pub async fn start_passive(&mut self) -> Result<(), BgpError> {
let mut bom = BgpOpenMessage::new();
let mut buf = [255u8; 255];
let msg = match self.recv_message_head().await {
Err(e) => return Err(e),
Ok(msg) => msg,
};
if msg.0 != BgpMessageType::Open {
return Err(BgpError::static_str("Invalid state to start_passive"));
}
self.read_socket(&mut buf[0..msg.1]).await?;
bom.decode_from(&self.params, &buf[0..msg.1])?;
let remsess = BgpPeerDesc::new(self.peersock.peer_addr().unwrap().ip(), bom.clone());
bom.router_id = self.params.router_id;
self.params.as_num = bom.as_num;
self.params.hold_time = bom.hold_time;
self.params.match_caps(&bom.caps);
let sz = match bom.encode_to(&self.params, BgpPeer::<H>::get_message_body_ref(&mut buf)?) {
Err(e) => return Err(e),
Ok(sz) => sz,
};
self.send_message_buf(&mut buf, BgpMessageType::Open, sz)
.await?;
let mysess = BgpPeerDesc::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), bom.clone());
self.sessionid = self
.update_handler
.register_session(Arc::new(BgpSessionDesc::new(mysess, remsess)))
.await;
Ok(())
}
pub async fn start_active(&mut self) -> Result<(), BgpError> {
info!("start_active");
loop {
let bom = self.params.open_message();
let mut buf = [255u8; 255];
let sz =
match bom.encode_to(&self.params, BgpPeer::<H>::get_message_body_ref(&mut buf)?) {
Err(e) => {
return Err(e);
}
Ok(sz) => sz,
};
let mysess = BgpPeerDesc::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), bom.clone());
self.send_message_buf(&mut buf, BgpMessageType::Open, sz)
.await?;
let msg = match self.recv_message_head().await {
Err(e) => {
return Err(e);
}
Ok(msg) => msg,
};
match msg.0 {
BgpMessageType::Open => {
self.read_socket(&mut buf[0..msg.1]).await?;
let mut bomrcv = self.params.open_message();
bomrcv.decode_from(&self.params, &buf[0..msg.1])?;
let remsess =
BgpPeerDesc::new(self.peersock.peer_addr().unwrap().ip(), bomrcv.clone());
self.params.hold_time = bomrcv.hold_time;
self.params.match_caps(&bomrcv.caps);
self.sessionid = self
.update_handler
.register_session(Arc::new(BgpSessionDesc::new(mysess, remsess)))
.await;
return Ok(());
}
BgpMessageType::Notification => {
self.read_socket(&mut buf[0..msg.1]).await?;
let mut bnrcv = BgpNotificationMessage::new();
bnrcv.decode_from(&self.params, &buf[0..msg.1])?;
warn!("Notification: {}", bnrcv.error_text());
if bnrcv.error_code == 2 && bnrcv.error_subcode == 7 {
let (cap, _) = BgpCapability::from_buffer(&buf[2..msg.1])?;
if let Ok(cap) = cap {
warn!("Unsupported capability: {:?} in {:?}", cap, self.params);
self.params.remove_capability(&cap);
continue;
}
}
return Err(BgpError::from_string(format!(
"Notification received: {}",
bnrcv.error_text()
)));
}
_ => {
return Err(BgpError::from_string(format!(
"Invalid message type received: {:?}",
msg.0
)));
}
}
}
}
pub async fn send_keepalive(&mut self) -> Result<(), BgpError> {
let mut buf = [255u8; 19];
let blen = self
.params
.prepare_message_buf(&mut buf, BgpMessageType::Keepalive, 0)?;
match self.write_socket(&buf[0..blen]).await {
Ok(_) => {
self.keepalive_sent = Local::now();
Ok(())
}
Err(e) => Err(e),
}
}
pub async fn lifecycle(&mut self, cancel: tokio_util::sync::CancellationToken) {
let mut buf = [255u8; 4096];
let keep_interval = chrono::Duration::seconds((self.params.hold_time / 3) as i64);
loop {
let mut tosleep = Local::now() - self.keepalive_sent;
if tosleep >= keep_interval {
match self.send_keepalive().await {
Ok(_) => {}
Err(e) => {
error!("Keepalive send error: {:?}", e);
}
}
tosleep = Local::now() - self.keepalive_sent;
}
tosleep = keep_interval - tosleep;
let tosleepstd = match tosleep.to_std() {
Ok(s) => s,
Err(_) => std::time::Duration::from_secs(1),
};
let msg = select! {
_ = cancel.cancelled() => {
break;
}
_ = tokio::time::sleep(tosleepstd) => {
(BgpMessageType::Keepalive,0)
}
msgin = self.recv_message_head() => {
match msgin {
Err(e) => {
error!("recv_message_head: {:?}", e);
break;
}
Ok(msg) => msg
}
}
};
if let Err(e) = self.read_socket(&mut buf[0..msg.1]).await {
warn!("recv_message: {:?}", e);
};
match msg.0 {
BgpMessageType::Open => {
error!("Incorrect open message!");
break;
}
BgpMessageType::Keepalive => match self.send_keepalive().await {
Ok(_) => {}
Err(e) => {
warn!("Keepalive sending error: {:?}", e);
}
},
BgpMessageType::Notification => {
let mut msgnotification = BgpNotificationMessage::new();
match msgnotification.decode_from(&self.params, &buf[0..msg.1]) {
Err(e) => {
warn!("BGP notification decode error: {:?}", e);
}
Ok(_) => {
info!(
"BGP notification: {:?} - {:?}",
msgnotification,
msgnotification.error_text()
);
}
};
break;
}
BgpMessageType::Update => {
let mut msgupdate = BgpUpdateMessage::new();
if let Err(e) = msgupdate.decode_from(&self.params, &buf[0..msg.1]) {
error!("BGP update decode error: {:?}", e);
continue;
}
self.update_handler
.handle_update(self.sessionid, msgupdate)
.await;
}
}
}
}
pub async fn close(&mut self) {
match self.peersock.shutdown().await {
Ok(_) => {}
Err(e) => {
error!("Warning: socket shutdown error: {}", e)
}
}
}
}