/// Updating and processing of RPKI data.
///
/// This module provides types and traits implementing validation of RPKI data
/// from a set of trust anchor locators to some output data.
///
/// Data validation is configured through [`Engine`] so that the
/// configuration can be used for multiple validation runs. This includes both
/// a [collector][crate::collector::Collector] and
/// [store][crate::store::Store] to use for validation.
///
/// Individual validation runs are managed through [`Run`]. Such a runner can
/// be obtained from validation via its [`start`][Engine::start] method.
/// It in turn provides the [`process`][Run::process] method which drives the
/// actual validation.
///
/// Engine runs are generic over what exactly should be done with valid
/// RPKI data. The trait [`ProcessRun`] represents a full validation run with
/// the accompanying trait [`ProcessPubPoint`] dealing with individual
/// publication points.
use std::{fmt, fs};
use std::borrow::Cow;
use std::collections::HashMap;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use bytes::Bytes;
use crossbeam_queue::{ArrayQueue, SegQueue};
use crossbeam_utils::thread;
use log::{debug, error, info, warn};
use rpki::crypto::keys::KeyIdentifier;
use rpki::repository::cert::{Cert, KeyUsage, ResourceCert};
use rpki::repository::crl::Crl;
use rpki::repository::error::{InspectionError, ValidationError};
use rpki::repository::manifest::{Manifest, ManifestContent, ManifestHash};
use rpki::repository::roa::{Roa, RouteOriginAttestation};
use rpki::repository::sigobj::SignedObject;
use rpki::repository::tal::{Tal, TalInfo, TalUri};
use rpki::repository::x509::{Time, Validity};
use rpki::uri;
use crate::{collector, store, tals};
use crate::config::{Config, FilterPolicy};
use crate::collector::Collector;
use crate::error::Failed;
use crate::metrics::{
Metrics, PublicationMetrics, RepositoryMetrics, TalMetrics
};
use crate::store::{Store, StoredManifest, StoredObject, StoredPoint};
use crate::utils::str::str_from_ascii;
//------------ Configuration -------------------------------------------------
/// The minimum number of manifest entries that triggers CRL serial caching.
///
/// The value has been determined experimentally with the RPKI repository at
/// a certain state so may or may not be a good one, really.
const CRL_CACHE_LIMIT: usize = 50;
//------------ Engine --------------------------------------------------------
/// The mechanism to update and process RPKI data.
///
/// A validation value can be created from the configuration via
/// [`Engine::new`]. If you don’t actually want to perform a validation run
/// but just initialize everything, [`Engine::init`] will suffice.
///
/// When created, the set of TALs is loaded and kept around. It will only be
/// refreshed explicitly through the [`reload_tals`][Self::reload_tals]
/// method.
///
/// Before starting the very first validation run, you need to call
/// [`ignite`][Self::ignite] at least once. As this may spawn threads, this
/// must happen after a possible fork.
///
/// A run is started via the [`start`][Self::start] method, providing a
/// processor that handles valid data. The method returns a [Run] value that
/// drives the validation run.
#[derive(Debug)]
pub struct Engine {
/// A list of built-in TALs to use.
bundled_tals: Vec<Tal>,
/// An optional directory to load TALs from.
extra_tals_dir: Option<PathBuf>,
/// A mapping of TAL file names to TAL labels.
tal_labels: HashMap<String, String>,
/// The list of our TALs.
tals: Vec<Tal>,
/// The collector to load updated data from.
///
/// If this is `None`, updates have been disabled.
collector: Option<Collector>,
/// The store to load stored data from.
store: Store,
/// Should we be strict when decoding data?
strict: bool,
/// How do we deal with stale objects?
stale: FilterPolicy,
/// Number of validation threads.
validation_threads: usize,
/// Should we leave the repository dirty after a valiation run.
dirty_repository: bool,
/// Maximum depth of the CA chain.
max_ca_depth: usize,
}
impl Engine {
/// Initializes the engine without creating a value.
///
/// This ensures that the TAL directory is present and logs a hint how
/// to achieve that if not.
///
/// The function is called implicitly by [`new`][Self::new].
pub fn init(config: &Config) -> Result<(), Failed> {
Collector::init(config)?;
Store::init(config)?;
Ok(())
}
/// Creates a new engine.
///
/// Takes all necessary information from `config`.
/// It also takes over the provided cache and store for use during
/// validation.
///
/// Loads the initial set of TALs and errors out if that fails.
pub fn new(
config: &Config,
update: bool,
) -> Result<Self, Failed> {
let collector = if update {
Some(Collector::new(config)?)
}
else {
None
};
let store = Store::new(config)?;
let mut res = Engine {
bundled_tals: tals::collect_tals(config)?,
extra_tals_dir: config.extra_tals_dir.clone(),
tal_labels: config.tal_labels.clone(),
tals: Vec::new(),
collector,
store,
strict: config.strict,
stale: config.stale,
validation_threads: config.validation_threads,
dirty_repository: config.dirty_repository,
max_ca_depth: config.max_ca_depth,
};
res.reload_tals()?;
Ok(res)
}
/// Reloads the set of TALs.
///
/// Assumes that all regular files with an extension of `tal` in the
/// TAL directory specified during object creation are TAL files and
/// tries to load and decode them. Fails if that fails for at least one
/// of those files.
///
/// It is not considered an error if there are no TAL files in the TAL
/// directory. However, a warning will be logged in this case.
pub fn reload_tals(&mut self) -> Result<(), Failed> {
let mut res = self.bundled_tals.clone();
if let Some(extra_tals_dir) = self.extra_tals_dir.as_ref() {
let dir = match fs::read_dir(extra_tals_dir) {
Ok(dir) => dir,
Err(err) => {
error!("Failed to open TAL directory {}: {}.",
extra_tals_dir.display(), err
);
return Err(Failed)
}
};
for entry in dir {
let entry = match entry {
Ok(entry) => entry,
Err(err) => {
error!(
"Failed to iterate over tal directory: {}",
err
);
return Err(Failed)
}
};
if !entry.file_type().map(|ft| ft.is_file()).unwrap_or(false) {
continue
}
let path = entry.path();
if path.extension().map(|ext| ext != "tal").unwrap_or(true) {
continue
}
let mut file = match File::open(&path) {
Ok(file) => {
file
}
Err(err) => {
error!(
"Failed to open TAL {}: {}. \n\
Aborting.",
path.display(), err
);
return Err(Failed)
}
};
let mut tal = match Tal::read_named(
self.path_to_tal_label(&path),
&mut file
) {
Ok(tal) => tal,
Err(err) => {
error!(
"Failed to read TAL {}: {}. \n\
Aborting.",
path.display(), err
);
return Err(Failed)
}
};
tal.prefer_https();
res.push(tal);
}
}
if res.is_empty() {
warn!(
"No TALs provided. Starting anyway."
);
}
res.sort_by(|left, right| {
left.info().name().cmp(right.info().name())
});
self.tals = res;
Ok(())
}
/// Converts a path into a TAL label.
///
/// This will be an explicitly configured TAL label if the file name
/// portion of the path is registered in `self.tal_labels` or the file
/// name without the `tal` extension otherwise.
fn path_to_tal_label(&self, path: &Path) -> String {
if let Some(name) = path.file_name().unwrap().to_str() {
if let Some(label) = self.tal_labels.get(name) {
return label.clone()
}
}
path.file_stem().unwrap().to_string_lossy().into_owned()
}
/// Ignites validation processing.
///
/// This spawns threads and therefore needs to be done after a
/// possible fork.
pub fn ignite(&mut self) -> Result<(), Failed> {
if let Some(collector) = self.collector.as_mut() {
collector.ignite()?;
}
Ok(())
}
/// Starts a validation run.
///
/// During the run, `processor` will be responsible for dealing with
/// valid objects. It must implement the [`ProcessRun`] trait.
///
/// The method returns a [`Run`] that drives the validation run.
pub fn start<P: ProcessRun>(
&self, processor: P
) -> Result<Run<P>, Failed> {
info!("Using the following TALs:");
for tal in &self.tals {
info!(" * {}", tal.info().name());
}
Ok(Run::new(
self,
self.collector.as_ref().map(Collector::start),
self.store.start(),
processor
))
}
/// Dumps the content of the collector and store owned by the engine.
pub fn dump(&self, dir: &Path) -> Result<(), Failed> {
self.store.dump(dir)?;
if let Some(collector) = self.collector.as_ref() {
collector.dump(dir)?;
}
Ok(())
}
}
//------------ Run -----------------------------------------------------------
/// A single validation run.
///
/// The runner is generic over the processor of valid data which must
/// implement the [`ProcessRun`] trait. The actual run is triggered by the
/// [`process`][Self::process] method. Upon completion, metrics of the run
/// can be extracted through [`done`][Self::done].
pub struct Run<'a, P> {
/// A reference to the underlying validation.
validation: &'a Engine,
/// The runner for the collector.
collector: Option<collector::Run<'a>>,
/// The runner for the store.
store: store::Run<'a>,
/// The processor for valid data.
processor: P,
/// The metrics collected during the run.
metrics: Metrics,
}
impl<'a, P> Run<'a, P> {
/// Creates a new runner from all the parts.
fn new(
validation: &'a Engine,
collector: Option<collector::Run<'a>>,
store: store::Run<'a>,
processor: P,
) -> Self {
Run {
validation, collector, store, processor,
metrics: Default::default()
}
}
/// Cleans the collector and store owned by the engine.
pub fn cleanup(&mut self) -> Result<(), Failed> {
if self.validation.dirty_repository {
debug!("Skipping cleanup as configured.");
return Ok(())
}
let mut retain = collector::Cleanup::new();
self.store.cleanup(&mut retain)?;
if let Some(collector) = self.collector.as_mut() {
collector.cleanup(&mut retain)?;
}
Ok(())
}
/// Finishes the validation run and returns the metrics.
///
/// If you are not interested in the metrics, you can simple drop the
/// value, instead.
pub fn done(self) -> Metrics {
let mut metrics = self.metrics;
if let Some(collector) = self.collector {
collector.done(&mut metrics)
}
self.store.done(&mut metrics);
metrics
}
}
impl<'a, P: ProcessRun> Run<'a, P> {
/// Performs the validation run.
pub fn process(&mut self) -> Result<(), Failed> {
// If we don’t have any TALs, we ain’t got nothing to do.
if self.validation.tals.is_empty() {
return Ok(())
}
// Initialize our task queue with all the TALs.
let metrics = RunMetrics::default();
let tasks = SegQueue::new();
for (index, tal) in self.validation.tals.iter().enumerate() {
tasks.push(Task::Tal(TalTask { tal, index }));
self.metrics.tals.push(TalMetrics::new(tal.info().clone()));
}
// And off we trot.
// Keep a flag to cancel everything if something goes wrong.
let had_err = AtomicBool::new(false);
let thread_metrics = ArrayQueue::new(
self.validation.validation_threads
);
let res = thread::scope(|scope| {
for _ in 0 .. self.validation.validation_threads {
scope.spawn(|_| {
let mut metrics = metrics.fork();
while let Some(task) = tasks.pop() {
if self.process_task(
task, &tasks, &mut metrics, &had_err,
).is_err() {
break;
}
}
thread_metrics.push(metrics).unwrap();
});
}
});
if res.is_err() {
// One of the workers has panicked. Well gosh darn.
error!(
"Engine failed after a worker thread has panicked. \
This is most assuredly a bug."
);
return Err(Failed);
}
if had_err.load(Ordering::Relaxed) {
return Err(Failed);
}
metrics.prepare_final(&mut self.metrics);
while let Some(metrics) = thread_metrics.pop() {
metrics.collapse(&mut self.metrics);
}
Ok(())
}
/// Process a task. Any task.
fn process_task(
&self,
task: Task<P::PubPoint>,
tasks: &SegQueue<Task<P::PubPoint>>,
metrics: &mut RunMetrics,
had_err: &AtomicBool,
) -> Result<(), Failed> {
match task {
Task::Tal(task) => {
self.process_tal_task(task, tasks, metrics, had_err)
}
Task::Ca(task) => {
self.process_ca_task(task, tasks, metrics, had_err)
}
}
}
/// Processes a trust anchor.
fn process_tal_task(
&self, task: TalTask,
tasks: &SegQueue<Task<P::PubPoint>>,
metrics: &mut RunMetrics,
had_err: &AtomicBool,
) -> Result<(), Failed> {
for uri in task.tal.uris() {
let cert = match self.load_ta(uri, task.tal.info())? {
Some(cert) => cert,
_ => continue,
};
if cert.subject_public_key_info() != task.tal.key_info() {
warn!(
"Trust anchor {}: key doesn’t match TAL.",
uri
);
continue;
}
let cert = match cert.validate_ta(
task.tal.info().clone(), self.validation.strict
) {
Ok(cert) => CaCert::root(cert, uri.clone(), task.index),
Err(err) => {
warn!("Trust anchor {}: {}.", uri, err);
continue;
}
};
let cert = match cert {
Ok(cert) => cert,
Err(_) => continue,
};
debug!("Found valid trust anchor {}. Processing.", uri);
match self.processor.process_ta(
task.tal, uri, &cert, cert.tal
)? {
Some(processor) => {
return self.process_ca_task(
CaTask {
cert, processor,
repository_index: None,
defer: false,
},
tasks, metrics, had_err
)
}
None => {
debug!("Skipping trust anchor {}.", uri);
return Ok(())
}
}
}
warn!("No valid trust anchor for TAL {}", task.tal.info().name());
Ok(())
}
/// Loads a trust anchor certificate with the given URI.
///
/// Attempts to download the certificate from upstream but falls back to
/// the version in the store if available.
fn load_ta(
&self,
uri: &TalUri,
_info: &TalInfo,
) -> Result<Option<Cert>, Failed> {
// Get the new version, store and return it if it decodes.
if let Some(collector) = self.collector.as_ref() {
if let Some(bytes) = collector.load_ta(uri) {
if let Ok(cert) = Cert::decode(bytes.clone()) {
self.store.update_ta(uri, &bytes)?;
return Ok(Some(cert))
}
}
}
// Get what we have in store.
self.store.load_ta(uri).map(|bytes| {
bytes.and_then(|bytes| Cert::decode(bytes).ok())
})
}
/// Processes a CA.
fn process_ca_task(
&self,
task: CaTask<P::PubPoint>,
tasks: &SegQueue<Task<P::PubPoint>>,
metrics: &mut RunMetrics,
had_err: &AtomicBool,
) -> Result<(), Failed> {
let more_tasks = PubPoint::new(
self, &task.cert, task.processor, task.repository_index,
).and_then(|point| {
point.process(metrics)
}).map_err(|_| {
had_err.store(true, Ordering::Relaxed);
Failed
})?;
for task in more_tasks {
if had_err.load(Ordering::Relaxed) {
return Err(Failed)
}
if task.defer {
tasks.push(Task::Ca(task))
}
else {
self.process_ca_task(task, tasks, metrics, had_err)?;
}
}
Ok(())
}
}
//------------ PubPoint ------------------------------------------------------
/// Validation of a single publication point.
struct PubPoint<'a, P: ProcessRun> {
/// A reference to the runner.
run: &'a Run<'a, P>,
/// A reference to the CA certificate of the publication point.
cert: &'a Arc<CaCert>,
/// The processor for valid data at this publication point.
processor: P::PubPoint,
/// The index of this point’s repository in the run’s metrics.
repository_index: Option<usize>,
/// The base publication metrics for this publication point.
///
/// This one only receives information about the publication point itself,
/// as well as the manifest and manifest CRL. Everything else will be
/// collected during object processing via `ValidPointManifest` so we can
/// drop it if the point gets cancelled.
metrics: PublicationMetrics,
}
impl<'a, P: ProcessRun> PubPoint<'a, P> {
/// Creates a new publication point validator based on a CA certificate.
pub fn new(
run: &'a Run<'a, P>,
cert: &'a Arc<CaCert>,
processor: P::PubPoint,
repository_index: Option<usize>,
) -> Result<Self, Failed> {
Ok(PubPoint {
run, cert, processor, repository_index,
metrics: Default::default(),
})
}
/// Performs validation of the publication point.
///
/// Upon success, returns a list of all the child CAs of this publication
/// point as CA processing tasks.
pub fn process(
self,
metrics: &mut RunMetrics,
) -> Result<Vec<CaTask<P::PubPoint>>, Failed> {
let mut store = self.run.store.pub_point(self.cert)?;
if let Some(collector) = self.run.collector.as_ref() {
if let Some(collector) = collector.repository(self.cert)? {
match self.process_collected(
collector, &mut store, metrics
)? {
Ok(res) => return Ok(res),
Err(mut this) => {
this.metrics = Default::default();
return this.process_stored(store, metrics)
}
}
}
}
self.process_stored(store, metrics)
}
/// Tries to update the stored data and validate at the same time.
///
/// Tries to fetch the updated manifest from the collector. If it differs
/// from the stored manifest, updates the stored manifest and objects if
/// the manifest is valid and all the objects are present and match their
/// hashes. While checking, also processes the data so we only need to do
/// that once.
///
/// Returns a list of the child CAs if it successfully finished or `self`
/// if there is no update or it is not valid and the store needs to be
/// consulted.
#[allow(clippy::type_complexity)] // Yeah, I know ...
fn process_collected(
mut self,
collector: collector::Repository,
store: &mut StoredPoint,
metrics: &mut RunMetrics,
) -> Result<Result<Vec<CaTask<P::PubPoint>>, Self>, Failed> {
// Try to load the manifest from the collector. If there isn’t one,
// we are done, too.
let collected = match collector.load_object(
self.cert.rpki_manifest()
)? {
Some(collected) => collected,
None => return Ok(Err(self))
};
// If the stored and collected manifests are the same, nothing has
// changed and we can abort the update. However, we need to check that
// the stored manifest refers to the same CA repository URI, just to
// be sure.
let same = if let Some(mft) = store.manifest() {
mft.manifest() == &collected
&& mft.ca_repository() == self.cert.ca_repository()
}
else {
false
};
if same {
return Ok(Err(self))
}
// Validate the collected manifest. If it is no good, well, you know
// the drill.
let mut collected = match self.validate_collected_manifest(
collected, &collector
)? {
Some(collected) => collected,
None => {
return Ok(Err(self))
}
};
// The manifest is fine, so we can now look at the objects. The
// objects are fine if they are present and match the hash. If they
// don’t we have to cancel the update. We also validate them while we
// are at it. This also collects all the child CAs that need
// processing later on in `ca_tasks`.
//
// However, the processor can decide it doesn’t like the publication
// point at all. This is not an error -- the publication point is
// correct from a store perspective --, but we must not process te
// collected `ca_tasks`. We keep track of this through `point_ok` and,
// if that happens to end up being `false` return an empty list to
// signal that the publication point was processed successfully but
// shouldn’t be considered further.
let mut ca_tasks = Vec::new();
let mut items = collected.content.iter();
let mut point_ok = true;
let update_result = store.update(
StoredManifest::new(
collected.ee_cert.validity().not_after(),
self.cert.rpki_notify().cloned(),
self.cert.ca_repository().clone(),
self.cert.rpki_manifest().clone(),
collected.manifest_bytes.clone(),
collected.crl_uri.clone(),
collected.crl_bytes.clone(),
),
|| {
let item = match items.next() {
Some(item) => item,
None => return Ok(None)
};
let file = match str_from_ascii(item.file()) {
Ok(file) => file,
Err(_) => {
warn!("{}: illegal file name '{}'.",
self.cert.rpki_manifest(),
String::from_utf8_lossy(item.file())
);
return Err(store::UpdateError::Abort)
}
};
let uri = self.cert.ca_repository().join(
file.as_ref()
).unwrap();
let hash = ManifestHash::new(
item.hash().clone(), collected.content.file_hash_alg()
);
let content = match collector.load_object(&uri)? {
Some(content) => content,
None => {
warn!("{}: failed to load.", uri);
return Err(store::UpdateError::Abort)
}
};
if hash.verify(&content).is_err() {
warn!("{}: file has wrong manifest hash.", uri);
return Err(store::UpdateError::Abort)
}
if !self.process_object(
&uri, content.clone(),
&mut collected, &mut ca_tasks
)? {
point_ok = false;
}
Ok(Some(StoredObject::new(uri, content, Some(hash))))
}
);
match update_result {
Ok(()) => {
// Update was successful. We have to accept whatever result
// we got.
if point_ok {
self.accept_point(collected, metrics);
Ok(Ok(ca_tasks))
}
else {
self.reject_point(metrics);
Ok(Ok(Vec::new()))
}
}
Err(store::UpdateError::Abort) => {
// Update was aborted. We need to use the store.
Ok(Err(self))
}
Err(store::UpdateError::Fatal) => {
// We are doomed.
Err(Failed)
}
}
}
/// Tries to validate a manifest acquired from the collector.
///
/// Checks that the manifest is correct itself and has been signed by the
/// publication point’s CA. Tries to load the associated CRL from the
/// collector, validates that against the CA and checks that the manifest
/// has not been revoked.
fn validate_collected_manifest(
&mut self,
manifest_bytes: Bytes,
repository: &collector::Repository,
) -> Result<Option<ValidPointManifest>, Failed> {
let manifest = match Manifest::decode(
manifest_bytes.clone(), self.run.validation.strict
) {
Ok(manifest) => manifest,
Err(err) => {
self.metrics.invalid_manifests += 1;
warn!("{}: {}.", self.cert.rpki_manifest(), err);
return Ok(None)
}
};
let (ee_cert, content) = match manifest.validate(
self.cert.cert(), self.run.validation.strict
) {
Ok(some) => some,
Err(err) => {
self.metrics.invalid_manifests += 1;
warn!("{}: {}.", self.cert.rpki_manifest(), err);
return Ok(None)
}
};
if content.this_update() > Time::now() {
self.metrics.premature_manifests += 1;
warn!("{}: premature manifest", self.cert.rpki_manifest());
return Ok(None)
}
if content.is_stale() {
self.metrics.stale_manifests += 1;
match self.run.validation.stale {
FilterPolicy::Reject => {
warn!("{}: stale manifest", self.cert.rpki_manifest());
return Ok(None)
}
FilterPolicy::Warn => {
warn!("{}: stale manifest", self.cert.rpki_manifest());
}
FilterPolicy::Accept => { }
}
}
let (crl_uri, crl, crl_bytes) = match self.validate_collected_crl(
&ee_cert, &content, repository
)? {
Some(some) => some,
None => return Ok(None)
};
self.metrics.valid_manifests += 1;
Ok(Some(ValidPointManifest {
ee_cert, content, crl_uri, crl, manifest_bytes, crl_bytes,
metrics: Default::default(),
}))
}
/// Check the manifest CRL.
///
/// Checks that there is exactly one CRL on the manifest, that it matches
/// the CRL mentioned in the manifest’s EE certificate, that it matches
/// its manifest hash, that it is a valid CRL for the CA, and that it does
/// not revoke the manifest’s EE certificate.
///
/// If all that is true, returns the decoded CRL.
fn validate_collected_crl(
&mut self,
ee_cert: &ResourceCert,
manifest: &ManifestContent,
repository: &collector::Repository
) -> Result<Option<(uri::Rsync, Crl, Bytes)>, Failed> {
// Let’s first get the manifest CRL’s name relative to repo_uri. If
// it ain’t relative at all, this is already invalid.
let crl_uri = match ee_cert.crl_uri() {
// RFC 6481: MUST end in .crl.
Some(some) if some.ends_with(".crl") => some.clone(),
_ => {
self.metrics.invalid_manifests += 1;
warn!("{}: invalid CRL URI.", self.cert.rpki_manifest());
return Ok(None)
}
};
let crl_name = match crl_uri.relative_to(self.cert.ca_repository()) {
Some(name) => name,
None => {
self.metrics.invalid_manifests += 1;
warn!(
"{}: CRL URI outside repository directory.",
self.cert.rpki_manifest()
);
return Ok(None)
}
};
// Now we go over the manifest and try to find an entry matching
// crl_name.
let mut crl_bytes = None;
for item in manifest.iter() {
let (file, hash) = item.into_pair();
if file == crl_name {
let bytes = match repository.load_object(&crl_uri)? {
Some(bytes) => bytes,
None => {
self.metrics.invalid_crls += 1;
warn!("{}: failed to load.", crl_uri);
return Ok(None)
}
};
let hash = ManifestHash::new(hash, manifest.file_hash_alg());
if hash.verify(&bytes).is_err() {
self.metrics.invalid_crls += 1;
warn!("{}: file has wrong hash.", crl_uri);
return Ok(None)
}
crl_bytes = Some(bytes);
}
}
let crl_bytes = match crl_bytes {
Some(some) => some,
None => {
self.metrics.invalid_crls += 1;
warn!(
"{}: CRL not listed on manifest.",
self.cert.rpki_manifest()
);
return Ok(None)
}
};
// Decode and validate the CRL.
let mut crl = match Crl::decode(crl_bytes.clone()) {
Ok(crl) => crl,
Err(err) => {
self.metrics.invalid_crls += 1;
warn!("{}: {}.", crl_uri, err);
return Ok(None)
}
};
if let Err(err) = crl.verify_signature(
self.cert.cert().subject_public_key_info()
) {
self.metrics.invalid_crls += 1;
warn!("{}: {}.", crl_uri, err);
return Ok(None)
}
if crl.is_stale() {
self.metrics.stale_crls += 1;
match self.run.validation.stale {
FilterPolicy::Reject => {
warn!("{}: stale CRL.", crl_uri);
return Ok(None)
}
FilterPolicy::Warn => {
warn!("{}: stale CRL.", crl_uri);
}
FilterPolicy::Accept => { }
}
}
// Turn on serial caching before looking for the first serial.
if manifest.len() > CRL_CACHE_LIMIT {
crl.cache_serials()
}
// Finally: has the manifest’s cert been revoked?
if crl.contains(ee_cert.serial_number()) {
self.metrics.invalid_manifests += 1;
warn!(
"{}: certificate has been revoked.",
self.cert.rpki_manifest()
);
return Ok(None)
}
// Phew: All good.
self.metrics.valid_crls += 1;
Ok(Some((crl_uri, crl, crl_bytes)))
}
/// Process the stored version of the publication point.
///
/// Validates the objects of the currently stored version of the
/// publication point if there is one.
///
/// Returns a list of all child CAs of this publication point.
fn process_stored(
mut self,
mut store: StoredPoint,
metrics: &mut RunMetrics,
) -> Result<Vec<CaTask<P::PubPoint>>, Failed> {
let manifest = match store.take_manifest() {
Some(manifest) => manifest,
None => {
// We don’t seem to have this point in the store either.
// Warn and return.
warn!(
"{}: no valid manifest found.",
self.cert.rpki_manifest()
);
self.metrics.missing_manifests += 1;
self.reject_point(metrics);
return Ok(Vec::new())
}
};
let mut manifest = match self.validate_stored_manifest(manifest) {
Ok(manifest) => manifest,
Err(_) => {
self.reject_point(metrics);
return Ok(Vec::new())
}
};
let mut ca_tasks = Vec::new();
for object in &mut store {
let object = match object {
Ok(object) => object,
Err(err) => {
if err.is_fatal() {
error!(
"Fatal: failed to read from {}: {}",
store.path().display(), err
);
return Err(Failed)
}
else {
info!(
"Ignoring invalid stored publication point \
at {}: {}",
store.path().display(), err
);
self.reject_point(metrics);
return Ok(Vec::new())
}
}
};
if !self.process_object(
object.uri(), object.content().clone(),
&mut manifest, &mut ca_tasks
)? {
self.reject_point(metrics);
return Ok(Vec::new())
}
}
self.accept_point(manifest, metrics);
Ok(ca_tasks)
}
/// Tries to validate a stored manifest.
///
/// This is similar to
/// [`validate_collected_manifest`][Self::validate_collected_manifest]
/// but has less hassle with the CRL because that is actually included in
/// the stored manifest.
fn validate_stored_manifest(
&mut self,
stored_manifest: StoredManifest,
) -> Result<ValidPointManifest, Failed> {
// Decode and validate the manifest.
let manifest = match Manifest::decode(
stored_manifest.manifest().clone(), self.run.validation.strict
) {
Ok(manifest) => manifest,
Err(err) => {
warn!("{}: {}.", self.cert.rpki_manifest(), err);
self.metrics.invalid_manifests += 1;
return Err(Failed);
}
};
let (ee_cert, content) = match manifest.validate(
self.cert.cert(), self.run.validation.strict
) {
Ok(some) => some,
Err(err) => {
warn!("{}: {}.", self.cert.rpki_manifest(), err);
self.metrics.invalid_manifests += 1;
return Err(Failed);
}
};
if content.is_stale() {
self.metrics.stale_manifests += 1;
match self.run.validation.stale {
FilterPolicy::Reject => {
warn!("{}: stale manifest", self.cert.rpki_manifest());
self.metrics.invalid_manifests += 1;
return Err(Failed);
}
FilterPolicy::Warn => {
warn!("{}: stale manifest", self.cert.rpki_manifest());
}
FilterPolicy::Accept => { }
}
}
// Get the CRL URI. We actually only need this for error reporting.
let crl_uri = match ee_cert.crl_uri() {
Some(uri) => uri.clone(),
None => {
// This should have been ruled out in manifest validation.
warn!(
"{}: manifest without CRL URI.",
self.cert.rpki_manifest()
);
self.metrics.invalid_manifests += 1;
return Err(Failed)
}
};
// Decode and validate the CRL.
let mut crl = match Crl::decode(stored_manifest.crl().clone()) {
Ok(crl) => crl,
Err(err) => {
warn!("{}: {}.", crl_uri, err);
self.metrics.invalid_manifests += 1;
self.metrics.invalid_crls += 1;
return Err(Failed)
}
};
if let Err(err) = crl.verify_signature(
self.cert.cert().subject_public_key_info()
) {
warn!("{}: {}.", crl_uri, err);
self.metrics.invalid_manifests += 1;
self.metrics.invalid_crls += 1;
return Err(Failed)
}
if crl.is_stale() {
self.metrics.stale_crls += 1;
match self.run.validation.stale {
FilterPolicy::Reject => {
warn!("{}: stale CRL.", crl_uri);
self.metrics.invalid_manifests += 1;
self.metrics.invalid_crls += 1;
return Err(Failed)
}
FilterPolicy::Warn => {
warn!("{}: stale CRL.", crl_uri);
}
FilterPolicy::Accept => { }
}
}
// Turn on serial caching before looking for the first serial.
if content.len() > CRL_CACHE_LIMIT {
crl.cache_serials()
}
// Finally: has the manifest’s cert been revoked?
//
// XXX This shouldn’t really happen because if it were we would never
// have stored this manifest.
if crl.contains(ee_cert.serial_number()) {
warn!(
"{}: certificate has been revoked.",
self.cert.rpki_manifest()
);
self.metrics.invalid_manifests += 1;
return Err(Failed)
}
self.metrics.valid_manifests += 1;
self.metrics.valid_crls += 1;
Ok(ValidPointManifest {
ee_cert, content, crl_uri, crl,
manifest_bytes: stored_manifest.manifest().clone(),
crl_bytes: stored_manifest.crl().clone(),
metrics: Default::default(),
})
}
// XXX Check metrics generation and concentrate at one central point.
fn accept_point(
mut self,
manifest: ValidPointManifest,
metrics: &mut RunMetrics,
) {
self.metrics.valid_points += 1;
self.metrics += manifest.metrics;
self.apply_metrics(metrics);
self.processor.commit();
}
fn reject_point(
mut self,
metrics: &mut RunMetrics,
) {
self.metrics.rejected_points += 1;
self.apply_metrics(metrics);
self.processor.cancel(self.cert);
}
fn apply_metrics(
&mut self,
metrics: &mut RunMetrics,
) {
let repository_index = self.repository_index.unwrap_or_else(|| {
metrics.repository_index(self.cert)
});
self.processor.repository_index(repository_index);
metrics.apply(
&self.metrics,
repository_index,
self.cert.tal
);
}
/// Processes a single object.
///
/// Returns whether processing should continue or whether the entire (!)
/// publication point should be disregarded.
fn process_object(
&mut self,
uri: &uri::Rsync,
content: Bytes,
manifest: &mut ValidPointManifest,
ca_task: &mut Vec<CaTask<P::PubPoint>>,
) -> Result<bool, Failed> {
if !self.processor.want(uri)? {
return Ok(true)
}
if uri.ends_with(".cer") {
self.process_cer(uri, content, manifest, ca_task)?;
}
else if uri.ends_with(".roa") {
self.process_roa(uri, content, manifest)?;
}
else if uri.ends_with(".crl") {
if *uri != manifest.crl_uri {
warn!("{}: stray CRL.", uri);
manifest.metrics.stray_crls += 1;
}
}
else if uri.ends_with(".gbr") {
self.process_gbr(uri, content, manifest)?;
}
else {
manifest.metrics.others += 1;
warn!("{}: unknown object type.", uri);
}
Ok(true)
}
/// Processes a certificate object.
fn process_cer(
&mut self,
uri: &uri::Rsync,
content: Bytes,
manifest: &mut ValidPointManifest,
ca_task: &mut Vec<CaTask<P::PubPoint>>,
) -> Result<(), Failed> {
let cert = match Cert::decode(content) {
Ok(cert) => cert,
Err(err) => {
warn!("{}: {}.", uri, err);
manifest.metrics.invalid_certs += 1;
return Ok(())
}
};
if cert.key_usage() == KeyUsage::Ca {
self.process_ca_cer(uri, cert, manifest, ca_task)
}
else {
self.process_ee_cer(uri, cert, manifest)
}
}
/// Processes a CA certificate.
#[allow(clippy::too_many_arguments)]
fn process_ca_cer(
&mut self, uri: &uri::Rsync, cert: Cert,
manifest: &mut ValidPointManifest,
ca_task: &mut Vec<CaTask<P::PubPoint>>,
) -> Result<(), Failed> {
if self.cert.check_loop(&cert).is_err() {
warn!("{}: certificate loop detected.", uri);
manifest.metrics.invalid_certs += 1;
return Ok(())
}
let cert = match cert.validate_ca(
self.cert.cert(), self.run.validation.strict
) {
Ok(cert) => cert,
Err(err) => {
warn!("{}: {}.", uri, err);
manifest.metrics.invalid_certs += 1;
return Ok(())
}
};
if let Err(err) = manifest.check_crl(&cert) {
warn!("{}: {}.", uri, err);
manifest.metrics.invalid_certs += 1;
return Ok(())
}
let cert = match CaCert::chain(
self.cert, uri.clone(), cert, self.run.validation.max_ca_depth,
) {
Ok(cert) => cert,
Err(_) => {
manifest.metrics.invalid_certs += 1;
return Ok(())
}
};
manifest.metrics.valid_ca_certs += 1;
let mut processor = match self.processor.process_ca(
uri, &cert
)? {
Some(processor) => processor,
None => return Ok(())
};
processor.update_refresh(cert.cert().validity().not_after());
// Defer operation if we need to update the repository part where
// the CA lives.
let defer = match self.run.collector.as_ref() {
Some(collector) => !collector.was_updated(&cert),
None => false,
};
// If we switch repositories, we need to apply our metrics.
let repository_index = if cert.repository_switch() {
None
}
else {
self.repository_index
};
ca_task.push(CaTask {
cert, processor, repository_index, defer
});
Ok(())
}
/// Processes an EE certificate.
fn process_ee_cer(
&mut self, uri: &uri::Rsync, cert: Cert,
manifest: &mut ValidPointManifest,
) -> Result<(), Failed> {
if let Err(err) = cert.validate_router(
self.cert.cert(), self.run.validation.strict
) {
warn!("{}: {}.", uri, err);
manifest.metrics.invalid_certs += 1;
return Ok(())
};
if let Err(err) = manifest.check_crl(&cert) {
warn!("{}: {}.", uri, err);
manifest.metrics.invalid_certs += 1;
return Ok(())
}
manifest.metrics.valid_ee_certs += 1;
self.processor.process_ee_cert(uri, cert, self.cert)?;
Ok(())
}
/// Processes a ROA object.
fn process_roa(
&mut self, uri: &uri::Rsync, content: Bytes,
manifest: &mut ValidPointManifest,
) -> Result<(), Failed> {
let roa = match Roa::decode(
content, self.run.validation.strict
) {
Ok(roa) => roa,
Err(err) => {
warn!("{}: {}.", uri, err);
manifest.metrics.invalid_roas += 1;
return Ok(())
}
};
match roa.process(
self.cert.cert(),
self.run.validation.strict,
|cert| manifest.check_crl(cert)
) {
Ok((cert, route)) => {
manifest.metrics.valid_roas += 1;
self.processor.process_roa(uri, cert, route)?
}
Err(err) => {
manifest.metrics.invalid_roas += 1;
warn!("{}: {}.", uri, err)
}
}
Ok(())
}
/// Processes a Ghostbuster Record.
fn process_gbr(
&mut self, uri: &uri::Rsync, content: Bytes,
manifest: &mut ValidPointManifest,
) -> Result<(), Failed> {
let obj = match SignedObject::decode(
content, self.run.validation.strict
) {
Ok(obj) => obj,
Err(err) => {
warn!("{}: {}.", uri, err);
manifest.metrics.invalid_gbrs += 1;
return Ok(())
}
};
match obj.process(
self.cert.cert(),
self.run.validation.strict,
|cert| manifest.check_crl(cert)
) {
Ok((cert, content)) => {
manifest.metrics.valid_gbrs += 1;
self.processor.process_gbr(uri, cert, content)?
}
Err(err) => {
manifest.metrics.invalid_gbrs += 1;
warn!("{}: {}.", uri, err)
}
}
Ok(())
}
}
//------------ ValidPointManifest --------------------------------------------
/// All information from a validated manifest.
#[derive(Clone, Debug)]
struct ValidPointManifest {
/// The EE certificate the manifest was signed with.
ee_cert: ResourceCert,
/// The payload of the manifest.
content: ManifestContent,
/// The CRL distribution point URI of the manifest.
///
/// This is here separately because it may be `None` in a `ResourceCert`
/// but can’t be in a valid CA cert.
crl_uri: uri::Rsync,
/// The CRL.
crl: Crl,
/// The raw bytes of the manifest.
manifest_bytes: Bytes,
/// The raw bytes of the CRL.
crl_bytes: Bytes,
/// The publication metrics collected while processing the manifest.
///
/// We keep them here separatedly, so we can just zero them out if
/// processor decides to drop the publication point.
metrics: PublicationMetrics,
}
impl ValidPointManifest {
/// Checks whether `cert` has been revoked.
fn check_crl(&self, cert: &Cert) -> Result<(), ValidationError> {
let crl_uri = match cert.crl_uri() {
Some(some) => some,
None => {
return Err(InspectionError::new(
"certificate has no CRL URI"
).into())
}
};
if *crl_uri != self.crl_uri {
return Err(InspectionError::new(
"certificate's CRL differs from manifest's"
).into())
}
if self.crl.contains(cert.serial_number()) {
return Err(InspectionError::new(
"certificate has been revoked"
).into())
}
Ok(())
}
}
//------------ Task ----------------------------------------------------------
/// Any task that can be queued for delayed processing.
enum Task<'a, P> {
/// The task is to process a trust anchor locator.
Tal(TalTask<'a>),
/// The task is to process a CA.
Ca(CaTask<P>),
}
impl<'a, P> fmt::Debug for Task<'a, P> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Task::Tal(ref inner) => {
write!(f, "TalTask {{ tal: {} }}", inner.tal.info().name())
}
Task::Ca(ref inner) => {
write!(
f, "CaTask {{ ca_repository: {} }}",
inner.cert.ca_repository
)
}
}
}
}
//------------ TalTask ------------------------------------------------------
/// A task for processing a single trust anchor locator.
struct TalTask<'a> {
/// A reference to the actual TAL.
tal: &'a Tal,
/// The index of this TAL in the metrics.
index: usize,
}
//------------ CaTask --------------------------------------------------------
/// A task for processing a single CA.
struct CaTask<P> {
/// The CA certificate of the CA.
cert: Arc<CaCert>,
/// The processor for this CA.
processor: P,
/// The repository index of we know it already.
repository_index: Option<usize>,
/// Defer processing?
///
/// Processing is deferred if the CA lives in a different repository than
/// its issuing CA:
defer: bool,
}
//------------ CaCert --------------------------------------------------------
/// A CA certificate plus references to all its parents.
#[derive(Clone, Debug)]
pub struct CaCert {
/// The CA certificate of this CA.
cert: ResourceCert,
/// The certificate’s location.
#[allow(dead_code)] // Keep it even if unused, we may want it metrics later
uri: TalUri,
/// The CA repository URI of the certificate.
ca_repository: uri::Rsync,
/// The manifest URI of the certificate.
rpki_manifest: uri::Rsync,
/// The parent CA.
///
/// This will be `None` for a trust anchor certificate.
parent: Option<Arc<CaCert>>,
/// The length of the chain of certificates from a trust anchor.
chain_len: usize,
/// The index of the TAL in the metrics.
pub(crate) // XXX
tal: usize,
/// The combined validity of the certificate.
///
/// This is derived from the validity of all the parents and the
/// certificate itself.
combined_validity: Validity,
}
impl CaCert {
/// Creates a new CA cert for a trust anchor.
pub fn root(
cert: ResourceCert, uri: TalUri, tal: usize
) -> Result<Arc<Self>, Failed> {
Self::new(cert, uri, None, 0, tal)
}
/// Creates a new CA cert for an issued CA.
pub fn chain(
issuer: &Arc<Self>,
uri: uri::Rsync,
cert: ResourceCert,
max_depth: usize,
) -> Result<Arc<Self>, Failed> {
let chain_len = match issuer.chain_len.checked_add(1) {
Some(chain_len) => chain_len,
None => {
error!(
"CA {}: CA depth overrun.",
uri
);
return Err(Failed)
}
};
if chain_len > max_depth {
error!(
"CA {}: CA depth overrun.",
uri
);
return Err(Failed)
}
Self::new(
cert, TalUri::Rsync(uri),
Some(issuer.clone()), chain_len,
issuer.tal
)
}
/// Creates a new CA cert from its various parts.
fn new(
cert: ResourceCert,
uri: TalUri,
parent: Option<Arc<Self>>,
chain_len: usize,
tal: usize,
) -> Result<Arc<Self>, Failed> {
let combined_validity = match parent.as_ref() {
Some(ca) => cert.validity().trim(ca.combined_validity()),
None => cert.validity()
};
let ca_repository = match cert.ca_repository() {
Some(uri) => uri.clone(),
None => {
// This is actually checked during certificate validation,
// so this should never happen.
error!(
"CA cert {} has no repository URI. \
Why has it not been rejected yet?",
uri
);
return Err(Failed)
}
};
let rpki_manifest = match cert.rpki_manifest() {
Some(uri) => uri.clone(),
None => {
// This is actually checked during certificate validation,
// so this should never happen.
error!(
"CA cert {} has no manifest URI. \
Why has it not been rejected yet?",
uri
);
return Err(Failed)
}
};
Ok(Arc::new(CaCert {
cert, uri, ca_repository, rpki_manifest, parent, chain_len, tal,
combined_validity,
}))
}
/// Checks whether a child cert has appeared in the chain already.
pub fn check_loop(&self, cert: &Cert) -> Result<(), Failed> {
self._check_loop(cert.subject_key_identifier())
}
/// The actual recursive loop test.
///
/// We are comparing certificates by comparing their subject key
/// identifiers.
fn _check_loop(&self, key_id: KeyIdentifier) -> Result<(), Failed> {
if self.cert.subject_key_identifier() == key_id {
Err(Failed)
}
else if let Some(ref parent) = self.parent {
parent._check_loop(key_id)
}
else {
Ok(())
}
}
/// Returns a reference to the resource certificate.
pub fn cert(&self) -> &ResourceCert {
&self.cert
}
/// Returns a reference the caRepository URI of the certificate.
pub fn ca_repository(&self) -> &uri::Rsync {
&self.ca_repository
}
/// Returns a reference to the rpkiManifest URI of the certificate.
pub fn rpki_manifest(&self) -> &uri::Rsync {
&self.rpki_manifest
}
/// Returns a reference to the rpkiNotify URI of the certificate.
pub fn rpki_notify(&self) -> Option<&uri::Https> {
self.cert.rpki_notify()
}
/// Returns the combined validaty of the whole CA.
pub fn combined_validity(&self) -> Validity {
self.combined_validity
}
/// Returns whether the CA is in a different repository from its parent.
///
/// This is just a quick check and may report a switch when in fact there
/// isn’t one.
pub(crate) // XXX
fn repository_switch(&self) -> bool {
let parent = match self.parent.as_ref() {
Some(parent) => parent,
None => return true,
};
match self.rpki_notify() {
Some(rpki_notify) => {
Some(rpki_notify) != parent.rpki_notify()
}
None => {
self.ca_repository.module() != parent.ca_repository.module()
}
}
}
}
//------------ RunMetrics ----------------------------------------------------
/// The metrics collected during a engine run.
#[derive(Debug, Default)]
struct RunMetrics {
/// The per-TAL metrics.
tals: Vec<PublicationMetrics>,
/// The per-repository metrics.
repositories: Vec<PublicationMetrics>,
/// The overall metrics.
publication: PublicationMetrics,
/// The indexes of repositories in the repository metrics vec.
///
/// The key is the string representation of the rpkiNotify or rsync
/// module URI.
repository_indexes: Arc<Mutex<HashMap<String, usize>>>,
}
impl RunMetrics {
/// Creates a new value that shares indexes with the current one.
pub fn fork(&self) -> Self {
RunMetrics {
tals: Default::default(),
repositories: Default::default(),
publication: Default::default(),
repository_indexes: self.repository_indexes.clone(),
}
}
/// Returns the index of a repository in the metrics.
///
/// Adds a new repository if necessary.
pub fn repository_index(&self, cert: &CaCert) -> usize {
let uri = cert.rpki_notify().map(|uri| {
Cow::Borrowed(uri.as_str())
}).unwrap_or_else(|| {
cert.ca_repository.canonical_module()
});
let mut repository_indexes = self.repository_indexes.lock().unwrap();
if let Some(index) = repository_indexes.get(uri.as_ref()) {
return *index
}
let index = repository_indexes.len();
repository_indexes.insert(uri.into_owned(), index);
index
}
/// Apply publication metrics.
pub fn apply(
&mut self, metrics: &PublicationMetrics,
repository_index: usize, tal_index: usize
) {
while self.repositories.len() <= repository_index {
self.repositories.push(Default::default())
}
self.repositories[repository_index] += metrics;
while self.tals.len() <= tal_index {
self.tals.push(Default::default())
}
self.tals[tal_index] += metrics;
self.publication += metrics;
}
/// Prepares the final metrics.
pub fn prepare_final(&self, target: &mut Metrics) {
let mut indexes: Vec<_>
= self.repository_indexes.lock().unwrap().iter().map(|item| {
(item.0.clone(), *item.1)
}).collect();
indexes.sort_by_key(|(_, idx)| *idx);
target.repositories = indexes.into_iter().map(|(uri, _)| {
RepositoryMetrics::new(uri)
}).collect();
}
/// Collapse into the final metrics.
///
/// Assumes that the target has been extended to fit all TALs and
/// repositories.
///
/// This only collapses the publication metrics since those are the ones
/// collected by the engine.
pub fn collapse(self, target: &mut Metrics) {
for (target, metric) in target.tals.iter_mut().zip(
self.tals.into_iter()
) {
target.publication += metric
}
for (target, metric) in target.repositories.iter_mut().zip(
self.repositories.into_iter()
) {
target.publication += metric
}
target.publication += self.publication;
}
}
//------------ ProcessRun ----------------------------------------------------
/// A type that can process the valid data from the RPKI.
pub trait ProcessRun: Send + Sync {
/// The type processing the valid data of a single publication point.
type PubPoint: ProcessPubPoint;
/// Processes the given trust anchor.
///
/// If the method wants the content of this trust anchor to be validated
/// and processed, it returns a processor for it as some success value.
/// If it rather wishes to skip this trust anchor, it returns `Ok(None)`.
/// If it wishes to abort processing, it returns an error.
///
/// The `tal_index` argument indicates the index of the TAL in the
/// metrics produced by the processing run. Similarly, the
/// `repository_index` argument refers to the index of the repository
/// publishing the trust anchor CA’s publication point in the metrics.
fn process_ta(
&self, tal: &Tal, uri: &TalUri, cert: &CaCert, tal_index: usize
) -> Result<Option<Self::PubPoint>, Failed>;
}
//------------ ProcessPubPoint -----------------------------------------------
/// A type that can process the valid data from an RPKI publication point.
pub trait ProcessPubPoint: Sized + Send + Sync {
/// Sets the index of repository in the processing run metrics.
fn repository_index(&mut self, repository_index: usize) {
let _ = repository_index;
}
/// Updates the refresh time for this publication poont.
fn update_refresh(&mut self, not_after: Time) {
let _ = not_after;
}
/// Determines whether an object with the given URI should be processed.
///
/// The object will only be processed if the method returns `Ok(true)`.
/// If it returns `Ok(false)`, the object will be skipped quietly. If it
/// returns an error, the entire processing run will be aborted.
fn want(&self, uri: &uri::Rsync) -> Result<bool, Failed>;
/// Process the content of a validated CA.
///
/// The method can choose how to proceed. If it chooses to process the CA,
/// it returns `Ok(Some(value))` with a new processor to be used for this
/// CA. If it wishes to skip this CA, it returns `Ok(None)`. And if it
/// wishes to abort processing, it returns an error.
///
/// The `repository_index` argument indicates the index of the repository
/// publishing the CA’s publication point in the metrics produced by the
/// processing run.
fn process_ca(
&mut self, uri: &uri::Rsync, cert: &CaCert,
) -> Result<Option<Self>, Failed>;
/// Process the content of a validated EE certificate.
///
/// The method is given both the URI and the certificate. If it
/// returns an error, the entire processing run will be aborted.
fn process_ee_cert(
&mut self, uri: &uri::Rsync, cert: Cert, ca_cert: &CaCert,
) -> Result<(), Failed> {
let _ = (uri, cert, ca_cert);
Ok(())
}
/// Process the content of a validated ROA.
///
/// The method is given both the URI and the content of the ROA. If it
/// returns an error, the entire processing run will be aborted.
fn process_roa(
&mut self,
uri: &uri::Rsync,
cert: ResourceCert,
route: RouteOriginAttestation
) -> Result<(), Failed> {
let _ = (uri, cert, route);
Ok(())
}
/// Process the content of a Ghostbuster Record.
///
/// The method is given both the URI and the raw content of the object
/// as we currently don’t support parsing of these records.
///
/// If the method returns an error, the entire processing run will be
/// aborted.
fn process_gbr(
&mut self,
uri: &uri::Rsync,
cert: ResourceCert,
content: Bytes
) -> Result<(), Failed> {
let _ = (uri, cert, content);
Ok(())
}
/// Restart processing with an empty data set.
///
/// The method is called if processing has to switch to a different
/// object set for a publication point. An implementation must drop all
/// data collected so far and start with an empty set again.
///
/// The method does not indicate whether processing has succeeded or
/// failed.
fn restart(&mut self) -> Result<(), Failed>;
/// Completes processing of the CA.
///
/// The method is called when all objects of the CA have been processed
/// successfully or have been actively ignored and no error has happend.
fn commit(self);
/// Completes processing of an invalid CA.
///
/// The method is called when at least one of the objects published by the
/// CA is invalid.
///
/// The default implementation does nothing at all.
fn cancel(self, _cert: &CaCert) {
}
}