use std::{cmp, io, fs};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use log::{error, warn};
use rand::Rng;
use rpki::{rrdp, uri};
use uuid::Uuid;
use crate::config::Config;
use crate::error::RunFailed;
use crate::utils::archive;
use crate::utils::archive::{
Archive, AppendArchive, ArchiveError, ArchiveStats, FetchError,
OpenError, PublishError,
};
use crate::utils::binio::{Compose, Parse};
/// An RRDP repository kept in a single archive file.
///
/// Bundles the archive with the path it was created from or opened at, so
/// that failures can be logged with the path and a corrupt file can be
/// removed (see `archive_err`).
#[derive(Debug)]
pub struct RrdpArchive {
/// The path of the archive file, used for error reporting and clean-up.
path: Arc<PathBuf>,
/// The underlying archive using `RrdpObjectMeta` as per-object metadata.
archive: archive::Archive<RrdpObjectMeta>,
}
/// # Creating and opening
impl RrdpArchive {
    /// Creates a new archive at `path`.
    ///
    /// # Errors
    ///
    /// Archive errors are handed to `archive_err`, which logs them and
    /// decides whether the run may be retried or has failed fatally.
    pub fn create(path: Arc<PathBuf>) -> Result<Self, RunFailed> {
        match Archive::create(path.as_ref()) {
            Ok(archive) => Ok(Self { path, archive }),
            Err(err) => Err(archive_err(err, path.as_ref())),
        }
    }

    /// Creates a new archive inside an already opened `file`.
    ///
    /// The `path` is not opened by this function; it is stored for later
    /// use and passed to `archive_err` if creating the archive fails.
    pub fn create_with_file(
        file: fs::File,
        path: Arc<PathBuf>,
    ) -> Result<Self, RunFailed> {
        match Archive::create_with_file(file) {
            Ok(archive) => Ok(Self { path, archive }),
            Err(err) => Err(archive_err(err, path.as_ref())),
        }
    }

    /// Opens the archive at `path` if it exists.
    ///
    /// Returns `Ok(None)` when the archive reports that the file was not
    /// found. Any other failure is processed by `archive_err`.
    ///
    /// NOTE(review): the archive is opened with the flag `true` here while
    /// `open` passes `false`; the meaning of that flag is defined by
    /// `Archive::open` and is not visible in this file.
    pub fn try_open(path: Arc<PathBuf>) -> Result<Option<Self>, RunFailed> {
        match Archive::open(path.as_ref(), true) {
            Ok(archive) => Ok(Some(Self { path, archive })),
            Err(OpenError::NotFound) => Ok(None),
            Err(OpenError::Archive(err)) => {
                Err(archive_err(err, path.as_ref()))
            }
        }
    }

    /// Opens the archive at `path`, which is expected to exist.
    ///
    /// # Errors
    ///
    /// A missing file is logged as a warning and reported as a retryable
    /// failure. Any other failure is processed by `archive_err`.
    pub fn open(path: Arc<PathBuf>) -> Result<Self, RunFailed> {
        let archive = match Archive::open(path.as_ref(), false) {
            Ok(archive) => archive,
            Err(OpenError::NotFound) => {
                warn!(
                    "RRDP repository file {} not found.", path.display()
                );
                return Err(RunFailed::retry());
            }
            Err(OpenError::Archive(err)) => {
                return Err(archive_err(err, path.as_ref()));
            }
        };
        Ok(Self { path, archive })
    }

    /// Returns the path this archive was created or opened with.
    pub fn path(&self) -> &Arc<PathBuf> {
        &self.path
    }
}
impl RrdpArchive {
pub fn verify(path: &Path) -> Result<ArchiveStats, OpenError> {
let archive = archive::Archive::<RrdpObjectMeta>::open(path, false)?;
Ok(archive.verify()?)
}
pub fn load_object(
&self,
uri: &uri::Rsync
) -> Result<Option<Bytes>, RunFailed> {
let res = self.archive.fetch_bytes(uri.as_ref());
match res {
Ok(res) => Ok(Some(res)),
Err(FetchError::NotFound) => Ok(None),
Err(FetchError::Archive(err)) => {
Err(archive_err(err, self.path.as_ref()))
}
}
}
pub fn load_state(&self) -> Result<RepositoryState, RunFailed> {
let data = match self.archive.fetch(b"state") {
Ok(data) => data,
Err(archive::FetchError::NotFound) => {
return Err(
archive_err(
ArchiveError::Corrupt("state object missing"),
self.path.as_ref()
)
)
}
Err(archive::FetchError::Archive(err)) => {
return Err(archive_err(err, self.path.as_ref()))
}
};
let mut data = data.as_ref();
RepositoryState::parse(&mut data).map_err(|_| {
archive_err(
ArchiveError::Corrupt("parse error"),
self.path.as_ref()
)
})
}
pub fn objects(
&self
) -> Result<
impl Iterator<Item = Result<(uri::Rsync, Bytes), RunFailed>> + '_,
RunFailed
> {
self.archive.objects().map(|iter| {
iter.filter_map(|item| {
let (name, _meta, data) = match item {
Ok(some) => some,
Err(ArchiveError::Corrupt(_)) => {
return Some(Err(RunFailed::retry()))
}
Err(ArchiveError::Io(_)) => {
return Some(Err(RunFailed::fatal()))
}
};
let name = uri::Rsync::from_bytes(
name.into_owned().into()
).ok()?;
Some(Ok((name, data.into_owned().into())))
})
}).map_err(|err| {
match err {
ArchiveError::Corrupt(_) => RunFailed::retry(),
ArchiveError::Io(_) => RunFailed::fatal(),
}
})
}
}
impl RrdpArchive {
pub fn publish_object(
&mut self,
uri: &uri::Rsync,
content: &[u8]
) -> Result<(), PublishError> {
self.archive.publish(
uri.as_ref(),
&RrdpObjectMeta::from_content(content),
content
)
}
pub fn update_object(
&mut self,
uri: &uri::Rsync,
hash: rrdp::Hash,
content: &[u8]
) -> Result<(), AccessError> {
Ok(self.archive.update(
uri.as_ref(),
&RrdpObjectMeta::from_content(content),
content,
|meta| {
if meta.hash == hash {
Ok(())
}
else {
Err(HashMismatch)
}
}
)?)
}
pub fn delete_object(
&mut self, uri: &uri::Rsync, hash: rrdp::Hash,
) -> Result<(), AccessError> {
Ok(self.archive.delete(
uri.as_ref(),
|meta| {
if meta.hash == hash {
Ok(())
}
else {
Err(HashMismatch)
}
}
)?)
}
pub fn publish_state(
&mut self, state: &RepositoryState
) -> Result<(), RunFailed> {
let mut buf = Vec::new();
state.compose(&mut buf).expect("writing to vec failed");
self.archive.publish(
b"state", &Default::default(), &buf
).map_err(|err| match err {
archive::PublishError::Archive(ArchiveError::Io(err)) => {
error!(
"Fatal: Failed write to RRDP repository archive {}: {}",
self.path.display(), err
);
RunFailed::fatal()
}
_ => {
warn!(
"Failed to write local RRDP repository state in {}.",
self.path.display()
);
RunFailed::retry()
}
})
}
pub fn update_state(
&mut self, state: &RepositoryState
) -> Result<(), RunFailed> {
let mut buf = Vec::new();
state.compose(&mut buf).expect("writing to vec failed");
self.archive.update(
b"state", &Default::default(), &buf,
|_| Ok(())
).map_err(|err| match err {
archive::AccessError::Archive(ArchiveError::Io(err)) => {
error!(
"Fatal: Failed write to RRDP repository archive {}: {}",
self.path.display(), err
);
RunFailed::fatal()
}
_ => {
warn!(
"Failed to update local RRDP repository state in {}.",
self.path.display()
);
RunFailed::retry()
}
})
}
}
/// An RRDP repository archive backed by an `AppendArchive`.
///
/// Only publishing objects and state and a final `finalize` step are
/// offered; there is no read, update or delete access.
///
/// NOTE(review): judging by the name this is used while a snapshot is
/// being processed -- confirm against the callers.
#[derive(Debug)]
pub struct SnapshotRrdpArchive {
/// The path of the archive file, used for error reporting and clean-up.
path: Arc<PathBuf>,
/// The underlying archive using `RrdpObjectMeta` as per-object metadata.
archive: AppendArchive<RrdpObjectMeta>,
}
impl SnapshotRrdpArchive {
pub fn create_with_file(
file: fs::File,
path: Arc<PathBuf>,
) -> Result<Self, RunFailed> {
let archive = AppendArchive::create_with_file(file).map_err(|err| {
archive_err(err, path.as_ref())
})?;
Ok(Self { path, archive })
}
pub fn path(&self) -> &Arc<PathBuf> {
&self.path
}
pub fn publish_object(
&mut self,
uri: &uri::Rsync,
content: &[u8]
) -> Result<(), PublishError> {
self.archive.publish(
uri.as_ref(),
&RrdpObjectMeta::from_content(content),
content
)
}
pub fn publish_state(
&mut self, state: &RepositoryState
) -> Result<(), RunFailed> {
let mut buf = Vec::new();
state.compose(&mut buf).expect("writing to vec failed");
self.archive.publish(
b"state", &Default::default(), &buf
).map_err(|err| match err {
archive::PublishError::Archive(ArchiveError::Io(err)) => {
error!(
"Fatal: Failed write to RRDP repository archive {}: {}",
self.path.display(), err
);
RunFailed::fatal()
}
_ => {
warn!(
"Failed to write local RRDP repository state in {}.",
self.path.display()
);
RunFailed::retry()
}
})
}
pub fn finalize(&mut self) -> Result<(), RunFailed> {
self.archive.finalize().map_err(|err| {
error!(
"Fatal: Failed to write RRDP repository archive {}: {}",
self.path.display(), err
);
RunFailed::fatal()
})
}
}
/// Logs an archive error for the archive at `path` and converts it.
///
/// An I/O error is logged as an error and is always fatal. A corrupt
/// archive is logged as a warning and its file is deleted so the next
/// attempt can start over; the result is then retryable, unless deleting
/// the file fails, which is fatal as well.
fn archive_err(err: ArchiveError, path: &Path) -> RunFailed {
    let reason = match err {
        ArchiveError::Io(err) => {
            error!(
                "Fatal: Failed to access RRDP repository archive '{}': {}",
                path.display(),
                err
            );
            return RunFailed::fatal();
        }
        ArchiveError::Corrupt(reason) => reason,
    };

    warn!(
        "RRDP repository file '{}' is corrupt ('{}'). \
         Deleting and starting again.",
        path.display(), reason,
    );
    if let Err(err) = fs::remove_file(path) {
        warn!(
            "Deleting RRDP repository archive '{}' failed: {}",
            path.display(),
            err
        );
        return RunFailed::fatal();
    }
    RunFailed::retry()
}
/// The metadata stored in the archive alongside each object.
///
/// It consists of a single hash which is written as 32 bytes (see the
/// `archive::ObjectMeta` implementation).
#[derive(Clone, Copy, Debug)]
pub struct RrdpObjectMeta {
/// The hash of the object's content as computed by `rrdp::Hash::from_data`,
/// or all zeros for the default value.
hash: rrdp::Hash,
}
impl Default for RrdpObjectMeta {
fn default() -> Self {
Self {
hash: [0; 32].into(),
}
}
}
impl RrdpObjectMeta {
    /// Creates the metadata for an object with the given `content`.
    ///
    /// The hash is computed over `content` via `rrdp::Hash::from_data`.
    pub fn from_content(content: &[u8]) -> Self {
        let hash = rrdp::Hash::from_data(content);
        Self { hash }
    }
}
impl archive::ObjectMeta for RrdpObjectMeta {
    /// The metadata is exactly the 32 bytes of the hash.
    const SIZE: usize = 32;

    /// The error signalled when a stored hash is not the expected one.
    type ConsistencyError = HashMismatch;

    /// Writes the raw bytes of the hash.
    fn write(
        &self, write: &mut archive::StorageWrite
    ) -> Result<(), ArchiveError> {
        let bytes = self.hash.as_slice();
        write.write(bytes)
    }

    /// Reads the raw bytes of the hash and converts them back.
    fn read(
        read: &mut archive::StorageRead
    ) -> Result<Self, ArchiveError> {
        let bytes = read.read_array()?;
        Ok(Self { hash: bytes.into() })
    }
}
/// The state of an RRDP repository as stored in its archive.
///
/// The state is serialized by `compose` and read back by `parse`, with
/// the fields written in declaration order after a leading version byte.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RepositoryState {
/// Presumably the rpkiNotify URI of the repository -- confirm with callers.
pub rpki_notify: uri::Https,
/// Presumably the RRDP session ID the state refers to -- confirm.
pub session: Uuid,
/// Presumably the RRDP serial number within the session -- confirm.
pub serial: u64,
/// Unix timestamp in seconds of the last update; `touch` sets it to now.
pub updated_ts: i64,
/// Unix timestamp in seconds after which `is_expired` returns `true`.
pub best_before_ts: i64,
/// An optional last-modified time as Unix timestamp in seconds.
pub last_modified_ts: Option<i64>,
/// An optional ETag value; presumably taken from an HTTP response --
/// confirm with callers.
pub etag: Option<Bytes>,
/// A hash per `u64` key; presumably the hashes of known deltas keyed by
/// their serial number -- confirm with callers.
pub delta_state: HashMap<u64, rrdp::Hash>,
}
impl RepositoryState {
const VERSION: u8 = 1;
fn parse(reader: &mut impl io::Read) -> Result<Self, io::Error> {
let version = u8::parse(reader)?;
if version != Self::VERSION {
return Err(io::Error::other(
format!("unexpected version {version}")
))
}
Ok(RepositoryState {
rpki_notify: Parse::parse(reader)?,
session: Parse::parse(reader)?,
serial: Parse::parse(reader)?,
updated_ts: Parse::parse(reader)?,
best_before_ts: Parse::parse(reader)?,
last_modified_ts: Parse::parse(reader)?,
etag: Parse::parse(reader)?,
delta_state: Parse::parse(reader)?,
})
}
fn compose(&self, writer: &mut impl io::Write) -> Result<(), io::Error> {
Self::VERSION.compose(writer)?; self.rpki_notify.compose(writer)?;
self.session.compose(writer)?;
self.serial.compose(writer)?;
self.updated_ts.compose(writer)?;
self.best_before_ts.compose(writer)?;
self.last_modified_ts.compose(writer)?;
self.etag.compose(writer)?;
self.delta_state.compose(writer)?;
Ok(())
}
pub fn updated(&self) -> Option<DateTime<Utc>> {
Utc.timestamp_opt(self.updated_ts, 0).single()
}
pub fn best_before(&self) -> Option<DateTime<Utc>> {
Utc.timestamp_opt(self.best_before_ts, 0).single()
}
pub fn touch(&mut self, fallback: FallbackTime) {
self.updated_ts = Utc::now().timestamp();
self.best_before_ts = fallback.best_before().timestamp();
}
pub fn is_expired(&self) -> bool {
match self.best_before() {
Some(best_before) => Utc::now() > best_before,
None => true,
}
}
pub fn last_modified(&self) -> Option<DateTime<Utc>> {
self.last_modified_ts.and_then(|ts| Utc.timestamp_opt(ts, 0).single())
}
}
/// The bounds for the randomly chosen life time of a repository state.
///
/// `best_before` picks a duration between `min` and `max` and adds it to
/// the current time.
#[derive(Clone, Copy, Debug)]
pub struct FallbackTime {
/// The lower bound of the duration; taken from `Config::refresh`.
min: Duration,
/// The upper bound of the duration; the larger of twice the refresh time
/// and `Config::rrdp_fallback_time`.
max: Duration,
}
impl FallbackTime {
pub fn from_config(config: &Config) -> Self {
FallbackTime {
min: config.refresh,
max: cmp::max(2 * config.refresh, config.rrdp_fallback_time)
}
}
pub fn best_before(self) -> DateTime<Utc> {
Utc::now() + chrono::Duration::from_std(
rand::rng().random_range(self.min..self.max)
).unwrap_or_else(|_| {
chrono::Duration::try_milliseconds(i64::MAX).unwrap()
})
}
}
/// The hash stored for an object differs from the expected hash.
///
/// This is the consistency error of `RrdpObjectMeta` and is produced by
/// the checks in `RrdpArchive::update_object` and
/// `RrdpArchive::delete_object`.
#[derive(Debug)]
pub struct HashMismatch;
/// An error happened while updating or deleting an object.
///
/// This is the non-generic counterpart of
/// `archive::AccessError<HashMismatch>` and can be converted from it.
#[derive(Debug)]
pub enum AccessError {
/// The archive reported the object as not found.
NotFound,
/// The hash stored for the object differs from the expected hash.
HashMismatch,
/// The archive itself reported an error.
Archive(ArchiveError),
}
impl From<archive::AccessError<HashMismatch>> for AccessError {
fn from(err: archive::AccessError<HashMismatch>) -> Self {
match err {
archive::AccessError::NotFound => AccessError::NotFound,
archive::AccessError::Inconsistent(_) => AccessError::HashMismatch,
archive::AccessError::Archive(err) => AccessError::Archive(err),
}
}
}
#[cfg(test)]
mod test {
    use super::*;
    use std::str::FromStr;

    /// Composing a state and parsing the result yields the same state.
    #[test]
    fn compose_parse_repository_state() {
        let rpki_notify = uri::Https::from_str(
            "https://foo.bar/baz"
        ).unwrap();
        let delta_state = HashMap::from([
            (18, rrdp::Hash::from_data(b"123")),
            (19, rrdp::Hash::from_data(b"332")),
        ]);
        let state = RepositoryState {
            rpki_notify,
            session: Uuid::from_u128(0xa1a2a3a4b1b2c1c2d1d2d3d4d5d6d7d8u128),
            serial: 0x1234567812345678u64,
            updated_ts: -12,
            best_before_ts: 123789123789123,
            last_modified_ts: Some(239123908123),
            etag: None,
            delta_state,
        };

        let mut encoded = Vec::new();
        state.compose(&mut encoded).unwrap();
        let decoded = RepositoryState::parse(
            &mut encoded.as_slice()
        ).unwrap();

        assert_eq!(state, decoded);
    }
}