use atomic_enum::atomic_enum;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use log::{debug, error, warn};
use inetnum::addr::Prefix;
use rotonda_store::prefix_record::RouteStatus;
use routecore::bgp::fsm::session;
use routecore::{
bgp::nlri::afisafi::IsPrefix,
bgp::nlri::afisafi::Nlri,
bgp::{
message::{open::CapabilityType, SessionConfig, UpdateMessage},
types::AfiSafiType,
workshop::route::RouteWorkshop,
},
bmp::message::{
InformationTlvType, InitiationMessage, Message as BmpMsg,
PeerDownNotification, PeerUpNotification, PerPeerHeader, RibType,
RouteMonitoring,
},
};
use smallvec::SmallVec;
use std::{
collections::{
hash_map::{DefaultHasher, Keys},
BTreeSet, HashMap, HashSet,
},
hash::{Hash, Hasher},
io::Read,
ops::ControlFlow,
sync::Arc,
};
use crate::{
common::{
routecore_extra::generate_alternate_config,
status_reporter::AnyStatusReporter,
},
ingress,
payload::{Payload, RouterId, Update},
roto_runtime::types::{
explode_announcements, explode_withdrawals, FreshRouteContext,
PeerId, PeerRibType, Provenance,
},
};
use super::{
metrics::BmpStateMachineMetrics,
processing::{MessageType, ProcessingResult},
states::{
dumping::Dumping, initiating::Initiating, terminated::Terminated,
updating::Updating,
},
status_reporter::{BmpStateMachineStatusReporter, UpdateReportMessage},
};
use routecore::Octets;
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
pub struct EoRProperties {
pub afi_safi: AfiSafiType,
pub post_policy: bool, pub adj_rib_out: bool, }
impl EoRProperties {
pub fn new<T: AsRef<[u8]>>(
pph: &PerPeerHeader<T>,
afi_safi: AfiSafiType,
) -> Self {
EoRProperties {
afi_safi,
post_policy: pph.is_post_policy(),
adj_rib_out: pph.adj_rib_type() == RibType::AdjRibOut,
}
}
}
pub struct PeerDetails {
peer_bgp_id: [u8; 4],
peer_distinguisher: [u8; 8],
peer_rib_type: RibType,
peer_id: PeerId,
}
pub struct PeerState {
pub session_config: SessionConfig,
pub eor_capable: bool,
pub pending_eors: HashSet<EoRProperties>,
pub announced_nlri: HashSet<Nlri<bytes::Bytes>>,
pub peer_details: PeerDetails,
pub ingress_id: ingress::IngressId,
}
impl std::fmt::Debug for PeerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PeerState")
.field("session_config", &self.session_config)
.field("pending_eors", &self.pending_eors)
.finish()
}
}
#[derive(Debug)]
pub enum BmpState {
Initiating(BmpStateDetails<Initiating>),
Dumping(BmpStateDetails<Dumping>),
Updating(BmpStateDetails<Updating>),
Terminated(BmpStateDetails<Terminated>),
_Aborted(ingress::IngressId, Arc<RouterId>),
}
#[atomic_enum]
#[derive(Default, PartialEq, Eq, Hash)]
pub enum BmpStateIdx {
#[default]
Initiating = 0,
Dumping = 1,
Updating = 2,
Terminated = 3,
Aborted = 4,
}
impl Default for AtomicBmpStateIdx {
fn default() -> Self {
Self::new(Default::default())
}
}
impl std::fmt::Display for BmpStateIdx {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BmpStateIdx::Initiating => write!(f, "Initiating"),
BmpStateIdx::Dumping => write!(f, "Dumping"),
BmpStateIdx::Updating => write!(f, "Updating"),
BmpStateIdx::Terminated => write!(f, "Terminated"),
BmpStateIdx::Aborted => write!(f, "Aborted"),
}
}
}
#[derive(Debug)]
pub struct BmpStateDetails<T>
where
BmpState: From<BmpStateDetails<T>>,
{
pub ingress_id: ingress::IngressId,
pub router_id: Arc<String>,
pub status_reporter: Arc<BmpStateMachineStatusReporter>,
pub ingress_register: Arc<ingress::Register>,
pub details: T,
}
impl BmpState {
pub fn _ingress_id(&self) -> ingress::IngressId {
match self {
BmpState::Initiating(v) => v.ingress_id.clone(),
BmpState::Dumping(v) => v.ingress_id.clone(),
BmpState::Updating(v) => v.ingress_id.clone(),
BmpState::Terminated(v) => v.ingress_id.clone(),
BmpState::_Aborted(ingress_id, _) => ingress_id.clone(),
}
}
pub fn router_id(&self) -> Arc<String> {
match self {
BmpState::Initiating(v) => v.router_id.clone(),
BmpState::Dumping(v) => v.router_id.clone(),
BmpState::Updating(v) => v.router_id.clone(),
BmpState::Terminated(v) => v.router_id.clone(),
BmpState::_Aborted(_, router_id) => router_id.clone(),
}
}
pub fn state_idx(&self) -> BmpStateIdx {
match self {
BmpState::Initiating(_) => BmpStateIdx::Initiating,
BmpState::Dumping(_) => BmpStateIdx::Dumping,
BmpState::Updating(_) => BmpStateIdx::Updating,
BmpState::Terminated(_) => BmpStateIdx::Terminated,
BmpState::_Aborted(_, _) => BmpStateIdx::Aborted,
}
}
pub fn status_reporter(
&self,
) -> Option<Arc<BmpStateMachineStatusReporter>> {
match self {
BmpState::Initiating(v) => Some(v.status_reporter.clone()),
BmpState::Dumping(v) => Some(v.status_reporter.clone()),
BmpState::Updating(v) => Some(v.status_reporter.clone()),
BmpState::Terminated(v) => Some(v.status_reporter.clone()),
BmpState::_Aborted(_, _) => None,
}
}
}
impl<T> std::hash::Hash for BmpStateDetails<T>
where
BmpState: From<BmpStateDetails<T>>,
{
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.ingress_id.hash(state);
}
}
impl<T> BmpStateDetails<T>
where
BmpState: From<BmpStateDetails<T>>,
{
pub fn mk_invalid_message_result<U: Into<String>>(
self,
err: U,
known_peer: Option<bool>,
msg_bytes: Option<Bytes>,
) -> ProcessingResult {
ProcessingResult::new(
MessageType::InvalidMessage {
err: err.into(),
known_peer,
msg_bytes,
},
self.into(),
)
}
pub fn mk_other_result(self) -> ProcessingResult {
ProcessingResult::new(MessageType::Other, self.into())
}
pub fn mk_routing_update_result(
self,
update: Update,
) -> ProcessingResult {
ProcessingResult::new(
MessageType::RoutingUpdate { update },
self.into(),
)
}
pub fn mk_final_routing_update_result(
next_state: BmpState,
update: Update,
) -> ProcessingResult {
ProcessingResult::new(
MessageType::RoutingUpdate { update },
next_state,
)
}
pub fn mk_state_transition_result(
prev_state: BmpStateIdx,
next_state: BmpState,
) -> ProcessingResult {
if let Some(status_reporter) = next_state.status_reporter() {
status_reporter.change_state(
next_state.router_id(),
prev_state,
next_state.state_idx(),
);
}
ProcessingResult::new(MessageType::StateTransition, next_state)
}
}
pub trait Initiable {
fn set_information_tlvs(
&mut self,
sys_name: String,
sys_desc: String,
sys_extra: Vec<String>,
);
fn sys_name(&self) -> Option<&str>;
}
impl<T> BmpStateDetails<T>
where
T: Initiable,
BmpState: From<BmpStateDetails<T>>,
{
pub fn initiate<Octs: Octets>(
mut self,
msg: InitiationMessage<Octs>,
) -> ProcessingResult {
let sys_name = msg
.information_tlvs()
.filter(|tlv| tlv.typ() == InformationTlvType::SysName)
.map(|tlv| String::from_utf8_lossy(tlv.value()).into_owned())
.collect::<Vec<_>>()
.join("|");
if sys_name.is_empty() {
warn!(
"Invalid BMP InitiationMessage: \
Missing or empty sysName Information TLV"
);
}
let sys_desc = msg
.information_tlvs()
.filter(|tlv| tlv.typ() == InformationTlvType::SysDesc)
.map(|tlv| String::from_utf8_lossy(tlv.value()).into_owned())
.collect::<Vec<_>>()
.join("|");
let extra = msg
.information_tlvs()
.filter(|tlv| tlv.typ() == InformationTlvType::String)
.map(|tlv| String::from_utf8_lossy(tlv.value()).into_owned())
.collect::<Vec<_>>();
self.details.set_information_tlvs(sys_name, sys_desc, extra);
self.mk_other_result()
}
}
pub trait PeerAware {
fn add_peer_config(
&mut self,
pph: PerPeerHeader<Bytes>,
config: SessionConfig,
eor_capable: bool,
ingress_register: Arc<ingress::Register>,
bmp_ingress_id: ingress::IngressId,
) -> bool;
fn get_peers(&self) -> Keys<'_, PerPeerHeader<Bytes>, PeerState>;
fn remove_peer(
&mut self,
pph: &PerPeerHeader<Bytes>,
) -> Option<PeerState>;
fn update_peer_config(
&mut self,
pph: &PerPeerHeader<Bytes>,
config: SessionConfig,
) -> bool;
fn get_peer_config(
&self,
pph: &PerPeerHeader<Bytes>,
) -> Option<&SessionConfig>;
fn get_peer_ingress_id(
&self,
_pph: &PerPeerHeader<Bytes>,
) -> Option<ingress::IngressId>;
fn num_peer_configs(&self) -> usize;
fn is_peer_eor_capable(&self, pph: &PerPeerHeader<Bytes>)
-> Option<bool>;
fn add_pending_eor(
&mut self,
pph: &PerPeerHeader<Bytes>,
afi_safi: AfiSafiType,
) -> usize;
fn remove_pending_eor(
&mut self,
pph: &PerPeerHeader<Bytes>,
afi_safi: AfiSafiType,
) -> bool;
fn num_pending_eors(&self) -> usize;
fn add_announced_prefix(
&mut self,
pph: &PerPeerHeader<Bytes>,
prefix: Nlri<bytes::Bytes>,
) -> bool;
fn remove_announced_prefix(
&mut self,
pph: &PerPeerHeader<Bytes>,
prefix: &Nlri<bytes::Bytes>,
);
fn get_announced_prefixes(
&self,
pph: &PerPeerHeader<Bytes>,
) -> Option<std::collections::hash_set::Iter<Nlri<bytes::Bytes>>>;
}
impl<T> BmpStateDetails<T>
where
T: PeerAware,
BmpState: From<BmpStateDetails<T>>,
{
pub fn peer_up(
mut self,
msg: PeerUpNotification<Bytes>,
) -> ProcessingResult {
let pph = msg.per_peer_header();
let config = msg.session_config();
let eor_capable = msg
.bgp_open_rcvd()
.capabilities()
.any(|cap| cap.typ() == CapabilityType::GracefulRestart);
if !self.details.add_peer_config(
pph,
config,
eor_capable,
self.ingress_register.clone(),
self.ingress_id,
) {
return self.mk_invalid_message_result(
format!(
"PeerUpNotification received for peer that is already 'up': {}",
msg.per_peer_header()
),
Some(true),
Some(Bytes::copy_from_slice(msg.as_ref())),
);
}
self.status_reporter
.peer_up(self.router_id.clone(), eor_capable);
self.mk_other_result()
}
pub fn peer_down(
mut self,
msg: PeerDownNotification<Bytes>,
) -> ProcessingResult {
let pph = msg.per_peer_header();
if let Some(removed_peer) = self.details.remove_peer(&pph) {
self.status_reporter.routing_update(UpdateReportMessage {
router_id: self.router_id.clone(),
n_new_prefixes: 0, n_valid_announcements: 0, n_valid_withdrawals: 0, n_stored_prefixes: 0, n_invalid_announcements: 0,
n_invalid_withdrawals: 0,
last_invalid_announcement: None,
last_invalid_withdrawal: None,
});
let eor_capable = self.details.is_peer_eor_capable(&pph);
self.status_reporter
.peer_down(self.router_id.clone(), eor_capable);
self.mk_routing_update_result(Update::Withdraw(
removed_peer.ingress_id,
None,
))
} else {
return self.mk_invalid_message_result(
"PeerDownNotification received for peer that was not 'up'",
Some(false),
Some(Bytes::copy_from_slice(msg.as_ref())),
);
}
}
pub fn route_monitoring<CB>(
mut self,
received: std::time::Instant,
msg: RouteMonitoring<Bytes>,
trace_id: Option<u8>,
do_state_specific_pre_processing: CB,
) -> ProcessingResult
where
CB: Fn(
BmpStateDetails<T>,
&PerPeerHeader<Bytes>,
&UpdateMessage<Bytes>,
) -> ControlFlow<ProcessingResult, Self>,
{
let mut tried_peer_configs = SmallVec::<[SessionConfig; 4]>::new();
let pph = msg.per_peer_header();
let Some(peer_config) = self.details.get_peer_config(&pph) else {
self.status_reporter.peer_unknown(self.router_id.clone());
return self.mk_invalid_message_result(
format!(
"RouteMonitoring message received for peer that is not 'up': {}",
msg.per_peer_header()
),
Some(false),
Some(Bytes::copy_from_slice(msg.as_ref())),
);
};
let mut peer_config = peer_config.clone();
let mut retry_due_to_err: Option<String> = None;
loop {
let res = match msg.bgp_update(&peer_config) {
Ok(update) => {
if let Some(err_str) = retry_due_to_err {
self.status_reporter.bgp_update_parse_soft_fail(
self.router_id.clone(),
err_str,
Some(Bytes::copy_from_slice(msg.as_ref())),
);
self.details
.update_peer_config(&pph, peer_config.clone());
}
let mut saved_self =
match do_state_specific_pre_processing(
self, &pph, &update,
) {
ControlFlow::Break(res) => return res,
ControlFlow::Continue(saved_self) => saved_self,
};
if let Ok((payloads, mut update_report_msg)) = saved_self
.extract_route_monitoring_routes(
received,
pph.clone(),
&update,
trace_id,
)
{
match update.announcements_vec() {
Err(err) => {
return saved_self.mk_invalid_message_result(
format!(
"Invalid BMP RouteMonitoring BGP \
UPDATE message. One or more elements in the \
NLRI(s) cannot be parsed: ({:?}) {:?}",
&peer_config,
err.to_string()
),
Some(true),
Some(Bytes::copy_from_slice(
msg.as_ref(),
)),
);
}
Ok(announcements) => {
if update_report_msg.n_valid_announcements > 0
&& saved_self
.details
.is_peer_eor_capable(&pph)
== Some(true)
{
let afi_safi: AfiSafiType = announcements
.first()
.unwrap()
.afi_safi();
let num_pending_eors = saved_self
.details
.add_pending_eor(&pph, afi_safi);
saved_self
.status_reporter
.pending_eors_update(
saved_self.router_id.clone(),
num_pending_eors,
);
}
}
}
saved_self
.status_reporter
.routing_update(update_report_msg);
saved_self
.mk_routing_update_result(Update::Bulk(payloads))
} else {
return saved_self.mk_invalid_message_result(
"Invalid BMP RouteMonitoring BGP UPDATE message. The message cannot be parsed.",
Some(true),
Some(Bytes::copy_from_slice(msg.as_ref())),
);
}
}
Err(err) => {
tried_peer_configs.push(peer_config.clone());
if let Some(alt_config) =
generate_alternate_config(&peer_config)
{
if !tried_peer_configs.contains(&alt_config) {
peer_config = alt_config;
if retry_due_to_err.is_none() {
retry_due_to_err = Some(err.to_string());
}
continue;
}
}
self.mk_invalid_message_result(
format!(
"Invalid BMP RouteMonitoring BGP UPDATE message: ({:?}) {}",
&peer_config, err
),
Some(true),
Some(Bytes::copy_from_slice(msg.as_ref())),
)
}
};
break res;
}
}
pub fn extract_route_monitoring_routes(
&mut self,
received: std::time::Instant,
pph: PerPeerHeader<Bytes>,
bgp_msg: &UpdateMessage<Bytes>,
_trace_id: Option<u8>,
) -> Result<(SmallVec<[Payload; 8]>, UpdateReportMessage), session::Error>
{
let rr_reach = explode_announcements(bgp_msg)?;
let rr_unreach = explode_withdrawals(bgp_msg)?;
let ingress_id = if let Some(ingress_id) =
self.details.get_peer_ingress_id(&pph)
{
ingress_id
} else {
error!("no ingress_id for {:?}", &pph);
return Err(session::Error::for_str("missing ingress_id"));
};
let mut payloads = SmallVec::new();
let mut update_report_msg =
UpdateReportMessage::new(self.router_id.clone());
let provenance = Provenance::for_bmp(
ingress_id,
pph.address(),
pph.asn(),
pph.address(), [0; 9], PeerRibType::from((pph.is_post_policy(), pph.adj_rib_type())),
);
let context = FreshRouteContext::new(
bgp_msg.clone(),
RouteStatus::Active,
provenance,
);
if rr_reach.len() > 0 {
update_report_msg.n_new_prefixes = rr_reach.len();
}
payloads.extend(
rr_reach.into_iter().map(|rr| {
update_report_msg.inc_valid_announcements();
Payload::with_received(
rr,
context.clone().into(),
None,
received,
)
}),
);
let context = FreshRouteContext {
status: RouteStatus::Withdrawn,
..context
};
payloads.extend(rr_unreach.into_iter().map(|rr| {
update_report_msg.inc_valid_withdrawals();
Payload::with_received(rr, context.clone().into(), None, received)
}));
Ok((payloads, update_report_msg))
}
}
impl BmpState {
pub fn new<T: AnyStatusReporter>(
source_id: ingress::IngressId,
router_id: Arc<RouterId>,
parent_status_reporter: Arc<T>,
metrics: Arc<BmpStateMachineMetrics>,
ingress_register: Arc<ingress::Register>,
) -> Self {
let child_name = parent_status_reporter.link_names("bmp_state");
let status_reporter =
Arc::new(BmpStateMachineStatusReporter::new(child_name, metrics));
BmpState::Initiating(BmpStateDetails::<Initiating>::new(
source_id,
router_id,
status_reporter,
ingress_register.clone(),
))
}
#[allow(dead_code)]
pub fn process_msg(
self,
received: std::time::Instant,
bmp_msg: BmpMsg<Bytes>,
trace_id: Option<u8>,
) -> ProcessingResult {
let res = match self {
BmpState::Initiating(inner) => {
inner.process_msg(bmp_msg, trace_id)
}
BmpState::Dumping(inner) => {
inner.process_msg(received, bmp_msg, trace_id)
}
BmpState::Updating(inner) => {
inner.process_msg(received, bmp_msg, trace_id)
}
BmpState::Terminated(inner) => {
inner.process_msg(bmp_msg.into(), trace_id)
}
BmpState::_Aborted(source_id, router_id) => {
ProcessingResult::new(
MessageType::Aborted,
BmpState::_Aborted(source_id, router_id),
)
}
};
if let ProcessingResult {
message_type:
MessageType::InvalidMessage {
known_peer: _known_peer,
msg_bytes,
err,
},
next_state,
} = res
{
if let Some(reporter) = next_state.status_reporter() {
reporter.bgp_update_parse_hard_fail(
next_state.router_id(),
err.clone(),
msg_bytes,
);
}
ProcessingResult::new(
MessageType::InvalidMessage {
known_peer: None,
msg_bytes: None,
err,
},
next_state,
)
} else {
res
}
}
}
impl From<BmpStateDetails<Initiating>> for BmpState {
fn from(v: BmpStateDetails<Initiating>) -> Self {
Self::Initiating(v)
}
}
impl From<BmpStateDetails<Dumping>> for BmpState {
fn from(v: BmpStateDetails<Dumping>) -> Self {
Self::Dumping(v)
}
}
impl From<BmpStateDetails<Updating>> for BmpState {
fn from(v: BmpStateDetails<Updating>) -> Self {
Self::Updating(v)
}
}
impl From<BmpStateDetails<Terminated>> for BmpState {
fn from(v: BmpStateDetails<Terminated>) -> Self {
Self::Terminated(v)
}
}
#[derive(Debug, Default)]
pub struct PeerStates(HashMap<PerPeerHeader<Bytes>, PeerState>);
impl PeerStates {
#[allow(dead_code)]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
}
impl PeerAware for PeerStates {
fn add_peer_config(
&mut self,
pph: PerPeerHeader<Bytes>,
session_config: SessionConfig,
eor_capable: bool,
ingress_register: Arc<ingress::Register>,
bmp_ingress_id: ingress::IngressId,
) -> bool {
let mut added = false;
let query_ingress = ingress::IngressInfo::new()
.with_parent(bmp_ingress_id)
.with_remote_addr(pph.address())
.with_remote_asn(pph.asn())
.with_rib_type(pph.rib_type());
let peer_ingress_id;
if let Some((ingress_id, _ingress_info)) =
ingress_register.find_existing_peer(&query_ingress)
{
peer_ingress_id = ingress_id;
} else {
peer_ingress_id = ingress_register.register();
ingress_register.update_info(peer_ingress_id, query_ingress);
}
let _ = self.0.entry(pph.clone()).or_insert_with(|| {
added = true;
PeerState {
session_config,
eor_capable,
pending_eors: HashSet::with_capacity(0),
announced_nlri: HashSet::with_capacity(0),
peer_details: PeerDetails {
peer_bgp_id: pph.bgp_id(),
peer_distinguisher: pph
.distinguisher()
.try_into()
.unwrap(),
peer_rib_type: pph.rib_type(),
peer_id: PeerId::new(pph.address(), pph.asn()),
},
ingress_id: peer_ingress_id,
}
});
added
}
fn get_peers(&self) -> Keys<'_, PerPeerHeader<Bytes>, PeerState> {
self.0.keys()
}
fn get_peer_ingress_id(
&self,
pph: &PerPeerHeader<Bytes>,
) -> Option<ingress::IngressId> {
self.0.get(pph).map(|e| e.ingress_id)
}
fn update_peer_config(
&mut self,
pph: &PerPeerHeader<Bytes>,
new_config: SessionConfig,
) -> bool {
if let Some(peer_state) = self.0.get_mut(pph) {
peer_state.session_config = new_config;
peer_state.peer_details = PeerDetails {
peer_bgp_id: pph.bgp_id(),
peer_distinguisher: pph.distinguisher().try_into().unwrap(),
peer_rib_type: pph.rib_type(),
peer_id: PeerId::new(pph.address(), pph.asn()),
};
true
} else {
false
}
}
fn get_peer_config(
&self,
pph: &PerPeerHeader<Bytes>,
) -> Option<&SessionConfig> {
self.0.get(pph).map(|peer_state| &peer_state.session_config)
}
fn remove_peer(
&mut self,
pph: &PerPeerHeader<Bytes>,
) -> Option<PeerState> {
self.0.remove(pph) }
fn num_peer_configs(&self) -> usize {
self.0.len()
}
fn is_peer_eor_capable(
&self,
pph: &PerPeerHeader<Bytes>,
) -> Option<bool> {
self.0.get(pph).map(|peer_state| peer_state.eor_capable)
}
fn add_pending_eor(
&mut self,
pph: &PerPeerHeader<Bytes>,
afi_safi: AfiSafiType,
) -> usize {
if let Some(peer_state) = self.0.get_mut(pph) {
peer_state
.pending_eors
.insert(EoRProperties::new(pph, afi_safi));
peer_state.pending_eors.len()
} else {
0
}
}
fn remove_pending_eor(
&mut self,
pph: &PerPeerHeader<Bytes>,
afi_safi: AfiSafiType,
) -> bool {
if let Some(peer_state) = self.0.get_mut(pph) {
peer_state
.pending_eors
.remove(&EoRProperties::new(pph, afi_safi));
}
self.0
.values()
.all(|peer_state| peer_state.pending_eors.is_empty())
}
fn num_pending_eors(&self) -> usize {
self.0
.values()
.fold(0, |acc, peer_state| acc + peer_state.pending_eors.len())
}
fn add_announced_prefix(
&mut self,
pph: &PerPeerHeader<Bytes>,
prefix: Nlri<bytes::Bytes>,
) -> bool {
if let Some(peer_state) = self.0.get_mut(pph) {
peer_state.announced_nlri.insert(prefix)
} else {
false
}
}
fn remove_announced_prefix(
&mut self,
pph: &PerPeerHeader<Bytes>,
nlri: &Nlri<bytes::Bytes>,
) {
if let Some(peer_state) = self.0.get_mut(pph) {
peer_state.announced_nlri.remove(nlri);
}
}
fn get_announced_prefixes(
&self,
pph: &PerPeerHeader<Bytes>,
) -> Option<std::collections::hash_set::Iter<Nlri<bytes::Bytes>>> {
self.0
.get(pph)
.map(|peer_state| peer_state.announced_nlri.iter())
}
}