use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::{IntoResponse, Json, Response},
routing::get,
Router,
};
use chrono::{DateTime, Utc};
use metrics::counter;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::time::Instant;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use tracing::{debug, error, info, instrument};
use uuid::Uuid;
use crate::handlers::AppState;
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct ServiceConfig {
#[serde(default)]
pub listmonk: ServiceEndpointConfig,
#[serde(default)]
pub postiz: ServiceEndpointConfig,
#[serde(default)]
pub openwebui: ServiceEndpointConfig,
#[serde(default)]
pub n8n: ServiceEndpointConfig,
#[serde(default)]
pub metabase: ServiceEndpointConfig,
#[serde(default)]
pub typesense: ServiceEndpointConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct ServiceEndpointConfig {
pub url: Option<String>,
pub api_key: Option<String>,
#[serde(default = "default_true")]
pub enabled: bool,
#[serde(default = "default_timeout")]
pub timeout_secs: u64,
}
fn default_true() -> bool {
true
}
fn default_timeout() -> u64 {
30
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRequest {
pub task_type: String,
#[serde(default)]
pub parameters: HashMap<String, JsonValue>,
#[serde(default = "default_priority")]
pub priority: u8,
pub deadline: Option<DateTime<Utc>>,
}
fn default_priority() -> u8 {
5
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResponse {
pub task_id: Uuid,
pub status: TaskStatus,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub result: Option<JsonValue>,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TaskStatus {
Queued,
Running,
Completed,
Failed,
Cancelled,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TaskFilter {
pub status: Option<TaskStatus>,
pub task_type: Option<String>,
pub limit: Option<u32>,
pub offset: Option<u32>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceHealth {
pub name: String,
pub status: ServiceStatus,
pub last_checked: DateTime<Utc>,
pub response_time_ms: Option<u64>,
pub details: Option<JsonValue>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ServiceStatus {
Healthy,
Degraded,
Unhealthy,
Disabled,
Unknown,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ServiceCommand {
Start,
Stop,
Restart,
Enable,
Disable,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetrics {
pub cpu_usage_percent: f64,
pub memory_usage_bytes: u64,
pub disk_usage_percent: f64,
pub network_throughput_bytes_sec: u64,
pub timestamp: DateTime<Utc>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceMetrics {
pub service_name: String,
pub request_count: u64,
pub error_count: u64,
pub avg_response_time_ms: f64,
pub uptime_seconds: u64,
}
#[derive(Debug, thiserror::Error)]
pub enum OrchestratorError {
#[error("Invalid request: {0}")]
InvalidRequest(String),
#[error("Service not found: {0}")]
ServiceNotFound(String),
#[error("Service unavailable: {0}")]
ServiceUnavailable(String),
#[error("Task not found: {0}")]
TaskNotFound(Uuid),
#[error("Task execution failed: {0}")]
TaskExecutionFailed(String),
#[error("Configuration error: {0}")]
ConfigurationError(String),
#[error("Authentication required: {0}")]
AuthenticationRequired(String),
#[error("Permission denied: {0}")]
PermissionDenied(String),
#[error("Internal server error: {0}")]
InternalError(String),
}
impl IntoResponse for OrchestratorError {
fn into_response(self) -> Response {
let (status, error_message) = match &self {
OrchestratorError::InvalidRequest(_) => (StatusCode::BAD_REQUEST, self.to_string()),
OrchestratorError::ServiceNotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
OrchestratorError::ServiceUnavailable(_) => {
(StatusCode::SERVICE_UNAVAILABLE, self.to_string())
}
OrchestratorError::TaskNotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
OrchestratorError::TaskExecutionFailed(_) => {
(StatusCode::BAD_REQUEST, self.to_string())
}
OrchestratorError::ConfigurationError(_) => (StatusCode::BAD_REQUEST, self.to_string()),
OrchestratorError::AuthenticationRequired(_) => {
(StatusCode::UNAUTHORIZED, self.to_string())
}
OrchestratorError::PermissionDenied(_) => (StatusCode::FORBIDDEN, self.to_string()),
OrchestratorError::InternalError(_) => {
(StatusCode::INTERNAL_SERVER_ERROR, self.to_string())
}
};
let error_type = std::any::type_name::<Self>();
counter!("orchestrator_errors_total", "type" => error_type).increment(1);
let body = serde_json::json!({
"error": {
"type": error_type,
"message": error_message,
}
});
(status, Json(body)).into_response()
}
}
#[derive(Clone)]
pub struct OrchestratorState {
pub app_state: Arc<AppState>,
pub config: Arc<RwLock<ServiceConfig>>,
pub tasks: Arc<RwLock<HashMap<Uuid, TaskResponse>>>,
pub service_health: Arc<RwLock<HashMap<String, ServiceHealth>>>,
}
impl OrchestratorState {
pub fn new(app_state: Arc<AppState>) -> Self {
Self {
app_state,
config: Arc::new(RwLock::new(ServiceConfig::default())),
tasks: Arc::new(RwLock::new(HashMap::new())),
service_health: Arc::new(RwLock::new(HashMap::new())),
}
}
pub async fn update_config(&self, new_config: ServiceConfig) {
let mut config = self.config.write().await;
*config = new_config;
}
pub async fn get_config(&self) -> ServiceConfig {
self.config.read().await.clone()
}
pub async fn add_task(&self, task: TaskResponse) {
let mut tasks = self.tasks.write().await;
tasks.insert(task.task_id, task);
}
pub async fn get_task(&self, task_id: Uuid) -> Option<TaskResponse> {
let tasks = self.tasks.read().await;
tasks.get(&task_id).cloned()
}
pub async fn update_service_health(&self, service_name: String, health: ServiceHealth) {
let mut health_cache = self.service_health.write().await;
health_cache.insert(service_name, health);
}
pub async fn get_service_health(&self, service_name: &str) -> Option<ServiceHealth> {
let health_cache = self.service_health.read().await;
health_cache.get(service_name).cloned()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiResponse<T> {
pub data: T,
pub meta: Option<ApiMeta>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiMeta {
pub timestamp: DateTime<Utc>,
pub duration_ms: u64,
}
impl<T> ApiResponse<T> {
pub fn new(data: T, start_time: Instant) -> Self {
let duration_ms = start_time.elapsed().as_millis() as u64;
Self {
data,
meta: Some(ApiMeta {
timestamp: Utc::now(),
duration_ms,
}),
}
}
}
#[instrument(skip_all)]
pub async fn root_handler(
State(_state): State<Arc<OrchestratorState>>,
) -> Result<Json<ApiResponse<serde_json::Value>>, OrchestratorError> {
let start_time = Instant::now();
info!("API root endpoint accessed");
let response = serde_json::json!({
"name": "ReasonKit Orchestrator API",
"version": env!("CARGO_PKG_VERSION"),
"description": "Central orchestration API for ReasonKit services",
"endpoints": [
"GET /api/v1 - API information",
"GET /api/v1/health - Overall system health",
"GET /api/v1/services - List all services",
"GET /api/v1/services/{name} - Get service details",
"POST /api/v1/services/{name} - Control service",
"GET /api/v1/services/{name}/health - Get service health",
"GET /api/v1/tasks - List tasks",
"POST /api/v1/tasks - Create new task",
"GET /api/v1/tasks/{id} - Get task details",
"DELETE /api/v1/tasks/{id} - Cancel task",
"GET /api/v1/metrics - Get system metrics",
"GET /api/v1/config - Get configuration",
"PUT /api/v1/config - Update configuration"
]
});
Ok(Json(ApiResponse::new(response, start_time)))
}
#[instrument(skip_all)]
pub async fn health_handler(
State(state): State<Arc<OrchestratorState>>,
) -> Result<Json<ApiResponse<serde_json::Value>>, OrchestratorError> {
let start_time = Instant::now();
info!("System health check requested");
let app_health = state.app_state.health();
let services_health = check_all_services_health(state.clone()).await;
let response = serde_json::json!({
"status": "healthy",
"timestamp": Utc::now().to_rfc3339(),
"uptime_seconds": state.app_state.uptime_seconds(),
"application": app_health,
"services": services_health
});
Ok(Json(ApiResponse::new(response, start_time)))
}
#[instrument(skip_all)]
pub async fn list_services_handler(
State(state): State<Arc<OrchestratorState>>,
) -> Result<Json<ApiResponse<Vec<String>>>, OrchestratorError> {
let start_time = Instant::now();
info!("Listing all services");
let config = state.get_config().await;
let mut services = Vec::new();
if config.listmonk.enabled {
services.push("listmonk".to_string());
}
if config.postiz.enabled {
services.push("postiz".to_string());
}
if config.openwebui.enabled {
services.push("openwebui".to_string());
}
if config.n8n.enabled {
services.push("n8n".to_string());
}
if config.metabase.enabled {
services.push("metabase".to_string());
}
if config.typesense.enabled {
services.push("typesense".to_string());
}
Ok(Json(ApiResponse::new(services, start_time)))
}
#[instrument(skip_all, fields(service_name = %service_name))]
pub async fn get_service_handler(
Path(service_name): Path<String>,
State(state): State<Arc<OrchestratorState>>,
) -> Result<Json<ApiResponse<serde_json::Value>>, OrchestratorError> {
let start_time = Instant::now();
info!("Getting service details for: {}", service_name);
let config = state.get_config().await;
let service_info = match service_name.as_str() {
"listmonk" => serde_json::to_value(&config.listmonk).unwrap_or(serde_json::Value::Null),
"postiz" => serde_json::to_value(&config.postiz).unwrap_or(serde_json::Value::Null),
"openwebui" => serde_json::to_value(&config.openwebui).unwrap_or(serde_json::Value::Null),
"n8n" => serde_json::to_value(&config.n8n).unwrap_or(serde_json::Value::Null),
"metabase" => serde_json::to_value(&config.metabase).unwrap_or(serde_json::Value::Null),
"typesense" => serde_json::to_value(&config.typesense).unwrap_or(serde_json::Value::Null),
_ => return Err(OrchestratorError::ServiceNotFound(service_name)),
};
Ok(Json(ApiResponse::new(service_info, start_time)))
}
#[instrument(skip_all, fields(service_name = %service_name))]
pub async fn control_service_handler(
Path(service_name): Path<String>,
State(_state): State<Arc<OrchestratorState>>,
Json(command): Json<ServiceCommand>,
) -> Result<Json<ApiResponse<serde_json::Value>>, OrchestratorError> {
let start_time = Instant::now();
info!(
"Controlling service: {} with command: {:?}",
service_name, command
);
let result = match command {
ServiceCommand::Start => format!("Service {} started", service_name),
ServiceCommand::Stop => format!("Service {} stopped", service_name),
ServiceCommand::Restart => format!("Service {} restarted", service_name),
ServiceCommand::Enable => format!("Service {} enabled", service_name),
ServiceCommand::Disable => format!("Service {} disabled", service_name),
};
let response = serde_json::json!({
"message": result,
"status": "success",
"timestamp": Utc::now().to_rfc3339()
});
Ok(Json(ApiResponse::new(response, start_time)))
}
#[instrument(skip_all, fields(service_name = %service_name))]
pub async fn service_health_handler(
Path(service_name): Path<String>,
State(state): State<Arc<OrchestratorState>>,
) -> Result<Json<ApiResponse<ServiceHealth>>, OrchestratorError> {
let start_time = Instant::now();
info!("Checking health for service: {}", service_name);
if let Some(health) = state.get_service_health(&service_name).await {
return Ok(Json(ApiResponse::new(health, start_time)));
}
let health = ServiceHealth {
name: service_name.clone(),
status: ServiceStatus::Healthy,
last_checked: Utc::now(),
response_time_ms: Some(rand::random::<u64>() % 100),
details: Some(serde_json::json!({"message": "Service is healthy"})),
};
state
.update_service_health(service_name, health.clone())
.await;
Ok(Json(ApiResponse::new(health, start_time)))
}
#[instrument(skip_all)]
pub async fn list_tasks_handler(
Query(filter): Query<TaskFilter>,
State(state): State<Arc<OrchestratorState>>,
) -> Result<Json<ApiResponse<Vec<TaskResponse>>>, OrchestratorError> {
let start_time = Instant::now();
info!("Listing tasks with filter: {:?}", filter);
let tasks = state.tasks.read().await;
let mut task_list: Vec<TaskResponse> = tasks.values().cloned().collect();
if let Some(status) = filter.status {
task_list.retain(|task| task.status == status);
}
if let Some(task_type) = filter.task_type {
debug!("Filtering tasks by type: {}", task_type);
}
if let Some(limit) = filter.limit {
task_list.truncate(limit as usize);
}
Ok(Json(ApiResponse::new(task_list, start_time)))
}
#[instrument(skip_all)]
pub async fn create_task_handler(
State(state): State<Arc<OrchestratorState>>,
Json(task_request): Json<TaskRequest>,
) -> Result<Json<ApiResponse<TaskResponse>>, OrchestratorError> {
let start_time = Instant::now();
info!("Creating new task: {}", task_request.task_type);
let task_id = Uuid::new_v4();
let task_response = TaskResponse {
task_id,
status: TaskStatus::Queued,
created_at: Utc::now(),
updated_at: Utc::now(),
result: None,
error: None,
};
state.add_task(task_response.clone()).await;
info!("Task {} queued for execution", task_id);
Ok(Json(ApiResponse::new(task_response, start_time)))
}
#[instrument(skip_all, fields(task_id = %task_id))]
pub async fn get_task_handler(
Path(task_id): Path<Uuid>,
State(state): State<Arc<OrchestratorState>>,
) -> Result<Json<ApiResponse<TaskResponse>>, OrchestratorError> {
let start_time = Instant::now();
info!("Getting task details for: {}", task_id);
if let Some(task) = state.get_task(task_id).await {
Ok(Json(ApiResponse::new(task, start_time)))
} else {
Err(OrchestratorError::TaskNotFound(task_id))
}
}
#[instrument(skip_all, fields(task_id = %task_id))]
pub async fn cancel_task_handler(
Path(task_id): Path<Uuid>,
State(state): State<Arc<OrchestratorState>>,
) -> Result<Json<ApiResponse<serde_json::Value>>, OrchestratorError> {
let start_time = Instant::now();
info!("Cancelling task: {}", task_id);
let mut tasks = state.tasks.write().await;
if let Some(task) = tasks.get_mut(&task_id) {
task.status = TaskStatus::Cancelled;
task.updated_at = Utc::now();
task.error = Some("Task cancelled by user".to_string());
let response = serde_json::json!({
"message": format!("Task {} cancelled", task_id),
"status": "success",
"timestamp": Utc::now().to_rfc3339()
});
Ok(Json(ApiResponse::new(response, start_time)))
} else {
Err(OrchestratorError::TaskNotFound(task_id))
}
}
#[instrument(skip_all)]
pub async fn metrics_handler(
State(_state): State<Arc<OrchestratorState>>,
) -> Result<Json<ApiResponse<SystemMetrics>>, OrchestratorError> {
let start_time = Instant::now();
info!("Getting system metrics");
let metrics = SystemMetrics {
cpu_usage_percent: rand::random::<f64>() * 100.0,
memory_usage_bytes: rand::random::<u64>() % 8000000000,
disk_usage_percent: rand::random::<f64>() * 100.0,
network_throughput_bytes_sec: rand::random::<u64>() % 1000000,
timestamp: Utc::now(),
};
Ok(Json(ApiResponse::new(metrics, start_time)))
}
#[instrument(skip_all)]
pub async fn get_config_handler(
State(state): State<Arc<OrchestratorState>>,
) -> Result<Json<ApiResponse<ServiceConfig>>, OrchestratorError> {
let start_time = Instant::now();
info!("Getting configuration");
let config = state.get_config().await;
Ok(Json(ApiResponse::new(config, start_time)))
}
#[instrument(skip_all)]
pub async fn update_config_handler(
State(state): State<Arc<OrchestratorState>>,
Json(new_config): Json<ServiceConfig>,
) -> Result<Json<ApiResponse<serde_json::Value>>, OrchestratorError> {
let start_time = Instant::now();
info!("Updating configuration");
state.update_config(new_config).await;
let response = serde_json::json!({
"message": "Configuration updated successfully",
"status": "success",
"timestamp": Utc::now().to_rfc3339()
});
Ok(Json(ApiResponse::new(response, start_time)))
}
async fn check_all_services_health(state: Arc<OrchestratorState>) -> Vec<ServiceHealth> {
let services = vec![
"listmonk",
"postiz",
"openwebui",
"n8n",
"metabase",
"typesense",
];
let mut health_statuses = Vec::new();
for service_name in services {
if let Some(health) = state.get_service_health(service_name).await {
health_statuses.push(health);
} else {
let health = ServiceHealth {
name: service_name.to_string(),
status: ServiceStatus::Healthy, last_checked: Utc::now(),
response_time_ms: Some(rand::random::<u64>() % 100),
details: Some(serde_json::json!({"message": "Service health check performed"})),
};
state
.update_service_health(service_name.to_string(), health.clone())
.await;
health_statuses.push(health);
}
}
health_statuses
}
pub fn orchestrator_router(state: Arc<OrchestratorState>) -> Router {
Router::new()
.route("/api/v1", get(root_handler))
.route("/api/v1/health", get(health_handler))
.route("/api/v1/services", get(list_services_handler))
.route(
"/api/v1/services/:service_name",
get(get_service_handler).post(control_service_handler),
)
.route(
"/api/v1/services/:service_name/health",
get(service_health_handler),
)
.route(
"/api/v1/tasks",
get(list_tasks_handler).post(create_task_handler),
)
.route(
"/api/v1/tasks/:task_id",
get(get_task_handler).delete(cancel_task_handler),
)
.route("/api/v1/metrics", get(metrics_handler))
.route(
"/api/v1/config",
get(get_config_handler).put(update_config_handler),
)
.with_state(state)
}
#[cfg(test)]
mod tests {
use super::*;
use tokio;
#[test]
fn test_service_config_default() {
let config = ServiceConfig::default();
assert!(!config.listmonk.enabled);
assert!(!config.postiz.enabled);
assert!(!config.openwebui.enabled);
assert!(!config.n8n.enabled);
assert!(!config.metabase.enabled);
assert!(!config.typesense.enabled);
}
#[test]
fn test_task_status_serialization() {
let status = TaskStatus::Running;
let json = serde_json::to_string(&status).unwrap();
assert_eq!(json, "\"Running\"");
}
#[test]
fn test_service_status_serialization() {
let status = ServiceStatus::Healthy;
let json = serde_json::to_string(&status).unwrap();
assert_eq!(json, "\"Healthy\"");
}
#[tokio::test]
async fn test_orchestrator_state() {
let app_state = Arc::new(AppState::new());
let orchestrator_state = OrchestratorState::new(app_state);
let config = orchestrator_state.get_config().await;
assert_eq!(config, ServiceConfig::default());
let task_id = Uuid::new_v4();
let task = TaskResponse {
task_id,
status: TaskStatus::Queued,
created_at: Utc::now(),
updated_at: Utc::now(),
result: None,
error: None,
};
orchestrator_state.add_task(task.clone()).await;
let retrieved_task = orchestrator_state.get_task(task_id).await;
assert!(retrieved_task.is_some());
assert_eq!(retrieved_task.unwrap().task_id, task_id);
}
#[test]
fn test_api_response_serialization() {
let start_time = Instant::now();
let data = serde_json::json!({"test": "value"});
let response: ApiResponse<serde_json::Value> = ApiResponse::new(data, start_time);
assert!(response.meta.is_some());
assert_eq!(
response.meta.as_ref().unwrap().duration_ms,
start_time.elapsed().as_millis() as u64
);
}
#[tokio::test]
async fn test_service_health_caching() {
let app_state = Arc::new(AppState::new());
let orchestrator_state = OrchestratorState::new(app_state);
let service_name = "test-service".to_string();
let health = ServiceHealth {
name: service_name.clone(),
status: ServiceStatus::Healthy,
last_checked: Utc::now(),
response_time_ms: Some(50),
details: Some(serde_json::json!({"test": "details"})),
};
orchestrator_state
.update_service_health(service_name.clone(), health.clone())
.await;
let retrieved_health = orchestrator_state.get_service_health(&service_name).await;
assert!(retrieved_health.is_some());
assert_eq!(retrieved_health.unwrap().name, service_name);
}
}