pub mod admin;
pub mod data;
pub mod openapi;
pub mod response_helpers;
use std::sync::Arc;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::routing::get;
use axum::{Json, Router};
use serde_json::json;
use utoipa::OpenApi;
use crate::server::state::AppState;
/// Builds the top-level HTTP router for the service.
///
/// Mounts the operational endpoints (`/health`, `/healthz`, `/readyz`, `/metrics`), nests the
/// admin and data APIs under `/api/v1/admin` and `/api/v1/data`, and finally merges a Swagger
/// UI at `/docs` that serves the generated OpenAPI document from `/api/v1/openapi.json`.
pub fn api_routes() -> Router<AppState> {
    let operational = Router::new()
        .route("/health", get(health_check))
        .route("/healthz", get(liveness_check))
        .route("/readyz", get(readiness_check))
        .route("/metrics", get(metrics_endpoint));

    let app = operational
        .nest("/api/v1/admin", admin::admin_routes())
        .nest("/api/v1/data", data::data_routes());

    let swagger = utoipa_swagger_ui::SwaggerUi::new("/docs")
        .url("/api/v1/openapi.json", openapi::ApiDoc::openapi());

    app.merge(swagger)
}
// Detailed health report for operators and dashboards.
//
// Pings the database through `workflow_repo` and tries to take the engine read lock within
// `config.engine.health_check_timeout_secs`. Answers 200 with `"status": "ok"` when both
// succeed, otherwise 503 with `"status": "degraded"`. The body also carries build metadata,
// uptime, the number of loaded workflows (0 if the engine lock could not be taken) and the
// connector circuit-breaker states.
//
// Plain `//` comments are used here on purpose: utoipa copies `///` doc comments on a handler
// into the generated OpenAPI summary/description.
#[utoipa::path(
    get,
    path = "/health",
    tag = "Operational",
    responses(
        (status = 200, description = "Service healthy"),
        (status = 503, description = "Service degraded"),
    )
)]
#[tracing::instrument(skip(state))]
pub(crate) async fn health_check(State(state): State<AppState>) -> impl IntoResponse {
    let uptime = chrono::Utc::now() - state.start_time;
    let db_healthy = state.workflow_repo.ping().await.is_ok();

    let lock_timeout =
        std::time::Duration::from_secs(state.config.engine.health_check_timeout_secs);
    let engine_probe = tokio::time::timeout(
        lock_timeout,
        crate::engine::acquire_engine_read(&state.engine),
    );
    // The guard only lives for the arm below; we just need the workflow count from it.
    let (engine_healthy, workflows_loaded) = match engine_probe.await {
        Ok(engine) => (true, engine.workflows().len()),
        Err(_) => (false, 0),
    };

    let cb_states = state.connector_registry.circuit_breaker_states().await;

    let label = |ok: bool| if ok { "ok" } else { "error" };
    let (http_status, status_str) = if db_healthy && engine_healthy {
        (StatusCode::OK, "ok")
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "degraded")
    };

    let body = json!({
        "status": status_str,
        "version": env!("CARGO_PKG_VERSION"),
        "git_hash": env!("GIT_HASH"),
        "build_timestamp": env!("BUILD_TIMESTAMP"),
        "uptime_seconds": uptime.num_seconds(),
        "workflows_loaded": workflows_loaded,
        "components": {
            "database": label(db_healthy),
            "engine": label(engine_healthy),
        },
        "connectors": {
            "circuit_breakers": cb_states,
        }
    });
    (http_status, Json(body))
}
// Prometheus scrape endpoint.
//
// Refreshes the DB pool size/idle gauges immediately before rendering so each scrape reports
// the pool state at scrape time, then returns the rendered registry as Prometheus text
// exposition format (0.0.4).
//
// Plain `//` comments keep the generated OpenAPI summary/description unchanged.
#[utoipa::path(
    get,
    path = "/metrics",
    tag = "Operational",
    responses(
        (status = 200, description = "Prometheus metrics", content_type = "text/plain"),
    )
)]
pub(crate) async fn metrics_endpoint(State(state): State<AppState>) -> impl IntoResponse {
    const PROMETHEUS_TEXT: &str = "text/plain; version=0.0.4; charset=utf-8";

    let pool = &state.db_pool;
    crate::metrics::set_db_pool_size(pool.size() as f64);
    crate::metrics::set_db_pool_idle(pool.num_idle() as f64);

    let headers = [("content-type", PROMETHEUS_TEXT)];
    (StatusCode::OK, headers, state.metrics_handle.render())
}
/// Liveness probe: always answers `200 OK` with `{"status": "ok"}`.
///
/// Performs no dependency checks; it only shows that the process is serving HTTP.
pub(crate) async fn liveness_check() -> impl IntoResponse {
    let body = json!({ "status": "ok" });
    (StatusCode::OK, Json(body))
}
/// Readiness probe.
///
/// Ready (`200 OK`, `"status": "ready"`) only when all three conditions hold:
/// - the database answers a ping through `workflow_repo`;
/// - the engine read lock can be taken within `config.engine.health_check_timeout_secs`;
/// - the `state.ready` flag has been set.
///
/// Otherwise it answers `503 Service Unavailable` with `"status": "not_ready"`. Per-component
/// results are always included in the body.
pub(crate) async fn readiness_check(State(state): State<AppState>) -> impl IntoResponse {
    use std::sync::atomic::Ordering;

    let db_healthy = state.workflow_repo.ping().await.is_ok();

    let lock_timeout =
        std::time::Duration::from_secs(state.config.engine.health_check_timeout_secs);
    let engine_probe = tokio::time::timeout(
        lock_timeout,
        crate::engine::acquire_engine_read(&state.engine),
    );
    // Only whether the lock was obtained in time matters; the guard is dropped right away.
    let engine_healthy = engine_probe.await.is_ok();

    let initialized = state.ready.load(Ordering::Acquire);

    let (http_status, status) = if db_healthy && engine_healthy && initialized {
        (StatusCode::OK, "ready")
    } else {
        (StatusCode::SERVICE_UNAVAILABLE, "not_ready")
    };

    let body = json!({
        "status": status,
        "components": {
            "database": if db_healthy { "ok" } else { "error" },
            "engine": if engine_healthy { "ok" } else { "error" },
            "initialized": initialized,
        }
    });
    (http_status, Json(body))
}
/// Rebuilds the engine from the active channels and workflows in storage and swaps it in.
///
/// Steps, in order:
/// 1. Load active channels (filtered by `config.channels`) and active workflows.
/// 2. Derive a new engine from the current one via `with_new_workflows`.
/// 3. Replace the shared engine under the write lock. Waiting for the lock is bounded by
///    `config.engine.reload_timeout_secs`.
/// 4. Reload the channel registry and update the active-workflow gauge. When Kafka is
///    enabled, reconcile the Kafka consumer with the new channel set.
///
/// The reload duration and its outcome (`"success"` / `"failure"`) are recorded as metrics
/// whatever the result.
///
/// # Errors
///
/// Returns an error in these cases:
/// - loading channels or workflows fails;
/// - the engine rejects the new workflow set (`OrionError::Engine`);
/// - the write lock cannot be acquired within the reload timeout (`OrionError::Internal`).
#[tracing::instrument(skip(state))]
pub async fn reload_engine(state: &AppState) -> Result<(), crate::errors::OrionError> {
    let start = std::time::Instant::now();
    let result = async {
        let channels = state.channel_repo.list_active().await?;
        let channels = crate::engine::filter_channels(channels, &state.config.channels);
        let active_workflows = state.workflow_repo.list_active().await?;
        let workflows = crate::engine::build_engine_workflows(&channels, &active_workflows);

        // Build the replacement in its own scope so the read access is released before the
        // write lock is requested below. If `acquire_engine_read` hands back a lock guard (as
        // its name suggests — confirm), keeping it alive while waiting for the write lock on
        // the same lock can never succeed, and every reload would end in the timeout error.
        let new_engine = {
            let current_engine = crate::engine::acquire_engine_read(&state.engine).await;
            Arc::new(
                current_engine
                    .with_new_workflows(workflows)
                    .map_err(crate::errors::OrionError::Engine)?,
            )
        };

        // Hold the write lock only for the pointer swap. Everything after this (channel
        // registry reload, Kafka pause/sleep/shutdown) would otherwise block every engine
        // reader for its whole duration.
        {
            let mut engine_write = tokio::time::timeout(
                std::time::Duration::from_secs(state.config.engine.reload_timeout_secs),
                crate::engine::acquire_engine_write(&state.engine),
            )
            .await
            .map_err(|_| {
                crate::errors::OrionError::Internal(
                    "Engine reload timed out waiting for write lock".into(),
                )
            })?;
            *engine_write = new_engine;
        }

        state
            .channel_registry
            .reload(
                &channels,
                &state.connector_registry,
                &state.cache_pool,
                &state.datalogic,
                &state.config.tracing.storage,
            )
            .await;
        crate::metrics::set_active_workflows(active_workflows.len() as f64);

        if state.config.kafka.enabled {
            restart_kafka_consumer_if_needed(state, &channels).await;
        }

        tracing::info!(
            workflow_count = active_workflows.len(),
            channel_count = channels.len(),
            "Engine reloaded"
        );
        Ok(())
    }
    .await;

    crate::metrics::record_engine_reload_duration(start.elapsed().as_secs_f64());
    match &result {
        Ok(()) => crate::metrics::record_engine_reload("success"),
        Err(_) => crate::metrics::record_engine_reload("failure"),
    }
    result
}
/// Reconciles the Kafka consumer with the given channel set.
///
/// The topic list is `config.kafka.topics` plus one mapping per channel that meets all of:
/// - its protocol is Kafka, or its `channel_type` is `"async"`;
/// - it declares a `topic`;
/// - that topic is not already in the list (from config or an earlier channel).
///
/// Behaviour:
/// - If a consumer is running and its topic set equals the new one, it is paused, left paused
///   for 100 ms and resumed. If both calls succeed, nothing else happens.
/// - Otherwise (topic set changed, no consumer, or pause/resume failed), any running consumer
///   is stopped: a best-effort pause, then 50 ms, then shutdown. A new consumer is then started
///   for the merged topic list, unless that list is empty.
///
/// Errors are logged, not returned. If the new consumer fails to start, no consumer is left
/// running.
///
/// NOTE(review): only topic *names* are compared, so a topic whose channel mapping changed
/// keeps the existing consumer. Confirm the consumer does not need the new mapping.
/// NOTE(review): if this future is dropped during one of the sleeps, the consumer is left
/// paused.
async fn restart_kafka_consumer_if_needed(
    state: &AppState,
    channels: &[crate::storage::models::Channel],
) {
    use std::collections::HashSet;

    let mut all_topics = state.config.kafka.topics.clone();
    for ch in channels {
        if (ch.protocol == crate::storage::models::ChannelProtocol::Kafka.as_str()
            || ch.channel_type == "async")
            && let Some(ref topic) = ch.topic
            && !all_topics.iter().any(|t| t.topic == *topic)
        {
            all_topics.push(crate::config::TopicMapping {
                topic: topic.clone(),
                channel: ch.name.clone(),
            });
        }
    }
    let new_topic_set: HashSet<String> = all_topics.iter().map(|t| t.topic.clone()).collect();

    // The mutex guard is held until this function returns, which serializes concurrent calls.
    let mut handle_guard = state.kafka_consumer_handle.lock().await;

    // Fast path: the topic set is unchanged, so keep the running consumer and just bounce it.
    if let Some(ref existing_handle) = *handle_guard
        && *existing_handle.topics() == new_topic_set
    {
        tracing::info!("Kafka topics unchanged, pausing consumer during engine swap");
        if let Err(e) = existing_handle.pause() {
            tracing::warn!(error = %e, "Failed to pause Kafka consumer, falling back to full restart");
        } else {
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            if let Err(e) = existing_handle.resume() {
                tracing::error!(error = %e, "Failed to resume Kafka consumer after engine reload");
            } else {
                tracing::info!("Kafka consumer resumed after engine reload");
                return;
            }
        }
    }

    // Slow path: stop the current consumer (if any) before starting a new one.
    if let Some(ref existing_handle) = *handle_guard {
        // Best effort: a failed pause is not fatal because the consumer is shut down next.
        let _ = existing_handle.pause();
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    }
    if let Some(old_handle) = handle_guard.take() {
        tracing::info!("Shutting down Kafka consumer for topic refresh...");
        old_handle.shutdown().await;
    }

    if all_topics.is_empty() {
        tracing::info!("No Kafka topics configured or from DB, consumer not started");
        return;
    }

    let merged_config = crate::config::KafkaIngestConfig {
        topics: all_topics,
        ..state.config.kafka.clone()
    };
    // The DLQ producer and topic are only passed on when the DLQ is enabled.
    let (dlq_producer, dlq_topic) = if state.config.kafka.dlq.enabled {
        (
            state.kafka_producer.clone(),
            Some(state.config.kafka.dlq.topic.clone()),
        )
    } else {
        (None, None)
    };

    match crate::kafka::consumer::start_consumer(
        &merged_config,
        state.engine.clone(),
        dlq_producer,
        dlq_topic,
    ) {
        Ok(new_handle) => {
            tracing::info!(
                topics = ?new_topic_set,
                "Kafka consumer restarted with updated topics"
            );
            *handle_guard = Some(new_handle);
        }
        Err(e) => {
            tracing::error!(error = %e, "Failed to restart Kafka consumer");
        }
    }
}