use std::{
collections::hash_map::Keys, fmt::Debug, ops::ControlFlow, sync::Arc,
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use inetnum::addr::Prefix;
use log::debug;
use routecore::bgp::nlri::afisafi::Nlri;
use routecore::bgp::{
message::{SessionConfig, UpdateMessage},
types::AfiSafiType,
};
use routecore::bmp::message::{
Message as BmpMsg, PerPeerHeader, TerminationMessage,
};
use smallvec::SmallVec;
use crate::{
ingress,
payload::{Payload, Update},
units::bmp_tcp_in::state_machine::machine::{
BmpStateIdx, PeerState, PeerStates,
},
};
use super::initiating::Initiating;
use super::super::{
machine::{BmpState, BmpStateDetails, Initiable, PeerAware},
processing::ProcessingResult,
};
/// State details for the `Dumping` phase of the BMP state machine.
///
/// A value of this type is created from [`Initiating`] through the `From`
/// implementation in this file, which carries over the `sys_*` values and
/// leaves `peer_states` at its `Default`.
#[derive(Debug, Default)]
pub struct Dumping {
/// System name; copied from `Initiating::sys_name` on conversion and
/// overwritten by `Initiable::set_information_tlvs`.
pub sys_name: String,
/// System description; copied from `Initiating::sys_desc` on conversion and
/// overwritten by `Initiable::set_information_tlvs`.
pub sys_desc: String,
/// Additional strings; copied from `Initiating::sys_extra` on conversion and
/// overwritten by `Initiable::set_information_tlvs`.
pub sys_extra: Vec<String>,
/// Per-peer bookkeeping that every `PeerAware` method on this type forwards
/// to.
pub peer_states: PeerStates,
}
impl BmpStateDetails<Dumping> {
    /// Handles a single BMP message received while in the `Dumping` state
    /// and returns the resulting [`ProcessingResult`].
    ///
    /// Each message variant is routed to the matching handler:
    ///
    /// * initiation -> `initiate`
    /// * peer up -> `peer_up`, after which the number of pending
    ///   End-of-RIB markers is reported if the next state is still
    ///   `Dumping`
    /// * peer down -> `peer_down`
    /// * route monitoring -> `route_monitoring`, with
    ///   `route_monitoring_preprocessing` supplied as the state-specific
    ///   hook
    /// * termination -> `terminate`
    /// * anything else -> `mk_other_result`
    ///
    /// `received` and `trace_id` are only passed on to `route_monitoring`.
    // NOTE(review): no caller is visible in this file; confirm whether the
    // `dead_code` allowance is still required.
    #[allow(dead_code)]
    pub fn process_msg(
        self,
        received: std::time::Instant,
        bmp_msg: BmpMsg<Bytes>,
        trace_id: Option<u8>,
    ) -> ProcessingResult {
        match bmp_msg {
            BmpMsg::InitiationMessage(initiation) => {
                self.initiate(initiation)
            }

            BmpMsg::PeerUpNotification(peer_up) => {
                let outcome = self.peer_up(peer_up);

                // Publish the new pending End-of-RIB count, but only when
                // the peer-up handling left us in the Dumping state.
                if let BmpState::Dumping(dumping) = &outcome.next_state {
                    let pending =
                        dumping.details.peer_states.num_pending_eors();
                    dumping
                        .status_reporter
                        .pending_eors_update(dumping.router_id.clone(), pending);
                }

                outcome
            }

            BmpMsg::PeerDownNotification(peer_down) => {
                self.peer_down(peer_down)
            }

            BmpMsg::RouteMonitoring(route_mon) => self.route_monitoring(
                received,
                route_mon,
                trace_id,
                |state, header, bgp_update| {
                    state.route_monitoring_preprocessing(header, bgp_update)
                },
            ),

            BmpMsg::TerminationMessage(termination) => {
                self.terminate(Some(termination))
            }

            _ => self.mk_other_result(),
        }
    }
}
impl BmpStateDetails<Dumping> {
pub fn route_monitoring_preprocessing(
mut self,
pph: &PerPeerHeader<Bytes>,
update: &UpdateMessage<Bytes>,
) -> ControlFlow<ProcessingResult, Self> {
if let Ok(Some(afi_safi)) = update.is_eor() {
if self
.details
.remove_pending_eor(pph, (afi_safi).try_into().unwrap())
{
let num_pending_eors = self.details.num_pending_eors();
self.status_reporter.pending_eors_update(
self.router_id.clone(),
num_pending_eors,
);
return ControlFlow::Break(Self::mk_state_transition_result(
BmpStateIdx::Dumping,
BmpState::Updating(self.into()),
));
}
}
ControlFlow::Continue(self)
}
pub fn terminate<Octets: AsRef<[u8]>>(
self,
_msg: Option<TerminationMessage<Octets>>,
) -> ProcessingResult {
debug!("dumping terminate()");
let mut ids = SmallVec::<[ingress::IngressId; 8]>::new();
for pph in self.details.get_peers() {
if let Some(id) = self.details.get_peer_ingress_id(pph) {
ids.push(id);
}
}
let next_state = BmpState::Terminated(self.into());
if ids.is_empty() {
Self::mk_state_transition_result(BmpStateIdx::Dumping, next_state)
} else {
let update = Update::WithdrawBulk(ids);
Self::mk_final_routing_update_result(next_state, update)
}
}
}
impl From<Initiating> for Dumping {
fn from(v: Initiating) -> Self {
Self {
sys_name: v.sys_name.unwrap(),
sys_desc: v.sys_desc.unwrap(),
sys_extra: v.sys_extra,
..Default::default()
}
}
}
impl From<BmpStateDetails<Initiating>> for BmpStateDetails<Dumping> {
fn from(v: BmpStateDetails<Initiating>) -> Self {
let details: Dumping = v.details.into();
Self {
ingress_id: v.ingress_id,
router_id: v.router_id,
status_reporter: v.status_reporter,
ingress_register: v.ingress_register,
details,
}
}
}
impl Initiable for Dumping {
    /// Replaces the stored system name, system description and extra
    /// strings with the given values.
    fn set_information_tlvs(
        &mut self,
        sys_name: String,
        sys_desc: String,
        sys_extra: Vec<String>,
    ) {
        let Self {
            sys_name: name_slot,
            sys_desc: desc_slot,
            sys_extra: extra_slot,
            ..
        } = self;

        *name_slot = sys_name;
        *desc_slot = sys_desc;
        *extra_slot = sys_extra;
    }

    /// Returns the stored system name.
    ///
    /// In this state the name is a plain `String`, so the result is always
    /// `Some`.
    fn sys_name(&self) -> Option<&str> {
        let name: &str = &self.sys_name;
        Some(name)
    }
}
/// `PeerAware` for `Dumping` is a thin pass-through: every method forwards its
/// arguments to the method of the same name on `self.peer_states` and returns
/// that method's result unchanged.
impl PeerAware for Dumping {
/// Forwards to `peer_states.add_peer_config` with all arguments unchanged.
fn add_peer_config(
&mut self,
pph: PerPeerHeader<Bytes>,
session_config: SessionConfig,
eor_capable: bool,
ingress_register: Arc<ingress::Register>,
bmp_ingress_id: ingress::IngressId,
) -> bool {
self.peer_states.add_peer_config(
pph,
session_config,
eor_capable,
ingress_register,
bmp_ingress_id,
)
}
/// Forwards to `peer_states.get_peers`; yields the per-peer headers used as
/// keys for the stored peer state.
fn get_peers(&self) -> Keys<'_, PerPeerHeader<Bytes>, PeerState> {
self.peer_states.get_peers()
}
/// Forwards to `peer_states.get_peer_ingress_id` for the given peer.
fn get_peer_ingress_id(
&self,
pph: &PerPeerHeader<Bytes>,
) -> Option<ingress::IngressId> {
self.peer_states.get_peer_ingress_id(pph)
}
/// Forwards to `peer_states.update_peer_config` for the given peer.
fn update_peer_config(
&mut self,
pph: &PerPeerHeader<Bytes>,
config: SessionConfig,
) -> bool {
self.peer_states.update_peer_config(pph, config)
}
/// Forwards to `peer_states.get_peer_config` for the given peer.
fn get_peer_config(
&self,
pph: &PerPeerHeader<Bytes>,
) -> Option<&SessionConfig> {
self.peer_states.get_peer_config(pph)
}
/// Forwards to `peer_states.remove_peer` for the given peer.
fn remove_peer(
&mut self,
pph: &PerPeerHeader<Bytes>,
) -> Option<PeerState> {
self.peer_states.remove_peer(pph)
}
/// Forwards to `peer_states.num_peer_configs`.
fn num_peer_configs(&self) -> usize {
self.peer_states.num_peer_configs()
}
/// Forwards to `peer_states.is_peer_eor_capable` for the given peer.
fn is_peer_eor_capable(
&self,
pph: &PerPeerHeader<Bytes>,
) -> Option<bool> {
self.peer_states.is_peer_eor_capable(pph)
}
/// Forwards to `peer_states.add_pending_eor` for the given peer and address
/// family.
fn add_pending_eor(
&mut self,
pph: &PerPeerHeader<Bytes>,
afi_safi: AfiSafiType,
) -> usize {
self.peer_states.add_pending_eor(pph, afi_safi)
}
/// Forwards to `peer_states.remove_pending_eor` for the given peer and
/// address family.
// NOTE(review): `route_monitoring_preprocessing` switches to the `Updating`
// state when this returns `true`; the exact meaning of the flag is defined by
// `PeerStates` -- confirm there.
fn remove_pending_eor(
&mut self,
pph: &PerPeerHeader<Bytes>,
afi_safi: AfiSafiType,
) -> bool {
self.peer_states.remove_pending_eor(pph, afi_safi)
}
/// Forwards to `peer_states.num_pending_eors`.
fn num_pending_eors(&self) -> usize {
self.peer_states.num_pending_eors()
}
/// Forwards to `peer_states.add_announced_prefix` for the given peer and NLRI.
fn add_announced_prefix(
&mut self,
pph: &PerPeerHeader<Bytes>,
nlri: Nlri<bytes::Bytes>,
) -> bool {
self.peer_states.add_announced_prefix(pph, nlri)
}
/// Forwards to `peer_states.remove_announced_prefix` for the given peer and
/// NLRI.
fn remove_announced_prefix(
&mut self,
pph: &PerPeerHeader<Bytes>,
nlri: &Nlri<bytes::Bytes>,
) {
self.peer_states.remove_announced_prefix(pph, nlri)
}
/// Forwards to `peer_states.get_announced_prefixes` for the given peer.
fn get_announced_prefixes(
&self,
pph: &PerPeerHeader<Bytes>,
) -> Option<std::collections::hash_set::Iter<Nlri<bytes::Bytes>>> {
self.peer_states.get_announced_prefixes(pph)
}
}