use crate::common::model::message::TaskEvent;
use crate::engine::api::auth::auth_middleware;
use crate::engine::api::config::{
create_account, create_middleware, create_module, create_platform, create_profile,
delete_account, delete_middleware, delete_module, delete_platform, get_account, get_middleware,
get_module, get_platform, get_profile, list_accounts, list_middlewares, list_modules,
list_platforms, list_profiles, patch_account, patch_middleware, patch_module, patch_platform,
patch_profile,
};
use crate::engine::api::control::{
get_cluster_leader, get_module_fallback_gate, get_nodes, pause_engine, resume_engine,
};
use crate::engine::api::debug::{
get_debug_cached_response, get_debug_config, get_debug_profile, get_debug_status,
get_debug_status_counts, list_debug_status_by_stage,
};
use crate::engine::api::dlq::{delete_dlq_message, get_dlq_messages, requeue_dlq_message};
use crate::engine::api::health;
use crate::engine::api::limit;
use crate::engine::api::state::ApiState;
use crate::queue::QueuedItem;
use axum::extract::State;
use axum::http::StatusCode;
use axum::routing::{delete, get, post};
use axum::{Json, Router, middleware};
use std::time::Instant;
pub fn router(state: ApiState) -> Router {
let protected_routes = Router::new()
.route("/tasks/dispatch", post(dispatch_task))
.route("/cluster/nodes", get(get_nodes))
.route("/cluster/leader", get(get_cluster_leader))
.route("/config/accounts", get(list_accounts).post(create_account))
.route(
"/config/accounts/{name}",
get(get_account).patch(patch_account).delete(delete_account),
)
.route(
"/config/platforms",
get(list_platforms).post(create_platform),
)
.route(
"/config/platforms/{name}",
get(get_platform)
.patch(patch_platform)
.delete(delete_platform),
)
.route("/config/modules", get(list_modules).post(create_module))
.route(
"/config/modules/{name}",
get(get_module).patch(patch_module).delete(delete_module),
)
.route(
"/config/middlewares",
get(list_middlewares).post(create_middleware),
)
.route(
"/config/middlewares/{name}",
get(get_middleware)
.patch(patch_middleware)
.delete(delete_middleware),
)
.route("/config/profiles", get(list_profiles).post(create_profile))
.route(
"/config/profiles/{account}/{platform}/{module}",
get(get_profile).patch(patch_profile),
)
.route("/dlq/messages", get(get_dlq_messages))
.route("/dlq/messages/{id}/requeue", post(requeue_dlq_message))
.route("/dlq/messages/{id}", delete(delete_dlq_message))
.route("/debug/status/counts", get(get_debug_status_counts))
.route(
"/debug/status/stage/{stage}",
get(list_debug_status_by_stage),
)
.route("/debug/status/{task_id}", get(get_debug_status))
.route(
"/debug/cache/response/{cache_key}",
get(get_debug_cached_response),
)
.route(
"/debug/config/{account}/{platform}/{module}",
get(get_debug_config),
)
.route(
"/debug/profile/{account}/{platform}/{module}",
get(get_debug_profile),
)
.route("/control/pause", post(pause_engine))
.route("/control/resume", post(resume_engine))
.route(
"/control/fallback-gates/{module}",
get(get_module_fallback_gate),
)
.route_layer(middleware::from_fn_with_state(
state.clone(),
auth_middleware,
));
let rate_limited_protected_routes = protected_routes.route_layer(
middleware::from_fn_with_state(state.clone(), limit::rate_limit_middleware),
);
let public_routes = Router::new()
.route("/metrics", get(metrics_handler))
.route("/health", get(health::health_check));
public_routes
.merge(rate_limited_protected_routes)
.with_state(state)
}
/// Handler for `GET /metrics`.
///
/// Returns the text rendered by the Prometheus handle stored in the API
/// state. When no handle is present, a fixed explanatory message is returned
/// instead.
pub async fn metrics_handler(State(state): State<ApiState>) -> String {
    match &state.prometheus_handle {
        Some(prometheus) => prometheus.render(),
        None => "Prometheus metrics not available (recorder not initialized)".to_string(),
    }
}
pub async fn dispatch_task(
State(app_state): State<ApiState>,
Json(task): Json<TaskEvent>,
) -> StatusCode {
let started = Instant::now();
let task_pop_chain = app_state.queue_manager.get_task_push_channel().clone();
let dispatch = match build_task_dispatch(&task, app_state.queue_manager.namespace.clone()) {
Ok(dispatch) => dispatch,
Err(e) => {
log::error!("Failed to build task dispatch envelope: {}", e);
record_dispatch_status("dispatch_task", StatusCode::INTERNAL_SERVER_ERROR, started);
return StatusCode::INTERNAL_SERVER_ERROR;
}
};
if let Err(e) = task_pop_chain.send(QueuedItem::new(dispatch)).await {
log::error!("Failed to send task to processing channel: {}", e);
record_dispatch_status("dispatch_task", StatusCode::INTERNAL_SERVER_ERROR, started);
return StatusCode::INTERNAL_SERVER_ERROR;
}
record_dispatch_status("dispatch_task", StatusCode::ACCEPTED, started);
StatusCode::ACCEPTED
}
/// Reports metrics for a finished dispatch request.
///
/// The HTTP `status` is reduced to a result label:
///
/// * `200`, `201`, `202`, `204` -> `"success"`
/// * `404` -> `"not_found"`
/// * `400` -> `"bad_request"`
/// * anything else -> `"error"`
///
/// A throughput increment and a latency observation (seconds elapsed since
/// `started`) are always emitted, tagged with `action` and the result label.
/// When the label is `"error"`, an error counter tagged with the numeric
/// status code is incremented as well.
fn record_dispatch_status(action: &str, status: StatusCode, started: Instant) {
    const COMPONENT: &str = "control_plane";
    const STAGE: &str = "dispatch";

    let is_success = matches!(
        status,
        StatusCode::OK | StatusCode::ACCEPTED | StatusCode::CREATED | StatusCode::NO_CONTENT
    );
    let outcome = if is_success {
        "success"
    } else if status == StatusCode::NOT_FOUND {
        "not_found"
    } else if status == StatusCode::BAD_REQUEST {
        "bad_request"
    } else {
        "error"
    };

    crate::common::metrics::inc_throughput(COMPONENT, STAGE, action, outcome, 1);

    // Elapsed time is sampled here, after the throughput counter was bumped.
    let elapsed_secs = started.elapsed().as_secs_f64();
    crate::common::metrics::observe_latency(COMPONENT, STAGE, action, outcome, elapsed_secs);

    if outcome == "error" {
        crate::common::metrics::inc_error(COMPONENT, STAGE, "http", status.as_str(), 1);
    }
}
use crate::engine::task::task_dispatch_adapter::build_task_dispatch;