use std::{cmp, error, fmt, fs, io};
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::fs::File;
use std::io::{Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, SystemTime};
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use log::{debug, error, info, warn};
use rand::Rng;
use ring::digest;
use ring::constant_time::verify_slices_are_equal;
use reqwest::header;
use reqwest::{Certificate, Proxy, StatusCode};
use reqwest::blocking::{Client, ClientBuilder, RequestBuilder, Response};
use rpki::{rrdp, uri};
use rpki::repository::crypto::DigestAlgorithm;
use rpki::rrdp::{DeltaInfo, NotificationFile, ProcessDelta, ProcessSnapshot};
use uuid::Uuid;
use crate::config::Config;
use crate::error::Failed;
use crate::metrics::{Metrics, RrdpRepositoryMetrics};
use crate::utils::fatal;
use crate::utils::binio::{Compose, Parse};
use crate::utils::date::{parse_http_date, format_http_date};
use crate::utils::dump::DumpRegistry;
use crate::utils::json::JsonBuilder;
use crate::utils::uri::UriExt;
/// Collects RRDP repository content into a local working directory.
#[derive(Debug)]
pub struct Collector {
    /// The base directory below which each repository keeps its tree.
    working_dir: PathBuf,

    /// The HTTP client used for all RRDP requests.
    http: HttpClient,

    /// Skip repositories whose notify URI has a dubious host name?
    filter_dubious: bool,

    /// Policy for how long a stored copy stays usable after updates fail.
    fallback_time: FallbackTime,

    /// Upper limit on the size of a single published object, if any.
    max_object_size: Option<u64>,

    /// Maximum number of delta steps before falling back to a snapshot.
    max_delta_count: usize,
}
impl Collector {
    /// Initializes the collector without creating a value.
    ///
    /// Merely ensures that the working directory exists (wiping it first
    /// if a fresh start was requested).
    pub fn init(config: &Config) -> Result<(), Failed> {
        let _ = Self::create_working_dir(config)?;
        Ok(())
    }

    /// Creates the RRDP working directory and returns its path.
    ///
    /// With `config.fresh`, any existing directory is deleted first so
    /// the collector starts with an empty cache.
    fn create_working_dir(config: &Config) -> Result<PathBuf, Failed> {
        let working_dir = config.cache_dir.join("rrdp");
        if config.fresh {
            if let Err(err) = fs::remove_dir_all(&working_dir) {
                // A missing directory is fine -- we wanted it gone anyway.
                if err.kind() != io::ErrorKind::NotFound {
                    error!(
                        "Failed to delete RRDP working directory at {}: {}",
                        working_dir.display(), err
                    );
                    return Err(Failed)
                }
            }
        }
        if let Err(err) = fs::create_dir_all(&working_dir) {
            error!(
                "Failed to create RRDP working directory {}: {}.",
                working_dir.display(), err
            );
            return Err(Failed);
        }
        Ok(working_dir)
    }

    /// Creates a new collector from the configuration.
    ///
    /// Returns `Ok(None)` if RRDP has been disabled entirely.
    pub fn new(config: &Config) -> Result<Option<Self>, Failed> {
        if config.disable_rrdp {
            return Ok(None)
        }
        Ok(Some(Collector {
            working_dir: Self::create_working_dir(config)?,
            http: HttpClient::new(config)?,
            filter_dubious: !config.allow_dubious_hosts,
            fallback_time: FallbackTime::from_config(config),
            max_object_size: config.max_object_size,
            max_delta_count: config.rrdp_max_delta_count,
        }))
    }

    /// Finishes initialization, building the actual HTTP client.
    pub fn ignite(&mut self) -> Result<(), Failed> {
        self.http.ignite()
    }

    /// Starts a new validation run using this collector.
    pub fn start(&self) -> Run {
        Run::new(self)
    }

    /// Dumps the collector content into the directory `dir`.
    ///
    /// Copies every repository's object tree below `dir/rrdp` and writes
    /// a `repositories.json` describing what was dumped.
    #[allow(clippy::mutable_key_type)]
    pub fn dump(&self, dir: &Path) -> Result<(), Failed> {
        let dir = dir.join("rrdp");
        debug!("Dumping RRDP collector content to {}", dir.display());
        let mut registry = DumpRegistry::new(dir);
        let mut states = HashMap::new();
        // Layout is <working_dir>/<authority>/<hash>/..., hence the two
        // levels of directory traversal.
        for entry in fatal::read_dir(&self.working_dir)? {
            let entry = entry?;
            if !entry.is_dir() {
                continue;
            }
            for entry in fatal::read_dir(entry.path())? {
                let entry = entry?;
                if entry.is_dir() {
                    self.dump_repository(
                        entry.path(), &mut registry, &mut states
                    )?;
                }
            }
        }
        self.dump_repository_json(registry, states)?;
        debug!("RRDP collector dump complete.");
        Ok(())
    }

    /// Dumps a single repository's object tree and records its state.
    #[allow(clippy::mutable_key_type)]
    fn dump_repository(
        &self,
        repo_path: &Path,
        registry: &mut DumpRegistry,
        state_registry: &mut HashMap<uri::Https, RepositoryState>,
    ) -> Result<(), Failed> {
        let state_path = repo_path.join(RepositoryState::FILE_NAME);
        let state = match RepositoryState::load_path(&state_path)? {
            Some(state) => state,
            // No state file: nothing useful to dump for this directory.
            None => return Ok(())
        };
        let target_path = registry.get_repo_path(Some(&state.rpki_notify));
        fatal::create_dir_all(&target_path)?;
        Self::dump_tree(&repo_path.join("rsync"), &target_path)?;
        state_registry.insert(state.rpki_notify.clone(), state);
        Ok(())
    }

    /// Recursively copies the tree under `source_path` to `target_path`.
    fn dump_tree(
        source_path: &Path,
        target_path: &Path,
    ) -> Result<(), Failed> {
        for entry in fatal::read_dir(source_path)? {
            let entry = entry?;
            if entry.is_dir() {
                Self::dump_tree(
                    entry.path(), &target_path.join(entry.file_name())
                )?;
            }
            else if entry.is_file() {
                let target_path = target_path.join(entry.file_name());
                fatal::create_parent_all(&target_path)?;
                if let Err(err) = fs::copy(entry.path(), &target_path) {
                    error!(
                        "Fatal: failed to copy {} to {}: {}",
                        entry.path().display(),
                        target_path.display(),
                        err
                    );
                    return Err(Failed)
                }
            }
        }
        Ok(())
    }

    /// Writes the `repositories.json` for a dump.
    #[allow(clippy::mutable_key_type)]
    fn dump_repository_json(
        &self,
        repos: DumpRegistry,
        states: HashMap<uri::Https, RepositoryState>,
    ) -> Result<(), Failed> {
        let path = repos.base_dir().join("repositories.json");
        if let Err(err) = fs::write(
            &path,
            &JsonBuilder::build(|builder| {
                builder.member_array("repositories", |builder| {
                    for (key, value) in repos.rrdp_uris() {
                        builder.array_object(|builder| {
                            builder.member_str(
                                "path", value
                            );
                            builder.member_str("type", "rrdp");
                            builder.member_str(
                                "rpkiNotify",
                                key
                            );
                            // State may be missing if the repository was
                            // registered but never stored successfully.
                            if let Some(state) = states.get(key) {
                                builder.member_raw("serial", state.serial);
                                builder.member_str("session", state.session);
                                builder.member_str(
                                    "updated",
                                    state.updated().to_rfc3339()
                                );
                            }
                        })
                    }
                    // The rsync collector's dump always lives in "rsync".
                    builder.array_object(|builder| {
                        builder.member_str("path", "rsync");
                        builder.member_str("type", "rsync");
                    });
                })
            })
        ) {
            error!( "Failed to write {}: {}", path.display(), err);
            return Err(Failed)
        }
        Ok(())
    }

    /// Returns the directory path for the given repository.
    ///
    /// The path is `<working_dir>/<authority>/<hex(sha256(uri))>`.
    fn repository_path(&self, rpki_notify: &uri::Https) -> PathBuf {
        let authority = rpki_notify.canonical_authority();
        let alg = DigestAlgorithm::sha256();

        // Reserve space for the authority, the separating slash, and the
        // digest in hex form, which needs two characters per digest
        // octet. (Previously only one character per octet was reserved,
        // guaranteeing a re-allocation while appending the hex digits.)
        let mut dir = String::with_capacity(
            authority.len()
            + alg.digest_len() * 2
            + 1
        );
        dir.push_str(&authority);
        dir.push('/');
        crate::utils::str::append_hex(
            alg.digest(rpki_notify.as_slice()).as_ref(),
            &mut dir
        );
        self.working_dir.join(dir)
    }
}
/// Using the collector during a single validation run.
#[derive(Debug)]
pub struct Run<'a> {
    /// A reference to the underlying collector.
    collector: &'a Collector,

    /// The repositories for which an update has been attempted.
    ///
    /// Maps the notify URI to `None` if no usable copy is available and
    /// to the repository otherwise.
    updated: RwLock<HashMap<uri::Https, Option<Repository>>>,

    /// Per-repository mutexes serializing concurrent update attempts.
    running: RwLock<HashMap<uri::Https, Arc<Mutex<()>>>>,

    /// The metrics collected for each updated repository.
    metrics: Mutex<Vec<RrdpRepositoryMetrics>>,
}
impl<'a> Run<'a> {
fn new(collector: &'a Collector) -> Self {
Run {
collector,
updated: Default::default(),
running: Default::default(),
metrics: Mutex::new(Vec::new()),
}
}
pub fn load_ta(&self, uri: &uri::Https) -> Option<Bytes> {
let mut response = match self.collector.http.response(uri, false) {
Ok(response) => response,
Err(_) => return None,
};
if response.content_length() > self.collector.max_object_size {
warn!(
"Trust anchor certificate {} exceeds size limit. \
Ignoring.",
uri
);
return None
}
let mut bytes = Vec::new();
if let Err(err) = response.copy_to(&mut bytes) {
info!("Failed to get trust anchor {}: {}", uri, err);
return None
}
Some(Bytes::from(bytes))
}
pub fn was_updated(&self, notify_uri: &uri::Https) -> bool {
self.updated.read().unwrap().get(notify_uri).is_some()
}
pub fn load_repository(
&self, rpki_notify: &uri::Https
) -> Result<Option<Repository>, Failed> {
if let Some(repo) = self.updated.read().unwrap().get(rpki_notify) {
return Ok(repo.clone())
}
let mutex = {
self.running.write().unwrap()
.entry(rpki_notify.clone()).or_default()
.clone()
};
let _lock = mutex.lock().unwrap();
if let Some(res) = self.updated.read().unwrap().get(rpki_notify) {
return Ok(res.clone())
}
let repository = Repository::try_update(self, rpki_notify.clone())?;
self.running.write().unwrap().remove(rpki_notify);
self.updated.write().unwrap().insert(
rpki_notify.clone(), repository.clone()
);
Ok(repository)
}
#[allow(clippy::mutable_key_type)]
pub fn cleanup(
&self,
retain: &mut HashSet<uri::Https>
) -> Result<(), Failed> {
for uri in self.updated.read().unwrap().keys() {
retain.insert(uri.clone());
}
for entry in fatal::read_dir(&self.collector.working_dir)? {
let entry = entry?;
if entry.is_file() {
if let Err(err) = fs::remove_file(entry.path()) {
error!(
"Fatal: failed to delete stray file {}: {}",
entry.path().display(), err
);
return Err(Failed)
}
}
else if entry.is_dir() {
self.cleanup_authority(entry.path(), retain)?;
}
}
Ok(())
}
#[allow(clippy::mutable_key_type)]
pub fn cleanup_authority(
&self,
path: &Path,
retain: &HashSet<uri::Https>
) -> Result<(), Failed> {
for entry in fatal::read_dir(path)? {
let entry = entry?;
if entry.is_file() {
if let Err(err) = fs::remove_file(entry.path()) {
error!(
"Fatal: failed to delete stray file {}: {}",
entry.path().display(), err
);
return Err(Failed)
}
}
else if entry.is_dir() {
self.cleanup_repository(entry.path(), retain)?;
}
}
Ok(())
}
#[allow(clippy::mutable_key_type)]
pub fn cleanup_repository(
&self,
path: &Path,
retain: &HashSet<uri::Https>
) -> Result<(), Failed> {
let state_path = path.join(RepositoryState::FILE_NAME);
let keep = match RepositoryState::load_path(&state_path)? {
Some(state) => {
retain.contains(&state.rpki_notify)
}
None => false,
};
if !keep {
debug!("Deleting unused RRDP tree {}.", path.display());
if let Err(err) = fs::remove_dir_all(path) {
error!(
"Fatal: failed to delete tree {}: {}.",
path.display(), err
);
return Err(Failed)
}
}
Ok(())
}
pub fn done(self, metrics: &mut Metrics) {
metrics.rrdp = self.metrics.into_inner().unwrap()
}
}
/// Access to a single RRDP repository inside the collector.
#[derive(Clone, Debug)]
pub struct Repository {
    /// The notification URI identifying the repository.
    rpki_notify: uri::Https,

    /// The path to the repository's directory in the working directory.
    path: PathBuf,
}
impl Repository {
    /// Loads the content of the object identified by `uri`.
    ///
    /// Returns `Ok(None)` if the object is not present in the local copy.
    pub fn load_object(
        &self,
        uri: &uri::Rsync
    ) -> Result<Option<Bytes>, Failed> {
        match RepositoryObject::load(&self.object_path(uri))? {
            Some(object) => Ok(Some(object.content)),
            None => Ok(None),
        }
    }

    /// The directory holding the stored objects.
    fn object_base(&self) -> PathBuf {
        self.path.join("rsync")
    }

    /// The path of the stored copy of the object identified by `uri`.
    fn object_path(&self, uri: &uri::Rsync) -> PathBuf {
        self.subtree_path("rsync", uri)
    }

    /// The directory used for files of an update in progress.
    fn tmp_base(&self) -> PathBuf {
        self.path.join("tmp")
    }

    /// The temporary path for the object identified by `uri`.
    fn tmp_object_path(&self, uri: &uri::Rsync) -> PathBuf {
        self.subtree_path("tmp", uri)
    }

    /// Builds `<repo>/<base>/<authority>/<module>/<path>` for a URI.
    fn subtree_path(&self, base: &str, uri: &uri::Rsync) -> PathBuf {
        let relative = format!(
            "{}/{}/{}/{}",
            base,
            uri.canonical_authority(),
            uri.module_name(),
            uri.path()
        );
        self.path.join(relative)
    }
}
impl Repository {
    /// Creates the repository by trying to update it.
    ///
    /// Returns `Ok(None)` if the repository cannot be used at all:
    /// either its host name is dubious, or the update failed and no
    /// current local copy exists.
    fn try_update(
        run: &Run, rpki_notify: uri::Https
    ) -> Result<Option<Self>, Failed> {
        if run.collector.filter_dubious && rpki_notify.has_dubious_authority() {
            warn!(
                "{}: Dubious host name. Not using the repository.",
                rpki_notify
            );
            return Ok(None)
        }
        let path = run.collector.repository_path(&rpki_notify);
        let repo = Repository { rpki_notify: rpki_notify.clone(), path };

        // A state file that cannot be read means the stored copy is
        // corrupt: wipe the whole tree and start from scratch.
        let state = match RepositoryState::load(&repo) {
            Ok(state) => {
                state
            }
            Err(_) => {
                if let Err(err) = fs::remove_dir_all(&repo.path) {
                    error!(
                        "Fatal: failed to delete corrupted repository \
                        directory {}: {}",
                        repo.path.display(), err
                    );
                    return Err(Failed)
                }
                None
            }
        };
        let is_current = match state.as_ref() {
            Some(state) => !state.is_expired(),
            None => false,
        };
        let best_before = state.as_ref().map(|state| state.best_before());

        let start_time = SystemTime::now();
        let mut metrics = RrdpRepositoryMetrics::new(rpki_notify.clone());
        let is_updated = repo._update(run, state, &mut metrics)?;
        metrics.duration = SystemTime::now().duration_since(start_time);
        run.metrics.lock().unwrap().push(metrics);

        // A failed update is fine as long as the stored copy has not
        // expired yet.
        if is_updated || is_current {
            Ok(Some(repo))
        }
        else {
            match best_before {
                Some(date) => {
                    info!(
                        "RRDP {}: Update failed and \
                        current copy is expired since {}.",
                        rpki_notify, date
                    );
                },
                None => {
                    info!(
                        "RRDP {}: Update failed and there is no current copy.",
                        rpki_notify
                    );
                }
            }
            Ok(None)
        }
    }

    /// Performs the actual update, returning whether it succeeded.
    ///
    /// Fetches the notification file, then tries a delta update first
    /// and falls back to a snapshot update if deltas cannot be used.
    ///
    /// Note: the three `&notify` expressions in this method and in
    /// `delta_update` had been corrupted into `¬ify` by an HTML
    /// entity substitution (`&not;` -> `¬`); they are restored here.
    fn _update(
        &self,
        run: &Run,
        mut state: Option<RepositoryState>,
        metrics: &mut RrdpRepositoryMetrics,
    ) -> Result<bool, Failed> {
        let notify = match run.collector.http.notification_file(
            &self.rpki_notify,
            state.as_ref(),
            &mut metrics.notify_status
        ) {
            Ok(Some(notify)) => notify,
            Ok(None) => {
                // 304 Not Modified: refresh the state's fallback time.
                self.not_modified(run, state.as_mut())?;
                return Ok(true)
            }
            Err(Failed) => {
                return Ok(false)
            }
        };
        metrics.serial = Some(notify.content.serial());
        metrics.session = Some(notify.content.session_id());
        match self.delta_update(run, state.as_ref(), &notify, metrics)? {
            None => {
                return Ok(true)
            }
            Some(reason) => {
                metrics.snapshot_reason = Some(reason)
            }
        }
        self.snapshot_update(run, &notify, metrics)
    }

    /// Handles an unmodified notification file.
    fn not_modified(
        &self,
        run: &Run,
        state: Option<&mut RepositoryState>,
    ) -> Result<(), Failed> {
        debug!("RRDP {}: Not modified.", self.rpki_notify);
        if let Some(state) = state {
            state.touch(run.collector.fallback_time);
            state.write(self)?
        }
        Ok(())
    }

    /// Performs a snapshot update, returning whether it succeeded.
    ///
    /// Only `SnapshotError::Fatal` aborts the whole run; any other
    /// snapshot error merely fails this repository.
    fn snapshot_update(
        &self,
        run: &Run,
        notify: &Notification,
        metrics: &mut RrdpRepositoryMetrics,
    ) -> Result<bool, Failed> {
        debug!("RRDP {}: updating from snapshot.", self.rpki_notify);
        match SnapshotUpdate::new(
            run.collector, self, notify, metrics
        ).try_update() {
            Ok(()) => {
                debug!(
                    "RRDP {}: snapshot update completed.",
                    self.rpki_notify
                );
                Ok(true)
            }
            Err(SnapshotError::Fatal) => Err(Failed),
            Err(err) => {
                warn!(
                    "RRDP {}: failed to process snapshot file {}: {}",
                    self.rpki_notify, notify.content.snapshot().uri(), err
                );
                Ok(false)
            }
        }
    }

    /// Performs a delta update.
    ///
    /// Returns `Ok(None)` on success and `Ok(Some(reason))` when a
    /// snapshot update is required instead.
    fn delta_update(
        &self,
        run: &Run,
        state: Option<&RepositoryState>,
        notify: &Notification,
        metrics: &mut RrdpRepositoryMetrics,
    ) -> Result<Option<SnapshotReason>, Failed> {
        let state = match state {
            Some(state) => state,
            None => return Ok(Some(SnapshotReason::NewRepository)),
        };
        let deltas = match self.calc_deltas(&notify.content, state) {
            Ok(deltas) => deltas,
            Err(reason) => return Ok(Some(reason)),
        };
        if deltas.len() > run.collector.max_delta_count {
            debug!(
                "RRDP: {}: Too many delta steps required ({})",
                self.rpki_notify, deltas.len()
            );
            return Ok(Some(SnapshotReason::TooManyDeltas))
        }
        if !deltas.is_empty() {
            let count = deltas.len();
            for (i, info) in deltas.iter().enumerate() {
                debug!(
                    "RRDP {}: Delta update step ({}/{}).",
                    self.rpki_notify, i + 1, count
                );
                if let Some(reason) = DeltaUpdate::new(
                    run.collector, self, notify.content.session_id(),
                    info, metrics
                ).try_update()? {
                    info!(
                        "RRDP {}: Delta update failed, \
                        trying snapshot instead.",
                        self.rpki_notify
                    );
                    return Ok(Some(reason))
                }
            }
        }
        // Even with no deltas to apply, write a fresh state to refresh
        // the fallback time and the cached validators.
        RepositoryState::from_notify(
            self.rpki_notify.clone(),
            notify,
            run.collector.fallback_time
        ).write(self)?;
        debug!("RRDP {}: Delta update completed.", self.rpki_notify);
        Ok(None)
    }

    /// Calculates the slice of deltas needed to get from `state` to
    /// the serial announced by `notify`.
    ///
    /// An empty slice means we are already current; an error names the
    /// reason why a snapshot update is needed instead.
    fn calc_deltas<'b>(
        &self,
        notify: &'b NotificationFile,
        state: &RepositoryState
    ) -> Result<&'b [rrdp::DeltaInfo], SnapshotReason> {
        if notify.session_id() != state.session {
            debug!("New session. Need to get snapshot.");
            return Err(SnapshotReason::NewSession)
        }
        debug!("{}: Serials: us {}, them {}.",
            self.rpki_notify, state.serial, notify.serial()
        );
        if notify.serial() == state.serial {
            return Ok(&[]);
        }

        // The last delta must produce the notification's serial;
        // otherwise the delta list is inconsistent (this also catches
        // an empty delta list).
        if notify.deltas().last().map(|delta| delta.serial())
            != Some(notify.serial())
        {
            debug!("Last delta serial differs from current serial.");
            return Err(SnapshotReason::BadDeltaSet)
        }

        let mut deltas = notify.deltas();
        let serial = match state.serial.checked_add(1) {
            Some(serial) => serial,
            None => return Err(SnapshotReason::LargeSerial)
        };
        // Skip deltas older than the one we need; the deltas are sorted
        // by serial, so we only need to find our starting point.
        loop {
            let first = match deltas.first() {
                Some(first) => first,
                None => {
                    debug!("Ran out of deltas.");
                    return Err(SnapshotReason::BadDeltaSet)
                }
            };
            match first.serial().cmp(&serial) {
                cmp::Ordering::Greater => {
                    debug!("First delta is too new ({})", first.serial());
                    return Err(SnapshotReason::OutdatedLocal)
                }
                cmp::Ordering::Equal => break,
                cmp::Ordering::Less => deltas = &deltas[1..]
            }
        }
        Ok(deltas)
    }
}
/// An update of a repository from its snapshot file.
struct SnapshotUpdate<'a> {
    /// The collector running the update.
    collector: &'a Collector,

    /// The repository being updated.
    repository: &'a Repository,

    /// The notification file listing the snapshot to use.
    notify: &'a Notification,

    /// The metrics to record the outcome into.
    metrics: &'a mut RrdpRepositoryMetrics,
}
impl<'a> SnapshotUpdate<'a> {
    /// Creates a new snapshot update.
    pub fn new(
        collector: &'a Collector,
        repository: &'a Repository,
        notify: &'a Notification,
        metrics: &'a mut RrdpRepositoryMetrics,
    ) -> Self {
        SnapshotUpdate { collector, repository, notify, metrics }
    }

    /// Downloads and applies the snapshot, consuming the update.
    ///
    /// All objects are first written into the repository's temporary
    /// directory. Only after the snapshot has been fully processed and
    /// its hash verified is the old object tree replaced by renaming
    /// the temporary directory into place. On error, the temporary
    /// directory is removed again.
    pub fn try_update(mut self) -> Result<(), SnapshotError> {
        let response = match self.collector.http.response(
            self.notify.content.snapshot().uri(), false
        ) {
            Ok(response) => {
                self.metrics.payload_status = Some(response.status().into());
                response
            }
            Err(err) => {
                self.metrics.payload_status = Some(HttpStatus::Error);
                return Err(err.into())
            }
        };

        let tmp_base = self.repository.tmp_base();
        if let Err(err) = fs::create_dir_all(&tmp_base) {
            error!(
                "Fatal: failed to create RRDP temporary directory {}: {}",
                tmp_base.display(), err
            );
            return Err(SnapshotError::Fatal)
        }

        match self.try_process(response) {
            Ok(()) => {
                // Drop the state first so a crash in the middle leaves
                // a tree that will be treated as corrupt and rebuilt.
                RepositoryState::remove(self.repository)?;

                let object_base = self.repository.object_base();
                if let Err(err) = fs::remove_dir_all(&object_base) {
                    if err.kind() != io::ErrorKind::NotFound {
                        error!(
                            "Fatal: failed to delete RRDP object \
                            directory {}: {}",
                            object_base.display(), err
                        );
                        return Err(SnapshotError::Fatal)
                    }
                }
                if let Err(err) = fs::rename(&tmp_base, &object_base) {
                    error!(
                        "Fatal: failed to rename {} to {}: {}",
                        tmp_base.display(), object_base.display(), err
                    );
                    return Err(SnapshotError::Fatal)
                }
                RepositoryState::from_notify(
                    self.repository.rpki_notify.clone(),
                    self.notify,
                    self.collector.fallback_time
                ).write(self.repository)?;
                Ok(())
            }
            Err(err) => {
                // Clean up the partially written temporary tree.
                if let Err(err) = fs::remove_dir_all(&tmp_base) {
                    error!(
                        "Fatal: failed to delete RRDP temporary \
                        directory {}:{}",
                        tmp_base.display(), err
                    );
                    return Err(SnapshotError::Fatal)
                }
                Err(err)
            }
        }
    }

    /// Processes the snapshot response, verifying its hash.
    ///
    /// The response is read through `HashRead` so the digest is
    /// computed while parsing; it is then compared in constant time
    /// against the hash announced in the notification file.
    pub fn try_process(
        &mut self,
        response: HttpResponse
    ) -> Result<(), SnapshotError> {
        let mut reader = io::BufReader::new(HashRead::new(response));
        self.process(&mut reader)?;
        let hash = reader.into_inner().into_hash();
        if verify_slices_are_equal(
            hash.as_ref(),
            self.notify.content.snapshot().hash().as_ref()
        ).is_err() {
            return Err(SnapshotError::HashMismatch)
        }
        Ok(())
    }
}
impl<'a> ProcessSnapshot for SnapshotUpdate<'a> {
    type Err = SnapshotError;

    /// Checks the snapshot's header against the notification file.
    fn meta(
        &mut self,
        session_id: Uuid,
        serial: u64,
    ) -> Result<(), Self::Err> {
        if session_id != self.notify.content.session_id() {
            return Err(SnapshotError::SessionMismatch {
                expected: self.notify.content.session_id(),
                received: session_id
            })
        }
        if serial != self.notify.content.serial() {
            return Err(SnapshotError::SerialMismatch {
                expected: self.notify.content.serial(),
                received: serial
            })
        }
        Ok(())
    }

    /// Stores a published object into the temporary tree.
    ///
    /// The data is read through `MaxSizeRead` so overly large objects
    /// are rejected rather than written out.
    fn publish(
        &mut self,
        uri: uri::Rsync,
        data: &mut rrdp::ObjectReader,
    ) -> Result<(), Self::Err> {
        let path = self.repository.tmp_object_path(&uri);
        let mut data = MaxSizeRead::new(data, self.collector.max_object_size);
        if RepositoryObject::create(&path, &mut data).is_err() {
            // Distinguish "object too big" from an actual write error.
            if data.was_triggered() {
                Err(SnapshotError::LargeObject(uri))
            }
            else {
                Err(SnapshotError::Fatal)
            }
        }
        else {
            Ok(())
        }
    }
}
/// An update of a repository from a single delta file.
struct DeltaUpdate<'a> {
    /// The collector running the update.
    collector: &'a Collector,

    /// The repository being updated.
    repository: &'a Repository,

    /// The session ID the delta must belong to.
    session_id: Uuid,

    /// Information about the delta to fetch and apply.
    info: &'a DeltaInfo,

    /// The metrics to record the outcome into.
    metrics: &'a mut RrdpRepositoryMetrics,

    /// URIs published by this delta; used to detect repeats/conflicts.
    publish: HashSet<uri::Rsync>,

    /// URIs withdrawn by this delta; used to detect repeats/conflicts.
    withdraw: HashSet<uri::Rsync>,
}
impl<'a> DeltaUpdate<'a> {
    /// Creates a new delta update.
    pub fn new(
        collector: &'a Collector,
        repository: &'a Repository,
        session_id: Uuid,
        info: &'a DeltaInfo,
        metrics: &'a mut RrdpRepositoryMetrics,
    ) -> Self {
        DeltaUpdate {
            collector, repository, session_id, info, metrics,
            publish: Default::default(), withdraw: Default::default(),
        }
    }

    /// Runs the delta update in two phases: collect, then apply.
    ///
    /// Returns `Ok(Some(reason))` if the delta cannot be used and a
    /// snapshot update should be performed instead.
    pub fn try_update(
        mut self
    ) -> Result<Option<SnapshotReason>, Failed> {
        if let Err(err) = self.collect_changes() {
            warn!(
                "RRDP {}: failed to process delta: {}",
                self.repository.rpki_notify, err
            );
            return Ok(Some(SnapshotReason::ConflictingDelta))
        }
        self.apply_changes()?;
        Ok(None)
    }

    /// Downloads and parses the delta, recording its changes.
    ///
    /// Published objects go into the temporary tree; the publish and
    /// withdraw sets are filled along the way and must not intersect.
    fn collect_changes(&mut self) -> Result<(), DeltaError> {
        let response = match self.collector.http.response(
            self.info.uri(), false
        ) {
            Ok(response) => {
                self.metrics.payload_status = Some(response.status().into());
                response
            }
            Err(err) => {
                self.metrics.payload_status = Some(HttpStatus::Error);
                return Err(err.into())
            }
        };
        self.try_process(response)?;
        if let Some(uri) = self.publish.intersection(&self.withdraw).next() {
            return Err(DeltaError::ObjectRepeated { uri: uri.clone() })
        }
        Ok(())
    }

    /// Applies the collected changes to the object tree.
    ///
    /// If applying fails half way, the whole repository directory is
    /// deleted so we never keep a partially updated tree. On success a
    /// new state is written.
    fn apply_changes(self) -> Result<(), Failed> {
        // Drop the state first: while we are mutating the tree, it
        // must not look like a valid copy.
        RepositoryState::remove(self.repository)?;
        if self._apply_changes().is_err() {
            if let Err(err) = fs::remove_dir_all(&self.repository.path) {
                error!(
                    "Fatal: failed to delete repository directory {}: {}",
                    self.repository.path.display(), err
                );
            }
            return Err(Failed)
        }
        RepositoryState::new_for_delta(
            self.repository.rpki_notify.clone(),
            self.session_id,
            self.info.serial(),
            self.collector.fallback_time
        ).write(self.repository)?;
        Ok(())
    }

    /// Moves published objects into place and deletes withdrawn ones.
    fn _apply_changes(&self) -> Result<(), Failed> {
        for uri in &self.publish {
            let tmp_path = self.repository.tmp_object_path(uri);
            let obj_path = self.repository.object_path(uri);
            // Remove any previous version of the object first.
            if let Err(err) = fs::remove_file(&obj_path) {
                if err.kind() != io::ErrorKind::NotFound {
                    error!(
                        "Fatal: failed to delete {}: {}",
                        obj_path.display(), err
                    );
                    return Err(Failed)
                }
            }
            if let Some(parent) = obj_path.parent() {
                if let Err(err) = fs::create_dir_all(&parent) {
                    error!(
                        "Fatal: failed to create directory {}: {}",
                        parent.display(), err
                    );
                    return Err(Failed)
                }
            }
            if let Err(err) = fs::rename(&tmp_path, &obj_path) {
                error!(
                    "Fatal: failed to move {} to {}: {}",
                    tmp_path.display(), obj_path.display(), err
                );
                return Err(Failed)
            }
        }
        for uri in &self.withdraw {
            let obj_path = self.repository.object_path(uri);
            if let Err(err) = fs::remove_file(&obj_path) {
                // Already gone is fine for a withdrawal.
                if err.kind() != io::ErrorKind::NotFound {
                    error!(
                        "Fatal: failed to delete {}: {}",
                        obj_path.display(), err
                    );
                    return Err(Failed)
                }
            }
        }
        Ok(())
    }

    /// Processes the delta response, verifying its hash.
    pub fn try_process(
        &mut self,
        response: HttpResponse
    ) -> Result<(), DeltaError> {
        let mut reader = io::BufReader::new(HashRead::new(response));
        self.process(&mut reader)?;
        let hash = reader.into_inner().into_hash();
        if verify_slices_are_equal(
            hash.as_ref(),
            self.info.hash().as_ref()
        ).is_err() {
            return Err(DeltaError::DeltaHashMismatch)
        }
        Ok(())
    }

    /// Checks that the stored object's hash matches `expected`.
    ///
    /// `expected` of `None` means the object must not currently exist.
    fn check_hash(
        &self, uri: &uri::Rsync, expected: Option<rrdp::Hash>
    ) -> Result<(), DeltaError> {
        let current = RepositoryObject::load_hash(
            &self.repository.object_path(uri)
        )?;
        if current == expected {
            Ok(())
        }
        else if expected.is_none() {
            Err(DeltaError::ObjectAlreadyPresent { uri: uri.clone() })
        }
        else if current.is_none() {
            Err(DeltaError::MissingObject { uri: uri.clone() })
        }
        else {
            Err(DeltaError::ObjectHashMismatch { uri: uri.clone() })
        }
    }
}
impl<'a> ProcessDelta for DeltaUpdate<'a> {
    type Err = DeltaError;

    /// Checks the delta's header against session and expected serial.
    fn meta(
        &mut self, session_id: Uuid, serial: u64
    ) -> Result<(), Self::Err> {
        if session_id != self.session_id {
            return Err(DeltaError::SessionMismatch {
                expected: self.session_id,
                received: session_id
            })
        }
        if serial != self.info.serial() {
            return Err(DeltaError::SerialMismatch {
                expected: self.info.serial(),
                received: serial
            })
        }
        Ok(())
    }

    /// Stages a published object in the temporary tree.
    ///
    /// `hash` is the expected hash of the currently stored object, or
    /// `None` if the object must be new. Repeated publication of the
    /// same URI within one delta is an error.
    fn publish(
        &mut self,
        uri: uri::Rsync,
        hash: Option<rrdp::Hash>,
        data: &mut rrdp::ObjectReader<'_>
    ) -> Result<(), Self::Err> {
        self.check_hash(&uri, hash)?;
        let mut data = MaxSizeRead::new(data, self.collector.max_object_size);
        let path = self.repository.tmp_object_path(&uri);
        if RepositoryObject::create(&path, &mut data).is_err() {
            // Distinguish "object too big" from an actual write error.
            if data.was_triggered() {
                return Err(DeltaError::LargeObject(uri))
            }
            else {
                return Err(DeltaError::Fatal)
            }
        }
        if !self.publish.insert(uri.clone()) {
            return Err(DeltaError::ObjectRepeated { uri })
        }
        Ok(())
    }

    /// Records the withdrawal of an object after verifying its hash.
    fn withdraw(
        &mut self,
        uri: uri::Rsync,
        hash: rrdp::Hash
    ) -> Result<(), Self::Err> {
        self.check_hash(&uri, Some(hash))?;
        if !self.withdraw.insert(uri.clone()) {
            return Err(DeltaError::ObjectRepeated { uri })
        }
        Ok(())
    }
}
/// The HTTP client used by the collector.
#[derive(Debug)]
struct HttpClient {
    /// The client, or the builder it will eventually be built from.
    ///
    /// Starts out as `Err(Some(builder))`; `ignite` turns it into
    /// `Ok(client)`. `Err(None)` marks a failed initialization.
    client: Result<Client, Option<ClientBuilder>>,

    /// Directory into which all responses are copied, if requested.
    response_dir: Option<PathBuf>,

    /// The per-request timeout to apply, if any.
    timeout: Option<Duration>,
}
impl HttpClient {
    /// Creates a new, not-yet-ignited HTTP client from the config.
    ///
    /// Builds the `ClientBuilder` only; the actual client is created by
    /// `ignite` so that configuration errors surface early while the
    /// (possibly blocking) client construction happens later.
    pub fn new(config: &Config) -> Result<Self, Failed> {
        // TLS backend is chosen at compile time via the `native-tls`
        // feature; rustls is the default.
        #[cfg(not(feature = "native-tls"))]
        fn create_builder() -> ClientBuilder {
            Client::builder().use_rustls_tls()
        }

        #[cfg(feature = "native-tls")]
        fn create_builder() -> ClientBuilder {
            Client::builder().use_native_tls()
        }

        let mut builder = create_builder();
        builder = builder.user_agent(&config.rrdp_user_agent);
        // Disable reqwest's global timeout; a per-request timeout is
        // applied in `_response` instead.
        builder = builder.timeout(None); if let Some(timeout) = config.rrdp_connect_timeout {
            builder = builder.connect_timeout(timeout);
        }
        if let Some(addr) = config.rrdp_local_addr {
            builder = builder.local_address(addr)
        }
        for path in &config.rrdp_root_certs {
            builder = builder.add_root_certificate(
                Self::load_cert(path)?
            );
        }
        for proxy in &config.rrdp_proxies {
            let proxy = match Proxy::all(proxy) {
                Ok(proxy) => proxy,
                Err(err) => {
                    error!(
                        "Invalid rrdp-proxy '{}': {}", proxy, err
                    );
                    return Err(Failed)
                }
            };
            builder = builder.proxy(proxy);
        }
        Ok(HttpClient {
            client: Err(Some(builder)),
            response_dir: config.rrdp_keep_responses.clone(),
            timeout: config.rrdp_timeout,
        })
    }

    /// Builds the actual client from the stored builder.
    ///
    /// Does nothing if the client is already built. Fails if a previous
    /// attempt already consumed the builder.
    pub fn ignite(&mut self) -> Result<(), Failed> {
        let builder = match self.client.as_mut() {
            Ok(_) => return Ok(()),
            Err(builder) => match builder.take() {
                Some(builder) => builder,
                None => {
                    error!("Previously failed to initialize HTTP client.");
                    return Err(Failed)
                }
            }
        };
        let client = match builder.build() {
            Ok(client) => client,
            Err(err) => {
                error!("Failed to initialize HTTP client: {}.", err);
                return Err(Failed)
            }
        };
        self.client = Ok(client);
        Ok(())
    }

    /// Loads a root certificate in PEM format from `path`.
    fn load_cert(path: &Path) -> Result<Certificate, Failed> {
        let mut file = match fs::File::open(path) {
            Ok(file) => file,
            Err(err) => {
                error!(
                    "Cannot open rrdp-root-cert file '{}': {}'",
                    path.display(), err
                );
                return Err(Failed);
            }
        };
        let mut data = Vec::new();
        if let Err(err) = io::Read::read_to_end(&mut file, &mut data) {
            error!(
                "Cannot read rrdp-root-cert file '{}': {}'",
                path.display(), err
            );
            return Err(Failed);
        }
        Certificate::from_pem(&data).map_err(|err| {
            error!(
                "Cannot decode rrdp-root-cert file '{}': {}'",
                path.display(), err
            );
            Failed
        })
    }

    /// Returns the built client.
    ///
    /// Panics if `ignite` has not successfully run -- doing so would be
    /// a programming error elsewhere in the collector.
    fn client(&self) -> &Client {
        self.client.as_ref().unwrap()
    }

    /// Performs a GET request for `uri`.
    ///
    /// `multi` selects whether kept response copies get unique names.
    pub fn response(
        &self,
        uri: &uri::Https,
        multi: bool,
    ) -> Result<HttpResponse, reqwest::Error> {
        self._response(uri, self.client().get(uri.as_str()), multi)
    }

    /// Sends a prepared request, applying the per-request timeout.
    fn _response(
        &self,
        uri: &uri::Https,
        mut request: RequestBuilder,
        multi: bool
    ) -> Result<HttpResponse, reqwest::Error> {
        if let Some(timeout) = self.timeout {
            request = request.timeout(timeout);
        }
        request.send().map(|response| {
            HttpResponse::create(response, uri, &self.response_dir, multi)
        })
    }

    /// Fetches and parses the notification file at `uri`.
    ///
    /// Sends conditional request headers (`If-None-Match`,
    /// `If-Modified-Since`) based on the stored state; returns
    /// `Ok(None)` on 304 Not Modified.
    pub fn notification_file(
        &self,
        uri: &uri::Https,
        state: Option<&RepositoryState>,
        status: &mut HttpStatus,
    ) -> Result<Option<Notification>, Failed> {
        let mut request = self.client().get(uri.as_str());
        if let Some(state) = state {
            if let Some(etag) = state.etag.as_ref() {
                request = request.header(
                    header::IF_NONE_MATCH, etag.as_ref()
                );
            }
            if let Some(ts) = state.last_modified_ts {
                request = request.header(
                    header::IF_MODIFIED_SINCE,
                    format_http_date(Utc.timestamp(ts, 0))
                );
            }
        }
        let response = match self._response(uri, request, true) {
            Ok(response) => {
                *status = response.status().into();
                response
            }
            Err(err) => {
                warn!("RRDP {}: {}", uri, err);
                *status = HttpStatus::Error;
                return Err(Failed)
            }
        };

        if response.status() == StatusCode::NOT_MODIFIED {
            Ok(None)
        }
        else if !response.status().is_success() {
            warn!(
                "RRDP {}: Getting notification file failed with status {}",
                uri, response.status()
            );
            Err(Failed)
        }
        else {
            Notification::from_response(uri, response).map(Some)
        }
    }
}
/// An HTTP response that can optionally mirror its body to a file.
struct HttpResponse {
    /// The underlying reqwest response.
    response: Response,

    /// A file every read byte is also written to, if response keeping
    /// was requested and the file could be created.
    file: Option<fs::File>,
}
impl HttpResponse {
    /// Wraps a response, opening the copy file if keeping is enabled.
    ///
    /// Failure to open the copy file is not fatal -- we simply do not
    /// keep a copy then.
    pub fn create(
        response: Response,
        uri: &uri::Https,
        response_dir: &Option<PathBuf>,
        multi: bool
    ) -> Self {
        HttpResponse {
            response,
            file: response_dir.as_ref().and_then(|base| {
                Self::open_file(base, uri, multi).ok()
            })
        }
    }

    /// Opens the file to store a copy of the response for `uri`.
    ///
    /// With `multi`, a timestamped file is created below the URI's path
    /// so repeated fetches do not overwrite each other.
    fn open_file(
        base: &Path, uri: &uri::Https, multi: bool
    ) -> Result<fs::File, Failed> {
        // Skip the "https://" prefix (8 characters) of the URI.
        let path = base.join(&uri.as_str()[8..]);
        let path = if multi {
            path.join(Utc::now().to_rfc3339())
        }
        else {
            path
        };
        let parent = match path.parent() {
            Some(parent) => parent,
            None => {
                warn!(
                    "Cannot keep HTTP response; \
                    URI translated into a bad path '{}'",
                    path.display()
                );
                return Err(Failed)
            }
        };
        if let Err(err) = fs::create_dir_all(&parent) {
            warn!(
                "Cannot keep HTTP response; \
                creating directory {} failed: {}",
                parent.display(), err
            );
            return Err(Failed)
        }
        fs::File::create(&path).map_err(|err| {
            warn!(
                "Cannot keep HTTP response; \
                creating file {} failed: {}",
                path.display(), err
            );
            Failed
        })
    }

    /// Returns the content length announced by the server, if any.
    pub fn content_length(&self) -> Option<u64> {
        self.response.content_length()
    }

    /// Copies the full body to `w`, also feeding the copy file.
    pub fn copy_to<W: io::Write + ?Sized>(
        &mut self, w: &mut W
    ) -> Result<u64, io::Error> {
        io::copy(self, w)
    }

    /// Returns the HTTP status code of the response.
    pub fn status(&self) -> StatusCode {
        self.response.status()
    }

    /// Returns the ETag header value, if it is present exactly once
    /// and well-formed.
    pub fn etag(&self) -> Option<Bytes> {
        let mut etags = self.response.headers()
            .get_all(header::ETAG)
            .into_iter();
        let etag = etags.next()?;
        // More than one ETag header is ambiguous; ignore them all.
        if etags.next().is_some() {
            return None
        }
        Self::parse_etag(etag.as_bytes())
    }

    /// Validates an ETag value, returning it verbatim if acceptable.
    ///
    /// Accepts a weak tag (`W/"..."`) or a strong tag (`"..."`) with a
    /// closing quote and non-empty content. The returned bytes include
    /// the quotes (and weak prefix) so they can be sent back as-is in
    /// `If-None-Match`.
    fn parse_etag(etag: &[u8]) -> Option<Bytes> {
        let start = if etag.starts_with(b"W/\"") {
            3
        }
        else if etag.get(0) == Some(&b'"') {
            1
        }
        else {
            return None
        };
        if etag.len() <= start {
            return None
        }
        if etag.last() != Some(&b'"') {
            return None
        }
        Some(Bytes::copy_from_slice(etag))
    }

    /// Returns the parsed Last-Modified header, if present exactly once.
    pub fn last_modified(&self) -> Option<DateTime<Utc>> {
        let mut iter = self.response.headers()
            .get_all(header::LAST_MODIFIED)
            .into_iter();
        let value = iter.next()?;
        if iter.next().is_some() {
            return None
        }
        parse_http_date(value.to_str().ok()?)
    }
}
impl io::Read for HttpResponse {
    /// Reads from the underlying response, mirroring every byte that
    /// was read into the copy file if one is being kept.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        let len = self.response.read(buf)?;
        match self.file.as_mut() {
            Some(file) => file.write_all(&buf[..len])?,
            None => { }
        }
        Ok(len)
    }
}
/// A parsed notification file plus the response's cache validators.
struct Notification {
    /// The parsed content of the notification file.
    content: NotificationFile,

    /// The ETag of the response, kept for conditional requests.
    etag: Option<Bytes>,

    /// The Last-Modified date of the response, kept for conditional
    /// requests.
    last_modified: Option<DateTime<Utc>>,
}
impl Notification {
    /// Creates a notification from a successful HTTP response.
    ///
    /// Captures the cache validator headers before the body is
    /// consumed, parses the body, and sorts the delta list by serial.
    fn from_response(
        uri: &uri::Https, response: HttpResponse
    ) -> Result<Self, Failed> {
        let etag = response.etag();
        let last_modified = response.last_modified();
        let mut content = NotificationFile::parse(
            io::BufReader::new(response)
        ).map_err(|err| {
            warn!("RRDP {}: {}", uri, err);
            Failed
        })?;
        // Sort so later code can rely on deltas being in serial order.
        content.sort_deltas();
        Ok(Notification { content, etag, last_modified })
    }
}
/// The persisted state of a repository between validation runs.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct RepositoryState {
    /// The notify URI of the repository.
    pub rpki_notify: uri::Https,

    /// The RRDP session ID of the stored copy.
    pub session: Uuid,

    /// The serial number of the stored copy.
    pub serial: u64,

    /// When the repository was last updated, as Unix seconds.
    pub updated_ts: i64,

    /// When the stored copy stops being usable, as Unix seconds.
    pub best_before_ts: i64,

    /// The Last-Modified value of the notification response, as Unix
    /// seconds, if the server provided one.
    pub last_modified_ts: Option<i64>,

    /// The ETag of the notification response, if provided.
    pub etag: Option<Bytes>,
}
impl RepositoryState {
pub fn new_for_delta(
rpki_notify: uri::Https,
session: Uuid,
serial: u64,
fallback: FallbackTime,
) -> Self {
RepositoryState {
rpki_notify,
session,
serial,
updated_ts: Utc::now().timestamp(),
best_before_ts: fallback.best_before().timestamp(),
last_modified_ts: None,
etag: None
}
}
pub fn from_notify(
rpki_notify: uri::Https,
notify: &Notification,
fallback: FallbackTime,
) -> Self {
RepositoryState {
rpki_notify,
session: notify.content.session_id(),
serial: notify.content.serial(),
updated_ts: Utc::now().timestamp(),
best_before_ts: fallback.best_before().timestamp(),
last_modified_ts: notify.last_modified.map(|x| x.timestamp()),
etag: notify.etag.clone(),
}
}
pub fn updated(&self) -> DateTime<Utc> {
Utc.timestamp(self.updated_ts, 0)
}
pub fn best_before(&self) -> DateTime<Utc> {
Utc.timestamp(self.best_before_ts, 0)
}
pub fn touch(&mut self, fallback: FallbackTime) {
self.updated_ts = Utc::now().timestamp();
self.best_before_ts = fallback.best_before().timestamp();
}
pub fn is_expired(&self) -> bool {
Utc::now() > self.best_before()
}
pub fn load(repo: &Repository) -> Result<Option<Self>, Failed> {
Self::load_path(&Self::file_path(repo))
}
pub fn load_path(path: &Path) -> Result<Option<Self>, Failed> {
let mut file = match File::open(path) {
Ok(file) => file,
Err(err) if err.kind() == io::ErrorKind::NotFound => {
return Ok(None)
}
Err(err) => {
warn!(
"Failed to open repository state file {}: {}",
path.display(), err
);
return Err(Failed)
}
};
Self::_read(&mut file)
.map(Some)
.map_err(|err| {
warn!(
"Failed to read repository state file {}: {}",
path.display(), err
);
Failed
})
}
pub fn remove(repo: &Repository) -> Result<(), Failed> {
fatal::remove_file(&Self::file_path(repo))
}
fn _read(reader: &mut impl io::Read) -> Result<Self, io::Error> {
let version = u8::parse(reader)?;
if version != 0 {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("unexpected version {}", version)
))
}
Ok(RepositoryState {
rpki_notify: Parse::parse(reader)?,
session: Parse::parse(reader)?,
serial: Parse::parse(reader)?,
updated_ts: Parse::parse(reader)?,
best_before_ts: Parse::parse(reader)?,
last_modified_ts: Parse::parse(reader)?,
etag: Parse::parse(reader)?,
})
}
pub fn write(&self, repo: &Repository) -> Result<(), Failed> {
let path = Self::file_path(repo);
let mut file = match File::create(&path) {
Ok(file) => file,
Err(err) => {
error!(
"Fatal: Failed to open repository state file {}: {}",
path.display(), err
);
return Err(Failed)
}
};
self._write(&mut file).map_err(|err| {
error!(
"Fatal: Failed to write repository state file {}: {}",
path.display(), err
);
Failed
})
}
/// Writes the state to `writer` in the binary format read by `_read`.
fn _write(
    &self, writer: &mut impl io::Write
) -> Result<(), io::Error> {
    // Version octet first so the format can evolve later.
    0u8.compose(writer)?;
    // Field order must match `_read` exactly.
    self.rpki_notify.compose(writer)?;
    self.session.compose(writer)?;
    self.serial.compose(writer)?;
    self.updated_ts.compose(writer)?;
    self.best_before_ts.compose(writer)?;
    self.last_modified_ts.compose(writer)?;
    self.etag.compose(writer)?;
    Ok(())
}
/// The file name under which the state is kept in a repository's directory.
pub const FILE_NAME: &'static str = "state.bin";
/// Returns the path to the state file of the given repository.
pub fn file_path(repo: &Repository) -> PathBuf {
repo.path.join(Self::FILE_NAME)
}
}
/// An object as stored on disk: its hash followed by its content.
#[derive(Clone, Debug)]
struct RepositoryObject {
/// The hash of the content, stored at the start of the object file.
#[allow(dead_code)]
hash: rrdp::Hash,
/// The raw content of the object.
content: Bytes,
}
impl RepositoryObject {
    /// Loads the object stored at `path`.
    ///
    /// Returns `Ok(None)` if there is no file at `path`. Read errors
    /// are logged as fatal and mapped to `Failed`.
    pub fn load(path: &Path) -> Result<Option<Self>, Failed> {
        let mut file = match Self::open(path)? {
            Some(file) => file,
            None => return Ok(None)
        };
        Self::read(&mut file).map(Some).map_err(|err| {
            error!("Fatal: failed to read {}: {}", path.display(), err);
            Failed
        })
    }

    /// Loads only the hash of the object stored at `path`.
    ///
    /// The hash sits at the beginning of the file, so the content does
    /// not need to be read. Returns `Ok(None)` if there is no file at
    /// `path`.
    pub fn load_hash(path: &Path) -> Result<Option<rrdp::Hash>, Failed> {
        let mut file = match Self::open(path)? {
            Some(file) => file,
            None => return Ok(None)
        };
        rrdp::Hash::parse(&mut file).map(Some).map_err(|err| {
            error!("Fatal: failed to read {}: {}", path.display(), err);
            Failed
        })
    }

    /// Opens the file at `path`, mapping a missing file to `Ok(None)`.
    fn open(path: &Path) -> Result<Option<File>, Failed> {
        match File::open(path) {
            Ok(file) => Ok(Some(file)),
            Err(err) if err.kind() == io::ErrorKind::NotFound => {
                Ok(None)
            }
            Err(err) => {
                error!("Fatal: failed to open {}: {}", path.display(), err);
                Err(Failed)
            }
        }
    }

    /// Reads an object -- hash followed by content -- from `source`.
    fn read(source: &mut impl io::Read) -> Result<Self, io::Error> {
        let hash = rrdp::Hash::parse(source)?;
        let mut content = Vec::new();
        source.read_to_end(&mut content)?;
        Ok(RepositoryObject {
            hash,
            content: content.into(),
        })
    }

    /// Creates a new object file at `path` from the data in `data`.
    ///
    /// Creates any missing parent directories first. All errors are
    /// logged as fatal and mapped to `Failed`.
    pub fn create(
        path: &Path, data: &mut impl io::Read
    ) -> Result<(), Failed> {
        if let Some(parent) = path.parent() {
            if let Err(err) = fs::create_dir_all(parent) {
                error!(
                    "Fatal: failed to create directory {}: {}.",
                    parent.display(), err
                );
                return Err(Failed)
            }
        }
        // `path` is already a reference; the extra `&path` borrow was
        // redundant (clippy: needless_borrow).
        let mut target = match File::create(path) {
            Ok(target) => target,
            Err(err) => {
                error!(
                    "Fatal: failed to open file {}: {}", path.display(), err
                );
                return Err(Failed)
            }
        };
        Self::_create(data, &mut target).map_err(|err| {
            error!(
                "Fatal: failed to write file {}: {}", path.display(), err
            );
            Failed
        })
    }

    /// Writes hash and content of the object to `target`.
    fn _create(
        data: &mut impl io::Read, target: &mut File
    ) -> Result<(), io::Error> {
        // Write a zero placeholder hash first, stream the content
        // behind it while hashing, then seek back to the start and
        // overwrite the placeholder with the real hash.
        rrdp::Hash::from([0u8; 32]).compose(target)?;
        let mut reader = HashRead::new(data);
        io::copy(&mut reader, target)?;
        target.seek(SeekFrom::Start(0))?;
        reader.into_hash().compose(target)?;
        Ok(())
    }
}
/// A range of durations from which a best-before offset is chosen.
#[derive(Clone, Copy, Debug)]
struct FallbackTime {
/// The lower bound of the offset added to the current time.
min: Duration,
/// The upper bound of the offset added to the current time.
max: Duration,
}
impl FallbackTime {
    /// Creates the fallback range from configuration.
    ///
    /// The range goes from the refresh interval up to the larger of
    /// twice the refresh interval and the configured RRDP fallback
    /// time.
    pub fn from_config(config: &Config) -> Self {
        FallbackTime {
            min: config.refresh,
            max: cmp::max(2 * config.refresh, config.rrdp_fallback_time)
        }
    }

    /// Picks a best-before time relative to now.
    ///
    /// The offset is drawn randomly from `min..max` so that
    /// repositories do not all expire at the same instant.
    pub fn best_before(self) -> DateTime<Utc> {
        // `gen_range` panics on an empty range, which can happen if
        // both configured durations are zero. Fall back to `min` in
        // that case instead of panicking.
        let duration = if self.min < self.max {
            rand::thread_rng().gen_range(self.min..self.max)
        }
        else {
            self.min
        };
        // If the duration overflows chrono's range, clamp to the
        // largest representable duration.
        Utc::now() + chrono::Duration::from_std(
            duration
        ).unwrap_or_else(|_| chrono::Duration::milliseconds(i64::MAX))
    }
}
/// A reader that computes a SHA-256 digest of all data read through it.
struct HashRead<R> {
/// The wrapped reader.
reader: R,
/// The running digest context, updated on every read.
context: digest::Context,
}
impl<R> HashRead<R> {
/// Wraps `reader` with a fresh SHA-256 digest context.
pub fn new(reader: R) -> Self {
HashRead {
reader,
context: digest::Context::new(&digest::SHA256)
}
}
/// Finishes the digest and returns it as an RRDP hash.
///
/// The `unwrap` is safe since a SHA-256 digest always has the length
/// `rrdp::Hash` expects.
pub fn into_hash(self) -> rrdp::Hash {
rrdp::Hash::try_from(self.context.finish()).unwrap()
}
}
impl<R: io::Read> io::Read for HashRead<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
let res = self.reader.read(buf)?;
// Only the bytes actually read are fed into the digest.
self.context.update(&buf[..res]);
Ok(res)
}
}
/// A reader that enforces an optional limit on the total bytes read.
struct MaxSizeRead<R> {
    /// The wrapped reader.
    reader: R,
    /// The number of bytes still allowed; `None` means unlimited.
    left: Option<u64>,
    /// Whether the limit has been exceeded at some point.
    triggered: bool,
}

impl<R> MaxSizeRead<R> {
    /// Wraps `reader`, limiting it to `max_size` bytes if given.
    pub fn new(reader: R, max_size: Option<u64>) -> Self {
        MaxSizeRead { reader, left: max_size, triggered: false }
    }

    /// Returns whether the size limit was ever exceeded.
    pub fn was_triggered(&self) -> bool {
        self.triggered
    }
}

impl<R: io::Read> io::Read for MaxSizeRead<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
        let res = self.reader.read(buf)?;
        let left = match self.left {
            Some(left) => left,
            None => return Ok(res),
        };
        // Deduct the bytes just read from the allowance. If the count
        // cannot be represented as `u64` or exceeds the allowance, the
        // limit has been hit.
        match u64::try_from(res).ok().and_then(|res| left.checked_sub(res)) {
            Some(remaining) => {
                self.left = Some(remaining);
                Ok(res)
            }
            None => {
                self.left = Some(0);
                self.triggered = true;
                Err(io::Error::new(
                    io::ErrorKind::Other, "size limit exceeded"
                ))
            }
        }
    }
}
/// The reason an update had to fall back to processing a snapshot.
#[derive(Clone, Copy, Debug)]
pub enum SnapshotReason {
/// The repository is new.
NewRepository,
/// The notification file carries a new session.
NewSession,
/// The delta set is inconsistent (see `code` for the emitted string).
BadDeltaSet,
/// The serial number is too large.
LargeSerial,
/// The local copy is outdated.
OutdatedLocal,
/// A delta conflicts with the local state.
ConflictingDelta,
/// There are too many deltas to process.
TooManyDeltas,
}
impl SnapshotReason {
/// Returns a stable string code identifying the reason, for use in
/// status/metrics output.
pub fn code(self) -> &'static str {
use SnapshotReason::*;
match self {
NewRepository => "new-repository",
NewSession => "new-session",
BadDeltaSet => "inconsistent-delta-set",
LargeSerial => "large-serial",
// NOTE(review): "outdate-local" looks like a typo for
// "outdated-local", but the string may be a frozen code relied
// on by consumers of the output -- confirm before changing.
OutdatedLocal => "outdate-local",
ConflictingDelta => "conflicting-delta",
TooManyDeltas => "too-many-deltas",
}
}
}
/// The outcome of an HTTP request: a status code or a failed request.
#[derive(Clone, Copy, Debug)]
pub enum HttpStatus {
/// A response with the given status code was received.
Response(StatusCode),
/// The request failed without producing a status code.
Error
}
impl HttpStatus {
pub fn into_i16(self) -> i16 {
match self {
HttpStatus::Response(code) => code.as_u16() as i16,
HttpStatus::Error => -1
}
}
pub fn is_not_modified(self) -> bool {
matches!(
self,
HttpStatus::Response(code) if code == StatusCode::NOT_MODIFIED
)
}
pub fn is_success(self) -> bool {
matches!(
self,
HttpStatus::Response(code) if code.is_success()
)
}
}
// A received status code simply wraps into the `Response` variant.
impl From<StatusCode> for HttpStatus {
fn from(code: StatusCode) -> Self {
HttpStatus::Response(code)
}
}
/// An error happened while processing a snapshot file.
#[derive(Debug)]
enum SnapshotError {
/// The HTTP request failed.
Http(reqwest::Error),
/// Processing the RRDP data failed.
Rrdp(rrdp::ProcessError),
/// The session ID of the snapshot differs from the notification file's.
SessionMismatch {
expected: Uuid,
received: Uuid
},
/// The serial number of the snapshot differs from the notification
/// file's.
SerialMismatch {
expected: u64,
received: u64,
},
/// The snapshot's hash does not match the announced value.
HashMismatch,
/// An object exceeds the configured size limit.
LargeObject(uri::Rsync),
/// A fatal error, converted from `Failed`; displays as nothing.
Fatal,
}
// HTTP errors map to their own variant.
impl From<reqwest::Error> for SnapshotError {
fn from(err: reqwest::Error) -> Self {
SnapshotError::Http(err)
}
}
// RRDP processing errors map to their own variant.
impl From<rrdp::ProcessError> for SnapshotError {
fn from(err: rrdp::ProcessError) -> Self {
SnapshotError::Rrdp(err)
}
}
// I/O errors are folded into the RRDP processing error variant.
impl From<io::Error> for SnapshotError {
fn from(err: io::Error) -> Self {
SnapshotError::Rrdp(err.into())
}
}
// `Failed` carries no information, so only the fact of failure is kept.
impl From<Failed> for SnapshotError {
fn from(_: Failed) -> Self {
SnapshotError::Fatal
}
}
impl fmt::Display for SnapshotError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            SnapshotError::Http(ref err) => err.fmt(f),
            SnapshotError::Rrdp(ref err) => err.fmt(f),
            SnapshotError::SessionMismatch { ref expected, ref received } => {
                // Fixed: the message previously lacked the closing
                // parenthesis.
                write!(
                    f,
                    "session ID mismatch (notification_file: {}, \
                    snapshot file: {})",
                    expected, received
                )
            }
            SnapshotError::SerialMismatch { ref expected, ref received } => {
                // Fixed: the message previously lacked the closing
                // parenthesis.
                write!(
                    f,
                    "serial number mismatch (notification_file: {}, \
                    snapshot file: {})",
                    expected, received
                )
            }
            SnapshotError::HashMismatch => {
                write!(f, "hash value mismatch")
            }
            SnapshotError::LargeObject(ref uri) => {
                write!(f, "object exceeds size limit: {}", uri)
            }
            // A fatal error deliberately produces no message.
            SnapshotError::Fatal => Ok(())
        }
    }
}
impl error::Error for SnapshotError { }
/// An error happened while processing a delta file.
#[derive(Debug)]
enum DeltaError {
/// The HTTP request failed.
Http(reqwest::Error),
/// Processing the RRDP data failed.
Rrdp(rrdp::ProcessError),
/// The session ID of the delta differs from the notification file's.
SessionMismatch {
expected: Uuid,
received: Uuid
},
/// The serial number of the delta differs from the notification
/// file's.
SerialMismatch {
expected: u64,
received: u64,
},
/// The delta refers to an object not present locally.
MissingObject {
uri: uri::Rsync,
},
/// The delta tries to add an object that is already present.
ObjectAlreadyPresent {
uri: uri::Rsync,
},
/// The local object has a different hash than the delta expects.
ObjectHashMismatch {
uri: uri::Rsync,
},
/// The same object appears more than once in the delta.
ObjectRepeated {
uri: uri::Rsync,
},
/// The delta file's hash does not match the announced value.
DeltaHashMismatch,
/// An object exceeds the configured size limit.
LargeObject(uri::Rsync),
/// A fatal error, converted from `Failed`; displays as nothing.
Fatal,
}
// HTTP errors map to their own variant.
impl From<reqwest::Error> for DeltaError {
fn from(err: reqwest::Error) -> Self {
DeltaError::Http(err)
}
}
// RRDP processing errors map to their own variant.
impl From<rrdp::ProcessError> for DeltaError {
fn from(err: rrdp::ProcessError) -> Self {
DeltaError::Rrdp(err)
}
}
// I/O errors are folded into the RRDP processing error variant.
impl From<io::Error> for DeltaError {
fn from(err: io::Error) -> Self {
DeltaError::Rrdp(err.into())
}
}
// `Failed` carries no information, so only the fact of failure is kept.
impl From<Failed> for DeltaError {
fn from(_: Failed) -> Self {
DeltaError::Fatal
}
}
impl fmt::Display for DeltaError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            DeltaError::Http(ref err) => err.fmt(f),
            DeltaError::Rrdp(ref err) => err.fmt(f),
            DeltaError::SessionMismatch { ref expected, ref received } => {
                // Fixed: say "delta file" -- this error concerns delta
                // files, the previous "snapshot file" was a copy-paste
                // error -- and balance the parenthesis.
                write!(
                    f,
                    "session ID mismatch (notification_file: {}, \
                    delta file: {})",
                    expected, received
                )
            }
            DeltaError::SerialMismatch { ref expected, ref received } => {
                // Fixed: "delta file" and balanced parenthesis as above.
                write!(
                    f,
                    "serial number mismatch (notification_file: {}, \
                    delta file: {})",
                    expected, received
                )
            }
            DeltaError::MissingObject { ref uri } => {
                write!(f, "reference to missing object {}", uri)
            }
            DeltaError::ObjectAlreadyPresent { ref uri } => {
                write!(f, "attempt to add already present object {}", uri)
            }
            DeltaError::ObjectHashMismatch { ref uri } => {
                write!(f, "local object {} has different hash", uri)
            }
            DeltaError::ObjectRepeated { ref uri } => {
                write!(f, "object appears multiple times: {}", uri)
            }
            DeltaError::LargeObject(ref uri) => {
                write!(f, "object exceeds size limit: {}", uri)
            }
            DeltaError::DeltaHashMismatch => {
                write!(f, "delta file hash value mismatch")
            }
            // A fatal error deliberately produces no message.
            DeltaError::Fatal => {
                Ok(())
            }
        }
    }
}
impl error::Error for DeltaError { }
#[cfg(test)]
mod test {
use super::*;
use std::str::FromStr;
// Round-trip check: a state serialized with `_write` must decode back
// to an equal value with `_read`, consuming all written bytes.
#[test]
fn write_read_repository_state() {
let orig = RepositoryState {
rpki_notify: uri::Https::from_str(
"https://foo.bar/bla/blubb"
).unwrap(),
session: Uuid::nil(),
serial: 12,
updated_ts: 28,
best_before_ts: 892,
last_modified_ts: Some(23),
etag: Some(Bytes::copy_from_slice(b"23890"))
};
let mut written = Vec::new();
orig._write(&mut written).unwrap();
let mut slice = written.as_slice();
let decoded = RepositoryState::_read(&mut slice).unwrap();
// `_read` must have consumed every byte `_write` produced.
assert!(slice.is_empty());
assert_eq!(orig, decoded);
}
}