use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;
use bytes::Bytes;
use inetnum::addr::Prefix;
use inetnum::asn::Asn;
use log::warn;
use rotonda_store::match_options::{IncludeHistory, MatchOptions, MatchType};
use routecore::bgp::aspath::{AsPath, Hop, HopPath};
use routecore::bgp::communities::{
LargeCommunity, StandardCommunity, Wellknown,
};
use routecore::bgp::message::update_builder::StandardCommunitiesList;
use routecore::bgp::message::SessionConfig;
use routecore::bgp::message::UpdateMessage as BgpUpdateMessage;
use routecore::bgp::nlri::afisafi::IsPrefix;
use routecore::bgp::path_attributes::LargeCommunitiesList;
use routecore::bmp::message::PerPeerHeader;
use routecore::bmp::message::{Message as BmpMsg, MessageType as BmpMsgType};
use roto::{roto_function, roto_method, roto_static_method, Context};
use super::types::{
InsertionInfo, Output, Provenance, RotoOutputStream, RouteContext,
};
use crate::payload::{RotondaRoute, RovStatus};
use crate::roto_runtime::types::LogEntry;
use crate::units::rib_unit::unit::RtrCache;
pub(crate) type Log = Rc<RefCell<RotoOutputStream>>;
pub(crate) type SharedRtrCache = Arc<RtrCache>;
pub(crate) type MutRotondaRoute = Rc<RefCell<RotondaRoute>>;
pub(crate) type MutLogEntry = Rc<RefCell<LogEntry>>;
impl From<RotondaRoute> for MutRotondaRoute {
fn from(value: RotondaRoute) -> Self {
Rc::new(RefCell::new(value))
}
}
#[derive(Context, Clone)]
pub struct Ctx {
pub output: Log,
pub rpki: SharedRtrCache,
}
unsafe impl Send for Ctx {}
impl Ctx {
pub fn new(log: Log, rpki: SharedRtrCache) -> Self {
Self {
output: log,
rpki,
}
}
}
pub fn create_runtime() -> Result<roto::Runtime, String> {
let mut rt = roto::Runtime::new();
rt.register_clone_type_with_name::<MutRotondaRoute>(
"Route",
"A single announced or withdrawn path",
)?;
rt.register_clone_type_with_name::<RouteContext>(
"RouteContext",
"Contextual information pertaining to the Route",
)?;
rt.register_copy_type::<Provenance>("Session/state information")?;
rt.register_clone_type_with_name::<Log>(
"Log",
"Machinery to create output entries",
)?;
rt.register_clone_type_with_name::<SharedRtrCache>(
"Rpki",
"RPKI information retrieved via RTR",
)?;
rt.register_context_type::<Ctx>()?;
rt.register_copy_type::<InsertionInfo>(
"Information from the RIB on an inserted route",
)?;
rt.register_clone_type_with_name::<MutLogEntry>(
"LogEntry",
"Entry to log to file/mqtt",
)?;
rt.register_clone_type_with_name::<BgpUpdateMessage<Bytes>>(
"BgpMsg",
"BGP UPDATE message",
)?;
rt.register_copy_type_with_name::<StandardCommunity>(
"Community",
"A BGP Standard Community (RFC1997)",
)?;
rt.register_copy_type_with_name::<LargeCommunity>(
"LargeCommunity",
"A BGP Large Community (RFC8092)",
)?;
#[roto_function(rt)]
fn community(raw: u32) -> StandardCommunity {
StandardCommunity::from_u32(raw)
}
#[roto_static_method(rt, StandardCommunity, new)]
fn new(raw: u32) -> StandardCommunity {
StandardCommunity::from_u32(raw)
}
#[roto_method(rt, Provenance)]
fn peer_asn(provenance: &Provenance) -> Asn {
provenance.peer_asn
}
#[roto_method(rt, Asn, fmt)]
fn fmt_asn(asn: Asn) -> Arc<str> {
asn.to_string().into()
}
#[roto_method(rt, MutRotondaRoute)]
fn prefix_matches(rr: &MutRotondaRoute, to_match: &Prefix) -> bool {
let rr = rr.borrow_mut();
let rr_prefix = match *rr {
RotondaRoute::Ipv4Unicast(n, ..) => n.prefix(),
RotondaRoute::Ipv6Unicast(n, ..) => n.prefix(),
RotondaRoute::Ipv4Multicast(n, ..) => n.prefix(),
RotondaRoute::Ipv6Multicast(n, ..) => n.prefix(),
};
rr_prefix == *to_match
}
#[roto_method(rt, MutRotondaRoute, aspath_contains)]
fn rr_aspath_contains(rr: &MutRotondaRoute, to_match: Asn) -> bool {
let rr = rr.borrow_mut();
if let Some(hoppath) = rr.owned_map().get::<HopPath>() {
hoppath.into_iter().any(|h| h == to_match.into())
} else {
false
}
}
#[roto_method(rt, MutRotondaRoute, match_aspath_origin)]
fn rr_match_aspath_origin(
rr: &MutRotondaRoute,
to_match: Asn,
) -> bool {
let rr = rr.borrow_mut();
if let Some(hoppath) = rr.owned_map().get::<HopPath>() {
if let Some(Hop::Asn(asn)) = hoppath.origin() {
return *asn == to_match;
}
}
false
}
#[roto_method(rt, MutRotondaRoute, contains_community)]
fn rr_contains_community(
rr: &MutRotondaRoute,
to_match: &StandardCommunity,
) -> bool {
let rr = rr.borrow_mut();
if let Some(list) = rr.owned_map().get::<StandardCommunitiesList>() {
return list.communities().iter().any(|c| c == to_match);
}
false
}
#[roto_method(rt, MutRotondaRoute, contains_large_community)]
fn rr_contains_large_community(
rr: &MutRotondaRoute,
to_match: &LargeCommunity,
) -> bool {
let rr = rr.borrow_mut();
if let Some(list) = rr.owned_map().get::<LargeCommunitiesList>() {
return list.communities().iter().any(|c| c == to_match);
}
false
}
#[roto_method(rt, MutRotondaRoute, has_attribute)]
fn rr_has_attribute(rr: &MutRotondaRoute, to_match: u8) -> bool {
let rr = rr.borrow_mut();
rr.owned_map()
.iter()
.any(|pa| pa.ok().is_some_and(|pa| pa.type_code() == to_match))
}
#[roto_method(rt, MutRotondaRoute, fmt_prefix)]
fn rr_fmt_prefix(rr: &MutRotondaRoute) -> Arc<str> {
let rr = rr.borrow();
let prefix = match *rr {
RotondaRoute::Ipv4Unicast(n, ..) => n.prefix(),
RotondaRoute::Ipv6Unicast(n, ..) => n.prefix(),
RotondaRoute::Ipv4Multicast(n, ..) => n.prefix(),
RotondaRoute::Ipv6Multicast(n, ..) => n.prefix(),
};
prefix.to_string().into()
}
#[roto_method(rt, MutRotondaRoute, fmt_rov_status)]
fn rr_fmt_rov_status(rr: &MutRotondaRoute) -> Arc<str> {
let rr = rr.borrow();
match rr.rotonda_pamap().rpki_info().rov_status() {
RovStatus::NotChecked => "not-checked",
RovStatus::NotFound => "not-found",
RovStatus::Valid => "valid",
RovStatus::Invalid => "invalid",
}.into()
}
#[roto_method(rt, MutRotondaRoute, fmt_aspath)]
fn rr_fmt_aspath(rr: &MutRotondaRoute) -> Arc<str> {
let rr = rr.borrow_mut();
if let Some(hoppath) = rr.owned_map().get::<HopPath>() {
let Ok(as_path) = hoppath.to_as_path();
_fmt_aspath(as_path)
} else {
"".into()
}
}
#[roto_method(rt, MutRotondaRoute, fmt_aspath_origin)]
fn rr_fmt_aspath_origin(rr: &MutRotondaRoute) -> Arc<str> {
let rr = rr.borrow_mut();
if let Some(hoppath) = rr.owned_map().get::<HopPath>() {
let Ok(as_path) = hoppath.to_as_path();
_fmt_aspath_origin(as_path)
} else {
"".into()
}
}
#[roto_method(rt, MutRotondaRoute, fmt_communities)]
fn rr_fmt_communities(rr: &MutRotondaRoute) -> Arc<str> {
let rr = rr.borrow_mut();
if let Some(iter) = rr.owned_map().get::<StandardCommunitiesList>() {
iter.communities()
.iter()
.map(|c| c.to_string())
.collect::<Vec<_>>()
.join(", ")
.into()
} else {
"".into()
}
}
#[roto_method(rt, MutRotondaRoute, fmt_large_communities)]
fn rr_fmt_large_communities(rr: &MutRotondaRoute) -> Arc<str> {
let rr = rr.borrow_mut();
if let Some(iter) = rr.owned_map().get::<LargeCommunitiesList>() {
iter.communities()
.iter()
.map(|c| c.to_string())
.collect::<Vec<_>>()
.join(", ")
.into()
} else {
"".into()
}
}
#[roto_method(rt, BgpUpdateMessage<Bytes>, aspath_contains)]
fn bgp_aspath_contains(
msg: &BgpUpdateMessage<Bytes>,
to_match: Asn,
) -> bool {
aspath_contains(msg, to_match)
}
#[roto_method(rt, BgpUpdateMessage<Bytes>, match_aspath_origin)]
fn bgp_match_aspath_origin(
msg: &BgpUpdateMessage<Bytes>,
to_match: Asn,
) -> bool {
match_aspath_origin(msg, to_match)
}
#[roto_method(rt, BgpUpdateMessage<Bytes>, contains_community)]
fn bgp_contains_community(
msg: &BgpUpdateMessage<Bytes>,
to_match: &StandardCommunity,
) -> bool {
contains_community(msg, to_match)
}
#[roto_method(rt, BgpUpdateMessage<Bytes>, contains_large_community)]
fn bgp_contains_large_community(
msg: &BgpUpdateMessage<Bytes>,
to_match: &LargeCommunity,
) -> bool {
contains_large_community(msg, to_match)
}
#[roto_method(rt, BgpUpdateMessage<Bytes>, has_attribute)]
fn bgp_has_attribute(
msg: &BgpUpdateMessage<Bytes>,
to_match: u8,
) -> bool {
has_attribute(msg, to_match)
}
#[roto_method(rt, BgpUpdateMessage<Bytes>, announcements_count)]
fn bgp_announcements_count(msg: &BgpUpdateMessage<Bytes>) -> u32 {
announcements_count(msg)
}
#[roto_method(rt, BgpUpdateMessage<Bytes>, withdrawals_count)]
fn bgp_withdrawals_count(msg: &BgpUpdateMessage<Bytes>) -> u32 {
withdrawals_count(msg)
}
#[roto_method(rt, BgpUpdateMessage<Bytes>, fmt_aspath)]
fn bgp_fmt_aspath(msg: &BgpUpdateMessage<Bytes>) -> Arc<str> {
fmt_aspath(msg)
}
#[roto_method(rt, BgpUpdateMessage<Bytes>, fmt_aspath_origin)]
fn bgp_fmt_aspath_origin(
msg: &BgpUpdateMessage<Bytes>,
) -> Arc<str> {
fmt_aspath_origin(msg)
}
#[roto_method(rt, BgpUpdateMessage<Bytes>, fmt_communities)]
fn bgp_fmt_communities(msg: &BgpUpdateMessage<Bytes>) -> Arc<str> {
fmt_communities(msg)
}
#[roto_method(rt, BgpUpdateMessage<Bytes>, fmt_large_communities)]
fn bgp_fmt_large_communities(
msg: &BgpUpdateMessage<Bytes>,
) -> Arc<str> {
fmt_large_communities(msg)
}
#[roto_method(rt, BgpUpdateMessage<Bytes>, fmt_pcap)]
fn bgp_fmt_pcap(msg: &BgpUpdateMessage<Bytes>) -> Arc<str> {
fmt_pcap(msg.as_ref())
}
rt.register_clone_type_with_name::<BmpMsg<Bytes>>(
"BmpMsg",
"BMP Message",
)?;
rt.register_clone_type::<PerPeerHeader<Bytes>>("BMP Per Peer Header")?;
#[roto_method(rt, BmpMsg<Bytes>)]
fn is_ibgp(msg: &BmpMsg<Bytes>, asn: Asn) -> bool {
let asn_in_msg = match msg {
BmpMsg::RouteMonitoring(m) => m.per_peer_header().asn(),
BmpMsg::StatisticsReport(m) => m.per_peer_header().asn(),
BmpMsg::PeerDownNotification(m) => m.per_peer_header().asn(),
BmpMsg::PeerUpNotification(m) => m.per_peer_header().asn(),
BmpMsg::InitiationMessage(_) => return false,
BmpMsg::TerminationMessage(_) => return false,
BmpMsg::RouteMirroring(m) => m.per_peer_header().asn(),
};
asn == asn_in_msg
}
#[roto_method(rt, BmpMsg<Bytes>)]
fn is_route_monitoring(msg: &BmpMsg<Bytes>) -> bool {
matches!(msg, BmpMsg::RouteMonitoring(..))
}
#[roto_method(rt, BmpMsg<Bytes>)]
fn is_peer_down(msg: &BmpMsg<Bytes>) -> bool {
msg.msg_type() == BmpMsgType::PeerDownNotification
}
#[roto_method(rt, BmpMsg<Bytes>, aspath_contains)]
fn bmp_aspath_contains(msg: &BmpMsg<Bytes>, to_match: Asn) -> bool {
let update = if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
upd
} else {
return false;
}
} else {
return false;
};
aspath_contains(&update, to_match)
}
#[roto_method(rt, BmpMsg<Bytes>, match_aspath_origin)]
fn bmp_match_aspath_origin(
msg: &BmpMsg<Bytes>,
to_match: Asn,
) -> bool {
let update = if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
upd
} else {
return false;
}
} else {
return false;
};
match_aspath_origin(&update, to_match)
}
#[roto_method(rt, BmpMsg<Bytes>, contains_community)]
fn bmp_contains_community(
msg: &BmpMsg<Bytes>,
to_match: &StandardCommunity,
) -> bool {
let update = if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
upd
} else {
return false;
}
} else {
return false;
};
contains_community(&update, to_match)
}
#[roto_method(rt, BmpMsg<Bytes>, contains_large_community)]
fn bmp_contains_large_community(
msg: &BmpMsg<Bytes>,
to_match: &LargeCommunity,
) -> bool {
let update = if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
upd
} else {
return false;
}
} else {
return false;
};
contains_large_community(&update, to_match)
}
#[roto_method(rt, BmpMsg<Bytes>, has_attribute)]
fn bmp_has_attribute(msg: &BmpMsg<Bytes>, to_match: u8) -> bool {
let update = if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
upd
} else {
return false;
}
} else {
return false;
};
has_attribute(&update, to_match)
}
#[roto_method(rt, BmpMsg<Bytes>, announcements_count)]
fn bmp_announcements_count(msg: &BmpMsg<Bytes>) -> u32 {
if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
return announcements_count(&upd);
} else {
return 0;
}
};
0
}
#[roto_method(rt, BmpMsg<Bytes>, withdrawals_count)]
fn bmp_withdrawals_count(msg: &BmpMsg<Bytes>) -> u32 {
if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
return withdrawals_count(&upd);
} else {
return 0;
}
};
0
}
#[roto_method(rt, BmpMsg<Bytes>, fmt_aspath)]
fn bmp_fmt_aspath(msg: &BmpMsg<Bytes>) -> Arc<str> {
let update = if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
upd
} else {
return "".into();
}
} else {
return "".into();
};
fmt_aspath(&update)
}
#[roto_method(rt, BmpMsg<Bytes>, fmt_aspath_origin)]
fn bmp_fmt_aspath_origin(msg: &BmpMsg<Bytes>) -> Arc<str> {
let update = if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
upd
} else {
return "".into();
}
} else {
return "".into();
};
fmt_aspath_origin(&update)
}
#[roto_method(rt, BmpMsg<Bytes>, fmt_communities)]
fn bmp_fmt_communities(msg: &BmpMsg<Bytes>) -> Arc<str> {
let update = if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
upd
} else {
return "".into();
}
} else {
return "".into();
};
fmt_communities(&update)
}
#[roto_method(rt, BmpMsg<Bytes>, fmt_large_communities)]
fn bmp_fmt_large_communities(msg: &BmpMsg<Bytes>) -> Arc<str> {
let update = if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
upd
} else {
return "".into();
}
} else {
return "".into();
};
fmt_large_communities(&update)
}
#[roto_method(rt, BmpMsg<Bytes>, fmt_pcap)]
fn bmp_fmt_pcap(msg: &BmpMsg<Bytes>) -> Arc<str> {
fmt_pcap(msg.as_ref())
}
#[roto_method(rt, Log)]
fn log_prefix(stream: &Log, prefix: &Prefix) {
let mut stream = stream.borrow_mut();
stream.push(Output::Prefix(*prefix));
}
#[roto_method(rt, Log, log_matched_asn)]
fn log_asn(stream: &Log, asn: Asn) {
let mut stream = stream.borrow_mut();
stream.push(Output::Asn(asn));
}
#[roto_method(rt, Log, log_matched_origin)]
fn log_origin(stream: &Log, origin: Asn) {
let mut stream = stream.borrow_mut();
stream.push(Output::Origin(origin));
}
#[roto_method(rt, Log, log_matched_community)]
fn log_community(stream: &Log, community: &StandardCommunity) {
let mut stream = stream.borrow_mut();
stream.push(Output::Community(community.to_u32()));
}
#[roto_method(rt, Log)]
fn log_peer_down(stream: &Log) {
let mut stream = stream.borrow_mut();
stream.push(Output::PeerDown);
}
#[roto_method(rt, Log)]
fn log_custom(stream: &Log, id: u32, local: u32) {
let mut stream = stream.borrow_mut();
stream.push(Output::Custom((id, local)));
}
#[roto_method(rt, Log)]
fn print(stream: &Log, msg: &Arc<str>) {
let stream = stream.borrow();
stream.print(msg);
}
#[roto_method(rt, Log)]
fn entry(stream: &Log) -> MutLogEntry {
let mut stream = stream.borrow_mut();
stream.entry()
}
#[roto_method(rt, MutLogEntry)]
fn custom(entry_ptr: &MutLogEntry, custom_msg: &Arc<str>) {
let mut entry = entry_ptr.borrow_mut();
entry.custom = Some(custom_msg.to_string());
}
#[roto_method(rt, MutLogEntry)]
fn origin_as(
entry_ptr: &MutLogEntry,
msg: &BmpMsg<Bytes>,
) -> MutLogEntry {
let mut entry = entry_ptr.borrow_mut();
if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
if let Some(asn) = upd
.aspath()
.ok()
.flatten()
.and_then(|asp| asp.origin())
.and_then(|asp| asp.try_into_asn().ok())
{
entry.origin_as = Some(asn);
}
}
}
entry_ptr.clone()
}
#[roto_method(rt, MutLogEntry)]
fn peer_as(
entry_ptr: &MutLogEntry,
msg: &BmpMsg<Bytes>,
) -> MutLogEntry {
let mut entry = entry_ptr.borrow_mut();
if let BmpMsg::RouteMonitoring(rm) = msg {
let asn = rm.per_peer_header().asn();
entry.peer_as = Some(asn);
}
entry_ptr.clone()
}
#[roto_method(rt, MutLogEntry)]
fn as_path_hops(
entry_ptr: &MutLogEntry,
msg: &BmpMsg<Bytes>,
) -> MutLogEntry {
let mut entry = entry_ptr.borrow_mut();
if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
let cnt =
upd.aspath().ok().flatten().map(|asp| asp.hops().count());
entry.as_path_hops = cnt;
}
}
entry_ptr.clone()
}
#[roto_method(rt, MutLogEntry)]
fn conventional_reach(
entry_ptr: &MutLogEntry,
msg: &BmpMsg<Bytes>,
) -> MutLogEntry {
let mut entry = entry_ptr.borrow_mut();
if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
let cnt = upd
.conventional_announcements()
.ok()
.map(|iter| iter.count())
.unwrap_or(0);
entry.conventional_reach = cnt;
}
}
entry_ptr.clone()
}
#[roto_method(rt, MutLogEntry)]
fn conventional_unreach(
entry_ptr: &MutLogEntry,
msg: &BmpMsg<Bytes>,
) -> MutLogEntry {
let mut entry = entry_ptr.borrow_mut();
if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
let cnt = upd
.conventional_withdrawals()
.ok()
.map(|iter| iter.count())
.unwrap_or(0);
entry.conventional_unreach = cnt;
}
}
entry_ptr.clone()
}
#[roto_method(rt, MutLogEntry)]
fn mp_reach(
entry_ptr: &MutLogEntry,
msg: &BmpMsg<Bytes>,
) -> MutLogEntry {
let mut entry = entry_ptr.borrow_mut();
if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
if let Some(iter) = upd.mp_announcements().ok().flatten() {
entry.mp_reach_afisafi = Some(iter.afi_safi());
entry.mp_reach = Some(iter.count());
}
}
}
entry_ptr.clone()
}
#[roto_method(rt, MutLogEntry)]
fn mp_unreach(
entry_ptr: &MutLogEntry,
msg: &BmpMsg<Bytes>,
) -> MutLogEntry {
let mut entry = entry_ptr.borrow_mut();
if let BmpMsg::RouteMonitoring(rm) = msg {
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
if let Some(iter) = upd.mp_withdrawals().ok().flatten() {
entry.mp_unreach_afisafi = Some(iter.afi_safi());
entry.mp_unreach = Some(iter.count());
}
}
}
entry_ptr.clone()
}
#[roto_method(rt, MutLogEntry)]
fn log_all(
entry_ptr: &MutLogEntry,
msg: &BmpMsg<Bytes>,
) -> MutLogEntry {
let mut entry = entry_ptr.borrow_mut();
if let BmpMsg::RouteMonitoring(rm) = msg {
let asn = rm.per_peer_header().asn();
entry.peer_as = Some(asn);
if let Ok(upd) = rm.bgp_update(&SessionConfig::modern()) {
if let Some(asp) = upd.aspath().ok().flatten() {
entry.as_path_hops = Some(asp.hops().count());
entry.origin_as = asp
.hops()
.last()
.and_then(|h| (h).try_into_asn().ok());
}
entry.conventional_reach = upd
.conventional_announcements()
.ok()
.map(|iter| iter.count())
.unwrap_or(0);
entry.conventional_unreach = upd
.conventional_withdrawals()
.ok()
.map(|iter| iter.count())
.unwrap_or(0);
if let Some(iter) = upd.mp_announcements().ok().flatten() {
entry.mp_reach_afisafi = Some(iter.afi_safi());
entry.mp_reach = Some(iter.count());
}
if let Some(iter) = upd.mp_withdrawals().ok().flatten() {
entry.mp_unreach_afisafi = Some(iter.afi_safi());
entry.mp_unreach = Some(iter.count());
}
}
}
entry_ptr.clone()
}
#[roto_method(rt, Log)]
fn write_entry(stream: &Log) {
let mut stream = stream.borrow_mut();
let entry = stream.take_entry();
let entry = Rc::unwrap_or_clone(entry).into_inner();
stream.push(Output::Entry(entry));
}
rt.register_copy_type::<RovStatus>("ROV status of a `Route`").unwrap();
#[roto_method(rt, RovStatus)]
fn is_valid(status: &RovStatus) -> bool {
*status == RovStatus::Valid
}
#[roto_method(rt, RovStatus)]
fn is_invalid(status: &RovStatus) -> bool {
*status == RovStatus::Invalid
}
#[roto_method(rt, RovStatus)]
fn is_not_found(status: &RovStatus) -> bool {
*status == RovStatus::NotFound
}
#[roto_method(rt, SharedRtrCache)]
fn check_rov(rpki: &SharedRtrCache, rr: &MutRotondaRoute) -> RovStatus {
let mut rr = rr.borrow_mut();
let prefix = match *rr {
RotondaRoute::Ipv4Unicast(nlri, _) => nlri.prefix(),
RotondaRoute::Ipv6Unicast(nlri, _) => nlri.prefix(),
_=> { return RovStatus::NotChecked ; } };
let mut covered = false;
let mut valid = false;
if let Some(hoppath) = rr.owned_map().get::<HopPath>() {
if let Some(origin) = hoppath.origin()
.and_then(|o| Hop::try_into_asn(o.clone()).ok())
{
let match_options = MatchOptions {
match_type: MatchType::LongestMatch,
include_withdrawn: false,
include_less_specifics: true,
include_more_specifics: false,
mui: None,
include_history: IncludeHistory::None,
};
let guard = &rotonda_store::epoch::pin();
let res = match rpki.vrps.match_prefix(
&prefix,
&match_options,
guard
) {
Ok(res) => res,
Err(e) => {
warn!("could not lookup {}: {}", &prefix, e);
return RovStatus::NotChecked;
}
};
covered = !res.records.is_empty();
'outer: for r in &res.records {
for maxlen in r.meta.iter() {
#[allow(clippy::collapsible_if)]
if prefix.len() <= *maxlen {
if r.multi_uniq_id == u32::from(origin) {
valid = true;
break 'outer;
}
}
}
}
if !valid {
if let Some(less_specifics) = res.less_specifics {
'outer: for r in less_specifics.iter() {
for record in r.meta.iter() {
for maxlen in record.meta.iter() {
#[allow(clippy::collapsible_if)]
if prefix.len() <= *maxlen {
if record.multi_uniq_id == u32::from(origin) {
valid = true;
break 'outer;
}
}
}
}
}
}
}
}
}
let rov = match (covered, valid) {
(true, true) => RovStatus::Valid,
(true, false) => RovStatus::Invalid,
(false, true) => unreachable!(),
(false, false) => RovStatus::NotFound,
};
rr.rotonda_pamap_mut().set_rpki_info(rov.into());
rov
}
rt.register_constant(
"NO_EXPORT",
"The well-known NO_EXPORT community (RFC1997)",
StandardCommunity::from_wellknown(Wellknown::NoExport),
)?;
rt.register_constant(
"NO_ADVERTISE",
"The well-known NO_ADVERTISE community (RFC1997)",
StandardCommunity::from_wellknown(Wellknown::NoAdvertise),
)?;
rt.register_constant(
"NO_EXPORT_SUBCONFED",
"The well-known NO_EXPORT_SUBCONFED community (RFC1997)",
StandardCommunity::from_wellknown(Wellknown::NoExportSubconfed),
)?;
rt.register_constant(
"NO_PEER",
"The well-known NO_PEER community (RFC3765)",
StandardCommunity::from_wellknown(Wellknown::NoPeer),
)?;
Ok(rt)
}
fn has_attribute(bgp_update: &BgpUpdateMessage<Bytes>, to_match: u8) -> bool {
if let Ok(mut pas) = bgp_update.path_attributes() {
pas.any(|p| p.ok().is_some_and(|p| p.type_code() == to_match))
} else {
false
}
}
fn contains_community(
bgp_update: &BgpUpdateMessage<Bytes>,
to_match: &StandardCommunity,
) -> bool {
if let Some(mut iter) = bgp_update.communities().ok().flatten() {
iter.any(|c| c == *to_match)
} else {
false
}
}
fn contains_large_community(
bgp_update: &BgpUpdateMessage<Bytes>,
to_match: &LargeCommunity,
) -> bool {
if let Some(mut iter) = bgp_update.large_communities().ok().flatten() {
iter.any(|c| c == *to_match)
} else {
false
}
}
fn aspath_contains(
bgp_update: &BgpUpdateMessage<Bytes>,
to_match: Asn,
) -> bool {
if let Some(aspath) = bgp_update.aspath().ok().flatten() {
aspath.hops().any(|h| h == to_match.into())
} else {
false
}
}
fn match_aspath_origin(
bgp_update: &BgpUpdateMessage<Bytes>,
to_match: Asn,
) -> bool {
if let Some(aspath) = bgp_update.aspath().ok().flatten() {
aspath.origin() == Some(to_match.into())
} else {
false
}
}
fn announcements_count(bgp_update: &BgpUpdateMessage<Bytes>) -> u32 {
if let Ok(iter) = bgp_update.announcements() {
iter.count().try_into().unwrap_or(u32::MAX)
} else {
0
}
}
fn withdrawals_count(bgp_update: &BgpUpdateMessage<Bytes>) -> u32 {
if let Ok(iter) = bgp_update.withdrawals() {
iter.count().try_into().unwrap_or(u32::MAX)
} else {
0
}
}
fn fmt_aspath(bgp_update: &BgpUpdateMessage<Bytes>) -> Arc<str> {
if let Some(aspath) = bgp_update.aspath().ok().flatten() {
_fmt_aspath(aspath)
} else {
"".into()
}
}
fn _fmt_aspath(aspath: AsPath<Bytes>) -> Arc<str> {
if let Ok(mut asns) = aspath.try_single_sequence_iter() {
let mut res = String::new();
if let Some(asn) = asns.next() {
res.push_str(&format!("{}", asn.into_u32()));
}
for asn in asns {
res.push_str(&format!(" {}", asn.into_u32()));
}
res.into()
} else {
aspath.to_string().into()
}
}
fn fmt_aspath_origin(bgp_update: &BgpUpdateMessage<Bytes>) -> Arc<str> {
if let Some(asp) = bgp_update.aspath().ok().flatten() {
_fmt_aspath_origin(asp)
} else {
"".into()
}
}
fn _fmt_aspath_origin(aspath: AsPath<Bytes>) -> Arc<str> {
if let Some(asn) = aspath.origin().and_then(|a| Asn::try_from(a).ok()) {
asn.to_string().into()
} else {
"".into()
}
}
fn fmt_communities(bgp_update: &BgpUpdateMessage<Bytes>) -> Arc<str> {
if let Some(iter) = bgp_update.communities().ok().flatten() {
iter.map(|c| c.to_string())
.collect::<Vec<_>>()
.join(", ")
.into()
} else {
"".into()
}
}
fn fmt_large_communities(bgp_update: &BgpUpdateMessage<Bytes>) -> Arc<str> {
if let Some(iter) = bgp_update.large_communities().ok().flatten() {
iter.map(|c| c.to_string())
.collect::<Vec<_>>()
.join(", ")
.into()
} else {
"".into()
}
}
fn fmt_pcap(buf: impl AsRef<[u8]>) -> Arc<str> {
let mut res = String::with_capacity(7 + buf.as_ref().len());
res.push_str("000000 ");
for b in buf.as_ref() {
res.push_str(&format!("{:02x} ", b));
}
res.into()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn packaged_roto_script() {
use crate::units::bgp_tcp_in::unit::RotoFunc as BgpInFunc;
use crate::units::bmp_tcp_in::unit::RotoFunc as BmpInFunc;
use crate::units::rib_unit::unit::RotoFuncPre as RibInPreFunc;
let roto_script = "etc/examples/filters.roto.example";
let i = roto::FileTree::single_file(roto_script);
let mut c = i.compile(create_runtime().unwrap())
.inspect_err(|e| eprintln!("{e}"))
.unwrap();
let _: BgpInFunc = c.get_function("bgp_in").unwrap();
let _: BmpInFunc = c.get_function("bmp_in").unwrap();
let _: RibInPreFunc = c.get_function("rib_in_pre").unwrap();
}
}