use std::fmt;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use log::{error, info};
use rpki::ca::publication;
use rpki::ca::idexchange::{
MyHandle, PublisherHandle, PublisherRequest, RepositoryResponse,
ServiceUri,
};
use rpki::ca::publication::PublicationCms;
use rpki::crypto::KeyIdentifier;
use rpki::uri;
use serde::{Deserialize, Serialize};
use crate::api::admin::PublicationServerUris;
use crate::api::ca::IdCertInfo;
use crate::api::history::CommandSummary;
use crate::commons::KrillResult;
use crate::commons::actor::Actor;
use crate::commons::crypto::KrillSigner;
use crate::commons::error::Error;
use crate::commons::eventsourcing::{
Aggregate, AggregateStore, CommandDetails, Event, InitCommandDetails,
InitEvent, SentCommand, SentInitCommand, WithStorableDetails,
};
use crate::constants::{
ACTOR_DEF_KRILL, PUBSERVER_DFLT, PUBSERVER_NS, TA_NAME
};
use crate::config::Config;
use super::publishers::Publisher;
pub struct RepositoryAccessProxy {
store: AggregateStore<RepositoryAccess>,
key: MyHandle,
}
impl RepositoryAccessProxy {
pub fn create(config: &Config) -> KrillResult<Self> {
let store = AggregateStore::<RepositoryAccess>::create(
&config.storage_uri,
PUBSERVER_NS,
config.use_history_cache,
)?;
let key = MyHandle::from_str(PUBSERVER_DFLT).unwrap();
if store.has(&key)? && let Err(e) = store.warm() {
error!(
"Could not warm up cache, data seems corrupt. \
You may need to restore a backup. Error was: {e}"
);
}
Ok(RepositoryAccessProxy { store, key })
}
pub fn is_initialized(&self) -> KrillResult<bool> {
self.store.has(&self.key).map_err(Error::AggregateStoreError)
}
pub fn init(
&self,
uris: PublicationServerUris,
signer: Arc<KrillSigner>,
) -> KrillResult<()> {
if self.is_initialized()? {
return Err(Error::RepositoryServerAlreadyInitialized)
};
let actor = ACTOR_DEF_KRILL;
let cmd = RepositoryAccessInitCommand::new(
self.key.clone(),
RepositoryAccessInitCommandDetails {
rrdp_base_uri: uris.rrdp_base_uri,
rsync_jail: uris.rsync_jail,
signer,
},
&actor,
);
self.store.add(cmd)?;
Ok(())
}
pub fn clear(&self) -> KrillResult<()> {
if !self.is_initialized()? {
Err(Error::RepositoryServerNotInitialized)
}
else if !self.publishers()?.is_empty() {
Err(Error::RepositoryServerHasPublishers)
}
else {
self.store.drop_aggregate(&self.key)?;
Ok(())
}
}
fn read(&self) -> KrillResult<Arc<RepositoryAccess>> {
if !self.is_initialized()? {
Err(Error::RepositoryServerNotInitialized)
}
else {
self.store.get_latest(&self.key).map_err(|e| {
Error::custom(format!("Publication Server data issue: {e}"))
})
}
}
pub fn publishers(&self) -> KrillResult<Vec<PublisherHandle>> {
Ok(self.read()?.publishers())
}
pub fn get_publisher(
&self,
name: &PublisherHandle,
) -> KrillResult<Publisher> {
self.read()?.get_publisher(name).cloned()
}
pub fn add_publisher(
&self,
req: PublisherRequest,
actor: &Actor,
) -> KrillResult<()> {
let name = req.publisher_handle().clone();
let id_cert = req.validate().map_err(Error::rfc8183)?;
let base_uri = self.read()?.publisher_rsync_base(&name)?;
let cmd = RepositoryAccessCommand::new(
self.key.clone(),
None,
RepositoryAccessCommandDetails::AddPublisher {
id_cert: id_cert.into(),
name,
base_uri,
},
actor,
);
self.store.command(cmd)?;
Ok(())
}
pub fn remove_publisher(
&self,
name: PublisherHandle,
actor: &Actor,
) -> KrillResult<()> {
if !self.is_initialized()? {
return Err(Error::RepositoryServerNotInitialized)
}
let cmd = RepositoryAccessCommand::new(
self.key.clone(),
None,
RepositoryAccessCommandDetails::RemovePublisher { name },
actor,
);
self.store.command(cmd)?;
Ok(())
}
pub fn repository_response(
&self,
rfc8181_uri: uri::Https,
publisher: &PublisherHandle,
) -> KrillResult<RepositoryResponse> {
self.read()?.repository_response(rfc8181_uri, publisher)
}
pub fn decode_and_validate(
&self,
publisher: &PublisherHandle,
bytes: &[u8],
) -> KrillResult<PublicationCms> {
let publisher = self.get_publisher(publisher)?;
let msg = PublicationCms::decode(bytes).map_err(Error::Rfc8181)?;
msg.validate(&publisher.id_cert().public_key)
.map_err(Error::Rfc8181)?;
Ok(msg)
}
pub fn create_response(
&self,
message: publication::Message,
signer: &KrillSigner,
) -> KrillResult<PublicationCms> {
let key_id = self.read()?.key_id();
signer
.create_rfc8181_cms(message, &key_id)
.map_err(Error::signer)
}
}
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct RepositoryAccess {
handle: MyHandle,
version: u64,
id_cert: IdCertInfo,
publishers: HashMap<PublisherHandle, Publisher>,
rsync_base: uri::Rsync,
rrdp_base: uri::Https,
}
impl RepositoryAccess {
pub fn key_id(&self) -> KeyIdentifier {
self.id_cert.public_key.key_identifier()
}
}
impl Aggregate for RepositoryAccess {
type Command = RepositoryAccessCommand;
type StorableCommandDetails = StorableRepositoryCommand;
type Event = RepositoryAccessEvent;
type InitCommand = RepositoryAccessInitCommand;
type InitEvent = RepositoryAccessInitEvent;
type Error = Error;
fn init(handle: &MyHandle, event: Self::InitEvent) -> Self {
RepositoryAccess {
handle: handle.clone(),
version: 1,
id_cert: event.id_cert,
publishers: HashMap::new(),
rsync_base: event.rsync_jail,
rrdp_base: event.rrdp_base_uri,
}
}
fn process_init_command(
command: Self::InitCommand,
) -> Result<Self::InitEvent, Self::Error> {
let details = command.into_details();
let id_cert_info = details.signer.create_self_signed_id_cert()?.into();
Ok(RepositoryAccessInitEvent {
id_cert: id_cert_info,
rrdp_base_uri: details.rrdp_base_uri,
rsync_jail: details.rsync_jail,
})
}
fn version(&self) -> u64 {
self.version
}
fn increment_version(&mut self) {
self.version += 1;
}
fn apply(&mut self, event: Self::Event) {
match event {
RepositoryAccessEvent::PublisherAdded { name, publisher } => {
self.publishers.insert(name, publisher);
}
RepositoryAccessEvent::PublisherRemoved { name } => {
self.publishers.remove(&name);
}
}
}
fn process_command(
&self,
command: Self::Command,
) -> Result<Vec<Self::Event>, Self::Error> {
info!(
"Processing command for publisher '{}', version: {}: {}",
self.handle, self.version, command
);
match command.into_details() {
RepositoryAccessCommandDetails::AddPublisher {
id_cert, name, base_uri,
} => {
self.process_add_publisher(id_cert, name, base_uri)
}
RepositoryAccessCommandDetails::RemovePublisher { name } => {
self.process_remove_publisher(name)
}
}
}
}
impl RepositoryAccess {
fn process_add_publisher(
&self,
id_cert: IdCertInfo,
name: PublisherHandle,
base_uri: uri::Rsync,
) -> Result<Vec<RepositoryAccessEvent>, Error> {
if self.publishers.contains_key(&name) {
Err(Error::PublisherDuplicate(name))
}
else {
let publisher = Publisher::new(id_cert, base_uri);
Ok(vec![RepositoryAccessEvent::PublisherAdded {
name, publisher,
}])
}
}
fn process_remove_publisher(
&self,
publisher_handle: PublisherHandle,
) -> Result<Vec<RepositoryAccessEvent>, Error> {
if !self.has_publisher(&publisher_handle) {
Err(Error::PublisherUnknown(publisher_handle))
}
else {
Ok(vec![RepositoryAccessEvent::PublisherRemoved {
name: publisher_handle,
}])
}
}
fn notification_uri(&self) -> uri::Https {
self.rrdp_base.join(b"notification.xml").unwrap()
}
fn publisher_rsync_base(
&self,
name: &PublisherHandle,
) -> KrillResult<uri::Rsync> {
if name.as_str() == TA_NAME {
Ok(self.rsync_base.clone())
}
else {
uri::Rsync::from_str(
&format!("{}{}/", self.rsync_base, name)).map_err(|_| {
Error::Custom(format!(
"Cannot derive base uri for {name}"
))
}
)
}
}
fn repository_response(
&self,
rfc8181_uri: uri::Https,
publisher_handle: &PublisherHandle,
) -> Result<RepositoryResponse, Error> {
let publisher = self.get_publisher(publisher_handle)?;
let rsync_base = publisher.base_uri();
let service_uri = ServiceUri::Https(rfc8181_uri);
Ok(RepositoryResponse::new(
self.id_cert.base64.clone(),
publisher_handle.clone(),
service_uri,
rsync_base.clone(),
Some(self.notification_uri()),
None,
))
}
fn get_publisher(
&self,
publisher_handle: &PublisherHandle,
) -> Result<&Publisher, Error> {
self.publishers.get(publisher_handle).ok_or_else(|| {
Error::PublisherUnknown(publisher_handle.clone())
})
}
fn has_publisher(&self, name: &PublisherHandle) -> bool {
self.publishers.contains_key(name)
}
fn publishers(&self) -> Vec<PublisherHandle> {
self.publishers.keys().cloned().collect()
}
}
pub type RepositoryAccessInitCommand = SentInitCommand<
RepositoryAccessInitCommandDetails
>;
#[derive(Clone, Debug)]
pub struct RepositoryAccessInitCommandDetails {
pub rrdp_base_uri: uri::Https,
pub rsync_jail: uri::Rsync,
pub signer: Arc<KrillSigner>,
}
impl InitCommandDetails for RepositoryAccessInitCommandDetails {
type StorableDetails = StorableRepositoryCommand;
fn store(&self) -> Self::StorableDetails {
StorableRepositoryCommand::make_init()
}
}
impl fmt::Display for RepositoryAccessInitCommandDetails {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.store().fmt(f)
}
}
pub type RepositoryAccessCommand = SentCommand<
RepositoryAccessCommandDetails
>;
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[allow(clippy::large_enum_variant)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RepositoryAccessCommandDetails {
AddPublisher {
id_cert: IdCertInfo,
name: PublisherHandle,
base_uri: uri::Rsync,
},
RemovePublisher {
name: PublisherHandle,
},
}
impl CommandDetails for RepositoryAccessCommandDetails {
type Event = RepositoryAccessEvent;
type StorableDetails = StorableRepositoryCommand;
fn store(&self) -> Self::StorableDetails {
self.clone().into()
}
}
impl fmt::Display for RepositoryAccessCommandDetails {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
StorableRepositoryCommand::from(self.clone()).fmt(f)
}
}
impl From<RepositoryAccessCommandDetails> for StorableRepositoryCommand {
fn from(d: RepositoryAccessCommandDetails) -> Self {
match d {
RepositoryAccessCommandDetails::AddPublisher { name, .. } => {
StorableRepositoryCommand::AddPublisher { name }
}
RepositoryAccessCommandDetails::RemovePublisher { name } => {
StorableRepositoryCommand::RemovePublisher { name }
}
}
}
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[allow(clippy::large_enum_variant)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum StorableRepositoryCommand {
Init,
AddPublisher {
name: PublisherHandle
},
RemovePublisher {
name: PublisherHandle
},
}
impl WithStorableDetails for StorableRepositoryCommand {
fn summary(&self) -> CommandSummary {
match self {
StorableRepositoryCommand::Init => {
CommandSummary::new("pubd-init", self)
}
StorableRepositoryCommand::AddPublisher { name } => {
CommandSummary::new("pubd-publisher-add", self)
.publisher(name)
}
StorableRepositoryCommand::RemovePublisher { name } => {
CommandSummary::new("pubd-publisher-remove", self)
.publisher(name)
}
}
}
fn make_init() -> Self {
Self::Init
}
}
impl fmt::Display for StorableRepositoryCommand {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
StorableRepositoryCommand::Init => {
write!(f, "Initialise server")
}
StorableRepositoryCommand::AddPublisher { name } => {
write!(f, "Added publisher '{name}'")
}
StorableRepositoryCommand::RemovePublisher { name } => {
write!(f, "Removed publisher '{name}'")
}
}
}
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct RepositoryAccessInitEvent {
pub id_cert: IdCertInfo,
pub rrdp_base_uri: uri::Https,
pub rsync_jail: uri::Rsync,
}
impl InitEvent for RepositoryAccessInitEvent {}
impl RepositoryAccessInitEvent {
pub fn init(
rsync_jail: uri::Rsync,
rrdp_base_uri: uri::Https,
signer: &KrillSigner,
) -> KrillResult<RepositoryAccessInitEvent> {
signer.create_self_signed_id_cert().map_err(Error::signer).map(|id| {
RepositoryAccessInitEvent {
id_cert: id.into(),
rrdp_base_uri,
rsync_jail,
}
})
}
}
impl fmt::Display for RepositoryAccessInitEvent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Initialized publication server. RRDP base uri: {}, \
Rsync Jail: {}",
self.rrdp_base_uri, self.rsync_jail
)
}
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[allow(clippy::large_enum_variant)]
#[serde(rename_all = "snake_case", tag = "type")]
pub enum RepositoryAccessEvent {
PublisherAdded {
name: PublisherHandle,
publisher: Publisher,
},
PublisherRemoved {
name: PublisherHandle,
},
}
impl Event for RepositoryAccessEvent {}
impl fmt::Display for RepositoryAccessEvent {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
RepositoryAccessEvent::PublisherAdded { name, .. } => {
write!(f, "Publisher '{name}' added")
}
RepositoryAccessEvent::PublisherRemoved { name } => {
write!(f, "Publisher '{name}' removed")
}
}
}
}