1use super::ApiResponse;
113use hammerwork::queue::DatabaseQueue;
114use serde::{Deserialize, Serialize};
115use std::collections::HashMap;
116use std::sync::Arc;
117use warp::{Filter, Reply};
118
119#[derive(Debug, Serialize)]
121pub struct SystemOverview {
122 pub total_queues: u32,
123 pub total_jobs: u64,
124 pub pending_jobs: u64,
125 pub running_jobs: u64,
126 pub completed_jobs: u64,
127 pub failed_jobs: u64,
128 pub dead_jobs: u64,
129 pub overall_throughput: f64,
130 pub overall_error_rate: f64,
131 pub avg_processing_time_ms: f64,
132 pub system_health: SystemHealth,
133 pub uptime_seconds: u64,
134 pub last_updated: chrono::DateTime<chrono::Utc>,
135}
136
137#[derive(Debug, Serialize)]
139pub struct SystemHealth {
140 pub status: String, pub database_healthy: bool,
142 pub high_error_rate: bool,
143 pub queue_backlog: bool,
144 pub slow_processing: bool,
145 pub alerts: Vec<SystemAlert>,
146}
147
148#[derive(Debug, Serialize)]
150pub struct SystemAlert {
151 pub severity: String, pub message: String,
153 pub queue: Option<String>,
154 pub metric: Option<String>,
155 pub value: Option<f64>,
156 pub threshold: Option<f64>,
157 pub timestamp: chrono::DateTime<chrono::Utc>,
158}
159
160#[derive(Debug, Serialize)]
162pub struct DetailedStats {
163 pub overview: SystemOverview,
164 pub queue_stats: Vec<QueueStats>,
165 pub hourly_trends: Vec<HourlyTrend>,
166 pub error_patterns: Vec<ErrorPattern>,
167 pub performance_metrics: PerformanceMetrics,
168}
169
170#[derive(Debug, Serialize)]
172pub struct QueueStats {
173 pub name: String,
174 pub pending: u64,
175 pub running: u64,
176 pub completed_total: u64,
177 pub failed_total: u64,
178 pub dead_total: u64,
179 pub throughput_per_minute: f64,
180 pub avg_processing_time_ms: f64,
181 pub error_rate: f64,
182 pub oldest_pending_age_seconds: Option<u64>,
183 pub priority_distribution: HashMap<String, u64>,
184}
185
186#[derive(Debug, Serialize)]
188pub struct HourlyTrend {
189 pub hour: chrono::DateTime<chrono::Utc>,
190 pub completed: u64,
191 pub failed: u64,
192 pub throughput: f64,
193 pub avg_processing_time_ms: f64,
194 pub error_rate: f64,
195}
196
197#[derive(Debug, Serialize)]
199pub struct ErrorPattern {
200 pub error_type: String,
201 pub count: u64,
202 pub percentage: f64,
203 pub sample_message: String,
204 pub first_seen: chrono::DateTime<chrono::Utc>,
205 pub last_seen: chrono::DateTime<chrono::Utc>,
206 pub affected_queues: Vec<String>,
207}
208
209#[derive(Debug, Serialize)]
211pub struct PerformanceMetrics {
212 pub database_response_time_ms: f64,
213 pub average_queue_depth: f64,
214 pub jobs_per_second: f64,
215 pub memory_usage_mb: Option<f64>,
216 pub cpu_usage_percent: Option<f64>,
217 pub active_workers: u32,
218 pub worker_utilization: f64,
219}
220
221#[derive(Debug, Deserialize)]
223pub struct TimeRange {
224 pub start: chrono::DateTime<chrono::Utc>,
225 pub end: chrono::DateTime<chrono::Utc>,
226}
227
228#[derive(Debug, Deserialize)]
230pub struct StatsQuery {
231 pub time_range: Option<TimeRange>,
232 pub queues: Option<Vec<String>>,
233 pub granularity: Option<String>, }
235
236pub fn routes<T>(
238 queue: Arc<T>,
239) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
240where
241 T: DatabaseQueue + Send + Sync + 'static,
242{
243 let queue_filter = warp::any().map(move || queue.clone());
244
245 let overview = warp::path("stats")
246 .and(warp::path("overview"))
247 .and(warp::path::end())
248 .and(warp::get())
249 .and(queue_filter.clone())
250 .and_then(overview_handler);
251
252 let detailed = warp::path("stats")
253 .and(warp::path("detailed"))
254 .and(warp::path::end())
255 .and(warp::get())
256 .and(queue_filter.clone())
257 .and(warp::query::<StatsQuery>())
258 .and_then(detailed_stats_handler);
259
260 let trends = warp::path("stats")
261 .and(warp::path("trends"))
262 .and(warp::path::end())
263 .and(warp::get())
264 .and(queue_filter.clone())
265 .and(warp::query::<StatsQuery>())
266 .and_then(trends_handler);
267
268 let health = warp::path("stats")
269 .and(warp::path("health"))
270 .and(warp::path::end())
271 .and(warp::get())
272 .and(queue_filter)
273 .and_then(health_handler);
274
275 overview.or(detailed).or(trends).or(health)
276}
277
278async fn overview_handler<T>(queue: Arc<T>) -> Result<impl Reply, warp::Rejection>
280where
281 T: DatabaseQueue + Send + Sync,
282{
283 match queue.get_all_queue_stats().await {
284 Ok(all_stats) => {
285 let mut total_pending = 0;
286 let mut total_running = 0;
287 let mut total_completed = 0;
288 let mut total_failed = 0;
289 let mut total_dead = 0;
290 let mut total_throughput = 0.0;
291 let mut total_processing_time = 0.0;
292 let mut queue_count = 0;
293
294 for stats in &all_stats {
295 total_pending += stats.pending_count;
296 total_running += stats.running_count;
297 total_completed += stats.completed_count;
298 total_failed += stats.dead_count + stats.timed_out_count;
299 total_dead += stats.dead_count;
300 total_throughput += stats.statistics.throughput_per_minute;
301 total_processing_time += stats.statistics.avg_processing_time_ms;
302 queue_count += 1;
303 }
304
305 let avg_processing_time = if queue_count > 0 {
306 total_processing_time / queue_count as f64
307 } else {
308 0.0
309 };
310
311 let total_jobs = total_pending + total_running + total_completed + total_failed;
312 let overall_error_rate = if total_jobs > 0 {
313 total_failed as f64 / total_jobs as f64
314 } else {
315 0.0
316 };
317
318 let health = assess_system_health(&all_stats);
320
321 let overview = SystemOverview {
322 total_queues: queue_count,
323 total_jobs,
324 pending_jobs: total_pending,
325 running_jobs: total_running,
326 completed_jobs: total_completed,
327 failed_jobs: total_failed,
328 dead_jobs: total_dead,
329 overall_throughput: total_throughput,
330 overall_error_rate,
331 avg_processing_time_ms: avg_processing_time,
332 system_health: health,
333 uptime_seconds: 0, last_updated: chrono::Utc::now(),
335 };
336
337 Ok(warp::reply::json(&ApiResponse::success(overview)))
338 }
339 Err(e) => {
340 let response = ApiResponse::<()>::error(format!("Failed to get statistics: {}", e));
341 Ok(warp::reply::json(&response))
342 }
343 }
344}
345
346async fn detailed_stats_handler<T>(
348 queue: Arc<T>,
349 query: StatsQuery,
350) -> Result<impl Reply, warp::Rejection>
351where
352 T: DatabaseQueue + Send + Sync,
353{
354 let _ = query;
357
358 match queue.get_all_queue_stats().await {
359 Ok(all_stats) => {
360 let queue_stats: Vec<QueueStats> = all_stats
362 .iter()
363 .map(|stats| QueueStats {
364 name: stats.queue_name.clone(),
365 pending: stats.pending_count,
366 running: stats.running_count,
367 completed_total: stats.completed_count,
368 failed_total: stats.dead_count + stats.timed_out_count,
369 dead_total: stats.dead_count,
370 throughput_per_minute: stats.statistics.throughput_per_minute,
371 avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
372 error_rate: stats.statistics.error_rate,
373 oldest_pending_age_seconds: None, priority_distribution: HashMap::new(), })
376 .collect();
377
378 let hourly_trends = Vec::new();
380 let error_patterns = Vec::new();
381 let performance_metrics = PerformanceMetrics {
382 database_response_time_ms: 5.0, average_queue_depth: 10.5, jobs_per_second: 2.5, memory_usage_mb: None,
386 cpu_usage_percent: None,
387 active_workers: 4, worker_utilization: 0.75, };
390
391 let overview = generate_overview_from_stats(&all_stats);
393
394 let detailed = DetailedStats {
395 overview,
396 queue_stats,
397 hourly_trends,
398 error_patterns,
399 performance_metrics,
400 };
401
402 Ok(warp::reply::json(&ApiResponse::success(detailed)))
403 }
404 Err(e) => {
405 let response =
406 ApiResponse::<()>::error(format!("Failed to get detailed statistics: {}", e));
407 Ok(warp::reply::json(&response))
408 }
409 }
410}
411
412async fn trends_handler<T>(queue: Arc<T>, query: StatsQuery) -> Result<impl Reply, warp::Rejection>
414where
415 T: DatabaseQueue + Send + Sync,
416{
417 let _ = (queue, query);
420
421 let trends: Vec<HourlyTrend> = (0..24)
422 .map(|hour| HourlyTrend {
423 hour: chrono::Utc::now() - chrono::Duration::hours(23 - hour),
424 completed: (hour * 10 + 50) as u64,
425 failed: (hour / 4) as u64,
426 throughput: 5.0 + (hour as f64 * 0.5),
427 avg_processing_time_ms: 100.0 + (hour as f64 * 2.0),
428 error_rate: 0.01 + (hour as f64 * 0.001),
429 })
430 .collect();
431
432 Ok(warp::reply::json(&ApiResponse::success(trends)))
433}
434
435async fn health_handler<T>(queue: Arc<T>) -> Result<impl Reply, warp::Rejection>
437where
438 T: DatabaseQueue + Send + Sync,
439{
440 match queue.get_all_queue_stats().await {
441 Ok(all_stats) => {
442 let health = assess_system_health(&all_stats);
443 Ok(warp::reply::json(&ApiResponse::success(health)))
444 }
445 Err(e) => {
446 let health = SystemHealth {
447 status: "critical".to_string(),
448 database_healthy: false,
449 high_error_rate: false,
450 queue_backlog: false,
451 slow_processing: false,
452 alerts: vec![SystemAlert {
453 severity: "critical".to_string(),
454 message: format!("Database connection failed: {}", e),
455 queue: None,
456 metric: Some("database_connectivity".to_string()),
457 value: None,
458 threshold: None,
459 timestamp: chrono::Utc::now(),
460 }],
461 };
462 Ok(warp::reply::json(&ApiResponse::success(health)))
463 }
464 }
465}
466
467fn assess_system_health(stats: &[hammerwork::stats::QueueStats]) -> SystemHealth {
469 let mut alerts = Vec::new();
470 let mut high_error_rate = false;
471 let mut queue_backlog = false;
472 let mut slow_processing = false;
473
474 for stat in stats {
475 if stat.statistics.error_rate > 0.1 {
477 high_error_rate = true;
479 alerts.push(SystemAlert {
480 severity: "warning".to_string(),
481 message: format!("High error rate in queue '{}'", stat.queue_name),
482 queue: Some(stat.queue_name.clone()),
483 metric: Some("error_rate".to_string()),
484 value: Some(stat.statistics.error_rate),
485 threshold: Some(0.1),
486 timestamp: chrono::Utc::now(),
487 });
488 }
489
490 if stat.pending_count > 1000 {
492 queue_backlog = true;
493 alerts.push(SystemAlert {
494 severity: "warning".to_string(),
495 message: format!("Large backlog in queue '{}'", stat.queue_name),
496 queue: Some(stat.queue_name.clone()),
497 metric: Some("pending_count".to_string()),
498 value: Some(stat.pending_count as f64),
499 threshold: Some(1000.0),
500 timestamp: chrono::Utc::now(),
501 });
502 }
503
504 if stat.statistics.avg_processing_time_ms > 30000.0 {
506 slow_processing = true;
508 alerts.push(SystemAlert {
509 severity: "info".to_string(),
510 message: format!("Slow processing in queue '{}'", stat.queue_name),
511 queue: Some(stat.queue_name.clone()),
512 metric: Some("avg_processing_time_ms".to_string()),
513 value: Some(stat.statistics.avg_processing_time_ms),
514 threshold: Some(30000.0),
515 timestamp: chrono::Utc::now(),
516 });
517 }
518 }
519
520 let status = if alerts.iter().any(|a| a.severity == "critical") {
521 "critical"
522 } else if alerts.iter().any(|a| a.severity == "warning") {
523 "degraded"
524 } else {
525 "healthy"
526 };
527
528 SystemHealth {
529 status: status.to_string(),
530 database_healthy: true, high_error_rate,
532 queue_backlog,
533 slow_processing,
534 alerts,
535 }
536}
537
538fn generate_overview_from_stats(stats: &[hammerwork::stats::QueueStats]) -> SystemOverview {
540 let mut total_pending = 0;
541 let mut total_running = 0;
542 let mut total_completed = 0;
543 let mut total_failed = 0;
544 let mut total_dead = 0;
545 let mut total_throughput = 0.0;
546 let mut total_processing_time = 0.0;
547 let queue_count = stats.len();
548
549 for stat in stats {
550 total_pending += stat.pending_count;
551 total_running += stat.running_count;
552 total_completed += stat.completed_count;
553 total_failed += stat.dead_count + stat.timed_out_count;
554 total_dead += stat.dead_count;
555 total_throughput += stat.statistics.throughput_per_minute;
556 total_processing_time += stat.statistics.avg_processing_time_ms;
557 }
558
559 let avg_processing_time = if queue_count > 0 {
560 total_processing_time / queue_count as f64
561 } else {
562 0.0
563 };
564
565 let total_jobs = total_pending + total_running + total_completed + total_failed;
566 let overall_error_rate = if total_jobs > 0 {
567 total_failed as f64 / total_jobs as f64
568 } else {
569 0.0
570 };
571
572 let health = assess_system_health(stats);
573
574 SystemOverview {
575 total_queues: queue_count as u32,
576 total_jobs,
577 pending_jobs: total_pending,
578 running_jobs: total_running,
579 completed_jobs: total_completed,
580 failed_jobs: total_failed,
581 dead_jobs: total_dead,
582 overall_throughput: total_throughput,
583 overall_error_rate,
584 avg_processing_time_ms: avg_processing_time,
585 system_health: health,
586 uptime_seconds: 0,
587 last_updated: chrono::Utc::now(),
588 }
589}
590
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully-populated query body deserializes with every field present.
    #[test]
    fn test_stats_query_deserialization() {
        let payload = r#"{
            "time_range": {
                "start": "2024-01-01T00:00:00Z",
                "end": "2024-01-02T00:00:00Z"
            },
            "queues": ["email", "data-processing"],
            "granularity": "hour"
        }"#;

        let parsed: StatsQuery = serde_json::from_str(payload).unwrap();
        assert!(parsed.time_range.is_some());
        assert_eq!(parsed.queues.as_ref().unwrap().len(), 2);
        assert_eq!(parsed.granularity, Some("hour".to_string()));
    }

    /// Alerts serialize their severity and message into the JSON output.
    #[test]
    fn test_system_alert_serialization() {
        let alert = SystemAlert {
            severity: "warning".to_string(),
            message: "High error rate detected".to_string(),
            queue: Some("email".to_string()),
            metric: Some("error_rate".to_string()),
            value: Some(0.15),
            threshold: Some(0.1),
            timestamp: chrono::Utc::now(),
        };

        let encoded = serde_json::to_string(&alert).unwrap();
        assert!(encoded.contains("warning"));
        assert!(encoded.contains("High error rate"));
    }
}