use std::sync::Arc;
use std::time::{Duration, Instant};
use axum::{extract::State, http::StatusCode, response::IntoResponse, Json};
use serde::Serialize;
use tokio::sync::Mutex;
use crate::QueryState;
use kyma_core::catalog::{Catalog, ColumnInfo};
/// Top-level schema document: the list of databases known to the catalog.
///
/// This is the value `schema_handler` serializes as the JSON response body
/// and the value kept (behind an `Arc`) inside `SchemaCache`.
#[derive(Serialize, Clone)]
pub struct SchemaDoc {
/// One entry per database, in the order `build` collected them.
pub databases: Vec<DatabaseDoc>,
}
/// A single database entry inside a `SchemaDoc`.
#[derive(Serialize, Clone)]
pub struct DatabaseDoc {
/// Database name as reported by the catalog.
pub name: String,
/// Tables listed for this database.
pub tables: Vec<TableDoc>,
}
/// A single table entry inside a `DatabaseDoc`.
#[derive(Serialize, Clone)]
pub struct TableDoc {
/// Table name as reported by the catalog.
pub name: String,
/// Column descriptions returned by the catalog for this table.
pub columns: Vec<ColumnInfo>,
}
/// Time-to-live used by `SchemaCache::default()` and as the fallback in
/// `SchemaCache::from_env()`: five seconds.
const DEFAULT_TTL: Duration = Duration::from_secs(5);
/// Single-slot cache for the most recently built `SchemaDoc`.
pub struct SchemaCache {
// `None` until the first document is stored; otherwise the instant the
// document was stored together with a shared handle to it.
inner: Mutex<Option<(Instant, Arc<SchemaDoc>)>>,
/// How long a stored document is served before it is rebuilt.
/// An entry is considered fresh while `stored_at.elapsed() < ttl`.
pub ttl: Duration,
}
impl Default for SchemaCache {
fn default() -> Self {
Self::new(DEFAULT_TTL)
}
}
impl SchemaCache {
    /// Creates an empty cache whose entries stay fresh for `ttl`.
    ///
    /// A `ttl` of `Duration::ZERO` means a stored entry is never considered
    /// fresh, because freshness is tested with a strict `elapsed < ttl`.
    pub fn new(ttl: Duration) -> Self {
        Self {
            inner: Mutex::new(None),
            ttl,
        }
    }

    /// Creates an empty cache configured from the environment.
    ///
    /// Reads `KYMA_SCHEMA_CACHE_TTL_SECS` as a whole number of seconds.
    /// Leading and trailing whitespace is ignored. If the variable is unset,
    /// not valid Unicode, or not a valid `u64`, `DEFAULT_TTL` is used.
    pub fn from_env() -> Self {
        let ttl = std::env::var("KYMA_SCHEMA_CACHE_TTL_SECS")
            .ok()
            // `u64::from_str` rejects surrounding whitespace, so trim first;
            // otherwise a value such as "30\n" would silently fall back to
            // the default.
            .and_then(|raw| raw.trim().parse::<u64>().ok())
            .map_or(DEFAULT_TTL, Duration::from_secs);
        Self::new(ttl)
    }
}
/// HTTP handler that returns the catalog schema as JSON.
///
/// Serves the cached `SchemaDoc` while it is younger than the cache TTL.
/// Otherwise it rebuilds the document from `state.catalog`, stores it in the
/// cache with the current time, and returns it.
///
/// Responses:
/// - `200 OK` with the serialized `SchemaDoc`.
/// - `500 Internal Server Error` with
///   `{"error":{"code":"catalog","message": ...}}` when building fails; the
///   cache is left untouched in that case.
pub async fn schema_handler(State(state): State<QueryState>) -> impl IntoResponse {
    let cache = state.schema_cache.clone();
    let ttl = cache.ttl;

    // Hold the lock only long enough to grab a shared handle to a fresh entry.
    let fresh = {
        let guard = cache.inner.lock().await;
        guard
            .as_ref()
            .filter(|(stored_at, _)| stored_at.elapsed() < ttl)
            .map(|(_, doc)| Arc::clone(doc))
    };
    if let Some(doc) = fresh {
        // Serialize through a reference: `Json` only needs `Serialize`, so
        // there is no reason to deep-clone the whole document per request.
        return (StatusCode::OK, Json(&*doc)).into_response();
    }

    // The lock is not held while building, so the catalog walk never blocks
    // other requests on the cache mutex.
    let doc = match build(&*state.catalog).await {
        Ok(built) => Arc::new(built),
        Err(e) => {
            return (
                StatusCode::INTERNAL_SERVER_ERROR,
                Json(serde_json::json!({"error":{"code":"catalog","message": e.to_string()}})),
            )
                .into_response();
        }
    };

    // The guard is a temporary and is released at the end of this statement.
    *cache.inner.lock().await = Some((Instant::now(), Arc::clone(&doc)));

    (StatusCode::OK, Json(&*doc)).into_response()
}
/// Builds a complete `SchemaDoc` by walking the catalog.
///
/// For every database from `list_databases`, every table from `list_tables`
/// is visited and its columns are fetched with `get_table_columns`. The calls
/// are awaited one after another.
///
/// # Errors
///
/// Returns the first `CatalogError` produced by any catalog call; no partial
/// document is returned.
async fn build(catalog: &dyn Catalog) -> Result<SchemaDoc, kyma_core::errors::CatalogError> {
    let mut databases = Vec::new();

    for db_name in catalog.list_databases().await? {
        let mut tables = Vec::new();

        for table_name in catalog.list_tables(&db_name).await? {
            let columns = catalog.get_table_columns(&db_name, &table_name).await?;
            tables.push(TableDoc {
                name: table_name,
                columns,
            });
        }

        databases.push(DatabaseDoc {
            name: db_name,
            tables,
        });
    }

    Ok(SchemaDoc { databases })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// With a zero TTL, even an entry that is present must never be fresh.
    #[tokio::test]
    async fn zero_ttl_cache_is_always_cold() {
        let cache = SchemaCache::new(Duration::ZERO);

        // Seed the slot with an entry stored one second in the past.
        {
            let mut slot = cache.inner.lock().await;
            *slot = Some((
                Instant::now() - Duration::from_secs(1),
                Arc::new(SchemaDoc { databases: vec![] }),
            ));
        }

        let cached = {
            let slot = cache.inner.lock().await;
            slot.as_ref().cloned()
        };

        // Fail loudly if the entry is missing instead of skipping the
        // assertion, which would let the test pass without checking anything.
        let (stored_at, _) = cached.expect("cache entry was seeded above");
        assert!(
            stored_at.elapsed() >= cache.ttl,
            "zero-ttl cache reported fresh — that is incorrect"
        );
    }

    /// `Default` uses the five-second TTL.
    #[test]
    fn default_ttl_is_five_seconds() {
        let cache = SchemaCache::default();
        assert_eq!(cache.ttl, Duration::from_secs(5));
    }

    /// `new` stores the TTL it was given.
    #[test]
    fn custom_ttl_is_stored() {
        let cache = SchemaCache::new(Duration::from_secs(60));
        assert_eq!(cache.ttl, Duration::from_secs(60));
    }
}