use std::path::PathBuf;
use std::sync::Arc;
use bytes::Bytes;
use log::{debug, info};
use rpki::ca::publication;
use rpki::ca::idexchange::{
PublisherHandle, PublisherRequest, RepositoryResponse
};
use rpki::ca::publication::{ListReply, PublishDelta};
use rpki::repository::x509::Time;
use crate::api::admin::{
PublicationServerUris, PublisherDetails, RepoFileDeleteCriteria,
};
use crate::api::pubd::RepoStats;
use crate::commons::KrillResult;
use crate::commons::actor::Actor;
use crate::commons::cmslogger::CmsLogger;
use crate::commons::crypto::KrillSigner;
use crate::commons::error::Error;
use crate::config::Config;
use crate::server::mq::{now, Task, TaskQueue};
use super::access::RepositoryAccessProxy;
use super::content::RepositoryContentProxy;
use super::rrdp::RrdpUpdateNeeded;
pub struct RepositoryManager {
access: Arc<RepositoryAccessProxy>,
content: Arc<RepositoryContentProxy>,
tasks: Arc<TaskQueue>,
config: Arc<Config>,
signer: Arc<KrillSigner>,
}
impl RepositoryManager {
pub fn build(
config: Arc<Config>,
tasks: Arc<TaskQueue>,
signer: Arc<KrillSigner>,
) -> Result<Self, Error> {
let access_proxy = Arc::new(RepositoryAccessProxy::create(&config)?);
let content_proxy = Arc::new(
RepositoryContentProxy::create(&config)?
);
Ok(RepositoryManager {
access: access_proxy,
content: content_proxy,
tasks,
config,
signer,
})
}
pub fn is_initialized(&self) -> KrillResult<bool> {
self.access.is_initialized()
}
pub fn init(&self, uris: PublicationServerUris) -> KrillResult<()> {
info!("Initializing repository");
self.access.init(uris.clone(), self.signer.clone())?;
self.content.init(self.config.repo_dir(), uris)?;
self.content
.write_repository(self.config.rrdp_updates_config)?;
Ok(())
}
pub fn repository_clear(&self) -> KrillResult<()> {
self.access.clear()?;
self.content.clear()
}
pub fn publishers(&self) -> KrillResult<Vec<PublisherHandle>> {
self.access.publishers()
}
pub fn resolve_rrdp_request_path(
&self, path: &str
) -> KrillResult<Option<PathBuf>> {
self.content.resolve_rrdp_request_path(path)
}
}
impl RepositoryManager {
pub fn rfc8181(
&self,
publisher_handle: PublisherHandle,
msg_bytes: Bytes,
) -> KrillResult<Bytes> {
let cms_logger = CmsLogger::for_rfc8181_rcvd(
self.config.rfc8181_log_dir.as_ref(),
&publisher_handle,
);
let cms = self.access.decode_and_validate(
&publisher_handle, &msg_bytes
).map_err(|e| {
Error::Custom(format!(
"Issue with publication request by publisher '{publisher_handle}': {e}"
))
})?;
let message = cms.into_message();
let query = message.as_query()?;
let is_list_query = query == publication::Query::List;
let response_result = self.rfc8181_message(&publisher_handle, query);
let should_log_cms = response_result.is_err() || !is_list_query;
let response = match response_result {
Ok(response) => response,
Err(e) => {
let error_code = e.to_rfc8181_error_code();
let report_error =
publication::ReportError::with_code(error_code);
let error_reply =
publication::ErrorReply::for_error(report_error);
publication::Message::error(error_reply)
}
};
let response_bytes = self.access.create_response(
response, &self.signer
)?.to_bytes();
if should_log_cms {
cms_logger.received(&msg_bytes)?;
cms_logger.reply(&response_bytes)?;
}
Ok(response_bytes)
}
pub fn rfc8181_message(
&self,
publisher_handle: &PublisherHandle,
query: publication::Query,
) -> KrillResult<publication::Message> {
match query {
publication::Query::List => {
debug!(
"Received RFC 8181 list query for {publisher_handle}"
);
let list_reply = self.list(publisher_handle)?;
Ok(publication::Message::list_reply(list_reply))
}
publication::Query::Delta(delta) => {
debug!(
"Received RFC 8181 delta query for {publisher_handle}"
);
self.publish(publisher_handle, delta)?;
Ok(publication::Message::success())
}
}
}
pub fn rrdp_session_reset(&self) -> KrillResult<()> {
self.content.session_reset(self.config.rrdp_updates_config)
}
pub fn publish(
&self,
publisher_handle: &PublisherHandle,
delta: PublishDelta,
) -> KrillResult<()> {
let publisher = self.access.get_publisher(publisher_handle)?;
self.content.publish(
publisher_handle.clone(),
delta,
publisher.base_uri(),
)?;
self.tasks.schedule(Task::RrdpUpdateIfNeeded, now())
}
pub fn update_rrdp_if_needed(&self) -> KrillResult<Option<Time>> {
match self.content.rrdp_update_needed(
self.config.rrdp_updates_config)?
{
RrdpUpdateNeeded::No => return Ok(None),
RrdpUpdateNeeded::Later(time) => return Ok(Some(time)),
RrdpUpdateNeeded::Yes => {} }
let content = self.content.update_rrdp(
self.config.rrdp_updates_config
)?;
content.write_repository(self.config.rrdp_updates_config)?;
Ok(None)
}
pub fn delete_matching_files(
&self,
criteria: RepoFileDeleteCriteria,
) -> KrillResult<()> {
self.content.update_rrdp(self.config.rrdp_updates_config)?;
self.content.delete_matching_files(criteria.base_uri)?;
let content =
self.content.update_rrdp(self.config.rrdp_updates_config)?;
content.write_repository(self.config.rrdp_updates_config)?;
Ok(())
}
pub fn repo_stats(&self) -> KrillResult<RepoStats> {
self.content.stats()
}
pub fn list(
&self, publisher: &PublisherHandle,
) -> KrillResult<ListReply> {
self.content.list_reply(publisher)
}
}
impl RepositoryManager {
pub fn get_publisher_details(
&self,
handle: PublisherHandle,
) -> KrillResult<PublisherDetails> {
let publisher = self.access.get_publisher(&handle)?;
let id_cert = publisher.id_cert().clone();
let base_uri = publisher.base_uri().clone();
let current_files = self
.content
.current_objects(&handle)?
.try_into_published_files()?;
Ok(PublisherDetails { handle, id_cert, base_uri, current_files })
}
pub fn repository_response(
&self,
publisher: &PublisherHandle,
) -> KrillResult<RepositoryResponse> {
let rfc8181_uri = self.config.rfc8181_uri(publisher);
self.access.repository_response(rfc8181_uri, publisher)
}
pub fn create_publisher(
&self,
req: PublisherRequest,
actor: &Actor,
) -> KrillResult<()> {
let name = req.publisher_handle().clone();
self.access.add_publisher(req, actor)?;
self.content.add_publisher(name)
}
pub fn remove_publisher(
&self,
name: PublisherHandle,
actor: &Actor,
) -> KrillResult<()> {
self.content.remove_publisher(name.clone())?;
self.access.remove_publisher(name, actor)?;
self.tasks.schedule(Task::RrdpUpdateIfNeeded, now())
}
}
impl RepositoryManager {
pub fn write_repository(&self) -> KrillResult<()> {
self.content
.write_repository(self.config.rrdp_updates_config)
}
}
#[cfg(test)]
mod tests {
use std::fs;
use std::path::{Path, PathBuf};
use std::str::{from_utf8, FromStr};
use std::time::Duration;
use bytes::Bytes;
use tokio::time::sleep;
use url::Url;
use rpki::uri;
use rpki::ca::idexchange::Handle;
use rpki::ca::publication::{ListElement, PublishDelta};
use crate::api::ca::IdCertInfo;
use crate::commons::file;
use crate::commons::crypto::{KrillSignerBuilder, OpenSslSignerConfig};
use crate::commons::file::CurrentFile;
use crate::commons::test::{self, https, rsync};
use crate::constants::{
ACTOR_DEF_TEST, RRDP_FIRST_SERIAL, enable_test_mode
};
use crate::config::{SignerConfig, SignerType};
use crate::server::pubd::Publisher;
use crate::server::pubd::rrdp::{PublicationDeltaError, RrdpServer};
use super::*;
fn publisher_alice(storage_uri: &Url) -> Publisher {
let signer = {
let signer_type =
SignerType::OpenSsl(OpenSslSignerConfig::default());
let signer_config =
SignerConfig::new("Alice".to_string(), signer_type);
let signer_configs = &[signer_config];
KrillSignerBuilder::new(
storage_uri,
Duration::from_secs(1),
signer_configs,
)
.build()
.unwrap()
};
let id_cert = signer.create_self_signed_id_cert().unwrap();
let base_uri =
uri::Rsync::from_str("rsync://localhost/repo/alice/").unwrap();
Publisher::new(id_cert.into(), base_uri)
}
fn make_publisher_req(
handle: &str,
id_cert: &IdCertInfo,
) -> PublisherRequest {
let handle = Handle::from_str(handle).unwrap();
PublisherRequest::new(
id_cert.base64.clone(),
handle,
None,
)
}
fn make_server(
storage_uri: &Url
) -> (RepositoryManager, tempfile::TempDir) {
let data_dir = tempfile::tempdir().unwrap();
enable_test_mode();
let mut config = Config::test(
storage_uri,
Some(data_dir.path()),
true,
false,
false,
false,
);
let _ = config.init_logging();
config.process().unwrap();
let signer = KrillSignerBuilder::new(
storage_uri,
Duration::from_secs(1),
&config.signers,
)
.with_default_signer(config.default_signer())
.with_one_off_signer(config.one_off_signer())
.build()
.unwrap();
let signer = Arc::new(signer);
let config = Arc::new(config);
let mq = Arc::new(TaskQueue::new(&config.storage_uri).unwrap());
let repository_manager =
RepositoryManager::build(config, mq, signer).unwrap();
let uris = PublicationServerUris {
rrdp_base_uri: https("https://localhost/repo/rrdp/"),
rsync_jail: rsync("rsync://localhost/repo/"),
};
repository_manager.init(uris).unwrap();
(repository_manager, data_dir)
}
#[test]
fn should_add_publisher() {
let storage_uri = test::mem_storage();
let (server, _data_dir) = make_server(&storage_uri);
let alice = publisher_alice(&storage_uri);
let alice_handle = Handle::from_str("alice").unwrap();
let publisher_req =
make_publisher_req(alice_handle.as_str(), alice.id_cert());
let actor = ACTOR_DEF_TEST;
server.create_publisher(publisher_req, &actor).unwrap();
let alice_found =
server.get_publisher_details(alice_handle).unwrap();
assert_eq!(alice_found.base_uri, alice.base_uri());
assert_eq!(alice_found.id_cert, *alice.id_cert());
assert!(alice_found.current_files.is_empty());
}
#[test]
fn should_not_add_publisher_twice() {
let storage_uri = test::mem_storage();
let (server, _data_dir) = make_server(&storage_uri);
let alice = publisher_alice(&storage_uri);
let alice_handle = Handle::from_str("alice").unwrap();
let publisher_req =
make_publisher_req(alice_handle.as_str(), alice.id_cert());
let actor = ACTOR_DEF_TEST;
server
.create_publisher(publisher_req.clone(), &actor)
.unwrap();
match server.create_publisher(publisher_req, &actor) {
Err(Error::PublisherDuplicate(name)) => {
assert_eq!(name, alice_handle)
}
_ => panic!("Expected error"),
}
}
#[test]
fn should_list_files() {
let storage_uri = test::mem_storage();
let (server, _data_dir) = make_server(&storage_uri);
let alice = publisher_alice(&storage_uri);
let alice_handle = Handle::from_str("alice").unwrap();
let publisher_req =
make_publisher_req(alice_handle.as_str(), alice.id_cert());
let actor = ACTOR_DEF_TEST;
server.create_publisher(publisher_req, &actor).unwrap();
let list_reply = server.list(&alice_handle).unwrap();
assert_eq!(0, list_reply.elements().len());
}
#[tokio::test]
async fn should_publish_files() {
let storage_uri = test::mem_storage();
let (server, data_dir) = make_server(&storage_uri);
let session = session_dir(data_dir.path());
assert!(!session_dir_contains_serial(&session, 0));
assert!(session_dir_contains_serial(&session, RRDP_FIRST_SERIAL));
let alice = publisher_alice(&storage_uri);
let alice_handle = Handle::from_str("alice").unwrap();
let publisher_req =
make_publisher_req(alice_handle.as_str(), alice.id_cert());
let actor = ACTOR_DEF_TEST;
server.create_publisher(publisher_req, &actor).unwrap();
fn find_in_reply<'a>(
reply: &'a ListReply,
uri: &uri::Rsync,
) -> Option<&'a ListElement> {
reply.elements().iter().find(|e| e.uri() == uri)
}
let file1 = CurrentFile::new(
test::rsync("rsync://localhost/repo/alice/file.txt"),
&Bytes::from("example content"),
);
let file2 = CurrentFile::new(
test::rsync("rsync://localhost/repo/alice/file2.txt"),
&Bytes::from("example content 2"),
);
let mut delta = PublishDelta::empty();
delta.add_publish(file1.as_publish());
delta.add_publish(file2.as_publish());
server.publish(&alice_handle, delta).unwrap();
server.update_rrdp_if_needed().unwrap();
server.write_repository().unwrap();
let list_reply = server.list(&alice_handle).unwrap();
assert_eq!(2, list_reply.elements().len());
assert!(find_in_reply(
&list_reply,
&test::rsync("rsync://localhost/repo/alice/file.txt")
)
.is_some());
assert!(find_in_reply(
&list_reply,
&test::rsync("rsync://localhost/repo/alice/file2.txt")
)
.is_some());
sleep(Duration::from_secs(2)).await;
let file1_update = CurrentFile::new(
test::rsync("rsync://localhost/repo/alice/file.txt"),
&Bytes::from("example content - updated"),
);
let big_file_3 = include_bytes!("../../../LICENSE");
let file3 = CurrentFile::new(
test::rsync("rsync://localhost/repo/alice/file3.txt"),
&Bytes::from_static(big_file_3),
);
let mut delta = PublishDelta::empty();
delta.add_update(file1_update.as_update(file1.hash()));
delta.add_withdraw(file2.as_withdraw());
delta.add_publish(file3.as_publish());
server.publish(&alice_handle, delta).unwrap();
server.update_rrdp_if_needed().unwrap();
server.write_repository().unwrap();
let list_reply = server.list(&alice_handle).unwrap();
assert_eq!(2, list_reply.elements().len());
assert!(find_in_reply(
&list_reply,
&test::rsync("rsync://localhost/repo/alice/file.txt")
)
.is_some());
assert_eq!(
find_in_reply(
&list_reply,
&test::rsync("rsync://localhost/repo/alice/file.txt")
)
.unwrap()
.hash(),
file1_update.hash()
);
assert!(find_in_reply(
&list_reply,
&test::rsync("rsync://localhost/repo/alice/file3.txt")
)
.is_some());
let file_outside = CurrentFile::new(
test::rsync("rsync://localhost/repo/bob/file.txt"),
&Bytes::from("irrelevant"),
);
let mut delta = PublishDelta::empty();
delta.add_publish(file_outside.as_publish());
match server.publish(&alice_handle, delta) {
Err(Error::Rfc8181Delta(
PublicationDeltaError::UriOutsideJail(_, _),
)) => {} _ => panic!("Expected error publishing outside of base uri jail"),
}
let file2_update = CurrentFile::new(
test::rsync("rsync://localhost/repo/alice/file2.txt"),
&Bytes::from("example content 2 updated"),
); let mut delta = PublishDelta::empty();
delta.add_update(file2_update.as_update(file2.hash()));
match server.publish(&alice_handle, delta) {
Err(Error::Rfc8181Delta(
PublicationDeltaError::NoObjectForHashAndOrUri(_),
)) => {}
_ => panic!("Expected error when file for update can't be found"),
}
let mut delta = PublishDelta::empty();
delta.add_withdraw(file2.as_withdraw());
match server.publish(&alice_handle, delta) {
Err(Error::Rfc8181Delta(
PublicationDeltaError::NoObjectForHashAndOrUri(_),
)) => {} _ => {
panic!("Expected error withdrawing file that does not exist")
}
}
let mut delta = PublishDelta::empty();
delta.add_publish(file3.as_publish());
match server.publish(&alice_handle, delta) {
Err(Error::Rfc8181Delta(
PublicationDeltaError::ObjectAlreadyPresent(uri),
)) => {
assert_eq!(
uri,
test::rsync("rsync://localhost/repo/alice/file3.txt")
)
}
_ => panic!("Expected error publishing file that already exists"),
}
assert!(!session_dir_contains_serial(&session, RRDP_FIRST_SERIAL));
assert!(!session_dir_contains_serial(
&session,
RRDP_FIRST_SERIAL + 1
));
let file4 = CurrentFile::new(
test::rsync("rsync://localhost/repo/alice/file4.txt"),
&Bytes::from("4"),
);
let mut delta = PublishDelta::empty();
delta.add_publish(file4.as_publish());
server.publish(&alice_handle, delta).unwrap();
server.update_rrdp_if_needed().unwrap();
server.write_repository().unwrap();
assert!(session_dir_contains_serial(&session, RRDP_FIRST_SERIAL + 3));
assert!(session_dir_contains_delta(&session, RRDP_FIRST_SERIAL + 3));
assert!(session_dir_contains_snapshot(
&session,
RRDP_FIRST_SERIAL + 3
));
assert!(session_dir_contains_delta(&session, RRDP_FIRST_SERIAL + 2));
server.remove_publisher(alice_handle, &actor).unwrap();
server.update_rrdp_if_needed().unwrap();
server.write_repository().unwrap();
assert!(session_dir_contains_snapshot(
&session,
RRDP_FIRST_SERIAL + 4
));
let snapshot_bytes = file::read(
&RrdpServer::session_dir_snapshot(
&session,
RRDP_FIRST_SERIAL + 4,
)
.unwrap()
.unwrap(),
)
.unwrap();
let snapshot_xml = from_utf8(&snapshot_bytes).unwrap();
assert!(!snapshot_xml.contains("/alice/"));
assert!(!session_dir_contains_serial(
&session,
RRDP_FIRST_SERIAL + 2
));
assert!(!session_dir_contains_serial(
&session,
RRDP_FIRST_SERIAL + 3
));
}
#[test]
pub fn repository_session_reset() {
let storage_uri = test::mem_storage();
let (server, data_dir) = make_server(&storage_uri);
let alice = publisher_alice(&storage_uri);
let alice_handle = Handle::from_str("alice").unwrap();
let publisher_req =
make_publisher_req(alice_handle.as_str(), alice.id_cert());
let actor = ACTOR_DEF_TEST;
server.create_publisher(publisher_req, &actor).unwrap();
fn find_in_reply<'a>(
reply: &'a ListReply,
uri: &uri::Rsync,
) -> Option<&'a ListElement> {
reply.elements().iter().find(|e| e.uri() == uri)
}
let file1 = CurrentFile::new(
test::rsync("rsync://localhost/repo/alice/file.txt"),
&Bytes::from("example content"),
);
let file2 = CurrentFile::new(
test::rsync("rsync://localhost/repo/alice/file2.txt"),
&Bytes::from("example content 2"),
);
let mut delta = PublishDelta::empty();
delta.add_publish(file1.as_publish());
delta.add_publish(file2.as_publish());
server.publish(&alice_handle, delta).unwrap();
server.update_rrdp_if_needed().unwrap();
server.write_repository().unwrap();
let list_reply = server.list(&alice_handle).unwrap();
assert_eq!(2, list_reply.elements().len());
assert!(find_in_reply(
&list_reply,
&test::rsync("rsync://localhost/repo/alice/file.txt")
)
.is_some());
assert!(find_in_reply(
&list_reply,
&test::rsync("rsync://localhost/repo/alice/file2.txt")
)
.is_some());
let stats_before = server.repo_stats().unwrap();
let session_before = stats_before.session;
let snapshot_before_session_reset = find_in_session_and_serial_dir(
data_dir.path(),
session_before,
RRDP_FIRST_SERIAL + 1,
"snapshot.xml",
);
assert!(snapshot_before_session_reset.is_some());
server.rrdp_session_reset().unwrap();
let stats_after = server.repo_stats().unwrap();
let session_after = stats_after.session;
let snapshot_after_session_reset = find_in_session_and_serial_dir(
data_dir.path(),
session_after,
RRDP_FIRST_SERIAL,
"snapshot.xml",
);
assert_ne!(
snapshot_before_session_reset,
snapshot_after_session_reset
);
assert!(snapshot_after_session_reset.is_some());
let snapshot_before_session_reset = find_in_session_and_serial_dir(
data_dir.path(),
session_before,
RRDP_FIRST_SERIAL + 1,
"snapshot.xml",
);
assert!(snapshot_before_session_reset.is_none());
}
fn session_dir(base_dir: &Path) -> PathBuf {
let mut rrdp_dir = base_dir.to_path_buf();
rrdp_dir = rrdp_dir.join("repo/rrdp");
for entry in fs::read_dir(&rrdp_dir).unwrap() {
let entry = entry.unwrap();
if entry.file_name().to_string_lossy() != "notification.xml" {
return entry.path();
}
}
panic!(
"Could not find session dir under: {}",
base_dir.to_string_lossy()
)
}
fn session_dir_contains_serial(session_uri: &Path, serial: u64) -> bool {
let mut path = session_uri.to_path_buf();
path.push(serial.to_string());
path.is_dir()
}
fn session_dir_contains_delta(session_uri: &Path, serial: u64) -> bool {
RrdpServer::find_in_serial_dir(session_uri, serial, "delta.xml")
.unwrap()
.is_some()
}
fn session_dir_contains_snapshot(
session_uri: &Path,
serial: u64,
) -> bool {
RrdpServer::session_dir_snapshot(session_uri, serial)
.unwrap()
.is_some()
}
fn find_in_session_and_serial_dir(
base_dir: &Path,
session: uuid::Uuid,
serial: u64,
filename: &str,
) -> Option<PathBuf> {
let session_path = base_dir.join(format!("repo/rrdp/{session}"));
RrdpServer::find_in_serial_dir(&session_path, serial, filename)
.unwrap()
}
}