use std::{error, fmt, fs, io, mem};
use std::collections::{HashMap, VecDeque};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use chrono::Duration;
use log::{debug, error, info, warn};
use rpki::uri;
use rpki::ca::publication;
use rpki::ca::idexchange::PublisherHandle;
use rpki::ca::publication::Base64;
use rpki::repository::manifest::Manifest;
use rpki::repository::x509::Time;
use rpki::rrdp::{DeltaInfo, Hash, NotificationFile, SnapshotInfo};
use rpki::xml::decode::Name;
use serde::de;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use uuid::Uuid;
use crate::api::admin::PublishedFile;
use crate::api::pubd::{PublisherManifestStats, PublisherStats};
use crate::commons::file;
use crate::commons::KrillResult;
use crate::commons::error::{Error, KrillIoError};
use crate::constants::{
REPOSITORY_RRDP_ARCHIVE_DIR, REPOSITORY_RRDP_DIR,
RRDP_FIRST_SERIAL,
};
use crate::config::RrdpUpdatesConfig;
/// Value written to the `version` attribute of generated snapshot and delta
/// XML.
const VERSION: &str = "1";
/// Value written to the `xmlns` attribute of generated snapshot and delta
/// XML.
const NS: &str = "http://www.ripe.net/rpki/rrdp";
/// Root element name of a snapshot file.
const SNAPSHOT: Name = Name::unqualified(b"snapshot");
/// Root element name of a delta file.
const DELTA: Name = Name::unqualified(b"delta");
/// Element name used for published (and updated) objects.
const PUBLISH: Name = Name::unqualified(b"publish");
/// Element name used for withdrawn objects in a delta file.
const WITHDRAW: Name = Name::unqualified(b"withdraw");
/// State of the RRDP server: the current session, serial, snapshot and
/// deltas, plus the changes staged per publisher that have not yet been
/// turned into a delta.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RrdpServer {
/// Base URI from which snapshot and delta URIs are derived.
rrdp_base_uri: uri::Https,
/// Directory on disk to which the RRDP files are written.
rrdp_base_dir: PathBuf,
/// Directory to which old serial directories are moved when archiving is
/// enabled in the RRDP updates configuration.
rrdp_archive_dir: PathBuf,
/// The current RRDP session.
session: RrdpSession,
/// The current serial; incremented whenever an RRDP update is applied.
serial: u64,
/// Time of the last applied update or session reset.
last_update: Time,
/// The current snapshot content.
snapshot: SnapshotData,
/// Retained deltas; new deltas are pushed to the front.
deltas: VecDeque<DeltaData>,
/// Changes staged per publisher since the last applied RRDP update.
/// Defaults to empty when absent from serialized data.
#[serde(default)]
staged_elements: HashMap<PublisherHandle, StagedElements>,
}
impl RrdpServer {
/// Assembles a server from explicitly supplied values for every field.
///
/// No defaults are applied and no validation is performed; see
/// `create` for building a fresh, empty server.
#[deprecated]
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
    rrdp_base_uri: uri::Https,
    rrdp_base_dir: PathBuf,
    rrdp_archive_dir: PathBuf,
    session: RrdpSession,
    serial: u64,
    last_update: Time,
    snapshot: SnapshotData,
    deltas: VecDeque<DeltaData>,
    staged_elements: HashMap<PublisherHandle, StagedElements>,
) -> Self {
    RrdpServer {
        // Locations.
        rrdp_base_uri,
        rrdp_base_dir,
        rrdp_archive_dir,
        // Session state.
        session,
        serial,
        last_update,
        // Content.
        snapshot,
        deltas,
        staged_elements,
    }
}
/// Creates a fresh, empty server for the given session.
///
/// The RRDP and archive directories are derived from `repo_dir`. The
/// serial starts at `RRDP_FIRST_SERIAL`, the snapshot is empty, and there
/// are no deltas or staged elements.
pub fn create(
    rrdp_base_uri: uri::Https,
    repo_dir: &Path,
    session: RrdpSession,
) -> Self {
    RrdpServer {
        rrdp_base_uri,
        rrdp_base_dir: repo_dir.join(REPOSITORY_RRDP_DIR),
        rrdp_archive_dir: repo_dir.join(REPOSITORY_RRDP_ARCHIVE_DIR),
        session,
        serial: RRDP_FIRST_SERIAL,
        last_update: Time::now(),
        snapshot: SnapshotData::empty(),
        deltas: VecDeque::new(),
        staged_elements: HashMap::new(),
    }
}
/// Removes the RRDP base directory and the archive directory from disk.
///
/// Both removals are best effort: any error is ignored.
pub fn clear(&self) {
    for dir in [&self.rrdp_base_dir, &self.rrdp_archive_dir] {
        let _ = fs::remove_dir_all(dir);
    }
}
/// Returns the base URI from which RRDP file URIs are derived.
pub fn rrdp_base_uri(&self) -> &uri::Https {
    &self.rrdp_base_uri
}
/// Returns the current RRDP session.
pub fn session(&self) -> RrdpSession {
    self.session
}
/// Returns the current RRDP serial.
pub fn serial(&self) -> u64 {
    self.serial
}
/// Returns the time of the last applied update or session reset.
pub fn last_update(&self) -> Time {
    self.last_update
}
/// Returns a reference to the current snapshot data.
pub fn snapshot(&self) -> &SnapshotData {
    &self.snapshot
}
/// Maps a relative request path onto a file below the RRDP base directory.
///
/// Returns `None` when the path is rejected by
/// `is_request_path_valid`; otherwise the path joined onto the base
/// directory. No check is made that the file exists.
pub fn resolve_request_path(&self, path: &str) -> Option<PathBuf> {
    Self::is_request_path_valid(path)?;
    Some(self.rrdp_base_dir.join(path))
}
/// Checks whether a relative request path has an acceptable shape.
///
/// Accepted are exactly `notification.xml`, or a path of the form
/// `<a>/<b>/<c>/snapshot.xml` or `<a>/<b>/<c>/delta.xml` where each of the
/// three leading segments is a non-empty string of ASCII alphanumerics
/// and `-`.
///
/// Empty segments are rejected on purpose: a path such as
/// `/a/b/snapshot.xml` starts with an empty segment and is absolute, and
/// joining an absolute path onto the base directory would replace the base
/// directory instead of resolving below it.
///
/// Returns `Some(())` when the path is acceptable and `None` otherwise.
fn is_request_path_valid(path: &str) -> Option<()> {
    if path == "notification.xml" {
        return Some(());
    }

    let mut segments = path.split('/');

    // Three directory levels precede the file name.
    for _ in 0..3 {
        let segment = segments.next()?;
        let well_formed = !segment.is_empty()
            && segment
                .bytes()
                .all(|ch| ch.is_ascii_alphanumeric() || ch == b'-');
        if !well_formed {
            return None;
        }
    }

    // Only the two known file names may follow ...
    if !matches!(segments.next()?, "snapshot.xml" | "delta.xml") {
        return None;
    }

    // ... and nothing may come after the file name.
    segments.next().is_none().then_some(())
}
/// Returns the handles of all known publishers.
///
/// These are the publishers present in the snapshot, followed by any
/// publishers that only have staged elements.
pub fn publishers(&self) -> Vec<PublisherHandle> {
    let current = self.snapshot.publishers_current_objects();

    let mut publishers: Vec<PublisherHandle> =
        current.keys().cloned().collect();
    publishers.extend(
        self.staged_elements
            .keys()
            .filter(|handle| !current.contains_key(*handle))
            .cloned(),
    );
    publishers
}
/// Returns the elements currently staged for the given publisher, if any.
pub fn get_publisher_staged(
    &self,
    publisher: &PublisherHandle,
) -> Option<&StagedElements> {
    self.staged_elements.get(publisher)
}
/// Prepares a session reset without modifying this server.
///
/// The result carries the current time, a new random session and a copy
/// of the current snapshot with a new random path component. Pass it to
/// `apply_session_reset` to make the reset effective.
pub fn reset_session(&self) -> RrdpSessionReset {
    RrdpSessionReset {
        last_update: Time::now(),
        session: RrdpSession::random(),
        snapshot: self.snapshot.clone_with_new_random(),
    }
}
pub fn apply_session_reset(&mut self, reset: RrdpSessionReset) {
self.snapshot = reset.snapshot;
self.session = reset.session;
self.last_update = reset.last_update;
self.serial = RRDP_FIRST_SERIAL;
self.deltas = VecDeque::new();
}
/// Registers a publisher in the snapshot.
pub fn apply_publisher_added(&mut self, publisher: PublisherHandle) {
    self.snapshot.apply_publisher_added(publisher);
}
/// Removes a publisher and its objects from the snapshot.
pub fn apply_publisher_removed(&mut self, publisher: &PublisherHandle) {
    self.snapshot.apply_publisher_removed(publisher);
}
/// Merges new delta elements into the staged elements of a publisher.
///
/// An empty staging area is created for the publisher if none exists yet.
pub fn apply_rrdp_staged(
    &mut self,
    publisher: PublisherHandle,
    elements: DeltaElements,
) {
    let staged = self.staged_elements.entry(publisher).or_default();
    staged.merge_new_elements(elements);
}
/// Applies an RRDP update: turns everything staged into a new delta.
///
/// The serial is incremented, all staged elements are taken out and
/// applied to the snapshot per publisher, and their combination becomes a
/// new delta at the front of the delta list. Before that the list is
/// truncated to `update.deltas_truncate` entries, and afterwards it is
/// truncated again based on size.
pub fn apply_rrdp_updated(&mut self, update: RrdpUpdated) {
    self.serial += 1;

    // Take everything that was staged, leaving an empty map behind.
    let staged = mem::take(&mut self.staged_elements);

    let mut combined = DeltaElements::default();
    for (publisher, publisher_staged) in staged {
        let publisher_delta = DeltaElements::from(publisher_staged);
        self.snapshot
            .apply_delta(&publisher, publisher_delta.clone());
        combined.append(publisher_delta);
    }

    let new_delta =
        DeltaData::new(self.serial, update.time, update.random, combined);

    self.deltas.truncate(update.deltas_truncate);
    self.deltas.push_front(new_delta);
    self.deltas_truncate_size();

    self.last_update = update.time;
}
/// Determines whether an RRDP update should be done now, later or not at
/// all.
///
/// Without any staged changes the answer is `No`. Otherwise the update is
/// due once `rrdp_delta_interval_min_seconds` have passed since the last
/// update: `Yes` if that moment has been reached, `Later` with that moment
/// if it has not.
pub fn update_rrdp_needed(
    &self,
    rrdp_updates_config: RrdpUpdatesConfig,
) -> RrdpUpdateNeeded {
    let nothing_staged =
        self.staged_elements.values().all(|el| el.0.is_empty());
    if nothing_staged {
        debug!("No RRDP update is needed, there are no staged changes");
        return RrdpUpdateNeeded::No;
    }

    let min_interval = Duration::seconds(
        rrdp_updates_config.rrdp_delta_interval_min_seconds.into(),
    );
    let next_update_time = self.last_update + min_interval;

    if next_update_time > Time::now() {
        debug!(
            "RRDP update is delayed to: {}",
            next_update_time.to_rfc3339()
        );
        RrdpUpdateNeeded::Later(next_update_time)
    } else {
        debug!("RRDP update is needed");
        RrdpUpdateNeeded::Yes
    }
}
/// Prepares an RRDP update without modifying this server.
///
/// The result carries the current time, a new random path component and
/// the number of existing deltas to keep according to the age and count
/// limits in `rrdp_updates_config`. Pass it to `apply_rrdp_updated`.
pub fn update_rrdp(
    &self,
    rrdp_updates_config: RrdpUpdatesConfig,
) -> KrillResult<RrdpUpdated> {
    Ok(RrdpUpdated {
        time: Time::now(),
        random: RrdpFileRandom::default(),
        deltas_truncate: self
            .find_deltas_truncate_age(rrdp_updates_config),
    })
}
/// Drops deltas from the back once the deltas together outgrow the
/// snapshot.
///
/// Walking from the front, deltas are kept for as long as the running sum
/// of their approximate sizes does not exceed the approximate snapshot
/// size; the first delta that pushes the sum over and all that follow it
/// are removed.
fn deltas_truncate_size(&mut self) {
    let snapshot_size = self.snapshot().size_approx();

    let mut running_total = 0;
    let keep = self
        .deltas
        .iter()
        .take_while(|delta| {
            running_total += delta.elements().size_approx();
            running_total <= snapshot_size
        })
        .count();

    self.deltas.truncate(keep);
}
/// Determines how many existing deltas to keep, based on count and age.
///
/// Walking from the front of the delta list:
///
/// * a delta is always kept while fewer than `rrdp_delta_files_min_nr`
///   have been kept, or while it is younger than
///   `rrdp_delta_files_min_seconds`;
/// * otherwise the walk stops when keeping it would reach
///   `rrdp_delta_files_max_nr` once one more delta is added, or when the
///   delta is older than `rrdp_delta_files_max_seconds`.
///
/// Returns the number of deltas to keep, suitable for truncating the list.
fn find_deltas_truncate_age(
    &self,
    rrdp_updates_config: RrdpUpdatesConfig,
) -> usize {
    let min_nr = rrdp_updates_config.rrdp_delta_files_min_nr;
    let min_secs = rrdp_updates_config.rrdp_delta_files_min_seconds;
    let max_nr = rrdp_updates_config.rrdp_delta_files_max_nr;
    let max_secs = rrdp_updates_config.rrdp_delta_files_max_seconds;

    let mut keep: usize = 0;
    for delta in &self.deltas {
        let must_keep =
            keep < min_nr || delta.younger_than_seconds(min_secs.into());

        // One slot is left free for the next delta, hence `keep + 1`.
        // Written as an addition with `>=` so that a maximum of zero cannot
        // underflow, and so that the cap still applies when the "must keep"
        // rules have already pushed `keep` past the maximum.
        let limit_reached = keep.saturating_add(1) >= max_nr
            || delta.older_than_seconds(max_secs.into());

        if !must_keep && limit_reached {
            break;
        }
        keep += 1;
    }
    keep
}
/// Writes the current RRDP state to disk if it is not there yet.
///
/// If an existing, parseable notification file already has the current
/// session and serial, nothing is written. Otherwise the delta files, the
/// snapshot file and the notification file are written in that order,
/// followed by a clean-up of files that are no longer needed.
///
/// # Errors
///
/// Returns an error if any of the write steps or the clean-up fails.
pub fn update_rrdp_files(
    &self,
    rrdp_updates_config: RrdpUpdatesConfig,
) -> Result<(), Error> {
    // A single-line message: the string continues on the next source line.
    debug!(
        "Write updated RRDP state to disk - \
         if there are any updates that is."
    );

    // A missing or unparseable notification file is treated as absent.
    let old_notification_opt: Option<NotificationFile> =
        file::read(&self.notification_path())
            .ok()
            .and_then(|bytes| {
                rpki::rrdp::NotificationFile::parse(bytes.as_ref()).ok()
            });

    if let Some(old_notification) = old_notification_opt.as_ref()
        && old_notification.serial() == self.serial
        && old_notification.session_id() == self.session.uuid()
    {
        debug!(
            "Existing notification file matches current session \
             and serial. Nothing to write."
        );
        return Ok(());
    }

    // The notification file is written last so that it only ever refers
    // to files that have already been written.
    let deltas = self.write_delta_files(old_notification_opt)?;
    let snapshot = self.write_snapshot_file()?;
    self.write_notification_file(snapshot, deltas)?;

    self.cleanup_old_rrdp_files(rrdp_updates_config)
}
/// Writes the delta files that are not yet on disk and returns the delta
/// entries for the new notification file.
///
/// Delta entries from the old notification file are reused when that file
/// is for the current session and its deltas pass
/// `sort_and_verify_deltas`; for those serials no file is written again.
/// For all other deltas in `self.deltas` the XML is generated, hashed and
/// saved.
///
/// The returned list holds the newly written deltas in the order of
/// `self.deltas`, followed by the reused entries in reversed order.
///
/// # Errors
///
/// Returns an error if a delta file cannot be saved.
fn write_delta_files(
&self,
old_notification_opt: Option<NotificationFile>,
) -> KrillResult<Vec<DeltaInfo>> {
// Step 1: find out which delta entries of the old notification file
// can be trusted to still describe files on disk.
let mut deltas_from_old_notification = match old_notification_opt {
None => {
debug!("No old notification file found");
vec![]
}
Some(mut old_notification) => {
if old_notification.session_id()
== self.session.uuid()
{
// NOTE(review): presumably this sorts the deltas by ascending
// serial and checks that there are no gaps - confirm against the
// rpki crate documentation.
if old_notification.sort_and_verify_deltas(None) {
debug!(
"Found existing notification file for \
current session with deltas."
);
old_notification.deltas().to_vec()
}
else {
debug!(
"Found existing notification file with \
incomplete deltas, will regenerate files."
);
vec![]
}
}
else {
debug!(
"Existing notification file was for different \
session, will regenerate all files."
);
vec![]
}
}
};
// Step 2: drop old entries with a serial below that of the delta at
// the back of our list, or all of them if we hold no deltas.
if let Some(last) = self.deltas.back() {
deltas_from_old_notification .retain(|delta| {
delta.serial() >= last.serial()
});
}
else if !deltas_from_old_notification.is_empty() {
deltas_from_old_notification = vec![];
}
// The last remaining old entry marks the serial up to which files are
// assumed to exist already.
let last_written_serial = deltas_from_old_notification.last();
let mut deltas = vec![];
// Step 3: write every delta with a serial beyond that mark.
for delta in &self.deltas {
if
let Some(last) = last_written_serial
&& delta.serial() <= last.serial()
{
debug!(
"Skip writing delta for serial {}. \
File should exist.",
delta.serial()
);
continue;
}
let path = delta.path(
self.session, delta.serial(), &self.rrdp_base_dir
);
let uri = delta.uri(
self.session, delta.serial(), &self.rrdp_base_uri
);
let xml_bytes = delta.xml(self.session, delta.serial());
let hash = Hash::from_data(xml_bytes.as_slice());
debug!("Write delta file to: {}", path.to_string_lossy());
file::save(&xml_bytes, &path)?;
deltas.push(DeltaInfo::new(delta.serial(), uri, hash));
}
// Step 4: append the reused entries, reversed, after the new ones.
deltas_from_old_notification.reverse();
deltas.append(&mut deltas_from_old_notification);
Ok(deltas)
}
/// Writes the snapshot file for the current session and serial.
///
/// Returns the snapshot entry (URI and hash of the generated XML) for use
/// in the notification file.
///
/// # Errors
///
/// Returns an error if the file cannot be saved.
fn write_snapshot_file(&self) -> KrillResult<SnapshotInfo> {
    let (session, serial) = (self.session, self.serial);
    let snapshot = self.snapshot();

    let path = snapshot.path(session, serial, &self.rrdp_base_dir);
    let uri = snapshot.uri(session, serial, &self.rrdp_base_uri);

    let xml_bytes = snapshot.xml(session, serial);
    let hash = Hash::from_data(&xml_bytes);

    debug!("Write snapshot file to: {}", path.to_string_lossy());
    file::save(&xml_bytes, &path)?;

    Ok(SnapshotInfo::new(uri, hash))
}
/// Writes the notification file for the current session and serial.
///
/// The XML is first written to a temporary file next to the final
/// location, which is then renamed over the notification file, so the
/// final path never holds a partially written file.
///
/// # Errors
///
/// Returns an error if the temporary file cannot be created or written,
/// or if the rename fails.
fn write_notification_file(
    &self,
    snapshot: SnapshotInfo,
    deltas: Vec<DeltaInfo>,
) -> KrillResult<()> {
    let notification = NotificationFile::new(
        self.session.uuid(),
        self.serial,
        snapshot,
        deltas,
    );

    let notification_path_new = self.notification_path_new();
    {
        // Scoped so that the file handle is closed before the rename.
        let mut notification_file_new =
            file::create_file_with_path(&notification_path_new)?;
        notification
            .write_xml(&mut notification_file_new)
            .map_err(|e| {
                KrillIoError::new(
                    format!(
                        "could not write new notification file to {}",
                        notification_path_new.to_string_lossy()
                    ),
                    e,
                )
            })?;
    }

    let notification_path = self.notification_path();
    fs::rename(&notification_path_new, &notification_path).map_err(
        |e| {
            KrillIoError::new(
                format!(
                    "Could not rename notification file from '{}' to '{}'",
                    notification_path_new.to_string_lossy(),
                    notification_path.to_string_lossy()
                ),
                e,
            )
        },
    )?;

    Ok(())
}
/// Removes or archives RRDP files that are no longer referenced.
///
/// 1. In the RRDP base directory, every directory other than the one of
///    the current session is removed (best effort).
/// 2. In the current session directory, entries whose name is a serial are
///    handled as follows:
///    * the current serial is left alone;
///    * serials outside the range of the retained deltas are moved to the
///      archive directory if `rrdp_files_archive` is set, and removed
///      otherwise (both best effort);
///    * serials inside that range keep their delta, but their snapshot
///      file is removed unless archiving is enabled.
///
///    Entries whose name is not a serial are removed (best effort) after
///    logging a warning.
///
/// # Errors
///
/// Returns an error if the base or session directory, or one of their
/// entries, cannot be read. Failures to remove or move files are not
/// reported as errors.
fn cleanup_old_rrdp_files(
    &self,
    rrdp_updates_config: RrdpUpdatesConfig,
) -> KrillResult<()> {
    let session_name = self.session.to_string();

    // Step 1: remove directories that belong to other sessions.
    let base_entries = fs::read_dir(&self.rrdp_base_dir).map_err(|e| {
        KrillIoError::new(
            format!(
                "Could not read RRDP base directory '{}'",
                self.rrdp_base_dir.to_string_lossy()
            ),
            e,
        )
    })?;
    for entry in base_entries {
        let entry = entry.map_err(|e| {
            KrillIoError::new(
                format!(
                    "Could not read entry in RRDP base directory '{}'",
                    self.rrdp_base_dir.to_string_lossy()
                ),
                e,
            )
        })?;

        if session_name == entry.file_name().to_string_lossy() {
            continue;
        }

        // Plain files, such as the notification file, are left in place.
        let path = entry.path();
        if path.is_dir() {
            let _best_effort_rm = fs::remove_dir_all(path);
        }
    }

    // Step 2: clean up inside the current session directory.
    let session_dir = self.rrdp_base_dir.join(&session_name);
    let lowest_delta =
        self.deltas.back().map_or(0, |delta| delta.serial());
    let highest_delta =
        self.deltas.front().map_or(0, |delta| delta.serial());

    let session_entries = fs::read_dir(&session_dir).map_err(|e| {
        KrillIoError::new(
            format!(
                "Could not read RRDP session directory '{}'",
                session_dir.to_string_lossy()
            ),
            e,
        )
    })?;
    for entry in session_entries {
        let entry = entry.map_err(|e| {
            KrillIoError::new(
                format!(
                    "Could not read entry in RRDP session directory '{}'",
                    session_dir.to_string_lossy()
                ),
                e,
            )
        })?;
        let path = entry.path();

        let file_name = entry.file_name();
        let Ok(serial) = u64::from_str(&file_name.to_string_lossy()) else {
            warn!(
                "Found unexpected file or dir in RRDP repository - \
                 will try to remove: {}",
                path.to_string_lossy()
            );
            if path.is_dir() {
                let _best_effort_rm = fs::remove_dir_all(path);
            } else {
                let _best_effort_rm = fs::remove_file(path);
            }
            continue;
        };

        if serial == self.serial {
            // The current serial holds the current snapshot and delta.
            continue;
        }

        if serial < lowest_delta || serial > highest_delta {
            // No retained delta refers to this serial any more.
            if rrdp_updates_config.rrdp_files_archive {
                let dest = self
                    .rrdp_archive_dir
                    .join(&session_name)
                    .join(serial.to_string());
                info!(
                    "Archiving RRDP serial '{}' to '{}'",
                    serial,
                    dest.to_string_lossy()
                );
                let _ = fs::create_dir_all(&dest);
                let _ = fs::rename(path, dest);
            } else if path.is_dir() {
                let _best_effort_rm = fs::remove_dir_all(path);
            } else {
                let _best_effort_rm = fs::remove_file(path);
            }
        } else if !rrdp_updates_config.rrdp_files_archive
            && let Ok(Some(snapshot_file_to_remove)) =
                Self::session_dir_snapshot(&session_dir, serial)
            && let Err(e) = fs::remove_file(&snapshot_file_to_remove)
        {
            // The delta of this serial is still needed; only its old
            // snapshot is removed, and only when not archiving.
            warn!(
                "Could not delete snapshot file '{}'. \
                 Error was: {}",
                snapshot_file_to_remove.to_string_lossy(),
                e
            );
        }
    }

    Ok(())
}
}
/// Path helpers for files below the RRDP base and session directories.
impl RrdpServer {
    /// Path of the temporary file to which a new notification file is
    /// written before it is renamed into place.
    fn notification_path_new(&self) -> PathBuf {
        self.rrdp_base_dir.join("new-notification.xml")
    }

    /// Path of the notification file.
    fn notification_path(&self) -> PathBuf {
        self.rrdp_base_dir.join("notification.xml")
    }

    /// Looks for the snapshot file of the given serial below a session
    /// directory.
    ///
    /// See `find_in_serial_dir` for the search and error behaviour.
    pub fn session_dir_snapshot(
        session_path: &Path,
        serial: u64,
    ) -> KrillResult<Option<PathBuf>> {
        Self::find_in_serial_dir(session_path, serial, "snapshot.xml")
    }

    /// Looks for a file called `filename` in any sub-directory of the
    /// directory for `serial` below `session_path`.
    ///
    /// Returns the path of the first match, or `None` if there is no match
    /// or the serial directory cannot be read. Sub-directory entries that
    /// cannot be read as a directory are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if an individual directory entry cannot be
    /// obtained while iterating.
    pub fn find_in_serial_dir(
        session_path: &Path,
        serial: u64,
        filename: &str,
    ) -> KrillResult<Option<PathBuf>> {
        let serial_dir = session_path.join(serial.to_string());

        let Ok(randoms) = fs::read_dir(&serial_dir) else {
            return Ok(None);
        };

        for entry in randoms {
            let entry = entry.map_err(|e| {
                Error::io_error_with_context(
                    format!(
                        "Could not open directory entry under RRDP directory {}",
                        serial_dir.to_string_lossy()
                    ),
                    e,
                )
            })?;

            let Ok(files) = fs::read_dir(entry.path()) else {
                continue;
            };

            for file in files {
                let file = file.map_err(|e| {
                    Error::io_error_with_context(
                        format!(
                            "Could not open directory entry under RRDP directory {}",
                            entry.path().to_string_lossy()
                        ),
                        e,
                    )
                })?;
                if file.file_name().to_string_lossy() == filename {
                    return Ok(Some(file.path()));
                }
            }
        }

        Ok(None)
    }
}
/// Outcome of checking whether an RRDP update should be done.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RrdpUpdateNeeded {
/// There are staged changes and the minimum interval has passed.
Yes,
/// There are no staged changes.
No,
/// There are staged changes, but the update is delayed until the given
/// time.
Later(Time),
}
/// The values needed to apply a session reset to an `RrdpServer`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct RrdpSessionReset {
/// The time to record as the last update.
pub last_update: Time,
/// The new session.
pub session: RrdpSession,
/// The snapshot to use in the new session.
pub snapshot: SnapshotData,
}
/// The values needed to apply an RRDP update to an `RrdpServer`.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct RrdpUpdated {
/// The time of the update; used for the new delta and the last update
/// time.
pub time: Time,
/// The random path component for the new delta.
pub random: RrdpFileRandom,
/// The number of existing deltas to keep before the new delta is added.
pub deltas_truncate: usize,
}
/// An RRDP session identifier, wrapping a UUID.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct RrdpSession(Uuid);
impl RrdpSession {
pub fn random() -> Self {
Self::default()
}
pub fn from_uuid(uuid: Uuid) -> Self {
Self(uuid)
}
pub fn uuid(&self) -> Uuid {
self.0
}
}
impl Default for RrdpSession {
fn default() -> Self {
RrdpSession(Uuid::new_v4())
}
}
/// Formats the session as its hyphenated UUID.
impl fmt::Display for RrdpSession {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let hyphenated = self.0.hyphenated();
        f.write_fmt(format_args!("{}", hyphenated))
    }
}
impl<'de> Deserialize<'de> for RrdpSession {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let string = String::deserialize(deserializer)?;
let uuid = Uuid::parse_str(&string).map_err(de::Error::custom)?;
Ok(RrdpSession(uuid))
}
}
/// Serializes a session as the string produced by its `Display`
/// implementation.
impl Serialize for RrdpSession {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let text = self.to_string();
        serializer.serialize_str(&text)
    }
}
/// The content of an RRDP snapshot: the current objects of every publisher.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct SnapshotData {
/// Random component used in the path and URI of the snapshot file.
random: RrdpFileRandom,
/// The current objects, kept per publisher.
publishers_current_objects: HashMap<PublisherHandle, CurrentObjects>,
}
impl SnapshotData {
pub fn new(
random: RrdpFileRandom,
publishers_current_objects: HashMap<PublisherHandle, CurrentObjects>,
) -> Self {
SnapshotData {
random,
publishers_current_objects,
}
}
pub fn empty() -> Self {
SnapshotData::new(RrdpFileRandom::default(), HashMap::default())
}
pub fn clone_with_new_random(&self) -> Self {
SnapshotData::new(
RrdpFileRandom::default(),
self.publishers_current_objects.clone(),
)
}
pub fn publishers_current_objects(
&self,
) -> &HashMap<PublisherHandle, CurrentObjects> {
&self.publishers_current_objects
}
pub fn set_random(&mut self, random: RrdpFileRandom) {
self.random = random;
}
pub fn size_approx(&self) -> usize {
self.publishers_current_objects.values().fold(
0, |tot, objects| tot + objects.size_approx()
)
}
pub fn get_publisher_objects(
&self,
publisher: &PublisherHandle,
) -> Option<&CurrentObjects> {
self.publishers_current_objects.get(publisher)
}
pub fn apply_delta(
&mut self,
publisher: &PublisherHandle,
delta: DeltaElements,
) {
if let Some(objects)
= self.publishers_current_objects.get_mut(publisher)
{
objects.apply_delta(delta);
if objects.is_empty() {
self.publishers_current_objects.remove(publisher);
}
}
else {
let mut objects = CurrentObjects::default();
objects.apply_delta(delta);
self.publishers_current_objects.insert(
publisher.clone(), objects
);
}
}
pub fn apply_publisher_added(&mut self, publisher: PublisherHandle) {
self.publishers_current_objects.entry(publisher).or_default();
}
pub fn apply_publisher_removed(&mut self, publisher: &PublisherHandle) {
self.publishers_current_objects.remove(publisher);
}
fn rel_path(&self, session: RrdpSession, serial: u64) -> String {
format!("{}/{}/{}/snapshot.xml", session, serial, self.random.0)
}
pub fn uri(
&self,
session: RrdpSession,
serial: u64,
rrdp_base_uri: &uri::Https,
) -> uri::Https {
rrdp_base_uri.join(self.rel_path(session, serial).as_ref()).unwrap()
}
pub fn path(
&self,
session: RrdpSession,
serial: u64,
base_path: &Path,
) -> PathBuf {
base_path.join(self.rel_path(session, serial))
}
pub fn write_xml(
&self,
session: RrdpSession,
serial: u64,
path: &Path,
) -> Result<(), KrillIoError> {
debug!("Writing snapshot file: {}", path.to_string_lossy());
let mut f = file::create_file_with_path(path)?;
self.write_xml_to_writer(session, serial, &mut f).map_err(|e| {
KrillIoError::new(
format!(
"cannot write snapshot xml to: {}",
path.to_string_lossy()
),
e,
)
})?;
debug!("Finished snapshot xml");
Ok(())
}
pub fn xml(&self, session: RrdpSession, serial: u64) -> Vec<u8> {
let mut res = vec![];
self.write_xml_to_writer(session, serial, &mut res).unwrap();
res
}
fn write_xml_to_writer(
&self,
session: RrdpSession,
serial: u64,
writer: &mut impl io::Write,
) -> Result<(), io::Error> {
let mut writer = rpki::xml::encode::Writer::new(writer);
writer
.element(SNAPSHOT)?
.attr("xmlns", NS)?
.attr("version", VERSION)?
.attr("session_id", &session)?
.attr("serial", &serial)?
.content(|content| {
for publisher_objects in
self.publishers_current_objects.values()
{
for (uri, base64) in publisher_objects.iter() {
content
.element(PUBLISH)?
.attr("uri", uri.as_str())?
.content(|content| {
content.raw(base64.as_str())
})?;
}
}
Ok(())
})?;
writer.done()
}
}
/// The current objects of a publisher: their Base64 content keyed by
/// object URI.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct CurrentObjects(HashMap<CurrentObjectUri, Base64>);
impl CurrentObjects {
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn size_approx(&self) -> usize {
self.0.values().fold(0, |tot, el| tot + el.size_approx())
}
pub fn extend(&mut self, other: Self) {
self.0.extend(other.0)
}
pub fn diff(&self, other: &Self) -> KrillResult<DeltaElements> {
let mut publishes = vec![];
let mut updates = vec![];
let mut withdraws = vec![];
for (uri_key, base64) in &other.0 {
match self.0.get(uri_key) {
None => {
publishes.push(PublishElement {
uri: uri_key.try_into()?,
base64: base64.clone(),
});
}
Some(existing_b64) => {
if base64 != existing_b64 {
updates.push(UpdateElement {
uri: uri_key.try_into()?,
hash: existing_b64.to_hash(),
base64: base64.clone()
});
}
}
}
}
for (uri_key, base64) in &self.0 {
if !other.0.contains_key(uri_key) {
let wdr = WithdrawElement {
uri: uri_key.try_into()?,
hash: base64.to_hash(),
};
withdraws.push(wdr);
}
}
Ok(DeltaElements::new(publishes, updates, withdraws))
}
pub fn iter(&self) -> impl Iterator<Item = (&CurrentObjectUri, &Base64)> {
self.0.iter()
}
pub fn try_into_published_files(
self,
) -> KrillResult<Vec<PublishedFile>> {
let mut elements = Vec::new();
for (uri_key, base64) in self.0.into_iter() {
elements.push(PublishedFile { uri: uri_key.try_into()?, base64 });
}
Ok(elements)
}
pub fn try_to_withdraw_elements(
&self
) -> KrillResult<Vec<WithdrawElement>> {
let mut elements = Vec::new();
for (uri_key, base64) in self.0.iter() {
elements.push(WithdrawElement {
uri: uri_key.try_into()?,
hash: base64.to_hash(),
});
}
Ok(elements)
}
pub fn verify_delta_applies(
&self,
delta: &DeltaElements,
jail: &uri::Rsync,
) -> Result<(), PublicationDeltaError> {
for p in delta.publishes() {
if !jail.is_parent_of(&p.uri) {
return Err(PublicationDeltaError::outside(jail, &p.uri));
}
if self.0.contains_key(&CurrentObjectUri::from(&p.uri)) {
return Err(PublicationDeltaError::present(&p.uri));
}
}
for u in delta.updates() {
if !jail.is_parent_of(&u.uri) {
return Err(PublicationDeltaError::outside(jail, &u.uri));
}
if !self.contains(u.hash, &u.uri) {
return Err(PublicationDeltaError::no_match(&u.uri));
}
}
for w in delta.withdraws() {
if !jail.is_parent_of(&w.uri) {
return Err(PublicationDeltaError::outside(jail, &w.uri));
}
if !self.contains(w.hash, &w.uri) {
return Err(PublicationDeltaError::no_match(&w.uri));
}
}
Ok(())
}
fn contains(&self, hash: Hash, uri: &uri::Rsync) -> bool {
match self.0.get(&CurrentObjectUri::from(uri)) {
Some(base64) => base64.to_hash() == hash,
None => false,
}
}
pub fn apply_delta(&mut self, delta: DeltaElements) {
let (publishes, updates, withdraws) = delta.unpack();
for p in publishes {
self.0.insert(CurrentObjectUri::from(p.uri), p.base64);
}
for u in updates {
self.0.insert(CurrentObjectUri::from(u.uri), u.base64);
}
for w in withdraws {
self.0.remove(&CurrentObjectUri::from(w.uri));
}
}
pub fn get_matching_withdraws(
&self,
match_uri: &uri::Rsync,
) -> KrillResult<Vec<WithdrawElement>> {
let match_uri = CurrentObjectUri::from(match_uri);
let mut withdraws = Vec::new();
for (uri_key, base64) in &self.0 {
if uri_key == &match_uri
|| (match_uri.as_str().ends_with('/')
&& uri_key.as_str().starts_with(match_uri.as_str()))
{
withdraws.push(WithdrawElement {
uri: uri_key.try_into()?,
hash: base64.to_hash(),
});
}
}
Ok(withdraws)
}
pub fn get_list_reply(&self) -> KrillResult<publication::ListReply> {
let mut elements = Vec::new();
for (key, base64) in &self.0 {
elements.push(publication::ListElement::new(
key.try_into()?, base64.to_hash()
));
}
Ok(publication::ListReply::new(elements))
}
pub fn get_stats(&self) -> PublisherStats {
let mut manifests = vec![];
for (uri_key, base64) in self.iter() {
if
uri_key.as_str().ends_with("mft")
&& let Ok(mft) = Manifest::decode(
base64.to_bytes().as_ref(), false
) && let Ok(stats) = PublisherManifestStats::try_from(&mft)
{
manifests.push(stats)
}
}
PublisherStats {
objects: self.len(),
size: self.size_approx(),
manifests,
}
}
}
impl<K> FromIterator<(K, Base64)> for CurrentObjects
where K: Into<CurrentObjectUri> {
fn from_iter<T: IntoIterator<Item = (K, Base64)>>(
iter: T
) -> Self {
Self(iter.into_iter().map(|(k, v)| (k.into(), v)).collect())
}
}
/// The URI of a current object, stored as a shared string and used as map
/// key.
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct CurrentObjectUri(Arc<str>);
impl CurrentObjectUri {
    /// Returns the URI as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_ref()
    }
}
impl From<&uri::Rsync> for CurrentObjectUri {
fn from(value: &uri::Rsync) -> Self {
CurrentObjectUri(
format!("{}{}", value.canonical_module(), value.path()).into()
)
}
}
impl From<uri::Rsync> for CurrentObjectUri {
fn from(value: uri::Rsync) -> Self {
Self::from(&value)
}
}
impl TryFrom<&CurrentObjectUri> for uri::Rsync {
type Error = Error;
fn try_from(key: &CurrentObjectUri) -> Result<Self, Self::Error> {
uri::Rsync::from_slice(key.0.as_bytes()).map_err(|e| {
Error::Custom(format!(
"Found invalid object uri: {}. Error: {}",
key.0, e
))
})
}
}
/// Owned variant of the conversion from a borrowed object URI.
impl TryFrom<CurrentObjectUri> for uri::Rsync {
    type Error = Error;

    fn try_from(key: CurrentObjectUri) -> Result<Self, Self::Error> {
        let borrowed = &key;
        uri::Rsync::try_from(borrowed)
    }
}
/// Random string used as a component in the path and URI of snapshot and
/// delta files.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct RrdpFileRandom(String);
/// The default value is the hex encoding of eight random bytes.
///
/// # Panics
///
/// Panics if the random bytes cannot be generated.
impl Default for RrdpFileRandom {
    fn default() -> Self {
        let mut bytes = [0; 8];
        openssl::rand::rand_bytes(&mut bytes).unwrap();
        RrdpFileRandom(hex::encode(bytes))
    }
}
/// The content of an RRDP delta together with its serial and creation
/// time.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct DeltaData {
/// Random component used in the path and URI of the delta file.
random: RrdpFileRandom,
/// The serial of this delta.
serial: u64,
/// The time used for the age checks of this delta.
time: Time,
/// The publishes, updates and withdraws in this delta.
elements: DeltaElements,
}
impl DeltaData {
    /// Creates delta data from its parts.
    pub fn new(
        serial: u64,
        time: Time,
        random: RrdpFileRandom,
        elements: DeltaElements,
    ) -> Self {
        Self {
            random,
            serial,
            time,
            elements,
        }
    }

    /// Returns the serial of this delta.
    pub fn serial(&self) -> u64 {
        self.serial
    }

    /// Returns the random path component of this delta.
    pub fn random(&self) -> &RrdpFileRandom {
        &self.random
    }

    /// Returns whether the delta's time lies more than `seconds` before
    /// now.
    pub fn older_than_seconds(&self, seconds: i64) -> bool {
        let threshold = Time::now() - Duration::seconds(seconds);
        self.time < threshold
    }

    /// Returns whether the delta's time lies less than `seconds` before
    /// now.
    pub fn younger_than_seconds(&self, seconds: i64) -> bool {
        let threshold = Time::now() - Duration::seconds(seconds);
        self.time > threshold
    }

    /// Returns the elements of this delta.
    pub fn elements(&self) -> &DeltaElements {
        &self.elements
    }

    /// Relative location of the delta file:
    /// `<session>/<serial>/<random>/delta.xml`.
    fn rel_path(&self, session: RrdpSession, serial: u64) -> String {
        let random = &self.random.0;
        format!("{}/{}/{}/delta.xml", session, serial, random)
    }

    /// Returns the URI of the delta file below `rrdp_base_uri`.
    ///
    /// # Panics
    ///
    /// Panics if joining the relative path onto the base URI fails.
    pub fn uri(
        &self,
        session: RrdpSession,
        serial: u64,
        rrdp_base_uri: &uri::Https,
    ) -> uri::Https {
        let rel_path = self.rel_path(session, serial);
        rrdp_base_uri.join(rel_path.as_ref()).unwrap()
    }

    /// Returns the path of the delta file below `base_path`.
    pub fn path(
        &self,
        session: RrdpSession,
        serial: u64,
        base_path: &Path,
    ) -> PathBuf {
        let rel_path = self.rel_path(session, serial);
        base_path.join(rel_path)
    }

    /// Returns the delta XML as bytes.
    pub fn xml(&self, session: RrdpSession, serial: u64) -> Vec<u8> {
        let mut buffer = Vec::new();
        self.write_xml_to_writer(session, serial, &mut buffer)
            .unwrap();
        buffer
    }

    /// Encodes the delta as XML: a `delta` root element holding `publish`
    /// elements for publishes (without hash) and updates (with hash),
    /// followed by `withdraw` elements.
    fn write_xml_to_writer(
        &self,
        session: RrdpSession,
        serial: u64,
        writer: &mut impl io::Write,
    ) -> Result<(), io::Error> {
        let elements = self.elements();

        let mut xml = rpki::xml::encode::Writer::new(writer);
        xml.element(DELTA)?
            .attr("xmlns", NS)?
            .attr("version", VERSION)?
            .attr("session_id", &session)?
            .attr("serial", &serial)?
            .content(|content| {
                for publish in elements.publishes() {
                    content
                        .element(PUBLISH.into_unqualified())?
                        .attr("uri", &publish.uri)?
                        .content(|inner| {
                            inner.raw(publish.base64.as_str())
                        })?;
                }
                for update in elements.updates() {
                    content
                        .element(PUBLISH.into_unqualified())?
                        .attr("uri", &update.uri)?
                        .attr("hash", &update.hash)?
                        .content(|inner| {
                            inner.raw(update.base64.as_str())
                        })?;
                }
                for withdraw in elements.withdraws() {
                    content
                        .element(WITHDRAW.into_unqualified())?
                        .attr("uri", &withdraw.uri)?
                        .attr("hash", &withdraw.hash)?;
                }
                Ok(())
            })?;
        xml.done()
    }
}
/// The changes in a delta, grouped by kind.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct DeltaElements {
/// Newly published objects.
publishes: Vec<PublishElement>,
/// Objects that replace an existing object identified by hash.
updates: Vec<UpdateElement>,
/// Withdrawn objects.
withdraws: Vec<WithdrawElement>,
}
impl DeltaElements {
    /// Creates delta elements from the three kinds of changes.
    pub fn new(
        publishes: Vec<PublishElement>,
        updates: Vec<UpdateElement>,
        withdraws: Vec<WithdrawElement>,
    ) -> Self {
        Self {
            publishes,
            updates,
            withdraws,
        }
    }

    /// Splits the delta elements into publishes, updates and withdraws.
    pub fn unpack(
        self,
    ) -> (
        Vec<PublishElement>,
        Vec<UpdateElement>,
        Vec<WithdrawElement>,
    ) {
        let DeltaElements {
            publishes,
            updates,
            withdraws,
        } = self;
        (publishes, updates, withdraws)
    }

    /// Returns the total number of publishes, updates and withdraws.
    pub fn len(&self) -> usize {
        [
            self.publishes.len(),
            self.updates.len(),
            self.withdraws.len(),
        ]
        .into_iter()
        .sum()
    }

    /// Returns whether there are no elements at all.
    pub fn is_empty(&self) -> bool {
        self.publishes.is_empty()
            && self.updates.is_empty()
            && self.withdraws.is_empty()
    }

    /// Returns the sum of the approximate content sizes of all publishes
    /// and updates. Withdraws are not counted.
    pub fn size_approx(&self) -> usize {
        let published: usize = self
            .publishes
            .iter()
            .map(|publish| publish.base64.size_approx())
            .sum();
        let updated: usize = self
            .updates
            .iter()
            .map(|update| update.base64.size_approx())
            .sum();
        published + updated
    }

    /// Appends all elements of `other` to the elements of `self`.
    pub fn append(&mut self, other: Self) {
        let (publishes, updates, withdraws) = other.unpack();
        self.publishes.extend(publishes);
        self.updates.extend(updates);
        self.withdraws.extend(withdraws);
    }

    /// Returns the publish elements.
    pub fn publishes(&self) -> &[PublishElement] {
        self.publishes.as_slice()
    }

    /// Returns the update elements.
    pub fn updates(&self) -> &[UpdateElement] {
        self.updates.as_slice()
    }

    /// Returns the withdraw elements.
    pub fn withdraws(&self) -> &[WithdrawElement] {
        self.withdraws.as_slice()
    }
}
impl From<publication::PublishDelta> for DeltaElements {
fn from(d: publication::PublishDelta) -> Self {
let mut publishes = vec![];
let mut updates = vec![];
let mut withdraws = vec![];
for el in d.into_elements() {
match el {
publication::PublishDeltaElement::Publish(p) => {
publishes.push(p.into())
}
publication::PublishDeltaElement::Update(u) => {
updates.push(u.into())
}
publication::PublishDeltaElement::Withdraw(w) => {
withdraws.push(w.into())
}
}
}
DeltaElements {
publishes,
updates,
withdraws,
}
}
}
/// Changes staged for a publisher: at most one pending element per rsync
/// URI.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct StagedElements(HashMap<uri::Rsync, DeltaElement>);
impl StagedElements {
/// Merges new delta elements into what is already staged, so that there
/// is a single resulting element per URI.
///
/// Publishes are merged first, then updates, then withdraws. For every
/// new element the outcome depends on what is already staged for its
/// URI; combinations that are not expected are resolved and logged as
/// errors rather than rejected.
fn merge_new_elements(&mut self, elements: DeltaElements) {
let (publishes, updates, withdraws) = elements.unpack();
let general_merge_message = "Non-critical publish merge conflict resolved. Please contact rpki-team@nlnetlabs.nl if this happens more frequently.";
// New publish elements.
for pbl in publishes {
let uri = pbl.uri.clone();
match self.0.get_mut(&uri) {
// Unexpected: publish on top of a staged publish. The new publish
// replaces the staged one.
Some(DeltaElement::Publish(staged_publish)) => {
error!(
"{} Received new publish element for {} with content hash {} while another publish element with content hash {} was already staged. Expected new *update* element instead. Will use new publish.",
general_merge_message,
uri,
pbl.base64.to_hash(),
staged_publish.base64.to_hash()
);
self.0.insert(uri, DeltaElement::Publish(pbl));
}
// Unexpected: publish on top of a staged update. The staged update
// keeps its hash but takes the new content.
Some(DeltaElement::Update(staged_update)) => {
error!(
"{} Received new publish element for {} with content hash {} while an *update* with content hash {} was already staged. Expected new *update* element instead. Will merge content of publish into staged update.",
general_merge_message,
uri,
pbl.base64.to_hash(),
staged_update.base64.to_hash()
);
staged_update.base64 = pbl.base64;
}
// Publish after a staged withdraw: becomes an update that replaces the
// object identified by the withdraw's hash.
Some(DeltaElement::Withdraw(staged_withdraw)) => {
let hash = staged_withdraw.hash;
let update = UpdateElement {
uri: uri.clone(), hash, base64: pbl.base64
};
self.0.insert(uri, DeltaElement::Update(update));
}
// Nothing staged yet: stage the publish as is.
None => {
self.0.insert(uri, DeltaElement::Publish(pbl));
}
}
}
// New update elements.
for mut upd in updates {
let uri = upd.uri.clone();
match self.0.get_mut(&uri) {
// Update of a staged publish: remains a publish, with the new content.
Some(DeltaElement::Publish(staged_publish)) => {
staged_publish.base64 = upd.base64;
}
// Update of a staged update: keeps the staged hash, takes the new
// content.
Some(DeltaElement::Update(staged_update)) => {
staged_update.base64 = upd.base64;
}
// Unexpected: update after a staged withdraw. The update is staged
// with the hash taken from the withdraw.
Some(DeltaElement::Withdraw(staged_withdraw)) => {
error!(
"{} Received new update element for {} with content hash {} and replacing object hash {}, while a *withdraw* for content hash {} was already staged. Expected new *update* element instead. Will stage the update instead of withdraw.",
general_merge_message,
uri,
upd.base64.to_hash(),
upd.hash,
staged_withdraw.hash,
);
upd.hash = staged_withdraw.hash;
self.0.insert(uri, DeltaElement::Update(upd));
}
// Nothing staged yet: stage the update as is.
None => {
self.0.insert(uri, DeltaElement::Update(upd));
}
}
}
// New withdraw elements.
for mut wdr in withdraws {
let uri = wdr.uri.clone();
match self.0.get(&uri) {
// Withdraw of a staged publish: the two cancel out, nothing remains
// staged for this URI.
Some(DeltaElement::Publish(_)) => {
self.0.remove(&uri);
}
// Withdraw of a staged update: becomes a withdraw with the hash taken
// from the staged update.
Some(DeltaElement::Update(staged_update)) => {
wdr.hash = staged_update.hash;
self.0.insert(uri, DeltaElement::Withdraw(wdr));
}
// Unexpected: withdraw on top of a staged withdraw. The staged
// withdraw is kept.
Some(DeltaElement::Withdraw(staged_wdr)) => {
error!("{} We received a withdraw for an object that was already withdrawn.\nExisting withdraw: {} {}\nReceived withdraw: {} {}\n", general_merge_message, staged_wdr.hash, staged_wdr.uri, wdr.hash, wdr.uri)
}
// Nothing staged yet: stage the withdraw as is.
None => {
self.0.insert(uri, DeltaElement::Withdraw(wdr));
}
}
}
}
}
impl From<StagedElements> for DeltaElements {
fn from(staged: StagedElements) -> Self {
let mut publishes = vec![];
let mut updates = vec![];
let mut withdraws = vec![];
for el in staged.0.into_values() {
match el {
DeltaElement::Publish(publish) => publishes.push(publish),
DeltaElement::Update(update) => updates.push(update),
DeltaElement::Withdraw(withdraw) => withdraws.push(withdraw),
}
}
DeltaElements::new(publishes, updates, withdraws)
}
}
/// A "publish" entry: the content of an object together with the rsync URI
/// it is published at.
///
/// Unlike [`UpdateElement`] this carries no hash of a previous object.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct PublishElement {
/// The rsync URI of the published object.
pub uri: uri::Rsync,
/// The content of the published object.
pub base64: Base64,
}
/// Builds a [`PublishElement`] from a publication protocol `Publish`.
impl From<publication::Publish> for PublishElement {
    /// Unpacks `p` and keeps its URI and content; the first unpacked
    /// component (the tag) is discarded.
    fn from(p: publication::Publish) -> Self {
        let (_, uri, base64) = p.unpack();
        Self { uri, base64 }
    }
}
/// An "update" entry: new content for the object at `uri`, together with
/// the hash of the object that is being replaced.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct UpdateElement {
/// The rsync URI of the object being updated.
pub uri: uri::Rsync,
/// The hash of the object that is replaced by this update.
pub hash: Hash,
/// The new content of the object.
pub base64: Base64,
}
impl UpdateElement {
pub fn into_publish(self) -> PublishElement {
PublishElement {
uri: self.uri,
base64: self.base64,
}
}
}
/// Builds an [`UpdateElement`] from a publication protocol `Update`.
impl From<publication::Update> for UpdateElement {
    /// Unpacks `u` and keeps its URI, content and hash; the first unpacked
    /// component (the tag) is discarded.
    fn from(u: publication::Update) -> Self {
        let (_, uri, base64, hash) = u.unpack();
        Self { uri, hash, base64 }
    }
}
/// A "withdraw" entry: identifies the object to remove by its rsync URI
/// and the hash of its content.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct WithdrawElement {
/// The rsync URI of the object being withdrawn.
pub uri: uri::Rsync,
/// The hash of the object being withdrawn.
pub hash: Hash,
}
/// Builds a [`WithdrawElement`] from a publication protocol `Withdraw`.
impl From<publication::Withdraw> for WithdrawElement {
    /// Unpacks `w` and keeps its URI and hash; the first unpacked
    /// component (the tag) is discarded.
    fn from(w: publication::Withdraw) -> Self {
        let (_, uri, hash) = w.unpack();
        Self { uri, hash }
    }
}
/// A single change to one object: exactly one of publish, update or
/// withdraw.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub enum DeltaElement {
/// A new object is published.
Publish(PublishElement),
/// An existing object is replaced with new content.
Update(UpdateElement),
/// An existing object is removed.
Withdraw(WithdrawElement),
}
/// Reasons why a publication delta is rejected.
///
/// The user-facing wording for each variant lives in the `Display`
/// implementation.
#[derive(Clone, Debug)]
pub enum PublicationDeltaError {
/// An object URI lies outside the permitted jail URI.
///
/// The fields are, in order, the offending URI and the jail URI.
UriOutsideJail(uri::Rsync, uri::Rsync),
/// A file already exists at the given URI.
ObjectAlreadyPresent(uri::Rsync),
/// The file at the given URI does not match the supplied hash.
NoObjectForHashAndOrUri(uri::Rsync),
}
/// Convenience constructors that clone the borrowed URIs into the error.
impl PublicationDeltaError {
    /// Creates the error for a `uri` that falls outside of `jail`.
    ///
    /// Note the argument order (jail first) is the reverse of the variant's
    /// field order (URI first).
    fn outside(jail: &uri::Rsync, uri: &uri::Rsync) -> Self {
        Self::UriOutsideJail(uri.clone(), jail.clone())
    }

    /// Creates the error for an object that is already present at `uri`.
    fn present(uri: &uri::Rsync) -> Self {
        Self::ObjectAlreadyPresent(uri.clone())
    }

    /// Creates the error for a `uri` whose object does not match the
    /// expected hash.
    fn no_match(uri: &uri::Rsync) -> Self {
        Self::NoObjectForHashAndOrUri(uri.clone())
    }
}
impl fmt::Display for PublicationDeltaError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
PublicationDeltaError::UriOutsideJail(uri, jail) => {
write!(f,
"Publishing '{uri}' outside of jail URI '{jail}'"
)
}
PublicationDeltaError::ObjectAlreadyPresent(uri) => {
write!(f,
"File already exists for uri (use update!): {uri}"
)
}
PublicationDeltaError::NoObjectForHashAndOrUri(uri) => {
write!(f, "File does not match hash at uri: {uri}")
}
}
}
}
// Empty implementation: relies entirely on the default methods of
// `std::error::Error`, so no underlying source error is reported.
impl error::Error for PublicationDeltaError { }
// Unit tests for verifying, applying and deriving deltas on
// `CurrentObjects`.
#[cfg(test)]
mod test {
use crate::commons::test::*;
use super::*;
// Walks a single file through publish, update and withdraw and checks at
// each step which deltas `verify_delta_applies` accepts or rejects.
#[test]
fn current_objects_delta() {
let jail = rsync("rsync://example.krill.cloud/repo/publisher");
let file1_uri =
rsync("rsync://example.krill.cloud/repo/publisher/file1.txt");
let file1_content = Base64::from_content(&[1]);
let file1_content_2 = Base64::from_content(&[1, 2]);
let file2_content = Base64::from_content(&[2]);
let mut objects = CurrentObjects::default();
// Publishing file1 into an empty set is accepted ...
let publish_file1 = DeltaElements {
publishes: vec![PublishElement {
uri: file1_uri.clone(),
base64: file1_content.clone(),
}],
updates: vec![],
withdraws: vec![],
};
assert!(objects.verify_delta_applies(&publish_file1, &jail).is_ok());
objects.apply_delta(publish_file1.clone());
// ... but the same publish is rejected once it has been applied.
assert!(objects.verify_delta_applies(&publish_file1, &jail).is_err());
// Despite its name, this publishes *different content* at file1's URI;
// it is expected to be rejected as well.
let publish_file2 = DeltaElements {
publishes: vec![PublishElement {
uri: file1_uri.clone(),
base64: file2_content,
}],
updates: vec![],
withdraws: vec![],
};
assert!(objects.verify_delta_applies(&publish_file2, &jail).is_err());
// An update that refers to the hash of the currently stored content is
// accepted.
let update_file1 = DeltaElements {
publishes: vec![],
updates: vec![UpdateElement {
uri: file1_uri.clone(),
hash: file1_content.to_hash(),
base64: file1_content_2.clone(),
}],
withdraws: vec![],
};
assert!(objects.verify_delta_applies(&update_file1, &jail).is_ok());
objects.apply_delta(update_file1.clone());
// After applying it, the same update still carries the hash of the
// original content and is rejected.
assert!(objects.verify_delta_applies(&update_file1, &jail).is_err());
// A withdraw using the hash of the original (now replaced) content is
// rejected.
let withdraw_file1 = DeltaElements {
publishes: vec![],
updates: vec![],
withdraws: vec![WithdrawElement {
uri: file1_uri.clone(),
hash: file1_content.to_hash(),
}],
};
assert!(
objects.verify_delta_applies(&withdraw_file1, &jail).is_err()
);
// A withdraw using the hash of the updated content is accepted.
let withdraw_file1_updated = DeltaElements {
publishes: vec![],
updates: vec![],
withdraws: vec![WithdrawElement {
uri: file1_uri,
hash: file1_content_2.to_hash(),
}],
};
assert!(
objects.verify_delta_applies(
&withdraw_file1_updated, &jail
).is_ok()
);
}
// Checks that `diff` between two object sets reproduces the delta that
// was applied, and that the reverse diff restores the original set.
#[test]
fn current_objects_deltas() {
// Returns the rsync URI for `name` below the test publisher's jail.
fn file_rsync_uri(name: &str) -> uri::Rsync {
let jail = rsync("rsync://example.krill.cloud/repo/publisher");
jail.join(name.as_bytes()).unwrap()
}
// Returns the `CurrentObjectUri` key for `name`, built from the same
// base URI string as `file_rsync_uri`.
fn file_uri(name: &str) -> CurrentObjectUri {
CurrentObjectUri(
format!(
"rsync://example.krill.cloud/repo/publisher/{name}"
)
.into(),
)
}
// Produces content from 8 random bytes.
fn random_content() -> Base64 {
let mut bytes = [0; 8];
openssl::rand::rand_bytes(&mut bytes).unwrap();
Base64::from_content(&bytes)
}
// Compares two deltas while ignoring element order: each list is
// sorted by URI before the lists are compared for equality.
pub fn equivalent(
mut this: DeltaElements, mut other: DeltaElements
) -> bool {
this.publishes
.sort_by(|a, b| a.uri.as_str().cmp(b.uri.as_str()));
other.publishes
.sort_by(|a, b| a.uri.as_str().cmp(b.uri.as_str()));
this.updates
.sort_by(|a, b| a.uri.as_str().cmp(b.uri.as_str()));
other.updates
.sort_by(|a, b| a.uri.as_str().cmp(b.uri.as_str()));
this.withdraws
.sort_by(|a, b| a.uri.as_str().cmp(b.uri.as_str()));
other.withdraws
.sort_by(|a, b| a.uri.as_str().cmp(b.uri.as_str()));
this.publishes == other.publishes
&& this.updates == other.updates
&& this.withdraws == other.withdraws
}
// Starting state "a": four files with random content.
let mut objects: HashMap<CurrentObjectUri, Base64> = HashMap::new();
objects.insert(file_uri("file1"), random_content());
objects.insert(file_uri("file2"), random_content());
objects.insert(file_uri("file3"), random_content());
objects.insert(file_uri("file4"), random_content());
// Delta a -> b: add file5 and file6, replace file1 and file2, remove
// file3. file4 is left untouched.
let publishes = vec![
PublishElement {
uri: file_rsync_uri("file5"), base64: random_content()
},
PublishElement {
uri: file_rsync_uri("file6"), base64: random_content(),
},
];
let updates = vec![
UpdateElement {
uri: file_rsync_uri("file1"),
hash: objects.get(&file_uri("file1")).unwrap().to_hash(),
base64: random_content(),
},
UpdateElement {
uri: file_rsync_uri("file2"),
hash: objects.get(&file_uri("file2")).unwrap().to_hash(),
base64: random_content(),
},
];
let withdraws = vec![WithdrawElement {
uri: file_rsync_uri("file3"),
hash: objects.get(&file_uri("file3")).unwrap().to_hash(),
}];
let delta_a_b = DeltaElements::new(publishes, updates, withdraws);
let objects_a = CurrentObjects(objects);
let mut objects_b = objects_a.clone();
objects_b.apply_delta(delta_a_b.clone());
// The diff derived from the two states must match the applied delta.
let derived_delta_a_b = objects_a.diff(&objects_b).unwrap();
assert!(equivalent(delta_a_b, derived_delta_a_b));
// Applying the reverse diff to "b" must get us back to "a".
let derived_delta_b_a = objects_b.diff(&objects_a).unwrap();
let mut objects_a_from_b = objects_b.clone();
objects_a_from_b.apply_delta(derived_delta_b_a);
assert_eq!(objects_a, objects_a_from_b);
}
}