use std::{fs, io};
use std::fs::File;
use std::io::{Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use bytes::Bytes;
use chrono::{TimeZone, Utc};
use log::{debug, error, info, warn};
use rand::random;
use rpki::crypto::digest::DigestAlgorithm;
use rpki::repository::cert::Cert;
use rpki::repository::manifest::ManifestHash;
use rpki::repository::tal::TalUri;
use rpki::repository::x509::Time;
use rpki::uri;
use crate::collector;
use crate::config::Config;
use crate::engine::CaCert;
use crate::error::Failed;
use crate::metrics::Metrics;
use crate::utils::fatal;
use crate::utils::binio::{Compose, Parse, ParseError};
use crate::utils::dump::DumpRegistry;
use crate::utils::json::JsonBuilder;
use crate::utils::uri::UriExt;
/// The disk-backed object store.
///
/// Wraps the base directory under which all stored data lives. The
/// directory is created by [`Store::init`] or [`Store::new`] as the
/// `"stored"` sub-directory of the configured cache directory.
#[derive(Clone, Debug)]
pub struct Store {
    /// The base directory of the store.
    path: PathBuf,
}
impl Store {
    /// The sub-directory of the store base holding RRDP repository data.
    //
    // Note: the `'static` lifetime is implied for `&str` consts, so it
    // is not spelled out.
    const RRDP_BASE: &str = "rrdp";

    /// The sub-directory of the store base holding temporary files.
    const TMP_BASE: &str = "tmp";

    /// Creates the store's base directory under the cache directory.
    ///
    /// Logs an error message and returns `Failed` if the directory
    /// cannot be created.
    fn create_base_dir(config: &Config) -> Result<PathBuf, Failed> {
        let path = config.cache_dir.join("stored");
        if let Err(err) = fs::create_dir_all(&path) {
            error!(
                "Failed to create store directory {}: {}",
                path.display(), err
            );
            return Err(Failed)
        }
        Ok(path)
    }

    /// Initializes the store by creating its base directory.
    pub fn init(config: &Config) -> Result<(), Failed> {
        Self::create_base_dir(config)?;
        Ok(())
    }

    /// Creates a new store using the directories given in `config`.
    pub fn new(config: &Config) -> Result<Self, Failed> {
        Ok(Store {
            path: Self::create_base_dir(config)?,
        })
    }

    /// Starts a new run using this store.
    pub fn start(&self) -> Run {
        Run::new(self)
    }

    /// Dumps the store's content to the directory `dir`.
    ///
    /// Writes the trust anchor certificates to `dir/ta`, the stored
    /// publication points to `dir/store`, and a `repositories.json`
    /// file describing the dumped repositories.
    pub fn dump(&self, dir: &Path) -> Result<(), Failed> {
        self.dump_ta_certs(dir)?;
        let dir = dir.join("store");
        debug!("Dumping store content to {}", dir.display());
        // Start from a clean slate.
        fatal::remove_dir_all(&dir)?;
        let mut repos = DumpRegistry::new(dir);
        self.dump_tree(&self.rsync_repository_path(), &mut repos)?;
        self.dump_tree(&self.rrdp_repository_base(), &mut repos)?;
        self.dump_repository_json(repos)?;
        debug!("Store dump complete.");
        Ok(())
    }

    /// Copies the stored trust anchor certificates to `target_base/ta`.
    fn dump_ta_certs(
        &self,
        target_base: &Path
    ) -> Result<(), Failed> {
        let source = self.path.join("ta");
        let target = target_base.join("ta");
        debug!("Dumping trust anchor certificates to {}", target.display());
        fatal::remove_dir_all(&target)?;
        fatal::copy_dir_all(&source, &target)?;
        debug!("Trust anchor certificate dump complete.");
        Ok(())
    }

    /// Recursively dumps every publication point file found below `path`.
    fn dump_tree(
        &self,
        path: &Path,
        repos: &mut DumpRegistry,
    ) -> Result<(), Failed> {
        for entry in fatal::read_dir(path)? {
            let entry = entry?;
            if entry.is_dir() {
                self.dump_tree(entry.path(), repos)?;
            }
            else if entry.is_file() {
                self.dump_point(entry.path(), repos)?;
            }
        }
        Ok(())
    }

    /// Dumps the single stored publication point file at `path`.
    ///
    /// A file that cannot be opened is fatal. A file whose manifest
    /// cannot be parsed is skipped entirely; one whose trailing
    /// objects cannot be parsed is dumped partially, both with a log
    /// message.
    fn dump_point(
        &self,
        path: &Path,
        repos: &mut DumpRegistry,
    ) -> Result<(), Failed> {
        let mut file = match File::open(path) {
            Ok(file) => file,
            Err(err) => {
                error!(
                    "Fatal: failed to open file {}: {}",
                    path.display(), err
                );
                return Err(Failed)
            }
        };
        let manifest = match StoredManifest::read(&mut file) {
            Ok(some) => some,
            Err(err) => {
                error!(
                    "Skipping {}: failed to read file: {}",
                    path.display(), err
                );
                return Ok(())
            }
        };

        let repo_dir = repos.get_repo_path(manifest.rpki_notify.as_ref());

        // Manifest and CRL are stored alongside the regular objects.
        self.dump_object(
            &repo_dir, &manifest.manifest_uri, &manifest.manifest
        )?;
        self.dump_object(&repo_dir, &manifest.crl_uri, &manifest.crl)?;

        loop {
            let object = match StoredObject::read(&mut file) {
                Ok(Some(object)) => object,
                Ok(None) => break,
                Err(err) => {
                    warn!(
                        "Partially skipping {}: failed to read file: {}",
                        path.display(), err
                    );
                    return Ok(())
                }
            };
            self.dump_object(&repo_dir, &object.uri, &object.content)?;
        }
        Ok(())
    }

    /// Writes `content` below `dir` under a path derived from `uri`.
    fn dump_object(
        &self,
        dir: &Path,
        uri: &uri::Rsync,
        content: &[u8]
    ) -> Result<(), Failed> {
        let path = dir.join(
            format!("{}/{}/{}",
                uri.canonical_authority(),
                uri.module_name(),
                uri.path()
            )
        );
        if let Some(dir) = path.parent() {
            fatal::create_dir_all(dir)?;
        }
        let mut target = match File::create(&path) {
            Ok(some) => some,
            Err(err) => {
                error!(
                    "Fatal: cannot create target file {}: {}",
                    path.display(), err
                );
                return Err(Failed)
            }
        };
        if let Err(err) = target.write_all(content) {
            error!(
                "Fatal: failed to write to target file {}: {}",
                path.display(), err
            );
            return Err(Failed)
        }
        Ok(())
    }

    /// Writes the `repositories.json` file for the dumped repositories.
    fn dump_repository_json(
        &self,
        repos: DumpRegistry,
    ) -> Result<(), Failed> {
        let path = repos.base_dir().join("repositories.json");
        fatal::write_file(
            &path,
            JsonBuilder::build(|builder| {
                builder.member_array("repositories", |builder| {
                    for (key, value) in repos.rrdp_uris() {
                        builder.array_object(|builder| {
                            builder.member_str(
                                "path", value
                            );
                            builder.member_str("type", "rrdp");
                            builder.member_str(
                                "rpkiNotify",
                                key
                            );
                        })
                    }
                    // The rsync data always lives in a fixed directory.
                    builder.array_object(|builder| {
                        builder.member_str("path", "rsync");
                        builder.member_str("type", "rsync");
                    });
                })
            }).as_bytes()
        )
    }

    /// Returns the path for storing the trust anchor certificate `uri`.
    ///
    /// Rsync and HTTPS TAL URIs live in separate sub-directories.
    fn ta_path(&self, uri: &TalUri) -> PathBuf {
        match *uri {
            TalUri::Rsync(ref uri) => {
                self.path.join(
                    uri.unique_path("ta/rsync", ".cer")
                )
            }
            TalUri::Https(ref uri) => {
                self.path.join(
                    uri.unique_path("ta/https", ".cer")
                )
            }
        }
    }

    /// Returns the base path of all stored RRDP repository data.
    fn rrdp_repository_base(&self) -> PathBuf {
        self.path.join(Self::RRDP_BASE)
    }

    /// Returns the path of the RRDP repository with the given
    /// notification URI.
    fn rrdp_repository_path(&self, uri: &uri::Https) -> PathBuf {
        self.path.join(uri.unique_path(Self::RRDP_BASE, ""))
    }

    /// Returns the path where all rsync-published data is stored.
    fn rsync_repository_path(&self) -> PathBuf {
        self.path.join("rsync")
    }

    /// Creates a fresh temporary file, returning its path and handle.
    ///
    /// Tries up to 100 random file names before giving up; a name
    /// collision (`AlreadyExists`) simply triggers another attempt,
    /// any other error is fatal.
    fn tmp_file(&self) -> Result<(PathBuf, File), Failed> {
        let tmp_dir = self.path.join(Self::TMP_BASE);
        fatal::create_dir_all(&tmp_dir)?;
        for _ in 0..100 {
            let tmp_path = tmp_dir.join(format!("{:08x}", random::<u32>()));
            let file = {
                fs::OpenOptions::new()
                    .write(true)
                    .create_new(true)
                    .open(&tmp_path)
            };
            match file {
                Ok(file) => return Ok((tmp_path, file)),
                Err(ref err) if err.kind() == io::ErrorKind::AlreadyExists => {
                    continue
                }
                Err(err) => {
                    error!(
                        "Fatal: failed to create temporary file {}: {}",
                        tmp_path.display(), err
                    );
                    return Err(Failed)
                }
            }
        }
        error!(
            "Fatal: repeatedly failed to create temporary file in {}",
            tmp_dir.display()
        );
        Err(Failed)
    }
}
/// A single run using the store.
///
/// Borrows the store for the duration of the run; obtained via
/// [`Store::start`].
#[derive(Debug)]
pub struct Run<'a> {
    /// A reference to the store this run operates on.
    store: &'a Store,
}
impl<'a> Run<'a> {
fn new(
store: &'a Store,
) -> Self {
Run { store }
}
pub fn done(self, _metrics: &mut Metrics) {
}
pub fn load_ta(&self, uri: &TalUri) -> Result<Option<Bytes>, Failed> {
fatal::read_existing_file(&self.store.ta_path(uri)).map(|maybe| {
maybe.map(Into::into)
})
}
pub fn update_ta(
&self, uri: &TalUri, content: &[u8]
) -> Result<(), Failed> {
let path = self.store.ta_path(uri);
if let Some(dir) = path.parent() {
fatal::create_dir_all(dir)?;
}
fatal::write_file(&path, content)
}
pub fn repository(&self, ca_cert: &CaCert) -> Repository<'a> {
let (path, rrdp) = if let Some(rpki_notify) = ca_cert.rpki_notify() {
(self.store.rrdp_repository_path(rpki_notify), true)
}
else {
(self.store.rsync_repository_path(), false)
};
Repository::new(self.store, path, rrdp)
}
pub fn pub_point(
&self, ca_cert: &CaCert
) -> Result<StoredPoint<'a>, Failed> {
self.repository(ca_cert).get_point(ca_cert.rpki_manifest())
}
pub fn cleanup(
&self,
collector: &mut collector::Cleanup,
) -> Result<(), Failed> {
self.cleanup_ta()?;
self.cleanup_points(&self.store.rrdp_repository_base(), collector)?;
self.cleanup_points(&self.store.rsync_repository_path(), collector)?;
self.cleanup_tmp()?;
Ok(())
}
fn cleanup_ta(&self) -> Result<(), Failed> {
cleanup_dir_tree(&self.store.path.join("ta"), |path| {
let content = fatal::read_file(path)?;
if let Ok(cert) = Cert::decode(Bytes::from(content)) {
if cert.validity().not_after() > Time::now() {
return Ok(true)
}
}
Ok(false)
})
}
fn cleanup_points(
&self,
base: &Path,
retain: &mut collector::Cleanup,
) -> Result<(), Failed> {
cleanup_dir_tree(base, |path| {
if let Ok(stored) = StoredManifest::read(
&mut fatal::open_file(path)?
) {
if stored.retain() {
if let Some(uri) = stored.rpki_notify.as_ref() {
retain.add_rrdp_repository(uri)
}
else {
retain.add_rsync_module(&stored.manifest_uri)
}
return Ok(true)
}
}
Ok(false)
})
}
fn cleanup_tmp(&self) -> Result<(), Failed> {
cleanup_dir_tree(&self.store.path.join("tmp"), |_path| {
Ok(false)
})
}
}
/// Access to the stored data of a single repository.
pub struct Repository<'a> {
    /// A reference to the underlying store.
    store: &'a Store,
    /// The base path of the repository's data within the store.
    path: PathBuf,
    /// Whether this repository is updated via RRDP.
    is_rrdp: bool,
}
impl<'a> Repository<'a> {
    /// Creates a new repository handle.
    fn new(store: &'a Store, path: PathBuf, is_rrdp: bool) -> Self {
        Repository { store, path, is_rrdp }
    }

    /// Returns whether this repository is updated via RRDP.
    pub fn is_rrdp(&self) -> bool {
        self.is_rrdp
    }

    /// Opens the stored publication point identified by its manifest
    /// URI.
    pub fn get_point(
        &self, manifest_uri: &uri::Rsync
    ) -> Result<StoredPoint<'a>, Failed> {
        let path = self.point_path(manifest_uri);
        StoredPoint::open(self.store, path, self.is_rrdp)
    }

    /// Returns the path of the file storing a publication point's data.
    fn point_path(&self, manifest_uri: &uri::Rsync) -> PathBuf {
        // The file lives under an "rsync" prefix followed by the
        // components of the manifest URI.
        let relative = format!(
            "rsync/{}/{}/{}",
            manifest_uri.canonical_authority(),
            manifest_uri.module_name(),
            manifest_uri.path(),
        );
        self.path.join(relative)
    }
}
/// A stored publication point opened for reading and updating.
///
/// Iterating over a `StoredPoint` yields the stored objects following
/// the manifest in the underlying file.
pub struct StoredPoint<'a> {
    /// A reference to the store (needed for temporary files in
    /// `update`).
    store: &'a Store,
    /// The path of the file holding the point's data.
    path: PathBuf,
    /// The open data file, if the point exists on disk and its
    /// manifest could be read. Positioned at the first stored object.
    file: Option<File>,
    /// The manifest read from the start of the file, if any.
    manifest: Option<StoredManifest>,
    /// Whether the point lives in an RRDP repository.
    is_rrdp: bool,
}
impl<'a> StoredPoint<'a> {
    /// Opens the stored publication point at `path`.
    ///
    /// A missing file is not an error: the returned point simply has
    /// no file and no manifest. A file whose manifest cannot be parsed
    /// is treated like a missing point unless the parse error is
    /// fatal, in which case the whole operation fails.
    fn open(
        store: &'a Store,
        path: PathBuf,
        is_rrdp: bool,
    ) -> Result<Self, Failed> {
        let mut file = match File::open(&path) {
            Ok(file) => file,
            Err(ref err) if err.kind() == io::ErrorKind::NotFound => {
                return Ok(StoredPoint {
                    store, path,
                    file: None,
                    manifest: None,
                    is_rrdp
                })
            }
            Err(err) => {
                error!(
                    "Failed to open stored publication point at {}: {}",
                    path.display(), err
                );
                return Err(Failed)
            }
        };
        let manifest = match StoredManifest::read(&mut file) {
            Ok(manifest) => Some(manifest),
            Err(err) => {
                if err.is_fatal() {
                    error!(
                        "Failed to read stored publication point at {}: {}",
                        path.display(), err
                    );
                    return Err(Failed)
                }
                else {
                    info!(
                        "Ignoring invalid stored publication point at {}: {}",
                        path.display(), err
                    );
                    None
                }
            }
        };
        Ok(StoredPoint {
            store, path,
            // Only keep the file around when we have a manifest: the
            // object iterator relies on the file being positioned
            // right behind a valid manifest.
            file: if manifest.is_some() {
                Some(file)
            }
            else {
                None
            },
            manifest,
            is_rrdp
        })
    }

    /// Returns the path of the file storing the point's data.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Returns whether the point is stored in an RRDP repository.
    pub fn is_rrdp(&self) -> bool {
        self.is_rrdp
    }

    /// Returns the stored manifest, if present.
    pub fn manifest(&self) -> Option<&StoredManifest> {
        self.manifest.as_ref()
    }

    /// Takes the stored manifest out of the point, leaving `None`.
    pub fn take_manifest(&mut self) -> Option<StoredManifest> {
        self.manifest.take()
    }

    /// Replaces the content of the stored point.
    ///
    /// Writes `manifest` followed by all objects produced by the
    /// `objects` closure to a temporary file first, then moves it over
    /// the point's path. On success, the point's file is positioned at
    /// the first object, ready for iteration.
    ///
    /// If `objects` returns an error, the temporary file is removed
    /// and the stored point left unchanged. On fatal I/O errors the
    /// temporary file may be left behind; `Run::cleanup` removes all
    /// temporary files.
    pub fn update(
        &mut self,
        manifest: StoredManifest,
        mut objects: impl FnMut() -> Result<Option<StoredObject>, UpdateError>
    ) -> Result<(), UpdateError> {
        let (tmp_path, mut tmp_file) = self.store.tmp_file()?;

        if let Err(err) = manifest.write(&mut tmp_file) {
            error!(
                "Fatal: failed to write to file {}: {}",
                tmp_path.display(), err
            );
            return Err(UpdateError::Fatal)
        }

        // Remember where the objects start so we can seek there after
        // re-opening the renamed file below.
        let tmp_object_start = match tmp_file.stream_position() {
            Ok(some) => some,
            Err(err) => {
                error!(
                    "Fatal: failed to get position in file {}: {}",
                    tmp_path.display(), err
                );
                return Err(UpdateError::Fatal)
            }
        };

        loop {
            match objects() {
                Ok(Some(object)) => {
                    if let Err(err) = object.write(&mut tmp_file) {
                        error!(
                            "Fatal: failed to write to file {}: {}",
                            tmp_path.display(), err
                        );
                        return Err(UpdateError::Fatal)
                    }
                }
                Ok(None) => break,
                Err(err) => {
                    // Caller aborted: clean up and leave the stored
                    // point untouched.
                    drop(tmp_file);
                    fatal::remove_file(&tmp_path)?;
                    return Err(err)
                }
            }
        }
        drop(tmp_file);

        // Close and remove the old file before moving the new one
        // into place.
        let existing = self.file.is_some();
        drop(self.file.take());
        if existing {
            fatal::remove_file(&self.path)?;
        }
        else if let Some(path) = self.path.parent() {
            fatal::create_dir_all(path)?;
        }
        fatal::rename(&tmp_path, &self.path)?;

        // Re-open and skip past the manifest so iteration starts at
        // the first object.
        let mut file = fatal::open_file(&self.path)?;
        if let Err(err) = file.seek(SeekFrom::Start(tmp_object_start)) {
            error!(
                "Fatal: failed to position file {}: {}",
                self.path.display(), err
            );
            return Err(UpdateError::Fatal)
        }
        self.file = Some(file);
        self.manifest = Some(manifest);
        Ok(())
    }
}
impl<'a> Iterator for StoredPoint<'a> {
    type Item = Result<StoredObject, ParseError>;

    /// Yields the next stored object, or `None` if there is no open
    /// file or the end of the file has been reached.
    fn next(&mut self) -> Option<Self::Item> {
        let file = self.file.as_mut()?;
        match StoredObject::read(file) {
            Ok(Some(object)) => Some(Ok(object)),
            Ok(None) => None,
            Err(err) => Some(Err(err)),
        }
    }
}
/// The manifest data of a publication point as stored on disk.
///
/// Serialized with [`StoredManifest::write`] and deserialized with
/// [`StoredManifest::read`]. In a stored point file, the serialized
/// manifest is followed by the point's stored objects.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct StoredManifest {
    /// The expiry time of the manifest (drives `retain`).
    not_after: Time,
    /// The RRDP notification URI of the publication point, if any.
    /// `None` means the point is served via rsync only.
    rpki_notify: Option<uri::Https>,
    /// The rsync URI of the CA's repository directory.
    // NOTE(review): only (de)serialized and exposed via a getter here;
    // presumably taken from the CA certificate -- confirm at the caller.
    ca_repository: uri::Rsync,
    /// The rsync URI of the manifest itself.
    manifest_uri: uri::Rsync,
    /// The raw bytes of the manifest.
    manifest: Bytes,
    /// The rsync URI of the point's CRL.
    crl_uri: uri::Rsync,
    /// The raw bytes of the CRL.
    crl: Bytes,
}
impl StoredManifest {
    /// Creates a new stored manifest from its parts.
    pub fn new(
        not_after: Time,
        rpki_notify: Option<uri::Https>,
        ca_repository: uri::Rsync,
        manifest_uri: uri::Rsync,
        manifest: Bytes,
        crl_uri: uri::Rsync,
        crl: Bytes,
    ) -> Self {
        StoredManifest {
            not_after,
            rpki_notify,
            ca_repository,
            manifest_uri,
            manifest,
            crl_uri,
            crl,
        }
    }

    /// Reads a stored manifest from `reader`.
    ///
    /// The byte layout read here mirrors what `write` produces.
    pub fn read(reader: &mut impl io::Read) -> Result<Self, ParseError> {
        // Only format version 0 is understood.
        match u8::parse(reader)? {
            0 => {}
            version => {
                return Err(ParseError::format(
                    format!("unexpected version {}", version)
                ))
            }
        }
        // The expiry time is stored as a Unix timestamp.
        let timestamp = i64::parse(reader)?;
        let not_after = match Utc.timestamp_opt(timestamp, 0).single() {
            Some(time) => time.into(),
            None => {
                return Err(ParseError::format(
                    String::from("invalid not_after time")
                ))
            }
        };
        let rpki_notify = Option::parse(reader)?;
        let ca_repository = uri::Rsync::parse(reader)?;
        let manifest_uri = uri::Rsync::parse(reader)?;
        let manifest = Bytes::parse(reader)?;
        let crl_uri = uri::Rsync::parse(reader)?;
        let crl = Bytes::parse(reader)?;
        Ok(StoredManifest::new(
            not_after, rpki_notify, ca_repository,
            manifest_uri, manifest, crl_uri, crl
        ))
    }

    /// Writes the stored manifest to `writer`.
    ///
    /// Produces exactly the byte layout that `read` expects.
    pub fn write(
        &self, writer: &mut impl io::Write
    ) -> Result<(), io::Error> {
        // Format version.
        0u8.compose(writer)?;
        self.not_after.timestamp().compose(writer)?;
        self.rpki_notify.compose(writer)?;
        self.ca_repository.compose(writer)?;
        self.manifest_uri.compose(writer)?;
        self.manifest.compose(writer)?;
        self.crl_uri.compose(writer)?;
        self.crl.compose(writer)?;
        Ok(())
    }

    /// Returns whether the manifest should be kept during cleanup,
    /// i.e., whether it has not yet expired.
    fn retain(&self) -> bool {
        Time::now() < self.not_after
    }
}
impl StoredManifest {
    /// Returns the expiry time of the manifest.
    pub fn not_after(&self) -> Time {
        self.not_after
    }

    /// Returns the rsync URI of the CA's repository directory.
    pub fn ca_repository(&self) -> &uri::Rsync {
        &self.ca_repository
    }

    /// Returns the raw bytes of the stored manifest.
    pub fn manifest(&self) -> &Bytes {
        &self.manifest
    }

    /// Returns the raw bytes of the stored CRL.
    pub fn crl(&self) -> &Bytes {
        &self.crl
    }
}
/// A single object stored for a publication point.
///
/// Serialized with [`StoredObject::write`] and deserialized with
/// [`StoredObject::read`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StoredObject {
    /// The rsync URI of the object.
    uri: uri::Rsync,
    /// The manifest hash of the object, if available.
    ///
    /// Only SHA-256 hashes survive serialization -- `write` emits any
    /// other algorithm as "no hash".
    hash: Option<ManifestHash>,
    /// The raw content of the object.
    content: Bytes,
}
impl StoredObject {
    /// Creates a new stored object from its parts.
    pub fn new(
        uri: uri::Rsync,
        content: Bytes,
        hash: Option<ManifestHash>,
    ) -> Self {
        StoredObject { uri, hash, content }
    }

    /// Reads a stored object from `reader`.
    ///
    /// Returns `Ok(None)` if the reader is already at its end, i.e.,
    /// there are no further objects.
    pub fn read(
        reader: &mut impl io::Read
    ) -> Result<Option<Self>, ParseError> {
        // EOF right at the version byte means "no more objects".
        let version = match u8::parse(reader) {
            Ok(version) => version,
            Err(err) if err.is_eof() => return Ok(None),
            Err(err) => return Err(err),
        };
        // Only format version 0 is understood.
        if version != 0 {
            return Err(ParseError::format(
                format!("unexpected version {}", version)
            ))
        }
        let uri = uri::Rsync::parse(reader)?;
        // Hash tag: 0 is "no hash", 1 is a SHA-256 digest.
        let hash = match u8::parse(reader)? {
            0 => None,
            1 => {
                let algorithm = DigestAlgorithm::sha256();
                let mut digest = vec![0u8; algorithm.digest_len()];
                reader.read_exact(&mut digest)?;
                Some(ManifestHash::new(digest.into(), algorithm))
            }
            hash_type => {
                return Err(ParseError::format(
                    format!("unsupported hash type {}", hash_type)
                ));
            }
        };
        let content = Bytes::parse(reader)?;
        Ok(Some(StoredObject { uri, hash, content }))
    }

    /// Writes the stored object to `writer`.
    ///
    /// Produces exactly the byte layout that `read` expects.
    pub fn write(
        &self, writer: &mut impl io::Write
    ) -> Result<(), io::Error> {
        // Format version.
        0u8.compose(writer)?;
        self.uri.compose(writer)?;
        // Only SHA-256 hashes are serialized; anything else is written
        // as tag 0, i.e., "no hash".
        let sha256 = self.hash.as_ref().filter(|hash| {
            hash.algorithm().is_sha256()
        });
        if let Some(hash) = sha256 {
            1u8.compose(writer)?;
            writer.write_all(hash.as_slice())?;
        }
        else {
            0u8.compose(writer)?;
        }
        self.content.compose(writer)?;
        Ok(())
    }

    /// Returns the rsync URI of the object.
    pub fn uri(&self) -> &uri::Rsync {
        &self.uri
    }

    /// Returns the raw content of the object.
    pub fn content(&self) -> &Bytes {
        &self.content
    }

    /// Converts the object into its raw content.
    pub fn into_content(self) -> Bytes {
        self.content
    }
}
/// An error happened while updating a stored publication point.
#[derive(Clone, Copy, Debug)]
pub enum UpdateError {
    /// The update was aborted by the caller's object source.
    ///
    /// `StoredPoint::update` removes its temporary file and leaves the
    /// stored point unchanged in this case.
    Abort,
    /// A fatal error has occurred; processing cannot continue.
    ///
    /// `Failed` converts into this variant.
    Fatal,
}
impl From<Failed> for UpdateError {
fn from(_: Failed) -> Self {
UpdateError::Fatal
}
}
/// Recursively cleans up a directory tree.
///
/// Visits every file below `base` and calls `keep` with its path;
/// files for which `keep` returns `false` are deleted. Directories
/// that end up containing no kept entries are deleted as a whole.
/// The top-level directory `base` itself is never deleted, and a
/// missing `base` is not an error.
fn cleanup_dir_tree(
    base: &Path,
    mut keep: impl FnMut(&Path) -> Result<bool, Failed>
) -> Result<(), Failed> {
    // Recurses into one directory. Returns whether anything below
    // `base` was kept (so the caller knows whether to delete it).
    fn recurse(
        base: &Path,
        top: bool,
        op: &mut impl FnMut(&Path) -> Result<bool, Failed>
    ) -> Result<bool, Failed> {
        // At the top level a missing directory simply means there is
        // nothing to do; below it the directory must exist since we
        // just listed it.
        let dir = if top {
            match fatal::read_existing_dir(base)? {
                Some(dir) => dir,
                None => return Ok(false),
            }
        }
        else {
            fatal::read_dir(base)?
        };
        let mut keep = false;
        for entry in dir {
            let entry = entry?;
            if entry.is_dir() {
                // Remove sub-directories in which nothing was kept.
                if !recurse(entry.path(), false, op)? {
                    fatal::remove_dir_all(entry.path())?;
                }
                else {
                    keep = true;
                }
            }
            else if entry.is_file() {
                if !op(entry.path())? {
                    fatal::remove_file(entry.path())?;
                }
                else {
                    keep = true;
                }
            }
            // Entries that are neither directory nor regular file are
            // ignored (and kept in place).
        }
        Ok(keep)
    }

    // The top-level result is dropped: `base` itself is never removed.
    recurse(base, true, &mut keep).map(|_| ())
}
#[cfg(test)]
mod test {
    use std::str::FromStr;
    use super::*;

    /// Parses an rsync URI or panics.
    fn rsync(uri: &str) -> uri::Rsync {
        uri::Rsync::from_str(uri).unwrap()
    }

    #[test]
    fn write_read_stored_manifest() {
        let mut manifest = StoredManifest::new(
            Time::utc(2021, 2, 18, 13, 22, 6),
            Some(uri::Https::from_str("https://foo.bar/bla/blubb").unwrap()),
            rsync("rsync://foo.bar/bla/blubb"),
            rsync("rsync://foo.bar/bla/blubb"),
            Bytes::from(b"foobar".as_ref()),
            rsync("rsync://foo.bar/bla/blubb"),
            Bytes::from(b"blablubb".as_ref())
        );

        // Round trip with an RRDP URI present.
        let mut encoded = Vec::new();
        manifest.write(&mut encoded).unwrap();
        assert_eq!(
            StoredManifest::read(&mut encoded.as_slice()).unwrap(),
            manifest
        );

        // Round trip without an RRDP URI.
        manifest.rpki_notify = None;
        let mut encoded = Vec::new();
        manifest.write(&mut encoded).unwrap();
        assert_eq!(
            StoredManifest::read(&mut encoded.as_slice()).unwrap(),
            manifest
        );
    }

    #[test]
    fn write_read_stored_object() {
        let object = StoredObject::new(
            rsync("rsync://foo.bar/bla/blubb"),
            Bytes::from(b"foobar".as_ref()),
            None
        );

        let mut encoded = Vec::new();
        object.write(&mut encoded).unwrap();
        assert_eq!(
            StoredObject::read(&mut encoded.as_slice()).unwrap().unwrap(),
            object
        );
    }
}