use std::{cmp, fmt, fs, thread};
use std::borrow::Cow;
use std::collections::HashMap;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use bytes::Bytes;
use crossbeam_queue::{ArrayQueue, SegQueue};
use log::{debug, error, info, warn};
use rand::seq::SliceRandom;
use rpki::crypto::keys::KeyIdentifier;
#[allow(unused_imports)]
use rpki::repository::aspa::{Aspa, AsProviderAttestation};
use rpki::repository::cert::{Cert, KeyUsage, ResourceCert};
use rpki::repository::crl::Crl;
use rpki::repository::error::{InspectionError, ValidationError};
use rpki::repository::manifest::{Manifest, ManifestContent, ManifestHash};
use rpki::repository::roa::{Roa, RouteOriginAttestation};
use rpki::repository::sigobj::SignedObject;
use rpki::repository::tal::{Tal, TalInfo, TalUri};
use rpki::repository::x509::{Time, Validity};
use rpki::uri;
use crate::{collector, store, tals};
use crate::config::{Config, FilterPolicy};
use crate::collector::Collector;
use crate::error::{Failed, Fatal, RunFailed};
use crate::log::{LogBook, LogBookWriter};
use crate::metrics::{
Metrics, PublicationMetrics, RepositoryMetrics, TalMetrics
};
use crate::store::{
Store, StoredManifest, StoredObject, StoredPoint, StoredStatus
};
use crate::utils::str::str_from_ascii;
/// Number of manifest entries above which a CRL's serials are cached for
/// faster revocation lookups.
const CRL_CACHE_LIMIT: usize = 50;
/// The validation engine tying together TALs, the collector, and the store.
#[derive(Debug)]
pub struct Engine {
    /// TALs bundled with the application (selected via the config).
    bundled_tals: Vec<Tal>,
    /// Optional directory with additional `.tal` files to load.
    extra_tals_dir: Option<PathBuf>,
    /// Maps TAL file names to display labels.
    tal_labels: HashMap<String, String>,
    /// All TALs currently in use, sorted by name.
    tals: Vec<Tal>,
    /// The collector fetching remote data; `None` when updating is disabled.
    collector: Option<Collector>,
    /// The store keeping validated data locally.
    store: Store,
    /// Apply strict validation rules?
    strict: bool,
    /// What to do with stale objects.
    stale: FilterPolicy,
    /// Number of worker threads used during validation.
    validation_threads: usize,
    /// Skip cleaning up the on-disk repository data?
    dirty_repository: bool,
    /// Maximum allowed CA chain depth.
    max_ca_depth: usize,
    /// Prefix log-book messages with the repository URI?
    log_repository_issues: bool,
}
impl Engine {
    /// Initializes the on-disk state used by the collector and the store.
    ///
    /// Must be called before creating an engine.
    pub fn init(config: &Config) -> Result<(), Failed> {
        Collector::init(config)?;
        Store::init(config)?;
        Ok(())
    }
    /// Creates a new engine from the given config.
    ///
    /// If `update` is `true`, a collector is created so remote data can
    /// be fetched; otherwise the engine will only use stored data.
    pub fn new(
        config: &Config,
        update: bool,
    ) -> Result<Self, Failed> {
        let collector = if update {
            Some(Collector::new(config)?)
        }
        else {
            None
        };
        let store = Store::new(config)?;
        let mut res = Engine {
            bundled_tals: tals::collect_tals(config)?,
            extra_tals_dir: config.extra_tals_dir.clone(),
            tal_labels: config.tal_labels.clone(),
            tals: Vec::new(),
            collector,
            store,
            strict: config.strict,
            stale: config.stale,
            validation_threads: config.validation_threads,
            dirty_repository: config.dirty_repository,
            max_ca_depth: config.max_ca_depth,
            log_repository_issues: config.log_repository_issues,
        };
        // Fills `tals` from the bundled TALs plus the extra TAL directory.
        res.reload_tals()?;
        Ok(res)
    }
    /// Drops the collector so subsequent runs use stored data only.
    pub fn disable_collector(&mut self) {
        self.collector = None;
    }
    /// Returns the status of the store, if available.
    pub fn store_status(&self) -> Result<Option<StoredStatus>, Failed> {
        self.store.status()
    }
    /// Rebuilds the list of active TALs.
    ///
    /// Starts from the bundled TALs and, if an extra TAL directory is
    /// configured, adds every regular file with a `.tal` extension found
    /// there. The resulting list is sorted by TAL name. Fails if the
    /// directory cannot be read or any TAL file cannot be opened or
    /// parsed. An empty result only produces a warning.
    pub fn reload_tals(&mut self) -> Result<(), Failed> {
        let mut res = self.bundled_tals.clone();
        if let Some(extra_tals_dir) = self.extra_tals_dir.as_ref() {
            let dir = match fs::read_dir(extra_tals_dir) {
                Ok(dir) => dir,
                Err(err) => {
                    error!("Failed to open TAL directory {}: {}.",
                        extra_tals_dir.display(), err
                    );
                    return Err(Failed)
                }
            };
            for entry in dir {
                let entry = match entry {
                    Ok(entry) => entry,
                    Err(err) => {
                        error!(
                            "Failed to iterate over tal directory: {err}"
                        );
                        return Err(Failed)
                    }
                };
                // Skip anything that is not a plain file ...
                if !entry.file_type().map(|ft| ft.is_file()).unwrap_or(false) {
                    continue
                }
                let path = entry.path();
                // ... or does not carry a `.tal` extension.
                if path.extension().map(|ext| ext != "tal").unwrap_or(true) {
                    continue
                }
                let mut file = match File::open(&path) {
                    Ok(file) => {
                        file
                    }
                    Err(err) => {
                        error!(
                            "Failed to open TAL {}: {}. \n\
                            Aborting.",
                            path.display(), err
                        );
                        return Err(Failed)
                    }
                };
                let mut tal = match Tal::read_named(
                    self.path_to_tal_label(&path),
                    &mut file
                ) {
                    Ok(tal) => tal,
                    Err(err) => {
                        error!(
                            "Failed to read TAL {}: {}. \n\
                            Aborting.",
                            path.display(), err
                        );
                        return Err(Failed)
                    }
                };
                // Prefer HTTPS URIs when fetching the trust anchor cert.
                tal.prefer_https();
                res.push(tal);
            }
        }
        if res.is_empty() {
            warn!(
                "No TALs provided. Starting anyway."
            );
        }
        // Sort by TAL name so indexes are stable across runs.
        res.sort_by(|left, right| {
            left.info().name().cmp(right.info().name())
        });
        self.tals = res;
        Ok(())
    }
fn path_to_tal_label(&self, path: &Path) -> String {
if let Some(name) = path.file_name().unwrap().to_str() {
if let Some(label) = self.tal_labels.get(name) {
return label.clone()
}
}
path.file_stem().unwrap().to_string_lossy().into_owned()
}
    /// Ignites the collector, if there is one.
    pub fn ignite(&mut self) -> Result<(), Failed> {
        if let Some(collector) = self.collector.as_mut() {
            collector.ignite()?;
        }
        Ok(())
    }
    /// Sanitizes the stored data and, if present, the collector's data.
    pub fn sanitize(&self) -> Result<(), Fatal> {
        self.store.sanitize()?;
        if let Some(collector) = self.collector.as_ref() {
            collector.sanitize()?;
        }
        Ok(())
    }
    /// Starts a validation run using the given processor.
    ///
    /// During an initial run the collector is not started even if one
    /// is present, so only stored data is used.
    pub fn start<P: ProcessRun>(
        &self, processor: P, initial: bool,
    ) -> Result<Run<'_, P>, Failed> {
        info!("Using the following TALs:");
        for tal in &self.tals {
            info!(" * {}", tal.info().name());
        }
        Ok(Run::new(
            self,
            if initial {
                None
            }
            else {
                self.collector.as_ref().map(Collector::start)
            },
            self.store.start(),
            processor,
            initial,
        ))
    }
    /// Dumps store and collector contents into the given directory.
    pub fn dump(&self, dir: &Path) -> Result<(), Failed> {
        self.store.dump(dir)?;
        if let Some(collector) = self.collector.as_ref() {
            collector.dump(dir)?;
        }
        Ok(())
    }
}
/// A single validation run on top of an [`Engine`].
pub struct Run<'a, P> {
    /// The engine this run operates on.
    validation: &'a Engine,
    /// The collector run, if updates are enabled for this run.
    collector: Option<collector::Run<'a>>,
    /// The store run.
    store: store::Run<'a>,
    /// The processor receiving validated data.
    processor: P,
    /// Is this an initial (quick) validation run?
    initial: bool,
    /// Set when any worker encountered an error.
    had_err: AtomicBool,
    /// Set when an encountered error was fatal.
    is_fatal: AtomicBool,
    /// The metrics collected during the run.
    metrics: Metrics,
}
impl<'a, P> Run<'a, P> {
    /// Creates a new run from its constituent parts.
    fn new(
        validation: &'a Engine,
        collector: Option<collector::Run<'a>>,
        store: store::Run<'a>,
        processor: P,
        initial: bool
    ) -> Self {
        Run {
            validation, collector, store, processor, initial,
            had_err: AtomicBool::new(false),
            is_fatal: AtomicBool::new(false),
            metrics: Default::default()
        }
    }
    /// Cleans up stale data in store and collector.
    ///
    /// Does nothing if the config asked to keep the repository dirty.
    /// The store decides what to retain; the collector then drops
    /// everything not in that retain set.
    pub fn cleanup(&mut self) -> Result<(), Failed> {
        if self.validation.dirty_repository {
            debug!("Skipping cleanup as configured.");
            return Ok(())
        }
        let mut retain = collector::Cleanup::new();
        self.store.cleanup(&mut retain)?;
        if let Some(collector) = self.collector.as_mut() {
            collector.cleanup(&mut retain)?;
        }
        Ok(())
    }
    /// Finishes the run and returns the collected metrics.
    pub fn done(self) -> Metrics {
        let mut metrics = self.metrics;
        if let Some(collector) = self.collector {
            collector.done(&mut metrics)
        }
        self.store.done(&mut metrics);
        metrics
    }
}
impl<P: ProcessRun> Run<'_, P> {
    /// Performs the actual validation run.
    ///
    /// Seeds a shared work queue with one task per TAL and lets the
    /// configured number of worker threads drain it. Each worker keeps
    /// its own forked metrics which are merged after all threads have
    /// finished.
    pub fn process(&mut self) -> Result<(), RunFailed> {
        if self.validation.tals.is_empty() {
            return Ok(())
        }
        let metrics = RunMetrics::default();
        let tasks = SegQueue::new();
        for (index, tal) in self.validation.tals.iter().enumerate() {
            tasks.push(Task::Tal(TalTask { tal, index }));
            self.metrics.tals.push(TalMetrics::new(tal.info().clone()));
        }
        let thread_metrics = ArrayQueue::new(
            self.validation.validation_threads
        );
        thread::scope(|scope| {
            for _ in 0 .. self.validation.validation_threads {
                scope.spawn(|| {
                    let mut metrics = metrics.fork();
                    // A worker stops early when any task reports an
                    // error; the flags on `self` record what happened.
                    while let Some(task) = tasks.pop() {
                        if self.process_task(
                            task, &tasks, &mut metrics,
                        ).is_err() {
                            break;
                        }
                    }
                    // Cannot fail: the queue has one slot per thread.
                    thread_metrics.push(metrics).unwrap();
                });
            }
        });
        if self.had_err.load(Ordering::Relaxed) {
            if self.is_fatal.load(Ordering::Relaxed) {
                return Err(RunFailed::fatal())
            }
            else {
                return Err(RunFailed::retry())
            }
        }
        metrics.prepare_final(&mut self.metrics);
        while let Some(metrics) = thread_metrics.pop() {
            metrics.collapse(&mut self.metrics);
        }
        Ok(())
    }
fn process_task(
&self,
task: Task<P::PubPoint>,
tasks: &SegQueue<Task<P::PubPoint>>,
metrics: &mut RunMetrics,
) -> Result<(), Failed> {
match task {
Task::Tal(task) => {
self.process_tal_task(task, tasks, metrics)
}
Task::Ca(task) => {
self.process_ca_task(task, tasks, metrics)
}
}
}
    /// Processes a trust anchor locator.
    ///
    /// Tries each URI of the TAL in order until one yields a usable
    /// trust anchor certificate and then processes that CA directly
    /// (not deferred). During an initial run, failing to find any trust
    /// anchor aborts the whole run with a retryable error.
    fn process_tal_task(
        &self, task: TalTask,
        tasks: &SegQueue<Task<P::PubPoint>>,
        metrics: &mut RunMetrics,
    ) -> Result<(), Failed> {
        for uri in task.tal.uris() {
            let cert = match self.load_ta(uri, task.tal.info())? {
                Some(cert) => cert,
                _ => continue,
            };
            // The certificate's key must match the key pinned in the TAL.
            if cert.subject_public_key_info() != task.tal.key_info() {
                warn!(
                    "Trust anchor {uri}: key doesn’t match TAL."
                );
                continue;
            }
            let cert = match cert.validate_ta(
                task.tal.info().clone(), self.validation.strict
            ) {
                Ok(cert) => CaCert::root(cert, uri.clone(), task.index),
                Err(err) => {
                    warn!("Trust anchor {uri}: {err}.");
                    continue;
                }
            };
            // `CaCert::root` itself can fail (it logs its own error).
            let cert = match cert {
                Ok(cert) => cert,
                Err(_) => continue,
            };
            debug!("Found valid trust anchor {uri}. Processing.");
            match self.processor.process_ta(
                task.tal, uri, &cert, cert.tal
            )? {
                Some(processor) => {
                    return self.process_ca_task(
                        CaTask {
                            cert, processor,
                            repository_index: None,
                            defer: false,
                        },
                        tasks, metrics,
                    )
                }
                None => {
                    debug!("Skipping trust anchor {uri}.");
                    return Ok(())
                }
            }
        }
        if self.initial {
            info!(
                "Initial quick validation failed: \
                no trust anchor for TAL {}.",
                task.tal.info().name()
            );
            self.run_failed(RunFailed::retry());
            Err(Failed)
        }
        else {
            warn!("No valid trust anchor for TAL {}", task.tal.info().name());
            Ok(())
        }
    }
    /// Loads a trust anchor certificate for the given URI.
    ///
    /// Prefers a freshly collected copy (which, if decodable, also
    /// updates the store) and falls back to the stored copy. Returns
    /// `None` when no decodable certificate is available.
    fn load_ta(
        &self,
        uri: &TalUri,
        _info: &TalInfo,
    ) -> Result<Option<Cert>, Failed> {
        if let Some(collector) = self.collector.as_ref() {
            if let Some(bytes) = collector.load_ta(uri) {
                if let Ok(cert) = Cert::decode(bytes.clone()) {
                    self.store.update_ta(uri, &bytes)?;
                    return Ok(Some(cert))
                }
            }
        }
        self.store.load_ta(uri).map(|bytes| {
            bytes.and_then(|bytes| Cert::decode(bytes).ok())
        })
    }
    /// Processes a CA's publication point.
    ///
    /// Child CAs discovered while processing are either pushed onto the
    /// shared queue (when marked `defer`) or processed recursively right
    /// away. Stops early once any worker has flagged an error.
    fn process_ca_task(
        &self,
        task: CaTask<P::PubPoint>,
        tasks: &SegQueue<Task<P::PubPoint>>,
        metrics: &mut RunMetrics,
    ) -> Result<(), Failed> {
        let more_tasks = PubPoint::new(
            self, &task.cert, task.processor, task.repository_index,
        ).and_then(|point| {
            point.process(metrics)
        }).map_err(|err| {
            // Record the error on the run before converting it.
            self.run_failed(err);
            Failed
        })?;
        for task in more_tasks {
            if self.had_err.load(Ordering::Relaxed) {
                return Err(Failed)
            }
            if task.defer {
                tasks.push(Task::Ca(task))
            }
            else {
                self.process_ca_task(task, tasks, metrics)?;
            }
        }
        Ok(())
    }
    /// Flags the run as failed; fatally so if `err` is fatal.
    fn run_failed(&self, err: RunFailed) {
        self.had_err.store(true, Ordering::Relaxed);
        if err.is_fatal() {
            self.is_fatal.store(true, Ordering::Relaxed);
        }
    }
}
/// Processing of a single publication point.
struct PubPoint<'a, P: ProcessRun> {
    /// The run this publication point belongs to.
    run: &'a Run<'a, P>,
    /// The CA certificate whose publication point this is.
    cert: &'a Arc<CaCert>,
    /// The processor for data from this point.
    processor: P::PubPoint,
    /// The repository metrics index, if already known.
    repository_index: Option<usize>,
    /// Metrics collected while processing this point.
    metrics: PublicationMetrics,
    /// Collects log messages for this point.
    log: LogBookWriter,
}
impl<'a, P: ProcessRun> PubPoint<'a, P> {
    /// Creates a new publication point processor.
    ///
    /// The log book writer only gets a prefix (the CA repository URI)
    /// when logging of repository issues is enabled.
    pub fn new(
        run: &'a Run<'a, P>,
        cert: &'a Arc<CaCert>,
        processor: P::PubPoint,
        repository_index: Option<usize>,
    ) -> Result<Self, RunFailed> {
        Ok(PubPoint {
            run, cert, processor, repository_index,
            metrics: Default::default(),
            log: LogBookWriter::new(
                run.validation.log_repository_issues.then(|| {
                    format!("{}: ", cert.ca_repository())
                })
            ),
        })
    }
    /// Processes the publication point, returning child CA tasks.
    ///
    /// Tries freshly collected data first; if that is unavailable or
    /// unusable, falls back to the stored copy. During an initial run
    /// a publication point not yet in the store aborts with a
    /// retryable error.
    pub fn process(
        self,
        metrics: &mut RunMetrics,
    ) -> Result<Vec<CaTask<P::PubPoint>>, RunFailed> {
        let mut store = self.run.store.pub_point(self.cert)?;
        if self.run.initial && store.is_new() {
            info!(
                "Initial quick validation failed: \
                encountered new publication point '{}'.",
                self.cert.rpki_manifest()
            );
            self.run.run_failed(RunFailed::retry());
            return Err(RunFailed::retry());
        }
        if let Some(collector) = self.run.collector.as_ref() {
            if let Some(collector) = collector.repository(self.cert)? {
                // `process_collected` hands `self` back in the error
                // case so we can fall back to stored data.
                match self.process_collected(
                    collector, &mut store, metrics
                )? {
                    Ok(res) => return Ok(res),
                    Err(mut this) => {
                        // Reset metrics gathered from the failed attempt.
                        this.metrics = Default::default();
                        return Ok(this.process_stored(store, metrics)?)
                    }
                }
            }
        }
        Ok(self.process_stored(store, metrics)?)
    }
    /// Tries to process the freshly collected copy of the point.
    ///
    /// Returns `Ok(Ok(tasks))` when the collected data was used,
    /// `Ok(Err(self))` when the caller should fall back to the stored
    /// copy, and `Err(_)` on hard errors.
    #[allow(clippy::type_complexity)] fn process_collected(
        mut self,
        collector: collector::Repository,
        store: &mut StoredPoint,
        metrics: &mut RunMetrics,
    ) -> Result<Result<Vec<CaTask<P::PubPoint>>, Self>, RunFailed> {
        let collected = match collector.load_object(
            self.cert.rpki_manifest()
        )? {
            Some(collected) => collected,
            None => return Ok(Err(self))
        };
        // If the collected manifest is identical to the stored one
        // (and for the same CA repository), there is nothing new.
        let same = if let Some(mft) = store.manifest() {
            mft.manifest == collected
            && mft.ca_repository == *self.cert.ca_repository()
        }
        else {
            false
        };
        if same {
            return Ok(Err(self))
        }
        let mut collected = match self.validate_collected_manifest(
            collected, &collector
        )? {
            Some(collected) => collected,
            None => {
                return Ok(Err(self))
            }
        };
        if !self.check_collected_is_newer(&collected, store)? {
            return Ok(Err(self))
        }
        collected.point_validity(&mut self.processor);
        let mut ca_tasks = Vec::new();
        // Process manifest entries in random order -- presumably to
        // spread load patterns; TODO confirm intent.
        let mut items_random: Vec<_> = collected.content.iter().collect();
        items_random.shuffle(&mut rand::rng());
        let mut items = items_random.into_iter();
        let mut point_ok = true;
        // The store update is transactional: returning
        // `UpdateError::Abort` from the closure rolls it back and we
        // fall back to the stored copy.
        let update_result = store.update(
            &self.run.validation.store,
            StoredManifest::new(
                &collected.ee_cert,
                &collected.content,
                self.cert,
                collected.manifest_bytes.clone(),
                collected.crl_uri.clone(),
                collected.crl_bytes.clone(),
            ),
            || {
                let item = match items.next() {
                    Some(item) => item,
                    None => return Ok(None)
                };
                // File names on a manifest must be plain ASCII.
                let file = match str_from_ascii(item.file()) {
                    Ok(file) => file,
                    Err(_) => {
                        self.log.warn(format_args!(
                            "manifest {} contains illegal file name '{}'.",
                            self.cert.rpki_manifest(),
                            String::from_utf8_lossy(item.file())
                        ));
                        return Err(store::UpdateError::Abort)
                    }
                };
                let uri = self.cert.ca_repository().join(
                    file.as_ref()
                ).unwrap();
                let hash = ManifestHash::new(
                    item.hash().clone(), collected.content.file_hash_alg()
                );
                let content = match collector.load_object(&uri)? {
                    Some(content) => content,
                    None => {
                        self.log.warn(format_args!(
                            "{uri}: failed to load."
                        ));
                        return Err(store::UpdateError::Abort)
                    }
                };
                // Each object must match the hash on the manifest.
                if hash.verify(&content).is_err() {
                    self.log.warn(format_args!(
                        "{uri}: file has wrong manifest hash."
                    ));
                    return Err(store::UpdateError::Abort)
                }
                if !self.process_object(
                    &uri, content.clone(),
                    &mut collected, &mut ca_tasks
                )? {
                    point_ok = false;
                }
                Ok(Some(StoredObject::new(uri, content, Some(hash))))
            }
        );
        match update_result {
            Ok(()) => {
                if point_ok {
                    self.accept_point(collected, metrics);
                    Ok(Ok(ca_tasks))
                }
                else {
                    self.reject_point(metrics);
                    Ok(Ok(Vec::new()))
                }
            }
            Err(store::UpdateError::Abort) => {
                // Transaction rolled back; fall back to stored data.
                Ok(Err(self))
            }
            Err(store::UpdateError::Failed(err)) => {
                Err(err)
            }
        }
    }
    /// Decodes and validates a collected manifest and its CRL.
    ///
    /// Returns `Ok(None)` -- after logging and updating metrics -- when
    /// the manifest should not be used: undecodable, invalid, dated in
    /// the future, or stale under a `Reject` policy.
    fn validate_collected_manifest(
        &mut self,
        manifest_bytes: Bytes,
        repository: &collector::Repository,
    ) -> Result<Option<ValidPointManifest>, RunFailed> {
        let manifest = match Manifest::decode(
            manifest_bytes.clone(), self.run.validation.strict
        ) {
            Ok(manifest) => manifest,
            Err(_) => {
                self.metrics.invalid_manifests += 1;
                self.log.warn(format_args!(
                    "failed to decode manifest {}.",
                    self.cert.rpki_manifest()
                ));
                return Ok(None)
            }
        };
        let (ee_cert, content) = match manifest.validate(
            self.cert.cert(), self.run.validation.strict
        ) {
            Ok(some) => some,
            Err(err) => {
                self.metrics.invalid_manifests += 1;
                self.log.warn(format_args!(
                    "manifest {}: {}.", self.cert.rpki_manifest(), err
                ));
                return Ok(None)
            }
        };
        // A thisUpdate in the future means the manifest is premature.
        if content.this_update() > Time::now() {
            self.metrics.premature_manifests += 1;
            self.log.warn(format_args!(
                "premature manifest {}", self.cert.rpki_manifest()
            ));
            return Ok(None)
        }
        if content.is_stale() {
            self.metrics.stale_manifests += 1;
            match self.run.validation.stale {
                FilterPolicy::Reject => {
                    self.log.warn(format_args!(
                        "rejecting stale manifest {}",
                        self.cert.rpki_manifest()
                    ));
                    return Ok(None)
                }
                FilterPolicy::Warn => {
                    self.log.warn(format_args!(
                        "stale manifest {}", self.cert.rpki_manifest()
                    ));
                }
                FilterPolicy::Accept => { }
            }
        }
        let (crl_uri, crl, crl_bytes) = match self.validate_collected_crl(
            &ee_cert, &content, repository
        )? {
            Some(some) => some,
            None => return Ok(None)
        };
        self.metrics.valid_manifests += 1;
        Ok(Some(ValidPointManifest {
            ee_cert, content, crl_uri, crl, manifest_bytes, crl_bytes,
            metrics: Default::default(),
        }))
    }
    /// Checks that the collected manifest is newer than the stored one.
    ///
    /// Returns `Ok(true)` when the collected copy should be used. When
    /// the stored manifest still decodes to the recorded number and
    /// thisUpdate but the collected one is not strictly newer in both,
    /// the stored point wins (`Ok(false)`). Otherwise the stored point
    /// is rejected before accepting the collected one.
    fn check_collected_is_newer(
        &mut self,
        collected: &ValidPointManifest,
        stored: &mut StoredPoint,
    ) -> Result<bool, Failed> {
        let Some(stored_mft) = stored.manifest() else {
            return Ok(true);
        };
        // Strictly newer in both number and thisUpdate: accept.
        if collected.content.manifest_number() > stored_mft.manifest_number
            && collected.content.this_update() > stored_mft.this_update
        {
            return Ok(true);
        }
        // Re-decode the stored manifest to make sure the recorded
        // number/time actually belong to it before trusting them.
        if let Ok(mft) = Manifest::decode(
            stored_mft.manifest.clone(), self.run.validation.strict
        ) {
            if mft.content().manifest_number() == stored_mft.manifest_number
                && mft.content().this_update() == stored_mft.this_update
            {
                if collected.content.manifest_number()
                    <= stored_mft.manifest_number
                {
                    self.log.warn(format_args!(
                        "manifest {}: manifest number is not greater than in \
                        stored version. Using stored publication point.",
                        self.cert.rpki_manifest(),
                    ));
                    return Ok(false);
                }
                if collected.content.this_update()
                    <= stored_mft.this_update
                {
                    self.log.warn(format_args!(
                        "manifest {}: manifest thisUpdate is not later than in \
                        stored version. Using stored publication point.",
                        self.cert.rpki_manifest(),
                    ));
                    return Ok(false);
                }
            }
        }
        stored.reject()?;
        Ok(true)
    }
    /// Loads and validates the CRL referenced by the manifest's EE cert.
    ///
    /// The CRL must have a `.crl` URI inside the CA repository, be
    /// listed on the manifest with a matching hash, carry a valid
    /// signature, pass the stale policy, and must not revoke the
    /// manifest's own EE certificate. Returns `Ok(None)` (after
    /// logging and metrics updates) on any of these failures.
    fn validate_collected_crl(
        &mut self,
        ee_cert: &ResourceCert,
        manifest: &ManifestContent,
        repository: &collector::Repository
    ) -> Result<Option<(uri::Rsync, Crl, Bytes)>, RunFailed> {
        let crl_uri = match ee_cert.crl_uri() {
            Some(some) if some.ends_with(".crl") => some.clone(),
            Some(some) => {
                self.metrics.invalid_manifests += 1;
                self.log.warn(format_args!("invalid CRL URI {}", some));
                return Ok(None)
            }
            None => {
                self.metrics.invalid_manifests += 1;
                self.log.warn(format_args!(
                    "missing CRL URI on manifest {}",
                    self.cert.rpki_manifest()
                ));
                return Ok(None)
            }
        };
        // The CRL must live inside this CA's repository directory.
        let crl_name = match crl_uri.relative_to(self.cert.ca_repository()) {
            Some(name) => name,
            None => {
                self.metrics.invalid_manifests += 1;
                self.log.warn(format_args!(
                    "CRL URI {crl_uri} outside repository directory."
                ));
                return Ok(None)
            }
        };
        // Find the CRL on the manifest and verify its hash.
        let mut crl_bytes = None;
        for item in manifest.iter() {
            let (file, hash) = item.into_pair();
            if file == crl_name {
                let bytes = match repository.load_object(&crl_uri)? {
                    Some(bytes) => bytes,
                    None => {
                        self.metrics.invalid_crls += 1;
                        self.log.warn(format_args!(
                            "{crl_uri}: failed to load."
                        ));
                        return Ok(None)
                    }
                };
                let hash = ManifestHash::new(hash, manifest.file_hash_alg());
                if hash.verify(&bytes).is_err() {
                    self.metrics.invalid_crls += 1;
                    self.log.warn(format_args!(
                        "file {crl_uri} has wrong hash."
                    ));
                    return Ok(None)
                }
                crl_bytes = Some(bytes);
            }
        }
        let crl_bytes = match crl_bytes {
            Some(some) => some,
            None => {
                self.metrics.invalid_crls += 1;
                self.log.warn(format_args!("CRL not listed on manifest."));
                return Ok(None)
            }
        };
        let mut crl = match Crl::decode(crl_bytes.clone()) {
            Ok(crl) => crl,
            Err(_) => {
                self.metrics.invalid_crls += 1;
                self.log.warn(format_args!(
                    "CRL {crl_uri}: failed to decode."
                ));
                return Ok(None)
            }
        };
        if let Err(err) = crl.verify_signature(
            self.cert.cert().subject_public_key_info()
        ) {
            self.metrics.invalid_crls += 1;
            self.log.warn(format_args!("CRL {crl_uri}: {err}."));
            return Ok(None)
        }
        if crl.is_stale() {
            self.metrics.stale_crls += 1;
            match self.run.validation.stale {
                FilterPolicy::Reject => {
                    self.log.warn(format_args!(
                        "rejecting stale CRL {crl_uri}."
                    ));
                    return Ok(None)
                }
                FilterPolicy::Warn => {
                    self.log.warn(format_args!("stale CRL {crl_uri}."));
                }
                FilterPolicy::Accept => { }
            }
        }
        // For large points, cache the serials for faster lookups.
        if manifest.len() > CRL_CACHE_LIMIT {
            crl.cache_serials()
        }
        if crl.contains(ee_cert.serial_number()) {
            self.metrics.invalid_manifests += 1;
            self.log.warn(format_args!(
                "manifest certificate has been revoked."
            ));
            return Ok(None)
        }
        self.metrics.valid_crls += 1;
        Ok(Some((crl_uri, crl, crl_bytes)))
    }
    /// Processes the stored copy of the publication point.
    ///
    /// Rejects the whole point -- reporting it via `reject_point` and
    /// returning no child tasks -- if the stored manifest is missing or
    /// invalid or any stored object fails processing. Only fatal read
    /// errors propagate as `Err`.
    fn process_stored(
        mut self,
        mut store: StoredPoint,
        metrics: &mut RunMetrics,
    ) -> Result<Vec<CaTask<P::PubPoint>>, Failed> {
        let manifest = match store.manifest() {
            Some(manifest) => manifest,
            None => {
                self.log.warn(format_args!(
                    "no valid manifest {} found.",
                    self.cert.rpki_manifest()
                ));
                self.metrics.missing_manifests += 1;
                self.reject_point(metrics);
                return Ok(Vec::new())
            }
        };
        let mut manifest = match self.validate_stored_manifest(manifest) {
            Ok(manifest) => manifest,
            Err(_) => {
                self.reject_point(metrics);
                return Ok(Vec::new())
            }
        };
        manifest.point_validity(&mut self.processor);
        let mut ca_tasks = Vec::new();
        for object in &mut store {
            let object = match object {
                Ok(object) => object,
                Err(err) => {
                    if err.is_fatal() {
                        error!(
                            "Fatal: failed to read from {}: {}",
                            store.path().display(), err
                        );
                        return Err(Failed)
                    }
                    else {
                        // Corrupt stored data: drop the point, don't
                        // abort the run.
                        debug!(
                            "Ignoring invalid stored publication point \
                            at {}: {}",
                            store.path().display(), err
                        );
                        self.reject_point(metrics);
                        return Ok(Vec::new())
                    }
                }
            };
            if !self.process_object(
                &object.uri, object.content.clone(),
                &mut manifest, &mut ca_tasks
            )? {
                self.reject_point(metrics);
                return Ok(Vec::new())
            }
        }
        self.accept_point(manifest, metrics);
        Ok(ca_tasks)
    }
    /// Validates a manifest (and its CRL) taken from the store.
    ///
    /// Unlike the collected case, the CRL bytes come straight from the
    /// stored manifest rather than being looked up on the manifest.
    /// Any failure is logged, counted, and returned as `Err(Failed)`
    /// so the caller rejects the point.
    fn validate_stored_manifest(
        &mut self,
        stored_manifest: &StoredManifest,
    ) -> Result<ValidPointManifest, Failed> {
        let manifest = match Manifest::decode(
            stored_manifest.manifest.clone(), self.run.validation.strict
        ) {
            Ok(manifest) => manifest,
            Err(_) => {
                self.metrics.invalid_manifests += 1;
                self.log.warn(format_args!(
                    "failed to decode manifest {}.",
                    self.cert.rpki_manifest(),
                ));
                return Err(Failed);
            }
        };
        let (ee_cert, content) = match manifest.validate(
            self.cert.cert(), self.run.validation.strict
        ) {
            Ok(some) => some,
            Err(err) => {
                self.log.warn(format_args!(
                    "manifest {}: {}.",
                    self.cert.rpki_manifest(), err
                ));
                self.metrics.invalid_manifests += 1;
                return Err(Failed);
            }
        };
        if content.is_stale() {
            self.metrics.stale_manifests += 1;
            match self.run.validation.stale {
                FilterPolicy::Reject => {
                    self.log.warn(format_args!(
                        "rejecting stale manifest {}",
                        self.cert.rpki_manifest()
                    ));
                    self.metrics.invalid_manifests += 1;
                    return Err(Failed);
                }
                FilterPolicy::Warn => {
                    self.log.warn(format_args!(
                        "stale manifest {}", self.cert.rpki_manifest()
                    ));
                }
                FilterPolicy::Accept => { }
            }
        }
        let crl_uri = match ee_cert.crl_uri() {
            Some(uri) => uri.clone(),
            None => {
                self.log.warn(format_args!("manifest without CRL URI."));
                self.metrics.invalid_manifests += 1;
                return Err(Failed)
            }
        };
        let mut crl = match Crl::decode(stored_manifest.crl.clone()) {
            Ok(crl) => crl,
            Err(_) => {
                self.metrics.invalid_manifests += 1;
                self.metrics.invalid_crls += 1;
                self.log.warn(format_args!(
                    "failed to decode CRL {crl_uri}."
                ));
                return Err(Failed)
            }
        };
        if let Err(err) = crl.verify_signature(
            self.cert.cert().subject_public_key_info()
        ) {
            self.log.warn(format_args!("CRL {crl_uri}: {err}."));
            self.metrics.invalid_manifests += 1;
            self.metrics.invalid_crls += 1;
            return Err(Failed)
        }
        if crl.is_stale() {
            self.metrics.stale_crls += 1;
            match self.run.validation.stale {
                FilterPolicy::Reject => {
                    self.log.warn(format_args!(
                        "rejecting stale CRL {crl_uri}."
                    ));
                    self.metrics.invalid_manifests += 1;
                    self.metrics.invalid_crls += 1;
                    return Err(Failed)
                }
                FilterPolicy::Warn => {
                    self.log.warn(format_args!(
                        "stale CRL {crl_uri}."
                    ));
                }
                FilterPolicy::Accept => { }
            }
        }
        // For large points, cache the serials for faster lookups.
        if content.len() > CRL_CACHE_LIMIT {
            crl.cache_serials()
        }
        if crl.contains(ee_cert.serial_number()) {
            self.log.warn(format_args!(
                "manifest {}: certificate has been revoked.",
                self.cert.rpki_manifest()
            ));
            self.metrics.invalid_manifests += 1;
            return Err(Failed)
        }
        self.metrics.valid_manifests += 1;
        self.metrics.valid_crls += 1;
        Ok(ValidPointManifest {
            ee_cert, content, crl_uri, crl,
            manifest_bytes: stored_manifest.manifest.clone(),
            crl_bytes: stored_manifest.crl.clone(),
            metrics: Default::default(),
        })
    }
    /// Accepts the publication point: records metrics and commits the
    /// processor's data.
    fn accept_point(
        mut self,
        manifest: ValidPointManifest,
        metrics: &mut RunMetrics,
    ) {
        self.metrics.valid_points += 1;
        // Fold the per-object metrics gathered on the manifest in.
        self.metrics += manifest.metrics;
        self.apply_metrics(metrics);
        self.processor.commit();
    }
    /// Rejects the publication point: records metrics, cancels the
    /// processor's data, and publishes the point's log book.
    fn reject_point(
        mut self,
        metrics: &mut RunMetrics,
    ) {
        self.metrics.rejected_points += 1;
        self.apply_metrics(metrics);
        self.processor.cancel(self.cert);
        let log = self.log.into_book();
        if !log.is_empty() {
            metrics.append_log(self.cert.ca_repository().clone(), log);
        }
    }
    /// Applies the point's metrics to the run metrics.
    ///
    /// Resolves (and tells the processor about) the repository index,
    /// allocating a new one if it wasn't inherited from the parent.
    fn apply_metrics(
        &mut self,
        metrics: &mut RunMetrics,
    ) {
        let repository_index = self.repository_index.unwrap_or_else(|| {
            metrics.repository_index(self.cert)
        });
        self.processor.repository_index(repository_index);
        metrics.apply(
            &self.metrics,
            repository_index,
            self.cert.tal,
        );
    }
    /// Processes a single object, dispatching on its file extension.
    ///
    /// NOTE(review): the return value is plumbed into `point_ok` /
    /// rejection decisions by the callers, but as written this function
    /// always returns `Ok(true)` -- invalid objects are only logged and
    /// counted in `manifest.metrics`. Confirm this "warn but accept"
    /// policy is intended.
    fn process_object(
        &mut self,
        uri: &uri::Rsync,
        content: Bytes,
        manifest: &mut ValidPointManifest,
        ca_task: &mut Vec<CaTask<P::PubPoint>>,
    ) -> Result<bool, Failed> {
        // Let the processor filter out objects it doesn't care about.
        if !self.processor.want(uri)? {
            return Ok(true)
        }
        if uri.ends_with(".cer") {
            self.process_cer(uri, content, manifest, ca_task)?;
        }
        else if uri.ends_with(".roa") {
            self.process_roa(uri, content, manifest)?;
        }
        else if uri.ends_with(".asa") {
            self.process_aspa(uri, content, manifest)?;
        }
        else if uri.ends_with(".gbr") {
            self.process_gbr(uri, content, manifest)?;
        }
        else if uri.ends_with(".crl") {
            // The point's CRL was already handled; any other CRL is stray.
            if *uri != manifest.crl_uri {
                self.log.warn(format_args!("stray CRL {uri}."));
                manifest.metrics.stray_crls += 1;
            }
        }
        else {
            manifest.metrics.others += 1;
            self.log.warn(format_args!(
                "object {uri}: unknown type."
            ));
        }
        Ok(true)
    }
    /// Processes a certificate object, dispatching on its key usage.
    fn process_cer(
        &mut self,
        uri: &uri::Rsync,
        content: Bytes,
        manifest: &mut ValidPointManifest,
        ca_task: &mut Vec<CaTask<P::PubPoint>>,
    ) -> Result<(), Failed> {
        let cert = match Cert::decode(content) {
            Ok(cert) => cert,
            Err(_) => {
                manifest.metrics.invalid_certs += 1;
                self.log.warn(format_args!(
                    "certificate {uri}: failed to decode."
                ));
                return Ok(())
            }
        };
        // CA certificates spawn child publication points; everything
        // else is treated as a router certificate.
        if cert.key_usage() == KeyUsage::Ca {
            self.process_ca_cer(uri, cert, manifest, ca_task)
        }
        else {
            self.process_router_cert(uri, cert, manifest)
        }
    }
#[allow(clippy::too_many_arguments)]
fn process_ca_cer(
&mut self, uri: &uri::Rsync, cert: Cert,
manifest: &mut ValidPointManifest,
ca_task: &mut Vec<CaTask<P::PubPoint>>,
) -> Result<(), Failed> {
if self.cert.check_loop(&cert).is_err() {
self.log.warn(format_args!(
"CA certificate {uri}: certificate loop detected."
));
manifest.metrics.invalid_certs += 1;
return Ok(())
}
let cert = match cert.validate_ca(
self.cert.cert(), self.run.validation.strict
) {
Ok(cert) => cert,
Err(err) => {
self.log.warn(format_args!(
"CA certificagte {uri}: {err}."
));
manifest.metrics.invalid_certs += 1;
return Ok(())
}
};
if let Err(err) = manifest.check_crl(&cert) {
self.log.warn(format_args!(
"CA certificate {uri}: {err}."
));
manifest.metrics.invalid_certs += 1;
return Ok(())
}
let cert = match CaCert::chain(
self.cert, uri.clone(), cert, self.run.validation.max_ca_depth,
) {
Ok(cert) => cert,
Err(_) => {
manifest.metrics.invalid_certs += 1;
return Ok(())
}
};
manifest.metrics.valid_ca_certs += 1;
let processor = match self.processor.process_ca(
uri, &cert
)? {
Some(processor) => processor,
None => return Ok(())
};
let defer = match self.run.collector.as_ref() {
Some(collector) => !collector.was_updated(&cert),
None => false,
};
let repository_index = if cert.repository_switch() {
None
}
else {
self.repository_index
};
ca_task.push(CaTask {
cert, processor, repository_index, defer
});
Ok(())
}
    /// Processes a (BGPsec) router certificate found at `uri`.
    fn process_router_cert(
        &mut self, uri: &uri::Rsync, cert: Cert,
        manifest: &mut ValidPointManifest,
    ) -> Result<(), Failed> {
        if let Err(err) = cert.validate_router(
            self.cert.cert(), self.run.validation.strict
        ) {
            self.log.warn(format_args!(
                "router certificate {uri}: {err}."
            ));
            manifest.metrics.invalid_certs += 1;
            return Ok(())
        };
        // The certificate must also not be revoked by the point's CRL.
        if let Err(err) = manifest.check_crl(&cert) {
            self.log.warn(format_args!(
                "router certificate {uri}: {err}."
            ));
            manifest.metrics.invalid_certs += 1;
            return Ok(())
        }
        manifest.metrics.valid_router_certs += 1;
        self.processor.process_router_cert(uri, cert, self.cert)?;
        Ok(())
    }
    /// Processes a ROA object found at `uri`.
    ///
    /// Decode or validation failures are logged and counted only.
    fn process_roa(
        &mut self, uri: &uri::Rsync, content: Bytes,
        manifest: &mut ValidPointManifest,
    ) -> Result<(), Failed> {
        let roa = match Roa::decode(
            content, self.run.validation.strict
        ) {
            Ok(roa) => roa,
            Err(_) => {
                manifest.metrics.invalid_roas += 1;
                self.log.warn(format_args!(
                    "ROA {uri}: failed to decode."
                ));
                return Ok(())
            }
        };
        match roa.process(
            self.cert.cert(),
            self.run.validation.strict,
            |cert| manifest.check_crl(cert)
        ) {
            Ok((cert, route)) => {
                manifest.metrics.valid_roas += 1;
                self.processor.process_roa(uri, cert, route)?
            }
            Err(err) => {
                manifest.metrics.invalid_roas += 1;
                self.log.warn(format_args!(
                    "ROA {uri}: {err}."
                ))
            }
        }
        Ok(())
    }
#[allow(unused_variables)]
fn process_aspa(
&mut self, uri: &uri::Rsync, content: Bytes,
manifest: &mut ValidPointManifest,
) -> Result<(), Failed> {
let aspa = match Aspa::decode(
content, self.run.validation.strict
) {
Ok(aspa) => aspa,
Err(err) => {
manifest.metrics.invalid_aspas += 1;
self.log.warn(format_args!(
"ASPA {uri}: failed to decode."
));
return Ok(())
}
};
match aspa.process(
self.cert.cert(),
self.run.validation.strict,
|cert| manifest.check_crl(cert)
) {
Ok((cert, aspa)) => {
manifest.metrics.valid_aspas += 1;
self.processor.process_aspa(uri, cert, aspa)?
}
Err(err) => {
manifest.metrics.invalid_aspas += 1;
self.log.warn(format_args!(
"ASPA {uri}: {err}."
))
}
}
Ok(())
}
    /// Processes a Ghostbusters record found at `uri`.
    ///
    /// Decode or validation failures are logged and counted only.
    fn process_gbr(
        &mut self, uri: &uri::Rsync, content: Bytes,
        manifest: &mut ValidPointManifest,
    ) -> Result<(), Failed> {
        let obj = match SignedObject::decode(
            content, self.run.validation.strict
        ) {
            Ok(obj) => obj,
            Err(_) => {
                manifest.metrics.invalid_gbrs += 1;
                self.log.warn(format_args!(
                    "GBR {uri}: failed to decode."
                ));
                return Ok(())
            }
        };
        match obj.process(
            self.cert.cert(),
            self.run.validation.strict,
            |cert| manifest.check_crl(cert)
        ) {
            Ok((cert, content)) => {
                manifest.metrics.valid_gbrs += 1;
                self.processor.process_gbr(uri, cert, content)?
            }
            Err(err) => {
                manifest.metrics.invalid_gbrs += 1;
                self.log.warn(format_args!(
                    "GBR {uri}: {err}."
                ))
            }
        }
        Ok(())
    }
}
/// A validated manifest of a publication point together with its CRL.
#[derive(Clone, Debug)]
struct ValidPointManifest {
    /// The manifest's EE certificate.
    ee_cert: ResourceCert,
    /// The decoded manifest content.
    content: ManifestContent,
    /// The URI of the point's CRL.
    crl_uri: uri::Rsync,
    /// The decoded CRL.
    crl: Crl,
    /// The raw manifest bytes.
    manifest_bytes: Bytes,
    /// The raw CRL bytes.
    crl_bytes: Bytes,
    /// Per-object metrics gathered while processing the point.
    metrics: PublicationMetrics,
}
impl ValidPointManifest {
    /// Checks a certificate against the point's CRL.
    ///
    /// The certificate must reference exactly this CRL and must not be
    /// listed as revoked on it.
    fn check_crl(&self, cert: &Cert) -> Result<(), ValidationError> {
        let crl_uri = match cert.crl_uri() {
            Some(some) => some,
            None => {
                return Err(InspectionError::new(
                    "certificate has no CRL URI"
                ).into())
            }
        };
        if *crl_uri != self.crl_uri {
            return Err(InspectionError::new(
                "certificate's CRL differs from manifest's"
            ).into())
        }
        if self.crl.contains(cert.serial_number()) {
            return Err(InspectionError::new(
                "certificate has been revoked"
            ).into())
        }
        Ok(())
    }
    /// Reports the point's validity window to the processor.
    ///
    /// The window ends at the earlier of the manifest's and the CRL's
    /// nextUpdate time.
    fn point_validity(&self, processor: &mut impl ProcessPubPoint) {
        processor.point_validity(
            self.ee_cert.validity(),
            cmp::min(
                self.content.next_update(),
                self.crl.next_update(),
            )
        )
    }
}
/// A queued unit of validation work: a whole TAL or a single CA.
enum Task<'a, P> {
    Tal(TalTask<'a>),
    Ca(CaTask<P>),
}
impl<P> fmt::Debug for Task<'_, P> {
    /// Prints the task kind with its identifying name or URI.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Task::Tal(inner) => {
                write!(f, "TalTask {{ tal: {} }}", inner.tal.info().name())
            }
            Task::Ca(inner) => {
                write!(
                    f, "CaTask {{ ca_repository: {} }}",
                    inner.cert.ca_repository
                )
            }
        }
    }
}
/// A task to process all trust anchors of a single TAL.
struct TalTask<'a> {
    /// The TAL to process.
    tal: &'a Tal,
    /// The index of the TAL in the engine's TAL list.
    index: usize,
}
/// A task to process a single CA's publication point.
struct CaTask<P> {
    /// The CA certificate.
    cert: Arc<CaCert>,
    /// The processor for data published by this CA.
    processor: P,
    /// The repository metrics index inherited from the parent, if any.
    repository_index: Option<usize>,
    /// Push the task back onto the queue instead of processing inline?
    defer: bool,
}
/// A CA certificate plus the context needed for validation.
#[derive(Clone, Debug)]
pub struct CaCert {
    /// The validated resource certificate.
    cert: ResourceCert,
    /// The URI the certificate was found at.
    uri: TalUri,
    /// The CA repository URI from the certificate.
    ca_repository: uri::Rsync,
    /// The manifest URI from the certificate.
    rpki_manifest: uri::Rsync,
    /// The issuing CA, or `None` for a trust anchor.
    parent: Option<Arc<CaCert>>,
    /// Chain length from the trust anchor (0 for the anchor itself).
    chain_len: usize,
    /// The index of the TAL this certificate descends from.
    pub(crate) tal: usize,
}
impl CaCert {
    /// Creates a CA cert for a trust anchor (chain length zero).
    pub fn root(
        cert: ResourceCert, uri: TalUri, tal: usize
    ) -> Result<Arc<Self>, Failed> {
        Self::new(cert, uri, None, 0, tal)
    }
pub fn chain(
issuer: &Arc<Self>,
uri: uri::Rsync,
cert: ResourceCert,
max_depth: usize,
) -> Result<Arc<Self>, Failed> {
let chain_len = match issuer.chain_len.checked_add(1) {
Some(chain_len) => chain_len,
None => {
error!(
"CA {uri}: CA depth overrun."
);
return Err(Failed)
}
};
if chain_len > max_depth {
error!(
"CA {uri}: CA depth overrun."
);
return Err(Failed)
}
Self::new(
cert, TalUri::Rsync(uri),
Some(issuer.clone()), chain_len,
issuer.tal
)
}
    /// Creates a new CA cert from its parts.
    ///
    /// Fails if the certificate lacks a CA repository or manifest URI --
    /// both should have been enforced during validation already.
    fn new(
        cert: ResourceCert,
        uri: TalUri,
        parent: Option<Arc<Self>>,
        chain_len: usize,
        tal: usize,
    ) -> Result<Arc<Self>, Failed> {
        let ca_repository = match cert.ca_repository() {
            Some(uri) => uri.clone(),
            None => {
                error!(
                    "CA cert {uri} has no repository URI. \
                    Why has it not been rejected yet?"
                );
                return Err(Failed)
            }
        };
        let rpki_manifest = match cert.rpki_manifest() {
            Some(uri) => uri.clone(),
            None => {
                error!(
                    "CA cert {uri} has no manifest URI. \
                    Why has it not been rejected yet?"
                );
                return Err(Failed)
            }
        };
        Ok(Arc::new(CaCert {
            cert, uri, ca_repository, rpki_manifest, parent, chain_len, tal
        }))
    }
    /// Checks that `cert`'s key doesn't already appear up the chain.
    pub fn check_loop(&self, cert: &Cert) -> Result<(), Failed> {
        self._check_loop(cert.subject_key_identifier())
    }
fn _check_loop(&self, key_id: KeyIdentifier) -> Result<(), Failed> {
if self.cert.subject_key_identifier() == key_id {
Err(Failed)
}
else if let Some(ref parent) = self.parent {
parent._check_loop(key_id)
}
else {
Ok(())
}
}
    /// Returns the resource certificate itself.
    pub fn cert(&self) -> &ResourceCert {
        &self.cert
    }

    /// Returns the URI the certificate was found at.
    pub fn uri(&self) -> &TalUri {
        &self.uri
    }

    /// Returns the CA repository URI.
    pub fn ca_repository(&self) -> &uri::Rsync {
        &self.ca_repository
    }

    /// Returns the manifest URI.
    pub fn rpki_manifest(&self) -> &uri::Rsync {
        &self.rpki_manifest
    }

    /// Returns the RRDP notification URI, if present on the certificate.
    pub fn rpki_notify(&self) -> Option<&uri::Https> {
        self.cert.rpki_notify()
    }
    /// Returns whether this CA publishes in a different repository
    /// than its parent.
    ///
    /// A trust anchor always counts as a switch. Otherwise the RRDP
    /// notification URIs are compared if present, falling back to the
    /// rsync module of the CA repository.
    pub(crate) fn repository_switch(&self) -> bool {
        let parent = match self.parent.as_ref() {
            Some(parent) => parent,
            None => return true,
        };
        match self.rpki_notify() {
            Some(rpki_notify) => {
                Some(rpki_notify) != parent.rpki_notify()
            }
            None => {
                self.ca_repository.module() != parent.ca_repository.module()
            }
        }
    }
}
/// Metrics collected during a validation run.
///
/// A fresh copy for each worker can be created via `fork`; all copies
/// share the repository index map behind an `Arc<Mutex<_>>`.
#[derive(Debug, Default)]
struct RunMetrics {
/// Per-TAL publication metrics, indexed by TAL index (grown on demand).
tals: Vec<PublicationMetrics>,
/// Per-repository publication metrics, indexed by repository index
/// (grown on demand).
repositories: Vec<PublicationMetrics>,
/// Overall publication metrics for the run.
publication: PublicationMetrics,
/// Log books collected per publication point, keyed by an rsync URI.
log_books: Vec<(uri::Rsync, LogBook)>,
/// Maps a repository key (RRDP notification URI or canonical rsync
/// module) to its index. Shared across all forked copies.
repository_indexes: Arc<Mutex<HashMap<String, usize>>>,
}
impl RunMetrics {
    /// Creates a new, empty metrics set sharing the repository indexes.
    pub fn fork(&self) -> Self {
        RunMetrics {
            repository_indexes: self.repository_indexes.clone(),
            ..Default::default()
        }
    }

    /// Returns the index of the repository of the given CA.
    ///
    /// Repositories are keyed by their RRDP notification URI if present,
    /// otherwise by the canonical module of the rsync CA repository URI.
    /// A previously unseen repository receives the next free index.
    pub fn repository_index(&self, cert: &CaCert) -> usize {
        let key = match cert.rpki_notify() {
            Some(uri) => Cow::Borrowed(uri.as_str()),
            None => cert.ca_repository.canonical_module(),
        };
        let mut indexes = self.repository_indexes.lock().unwrap();
        match indexes.get(key.as_ref()) {
            Some(&index) => index,
            None => {
                // Indexes are assigned in insertion order. Only allocate
                // an owned key when actually inserting.
                let index = indexes.len();
                indexes.insert(key.into_owned(), index);
                index
            }
        }
    }

    /// Adds the publication metrics to the given repository and TAL.
    pub fn apply(
        &mut self, metrics: &PublicationMetrics,
        repository_index: usize, tal_index: usize,
    ) {
        // Grow the per-repository and per-TAL vectors on demand.
        if self.repositories.len() <= repository_index {
            self.repositories.resize_with(
                repository_index + 1, Default::default
            );
        }
        self.repositories[repository_index] += metrics;
        if self.tals.len() <= tal_index {
            self.tals.resize_with(tal_index + 1, Default::default);
        }
        self.tals[tal_index] += metrics;
        self.publication += metrics;
    }

    /// Records a log book for the publication point at `uri`.
    pub fn append_log(&mut self, uri: uri::Rsync, book: LogBook) {
        self.log_books.push((uri, book));
    }

    /// Fills `target.repositories` with entries in index order.
    pub fn prepare_final(&self, target: &mut Metrics) {
        let mut ordered: Vec<(String, usize)> = {
            let indexes = self.repository_indexes.lock().unwrap();
            indexes.iter().map(|(uri, index)| {
                (uri.clone(), *index)
            }).collect()
        };
        ordered.sort_by_key(|&(_, index)| index);
        target.repositories = ordered.into_iter().map(|(uri, _)| {
            RepositoryMetrics::new(uri)
        }).collect();
    }

    /// Folds the collected metrics into `target`, consuming `self`.
    pub fn collapse(mut self, target: &mut Metrics) {
        for (tal_metrics, collected) in
            target.tals.iter_mut().zip(self.tals)
        {
            tal_metrics.publication += collected;
        }
        for (repo_metrics, collected) in
            target.repositories.iter_mut().zip(self.repositories)
        {
            repo_metrics.publication += collected;
        }
        target.publication += self.publication;
        target.pub_point_logs.append(&mut self.log_books);
    }
}
/// A type that processes the content of a validation run.
///
/// NOTE(review): the exact call sites are outside this chunk; semantics
/// below are taken from the signatures — confirm against the engine code.
pub trait ProcessRun: Send + Sync {
/// The type that processes a single publication point.
type PubPoint: ProcessPubPoint;
/// Processes the trust anchor certificate of the TAL with index
/// `tal_index`.
///
/// Returning `Ok(None)` presumably skips processing of this trust
/// anchor — confirm with callers.
fn process_ta(
&self, tal: &Tal, uri: &TalUri, cert: &CaCert, tal_index: usize
) -> Result<Option<Self::PubPoint>, Failed>;
}
/// A type that processes the objects of a single publication point.
///
/// All object-processing methods other than `process_ca` have default
/// implementations that ignore their arguments and succeed.
pub trait ProcessPubPoint: Sized + Send + Sync {
/// Receives the index of the repository the publication point is in.
///
/// The default implementation ignores the value.
fn repository_index(&mut self, repository_index: usize) {
let _ = repository_index;
}
/// Receives the validity of the manifest EE certificate and a stale
/// time for the point.
///
/// NOTE(review): parameter meaning inferred from the names — confirm
/// against callers. The default implementation ignores both values.
fn point_validity(
&mut self,
manifest_ee: Validity,
stale: Time,
) {
let _ = (manifest_ee, stale);
}
/// Returns whether the object at the given URI should be processed.
fn want(&self, uri: &uri::Rsync) -> Result<bool, Failed>;
/// Processes a CA certificate.
///
/// Returns a processor for the CA's own publication point, or `None`,
/// presumably to skip descending into it — confirm with callers.
fn process_ca(
&mut self, uri: &uri::Rsync, cert: &CaCert,
) -> Result<Option<Self>, Failed>;
/// Processes a router certificate issued under `ca_cert`.
///
/// The default implementation does nothing and succeeds.
fn process_router_cert(
&mut self, uri: &uri::Rsync, cert: Cert, ca_cert: &CaCert,
) -> Result<(), Failed> {
let _ = (uri, cert, ca_cert);
Ok(())
}
/// Processes a route origin attestation (ROA) together with the EE
/// certificate it was signed with.
///
/// The default implementation does nothing and succeeds.
fn process_roa(
&mut self,
uri: &uri::Rsync,
cert: ResourceCert,
route: RouteOriginAttestation
) -> Result<(), Failed> {
let _ = (uri, cert, route);
Ok(())
}
/// Processes an AS provider attestation (ASPA) together with the EE
/// certificate it was signed with.
///
/// The default implementation does nothing and succeeds.
fn process_aspa(
&mut self,
uri: &uri::Rsync,
cert: ResourceCert,
aspa: AsProviderAttestation,
) -> Result<(), Failed> {
let _ = (uri, cert, aspa);
Ok(())
}
/// Processes a GBR object's raw content together with the EE
/// certificate it was signed with.
///
/// The default implementation does nothing and succeeds.
fn process_gbr(
&mut self,
uri: &uri::Rsync,
cert: ResourceCert,
content: Bytes
) -> Result<(), Failed> {
let _ = (uri, cert, content);
Ok(())
}
/// Restarts processing of the publication point.
///
/// NOTE(review): exact restart semantics are not visible in this chunk.
fn restart(&mut self) -> Result<(), Failed>;
/// Commits the processed data, consuming the processor.
fn commit(self);
/// Cancels processing for the publication point of the given CA.
///
/// The default implementation does nothing.
fn cancel(self, _cert: &CaCert) {
}
}
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn dump_empty_cache() {
        // Result intentionally ignored; the test does not depend on it.
        let _ = crate::process::Process::init();

        let src = tempfile::tempdir().unwrap();
        let target_dir = tempfile::tempdir().unwrap();
        let target = target_dir.path().join("dump");

        let mut config = Config::default_with_paths(
            Default::default(), src.path().into()
        );
        // Substitute `echo` for the real rsync command so nothing is
        // actually fetched.
        config.rsync_command = "echo".into();
        config.rsync_args = Some(vec!["some".into()]);

        let engine = Engine::new(&config, true).unwrap();
        engine.dump(&target).unwrap();
    }
}