// hammerwork_web/api/stats.rs
1//! Statistics and monitoring API endpoints.
2//!
3//! This module provides comprehensive monitoring and analytics endpoints for tracking
4//! system health, performance metrics, and operational insights across all job queues.
5//!
6//! # API Endpoints
7//!
8//! - `GET /api/stats/overview` - System overview with key metrics
9//! - `GET /api/stats/detailed` - Detailed statistics with historical trends
10//! - `GET /api/stats/trends` - Hourly/daily trend analysis
11//! - `GET /api/stats/health` - System health check and alerts
12//!
13//! # Examples
14//!
15//! ## System Overview
16//!
17//! ```rust
18//! use hammerwork_web::api::stats::{SystemOverview, SystemHealth, SystemAlert};
19//! use chrono::Utc;
20//!
21//! let overview = SystemOverview {
22//!     total_queues: 5,
23//!     total_jobs: 10000,
24//!     pending_jobs: 50,
25//!     running_jobs: 10,
26//!     completed_jobs: 9800,
27//!     failed_jobs: 125,
28//!     dead_jobs: 15,
29//!     overall_throughput: 150.5,
30//!     overall_error_rate: 0.0125,
31//!     avg_processing_time_ms: 250.0,
32//!     system_health: SystemHealth {
33//!         status: "healthy".to_string(),
34//!         database_healthy: true,
35//!         high_error_rate: false,
36//!         queue_backlog: false,
37//!         slow_processing: false,
38//!         alerts: vec![],
39//!     },
40//!     uptime_seconds: 86400,
41//!     last_updated: Utc::now(),
42//! };
43//!
44//! assert_eq!(overview.total_queues, 5);
45//! assert_eq!(overview.overall_error_rate, 0.0125);
46//! assert_eq!(overview.system_health.status, "healthy");
47//! ```
48//!
49//! ## Statistics Queries
50//!
51//! ```rust
52//! use hammerwork_web::api::stats::{StatsQuery, TimeRange};
53//! use chrono::{Utc, Duration};
54//!
55//! let time_range = TimeRange {
56//!     start: Utc::now() - Duration::hours(24),
57//!     end: Utc::now(),
58//! };
59//!
60//! let query = StatsQuery {
61//!     time_range: Some(time_range),
62//!     queues: Some(vec!["email".to_string(), "notifications".to_string()]),
63//!     granularity: Some("hour".to_string()),
64//! };
65//!
66//! assert!(query.time_range.is_some());
67//! assert_eq!(query.queues.as_ref().unwrap().len(), 2);
68//! assert_eq!(query.granularity, Some("hour".to_string()));
69//! ```
70//!
71//! ## System Alerts
72//!
73//! ```rust
74//! use hammerwork_web::api::stats::SystemAlert;
75//! use chrono::Utc;
76//!
77//! let alert = SystemAlert {
78//!     severity: "warning".to_string(),
79//!     message: "Queue backlog detected".to_string(),
80//!     queue: Some("image_processing".to_string()),
81//!     metric: Some("pending_count".to_string()),
82//!     value: Some(1500.0),
83//!     threshold: Some(1000.0),
84//!     timestamp: Utc::now(),
85//! };
86//!
87//! assert_eq!(alert.severity, "warning");
88//! assert_eq!(alert.queue, Some("image_processing".to_string()));
89//! assert_eq!(alert.value, Some(1500.0));
90//! ```
91//!
92//! ## Performance Metrics
93//!
94//! ```rust
95//! use hammerwork_web::api::stats::PerformanceMetrics;
96//!
97//! let metrics = PerformanceMetrics {
98//!     database_response_time_ms: 5.2,
99//!     average_queue_depth: 15.5,
100//!     jobs_per_second: 8.3,
101//!     memory_usage_mb: Some(512.0),
102//!     cpu_usage_percent: Some(45.2),
103//!     active_workers: 12,
104//!     worker_utilization: 0.75,
105//! };
106//!
107//! assert_eq!(metrics.database_response_time_ms, 5.2);
108//! assert_eq!(metrics.active_workers, 12);
109//! assert_eq!(metrics.worker_utilization, 0.75);
110//! ```
111
112use super::ApiResponse;
113use hammerwork::queue::DatabaseQueue;
114use serde::{Deserialize, Serialize};
115use std::collections::HashMap;
116use std::sync::Arc;
117use warp::{Filter, Reply};
118
/// System overview statistics
///
/// Aggregated snapshot across all queues, returned by `GET /api/stats/overview`.
#[derive(Debug, Serialize)]
pub struct SystemOverview {
    /// Number of queues included in the aggregation.
    pub total_queues: u32,
    /// Sum of pending, running, completed, and failed jobs.
    pub total_jobs: u64,
    /// Jobs waiting to be processed.
    pub pending_jobs: u64,
    /// Jobs currently being processed.
    pub running_jobs: u64,
    /// Successfully completed jobs.
    pub completed_jobs: u64,
    /// Failed jobs (dead + timed-out).
    pub failed_jobs: u64,
    /// Jobs in the dead-letter state.
    pub dead_jobs: u64,
    /// Combined throughput across all queues (jobs per minute).
    pub overall_throughput: f64,
    /// failed / total ratio in [0, 1]; 0.0 when there are no jobs.
    pub overall_error_rate: f64,
    /// Mean of the per-queue average processing times, in milliseconds.
    pub avg_processing_time_ms: f64,
    /// Health assessment derived from the same statistics.
    pub system_health: SystemHealth,
    /// Server uptime in seconds (0 when not available to the producer).
    pub uptime_seconds: u64,
    /// Timestamp at which this snapshot was generated.
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
136
/// System health status
///
/// Overall assessment plus individual condition flags and the alerts that
/// triggered them (see `assess_system_health`).
#[derive(Debug, Serialize)]
pub struct SystemHealth {
    /// Overall status derived from alert severities.
    pub status: String, // "healthy", "degraded", "critical"
    /// Whether the database could be queried.
    pub database_healthy: bool,
    /// Any queue exceeded the error-rate threshold.
    pub high_error_rate: bool,
    /// Any queue exceeded the pending-count threshold.
    pub queue_backlog: bool,
    /// Any queue exceeded the processing-time threshold.
    pub slow_processing: bool,
    /// Individual alerts backing the flags above.
    pub alerts: Vec<SystemAlert>,
}
147
/// System alert
///
/// A single alert raised by the health assessment, optionally tied to a
/// specific queue and metric/threshold pair.
#[derive(Debug, Serialize)]
pub struct SystemAlert {
    /// Alert severity level.
    pub severity: String, // "info", "warning", "error", "critical"
    /// Human-readable description of the condition.
    pub message: String,
    /// Queue the alert applies to, if queue-specific.
    pub queue: Option<String>,
    /// Name of the metric that triggered the alert.
    pub metric: Option<String>,
    /// Observed metric value.
    pub value: Option<f64>,
    /// Threshold the value was compared against.
    pub threshold: Option<f64>,
    /// When the alert was generated.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
159
/// Detailed statistics for monitoring
///
/// Full payload returned by `GET /api/stats/detailed`.
#[derive(Debug, Serialize)]
pub struct DetailedStats {
    /// Aggregated system-wide overview.
    pub overview: SystemOverview,
    /// Per-queue breakdown.
    pub queue_stats: Vec<QueueStats>,
    /// Trend buckets for the last 24 hours.
    pub hourly_trends: Vec<HourlyTrend>,
    /// Grouped error analysis from dead jobs.
    pub error_patterns: Vec<ErrorPattern>,
    /// Derived performance figures.
    pub performance_metrics: PerformanceMetrics,
}
169
/// Queue statistics
///
/// Per-queue counters and rates as exposed by the detailed stats endpoint.
#[derive(Debug, Serialize)]
pub struct QueueStats {
    /// Queue name.
    pub name: String,
    /// Jobs waiting to run.
    pub pending: u64,
    /// Jobs currently running.
    pub running: u64,
    /// Total completed jobs.
    pub completed_total: u64,
    /// Total failed jobs (dead + timed-out).
    pub failed_total: u64,
    /// Total dead-lettered jobs.
    pub dead_total: u64,
    /// Throughput in jobs per minute.
    pub throughput_per_minute: f64,
    /// Average processing time in milliseconds.
    pub avg_processing_time_ms: f64,
    /// Error rate for this queue.
    pub error_rate: f64,
    /// Age of the oldest pending job, if any (best-effort sample).
    pub oldest_pending_age_seconds: Option<u64>,
    /// Priority name ("background".."critical") -> share of jobs.
    pub priority_distribution: HashMap<String, f32>,
}
185
/// Hourly trend data
///
/// One one-hour bucket of throughput/error statistics.
#[derive(Debug, Serialize)]
pub struct HourlyTrend {
    /// Start of the hour bucket.
    pub hour: chrono::DateTime<chrono::Utc>,
    /// Jobs completed in this hour.
    pub completed: u64,
    /// Jobs failed in this hour (may be estimated).
    pub failed: u64,
    /// Jobs per second over the hour.
    pub throughput: f64,
    /// Average processing time in milliseconds for the hour.
    pub avg_processing_time_ms: f64,
    /// failed / (completed + failed) for the hour; 0.0 when empty.
    pub error_rate: f64,
}
196
/// Error pattern analysis
///
/// A group of similar errors (classified by `extract_error_type`) with
/// frequency and occurrence metadata.
#[derive(Debug, Serialize)]
pub struct ErrorPattern {
    /// Coarse error classification (e.g. "Timeout Error").
    pub error_type: String,
    /// Number of sampled occurrences of this type.
    pub count: u64,
    /// Share of all dead jobs this type represents, in percent.
    pub percentage: f64,
    /// One representative raw error message.
    pub sample_message: String,
    /// Earliest sampled occurrence.
    pub first_seen: chrono::DateTime<chrono::Utc>,
    /// Most recent sampled occurrence.
    pub last_seen: chrono::DateTime<chrono::Utc>,
    /// Queues in which this error type was observed.
    pub affected_queues: Vec<String>,
}
208
/// Performance metrics
///
/// Derived performance figures (see `calculate_performance_metrics`).
#[derive(Debug, Serialize)]
pub struct PerformanceMetrics {
    /// Estimated DB round-trip time (heuristic, not measured).
    pub database_response_time_ms: f64,
    /// Mean pending count across queues.
    pub average_queue_depth: f64,
    /// System-wide throughput in jobs per second.
    pub jobs_per_second: f64,
    /// Resident memory, if system monitoring is available (currently None).
    pub memory_usage_mb: Option<f64>,
    /// CPU usage, if system monitoring is available (currently None).
    pub cpu_usage_percent: Option<f64>,
    /// Approximated by the number of currently running jobs.
    pub active_workers: u32,
    /// running / total jobs ratio; 0.0 when there are no jobs.
    pub worker_utilization: f64,
}
220
/// Time range for statistics queries
///
/// Inclusive start / exclusive-or-inclusive end is not enforced here;
/// consumers interpret the bounds.
#[derive(Debug, Deserialize)]
pub struct TimeRange {
    /// Range start (UTC).
    pub start: chrono::DateTime<chrono::Utc>,
    /// Range end (UTC).
    pub end: chrono::DateTime<chrono::Utc>,
}
227
/// Statistics query parameters
///
/// All fields are optional; handlers currently accept but do not yet apply
/// them (reserved for historical-data queries).
#[derive(Debug, Deserialize)]
pub struct StatsQuery {
    /// Restrict results to this time window.
    pub time_range: Option<TimeRange>,
    /// Restrict results to these queue names.
    pub queues: Option<Vec<String>>,
    /// Bucket size for trend data.
    pub granularity: Option<String>, // "hour", "day", "week"
}
235
236/// Create statistics routes
237pub fn routes<T>(
238    queue: Arc<T>,
239    system_state: Arc<tokio::sync::RwLock<crate::api::system::SystemState>>,
240) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
241where
242    T: DatabaseQueue + Send + Sync + 'static,
243{
244    let queue_filter = warp::any().map(move || queue.clone());
245    let state_filter = warp::any().map(move || system_state.clone());
246
247    let overview = warp::path("stats")
248        .and(warp::path("overview"))
249        .and(warp::path::end())
250        .and(warp::get())
251        .and(queue_filter.clone())
252        .and(state_filter.clone())
253        .and_then(overview_handler);
254
255    let detailed = warp::path("stats")
256        .and(warp::path("detailed"))
257        .and(warp::path::end())
258        .and(warp::get())
259        .and(queue_filter.clone())
260        .and(warp::query::<StatsQuery>())
261        .and_then(detailed_stats_handler);
262
263    let trends = warp::path("stats")
264        .and(warp::path("trends"))
265        .and(warp::path::end())
266        .and(warp::get())
267        .and(queue_filter.clone())
268        .and(warp::query::<StatsQuery>())
269        .and_then(trends_handler);
270
271    let health = warp::path("stats")
272        .and(warp::path("health"))
273        .and(warp::path::end())
274        .and(warp::get())
275        .and(queue_filter)
276        .and_then(health_handler);
277
278    overview.or(detailed).or(trends).or(health)
279}
280
281/// Handler for system overview statistics
282async fn overview_handler<T>(
283    queue: Arc<T>,
284    system_state: Arc<tokio::sync::RwLock<crate::api::system::SystemState>>,
285) -> Result<impl Reply, warp::Rejection>
286where
287    T: DatabaseQueue + Send + Sync,
288{
289    match queue.get_all_queue_stats().await {
290        Ok(all_stats) => {
291            let mut total_pending = 0;
292            let mut total_running = 0;
293            let mut total_completed = 0;
294            let mut total_failed = 0;
295            let mut total_dead = 0;
296            let mut total_throughput = 0.0;
297            let mut total_processing_time = 0.0;
298            let mut queue_count = 0;
299
300            for stats in &all_stats {
301                total_pending += stats.pending_count;
302                total_running += stats.running_count;
303                total_completed += stats.completed_count;
304                total_failed += stats.dead_count + stats.timed_out_count;
305                total_dead += stats.dead_count;
306                total_throughput += stats.statistics.throughput_per_minute;
307                total_processing_time += stats.statistics.avg_processing_time_ms;
308                queue_count += 1;
309            }
310
311            let avg_processing_time = if queue_count > 0 {
312                total_processing_time / queue_count as f64
313            } else {
314                0.0
315            };
316
317            let total_jobs = total_pending + total_running + total_completed + total_failed;
318            let overall_error_rate = if total_jobs > 0 {
319                total_failed as f64 / total_jobs as f64
320            } else {
321                0.0
322            };
323
324            // Generate system health assessment
325            let health = assess_system_health(&all_stats);
326
327            let overview = SystemOverview {
328                total_queues: queue_count,
329                total_jobs,
330                pending_jobs: total_pending,
331                running_jobs: total_running,
332                completed_jobs: total_completed,
333                failed_jobs: total_failed,
334                dead_jobs: total_dead,
335                overall_throughput: total_throughput,
336                overall_error_rate,
337                avg_processing_time_ms: avg_processing_time,
338                system_health: health,
339                uptime_seconds: {
340                    let state = system_state.read().await;
341                    state.uptime_seconds() as u64
342                },
343                last_updated: chrono::Utc::now(),
344            };
345
346            Ok(warp::reply::json(&ApiResponse::success(overview)))
347        }
348        Err(e) => {
349            let response = ApiResponse::<()>::error(format!("Failed to get statistics: {}", e));
350            Ok(warp::reply::json(&response))
351        }
352    }
353}
354
/// Handler for detailed statistics.
///
/// Builds a [`DetailedStats`] payload combining per-queue stats, 24-hour
/// trends, error patterns, performance metrics, and an overview. The `query`
/// parameters (time range, queue filter, granularity) are accepted but not
/// yet applied. Database errors are reported in the response body rather
/// than as an HTTP error.
async fn detailed_stats_handler<T>(
    queue: Arc<T>,
    query: StatsQuery,
) -> Result<impl Reply, warp::Rejection>
where
    T: DatabaseQueue + Send + Sync,
{
    // For now, return basic stats. In a real implementation, this would
    // use the time_range and other query parameters to fetch historical data
    let _ = query;

    match queue.get_all_queue_stats().await {
        Ok(all_stats) => {
            // Convert hammerwork stats to our API format
            let mut queue_stats: Vec<QueueStats> = Vec::new();
            for stats in all_stats.iter() {
                // Best-effort age of the oldest pending job (see helper docs).
                let oldest_pending_age_seconds =
                    calculate_oldest_pending_age(&queue, &stats.queue_name).await;

                // Get priority distribution from priority stats
                let priority_distribution =
                    get_priority_distribution(&queue, &stats.queue_name).await;

                queue_stats.push(QueueStats {
                    name: stats.queue_name.clone(),
                    pending: stats.pending_count,
                    running: stats.running_count,
                    completed_total: stats.completed_count,
                    // "Failed" aggregates dead and timed-out jobs.
                    failed_total: stats.dead_count + stats.timed_out_count,
                    dead_total: stats.dead_count,
                    throughput_per_minute: stats.statistics.throughput_per_minute,
                    avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
                    error_rate: stats.statistics.error_rate,
                    oldest_pending_age_seconds,
                    priority_distribution,
                });
            }

            // Generate realistic data based on actual statistics
            let hourly_trends = generate_hourly_trends(&queue, &all_stats).await;
            let error_patterns = generate_error_patterns(&queue, &all_stats).await;
            let performance_metrics = calculate_performance_metrics(&all_stats);

            // Generate overview from the stats
            let overview = generate_overview_from_stats(&all_stats);

            let detailed = DetailedStats {
                overview,
                queue_stats,
                hourly_trends,
                error_patterns,
                performance_metrics,
            };

            Ok(warp::reply::json(&ApiResponse::success(detailed)))
        }
        Err(e) => {
            let response =
                ApiResponse::<()>::error(format!("Failed to get detailed statistics: {}", e));
            Ok(warp::reply::json(&response))
        }
    }
}
420
421/// Handler for trend analysis
422async fn trends_handler<T>(queue: Arc<T>, query: StatsQuery) -> Result<impl Reply, warp::Rejection>
423where
424    T: DatabaseQueue + Send + Sync,
425{
426    // For now, return mock trend data
427    // In a real implementation, this would query historical data based on the time range
428    let _ = (queue, query);
429
430    let trends: Vec<HourlyTrend> = (0..24)
431        .map(|hour| HourlyTrend {
432            hour: chrono::Utc::now() - chrono::Duration::hours(23 - hour),
433            completed: (hour * 10 + 50) as u64,
434            failed: (hour / 4) as u64,
435            throughput: 5.0 + (hour as f64 * 0.5),
436            avg_processing_time_ms: 100.0 + (hour as f64 * 2.0),
437            error_rate: 0.01 + (hour as f64 * 0.001),
438        })
439        .collect();
440
441    Ok(warp::reply::json(&ApiResponse::success(trends)))
442}
443
/// Handler for system health check
///
/// Assesses health from the current queue statistics. If the statistics
/// query itself fails, responds with a "critical" health payload containing
/// a single database-connectivity alert (still HTTP 200, so clients always
/// get a parseable health document).
async fn health_handler<T>(queue: Arc<T>) -> Result<impl Reply, warp::Rejection>
where
    T: DatabaseQueue + Send + Sync,
{
    match queue.get_all_queue_stats().await {
        Ok(all_stats) => {
            let health = assess_system_health(&all_stats);
            Ok(warp::reply::json(&ApiResponse::success(health)))
        }
        Err(e) => {
            // Database unreachable: synthesize a critical health report.
            let health = SystemHealth {
                status: "critical".to_string(),
                database_healthy: false,
                high_error_rate: false,
                queue_backlog: false,
                slow_processing: false,
                alerts: vec![SystemAlert {
                    severity: "critical".to_string(),
                    message: format!("Database connection failed: {}", e),
                    queue: None,
                    metric: Some("database_connectivity".to_string()),
                    value: None,
                    threshold: None,
                    timestamp: chrono::Utc::now(),
                }],
            };
            Ok(warp::reply::json(&ApiResponse::success(health)))
        }
    }
}
475
/// Assess overall system health based on queue statistics
///
/// Raises a "warning" alert per queue with error rate > 10% or pending
/// backlog > 1000 jobs, and an "info" alert for average processing time
/// > 30 s. Overall status: "critical" if any critical alert exists (none are
/// generated here), "degraded" if any warning exists, otherwise "healthy".
/// `database_healthy` is always true because reaching this function implies
/// the stats query succeeded.
fn assess_system_health(stats: &[hammerwork::stats::QueueStats]) -> SystemHealth {
    let mut alerts = Vec::new();
    let mut high_error_rate = false;
    let mut queue_backlog = false;
    let mut slow_processing = false;

    for stat in stats {
        // Check error rate
        if stat.statistics.error_rate > 0.1 {
            // > 10% error rate
            high_error_rate = true;
            alerts.push(SystemAlert {
                severity: "warning".to_string(),
                message: format!("High error rate in queue '{}'", stat.queue_name),
                queue: Some(stat.queue_name.clone()),
                metric: Some("error_rate".to_string()),
                value: Some(stat.statistics.error_rate),
                threshold: Some(0.1),
                timestamp: chrono::Utc::now(),
            });
        }

        // Check queue backlog
        if stat.pending_count > 1000 {
            queue_backlog = true;
            alerts.push(SystemAlert {
                severity: "warning".to_string(),
                message: format!("Large backlog in queue '{}'", stat.queue_name),
                queue: Some(stat.queue_name.clone()),
                metric: Some("pending_count".to_string()),
                value: Some(stat.pending_count as f64),
                threshold: Some(1000.0),
                timestamp: chrono::Utc::now(),
            });
        }

        // Check processing time
        if stat.statistics.avg_processing_time_ms > 30000.0 {
            // > 30 seconds
            slow_processing = true;
            alerts.push(SystemAlert {
                severity: "info".to_string(),
                message: format!("Slow processing in queue '{}'", stat.queue_name),
                queue: Some(stat.queue_name.clone()),
                metric: Some("avg_processing_time_ms".to_string()),
                value: Some(stat.statistics.avg_processing_time_ms),
                threshold: Some(30000.0),
                timestamp: chrono::Utc::now(),
            });
        }
    }

    // Derive the overall status from the worst alert severity present.
    let status = if alerts.iter().any(|a| a.severity == "critical") {
        "critical"
    } else if alerts.iter().any(|a| a.severity == "warning") {
        "degraded"
    } else {
        "healthy"
    };

    SystemHealth {
        status: status.to_string(),
        database_healthy: true, // If we got here, DB is accessible
        high_error_rate,
        queue_backlog,
        slow_processing,
        alerts,
    }
}
546
/// Generate system overview from queue statistics
///
/// Aggregates per-queue counters into a single [`SystemOverview`]:
/// "failed" counts dead + timed-out jobs, the error rate is
/// failed / total (0.0 when there are no jobs), and the processing time is
/// the unweighted mean of per-queue averages. `uptime_seconds` is set to 0
/// because uptime is not known at this layer.
fn generate_overview_from_stats(stats: &[hammerwork::stats::QueueStats]) -> SystemOverview {
    let mut total_pending = 0;
    let mut total_running = 0;
    let mut total_completed = 0;
    let mut total_failed = 0;
    let mut total_dead = 0;
    let mut total_throughput = 0.0;
    let mut total_processing_time = 0.0;
    let queue_count = stats.len();

    for stat in stats {
        total_pending += stat.pending_count;
        total_running += stat.running_count;
        total_completed += stat.completed_count;
        total_failed += stat.dead_count + stat.timed_out_count;
        total_dead += stat.dead_count;
        total_throughput += stat.statistics.throughput_per_minute;
        total_processing_time += stat.statistics.avg_processing_time_ms;
    }

    // Unweighted mean across queues (not weighted by job volume).
    let avg_processing_time = if queue_count > 0 {
        total_processing_time / queue_count as f64
    } else {
        0.0
    };

    let total_jobs = total_pending + total_running + total_completed + total_failed;
    let overall_error_rate = if total_jobs > 0 {
        total_failed as f64 / total_jobs as f64
    } else {
        0.0
    };

    let health = assess_system_health(stats);

    SystemOverview {
        total_queues: queue_count as u32,
        total_jobs,
        pending_jobs: total_pending,
        running_jobs: total_running,
        completed_jobs: total_completed,
        failed_jobs: total_failed,
        dead_jobs: total_dead,
        overall_throughput: total_throughput,
        overall_error_rate,
        avg_processing_time_ms: avg_processing_time,
        system_health: health,
        uptime_seconds: 0,
        last_updated: chrono::Utc::now(),
    }
}
599
/// Calculate the oldest pending job age in seconds for a queue
///
/// Samples up to 100 ready jobs and returns the age (now - created_at) of
/// the oldest one still in `Pending` status, or `None` when there are no
/// pending jobs or the query fails.
///
/// NOTE(review): with only 100 jobs sampled, the true oldest job may be
/// missed on large backlogs; also confirm that `get_ready_jobs` has no side
/// effects (e.g. claiming/locking jobs) in this implementation.
async fn calculate_oldest_pending_age<T>(queue: &Arc<T>, queue_name: &str) -> Option<u64>
where
    T: DatabaseQueue + Send + Sync,
{
    // Get ready jobs (pending jobs) and find the oldest
    match queue.get_ready_jobs(queue_name, 100).await {
        Ok(jobs) => {
            let now = chrono::Utc::now();
            jobs.iter()
                .filter(|job| matches!(job.status, hammerwork::job::JobStatus::Pending))
                .map(|job| {
                    // Assumes created_at <= now; a future created_at (clock
                    // skew) would produce a huge value after the u64 cast.
                    let age = now - job.created_at;
                    age.num_seconds() as u64
                })
                .max()
        }
        Err(_) => None,
    }
}
620
/// Get priority distribution from priority stats for a queue
///
/// Maps each [`hammerwork::priority::JobPriority`] variant to a lowercase
/// string key and returns the queue's priority distribution. An empty map is
/// returned if the stats query fails (errors are swallowed deliberately —
/// the distribution is non-essential display data).
async fn get_priority_distribution<T>(queue: &Arc<T>, queue_name: &str) -> HashMap<String, f32>
where
    T: DatabaseQueue + Send + Sync,
{
    match queue.get_priority_stats(queue_name).await {
        Ok(priority_stats) => priority_stats
            .priority_distribution
            .into_iter()
            .map(|(priority, percentage)| {
                // Exhaustive match: adding a priority variant upstream will
                // surface here as a compile error.
                let priority_name = match priority {
                    hammerwork::priority::JobPriority::Background => "background",
                    hammerwork::priority::JobPriority::Low => "low",
                    hammerwork::priority::JobPriority::Normal => "normal",
                    hammerwork::priority::JobPriority::High => "high",
                    hammerwork::priority::JobPriority::Critical => "critical",
                };
                (priority_name.to_string(), percentage)
            })
            .collect(),
        Err(_) => HashMap::new(),
    }
}
644
/// Generate hourly trends from queue statistics
///
/// Builds 24 [`HourlyTrend`] buckets covering the last 24 hours, oldest
/// first. Completed counts and processing times come from
/// `get_jobs_completed_in_range` (capped at 1000 jobs per hour); failure
/// counts are *estimates* derived from `get_error_frequencies`, since no
/// direct "failed jobs in range" query exists. Hours with no processing-time
/// samples fall back to the overall average from `all_stats`.
async fn generate_hourly_trends<T>(
    queue: &Arc<T>,
    all_stats: &[hammerwork::stats::QueueStats],
) -> Vec<HourlyTrend>
where
    T: DatabaseQueue + Send + Sync,
{
    let now = chrono::Utc::now();
    let mut trends = Vec::new();

    // Generate trends for the last 24 hours using actual database queries
    for i in 0..24 {
        let hour_start = now - chrono::Duration::hours(23 - i);
        let hour_end = hour_start + chrono::Duration::hours(1);

        let mut hour_completed = 0u64;
        let mut hour_failed = 0u64;
        let mut hour_processing_times = Vec::new();

        // Get completed jobs for this specific hour across all queues
        // (None = no queue filter; capped at 1000 jobs, so hour_completed
        // under-counts very busy hours).
        if let Ok(completed_jobs) = queue
            .get_jobs_completed_in_range(None, hour_start, hour_end, Some(1000))
            .await
        {
            hour_completed = completed_jobs.len() as u64;

            // Collect processing times for completed jobs
            for job in completed_jobs {
                if let (Some(started_at), Some(completed_at)) = (job.started_at, job.completed_at) {
                    let processing_time = (completed_at - started_at).num_milliseconds() as f64;
                    hour_processing_times.push(processing_time);
                }
            }
        }

        // Get failed jobs for this hour using error frequencies
        // Since we don't have a direct method for failed jobs in time range,
        // we'll estimate based on error frequencies for this hour
        if let Ok(error_frequencies) = queue.get_error_frequencies(None, hour_start).await {
            // This gives us errors since hour_start, so we need to estimate for just this hour
            let total_errors_since_start = error_frequencies.values().sum::<u64>();

            // For recent hours, use a more accurate estimate
            if i < 3 {
                // For the last 3 hours, assume more recent distribution
                hour_failed = total_errors_since_start / ((i + 1) as u64).max(1);
            } else {
                // For older hours, use a smaller fraction
                hour_failed = total_errors_since_start / 24; // Rough hourly average
            }
        }

        // Calculate throughput (jobs per second for this hour)
        let hour_throughput = (hour_completed + hour_failed) as f64 / 3600.0;

        // Calculate average processing time for this hour
        let avg_processing_time_ms = if !hour_processing_times.is_empty() {
            hour_processing_times.iter().sum::<f64>() / hour_processing_times.len() as f64
        } else {
            // If no processing times available, use overall average from stats
            if !all_stats.is_empty() {
                all_stats
                    .iter()
                    .map(|s| s.statistics.avg_processing_time_ms)
                    .sum::<f64>()
                    / all_stats.len() as f64
            } else {
                0.0
            }
        };

        let error_rate = if (hour_completed + hour_failed) > 0 {
            hour_failed as f64 / (hour_completed + hour_failed) as f64
        } else {
            0.0
        };

        trends.push(HourlyTrend {
            hour: hour_start,
            completed: hour_completed,
            failed: hour_failed,
            throughput: hour_throughput,
            avg_processing_time_ms,
            error_rate,
        });
    }

    trends
}
735
736/// Generate error patterns from queue statistics
737async fn generate_error_patterns<T>(
738    queue: &Arc<T>,
739    all_stats: &[hammerwork::stats::QueueStats],
740) -> Vec<ErrorPattern>
741where
742    T: DatabaseQueue + Send + Sync,
743{
744    let mut error_patterns = Vec::new();
745    let total_errors = all_stats.iter().map(|s| s.dead_count).sum::<u64>();
746
747    if total_errors == 0 {
748        return error_patterns;
749    }
750
751    // Collect error messages from dead jobs across all queues
752    let mut error_messages = Vec::new();
753    for stats in all_stats {
754        if let Ok(dead_jobs) = queue
755            .get_dead_jobs_by_queue(&stats.queue_name, Some(20), Some(0))
756            .await
757        {
758            for job in dead_jobs {
759                if let Some(error_msg) = job.error_message {
760                    error_messages.push((error_msg, job.failed_at.unwrap_or(job.created_at)));
761                }
762            }
763        }
764    }
765
766    // Group similar error messages
767    let mut error_counts = std::collections::HashMap::new();
768    let mut error_first_seen = std::collections::HashMap::new();
769
770    for (error_msg, failed_at) in error_messages {
771        let error_type = extract_error_type(&error_msg);
772        let count = error_counts.entry(error_type.clone()).or_insert(0);
773        *count += 1;
774
775        error_first_seen
776            .entry(error_type.clone())
777            .or_insert_with(|| (error_msg, failed_at));
778    }
779
780    // Convert to error patterns
781    for (error_type, count) in error_counts {
782        let percentage = (count as f64 / total_errors as f64) * 100.0;
783        let (sample_message, first_seen) = error_first_seen.get(&error_type).unwrap();
784
785        error_patterns.push(ErrorPattern {
786            error_type: error_type.clone(),
787            count,
788            percentage,
789            sample_message: sample_message.clone(),
790            first_seen: *first_seen,
791            last_seen: chrono::Utc::now(), // In a real implementation, track actual last seen
792            affected_queues: vec![error_type], // In a real implementation, track actual affected queues
793        });
794    }
795
796    // Sort by count descending
797    error_patterns.sort_by(|a, b| b.count.cmp(&a.count));
798
799    error_patterns
800}
801
802/// Calculate performance metrics from queue statistics
803fn calculate_performance_metrics(
804    all_stats: &[hammerwork::stats::QueueStats],
805) -> PerformanceMetrics {
806    let total_jobs = all_stats
807        .iter()
808        .map(|s| s.pending_count + s.running_count + s.completed_count + s.dead_count)
809        .sum::<u64>();
810    let total_throughput = all_stats
811        .iter()
812        .map(|s| s.statistics.throughput_per_minute)
813        .sum::<f64>();
814    let avg_processing_time = if !all_stats.is_empty() {
815        all_stats
816            .iter()
817            .map(|s| s.statistics.avg_processing_time_ms)
818            .sum::<f64>()
819            / all_stats.len() as f64
820    } else {
821        0.0
822    };
823
824    let average_queue_depth = if !all_stats.is_empty() {
825        all_stats
826            .iter()
827            .map(|s| s.pending_count as f64)
828            .sum::<f64>()
829            / all_stats.len() as f64
830    } else {
831        0.0
832    };
833
834    // Estimate database response time based on processing time
835    let database_response_time_ms = if avg_processing_time > 0.0 {
836        (avg_processing_time * 0.1).max(1.0).min(100.0) // Assume DB is 10% of processing time
837    } else {
838        2.0
839    };
840
841    PerformanceMetrics {
842        database_response_time_ms,
843        average_queue_depth,
844        jobs_per_second: total_throughput / 60.0, // Convert from per minute to per second
845        memory_usage_mb: None,                    // Would need system monitoring
846        cpu_usage_percent: None,                  // Would need system monitoring
847        active_workers: all_stats.iter().map(|s| s.running_count as u32).sum(),
848        worker_utilization: if total_jobs > 0 {
849            all_stats.iter().map(|s| s.running_count).sum::<u64>() as f64 / total_jobs as f64
850        } else {
851            0.0
852        },
853    }
854}
855
/// Extract error type from error message for grouping
///
/// Classification is case-insensitive (generalizes the previous mixed-case
/// `contains` pairs, so e.g. "TIMEOUT" or "Invalid" are now also recognized).
/// Checks run in priority order: timeout, connection, parse/invalid,
/// permission/forbidden, not-found. Unrecognized messages fall back to
/// "<first word> Error"; a blank message yields "Unknown Error".
fn extract_error_type(error_msg: &str) -> String {
    // Lowercase once so every keyword check is case-insensitive.
    let lower = error_msg.to_lowercase();

    if lower.contains("timeout") {
        "Timeout Error".to_string()
    } else if lower.contains("connection") {
        "Connection Error".to_string()
    } else if lower.contains("parse") || lower.contains("invalid") {
        "Parse Error".to_string()
    } else if lower.contains("permission") || lower.contains("forbidden") {
        "Permission Error".to_string()
    } else if lower.contains("not found") {
        "Not Found Error".to_string()
    } else {
        // Use first word of the original message as the type.
        error_msg
            .split_whitespace()
            .next()
            .map(|s| format!("{} Error", s))
            .unwrap_or_else(|| "Unknown Error".to_string())
    }
}
884
#[cfg(test)]
mod tests {
    use super::*;

    /// StatsQuery deserializes from the JSON shape the endpoints accept
    /// (nested time_range, queue list, and granularity string).
    #[test]
    fn test_stats_query_deserialization() {
        let json = r#"{
            "time_range": {
                "start": "2024-01-01T00:00:00Z",
                "end": "2024-01-02T00:00:00Z"
            },
            "queues": ["email", "data-processing"],
            "granularity": "hour"
        }"#;

        let query: StatsQuery = serde_json::from_str(json).unwrap();
        assert!(query.time_range.is_some());
        assert_eq!(query.queues.as_ref().unwrap().len(), 2);
        assert_eq!(query.granularity, Some("hour".to_string()));
    }

    /// SystemAlert serializes its severity and message fields to JSON.
    #[test]
    fn test_system_alert_serialization() {
        let alert = SystemAlert {
            severity: "warning".to_string(),
            message: "High error rate detected".to_string(),
            queue: Some("email".to_string()),
            metric: Some("error_rate".to_string()),
            value: Some(0.15),
            threshold: Some(0.1),
            timestamp: chrono::Utc::now(),
        };

        let json = serde_json::to_string(&alert).unwrap();
        assert!(json.contains("warning"));
        assert!(json.contains("High error rate"));
    }
}