use axum::{
routing::{delete, get, post},
Router,
};
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use noetl_server::{
config::{AppConfig, DatabaseConfig, ShardingConfig},
db::{create_pool, DbPool, DbPoolMap},
handlers,
services::{
CatalogService, CredentialService, ExecutionService, KeychainService, RuntimeService,
},
state::AppState,
};
/// Fallback key returned by `get_encryption_key` when `NOETL_ENCRYPTION_KEY` cannot be read.
///
/// NOTE(review): the literal looks like base64 of all-zero bytes, i.e. a publicly known
/// placeholder. Confirm it is only ever relied on in local development.
const DEFAULT_ENCRYPTION_KEY: &str = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=";
/// Installs the process-wide `tracing` subscriber.
///
/// The filter comes from `EnvFilter::try_from_default_env()`; when that fails the
/// built-in directive string below is used instead. Output goes through the default
/// `fmt` layer.
fn init_tracing() {
    // NOTE(review): the fallback names the `noetl_control_plane` target while this file
    // imports the `noetl_server` crate. Confirm the target matches the crate(s) that
    // should log at debug level.
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| "info,noetl_control_plane=debug,tower_http=debug".into());
    let fmt_layer = tracing_subscriber::fmt::layer();

    tracing_subscriber::registry()
        .with(env_filter)
        .with(fmt_layer)
        .init();
}
/// Builds the complete HTTP router for the control plane.
///
/// Each feature area gets its own sub-router bound to the state it needs (the shared
/// `AppState`, the raw `DbPool`, or a dedicated service). The sub-routers are then merged
/// in a fixed order and wrapped with HTTP tracing and a CORS layer that allows any
/// origin, method and header.
///
/// The `/metrics` endpoint is only registered when `state.config.disable_metrics` is false.
fn build_router(
    state: AppState,
    db_pool: DbPool,
    catalog_service: CatalogService,
    credential_service: CredentialService,
    keychain_service: KeychainService,
    execution_service: ExecutionService,
    runtime_service: RuntimeService,
) -> Router {
    // --- health / observability -------------------------------------------------------
    let health = Router::new()
        .route("/health", get(handlers::health_check))
        .route("/api/health", get(handlers::api_health))
        .route("/api/pool/status", get(handlers::health::pool_status));
    let health = if state.config.disable_metrics {
        health
    } else {
        health.route("/metrics", get(handlers::health::metrics))
    };
    let health = health.with_state(state.clone());

    // --- catalog ----------------------------------------------------------------------
    let catalog = Router::new()
        .route("/api/catalog/register", post(handlers::catalog::register))
        .route("/api/catalog/list", post(handlers::catalog::list))
        .route("/api/catalog/resource", post(handlers::catalog::get_resource))
        .route("/api/catalog/{*tail}", get(handlers::catalog::ui_schema))
        .with_state(catalog_service);

    // --- credentials ------------------------------------------------------------------
    // Methods sharing a path are chained in the same order they were registered before.
    let credentials = Router::new()
        .route(
            "/api/credentials",
            post(handlers::credentials::create_or_update).get(handlers::credentials::list),
        )
        .route(
            "/api/credentials/{identifier}",
            get(handlers::credentials::get).delete(handlers::credentials::delete),
        )
        .with_state(credential_service);

    // --- keychain ---------------------------------------------------------------------
    let keychain = Router::new()
        .route(
            "/api/keychain/{catalog_id}/{keychain_name}",
            get(handlers::keychain::get)
                .post(handlers::keychain::set)
                .delete(handlers::keychain::delete),
        )
        .route(
            "/api/keychain/catalog/{catalog_id}",
            get(handlers::keychain::list_by_catalog),
        )
        .with_state(keychain_service);

    // --- execute / events / commands --------------------------------------------------
    let execution = Router::new()
        .route("/api/execute", post(handlers::execute))
        .route("/api/events", post(handlers::handle_event))
        .route(
            "/api/events/batch",
            post(handlers::events::handle_batch_events),
        )
        .route("/api/commands/{event_id}", get(handlers::get_command))
        .route(
            "/api/commands/{event_id}/claim",
            post(handlers::events::claim_command),
        )
        .with_state(state.clone());

    // --- executions -------------------------------------------------------------------
    let executions = Router::new()
        .route("/api/executions", get(handlers::executions::list))
        .route(
            "/api/executions/{execution_id}",
            get(handlers::executions::get),
        )
        .route(
            "/api/executions/{execution_id}/status",
            get(handlers::executions::get_status),
        )
        .route(
            "/api/executions/{execution_id}/cancel",
            post(handlers::executions::cancel),
        )
        .route(
            "/api/executions/{execution_id}/cancellation-check",
            get(handlers::executions::cancellation_check),
        )
        .route(
            "/api/executions/{execution_id}/finalize",
            post(handlers::executions::finalize),
        )
        .with_state(execution_service);

    // --- execution variables ----------------------------------------------------------
    let variables = Router::new()
        .route(
            "/api/vars/{execution_id}",
            get(handlers::variables::list)
                .post(handlers::variables::set)
                .delete(handlers::variables::cleanup),
        )
        .route(
            "/api/vars/{execution_id}/{var_name}",
            get(handlers::variables::get).delete(handlers::variables::delete_var),
        )
        .with_state(db_pool.clone());

    // --- worker pools -----------------------------------------------------------------
    let runtime = Router::new()
        .route(
            "/api/worker/pool/register",
            post(handlers::runtime::register_pool),
        )
        .route(
            "/api/worker/pool/deregister",
            delete(handlers::runtime::deregister_pool),
        )
        .route(
            "/api/worker/pool/heartbeat",
            post(handlers::runtime::heartbeat),
        )
        .route("/api/worker/pools", get(handlers::runtime::list_pools))
        .with_state(runtime_service);

    // --- sharding ---------------------------------------------------------------------
    let sharding = Router::new()
        .route(
            "/api/runtime/shard-info",
            get(handlers::sharding::get_shard_info),
        )
        .with_state(state.clone());

    // --- database utilities -----------------------------------------------------------
    let database = Router::new()
        .route(
            "/api/postgres/execute",
            post(handlers::database::execute_postgres),
        )
        .route("/api/db/init", post(handlers::database::init_database))
        .route(
            "/api/db/validate",
            get(handlers::database::validate_database),
        )
        .with_state(db_pool.clone());

    // --- internal outbox / projection -------------------------------------------------
    let internal = Router::new()
        .route(
            "/api/internal/outbox/claim",
            post(handlers::internal::outbox_claim),
        )
        .route(
            "/api/internal/outbox/mark-published",
            post(handlers::internal::outbox_mark_published),
        )
        .route(
            "/api/internal/outbox/mark-failed",
            post(handlers::internal::outbox_mark_failed),
        )
        .route(
            "/api/internal/outbox/pending-count",
            get(handlers::internal::outbox_pending_count),
        )
        .route(
            "/api/internal/events/project",
            post(handlers::internal::events_project),
        )
        .with_state(db_pool.clone());

    // --- system / profiler (takes ownership of `state`) -------------------------------
    let system = Router::new()
        .route("/api/status", get(handlers::system::get_status))
        .route("/api/threads", get(handlers::system::get_threads))
        .route(
            "/api/profiler/status",
            get(handlers::system::get_profiler_status),
        )
        .route(
            "/api/profiler/memory/start",
            post(handlers::system::start_memory_profiler),
        )
        .route(
            "/api/profiler/memory/stop",
            post(handlers::system::stop_memory_profiler),
        )
        .with_state(state);

    // --- dashboard (takes ownership of `db_pool`) -------------------------------------
    let dashboard = Router::new()
        .route("/api/dashboard/stats", get(handlers::dashboard::get_stats))
        .route(
            "/api/dashboard/widgets",
            get(handlers::dashboard::get_widgets),
        )
        .with_state(db_pool);

    // Merge order is identical to the order the groups were defined in.
    let groups: [Router; 13] = [
        health,
        catalog,
        credentials,
        keychain,
        execution,
        executions,
        variables,
        runtime,
        sharding,
        database,
        internal,
        system,
        dashboard,
    ];

    let cors = CorsLayer::new()
        .allow_origin(Any)
        .allow_methods(Any)
        .allow_headers(Any);

    groups
        .into_iter()
        .fold(Router::new(), |app, group| app.merge(group))
        .layer(TraceLayer::new_for_http())
        .layer(cors)
}
async fn connect_nats(config: &AppConfig) -> Option<async_nats::Client> {
let Some(ref nats_url) = config.nats_url else {
tracing::info!("NATS not configured, running without messaging");
return None;
};
let (clean_url, creds) = strip_nats_userinfo(nats_url);
let connect_future = match creds {
Some((user, password)) => async_nats::ConnectOptions::with_user_and_password(user, password)
.connect(&clean_url),
None => async_nats::ConnectOptions::new().connect(&clean_url),
};
match connect_future.await {
Ok(client) => {
tracing::info!(url = %clean_url, "Connected to NATS");
Some(client)
}
Err(e) => {
tracing::warn!(error = %e, url = %clean_url, "Failed to connect to NATS, continuing without it");
None
}
}
}
/// Splits `user[:password]@` credentials out of a NATS URL.
///
/// Returns the URL with the userinfo removed, plus the extracted `(user, password)` pair.
/// The password is an empty string when the userinfo has no `:`.
///
/// The URL is returned unchanged with `None` when it has no `://`, no `@` after the
/// scheme, or an empty user name.
///
/// The userinfo delimiter is the *last* `@` inside the first authority (the text up to the
/// first `/`, `?`, `#` or `,`), so a password containing a raw `@` is kept whole instead of
/// being cut at its first `@` and leaking its tail into the cleaned URL. If that window
/// holds no `@`, the first `@` anywhere after the scheme is used, which keeps lenient
/// inputs such as a raw `/` in the password working.
fn strip_nats_userinfo(url: &str) -> (String, Option<(String, String)>) {
    let Some((scheme, after_scheme)) = url.split_once("://") else {
        return (url.to_string(), None);
    };

    // End of the first server's authority; ',' separates entries in a server list.
    let authority_end = after_scheme
        .find(|c: char| matches!(c, '/' | '?' | '#' | ','))
        .unwrap_or(after_scheme.len());

    let at_idx = after_scheme[..authority_end]
        .rfind('@')
        .or_else(|| after_scheme.find('@'));
    let Some(at_idx) = at_idx else {
        return (url.to_string(), None);
    };

    let userinfo = &after_scheme[..at_idx];
    let rest = &after_scheme[at_idx + 1..];

    // Only the first ':' separates user from password; later ones belong to the password.
    let (user, password) = userinfo.split_once(':').unwrap_or((userinfo, ""));
    if user.is_empty() {
        return (url.to_string(), None);
    }

    (
        format!("{scheme}://{rest}"),
        Some((user.to_string(), password.to_string())),
    )
}
/// Returns the encryption key from the `NOETL_ENCRYPTION_KEY` environment variable.
///
/// When the variable cannot be read, a warning is logged and `DEFAULT_ENCRYPTION_KEY`
/// is returned instead.
fn get_encryption_key() -> String {
    match std::env::var("NOETL_ENCRYPTION_KEY") {
        Ok(key) => key,
        Err(_) => {
            tracing::warn!("NOETL_ENCRYPTION_KEY not set, using default (not secure for production)");
            String::from(DEFAULT_ENCRYPTION_KEY)
        }
    }
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();
init_tracing();
tracing::info!(
version = env!("CARGO_PKG_VERSION"),
"Starting NoETL Control Plane"
);
let app_config = AppConfig::from_env().unwrap_or_else(|e| {
tracing::warn!(error = %e, "Failed to load app config, using defaults");
AppConfig::default()
});
let db_config = DatabaseConfig::from_env().unwrap_or_else(|e| {
tracing::warn!(error = %e, "Failed to load database config, using defaults");
DatabaseConfig::default()
});
tracing::info!(
host = %app_config.host,
port = app_config.port,
debug = app_config.debug,
"Configuration loaded"
);
let db_pool = create_pool(&db_config).await?;
let sharding_config = ShardingConfig::from_env().unwrap_or_else(|e| {
tracing::warn!(
error = %e,
"Failed to parse NOETL_SHARDS / NOETL_CLUSTER_DSN; falling back to single-pool mode"
);
ShardingConfig::default()
});
let pools = if sharding_config.is_disabled() {
DbPoolMap::from_single_pool(db_pool.clone())
} else {
DbPoolMap::new(&db_config, &sharding_config).await?
};
tracing::info!(
shard_count = pools.shard_count(),
single_pool_mode = pools.is_single_pool(),
"Database pool map ready"
);
let nats_client = connect_nats(&app_config).await;
let encryption_key = get_encryption_key();
let state = AppState::new(db_pool.clone(), pools, app_config.clone(), nats_client);
let catalog_service = CatalogService::new(db_pool.clone());
let credential_service = CredentialService::new(db_pool.clone(), &encryption_key)?;
let keychain_service = KeychainService::new(db_pool.clone(), &encryption_key)?;
let execution_service = ExecutionService::new(state.pools.clone(), state.snowflake.clone());
let runtime_service = RuntimeService::new(db_pool.clone(), state.snowflake.clone());
let app = build_router(
state,
db_pool,
catalog_service,
credential_service,
keychain_service,
execution_service,
runtime_service,
);
let addr: SocketAddr = app_config.bind_address().parse()?;
let listener = TcpListener::bind(addr).await?;
tracing::info!(address = %addr, "Server listening");
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await?;
tracing::info!("Server shutdown complete");
Ok(())
}
/// Resolves once the process is asked to stop.
///
/// Waits for Ctrl+C on every platform and additionally for SIGTERM on Unix, logging
/// which one arrived.
///
/// # Panics
///
/// Panics if either signal handler cannot be installed.
async fn shutdown_signal() {
    let interrupt = async {
        let outcome = tokio::signal::ctrl_c().await;
        outcome.expect("Failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let sigterm = async {
        use tokio::signal::unix::{signal, SignalKind};

        let mut stream =
            signal(SignalKind::terminate()).expect("Failed to install SIGTERM handler");
        stream.recv().await;
    };

    // No SIGTERM outside Unix: use a future that never completes.
    #[cfg(not(unix))]
    let sigterm = std::future::pending::<()>();

    tokio::select! {
        _ = interrupt => {
            tracing::info!("Received Ctrl+C, starting graceful shutdown");
        }
        _ = sigterm => {
            tracing::info!("Received SIGTERM, starting graceful shutdown");
        }
    }
}