use std::{fmt, fs, io};
use std::borrow::Cow;
use std::collections::HashMap;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use bytes::Bytes;
use crossbeam_queue::{ArrayQueue, SegQueue};
use crossbeam_utils::thread;
use log::{debug, error, warn};
use rpki::repository::cert::{Cert, KeyUsage, ResourceCert};
use rpki::repository::crl::Crl;
use rpki::repository::crypto::keys::KeyIdentifier;
use rpki::repository::manifest::{Manifest, ManifestContent, ManifestHash};
use rpki::repository::roa::{Roa, RouteOriginAttestation};
use rpki::repository::sigobj::SignedObject;
use rpki::repository::tal::{Tal, TalInfo, TalUri};
use rpki::repository::x509::{Time, ValidationError, Validity};
use rpki::uri;
use crate::{collector, store};
use crate::config::{Config, FilterPolicy};
use crate::collector::Collector;
use crate::error::Failed;
use crate::metrics::{
Metrics, PublicationMetrics, RepositoryMetrics, TalMetrics
};
use crate::payload::ValidationReport;
use crate::store::{Store, StoredManifest, StoredObject, StoredPoint};
use crate::utils::str::str_from_ascii;
const CRL_CACHE_LIMIT: usize = 50;
#[derive(Debug)]
pub struct Engine {
tal_dir: PathBuf,
tal_labels: HashMap<String, String>,
tals: Vec<Tal>,
collector: Option<Collector>,
store: Store,
strict: bool,
stale: FilterPolicy,
validation_threads: usize,
dirty_repository: bool,
max_ca_depth: usize,
}
impl Engine {
pub fn init(config: &Config) -> Result<(), Failed> {
Collector::init(config)?;
Store::init(config)?;
Ok(())
}
pub fn new(
config: &Config,
update: bool,
) -> Result<Self, Failed> {
let collector = if update {
Some(Collector::new(config)?)
}
else {
None
};
let store = Store::new(config)?;
let mut res = Engine {
tal_dir: config.tal_dir.clone(),
tal_labels: config.tal_labels.clone(),
tals: Vec::new(),
collector,
store,
strict: config.strict,
stale: config.stale,
validation_threads: config.validation_threads,
dirty_repository: config.dirty_repository,
max_ca_depth: config.max_ca_depth,
};
res.reload_tals()?;
Ok(res)
}
pub fn reload_tals(&mut self) -> Result<(), Failed> {
let mut res = Vec::new();
let dir = match fs::read_dir(&self.tal_dir) {
Ok(dir) => dir,
Err(err) => {
if err.kind() == io::ErrorKind::NotFound {
error!(
"Missing TAL directory {}.\n\
You may have to initialize it via \
\'routinator init\'.",
self.tal_dir.display()
);
}
else {
error!("Failed to open TAL directory: {}.", err);
}
return Err(Failed)
}
};
for entry in dir {
let entry = match entry {
Ok(entry) => entry,
Err(err) => {
error!(
"Failed to iterate over tal directory: {}",
err
);
return Err(Failed)
}
};
if !entry.file_type().map(|ft| ft.is_file()).unwrap_or(false) {
continue
}
let path = entry.path();
if path.extension().map(|ext| ext != "tal").unwrap_or(true) {
continue
}
let mut file = match File::open(&path) {
Ok(file) => {
file
}
Err(err) => {
error!(
"Failed to open TAL {}: {}. \n\
Aborting.",
path.display(), err
);
return Err(Failed)
}
};
let mut tal = match Tal::read_named(
self.path_to_tal_label(&path),
&mut file
) {
Ok(tal) => tal,
Err(err) => {
error!(
"Failed to read TAL {}: {}. \n\
Aborting.",
path.display(), err
);
return Err(Failed)
}
};
tal.prefer_https();
res.push(tal);
}
if res.is_empty() {
warn!(
"No TALs found in TAL directory. Starting anyway."
);
}
self.tals = res;
Ok(())
}
fn path_to_tal_label(&self, path: &Path) -> String {
if let Some(name) = path.file_name().unwrap().to_str() {
if let Some(label) = self.tal_labels.get(name) {
return label.clone()
}
}
path.file_stem().unwrap().to_string_lossy().into_owned()
}
pub fn ignite(&mut self) -> Result<(), Failed> {
if let Some(collector) = self.collector.as_mut() {
collector.ignite()?;
}
Ok(())
}
pub fn start<P: ProcessRun>(
&self, processor: P
) -> Result<Run<P>, Failed> {
Ok(Run::new(
self,
self.collector.as_ref().map(Collector::start),
self.store.start(),
processor
))
}
pub fn process_origins(
&self
) -> Result<(ValidationReport, Metrics), Failed> {
let report = ValidationReport::new();
let mut run = self.start(&report)?;
run.process()?;
run.cleanup()?;
let metrics = run.done();
Ok((report, metrics))
}
pub fn dump(&self, dir: &Path) -> Result<(), Failed> {
self.store.dump(dir)?;
if let Some(collector) = self.collector.as_ref() {
collector.dump(dir)?;
}
Ok(())
}
}
pub struct Run<'a, P> {
validation: &'a Engine,
collector: Option<collector::Run<'a>>,
store: store::Run<'a>,
processor: P,
metrics: Metrics,
}
impl<'a, P> Run<'a, P> {
fn new(
validation: &'a Engine,
collector: Option<collector::Run<'a>>,
store: store::Run<'a>,
processor: P,
) -> Self {
Run {
validation, collector, store, processor,
metrics: Default::default()
}
}
pub fn cleanup(&mut self) -> Result<(), Failed> {
if self.validation.dirty_repository {
debug!("Skipping cleanup as configured.");
return Ok(())
}
let mut retain = collector::Cleanup::new();
self.store.cleanup(&mut retain)?;
if let Some(collector) = self.collector.as_mut() {
collector.cleanup(&mut retain)?;
}
Ok(())
}
pub fn done(self) -> Metrics {
let mut metrics = self.metrics;
if let Some(collector) = self.collector {
collector.done(&mut metrics)
}
self.store.done(&mut metrics);
metrics
}
}
impl<'a, P: ProcessRun> Run<'a, P> {
pub fn process(&mut self) -> Result<(), Failed> {
if self.validation.tals.is_empty() {
return Ok(())
}
let metrics = RunMetrics::default();
let tasks = SegQueue::new();
for (index, tal) in self.validation.tals.iter().enumerate() {
tasks.push(Task::Tal(TalTask { tal, index }));
self.metrics.tals.push(TalMetrics::new(tal.info().clone()));
}
let had_err = AtomicBool::new(false);
let thread_metrics = ArrayQueue::new(
self.validation.validation_threads
);
let res = thread::scope(|scope| {
for _ in 0 .. self.validation.validation_threads {
scope.spawn(|_| {
let mut metrics = metrics.fork();
while let Some(task) = tasks.pop() {
if self.process_task(
task, &tasks, &mut metrics, &had_err,
).is_err() {
break;
}
}
thread_metrics.push(metrics).unwrap();
});
}
});
if res.is_err() {
error!(
"Engine failed after a worker thread has panicked. \
This is most assuredly a bug."
);
return Err(Failed);
}
if had_err.load(Ordering::Relaxed) {
return Err(Failed);
}
metrics.prepare_final(&mut self.metrics);
while let Some(metrics) = thread_metrics.pop() {
metrics.collapse(&mut self.metrics);
}
Ok(())
}
fn process_task(
&self,
task: Task<P::PubPoint>,
tasks: &SegQueue<Task<P::PubPoint>>,
metrics: &mut RunMetrics,
had_err: &AtomicBool,
) -> Result<(), Failed> {
match task {
Task::Tal(task) => {
self.process_tal_task(task, tasks, metrics, had_err)
}
Task::Ca(task) => {
self.process_ca_task(task, tasks, metrics, had_err)
}
}
}
fn process_tal_task(
&self, task: TalTask,
tasks: &SegQueue<Task<P::PubPoint>>,
metrics: &mut RunMetrics,
had_err: &AtomicBool,
) -> Result<(), Failed> {
for uri in task.tal.uris() {
let cert = match self.load_ta(uri, task.tal.info())? {
Some(cert) => cert,
_ => continue,
};
if cert.subject_public_key_info() != task.tal.key_info() {
warn!(
"Trust anchor {}: key doesn’t match TAL.",
uri
);
continue;
}
let cert = match cert.validate_ta(
task.tal.info().clone(), self.validation.strict
) {
Ok(cert) => CaCert::root(cert, uri.clone(), task.index),
Err(_) => {
warn!("Trust anchor {}: doesn’t validate.", uri);
continue;
}
};
let cert = match cert {
Ok(cert) => cert,
Err(_) => continue,
};
debug!("Found valid trust anchor {}. Processing.", uri);
match self.processor.process_ta(
task.tal, uri, &cert, cert.tal
)? {
Some(processor) => {
return self.process_ca_task(
CaTask {
cert, processor,
repository_index: None,
defer: false,
},
tasks, metrics, had_err
)
}
None => {
debug!("Skipping trust anchor {}.", uri);
return Ok(())
}
}
}
warn!("No valid trust anchor for TAL {}", task.tal.info().name());
Ok(())
}
fn load_ta(
&self,
uri: &TalUri,
_info: &TalInfo,
) -> Result<Option<Cert>, Failed> {
if let Some(collector) = self.collector.as_ref() {
if let Some(bytes) = collector.load_ta(uri) {
if let Ok(cert) = Cert::decode(bytes.clone()) {
self.store.update_ta(uri, &bytes)?;
return Ok(Some(cert))
}
}
}
self.store.load_ta(uri).map(|bytes| {
bytes.and_then(|bytes| Cert::decode(bytes).ok())
})
}
fn process_ca_task(
&self,
task: CaTask<P::PubPoint>,
tasks: &SegQueue<Task<P::PubPoint>>,
metrics: &mut RunMetrics,
had_err: &AtomicBool,
) -> Result<(), Failed> {
let more_tasks = PubPoint::new(
self, &task.cert, task.processor, task.repository_index,
).and_then(|point| {
point.process(metrics)
}).map_err(|_| {
had_err.store(true, Ordering::Relaxed);
Failed
})?;
for task in more_tasks {
if had_err.load(Ordering::Relaxed) {
return Err(Failed)
}
if task.defer {
tasks.push(Task::Ca(task))
}
else {
self.process_ca_task(task, tasks, metrics, had_err)?;
}
}
Ok(())
}
}
struct PubPoint<'a, P: ProcessRun> {
run: &'a Run<'a, P>,
cert: &'a Arc<CaCert>,
processor: P::PubPoint,
repository_index: Option<usize>,
metrics: PublicationMetrics,
}
impl<'a, P: ProcessRun> PubPoint<'a, P> {
pub fn new(
run: &'a Run<'a, P>,
cert: &'a Arc<CaCert>,
processor: P::PubPoint,
repository_index: Option<usize>,
) -> Result<Self, Failed> {
Ok(PubPoint {
run, cert, processor, repository_index,
metrics: Default::default(),
})
}
pub fn process(
self,
metrics: &mut RunMetrics,
) -> Result<Vec<CaTask<P::PubPoint>>, Failed> {
let mut store = self.run.store.pub_point(self.cert)?;
if let Some(collector) = self.run.collector.as_ref() {
if let Some(collector) = collector.repository(self.cert)? {
match self.process_collected(
collector, &mut store, metrics
)? {
Ok(res) => return Ok(res),
Err(mut this) => {
this.metrics = Default::default();
return this.process_stored(store, metrics)
}
}
}
}
self.process_stored(store, metrics)
}
#[allow(clippy::type_complexity)] fn process_collected(
mut self,
collector: collector::Repository,
store: &mut StoredPoint,
metrics: &mut RunMetrics,
) -> Result<Result<Vec<CaTask<P::PubPoint>>, Self>, Failed> {
let collected = match collector.load_object(
self.cert.rpki_manifest()
)? {
Some(collected) => collected,
None => return Ok(Err(self))
};
let same = if let Some(mft) = store.manifest() {
mft.manifest() == &collected
&& mft.ca_repository() == self.cert.ca_repository()
}
else {
false
};
if same {
return Ok(Err(self))
}
let mut collected = match self.validate_collected_manifest(
collected, &collector
)? {
Some(collected) => collected,
None => {
return Ok(Err(self))
}
};
let mut ca_tasks = Vec::new();
let mut items = collected.content.iter();
let mut point_ok = true;
let update_result = store.update(
StoredManifest::new(
collected.ee_cert.validity().not_after(),
self.cert.rpki_notify().cloned(),
self.cert.ca_repository().clone(),
self.cert.rpki_manifest().clone(),
collected.manifest_bytes.clone(),
collected.crl_uri.clone(),
collected.crl_bytes.clone(),
),
|| {
let item = match items.next() {
Some(item) => item,
None => return Ok(None)
};
let file = match str_from_ascii(item.file()) {
Ok(file) => file,
Err(_) => {
warn!("{}: illegal file name '{}'.",
self.cert.rpki_manifest(),
String::from_utf8_lossy(item.file())
);
return Err(store::UpdateError::Abort)
}
};
let uri = self.cert.ca_repository().join(
file.as_ref()
).unwrap();
let hash = ManifestHash::new(
item.hash().clone(), collected.content.file_hash_alg()
);
let content = match collector.load_object(&uri)? {
Some(content) => content,
None => {
warn!("{}: failed to load.", uri);
return Err(store::UpdateError::Abort)
}
};
if hash.verify(&content).is_err() {
warn!("{}: file has wrong manifest hash.", uri);
return Err(store::UpdateError::Abort)
}
if !self.process_object(
&uri, content.clone(),
&mut collected, &mut ca_tasks
)? {
point_ok = false;
}
Ok(Some(StoredObject::new(uri, content, Some(hash))))
}
);
match update_result {
Ok(()) => {
if point_ok {
self.accept_point(collected, metrics);
Ok(Ok(ca_tasks))
}
else {
self.reject_point(metrics);
Ok(Ok(Vec::new()))
}
}
Err(store::UpdateError::Abort) => {
Ok(Err(self))
}
Err(store::UpdateError::Fatal) => {
Err(Failed)
}
}
}
fn validate_collected_manifest(
&mut self,
manifest_bytes: Bytes,
repository: &collector::Repository,
) -> Result<Option<ValidPointManifest>, Failed> {
let manifest = match Manifest::decode(
manifest_bytes.clone(), self.run.validation.strict
) {
Ok(manifest) => manifest,
Err(_) => {
self.metrics.invalid_manifests += 1;
warn!("{}: failed to decode", self.cert.rpki_manifest());
return Ok(None)
}
};
let (ee_cert, content) = match manifest.validate(
self.cert.cert(), self.run.validation.strict
) {
Ok(some) => some,
Err(_) => {
self.metrics.invalid_manifests += 1;
warn!("{}: failed to validate", self.cert.rpki_manifest());
return Ok(None)
}
};
if content.is_stale() {
self.metrics.stale_manifests += 1;
match self.run.validation.stale {
FilterPolicy::Reject => {
warn!("{}: stale manifest", self.cert.rpki_manifest());
return Ok(None)
}
FilterPolicy::Warn => {
warn!("{}: stale manifest", self.cert.rpki_manifest());
}
FilterPolicy::Accept => { }
}
}
let (crl_uri, crl, crl_bytes) = match self.validate_collected_crl(
&ee_cert, &content, repository
)? {
Some(some) => some,
None => return Ok(None)
};
self.metrics.valid_manifests += 1;
Ok(Some(ValidPointManifest {
ee_cert, content, crl_uri, crl, manifest_bytes, crl_bytes,
metrics: Default::default(),
}))
}
fn validate_collected_crl(
&mut self,
ee_cert: &ResourceCert,
manifest: &ManifestContent,
repository: &collector::Repository
) -> Result<Option<(uri::Rsync, Crl, Bytes)>, Failed> {
let crl_uri = match ee_cert.crl_uri() {
Some(some) if some.ends_with(".crl") => some.clone(),
_ => {
self.metrics.invalid_manifests += 1;
warn!("{}: invalid CRL URI.", self.cert.rpki_manifest());
return Ok(None)
}
};
let crl_name = match crl_uri.relative_to(self.cert.ca_repository()) {
Some(name) => name,
None => {
self.metrics.invalid_manifests += 1;
warn!(
"{}: CRL URI outside repository directory.",
self.cert.rpki_manifest()
);
return Ok(None)
}
};
let mut crl_bytes = None;
for item in manifest.iter() {
let (file, hash) = item.into_pair();
if file == crl_name {
let bytes = match repository.load_object(&crl_uri)? {
Some(bytes) => bytes,
None => {
self.metrics.invalid_crls += 1;
warn!("{}: failed to load.", crl_uri);
return Ok(None)
}
};
let hash = ManifestHash::new(hash, manifest.file_hash_alg());
if hash.verify(&bytes).is_err() {
self.metrics.invalid_crls += 1;
warn!("{}: file has wrong hash.", crl_uri);
return Ok(None)
}
crl_bytes = Some(bytes);
}
}
let crl_bytes = match crl_bytes {
Some(some) => some,
None => {
self.metrics.invalid_crls += 1;
warn!(
"{}: CRL not listed on manifest.",
self.cert.rpki_manifest()
);
return Ok(None)
}
};
let mut crl = match Crl::decode(crl_bytes.clone()) {
Ok(crl) => crl,
Err(_) => {
self.metrics.invalid_crls += 1;
warn!("{}: failed to decode.", crl_uri);
return Ok(None)
}
};
if crl.validate(self.cert.cert().subject_public_key_info()).is_err() {
self.metrics.invalid_crls += 1;
warn!("{}: failed to validate.", crl_uri);
return Ok(None)
}
if crl.is_stale() {
self.metrics.stale_crls += 1;
match self.run.validation.stale {
FilterPolicy::Reject => {
warn!("{}: stale CRL.", crl_uri);
return Ok(None)
}
FilterPolicy::Warn => {
warn!("{}: stale CRL.", crl_uri);
}
FilterPolicy::Accept => { }
}
}
if manifest.len() > CRL_CACHE_LIMIT {
crl.cache_serials()
}
if crl.contains(ee_cert.serial_number()) {
self.metrics.invalid_manifests += 1;
warn!(
"{}: certificate has been revoked.",
self.cert.rpki_manifest()
);
return Ok(None)
}
Ok(Some((crl_uri, crl, crl_bytes)))
}
fn process_stored(
mut self,
mut store: StoredPoint,
metrics: &mut RunMetrics,
) -> Result<Vec<CaTask<P::PubPoint>>, Failed> {
let manifest = match store.take_manifest() {
Some(manifest) => manifest,
None => {
warn!(
"{}: No valid manifest found.",
self.cert.rpki_manifest()
);
self.metrics.missing_manifests += 1;
self.reject_point(metrics);
return Ok(Vec::new())
}
};
let mut manifest = match self.validate_stored_manifest(manifest) {
Ok(manifest) => manifest,
Err(_) => {
self.reject_point(metrics);
return Ok(Vec::new())
}
};
let mut ca_tasks = Vec::new();
for object in &mut store {
let object = object?;
if !self.process_object(
object.uri(), object.content().clone(),
&mut manifest, &mut ca_tasks
)? {
self.reject_point(metrics);
return Ok(Vec::new())
}
}
self.accept_point(manifest, metrics);
Ok(ca_tasks)
}
fn validate_stored_manifest(
&mut self,
stored_manifest: StoredManifest,
) -> Result<ValidPointManifest, ValidationError> {
let manifest = match Manifest::decode(
stored_manifest.manifest().clone(), self.run.validation.strict
) {
Ok(manifest) => manifest,
Err(_) => {
warn!("{}: failed to decode", self.cert.rpki_manifest());
self.metrics.invalid_manifests += 1;
return Err(ValidationError);
}
};
let (ee_cert, content) = match manifest.validate(
self.cert.cert(), self.run.validation.strict
) {
Ok(some) => some,
Err(_) => {
warn!("{}: failed to validate", self.cert.rpki_manifest());
self.metrics.invalid_manifests += 1;
return Err(ValidationError);
}
};
if content.is_stale() {
self.metrics.stale_manifests += 1;
match self.run.validation.stale {
FilterPolicy::Reject => {
warn!("{}: stale manifest", self.cert.rpki_manifest());
self.metrics.invalid_manifests += 1;
return Err(ValidationError);
}
FilterPolicy::Warn => {
warn!("{}: stale manifest", self.cert.rpki_manifest());
}
FilterPolicy::Accept => { }
}
}
let crl_uri = match ee_cert.crl_uri() {
Some(uri) => uri.clone(),
None => {
warn!(
"{}: manifest without CRL URI.",
self.cert.rpki_manifest()
);
self.metrics.invalid_manifests += 1;
return Err(ValidationError)
}
};
let mut crl = match Crl::decode(stored_manifest.crl().clone()) {
Ok(crl) => crl,
Err(_) => {
warn!("{}: failed to decode.", crl_uri);
self.metrics.invalid_manifests += 1;
self.metrics.invalid_crls += 1;
return Err(ValidationError)
}
};
if crl.validate(self.cert.cert().subject_public_key_info()).is_err() {
warn!("{}: failed to validate.", crl_uri);
self.metrics.invalid_manifests += 1;
self.metrics.invalid_crls += 1;
return Err(ValidationError)
}
if crl.is_stale() {
self.metrics.stale_crls += 1;
match self.run.validation.stale {
FilterPolicy::Reject => {
warn!("{}: stale CRL.", crl_uri);
self.metrics.invalid_manifests += 1;
self.metrics.invalid_crls += 1;
return Err(ValidationError)
}
FilterPolicy::Warn => {
warn!("{}: stale CRL.", crl_uri);
}
FilterPolicy::Accept => { }
}
}
if content.len() > CRL_CACHE_LIMIT {
crl.cache_serials()
}
if crl.contains(ee_cert.serial_number()) {
warn!(
"{}: certificate has been revoked.",
self.cert.rpki_manifest()
);
self.metrics.invalid_manifests += 1;
return Err(ValidationError)
}
self.metrics.valid_manifests += 1;
self.metrics.valid_crls += 1;
Ok(ValidPointManifest {
ee_cert, content, crl_uri, crl,
manifest_bytes: stored_manifest.manifest().clone(),
crl_bytes: stored_manifest.crl().clone(),
metrics: Default::default(),
})
}
fn accept_point(
mut self,
manifest: ValidPointManifest,
metrics: &mut RunMetrics,
) {
self.metrics.valid_points += 1;
self.metrics += manifest.metrics;
self.apply_metrics(metrics);
self.processor.commit();
}
fn reject_point(
mut self,
metrics: &mut RunMetrics,
) {
self.metrics.rejected_points += 1;
self.apply_metrics(metrics);
self.processor.cancel(self.cert);
}
fn apply_metrics(
&mut self,
metrics: &mut RunMetrics,
) {
let repository_index = self.repository_index.unwrap_or_else(|| {
metrics.repository_index(self.cert)
});
self.processor.repository_index(repository_index);
metrics.apply(
&self.metrics,
repository_index,
self.cert.tal
);
}
fn process_object(
&mut self,
uri: &uri::Rsync,
content: Bytes,
manifest: &mut ValidPointManifest,
ca_task: &mut Vec<CaTask<P::PubPoint>>,
) -> Result<bool, Failed> {
if !self.processor.want(uri)? {
return Ok(true)
}
if uri.ends_with(".cer") {
self.process_cer(uri, content, manifest, ca_task)?;
}
else if uri.ends_with(".roa") {
self.process_roa(uri, content, manifest)?;
}
else if uri.ends_with(".crl") {
if *uri != manifest.crl_uri {
warn!("{}: stray CRL.", uri);
manifest.metrics.stray_crls += 1;
}
}
else if uri.ends_with(".gbr") {
self.process_gbr(uri, content, manifest)?;
}
else {
manifest.metrics.others += 1;
warn!("{}: unknown object type.", uri);
}
Ok(true)
}
fn process_cer(
&mut self,
uri: &uri::Rsync,
content: Bytes,
manifest: &mut ValidPointManifest,
ca_task: &mut Vec<CaTask<P::PubPoint>>,
) -> Result<(), Failed> {
let cert = match Cert::decode(content) {
Ok(cert) => cert,
Err(_) => {
warn!("{}: failed to decode.", uri);
manifest.metrics.invalid_certs += 1;
return Ok(())
}
};
if cert.key_usage() == KeyUsage::Ca {
self.process_ca_cer(uri, cert, manifest, ca_task)
}
else {
self.process_ee_cer(uri, cert, manifest)
}
}
#[allow(clippy::too_many_arguments)]
fn process_ca_cer(
&mut self, uri: &uri::Rsync, cert: Cert,
manifest: &mut ValidPointManifest,
ca_task: &mut Vec<CaTask<P::PubPoint>>,
) -> Result<(), Failed> {
if self.cert.check_loop(&cert).is_err() {
warn!("{}: certificate loop detected.", uri);
manifest.metrics.invalid_certs += 1;
return Ok(())
}
let cert = match cert.validate_ca(
self.cert.cert(), self.run.validation.strict
) {
Ok(cert) => cert,
Err(_) => {
warn!("{}: CA certificate failed to validate.", uri);
manifest.metrics.invalid_certs += 1;
return Ok(())
}
};
if manifest.check_crl(uri, &cert).is_err() {
manifest.metrics.invalid_certs += 1;
return Ok(())
}
let cert = match CaCert::chain(
self.cert, uri.clone(), cert, self.run.validation.max_ca_depth,
) {
Ok(cert) => cert,
Err(_) => {
manifest.metrics.invalid_certs += 1;
return Ok(())
}
};
manifest.metrics.valid_ca_certs += 1;
let mut processor = match self.processor.process_ca(
uri, &cert
)? {
Some(processor) => processor,
None => return Ok(())
};
processor.update_refresh(cert.cert().validity().not_after());
let defer = match self.run.collector.as_ref() {
Some(collector) => !collector.was_updated(&cert),
None => false,
};
let repository_index = if cert.repository_switch() {
None
}
else {
self.repository_index
};
ca_task.push(CaTask {
cert, processor, repository_index, defer
});
Ok(())
}
fn process_ee_cer(
&mut self, uri: &uri::Rsync, cert: Cert,
manifest: &mut ValidPointManifest,
) -> Result<(), Failed> {
if cert.validate_router(
self.cert.cert(), self.run.validation.strict
).is_err() {
warn!("{}: router certificate failed to validate.", uri);
manifest.metrics.invalid_certs += 1;
return Ok(())
};
if manifest.check_crl(uri, &cert).is_err() {
manifest.metrics.invalid_certs += 1;
return Ok(())
}
manifest.metrics.valid_ee_certs += 1;
self.processor.process_ee_cert(uri, cert)?;
Ok(())
}
fn process_roa(
&mut self, uri: &uri::Rsync, content: Bytes,
manifest: &mut ValidPointManifest,
) -> Result<(), Failed> {
let roa = match Roa::decode(
content, self.run.validation.strict
) {
Ok(roa) => roa,
Err(_) => {
warn!("{}: decoding failed.", uri);
manifest.metrics.invalid_roas += 1;
return Ok(())
}
};
match roa.process(
self.cert.cert(),
self.run.validation.strict,
|cert| manifest.check_crl(uri, cert)
) {
Ok((cert, route)) => {
manifest.metrics.valid_roas += 1;
self.processor.process_roa(uri, cert, route)?
}
Err(_) => {
manifest.metrics.invalid_roas += 1;
warn!("{}: validation failed.", uri)
}
}
Ok(())
}
fn process_gbr(
&mut self, uri: &uri::Rsync, content: Bytes,
manifest: &mut ValidPointManifest,
) -> Result<(), Failed> {
let obj = match SignedObject::decode(
content, self.run.validation.strict
) {
Ok(obj) => obj,
Err(_) => {
warn!("{}: decoding failed.", uri);
manifest.metrics.invalid_gbrs += 1;
return Ok(())
}
};
match obj.process(
self.cert.cert(),
self.run.validation.strict,
|cert| manifest.check_crl(uri, cert)
) {
Ok((cert, content)) => {
manifest.metrics.valid_gbrs += 1;
self.processor.process_gbr(uri, cert, content)?
}
Err(_) => {
manifest.metrics.invalid_gbrs += 1;
warn!("{}: validation failed.", uri)
}
}
Ok(())
}
}
#[derive(Clone, Debug)]
struct ValidPointManifest {
ee_cert: ResourceCert,
content: ManifestContent,
crl_uri: uri::Rsync,
crl: Crl,
manifest_bytes: Bytes,
crl_bytes: Bytes,
metrics: PublicationMetrics,
}
impl ValidPointManifest {
fn check_crl(
&self, uri: &uri::Rsync, cert: &Cert
) -> Result<(), ValidationError> {
let crl_uri = match cert.crl_uri() {
Some(some) => some,
None => {
warn!("{}: certificate has no CRL URI", uri);
return Err(ValidationError)
}
};
if *crl_uri != self.crl_uri {
warn!("{}: certifacte's CRL differs from manifest's.", uri);
return Err(ValidationError)
}
if self.crl.contains(cert.serial_number()) {
warn!("{}: certificate has been revoked.", uri);
return Err(ValidationError)
}
Ok(())
}
}
enum Task<'a, P> {
Tal(TalTask<'a>),
Ca(CaTask<P>),
}
impl<'a, P> fmt::Debug for Task<'a, P> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
Task::Tal(ref inner) => {
write!(f, "TalTask {{ tal: {} }}", inner.tal.info().name())
}
Task::Ca(ref inner) => {
write!(
f, "CaTask {{ ca_repository: {} }}",
inner.cert.ca_repository
)
}
}
}
}
struct TalTask<'a> {
tal: &'a Tal,
index: usize,
}
struct CaTask<P> {
cert: Arc<CaCert>,
processor: P,
repository_index: Option<usize>,
defer: bool,
}
#[derive(Clone, Debug)]
pub struct CaCert {
cert: ResourceCert,
#[allow(dead_code)] uri: TalUri,
ca_repository: uri::Rsync,
rpki_manifest: uri::Rsync,
parent: Option<Arc<CaCert>>,
chain_len: usize,
pub(crate) tal: usize,
combined_validity: Validity,
}
impl CaCert {
pub fn root(
cert: ResourceCert, uri: TalUri, tal: usize
) -> Result<Arc<Self>, Failed> {
Self::new(cert, uri, None, 0, tal)
}
pub fn chain(
issuer: &Arc<Self>,
uri: uri::Rsync,
cert: ResourceCert,
max_depth: usize,
) -> Result<Arc<Self>, Failed> {
let chain_len = match issuer.chain_len.checked_add(1) {
Some(chain_len) => chain_len,
None => {
error!(
"CA {}: CA depth overrun.",
uri
);
return Err(Failed)
}
};
if chain_len > max_depth {
error!(
"CA {}: CA depth overrun.",
uri
);
return Err(Failed)
}
Self::new(
cert, TalUri::Rsync(uri),
Some(issuer.clone()), chain_len,
issuer.tal
)
}
fn new(
cert: ResourceCert,
uri: TalUri,
parent: Option<Arc<Self>>,
chain_len: usize,
tal: usize,
) -> Result<Arc<Self>, Failed> {
let combined_validity = match parent.as_ref() {
Some(ca) => cert.validity().trim(ca.combined_validity()),
None => cert.validity()
};
let ca_repository = match cert.ca_repository() {
Some(uri) => uri.clone(),
None => {
error!(
"CA cert {} has no repository URI. \
Why has it not been rejected yet?",
uri
);
return Err(Failed)
}
};
let rpki_manifest = match cert.rpki_manifest() {
Some(uri) => uri.clone(),
None => {
error!(
"CA cert {} has no manifest URI. \
Why has it not been rejected yet?",
uri
);
return Err(Failed)
}
};
Ok(Arc::new(CaCert {
cert, uri, ca_repository, rpki_manifest, parent, chain_len, tal,
combined_validity,
}))
}
pub fn check_loop(&self, cert: &Cert) -> Result<(), Failed> {
self._check_loop(cert.subject_key_identifier())
}
fn _check_loop(&self, key_id: KeyIdentifier) -> Result<(), Failed> {
if self.cert.subject_key_identifier() == key_id {
Err(Failed)
}
else if let Some(ref parent) = self.parent {
parent._check_loop(key_id)
}
else {
Ok(())
}
}
pub fn cert(&self) -> &ResourceCert {
&self.cert
}
pub fn ca_repository(&self) -> &uri::Rsync {
&self.ca_repository
}
pub fn rpki_manifest(&self) -> &uri::Rsync {
&self.rpki_manifest
}
pub fn rpki_notify(&self) -> Option<&uri::Https> {
self.cert.rpki_notify()
}
pub fn combined_validity(&self) -> Validity {
self.combined_validity
}
pub(crate) fn repository_switch(&self) -> bool {
let parent = match self.parent.as_ref() {
Some(parent) => parent,
None => return true,
};
match self.rpki_notify() {
Some(rpki_notify) => {
Some(rpki_notify) != parent.rpki_notify()
}
None => {
self.ca_repository.module() != parent.ca_repository.module()
}
}
}
}
#[derive(Debug, Default)]
struct RunMetrics {
tals: Vec<PublicationMetrics>,
repositories: Vec<PublicationMetrics>,
publication: PublicationMetrics,
repository_indexes: Arc<Mutex<HashMap<String, usize>>>,
}
impl RunMetrics {
pub fn fork(&self) -> Self {
RunMetrics {
tals: Default::default(),
repositories: Default::default(),
publication: Default::default(),
repository_indexes: self.repository_indexes.clone(),
}
}
pub fn repository_index(&self, cert: &CaCert) -> usize {
let uri = cert.rpki_notify().map(|uri| {
Cow::Borrowed(uri.as_str())
}).unwrap_or_else(|| {
cert.ca_repository.canonical_module()
});
let mut repository_indexes = self.repository_indexes.lock().unwrap();
if let Some(index) = repository_indexes.get(uri.as_ref()) {
return *index
}
let index = repository_indexes.len();
repository_indexes.insert(uri.into_owned(), index);
index
}
pub fn apply(
&mut self, metrics: &PublicationMetrics,
repository_index: usize, tal_index: usize
) {
while self.repositories.len() <= repository_index {
self.repositories.push(Default::default())
}
self.repositories[repository_index] += metrics;
while self.tals.len() <= tal_index {
self.tals.push(Default::default())
}
self.tals[tal_index] += metrics;
self.publication += metrics;
}
pub fn prepare_final(&self, target: &mut Metrics) {
let mut indexes: Vec<_>
= self.repository_indexes.lock().unwrap().iter().map(|item| {
(item.0.clone(), *item.1)
}).collect();
indexes.sort_by_key(|(_, idx)| *idx);
target.repositories = indexes.into_iter().map(|(uri, _)| {
RepositoryMetrics::new(uri)
}).collect();
}
pub fn collapse(self, target: &mut Metrics) {
for (target, metric) in target.tals.iter_mut().zip(
self.tals.into_iter()
) {
target.publication += metric
}
for (target, metric) in target.repositories.iter_mut().zip(
self.repositories.into_iter()
) {
target.publication += metric
}
target.publication += self.publication;
}
}
pub trait ProcessRun: Send + Sync {
type PubPoint: ProcessPubPoint;
fn process_ta(
&self, tal: &Tal, uri: &TalUri, cert: &CaCert, tal_index: usize
) -> Result<Option<Self::PubPoint>, Failed>;
}
pub trait ProcessPubPoint: Sized + Send + Sync {
fn repository_index(&mut self, repository_index: usize) {
let _ = repository_index;
}
fn update_refresh(&mut self, not_after: Time) {
let _ = not_after;
}
fn want(&self, uri: &uri::Rsync) -> Result<bool, Failed>;
fn process_ca(
&mut self, uri: &uri::Rsync, cert: &CaCert,
) -> Result<Option<Self>, Failed>;
fn process_ee_cert(
&mut self, uri: &uri::Rsync, cert: Cert
) -> Result<(), Failed> {
let _ = (uri, cert);
Ok(())
}
fn process_roa(
&mut self,
uri: &uri::Rsync,
cert: ResourceCert,
route: RouteOriginAttestation
) -> Result<(), Failed> {
let _ = (uri, cert, route);
Ok(())
}
fn process_gbr(
&mut self,
uri: &uri::Rsync,
cert: ResourceCert,
content: Bytes
) -> Result<(), Failed> {
let _ = (uri, cert, content);
Ok(())
}
fn restart(&mut self) -> Result<(), Failed>;
fn commit(self);
fn cancel(self, _cert: &CaCert) {
}
}