use std::cmp;
use std::collections::hash_map;
use std::collections::HashMap;
use std::sync::Arc;
use crossbeam_queue::SegQueue;
use log::{info, warn};
use rpki::uri;
use rpki::crypto::keys::KeyIdentifier;
use rpki::repository::aspa::AsProviderAttestation;
use rpki::repository::cert::{Cert, ResourceCert};
use rpki::repository::resources::{
AsBlock, AsBlocks, IpBlock, IpBlocks, IpBlocksBuilder,
};
use rpki::repository::roa::RouteOriginAttestation;
use rpki::repository::tal::{Tal, TalUri};
use rpki::repository::x509::{Time, Validity};
use rpki::resources::{Asn, Prefix, SmallAsnSet};
use rpki::rtr::payload::{Aspa, RouteOrigin, RouterKey};
use rpki::rtr::pdu::{ProviderAsns, RouterKeyInfo};
use crate::config::{Config, FilterPolicy};
use crate::engine::{CaCert, Engine, ProcessPubPoint, ProcessRun};
use crate::error::{Failed, RunFailed};
use crate::metrics::{Metrics, PayloadMetrics, VrpMetrics};
use crate::slurm::LocalExceptions;
use super::info::{PayloadInfo, PublishInfo};
use super::snapshot::PayloadSnapshot;
/// The accumulated result of a single validation run.
///
/// Filled in per publication point via the `ProcessPubPoint` impl further
/// down and converted into a `PayloadSnapshot` by `into_snapshot`.
#[derive(Debug)]
pub struct ValidationReport {
    /// The finished publication points.
    ///
    /// A lock-free queue so multiple processors can push through a shared
    /// reference concurrently.
    pub_points: SegQueue<PubPoint>,

    /// Collects the resources of rejected CAs for unsafe-VRP detection.
    rejected: RejectedResourcesBuilder,

    /// Whether rejected CA resources should be logged on `cancel`.
    log_rejected: bool,

    /// Whether router (BGPsec) certificates are processed at all.
    enable_bgpsec: bool,

    /// Whether ASPA objects are processed at all.
    enable_aspa: bool,

    /// Optional maximum accepted prefix length for IPv4 route origins.
    limit_v4_len: Option<u8>,

    /// Optional maximum accepted prefix length for IPv6 route origins.
    limit_v6_len: Option<u8>,

    /// What to do with VRPs that overlap rejected CA resources.
    unsafe_vrps: FilterPolicy,
}
impl ValidationReport {
pub fn new(config: &Config) -> Self {
ValidationReport {
pub_points: Default::default(),
rejected: Default::default(),
log_rejected: config.unsafe_vrps.log(),
enable_bgpsec: config.enable_bgpsec,
enable_aspa: config.enable_aspa,
limit_v4_len: config.limit_v4_len,
limit_v6_len: config.limit_v6_len,
unsafe_vrps: config.unsafe_vrps,
}
}
pub fn process(
engine: &Engine, config: &Config, initial: bool,
) -> Result<(Self, Metrics), RunFailed> {
let report = Self::new(config);
let mut run = engine.start(&report, initial)?;
run.process()?;
run.cleanup()?;
let metrics = run.done();
Ok((report, metrics))
}
pub fn into_snapshot(
self,
exceptions: &LocalExceptions,
metrics: &mut Metrics,
) -> PayloadSnapshot {
let mut builder = SnapshotBuilder::new(
self.rejected.finalize(), self.unsafe_vrps,
exceptions,
);
while let Some(point) = self.pub_points.pop() {
builder.process_pub_point(point, metrics)
}
builder.finalize(metrics)
}
}
impl<'a> ProcessRun for &'a ValidationReport {
    type PubPoint = PubPointProcessor<'a>;

    /// Starts processing at a trust anchor certificate.
    ///
    /// Always accepts the TA; seeds the processor's validity window and
    /// stale time from the TA certificate itself.
    fn process_ta(
        &self,
        _tal: &Tal, _uri: &TalUri, cert: &CaCert,
        tal_index: usize,
    ) -> Result<Option<Self::PubPoint>, Failed> {
        let validity = cert.cert().validity();
        let processor = PubPointProcessor {
            report: self,
            pub_point: PubPoint::new_ta(cert, tal_index),
            validity,
            point_stale: validity.not_after(),
        };
        Ok(Some(processor))
    }
}
/// Processes the content of a single publication point.
///
/// Created per CA by `process_ta`/`process_ca`; collects payload into its
/// own `PubPoint` and pushes it onto the report on `commit`.
#[derive(Clone, Debug)]
pub struct PubPointProcessor<'a> {
    /// The report the collected payload is committed to.
    report: &'a ValidationReport,

    /// The payload collected at this publication point.
    pub_point: PubPoint,

    /// The validity window, trimmed along the certificate chain.
    validity: Validity,

    /// The earliest stale time seen along the chain to this point.
    point_stale: Time,
}
impl ProcessPubPoint for PubPointProcessor<'_> {
    /// Remembers the metrics index of the repository serving this point.
    fn repository_index(&mut self, repository_index: usize) {
        self.pub_point.repository_index = Some(repository_index)
    }

    /// Narrows refresh, validity, and stale time from the point's
    /// manifest validity and stale time.
    fn point_validity(&mut self, manifest: Validity, stale: Time) {
        self.pub_point.refresh = cmp::min(
            self.pub_point.refresh,
            cmp::min(manifest.not_after(), stale)
        );
        self.validity = self.validity.trim(manifest);
        self.point_stale = cmp::min(self.point_stale, stale);
    }

    /// We want to look at every object, so always returns `Ok(true)`.
    fn want(&self, _uri: &uri::Rsync) -> Result<bool, Failed> {
        Ok(true)
    }

    /// Descends into a child CA, trimming the validity window and stale
    /// time by the child certificate.
    fn process_ca(
        &mut self, _uri: &uri::Rsync, cert: &CaCert,
    ) -> Result<Option<Self>, Failed> {
        Ok(Some(
            PubPointProcessor {
                report: self.report,
                pub_point: PubPoint::new_ca(&self.pub_point, cert),
                validity: self.validity.trim(cert.cert().validity()),
                point_stale: cmp::min(
                    self.point_stale, cert.cert().validity().not_after()
                ),
            }
        ))
    }

    /// Processes a BGPsec router certificate.
    ///
    /// Silently skipped when BGPsec is disabled. Certificates with
    /// inherited, absent, or invalid AS resources, a non-router key
    /// algorithm, or an oversized key are logged and dropped -- they do
    /// not fail the publication point.
    fn process_router_cert(
        &mut self, uri: &uri::Rsync, cert: Cert, ca_cert: &CaCert,
    ) -> Result<(), Failed> {
        if !self.report.enable_bgpsec {
            return Ok(())
        }
        if
            cert.as_resources().is_inherited()
            || !cert.as_resources().is_present()
        {
            warn!(
                "{uri}: router certificate does not contain AS resources."
            );
            return Ok(())
        }
        let asns = match cert.as_resources().to_blocks() {
            Ok(blocks) => blocks,
            Err(_) => {
                warn!(
                    "{uri}: router certificate contains invalid AS resources."
                );
                return Ok(())
            }
        };
        let id = cert.subject_key_identifier();
        let key = cert.subject_public_key_info();
        if !key.allow_router_cert() {
            warn!(
                "{uri}: router certificate has invalid key algorithm."
            );
            return Ok(())
        }
        // RouterKeyInfo has a size limit on the encoded key data.
        let key = match RouterKeyInfo::new(key.to_info_bytes()) {
            Ok(key) => key,
            Err(_) => {
                warn!(
                    "{uri}: excessively large key in router certificate."
                );
                return Ok(())
            }
        };
        self.pub_point.update_refresh(cert.validity().not_after());
        self.pub_point.add_router_key(
            asns, id, key,
            Arc::new(PublishInfo::router_cert(
                &cert, uri, ca_cert.cert().tal().clone(),
                self.validity, self.point_stale,
            )),
        );
        Ok(())
    }

    /// Processes a ROA, adding its origins to the publication point.
    ///
    /// The refresh time is only lowered if at least one origin survived
    /// the configured prefix-length limits.
    fn process_roa(
        &mut self,
        _uri: &uri::Rsync,
        cert: ResourceCert,
        route: RouteOriginAttestation
    ) -> Result<(), Failed> {
        if self.pub_point.add_roa(
            route,
            Arc::new(PublishInfo::signed_object(
                &cert, self.validity, self.point_stale
            )),
            self.report.limit_v4_len, self.report.limit_v6_len,
        ) {
            self.pub_point.update_refresh(cert.validity().not_after());
        }
        Ok(())
    }

    /// Processes an ASPA object. Skipped when ASPA support is disabled.
    fn process_aspa(
        &mut self,
        _uri: &uri::Rsync,
        cert: ResourceCert,
        aspa: AsProviderAttestation
    ) -> Result<(), Failed> {
        if !self.report.enable_aspa {
            return Ok(())
        }
        self.pub_point.update_refresh(cert.validity().not_after());
        self.pub_point.add_aspa(
            aspa,
            Arc::new(PublishInfo::signed_object(
                &cert, self.validity, self.point_stale
            ))
        );
        Ok(())
    }

    /// Drops everything collected so far so the point can be re-processed.
    fn restart(&mut self) -> Result<(), Failed> {
        self.pub_point.restart();
        Ok(())
    }

    /// Pushes the collected payload onto the report, unless empty.
    fn commit(self) {
        if !self.pub_point.is_empty() {
            self.report.pub_points.push(self.pub_point);
        }
    }

    /// Called when the CA is rejected: records its resources as unsafe
    /// and optionally logs them.
    fn cancel(self, cert: &CaCert) {
        if self.report.log_rejected {
            warn!(
                "CA for {} rejected, resources marked as unsafe:",
                cert.ca_repository()
            );
            for block in cert.cert().v4_resources().iter() {
                warn!("   {}", block.display_v4());
            }
            for block in cert.cert().v6_resources().iter() {
                warn!("   {}", block.display_v6());
            }
            for block in cert.cert().as_resources().iter() {
                warn!("   {block}");
            }
        }
        self.report.rejected.extend_from_cert(cert);
    }
}
/// The payload published at a single publication point.
#[derive(Clone, Debug)]
pub struct PubPoint {
    /// The route origins from the point's ROAs.
    origins: Vec<PubRouteOrigin>,

    /// The router keys from the point's BGPsec router certificates.
    router_keys: Vec<PubRouterKey>,

    /// The ASPA payload from the point's ASPA objects.
    aspas: Vec<PubAspa>,

    /// The earliest expiry/stale time of anything contributing to this
    /// point; feeds the snapshot's refresh time.
    refresh: Time,

    /// The refresh time before any objects were added; restored by
    /// `restart`.
    orig_refresh: Time,

    /// The index of the point's TAL in the metrics' TAL list.
    tal_index: usize,

    /// The index of the serving repository in the metrics, if known.
    repository_index: Option<usize>,
}
impl PubPoint {
    /// Creates an empty publication point with the given refresh time.
    fn new(refresh: Time, tal_index: usize) -> Self {
        Self {
            origins: Vec::new(),
            router_keys: Vec::new(),
            aspas: Vec::new(),
            refresh,
            orig_refresh: refresh,
            tal_index,
            repository_index: None,
        }
    }

    /// Creates a publication point for a trust anchor certificate.
    fn new_ta(cert: &CaCert, tal_index: usize) -> Self {
        let refresh = cert.cert().validity().not_after();
        Self::new(refresh, tal_index)
    }

    /// Creates a publication point below `parent` for a child CA.
    fn new_ca(parent: &PubPoint, cert: &CaCert) -> Self {
        let child_expiry = cert.cert().validity().not_after();
        Self::new(cmp::min(parent.refresh, child_expiry), parent.tal_index)
    }

    /// Returns whether the point holds no payload at all.
    pub fn is_empty(&self) -> bool {
        self.origins.is_empty()
            && self.router_keys.is_empty()
            && self.aspas.is_empty()
    }

    /// Lowers the refresh time to `refresh` if it is earlier.
    fn update_refresh(&mut self, refresh: Time) {
        if refresh < self.refresh {
            self.refresh = refresh
        }
    }

    /// Throws away all collected payload and resets the refresh time.
    fn restart(&mut self) {
        self.origins.clear();
        self.router_keys.clear();
        self.aspas.clear();
        self.refresh = self.orig_refresh;
    }

    /// Adds the origins of a ROA, honoring the prefix-length limits.
    ///
    /// Returns whether at least one origin was actually added.
    fn add_roa(
        &mut self,
        roa: RouteOriginAttestation,
        info: Arc<PublishInfo>,
        limit_v4_len: Option<u8>,
        limit_v6_len: Option<u8>,
    ) -> bool {
        let mut contributed = false;
        for origin in roa.iter_origins() {
            let prefix = origin.prefix.prefix();
            let limit = if prefix.is_v4() { limit_v4_len }
                        else { limit_v6_len };
            // Skip origins whose prefix is longer than the limit.
            if limit.map_or(false, |max| prefix.len() > max) {
                continue
            }
            self.origins.push(
                PubRouteOrigin { origin, info: info.clone() }
            );
            contributed = true;
        }
        contributed
    }

    /// Adds a router key covering all ASNs in `asns`.
    fn add_router_key(
        &mut self,
        asns: AsBlocks,
        key_id: KeyIdentifier,
        key_info: RouterKeyInfo,
        info: Arc<PublishInfo>,
    ) {
        let key = PubRouterKey { asns, key_id, key_info, info };
        self.router_keys.push(key);
    }

    /// Adds the content of an ASPA object.
    fn add_aspa(
        &mut self,
        aspa: AsProviderAttestation,
        info: Arc<PublishInfo>,
    ) {
        let entry = PubAspa {
            customer: aspa.customer_as(),
            providers: aspa.provider_as_set().to_set(),
            info,
        };
        self.aspas.push(entry)
    }
}
/// A route origin published at a publication point.
#[derive(Clone, Debug)]
pub struct PubRouteOrigin {
    /// The origin payload itself.
    pub origin: RouteOrigin,

    /// Information about where the origin was published.
    pub info: Arc<PublishInfo>,
}
/// A router key published at a publication point.
///
/// Covers all ASNs of the router certificate; expanded into one
/// `RouterKey` per ASN when building the snapshot.
#[derive(Clone, Debug)]
pub struct PubRouterKey {
    /// The AS resources of the router certificate.
    pub asns: AsBlocks,

    /// The subject key identifier of the certificate.
    pub key_id: KeyIdentifier,

    /// The encoded public key.
    pub key_info: RouterKeyInfo,

    /// Information about where the key was published.
    pub info: Arc<PublishInfo>,
}
/// The content of an ASPA object published at a publication point.
#[derive(Clone, Debug)]
pub struct PubAspa {
    /// The customer ASN of the attestation.
    pub customer: Asn,

    /// The set of provider ASNs for the customer.
    pub providers: SmallAsnSet,

    /// Information about where the ASPA was published.
    pub info: Arc<PublishInfo>,
}
/// The IP resources of all rejected CAs, used to spot unsafe VRPs.
#[derive(Clone, Debug)]
pub struct RejectedResources {
    /// Rejected IPv4 blocks.
    v4: IpBlocks,

    /// Rejected IPv6 blocks.
    v6: IpBlocks,
}
impl RejectedResources {
    /// Checks whether a prefix should be kept.
    ///
    /// Returns `false` when the prefix overlaps any rejected resources,
    /// i.e., when the corresponding VRP is potentially unsafe.
    pub fn keep_prefix(&self, prefix: Prefix) -> bool {
        let block = rpki::repository::resources::Prefix::new(
            prefix.addr(), prefix.len()
        );
        let overlaps = if prefix.is_v4() {
            self.v4.intersects_block(block)
        }
        else {
            self.v6.intersects_block(block)
        };
        !overlaps
    }
}
/// Collects the resources of rejected CAs during a run.
///
/// Uses lock-free queues so multiple validation threads can contribute
/// through a shared reference.
#[derive(Debug, Default)]
struct RejectedResourcesBuilder {
    /// Rejected IP blocks; the `bool` is `true` for IPv4.
    addrs: SegQueue<(bool, IpBlock)>,

    /// Rejected AS blocks.
    // NOTE(review): this queue is filled by `extend_from_cert` but never
    // drained by `finalize` -- the collected AS blocks are dropped.
    // Confirm whether this is intentional (e.g. reserved for later use).
    asns: SegQueue<AsBlock>,
}
impl RejectedResourcesBuilder {
    /// Records the resources of a rejected CA certificate.
    ///
    /// Blocks covering everything (0/0 and the whole AS range) are
    /// skipped, since they would mark all VRPs unsafe.
    fn extend_from_cert(&self, cert: &CaCert) {
        for block in cert.cert().v4_resources().iter() {
            if !block.is_slash_zero() {
                self.addrs.push((true, block));
            }
        }
        for block in cert.cert().v6_resources().iter() {
            if !block.is_slash_zero() {
                self.addrs.push((false, block));
            }
        }
        for block in cert.cert().as_resources().iter() {
            if !block.is_whole_range() {
                self.asns.push(block)
            }
        }
    }

    /// Drains the collected address blocks into the final resource set.
    fn finalize(self) -> RejectedResources {
        let mut v4 = IpBlocksBuilder::new();
        let mut v6 = IpBlocksBuilder::new();
        while let Some((is_v4, block)) = self.addrs.pop() {
            let builder = if is_v4 { &mut v4 } else { &mut v6 };
            builder.push(block);
        }
        RejectedResources {
            v4: v4.finalize(),
            v6: v6.finalize(),
        }
    }
}
/// Merges the payload of all publication points into one snapshot.
struct SnapshotBuilder<'a> {
    /// The collected route origins, deduplicated by payload.
    origins: HashMap<RouteOrigin, PayloadInfo>,

    /// The collected router keys, deduplicated by payload.
    router_keys: HashMap<RouterKey, PayloadInfo>,

    /// The collected ASPA data, keyed by customer ASN; provider sets of
    /// multiple objects for the same customer are merged.
    aspas: HashMap<Asn, (SmallAsnSet, PayloadInfo)>,

    /// The resources of rejected CAs for unsafe-VRP detection.
    rejected: RejectedResources,

    /// What to do with unsafe VRPs.
    unsafe_vrps: FilterPolicy,

    /// Whether any unsafe VRPs were encountered at all.
    unsafe_vrps_present: bool,

    /// The earliest time any of the payload becomes stale, if known.
    refresh: Option<Time>,

    /// The local exceptions to apply while building.
    exceptions: &'a LocalExceptions,
}
impl<'a> SnapshotBuilder<'a> {
    /// Creates an empty builder from the rejected resources, the
    /// unsafe-VRP policy, and the local exceptions.
    fn new(
        rejected: RejectedResources,
        unsafe_vrps: FilterPolicy,
        exceptions: &'a LocalExceptions,
    ) -> Self {
        Self {
            origins: Default::default(),
            router_keys: Default::default(),
            aspas: Default::default(),
            rejected,
            unsafe_vrps,
            unsafe_vrps_present: false,
            refresh: None,
            exceptions,
        }
    }

    /// Merges the payload of a single publication point, updating the
    /// per-TAL, per-repository, and overall metrics as it goes.
    fn process_pub_point(
        &mut self, point: PubPoint, metrics: &mut Metrics
    ) {
        let mut metrics = AllVrpMetrics::new(
            metrics, point.tal_index, point.repository_index,
        );
        self.update_refresh(point.refresh);
        point.origins.into_iter().for_each(|item| {
            self.process_origin(item, &mut metrics)
        });
        point.router_keys.into_iter().for_each(|item| {
            self.process_key(item, &mut metrics)
        });
        point.aspas.into_iter().for_each(|item| {
            self.process_aspa(item, &mut metrics)
        });
    }

    /// Lowers the snapshot refresh time to `refresh` if it is earlier.
    fn update_refresh(&mut self, refresh: Time) {
        self.refresh = match self.refresh {
            Some(old) => Some(cmp::min(old, refresh)),
            None => Some(refresh)
        }
    }

    /// Processes a single route origin.
    ///
    /// Applies the unsafe-VRP policy first, then the local exceptions,
    /// and finally merges the origin into the deduplicated map.
    fn process_origin(
        &mut self, origin: PubRouteOrigin, metrics: &mut AllVrpMetrics,
    ) {
        let v4 = origin.origin.is_v4();
        metrics.update_origin(v4, |m| m.valid += 1);

        // An origin whose prefix overlaps the resources of a rejected CA
        // is "unsafe": the rejected CA might have announced it as well.
        if !self.rejected.keep_prefix(origin.origin.prefix.prefix()) {
            self.unsafe_vrps_present = true;
            match self.unsafe_vrps {
                FilterPolicy::Accept => {
                    // Accept quietly.
                }
                FilterPolicy::Warn => {
                    metrics.update_origin(v4, |m| m.marked_unsafe += 1);
                    // Fix: this is the *warn* policy, so log at warn
                    // level -- it previously used `info!` while the
                    // parallel `Reject` branch used `warn!`.
                    warn!(
                        "Encountered potentially unsafe VRP \
                         ({}/{}-{}, {})",
                        origin.origin.prefix.addr(),
                        origin.origin.prefix.prefix_len(),
                        origin.origin.prefix.resolved_max_len(),
                        origin.origin.asn
                    );
                }
                FilterPolicy::Reject => {
                    metrics.update_origin(v4, |m| m.marked_unsafe += 1);
                    warn!(
                        "Filtering potentially unsafe VRP \
                         ({}/{}-{}, {})",
                        origin.origin.prefix.addr(),
                        origin.origin.prefix.prefix_len(),
                        origin.origin.prefix.resolved_max_len(),
                        origin.origin.asn
                    );
                    return
                }
            }
        }

        if self.exceptions.drop_origin(origin.origin) {
            metrics.update_origin(v4, |m| m.locally_filtered += 1);
            return
        }

        // Identical origins from different publication points are merged
        // into a single entry with multiple published infos.
        match self.origins.entry(origin.origin) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert(origin.info.into());
                metrics.update_origin(v4, |m| m.contributed += 1);
            }
            hash_map::Entry::Occupied(mut entry) => {
                entry.get_mut().add_published(origin.info);
                metrics.update_origin(v4, |m| m.duplicate += 1);
            }
        }
    }

    /// Processes a router key, expanding it into one `RouterKey` payload
    /// item per ASN of the router certificate.
    fn process_key(
        &mut self, key: PubRouterKey, metrics: &mut AllVrpMetrics,
    ) {
        metrics.update(|m| m.router_keys.valid += key.asns.asn_count());
        for asn in key.asns.iter_asns() {
            let router_key = RouterKey::new(
                key.key_id, asn, key.key_info.clone()
            );
            if self.exceptions.drop_router_key(&router_key) {
                metrics.update(|m| m.router_keys.locally_filtered += 1);
                continue
            }
            match self.router_keys.entry(router_key) {
                hash_map::Entry::Vacant(entry) => {
                    entry.insert(key.info.clone().into());
                    metrics.update(|m| m.router_keys.contributed += 1);
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().add_published(key.info.clone());
                    metrics.update(|m| m.router_keys.duplicate += 1);
                }
            }
        }
    }

    /// Processes an ASPA, merging provider sets of multiple objects for
    /// the same customer ASN.
    fn process_aspa(&mut self, aspa: PubAspa, metrics: &mut AllVrpMetrics) {
        metrics.update(|m| m.aspas.valid += 1);
        match self.aspas.entry(aspa.customer) {
            hash_map::Entry::Vacant(entry) => {
                entry.insert((aspa.providers, aspa.info.into()));
                metrics.update(|m| m.aspas.contributed += 1);
            }
            hash_map::Entry::Occupied(mut entry) => {
                let entry = entry.get_mut();
                entry.0 = entry.0.union(&aspa.providers).collect();
                entry.1.add_published(aspa.info);
                metrics.update(|m| m.aspas.duplicate += 1);
            }
        }
    }

    /// Finishes the build: applies assertions from the local exceptions,
    /// finalizes metrics, and produces the snapshot.
    fn finalize(mut self, metrics: &mut Metrics) -> PayloadSnapshot {
        if self.unsafe_vrps_present && self.unsafe_vrps.log() {
            warn!(
                "For more information on unsafe VRPs, see \
                 https://routinator.docs.nlnetlabs.nl\
                 /en/stable/unsafe-vrps.html"
            );
        }
        self.insert_assertions(metrics);
        metrics.finalize();
        self.into_snapshot(metrics)
    }

    /// Inserts locally asserted origins and router keys, counting them
    /// in both the local and the snapshot metrics.
    fn insert_assertions(&mut self, metrics: &mut Metrics) {
        for (origin, info) in self.exceptions.origin_assertions() {
            match self.origins.entry(origin) {
                hash_map::Entry::Vacant(entry) => {
                    entry.insert(info.into());
                    if origin.is_v4() {
                        metrics.local.v4_origins.contributed += 1;
                        metrics.snapshot.payload.v4_origins.contributed += 1;
                    }
                    else {
                        metrics.local.v6_origins.contributed += 1;
                        metrics.snapshot.payload.v6_origins.contributed += 1;
                    }
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().add_local(info);
                    if origin.is_v4() {
                        metrics.local.v4_origins.duplicate += 1;
                        metrics.snapshot.payload.v4_origins.duplicate += 1;
                    }
                    else {
                        metrics.local.v6_origins.duplicate += 1;
                        metrics.snapshot.payload.v6_origins.duplicate += 1;
                    }
                }
            }
        }
        for (key, info) in self.exceptions.router_key_assertions() {
            match self.router_keys.entry(key) {
                hash_map::Entry::Vacant(entry) => {
                    entry.insert(info.into());
                    metrics.local.router_keys.contributed += 1;
                    metrics.snapshot.payload.router_keys.contributed += 1;
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().add_local(info);
                    metrics.local.router_keys.duplicate += 1;
                    metrics.snapshot.payload.router_keys.duplicate += 1;
                }
            }
        }
    }

    /// Converts the builder into the final snapshot.
    ///
    /// ASPAs whose merged provider set is too large for the RTR PDU are
    /// dropped with a warning and counted in the metrics.
    fn into_snapshot(self, metrics: &mut Metrics) -> PayloadSnapshot {
        PayloadSnapshot::new(
            self.origins.into_iter(),
            self.router_keys.into_iter(),
            self.aspas.into_iter().filter_map(
                |(customer, (providers, info))| {
                    match ProviderAsns::try_from_iter(providers.iter()) {
                        Ok(providers) => {
                            Some((Aspa::new(customer, providers), info))
                        }
                        Err(_) => {
                            warn!(
                                "Ignoring excessively large ASPA for {} \
                                 with {} provider ASNs.",
                                customer, providers.len()
                            );
                            metrics.snapshot.large_aspas += 1;
                            None
                        }
                    }
                }
            ),
            self.refresh,
        )
    }
}
/// A helper that applies one metrics update to all relevant targets:
/// the TAL's metrics, the repository's (if known), and the overall
/// snapshot metrics.
pub struct AllVrpMetrics<'a> {
    /// The payload metrics of the publication point's TAL.
    tal: &'a mut PayloadMetrics,

    /// The payload metrics of the serving repository, if known.
    repo: Option<&'a mut PayloadMetrics>,

    /// The overall snapshot payload metrics.
    all: &'a mut PayloadMetrics,
}
impl<'a> AllVrpMetrics<'a> {
    /// Creates the helper for the TAL and optional repository indexes.
    pub fn new(
        metrics: &'a mut Metrics, tal_index: usize, repo_index: Option<usize>,
    ) -> Self {
        let repo = match repo_index {
            None => None,
            Some(index) => Some(&mut metrics.repositories[index].payload),
        };
        AllVrpMetrics {
            tal: &mut metrics.tals[tal_index].payload,
            repo,
            all: &mut metrics.snapshot.payload,
        }
    }

    /// Applies `op` to every metrics target in turn.
    pub fn update(&mut self, op: impl Fn(&mut PayloadMetrics)) {
        op(self.tal);
        if let Some(repo) = self.repo.as_deref_mut() {
            op(repo)
        }
        op(self.all)
    }

    /// Applies `op` to the v4 or v6 origin metrics of every target.
    pub fn update_origin(&mut self, v4: bool, op: impl Fn(&mut VrpMetrics)) {
        self.update(|metrics| {
            let target = if v4 { &mut metrics.v4_origins }
                         else { &mut metrics.v6_origins };
            op(target)
        })
    }
}