use std::env;
use std::collections::HashMap;
use std::process;
use std::sync::Arc;
use k2db::{K2Db, RatatouilleLogger, Scope, ScopedK2Db};
use serde_json::json;
use tokio::sync::{Mutex, RwLock};
use crate::api_error::ApiError;
use crate::config::AppConfig;
use crate::telemetry::Telemetry;
/// Lazily creates and caches one [`K2Db`] handle per database name.
///
/// All mutable state sits behind `Arc`s, so clones of the manager share the
/// same cache and the same initialization lock.
#[derive(Debug, Clone)]
pub struct DbManager {
    /// Application configuration; supplies the per-database settings and the
    /// list of tenant databases to warm.
    config: Arc<AppConfig>,
    /// Initialized handles, keyed by the trimmed database name.
    db_by_name: Arc<RwLock<HashMap<String, K2Db>>>,
    /// Held while a handle is being created so initializations run one at a time.
    init_lock: Arc<Mutex<()>>,
    /// Emits the `k2db-api:database_ready` event once a handle is initialized.
    telemetry: Telemetry,
}
impl DbManager {
    /// Creates a manager with an empty handle cache.
    ///
    /// No database is opened here; handles are created on demand by
    /// [`DbManager::get_db`].
    pub fn new(config: Arc<AppConfig>) -> Self {
        Self {
            config,
            db_by_name: Arc::new(RwLock::new(HashMap::new())),
            init_lock: Arc::new(Mutex::new(())),
            telemetry: Telemetry::from_env("k2db-api-rust", None, "k2db-api*"),
        }
    }

    /// Returns a clone of the cached handle stored under `database`, if any.
    ///
    /// `database` must already be normalized (trimmed); the read lock is
    /// released before this function returns.
    async fn lookup_cached(&self, database: &str) -> Option<K2Db> {
        let cache = self.db_by_name.read().await;
        cache.get(database).cloned()
    }

    /// Returns the handle for `database`, creating and initializing it on
    /// first use.
    ///
    /// The name is trimmed before it is used as the cache key.
    ///
    /// # Errors
    ///
    /// * An internal error (`t-dbmanager-database-required-001`) when the
    ///   trimmed name is empty.
    /// * Any error from building the Ringtail logger.
    /// * Any `k2db` error raised while constructing the handle, attaching the
    ///   logger, or running `init`, converted through `ApiError::from_k2db`.
    ///
    /// A handle is cached only after `init` succeeds, so a failed attempt is
    /// retried on the next call.
    pub async fn get_db(&self, database: &str) -> Result<K2Db, ApiError> {
        let database = database.trim();
        if database.is_empty() {
            return Err(ApiError::internal(
                "Database name is required",
                "t-dbmanager-database-required-001",
            ));
        }

        // Fast path: the handle already exists, no initialization lock needed.
        if let Some(existing) = self.lookup_cached(database).await {
            return Ok(existing);
        }

        // Slow path: serialize initialization, then look again in case another
        // task finished creating this handle while we waited for the lock.
        let _guard = self.init_lock.lock().await;
        if let Some(existing) = self.lookup_cached(database).await {
            return Ok(existing);
        }

        let db = K2Db::new(self.config.database_config(database)).map_err(ApiError::from_k2db)?;
        if let Some(logger) = build_ringtail_logger()? {
            db.set_logger(logger).map_err(ApiError::from_k2db)?;
        }
        db.init().await.map_err(ApiError::from_k2db)?;

        self.telemetry.emit(
            "k2db-api:info",
            "k2db-api:database_ready",
            json!({
                "database": database,
                "ownership_mode": format!("{:?}", db.config().ownership_mode).to_lowercase(),
                "slow_query_ms": db.config().slow_query_ms,
            }),
        );

        self.db_by_name
            .write()
            .await
            .insert(database.to_owned(), db.clone());
        Ok(db)
    }

    /// Returns the handle for `database` restricted to the scope parsed from
    /// `scope_header`.
    ///
    /// The header is parsed before the database is looked up, so an unusable
    /// header never triggers database initialization.
    ///
    /// # Errors
    ///
    /// * A bad-request error (`t-scope-required-001`) when
    ///   `Scope::from_legacy` yields no scope for `scope_header`.
    /// * Any error returned by [`DbManager::get_db`].
    pub async fn scoped_db(&self, database: &str, scope_header: &str) -> Result<ScopedK2Db, ApiError> {
        let scope = Scope::from_legacy(scope_header).ok_or_else(|| {
            ApiError::bad_request("Scope header is required", "t-scope-required-001")
        })?;
        Ok(self.get_db(database).await?.with_scope(scope))
    }

    /// Initializes every database listed by the configuration's
    /// `tenant_databases()`, one after another.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error produced by
    /// [`DbManager::get_db`]; later databases are not attempted.
    pub async fn warm_databases(&self) -> Result<(), ApiError> {
        for database in self.config.tenant_databases() {
            self.get_db(&database).await?;
        }
        Ok(())
    }

    /// Reports whether the already-initialized handle for `database` is
    /// healthy.
    ///
    /// Returns `false` when no handle is cached for the name; this never
    /// creates a handle. The name is trimmed exactly as in
    /// [`DbManager::get_db`], so both methods agree on which cache entry a
    /// given name refers to.
    pub async fn is_db_healthy(&self, database: &str) -> bool {
        // The cache is keyed by the trimmed name; looking up the raw name
        // would miss entries for whitespace-padded input.
        match self.lookup_cached(database.trim()).await {
            Some(db) => db.is_healthy().await,
            None => false,
        }
    }
}
/// Builds the optional Ringtail logger from environment variables.
///
/// Returns `Ok(None)` when `RINGTAIL_URL` is unset or blank after trimming.
/// Otherwise an HTTP sink is created for that URL (with the optional
/// `RINGTAIL_TOKEN`) and wrapped in a logger whose source identity comes from
/// `RATATOUILLE_APP`, `RATATOUILLE_WHERE` and `RATATOUILLE_INSTANCE`, falling
/// back to `"k2db-api-rust"`, `"start"` and `pid:<process id>` respectively.
///
/// # Errors
///
/// Returns an internal error (`t-ringtail-config-invalid-001`) when the sink
/// rejects its configuration.
fn build_ringtail_logger() -> Result<Option<RatatouilleLogger>, ApiError> {
    use k2db::ratatouille::{Format, HttpSink, HttpSinkConfig, LoggerConfig, SourceIdentity};

    // Reads `name`, trims it, and treats unset, unreadable, or blank values as absent.
    fn env_non_empty(name: &str) -> Option<String> {
        env::var(name)
            .ok()
            .map(|raw| raw.trim().to_owned())
            .filter(|trimmed| !trimmed.is_empty())
    }

    // Without a target URL there is nothing to ship logs to.
    let Some(url) = env_non_empty("RINGTAIL_URL") else {
        return Ok(None);
    };
    let token = env_non_empty("RINGTAIL_TOKEN");

    let source = SourceIdentity {
        app: Some(env_non_empty("RATATOUILLE_APP").unwrap_or_else(|| "k2db-api-rust".to_owned())),
        r#where: Some(env_non_empty("RATATOUILLE_WHERE").unwrap_or_else(|| "start".to_owned())),
        instance: Some(
            env_non_empty("RATATOUILLE_INSTANCE")
                .unwrap_or_else(|| format!("pid:{}", process::id())),
        ),
    };

    let sink_config = HttpSinkConfig {
        url,
        token,
        user_agent: Some("k2db-api-rust/telemetry".to_owned()),
    };
    let sink = match HttpSink::new(sink_config) {
        Ok(sink) => sink,
        Err(error) => {
            return Err(ApiError::internal(
                format!("invalid Ringtail telemetry config: {error}"),
                "t-ringtail-config-invalid-001",
            ));
        }
    };

    let logger_config = LoggerConfig {
        filter: Some("k2db*".to_owned()),
        format: Format::Ndjson,
        source,
        ..LoggerConfig::default()
    };
    Ok(Some(RatatouilleLogger::with_sink(logger_config, sink)))
}