use std::{fs, io};
use std::io::{BufRead, BufReader, Read, Write};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::time::SystemTime;
use bytes::Bytes;
use log::{info, warn};
use ring::digest;
use ring::constant_time::verify_slices_are_equal;
use rpki::uri;
use rpki::rrdp::{DigestHex, NotificationFile, UriAndHash};
use unwrap::unwrap;
use uuid::Uuid;
use crate::metrics::RrdpServerMetrics;
use crate::operation::Error;
use super::http::{DeltaTargets, HttpClient};
use super::utils::create_unique_dir;
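/// The local copy of a single RRDP server.
///
/// All data and state for the server live under `server_dir`. A server is
/// updated at most once per `Server` value; the `updated` flag records
/// whether that has happened and `broken` whether the local copy is
/// unusable.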
#[derive(Debug)]
pub struct Server {
notify_uri: uri::Https,
server_dir: ServerDir,
updated: AtomicBool,
broken: AtomicBool,
mutex: Mutex<RrdpServerMetrics>,
}
impl Server {
fn new(
notify_uri: uri::Https,
server_dir: ServerDir,
broken: bool
) -> Self {
Server {
mutex: Mutex::new(RrdpServerMetrics::new(notify_uri.clone())),
notify_uri,
server_dir,
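// A server whose directory could not be created starts out as both
// updated and broken, so no update is ever attempted for it.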
updated: AtomicBool::new(broken),
broken: AtomicBool::new(broken),
}
}
pub fn existing(notify_uri: uri::Https, server_dir: PathBuf) -> Self {
Self::new(notify_uri, ServerDir::new(server_dir), false)
}
pub fn create(notify_uri: uri::Https, cache_dir: &Path) -> Self {
let (server_dir, broken) = match ServerDir::create(cache_dir) {
Ok(server_dir) => (server_dir, false),
Err(server_dir) => (server_dir, true),
};
Self::new(notify_uri, server_dir, broken)
}
pub fn server_dir(&self) -> &Path {
&self.server_dir.base
}
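/// Updates the local copy of the server if that hasn't happened yet.
///
/// Tries a delta update first and falls back to a snapshot update. If
/// updating fails and the local copy turns out to be broken, the server
/// directory is removed.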
pub fn update(&self, http: &HttpClient) {
if self.updated.load(Relaxed) {
return
}
let mut metrics = unwrap!(self.mutex.lock());
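// Re-check after acquiring the lock: another thread may have
// completed the update while we were waiting.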
if self.updated.load(Relaxed) {
return
}
let start_time = SystemTime::now();
if self.try_update(http, &mut metrics).is_err() && self.check_broken() {
let _ = fs::remove_dir_all(self.server_dir.base());
}
self.updated.store(true, Relaxed);
metrics.duration = SystemTime::now().duration_since(start_time);
}
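/// Performs the actual update.
///
/// Fetches the notification file, then attempts a delta update and falls
/// back to a full snapshot update if that fails.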
fn try_update(
&self,
http: &HttpClient,
metrics: &mut RrdpServerMetrics
) -> Result<(), Error> {
info!("RRDP {}: Updating server", self.notify_uri);
metrics.serial = None;
let notify = http.notification_file(
&self.notify_uri, &mut metrics.notify_status
)?;
if self.delta_update(&notify, http, metrics).is_ok() {
info!("RRDP {}: Delta update succeeded.", self.notify_uri);
return Ok(())
}
self.snapshot_update(&notify, http, metrics)
}
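/// Tries to update the local copy via RRDP deltas.
///
/// Fails if there is no usable state file, the session has changed, the
/// required deltas are not available, or the local copy doesn't match its
/// recorded hash. On success, the state file is rewritten with the new
/// serial and hash.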
fn delta_update(
&self,
notify: &NotificationFile,
http: &HttpClient,
metrics: &mut RrdpServerMetrics
) -> Result<(), Error> {
let mut state = ServerState::load(self.server_dir.state_path())?;
let deltas = match Self::calc_deltas(notify, &state)? {
Some(deltas) => deltas,
None => {
return self.server_dir.check_digest(&state.hash)
}
};
let targets = self.collect_delta_targets(
&state, notify, deltas, http
);
let targets = match targets {
Ok(targets) => targets,
Err(_) => {
return Err(Error)
}
};
self.server_dir.check_digest(&state.hash)?;
if targets.apply().is_err() {
return Err(Error);
}
state.serial = notify.serial;
state.hash = match self.server_dir.digest() {
Ok(hash) => hash.into(),
Err(_) => {
return Err(Error);
}
};
if state.save(self.server_dir.state_path()).is_err() {
return Err(Error);
}
metrics.serial = Some(state.serial);
Ok(())
}
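/// Determines the deltas needed to bring `state` up to `notify`.
///
/// Returns `Ok(None)` if the local copy is already at the current serial,
/// the slice of deltas to apply if a delta update is possible, and an
/// error if a snapshot update is needed instead.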
fn calc_deltas<'a>(
notify: &'a NotificationFile,
state: &ServerState
) -> Result<Option<&'a [(usize, UriAndHash)]>, Error> {
if notify.session_id != state.session {
info!("New session. Need to get snapshot.");
return Err(Error);
}
info!("Serials: us {}, them {}", state.serial, notify.serial);
if notify.serial == state.serial {
return Ok(None);
}
if notify.deltas.last().map(|delta| delta.0) != Some(notify.serial) {
info!("Last delta serial differs from current serial.");
return Err(Error)
}
let mut deltas = notify.deltas.as_slice();
let serial = match state.serial.checked_add(1) {
Some(serial) => serial,
None => return Err(Error)
};
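// Drop deltas that are older than the one we need. The remaining
// list has to start exactly at `state.serial + 1`.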
loop {
let first = match deltas.first() {
Some(first) => first,
None => {
info!("Ran out of deltas.");
return Err(Error)
}
};
if first.0 > serial {
info!("First delta is too new ({})", first.0);
return Err(Error)
}
else if first.0 == serial {
break
}
else {
deltas = &deltas[1..];
}
}
Ok(Some(deltas))
}
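/// Downloads all deltas into temporary target files.
///
/// Verifies the content hash of the local copy first so deltas aren't
/// applied on top of changed content.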
fn collect_delta_targets(
&self,
state: &ServerState,
notify: &NotificationFile,
deltas: &[(usize, UriAndHash)],
http: &HttpClient
) -> Result<DeltaTargets, Error> {
self.server_dir.check_digest(&state.hash)?;
let mut targets = DeltaTargets::new(http.tmp_dir())?;
for delta in deltas {
http.delta(
&self.notify_uri, notify, delta, &mut targets,
|uri| self.server_dir.uri_path(uri)
)?
}
Ok(targets)
}
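/// Updates the local copy from the server's snapshot.
///
/// The snapshot is downloaded into a fresh temporary directory and only
/// moved into place once complete, so a failed download leaves the
/// existing copy untouched.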
fn snapshot_update(
&self,
notify: &NotificationFile,
http: &HttpClient,
metrics: &mut RrdpServerMetrics
) -> Result<(), Error> {
info!("RRDP {}: updating from snapshot.", self.notify_uri);
let tmp_dir = ServerDir::create(http.tmp_dir()).map_err(|_| Error)?;
let state = match self.snapshot_into_tmp(notify, http, &tmp_dir) {
Ok(state) => state,
Err(_) => {
let _ = fs::remove_dir_all(tmp_dir.base());
return Err(Error);
}
};
self.move_from_tmp(tmp_dir)?;
metrics.serial = Some(state.serial);
Ok(())
}
fn snapshot_into_tmp(
&self,
notify: &NotificationFile,
http: &HttpClient,
tmp_dir: &ServerDir,
) -> Result<ServerState, Error> {
http.snapshot(notify, |uri| tmp_dir.uri_path(uri))?;
let state = ServerState {
notify_uri: self.notify_uri().clone(),
session: notify.session_id,
serial: notify.serial,
hash: tmp_dir.digest()?.into(),
};
state.save(tmp_dir.state_path())?;
Ok(state)
}
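/// Replaces the local state file and data directory with the content of
/// `tmp_dir`, removing the temporary directory afterwards.
///
/// Fails if either the state file or the data directory cannot be moved.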
fn move_from_tmp(&self, tmp_dir: ServerDir) -> Result<(), Error> {
let _ = fs::remove_file(self.server_dir.state_path());
let state_res = fs::rename(
tmp_dir.state_path(), self.server_dir.state_path()
).map_err(|err| {
info!(
"Failed to move RRDP state file '{}' from temporary location \
'{}': {}.",
self.server_dir.state_path().display(),
tmp_dir.state_path().display(),
err
);
Error
});
let _ = fs::remove_dir_all(self.server_dir.data_path());
let data_res = fs::rename(
tmp_dir.data_path(), self.server_dir.data_path()
).map_err(|err| {
info!(
"Failed to move RRDP data directory '{}' from temporary \
location '{}': {}.",
self.server_dir.data_path().display(),
tmp_dir.data_path().display(),
err
);
Error
});
let _ = fs::remove_dir_all(tmp_dir.base());
if state_res.is_err() || data_res.is_err() {
Err(Error)
}
else {
Ok(())
}
}
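/// Checks whether the local copy is unusable after a failed update.
///
/// The copy counts as broken if the state file cannot be read, the data
/// directory cannot be digested, or the digest doesn't match the hash
/// recorded in the state file. Sets the `broken` flag and returns `true`
/// in that case.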
fn check_broken(&self) -> bool {
let state = match ServerState::load(self.server_dir.state_path()) {
Ok(state) => state,
Err(_) => {
info!(
"Marking RRPD server '{}' as unusable",
self.notify_uri
);
self.broken.store(true, Relaxed);
return true;
}
};
let digest = match self.server_dir.digest() {
Ok(digest) => digest,
Err(_) => {
info!(
"Cannot digest RRPD server directory for '{}'. \
Marking as unsable.",
self.notify_uri
);
self.broken.store(true, Relaxed);
return true;
}
};
if verify_slices_are_equal(digest.as_ref(), state.hash.as_ref())
.is_err() {
info!(
"Hash for RRDP server directory for '{}' doesn’t match. \
Marking as unusable.",
self.notify_uri
);
self.broken.store(true, Relaxed);
true
}
else {
false
}
}
pub fn notify_uri(&self) -> &uri::Https {
&self.notify_uri
}
pub fn is_broken(&self) -> bool {
self.broken.load(Relaxed)
}
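/// Loads the content of the file for `uri` from the local copy.
///
/// Returns an error if the server is marked as broken and `Ok(None)` if
/// the file is missing or cannot be read.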
pub fn load_file(&self, uri: &uri::Rsync) -> Result<Option<Bytes>, Error> {
if self.broken.load(Relaxed) {
return Err(Error)
}
let path = self.server_dir.uri_path(uri);
let mut file = match fs::File::open(&path) {
Ok(file) => file,
Err(err) => {
if err.kind() == io::ErrorKind::NotFound {
info!("{} not found in its RRDP repository.", uri);
}
else {
warn!(
"Failed to open file '{}': {}.",
path.display(), err
);
}
return Ok(None)
}
};
let mut data = Vec::new();
if let Err(err) = file.read_to_end(&mut data) {
warn!(
"Failed to read file '{}': {}",
path.display(), err
);
return Ok(None)
}
Ok(Some(data.into()))
}
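/// Removes the server directory if the server was never updated or is
/// marked as broken. Returns `true` in that case.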
pub fn remove_unused(&self) -> bool {
if self.updated.load(Relaxed) && !self.broken.load(Relaxed) {
return false
}
let _ = fs::remove_dir_all(self.server_dir.base());
true
}
pub fn metrics(&self) -> Option<RrdpServerMetrics> {
if self.updated.load(Relaxed) {
// Block on the lock rather than panicking: `try_lock` would fail if
// another thread happened to be reading the metrics at the same time.
Some(unwrap!(self.mutex.lock()).clone())
}
else {
None
}
}
}
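/// The file system layout of a server's local copy.
///
/// The base directory holds the state file `state.txt` and a `data`
/// sub-directory with the repository content stored under
/// `authority/module/path` derived from each object's rsync URI.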
#[derive(Clone, Debug)]
struct ServerDir {
base: PathBuf,
state: PathBuf,
}
impl ServerDir {
fn new(base: PathBuf) -> Self {
ServerDir {
state: base.join("state.txt"),
base
}
}
fn broken() -> Self {
ServerDir {
base: PathBuf::new(),
state: PathBuf::new()
}
}
fn create(cache_dir: &Path) -> Result<Self, Self> {
match create_unique_dir(cache_dir) {
Ok(path) => Ok(ServerDir::new(path)),
Err(_) => Err(ServerDir::broken())
}
}
fn base(&self) -> &Path {
&self.base
}
fn state_path(&self) -> &Path {
&self.state
}
fn data_path(&self) -> PathBuf {
self.base.join("data")
}
fn module_path(&self, module: &uri::RsyncModule) -> PathBuf {
let mut res = self.data_path();
res.push(module.authority());
res.push(module.module());
res
}
fn uri_path(&self, uri: &uri::Rsync) -> PathBuf {
let mut res = self.module_path(uri.module());
res.push(uri.path());
res
}
pub fn digest(&self) -> Result<digest::Digest, Error> {
self._digest().map_err(|err| {
info!(
"Failed to caculate digest for '{}': {}",
self.data_path().display(), err
);
Error
})
}
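/// Calculates a digest over the structure of the data directory.
///
/// Walks the directory tree and feeds each entry's name and, for files,
/// its length into a SHA-256 context, sorting entries by name within each
/// directory so the result is independent of read order. Note that file
/// contents are not hashed.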
fn _digest(&self) -> Result<digest::Digest, io::Error> {
let mut entries = Vec::new();
let mut dirs = vec![self.data_path()];
let mut context = digest::Context::new(&digest::SHA256);
while let Some(dir) = dirs.pop() {
for entry in dir.read_dir()? {
let entry = entry?;
let metadata = entry.metadata()?;
let name = entry.file_name();
if metadata.is_dir() {
entries.push((name, Ok(entry.path())))
}
else if metadata.is_file() {
entries.push((name, Err(metadata.len())))
}
}
entries.sort_by(|left, right| left.0.cmp(&right.0));
for (name, other) in entries.drain(..) {
context.update(name.to_string_lossy().as_bytes());
match other {
Ok(path) => dirs.push(path),
Err(len) => context.update(&len.to_ne_bytes()),
}
}
}
Ok(context.finish())
}
pub fn check_digest(&self, hash: &DigestHex) -> Result<(), Error> {
let digest = self.digest()?;
verify_slices_are_equal(digest.as_ref(), hash.as_ref()).map_err(|_| {
info!(
"Mismatch of digest for '{}'. Content must have changed.",
self.data_path().display()
);
Error
})
}
}
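/// The persisted state of a server's local copy.
///
/// The state is stored in the server directory as a plain text file with
/// one `key: value` line each for `notify-uri`, `session`, `serial` and
/// `hash`, in that order.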
#[derive(Clone, Debug)]
pub struct ServerState {
pub notify_uri: uri::Https,
pub session: Uuid,
pub serial: usize,
pub hash: DigestHex,
}
impl ServerState {
pub fn load(path: &Path) -> Result<Self, Error> {
Self::_load(path).map_err(|err| {
if err.kind() != io::ErrorKind::NotFound {
info!(
"Failed to read state file '{}': {}",
path.display(), err
);
}
Error
})
}
fn _load(path: &Path) -> Result<Self, io::Error> {
let file = BufReader::new(fs::File::open(path)?);
let mut lines = file.lines();
let res = ServerState {
notify_uri: process_line(&mut lines, "notify-uri:")?,
session: process_line(&mut lines, "session:")?,
serial: process_line(&mut lines, "serial:")?,
hash: process_line(&mut lines, "hash:")?,
};
if lines.next().is_some() {
Err(io::Error::new(io::ErrorKind::InvalidData, "invalid data"))
}
else {
Ok(res)
}
}
pub fn save(&self, path: &Path) -> Result<(), Error> {
self._save(path).map_err(|err| {
info!(
"Failed to read write file '{}': {}",
path.display(), err
);
Error
})
}
fn _save(&self, path: &Path) -> Result<(), io::Error> {
let mut file = fs::File::create(path)?;
writeln!(
file, "notify-uri: {}\nsession: {}\nserial: {}\nhash: {}",
self.notify_uri, self.session, self.serial, self.hash
)
}
}
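/// Reads the next line from `lines`, checks that its first field equals
/// `expected_key` (including the trailing colon), and parses the second
/// field into `T`.
///
/// Fails on a missing line, an unexpected key, extra fields, or a value
/// that cannot be parsed.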
fn process_line<B: io::BufRead, T: FromStr>(
lines: &mut io::Lines<B>, expected_key: &str
) -> Result<T, io::Error> {
let line = lines.next().ok_or_else(||
io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF")
)??;
let mut line = line.split_whitespace();
let key = line.next().ok_or_else(||
io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF")
)?;
if key != expected_key {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid data"
))
}
let value = line.next().ok_or_else(||
io::Error::new(io::ErrorKind::UnexpectedEof, "unexpected EOF")
)?;
if line.next().is_some() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"invalid data"
))
}
match T::from_str(value) {
Ok(value) => Ok(value),
Err(_) => Err(io::Error::new(io::ErrorKind::InvalidData, "bad value"))
}
}