rust_task_queue/
axum.rs

1//! Axum integration helpers with comprehensive metrics endpoints
2
3use crate::prelude::*;
4use crate::queue_names;
5#[cfg(feature = "axum-integration")]
6use axum::{
7    extract::State,
8    http::StatusCode,
9    response::Json as ResponseJson,
10    // Json,
11};
12use serde_json::json;
13use std::collections::HashMap;
14use std::sync::Arc;
15
16#[cfg(feature = "axum-integration")]
17/// Detailed health check with component status
18async fn detailed_health_check(
19    State(task_queue): State<Arc<TaskQueue>>,
20) -> Result<ResponseJson<serde_json::Value>, (StatusCode, ResponseJson<serde_json::Value>)> {
21    match task_queue.health_check().await {
22        Ok(health_status) => Ok(ResponseJson(json!(health_status))),
23        Err(e) => Err((
24            StatusCode::SERVICE_UNAVAILABLE,
25            ResponseJson(json!({
26                "status": "unhealthy",
27                "error": e.to_string(),
28                "timestamp": Utc::now()
29            })),
30        )),
31    }
32}
33
34#[cfg(feature = "axum-integration")]
35/// System status with health metrics
36async fn system_status(
37    State(task_queue): State<Arc<TaskQueue>>,
38) -> Result<ResponseJson<serde_json::Value>, (StatusCode, ResponseJson<serde_json::Value>)> {
39    let health_status = task_queue.metrics.get_health_status().await;
40    let worker_count = task_queue.worker_count().await;
41    let is_shutting_down = task_queue.is_shutting_down().await;
42
43    Ok(ResponseJson(json!({
44        "health": health_status,
45        "workers": {
46            "active_count": worker_count,
47            "shutting_down": is_shutting_down
48        },
49        "timestamp": Utc::now()
50    })))
51}
52
53#[cfg(feature = "axum-integration")]
54/// Comprehensive metrics combining all available metrics
55async fn get_comprehensive_metrics(
56    State(task_queue): State<Arc<TaskQueue>>,
57) -> Result<ResponseJson<serde_json::Value>, (StatusCode, ResponseJson<serde_json::Value>)> {
58    // Handle mixed Result and non-Result return types separately
59    let system_metrics = task_queue.get_system_metrics().await;
60    let worker_count = task_queue.worker_count().await;
61
62    match tokio::try_join!(
63        task_queue.get_metrics(),
64        task_queue.autoscaler.collect_metrics(),
65    ) {
66        Ok((queue_metrics, autoscaler_metrics)) => Ok(ResponseJson(json!({
67            "timestamp": Utc::now(),
68            "task_queue_metrics": queue_metrics,
69            "system_metrics": system_metrics,
70            "autoscaler_metrics": autoscaler_metrics,
71            "worker_count": worker_count
72        }))),
73        Err(e) => Err((
74            StatusCode::INTERNAL_SERVER_ERROR,
75            ResponseJson(json!({
76                "error": e.to_string(),
77                "timestamp": Utc::now()
78            })),
79        )),
80    }
81}
82
83#[cfg(feature = "axum-integration")]
84/// Enhanced system metrics with memory and performance data
85async fn get_system_metrics(
86    State(task_queue): State<Arc<TaskQueue>>,
87) -> ResponseJson<serde_json::Value> {
88    let system_metrics = task_queue.get_system_metrics().await;
89    ResponseJson(json!(system_metrics))
90}
91
92#[cfg(feature = "axum-integration")]
93/// Performance metrics and performance report
94async fn get_performance_metrics(
95    State(task_queue): State<Arc<TaskQueue>>,
96) -> ResponseJson<serde_json::Value> {
97    let performance_report = task_queue.metrics.get_performance_report().await;
98    ResponseJson(json!(performance_report))
99}
100
101#[cfg(feature = "axum-integration")]
102/// AutoScaler metrics and recommendations
103async fn get_autoscaler_metrics(
104    State(task_queue): State<Arc<TaskQueue>>,
105) -> Result<ResponseJson<serde_json::Value>, (StatusCode, ResponseJson<serde_json::Value>)> {
106    match tokio::try_join!(task_queue.autoscaler.collect_metrics(),) {
107        Ok(metrics) => Ok(ResponseJson(json!({
108            "metrics": metrics,
109            "timestamp": Utc::now()
110        }))),
111        Err(e) => Err((
112            StatusCode::INTERNAL_SERVER_ERROR,
113            ResponseJson(json!({
114                "error": e.to_string(),
115                "timestamp": Utc::now()
116            })),
117        )),
118    }
119}
120
121#[cfg(feature = "axum-integration")]
122/// Individual queue metrics for all queues
123async fn get_queue_metrics(
124    State(task_queue): State<Arc<TaskQueue>>,
125) -> ResponseJson<serde_json::Value> {
126    let queues = [
127        queue_names::DEFAULT,
128        queue_names::LOW_PRIORITY,
129        queue_names::HIGH_PRIORITY,
130    ];
131    let mut queue_metrics = Vec::new();
132    let mut errors = Vec::new();
133
134    for queue in &queues {
135        match task_queue.broker.get_queue_metrics(queue).await {
136            Ok(metrics) => queue_metrics.push(metrics),
137            Err(e) => errors.push(json!({
138                "queue": queue,
139                "error": e.to_string()
140            })),
141        }
142    }
143
144    ResponseJson(json!({
145        "queue_metrics": queue_metrics,
146        "errors": errors,
147        "timestamp": Utc::now()
148    }))
149}
150
151#[cfg(feature = "axum-integration")]
152/// Worker-specific metrics
153async fn get_worker_metrics(
154    State(task_queue): State<Arc<TaskQueue>>,
155) -> ResponseJson<serde_json::Value> {
156    let worker_count = task_queue.worker_count().await;
157    let system_metrics = task_queue.get_system_metrics().await;
158
159    ResponseJson(json!({
160        "active_workers": worker_count,
161        "worker_metrics": system_metrics.workers,
162        "is_shutting_down": task_queue.is_shutting_down().await,
163        "timestamp": Utc::now()
164    }))
165}
166
167#[cfg(feature = "axum-integration")]
168/// Memory usage metrics
169async fn get_memory_metrics(
170    State(task_queue): State<Arc<TaskQueue>>,
171) -> ResponseJson<serde_json::Value> {
172    let system_metrics = task_queue.get_system_metrics().await;
173    ResponseJson(json!({
174        "memory_metrics": system_metrics.memory,
175        "timestamp": Utc::now()
176    }))
177}
178
179#[cfg(not(all(feature = "axum-integration", feature = "auto-register")))]
180/// Fallback when auto-register feature is not enabled
181#[allow(dead_code)]
182async fn get_registered_tasks() -> ResponseJson<serde_json::Value> {
183    ResponseJson(json!({
184        "message": "Auto-registration feature not enabled",
185        "auto_registration_enabled": false,
186        "timestamp": Utc::now()
187    }))
188}
189
190#[cfg(feature = "axum-integration")]
191/// Detailed task registry information
192async fn get_registry_info() -> ResponseJson<serde_json::Value> {
193    #[cfg(feature = "auto-register")]
194    {
195        match TaskRegistry::with_auto_registered() {
196            Ok(registry) => {
197                let registered_tasks = registry.registered_tasks();
198                ResponseJson(json!({
199                    "registry_type": "auto_registered",
200                    "task_count": registered_tasks.len(),
201                    "tasks": registered_tasks,
202                    "features": {
203                        "auto_register": true,
204                        "inventory_based": true
205                    },
206                    "timestamp": Utc::now()
207                }))
208            }
209            Err(e) => ResponseJson(json!({
210                "error": e.to_string(),
211                "registry_type": "auto_registered",
212                "timestamp": Utc::now()
213            })),
214        }
215    }
216    #[cfg(not(feature = "auto-register"))]
217    {
218        ResponseJson(json!({
219            "registry_type": "manual",
220            "message": "Auto-registration not available - manual registry in use",
221            "features": {
222                "auto_register": false,
223                "inventory_based": false
224            },
225            "timestamp": Utc::now()
226        }))
227    }
228}
229
230#[cfg(feature = "axum-integration")]
231/// Get active alerts from the metrics system
232async fn get_active_alerts(
233    State(task_queue): State<Arc<TaskQueue>>,
234) -> ResponseJson<serde_json::Value> {
235    let performance_report = task_queue.metrics.get_performance_report().await;
236    ResponseJson(json!({
237        "active_alerts": performance_report.active_alerts,
238        "alert_count": performance_report.active_alerts.len(),
239        "timestamp": Utc::now()
240    }))
241}
242
243#[cfg(feature = "axum-integration")]
244/// SLA status and violations
245async fn get_sla_status(
246    State(task_queue): State<Arc<TaskQueue>>,
247) -> ResponseJson<serde_json::Value> {
248    let performance_report = task_queue.metrics.get_performance_report().await;
249    let system_metrics = task_queue.get_system_metrics().await;
250
251    ResponseJson(json!({
252        "sla_violations": performance_report.sla_violations,
253        "violation_count": performance_report.sla_violations.len(),
254        "performance_metrics": system_metrics.performance,
255        "success_rate_percentage": system_metrics.performance.success_rate * 100.0,
256        "error_rate_percentage": system_metrics.performance.error_rate * 100.0,
257        "timestamp": Utc::now()
258    }))
259}
260
261#[cfg(feature = "axum-integration")]
262/// Comprehensive diagnostics information
263async fn get_diagnostics(
264    State(task_queue): State<Arc<TaskQueue>>,
265) -> ResponseJson<serde_json::Value> {
266    let health_status = task_queue.metrics.get_health_status().await;
267    let performance_report = task_queue.metrics.get_performance_report().await;
268    let worker_count = task_queue.worker_count().await;
269    let is_shutting_down = task_queue.is_shutting_down().await;
270
271    // Get individual queue diagnostics
272    let queues = ["default", "high_priority", "low_priority"];
273    let mut queue_diagnostics = HashMap::new();
274
275    for queue in &queues {
276        if let Ok(metrics) = task_queue.broker.get_queue_metrics(queue).await {
277            queue_diagnostics.insert(queue.to_string(), json!({
278                "pending_tasks": metrics.pending_tasks,
279                "processed_tasks": metrics.processed_tasks,
280                "failed_tasks": metrics.failed_tasks,
281                "health": if metrics.failed_tasks > 0 && metrics.processed_tasks > 0 {
282                    let error_rate = metrics.failed_tasks as f64 / (metrics.processed_tasks + metrics.failed_tasks) as f64;
283                    if error_rate > 0.1 { "unhealthy" } else if error_rate > 0.05 { "warning" } else { "healthy" }
284                } else {
285                    "healthy"
286                }
287            }));
288        }
289    }
290
291    ResponseJson(json!({
292        "system_health": health_status,
293        "performance_report": performance_report,
294        "worker_diagnostics": {
295            "active_count": worker_count,
296            "shutting_down": is_shutting_down
297        },
298        "queue_diagnostics": queue_diagnostics,
299        "uptime_seconds": performance_report.uptime_seconds,
300        "timestamp": Utc::now()
301    }))
302}
303
304#[cfg(feature = "axum-integration")]
305/// System uptime and runtime information
306async fn get_uptime_info(
307    State(task_queue): State<Arc<TaskQueue>>,
308) -> ResponseJson<serde_json::Value> {
309    let system_metrics = task_queue.get_system_metrics().await;
310    let performance_report = task_queue.metrics.get_performance_report().await;
311
312    let uptime_duration = std::time::Duration::from_secs(system_metrics.uptime_seconds);
313    let days = uptime_duration.as_secs() / 86400;
314    let hours = (uptime_duration.as_secs() % 86400) / 3600;
315    let minutes = (uptime_duration.as_secs() % 3600) / 60;
316    let seconds = uptime_duration.as_secs() % 60;
317
318    ResponseJson(json!({
319        "uptime": {
320            "seconds": system_metrics.uptime_seconds,
321            "formatted": format!("{}d {}h {}m {}s", days, hours, minutes, seconds),
322            "started_at": Utc::now() - Duration::seconds(system_metrics.uptime_seconds as i64)
323        },
324        "runtime_info": {
325            "total_tasks_executed": system_metrics.tasks.total_executed,
326            "total_tasks_succeeded": system_metrics.tasks.total_succeeded,
327            "total_tasks_failed": system_metrics.tasks.total_failed,
328            "success_rate_percentage": system_metrics.performance.success_rate * 100.0,
329            "average_tasks_per_second": system_metrics.performance.tasks_per_second
330        },
331        "performance_summary": {
332            "task_performance": performance_report.task_performance
333        },
334        "timestamp": Utc::now()
335    }))
336}
337
338#[cfg(feature = "axum-integration")]
339/// Helper for creating TaskQueue with auto-registration for Axum apps
340pub async fn create_auto_registered_task_queue(
341    redis_url: &str,
342    initial_workers: Option<usize>,
343) -> Result<Arc<TaskQueue>, TaskQueueError> {
344    let mut builder = TaskQueueBuilder::new(redis_url);
345
346    #[cfg(feature = "auto-register")]
347    {
348        builder = builder.auto_register_tasks();
349    }
350
351    if let Some(workers) = initial_workers {
352        builder = builder.initial_workers(workers);
353    }
354
355    Ok(Arc::new(builder.build().await?))
356}
357
358#[cfg(feature = "axum-integration")]
359/// Create TaskQueue from global configuration - ideal for Axum apps
360pub async fn create_task_queue_from_config() -> Result<Arc<TaskQueue>, TaskQueueError> {
361    let task_queue = TaskQueueBuilder::from_global_config()?.build().await?;
362    Ok(Arc::new(task_queue))
363}
364
365#[cfg(feature = "axum-integration")]
366/// Auto-configure TaskQueue for Axum with minimal setup
367pub async fn auto_configure_task_queue() -> Result<Arc<TaskQueue>, TaskQueueError> {
368    // Initialize global configuration
369    TaskQueueConfig::init_global()?;
370
371    // Create TaskQueue from global config
372    let task_queue = TaskQueueBuilder::from_global_config()?.build().await?;
373    Ok(Arc::new(task_queue))
374}
375
376#[cfg(feature = "axum-integration")]
377/// Configure Axum routes based on global configuration
378pub fn configure_task_queue_routes_auto() -> axum::Router<Arc<TaskQueue>> {
379    use axum::routing::get;
380
381    if let Some(config) = TaskQueueConfig::global() {
382        #[cfg(feature = "axum-integration")]
383        if config.axum.auto_configure_routes {
384            let prefix = &config.axum.route_prefix;
385            let mut router = axum::Router::new();
386
387            if config.axum.enable_health_check {
388                router = router
389                    .route("/health", get(detailed_health_check))
390                    .route("/status", get(system_status));
391            }
392
393            if config.axum.enable_metrics {
394                router = router
395                    .route("/metrics", get(get_comprehensive_metrics))
396                    .route("/metrics/system", get(get_system_metrics))
397                    .route("/metrics/performance", get(get_performance_metrics))
398                    .route("/metrics/autoscaler", get(get_autoscaler_metrics))
399                    .route("/metrics/queues", get(get_queue_metrics))
400                    .route("/metrics/workers", get(get_worker_metrics))
401                    .route("/metrics/memory", get(get_memory_metrics))
402                    .route("/registry", get(get_registry_info))
403                    .route("/alerts", get(get_active_alerts))
404                    .route("/sla", get(get_sla_status))
405                    .route("/diagnostics", get(get_diagnostics))
406                    .route("/uptime", get(get_uptime_info));
407            }
408
409            return axum::Router::new().nest(prefix, router);
410        }
411    }
412
413    // Fallback to default comprehensive configuration
414    configure_task_queue_routes()
415}
416
417#[cfg(feature = "axum-integration")]
418/// Comprehensive Axum router configuration with all metrics endpoints
419pub fn configure_task_queue_routes() -> axum::Router<Arc<TaskQueue>> {
420    use axum::routing::get;
421
422    axum::Router::new().nest(
423        "/tasks",
424        axum::Router::new()
425            // Health and Status endpoints
426            .route("/health", get(detailed_health_check))
427            .route("/status", get(system_status))
428            // Metrics endpoints
429            .route("/metrics", get(get_comprehensive_metrics))
430            .route("/metrics/system", get(get_system_metrics))
431            .route("/metrics/performance", get(get_performance_metrics))
432            .route("/metrics/autoscaler", get(get_autoscaler_metrics))
433            .route("/metrics/queues", get(get_queue_metrics))
434            .route("/metrics/workers", get(get_worker_metrics))
435            .route("/metrics/memory", get(get_memory_metrics))
436            // Task Registry endpoints
437            .route("/registry", get(get_registry_info))
438            // Administrative endpoints
439            .route("/alerts", get(get_active_alerts))
440            .route("/sla", get(get_sla_status))
441            .route("/diagnostics", get(get_diagnostics))
442            .route("/uptime", get(get_uptime_info)),
443    )
444}
445
446#[cfg(feature = "axum-integration")]
447/// Helper macro for creating typed task endpoints for Axum
448#[macro_export]
449macro_rules! create_axum_task_endpoint {
450    ($task_type:ty, $queue:expr) => {
451        async fn enqueue_task(
452            axum::extract::State(task_queue): axum::extract::State<std::sync::Arc<$crate::TaskQueue>>,
453            axum::Json(task_data): axum::Json<$task_type>,
454        ) -> Result<
455            axum::Json<std::collections::HashMap<String, String>>,
456            (
457                axum::http::StatusCode,
458                axum::Json<std::collections::HashMap<String, String>>,
459            ),
460        > {
461            match task_queue.enqueue(task_data, $queue).await {
462                Ok(task_id) => {
463                    let mut response = std::collections::HashMap::new();
464                    response.insert("task_id".to_string(), task_id.to_string());
465                    response.insert("queue".to_string(), $queue.to_string());
466                    response.insert("status".to_string(), "enqueued".to_string());
467                    Ok(axum::Json(response))
468                }
469                Err(e) => {
470                    let mut response = std::collections::HashMap::new();
471                    response.insert("error".to_string(), e.to_string());
472                    response.insert("queue".to_string(), $queue.to_string());
473                    Err((
474                        axum::http::StatusCode::INTERNAL_SERVER_ERROR,
475                        axum::Json(response),
476                    ))
477                }
478            }
479        }
480    };
481}