use axum::{
Router,
extract::DefaultBodyLimit,
routing::{delete, get, post, put},
};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use noetl_server::{
config::{AppConfig, DatabaseConfig, ShardingConfig},
db::{DbPool, DbPoolMap, create_pool},
handlers,
services::{
CatalogService, CredentialService, ExecutionService, KeychainService, ReplayService,
ResultStoreService, RuntimeService,
},
state::AppState,
};
fn init_tracing() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "info,noetl_control_plane=debug,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
}
#[allow(clippy::too_many_arguments)]
fn build_router(
state: AppState,
db_pool: DbPool,
catalog_service: CatalogService,
credential_service: CredentialService,
keychain_service: KeychainService,
execution_service: ExecutionService,
runtime_service: RuntimeService,
replay_service: ReplayService,
result_store_service: ResultStoreService,
wallet_cipher: noetl_server::crypto::EnvelopeCipher,
) -> Router {
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any);
let mut health_routes = Router::new()
.route("/health", get(handlers::health_check))
.route("/api/health", get(handlers::api_health))
.route("/api/pool/status", get(handlers::health::pool_status));
if !state.config.disable_metrics {
health_routes = health_routes.route("/metrics", get(handlers::health::metrics));
}
let health_routes = health_routes.with_state(state.clone());
let catalog_routes = Router::new()
.route("/api/catalog/register", post(handlers::catalog::register))
.route("/api/catalog/list", post(handlers::catalog::list))
.route(
"/api/catalog/resource",
post(handlers::catalog::get_resource),
)
.route("/api/catalog/{*tail}", get(handlers::catalog::ui_schema))
.with_state(catalog_service);
let credential_routes = Router::new()
.route(
"/api/credentials",
post(handlers::credentials::create_or_update),
)
.route("/api/credentials", get(handlers::credentials::list))
.route(
"/api/credentials/{identifier}",
get(handlers::credentials::get),
)
.route(
"/api/credentials/{identifier}",
delete(handlers::credentials::delete),
)
.with_state(credential_service.clone());
let sealed_credential_routes = Router::new()
.route(
"/api/credentials/{identifier}/sealed",
get(handlers::credentials::get_sealed),
)
.with_state(handlers::credentials::SealedCredentialDeps {
credentials: credential_service.clone(),
runtime: runtime_service.clone(),
});
let cross_region_routes = Router::new()
.route(
"/api/internal/cross-region/resolve",
post(handlers::cross_region::resolve),
)
.with_state(handlers::cross_region::CrossRegionDeps {
credentials: credential_service,
});
let wallet_rotate_service = noetl_server::services::wallet_rotate::WalletRotateService::new(
db_pool.clone(),
wallet_cipher.clone(),
);
let wallet_rotate_routes = Router::new()
.route(
"/api/internal/wallet/rotate-kek",
post(handlers::wallet_rotate::rotate_kek),
)
.route(
"/api/internal/wallet/key-status",
get(handlers::wallet_rotate::key_status),
)
.with_state(handlers::wallet_rotate::WalletRotateDeps {
service: wallet_rotate_service,
});
let secret_audit_routes = Router::new()
.route(
"/api/internal/secret-audit",
get(handlers::secret_audit::query),
)
.with_state(handlers::secret_audit::SecretAuditDeps {
pool: db_pool.clone(),
});
let container_callback_routes = Router::new()
.route(
"/api/internal/container-callback/{execution_id}/{step}",
post(handlers::container_callback::container_callback),
)
.with_state(state.clone());
let keychain_routes = Router::new()
.route(
"/api/keychain/{catalog_id}/{keychain_name}",
get(handlers::keychain::get),
)
.route(
"/api/keychain/{catalog_id}/{keychain_name}",
post(handlers::keychain::set),
)
.route(
"/api/keychain/{catalog_id}/{keychain_name}",
delete(handlers::keychain::delete),
)
.route(
"/api/keychain/catalog/{catalog_id}",
get(handlers::keychain::list_by_catalog),
)
.with_state(keychain_service);
let execution_routes = Router::new()
.route("/api/execute", post(handlers::execute))
.route("/api/events", post(handlers::handle_event))
.route(
"/api/events/batch",
post(handlers::events::handle_batch_events),
)
.route("/api/commands/{event_id}", get(handlers::get_command))
.route(
"/api/commands/{event_id}/claim",
post(handlers::events::claim_command),
)
.with_state(state.clone());
let executions_routes = Router::new()
.route("/api/executions", get(handlers::executions::list))
.route(
"/api/executions/{execution_id}",
get(handlers::executions::get),
)
.route(
"/api/executions/{execution_id}/status",
get(handlers::executions::get_status),
)
.route(
"/api/executions/{execution_id}/cancel",
post(handlers::executions::cancel),
)
.route(
"/api/executions/{execution_id}/cancellation-check",
get(handlers::executions::cancellation_check),
)
.route(
"/api/executions/{execution_id}/finalize",
post(handlers::executions::finalize),
)
.with_state(execution_service);
let replay_routes = Router::new()
.route("/api/replay/state", get(handlers::replay::replay_state))
.with_state(replay_service);
let result_store_routes = Router::new()
.route(
"/api/result/{execution_id}",
put(handlers::result_store::put_result),
)
.route(
"/api/result/resolve",
get(handlers::result_store::resolve_ref),
)
.layer(DefaultBodyLimit::max(64 * 1024 * 1024))
.with_state(handlers::result_store::ResultStoreDeps {
service: result_store_service,
});
let variable_routes = Router::new()
.route("/api/vars/{execution_id}", get(handlers::variables::list))
.route("/api/vars/{execution_id}", post(handlers::variables::set))
.route(
"/api/vars/{execution_id}",
delete(handlers::variables::cleanup),
)
.route(
"/api/vars/{execution_id}/{var_name}",
get(handlers::variables::get),
)
.route(
"/api/vars/{execution_id}/{var_name}",
delete(handlers::variables::delete_var),
)
.with_state(db_pool.clone());
let runtime_routes = Router::new()
.route(
"/api/worker/pool/register",
post(handlers::runtime::register_pool),
)
.route(
"/api/worker/pool/deregister",
delete(handlers::runtime::deregister_pool),
)
.route(
"/api/worker/pool/heartbeat",
post(handlers::runtime::heartbeat),
)
.route("/api/worker/pools", get(handlers::runtime::list_pools))
.with_state(runtime_service);
let sharding_routes = Router::new()
.route(
"/api/runtime/shard-info",
get(handlers::sharding::get_shard_info),
)
.with_state(state.clone());
let database_routes = Router::new()
.route(
"/api/postgres/execute",
post(handlers::database::execute_postgres),
)
.route("/api/db/init", post(handlers::database::init_database))
.route(
"/api/db/validate",
get(handlers::database::validate_database),
)
.with_state(db_pool.clone());
let internal_routes = Router::new()
.route(
"/api/internal/outbox/claim",
post(handlers::internal::outbox_claim),
)
.route(
"/api/internal/outbox/mark-published",
post(handlers::internal::outbox_mark_published),
)
.route(
"/api/internal/outbox/mark-failed",
post(handlers::internal::outbox_mark_failed),
)
.route(
"/api/internal/outbox/pending-count",
get(handlers::internal::outbox_pending_count),
)
.route(
"/api/internal/events/project",
post(handlers::internal::events_project),
)
.with_state(db_pool.clone());
let system_routes = Router::new()
.route("/api/status", get(handlers::system::get_status))
.route("/api/threads", get(handlers::system::get_threads))
.route(
"/api/profiler/status",
get(handlers::system::get_profiler_status),
)
.route(
"/api/profiler/memory/start",
post(handlers::system::start_memory_profiler),
)
.route(
"/api/profiler/memory/stop",
post(handlers::system::stop_memory_profiler),
)
.with_state(state);
let dashboard_routes = Router::new()
.route("/api/dashboard/stats", get(handlers::dashboard::get_stats))
.route(
"/api/dashboard/widgets",
get(handlers::dashboard::get_widgets),
)
.with_state(db_pool);
Router::new()
.merge(health_routes)
.merge(catalog_routes)
.merge(credential_routes)
.merge(sealed_credential_routes)
.merge(cross_region_routes)
.merge(wallet_rotate_routes)
.merge(secret_audit_routes)
.merge(container_callback_routes)
.merge(keychain_routes)
.merge(execution_routes)
.merge(executions_routes)
.merge(replay_routes)
.merge(result_store_routes)
.merge(variable_routes)
.merge(runtime_routes)
.merge(sharding_routes)
.merge(database_routes)
.merge(internal_routes)
.merge(system_routes)
.merge(dashboard_routes)
.layer(TraceLayer::new_for_http())
.layer(cors)
}
async fn connect_nats(config: &AppConfig) -> Option<async_nats::Client> {
let Some(ref nats_url) = config.nats_url else {
tracing::info!("NATS not configured, running without messaging");
return None;
};
let (clean_url, creds) = strip_nats_userinfo(nats_url);
let connect_future = match creds {
Some((user, password)) => {
async_nats::ConnectOptions::with_user_and_password(user, password).connect(&clean_url)
}
None => async_nats::ConnectOptions::new().connect(&clean_url),
};
match connect_future.await {
Ok(client) => {
tracing::info!(url = %clean_url, "Connected to NATS");
Some(client)
}
Err(e) => {
tracing::warn!(error = %e, url = %clean_url, "Failed to connect to NATS, continuing without it");
None
}
}
}
fn strip_nats_userinfo(url: &str) -> (String, Option<(String, String)>) {
let scheme_sep = "://";
let Some(scheme_idx) = url.find(scheme_sep) else {
return (url.to_string(), None);
};
let after_scheme = &url[scheme_idx + scheme_sep.len()..];
let Some(at_idx) = after_scheme.find('@') else {
return (url.to_string(), None);
};
let userinfo = &after_scheme[..at_idx];
let rest = &after_scheme[at_idx + 1..];
let mut parts = userinfo.splitn(2, ':');
let user = parts.next().unwrap_or("").to_string();
let password = parts.next().unwrap_or("").to_string();
if user.is_empty() {
return (url.to_string(), None);
}
let cleaned = format!("{}{}{}", &url[..scheme_idx], scheme_sep, rest);
(cleaned, Some((user, password)))
}
fn resolve_encryption_key(key_env: Option<String>, allow_insecure: bool) -> anyhow::Result<String> {
if let Some(k) = key_env {
if !k.trim().is_empty() {
return Ok(k);
}
}
if allow_insecure {
use base64::Engine as _;
use rand::RngCore;
let mut key = [0u8; 32];
rand::thread_rng().fill_bytes(&mut key);
let b64 = base64::engine::general_purpose::STANDARD.encode(key);
tracing::warn!(
"NOETL_ENCRYPTION_KEY is not set and NOETL_ALLOW_INSECURE_DEFAULT_KEY is \
enabled: generated a RANDOM ephemeral key for this process. Encrypted \
credentials will NOT decrypt after a restart. Never use in production."
);
return Ok(b64);
}
anyhow::bail!(
"NOETL_ENCRYPTION_KEY is not set. Refusing to start: the insecure all-zeros \
default key was removed (noetl/ai-meta#61). Provide a base64-encoded 32-byte \
key via NOETL_ENCRYPTION_KEY (production), or set \
NOETL_ALLOW_INSECURE_DEFAULT_KEY=true for ephemeral non-production use."
)
}
fn get_encryption_key() -> anyhow::Result<String> {
let key_env = std::env::var("NOETL_ENCRYPTION_KEY").ok();
let allow_insecure = std::env::var("NOETL_ALLOW_INSECURE_DEFAULT_KEY")
.map(|v| {
matches!(
v.trim().to_ascii_lowercase().as_str(),
"1" | "true" | "yes" | "on"
)
})
.unwrap_or(false);
resolve_encryption_key(key_env, allow_insecure)
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
init_tracing();
tracing::info!(
version = env!("CARGO_PKG_VERSION"),
"Starting NoETL Control Plane"
);
let app_config = AppConfig::from_env().unwrap_or_else(|e| {
tracing::warn!(error = %e, "Failed to load app config, using defaults");
AppConfig::default()
});
let db_config = DatabaseConfig::from_env().unwrap_or_else(|e| {
tracing::warn!(error = %e, "Failed to load database config, using defaults");
DatabaseConfig::default()
});
tracing::info!(
host = %app_config.host,
port = app_config.port,
debug = app_config.debug,
"Configuration loaded"
);
let db_pool = create_pool(&db_config).await?;
let sharding_config = ShardingConfig::from_env().unwrap_or_else(|e| {
tracing::warn!(
error = %e,
"Failed to parse NOETL_SHARDS / NOETL_CLUSTER_DSN; falling back to single-pool mode"
);
ShardingConfig::default()
});
let pools = if sharding_config.is_disabled() {
DbPoolMap::from_single_pool(db_pool.clone())
} else {
DbPoolMap::new(&db_config, &sharding_config).await?
};
tracing::info!(
shard_count = pools.shard_count(),
single_pool_mode = pools.is_single_pool(),
"Database pool map ready"
);
let nats_client = connect_nats(&app_config).await;
let encryption_key = get_encryption_key()?;
let state = AppState::new(db_pool.clone(), pools, app_config.clone(), nats_client);
let catalog_service = CatalogService::new(db_pool.clone());
let wallet_cipher = noetl_server::crypto::build_envelope_cipher(&encryption_key)?;
let wallet_cipher_for_router = wallet_cipher.clone();
noetl_server::db::queries::secret_audit::ensure_table(&db_pool).await?;
noetl_server::db::queries::result_store::ensure_table(&db_pool).await?;
let keychain_service = KeychainService::new(db_pool.clone(), wallet_cipher.clone());
let credential_service =
CredentialService::new(db_pool.clone(), wallet_cipher, keychain_service.clone());
let execution_service = ExecutionService::new(state.pools.clone(), state.snowflake.clone());
let runtime_service = RuntimeService::new(db_pool.clone(), state.snowflake.clone());
let replay_service = ReplayService::new(state.pools.clone());
let result_store_service =
ResultStoreService::new(db_pool.clone(), state.snowflake.clone());
let app = build_router(
state,
db_pool,
catalog_service,
credential_service,
keychain_service,
execution_service,
runtime_service,
replay_service,
result_store_service,
wallet_cipher_for_router,
);
let addr: SocketAddr = app_config.bind_address().parse()?;
match noetl_server::tls::tls_params_from_env()? {
Some(params) => {
let mtls = params.mtls();
let server_config = noetl_server::tls::build_server_config(¶ms)?;
let rustls_config = axum_server::tls_rustls::RustlsConfig::from_config(
std::sync::Arc::new(server_config),
);
tracing::info!(address = %addr, tls = true, mtls, "Server listening (TLS)");
let handle = axum_server::Handle::new();
let shutdown_handle = handle.clone();
tokio::spawn(async move {
shutdown_signal().await;
shutdown_handle.graceful_shutdown(Some(std::time::Duration::from_secs(10)));
});
axum_server::bind_rustls(addr, rustls_config)
.handle(handle)
.serve(app.into_make_service())
.await?;
}
None => {
let listener = TcpListener::bind(addr).await?;
tracing::info!(address = %addr, "Server listening");
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await?;
}
}
tracing::info!("Server shutdown complete");
Ok(())
}
async fn shutdown_signal() {
let ctrl_c = async {
tokio::signal::ctrl_c()
.await
.expect("Failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("Failed to install SIGTERM handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {
tracing::info!("Received Ctrl+C, starting graceful shutdown");
}
_ = terminate => {
tracing::info!("Received SIGTERM, starting graceful shutdown");
}
}
}
#[cfg(test)]
mod encryption_key_tests {
use super::resolve_encryption_key;
#[test]
fn uses_explicit_key_when_present() {
let k = resolve_encryption_key(Some("dGVzdC1rZXk=".to_string()), false).unwrap();
assert_eq!(k, "dGVzdC1rZXk=");
}
#[test]
fn fails_closed_when_absent_and_not_insecure() {
assert!(resolve_encryption_key(None, false).is_err());
}
#[test]
fn fails_closed_when_empty_and_not_insecure() {
assert!(resolve_encryption_key(Some(" ".to_string()), false).is_err());
}
#[test]
fn insecure_escape_hatch_generates_a_key() {
let k = resolve_encryption_key(None, true).unwrap();
assert!(!k.is_empty());
let k2 = resolve_encryption_key(None, true).unwrap();
assert_ne!(k, k2);
}
#[test]
fn explicit_key_wins_over_insecure_flag() {
let k = resolve_encryption_key(Some("ZXhwbGljaXQ=".to_string()), true).unwrap();
assert_eq!(k, "ZXhwbGljaXQ=");
}
}