1use crate::prelude::*;
4use crate::queue_names;
5#[cfg(feature = "actix-integration")]
6use actix_web::{web, HttpResponse, Result as ActixResult};
7use serde_json::json;
8use std::collections::HashMap;
9use std::sync::Arc;
10
/// Actix handler that reports the outcome of the task queue's `health_check`.
///
/// Responds with `200 OK` and the serialized health status when the check
/// succeeds. When it fails, responds with `503 Service Unavailable` and a JSON
/// body holding `"status": "unhealthy"`, the error text, and the current UTC
/// timestamp. The handler itself always returns `Ok`.
#[cfg(feature = "actix-integration")]
async fn detailed_health_check(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let response = match task_queue.health_check().await {
        Ok(report) => HttpResponse::Ok().json(report),
        Err(failure) => {
            let body = json!({
                "status": "unhealthy",
                "error": failure.to_string(),
                "timestamp": Utc::now()
            });
            HttpResponse::ServiceUnavailable().json(body)
        }
    };

    Ok(response)
}
23
/// Actix handler that returns a compact status snapshot of the task queue.
///
/// The JSON body contains:
/// - `"health"`: the value returned by `task_queue.metrics.get_health_status()`,
/// - `"workers"`: the current worker count and the shutdown flag,
/// - `"timestamp"`: the UTC time at which the body was built.
#[cfg(feature = "actix-integration")]
async fn system_status(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let health = task_queue.metrics.get_health_status().await;
    let active_workers = task_queue.worker_count().await;
    let shutting_down = task_queue.is_shutting_down().await;

    let body = json!({
        "health": health,
        "workers": {
            "active_count": active_workers,
            "shutting_down": shutting_down
        },
        "timestamp": Utc::now()
    });

    Ok(HttpResponse::Ok().json(body))
}
40
/// Actix handler that bundles queue, system and autoscaler metrics in one response.
///
/// System metrics and the worker count are read first. Queue metrics and
/// autoscaler metrics are then fetched together through `tokio::try_join!`; if
/// either fails the handler answers `500 Internal Server Error` with the error
/// text and a timestamp.
///
/// On success the body also carries a `"scaling_report"` whose
/// `"recommended_action"` is:
/// - `"scale_up"` when `queue_pressure_score > 0.8`,
/// - `"scale_down"` when `worker_utilization < 0.3`,
/// - `"maintain"` otherwise.
///
/// `"confidence"` is the fixed string `"high"` and `"next_evaluation"` is the
/// current time plus 60 seconds.
#[cfg(feature = "actix-integration")]
async fn get_comprehensive_metrics(
    task_queue: web::Data<Arc<TaskQueue>>,
) -> ActixResult<HttpResponse> {
    let system_metrics = task_queue.get_system_metrics().await;
    let worker_count = task_queue.worker_count().await;

    let collected = tokio::try_join!(
        task_queue.get_metrics(),
        task_queue.autoscaler.collect_metrics(),
    );

    // Bail out early so the success path below stays flat.
    let (queue_metrics, autoscaler_metrics) = match collected {
        Ok(pair) => pair,
        Err(failure) => {
            return Ok(HttpResponse::InternalServerError().json(json!({
                "error": failure.to_string(),
                "timestamp": Utc::now()
            })));
        }
    };

    // Queue pressure takes precedence over low utilization.
    let recommended_action = if autoscaler_metrics.queue_pressure_score > 0.8 {
        "scale_up"
    } else if autoscaler_metrics.worker_utilization < 0.3 {
        "scale_down"
    } else {
        "maintain"
    };

    let scaling_report = json!({
        "current_workers": worker_count,
        "recommended_action": recommended_action,
        "confidence": "high",
        "next_evaluation": Utc::now() + Duration::seconds(60)
    });

    let body = json!({
        "timestamp": Utc::now(),
        "task_queue_metrics": queue_metrics,
        "system_metrics": system_metrics,
        "autoscaler_metrics": autoscaler_metrics,
        "scaling_report": scaling_report,
        "worker_count": worker_count
    });

    Ok(HttpResponse::Ok().json(body))
}
84
/// Actix handler that returns the queue's system metrics serialized as JSON
/// with a `200 OK` status.
#[cfg(feature = "actix-integration")]
async fn get_system_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let snapshot = task_queue.get_system_metrics().await;
    let response = HttpResponse::Ok().json(snapshot);
    Ok(response)
}
91
/// Actix handler that returns the performance report produced by
/// `task_queue.metrics.get_performance_report()` as JSON with a `200 OK` status.
#[cfg(feature = "actix-integration")]
async fn get_performance_metrics(
    task_queue: web::Data<Arc<TaskQueue>>,
) -> ActixResult<HttpResponse> {
    let report = task_queue.metrics.get_performance_report().await;
    let response = HttpResponse::Ok().json(report);
    Ok(response)
}
100
/// Actix handler that returns autoscaler metrics together with a
/// human-readable scaling recommendation.
///
/// The recommendation text is chosen as follows:
/// - `queue_pressure_score > 0.8`: suggest scaling up,
/// - otherwise `worker_utilization < 0.3`: suggest scaling down,
/// - otherwise: report that the current worker count appears optimal.
///
/// If collecting the metrics fails, the handler answers
/// `500 Internal Server Error` with the error text and a timestamp.
#[cfg(feature = "actix-integration")]
async fn get_autoscaler_metrics(
    task_queue: web::Data<Arc<TaskQueue>>,
) -> ActixResult<HttpResponse> {
    // A single future needs no `try_join!`; awaiting it directly yields the
    // same `Result` without the 1-tuple wrapper.
    match task_queue.autoscaler.collect_metrics().await {
        Ok(metrics) => {
            let recommendations = if metrics.queue_pressure_score > 0.8 {
                "Consider scaling up workers due to high queue pressure"
            } else if metrics.worker_utilization < 0.3 {
                "Consider scaling down workers due to low utilization"
            } else {
                "Current worker count appears optimal"
            };

            Ok(HttpResponse::Ok().json(json!({
                "metrics": metrics,
                "recommendations": recommendations,
                "timestamp": Utc::now()
            })))
        }
        Err(e) => Ok(HttpResponse::InternalServerError().json(json!({
            "error": e.to_string(),
            "timestamp": Utc::now()
        }))),
    }
}
129
/// Actix handler that reports broker metrics for the default, low-priority and
/// high-priority queues.
///
/// Queues are queried one after another. Metrics that were fetched successfully
/// are listed under `"queue_metrics"`; each failure is listed under `"errors"`
/// as an object with the queue name and the error text. The response status is
/// `200 OK` regardless of how many lookups failed.
#[cfg(feature = "actix-integration")]
async fn get_queue_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let reported_queues = [
        queue_names::DEFAULT,
        queue_names::LOW_PRIORITY,
        queue_names::HIGH_PRIORITY,
    ];
    let mut collected = Vec::new();
    let mut failures = Vec::new();

    for name in reported_queues.iter() {
        let outcome = task_queue.broker.get_queue_metrics(name).await;
        match outcome {
            Ok(metrics) => collected.push(metrics),
            Err(failure) => {
                let entry = json!({
                    "queue": name,
                    "error": failure.to_string()
                });
                failures.push(entry);
            }
        }
    }

    let body = json!({
        "queue_metrics": collected,
        "errors": failures,
        "timestamp": Utc::now()
    });

    Ok(HttpResponse::Ok().json(body))
}
157
/// Actix handler that returns worker-related information.
///
/// The body contains the active worker count, the `workers` section of the
/// system metrics, the shutdown flag, and a UTC timestamp.
#[cfg(feature = "actix-integration")]
async fn get_worker_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let active_workers = task_queue.worker_count().await;
    let snapshot = task_queue.get_system_metrics().await;
    let shutting_down = task_queue.is_shutting_down().await;

    let body = json!({
        "active_workers": active_workers,
        "worker_metrics": snapshot.workers,
        "is_shutting_down": shutting_down,
        "timestamp": Utc::now()
    });

    Ok(HttpResponse::Ok().json(body))
}
171
/// Actix handler that returns the `memory` section of the system metrics along
/// with a UTC timestamp.
#[cfg(feature = "actix-integration")]
async fn get_memory_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let snapshot = task_queue.get_system_metrics().await;

    let body = json!({
        "memory_metrics": snapshot.memory,
        "timestamp": Utc::now()
    });

    Ok(HttpResponse::Ok().json(body))
}
181
/// Actix handler that describes the task registry in use.
///
/// With the `auto-register` feature enabled, a registry is built through
/// `TaskRegistry::with_auto_registered()`. On success the registered tasks and
/// their count are returned; on failure the handler answers
/// `500 Internal Server Error` with the error text.
///
/// Without the feature, a fixed body is returned stating that the manual
/// registry is in use.
#[cfg(feature = "actix-integration")]
async fn get_registry_info() -> ActixResult<HttpResponse> {
    // Exactly one of the two cfg-gated bindings below exists in any given build.
    #[cfg(feature = "auto-register")]
    let response = match TaskRegistry::with_auto_registered() {
        Ok(registry) => {
            let tasks = registry.registered_tasks();
            HttpResponse::Ok().json(json!({
                "registry_type": "auto_registered",
                "task_count": tasks.len(),
                "tasks": tasks,
                "features": {
                    "auto_register": true,
                    "inventory_based": true
                },
                "timestamp": Utc::now()
            }))
        }
        Err(failure) => HttpResponse::InternalServerError().json(json!({
            "error": failure.to_string(),
            "registry_type": "auto_registered",
            "timestamp": Utc::now()
        })),
    };

    #[cfg(not(feature = "auto-register"))]
    let response = HttpResponse::Ok().json(json!({
        "registry_type": "manual",
        "message": "Auto-registration not available - manual registry in use",
        "features": {
            "auto_register": false,
            "inventory_based": false
        },
        "timestamp": Utc::now()
    }));

    Ok(response)
}
221
/// Actix handler that returns the active alerts from the performance report,
/// their count, and a UTC timestamp.
#[cfg(feature = "actix-integration")]
async fn get_active_alerts(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let report = task_queue.metrics.get_performance_report().await;
    let alerts = &report.active_alerts;

    let body = json!({
        "active_alerts": alerts,
        "alert_count": alerts.len(),
        "timestamp": Utc::now()
    });

    Ok(HttpResponse::Ok().json(body))
}
232
/// Actix handler that reports SLA violations together with performance figures.
///
/// The body lists the SLA violations from the performance report and their
/// count, the `performance` section of the system metrics, and the success and
/// error rates multiplied by 100.
#[cfg(feature = "actix-integration")]
async fn get_sla_status(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let report = task_queue.metrics.get_performance_report().await;
    let snapshot = task_queue.get_system_metrics().await;

    let violations = &report.sla_violations;
    let performance = &snapshot.performance;

    let body = json!({
        "sla_violations": violations,
        "violation_count": violations.len(),
        "performance_metrics": performance,
        "success_rate_percentage": performance.success_rate * 100.0,
        "error_rate_percentage": performance.error_rate * 100.0,
        "timestamp": Utc::now()
    });

    Ok(HttpResponse::Ok().json(body))
}
248
/// Classifies a queue's health from its processed and failed task counts.
///
/// The error rate is `failed / (processed + failed)`:
/// - above 10%: `"unhealthy"`,
/// - above 5%: `"warning"`,
/// - otherwise: `"healthy"`.
///
/// A queue with no failures is always `"healthy"`, which also avoids a `0 / 0`
/// division. A queue with failures but no processed tasks has an error rate of
/// 100% and is therefore `"unhealthy"`.
// Only the actix diagnostics handler calls this helper, so silence the
// dead-code lint in builds where that handler is compiled out.
#[cfg_attr(not(feature = "actix-integration"), allow(dead_code))]
fn queue_health_label(processed: f64, failed: f64) -> &'static str {
    if failed <= 0.0 {
        return "healthy";
    }

    let error_rate = failed / (processed + failed);
    if error_rate > 0.1 {
        "unhealthy"
    } else if error_rate > 0.05 {
        "warning"
    } else {
        "healthy"
    }
}

/// Actix handler that assembles a diagnostics report.
///
/// The body contains the health status, the performance report, the worker
/// count and shutdown flag, per-queue diagnostics, the uptime taken from the
/// performance report, and a UTC timestamp.
///
/// Each queue whose metrics could be fetched gets an entry in
/// `"queue_diagnostics"` with its pending, processed and failed counts and a
/// `"health"` label (see `queue_health_label`). Queues whose metrics could not
/// be fetched are listed in `"queue_errors"` (queue name to error text) rather
/// than being dropped silently.
///
/// The queues inspected are the `queue_names` constants, matching the
/// `/metrics/queues` handler.
#[cfg(feature = "actix-integration")]
async fn get_diagnostics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let health_status = task_queue.metrics.get_health_status().await;
    let performance_report = task_queue.metrics.get_performance_report().await;
    let worker_count = task_queue.worker_count().await;
    let is_shutting_down = task_queue.is_shutting_down().await;

    let queues = [
        queue_names::DEFAULT,
        queue_names::HIGH_PRIORITY,
        queue_names::LOW_PRIORITY,
    ];
    let mut queue_diagnostics = HashMap::new();
    let mut queue_errors: HashMap<String, String> = HashMap::new();

    for queue in &queues {
        match task_queue.broker.get_queue_metrics(queue).await {
            Ok(metrics) => {
                // Cast each count separately so the sum cannot overflow the
                // integer type before the division.
                let health = queue_health_label(
                    metrics.processed_tasks as f64,
                    metrics.failed_tasks as f64,
                );
                queue_diagnostics.insert(
                    queue.to_string(),
                    json!({
                        "pending_tasks": metrics.pending_tasks,
                        "processed_tasks": metrics.processed_tasks,
                        "failed_tasks": metrics.failed_tasks,
                        "health": health
                    }),
                );
            }
            Err(e) => {
                queue_errors.insert(queue.to_string(), e.to_string());
            }
        }
    }

    Ok(HttpResponse::Ok().json(json!({
        "system_health": health_status,
        "performance_report": performance_report,
        "worker_diagnostics": {
            "active_count": worker_count,
            "shutting_down": is_shutting_down
        },
        "queue_diagnostics": queue_diagnostics,
        "queue_errors": queue_errors,
        "uptime_seconds": performance_report.uptime_seconds,
        "timestamp": Utc::now()
    })))
}
289
/// Actix handler that reports uptime and lifetime task counters.
///
/// The uptime in seconds comes from the system metrics and is also rendered as
/// `"<d>d <h>h <m>m <s>s"`. `"started_at"` is derived by subtracting the uptime
/// from the current UTC time. The body further includes task totals, the success
/// rate multiplied by 100, the tasks-per-second figure, and the
/// `task_performance` section of the performance report.
#[cfg(feature = "actix-integration")]
async fn get_uptime_info(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let snapshot = task_queue.get_system_metrics().await;
    let report = task_queue.metrics.get_performance_report().await;

    // Break the total number of seconds into calendar-style components.
    let total_secs = snapshot.uptime_seconds;
    let days = total_secs / 86400;
    let hours = (total_secs % 86400) / 3600;
    let minutes = (total_secs % 3600) / 60;
    let seconds = total_secs % 60;
    let formatted = format!("{}d {}h {}m {}s", days, hours, minutes, seconds);

    let tasks = &snapshot.tasks;
    let performance = &snapshot.performance;

    let body = json!({
        "uptime": {
            "seconds": total_secs,
            "formatted": formatted,
            "started_at": Utc::now() - Duration::seconds(total_secs as i64)
        },
        "runtime_info": {
            "total_tasks_executed": tasks.total_executed,
            "total_tasks_succeeded": tasks.total_succeeded,
            "total_tasks_failed": tasks.total_failed,
            "success_rate_percentage": performance.success_rate * 100.0,
            "average_tasks_per_second": performance.tasks_per_second
        },
        "performance_summary": {
            "task_performance": report.task_performance
        },
        "timestamp": Utc::now()
    });

    Ok(HttpResponse::Ok().json(body))
}
321
/// Actix handler that returns a plain-text metrics summary wrapped in JSON.
///
/// The multi-line summary covers uptime (days, hours, minutes), active worker
/// count, task totals, success rate as a percentage, tasks per second, and a
/// status of `"Healthy"` when the performance report has no active alerts or
/// `"Has Alerts"` otherwise. It is returned under `"summary"` next to a UTC
/// timestamp.
#[cfg(feature = "actix-integration")]
async fn get_metrics_summary(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
    let snapshot = task_queue.get_system_metrics().await;
    let report = task_queue.metrics.get_performance_report().await;
    let active_workers = task_queue.worker_count().await;

    let total_secs = snapshot.uptime_seconds;
    let days = total_secs / 86400;
    let hours = (total_secs % 86400) / 3600;
    let minutes = (total_secs % 3600) / 60;

    let status_label = if report.active_alerts.is_empty() {
        "Healthy"
    } else {
        "Has Alerts"
    };
    let success_percentage = snapshot.performance.success_rate * 100.0;

    let summary = format!(
        "TaskQueue Metrics Summary\n\
         =========================\n\
         Uptime: {}d {}h {}m\n\
         Workers: {} active\n\
         Tasks: {} executed, {} succeeded, {} failed\n\
         Success Rate: {:.1}%\n\
         Performance: {:.2} tasks/sec\n\
         Status: {}",
        days,
        hours,
        minutes,
        active_workers,
        snapshot.tasks.total_executed,
        snapshot.tasks.total_succeeded,
        snapshot.tasks.total_failed,
        success_percentage,
        snapshot.performance.tasks_per_second,
        status_label
    );

    let body = json!({
        "summary": summary,
        "timestamp": Utc::now()
    });

    Ok(HttpResponse::Ok().json(body))
}
358
/// Builds a shared [`TaskQueue`] connected to `redis_url`.
///
/// When the `auto-register` feature is enabled, the builder is asked to
/// auto-register tasks. If `initial_workers` is `Some`, that worker count is
/// passed to the builder; otherwise the builder's own setting is left untouched.
///
/// # Errors
///
/// Returns the error produced by the builder's `build()` call.
#[cfg(feature = "actix-integration")]
pub async fn create_auto_registered_task_queue(
    redis_url: &str,
    initial_workers: Option<usize>,
) -> Result<Arc<TaskQueue>, TaskQueueError> {
    let builder = TaskQueueBuilder::new(redis_url);

    #[cfg(feature = "auto-register")]
    let builder = builder.auto_register_tasks();

    let builder = match initial_workers {
        Some(count) => builder.initial_workers(count),
        None => builder,
    };

    let queue = builder.build().await?;
    Ok(Arc::new(queue))
}
378
/// Builds a shared [`TaskQueue`] from the global configuration.
///
/// # Errors
///
/// Returns an error if `TaskQueueBuilder::from_global_config()` fails or if
/// building the queue fails.
#[cfg(feature = "actix-integration")]
pub async fn create_task_queue_from_config() -> Result<Arc<TaskQueue>, TaskQueueError> {
    let builder = TaskQueueBuilder::from_global_config()?;
    let queue = builder.build().await?;
    Ok(Arc::new(queue))
}
385
/// Initializes the global configuration and then builds a shared [`TaskQueue`]
/// from it.
///
/// # Errors
///
/// Returns an error if `TaskQueueConfig::init_global()` fails, if the builder
/// cannot be created from the global configuration, or if building the queue
/// fails.
#[cfg(feature = "actix-integration")]
pub async fn auto_configure_task_queue() -> Result<Arc<TaskQueue>, TaskQueueError> {
    TaskQueueConfig::init_global()?;

    // With the global config initialized, the rest is the regular
    // build-from-config path.
    create_task_queue_from_config().await
}
396
/// Registers task-queue routes according to the global configuration.
///
/// - If no global configuration is available, falls back to
///   [`configure_task_queue_routes`].
/// - If a configuration exists but `actix.auto_configure_routes` is `false`,
///   nothing is registered.
/// - Otherwise a scope is created at `actix.route_prefix`. `/health` and
///   `/status` are added when `actix.enable_health_check` is set; the metrics,
///   registry, alerts, SLA, diagnostics and uptime routes are added when
///   `actix.enable_metrics` is set. The scope is registered even if both flags
///   are off.
#[cfg(feature = "actix-integration")]
pub fn configure_task_queue_routes_auto(cfg: &mut web::ServiceConfig) {
    let config = match TaskQueueConfig::global() {
        Some(found) => found,
        None => {
            configure_task_queue_routes(cfg);
            return;
        }
    };

    let actix = &config.actix;
    if !actix.auto_configure_routes {
        return;
    }

    let mut scope = web::scope(&actix.route_prefix);

    if actix.enable_health_check {
        scope = scope
            .route("/health", web::get().to(detailed_health_check))
            .route("/status", web::get().to(system_status));
    }

    if actix.enable_metrics {
        scope = scope
            .route("/metrics", web::get().to(get_comprehensive_metrics))
            .route("/metrics/system", web::get().to(get_system_metrics))
            .route(
                "/metrics/performance",
                web::get().to(get_performance_metrics),
            )
            .route("/metrics/autoscaler", web::get().to(get_autoscaler_metrics))
            .route("/metrics/queues", web::get().to(get_queue_metrics))
            .route("/metrics/workers", web::get().to(get_worker_metrics))
            .route("/metrics/memory", web::get().to(get_memory_metrics))
            .route("/metrics/summary", web::get().to(get_metrics_summary))
            .route("/registry", web::get().to(get_registry_info))
            .route("/alerts", web::get().to(get_active_alerts))
            .route("/sla", web::get().to(get_sla_status))
            .route("/diagnostics", web::get().to(get_diagnostics))
            .route("/uptime", web::get().to(get_uptime_info));
    }

    cfg.service(scope);
}
439
/// Registers every task-queue route under the fixed `/tasks` scope.
///
/// All routes answer `GET` requests: health and status, the `/metrics` family,
/// and the registry, alerts, SLA, diagnostics and uptime endpoints.
#[cfg(feature = "actix-integration")]
pub fn configure_task_queue_routes(cfg: &mut web::ServiceConfig) {
    // Health and status.
    let scope = web::scope("/tasks")
        .route("/health", web::get().to(detailed_health_check))
        .route("/status", web::get().to(system_status));

    // Metrics family.
    let scope = scope
        .route("/metrics", web::get().to(get_comprehensive_metrics))
        .route("/metrics/system", web::get().to(get_system_metrics))
        .route(
            "/metrics/performance",
            web::get().to(get_performance_metrics),
        )
        .route("/metrics/autoscaler", web::get().to(get_autoscaler_metrics))
        .route("/metrics/queues", web::get().to(get_queue_metrics))
        .route("/metrics/workers", web::get().to(get_worker_metrics))
        .route("/metrics/memory", web::get().to(get_memory_metrics))
        .route("/metrics/summary", web::get().to(get_metrics_summary));

    // Registry, alerting and diagnostics.
    let scope = scope
        .route("/registry", web::get().to(get_registry_info))
        .route("/alerts", web::get().to(get_active_alerts))
        .route("/sla", web::get().to(get_sla_status))
        .route("/diagnostics", web::get().to(get_diagnostics))
        .route("/uptime", web::get().to(get_uptime_info));

    cfg.service(scope);
}
469
/// Generates an actix handler named `enqueue_task` that enqueues a JSON-encoded
/// task of type `$task_type` onto the queue given by `$queue`.
///
/// The generated handler responds with:
/// - `200 OK` and `{"task_id", "queue", "status": "enqueued"}` on success,
/// - `500 Internal Server Error` and `{"error", "queue"}` on failure.
///
/// Because the generated function always has the name `enqueue_task`, the macro
/// can be invoked at most once per module or scope.
///
/// `$queue` is evaluated exactly once per request, so an expression with side
/// effects or a non-trivial cost is not repeated.
#[cfg(feature = "actix-integration")]
#[macro_export]
macro_rules! create_task_endpoint {
    ($task_type:ty, $queue:expr) => {
        async fn enqueue_task(
            task_queue: actix_web::web::Data<std::sync::Arc<$crate::TaskQueue>>,
            task_data: actix_web::web::Json<$task_type>,
        ) -> actix_web::Result<actix_web::HttpResponse> {
            // Bind the queue expression once and capture its display form up
            // front, so the value can be handed to `enqueue` without being
            // needed again afterwards.
            let queue = $queue;
            let queue_name = queue.to_string();

            match task_queue.enqueue(task_data.into_inner(), queue).await {
                Ok(task_id) => {
                    let mut response = std::collections::HashMap::new();
                    response.insert("task_id", task_id.to_string());
                    response.insert("queue", queue_name);
                    response.insert("status", "enqueued".to_string());
                    Ok(actix_web::HttpResponse::Ok().json(response))
                }
                Err(e) => {
                    let mut response = std::collections::HashMap::new();
                    response.insert("error", e.to_string());
                    response.insert("queue", queue_name);
                    Ok(actix_web::HttpResponse::InternalServerError().json(response))
                }
            }
        }
    };
}