use std::{cmp, ops};
use std::collections::hash_map;
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::Hash;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};
use chrono::{DateTime, Utc};
use crossbeam_queue::SegQueue;
use log::{info, warn};
use routecore::addr;
use routecore::bgpsec::KeyIdentifier;
use rpki::repository::cert::{Cert, ResourceCert};
use rpki::repository::resources::{AsBlocks, IpBlock, IpBlocks, IpBlocksBuilder};
use rpki::repository::roa::RouteOriginAttestation;
use rpki::repository::tal::{Tal, TalInfo, TalUri};
use rpki::repository::x509::{Time, Validity};
use rpki::rtr::payload::{Action, Payload, RouteOrigin, RouterKey, Timing};
use rpki::rtr::pdu::RouterKeyInfo;
use rpki::rtr::server::{PayloadDiff, PayloadSet, PayloadSource};
use rpki::rtr::state::{Serial, State};
use rpki::uri;
use crate::config::{Config, FilterPolicy};
use crate::engine::{CaCert, Engine, ProcessPubPoint, ProcessRun};
use crate::error::Failed;
use crate::metrics::{Metrics, VrpMetrics};
use crate::slurm::{ExceptionInfo, LocalExceptions};
/// Collects the raw results of a single validation run.
///
/// Publication points deposit their validated payload here concurrently
/// (hence the lock-free `SegQueue`); the report is later turned into a
/// `PayloadSnapshot` via `SnapshotBuilder::from_report`.
#[derive(Debug)]
pub struct ValidationReport {
    /// Payload collected from each accepted publication point.
    pub_points: SegQueue<PubPoint>,
    /// IP resources of rejected CAs, used for the "unsafe VRP" check.
    rejected: RejectedResourcesBuilder,
    /// Whether rejected CA resources should be logged
    /// (derived from the `unsafe_vrps` config policy).
    log_rejected: bool,
    /// Whether router (BGPsec) certificates are processed at all.
    enable_bgpsec: bool,
    /// Optional maximum accepted prefix length for IPv4 ROA origins.
    limit_v4_len: Option<u8>,
    /// Optional maximum accepted prefix length for IPv6 ROA origins.
    limit_v6_len: Option<u8>,
}
impl ValidationReport {
pub fn new(config: &Config) -> Self {
ValidationReport {
pub_points: Default::default(),
rejected: Default::default(),
log_rejected: config.unsafe_vrps.log(),
enable_bgpsec: config.enable_bgpsec,
limit_v4_len: config.limit_v4_len,
limit_v6_len: config.limit_v6_len,
}
}
pub fn process(
engine: &Engine, config: &Config,
) -> Result<(Self, Metrics), Failed> {
let report = Self::new(config);
let mut run = engine.start(&report)?;
run.process()?;
run.cleanup()?;
let metrics = run.done();
Ok((report, metrics))
}
}
impl<'a> ProcessRun for &'a ValidationReport {
    type PubPoint = PubPointProcessor<'a>;

    /// Starts processing of a trust anchor by creating a processor
    /// rooted at the TA certificate.
    fn process_ta(
        &self,
        _tal: &Tal, _uri: &TalUri, cert: &CaCert,
        tal_index: usize,
    ) -> Result<Option<Self::PubPoint>, Failed> {
        let processor = PubPointProcessor {
            report: self,
            pub_point: PubPoint::new_ta(cert, tal_index),
            validity: cert.combined_validity(),
        };
        Ok(Some(processor))
    }
}
/// Processes a single publication point during a validation run.
#[derive(Clone, Debug)]
pub struct PubPointProcessor<'a> {
    /// The report the collected payload will be committed to.
    report: &'a ValidationReport,
    /// The payload gathered for this publication point.
    pub_point: PubPoint,
    /// The combined validity of the CA certificate of this point.
    validity: Validity,
}
impl<'a> ProcessPubPoint for PubPointProcessor<'a> {
    /// Remembers which repository the publication point came from.
    fn repository_index(&mut self, repository_index: usize) {
        self.pub_point.repository_index = Some(repository_index)
    }

    /// Lowers the point's refresh time if `not_after` is earlier.
    fn update_refresh(&mut self, not_after: Time) {
        self.pub_point.refresh = cmp::min(
            self.pub_point.refresh, not_after
        );
    }

    /// All objects are wanted.
    fn want(&self, _uri: &uri::Rsync) -> Result<bool, Failed> {
        Ok(true)
    }

    /// Creates a nested processor for a subordinate CA.
    fn process_ca(
        &mut self, _uri: &uri::Rsync, cert: &CaCert,
    ) -> Result<Option<Self>, Failed> {
        Ok(Some(
            PubPointProcessor {
                report: self.report,
                pub_point: PubPoint::new_ca(&self.pub_point, cert),
                validity: cert.combined_validity(),
            }
        ))
    }

    /// Processes a router (BGPsec) EE certificate.
    ///
    /// Does nothing unless BGPsec processing is enabled. A certificate
    /// with inherited or missing AS resources, invalid AS resources, a
    /// key algorithm not allowed for router certificates, or an
    /// excessively large key is logged and skipped -- it never fails the
    /// run.
    fn process_ee_cert(
        &mut self, uri: &uri::Rsync, cert: Cert, ca_cert: &CaCert,
    ) -> Result<(), Failed> {
        if !self.report.enable_bgpsec {
            return Ok(())
        }
        if
            cert.as_resources().is_inherited()
            || !cert.as_resources().is_present()
        {
            warn!(
                "{}: router certificate does not contain AS resources.", uri
            );
            return Ok(())
        }
        let asns = match cert.as_resources().to_blocks() {
            Ok(blocks) => blocks,
            Err(_) => {
                warn!(
                    "{}: router certificate contains invalid AS resources.",
                    uri
                );
                return Ok(())
            }
        };
        let id = cert.subject_key_identifier();
        let key = cert.subject_public_key_info();
        if !key.allow_router_cert() {
            warn!(
                "{}: router certificate has invalid key algorithm.", uri
            );
            return Ok(())
        }
        let key = match RouterKeyInfo::new(key.to_info_bytes()) {
            Ok(key) => key,
            Err(_) => {
                warn!(
                    "{}: excessively large key in router certificate.", uri
                );
                return Ok(())
            }
        };
        // The produced router keys expire with the certificate, so its
        // expiry caps the refresh time.
        self.pub_point.update_refresh(cert.validity().not_after());
        self.pub_point.add_router_key(
            asns, id, key, Arc::new(PublishInfo::ee_cert(&cert, uri, ca_cert))
        );
        Ok(())
    }

    /// Processes a ROA, adding its origins to the point's payload.
    fn process_roa(
        &mut self,
        _uri: &uri::Rsync,
        cert: ResourceCert,
        route: RouteOriginAttestation
    ) -> Result<(), Failed> {
        if self.pub_point.add_roa(
            route, Arc::new(PublishInfo::signed_object(&cert, self.validity)),
            self.report.limit_v4_len, self.report.limit_v6_len,
        ) {
            // Only cap the refresh time if any origin was actually kept.
            self.pub_point.update_refresh(cert.validity().not_after());
        }
        Ok(())
    }

    /// Drops everything collected so far; processing starts over.
    fn restart(&mut self) -> Result<(), Failed> {
        self.pub_point.restart();
        Ok(())
    }

    /// Hands the completed publication point over to the report.
    ///
    /// Empty points are silently dropped.
    fn commit(self) {
        if !self.pub_point.is_empty() {
            self.report.pub_points.push(self.pub_point);
        }
    }

    /// Marks the CA as rejected.
    ///
    /// Its resources are recorded for the unsafe-VRP check and, if
    /// configured, logged.
    fn cancel(self, cert: &CaCert) {
        if self.report.log_rejected {
            warn!(
                "CA for {} rejected, resources marked as unsafe:",
                cert.ca_repository()
            );
            for block in cert.cert().v4_resources().iter() {
                warn!(" {}", block.display_v4());
            }
            for block in cert.cert().v6_resources().iter() {
                warn!(" {}", block.display_v6());
            }
            for block in cert.cert().as_resources().iter() {
                warn!(" {}", block);
            }
        }
        self.report.rejected.extend_from_cert(cert);
    }
}
/// The payload collected from a single publication point.
#[derive(Clone, Debug)]
struct PubPoint {
    /// The validated payload with information about its source.
    payload: Vec<(Payload, Arc<PublishInfo>)>,
    /// The earliest expiry of any object contributing to the payload.
    refresh: Time,
    /// The refresh time the point started out with; restored by `restart`.
    orig_refresh: Time,
    /// The index of the trust anchor this point descends from.
    tal_index: usize,
    /// The index of the repository the point was published in, if known.
    repository_index: Option<usize>,
}
impl PubPoint {
    /// Creates a pub point atop a trust anchor certificate.
    fn new_ta(cert: &CaCert, tal_index: usize) -> Self {
        let not_after = cert.cert().validity().not_after();
        Self {
            payload: Vec::new(),
            refresh: not_after,
            orig_refresh: not_after,
            tal_index,
            repository_index: None,
        }
    }

    /// Creates a pub point below `parent` for the CA certificate `cert`.
    fn new_ca(parent: &PubPoint, cert: &CaCert) -> Self {
        // The refresh time can never exceed that of the parent.
        let limit = cmp::min(
            parent.refresh, cert.cert().validity().not_after()
        );
        Self {
            payload: Vec::new(),
            refresh: limit,
            orig_refresh: limit,
            tal_index: parent.tal_index,
            repository_index: None,
        }
    }

    /// Returns whether no payload has been collected at all.
    pub fn is_empty(&self) -> bool {
        self.payload.is_empty()
    }

    /// Lowers the refresh time if `refresh` is earlier.
    fn update_refresh(&mut self, refresh: Time) {
        if refresh < self.refresh {
            self.refresh = refresh
        }
    }

    /// Drops collected payload and restores the initial refresh time.
    fn restart(&mut self) {
        self.payload.clear();
        self.refresh = self.orig_refresh;
    }

    /// Adds the origins of a ROA, honoring optional prefix-length limits.
    ///
    /// Returns whether at least one origin was added.
    fn add_roa(
        &mut self,
        roa: RouteOriginAttestation,
        info: Arc<PublishInfo>,
        limit_v4_len: Option<u8>,
        limit_v6_len: Option<u8>,
    ) -> bool {
        let mut added = false;
        for origin in roa.iter_origins() {
            let prefix = origin.prefix.prefix();
            let limit = if prefix.is_v4() { limit_v4_len } else { limit_v6_len };
            // Silently skip origins whose prefix exceeds the limit.
            if matches!(limit, Some(limit) if prefix.len() > limit) {
                continue;
            }
            self.payload.push((origin.into(), info.clone()));
            added = true;
        }
        added
    }

    /// Adds one router key per ASN in `asns`, all sharing `info`.
    fn add_router_key(
        &mut self,
        asns: AsBlocks,
        key_id: KeyIdentifier,
        key_info: RouterKeyInfo,
        info: Arc<PublishInfo>,
    ) {
        for asn in asns.iter_asns() {
            let key = RouterKey::new(key_id, asn, key_info.clone());
            self.payload.push((key.into(), info.clone()));
        }
    }
}
/// Collects the IP resources of rejected CAs.
///
/// Blocks are pushed onto a lock-free queue from any thread and only
/// merged into `IpBlocks` once `finalize` is called.
#[derive(Debug, Default)]
struct RejectedResourcesBuilder {
    /// Queued blocks; the flag is `true` for IPv4, `false` for IPv6.
    blocks: SegQueue<(bool, IpBlock)>,
}

impl RejectedResourcesBuilder {
    /// Queues all IP resources of `cert` as rejected.
    fn extend_from_cert(&self, cert: &CaCert) {
        // A 0/0 block would mark everything unsafe, so it is dropped here
        // (matches the original behavior of skipping slash-zero blocks).
        let v4 = cert.cert().v4_resources().iter()
            .filter(|block| !block.is_slash_zero())
            .map(|block| (true, block));
        let v6 = cert.cert().v6_resources().iter()
            .filter(|block| !block.is_slash_zero())
            .map(|block| (false, block));
        for item in v4.chain(v6) {
            self.blocks.push(item);
        }
    }

    /// Drains the queue and merges everything into the final sets.
    fn finalize(self) -> RejectedResources {
        let mut v4 = IpBlocksBuilder::new();
        let mut v6 = IpBlocksBuilder::new();
        while let Some((is_v4, block)) = self.blocks.pop() {
            if is_v4 { v4.push(block) } else { v6.push(block) }
        }
        RejectedResources {
            v4: v4.finalize(),
            v6: v6.finalize()
        }
    }
}
/// The merged IP resources of all rejected CAs.
#[derive(Clone, Debug)]
pub struct RejectedResources {
    v4: IpBlocks,
    v6: IpBlocks
}

impl RejectedResources {
    /// Checks whether a prefix should be kept.
    ///
    /// A prefix is kept if it does not intersect any rejected resources
    /// of its address family.
    pub fn keep_prefix(&self, prefix: addr::Prefix) -> bool {
        let raw = rpki::repository::resources::Prefix::new(
            prefix.addr(), prefix.len()
        );
        let intersects = if prefix.is_v4() {
            self.v4.intersects_block(raw)
        }
        else {
            self.v6.intersects_block(raw)
        };
        !intersects
    }
}
/// A shareable handle to the payload history behind an `RwLock`.
#[derive(Clone, Debug)]
pub struct SharedHistory(Arc<RwLock<PayloadHistory>>);

impl SharedHistory {
    /// Creates a new shared history from the configuration.
    pub fn from_config(config: &Config) -> Self {
        SharedHistory(Arc::new(RwLock::new(
            PayloadHistory::from_config(config)
        )))
    }

    /// Provides read access to the underlying history.
    pub fn read(&self) -> impl ops::Deref<Target = PayloadHistory> + '_ {
        self.0.read().expect("Payload history lock poisoned")
    }

    /// Provides write access to the underlying history.
    fn write(&self) -> impl ops::DerefMut<Target = PayloadHistory> + '_ {
        self.0.write().expect("Payload history lock poisoned")
    }

    /// Updates the history with the outcome of a validation run.
    ///
    /// Builds a new snapshot from `report`, derives a delta against the
    /// current snapshot and installs both. Returns whether the payload
    /// actually changed (a delta was produced or there was no snapshot
    /// yet).
    pub fn update(
        &self,
        report: ValidationReport,
        exceptions: &LocalExceptions,
        mut metrics: Metrics
    ) -> bool {
        let snapshot = SnapshotBuilder::from_report(
            report, exceptions, &mut metrics,
            self.read().unsafe_vrps
        );
        // Take current state under a short read lock only.
        let (current, serial) = {
            let read = self.read();
            (read.current(), read.serial())
        };
        // BUGFIX: `&current` had been mangled into an HTML-entity
        // mojibake sequence; restored the intended reference.
        let delta = current.as_ref().and_then(|current| {
            PayloadDelta::construct(&current.to_builder(), &snapshot, serial)
        });

        let mut history = self.write();
        history.metrics = Some(metrics.into());
        if let Some(delta) = delta {
            info!(
                "Delta with {} announced and {} withdrawn items.",
                delta.announce.len(),
                delta.withdraw.len(),
            );
            history.current = Some(
                PayloadSnapshot::from_builder(snapshot).into()
            );
            history.push_delta(delta);
            true
        }
        else if current.is_none() {
            // First run ever: install the snapshot without a delta.
            history.current = Some(
                PayloadSnapshot::from_builder(snapshot).into()
            );
            true
        }
        else {
            false
        }
    }

    /// Marks the beginning of an update cycle.
    pub fn mark_update_start(&self) {
        self.write().last_update_start = Utc::now();
    }

    /// Marks the end of an update cycle and recalculates the schedule.
    pub fn mark_update_done(&self) {
        let mut locked = self.write();
        let now = Utc::now();
        locked.last_update_done = Some(now);
        locked.last_update_duration = Some(
            now.signed_duration_since(locked.last_update_start)
                .to_std().unwrap_or_else(|_| Duration::from_secs(0))
        );
        locked.next_update_start = SystemTime::now() + locked.refresh;
        // If the data set expires earlier than the regular refresh, pull
        // the next update forward.
        if let Some(refresh) = locked.current.as_ref().and_then(|c|
            c.refresh()
        ) {
            let refresh = SystemTime::from(refresh);
            if refresh < locked.next_update_start {
                locked.next_update_start = refresh;
            }
        }
        locked.created = {
            if let Some(created) = locked.created {
                // Ensure the created time stamp increases strictly even
                // if the clock has sub-second resolution only.
                if now.timestamp() <= created.timestamp() {
                    Some(created + chrono::Duration::seconds(1))
                }
                else {
                    Some(now)
                }
            }
            else {
                Some(now)
            }
        };
    }
}
impl PayloadSource for SharedHistory {
    type Set = SnapshotArcIter;
    type Diff = DeltaVrpIter;

    /// The source is ready once a first snapshot has been installed.
    fn ready(&self) -> bool {
        self.read().is_active()
    }

    /// Returns the current RTR state (session and serial).
    fn notify(&self) -> State {
        let read = self.read();
        State::from_parts(read.rtr_session(), read.serial())
    }

    /// Returns the state and an iterator over the full payload set.
    fn full(&self) -> (State, Self::Set) {
        let read = self.read();
        (
            State::from_parts(read.rtr_session(), read.serial()),
            // Fall back to an empty default snapshot if none exists yet.
            read.current.clone().unwrap_or_default().arc_iter(),
        )
    }

    /// Returns the diff from `state` if one can be provided.
    fn diff(&self, state: State) -> Option<(State, Self::Diff)> {
        let read = self.read();
        // Serials from a different session are not comparable.
        if read.rtr_session() != state.session() {
            return None
        }
        read.delta_since(state.serial()).map(|delta| {
            (
                State::from_parts(read.rtr_session(), read.serial()),
                DeltaVrpIter::new(delta)
            )
        })
    }

    /// Returns the RTR timing values.
    ///
    /// The refresh value is derived from the actual expected wait until
    /// the next data set, saturating at `u32::MAX` seconds.
    fn timing(&self) -> Timing {
        let read = self.read();
        let mut res = read.timing;
        res.refresh = u32::try_from(
            read.update_wait().as_secs()
        ).unwrap_or(u32::MAX);
        res
    }
}
/// The history of the validated payload: current snapshot plus deltas.
#[derive(Clone, Debug)]
pub struct PayloadHistory {
    /// The most recent snapshot, if a run has completed yet.
    current: Option<Arc<PayloadSnapshot>>,
    /// Recent deltas, newest first, at most `keep` entries.
    deltas: VecDeque<Arc<PayloadDelta>>,
    /// The metrics of the most recent validation run.
    metrics: Option<Arc<Metrics>>,
    /// The session ID (seconds since the Unix epoch at startup).
    session: u64,
    /// The maximum number of deltas to keep.
    keep: usize,
    /// The configured refresh interval.
    refresh: Duration,
    /// The policy for dealing with potentially unsafe VRPs.
    unsafe_vrps: FilterPolicy,
    /// When the last update cycle started.
    last_update_start: DateTime<Utc>,
    /// When the last update cycle finished, if it has.
    last_update_done: Option<DateTime<Utc>>,
    /// How long the last update cycle took, if it finished.
    last_update_duration: Option<Duration>,
    /// When the next update cycle is scheduled to start.
    next_update_start: SystemTime,
    /// The (strictly increasing) creation time of the current data set.
    created: Option<DateTime<Utc>>,
    /// The timing values advertised via RTR.
    timing: Timing,
}

impl PayloadHistory {
    /// Creates a new, empty history from the configuration.
    pub fn from_config(config: &Config) -> Self {
        PayloadHistory {
            current: None,
            deltas: VecDeque::with_capacity(config.history_size),
            metrics: None,
            // Use the Unix time stamp at startup as the session ID so a
            // restart yields a fresh session.
            session: {
                SystemTime::now()
                    .duration_since(SystemTime::UNIX_EPOCH).unwrap()
                    .as_secs()
            },
            keep: config.history_size,
            refresh: config.refresh,
            unsafe_vrps: config.unsafe_vrps,
            last_update_start: Utc::now(),
            last_update_done: None,
            last_update_duration: None,
            next_update_start: SystemTime::now() + config.refresh,
            created: None,
            timing: Timing {
                refresh: config.refresh.as_secs() as u32,
                retry: config.retry.as_secs() as u32,
                expire: config.expire.as_secs() as u32,
            },
        }
    }

    /// Pushes a new delta, dropping the oldest if the limit is reached.
    fn push_delta(&mut self, delta: PayloadDelta) {
        if self.deltas.len() == self.keep {
            let _ = self.deltas.pop_back();
        }
        self.deltas.push_front(Arc::new(delta))
    }

    /// Returns whether a snapshot is available yet.
    pub fn is_active(&self) -> bool {
        self.current.is_some()
    }

    /// Returns a cloned handle to the current snapshot, if any.
    pub fn current(&self) -> Option<Arc<PayloadSnapshot>> {
        self.current.clone()
    }

    /// Returns how long to wait until the next update starts.
    pub fn refresh_wait(&self) -> Duration {
        self.next_update_start
            .duration_since(SystemTime::now())
            .unwrap_or_else(|_| Duration::from_secs(0))
    }

    /// Returns how long to wait before new data can be expected.
    ///
    /// This is the next update start plus twice the last update's
    /// duration -- or plus the refresh interval if no update has finished
    /// yet. Falls back to the refresh interval if that time is already in
    /// the past.
    pub fn update_wait(&self) -> Duration {
        let start = match self.last_update_duration {
            Some(duration) => self.next_update_start + duration + duration,
            None => self.next_update_start + self.refresh
        };
        start.duration_since(SystemTime::now()).unwrap_or(self.refresh)
    }

    /// Returns the changes since `serial`, if they can be provided.
    ///
    /// Returns `None` if the client is ahead of us or too far behind;
    /// it then needs a full snapshot. Merges several deltas if needed.
    pub fn delta_since(&self, serial: Serial) -> Option<Arc<PayloadDelta>> {
        // Fast paths based on the newest delta (front of the queue).
        if let Some(delta) = self.deltas.front() {
            if delta.serial() < serial {
                // Client claims to be ahead of us -- can't help.
                return None
            }
            else if delta.serial() == serial {
                // Client is fully up to date: empty delta.
                return Some(Arc::new(PayloadDelta::empty(serial)))
            }
            else if delta.serial() == serial.add(1) {
                // Exactly one behind: the newest delta alone suffices.
                return Some(delta.clone())
            }
        }
        else {
            if serial == 0 {
                return Some(Arc::new(PayloadDelta::empty(serial)))
            }
            else {
                return None
            }
        };
        // Walk from the oldest delta towards the newest until we find
        // the client's serial; everything after it gets merged.
        let mut iter = self.deltas.iter().rev();
        for delta in &mut iter {
            match delta.serial().partial_cmp(&serial) {
                // Even the oldest delta is newer than the client: there
                // is a gap, so a full snapshot is required.
                Some(cmp::Ordering::Greater) => return None,
                Some(cmp::Ordering::Equal) => break,
                _ => continue
            }
        }
        Some(DeltaMerger::from_iter(iter).into_delta())
    }

    /// Returns the current serial: that of the newest delta, or zero.
    pub fn serial(&self) -> Serial {
        self.deltas.front().map(|delta| {
            delta.serial()
        }).unwrap_or_else(|| 0.into())
    }

    /// Returns the full session ID.
    pub fn session(&self) -> u64 {
        self.session
    }

    /// Returns the session ID truncated for RTR (lower 16 bits).
    pub fn rtr_session(&self) -> u16 {
        self.session as u16
    }

    /// Returns the metrics of the most recent run, if any.
    pub fn metrics(&self) -> Option<Arc<Metrics>> {
        self.metrics.clone()
    }

    /// Returns when the last update cycle started.
    pub fn last_update_start(&self) -> DateTime<Utc> {
        self.last_update_start
    }

    /// Returns when the last update cycle finished, if it has.
    pub fn last_update_done(&self) -> Option<DateTime<Utc>> {
        self.last_update_done
    }

    /// Returns how long the last update cycle took, if it finished.
    pub fn last_update_duration(&self) -> Option<Duration> {
        self.last_update_duration
    }

    /// Returns the creation time of the current data set, if any.
    pub fn created(&self) -> Option<DateTime<Utc>> {
        self.created
    }

    /// Returns the unsafe-VRP policy.
    pub fn unsafe_vrps(&self) -> FilterPolicy {
        self.unsafe_vrps
    }
}
/// An immutable, sorted snapshot of the complete validated payload.
#[derive(Clone, Debug)]
pub struct PayloadSnapshot {
    /// The route origins, sorted by payload.
    origins: Vec<(Payload, PayloadInfo)>,
    /// The router keys, sorted by payload.
    router_keys: Vec<(Payload, PayloadInfo)>,
    /// When the snapshot was created.
    created: DateTime<Utc>,
    /// The earliest expiry of any contributing object, if known.
    refresh: Option<Time>,
}
impl PayloadSnapshot {
    /// Creates a new, empty snapshot.
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates a snapshot from a builder, splitting and sorting payload.
    fn from_builder(
        builder: SnapshotBuilder
    ) -> Self {
        let mut origins = Vec::new();
        let mut keys = Vec::new();
        // Split the combined payload map into the two per-type lists.
        for (payload, info) in builder.payload.into_iter() {
            match payload {
                Payload::Origin(origin) => {
                    origins.push((Payload::Origin(origin), info))
                }
                Payload::RouterKey(key) => {
                    keys.push((Payload::RouterKey(key), info))
                }
            }
        }
        // Keep both lists in a stable, sorted order.
        origins.sort_unstable_by(|left, right| left.0.cmp(&right.0));
        keys.sort_unstable_by(|left, right| left.0.cmp(&right.0));
        PayloadSnapshot {
            origins,
            router_keys: keys,
            created: builder.created,
            refresh: builder.refresh,
        }
    }

    /// Creates a snapshot directly from a validation run's report.
    pub fn from_report(
        report: ValidationReport,
        exceptions: &LocalExceptions,
        metrics: &mut Metrics,
        unsafe_vrps: FilterPolicy
    ) -> Self {
        Self::from_builder(SnapshotBuilder::from_report(
            report, exceptions, metrics, unsafe_vrps
        ))
    }

    /// Returns when the snapshot was created.
    pub fn created(&self) -> DateTime<Utc> {
        self.created
    }

    /// Returns the earliest expiry of a contributing object, if known.
    fn refresh(&self) -> Option<Time> {
        self.refresh
    }

    /// Returns an iterator over all payload with its info.
    pub fn payload(
        &self
    ) -> impl Iterator<Item = (&Payload, &PayloadInfo)> {
        self.origins.iter().chain(self.router_keys.iter())
            .map(|item| (&item.0, &item.1))
    }

    /// Returns an iterator over the route origins and their info.
    pub fn origins(
        &self
    ) -> impl Iterator<Item = (RouteOrigin, &PayloadInfo)> + '_ {
        self.origins.iter().map(|item| {
            match item.0 {
                // `origins` only ever holds `Payload::Origin` items --
                // enforced by `from_builder`.
                Payload::Origin(origin) => (origin, &item.1),
                _ => panic!("non-origin in origin set")
            }
        })
    }

    /// Returns an iterator over the router keys and their info.
    pub fn router_keys(
        &self
    ) -> impl Iterator<Item = (&RouterKey, &PayloadInfo)> + '_ {
        self.router_keys.iter().map(|item| {
            match item.0 {
                Payload::RouterKey(ref key) => (key, &item.1),
                _ => panic!("non-router key in router key set")
            }
        })
    }

    /// Returns an owning iterator over all payload.
    pub fn arc_iter(self: Arc<Self>) -> SnapshotArcIter {
        SnapshotArcIter::new(self)
    }

    /// Returns an owning iterator over the route origins.
    pub fn arc_origins_iter(self: Arc<Self>) -> SnapshotArcOriginsIter {
        SnapshotArcOriginsIter::new(self)
    }

    /// Returns an owning iterator over the router keys.
    pub fn arc_router_keys_iter(self: Arc<Self>) -> SnapshotArcRouterKeysIter {
        SnapshotArcRouterKeysIter::new(self)
    }

    /// Converts the snapshot back into a builder, cloning the payload.
    fn to_builder(&self) -> SnapshotBuilder {
        SnapshotBuilder {
            payload: self.payload().map(|item| {
                (item.0.clone(), item.1.clone())
            }).collect(),
            created: self.created,
            refresh: self.refresh,
        }
    }
}
impl Default for PayloadSnapshot {
    /// An empty snapshot created "now" with no known refresh limit.
    fn default() -> Self {
        Self {
            origins: Vec::new(),
            router_keys: Vec::new(),
            created: Utc::now(),
            refresh: None
        }
    }
}
// Allows generic code to accept both snapshots and references to them.
impl AsRef<PayloadSnapshot> for PayloadSnapshot {
    fn as_ref(&self) -> &Self {
        self
    }
}
/// An owning iterator over the payload of a shared snapshot.
///
/// Yields all route origins first, followed by all router keys.
#[derive(Clone, Debug)]
pub struct SnapshotArcIter {
    /// The snapshot being iterated over.
    snapshot: Arc<PayloadSnapshot>,
    /// The position of the next item to yield.
    next: PayloadIndex,
}

impl SnapshotArcIter {
    /// Creates a new iterator positioned at the start of `snapshot`.
    fn new(snapshot: Arc<PayloadSnapshot>) -> Self {
        Self { snapshot, next: Default::default() }
    }
}

impl SnapshotArcIter {
    /// Returns the next payload element together with its info.
    pub fn next_with_info(&mut self) -> Option<(&Payload, &PayloadInfo)> {
        // Resolve the position first (advancing `next`), then borrow the
        // item, so the returned references don't conflict with updating
        // the position. `Ok` tags an origin index, `Err` a key index.
        let pos = loop {
            match self.next {
                PayloadIndex::Origin(idx) => {
                    if idx < self.snapshot.origins.len() {
                        self.next = PayloadIndex::Origin(idx + 1);
                        break Ok(idx)
                    }
                    // Origins exhausted -- continue with router keys.
                    self.next = PayloadIndex::Key(0);
                }
                PayloadIndex::Key(idx) => {
                    if idx >= self.snapshot.router_keys.len() {
                        return None
                    }
                    self.next = PayloadIndex::Key(idx + 1);
                    break Err(idx)
                }
            }
        };
        let item = match pos {
            Ok(idx) => &self.snapshot.origins[idx],
            Err(idx) => &self.snapshot.router_keys[idx],
        };
        Some((&item.0, &item.1))
    }
}

impl PayloadSet for SnapshotArcIter {
    fn next(&mut self) -> Option<&Payload> {
        self.next_with_info().map(|(payload, _)| payload)
    }
}
/// The position of the next snapshot item: which list and which index.
#[derive(Clone, Copy, Debug)]
enum PayloadIndex {
    /// The next item is the origin with the given index.
    Origin(usize),
    /// The next item is the router key with the given index.
    Key(usize),
}

impl PayloadIndex {
    /// Returns the position advanced by one within the same list.
    fn inc(self) -> Self {
        match self {
            Self::Origin(index) => Self::Origin(index + 1),
            Self::Key(index) => Self::Key(index + 1)
        }
    }
}

impl Default for PayloadIndex {
    /// Iteration starts at the first origin.
    fn default() -> Self {
        Self::Origin(0)
    }
}
/// An owning iterator over the route origins of a shared snapshot.
#[derive(Clone, Debug)]
pub struct SnapshotArcOriginsIter {
    /// The snapshot being iterated over.
    snapshot: Arc<PayloadSnapshot>,
    /// The index of the next origin to yield.
    next: usize,
}

impl SnapshotArcOriginsIter {
    /// Creates a new iterator at the first origin of `snapshot`.
    fn new(snapshot: Arc<PayloadSnapshot>) -> Self {
        Self { snapshot, next: 0 }
    }

    /// Returns the next origin together with its payload info.
    pub fn next_with_info(&mut self) -> Option<(RouteOrigin, &PayloadInfo)> {
        let item = self.snapshot.origins.get(self.next)?;
        self.next += 1;
        if let Payload::Origin(origin) = item.0 {
            Some((origin, &item.1))
        }
        else {
            // `origins` is guaranteed to hold only origin payload.
            panic!("non-origin in origin set")
        }
    }
}

impl Iterator for SnapshotArcOriginsIter {
    type Item = RouteOrigin;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_with_info().map(|(origin, _)| origin)
    }
}
/// An owning iterator over the router keys of a shared snapshot.
#[derive(Clone, Debug)]
pub struct SnapshotArcRouterKeysIter {
    /// The snapshot being iterated over.
    snapshot: Arc<PayloadSnapshot>,
    /// The index of the next router key to yield.
    next: usize,
}

impl SnapshotArcRouterKeysIter {
    /// Creates a new iterator at the first router key of `snapshot`.
    fn new(snapshot: Arc<PayloadSnapshot>) -> Self {
        Self { snapshot, next: 0 }
    }

    /// Returns the next router key together with its payload info.
    pub fn next_with_info(&mut self) -> Option<(&RouterKey, &PayloadInfo)> {
        let item = self.snapshot.router_keys.get(self.next)?;
        self.next += 1;
        if let Payload::RouterKey(ref key) = item.0 {
            Some((key, &item.1))
        }
        else {
            // `router_keys` is guaranteed to hold only key payload.
            panic!("non-router key in router key set")
        }
    }

    /// Returns the next router key.
    ///
    /// Not an `Iterator` impl since the item borrows from the iterator.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Option<&RouterKey> {
        self.next_with_info().map(|(key, _)| key)
    }
}
/// Collects payload while a snapshot is being put together.
#[derive(Clone, Debug)]
struct SnapshotBuilder {
    /// The payload with its source info, keyed for deduplication.
    payload: HashMap<Payload, PayloadInfo>,
    /// When the snapshot is considered created.
    created: DateTime<Utc>,
    /// The earliest expiry of any contributing object, if known.
    refresh: Option<Time>,
}
impl SnapshotBuilder {
    /// Creates a builder from a validation run's report.
    ///
    /// Applies the unsafe-VRP policy and the local exceptions, merging
    /// duplicate payload and keeping `metrics` up to date throughout.
    fn from_report(
        report: ValidationReport,
        exceptions: &LocalExceptions,
        metrics: &mut Metrics,
        unsafe_vrps: FilterPolicy
    ) -> Self {
        let mut res = SnapshotBuilder {
            payload: HashMap::new(),
            created: metrics.time,
            refresh: None
        };
        // The merged resources of all rejected CAs.
        let rejected = report.rejected.finalize();
        let mut unsafe_vrps_present = false;

        // Drain the publication points collected by the run.
        while let Some(pub_point) = report.pub_points.pop() {
            res.update_refresh(pub_point.refresh);
            for (payload, roa_info) in pub_point.payload {
                let mut point_metrics = AllVrpMetrics::new(
                    metrics, pub_point.tal_index, pub_point.repository_index,
                    &payload
                );
                point_metrics.update(|m| m.valid += 1);

                // Origins overlapping rejected CA resources are
                // "potentially unsafe"; what happens depends on policy.
                if let Payload::Origin(origin) = payload {
                    if !rejected.keep_prefix(origin.prefix.prefix()) {
                        unsafe_vrps_present = true;
                        match unsafe_vrps {
                            FilterPolicy::Accept => {
                                // Keep silently.
                            }
                            FilterPolicy::Warn => {
                                point_metrics.update(|m| m.marked_unsafe += 1);
                                info!(
                                    "Encountered potentially unsafe VRP \
                                    ({}/{}-{}, {})",
                                    origin.prefix.addr(),
                                    origin.prefix.prefix_len(),
                                    origin.prefix.resolved_max_len(),
                                    origin.asn
                                );
                            }
                            FilterPolicy::Reject => {
                                point_metrics.update(|m| m.marked_unsafe += 1);
                                warn!(
                                    "Filtering potentially unsafe VRP \
                                    ({}/{}-{}, {})",
                                    origin.prefix.addr(),
                                    origin.prefix.prefix_len(),
                                    origin.prefix.resolved_max_len(),
                                    origin.asn
                                );
                                continue
                            }
                        }
                    }
                }
                // Apply local exception filtering.
                if !exceptions.keep_payload(&payload) {
                    point_metrics.update(|m| m.locally_filtered += 1);
                    continue
                }
                // Insert the payload, merging source info on duplicates.
                match res.payload.entry(payload) {
                    hash_map::Entry::Vacant(entry) => {
                        entry.insert(roa_info.into());
                        point_metrics.update(|m| m.contributed += 1);
                    }
                    hash_map::Entry::Occupied(mut entry) => {
                        entry.get_mut().add_published(roa_info);
                        point_metrics.update(|m| m.duplicate += 1);
                    }
                }
            }
        }

        if unsafe_vrps_present && unsafe_vrps.log() {
            warn!(
                "For more information on unsafe VRPs, see \
                https://routinator.docs.nlnetlabs.nl\
                /en/stable/unsafe-vrps.html"
            );
        }

        // Add the locally asserted payload from the exceptions.
        for (payload, info) in exceptions.assertions() {
            match res.payload.entry(payload.clone()) {
                hash_map::Entry::Vacant(entry) => {
                    entry.insert(info.into());
                    metrics.local.for_payload(payload).contributed += 1;
                    metrics.payload.for_payload(payload).contributed += 1;
                }
                hash_map::Entry::Occupied(mut entry) => {
                    entry.get_mut().add_local(info);
                    metrics.local.for_payload(payload).duplicate += 1;
                    metrics.payload.for_payload(payload).duplicate += 1;
                }
            }
        }
        metrics.finalize();
        res
    }

    /// Lowers the refresh time to `refresh` if it is earlier or unset.
    fn update_refresh(&mut self, refresh: Time) {
        self.refresh = match self.refresh {
            Some(old) => Some(cmp::min(old, refresh)),
            None => Some(refresh)
        }
    }
}
impl Default for SnapshotBuilder {
    /// An empty builder created "now" with no refresh limit.
    fn default() -> Self {
        Self {
            payload: HashMap::new(),
            created: Utc::now(),
            refresh: None
        }
    }
}
/// Helper for updating every metrics item a VRP appears in at once:
/// per-TAL, per-repository (if known), and overall.
struct AllVrpMetrics<'a> {
    /// Metrics of the trust anchor the VRP descends from.
    tal: &'a mut VrpMetrics,
    /// Metrics of the repository, if a repository index was recorded.
    repo: Option<&'a mut VrpMetrics>,
    /// The overall payload metrics.
    all: &'a mut VrpMetrics,
}

impl<'a> AllVrpMetrics<'a> {
    /// Looks up the metrics items relevant for `payload`.
    ///
    /// The three mutable borrows are taken from disjoint fields of
    /// `metrics`, which is why they can coexist.
    fn new(
        metrics: &'a mut Metrics, tal_index: usize, repo_index: Option<usize>,
        payload: &Payload,
    ) -> Self {
        AllVrpMetrics {
            tal: metrics.tals[tal_index].payload.for_payload(payload),
            repo: match repo_index {
                Some(index) => {
                    Some(
                        metrics.repositories[index]
                            .payload.for_payload(payload)
                    )
                }
                None => None
            },
            all: metrics.payload.for_payload(payload),
        }
    }

    /// Applies `op` to each of the covered metrics items.
    fn update(&mut self, op: impl Fn(&mut VrpMetrics)) {
        op(self.tal);
        if let Some(ref mut repo) = self.repo {
            op(repo)
        }
        op(self.all)
    }
}
/// The difference between two payload snapshots.
#[derive(Clone, Debug)]
pub struct PayloadDelta {
    /// The serial number of this delta.
    serial: Serial,
    /// The payload newly announced by this delta.
    announce: Vec<Payload>,
    /// The payload withdrawn by this delta.
    withdraw: Vec<Payload>,
}

impl PayloadDelta {
    /// Constructs the delta from `current` to `next`.
    ///
    /// Returns `None` if the two data sets are identical. The resulting
    /// delta receives `serial` plus one as its serial number.
    fn construct(
        current: &SnapshotBuilder, next: &SnapshotBuilder, serial: Serial
    ) -> Option<Self> {
        // BUGFIX: `&current` had been mangled into an HTML-entity
        // mojibake sequence in both calls; restored the references.
        let announce = added_keys(&next.payload, &current.payload);
        let withdraw = added_keys(&current.payload, &next.payload);
        if !announce.is_empty() || !withdraw.is_empty() {
            Some(PayloadDelta {
                serial: serial.add(1),
                announce,
                withdraw,
            })
        }
        else {
            None
        }
    }

    /// Creates an empty delta carrying the given serial number.
    pub fn empty(serial: Serial) -> Self {
        PayloadDelta {
            serial,
            announce: Vec::new(),
            withdraw: Vec::new(),
        }
    }

    /// Returns whether the delta changes nothing.
    pub fn is_empty(&self) -> bool {
        self.announce.is_empty() && self.withdraw.is_empty()
    }

    /// Returns the serial number of this delta.
    pub fn serial(&self) -> Serial {
        self.serial
    }

    /// Returns the announced payload.
    pub fn announce(&self) -> &[Payload] {
        &self.announce
    }

    /// Returns the withdrawn payload.
    pub fn withdraw(&self) -> &[Payload] {
        &self.withdraw
    }
}
/// An iterator over the changes of a delta in RTR order:
/// all announcements first, then all withdrawals.
#[derive(Clone, Debug)]
pub struct DeltaVrpIter {
    /// The delta being iterated over.
    delta: Arc<PayloadDelta>,
    /// The position: `Ok(i)` in the announce list, `Err(i)` in withdraw.
    pos: Result<usize, usize>,
}

impl DeltaVrpIter {
    /// Creates a new iterator over `delta`.
    fn new(delta: Arc<PayloadDelta>) -> Self {
        Self { delta, pos: Ok(0) }
    }
}

impl PayloadDiff for DeltaVrpIter {
    fn next(&mut self) -> Option<(&Payload, Action)> {
        // Serve announcements while any remain; once the list runs dry,
        // switch over to the withdraw list and continue there.
        if let Ok(pos) = self.pos {
            if let Some(payload) = self.delta.announce.get(pos) {
                self.pos = Ok(pos + 1);
                return Some((payload, Action::Announce))
            }
            self.pos = Err(0);
        }
        let pos = match self.pos {
            Err(pos) => pos,
            Ok(_) => unreachable!(),
        };
        match self.delta.withdraw.get(pos) {
            Some(payload) => {
                self.pos = Err(pos + 1);
                Some((payload, Action::Withdraw))
            }
            None => None
        }
    }
}
/// Merges a sequence of consecutive deltas into a single delta.
#[derive(Clone, Debug, Default)]
struct DeltaMerger {
    /// The serial of the newest delta merged so far.
    serial: Serial,
    /// The net announced payload.
    announce: HashSet<Payload>,
    /// The net withdrawn payload.
    withdraw: HashSet<Payload>,
}

impl DeltaMerger {
    /// Creates a merger from an iterator over deltas, oldest first.
    fn from_iter<'a>(
        mut iter: impl Iterator<Item = &'a Arc<PayloadDelta>>
    ) -> Self {
        let first = match iter.next() {
            Some(delta) => delta,
            None => return Self::default(),
        };
        let mut res = Self::new(first);
        iter.for_each(|delta| res.merge(delta));
        res
    }

    /// Creates a merger seeded from a single delta.
    fn new(delta: &PayloadDelta) -> Self {
        DeltaMerger {
            serial: delta.serial,
            announce: delta.announce.iter().cloned().collect(),
            withdraw: delta.withdraw.iter().cloned().collect(),
        }
    }

    /// Folds a newer delta into the merger.
    fn merge(&mut self, delta: &PayloadDelta) {
        self.serial = delta.serial;
        for payload in &delta.announce {
            // A new announcement cancels an earlier withdrawal; only if
            // there was none does it become a net announcement.
            if !self.withdraw.remove(payload) {
                self.announce.insert(payload.clone());
            }
        }
        for payload in &delta.withdraw {
            // Symmetrically, a withdrawal cancels an earlier announcement.
            if !self.announce.remove(payload) {
                self.withdraw.insert(payload.clone());
            }
        }
    }

    /// Converts the merger into an actual delta.
    fn into_delta(self) -> Arc<PayloadDelta> {
        Arc::new(PayloadDelta {
            serial: self.serial,
            announce: self.announce.into_iter().collect(),
            withdraw: self.withdraw.into_iter().collect(),
        })
    }
}
/// Information about the source(s) of a payload element.
#[derive(Clone, Debug)]
pub struct PayloadInfo {
    /// The first source: a published object or a local exception.
    head: Result<Arc<PublishInfo>, Arc<ExceptionInfo>>,
    /// Further sources if the element was contributed more than once.
    tail: Option<Box<PayloadInfo>>,
}

impl PayloadInfo {
    /// Adds another published-object source.
    fn add_published(&mut self, info: Arc<PublishInfo>) {
        let tail = self.tail.take();
        self.tail = Some(Box::new(PayloadInfo { head: Ok(info), tail }));
    }

    /// Adds another local-exception source.
    fn add_local(&mut self, info: Arc<ExceptionInfo>) {
        let tail = self.tail.take();
        self.tail = Some(Box::new(PayloadInfo { head: Err(info), tail }));
    }

    /// Returns an iterator over this and all further sources.
    pub fn iter(&self) -> PayloadInfoIter {
        PayloadInfoIter { info: Some(self) }
    }

    /// Returns the TAL name if the first source is a published object.
    pub fn tal_name(&self) -> Option<&str> {
        self.head.as_ref().ok().map(|info| info.tal.name())
    }

    /// Returns the rsync URI of the first source, if available.
    pub fn uri(&self) -> Option<&uri::Rsync> {
        self.head.as_ref().ok().and_then(|info| info.uri.as_ref())
    }

    /// Returns the validity if the first source is a published object.
    pub fn validity(&self) -> Option<Validity> {
        self.head.as_ref().ok().map(|info| info.roa_validity)
    }

    /// Returns the first source as publish info, if it is one.
    pub fn publish_info(&self) -> Option<&PublishInfo> {
        self.head.as_ref().ok().map(|info| info.as_ref())
    }

    /// Returns the first source as exception info, if it is one.
    pub fn exception_info(&self) -> Option<&ExceptionInfo> {
        self.head.as_ref().err().map(|info| info.as_ref())
    }
}
// Creates payload info with a published object as its only source.
impl From<Arc<PublishInfo>> for PayloadInfo {
    fn from(src: Arc<PublishInfo>) -> Self {
        PayloadInfo { head: Ok(src), tail: None }
    }
}

// Creates payload info with a local exception as its only source.
impl From<Arc<ExceptionInfo>> for PayloadInfo {
    fn from(src: Arc<ExceptionInfo>) -> Self {
        PayloadInfo { head: Err(src), tail: None }
    }
}
// Allows `for source in &info { ... }` over all sources of an element.
impl<'a> IntoIterator for &'a PayloadInfo {
    type Item = &'a PayloadInfo;
    type IntoIter = PayloadInfoIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}
/// An iterator over the chained sources of a payload element.
#[derive(Clone, Debug)]
pub struct PayloadInfoIter<'a> {
    /// The next item to yield, if any.
    info: Option<&'a PayloadInfo>,
}

impl<'a> Iterator for PayloadInfoIter<'a> {
    type Item = &'a PayloadInfo;

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.info.take()?;
        // Advance along the linked list of further sources.
        self.info = item.tail.as_deref();
        Some(item)
    }
}
/// Information about the published object a payload element came from.
#[derive(Clone, Debug)]
pub struct PublishInfo {
    /// The TAL the object descends from.
    pub tal: Arc<TalInfo>,
    /// The rsync URI of the object, if available.
    pub uri: Option<uri::Rsync>,
    /// The validity of the object's own EE certificate.
    pub roa_validity: Validity,
    /// The validity trimmed to the CA certificate chain.
    pub chain_validity: Validity,
}

impl PublishInfo {
    /// Creates publish info from a signed object's EE certificate.
    fn signed_object(cert: &ResourceCert, ca_validity: Validity) -> Self {
        PublishInfo {
            tal: cert.tal().clone(),
            // `unshare` gives the URI its own backing memory --
            // presumably so it doesn't keep a larger shared buffer
            // alive; TODO confirm against `uri::Rsync::unshare` docs.
            uri: cert.signed_object().cloned().map(|mut uri| {
                uri.unshare(); uri
            }),
            roa_validity: cert.validity(),
            chain_validity: cert.validity().trim(ca_validity),
        }
    }

    /// Creates publish info from a router (BGPsec) EE certificate.
    fn ee_cert(cert: &Cert, uri: &uri::Rsync, ca_cert: &CaCert) -> Self {
        PublishInfo {
            tal: ca_cert.cert().tal().clone(),
            uri: Some(uri.clone()),
            roa_validity: cert.validity(),
            chain_validity: cert.validity().trim(ca_cert.combined_validity())
        }
    }
}
/// Returns clones of the keys in `this` that are missing from `other`.
fn added_keys<K: Clone + Hash + Eq, V>(
    this: &HashMap<K, V>, other: &HashMap<K, V>
) -> Vec<K> {
    let mut res = Vec::new();
    for key in this.keys() {
        if !other.contains_key(key) {
            res.push(key.clone());
        }
    }
    res
}
#[cfg(test)]
mod test {
use std::str::FromStr;
use super::*;
/// Creates a prefix from an address string and a length; panics on error.
fn make_prefix(s: &str, l: u8) -> addr::Prefix {
    addr::Prefix::new(s.parse().unwrap(), l).unwrap()
}
/// Creates origin payload from an ASN, a prefix string, and a max length.
fn origin(as_id: u32, prefix: &str, max_len: u8) -> Payload {
    RouteOrigin::new(
        addr::MaxLenPrefix::new(
            addr::Prefix::from_str(prefix).unwrap(),
            Some(max_len)
        ).unwrap(),
        as_id.into(),
    ).into()
}
/// Creates router key payload from small integers for easy test setup.
fn router_key(key_id: u32, asn: u32, key_info: u32) -> Payload {
    Payload::router_key(
        // Key identifiers are 40 hex digits; zero-pad the number.
        KeyIdentifier::from_str(&format!("{:040}", key_id)).unwrap(),
        asn.into(),
        RouterKeyInfo::new(format!("{}", key_info).into()).unwrap(),
    )
}
/// Checks `addr::Prefix::covers` for IPv4 prefixes.
#[test]
fn address_prefix_covers_v4() {
    let outer = make_prefix("10.0.0.0", 16);
    let host_roa = make_prefix("10.0.0.0", 32);
    let sibling = make_prefix("10.1.0.0", 16);
    let inner_low = make_prefix("10.0.0.0", 24);
    let inner_mid = make_prefix("10.0.61.0", 24);
    let inner_hi = make_prefix("10.0.255.0", 24);
    let supernet = make_prefix("10.0.0.0", 8);
    // Covers all more-specifics but not siblings or less-specifics.
    assert!(!outer.covers(sibling));
    assert!(outer.covers(inner_low));
    assert!(outer.covers(inner_mid));
    assert!(outer.covers(inner_hi));
    assert!(!host_roa.covers(outer));
    assert!(!outer.covers(supernet));
}
/// Checks `addr::Prefix::covers` for IPv6 prefixes.
#[test]
fn address_prefix_covers_v6() {
    let outer = make_prefix("2001:db8::", 32);
    let host_roa = make_prefix("2001:db8::", 128);
    let sibling = make_prefix("2001:db9::", 32);
    let inner_low = make_prefix("2001:db8::", 48);
    let inner_mid = make_prefix("2001:db8:8000::", 48);
    let inner_hi = make_prefix("2001:db8:FFFF::", 48);
    let supernet = make_prefix("2001::", 24);
    // Covers all more-specifics but not siblings or less-specifics.
    assert!(!outer.covers(sibling));
    assert!(outer.covers(inner_low));
    assert!(outer.covers(inner_mid));
    assert!(outer.covers(inner_hi));
    assert!(!host_roa.covers(outer));
    assert!(!outer.covers(supernet));
}
/// Checks `PayloadDelta::construct` between two snapshot builders.
#[test]
#[allow(clippy::mutable_key_type)]
fn payload_delta_construct() {
    let o0 = origin(10, "10.0.0.0/10", 10);
    let o1 = origin(11, "10.0.0.0/11", 11);
    let o2 = origin(12, "10.0.0.0/12", 12);
    let o3 = origin(13, "10.0.0.0/13", 13);
    let o4 = origin(14, "10.0.0.0/14", 14);
    let info = PayloadInfo::from(Arc::new(ExceptionInfo::default()));
    let mut current = SnapshotBuilder::default();
    current.payload.insert(o0.clone(), info.clone());
    current.payload.insert(o1.clone(), info.clone());
    current.payload.insert(o2.clone(), info.clone());
    current.payload.insert(o3.clone(), info.clone());
    let mut next = SnapshotBuilder::default();
    next.payload.insert(o0, info.clone());
    next.payload.insert(o2, info.clone());
    next.payload.insert(o4.clone(), info);
    // BUGFIX: `&current` had been mangled into an HTML-entity mojibake
    // sequence here and below; restored the intended references.
    let delta = PayloadDelta::construct(
        &current, &next, 12.into()
    ).unwrap();
    // The constructed delta carries the old serial plus one.
    assert_eq!(delta.serial, Serial::from(13));
    let mut add: HashSet<_> = delta.announce.into_iter().collect();
    let mut sub: HashSet<_> = delta.withdraw.into_iter().collect();
    assert!(add.remove(&o4));
    assert!(add.is_empty());
    assert!(sub.remove(&o1));
    assert!(sub.remove(&o3));
    assert!(sub.is_empty());
    // Identical data sets must not produce a delta at all.
    assert!(
        PayloadDelta::construct(&current, &current, 10.into()).is_none()
    );
}
/// Checks that `added_keys` returns exactly the keys missing from `other`.
#[test]
fn fn_added_keys() {
    use std::iter::FromIterator;
    assert_eq!(
        added_keys(
            &HashMap::from_iter(
                vec![(1, ()), (2, ()), (3, ()), (4, ())].into_iter()
            ),
            &HashMap::from_iter(
                vec![(2, ()), (4, ()), (5, ())].into_iter()
            )
        ).into_iter().collect::<HashSet<_>>(),
        HashSet::from_iter(vec![1, 3].into_iter()),
    );
}
/// Checks that snapshots sort origins and router keys and that all the
/// iterator flavors agree with the sorted order.
#[test]
fn snapshot_order() {
    let payload = vec![
        origin(13, "10.0.0.0/11", 13),
        origin(10, "10.0.0.0/10", 10),
        router_key(22, 22, 22),
        router_key(24, 24, 24),
        origin(12, "10.0.0.0/11", 12),
        origin(11, "10.0.0.0/11", 11),
        router_key(23, 23, 23),
    ];
    let mut builder = SnapshotBuilder::default();
    for item in &payload {
        builder.payload.insert(
            item.clone(),
            PayloadInfo::from(Arc::new(ExceptionInfo::default())),
        );
    }
    let snapshot = PayloadSnapshot::from_builder(builder);
    // The borrowed origins iterator must yield sorted origins.
    let mut origins = payload.iter().filter_map(|item| {
        match item {
            Payload::Origin(origin) => Some(origin),
            _ => None
        }
    }).collect::<Vec<_>>();
    origins.sort();
    let mut origins_iter = origins.iter();
    for item in snapshot.origins() {
        assert_eq!(&item.0, *origins_iter.next().unwrap())
    }
    assert!(origins_iter.next().is_none());
    // The borrowed router keys iterator must yield sorted keys.
    let mut keys = payload.iter().filter_map(|item| {
        match item {
            Payload::RouterKey(key) => Some(key.clone()),
            _ => None
        }
    }).collect::<Vec<_>>();
    keys.sort();
    let mut keys_iter = keys.iter();
    for item in snapshot.router_keys() {
        assert_eq!(item.0, keys_iter.next().unwrap())
    }
    assert!(keys_iter.next().is_none());
    // The owning (`Arc`) iterators must agree with the borrowed ones.
    let snapshot = Arc::new(snapshot);
    let mut origins_iter = origins.iter();
    for item in snapshot.clone().arc_origins_iter() {
        assert_eq!(&item, *origins_iter.next().unwrap())
    }
    assert!(origins_iter.next().is_none());
    let mut keys_iter = keys.iter();
    let mut arc_iter = snapshot.arc_router_keys_iter();
    while let Some(item) = arc_iter.next() {
        assert_eq!(item, keys_iter.next().unwrap())
    }
    assert!(keys_iter.next().is_none());
}
/// Checks that `DeltaVrpIter` yields all announcements before all
/// withdrawals and handles empty lists on either side.
#[test]
fn delta_vrp_iter() {
    use crate::payload::Action::*;

    /// Drains the iterator into an owned vec for comparison.
    fn collect(iter: &mut DeltaVrpIter) -> Vec<(Payload, Action)> {
        let mut res = Vec::new();
        while let Some((payload, action)) = iter.next() {
            res.push((payload.clone(), action));
        }
        res
    }

    let mut delta = PayloadDelta {
        serial: Default::default(),
        announce: vec![
            origin(64496, "192.0.2.0/24", 24),
            origin(64497, "198.51.100.0/24", 24),
            origin(64497, "192.0.2.0/24", 24),
        ],
        withdraw: vec![
            origin(64496, "198.51.100.0/24", 24),
            origin(64497, "2001:DB8::/32", 48),
        ]
    };
    assert_eq!(
        collect(&mut DeltaVrpIter::new(Arc::new(delta.clone()))),
        vec![
            (origin(64496, "192.0.2.0/24", 24), Announce),
            (origin(64497, "198.51.100.0/24", 24), Announce),
            (origin(64497, "192.0.2.0/24", 24), Announce),
            (origin(64496, "198.51.100.0/24", 24), Withdraw),
            (origin(64497, "2001:DB8::/32", 48), Withdraw)
        ]
    );
    // No announcements: only withdrawals are yielded.
    let mut no_announce = delta.clone();
    no_announce.announce = Vec::new();
    assert_eq!(
        collect(&mut DeltaVrpIter::new(Arc::new(no_announce))),
        vec![
            (origin(64496, "198.51.100.0/24", 24), Withdraw),
            (origin(64497, "2001:DB8::/32", 48), Withdraw)
        ]
    );
    // No withdrawals: only announcements are yielded.
    let mut no_withdraw = delta.clone();
    no_withdraw.withdraw = Vec::new();
    assert_eq!(
        collect(&mut DeltaVrpIter::new(Arc::new(no_withdraw))),
        vec![
            (origin(64496, "192.0.2.0/24", 24), Announce),
            (origin(64497, "198.51.100.0/24", 24), Announce),
            (origin(64497, "192.0.2.0/24", 24), Announce),
        ]
    );
    // A completely empty delta yields nothing at all.
    delta.announce = Vec::new();
    delta.withdraw = Vec::new();
    assert!(collect(&mut DeltaVrpIter::new(Arc::new(delta))).is_empty());
}
}