Skip to main content

reasonkit_web/handlers/
orchestrator.rs

1//! Orchestrator API endpoints for the ReasonKit system
2//!
3//! This module provides comprehensive REST API endpoints for managing and monitoring
4//! the entire ReasonKit ecosystem, including service management, task execution,
5//! configuration, monitoring, and health checking for all integrated services.
6//!
7//! # Integrated Services
8//!
9//! - **listmonk** - Newsletter and mailing list management
10//! - **postiz** - Social media scheduling and management
11//! - **OpenWebUI** - Web-based UI for AI interactions
12//! - **n8n** - Workflow automation platform
13//! - **Metabase** - Business intelligence and analytics
14//! - **Typesense** - Search engine and database
15
16use axum::{
17    extract::{Path, Query, State},
18    http::StatusCode,
19    response::{IntoResponse, Json, Response},
20    routing::get,
21    Router,
22};
23use chrono::{DateTime, Utc};
24use metrics::counter;
25use serde::{Deserialize, Serialize};
26use serde_json::Value as JsonValue;
27use std::time::Instant;
28use std::{collections::HashMap, sync::Arc};
29use tokio::sync::RwLock;
30use tracing::{debug, error, info, instrument};
31use uuid::Uuid;
32
33use crate::handlers::AppState;
34
35// ============================================================================
36// Configuration Types
37// ============================================================================
38
/// Configuration for all integrated services.
///
/// Holds one [`ServiceEndpointConfig`] per integrated service. Every field is
/// annotated with `#[serde(default)]`, so a service section that is absent from
/// the deserialized input falls back to `ServiceEndpointConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
pub struct ServiceConfig {
    /// Listmonk configuration
    #[serde(default)]
    pub listmonk: ServiceEndpointConfig,

    /// Postiz configuration
    #[serde(default)]
    pub postiz: ServiceEndpointConfig,

    /// OpenWebUI configuration
    #[serde(default)]
    pub openwebui: ServiceEndpointConfig,

    /// n8n configuration
    #[serde(default)]
    pub n8n: ServiceEndpointConfig,

    /// Metabase configuration
    #[serde(default)]
    pub metabase: ServiceEndpointConfig,

    /// Typesense configuration
    #[serde(default)]
    pub typesense: ServiceEndpointConfig,
}
66
67/// Configuration for individual service endpoints
68#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
69pub struct ServiceEndpointConfig {
70    /// Service URL
71    pub url: Option<String>,
72
73    /// API key or token for authentication
74    pub api_key: Option<String>,
75
76    /// Whether the service is enabled
77    #[serde(default = "default_true")]
78    pub enabled: bool,
79
80    /// Timeout in seconds
81    #[serde(default = "default_timeout")]
82    pub timeout_secs: u64,
83}
84
/// Serde fallback for boolean fields that should be switched on when the key
/// is omitted from the input.
fn default_true() -> bool {
    const ENABLED_WHEN_OMITTED: bool = true;
    ENABLED_WHEN_OMITTED
}
/// Serde fallback for timeout fields, expressed in seconds.
fn default_timeout() -> u64 {
    const DEFAULT_TIMEOUT_SECS: u64 = 30;
    DEFAULT_TIMEOUT_SECS
}
91
92// ============================================================================
93// Task Management Types
94// ============================================================================
95
/// Task execution request.
///
/// Request body accepted when a new task is created. `task_type` is required;
/// `parameters` falls back to an empty map and `priority` to
/// `default_priority()` when omitted. `deadline` is optional.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskRequest {
    /// Task name/type
    pub task_type: String,

    /// Task parameters
    #[serde(default)]
    pub parameters: HashMap<String, JsonValue>,

    /// Priority level (1-10)
    #[serde(default = "default_priority")]
    pub priority: u8,

    /// Optional deadline
    pub deadline: Option<DateTime<Utc>>,
}
113
/// Serde fallback for a task's priority when the request does not specify one.
fn default_priority() -> u8 {
    const DEFAULT_PRIORITY: u8 = 5;
    DEFAULT_PRIORITY
}
117
/// Task execution response.
///
/// This is also the record kept in the orchestrator's in-memory task map,
/// keyed by `task_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResponse {
    /// Task ID
    pub task_id: Uuid,

    /// Task status
    pub status: TaskStatus,

    /// Creation timestamp
    pub created_at: DateTime<Utc>,

    /// Last updated timestamp
    pub updated_at: DateTime<Utc>,

    /// Task result (if completed)
    pub result: Option<JsonValue>,

    /// Task error (if failed)
    pub error: Option<String>,
}
139
/// Task status enumeration.
///
/// No serde renaming is applied, so each variant is serialized as its variant
/// name (for example `"Running"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum TaskStatus {
    /// Task has been queued
    Queued,

    /// Task is currently running
    Running,

    /// Task completed successfully
    Completed,

    /// Task failed
    Failed,

    /// Task was cancelled
    Cancelled,
}
158
/// Task filtering parameters.
///
/// Extracted from the query string of the task-listing endpoint. Every field
/// is optional; an absent field means "no constraint".
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct TaskFilter {
    /// Filter by status
    pub status: Option<TaskStatus>,

    /// Filter by task type
    pub task_type: Option<String>,

    /// Limit number of results
    pub limit: Option<u32>,

    /// Offset for pagination
    pub offset: Option<u32>,
}
174
175// ============================================================================
176// Service Management Types
177// ============================================================================
178
/// Service health status.
///
/// A point-in-time health record for one service. Instances are stored in the
/// orchestrator's health cache, keyed by service name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceHealth {
    /// Service name
    pub name: String,

    /// Current status
    pub status: ServiceStatus,

    /// Last checked timestamp
    pub last_checked: DateTime<Utc>,

    /// Response time in milliseconds
    pub response_time_ms: Option<u64>,

    /// Additional details
    pub details: Option<JsonValue>,
}
197
/// Service status enumeration.
///
/// No serde renaming is applied, so each variant is serialized as its variant
/// name (for example `"Healthy"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ServiceStatus {
    /// Service is healthy and operational
    Healthy,

    /// Service is degraded but partially functional
    Degraded,

    /// Service is unhealthy and not responding
    Unhealthy,

    /// Service is disabled
    Disabled,

    /// Service status is unknown
    Unknown,
}
216
/// Service control command.
///
/// Deserialized from the JSON body of the service-control endpoint. With the
/// default serde representation a command is sent as a bare string such as
/// `"Restart"`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ServiceCommand {
    /// Start the service
    Start,

    /// Stop the service
    Stop,

    /// Restart the service
    Restart,

    /// Enable the service
    Enable,

    /// Disable the service
    Disable,
}
235
236// ============================================================================
237// Monitoring and Metrics Types
238// ============================================================================
239
/// System metrics snapshot.
///
/// Payload returned by the metrics endpoint; all values describe a single
/// moment identified by `timestamp`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetrics {
    /// CPU usage percentage
    pub cpu_usage_percent: f64,

    /// Memory usage in bytes
    pub memory_usage_bytes: u64,

    /// Disk usage percentage
    pub disk_usage_percent: f64,

    /// Network throughput in bytes/sec
    pub network_throughput_bytes_sec: u64,

    /// Timestamp of metrics collection
    pub timestamp: DateTime<Utc>,
}
258
/// Service metrics.
///
/// Per-service counters and timings.
// NOTE(review): nothing in this module constructs or returns this type —
// presumably it is consumed elsewhere in the crate; confirm before removing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceMetrics {
    /// Service name
    pub service_name: String,

    /// Request count
    pub request_count: u64,

    /// Error count
    pub error_count: u64,

    /// Average response time in milliseconds
    pub avg_response_time_ms: f64,

    /// Uptime in seconds
    pub uptime_seconds: u64,
}
277
278// ============================================================================
279// Error Types
280// ============================================================================
281
/// Orchestrator API errors.
///
/// Returned by every handler in this module. The `Display` text comes from the
/// `#[error(...)]` attribute on each variant, and the type implements
/// `IntoResponse`, so a handler can return it directly and axum converts it to
/// an HTTP response.
#[derive(Debug, thiserror::Error)]
pub enum OrchestratorError {
    /// Invalid request parameters
    #[error("Invalid request: {0}")]
    InvalidRequest(String),

    /// Service not found
    #[error("Service not found: {0}")]
    ServiceNotFound(String),

    /// Service unavailable
    #[error("Service unavailable: {0}")]
    ServiceUnavailable(String),

    /// Task not found
    #[error("Task not found: {0}")]
    TaskNotFound(Uuid),

    /// Task execution failed
    #[error("Task execution failed: {0}")]
    TaskExecutionFailed(String),

    /// Configuration error
    #[error("Configuration error: {0}")]
    ConfigurationError(String),

    /// Authentication required
    #[error("Authentication required: {0}")]
    AuthenticationRequired(String),

    /// Permission denied
    #[error("Permission denied: {0}")]
    PermissionDenied(String),

    /// Internal server error
    #[error("Internal server error: {0}")]
    InternalError(String),
}
321
322impl IntoResponse for OrchestratorError {
323    fn into_response(self) -> Response {
324        let (status, error_message) = match &self {
325            OrchestratorError::InvalidRequest(_) => (StatusCode::BAD_REQUEST, self.to_string()),
326            OrchestratorError::ServiceNotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
327            OrchestratorError::ServiceUnavailable(_) => {
328                (StatusCode::SERVICE_UNAVAILABLE, self.to_string())
329            }
330            OrchestratorError::TaskNotFound(_) => (StatusCode::NOT_FOUND, self.to_string()),
331            OrchestratorError::TaskExecutionFailed(_) => {
332                (StatusCode::BAD_REQUEST, self.to_string())
333            }
334            OrchestratorError::ConfigurationError(_) => (StatusCode::BAD_REQUEST, self.to_string()),
335            OrchestratorError::AuthenticationRequired(_) => {
336                (StatusCode::UNAUTHORIZED, self.to_string())
337            }
338            OrchestratorError::PermissionDenied(_) => (StatusCode::FORBIDDEN, self.to_string()),
339            OrchestratorError::InternalError(_) => {
340                (StatusCode::INTERNAL_SERVER_ERROR, self.to_string())
341            }
342        };
343
344        // Increment error counter
345        let error_type = std::any::type_name::<Self>();
346        counter!("orchestrator_errors_total", "type" => error_type).increment(1);
347
348        let body = serde_json::json!({
349            "error": {
350                "type": error_type,
351                "message": error_message,
352            }
353        });
354
355        (status, Json(body)).into_response()
356    }
357}
358
359// ============================================================================
360// Shared State
361// ============================================================================
362
/// Shared state for orchestrator handlers.
///
/// Every field is behind an `Arc`, so cloning the state is cheap and all
/// clones observe the same configuration, task map and health cache. The
/// mutable parts are guarded by `tokio::sync::RwLock`.
#[derive(Clone)]
pub struct OrchestratorState {
    /// Application state for metrics
    pub app_state: Arc<AppState>,

    /// Service configuration
    pub config: Arc<RwLock<ServiceConfig>>,

    /// Task storage
    pub tasks: Arc<RwLock<HashMap<Uuid, TaskResponse>>>,

    /// Service health cache
    pub service_health: Arc<RwLock<HashMap<String, ServiceHealth>>>,
}
378
379impl OrchestratorState {
380    /// Create new orchestrator state
381    pub fn new(app_state: Arc<AppState>) -> Self {
382        Self {
383            app_state,
384            config: Arc::new(RwLock::new(ServiceConfig::default())),
385            tasks: Arc::new(RwLock::new(HashMap::new())),
386            service_health: Arc::new(RwLock::new(HashMap::new())),
387        }
388    }
389
390    /// Update service configuration
391    pub async fn update_config(&self, new_config: ServiceConfig) {
392        let mut config = self.config.write().await;
393        *config = new_config;
394    }
395
396    /// Get current service configuration
397    pub async fn get_config(&self) -> ServiceConfig {
398        self.config.read().await.clone()
399    }
400
401    /// Add a new task
402    pub async fn add_task(&self, task: TaskResponse) {
403        let mut tasks = self.tasks.write().await;
404        tasks.insert(task.task_id, task);
405    }
406
407    /// Get task by ID
408    pub async fn get_task(&self, task_id: Uuid) -> Option<TaskResponse> {
409        let tasks = self.tasks.read().await;
410        tasks.get(&task_id).cloned()
411    }
412
413    /// Update service health status
414    pub async fn update_service_health(&self, service_name: String, health: ServiceHealth) {
415        let mut health_cache = self.service_health.write().await;
416        health_cache.insert(service_name, health);
417    }
418
419    /// Get service health status
420    pub async fn get_service_health(&self, service_name: &str) -> Option<ServiceHealth> {
421        let health_cache = self.service_health.read().await;
422        health_cache.get(service_name).cloned()
423    }
424}
425
426// ============================================================================
427// Request/Response Types
428// ============================================================================
429
/// Generic API response wrapper.
///
/// Envelope used by the handlers in this module: the payload goes in `data`
/// and request bookkeeping in `meta`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiResponse<T> {
    /// Response data
    pub data: T,

    /// Response metadata
    pub meta: Option<ApiMeta>,
}
439
/// API response metadata.
///
/// Filled in by `ApiResponse::new` from the wall clock and the handler's start
/// instant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ApiMeta {
    /// Response timestamp
    pub timestamp: DateTime<Utc>,

    /// Response duration in milliseconds
    pub duration_ms: u64,
}
449
450impl<T> ApiResponse<T> {
451    /// Create a new API response
452    pub fn new(data: T, start_time: Instant) -> Self {
453        let duration_ms = start_time.elapsed().as_millis() as u64;
454        Self {
455            data,
456            meta: Some(ApiMeta {
457                timestamp: Utc::now(),
458                duration_ms,
459            }),
460        }
461    }
462}
463
464// ============================================================================
465// Handler Implementations
466// ============================================================================
467
468/// Root endpoint - API information
469#[instrument(skip_all)]
470pub async fn root_handler(
471    State(_state): State<Arc<OrchestratorState>>,
472) -> Result<Json<ApiResponse<serde_json::Value>>, OrchestratorError> {
473    let start_time = Instant::now();
474    info!("API root endpoint accessed");
475
476    let response = serde_json::json!({
477        "name": "ReasonKit Orchestrator API",
478        "version": env!("CARGO_PKG_VERSION"),
479        "description": "Central orchestration API for ReasonKit services",
480        "endpoints": [
481            "GET /api/v1 - API information",
482            "GET /api/v1/health - Overall system health",
483            "GET /api/v1/services - List all services",
484            "GET /api/v1/services/{name} - Get service details",
485            "POST /api/v1/services/{name} - Control service",
486            "GET /api/v1/services/{name}/health - Get service health",
487            "GET /api/v1/tasks - List tasks",
488            "POST /api/v1/tasks - Create new task",
489            "GET /api/v1/tasks/{id} - Get task details",
490            "DELETE /api/v1/tasks/{id} - Cancel task",
491            "GET /api/v1/metrics - Get system metrics",
492            "GET /api/v1/config - Get configuration",
493            "PUT /api/v1/config - Update configuration"
494        ]
495    });
496
497    Ok(Json(ApiResponse::new(response, start_time)))
498}
499
500/// Overall system health endpoint
501#[instrument(skip_all)]
502pub async fn health_handler(
503    State(state): State<Arc<OrchestratorState>>,
504) -> Result<Json<ApiResponse<serde_json::Value>>, OrchestratorError> {
505    let start_time = Instant::now();
506    info!("System health check requested");
507
508    // Authenticate request
509    // authenticate().await?;
510
511    // Check overall system health
512    let app_health = state.app_state.health();
513    let services_health = check_all_services_health(state.clone()).await;
514
515    let response = serde_json::json!({
516        "status": "healthy",
517        "timestamp": Utc::now().to_rfc3339(),
518        "uptime_seconds": state.app_state.uptime_seconds(),
519        "application": app_health,
520        "services": services_health
521    });
522
523    Ok(Json(ApiResponse::new(response, start_time)))
524}
525
526/// List all services
527#[instrument(skip_all)]
528pub async fn list_services_handler(
529    State(state): State<Arc<OrchestratorState>>,
530) -> Result<Json<ApiResponse<Vec<String>>>, OrchestratorError> {
531    let start_time = Instant::now();
532    info!("Listing all services");
533
534    let config = state.get_config().await;
535    let mut services = Vec::new();
536
537    if config.listmonk.enabled {
538        services.push("listmonk".to_string());
539    }
540    if config.postiz.enabled {
541        services.push("postiz".to_string());
542    }
543    if config.openwebui.enabled {
544        services.push("openwebui".to_string());
545    }
546    if config.n8n.enabled {
547        services.push("n8n".to_string());
548    }
549    if config.metabase.enabled {
550        services.push("metabase".to_string());
551    }
552    if config.typesense.enabled {
553        services.push("typesense".to_string());
554    }
555
556    Ok(Json(ApiResponse::new(services, start_time)))
557}
558
559/// Get service details
560#[instrument(skip_all, fields(service_name = %service_name))]
561pub async fn get_service_handler(
562    Path(service_name): Path<String>,
563    State(state): State<Arc<OrchestratorState>>,
564) -> Result<Json<ApiResponse<serde_json::Value>>, OrchestratorError> {
565    let start_time = Instant::now();
566    info!("Getting service details for: {}", service_name);
567
568    let config = state.get_config().await;
569
570    let service_info = match service_name.as_str() {
571        "listmonk" => serde_json::to_value(&config.listmonk).unwrap_or(serde_json::Value::Null),
572        "postiz" => serde_json::to_value(&config.postiz).unwrap_or(serde_json::Value::Null),
573        "openwebui" => serde_json::to_value(&config.openwebui).unwrap_or(serde_json::Value::Null),
574        "n8n" => serde_json::to_value(&config.n8n).unwrap_or(serde_json::Value::Null),
575        "metabase" => serde_json::to_value(&config.metabase).unwrap_or(serde_json::Value::Null),
576        "typesense" => serde_json::to_value(&config.typesense).unwrap_or(serde_json::Value::Null),
577        _ => return Err(OrchestratorError::ServiceNotFound(service_name)),
578    };
579
580    Ok(Json(ApiResponse::new(service_info, start_time)))
581}
582
583/// Control service (start, stop, restart, etc.)
584#[instrument(skip_all, fields(service_name = %service_name))]
585pub async fn control_service_handler(
586    Path(service_name): Path<String>,
587    State(_state): State<Arc<OrchestratorState>>,
588    Json(command): Json<ServiceCommand>,
589) -> Result<Json<ApiResponse<serde_json::Value>>, OrchestratorError> {
590    let start_time = Instant::now();
591    info!(
592        "Controlling service: {} with command: {:?}",
593        service_name, command
594    );
595
596    // In a real implementation, this would actually control the services
597    // For now, we'll simulate the behavior
598
599    let result = match command {
600        ServiceCommand::Start => format!("Service {} started", service_name),
601        ServiceCommand::Stop => format!("Service {} stopped", service_name),
602        ServiceCommand::Restart => format!("Service {} restarted", service_name),
603        ServiceCommand::Enable => format!("Service {} enabled", service_name),
604        ServiceCommand::Disable => format!("Service {} disabled", service_name),
605    };
606
607    let response = serde_json::json!({
608        "message": result,
609        "status": "success",
610        "timestamp": Utc::now().to_rfc3339()
611    });
612
613    Ok(Json(ApiResponse::new(response, start_time)))
614}
615
616/// Get service health
617#[instrument(skip_all, fields(service_name = %service_name))]
618pub async fn service_health_handler(
619    Path(service_name): Path<String>,
620    State(state): State<Arc<OrchestratorState>>,
621) -> Result<Json<ApiResponse<ServiceHealth>>, OrchestratorError> {
622    let start_time = Instant::now();
623    info!("Checking health for service: {}", service_name);
624
625    // Check if we have cached health info
626    if let Some(health) = state.get_service_health(&service_name).await {
627        return Ok(Json(ApiResponse::new(health, start_time)));
628    }
629
630    // In a real implementation, this would actually check service health
631    // For now, we'll simulate a health check
632
633    let health = ServiceHealth {
634        name: service_name.clone(),
635        status: ServiceStatus::Healthy,
636        last_checked: Utc::now(),
637        response_time_ms: Some(rand::random::<u64>() % 100),
638        details: Some(serde_json::json!({"message": "Service is healthy"})),
639    };
640
641    // Cache the health info
642    state
643        .update_service_health(service_name, health.clone())
644        .await;
645
646    Ok(Json(ApiResponse::new(health, start_time)))
647}
648
649/// List tasks
650#[instrument(skip_all)]
651pub async fn list_tasks_handler(
652    Query(filter): Query<TaskFilter>,
653    State(state): State<Arc<OrchestratorState>>,
654) -> Result<Json<ApiResponse<Vec<TaskResponse>>>, OrchestratorError> {
655    let start_time = Instant::now();
656    info!("Listing tasks with filter: {:?}", filter);
657
658    let tasks = state.tasks.read().await;
659    let mut task_list: Vec<TaskResponse> = tasks.values().cloned().collect();
660
661    // Apply filters
662    if let Some(status) = filter.status {
663        task_list.retain(|task| task.status == status);
664    }
665
666    if let Some(task_type) = filter.task_type {
667        // In a real implementation, we would filter by task type
668        // For now, we'll just log the filter
669        debug!("Filtering tasks by type: {}", task_type);
670    }
671
672    // Apply pagination
673    if let Some(limit) = filter.limit {
674        task_list.truncate(limit as usize);
675    }
676
677    Ok(Json(ApiResponse::new(task_list, start_time)))
678}
679
680/// Create new task
681#[instrument(skip_all)]
682pub async fn create_task_handler(
683    State(state): State<Arc<OrchestratorState>>,
684    Json(task_request): Json<TaskRequest>,
685) -> Result<Json<ApiResponse<TaskResponse>>, OrchestratorError> {
686    let start_time = Instant::now();
687    info!("Creating new task: {}", task_request.task_type);
688
689    // Create task response
690    let task_id = Uuid::new_v4();
691    let task_response = TaskResponse {
692        task_id,
693        status: TaskStatus::Queued,
694        created_at: Utc::now(),
695        updated_at: Utc::now(),
696        result: None,
697        error: None,
698    };
699
700    // Store task
701    state.add_task(task_response.clone()).await;
702
703    // In a real implementation, we would actually execute the task
704    // For now, we'll just log it
705    info!("Task {} queued for execution", task_id);
706
707    Ok(Json(ApiResponse::new(task_response, start_time)))
708}
709
710/// Get task details
711#[instrument(skip_all, fields(task_id = %task_id))]
712pub async fn get_task_handler(
713    Path(task_id): Path<Uuid>,
714    State(state): State<Arc<OrchestratorState>>,
715) -> Result<Json<ApiResponse<TaskResponse>>, OrchestratorError> {
716    let start_time = Instant::now();
717    info!("Getting task details for: {}", task_id);
718
719    if let Some(task) = state.get_task(task_id).await {
720        Ok(Json(ApiResponse::new(task, start_time)))
721    } else {
722        Err(OrchestratorError::TaskNotFound(task_id))
723    }
724}
725
726/// Cancel task
727#[instrument(skip_all, fields(task_id = %task_id))]
728pub async fn cancel_task_handler(
729    Path(task_id): Path<Uuid>,
730    State(state): State<Arc<OrchestratorState>>,
731) -> Result<Json<ApiResponse<serde_json::Value>>, OrchestratorError> {
732    let start_time = Instant::now();
733    info!("Cancelling task: {}", task_id);
734
735    let mut tasks = state.tasks.write().await;
736    if let Some(task) = tasks.get_mut(&task_id) {
737        task.status = TaskStatus::Cancelled;
738        task.updated_at = Utc::now();
739        task.error = Some("Task cancelled by user".to_string());
740
741        let response = serde_json::json!({
742            "message": format!("Task {} cancelled", task_id),
743            "status": "success",
744            "timestamp": Utc::now().to_rfc3339()
745        });
746
747        Ok(Json(ApiResponse::new(response, start_time)))
748    } else {
749        Err(OrchestratorError::TaskNotFound(task_id))
750    }
751}
752
753/// Get system metrics
754#[instrument(skip_all)]
755pub async fn metrics_handler(
756    State(_state): State<Arc<OrchestratorState>>,
757) -> Result<Json<ApiResponse<SystemMetrics>>, OrchestratorError> {
758    let start_time = Instant::now();
759    info!("Getting system metrics");
760
761    // In a real implementation, this would collect actual system metrics
762    // For now, we'll generate sample metrics
763
764    let metrics = SystemMetrics {
765        cpu_usage_percent: rand::random::<f64>() * 100.0,
766        memory_usage_bytes: rand::random::<u64>() % 8000000000,
767        disk_usage_percent: rand::random::<f64>() * 100.0,
768        network_throughput_bytes_sec: rand::random::<u64>() % 1000000,
769        timestamp: Utc::now(),
770    };
771
772    Ok(Json(ApiResponse::new(metrics, start_time)))
773}
774
775/// Get configuration
776#[instrument(skip_all)]
777pub async fn get_config_handler(
778    State(state): State<Arc<OrchestratorState>>,
779) -> Result<Json<ApiResponse<ServiceConfig>>, OrchestratorError> {
780    let start_time = Instant::now();
781    info!("Getting configuration");
782
783    let config = state.get_config().await;
784    Ok(Json(ApiResponse::new(config, start_time)))
785}
786
787/// Update configuration
788#[instrument(skip_all)]
789pub async fn update_config_handler(
790    State(state): State<Arc<OrchestratorState>>,
791    Json(new_config): Json<ServiceConfig>,
792) -> Result<Json<ApiResponse<serde_json::Value>>, OrchestratorError> {
793    let start_time = Instant::now();
794    info!("Updating configuration");
795
796    state.update_config(new_config).await;
797
798    let response = serde_json::json!({
799        "message": "Configuration updated successfully",
800        "status": "success",
801        "timestamp": Utc::now().to_rfc3339()
802    });
803
804    Ok(Json(ApiResponse::new(response, start_time)))
805}
806
807// ============================================================================
808// Helper Functions
809// ============================================================================
810
811/// Check health of all services
812async fn check_all_services_health(state: Arc<OrchestratorState>) -> Vec<ServiceHealth> {
813    let services = vec![
814        "listmonk",
815        "postiz",
816        "openwebui",
817        "n8n",
818        "metabase",
819        "typesense",
820    ];
821    let mut health_statuses = Vec::new();
822
823    for service_name in services {
824        // Try to get cached health first
825        if let Some(health) = state.get_service_health(service_name).await {
826            health_statuses.push(health);
827        } else {
828            // Create a new health status
829            let health = ServiceHealth {
830                name: service_name.to_string(),
831                status: ServiceStatus::Healthy, // In reality, we'd actually check
832                last_checked: Utc::now(),
833                response_time_ms: Some(rand::random::<u64>() % 100),
834                details: Some(serde_json::json!({"message": "Service health check performed"})),
835            };
836
837            // Cache it
838            state
839                .update_service_health(service_name.to_string(), health.clone())
840                .await;
841            health_statuses.push(health);
842        }
843    }
844
845    health_statuses
846}
847
848// ============================================================================
849// Router Configuration
850// ============================================================================
851
852/// Create the orchestrator router with all endpoints
853pub fn orchestrator_router(state: Arc<OrchestratorState>) -> Router {
854    Router::new()
855        .route("/api/v1", get(root_handler))
856        .route("/api/v1/health", get(health_handler))
857        .route("/api/v1/services", get(list_services_handler))
858        .route(
859            "/api/v1/services/:service_name",
860            get(get_service_handler).post(control_service_handler),
861        )
862        .route(
863            "/api/v1/services/:service_name/health",
864            get(service_health_handler),
865        )
866        .route(
867            "/api/v1/tasks",
868            get(list_tasks_handler).post(create_task_handler),
869        )
870        .route(
871            "/api/v1/tasks/:task_id",
872            get(get_task_handler).delete(cancel_task_handler),
873        )
874        .route("/api/v1/metrics", get(metrics_handler))
875        .route(
876            "/api/v1/config",
877            get(get_config_handler).put(update_config_handler),
878        )
879        .with_state(state)
880}
881
#[cfg(test)]
mod tests {
    //! Unit tests for the orchestrator types and shared state.

    use super::*;

    /// A default configuration must not enable any service.
    #[test]
    fn test_service_config_default() {
        let config = ServiceConfig::default();
        // All services should be disabled by default for security
        assert!(!config.listmonk.enabled);
        assert!(!config.postiz.enabled);
        assert!(!config.openwebui.enabled);
        assert!(!config.n8n.enabled);
        assert!(!config.metabase.enabled);
        assert!(!config.typesense.enabled);
    }

    /// Task statuses serialize as their bare variant name.
    #[test]
    fn test_task_status_serialization() {
        let status = TaskStatus::Running;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, "\"Running\"");
    }

    /// Service statuses serialize as their bare variant name.
    #[test]
    fn test_service_status_serialization() {
        let status = ServiceStatus::Healthy;
        let json = serde_json::to_string(&status).unwrap();
        assert_eq!(json, "\"Healthy\"");
    }

    /// Config and task round-trips through the shared state.
    #[tokio::test]
    async fn test_orchestrator_state() {
        let app_state = Arc::new(AppState::new());
        let orchestrator_state = OrchestratorState::new(app_state);

        // Test config operations
        let config = orchestrator_state.get_config().await;
        assert_eq!(config, ServiceConfig::default());

        // Test task operations
        let task_id = Uuid::new_v4();
        let task = TaskResponse {
            task_id,
            status: TaskStatus::Queued,
            created_at: Utc::now(),
            updated_at: Utc::now(),
            result: None,
            error: None,
        };

        orchestrator_state.add_task(task.clone()).await;
        let retrieved_task = orchestrator_state.get_task(task_id).await;
        assert!(retrieved_task.is_some());
        assert_eq!(retrieved_task.unwrap().task_id, task_id);
    }

    /// `ApiResponse::new` attaches metadata with a plausible duration.
    #[test]
    fn test_api_response_serialization() {
        let start_time = Instant::now();
        let data = serde_json::json!({"test": "value"});
        let response: ApiResponse<serde_json::Value> = ApiResponse::new(data, start_time);

        // Read the clock after construction: the recorded duration can never
        // exceed this, but it may be smaller if a millisecond boundary was
        // crossed in between, so an equality check would be flaky.
        let upper_bound_ms = start_time.elapsed().as_millis() as u64;

        let meta = response
            .meta
            .as_ref()
            .expect("ApiResponse::new always sets meta");
        assert!(meta.duration_ms <= upper_bound_ms);
    }

    /// Health entries written to the cache can be read back by name.
    #[tokio::test]
    async fn test_service_health_caching() {
        let app_state = Arc::new(AppState::new());
        let orchestrator_state = OrchestratorState::new(app_state);

        let service_name = "test-service".to_string();
        let health = ServiceHealth {
            name: service_name.clone(),
            status: ServiceStatus::Healthy,
            last_checked: Utc::now(),
            response_time_ms: Some(50),
            details: Some(serde_json::json!({"test": "details"})),
        };

        orchestrator_state
            .update_service_health(service_name.clone(), health.clone())
            .await;
        let retrieved_health = orchestrator_state.get_service_health(&service_name).await;

        assert!(retrieved_health.is_some());
        assert_eq!(retrieved_health.unwrap().name, service_name);
    }
}