1use crate::prelude::*;
4use crate::queue_names;
5#[cfg(feature = "axum-integration")]
6use axum::{
7 extract::State,
8 http::StatusCode,
9 response::Json as ResponseJson,
10 };
12use serde_json::json;
13use std::collections::HashMap;
14use std::sync::Arc;
15
/// Axum handler that reports the result of `TaskQueue::health_check`.
///
/// # Returns
/// * `Ok` - the health status returned by the queue, serialized as JSON.
/// * `Err` - `503 Service Unavailable` with a JSON body containing
///   `"status": "unhealthy"`, the error text and the current UTC timestamp.
#[cfg(feature = "axum-integration")]
async fn detailed_health_check(
    State(task_queue): State<Arc<TaskQueue>>,
) -> Result<ResponseJson<serde_json::Value>, (StatusCode, ResponseJson<serde_json::Value>)> {
    task_queue
        .health_check()
        .await
        .map(|report| ResponseJson(json!(report)))
        .map_err(|failure| {
            let body = json!({
                "status": "unhealthy",
                "error": failure.to_string(),
                "timestamp": Utc::now()
            });
            (StatusCode::SERVICE_UNAVAILABLE, ResponseJson(body))
        })
}
33
/// Axum handler returning a compact status snapshot.
///
/// The body combines the metrics health status, the current worker count and
/// the shutdown flag, plus a UTC timestamp. This handler never produces the
/// `Err` variant; the `Result` return type is kept for signature parity with
/// the other fallible handlers.
#[cfg(feature = "axum-integration")]
async fn system_status(
    State(task_queue): State<Arc<TaskQueue>>,
) -> Result<ResponseJson<serde_json::Value>, (StatusCode, ResponseJson<serde_json::Value>)> {
    let health = task_queue.metrics.get_health_status().await;
    let active_workers = task_queue.worker_count().await;
    let shutting_down = task_queue.is_shutting_down().await;

    let body = json!({
        "health": health,
        "workers": {
            "active_count": active_workers,
            "shutting_down": shutting_down
        },
        "timestamp": Utc::now()
    });

    Ok(ResponseJson(body))
}
52
/// Axum handler that aggregates every metrics source into one response.
///
/// System metrics and the worker count are read first. Queue metrics and
/// autoscaler metrics are then awaited together with `tokio::try_join!`.
///
/// # Returns
/// * `Ok` - JSON with `task_queue_metrics`, `system_metrics`,
///   `autoscaler_metrics`, `worker_count` and a timestamp.
/// * `Err` - `500 Internal Server Error` with the error text and a timestamp
///   if either joined call fails.
#[cfg(feature = "axum-integration")]
async fn get_comprehensive_metrics(
    State(task_queue): State<Arc<TaskQueue>>,
) -> Result<ResponseJson<serde_json::Value>, (StatusCode, ResponseJson<serde_json::Value>)> {
    let system_snapshot = task_queue.get_system_metrics().await;
    let active_workers = task_queue.worker_count().await;

    let joined = tokio::try_join!(
        task_queue.get_metrics(),
        task_queue.autoscaler.collect_metrics(),
    );

    // Bail out early on failure so the success path reads straight through.
    let (queue_snapshot, autoscaler_snapshot) = match joined {
        Ok(pair) => pair,
        Err(failure) => {
            let body = json!({
                "error": failure.to_string(),
                "timestamp": Utc::now()
            });
            return Err((StatusCode::INTERNAL_SERVER_ERROR, ResponseJson(body)));
        }
    };

    Ok(ResponseJson(json!({
        "timestamp": Utc::now(),
        "task_queue_metrics": queue_snapshot,
        "system_metrics": system_snapshot,
        "autoscaler_metrics": autoscaler_snapshot,
        "worker_count": active_workers
    })))
}
82
/// Axum handler that returns the queue's system metrics serialized as JSON.
#[cfg(feature = "axum-integration")]
async fn get_system_metrics(
    State(task_queue): State<Arc<TaskQueue>>,
) -> ResponseJson<serde_json::Value> {
    ResponseJson(json!(task_queue.get_system_metrics().await))
}
91
/// Axum handler that returns the metrics performance report serialized as JSON.
#[cfg(feature = "axum-integration")]
async fn get_performance_metrics(
    State(task_queue): State<Arc<TaskQueue>>,
) -> ResponseJson<serde_json::Value> {
    ResponseJson(json!(task_queue.metrics.get_performance_report().await))
}
100
/// Axum handler that returns the autoscaler's collected metrics.
///
/// # Returns
/// * `Ok` - JSON with the metrics under `"metrics"` and a UTC timestamp.
/// * `Err` - `500 Internal Server Error` with the error text and a timestamp
///   when metric collection fails.
#[cfg(feature = "axum-integration")]
async fn get_autoscaler_metrics(
    State(task_queue): State<Arc<TaskQueue>>,
) -> Result<ResponseJson<serde_json::Value>, (StatusCode, ResponseJson<serde_json::Value>)> {
    // Await the single future directly. Wrapping one future in
    // `tokio::try_join!` yields a 1-tuple, which serializes as a one-element
    // JSON array instead of the metrics object itself.
    match task_queue.autoscaler.collect_metrics().await {
        Ok(metrics) => Ok(ResponseJson(json!({
            "metrics": metrics,
            "timestamp": Utc::now()
        }))),
        Err(e) => Err((
            StatusCode::INTERNAL_SERVER_ERROR,
            ResponseJson(json!({
                "error": e.to_string(),
                "timestamp": Utc::now()
            })),
        )),
    }
}
120
/// Axum handler that reports broker metrics for the three named queues.
///
/// Each queue in `queue_names::{DEFAULT, LOW_PRIORITY, HIGH_PRIORITY}` is
/// queried in turn. Successful results go to `"queue_metrics"`; failures are
/// listed under `"errors"` with the queue name and error text, so one failing
/// queue does not hide the others.
#[cfg(feature = "axum-integration")]
async fn get_queue_metrics(
    State(task_queue): State<Arc<TaskQueue>>,
) -> ResponseJson<serde_json::Value> {
    let known_queues = [
        queue_names::DEFAULT,
        queue_names::LOW_PRIORITY,
        queue_names::HIGH_PRIORITY,
    ];
    let mut collected = Vec::new();
    let mut failures = Vec::new();

    for queue_name in known_queues.iter() {
        let outcome = task_queue.broker.get_queue_metrics(queue_name).await;
        match outcome {
            Ok(snapshot) => collected.push(snapshot),
            Err(failure) => {
                let entry = json!({
                    "queue": queue_name,
                    "error": failure.to_string()
                });
                failures.push(entry);
            }
        }
    }

    let body = json!({
        "queue_metrics": collected,
        "errors": failures,
        "timestamp": Utc::now()
    });
    ResponseJson(body)
}
150
/// Axum handler returning worker-related metrics.
///
/// The body holds the active worker count, the `workers` section of the
/// system metrics, the shutdown flag and a UTC timestamp.
#[cfg(feature = "axum-integration")]
async fn get_worker_metrics(
    State(task_queue): State<Arc<TaskQueue>>,
) -> ResponseJson<serde_json::Value> {
    let active_workers = task_queue.worker_count().await;
    let snapshot = task_queue.get_system_metrics().await;
    let shutting_down = task_queue.is_shutting_down().await;

    let body = json!({
        "active_workers": active_workers,
        "worker_metrics": snapshot.workers,
        "is_shutting_down": shutting_down,
        "timestamp": Utc::now()
    });
    ResponseJson(body)
}
166
/// Axum handler returning the `memory` section of the system metrics together
/// with a UTC timestamp.
#[cfg(feature = "axum-integration")]
async fn get_memory_metrics(
    State(task_queue): State<Arc<TaskQueue>>,
) -> ResponseJson<serde_json::Value> {
    let snapshot = task_queue.get_system_metrics().await;
    let body = json!({
        "memory_metrics": snapshot.memory,
        "timestamp": Utc::now()
    });
    ResponseJson(body)
}
178
179#[cfg(not(all(feature = "axum-integration", feature = "auto-register")))]
180#[allow(dead_code)]
182async fn get_registered_tasks() -> ResponseJson<serde_json::Value> {
183 ResponseJson(json!({
184 "message": "Auto-registration feature not enabled",
185 "auto_registration_enabled": false,
186 "timestamp": Utc::now()
187 }))
188}
189
/// Axum handler describing the task registry in use.
///
/// With the `auto-register` feature it builds a registry through
/// `TaskRegistry::with_auto_registered()` and reports the registered tasks and
/// their count. If that fails, the error text is returned in the body; the
/// return type carries no status code, so the body is the only place the
/// failure shows up. Without the feature it returns a static description of
/// the manual registry.
#[cfg(feature = "axum-integration")]
async fn get_registry_info() -> ResponseJson<serde_json::Value> {
    #[cfg(feature = "auto-register")]
    {
        let body = match TaskRegistry::with_auto_registered() {
            Ok(registry) => {
                let tasks = registry.registered_tasks();
                json!({
                    "registry_type": "auto_registered",
                    "task_count": tasks.len(),
                    "tasks": tasks,
                    "features": {
                        "auto_register": true,
                        "inventory_based": true
                    },
                    "timestamp": Utc::now()
                })
            }
            Err(failure) => json!({
                "error": failure.to_string(),
                "registry_type": "auto_registered",
                "timestamp": Utc::now()
            }),
        };
        ResponseJson(body)
    }
    #[cfg(not(feature = "auto-register"))]
    {
        let body = json!({
            "registry_type": "manual",
            "message": "Auto-registration not available - manual registry in use",
            "features": {
                "auto_register": false,
                "inventory_based": false
            },
            "timestamp": Utc::now()
        });
        ResponseJson(body)
    }
}
229
/// Axum handler listing the active alerts from the performance report,
/// together with their count and a UTC timestamp.
#[cfg(feature = "axum-integration")]
async fn get_active_alerts(
    State(task_queue): State<Arc<TaskQueue>>,
) -> ResponseJson<serde_json::Value> {
    let report = task_queue.metrics.get_performance_report().await;
    let alerts = &report.active_alerts;

    ResponseJson(json!({
        "active_alerts": alerts,
        "alert_count": alerts.len(),
        "timestamp": Utc::now()
    }))
}
242
/// Axum handler summarizing SLA compliance.
///
/// Reports the SLA violations from the performance report and their count,
/// the `performance` section of the system metrics, and the success and error
/// rates multiplied by 100.
#[cfg(feature = "axum-integration")]
async fn get_sla_status(
    State(task_queue): State<Arc<TaskQueue>>,
) -> ResponseJson<serde_json::Value> {
    let report = task_queue.metrics.get_performance_report().await;
    let snapshot = task_queue.get_system_metrics().await;

    let violations = &report.sla_violations;
    let perf = &snapshot.performance;

    ResponseJson(json!({
        "sla_violations": violations,
        "violation_count": violations.len(),
        "performance_metrics": perf,
        "success_rate_percentage": perf.success_rate * 100.0,
        "error_rate_percentage": perf.error_rate * 100.0,
        "timestamp": Utc::now()
    }))
}
260
/// Axum handler producing a combined diagnostics report.
///
/// The body contains the metrics health status, the full performance report,
/// worker state, per-queue diagnostics and the uptime taken from the
/// performance report.
///
/// Per-queue health is derived from the failure ratio
/// `failed / (processed + failed)`:
/// * above 10% - `"unhealthy"`
/// * above 5% - `"warning"`
/// * otherwise, or with no failures - `"healthy"`
#[cfg(feature = "axum-integration")]
async fn get_diagnostics(
    State(task_queue): State<Arc<TaskQueue>>,
) -> ResponseJson<serde_json::Value> {
    let health_status = task_queue.metrics.get_health_status().await;
    let performance_report = task_queue.metrics.get_performance_report().await;
    let worker_count = task_queue.worker_count().await;
    let is_shutting_down = task_queue.is_shutting_down().await;

    // NOTE(review): these names are string literals here, while the queue
    // metrics handler uses the `queue_names` constants - confirm they match.
    let queues = ["default", "high_priority", "low_priority"];
    let mut queue_diagnostics = HashMap::new();

    for queue in &queues {
        // Best effort: a queue whose metrics cannot be fetched is left out of
        // the report rather than failing the whole diagnostics call.
        if let Ok(metrics) = task_queue.broker.get_queue_metrics(queue).await {
            // Only `failed_tasks > 0` is required for the ratio to be
            // meaningful. The previous extra `processed_tasks > 0` condition
            // reported a queue where every task had failed as "healthy".
            let health = if metrics.failed_tasks > 0 {
                let error_rate = metrics.failed_tasks as f64
                    / (metrics.processed_tasks + metrics.failed_tasks) as f64;
                if error_rate > 0.1 {
                    "unhealthy"
                } else if error_rate > 0.05 {
                    "warning"
                } else {
                    "healthy"
                }
            } else {
                "healthy"
            };

            queue_diagnostics.insert(
                queue.to_string(),
                json!({
                    "pending_tasks": metrics.pending_tasks,
                    "processed_tasks": metrics.processed_tasks,
                    "failed_tasks": metrics.failed_tasks,
                    "health": health
                }),
            );
        }
    }

    ResponseJson(json!({
        "system_health": health_status,
        "performance_report": performance_report,
        "worker_diagnostics": {
            "active_count": worker_count,
            "shutting_down": is_shutting_down
        },
        "queue_diagnostics": queue_diagnostics,
        "uptime_seconds": performance_report.uptime_seconds,
        "timestamp": Utc::now()
    }))
}
303
/// Axum handler reporting uptime and lifetime task counters.
///
/// The uptime in seconds comes from the system metrics and is also rendered
/// as `"<d>d <h>h <m>m <s>s"`. `started_at` is derived by subtracting the
/// uptime from the current UTC time. Task totals and rates come from the
/// system metrics; per-task performance comes from the performance report.
#[cfg(feature = "axum-integration")]
async fn get_uptime_info(
    State(task_queue): State<Arc<TaskQueue>>,
) -> ResponseJson<serde_json::Value> {
    let snapshot = task_queue.get_system_metrics().await;
    let report = task_queue.metrics.get_performance_report().await;

    // Break the total number of seconds into day/hour/minute/second parts.
    let total_secs: u64 = snapshot.uptime_seconds;
    let days = total_secs / 86400;
    let hours = total_secs / 3600 % 24;
    let minutes = total_secs / 60 % 60;
    let seconds = total_secs % 60;
    let formatted = format!("{}d {}h {}m {}s", days, hours, minutes, seconds);

    ResponseJson(json!({
        "uptime": {
            "seconds": snapshot.uptime_seconds,
            "formatted": formatted,
            "started_at": Utc::now() - Duration::seconds(snapshot.uptime_seconds as i64)
        },
        "runtime_info": {
            "total_tasks_executed": snapshot.tasks.total_executed,
            "total_tasks_succeeded": snapshot.tasks.total_succeeded,
            "total_tasks_failed": snapshot.tasks.total_failed,
            "success_rate_percentage": snapshot.performance.success_rate * 100.0,
            "average_tasks_per_second": snapshot.performance.tasks_per_second
        },
        "performance_summary": {
            "task_performance": report.task_performance
        },
        "timestamp": Utc::now()
    }))
}
337
/// Builds a shared [`TaskQueue`] for the given Redis URL.
///
/// With the `auto-register` feature the builder is asked to auto-register
/// tasks. If `initial_workers` is `Some`, that value is passed to the
/// builder's `initial_workers` setting.
///
/// # Errors
/// Returns a [`TaskQueueError`] if building the queue fails.
#[cfg(feature = "axum-integration")]
pub async fn create_auto_registered_task_queue(
    redis_url: &str,
    initial_workers: Option<usize>,
) -> Result<Arc<TaskQueue>, TaskQueueError> {
    let builder = TaskQueueBuilder::new(redis_url);

    #[cfg(feature = "auto-register")]
    let builder = builder.auto_register_tasks();

    let builder = match initial_workers {
        Some(count) => builder.initial_workers(count),
        None => builder,
    };

    let queue = builder.build().await?;
    Ok(Arc::new(queue))
}
357
/// Builds a shared [`TaskQueue`] from the global configuration.
///
/// # Errors
/// Returns a [`TaskQueueError`] if the builder cannot be created from the
/// global configuration or if building the queue fails.
#[cfg(feature = "axum-integration")]
pub async fn create_task_queue_from_config() -> Result<Arc<TaskQueue>, TaskQueueError> {
    let builder = TaskQueueBuilder::from_global_config()?;
    let queue = builder.build().await?;
    Ok(Arc::new(queue))
}
364
/// Initializes the global configuration and then builds a shared
/// [`TaskQueue`] from it.
///
/// # Errors
/// Returns a [`TaskQueueError`] if global configuration initialization,
/// builder creation or queue construction fails.
#[cfg(feature = "axum-integration")]
pub async fn auto_configure_task_queue() -> Result<Arc<TaskQueue>, TaskQueueError> {
    // The global configuration must exist before the builder reads it.
    TaskQueueConfig::init_global()?;

    let builder = TaskQueueBuilder::from_global_config()?;
    let queue = builder.build().await?;
    Ok(Arc::new(queue))
}
375
/// Builds the task-queue router according to the global configuration.
///
/// If a global configuration exists and `axum.auto_configure_routes` is set:
/// * `axum.enable_health_check` adds `/health` and `/status`.
/// * `axum.enable_metrics` adds the `/metrics*` routes plus `/registry`,
///   `/alerts`, `/sla`, `/diagnostics` and `/uptime`.
/// * The routes are nested under `axum.route_prefix`. An empty or `"/"`
///   prefix mounts them at the root, because `Router::nest` panics on such
///   paths.
///
/// Otherwise this falls back to [`configure_task_queue_routes`], which
/// registers every route under `/tasks`.
#[cfg(feature = "axum-integration")]
pub fn configure_task_queue_routes_auto() -> axum::Router<Arc<TaskQueue>> {
    use axum::routing::get;

    let config = match TaskQueueConfig::global() {
        Some(config) if config.axum.auto_configure_routes => config,
        // No global configuration, or auto-configuration is switched off.
        _ => return configure_task_queue_routes(),
    };

    let mut router = axum::Router::new();

    if config.axum.enable_health_check {
        router = router
            .route("/health", get(detailed_health_check))
            .route("/status", get(system_status));
    }

    if config.axum.enable_metrics {
        router = router
            .route("/metrics", get(get_comprehensive_metrics))
            .route("/metrics/system", get(get_system_metrics))
            .route("/metrics/performance", get(get_performance_metrics))
            .route("/metrics/autoscaler", get(get_autoscaler_metrics))
            .route("/metrics/queues", get(get_queue_metrics))
            .route("/metrics/workers", get(get_worker_metrics))
            .route("/metrics/memory", get(get_memory_metrics))
            .route("/registry", get(get_registry_info))
            .route("/alerts", get(get_active_alerts))
            .route("/sla", get(get_sla_status))
            .route("/diagnostics", get(get_diagnostics))
            .route("/uptime", get(get_uptime_info));
    }

    let prefix: &str = &config.axum.route_prefix;
    if prefix.is_empty() || prefix == "/" {
        // Nesting at an empty or root path panics in axum; use the routes as-is.
        router
    } else {
        axum::Router::new().nest(prefix, router)
    }
}
416
/// Builds the default task-queue router with every endpoint nested under
/// `/tasks`.
///
/// This covers health and status, all `/metrics*` endpoints, and the registry,
/// alerts, SLA, diagnostics and uptime endpoints.
#[cfg(feature = "axum-integration")]
pub fn configure_task_queue_routes() -> axum::Router<Arc<TaskQueue>> {
    use axum::routing::get;

    let task_routes = axum::Router::new()
        .route("/health", get(detailed_health_check))
        .route("/status", get(system_status))
        .route("/metrics", get(get_comprehensive_metrics))
        .route("/metrics/system", get(get_system_metrics))
        .route("/metrics/performance", get(get_performance_metrics))
        .route("/metrics/autoscaler", get(get_autoscaler_metrics))
        .route("/metrics/queues", get(get_queue_metrics))
        .route("/metrics/workers", get(get_worker_metrics))
        .route("/metrics/memory", get(get_memory_metrics))
        .route("/registry", get(get_registry_info))
        .route("/alerts", get(get_active_alerts))
        .route("/sla", get(get_sla_status))
        .route("/diagnostics", get(get_diagnostics))
        .route("/uptime", get(get_uptime_info));

    axum::Router::new().nest("/tasks", task_routes)
}
445
/// Generates an axum handler named `enqueue_task` that enqueues a JSON body
/// of type `$task_type` onto the queue `$queue`.
///
/// On success the handler returns a JSON map with `task_id`, `queue` and
/// `status` = `"enqueued"`. On failure it returns
/// `500 Internal Server Error` with a JSON map holding `error` and `queue`.
///
/// The generated function name is fixed, so the macro can be expanded at most
/// once per module. `$queue` is expanded more than once in the generated code.
#[cfg(feature = "axum-integration")]
#[macro_export]
macro_rules! create_axum_task_endpoint {
    ($task_type:ty, $queue:expr) => {
        async fn enqueue_task(
            axum::extract::State(task_queue): axum::extract::State<
                std::sync::Arc<$crate::TaskQueue>,
            >,
            axum::Json(task_data): axum::Json<$task_type>,
        ) -> Result<
            axum::Json<std::collections::HashMap<String, String>>,
            (
                axum::http::StatusCode,
                axum::Json<std::collections::HashMap<String, String>>,
            ),
        > {
            match task_queue.enqueue(task_data, $queue).await {
                Ok(task_id) => Ok(axum::Json(std::collections::HashMap::from([
                    ("task_id".to_string(), task_id.to_string()),
                    ("queue".to_string(), $queue.to_string()),
                    ("status".to_string(), "enqueued".to_string()),
                ]))),
                Err(e) => Err((
                    axum::http::StatusCode::INTERNAL_SERVER_ERROR,
                    axum::Json(std::collections::HashMap::from([
                        ("error".to_string(), e.to_string()),
                        ("queue".to_string(), $queue.to_string()),
                    ])),
                )),
            }
        }
    };
}