use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use cosmian_kmip::{
kmip_0::kmip_types::State,
kmip_2_1::{kmip_attributes::Attributes, kmip_objects::Object},
};
use cosmian_kms_interfaces::{AtomicOperation, ObjectWithMetadata, ObjectsStore};
use crate::{
Database,
error::{DbError, DbResult},
};
impl Database {
#[allow(dead_code)]
pub async fn register_objects_store(
&self,
prefix: &str,
objects_store: Arc<dyn ObjectsStore + Sync + Send>,
) {
let mut map = self.objects.write().await;
map.insert(prefix.to_owned(), objects_store);
}
#[allow(dead_code)]
pub async fn unregister_object_store(&self, prefix: Option<&str>) {
let mut map = self.objects.write().await;
map.remove(prefix.unwrap_or(""));
}
async fn get_object_store(&self, uid: &str) -> DbResult<Arc<dyn ObjectsStore + Sync + Send>> {
let splits = uid.split_once("::");
Ok(match splits {
Some((prefix, _rest)) => self
.objects
.read()
.await
.get(prefix)
.ok_or_else(|| {
DbError::InvalidRequest(format!(
"No object store available for UIDs prefixed with: {prefix}"
))
})?
.clone(),
None => self
.objects
.read()
.await
.get("")
.ok_or_else(|| {
DbError::InvalidRequest("No default object store available".to_owned())
})?
.clone(),
})
}
pub async fn create(
&self,
uid: Option<String>,
owner: &str,
object: &Object,
attributes: &Attributes,
tags: &HashSet<String>,
) -> DbResult<String> {
let db = self
.get_object_store(uid.as_deref().unwrap_or_default())
.await?;
let uid = db.create(uid, owner, object, attributes, tags).await?;
self.unwrapped_cache.validate_cache(&uid, object).await?;
Ok(uid)
}
pub async fn retrieve_objects(
&self,
uid_or_tags: &str,
) -> DbResult<HashMap<String, ObjectWithMetadata>> {
let uids = if uid_or_tags.starts_with('[') {
let tags: HashSet<String> = serde_json::from_str(uid_or_tags)?;
self.list_uids_for_tags(&tags).await?
} else {
HashSet::from([uid_or_tags.to_owned()])
};
let mut results: HashMap<String, ObjectWithMetadata> = HashMap::new();
for uid in &uids {
let owm = self.retrieve_object(uid).await?;
if let Some(owm) = owm {
results.insert(uid.to_owned(), owm);
}
}
Ok(results)
}
pub async fn retrieve_object(&self, uid: &str) -> DbResult<Option<ObjectWithMetadata>> {
let db = self.get_object_store(uid).await?;
Ok(db.retrieve(uid).await?)
}
pub async fn retrieve_tags(&self, uid: &str) -> DbResult<HashSet<String>> {
let db = self.get_object_store(uid).await?;
Ok(db.retrieve_tags(uid).await?)
}
pub async fn update_object(
&self,
uid: &str,
object: &Object,
attributes: &Attributes,
tags: Option<&HashSet<String>>,
) -> DbResult<()> {
let db = self.get_object_store(uid).await?;
db.update_object(uid, object, attributes, tags).await?;
self.unwrapped_cache.validate_cache(uid, object).await?;
Ok(())
}
pub async fn update_state(&self, uid: &str, state: State) -> DbResult<()> {
let db = self.get_object_store(uid).await?;
Ok(db.update_state(uid, state).await?)
}
pub async fn delete(&self, uid: &str) -> DbResult<()> {
let db = self.get_object_store(uid).await?;
db.delete(uid).await?;
self.unwrapped_cache.clear_cache(uid).await;
Ok(())
}
pub async fn is_object_owned_by(&self, uid: &str, owner: &str) -> DbResult<bool> {
let db = self.get_object_store(uid).await?;
Ok(db.is_object_owned_by(uid, owner).await?)
}
pub async fn list_uids_for_tags(&self, tags: &HashSet<String>) -> DbResult<HashSet<String>> {
let db_map = self.objects.read().await;
let mut results = HashSet::new();
for (_prefix, db) in db_map.iter() {
results.extend(db.list_uids_for_tags(tags).await?);
}
Ok(results)
}
pub async fn find(
&self,
researched_attributes: Option<&Attributes>,
state: Option<State>,
user: &str,
user_must_be_owner: bool,
vendor_id: &str,
) -> DbResult<Vec<(String, State, Attributes)>> {
let map = self.objects.read().await;
let mut results: Vec<(String, State, Attributes)> = Vec::new();
for (_prefix, db) in map.iter() {
results.extend(
db.find(
researched_attributes,
state,
user,
user_must_be_owner,
vendor_id,
)
.await
.unwrap_or(vec![]),
);
}
Ok(results)
}
pub async fn atomic(
&self,
user: &str,
operations: &[AtomicOperation],
) -> DbResult<Vec<String>> {
if operations.is_empty() {
return Ok(vec![]);
}
#[expect(clippy::indexing_slicing)]
let first_op = &operations[0];
let first_uid = first_op.get_object_uid();
let db = self.get_object_store(first_uid).await?;
let ids = db.atomic(user, operations).await?;
for op in operations {
match op {
AtomicOperation::Create((uid, object, ..))
| AtomicOperation::UpdateObject((uid, object, ..))
| AtomicOperation::Upsert((uid, object, ..)) => {
self.unwrapped_cache.validate_cache(uid, object).await?;
}
AtomicOperation::Delete(uid) => {
self.unwrapped_cache.clear_cache(uid).await;
}
AtomicOperation::UpdateState(_) => {}
}
}
Ok(ids)
}
}