use std::collections::HashSet;
use std::path::Path;
use bytes::Bytes;
use log::info;
use rpki::repository::tal::TalUri;
use rpki::uri;
use crate::config::{Config, FallbackPolicy};
use crate::error::Failed;
use crate::metrics::Metrics;
use crate::engine::CaCert;
use super::{rrdp, rsync};
#[derive(Debug)]
pub struct Collector {
rrdp: Option<rrdp::Collector>,
rsync: Option<rsync::Collector>,
rrdp_fallback: FallbackPolicy,
}
impl Collector {
pub fn init(config: &Config) -> Result<(), Failed> {
rrdp::Collector::init(config)?;
rsync::Collector::init(config)?;
Ok(())
}
pub fn new(
config: &Config,
) -> Result<Self, Failed> {
Self::init(config)?;
Ok(Collector {
rrdp: rrdp::Collector::new(config)?,
rsync: rsync::Collector::new(config)?,
rrdp_fallback: config.rrdp_fallback,
})
}
pub fn ignite(&mut self) -> Result<(), Failed> {
self.rrdp.as_mut().map_or(Ok(()), rrdp::Collector::ignite)?;
self.rsync.as_mut().map_or(Ok(()), rsync::Collector::ignite)?;
Ok(())
}
pub fn start(&self) -> Run {
Run::new(self)
}
pub fn dump(&self, dir: &Path) -> Result<(), Failed> {
if let Some(rrdp) = self.rrdp.as_ref() {
rrdp.dump(dir)?;
}
if let Some(rsync) = self.rsync.as_ref() {
rsync.dump(dir)?;
}
Ok(())
}
}
#[derive(Debug)]
pub struct Run<'a> {
collector: &'a Collector,
rsync: Option<rsync::Run<'a>>,
rrdp: Option<rrdp::Run<'a>>,
}
impl<'a> Run<'a> {
fn new(collector: &'a Collector) -> Self {
Run {
collector,
rsync: collector.rsync.as_ref().map(|rsync| rsync.start()),
rrdp: collector.rrdp.as_ref().map(|rrdp| rrdp.start()),
}
}
pub fn done(self, metrics: &mut Metrics) {
if let Some(rrdp) = self.rrdp {
rrdp.done(metrics)
}
if let Some(rsync) = self.rsync {
rsync.done(metrics)
}
}
pub fn load_ta(&self, uri: &TalUri) -> Option<Bytes> {
match *uri {
TalUri::Rsync(ref uri) => {
self.rsync.as_ref().and_then(|rsync| {
rsync.load_module(uri);
rsync.load_file(uri)
})
}
TalUri::Https(ref uri) => {
self.rrdp.as_ref().and_then(|rrdp| rrdp.load_ta(uri))
}
}
}
pub fn repository<'s>(
&'s self, ca: &'s CaCert
) -> Result<Option<Repository<'s>>, Failed> {
if let Some(rrdp_uri) = ca.rpki_notify() {
if let Some(ref rrdp) = self.rrdp {
let (repo, first) = rrdp.load_repository(rrdp_uri)?;
match repo {
rrdp::LoadResult::Unavailable => {
if matches!(
self.collector.rrdp_fallback,
FallbackPolicy::Never
) {
return Ok(None)
}
}
rrdp::LoadResult::Stale => {
if !matches!(
self.collector.rrdp_fallback,
FallbackPolicy::Stale
) {
return Ok(None)
}
}
rrdp::LoadResult::Current => {
return Ok(None)
}
rrdp::LoadResult::Updated(repo) => {
return Ok(Some(Repository::rrdp(repo)))
}
}
if first && self.rsync.is_some() {
info!("RRDP {}: Falling back to rsync.", rrdp_uri);
}
}
}
if let Some(ref rsync) = self.rsync {
rsync.load_module(ca.ca_repository());
return Ok(Some(Repository::rsync(rsync)))
}
Ok(None)
}
pub fn was_updated(&self, ca: &CaCert) -> bool {
if let Some(rrdp_uri) = ca.rpki_notify() {
if let Some(ref rrdp) = self.rrdp {
return rrdp.was_updated(rrdp_uri);
}
}
if let Some(ref rsync) = self.rsync {
return rsync.was_updated(ca.ca_repository());
}
true
}
pub fn cleanup(&self, retain: &mut Cleanup) -> Result<(), Failed> {
if let Some(rsync) = self.rsync.as_ref() {
rsync.cleanup(&mut retain.rsync)?;
}
if let Some(rrdp) = self.rrdp.as_ref() {
rrdp.cleanup(&mut retain.rrdp)?;
}
Ok(())
}
}
#[derive(Debug)]
pub struct Repository<'a>(RepoInner<'a>);
#[derive(Debug)]
enum RepoInner<'a> {
Rrdp {
repository: rrdp::Repository,
},
Rsync {
rsync: &'a rsync::Run<'a>,
}
}
impl<'a> Repository<'a> {
fn rrdp(repository: rrdp::Repository) -> Self {
Repository(RepoInner::Rrdp { repository })
}
fn rsync(rsync: &'a rsync::Run<'a>) -> Self {
Repository(
RepoInner::Rsync { rsync }
)
}
pub fn is_rrdp(&self) -> bool {
matches!(self.0, RepoInner::Rrdp { .. })
}
pub fn load_object(
&self, uri: &uri::Rsync
) -> Result<Option<Bytes>, Failed> {
match self.0 {
RepoInner::Rrdp { ref repository } => {
repository.load_object(uri)
}
RepoInner::Rsync { rsync } => {
Ok(rsync.load_file(uri))
}
}
}
}
#[derive(Clone, Debug, Default)]
pub struct Cleanup {
rsync: rsync::ModuleSet,
rrdp: HashSet<uri::Https>,
}
impl Cleanup {
pub fn new() -> Self {
Default::default()
}
pub fn add_rrdp_repository(&mut self, rpki_notify: &uri::Https) {
self.rrdp.insert(rpki_notify.clone());
}
pub fn add_rsync_module(&mut self, uri: &uri::Rsync) {
self.rsync.add_from_uri(uri);
}
}