use log::{debug, info};
use rpki::ca::idexchange::{CaHandle, MyHandle};
use rpki::repository::x509::Time;
use crate::api::aspa::ProviderAsn;
use crate::commons::eventsourcing::{
AggregateStore, StoredCommand, StoredCommandBuilder
};
use crate::commons::storage::{Ident, KeyValueStore};
use crate::constants::{CASERVER_NS, CA_OBJECTS_NS};
use crate::server::ca::certauth::CertAuth;
use crate::server::ca::commands::CertAuthStorableCommand;
use crate::server::ca::events::{CertAuthEvent, CertAuthInitEvent};
use crate::server::ca::publishing::CaObjects;
use crate::config::Config;
use crate::upgrades::{
AspaMigrationConfigUpdates, AspaMigrationConfigs, CommandMigrationEffect,
UpgradeAggregateStorePre0_14, UpgradeError, UpgradeMode, UpgradeResult,
UnconvertedEffect,
};
use crate::upgrades::pre_0_14_0::OldStoredCommand;
use super::old_events::{
OldCaObjects, Pre0_10CertAuthEvent, Pre0_10CertAuthInitEvent
};
use super::old_commands::Pre0_10_0CertAuthStorableCommand;
pub struct CasMigration {
current_kv_store: KeyValueStore,
new_kv_store: KeyValueStore,
new_agg_store: AggregateStore<CertAuth>,
ca_objects_migration: CaObjectsMigration,
}
impl CasMigration {
pub fn upgrade(
mode: UpgradeMode,
config: &Config,
) -> UpgradeResult<AspaMigrationConfigs> {
Self {
current_kv_store: KeyValueStore::create(
&config.storage_uri, CASERVER_NS
)?,
new_kv_store: KeyValueStore::create_upgrade_store(
&config.storage_uri,
CASERVER_NS,
)?,
new_agg_store: AggregateStore::<CertAuth>::create_upgrade_store(
&config.storage_uri,
CASERVER_NS,
config.use_history_cache,
)?,
ca_objects_migration: CaObjectsMigration::create(config)?,
}
.upgrade(mode)
}
}
impl UpgradeAggregateStorePre0_14 for CasMigration {
type Aggregate = CertAuth;
type OldInitEvent = Pre0_10CertAuthInitEvent;
type OldEvent = Pre0_10CertAuthEvent;
type OldStorableDetails = Pre0_10_0CertAuthStorableCommand;
fn store_name(&self) -> &str {
"CAs"
}
fn convert_init_event(
&self,
old_init: Self::OldInitEvent,
handle: MyHandle,
actor: String,
time: Time,
) -> UpgradeResult<StoredCommand<Self::Aggregate>> {
let details = CertAuthStorableCommand::Init;
let init_event = CertAuthInitEvent { id: old_init.into() };
let builder = StoredCommandBuilder::<CertAuth>::new(
actor, time, handle, 0, details,
);
Ok(builder.finish_with_init_event(init_event))
}
fn deployed_store(&self) -> &KeyValueStore {
&self.current_kv_store
}
fn preparation_key_value_store(&self) -> &KeyValueStore {
&self.new_kv_store
}
fn preparation_aggregate_store(
&self,
) -> &AggregateStore<Self::Aggregate> {
&self.new_agg_store
}
fn convert_old_command(
&self,
old_command: OldStoredCommand<Self::OldStorableDetails>,
old_effect: UnconvertedEffect<Self::OldEvent>,
version: u64,
) -> UpgradeResult<CommandMigrationEffect<Self::Aggregate>> {
match old_command.details() {
Pre0_10_0CertAuthStorableCommand::AspaRemove { .. }
| Pre0_10_0CertAuthStorableCommand::AspasUpdate { .. }
| Pre0_10_0CertAuthStorableCommand::AspasUpdateExisting {
..
} => {
if let Some(events) = old_effect.into_events() {
for old_event in events {
match old_event {
Pre0_10CertAuthEvent::AspaObjectsUpdated {
updates,
..
} => {
let ca = old_command.handle().clone();
let removed = updates
.removed
.into_iter()
.map(rpki::resources::Asn::from)
.collect();
let added_or_updated = updates
.updated
.into_iter()
.map(|info| {
let customer =
info.definition.customer.provider;
let providers: Vec<ProviderAsn> =
info.definition
.providers
.into_iter()
.map(|p| p.provider)
.collect();
(customer, providers)
})
.collect();
let updates = AspaMigrationConfigUpdates {
ca,
added_or_updated,
removed,
};
return Ok(CommandMigrationEffect::AspaObjectsUpdates(updates));
}
_ => {
}
}
}
}
Ok(CommandMigrationEffect::Nothing)
}
_ => {
let new_command_builder =
StoredCommandBuilder::<CertAuth>::new(
old_command.actor().clone(),
old_command.time(),
old_command.handle().clone(),
version,
old_command.details().clone().into(),
);
let new_command = match old_effect {
UnconvertedEffect::Error { msg } => {
new_command_builder.finish_with_error(msg)
}
UnconvertedEffect::Success { events } => {
let mut full_events: Vec<CertAuthEvent> = vec![]; for old_event in events {
match old_event {
Pre0_10CertAuthEvent::AspaConfigAdded { .. }
| Pre0_10CertAuthEvent::AspaConfigRemoved { .. }
| Pre0_10CertAuthEvent::AspaConfigUpdated { .. }
| Pre0_10CertAuthEvent::AspaObjectsUpdated { .. } => {
}
_ => {
full_events.push(old_event.try_into()?);
}
}
}
new_command_builder.finish_with_events(full_events)
}
};
if
let Some(events) = new_command.events()
&& events.is_empty()
{
return Ok(CommandMigrationEffect::Nothing);
}
Ok(CommandMigrationEffect::StoredCommand(new_command))
}
}
}
fn post_command_migration(&self, handle: &MyHandle) -> UpgradeResult<()> {
info!(
"Will migrate the current repository objects for CA '{handle}'"
);
self.ca_objects_migration.prepare_new_data_for(handle)
}
}
struct CaObjectsMigration {
current_store: KeyValueStore,
new_store: KeyValueStore,
}
impl CaObjectsMigration {
fn create(config: &Config) -> Result<Self, UpgradeError> {
Ok(CaObjectsMigration {
current_store: KeyValueStore::create(
&config.storage_uri, CA_OBJECTS_NS
)?,
new_store: KeyValueStore::create_upgrade_store(
&config.storage_uri,
CA_OBJECTS_NS,
)?
})
}
fn prepare_new_data_for(
&self,
ca: &CaHandle,
) -> Result<(), UpgradeError> {
let key = Ident::builder(
Ident::from_handle(ca).into_owned()
).finish_with_extension( const { Ident::make("json") });
if let Some(old_objects) =
self.current_store.get::<OldCaObjects>(None, &key)?
{
let converted: CaObjects = old_objects.try_into()?;
self.new_store.store(None, &key, &converted)?;
debug!(
"Stored updated objects for CA {} in {:?}",
ca, self.new_store
);
}
Ok(())
}
}