mod api_v1;
mod audit_jsonl;
mod auth_routes;
mod config;
mod event_log_store;
mod health;
mod jwt_key_boot;
mod metrics;
mod middleware;
mod state;
mod stubs;
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use std::time::Duration;
use anyhow::Context;
use clap::Parser;
use tokio::signal::unix::{signal, SignalKind};
use tracing::{error, info};
use tracing_subscriber::EnvFilter;
use std::sync::Arc;
use crate::config::ServerConfig;
use crate::state::AppState;
use gradatum_auth::revocation::boot_guard_check;
use gradatum_db_sqlite::{run_migrations, SqliteQueueStore};
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions};
// Command-line arguments of the server binary, parsed through clap's derive API.
// Plain `//` comments are used here on purpose: clap turns `///` doc comments into
// help text, which would change the program's `--help` output.
#[derive(Parser, Debug)]
#[command(version, about = "gradatum-server façade HTTP/MCP")]
struct Cli {
// Optional `--config <PATH>`; `main` forwards it to `ServerConfig::load`, which
// receives `None` when the flag is absent.
#[arg(long)]
config: Option<PathBuf>,
}
/// Process entry point: loads the configuration, wires every store into
/// [`AppState`], then serves HTTP until SIGTERM or SIGINT is received.
///
/// Boot sequence, in order:
/// 1. parse CLI arguments, load the configuration, initialise tracing;
/// 2. refuse to start when `metrics_bind` is not a loopback address, then run
///    `boot_guard_check` on the main bind address and revocation store setting;
/// 3. prepare the JWT key directory (mode `0o700`) and load or generate the key;
/// 4. open the queue, vault, search index, revocation, API-key, job and event-log
///    stores, plus the optional HTTP embedder;
/// 5. spawn the periodic event-log retention task;
/// 6. bind the listener, notify systemd (Linux only), spawn the metrics listener
///    and serve with graceful shutdown.
///
/// # Errors
/// Returns an error when any boot step fails (configuration, guard checks,
/// filesystem preparation, store initialisation, socket bind) or when
/// `axum::serve` terminates with an error.
///
/// # Panics
/// Panics if the event-log store is missing from the state right after
/// `with_event_log_path` succeeded (treated as a broken invariant).
#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
    let cli = Cli::parse();
    let cfg = ServerConfig::load(cli.config.as_deref())
        .map_err(|e| anyhow::anyhow!("échec chargement config : {e}"))?;
    init_tracing(&cfg.log.format);
    info!(
        bind = %cfg.server.bind,
        metrics_bind = %cfg.server.metrics_bind,
        version = env!("CARGO_PKG_VERSION"),
        "gradatum-server démarrage"
    );

    // The metrics endpoint must never be reachable from outside the host.
    if !cfg.server.metrics_bind.ip().is_loopback() {
        anyhow::bail!(
            "metrics_bind doit être loopback (caveat C7) : adresse refusée = {}",
            cfg.server.metrics_bind
        );
    }
    // NOTE(review): presumably rejects unsafe combinations of a non-loopback bind
    // and the configured revocation store — confirm in `gradatum_auth::revocation`.
    let bind_is_loopback = cfg.server.bind.ip().is_loopback();
    boot_guard_check(bind_is_loopback, &cfg.auth.revocation_store)
        .map_err(|e| anyhow::anyhow!("{e}"))?;

    // JWT key directory: the parent of the configured private key path when it
    // lies under the storage root, otherwise `<root>/config`.
    let jwt_key_dir = cfg
        .auth
        .jwt_private_key_path
        .parent()
        .filter(|parent| parent.starts_with(&cfg.storage.root))
        .map(|parent| parent.to_path_buf())
        .unwrap_or_else(|| cfg.storage.root.join("config"));
    tokio::fs::create_dir_all(&jwt_key_dir)
        .await
        .with_context(|| format!("création du répertoire clé JWT: {}", jwt_key_dir.display()))?;
    // Owner-only access on the directory that holds the signing key.
    tokio::fs::set_permissions(&jwt_key_dir, std::fs::Permissions::from_mode(0o700))
        .await
        .with_context(|| {
            format!(
                "chmod 0o700 du répertoire clé JWT: {}",
                jwt_key_dir.display()
            )
        })?;
    let jwt_kid = "gradatum-v0".to_string();
    let jwt_service = crate::jwt_key_boot::load_or_generate_jwt_key(
        &jwt_key_dir,
        jwt_kid,
        "gradatum".to_string(),
        cfg.auth.jwt_ttl_human_secs,
        cfg.auth.jwt_ttl_service_secs,
    )
    .context("chargement ou génération de la clé de signature JWT")?;

    if cfg.storage.legacy_alias_used() {
        tracing::warn!(
            "[storage].db_path is deprecated, use vault_index_path. \
             Retrait prévu en alpha.7+1. Voir CHANGELOG v0.1.0-alpha.7."
        );
    }

    // On-disk layout, all derived from the storage root unless overridden.
    let queue_path = cfg.storage.root.join("db/queue.sqlite");
    let vault_path = cfg.storage.root.join("vault");
    let search_path = cfg
        .storage
        .root
        .join("vault")
        .join(".gradatum")
        .join("index.db");
    let revocation_db_path = cfg
        .auth
        .revocation_db_path
        .clone()
        .unwrap_or_else(|| cfg.storage.root.join("db/revocation.sqlite"));

    let state = AppState::with_jwt(jwt_service)
        .with_queue_path(&queue_path)
        .await
        .context("queue init failed")?
        .with_vault_path(&vault_path)
        .await
        .context("vault init failed")?
        .with_search_path(&search_path)
        .await
        .context("search index init failed")?;

    // The SQLite revocation store is only wired when explicitly selected; any
    // other value leaves the state untouched and logs a DEV ONLY warning.
    let state = if cfg.auth.revocation_store == "sqlite" {
        if let Some(parent) = revocation_db_path.parent() {
            tokio::fs::create_dir_all(parent).await.with_context(|| {
                format!("création du répertoire revocation db: {}", parent.display())
            })?;
        }
        let state = state
            .with_revocation_path(&revocation_db_path)
            .await
            .context("revocation store init failed")?;
        tracing::info!(
            path = %revocation_db_path.display(),
            "SqliteRevocationStore ready"
        );
        state
    } else {
        tracing::warn!(
            store = %cfg.auth.revocation_store,
            "revocation_store non-sqlite — InMemoryRevocationStore actif (DEV ONLY)"
        );
        state
    };

    let api_keys_db_path = cfg
        .auth
        .api_keys_db_path
        .clone()
        .unwrap_or_else(|| cfg.storage.root.join("db/api_keys.sqlite"));
    if let Some(parent) = api_keys_db_path.parent() {
        tokio::fs::create_dir_all(parent)
            .await
            .with_context(|| format!("création du répertoire api_keys db: {}", parent.display()))?;
    }
    let state = state
        .with_api_keys_path(&api_keys_db_path)
        .await
        .context("api_keys store init failed")?;
    tracing::info!(
        path = %api_keys_db_path.display(),
        "SqliteApiKeyStore ready"
    );

    let state = state.with_acl_preset_path(&cfg.acl.preset_path);

    // NOTE(review): this is the same file as `queue_path` above, so two components
    // open `db/queue.sqlite` — confirm that sharing the database is intentional.
    let jobs_db_path = cfg.storage.root.join("db/queue.sqlite");
    if let Some(parent) = jobs_db_path.parent() {
        tokio::fs::create_dir_all(parent)
            .await
            .with_context(|| format!("création répertoire jobs db: {}", parent.display()))?;
    }
    let jobs_opts = SqliteConnectOptions::new()
        .filename(&jobs_db_path)
        .create_if_missing(true)
        .journal_mode(SqliteJournalMode::Wal)
        .busy_timeout(Duration::from_secs(5));
    let jobs_pool = SqlitePoolOptions::new()
        .max_connections(4)
        .connect_with(jobs_opts)
        .await
        .context("jobs pool init failed")?;
    run_migrations(&jobs_pool)
        .await
        .context("jobs migrations failed")?;
    let job_store = Arc::new(SqliteQueueStore::new(jobs_pool.clone()));
    let state = state.with_job_store(job_store, jobs_pool);
    tracing::info!(
        path = %jobs_db_path.display(),
        "SqliteQueueStore (F-16) ready"
    );

    let state = if cfg.embed.enabled {
        let embedder =
            gradatum_embed::HttpEmbedder::new(&cfg.embed.endpoint, &cfg.embed.model, cfg.embed.dim)
                .with_timeout(Duration::from_millis(cfg.embed.timeout_ms));
        tracing::info!(
            endpoint = %cfg.embed.endpoint,
            model = %cfg.embed.model,
            dim = cfg.embed.dim,
            timeout_ms = cfg.embed.timeout_ms,
            "embedder HTTP wired (Phase 2.1.1)"
        );
        state.with_embedder(Arc::new(embedder))
    } else {
        tracing::warn!("embed.enabled=false — Noop embedder actif (aucun embedding généré)");
        state
    };

    // The event log is opened on the same `index.db` path as the search index.
    let state = state
        .with_event_log_path(&search_path)
        .await
        .context("EventLogStore init failed")?;
    tracing::info!(
        path = %search_path.display(),
        "EventLogStore (B1) câblé sur index.db"
    );

    // Background retention task: purges old event-log rows and refreshes the row
    // gauge. The task is detached; failures are logged and never stop the server.
    {
        let retention_cfg = cfg.event_log.clone();
        let event_log_store = state
            .event_log
            .clone()
            .expect("EventLogStore câblé — invariant post with_event_log_path");
        let metrics = state.metrics.clone();
        tokio::spawn(async move {
            use std::time::{SystemTime, UNIX_EPOCH};
            use tokio::time::{interval, Duration, MissedTickBehavior};

            // Never purge more often than once a minute, whatever the config says.
            let interval_secs = retention_cfg.purge_interval_secs.max(60);
            let mut ticker = interval(Duration::from_secs(interval_secs));
            ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
            // The first tick of a tokio interval completes immediately; consume it
            // so the first purge happens one full period after boot.
            ticker.tick().await;

            // Loop-invariant retention window. Saturating arithmetic keeps an
            // oversized `retention_days` from overflowing the i64 millisecond math.
            let retention_ms = (retention_cfg.retention_days as i64).saturating_mul(86_400_000);

            loop {
                ticker.tick().await;
                let now_ms = SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_millis() as i64;
                let cutoff_ms = now_ms.saturating_sub(retention_ms);
                match event_log_store
                    .purge(cutoff_ms, retention_cfg.max_rows)
                    .await
                {
                    Ok(purged) => {
                        tracing::info!(
                            purged = purged,
                            retention_days = retention_cfg.retention_days,
                            max_rows = retention_cfg.max_rows,
                            "event_log rétention : purge terminée"
                        );
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "event_log purge échouée — non fatal");
                    }
                }
                match event_log_store.count().await {
                    Ok(count) => {
                        metrics.event_log_rows.set(count as i64);
                    }
                    Err(e) => {
                        tracing::warn!(error = %e, "event_log count échoué — gauge non mise à jour");
                    }
                }
            }
        });
    }

    tracing::info!(queue_path = %queue_path.display(), "SqliteQueue ready");
    tracing::info!(vault_path = %vault_path.display(), "Vault ready");
    tracing::info!(search_path = %search_path.display(), "SqliteIndex (FTS5) ready");
    tracing::info!(
        enabled = cfg.ratelimit.enabled,
        per_minute = cfg.ratelimit.per_minute,
        burst = cfg.ratelimit.burst,
        exempt_localhost = cfg.ratelimit.exempt_localhost,
        "rate limiting V3"
    );

    let app = build_router(state.clone(), &cfg.ratelimit);

    // Name the address in the error: a bare "address already in use" is hard to
    // diagnose when several listeners are configured.
    let listener = tokio::net::TcpListener::bind(cfg.server.bind)
        .await
        .with_context(|| format!("échec du bind du listener HTTP sur {}", cfg.server.bind))?;
    // Report the address actually bound (it differs from the config when port 0
    // is requested); propagate a failure instead of panicking.
    let actual_addr = listener
        .local_addr()
        .context("obtenir l'adresse locale après bind")?;
    info!(addr = %actual_addr, "serveur en écoute");

    // Best-effort readiness notification; failure is expected outside systemd.
    #[cfg(target_os = "linux")]
    if let Err(e) = sd_notify::notify(false, &[sd_notify::NotifyState::Ready]) {
        tracing::debug!(error = %e, "sd_notify ready ignoré (exécution hors systemd)");
    }

    // The metrics listener runs in its own detached task; an error there is
    // logged but does not bring the main server down.
    let metrics_bind = cfg.server.metrics_bind;
    let app_metrics = state.metrics.clone();
    tokio::spawn(async move {
        if let Err(e) = metrics::spawn_metrics_listener(metrics_bind, app_metrics).await {
            error!(error = %e, "metrics listener arrêté avec erreur");
        }
    });

    let shutdown = shutdown_signal();
    axum::serve(
        listener,
        // Connect info exposes the peer `SocketAddr` to handlers and middleware.
        app.into_make_service_with_connect_info::<std::net::SocketAddr>(),
    )
    .with_graceful_shutdown(shutdown)
    .await
    .map_err(|e| {
        error!(error = %e, "serveur arrêté avec erreur");
        anyhow::anyhow!("erreur axum serve : {e}")
    })?;

    info!("gradatum-server arrêté proprement");
    Ok(())
}
/// Installs the global `tracing` subscriber.
///
/// The filter comes from the default environment variable read by
/// `EnvFilter::try_from_default_env`, falling back to the `info` directive when
/// that fails. Output is JSON when `format` is exactly `"json"`; any other
/// value selects the pretty formatter.
fn init_tracing(format: &str) {
    let env_filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("info"),
    };
    let builder = tracing_subscriber::fmt().with_env_filter(env_filter);
    // The two formatters are different types, so each arm finishes the builder itself.
    match format {
        "json" => builder.json().init(),
        _ => builder.pretty().init(),
    }
}
/// Assembles the application router.
///
/// * `/api/v1/*` is wrapped in `auth_middleware`.
/// * The routes from `auth_routes::router()` and `/health` are merged without
///   that middleware.
/// * When `build_warden_layer(rl)` yields a layer, it is applied to the
///   authenticated API and to the auth-exchange routes; `/health` never gets it.
fn build_router(state: AppState, rl: &crate::config::RateLimitConfig) -> axum::Router {
    use axum::{middleware, routing::get, Router};

    let api = Router::new().nest("/api/v1", api_v1::router());
    let mut protected = api.layer(middleware::from_fn_with_state(
        state.clone(),
        crate::middleware::auth_middleware,
    ));
    let mut exchange = auth_routes::router();

    // Optional layer: one clone for the API, the original for the auth exchange.
    if let Some(warden) = crate::middleware::build_warden_layer(rl) {
        protected = protected.layer(warden.clone());
        exchange = exchange.layer(warden);
    }

    let public = Router::new()
        .route("/health", get(health::handler))
        .merge(exchange);

    protected.merge(public).with_state(state)
}
/// Resolves once the process receives SIGTERM or SIGINT, whichever comes first,
/// then waits a further 50 ms before returning.
///
/// Intended to be handed to `with_graceful_shutdown`.
///
/// # Panics
/// Panics if either signal handler cannot be installed.
async fn shutdown_signal() {
    let mut term_stream =
        signal(SignalKind::terminate()).expect("installer le handler SIGTERM — OS UNIX requis");
    let mut int_stream =
        signal(SignalKind::interrupt()).expect("installer le handler SIGINT — OS UNIX requis");

    tokio::select! {
        _ = term_stream.recv() => {
            info!("SIGTERM reçu, drain en cours");
        }
        _ = int_stream.recv() => {
            info!("SIGINT reçu, drain en cours");
        }
    }

    // Short pause between receiving the signal and resolving the future.
    let grace = Duration::from_millis(50);
    tokio::time::sleep(grace).await;
    info!("signal d'arrêt traité");
}