use std::error::Error;
use std::{error, fmt, io};
use std::collections::HashSet;
use std::io::Read;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use log::{error, warn};
use reqwest::StatusCode;
use ring::digest;
use rpki::{rrdp, uri};
use rpki::rrdp::{DeltaInfo, NotificationFile, ProcessDelta, ProcessSnapshot};
use uuid::Uuid;
use crate::error::{Failed, RunFailed};
use crate::log::LogBookWriter;
use crate::metrics::RrdpRepositoryMetrics;
use crate::utils::archive::{ArchiveError, PublishError};
use super::archive::{
AccessError, FallbackTime, RepositoryState, RrdpArchive,
SnapshotRrdpArchive,
};
use super::base::Collector;
use super::http::{HttpClient, HttpResponse, HttpStatus};
pub struct Notification {
uri: uri::Https,
content: NotificationFile,
etag: Option<Bytes>,
last_modified: Option<DateTime<Utc>>,
}
impl Notification {
pub fn get(
http: &HttpClient,
uri: &uri::Https,
state: Option<&RepositoryState>,
status: &mut HttpStatus,
delta_list_limit: usize,
log: &mut LogBookWriter,
) -> Result<Option<Self>, Failed> {
let response = match http.conditional_response(
uri,
state.and_then(|state| state.etag.as_ref()),
state.and_then(|state| state.last_modified()),
) {
Ok(response) => {
*status = response.status().into();
response
}
Err(err) => {
if let Some(source) = err.source() {
log.warn(format_args!("{err} ({source})"));
} else {
log.warn(format_args!("{err}"));
}
*status = HttpStatus::Error;
return Err(Failed)
}
};
if response.status() == StatusCode::NOT_MODIFIED {
Ok(None)
}
else if response.status() != StatusCode::OK {
log.warn(format_args!(
"Getting notification file failed with status {}",
response.status()
));
Err(Failed)
}
else {
Notification::from_response(
uri.clone(), response, delta_list_limit, log,
).map(Some)
}
}
fn from_response(
uri: uri::Https,
response: HttpResponse,
delta_list_limit: usize,
log: &mut LogBookWriter,
) -> Result<Self, Failed> {
let etag = response.etag();
let last_modified = response.last_modified();
let mut content = NotificationFile::parse_limited(
io::BufReader::new(response), delta_list_limit
).map_err(|err| {
log.warn(format_args!("{err}"));
Failed
})?;
if !content.has_matching_origins(&uri) {
log.warn(format_args!(
"snapshot or delta files with different origin"
));
return Err(Failed)
}
content.sort_deltas();
Ok(Notification { uri, content, etag, last_modified })
}
pub fn content(&self) -> &NotificationFile {
&self.content
}
pub fn to_repository_state(
&self, fallback: FallbackTime,
) -> RepositoryState {
RepositoryState {
rpki_notify: self.uri.clone(),
session: self.content.session_id(),
serial: self.content.serial(),
updated_ts: Utc::now().timestamp(),
best_before_ts: fallback.best_before().timestamp(),
last_modified_ts: self.last_modified.map(|x| x.timestamp()),
etag: self.etag.clone(),
delta_state: self.content.deltas().iter().map(|delta| {
(delta.serial(), delta.hash())
}).collect(),
}
}
pub fn check_deltas(
&self, state: &RepositoryState
) -> Result<(), SnapshotReason> {
for delta in self.content().deltas() {
if let Some(state_hash) = state.delta_state.get(&delta.serial()) {
if delta.hash() != *state_hash {
return Err(SnapshotReason::DeltaMutation)
}
}
}
Ok(())
}
}
pub struct SnapshotUpdate<'a> {
collector: &'a Collector,
archive: &'a mut SnapshotRrdpArchive,
notify: &'a Notification,
metrics: &'a mut RrdpRepositoryMetrics,
}
impl<'a> SnapshotUpdate<'a> {
pub fn new(
collector: &'a Collector,
archive: &'a mut SnapshotRrdpArchive,
notify: &'a Notification,
metrics: &'a mut RrdpRepositoryMetrics,
) -> Self {
SnapshotUpdate { collector, archive, notify, metrics }
}
pub fn try_update(mut self) -> Result<(), SnapshotError> {
let response = match self.collector.http().response(
self.notify.content.snapshot().uri()
) {
Ok(response) => {
self.metrics.payload_status = Some(response.status().into());
if response.status() != StatusCode::OK {
return Err(response.status().into())
}
else {
response
}
}
Err(err) => {
self.metrics.payload_status = Some(HttpStatus::Error);
return Err(err.into())
}
};
let mut reader = io::BufReader::new(HashRead::new(response));
self.process(&mut reader)?;
reader.into_inner().verify_hash(
self.notify.content.snapshot().hash()
)?;
self.archive.publish_state(
&self.notify.to_repository_state(
self.collector.config().fallback_time
)
)?;
self.archive.finalize()?;
Ok(())
}
}
impl ProcessSnapshot for SnapshotUpdate<'_> {
type Err = SnapshotError;
fn meta(
&mut self,
session_id: Uuid,
serial: u64,
) -> Result<(), Self::Err> {
if session_id != self.notify.content.session_id() {
return Err(SnapshotError::SessionMismatch {
expected: self.notify.content.session_id(),
received: session_id
})
}
if serial != self.notify.content.serial() {
return Err(SnapshotError::SerialMismatch {
expected: self.notify.content.serial(),
received: serial
})
}
Ok(())
}
fn publish(
&mut self,
uri: uri::Rsync,
data: &mut rrdp::ObjectReader,
) -> Result<(), Self::Err> {
let content = RrdpDataRead::new(
data, &uri, self.collector.config().max_object_size,
).read_all()?;
self.archive.publish_object(&uri, &content).map_err(|err| match err {
PublishError::AlreadyExists => {
SnapshotError::DuplicateObject(uri.clone())
}
PublishError::Archive(ArchiveError::Corrupt(_)) => {
warn!(
"Temporary RRDP repository file {} became corrupt.",
self.archive.path().display(),
);
SnapshotError::RunFailed(RunFailed::retry())
}
PublishError::Archive(ArchiveError::Io(err)) => {
error!(
"Fatal: Failed to write to temporary RRDP repository file \
{}: {}",
self.archive.path().display(), err,
);
SnapshotError::RunFailed(RunFailed::fatal())
}
})
}
}
pub struct DeltaUpdate<'a> {
collector: &'a Collector,
archive: &'a mut RrdpArchive,
session_id: Uuid,
info: &'a DeltaInfo,
metrics: &'a mut RrdpRepositoryMetrics,
seen: HashSet<uri::Rsync>,
}
impl<'a> DeltaUpdate<'a> {
pub fn new(
collector: &'a Collector,
archive: &'a mut RrdpArchive,
session_id: Uuid,
info: &'a DeltaInfo,
metrics: &'a mut RrdpRepositoryMetrics,
) -> Self {
DeltaUpdate {
collector, archive, session_id, info, metrics,
seen: Default::default(),
}
}
pub fn try_update(mut self) -> Result<(), DeltaError> {
let response = match self.collector.http().response(
self.info.uri()
) {
Ok(response) => {
self.metrics.payload_status = Some(response.status().into());
if response.status() != StatusCode::OK {
return Err(response.status().into())
}
else {
response
}
}
Err(err) => {
self.metrics.payload_status = Some(HttpStatus::Error);
return Err(err.into())
}
};
let mut reader = io::BufReader::new(HashRead::new(response));
self.process(&mut reader)?;
reader.into_inner().verify_hash(self.info.hash())?;
Ok(())
}
}
impl ProcessDelta for DeltaUpdate<'_> {
type Err = DeltaError;
fn meta(
&mut self, session_id: Uuid, serial: u64
) -> Result<(), Self::Err> {
if session_id != self.session_id {
return Err(DeltaError::SessionMismatch {
expected: self.session_id,
received: session_id
})
}
if serial != self.info.serial() {
return Err(DeltaError::SerialMismatch {
expected: self.info.serial(),
received: serial
})
}
Ok(())
}
fn publish(
&mut self,
uri: uri::Rsync,
hash: Option<rrdp::Hash>,
data: &mut rrdp::ObjectReader<'_>
) -> Result<(), Self::Err> {
if !self.seen.insert(uri.clone()) {
return Err(DeltaError::ObjectRepeated { uri })
}
let content = RrdpDataRead::new(
data, &uri, self.collector.config().max_object_size
).read_all()?;
match hash {
Some(hash) => {
self.archive.update_object(
&uri, hash, &content
).map_err(|err| match err {
AccessError::NotFound => {
DeltaError::MissingObject { uri: uri.clone() }
}
AccessError::HashMismatch => {
DeltaError::ObjectHashMismatch { uri: uri.clone() }
}
AccessError::Archive(err) => DeltaError::Archive(err),
})
}
None => {
self.archive.publish_object(&uri, &content).map_err(|err| {
match err {
PublishError::AlreadyExists => {
DeltaError::ObjectAlreadyPresent {
uri: uri.clone()
}
}
PublishError::Archive(err) => {
DeltaError::Archive(err)
}
}
})
}
}
}
fn withdraw(
&mut self,
uri: uri::Rsync,
hash: rrdp::Hash
) -> Result<(), Self::Err> {
if !self.seen.insert(uri.clone()) {
return Err(DeltaError::ObjectRepeated { uri })
}
self.archive.delete_object(&uri, hash).map_err(|err| match err {
AccessError::NotFound => {
DeltaError::MissingObject { uri: uri.clone() }
}
AccessError::HashMismatch => {
DeltaError::ObjectHashMismatch { uri: uri.clone() }
}
AccessError::Archive(err) => DeltaError::Archive(err),
})
}
}
struct HashRead<R> {
reader: R,
context: digest::Context,
}
impl<R> HashRead<R> {
pub fn new(reader: R) -> Self {
HashRead {
reader,
context: digest::Context::new(&digest::SHA256)
}
}
pub fn verify_hash(
self, expected: rrdp::Hash
) -> Result<(), HashMismatch> {
if self.context.finish().as_ref() != expected.as_ref() {
Err(HashMismatch)
}
else {
Ok(())
}
}
}
impl<R: io::Read> io::Read for HashRead<R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
let res = self.reader.read(buf)?;
self.context.update(&buf[..res]);
Ok(res)
}
}
struct RrdpDataRead<'a, R> {
reader: R,
uri: &'a uri::Rsync,
left: Option<u64>,
err: Option<RrdpDataReadError>,
}
impl<'a, R> RrdpDataRead<'a, R> {
pub fn new(reader: R, uri: &'a uri::Rsync, max_size: Option<u64>) -> Self {
RrdpDataRead { reader, uri, left: max_size, err: None }
}
pub fn take_err(&mut self) -> Option<RrdpDataReadError> {
self.err.take()
}
}
impl<R: io::Read> RrdpDataRead<'_, R> {
pub fn read_all(mut self) -> Result<Vec<u8>, RrdpDataReadError> {
let mut content = Vec::new();
if let Err(io_err) = self.read_to_end(&mut content) {
return Err(
match self.take_err() {
Some(data_err) => data_err,
None => RrdpDataReadError::Read(io_err),
}
)
}
Ok(content)
}
}
impl<R: io::Read> io::Read for RrdpDataRead<'_, R> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
let res = match self.reader.read(buf) {
Ok(res) => res,
Err(err) => {
self.err = Some(RrdpDataReadError::Read(err));
return Err(io::Error::other("reading data failed"))
}
};
if let Some(left) = self.left {
let res64 = match u64::try_from(res) {
Ok(res) => res,
Err(_) => {
self.left = Some(0);
self.err = Some(
RrdpDataReadError::LargeObject(self.uri.clone())
);
return Err(io::Error::other("size limit exceeded"))
}
};
if res64 > left {
self.left = Some(0);
self.err = Some(
RrdpDataReadError::LargeObject(self.uri.clone())
);
Err(io::Error::other("size limit exceeded"))
}
else {
self.left = Some(left - res64);
Ok(res)
}
}
else {
Ok(res)
}
}
}
#[derive(Clone, Copy, Debug)]
pub enum SnapshotReason {
NewRepository,
NewSession,
BadDeltaSet,
LargeDeltaSet,
DeltaMutation,
LargeSerial,
OutdatedLocal,
ConflictingDelta,
TooManyDeltas,
CorruptArchive,
}
impl SnapshotReason {
pub fn code(self) -> &'static str {
use SnapshotReason::*;
match self {
NewRepository => "new-repository",
NewSession => "new-session",
BadDeltaSet => "inconsistent-delta-set",
LargeDeltaSet => "large-delta-set",
DeltaMutation => "delta-mutation",
LargeSerial => "large-serial",
OutdatedLocal => "outdate-local",
ConflictingDelta => "conflicting-delta",
TooManyDeltas => "too-many-deltas",
CorruptArchive => "corrupt-local-copy",
}
}
}
#[derive(Debug)]
enum RrdpDataReadError {
LargeObject(uri::Rsync),
Read(io::Error),
}
struct HashMismatch;
#[derive(Debug)]
pub enum SnapshotError {
Http(reqwest::Error),
HttpStatus(StatusCode),
Rrdp(rrdp::ProcessError),
SessionMismatch {
expected: Uuid,
received: Uuid
},
SerialMismatch {
expected: u64,
received: u64,
},
DuplicateObject(uri::Rsync),
HashMismatch,
LargeObject(uri::Rsync),
RunFailed(RunFailed),
}
impl From<reqwest::Error> for SnapshotError {
fn from(err: reqwest::Error) -> Self {
SnapshotError::Http(err)
}
}
impl From<StatusCode> for SnapshotError {
fn from(code: StatusCode) -> Self {
SnapshotError::HttpStatus(code)
}
}
impl From<rrdp::ProcessError> for SnapshotError {
fn from(err: rrdp::ProcessError) -> Self {
SnapshotError::Rrdp(err)
}
}
impl From<io::Error> for SnapshotError {
fn from(err: io::Error) -> Self {
SnapshotError::Rrdp(err.into())
}
}
impl From<RunFailed> for SnapshotError {
fn from(err: RunFailed) -> Self {
SnapshotError::RunFailed(err)
}
}
impl From<RrdpDataReadError> for SnapshotError {
fn from(err: RrdpDataReadError) -> Self {
match err {
RrdpDataReadError::LargeObject(uri) => {
SnapshotError::LargeObject(uri)
}
RrdpDataReadError::Read(err) => {
SnapshotError::Rrdp(err.into())
}
}
}
}
impl From<HashMismatch> for SnapshotError {
fn from(_: HashMismatch) -> Self {
Self::HashMismatch
}
}
impl fmt::Display for SnapshotError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
SnapshotError::Http(ref err) => err.fmt(f),
SnapshotError::HttpStatus(status) => {
write!(f, "HTTP {status}")
}
SnapshotError::Rrdp(ref err) => err.fmt(f),
SnapshotError::SessionMismatch { ref expected, ref received } => {
write!(
f,
"session ID mismatch (notification_file: {expected}, \
snapshot file: {received}"
)
}
SnapshotError::SerialMismatch { ref expected, ref received } => {
write!(
f,
"serial number mismatch (notification_file: {expected}, \
snapshot file: {received}"
)
}
SnapshotError::DuplicateObject(ref uri) => {
write!(f, "duplicate object: {uri}")
}
SnapshotError::HashMismatch => {
write!(f, "hash value mismatch")
}
SnapshotError::LargeObject(ref uri) => {
write!(f, "object exceeds size limit: {uri}")
}
SnapshotError::RunFailed(_) => Ok(()),
}
}
}
impl error::Error for SnapshotError { }
#[derive(Debug)]
pub enum DeltaError {
Http(reqwest::Error),
HttpStatus(StatusCode),
Rrdp(rrdp::ProcessError),
SessionMismatch {
expected: Uuid,
received: Uuid
},
SerialMismatch {
expected: u64,
received: u64,
},
MissingObject {
uri: uri::Rsync,
},
ObjectAlreadyPresent {
uri: uri::Rsync,
},
ObjectHashMismatch {
uri: uri::Rsync,
},
ObjectRepeated {
uri: uri::Rsync,
},
DeltaHashMismatch,
LargeObject(uri::Rsync),
Archive(ArchiveError),
}
impl From<reqwest::Error> for DeltaError {
fn from(err: reqwest::Error) -> Self {
DeltaError::Http(err)
}
}
impl From<StatusCode> for DeltaError {
fn from(code: StatusCode) -> Self {
DeltaError::HttpStatus(code)
}
}
impl From<rrdp::ProcessError> for DeltaError {
fn from(err: rrdp::ProcessError) -> Self {
DeltaError::Rrdp(err)
}
}
impl From<io::Error> for DeltaError {
fn from(err: io::Error) -> Self {
DeltaError::Rrdp(err.into())
}
}
impl From<RrdpDataReadError> for DeltaError {
fn from(err: RrdpDataReadError) -> Self {
match err {
RrdpDataReadError::LargeObject(uri) => {
DeltaError::LargeObject(uri)
}
RrdpDataReadError::Read(err) => {
DeltaError::Rrdp(err.into())
}
}
}
}
impl From<HashMismatch> for DeltaError {
fn from(_: HashMismatch) -> Self {
Self::DeltaHashMismatch
}
}
impl fmt::Display for DeltaError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
DeltaError::Http(ref err) => err.fmt(f),
DeltaError::HttpStatus(status) => {
write!(f, "HTTP {status}")
}
DeltaError::Rrdp(ref err) => err.fmt(f),
DeltaError::SessionMismatch { ref expected, ref received } => {
write!(
f,
"session ID mismatch (notification_file: {expected}, \
snapshot file: {received}"
)
}
DeltaError::SerialMismatch { ref expected, ref received } => {
write!(
f,
"serial number mismatch (notification_file: {expected}, \
snapshot file: {received}"
)
}
DeltaError::MissingObject { ref uri } => {
write!(
f,
"reference to missing object {uri}"
)
}
DeltaError::ObjectAlreadyPresent { ref uri } => {
write!(
f,
"attempt to add already present object {uri}"
)
}
DeltaError::ObjectHashMismatch { ref uri } => {
write!(
f,
"local object {uri} has different hash"
)
}
DeltaError::ObjectRepeated { ref uri } => {
write!(f, "object appears multiple times: {uri}")
}
DeltaError::LargeObject(ref uri) => {
write!(f, "object exceeds size limit: {uri}")
}
DeltaError::DeltaHashMismatch => {
write!(f, "delta file hash value mismatch")
}
DeltaError::Archive(ref err) => {
write!(f, "archive error: {err}")
}
}
}
}
impl error::Error for DeltaError { }