rust_task_queue/
actix.rs

1//! Actix Web integration helpers with comprehensive metrics endpoints
2
3use crate::prelude::*;
4use crate::queue_names;
5#[cfg(feature = "actix-integration")]
6use actix_web::{web, HttpResponse, Result as ActixResult};
7use serde_json::json;
8use std::collections::HashMap;
9use std::sync::Arc;
10
11#[cfg(feature = "actix-integration")]
12/// Detailed health check with component status
13async fn detailed_health_check(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
14    match task_queue.health_check().await {
15        Ok(health_status) => Ok(HttpResponse::Ok().json(health_status)),
16        Err(e) => Ok(HttpResponse::ServiceUnavailable().json(json!({
17            "status": "unhealthy",
18            "error": e.to_string(),
19            "timestamp": Utc::now()
20        }))),
21    }
22}
23
24#[cfg(feature = "actix-integration")]
25/// System status with health metrics
26async fn system_status(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
27    let health_status = task_queue.metrics.get_health_status().await;
28    let worker_count = task_queue.worker_count().await;
29    let is_shutting_down = task_queue.is_shutting_down().await;
30
31    Ok(HttpResponse::Ok().json(json!({
32        "health": health_status,
33        "workers": {
34            "active_count": worker_count,
35            "shutting_down": is_shutting_down
36        },
37        "timestamp": Utc::now()
38    })))
39}
40
41#[cfg(feature = "actix-integration")]
42/// Comprehensive metrics combining all available metrics
43async fn get_comprehensive_metrics(
44    task_queue: web::Data<Arc<TaskQueue>>,
45) -> ActixResult<HttpResponse> {
46    // Handle mixed Result and non-Result return types separately
47    let system_metrics = task_queue.get_system_metrics().await;
48    let worker_count = task_queue.worker_count().await;
49
50    match tokio::try_join!(
51        task_queue.get_metrics(),
52        task_queue.autoscaler.collect_metrics(),
53    ) {
54        Ok((queue_metrics, autoscaler_metrics)) => Ok(HttpResponse::Ok().json(json!({
55            "timestamp": Utc::now(),
56            "task_queue_metrics": queue_metrics,
57            "system_metrics": system_metrics,
58            "autoscaler_metrics": autoscaler_metrics,
59            "worker_count": worker_count
60        }))),
61        Err(e) => Ok(HttpResponse::InternalServerError().json(json!({
62            "error": e.to_string(),
63            "timestamp": Utc::now()
64        }))),
65    }
66}
67
68#[cfg(feature = "actix-integration")]
69/// Enhanced system metrics with memory and performance data
70async fn get_system_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
71    let system_metrics = task_queue.get_system_metrics().await;
72    Ok(HttpResponse::Ok().json(system_metrics))
73}
74
75#[cfg(feature = "actix-integration")]
76/// Performance metrics and performance report
77async fn get_performance_metrics(
78    task_queue: web::Data<Arc<TaskQueue>>,
79) -> ActixResult<HttpResponse> {
80    let performance_report = task_queue.metrics.get_performance_report().await;
81    Ok(HttpResponse::Ok().json(performance_report))
82}
83
84#[cfg(feature = "actix-integration")]
85/// AutoScaler metrics and recommendations
86async fn get_autoscaler_metrics(
87    task_queue: web::Data<Arc<TaskQueue>>,
88) -> ActixResult<HttpResponse> {
89    match tokio::try_join!(task_queue.autoscaler.collect_metrics(),) {
90        Ok(metrics) => Ok(HttpResponse::Ok().json(json!({
91            "metrics": metrics,
92            "timestamp": Utc::now()
93        }))),
94        Err(e) => Ok(HttpResponse::InternalServerError().json(json!({
95            "error": e.to_string(),
96            "timestamp": Utc::now()
97        }))),
98    }
99}
100
101#[cfg(feature = "actix-integration")]
102/// Individual queue metrics for all queues
103async fn get_queue_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
104    let queues = [
105        queue_names::DEFAULT,
106        queue_names::LOW_PRIORITY,
107        queue_names::HIGH_PRIORITY,
108    ];
109    let mut queue_metrics = Vec::new();
110    let mut errors = Vec::new();
111
112    for queue in &queues {
113        match task_queue.broker.get_queue_metrics(queue).await {
114            Ok(metrics) => queue_metrics.push(metrics),
115            Err(e) => errors.push(json!({
116                "queue": queue,
117                "error": e.to_string()
118            })),
119        }
120    }
121
122    Ok(HttpResponse::Ok().json(json!({
123        "queue_metrics": queue_metrics,
124        "errors": errors,
125        "timestamp": Utc::now()
126    })))
127}
128
129#[cfg(feature = "actix-integration")]
130/// Worker-specific metrics
131async fn get_worker_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
132    let worker_count = task_queue.worker_count().await;
133    let system_metrics = task_queue.get_system_metrics().await;
134
135    Ok(HttpResponse::Ok().json(json!({
136        "active_workers": worker_count,
137        "worker_metrics": system_metrics.workers,
138        "is_shutting_down": task_queue.is_shutting_down().await,
139        "timestamp": Utc::now()
140    })))
141}
142
143#[cfg(feature = "actix-integration")]
144/// Memory usage metrics
145async fn get_memory_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
146    let system_metrics = task_queue.get_system_metrics().await;
147    Ok(HttpResponse::Ok().json(json!({
148        "memory_metrics": system_metrics.memory,
149        "timestamp": Utc::now()
150    })))
151}
152
153#[cfg(not(all(feature = "actix-integration", feature = "auto-register")))]
154/// Fallback when auto-register feature is not enabled
155async fn get_registered_tasks() -> ActixResult<HttpResponse> {
156    Ok(HttpResponse::Ok().json(json!({
157        "message": "Auto-registration feature not enabled",
158        "auto_registration_enabled": false,
159        "timestamp": Utc::now()
160    })))
161}
162
163#[cfg(feature = "actix-integration")]
164/// Detailed task registry information
165async fn get_registry_info() -> ActixResult<HttpResponse> {
166    #[cfg(feature = "auto-register")]
167    {
168        match TaskRegistry::with_auto_registered() {
169            Ok(registry) => {
170                let registered_tasks = registry.registered_tasks();
171                Ok(HttpResponse::Ok().json(json!({
172                    "registry_type": "auto_registered",
173                    "task_count": registered_tasks.len(),
174                    "tasks": registered_tasks,
175                    "features": {
176                        "auto_register": true,
177                        "inventory_based": true
178                    },
179                    "timestamp": Utc::now()
180                })))
181            }
182            Err(e) => Ok(HttpResponse::InternalServerError().json(json!({
183                "error": e.to_string(),
184                "registry_type": "auto_registered",
185                "timestamp": Utc::now()
186            }))),
187        }
188    }
189    #[cfg(not(feature = "auto-register"))]
190    {
191        Ok(HttpResponse::Ok().json(json!({
192            "registry_type": "manual",
193            "message": "Auto-registration not available - manual registry in use",
194            "features": {
195                "auto_register": false,
196                "inventory_based": false
197            },
198            "timestamp": Utc::now()
199        })))
200    }
201}
202
203#[cfg(feature = "actix-integration")]
204/// Get active alerts from the metrics system
205async fn get_active_alerts(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
206    let performance_report = task_queue.metrics.get_performance_report().await;
207    Ok(HttpResponse::Ok().json(json!({
208        "active_alerts": performance_report.active_alerts,
209        "alert_count": performance_report.active_alerts.len(),
210        "timestamp": Utc::now()
211    })))
212}
213
214#[cfg(feature = "actix-integration")]
215/// SLA status and violations
216async fn get_sla_status(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
217    let performance_report = task_queue.metrics.get_performance_report().await;
218    let system_metrics = task_queue.get_system_metrics().await;
219
220    Ok(HttpResponse::Ok().json(json!({
221        "sla_violations": performance_report.sla_violations,
222        "violation_count": performance_report.sla_violations.len(),
223        "performance_metrics": system_metrics.performance,
224        "success_rate_percentage": system_metrics.performance.success_rate * 100.0,
225        "error_rate_percentage": system_metrics.performance.error_rate * 100.0,
226        "timestamp": Utc::now()
227    })))
228}
229
230#[cfg(feature = "actix-integration")]
231/// Comprehensive diagnostics information
232async fn get_diagnostics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
233    let health_status = task_queue.metrics.get_health_status().await;
234    let performance_report = task_queue.metrics.get_performance_report().await;
235    let worker_count = task_queue.worker_count().await;
236    let is_shutting_down = task_queue.is_shutting_down().await;
237
238    // Get individual queue diagnostics
239    let queues = ["default", "high_priority", "low_priority"];
240    let mut queue_diagnostics = HashMap::new();
241
242    for queue in &queues {
243        if let Ok(metrics) = task_queue.broker.get_queue_metrics(queue).await {
244            queue_diagnostics.insert(queue.to_string(), json!({
245                "pending_tasks": metrics.pending_tasks,
246                "processed_tasks": metrics.processed_tasks,
247                "failed_tasks": metrics.failed_tasks,
248                "health": if metrics.failed_tasks > 0 && metrics.processed_tasks > 0 {
249                    let error_rate = metrics.failed_tasks as f64 / (metrics.processed_tasks + metrics.failed_tasks) as f64;
250                    if error_rate > 0.1 { "unhealthy" } else if error_rate > 0.05 { "warning" } else { "healthy" }
251                } else {
252                    "healthy"
253                }
254            }));
255        }
256    }
257
258    Ok(HttpResponse::Ok().json(json!({
259        "system_health": health_status,
260        "performance_report": performance_report,
261        "worker_diagnostics": {
262            "active_count": worker_count,
263            "shutting_down": is_shutting_down
264        },
265        "queue_diagnostics": queue_diagnostics,
266        "uptime_seconds": performance_report.uptime_seconds,
267        "timestamp": Utc::now()
268    })))
269}
270
271#[cfg(feature = "actix-integration")]
272/// System uptime and runtime information
273async fn get_uptime_info(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
274    let system_metrics = task_queue.get_system_metrics().await;
275    let performance_report = task_queue.metrics.get_performance_report().await;
276
277    let uptime_duration = std::time::Duration::from_secs(system_metrics.uptime_seconds);
278    let days = uptime_duration.as_secs() / 86400;
279    let hours = (uptime_duration.as_secs() % 86400) / 3600;
280    let minutes = (uptime_duration.as_secs() % 3600) / 60;
281    let seconds = uptime_duration.as_secs() % 60;
282
283    Ok(HttpResponse::Ok().json(json!({
284        "uptime": {
285            "seconds": system_metrics.uptime_seconds,
286            "formatted": format!("{}d {}h {}m {}s", days, hours, minutes, seconds),
287            "started_at": Utc::now() - Duration::seconds(system_metrics.uptime_seconds as i64)
288        },
289        "runtime_info": {
290            "total_tasks_executed": system_metrics.tasks.total_executed,
291            "total_tasks_succeeded": system_metrics.tasks.total_succeeded,
292            "total_tasks_failed": system_metrics.tasks.total_failed,
293            "success_rate_percentage": system_metrics.performance.success_rate * 100.0,
294            "average_tasks_per_second": system_metrics.performance.tasks_per_second
295        },
296        "performance_summary": {
297            "task_performance": performance_report.task_performance
298        },
299        "timestamp": Utc::now()
300    })))
301}
302
303#[cfg(feature = "actix-integration")]
304/// Helper for creating TaskQueue with auto-registration for Actix Web apps
305pub async fn create_auto_registered_task_queue(
306    redis_url: &str,
307    initial_workers: Option<usize>,
308) -> Result<Arc<TaskQueue>, TaskQueueError> {
309    let mut builder = TaskQueueBuilder::new(redis_url);
310
311    #[cfg(feature = "auto-register")]
312    {
313        builder = builder.auto_register_tasks();
314    }
315
316    if let Some(workers) = initial_workers {
317        builder = builder.initial_workers(workers);
318    }
319
320    Ok(Arc::new(builder.build().await?))
321}
322
323#[cfg(feature = "actix-integration")]
324/// Create TaskQueue from global configuration - ideal for Actix apps
325pub async fn create_task_queue_from_config() -> Result<Arc<TaskQueue>, TaskQueueError> {
326    let task_queue = TaskQueueBuilder::from_global_config()?.build().await?;
327    Ok(Arc::new(task_queue))
328}
329
330#[cfg(feature = "actix-integration")]
331/// Auto-configure TaskQueue for Actix Web with minimal setup
332pub async fn auto_configure_task_queue() -> Result<Arc<TaskQueue>, TaskQueueError> {
333    // Initialize global configuration
334    TaskQueueConfig::init_global()?;
335
336    // Create TaskQueue from global config
337    let task_queue = TaskQueueBuilder::from_global_config()?.build().await?;
338    Ok(Arc::new(task_queue))
339}
340
341#[cfg(feature = "actix-integration")]
342/// Configure Actix Web routes based on global configuration
343pub fn configure_task_queue_routes_auto(cfg: &mut web::ServiceConfig) {
344    if let Some(config) = TaskQueueConfig::global() {
345        #[cfg(feature = "actix-integration")]
346        if config.actix.auto_configure_routes {
347            let prefix = &config.actix.route_prefix;
348            let mut scope = web::scope(prefix);
349
350            if config.actix.enable_health_check {
351                scope = scope
352                    .route("/health", web::get().to(detailed_health_check))
353                    .route("/status", web::get().to(system_status));
354            }
355
356            if config.actix.enable_metrics {
357                scope = scope
358                    .route("/metrics", web::get().to(get_comprehensive_metrics))
359                    .route("/metrics/system", web::get().to(get_system_metrics))
360                    .route(
361                        "/metrics/performance",
362                        web::get().to(get_performance_metrics),
363                    )
364                    .route("/metrics/autoscaler", web::get().to(get_autoscaler_metrics))
365                    .route("/metrics/queues", web::get().to(get_queue_metrics))
366                    .route("/metrics/workers", web::get().to(get_worker_metrics))
367                    .route("/metrics/memory", web::get().to(get_memory_metrics))
368                    .route("/registry", web::get().to(get_registry_info))
369                    .route("/alerts", web::get().to(get_active_alerts))
370                    .route("/sla", web::get().to(get_sla_status))
371                    .route("/diagnostics", web::get().to(get_diagnostics))
372                    .route("/uptime", web::get().to(get_uptime_info));
373            }
374
375            cfg.service(scope);
376        }
377    } else {
378        // Fallback to default comprehensive configuration
379        configure_task_queue_routes(cfg);
380    }
381}
382
383#[cfg(feature = "actix-integration")]
384/// Comprehensive Actix Web service configuration with all metrics endpoints
385pub fn configure_task_queue_routes(cfg: &mut web::ServiceConfig) {
386    cfg.service(
387        web::scope("/tasks")
388            // Health and Status endpoints
389            .route("/health", web::get().to(detailed_health_check))
390            .route("/status", web::get().to(system_status))
391            // Metrics endpoints
392            .route("/metrics", web::get().to(get_comprehensive_metrics))
393            .route("/metrics/system", web::get().to(get_system_metrics))
394            .route(
395                "/metrics/performance",
396                web::get().to(get_performance_metrics),
397            )
398            .route("/metrics/autoscaler", web::get().to(get_autoscaler_metrics))
399            .route("/metrics/queues", web::get().to(get_queue_metrics))
400            .route("/metrics/workers", web::get().to(get_worker_metrics))
401            .route("/metrics/memory", web::get().to(get_memory_metrics))
402            // Task Registry endpoints
403            .route("/registry", web::get().to(get_registry_info))
404            // Administrative endpoints
405            .route("/alerts", web::get().to(get_active_alerts))
406            .route("/sla", web::get().to(get_sla_status))
407            .route("/diagnostics", web::get().to(get_diagnostics))
408            .route("/uptime", web::get().to(get_uptime_info)),
409    );
410}
411
412#[cfg(feature = "actix-integration")]
413/// Helper macro for creating typed task endpoints
414#[macro_export]
415macro_rules! create_task_endpoint {
416    ($task_type:ty, $queue:expr) => {
417        async fn enqueue_task(
418            task_queue: actix_web::web::Data<std::sync::Arc<$crate::TaskQueue>>,
419            task_data: actix_web::web::Json<$task_type>,
420        ) -> actix_web::Result<actix_web::HttpResponse> {
421            match task_queue.enqueue(task_data.into_inner(), $queue).await {
422                Ok(task_id) => {
423                    let mut response = std::collections::HashMap::new();
424                    response.insert("task_id", task_id.to_string());
425                    response.insert("queue", $queue.to_string());
426                    response.insert("status", "enqueued".to_string());
427                    Ok(actix_web::HttpResponse::Ok().json(response))
428                }
429                Err(e) => {
430                    let mut response = std::collections::HashMap::new();
431                    response.insert("error", e.to_string());
432                    response.insert("queue", $queue.to_string());
433                    Ok(actix_web::HttpResponse::InternalServerError().json(response))
434                }
435            }
436        }
437    };
438}