use std::{
collections::{HashMap, HashSet},
path::PathBuf,
sync::Arc,
};
use cosmian_kmip::{
kmip_0::kmip_types::State,
kmip_2_1::{kmip_attributes::Attributes, kmip_objects::Object},
};
use cosmian_kms_interfaces::{AtomicOperation, ObjectWithMetadata, ObjectsStore, SessionParams};
use crate::{
Database,
error::{DbError, DbResult},
};
impl Database {
/// Registers `objects_store` under `prefix` in the `objects` map.
///
/// The map is write-locked for the duration of the insertion. The empty
/// prefix `""` is the key `get_object_store` looks up for UIDs that contain
/// no `::` separator.
#[allow(dead_code)]
pub async fn register_objects_store(
    &self,
    prefix: &str,
    objects_store: Arc<dyn ObjectsStore + Sync + Send>,
) {
    self.objects
        .write()
        .await
        .insert(prefix.to_owned(), objects_store);
}
/// Removes the objects store registered under `prefix` from the `objects` map.
///
/// `None` is treated as the empty prefix `""`. Removing a prefix that is not
/// registered is not reported: the result of the removal is discarded.
#[allow(dead_code)]
pub async fn unregister_object_store(&self, prefix: Option<&str>) {
    let key = prefix.unwrap_or("");
    self.objects.write().await.remove(key);
}
/// Selects the objects store responsible for `uid`.
///
/// A UID of the form `prefix::rest` is served by the store registered under
/// `prefix` (everything before the first `::`); any other UID is served by
/// the store registered under the empty prefix `""`.
///
/// # Errors
///
/// Returns `DbError::InvalidRequest` when no store is registered under the
/// selected key.
async fn get_object_store(&self, uid: &str) -> DbResult<Arc<dyn ObjectsStore + Sync + Send>> {
    let stores = self.objects.read().await;
    let store = match uid.split_once("::") {
        Some((prefix, _)) => stores.get(prefix).ok_or_else(|| {
            DbError::InvalidRequest(format!(
                "No object store available for UIDs prefixed with: {prefix}"
            ))
        })?,
        None => stores.get("").ok_or_else(|| {
            DbError::InvalidRequest("No default object store available".to_owned())
        })?,
    };
    // Hand back an owned handle so the read guard is not needed by the caller.
    Ok(Arc::clone(store))
}
/// Asks the default objects store (the one registered under `""`) for the
/// file name associated with `group_id`.
///
/// Returns `None` when no default store is registered, or when the store
/// itself returns `None`.
pub async fn filename(&self, group_id: u128) -> Option<PathBuf> {
    let default_store = self.get_object_store("").await.ok()?;
    default_store.filename(group_id)
}
/// Creates `object` in the store selected by `uid` and returns the UID
/// reported by that store.
///
/// When `uid` is `None` the store is selected with the empty string, i.e. the
/// default store is used. After a successful creation,
/// `unwrapped_cache.validate_cache` is called with the returned UID and
/// `object`.
///
/// # Errors
///
/// Fails when no store matches the UID, or when the store's `create` fails.
pub async fn create(
    &self,
    uid: Option<String>,
    owner: &str,
    object: &Object,
    attributes: &Attributes,
    tags: &HashSet<String>,
    params: Option<Arc<dyn SessionParams>>,
) -> DbResult<String> {
    // Borrow the requested UID for routing instead of cloning it; the borrow
    // ends before `uid` is moved into the store call below.
    let db = self
        .get_object_store(uid.as_deref().unwrap_or_default())
        .await?;
    let uid = db
        .create(uid, owner, object, attributes, tags, params)
        .await?;
    self.unwrapped_cache.validate_cache(&uid, object).await;
    Ok(uid)
}
/// Retrieves the objects designated by a single UID or by a set of tags.
///
/// If `uid_or_tags` starts with `[`, it is parsed as JSON into a set of tag
/// strings and resolved to UIDs through `list_uids_for_tags`; otherwise it is
/// used as a single UID. Each UID is then looked up with `retrieve_object`,
/// and UIDs for which nothing is found are left out of the returned map.
///
/// # Errors
///
/// Fails on a JSON parsing error, or on the first failing tag resolution or
/// object retrieval.
pub async fn retrieve_objects(
    &self,
    uid_or_tags: &str,
    params: Option<Arc<dyn SessionParams>>,
) -> DbResult<HashMap<String, ObjectWithMetadata>> {
    let is_tag_list = uid_or_tags.starts_with('[');
    let uids = if is_tag_list {
        let tags: HashSet<String> = serde_json::from_str(uid_or_tags)?;
        self.list_uids_for_tags(&tags, params.clone()).await?
    } else {
        HashSet::from([uid_or_tags.to_owned()])
    };

    let mut found: HashMap<String, ObjectWithMetadata> = HashMap::new();
    for uid in &uids {
        if let Some(owm) = self.retrieve_object(uid, params.clone()).await? {
            found.insert(uid.clone(), owm);
        }
    }
    Ok(found)
}
/// Retrieves the object stored under `uid` from the store selected by that
/// UID.
///
/// Returns whatever the store's `retrieve` yields, including `None`.
///
/// # Errors
///
/// Fails when no store matches the UID, or when the store's `retrieve` fails.
pub async fn retrieve_object(
    &self,
    uid: &str,
    params: Option<Arc<dyn SessionParams>>,
) -> DbResult<Option<ObjectWithMetadata>> {
    let store = self.get_object_store(uid).await?;
    let found = store.retrieve(uid, params).await?;
    Ok(found)
}
/// Retrieves the tags of the object `uid` from the store selected by that
/// UID.
///
/// # Errors
///
/// Fails when no store matches the UID, or when the store's `retrieve_tags`
/// fails.
pub async fn retrieve_tags(
    &self,
    uid: &str,
    params: Option<Arc<dyn SessionParams>>,
) -> DbResult<HashSet<String>> {
    let store = self.get_object_store(uid).await?;
    let tags = store.retrieve_tags(uid, params).await?;
    Ok(tags)
}
/// Updates the object `uid` in the store selected by that UID.
///
/// `object`, `attributes` and the optional `tags` are forwarded to the
/// store's `update_object`. Once the store has accepted the update,
/// `unwrapped_cache.validate_cache` is called with `uid` and `object`.
///
/// # Errors
///
/// Fails when no store matches the UID, or when the store's `update_object`
/// fails; in both cases the cache call is skipped.
pub async fn update_object(
    &self,
    uid: &str,
    object: &Object,
    attributes: &Attributes,
    tags: Option<&HashSet<String>>,
    params: Option<Arc<dyn SessionParams>>,
) -> DbResult<()> {
    let store = self.get_object_store(uid).await?;
    store
        .update_object(uid, object, attributes, tags, params)
        .await?;

    self.unwrapped_cache.validate_cache(uid, object).await;
    Ok(())
}
/// Sets the state of the object `uid` to `state` in the store selected by
/// that UID.
///
/// # Errors
///
/// Fails when no store matches the UID, or when the store's `update_state`
/// fails.
pub async fn update_state(
    &self,
    uid: &str,
    state: State,
    params: Option<Arc<dyn SessionParams>>,
) -> DbResult<()> {
    let store = self.get_object_store(uid).await?;
    store.update_state(uid, state, params).await?;
    Ok(())
}
/// Deletes the object `uid` from the store selected by that UID, then calls
/// `unwrapped_cache.clear_cache` for the same UID.
///
/// # Errors
///
/// Fails when no store matches the UID, or when the store's `delete` fails;
/// in both cases the cache call is skipped.
pub async fn delete(&self, uid: &str, params: Option<Arc<dyn SessionParams>>) -> DbResult<()> {
    let store = self.get_object_store(uid).await?;
    store.delete(uid, params).await?;

    self.unwrapped_cache.clear_cache(uid).await;
    Ok(())
}
/// Asks the store selected by `uid` whether the object `uid` is owned by
/// `owner`.
///
/// # Errors
///
/// Fails when no store matches the UID, or when the store's
/// `is_object_owned_by` fails.
pub async fn is_object_owned_by(
    &self,
    uid: &str,
    owner: &str,
    params: Option<Arc<dyn SessionParams>>,
) -> DbResult<bool> {
    let store = self.get_object_store(uid).await?;
    let owned = store.is_object_owned_by(uid, owner, params).await?;
    Ok(owned)
}
/// Collects, across every registered objects store, the UIDs each store
/// reports for `tags`.
///
/// The `objects` map stays read-locked while the stores are queried one after
/// the other.
///
/// # Errors
///
/// Stops at, and returns, the first error raised by a store.
pub async fn list_uids_for_tags(
    &self,
    tags: &HashSet<String>,
    params: Option<Arc<dyn SessionParams>>,
) -> DbResult<HashSet<String>> {
    let stores = self.objects.read().await;
    let mut uids = HashSet::new();
    for store in stores.values() {
        let matching = store.list_uids_for_tags(tags, params.clone()).await?;
        uids.extend(matching);
    }
    Ok(uids)
}
/// Runs `find` on every registered objects store and concatenates the
/// results.
///
/// All arguments are forwarded unchanged to each store. The `objects` map
/// stays read-locked while the stores are queried one after the other.
///
/// Unlike `list_uids_for_tags`, a store whose `find` returns an error is
/// treated as having found nothing: the error is discarded and this method
/// still returns `Ok`.
pub async fn find(
    &self,
    researched_attributes: Option<&Attributes>,
    state: Option<State>,
    user: &str,
    user_must_be_owner: bool,
    params: Option<Arc<dyn SessionParams>>,
) -> DbResult<Vec<(String, State, Attributes)>> {
    let stores = self.objects.read().await;
    let mut matches: Vec<(String, State, Attributes)> = Vec::new();
    for store in stores.values() {
        let found = store
            .find(
                researched_attributes,
                state,
                user,
                user_must_be_owner,
                params.clone(),
            )
            .await
            // Best effort: a failing store contributes no entries.
            .unwrap_or_default();
        matches.extend(found);
    }
    Ok(matches)
}
/// Submits `operations` as one batch to a single objects store and returns
/// the IDs that store reports.
///
/// The store is selected from the UID of the first operation only, and the
/// whole batch is handed to that store's `atomic`. An empty batch returns an
/// empty vector without touching any store.
///
/// After the store has accepted the batch, the unwrapped cache is updated per
/// operation: `validate_cache` for `Create`, `UpdateObject` and `Upsert`,
/// `clear_cache` for `Delete`, and nothing for `UpdateState`.
///
/// # Errors
///
/// Fails when no store matches the first operation's UID, or when the store's
/// `atomic` fails; in both cases the cache is left untouched.
pub async fn atomic(
    &self,
    user: &str,
    operations: &[AtomicOperation],
    params: Option<Arc<dyn SessionParams>>,
) -> DbResult<Vec<String>> {
    // `first()` handles the empty batch and avoids unchecked indexing.
    let Some(first_op) = operations.first() else {
        return Ok(vec![]);
    };
    let first_uid = first_op.get_object_uid();
    let db = self.get_object_store(first_uid).await?;
    let ids = db.atomic(user, operations, params).await?;
    for op in operations {
        match op {
            AtomicOperation::Create((uid, object, ..))
            | AtomicOperation::UpdateObject((uid, object, ..))
            | AtomicOperation::Upsert((uid, object, ..)) => {
                self.unwrapped_cache.validate_cache(uid, object).await;
            }
            AtomicOperation::Delete(uid) => {
                self.unwrapped_cache.clear_cache(uid).await;
            }
            AtomicOperation::UpdateState(_) => {}
        }
    }
    Ok(ids)
}
}