mod database_objects;
mod database_permissions;
use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
#[cfg(feature = "non-fips")]
use cosmian_kms_crypto::reexport::cosmian_crypto_core::Secret;
use cosmian_kms_interfaces::{ObjectsStore, PermissionsStore};
#[cfg(feature = "non-fips")]
use redis::AsyncCommands;
use tokio::sync::RwLock;
use crate::error::DbResult;
mod main_db_params;
pub use main_db_params::{AdditionalObjectStoresParams, MainDbParams};
mod unwrapped_cache;
pub use crate::core::unwrapped_cache::{CachedObject, UnwrappedCache};
#[cfg(feature = "non-fips")]
use crate::stores::RedisWithFindex;
use crate::stores::{MySqlPool, PgPool, SqlitePool};
pub struct Database {
objects: RwLock<HashMap<String, Arc<dyn ObjectsStore + Sync + Send>>>,
permissions: Arc<dyn PermissionsStore + Sync + Send>,
unwrapped_cache: UnwrappedCache,
kind: MainDbKind,
health: Arc<dyn DatabaseHealth + Sync + Send>,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MainDbKind {
Sqlite,
Postgres,
Mysql,
#[cfg(feature = "non-fips")]
RedisFindex,
}
#[async_trait]
trait DatabaseHealth {
async fn check(&self) -> Result<(), String>;
}
impl Database {
pub async fn instantiate(
main_db_params: &MainDbParams,
clear_db_on_start: bool,
object_stores: HashMap<String, Arc<dyn ObjectsStore + Sync + Send>>,
cache_max_age: Duration,
) -> DbResult<Self> {
let db = Self::instantiate_main_database(main_db_params, clear_db_on_start, cache_max_age)
.await?;
for (prefix, store) in object_stores {
db.register_objects_store(&prefix, store).await;
}
Ok(db)
}
async fn instantiate_main_database(
main_db_params: &MainDbParams,
clear_db_on_start: bool,
cache_max_age: Duration,
) -> DbResult<Self> {
match main_db_params {
MainDbParams::Sqlite(db_path, max_conns) => {
let db = Arc::new(
SqlitePool::instantiate(&db_path.join("kms.db"), clear_db_on_start, *max_conns)
.await?,
);
let health = Arc::new(SqliteHealthProbe::new(db.clone()));
Ok(Self::new(
db.clone(),
db,
cache_max_age,
MainDbKind::Sqlite,
health,
))
}
MainDbParams::Postgres(url, max_conns) => {
let db = Arc::new(PgPool::instantiate(url, clear_db_on_start, *max_conns).await?);
let health = Arc::new(PgHealthProbe::new(db.clone()));
Ok(Self::new(
db.clone(),
db,
cache_max_age,
MainDbKind::Postgres,
health,
))
}
MainDbParams::Mysql(url, max_conns) => {
let db = Arc::new(
MySqlPool::instantiate(url.as_str(), clear_db_on_start, *max_conns).await?,
);
let health = Arc::new(MySqlHealthProbe::new(db.clone()));
Ok(Self::new(
db.clone(),
db,
cache_max_age,
MainDbKind::Mysql,
health,
))
}
#[cfg(feature = "non-fips")]
MainDbParams::RedisFindex(url, master_key) => {
use cosmian_kms_crypto::reexport::cosmian_crypto_core::FixedSizeCBytes;
use crate::stores::REDIS_WITH_FINDEX_MASTER_KEY_LENGTH;
let new_master_key =
Secret::<REDIS_WITH_FINDEX_MASTER_KEY_LENGTH>::from_unprotected_bytes(
&mut master_key.to_bytes(),
);
let db = Arc::new(
RedisWithFindex::instantiate(url.as_str(), new_master_key, clear_db_on_start)
.await?,
);
let health = Arc::new(RedisFindexHealthProbe::new(db.clone()));
Ok(Self::new(
db.clone(),
db,
cache_max_age,
MainDbKind::RedisFindex,
health,
))
}
}
}
pub const fn unwrapped_cache(&self) -> &UnwrappedCache {
&self.unwrapped_cache
}
fn new(
default_objects_database: Arc<dyn ObjectsStore + Sync + Send>,
permissions_database: Arc<dyn PermissionsStore + Sync + Send>,
cache_max_age: Duration,
kind: MainDbKind,
health: Arc<dyn DatabaseHealth + Sync + Send>,
) -> Self {
Self {
objects: RwLock::new(HashMap::from([(String::new(), default_objects_database)])),
permissions: permissions_database,
unwrapped_cache: UnwrappedCache::new(cache_max_age),
kind,
health,
}
}
#[must_use]
pub const fn main_db_kind(&self) -> MainDbKind {
self.kind
}
pub async fn health_check(&self) -> Result<(), String> {
self.health.check().await
}
}
struct SqliteHealthProbe {
store: Arc<SqlitePool>,
}
impl SqliteHealthProbe {
#[allow(clippy::missing_const_for_fn)]
fn new(store: Arc<SqlitePool>) -> Self {
Self { store }
}
}
#[async_trait]
impl DatabaseHealth for SqliteHealthProbe {
async fn check(&self) -> Result<(), String> {
self.store.health_check().await.map_err(|e| e.to_string())
}
}
struct PgHealthProbe {
store: Arc<PgPool>,
}
impl PgHealthProbe {
#[allow(clippy::missing_const_for_fn)]
fn new(store: Arc<PgPool>) -> Self {
Self { store }
}
}
#[async_trait]
impl DatabaseHealth for PgHealthProbe {
async fn check(&self) -> Result<(), String> {
self.store.health_check().await.map_err(|e| e.to_string())
}
}
struct MySqlHealthProbe {
store: Arc<MySqlPool>,
}
impl MySqlHealthProbe {
#[allow(clippy::missing_const_for_fn)]
fn new(store: Arc<MySqlPool>) -> Self {
Self { store }
}
}
#[async_trait]
impl DatabaseHealth for MySqlHealthProbe {
async fn check(&self) -> Result<(), String> {
self.store.health_check().await.map_err(|e| e.to_string())
}
}
#[cfg(feature = "non-fips")]
struct RedisFindexHealthProbe {
store: Arc<RedisWithFindex>,
}
#[cfg(feature = "non-fips")]
impl RedisFindexHealthProbe {
#[allow(clippy::missing_const_for_fn)]
fn new(store: Arc<RedisWithFindex>) -> Self {
Self { store }
}
}
#[cfg(feature = "non-fips")]
#[async_trait::async_trait]
impl DatabaseHealth for RedisFindexHealthProbe {
async fn check(&self) -> Result<(), String> {
let mut mgr = self.store.mgr.clone();
let pong: String = mgr.ping().await.map_err(|e| e.to_string())?;
#[allow(clippy::manual_ignore_case_cmp)]
if pong.eq_ignore_ascii_case("PONG") {
Ok(())
} else {
Err(format!("unexpected redis ping response: {pong}"))
}
}
}