1use crate::prelude::*;
4use crate::queue_names;
5#[cfg(feature = "actix-integration")]
6use actix_web::{web, HttpResponse, Result as ActixResult};
7use serde_json::json;
8use std::collections::HashMap;
9use std::sync::Arc;
10
11#[cfg(feature = "actix-integration")]
12async fn detailed_health_check(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
14 match task_queue.health_check().await {
15 Ok(health_status) => Ok(HttpResponse::Ok().json(health_status)),
16 Err(e) => Ok(HttpResponse::ServiceUnavailable().json(json!({
17 "status": "unhealthy",
18 "error": e.to_string(),
19 "timestamp": Utc::now()
20 }))),
21 }
22}
23
24#[cfg(feature = "actix-integration")]
25async fn system_status(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
27 let health_status = task_queue.metrics.get_health_status().await;
28 let worker_count = task_queue.worker_count().await;
29 let is_shutting_down = task_queue.is_shutting_down().await;
30
31 Ok(HttpResponse::Ok().json(json!({
32 "health": health_status,
33 "workers": {
34 "active_count": worker_count,
35 "shutting_down": is_shutting_down
36 },
37 "timestamp": Utc::now()
38 })))
39}
40
41#[cfg(feature = "actix-integration")]
42async fn get_comprehensive_metrics(
44 task_queue: web::Data<Arc<TaskQueue>>,
45) -> ActixResult<HttpResponse> {
46 let system_metrics = task_queue.get_system_metrics().await;
48 let worker_count = task_queue.worker_count().await;
49
50 match tokio::try_join!(
51 task_queue.get_metrics(),
52 task_queue.autoscaler.collect_metrics(),
53 ) {
54 Ok((queue_metrics, autoscaler_metrics)) => Ok(HttpResponse::Ok().json(json!({
55 "timestamp": Utc::now(),
56 "task_queue_metrics": queue_metrics,
57 "system_metrics": system_metrics,
58 "autoscaler_metrics": autoscaler_metrics,
59 "worker_count": worker_count
60 }))),
61 Err(e) => Ok(HttpResponse::InternalServerError().json(json!({
62 "error": e.to_string(),
63 "timestamp": Utc::now()
64 }))),
65 }
66}
67
68#[cfg(feature = "actix-integration")]
69async fn get_system_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
71 let system_metrics = task_queue.get_system_metrics().await;
72 Ok(HttpResponse::Ok().json(system_metrics))
73}
74
75#[cfg(feature = "actix-integration")]
76async fn get_performance_metrics(
78 task_queue: web::Data<Arc<TaskQueue>>,
79) -> ActixResult<HttpResponse> {
80 let performance_report = task_queue.metrics.get_performance_report().await;
81 Ok(HttpResponse::Ok().json(performance_report))
82}
83
84#[cfg(feature = "actix-integration")]
85async fn get_autoscaler_metrics(
87 task_queue: web::Data<Arc<TaskQueue>>,
88) -> ActixResult<HttpResponse> {
89 match tokio::try_join!(task_queue.autoscaler.collect_metrics(),) {
90 Ok(metrics) => Ok(HttpResponse::Ok().json(json!({
91 "metrics": metrics,
92 "timestamp": Utc::now()
93 }))),
94 Err(e) => Ok(HttpResponse::InternalServerError().json(json!({
95 "error": e.to_string(),
96 "timestamp": Utc::now()
97 }))),
98 }
99}
100
101#[cfg(feature = "actix-integration")]
102async fn get_queue_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
104 let queues = [
105 queue_names::DEFAULT,
106 queue_names::LOW_PRIORITY,
107 queue_names::HIGH_PRIORITY,
108 ];
109 let mut queue_metrics = Vec::new();
110 let mut errors = Vec::new();
111
112 for queue in &queues {
113 match task_queue.broker.get_queue_metrics(queue).await {
114 Ok(metrics) => queue_metrics.push(metrics),
115 Err(e) => errors.push(json!({
116 "queue": queue,
117 "error": e.to_string()
118 })),
119 }
120 }
121
122 Ok(HttpResponse::Ok().json(json!({
123 "queue_metrics": queue_metrics,
124 "errors": errors,
125 "timestamp": Utc::now()
126 })))
127}
128
129#[cfg(feature = "actix-integration")]
130async fn get_worker_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
132 let worker_count = task_queue.worker_count().await;
133 let system_metrics = task_queue.get_system_metrics().await;
134
135 Ok(HttpResponse::Ok().json(json!({
136 "active_workers": worker_count,
137 "worker_metrics": system_metrics.workers,
138 "is_shutting_down": task_queue.is_shutting_down().await,
139 "timestamp": Utc::now()
140 })))
141}
142
143#[cfg(feature = "actix-integration")]
144async fn get_memory_metrics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
146 let system_metrics = task_queue.get_system_metrics().await;
147 Ok(HttpResponse::Ok().json(json!({
148 "memory_metrics": system_metrics.memory,
149 "timestamp": Utc::now()
150 })))
151}
152
153#[cfg(not(all(feature = "actix-integration", feature = "auto-register")))]
154async fn get_registered_tasks() -> ActixResult<HttpResponse> {
156 Ok(HttpResponse::Ok().json(json!({
157 "message": "Auto-registration feature not enabled",
158 "auto_registration_enabled": false,
159 "timestamp": Utc::now()
160 })))
161}
162
163#[cfg(feature = "actix-integration")]
164async fn get_registry_info() -> ActixResult<HttpResponse> {
166 #[cfg(feature = "auto-register")]
167 {
168 match TaskRegistry::with_auto_registered() {
169 Ok(registry) => {
170 let registered_tasks = registry.registered_tasks();
171 Ok(HttpResponse::Ok().json(json!({
172 "registry_type": "auto_registered",
173 "task_count": registered_tasks.len(),
174 "tasks": registered_tasks,
175 "features": {
176 "auto_register": true,
177 "inventory_based": true
178 },
179 "timestamp": Utc::now()
180 })))
181 }
182 Err(e) => Ok(HttpResponse::InternalServerError().json(json!({
183 "error": e.to_string(),
184 "registry_type": "auto_registered",
185 "timestamp": Utc::now()
186 }))),
187 }
188 }
189 #[cfg(not(feature = "auto-register"))]
190 {
191 Ok(HttpResponse::Ok().json(json!({
192 "registry_type": "manual",
193 "message": "Auto-registration not available - manual registry in use",
194 "features": {
195 "auto_register": false,
196 "inventory_based": false
197 },
198 "timestamp": Utc::now()
199 })))
200 }
201}
202
203#[cfg(feature = "actix-integration")]
204async fn get_active_alerts(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
206 let performance_report = task_queue.metrics.get_performance_report().await;
207 Ok(HttpResponse::Ok().json(json!({
208 "active_alerts": performance_report.active_alerts,
209 "alert_count": performance_report.active_alerts.len(),
210 "timestamp": Utc::now()
211 })))
212}
213
214#[cfg(feature = "actix-integration")]
215async fn get_sla_status(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
217 let performance_report = task_queue.metrics.get_performance_report().await;
218 let system_metrics = task_queue.get_system_metrics().await;
219
220 Ok(HttpResponse::Ok().json(json!({
221 "sla_violations": performance_report.sla_violations,
222 "violation_count": performance_report.sla_violations.len(),
223 "performance_metrics": system_metrics.performance,
224 "success_rate_percentage": system_metrics.performance.success_rate * 100.0,
225 "error_rate_percentage": system_metrics.performance.error_rate * 100.0,
226 "timestamp": Utc::now()
227 })))
228}
229
230#[cfg(feature = "actix-integration")]
231async fn get_diagnostics(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
233 let health_status = task_queue.metrics.get_health_status().await;
234 let performance_report = task_queue.metrics.get_performance_report().await;
235 let worker_count = task_queue.worker_count().await;
236 let is_shutting_down = task_queue.is_shutting_down().await;
237
238 let queues = ["default", "high_priority", "low_priority"];
240 let mut queue_diagnostics = HashMap::new();
241
242 for queue in &queues {
243 if let Ok(metrics) = task_queue.broker.get_queue_metrics(queue).await {
244 queue_diagnostics.insert(queue.to_string(), json!({
245 "pending_tasks": metrics.pending_tasks,
246 "processed_tasks": metrics.processed_tasks,
247 "failed_tasks": metrics.failed_tasks,
248 "health": if metrics.failed_tasks > 0 && metrics.processed_tasks > 0 {
249 let error_rate = metrics.failed_tasks as f64 / (metrics.processed_tasks + metrics.failed_tasks) as f64;
250 if error_rate > 0.1 { "unhealthy" } else if error_rate > 0.05 { "warning" } else { "healthy" }
251 } else {
252 "healthy"
253 }
254 }));
255 }
256 }
257
258 Ok(HttpResponse::Ok().json(json!({
259 "system_health": health_status,
260 "performance_report": performance_report,
261 "worker_diagnostics": {
262 "active_count": worker_count,
263 "shutting_down": is_shutting_down
264 },
265 "queue_diagnostics": queue_diagnostics,
266 "uptime_seconds": performance_report.uptime_seconds,
267 "timestamp": Utc::now()
268 })))
269}
270
271#[cfg(feature = "actix-integration")]
272async fn get_uptime_info(task_queue: web::Data<Arc<TaskQueue>>) -> ActixResult<HttpResponse> {
274 let system_metrics = task_queue.get_system_metrics().await;
275 let performance_report = task_queue.metrics.get_performance_report().await;
276
277 let uptime_duration = std::time::Duration::from_secs(system_metrics.uptime_seconds);
278 let days = uptime_duration.as_secs() / 86400;
279 let hours = (uptime_duration.as_secs() % 86400) / 3600;
280 let minutes = (uptime_duration.as_secs() % 3600) / 60;
281 let seconds = uptime_duration.as_secs() % 60;
282
283 Ok(HttpResponse::Ok().json(json!({
284 "uptime": {
285 "seconds": system_metrics.uptime_seconds,
286 "formatted": format!("{}d {}h {}m {}s", days, hours, minutes, seconds),
287 "started_at": Utc::now() - Duration::seconds(system_metrics.uptime_seconds as i64)
288 },
289 "runtime_info": {
290 "total_tasks_executed": system_metrics.tasks.total_executed,
291 "total_tasks_succeeded": system_metrics.tasks.total_succeeded,
292 "total_tasks_failed": system_metrics.tasks.total_failed,
293 "success_rate_percentage": system_metrics.performance.success_rate * 100.0,
294 "average_tasks_per_second": system_metrics.performance.tasks_per_second
295 },
296 "performance_summary": {
297 "task_performance": performance_report.task_performance
298 },
299 "timestamp": Utc::now()
300 })))
301}
302
303#[cfg(feature = "actix-integration")]
304pub async fn create_auto_registered_task_queue(
306 redis_url: &str,
307 initial_workers: Option<usize>,
308) -> Result<Arc<TaskQueue>, TaskQueueError> {
309 let mut builder = TaskQueueBuilder::new(redis_url);
310
311 #[cfg(feature = "auto-register")]
312 {
313 builder = builder.auto_register_tasks();
314 }
315
316 if let Some(workers) = initial_workers {
317 builder = builder.initial_workers(workers);
318 }
319
320 Ok(Arc::new(builder.build().await?))
321}
322
323#[cfg(feature = "actix-integration")]
324pub async fn create_task_queue_from_config() -> Result<Arc<TaskQueue>, TaskQueueError> {
326 let task_queue = TaskQueueBuilder::from_global_config()?.build().await?;
327 Ok(Arc::new(task_queue))
328}
329
330#[cfg(feature = "actix-integration")]
331pub async fn auto_configure_task_queue() -> Result<Arc<TaskQueue>, TaskQueueError> {
333 TaskQueueConfig::init_global()?;
335
336 let task_queue = TaskQueueBuilder::from_global_config()?.build().await?;
338 Ok(Arc::new(task_queue))
339}
340
341#[cfg(feature = "actix-integration")]
342pub fn configure_task_queue_routes_auto(cfg: &mut web::ServiceConfig) {
344 if let Some(config) = TaskQueueConfig::global() {
345 #[cfg(feature = "actix-integration")]
346 if config.actix.auto_configure_routes {
347 let prefix = &config.actix.route_prefix;
348 let mut scope = web::scope(prefix);
349
350 if config.actix.enable_health_check {
351 scope = scope
352 .route("/health", web::get().to(detailed_health_check))
353 .route("/status", web::get().to(system_status));
354 }
355
356 if config.actix.enable_metrics {
357 scope = scope
358 .route("/metrics", web::get().to(get_comprehensive_metrics))
359 .route("/metrics/system", web::get().to(get_system_metrics))
360 .route(
361 "/metrics/performance",
362 web::get().to(get_performance_metrics),
363 )
364 .route("/metrics/autoscaler", web::get().to(get_autoscaler_metrics))
365 .route("/metrics/queues", web::get().to(get_queue_metrics))
366 .route("/metrics/workers", web::get().to(get_worker_metrics))
367 .route("/metrics/memory", web::get().to(get_memory_metrics))
368 .route("/registry", web::get().to(get_registry_info))
369 .route("/alerts", web::get().to(get_active_alerts))
370 .route("/sla", web::get().to(get_sla_status))
371 .route("/diagnostics", web::get().to(get_diagnostics))
372 .route("/uptime", web::get().to(get_uptime_info));
373 }
374
375 cfg.service(scope);
376 }
377 } else {
378 configure_task_queue_routes(cfg);
380 }
381}
382
383#[cfg(feature = "actix-integration")]
384pub fn configure_task_queue_routes(cfg: &mut web::ServiceConfig) {
386 cfg.service(
387 web::scope("/tasks")
388 .route("/health", web::get().to(detailed_health_check))
390 .route("/status", web::get().to(system_status))
391 .route("/metrics", web::get().to(get_comprehensive_metrics))
393 .route("/metrics/system", web::get().to(get_system_metrics))
394 .route(
395 "/metrics/performance",
396 web::get().to(get_performance_metrics),
397 )
398 .route("/metrics/autoscaler", web::get().to(get_autoscaler_metrics))
399 .route("/metrics/queues", web::get().to(get_queue_metrics))
400 .route("/metrics/workers", web::get().to(get_worker_metrics))
401 .route("/metrics/memory", web::get().to(get_memory_metrics))
402 .route("/registry", web::get().to(get_registry_info))
404 .route("/alerts", web::get().to(get_active_alerts))
406 .route("/sla", web::get().to(get_sla_status))
407 .route("/diagnostics", web::get().to(get_diagnostics))
408 .route("/uptime", web::get().to(get_uptime_info)),
409 );
410}
411
412#[cfg(feature = "actix-integration")]
413#[macro_export]
415macro_rules! create_task_endpoint {
416 ($task_type:ty, $queue:expr) => {
417 async fn enqueue_task(
418 task_queue: actix_web::web::Data<std::sync::Arc<$crate::TaskQueue>>,
419 task_data: actix_web::web::Json<$task_type>,
420 ) -> actix_web::Result<actix_web::HttpResponse> {
421 match task_queue.enqueue(task_data.into_inner(), $queue).await {
422 Ok(task_id) => {
423 let mut response = std::collections::HashMap::new();
424 response.insert("task_id", task_id.to_string());
425 response.insert("queue", $queue.to_string());
426 response.insert("status", "enqueued".to_string());
427 Ok(actix_web::HttpResponse::Ok().json(response))
428 }
429 Err(e) => {
430 let mut response = std::collections::HashMap::new();
431 response.insert("error", e.to_string());
432 response.insert("queue", $queue.to_string());
433 Ok(actix_web::HttpResponse::InternalServerError().json(response))
434 }
435 }
436 }
437 };
438}