use std::borrow::Cow;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::RwLock;
use log::{info, trace};
use rpki::ca::idexchange::{CaHandle, ChildHandle, ParentHandle, ServiceUri};
use rpki::ca::provisioning::ResourceClassListResponse as Entitlements;
use rpki::ca::publication::PublishDelta;
use serde::{Deserialize, Serialize};
use url::Url;
use crate::api::ca::{
ChildConnectionStats, ChildStatus, ChildrenConnectionStats, ParentStatus,
ParentStatuses, RepoStatus,
};
use crate::api::status::ErrorResponse;
use crate::commons::httpclient;
use crate::commons::KrillResult;
use crate::commons::error::Error;
use crate::commons::storage::{Ident, KeyValueStore};
/// Prefix of the keys under which the status of each parent is stored.
const PARENTS_PREFIX: &Ident = Ident::make("parents-");
/// Prefix of the keys under which the status of each child is stored.
const CHILDREN_PREFIX: &Ident = Ident::make("children-");
/// Extension handed to the key builder when creating parent/child keys.
const JSON_EXTENSION: &Ident = Ident::make("json");
/// Suffix stripped from stored keys when recovering a handle from a key.
///
/// NOTE(review): presumably this must equal `"."` followed by
/// `JSON_EXTENSION`, i.e. what `finish_with_extension` appends -- confirm
/// against the `Ident` builder.
const JSON_SUFFIX: &str = ".json";
/// Stores the connection status of CAs.
///
/// Status values are written to a key-value store and mirrored in an
/// in-memory cache from which reads are served.
pub struct CaStatusStore {
/// The key-value store holding the persisted status values.
store: KeyValueStore,
/// In-memory copy of the status of each CA, keyed by CA handle.
cache: RwLock<HashMap<CaHandle, CaStatus>>,
}
impl CaStatusStore {
/// Creates the status store and pre-loads its cache.
///
/// Opens the key-value store for `namespace` at `storage_uri`, starts
/// with an empty cache and then warms the cache from the store.
///
/// # Errors
///
/// Returns an error if the key-value store cannot be created or if
/// warming the cache fails.
pub fn create(
    storage_uri: &Url,
    namespace: &Ident,
) -> KrillResult<Self> {
    let status_store = Self {
        store: KeyValueStore::create(storage_uri, namespace)?,
        cache: RwLock::new(HashMap::new()),
    };
    status_store.warm()?;
    Ok(status_store)
}
/// Fills the cache from the key-value store.
///
/// For every scope in the store that converts to a CA handle, first
/// migrates a pre-0.9.5 status file if one is present and then loads the
/// status of that CA into the cache. Scopes that do not convert to a
/// handle are skipped.
///
/// # Errors
///
/// Returns the first error encountered while listing scopes, migrating
/// or loading.
fn warm(&self) -> KrillResult<()> {
    for scope in self.store.scopes()? {
        let Some(ca) = scope.to_handle() else {
            continue;
        };
        self.convert_pre_0_9_5_full_status_if_present(&ca)?;
        self.load_full_status(&ca)?;
    }
    Ok(())
}
fn load_full_status(&self, ca: &CaHandle) -> KrillResult<()> {
let scope = Self::scope(ca);
let repo: RepoStatus = match self.store.get(
Some(&scope), Self::repo_status_key()
) {
Ok(Some(status)) => status,
_ => RepoStatus::default(),
};
let mut parents = ParentStatuses::default();
for parent_key in self.store.keys(
Some(&scope), PARENTS_PREFIX.as_str()
)? {
if let Some(parent) = parent_key.as_str()
.strip_prefix(PARENTS_PREFIX.as_str())
.and_then(|pfx_stripped| {
pfx_stripped.strip_suffix(JSON_SUFFIX)
})
.and_then(|handle_str| {
ParentHandle::from_str(handle_str).ok()
})
{
let status: ParentStatus = match self.store.get(
Some(&scope), &parent_key
) {
Ok(Some(status)) => status,
_ => ParentStatus::default(),
};
parents.insert(parent, status);
}
}
let mut children = HashMap::new();
for child_key in self.store.keys(
Some(&scope), CHILDREN_PREFIX.as_str()
)? {
if let Some(child) = child_key.as_str()
.strip_prefix(CHILDREN_PREFIX.as_str())
.and_then(|pfx_stripped| {
pfx_stripped.strip_suffix(JSON_SUFFIX)
})
.and_then(|handle_str| ChildHandle::from_str(handle_str).ok())
{
let status: ChildStatus = match self.store.get(
Some(&scope), &child_key
) {
Ok(Some(status)) => status,
_ => ChildStatus::default(),
};
children.insert(child, status);
}
}
let status = CaStatus {
repo,
parents,
children,
};
self.cache.write().unwrap().insert(ca.clone(), status);
Ok(())
}
/// Migrates a single-file status for `ca` to the per-entry layout.
///
/// If a value can be read from the `status.json` key in the scope of
/// the CA, its repository status and every parent and child status are
/// stored under their own keys, after which `status.json` is dropped.
/// If no such value can be read (the key is absent or reading it
/// fails), nothing happens.
///
/// # Errors
///
/// Returns an error if storing any of the new values or dropping the
/// old key fails.
fn convert_pre_0_9_5_full_status_if_present(
    &self,
    ca: &CaHandle,
) -> KrillResult<()> {
    const KEY: &Ident = Ident::make("status.json");

    let scope = Self::scope(ca);
    let Some(full_status) =
        self.store.get::<CaStatus>(Some(&scope), KEY).ok().flatten()
    else {
        return Ok(());
    };

    info!(
        "Migrating pre 0.9.5 connection status file for CA '{ca}' \
         to new format"
    );

    self.store.store(
        Some(&scope),
        Self::repo_status_key(),
        full_status.repo(),
    )?;
    for (parent, status) in full_status.parents().iter() {
        self.store.store(
            Some(&scope),
            &Self::parent_status_key(parent),
            status,
        )?;
    }
    for (child, status) in &full_status.children {
        self.store.store(
            Some(&scope),
            &Self::child_status_key(child),
            status,
        )?;
    }

    // Only drop the old file once everything has been written out.
    self.store.drop_key(Some(&scope), KEY)?;
    info!("Done migrating pre 0.9.5 connection status file");
    Ok(())
}
/// Returns the storage scope for `ca`, as produced by
/// `Ident::from_handle`.
fn scope(ca: &CaHandle) -> Cow<'_, Ident> {
Ident::from_handle(ca)
}
/// Returns the key under which the repository status of a CA is stored.
const fn repo_status_key() -> &'static Ident {
const { Ident::make("repos-main.json") }
}
/// Returns the key under which the status of `parent` is stored.
///
/// The key is built from `PARENTS_PREFIX`, the parent handle and
/// `JSON_EXTENSION`.
fn parent_status_key(parent: &ParentHandle) -> Box<Ident> {
Ident::builder(
PARENTS_PREFIX
).push_handle(
parent
).finish_with_extension(JSON_EXTENSION)
}
/// Returns the key under which the status of `child` is stored.
///
/// The key is built from `CHILDREN_PREFIX`, the child handle and
/// `JSON_EXTENSION`.
fn child_status_key(child: &ChildHandle) -> Box<Ident> {
Ident::builder(
CHILDREN_PREFIX
).push_handle(
child
).finish_with_extension(JSON_EXTENSION)
}
pub fn get_ca_status(&self, ca: &CaHandle) -> CaStatus {
self.cache
.read()
.unwrap()
.get(ca)
.cloned()
.unwrap_or_default()
}
}
impl CaStatusStore {
pub fn set_parent_failure(
&self,
ca: &CaHandle,
parent: &ParentHandle,
uri: &ServiceUri,
error: &Error,
) -> KrillResult<()> {
let error_response = Self::error_to_error_res(error);
self.update_ca_parent_status(ca, parent, |status| {
status.set_failure(uri.clone(), error_response)
})
}
/// Marks the status of `parent` of `ca` as last updated via `uri`.
///
/// # Errors
///
/// Returns an error if the updated status cannot be stored.
pub fn set_parent_last_updated(
    &self,
    ca: &CaHandle,
    parent: &ParentHandle,
    uri: &ServiceUri,
) -> KrillResult<()> {
    self.update_ca_parent_status(ca, parent, |parent_status| {
        parent_status.set_last_updated(uri.clone())
    })
}
/// Records the entitlements received by `ca` from `parent` at `uri`.
///
/// # Errors
///
/// Returns an error if the updated status cannot be stored.
pub fn set_parent_entitlements(
    &self,
    ca: &CaHandle,
    parent: &ParentHandle,
    uri: &ServiceUri,
    entitlements: &Entitlements,
) -> KrillResult<()> {
    self.update_ca_parent_status(ca, parent, |parent_status| {
        parent_status.set_entitlements(uri.clone(), entitlements)
    })
}
/// Removes the status of `parent` for `ca`.
///
/// Does nothing if the CA is not cached or has no status for the
/// parent. Otherwise the status is removed from the cache first and its
/// key is then dropped from the store.
///
/// # Errors
///
/// Returns an error if dropping the key from the store fails. The
/// cached entry has already been removed at that point.
///
/// # Panics
///
/// Panics if the cache lock is poisoned.
pub fn remove_parent(
    &self,
    ca: &CaHandle,
    parent: &ParentHandle,
) -> KrillResult<()> {
    let mut cache = self.cache.write().unwrap();
    let Some(ca_status) = cache.get_mut(ca) else {
        return Ok(());
    };
    if ca_status.parents.remove(parent).is_none() {
        return Ok(());
    }

    trace!("Parent status for {} was found", parent);
    self.store.drop_key(
        Some(&Self::scope(ca)),
        &Self::parent_status_key(parent),
    )?;
    Ok(())
}
/// Records a successful exchange between `ca` and `child`.
///
/// `user_agent` is passed on to the child status as given.
///
/// # Errors
///
/// Returns an error if the updated status cannot be stored.
pub fn set_child_success(
    &self,
    ca: &CaHandle,
    child: &ChildHandle,
    user_agent: Option<String>,
) -> KrillResult<()> {
    self.update_ca_child_status(ca, child, |child_status| {
        child_status.set_success(user_agent)
    })
}
pub fn set_child_failure(
&self,
ca: &CaHandle,
child: &ChildHandle,
user_agent: Option<String>,
error: &Error,
) -> KrillResult<()> {
let error_response = Self::error_to_error_res(error);
self.update_ca_child_status(ca, child, |status| {
status.set_failure(user_agent, error_response)
})
}
/// Marks `child` of `ca` as suspended in its status.
///
/// # Errors
///
/// Returns an error if the updated status cannot be stored.
pub fn set_child_suspended(
    &self,
    ca: &CaHandle,
    child: &ChildHandle,
) -> KrillResult<()> {
    self.update_ca_child_status(ca, child, |child_status| {
        child_status.set_suspended()
    })
}
/// Removes the status of `child` for `ca`.
///
/// Does nothing if the CA is not cached or has no status for the
/// child. Otherwise the status is removed from the cache first and its
/// key is then dropped from the store.
///
/// # Errors
///
/// Returns an error if dropping the key from the store fails. The
/// cached entry has already been removed at that point.
///
/// # Panics
///
/// Panics if the cache lock is poisoned.
pub fn remove_child(
    &self,
    ca: &CaHandle,
    child: &ChildHandle,
) -> KrillResult<()> {
    let mut cache = self.cache.write().unwrap();
    let Some(ca_status) = cache.get_mut(ca) else {
        return Ok(());
    };
    if ca_status.children.remove(child).is_none() {
        return Ok(());
    }

    trace!("Child status for {} was found", child);
    self.store.drop_key(
        Some(&Self::scope(ca)),
        &Self::child_status_key(child),
    )?;
    Ok(())
}
/// Removes all status for `ca` from the cache and the store.
///
/// The result of dropping the scope of the CA from the store is
/// discarded, so this method currently always returns `Ok(())`.
///
/// # Panics
///
/// Panics if the cache lock is poisoned.
pub fn remove_ca(&self, ca: &CaHandle) -> KrillResult<()> {
    {
        // Release the write lock before touching the store.
        let mut cache = self.cache.write().unwrap();
        cache.remove(ca);
    }
    let scope = Self::scope(ca);
    let _ = self.store.drop_scope(&scope);
    Ok(())
}
pub fn set_status_repo_failure(
&self,
ca: &CaHandle,
uri: ServiceUri,
error: &Error,
) -> KrillResult<()> {
let error_response = Self::error_to_error_res(error);
self.update_repo_status(ca, |status| {
status.set_failure(uri, error_response)
})
}
/// Marks the repository status of `ca` as last updated via `uri`.
///
/// # Errors
///
/// Returns an error if the updated status cannot be stored.
pub fn set_status_repo_success(
    &self,
    ca: &CaHandle,
    uri: ServiceUri,
) -> KrillResult<()> {
    self.update_repo_status(ca, |repo_status| {
        repo_status.set_last_updated(uri)
    })
}
/// Records that `ca` published `delta` to its repository at `uri`.
///
/// # Errors
///
/// Returns an error if the updated status cannot be stored.
pub fn set_status_repo_published(
    &self,
    ca: &CaHandle,
    uri: ServiceUri,
    delta: PublishDelta,
) -> KrillResult<()> {
    self.update_repo_status(ca, |repo_status| {
        repo_status.update_published(uri, delta)
    })
}
/// Applies `op` to the repository status of `ca` and stores the result.
///
/// A default status is created for the CA if none is cached yet. The
/// cached value is modified first and then written to the store while
/// the cache write lock is still held.
///
/// # Errors
///
/// Returns an error if writing to the store fails. The cached value has
/// already been modified at that point.
///
/// # Panics
///
/// Panics if the cache lock is poisoned.
fn update_repo_status<F: FnOnce(&mut RepoStatus)>(
    &self, ca: &CaHandle, op: F
) -> KrillResult<()> {
    let mut cache = self.cache.write().unwrap();
    // A single entry lookup replaces the check / insert / get-and-unwrap
    // sequence.
    let ca_status = cache.entry(ca.clone()).or_default();
    op(&mut ca_status.repo);
    self.store.store(
        Some(&Self::scope(ca)),
        Self::repo_status_key(),
        ca_status.repo(),
    )?;
    Ok(())
}
/// Applies `op` to the status of `child` of `ca` and stores the result.
///
/// Default values are created for the CA and for the child if they are
/// not cached yet. The cached value is modified first and then written
/// to the store while the cache write lock is still held.
///
/// # Errors
///
/// Returns an error if writing to the store fails. The cached value has
/// already been modified at that point.
///
/// # Panics
///
/// Panics if the cache lock is poisoned.
fn update_ca_child_status<F: FnOnce(&mut ChildStatus)>(
    &self,
    ca: &CaHandle,
    child: &ChildHandle,
    op: F,
) -> KrillResult<()> {
    let mut cache = self.cache.write().unwrap();
    // One entry lookup per map replaces the check / insert /
    // get-and-unwrap sequences.
    let child_status = cache
        .entry(ca.clone())
        .or_default()
        .children
        .entry(child.clone())
        .or_insert_with(ChildStatus::default);
    op(child_status);
    self.store.store(
        Some(&Self::scope(ca)),
        &Self::child_status_key(child),
        child_status,
    )?;
    Ok(())
}
/// Applies `op` to the status of `parent` of `ca` and stores the result.
///
/// Default values are created for the CA and for the parent if they are
/// not cached yet. The cached value is modified first and then written
/// to the store while the cache write lock is still held.
///
/// # Errors
///
/// Returns an error if writing to the store fails. The cached value has
/// already been modified at that point.
///
/// # Panics
///
/// Panics if the cache lock is poisoned.
fn update_ca_parent_status<F: FnOnce(&mut ParentStatus)>(
    &self,
    ca: &CaHandle,
    parent: &ParentHandle,
    op: F,
) -> KrillResult<()> {
    let mut cache = self.cache.write().unwrap();
    // A single entry lookup replaces the check / insert / get-and-unwrap
    // sequence.
    let parent_status = cache
        .entry(ca.clone())
        .or_default()
        .parents
        .get_or_default_mut(parent);
    op(parent_status);
    // Pass the reference itself, as the child variant does, rather than
    // a reference to the reference.
    self.store.store(
        Some(&Self::scope(ca)),
        &Self::parent_status_key(parent),
        parent_status,
    )?;
    Ok(())
}
/// Converts `error` into an error response.
///
/// If the error is an HTTP client error that carries a JSON error
/// response, a copy of that response is returned. Every other error is
/// converted via `to_error_response`.
fn error_to_error_res(error: &Error) -> ErrorResponse {
    match error {
        Error::HttpClientError(
            httpclient::Error::ErrorResponseWithJson(_, _, res),
        ) => {
            // Clone the boxed value itself instead of cloning the box
            // and moving the value out of the copy.
            (**res).clone()
        }
        _ => error.to_error_response(),
    }
}
}
/// The status of a single CA: its repository, parents and children.
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct CaStatus {
/// The status of the repository of the CA.
repo: RepoStatus,
/// The status of each parent of the CA.
parents: ParentStatuses,
/// The status of each child of the CA, keyed by child handle.
///
/// Left out when serializing if empty; an empty map is used when the
/// field is missing on deserializing.
#[serde(
skip_serializing_if = "HashMap::is_empty",
default = "HashMap::new"
)]
children: HashMap<ChildHandle, ChildStatus>,
}
impl CaStatus {
    /// Returns the connection statistics for all children.
    ///
    /// For each child this holds its handle, a copy of its last exchange
    /// and its child state.
    pub fn get_children_connection_stats(&self) -> ChildrenConnectionStats {
        let children = self
            .children
            .iter()
            .map(|(child, child_status)| ChildConnectionStats {
                handle: child.clone(),
                last_exchange: child_status.last_exchange.clone(),
                state: child_status.child_state(),
            })
            .collect();
        ChildrenConnectionStats { children }
    }

    /// Returns a reference to the repository status.
    pub fn repo(&self) -> &RepoStatus {
        &self.repo
    }

    /// Consumes the status and returns the repository status.
    pub fn into_repo(self) -> RepoStatus {
        let Self { repo, .. } = self;
        repo
    }

    /// Returns a reference to the parent statuses.
    pub fn parents(&self) -> &ParentStatuses {
        &self.parents
    }

    /// Consumes the status and returns the parent statuses.
    pub fn into_parents(self) -> ParentStatuses {
        let Self { parents, .. } = self;
        parents
    }

    /// Returns a reference to the child statuses, keyed by child handle.
    pub fn children(&self) -> &HashMap<ChildHandle, ChildStatus> {
        &self.children
    }
}