rust_task_queue/
actix.rs

1//! Actix Web integration helpers with comprehensive metrics endpoints
2
3use crate::prelude::*;
4use crate::queue_names;
5#[cfg(feature = "actix-integration")]
6use actix_web::{web, HttpResponse, Result as ActixResult};
7use serde_json::json;
8use std::collections::HashMap;
9use std::sync::Arc;
10
/// Runs the task queue's health check and reports the outcome as JSON.
///
/// Responds `200 OK` with the health payload when the check succeeds, and
/// `503 Service Unavailable` with the error text and a timestamp otherwise.
#[cfg(feature = "actix-integration")]
async fn detailed_health_check(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let response = match task_queue.health_check().await {
        Ok(report) => HttpResponse::Ok().json(report),
        Err(failure) => {
            let body = json!({
                "status": "unhealthy",
                "error": failure.to_string(),
                "timestamp": Utc::now()
            });
            HttpResponse::ServiceUnavailable().json(body)
        }
    };

    Ok(response)
}
23
/// Returns the metrics health status together with worker state.
///
/// The JSON body carries the health status, the active worker count, the
/// shutdown flag and a timestamp. Always responds `200 OK`.
#[cfg(feature = "actix-integration")]
async fn system_status(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let health = task_queue.metrics.get_health_status().await;
    let active_workers = task_queue.worker_count().await;
    let shutting_down = task_queue.is_shutting_down().await;

    let workers = json!({
        "active_count": active_workers,
        "shutting_down": shutting_down
    });
    let body = json!({
        "health": health,
        "workers": workers,
        "timestamp": Utc::now()
    });

    Ok(HttpResponse::Ok().json(body))
}
40
/// Aggregates queue, system and autoscaler metrics into one JSON document.
///
/// Responds `200 OK` with the combined metrics plus a scaling report, or
/// `500 Internal Server Error` with the error text when either the queue
/// metrics or the autoscaler metrics cannot be collected.
#[cfg(feature = "actix-integration")]
async fn get_comprehensive_metrics(
    task_queue: web::Data<Arc<TaskQueue>>,
) -> ActixResult<HttpResponse> {
    // These two calls do not return `Result`, so they are awaited on their
    // own rather than inside `try_join!`.
    let system_metrics = task_queue.get_system_metrics().await;
    let worker_count = task_queue.worker_count().await;

    let joined = tokio::try_join!(
        task_queue.get_metrics(),
        task_queue.autoscaler.collect_metrics(),
    );
    let (queue_metrics, autoscaler_metrics) = match joined {
        Ok(pair) => pair,
        Err(e) => {
            return Ok(HttpResponse::InternalServerError().json(json!({
                "error": e.to_string(),
                "timestamp": Utc::now()
            })));
        }
    };

    // High queue pressure wins over low utilization when both hold.
    let recommended_action = if autoscaler_metrics.queue_pressure_score > 0.8 {
        "scale_up"
    } else if autoscaler_metrics.worker_utilization < 0.3 {
        "scale_down"
    } else {
        "maintain"
    };

    let scaling_report = json!({
        "current_workers": worker_count,
        "recommended_action": recommended_action,
        "confidence": "high",
        "next_evaluation": Utc::now() + Duration::seconds(60)
    });

    let body = json!({
        "timestamp": Utc::now(),
        "task_queue_metrics": queue_metrics,
        "system_metrics": system_metrics,
        "autoscaler_metrics": autoscaler_metrics,
        "scaling_report": scaling_report,
        "worker_count": worker_count
    });

    Ok(HttpResponse::Ok().json(body))
}
84
/// Returns the task queue's system metrics serialized as JSON.
///
/// Always responds `200 OK`.
#[cfg(feature = "actix-integration")]
async fn get_system_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let snapshot = task_queue.get_system_metrics().await;
    let response = HttpResponse::Ok().json(snapshot);
    Ok(response)
}
91
/// Returns the metrics subsystem's performance report serialized as JSON.
///
/// Always responds `200 OK`.
#[cfg(feature = "actix-integration")]
async fn get_performance_metrics(
    task_queue: web::Data<Arc<TaskQueue>>,
) -> ActixResult<HttpResponse> {
    let report = task_queue.metrics.get_performance_report().await;
    let response = HttpResponse::Ok().json(report);
    Ok(response)
}
100
/// Returns autoscaler metrics with a plain-text scaling recommendation.
///
/// Responds `200 OK` with the metrics, a recommendation string and a
/// timestamp, or `500 Internal Server Error` with the error text when the
/// autoscaler cannot collect metrics.
#[cfg(feature = "actix-integration")]
async fn get_autoscaler_metrics(
    task_queue: web::Data<Arc<TaskQueue>>,
) -> ActixResult<HttpResponse> {
    // Only one future is involved, so it is awaited directly; wrapping a
    // single future in `try_join!` adds a 1-tuple and nothing else.
    match task_queue.autoscaler.collect_metrics().await {
        Ok(metrics) => {
            // High queue pressure wins over low utilization when both hold.
            let recommendations = if metrics.queue_pressure_score > 0.8 {
                "Consider scaling up workers due to high queue pressure"
            } else if metrics.worker_utilization < 0.3 {
                "Consider scaling down workers due to low utilization"
            } else {
                "Current worker count appears optimal"
            };

            Ok(HttpResponse::Ok().json(json!({
                "metrics": metrics,
                "recommendations": recommendations,
                "timestamp": Utc::now()
            })))
        }
        Err(e) => Ok(HttpResponse::InternalServerError().json(json!({
            "error": e.to_string(),
            "timestamp": Utc::now()
        }))),
    }
}
129
/// Collects broker metrics for the default, low-priority and high-priority
/// queues.
///
/// Lookups that fail do not fail the request: each failure is reported in
/// the `errors` array next to the metrics that were retrieved. Always
/// responds `200 OK`.
#[cfg(feature = "actix-integration")]
async fn get_queue_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let queues = [
        queue_names::DEFAULT,
        queue_names::LOW_PRIORITY,
        queue_names::HIGH_PRIORITY,
    ];
    let mut collected = Vec::new();
    let mut failures = Vec::new();

    for queue in queues.iter().copied() {
        let lookup = task_queue.broker.get_queue_metrics(queue).await;
        match lookup {
            Ok(metrics) => collected.push(metrics),
            Err(e) => {
                let entry = json!({
                    "queue": queue,
                    "error": e.to_string()
                });
                failures.push(entry);
            }
        }
    }

    let body = json!({
        "queue_metrics": collected,
        "errors": failures,
        "timestamp": Utc::now()
    });
    Ok(HttpResponse::Ok().json(body))
}
157
/// Returns worker-related metrics.
///
/// The JSON body carries the active worker count, the `workers` section of
/// the system metrics, the shutdown flag and a timestamp. Always responds
/// `200 OK`.
#[cfg(feature = "actix-integration")]
async fn get_worker_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let active_workers = task_queue.worker_count().await;
    let snapshot = task_queue.get_system_metrics().await;
    let shutting_down = task_queue.is_shutting_down().await;

    let body = json!({
        "active_workers": active_workers,
        "worker_metrics": snapshot.workers,
        "is_shutting_down": shutting_down,
        "timestamp": Utc::now()
    });
    Ok(HttpResponse::Ok().json(body))
}
171
/// Returns the `memory` section of the system metrics with a timestamp.
///
/// Always responds `200 OK`.
#[cfg(feature = "actix-integration")]
async fn get_memory_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let snapshot = task_queue.get_system_metrics().await;
    let body = json!({
        "memory_metrics": snapshot.memory,
        "timestamp": Utc::now()
    });
    Ok(HttpResponse::Ok().json(body))
}
181
/// Describes the task registry in use.
///
/// With the `auto-register` feature enabled, builds an auto-registered
/// registry and lists its tasks (`200 OK`), or reports the construction
/// error (`500 Internal Server Error`). Without the feature, responds
/// `200 OK` with a notice that the manual registry is in use.
#[cfg(feature = "actix-integration")]
async fn get_registry_info() -> ActixResult<HttpResponse> {
    #[cfg(feature = "auto-register")]
    let response = match TaskRegistry::with_auto_registered() {
        Ok(registry) => {
            let tasks = registry.registered_tasks();
            let body = json!({
                "registry_type": "auto_registered",
                "task_count": tasks.len(),
                "tasks": tasks,
                "features": {
                    "auto_register": true,
                    "inventory_based": true
                },
                "timestamp": Utc::now()
            });
            HttpResponse::Ok().json(body)
        }
        Err(e) => {
            let body = json!({
                "error": e.to_string(),
                "registry_type": "auto_registered",
                "timestamp": Utc::now()
            });
            HttpResponse::InternalServerError().json(body)
        }
    };

    #[cfg(not(feature = "auto-register"))]
    let response = HttpResponse::Ok().json(json!({
        "registry_type": "manual",
        "message": "Auto-registration not available - manual registry in use",
        "features": {
            "auto_register": false,
            "inventory_based": false
        },
        "timestamp": Utc::now()
    }));

    Ok(response)
}
221
/// Returns the active alerts from the performance report and their count.
///
/// Always responds `200 OK`.
#[cfg(feature = "actix-integration")]
async fn get_active_alerts(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let report = task_queue.metrics.get_performance_report().await;
    let alerts = &report.active_alerts;

    let body = json!({
        "active_alerts": alerts,
        "alert_count": alerts.len(),
        "timestamp": Utc::now()
    });
    Ok(HttpResponse::Ok().json(body))
}
232
/// Returns SLA violations alongside the system performance metrics.
///
/// Success and error rates are additionally exposed multiplied by 100 as
/// percentage fields. Always responds `200 OK`.
#[cfg(feature = "actix-integration")]
async fn get_sla_status(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let report = task_queue.metrics.get_performance_report().await;
    let snapshot = task_queue.get_system_metrics().await;

    let violations = &report.sla_violations;
    let performance = &snapshot.performance;

    let body = json!({
        "sla_violations": violations,
        "violation_count": violations.len(),
        "performance_metrics": performance,
        "success_rate_percentage": performance.success_rate * 100.0,
        "error_rate_percentage": performance.error_rate * 100.0,
        "timestamp": Utc::now()
    });
    Ok(HttpResponse::Ok().json(body))
}
248
/// Returns a diagnostics document combining health, performance, worker
/// and per-queue information.
///
/// Per-queue diagnostics are best effort: a queue whose metrics cannot be
/// fetched from the broker is left out of `queue_diagnostics` rather than
/// failing the request. Always responds `200 OK`.
#[cfg(feature = "actix-integration")]
async fn get_diagnostics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let health_status = task_queue.metrics.get_health_status().await;
    let performance_report = task_queue.metrics.get_performance_report().await;
    let worker_count = task_queue.worker_count().await;
    let is_shutting_down = task_queue.is_shutting_down().await;

    // Use the shared queue-name constants, as `get_queue_metrics` does,
    // instead of repeating the names as string literals.
    let queues = [
        queue_names::DEFAULT,
        queue_names::HIGH_PRIORITY,
        queue_names::LOW_PRIORITY,
    ];
    let mut queue_diagnostics = HashMap::new();

    for queue in &queues {
        if let Ok(metrics) = task_queue.broker.get_queue_metrics(queue).await {
            let health = queue_health_label(
                metrics.failed_tasks as f64,
                metrics.processed_tasks as f64,
            );
            queue_diagnostics.insert(
                queue.to_string(),
                json!({
                    "pending_tasks": metrics.pending_tasks,
                    "processed_tasks": metrics.processed_tasks,
                    "failed_tasks": metrics.failed_tasks,
                    "health": health
                }),
            );
        }
    }

    Ok(HttpResponse::Ok().json(json!({
        "system_health": health_status,
        "performance_report": performance_report,
        "worker_diagnostics": {
            "active_count": worker_count,
            "shutting_down": is_shutting_down
        },
        "queue_diagnostics": queue_diagnostics,
        "uptime_seconds": performance_report.uptime_seconds,
        "timestamp": Utc::now()
    })))
}

/// Classifies a queue by the share of failed tasks among all finished tasks
/// (`failed / (failed + processed)`).
///
/// Returns `"unhealthy"` above 10%, `"warning"` above 5% and `"healthy"`
/// otherwise. A queue with no finished tasks is `"healthy"`. A queue where
/// every finished task failed is `"unhealthy"`, even though its processed
/// count is zero.
#[cfg(feature = "actix-integration")]
fn queue_health_label(failed_tasks: f64, processed_tasks: f64) -> &'static str {
    let finished = failed_tasks + processed_tasks;
    if finished <= 0.0 {
        // No finished tasks: there is no error rate to judge, and dividing
        // would produce NaN.
        return "healthy";
    }

    let error_rate = failed_tasks / finished;
    if error_rate > 0.1 {
        "unhealthy"
    } else if error_rate > 0.05 {
        "warning"
    } else {
        "healthy"
    }
}
289
/// Returns uptime and lifetime task statistics.
///
/// The uptime is reported in raw seconds, as a `"{d}d {h}h {m}m {s}s"`
/// string, and as a start time derived by subtracting the uptime from the
/// current time. Always responds `200 OK`.
#[cfg(feature = "actix-integration")]
async fn get_uptime_info(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    const SECS_PER_MINUTE: u64 = 60;
    const SECS_PER_HOUR: u64 = 3600;
    const SECS_PER_DAY: u64 = 86400;

    let snapshot = task_queue.get_system_metrics().await;
    let report = task_queue.metrics.get_performance_report().await;

    // Split the total uptime into day / hour / minute / second components.
    let total_secs: u64 = snapshot.uptime_seconds;
    let days = total_secs / SECS_PER_DAY;
    let hours = (total_secs % SECS_PER_DAY) / SECS_PER_HOUR;
    let minutes = (total_secs % SECS_PER_HOUR) / SECS_PER_MINUTE;
    let seconds = total_secs % SECS_PER_MINUTE;

    let uptime = json!({
        "seconds": snapshot.uptime_seconds,
        "formatted": format!("{}d {}h {}m {}s", days, hours, minutes, seconds),
        "started_at": Utc::now() - Duration::seconds(snapshot.uptime_seconds as i64)
    });
    let runtime_info = json!({
        "total_tasks_executed": snapshot.tasks.total_executed,
        "total_tasks_succeeded": snapshot.tasks.total_succeeded,
        "total_tasks_failed": snapshot.tasks.total_failed,
        "success_rate_percentage": snapshot.performance.success_rate * 100.0,
        "average_tasks_per_second": snapshot.performance.tasks_per_second
    });
    let performance_summary = json!({
        "task_performance": report.task_performance
    });

    Ok(HttpResponse::Ok().json(json!({
        "uptime": uptime,
        "runtime_info": runtime_info,
        "performance_summary": performance_summary,
        "timestamp": Utc::now()
    })))
}
321
/// Returns a multi-line, human-readable metrics overview.
///
/// The text covers uptime, worker count, task totals, success rate and
/// throughput. Its status line reads "Healthy" when the performance report
/// has no active alerts and "Has Alerts" otherwise. Always responds
/// `200 OK`.
#[cfg(feature = "actix-integration")]
async fn get_metrics_summary(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let snapshot = task_queue.get_system_metrics().await;
    let report = task_queue.metrics.get_performance_report().await;
    let active_workers = task_queue.worker_count().await;

    // Seconds are intentionally not shown in the summary line.
    let total_secs: u64 = snapshot.uptime_seconds;
    let days = total_secs / 86400;
    let hours = (total_secs % 86400) / 3600;
    let minutes = (total_secs % 3600) / 60;

    let status = if report.active_alerts.is_empty() {
        "Healthy"
    } else {
        "Has Alerts"
    };

    let summary = format!(
        concat!(
            "TaskQueue Metrics Summary\n",
            "=========================\n",
            "Uptime: {}d {}h {}m\n",
            "Workers: {} active\n",
            "Tasks: {} executed, {} succeeded, {} failed\n",
            "Success Rate: {:.1}%\n",
            "Performance: {:.2} tasks/sec\n",
            "Status: {}"
        ),
        days,
        hours,
        minutes,
        active_workers,
        snapshot.tasks.total_executed,
        snapshot.tasks.total_succeeded,
        snapshot.tasks.total_failed,
        snapshot.performance.success_rate * 100.0,
        snapshot.performance.tasks_per_second,
        status
    );

    let body = json!({
        "summary": summary,
        "timestamp": Utc::now()
    });
    Ok(HttpResponse::Ok().json(body))
}
358
/// Builds a shared `TaskQueue` for an Actix Web application.
///
/// Task auto-registration is requested on the builder only when the
/// `auto-register` feature is compiled in. `initial_workers`, when given,
/// is forwarded to the builder.
///
/// # Errors
///
/// Returns the error produced by the builder's `build` step.
#[cfg(feature = "actix-integration")]
pub async fn create_auto_registered_task_queue(
    redis_url: &str,
    initial_workers: Option<usize>,
) -> Result<Arc<TaskQueue>, TaskQueueError> {
    let builder = TaskQueueBuilder::new(redis_url);

    #[cfg(feature = "auto-register")]
    let builder = builder.auto_register_tasks();

    let builder = match initial_workers {
        Some(workers) => builder.initial_workers(workers),
        None => builder,
    };

    let task_queue = builder.build().await?;
    Ok(Arc::new(task_queue))
}
378
/// Builds a shared `TaskQueue` from the global configuration.
///
/// # Errors
///
/// Returns an error when the builder cannot be created from the global
/// configuration or when building the queue fails.
#[cfg(feature = "actix-integration")]
pub async fn create_task_queue_from_config() -> Result<Arc<TaskQueue>, TaskQueueError> {
    let builder = TaskQueueBuilder::from_global_config()?;
    let task_queue = builder.build().await?;
    Ok(Arc::new(task_queue))
}
385
/// Initializes the global configuration, then builds a shared `TaskQueue`
/// from it.
///
/// # Errors
///
/// Returns an error when global configuration initialization fails, when
/// the builder cannot be created from it, or when building the queue fails.
#[cfg(feature = "actix-integration")]
pub async fn auto_configure_task_queue() -> Result<Arc<TaskQueue>, TaskQueueError> {
    // The global configuration must exist before a builder can read it.
    TaskQueueConfig::init_global()?;

    let builder = TaskQueueBuilder::from_global_config()?;
    let task_queue = builder.build().await?;
    Ok(Arc::new(task_queue))
}
396
/// Registers task queue routes according to the global configuration.
///
/// * No global configuration: falls back to `configure_task_queue_routes`.
/// * `actix.auto_configure_routes` is false: registers nothing.
/// * Otherwise: mounts a scope at `actix.route_prefix` holding `/health`
///   and `/status` when `actix.enable_health_check` is set, and the
///   metrics, registry, alert, SLA, diagnostics and uptime routes when
///   `actix.enable_metrics` is set.
#[cfg(feature = "actix-integration")]
pub fn configure_task_queue_routes_auto(cfg: &mut web::ServiceConfig) {
    let config = match TaskQueueConfig::global() {
        Some(config) => config,
        None => {
            // No configuration available: use the default full route set.
            configure_task_queue_routes(cfg);
            return;
        }
    };

    if !config.actix.auto_configure_routes {
        return;
    }

    let prefix = &config.actix.route_prefix;
    let mut scope = web::scope(prefix);

    if config.actix.enable_health_check {
        scope = scope.route("/health", web::get().to(detailed_health_check));
        scope = scope.route("/status", web::get().to(system_status));
    }

    if config.actix.enable_metrics {
        scope = scope
            .route("/metrics", web::get().to(get_comprehensive_metrics))
            .route("/metrics/system", web::get().to(get_system_metrics))
            .route("/metrics/performance", web::get().to(get_performance_metrics))
            .route("/metrics/autoscaler", web::get().to(get_autoscaler_metrics))
            .route("/metrics/queues", web::get().to(get_queue_metrics))
            .route("/metrics/workers", web::get().to(get_worker_metrics))
            .route("/metrics/memory", web::get().to(get_memory_metrics))
            .route("/metrics/summary", web::get().to(get_metrics_summary));
        scope = scope
            .route("/registry", web::get().to(get_registry_info))
            .route("/alerts", web::get().to(get_active_alerts))
            .route("/sla", web::get().to(get_sla_status))
            .route("/diagnostics", web::get().to(get_diagnostics))
            .route("/uptime", web::get().to(get_uptime_info));
    }

    cfg.service(scope);
}
439
/// Registers every task queue endpoint under the fixed `/tasks` scope.
///
/// All routes answer `GET` and cover health and status, metrics, the task
/// registry, alerts, SLA status, diagnostics and uptime.
#[cfg(feature = "actix-integration")]
pub fn configure_task_queue_routes(cfg: &mut web::ServiceConfig) {
    // Health and status
    let scope = web::scope("/tasks")
        .route("/health", web::get().to(detailed_health_check))
        .route("/status", web::get().to(system_status));

    // Metrics
    let scope = scope
        .route("/metrics", web::get().to(get_comprehensive_metrics))
        .route("/metrics/system", web::get().to(get_system_metrics))
        .route("/metrics/performance", web::get().to(get_performance_metrics))
        .route("/metrics/autoscaler", web::get().to(get_autoscaler_metrics))
        .route("/metrics/queues", web::get().to(get_queue_metrics))
        .route("/metrics/workers", web::get().to(get_worker_metrics))
        .route("/metrics/memory", web::get().to(get_memory_metrics))
        .route("/metrics/summary", web::get().to(get_metrics_summary));

    // Task registry
    let scope = scope.route("/registry", web::get().to(get_registry_info));

    // Administrative
    let scope = scope
        .route("/alerts", web::get().to(get_active_alerts))
        .route("/sla", web::get().to(get_sla_status))
        .route("/diagnostics", web::get().to(get_diagnostics))
        .route("/uptime", web::get().to(get_uptime_info));

    cfg.service(scope);
}
469
/// Generates an Actix Web handler that enqueues a JSON-encoded task.
///
/// Expands to an `async fn enqueue_task` taking the shared task queue and a
/// JSON body of type `$task_type`, and enqueuing the task on `$queue`.
/// The handler responds `200 OK` with `task_id`, `queue` and `status` on
/// success, or `500 Internal Server Error` with `error` and `queue` when
/// enqueuing fails.
///
/// The generated function is always named `enqueue_task`, so the macro can
/// be invoked at most once per module.
#[cfg(feature = "actix-integration")]
#[macro_export]
macro_rules! create_task_endpoint {
    ($task_type:ty, $queue:expr) => {
        async fn enqueue_task(
            task_queue: actix_web::web::Data<std::sync::Arc<$crate::TaskQueue>>,
            task_data: actix_web::web::Json<$task_type>,
        ) -> actix_web::Result<actix_web::HttpResponse> {
            let outcome = task_queue.enqueue(task_data.into_inner(), $queue).await;

            // One map serves both outcomes; only the status code and the
            // keys inserted differ.
            let mut body = std::collections::HashMap::new();
            let mut reply = match outcome {
                Ok(task_id) => {
                    body.insert("task_id", task_id.to_string());
                    body.insert("queue", $queue.to_string());
                    body.insert("status", "enqueued".to_string());
                    actix_web::HttpResponse::Ok()
                }
                Err(e) => {
                    body.insert("error", e.to_string());
                    body.insert("queue", $queue.to_string());
                    actix_web::HttpResponse::InternalServerError()
                }
            };

            Ok(reply.json(body))
        }
    };
}