use std::io::Cursor;
use std::net::{IpAddr, SocketAddr};
use bytes::{Buf, Bytes, BytesMut};
use tokio::io::AsyncReadExt;
use tokio::net::tcp::OwnedReadHalf;
use tokio::sync::{mpsc, oneshot};
use crate::bgp::message::keepalive::KeepaliveBuilder;
use crate::bgp::message::notification::{
CeaseSubcode, Details, FiniteStateMachineSubcode, NotificationBuilder,
OpenMessageSubcode,
};
use crate::bgp::message::open::{Capability, OpenBuilder};
use crate::bgp::message::{
Message as BgpMsg, NotificationMessage, SessionConfig, UpdateMessage,
};
use crate::bgp::types::{AddpathDirection, AddpathFamDir, AfiSafiType};
use crate::bgp::ParseError;
use inetnum::asn::Asn;
use log::{debug, error, info, warn};
use super::state_machine::{Event, SessionAttributes, State};
use super::timers::Timer;
#[allow(unused_imports)]
use super::util::to_pcap;
/// State and I/O handles for a single BGP session with one peer.
///
/// The type is generic over its configuration `C`; the methods in this file
/// are implemented for `C: BgpConfig + Send`.
#[derive(Debug)]
pub struct Session<C> {
/// Local configuration (ASN, BGP identifier, address families, ...).
config: C,
/// Parameters agreed on during the OPEN exchange; `None` until
/// `set_negotiated_config` has been called.
negotiated: Option<NegotiatedConfig>,
/// Raw capabilities taken from the most recent OPEN built by `send_open`.
local_capabilities: Vec<u8>,
/// FSM state and per-session attributes (hold time, counters, flags).
attributes: SessionAttributes,
/// Read side of the TCP connection; `None` while no stream is attached.
connection: Option<Connection>,
/// Channel on which received UPDATEs/NOTIFICATIONs and session events are
/// reported (see `Message`).
channel: mpsc::Sender<Message>,
/// Incoming control commands (see `Command`), polled in `tick`.
commands: mpsc::Receiver<Command>,
/// Queue of outgoing BGP PDUs; presumably drained by a writer task that
/// owns the TCP write half — TODO confirm against the caller.
pdu_out_tx: mpsc::Sender<BgpMsg<Bytes>>,
/// ConnectRetry timer. NOTE(review): it is started/stopped by the FSM but
/// not polled in `tick`.
connect_retry_timer: Timer,
/// Hold timer; its expiry raises `Event::HoldTimerExpires` in `tick`.
hold_timer: Timer,
/// Keepalive timer, created with one third of the hold time.
keepalive_timer: Timer,
/// DelayOpen timer; its expiry raises `Event::DelayOpenTimerExpires`.
delay_open_timer: Timer,
}
impl<C: BgpConfig + Send> Session<C> {
/// Creates a session around an already accepted TCP read half.
///
/// A hold time supplied by `config` overrides the default held in
/// `SessionAttributes`. All timers are created from the resulting
/// attributes; the keepalive interval is one third of the hold time.
///
/// # Panics
///
/// Panics if the peer address of `tcp_in` cannot be determined (see
/// `Connection::for_read_half`).
pub fn new(
    config: C,
    tcp_in: OwnedReadHalf,
    channel: mpsc::Sender<Message>,
    commands: mpsc::Receiver<Command>,
    pdu_out_tx: mpsc::Sender<BgpMsg<Bytes>>,
) -> Self {
    let mut attributes = SessionAttributes::default();
    if let Some(configured) = config.hold_time() {
        attributes.set_hold_time(configured);
    }

    // Built in the same order as the struct fields are initialised.
    let connection = Some(Connection::for_read_half(tcp_in));
    let connect_retry_timer =
        Timer::new(attributes.connect_retry_time().into());
    let hold_timer = Timer::new(attributes.hold_time().into());
    let keepalive_timer = Timer::new(u64::from(attributes.hold_time() / 3));
    let delay_open_timer =
        Timer::new(u64::from(attributes.delay_open_time()));

    Self {
        config,
        negotiated: None,
        local_capabilities: Vec::new(),
        attributes,
        connection,
        channel,
        commands,
        pdu_out_tx,
        connect_retry_timer,
        hold_timer,
        keepalive_timer,
        delay_open_timer,
    }
}
/// Attaches a new TCP read half to this session and notifies the FSM.
///
/// Waits until `stream` reports readable, replaces any existing connection
/// with one wrapping `stream`, then raises `TcpConnectionConfirmed` through
/// `connection_established`.
pub async fn attach_stream(&mut self, stream: OwnedReadHalf) {
// NOTE(review): the readiness result (including an error) is discarded and
// the stream is attached regardless.
let _socket_status = stream.readable().await;
self.connection = Some(Connection::for_read_half(stream));
self.connection_established().await;
}
/// Returns the socket address of the attached peer, or `None` when no
/// connection is attached.
pub fn connected_addr(&self) -> Option<SocketAddr> {
    let conn = self.connection.as_ref()?;
    Some(conn.remote_addr)
}
/// Returns a reference to the local configuration of this session.
pub const fn config(&self) -> &C {
&self.config
}
/// Stores the negotiated parameters and registers every negotiated ADD-PATH
/// family/direction with the connection's `SessionConfig`.
///
/// When no connection is attached only a warning is logged; the negotiated
/// configuration is stored either way.
pub fn set_negotiated_config(&mut self, config: NegotiatedConfig) {
    match self.connection.as_mut() {
        Some(conn) => {
            let session_config = conn.session_config_mut();
            for famdir in &config.addpath {
                session_config.add_famdir(*famdir);
            }
        }
        None => {
            warn!("set_negotiated_config: no Connection for Session");
        }
    }
    self.negotiated = Some(config);
}
/// Returns the negotiated session parameters, or `None` if none have been
/// stored yet.
pub const fn negotiated(&self) -> Option<&NegotiatedConfig> {
self.negotiated.as_ref()
}
/// Detaches (and thereby drops) the current connection, logging a warning
/// when there was none to drop.
fn drop_connection(&mut self) {
    let previous = self.connection.take();
    if previous.is_some() {
        return;
    }
    warn!("trying to disconnect non-existing connection");
}
fn disconnect(&mut self, reason: DisconnectReason) {
match reason {
DisconnectReason::ConnectionRejected => {
self.send_notification(CeaseSubcode::ConnectionRejected);
}
DisconnectReason::Reconfiguration => {
self.send_notification(
CeaseSubcode::OtherConfigurationChange,
);
}
DisconnectReason::Deconfigured => {
self.send_notification(CeaseSubcode::PeerDeconfigured);
}
DisconnectReason::HoldTimerExpired => {
self.send_notification(Details::HoldTimerExpired);
}
DisconnectReason::Shutdown => {
self.send_notification(CeaseSubcode::AdministrativeShutdown);
}
DisconnectReason::FsmViolation(maybe_notification) => {
if let Some(details) = maybe_notification {
self.send_notification(details);
}
}
DisconnectReason::Other => {
debug!("DisconnectReason::Other, not sending NOTIFICATION");
}
}
debug!(
"disconnecting peer {:?}: {:?}",
self.connection.as_ref().map(|c| c.remote_addr),
reason,
);
self.keepalive_timer.stop_and_reset();
self.hold_timer.stop_and_reset();
self.drop_connection();
}
/// Runs one iteration of the session loop.
///
/// Waits for whichever comes first: a `Command`, a frame (or EOF/error)
/// from the attached connection, or the expiry of the keepalive, hold or
/// DelayOpen timer, and handles it.
///
/// # Errors
///
/// Returns an error when handling a received message fails, when reading a
/// frame fails, or when a timer-driven FSM event returns an error.
pub async fn tick(&mut self) -> Result<(), Error> {
    tokio::select! {
        Some(command) = self.commands.recv() => {
            match command {
                Command::AttachStream { stream } => {
                    debug!("Command::AttachStream");
                    self.attach_stream(stream).await;
                }
                Command::GetAttributes { resp } => {
                    // The requester may be gone already; that is fine.
                    let _ = resp.send(self.attributes);
                }
                Command::Disconnect(reason) => self.disconnect(reason),
                Command::ForcedKeepalive => self.send_keepalive(),
            }
        }
        // Only yields while a connection is attached.
        Some(read_result) = maybe_read_frame(self.connection.as_mut()) => {
            match read_result {
                Ok(Some(frame)) => {
                    if let Err(msg) = self.handle_msg(frame).await {
                        debug!("handle_msg returned err: {msg}, break");
                        self.set_state(State::Connect);
                        return Err(Error { msg: "handle_msg failed" });
                    }
                }
                // Clean end-of-stream: report it and go back to Connect.
                Ok(None) => {
                    match self.connection.as_ref().map(|c| c.remote_addr) {
                        Some(remote_addr) => {
                            warn!(
                                "[{}] Connection lost",
                                remote_addr
                            );
                            let lost =
                                Message::ConnectionLost(Some(remote_addr));
                            let _ = self.channel.send(lost).await;
                        }
                        None => {
                            error!("Connection lost but no stream attached");
                        }
                    }
                    self.connection = None;
                    self.set_state(State::Connect);
                }
                Err(e) => {
                    error!("{e}");
                    self.connection = None;
                    self.set_state(State::Connect);
                    return Err(Error { msg: "error from read_frame" });
                }
            }
        }
        _ = self.keepalive_timer.tick() => {
            self.handle_event(Event::KeepaliveTimerExpires).await?;
        }
        _ = self.hold_timer.tick() => {
            self.handle_event(Event::HoldTimerExpires).await?;
        }
        _ = self.delay_open_timer.tick() => {
            self.handle_event(Event::DelayOpenTimerExpires).await?;
        }
    }
    Ok(())
}
/// Consumes the session and calls `tick` in a loop.
///
/// The loop has no success exit; the function only returns when `tick`
/// yields an error, which is propagated.
pub async fn process(mut self) -> Result<(), Error> {
loop {
self.tick().await?;
}
}
/// Queues `pdu` on the outgoing PDU channel without waiting.
///
/// If the channel refuses the message (full or closed) the PDU is dropped
/// and a warning is logged.
fn send_pdu(&self, pdu: BgpMsg<Bytes>) {
    let queued = self.pdu_out_tx.try_send(pdu).is_ok();
    if !queued {
        warn!("outgoing pdu queue blocked");
    }
}
/// Builds an OPEN message from the local configuration and queues it.
///
/// The OPEN carries the local ASN, hold time, BGP identifier, the
/// four-octet ASN capability, one multiprotocol capability per configured
/// family, and ADD-PATH (send/receive) for every configured ADD-PATH
/// family. The capabilities of the built message are remembered as the
/// session's local capabilities.
///
/// # Panics
///
/// Panics if the OPEN builder cannot be created on a fresh buffer.
pub fn send_open(&mut self) {
    let mut builder = OpenBuilder::from_target(BytesMut::new()).unwrap();

    builder.set_asn(self.config.local_asn());
    builder.set_holdtime(self.attributes.hold_time());
    builder.set_bgp_id(self.config.bgp_id());
    builder.four_octet_capable(self.config.local_asn());

    for afisafi in self.config.protocols() {
        builder.add_mp(afisafi);
    }
    for family in self.config.addpath() {
        builder.add_addpath(family, AddpathDirection::SendReceive);
    }

    let open = builder.into_message();
    let capabilities = open.capabilities_as_vec();
    self.set_local_capabilities(capabilities);
    self.send_pdu(BgpMsg::Open(open));
}
/// Replaces the stored raw local capabilities (called from `send_open`).
fn set_local_capabilities(&mut self, capabilities: Vec<u8>) {
self.local_capabilities = capabilities;
}
/// Returns the raw local capabilities recorded by the last `send_open`;
/// empty if no OPEN has been built yet.
fn local_capabilities(&self) -> &[u8] {
self.local_capabilities.as_ref()
}
/// Builds a NOTIFICATION without data for `subcode` and queues it.
///
/// # Panics
///
/// Panics if the freshly built NOTIFICATION fails to parse.
fn send_notification<S>(&self, subcode: S)
where
    S: Into<Details>,
{
    let raw = NotificationBuilder::new_vec_nodata(subcode);
    let notification =
        NotificationMessage::from_octets(Bytes::from(raw)).unwrap();
    self.send_pdu(BgpMsg::Notification(notification));
}
/// Builds a KEEPALIVE message and queues it.
///
/// # Panics
///
/// Panics if the KEEPALIVE builder cannot be created on a fresh buffer.
fn send_keepalive(&self) {
    let builder = KeepaliveBuilder::from_target(BytesMut::new()).unwrap();
    let keepalive = builder.into_message();
    self.send_pdu(BgpMsg::Keepalive(keepalive));
}
/// Returns the local ASN as reported by the configuration.
pub fn local_asn(&self) -> Asn {
self.config.local_asn()
}
/// Returns a reference to the session's FSM attributes.
pub const fn attributes(&self) -> &SessionAttributes {
&self.attributes
}
/// Returns the hold time stored in the session attributes.
pub const fn hold_time(&self) -> u16 {
self.attributes().hold_time()
}
/// Sets the hold time in the session attributes.
///
/// NOTE(review): only the attribute is updated here; the already created
/// hold/keepalive timers are not touched by this method.
pub fn set_hold_time(&mut self, time: u16) {
self.attributes_mut().set_hold_time(time);
}
/// Enables the DelayOpen option in the session attributes.
pub fn enable_delay_open(&mut self) {
self.attributes_mut().enable_delay_open();
}
/// Returns a mutable reference to the session's FSM attributes.
fn attributes_mut(&mut self) -> &mut SessionAttributes {
&mut self.attributes
}
/// Returns the current FSM state.
pub const fn state(&self) -> State {
self.attributes.state()
}
/// Increments the ConnectRetry counter in the session attributes.
fn increase_connect_retry_counter(&mut self) {
self.attributes_mut().increase_connect_retry_counter();
}
/// Resets the ConnectRetry counter in the session attributes.
fn reset_connect_retry_counter(&mut self) {
self.attributes_mut().reset_connect_retry_counter();
}
/// Sets the FSM state in the session attributes.
fn set_state(&mut self, state: State) {
self.attributes_mut().set_state(state);
}
/// Feeds a manual start event into the FSM.
///
/// The passive variant of the event is used when passive TCP establishment
/// is set in the session attributes. Any error from the FSM is ignored.
pub async fn manual_start(&mut self) {
    let start_event = if self.attributes.passive_tcp_establishment() {
        Event::ManualStartWithPassiveTcpEstablishment
    } else {
        Event::ManualStart
    };
    let _ = self.handle_event(start_event).await;
}
/// Feeds `Event::TcpConnectionConfirmed` into the FSM; any error returned by
/// the FSM is ignored.
pub async fn connection_established(&mut self) {
let _ = self.handle_event(Event::TcpConnectionConfirmed).await;
}
/// Translates a received BGP message into FSM events and forwards
/// UPDATE/NOTIFICATION messages on the session channel.
///
/// * OPEN: raises `BgpOpenWithDelayOpenTimerRunning` while the DelayOpen
///   timer runs, `BgpOpen` otherwise.
/// * KEEPALIVE: raises `KeepaliveMsg`.
/// * UPDATE: raises `UpdateMsg`, then forwards the message.
/// * NOTIFICATION: forwarded only; no FSM event is raised here.
/// * ROUTE-REFRESH: logged and ignored.
///
/// # Errors
///
/// Propagates any error returned by `handle_event`. Failures to forward on
/// the channel are ignored.
async fn handle_msg(&mut self, msg: BgpMsg<Bytes>) -> Result<(), Error> {
    match msg {
        BgpMsg::Open(open) => {
            debug!("got OPEN from {}, generating event", open.my_asn());
            let event = if self.delay_open_timer.is_running() {
                Event::BgpOpenWithDelayOpenTimerRunning(open)
            } else {
                Event::BgpOpen(open)
            };
            self.handle_event(event).await?;
        }
        BgpMsg::Keepalive(_) => {
            self.handle_event(Event::KeepaliveMsg).await?;
        }
        BgpMsg::Update(update) => {
            self.handle_event(Event::UpdateMsg).await?;
            let _ = self
                .channel
                .send(Message::UpdateMessage(update))
                .await;
        }
        BgpMsg::Notification(notification) => {
            let _ = self
                .channel
                .send(Message::NotificationMessage(notification))
                .await;
        }
        BgpMsg::RouteRefresh(_) => {
            debug!("got ROUTEREFRESH, not doing anything");
        }
    }
    Ok(())
}
/// Drives the session's finite state machine.
///
/// Matches on the pair (current state, `event`) and performs the
/// corresponding actions: starting/stopping timers, sending OPEN, KEEPALIVE
/// or NOTIFICATION messages, dropping the connection and changing state.
/// The state/event set appears to follow the BGP FSM of RFC 4271 — TODO
/// confirm individual transitions against the RFC.
///
/// Several transitions are not implemented yet and are marked `todo!()`;
/// statements following a `todo!()` are unreachable, hence the
/// `unreachable_code` allowance.
///
/// # Errors
///
/// Returns an error when a received OPEN carries a disallowed ASN (after a
/// NOTIFICATION has been queued and the state set to Idle) or when its
/// ADD-PATH capabilities cannot be parsed.
///
/// # Panics
///
/// Panics on any transition still marked `todo!()`, and when an OPEN is
/// processed while no connection is attached.
#[allow(unreachable_code)]
#[allow(clippy::too_many_lines)]
async fn handle_event(&mut self, event: Event) -> Result<(), Error> {
    use Event as E;
    use State as S;
    match (self.state(), &event) {
        (S::Unimplemented(_), _ ) => {
            error!("Unimplemented state, resetting to Idle");
            self.set_state(State::Idle);
        }

        // ---- Idle ---------------------------------------------------
        (S::Idle, E::ManualStart | E::AutomaticStart) => {
            self.reset_connect_retry_counter();
            self.connect_retry_timer.start();
            // Active connection setup is not implemented yet.
            todo!();
            self.set_state(State::Connect);
        }
        (S::Idle,
            E::ManualStartWithPassiveTcpEstablishment |
            E::AutomaticStartWithPassiveTcpEstablishment
        ) => {
            self.reset_connect_retry_counter();
            self.connect_retry_timer.start();
            self.set_state(State::Active);
        }
        (S::Idle, E::ManualStop) => {
            info!("ignored ManualStop in Idle state");
        }
        (S::Idle,
            E::ConnectRetryTimerExpires |
            E::HoldTimerExpires |
            E::KeepaliveTimerExpires |
            E::DelayOpenTimerExpires |
            E::TcpCrAcked |
            E::TcpConnectionConfirmed |
            E::TcpConnectionFails |
            E::BgpOpen(_) |
            E::BgpOpenWithDelayOpenTimerRunning(_) |
            E::BgpHeaderErr |
            E::BgpOpenMsgErr |
            E::NotifMsgVerErr |
            E::NotifMsg |
            E::KeepaliveMsg |
            E::UpdateMsg |
            E::UpdateMsgErr
        ) => warn!("(unexpected) non-event {:?} in state Idle", event),

        // ---- Connect ------------------------------------------------
        (S::Connect,
            E::ManualStart |
            E::AutomaticStart |
            E::ManualStartWithPassiveTcpEstablishment |
            E::AutomaticStartWithPassiveTcpEstablishment
        ) => {
            warn!("ignored {:?} in state Connect", event);
        }
        (S::Connect, E::ManualStop) => {
            self.reset_connect_retry_counter();
            self.connect_retry_timer.stop_and_reset();
            self.set_state(State::Idle);
        }
        (S::Connect, E::ConnectRetryTimerExpires) => {
            todo!();
        }
        (S::Connect, E::DelayOpenTimerExpires) => {
            self.send_open();
            self.set_state(State::OpenSent);
        }
        (S::Connect, E::TcpCrAcked | E::TcpConnectionConfirmed) => {
            if self.attributes().delay_open() {
                self.connect_retry_timer.stop_and_reset();
                self.delay_open_timer.start();
            } else {
                self.connect_retry_timer.stop_and_reset();
                self.send_open();
                self.set_state(State::OpenSent);
            }
        }
        (S::Connect, E::TcpConnectionFails) => {
            if self.delay_open_timer.is_running() {
                todo!();
            } else {
                self.connect_retry_timer.stop_and_reset();
                self.set_state(State::Idle);
            }
        }
        (S::Connect, E::BgpOpenWithDelayOpenTimerRunning(_open_msg)) => {
            debug!("Received OPEN during DelayOpen");
            todo!();
        }
        (S::Connect, E::BgpHeaderErr | E::BgpOpenMsgErr) => { todo!() }
        (S::Connect, E::NotifMsgVerErr) => { todo!() }
        (S::Connect,
            E::HoldTimerExpires |
            E::KeepaliveTimerExpires |
            E::BgpOpen(_) |
            E::NotifMsg |
            E::KeepaliveMsg |
            E::UpdateMsg |
            E::UpdateMsgErr
        ) => {
            self.connect_retry_timer.stop_and_reset();
            self.delay_open_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }

        // ---- Active -------------------------------------------------
        (S::Active,
            E::ManualStart |
            E::AutomaticStart |
            E::ManualStartWithPassiveTcpEstablishment |
            E::AutomaticStartWithPassiveTcpEstablishment
        ) => {
            info!("ignored {:?} in state Active", event);
        }
        (S::Active, E::ManualStop) => {
            if self.delay_open_timer.is_running() &&
                self.attributes().notification_without_open()
            {
                self.disconnect(DisconnectReason::Shutdown);
            }
            self.delay_open_timer.stop_and_reset();
            self.reset_connect_retry_counter();
            self.connect_retry_timer.stop_and_reset();
            self.set_state(State::Idle);
        }
        (S::Active, E::ConnectRetryTimerExpires) => {
            if self.config.is_exact() {
                todo!();
                self.set_state(State::Connect);
            } else {
                warn!(
                    "ConnectRetry expired for flexible config, \
                    staying in State::Active"
                );
            }
        }
        (S::Active, E::DelayOpenTimerExpires) => {
            self.connect_retry_timer.stop_and_reset();
            self.delay_open_timer.stop_and_reset();
            self.send_open();
            self.set_state(State::OpenSent);
        }
        (S::Active, E::TcpCrAcked | E::TcpConnectionConfirmed) => {
            if self.attributes.delay_open() {
                self.connect_retry_timer.stop_and_reset();
                self.delay_open_timer.start();
            } else {
                self.connect_retry_timer.start();
                debug!("send_open in Active, no DelayOpen");
                self.send_open();
                self.set_state(State::OpenSent);
            }
        }
        (S::Active, E::TcpConnectionFails) => {
            self.connect_retry_timer.start();
            self.delay_open_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::Active, E::BgpOpenWithDelayOpenTimerRunning(open_msg)) => {
            self.connect_retry_timer.stop_and_reset();
            self.delay_open_timer.stop_and_reset();
            if !self.config.remote_asn_allowed(open_msg.my_asn()) {
                warn!(
                    "unexpected ASN {} in OPEN",
                    open_msg.my_asn()
                );
                self.disconnect(DisconnectReason::FsmViolation(Some(
                    OpenMessageSubcode::BadPeerAs.into()
                )));
                self.set_state(State::Idle);
                return Err(Error { msg: "stop processing" })
            }
            let received_addpaths = open_msg.addpath_families_vec()
                .map_err(|_| Error { msg: "failed to parse addpath caps" })?;
            let intersection = received_addpaths.iter().filter(|(fam, dir)|{
                matches!(
                    dir,
                    AddpathDirection::Send |
                    AddpathDirection::SendReceive
                ) &&
                self.config.addpath().contains(fam)
            }).map(|(fam, dir)| AddpathFamDir::new(*fam, *dir)).collect::<Vec<_>>();
            debug!("addpath intersection: {:?}", &intersection);
            // Our OPEN has not been sent yet on this path. Send it before
            // building the negotiated config: `send_open` is what records
            // the local capabilities, so snapshotting them earlier would
            // store stale (typically empty) capabilities.
            self.send_open();
            let negotiated = NegotiatedConfig {
                hold_time: std::cmp::min(open_msg.holdtime(), self.hold_time()),
                remote_bgp_id: open_msg.identifier()[0..4].try_into().unwrap(),
                remote_asn: open_msg.my_asn(),
                remote_addr: self.connection.as_ref().unwrap().remote_addr(),
                addpath: intersection,
                local_capabilities: self.local_capabilities().to_vec(),
                remote_capabilities: open_msg.capabilities_as_vec(),
            };
            self.set_negotiated_config(negotiated.clone());
            debug!(
                "Negotiated: {}@{} id {:?}, hold time {}s",
                negotiated.remote_asn,
                negotiated.remote_addr,
                negotiated.remote_bgp_id,
                negotiated.hold_time,
            );
            let _ = self.channel.send(Message::SessionNegotiated(negotiated)).await;
            self.send_keepalive();
            self.keepalive_timer.start();
            self.hold_timer.start();
            self.set_state(State::OpenConfirm);
        },
        (S::Active, E::BgpHeaderErr | E::BgpOpenMsgErr) => {
            if self.attributes().notification_without_open() {
                todo!();
            }
            self.connect_retry_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::Active, E::NotifMsgVerErr) => {
            if self.delay_open_timer.is_running() {
                self.connect_retry_timer.stop_and_reset();
                self.delay_open_timer.stop_and_reset();
            } else {
                self.connect_retry_timer.stop_and_reset();
                self.increase_connect_retry_counter();
            }
            self.set_state(State::Idle);
        }
        (S::Active,
            E::HoldTimerExpires |
            E::KeepaliveTimerExpires |
            E::BgpOpen(_) |
            E::NotifMsg |
            E::KeepaliveMsg |
            E::UpdateMsg |
            E::UpdateMsgErr
        ) => {
            self.connect_retry_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            warn!(
                "Changing from Active to Idle because of {:?}",
                event
            );
            self.set_state(State::Idle);
        }

        // ---- OpenSent -----------------------------------------------
        (S::OpenSent,
            E::ManualStart |
            E::AutomaticStart |
            E::ManualStartWithPassiveTcpEstablishment |
            E::AutomaticStartWithPassiveTcpEstablishment
        ) => {
            info!("ignored {:?} in state OpenSent", event);
        }
        (S::OpenSent, E::ManualStop) => {
            self.disconnect(DisconnectReason::Shutdown);
            self.connect_retry_timer.stop_and_reset();
            self.reset_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::OpenSent, E::HoldTimerExpires) => {
            self.disconnect(DisconnectReason::HoldTimerExpired);
            self.connect_retry_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::OpenSent,
            E::TcpCrAcked | E::TcpConnectionConfirmed ) => {
            todo!()
        }
        (S::OpenSent, E::TcpConnectionFails) => {
            self.connect_retry_timer.reset();
            self.set_state(State::Active);
        }
        (S::OpenSent, E::BgpOpen(open_msg)) => {
            self.delay_open_timer.stop_and_reset();
            self.connect_retry_timer.stop_and_reset();
            if !self.config.remote_asn_allowed(open_msg.my_asn()) {
                warn!(
                    "unexpected ASN {} in OPEN",
                    open_msg.my_asn()
                );
                self.disconnect(DisconnectReason::FsmViolation(Some(
                    OpenMessageSubcode::BadPeerAs.into()
                )));
                self.set_state(State::Idle);
                return Err(Error { msg: "stop processing" })
            }
            let received_addpaths = open_msg.addpath_families_vec()
                .map_err(|_| Error { msg: "failed to parse addpath caps" })?;
            let intersection = received_addpaths.iter().filter(|(fam, dir)|{
                matches!(
                    dir,
                    AddpathDirection::Send |
                    AddpathDirection::SendReceive
                ) &&
                self.config.addpath().contains(fam)
            }).map(|(fam, dir)| AddpathFamDir::new(*fam, *dir)).collect::<Vec<_>>();
            debug!("addpath intersection: {:?}", &intersection);
            // Our OPEN was already sent when entering OpenSent, so the
            // local capabilities are up to date here.
            let negotiated = NegotiatedConfig {
                hold_time: std::cmp::min(open_msg.holdtime(), self.hold_time()),
                remote_bgp_id: open_msg.identifier()[0..4].try_into().unwrap(),
                remote_asn: open_msg.my_asn(),
                remote_addr: self.connection.as_ref().unwrap().remote_addr(),
                addpath: intersection,
                local_capabilities: self.local_capabilities().to_vec(),
                remote_capabilities: open_msg.capabilities_as_vec(),
            };
            debug!(
                "Negotiated: {}@{} id {:?}, hold time {}s",
                negotiated.remote_asn,
                negotiated.remote_addr,
                negotiated.remote_bgp_id,
                negotiated.hold_time,
            );
            self.set_negotiated_config(negotiated.clone());
            let _ = self.channel.send(Message::SessionNegotiated(negotiated)).await;
            self.send_keepalive();
            self.keepalive_timer.start();
            self.hold_timer.start();
            self.set_state(State::OpenConfirm);
        }
        (S::OpenSent, E::BgpHeaderErr | E::BgpOpenMsgErr) => {
            self.connect_retry_timer.stop_and_reset();
            self.drop_connection();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::OpenSent, E::NotifMsgVerErr) => {
            self.connect_retry_timer.stop_and_reset();
            self.drop_connection();
            self.set_state(State::Idle);
        }
        (S::OpenSent,
            E::ConnectRetryTimerExpires |
            E::KeepaliveTimerExpires |
            E::DelayOpenTimerExpires |
            E::BgpOpenWithDelayOpenTimerRunning(_) |
            E::NotifMsg |
            E::KeepaliveMsg |
            E::UpdateMsg |
            E::UpdateMsgErr
        ) => {
            self.disconnect(DisconnectReason::FsmViolation(Some(
                FiniteStateMachineSubcode::
                    UnexpectedMessageInOpenSentState.into()
            )));
            self.connect_retry_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }

        // ---- OpenConfirm --------------------------------------------
        (S::OpenConfirm,
            E::ManualStart |
            E::AutomaticStart |
            E::ManualStartWithPassiveTcpEstablishment |
            E::AutomaticStartWithPassiveTcpEstablishment
        ) => {
            info!("ignored {:?} in state OpenConfirm", event);
        }
        (S::OpenConfirm, E::ManualStop) => {
            self.disconnect(DisconnectReason::Shutdown);
            self.reset_connect_retry_counter();
            self.connect_retry_timer.stop_and_reset();
            self.set_state(State::Idle);
        }
        (S::OpenConfirm, E::HoldTimerExpires) => {
            self.disconnect(DisconnectReason::HoldTimerExpired);
            self.connect_retry_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::OpenConfirm, E::KeepaliveTimerExpires) => {
            self.send_keepalive();
        }
        (S::OpenConfirm,
            E::TcpCrAcked | E::TcpConnectionConfirmed ) => {
            todo!()
        }
        (S::OpenConfirm, E::TcpConnectionFails | E::NotifMsg ) => {
            self.connect_retry_timer.stop_and_reset();
            self.drop_connection();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::OpenConfirm, E::NotifMsgVerErr) => {
            self.connect_retry_timer.stop_and_reset();
            self.drop_connection();
            self.set_state(State::Idle);
        }
        (S::OpenConfirm, E::BgpOpen(_)) => {
            todo!();
            self.connect_retry_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::OpenConfirm, E::BgpHeaderErr | E::BgpOpenMsgErr) => {
            self.connect_retry_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::OpenConfirm, E::KeepaliveMsg) => {
            self.hold_timer.reset();
            self.set_state(State::Established);
        }
        (S::OpenConfirm,
            E::ConnectRetryTimerExpires |
            E::DelayOpenTimerExpires |
            E::BgpOpenWithDelayOpenTimerRunning(_) |
            E::UpdateMsg |
            E::UpdateMsgErr
        ) => {
            self.disconnect(DisconnectReason::FsmViolation(Some(
                FiniteStateMachineSubcode::
                    UnexpectedMessageInOpenConfirmState.into()
            )));
            self.connect_retry_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }

        // ---- Established --------------------------------------------
        (S::Established,
            E::ManualStart |
            E::AutomaticStart |
            E::ManualStartWithPassiveTcpEstablishment |
            E::AutomaticStartWithPassiveTcpEstablishment
        ) => {
            info!("ignored {:?} in state Established", event);
        }
        (S::Established, E::ManualStop) => {
            self.disconnect(DisconnectReason::Shutdown);
            self.connect_retry_timer.stop_and_reset();
            self.reset_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::Established, E::HoldTimerExpires) => {
            self.disconnect(DisconnectReason::HoldTimerExpired);
            self.connect_retry_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::Established, E::KeepaliveTimerExpires) => {
            self.send_keepalive();
        }
        (S::Established,
            E::TcpCrAcked | E::TcpConnectionConfirmed ) => {
            todo!()
        }
        (S::Established, E::BgpOpen(_)) => {
            todo!()
        }
        (S::Established,
            E::NotifMsgVerErr | E::NotifMsg | E::TcpConnectionFails) => {
            self.connect_retry_timer.stop_and_reset();
            self.drop_connection();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::Established, E::KeepaliveMsg) => {
            self.hold_timer.reset();
        }
        (S::Established, E::UpdateMsg) => {
            self.hold_timer.reset();
        }
        (S::Established, E::UpdateMsgErr) => {
            self.connect_retry_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }
        (S::Established,
            E::ConnectRetryTimerExpires |
            E::DelayOpenTimerExpires |
            E::BgpOpenWithDelayOpenTimerRunning(_) |
            E::BgpHeaderErr |
            E::BgpOpenMsgErr
        ) => {
            debug!("got unexpected {event:?} in S::Established");
            self.disconnect(DisconnectReason::FsmViolation(Some(
                FiniteStateMachineSubcode::
                    UnexpectedMessageInEstablishedState.into()
            )));
            self.connect_retry_timer.stop_and_reset();
            self.increase_connect_retry_counter();
            self.set_state(State::Idle);
        }
    }
    Ok(())
}
}
/// Messages a `Session` reports on its `channel`.
pub enum Message {
/// An UPDATE received from the peer.
UpdateMessage(UpdateMessage<Bytes>),
/// A NOTIFICATION received from the peer.
NotificationMessage(NotificationMessage<Bytes>),
/// A copy of the session attributes. NOTE(review): not constructed in the
/// visible part of this file.
Attributes(SessionAttributes),
/// The session parameters negotiated after receiving the peer's OPEN.
SessionNegotiated(NegotiatedConfig),
/// The connection reached end-of-stream; carries the peer address.
ConnectionLost(Option<SocketAddr>),
}
/// Control commands a `Session` receives on its `commands` channel.
#[derive(Debug)]
pub enum Command {
/// Attach a new TCP read half to the session.
AttachStream {
stream: OwnedReadHalf,
},
/// Request a copy of the session attributes, answered via `resp`.
GetAttributes {
resp: oneshot::Sender<SessionAttributes>,
},
/// Disconnect the peer for the given reason.
Disconnect(DisconnectReason),
/// Queue a KEEPALIVE immediately, independent of the keepalive timer.
ForcedKeepalive,
}
/// Why a session is being disconnected; `Session::disconnect` uses it to
/// pick the NOTIFICATION to send.
#[derive(Copy, Clone, Debug)]
pub enum DisconnectReason {
/// Sends Cease / ConnectionRejected.
ConnectionRejected,
/// Sends Cease / OtherConfigurationChange.
Reconfiguration,
/// Sends Cease / PeerDeconfigured.
Deconfigured,
/// Sends a HoldTimerExpired notification.
HoldTimerExpired,
/// Sends Cease / AdministrativeShutdown.
Shutdown,
/// Sends the contained notification details, if any.
FsmViolation(Option<Details>),
/// No NOTIFICATION is sent.
Other,
}
/// Read side of a TCP connection to a peer, with its framing buffer.
#[derive(Debug)]
pub struct Connection {
/// Socket address of the remote peer, captured when the connection was
/// wrapped.
remote_addr: SocketAddr,
/// The TCP read half frames are read from.
tcp_in: OwnedReadHalf,
/// Bytes received but not yet parsed into a complete message.
buffer: BytesMut,
/// Parsing configuration passed to the BGP message parser (e.g. which
/// ADD-PATH families/directions are in effect).
session_config: SessionConfig,
}
impl Connection {
    /// Returns the IP address of the remote peer (without the port).
    pub const fn remote_addr(&self) -> IpAddr {
        self.remote_addr.ip()
    }

    /// Wraps a TCP read half, allocating a 2 MiB receive buffer and starting
    /// from `SessionConfig::modern()`.
    ///
    /// # Panics
    ///
    /// Panics if the peer address of `tcp_in` cannot be determined.
    pub fn for_read_half(tcp_in: OwnedReadHalf) -> Self {
        Self {
            remote_addr: tcp_in.peer_addr().unwrap(),
            tcp_in,
            buffer: BytesMut::with_capacity(2 << 20),
            session_config: SessionConfig::modern(),
        }
    }

    /// Returns the parsing configuration for mutation.
    pub fn session_config_mut(&mut self) -> &mut SessionConfig {
        &mut self.session_config
    }

    /// Reads from the socket until one complete BGP message is available.
    ///
    /// Returns `Ok(Some(msg))` for a parsed message and `Ok(None)` when the
    /// peer closed the connection with no partial message pending.
    ///
    /// # Errors
    ///
    /// Returns an error when parsing fails, when reading from the socket
    /// fails, or when the peer closes the connection in the middle of a
    /// message.
    async fn read_frame(&mut self) -> Result<Option<BgpMsg<Bytes>>, Error> {
        loop {
            // Serve already buffered data before touching the socket.
            if let Some(frame) = self.parse_frame()? {
                return Ok(Some(frame));
            }
            // A read of zero bytes means end-of-stream.
            if 0 == self.tcp_in.read_buf(&mut self.buffer).await? {
                if self.buffer.is_empty() {
                    return Ok(None);
                }
                return Err(Error::for_str("connection reset by peer"));
            }
        }
    }

    /// Tries to parse one complete BGP message from the front of the buffer.
    ///
    /// Returns `Ok(None)` when the buffer does not yet hold a full message.
    /// On success the message's bytes are removed from the buffer.
    ///
    /// # Errors
    ///
    /// Returns the parser's error when the buffered bytes do not form a
    /// valid message.
    fn parse_frame(&mut self) -> Result<Option<BgpMsg<Bytes>>, ParseError> {
        // The 2-byte big-endian length field follows the 16-byte marker.
        let mut buf = Cursor::new(&self.buffer[..]);
        if buf.remaining() < 16 + 2 {
            return Ok(None);
        }
        buf.set_position(16);
        let len = usize::from(buf.get_u16());

        // Compare against the whole buffer instead of computing `len - 18`:
        // a peer-supplied length below 18 would otherwise underflow (panic
        // in debug builds, a stalled reader in release builds). Such a short
        // length is handed to the parser, which is expected to reject it —
        // TODO confirm `from_octets` errors on truncated messages.
        if self.buffer.len() < len {
            return Ok(None);
        }

        let octets = Bytes::copy_from_slice(&self.buffer[..len]);
        let msg = BgpMsg::from_octets(octets, Some(&self.session_config))?;
        self.buffer.advance(len);
        Ok(Some(msg))
    }
}
/// Reads a frame from `conn` if there is one.
///
/// Resolves to `None` immediately when no connection is given, which lets
/// the `select!` branch in `Session::tick` be skipped.
async fn maybe_read_frame(
    conn: Option<&mut Connection>,
) -> Option<Result<Option<BgpMsg<Bytes>>, Error>> {
    match conn {
        Some(connection) => Some(connection.read_frame().await),
        None => None,
    }
}
/// Local configuration a `Session` needs to run.
pub trait BgpConfig {
/// The local ASN, announced in our OPEN.
fn local_asn(&self) -> Asn;
/// The local BGP identifier, announced in our OPEN.
fn bgp_id(&self) -> [u8; 4];
/// Whether a peer with this address is acceptable. NOTE(review): not
/// called in the visible part of this file.
fn remote_addr_allowed(&self, remote_addr: IpAddr) -> bool;
/// Whether the ASN from a received OPEN is acceptable.
fn remote_asn_allowed(&self, remote_asn: Asn) -> bool;
/// Hold time to use instead of the default, if any.
fn hold_time(&self) -> Option<u16>;
/// Presumably whether this config describes exactly one peer (as opposed
/// to a "flexible" config) — TODO confirm; only consulted when the
/// ConnectRetry timer expires in the Active state.
fn is_exact(&self) -> bool;
/// Address families announced as multiprotocol capabilities.
fn protocols(&self) -> Vec<AfiSafiType>;
/// Address families for which ADD-PATH is announced and accepted.
fn addpath(&self) -> Vec<AfiSafiType>;
}
/// A `BgpConfig` for one specific peer (exact address and ASN).
#[derive(Clone, Debug)]
pub struct BasicConfig {
/// Local ASN.
local_asn: Asn,
/// Local BGP identifier.
bgp_id: [u8; 4],
/// The only remote ASN that is accepted.
pub remote_asn: Asn,
/// The only remote address that is accepted.
pub remote_addr: IpAddr,
/// Optional hold time override.
pub hold_time: Option<u16>,
/// Currently unused; always created empty by `new`.
_capabilities: Vec<Capability<Vec<u8>>>,
}
impl BasicConfig {
    /// Creates a configuration for a single peer.
    ///
    /// Note the argument order: `remote_addr` comes before `remote_asn`.
    pub const fn new(
        local_asn: Asn,
        bgp_id: [u8; 4],
        remote_addr: IpAddr,
        remote_asn: Asn,
        hold_time: Option<u16>,
    ) -> Self {
        Self {
            local_asn,
            bgp_id,
            remote_asn,
            remote_addr,
            hold_time,
            _capabilities: Vec::new(),
        }
    }
}
impl BgpConfig for BasicConfig {
    /// Returns the configured local ASN.
    fn local_asn(&self) -> Asn {
        self.local_asn
    }

    /// Returns the configured local BGP identifier.
    fn bgp_id(&self) -> [u8; 4] {
        self.bgp_id
    }

    /// Accepts only the configured remote address.
    fn remote_addr_allowed(&self, remote_addr: IpAddr) -> bool {
        self.remote_addr == remote_addr
    }

    /// Accepts only the configured remote ASN.
    fn remote_asn_allowed(&self, remote_asn: Asn) -> bool {
        self.remote_asn == remote_asn
    }

    /// Returns the optional hold time override.
    fn hold_time(&self) -> Option<u16> {
        self.hold_time
    }

    /// Always `true` for this configuration type.
    fn is_exact(&self) -> bool {
        true
    }

    /// IPv4 and IPv6 unicast are always announced.
    fn protocols(&self) -> Vec<AfiSafiType> {
        [AfiSafiType::Ipv4Unicast, AfiSafiType::Ipv6Unicast].to_vec()
    }

    /// No ADD-PATH families are configured.
    fn addpath(&self) -> Vec<AfiSafiType> {
        Vec::new()
    }
}
/// Session parameters resulting from the OPEN exchange with a peer.
#[derive(Clone, Debug)]
pub struct NegotiatedConfig {
// `hold_time`: the smaller of the peer's and our own hold time.
// `remote_bgp_id`: the BGP identifier from the peer's OPEN.
hold_time: u16, remote_bgp_id: [u8; 4],
/// ASN from the peer's OPEN.
remote_asn: Asn,
/// IP address of the peer.
remote_addr: IpAddr,
/// ADD-PATH families/directions offered by the peer (Send or SendReceive)
/// that are also configured locally.
addpath: Vec<AddpathFamDir>,
/// Raw capabilities from our own OPEN.
local_capabilities: Vec<u8>,
/// Raw capabilities from the peer's OPEN.
remote_capabilities: Vec<u8>,
}
impl NegotiatedConfig {
    /// Returns the peer's ASN.
    pub const fn remote_asn(&self) -> Asn {
        self.remote_asn
    }

    /// Returns the peer's IP address.
    pub const fn remote_addr(&self) -> IpAddr {
        self.remote_addr
    }

    /// Returns the raw capabilities from our own OPEN.
    pub fn local_capabilities(&self) -> &[u8] {
        &self.local_capabilities
    }

    /// Returns the raw capabilities from the peer's OPEN.
    pub fn remote_capabilities(&self) -> &[u8] {
        &self.remote_capabilities
    }

    /// Returns a placeholder configuration with fixed values (AS12345,
    /// 1.2.3.4, hold time 0, no capabilities).
    pub fn dummy() -> Self {
        let remote_bgp_id = [1, 2, 3, 4];
        let remote_addr = IpAddr::from([1, 2, 3, 4]);
        Self {
            hold_time: 0,
            remote_bgp_id,
            remote_asn: Asn::from_u32(12345),
            remote_addr,
            addpath: Vec::new(),
            local_capabilities: Vec::new(),
            remote_capabilities: Vec::new(),
        }
    }
}
use std::fmt;
/// Error describing a connection-level failure as a message string.
#[derive(Debug)]
pub struct ConnectionError(String);

impl std::error::Error for ConnectionError {}

impl fmt::Display for ConnectionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(description) = self;
        write!(f, "Connection error: {}", description)
    }
}

impl From<std::io::Error> for ConnectionError {
    /// Wraps an I/O error, keeping its textual description.
    fn from(e: std::io::Error) -> Self {
        let description = format!("Connection io error: {e}");
        Self(description)
    }
}

/// Error for a connection coming from an address that was not expected.
#[derive(Debug)]
pub struct UnexpectedPeer(IpAddr);

impl std::error::Error for UnexpectedPeer {}

impl fmt::Display for UnexpectedPeer {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self(peer) = self;
        write!(f, "unexpected peer {}", peer)
    }
}

impl From<UnexpectedPeer> for ConnectionError {
    /// Converts by storing the `Display` text of the peer error.
    fn from(e: UnexpectedPeer) -> Self {
        let description = e.to_string();
        Self(description)
    }
}
/// Lightweight session error carrying only a static description.
#[derive(Debug)]
pub struct Error {
    /// Human-readable description; rendered by `Display` as `error: <msg>`.
    msg: &'static str,
}

impl std::fmt::Display for Error {
    fn fmt(
        &self,
        f: &mut std::fmt::Formatter,
    ) -> Result<(), std::fmt::Error> {
        write!(f, "error: {}", self.msg)
    }
}

// Implemented for consistency with `ConnectionError` and `UnexpectedPeer`,
// so this type can be boxed as `dyn std::error::Error` or used with `?` in
// functions returning such a box.
impl std::error::Error for Error {}

impl Error {
    /// Creates an error from a static description.
    pub const fn for_str(msg: &'static str) -> Self {
        Self { msg }
    }
}

impl From<std::io::Error> for Error {
    /// Maps any I/O error to a generic "io error"; the details of the
    /// original error are not retained.
    fn from(_e: std::io::Error) -> Self {
        Self::for_str("io error")
    }
}
impl From<ParseError> for Error {
/// Maps any parse error to a generic "parse error"; the details of the
/// original error are not retained.
fn from(_e: ParseError) -> Self {
Self::for_str("parse error")
}
}