hammerwork_web/api/
stats.rs

1//! Statistics and monitoring API endpoints.
2//!
3//! This module provides comprehensive monitoring and analytics endpoints for tracking
4//! system health, performance metrics, and operational insights across all job queues.
5//!
6//! # API Endpoints
7//!
8//! - `GET /api/stats/overview` - System overview with key metrics
9//! - `GET /api/stats/detailed` - Detailed statistics with historical trends
10//! - `GET /api/stats/trends` - Hourly/daily trend analysis
11//! - `GET /api/stats/health` - System health check and alerts
12//!
13//! # Examples
14//!
15//! ## System Overview
16//!
17//! ```rust
18//! use hammerwork_web::api::stats::{SystemOverview, SystemHealth, SystemAlert};
19//! use chrono::Utc;
20//!
21//! let overview = SystemOverview {
22//!     total_queues: 5,
23//!     total_jobs: 10000,
24//!     pending_jobs: 50,
25//!     running_jobs: 10,
26//!     completed_jobs: 9800,
27//!     failed_jobs: 125,
28//!     dead_jobs: 15,
29//!     overall_throughput: 150.5,
30//!     overall_error_rate: 0.0125,
31//!     avg_processing_time_ms: 250.0,
32//!     system_health: SystemHealth {
33//!         status: "healthy".to_string(),
34//!         database_healthy: true,
35//!         high_error_rate: false,
36//!         queue_backlog: false,
37//!         slow_processing: false,
38//!         alerts: vec![],
39//!     },
40//!     uptime_seconds: 86400,
41//!     last_updated: Utc::now(),
42//! };
43//!
44//! assert_eq!(overview.total_queues, 5);
45//! assert_eq!(overview.overall_error_rate, 0.0125);
46//! assert_eq!(overview.system_health.status, "healthy");
47//! ```
48//!
49//! ## Statistics Queries
50//!
51//! ```rust
52//! use hammerwork_web::api::stats::{StatsQuery, TimeRange};
53//! use chrono::{Utc, Duration};
54//!
55//! let time_range = TimeRange {
56//!     start: Utc::now() - Duration::hours(24),
57//!     end: Utc::now(),
58//! };
59//!
60//! let query = StatsQuery {
61//!     time_range: Some(time_range),
62//!     queues: Some(vec!["email".to_string(), "notifications".to_string()]),
63//!     granularity: Some("hour".to_string()),
64//! };
65//!
66//! assert!(query.time_range.is_some());
67//! assert_eq!(query.queues.as_ref().unwrap().len(), 2);
68//! assert_eq!(query.granularity, Some("hour".to_string()));
69//! ```
70//!
71//! ## System Alerts
72//!
73//! ```rust
74//! use hammerwork_web::api::stats::SystemAlert;
75//! use chrono::Utc;
76//!
77//! let alert = SystemAlert {
78//!     severity: "warning".to_string(),
79//!     message: "Queue backlog detected".to_string(),
80//!     queue: Some("image_processing".to_string()),
81//!     metric: Some("pending_count".to_string()),
82//!     value: Some(1500.0),
83//!     threshold: Some(1000.0),
84//!     timestamp: Utc::now(),
85//! };
86//!
87//! assert_eq!(alert.severity, "warning");
88//! assert_eq!(alert.queue, Some("image_processing".to_string()));
89//! assert_eq!(alert.value, Some(1500.0));
90//! ```
91//!
92//! ## Performance Metrics
93//!
94//! ```rust
95//! use hammerwork_web::api::stats::PerformanceMetrics;
96//!
97//! let metrics = PerformanceMetrics {
98//!     database_response_time_ms: 5.2,
99//!     average_queue_depth: 15.5,
100//!     jobs_per_second: 8.3,
101//!     memory_usage_mb: Some(512.0),
102//!     cpu_usage_percent: Some(45.2),
103//!     active_workers: 12,
104//!     worker_utilization: 0.75,
105//! };
106//!
107//! assert_eq!(metrics.database_response_time_ms, 5.2);
108//! assert_eq!(metrics.active_workers, 12);
109//! assert_eq!(metrics.worker_utilization, 0.75);
110//! ```
111
112use super::ApiResponse;
113use hammerwork::queue::DatabaseQueue;
114use serde::{Deserialize, Serialize};
115use std::collections::HashMap;
116use std::sync::Arc;
117use warp::{Filter, Reply};
118
/// System overview statistics
///
/// Aggregate metrics computed across every queue returned by
/// `get_all_queue_stats` (see `generate_overview_from_stats`).
#[derive(Debug, Serialize)]
pub struct SystemOverview {
    /// Number of queues included in the aggregation.
    pub total_queues: u32,
    /// pending + running + completed + failed, summed over all queues.
    pub total_jobs: u64,
    /// Jobs waiting to be picked up.
    pub pending_jobs: u64,
    /// Jobs currently being processed.
    pub running_jobs: u64,
    /// Jobs that finished successfully.
    pub completed_jobs: u64,
    /// Failed jobs; computed as dead + timed-out in the aggregation code.
    pub failed_jobs: u64,
    /// Jobs moved to the dead set (a subset of `failed_jobs`).
    pub dead_jobs: u64,
    /// Sum of each queue's throughput-per-minute figure.
    pub overall_throughput: f64,
    /// failed_jobs / total_jobs, in 0.0..=1.0 (0.0 when there are no jobs).
    pub overall_error_rate: f64,
    /// Unweighted mean of the per-queue average processing times.
    pub avg_processing_time_ms: f64,
    /// Health assessment derived from the same statistics.
    pub system_health: SystemHealth,
    /// Currently always 0 — uptime tracking is a TODO.
    pub uptime_seconds: u64,
    /// When this snapshot was generated.
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
136
/// System health status
///
/// Produced by `assess_system_health`; the boolean flags mirror the alert
/// categories that were triggered during the assessment.
#[derive(Debug, Serialize)]
pub struct SystemHealth {
    /// Overall status derived from the worst alert severity present.
    pub status: String, // "healthy", "degraded", "critical"
    /// False only when the stats query itself failed (DB unreachable).
    pub database_healthy: bool,
    /// Some queue exceeded the 10% error-rate threshold.
    pub high_error_rate: bool,
    /// Some queue has more than 1000 pending jobs.
    pub queue_backlog: bool,
    /// Some queue's average processing time exceeds 30 seconds.
    pub slow_processing: bool,
    /// One alert per queue per triggered condition.
    pub alerts: Vec<SystemAlert>,
}
147
/// System alert
///
/// A single condition detected during health assessment.
#[derive(Debug, Serialize)]
pub struct SystemAlert {
    /// Alert level, one of the strings listed below.
    pub severity: String, // "info", "warning", "error", "critical"
    /// Human-readable description of the condition.
    pub message: String,
    /// Affected queue, if the alert is queue-specific.
    pub queue: Option<String>,
    /// Name of the metric that triggered the alert (e.g. "error_rate").
    pub metric: Option<String>,
    /// Observed value of the metric, when applicable.
    pub value: Option<f64>,
    /// Threshold the value was compared against, when applicable.
    pub threshold: Option<f64>,
    /// When the alert was generated.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
159
/// Detailed statistics for monitoring
///
/// Response body for `GET /api/stats/detailed`.
#[derive(Debug, Serialize)]
pub struct DetailedStats {
    /// System-wide aggregate snapshot.
    pub overview: SystemOverview,
    /// Per-queue breakdown.
    pub queue_stats: Vec<QueueStats>,
    /// Hour-by-hour trend data; currently always empty (not implemented).
    pub hourly_trends: Vec<HourlyTrend>,
    /// Grouped error analysis; currently always empty (not implemented).
    pub error_patterns: Vec<ErrorPattern>,
    /// Runtime performance figures; currently filled with mock values.
    pub performance_metrics: PerformanceMetrics,
}
169
/// Queue statistics
///
/// Per-queue counters and rates, mapped from hammerwork's queue stats.
#[derive(Debug, Serialize)]
pub struct QueueStats {
    /// Queue name.
    pub name: String,
    /// Jobs waiting to run.
    pub pending: u64,
    /// Jobs currently executing.
    pub running: u64,
    /// Completed-job count.
    pub completed_total: u64,
    /// Failures; computed as dead + timed-out.
    pub failed_total: u64,
    /// Dead-job count (a subset of `failed_total`).
    pub dead_total: u64,
    /// Recent throughput in jobs per minute.
    pub throughput_per_minute: f64,
    /// Average job processing time in milliseconds.
    pub avg_processing_time_ms: f64,
    /// Fraction of jobs that failed (0.0..=1.0).
    pub error_rate: f64,
    /// Age of the oldest pending job; currently always None (TODO).
    pub oldest_pending_age_seconds: Option<u64>,
    /// Job counts keyed by priority name; currently always empty (TODO).
    pub priority_distribution: HashMap<String, u64>,
}
185
/// Hourly trend data
///
/// One bucket of per-hour metrics for the trends endpoint.
#[derive(Debug, Serialize)]
pub struct HourlyTrend {
    /// Timestamp anchoring the hour this bucket covers.
    pub hour: chrono::DateTime<chrono::Utc>,
    /// Jobs completed during the hour.
    pub completed: u64,
    /// Jobs failed during the hour.
    pub failed: u64,
    /// Throughput during the hour — presumably jobs/minute to match
    /// `QueueStats::throughput_per_minute`; TODO confirm units.
    pub throughput: f64,
    /// Average processing time during the hour, in milliseconds.
    pub avg_processing_time_ms: f64,
    /// Failure fraction during the hour.
    pub error_rate: f64,
}
196
/// Error pattern analysis
///
/// An aggregated group of similar errors. Not populated yet — the
/// detailed endpoint currently returns an empty list.
#[derive(Debug, Serialize)]
pub struct ErrorPattern {
    /// Classification of the error group.
    pub error_type: String,
    /// Number of occurrences in the group.
    pub count: u64,
    /// Share of all errors this group represents.
    pub percentage: f64,
    /// Representative error message from the group.
    pub sample_message: String,
    /// First occurrence timestamp.
    pub first_seen: chrono::DateTime<chrono::Utc>,
    /// Most recent occurrence timestamp.
    pub last_seen: chrono::DateTime<chrono::Utc>,
    /// Queues in which this error was observed.
    pub affected_queues: Vec<String>,
}
208
/// Performance metrics
///
/// Runtime performance figures. The detailed endpoint currently fills
/// these with hard-coded mock values.
#[derive(Debug, Serialize)]
pub struct PerformanceMetrics {
    /// Database round-trip latency in milliseconds.
    pub database_response_time_ms: f64,
    /// Mean number of pending jobs across queues.
    pub average_queue_depth: f64,
    /// Overall processing rate in jobs per second.
    pub jobs_per_second: f64,
    /// Process memory usage in MB, when available.
    pub memory_usage_mb: Option<f64>,
    /// CPU usage percentage, when available.
    pub cpu_usage_percent: Option<f64>,
    /// Number of workers currently running.
    pub active_workers: u32,
    /// Busy fraction of workers — presumably 0.0..=1.0 (only mock data
    /// exists today); TODO confirm once real collection is implemented.
    pub worker_utilization: f64,
}
220
/// Time range for statistics queries
#[derive(Debug, Deserialize)]
pub struct TimeRange {
    /// Start of the window (UTC).
    pub start: chrono::DateTime<chrono::Utc>,
    /// End of the window (UTC).
    pub end: chrono::DateTime<chrono::Utc>,
}
227
/// Statistics query parameters
///
/// Optional filters for the detailed/trends endpoints. NOTE(review): the
/// handlers currently accept but ignore every field.
#[derive(Debug, Deserialize)]
pub struct StatsQuery {
    /// Restrict results to this time window.
    pub time_range: Option<TimeRange>,
    /// Restrict results to these queue names.
    pub queues: Option<Vec<String>>,
    /// Aggregation bucket size.
    pub granularity: Option<String>, // "hour", "day", "week"
}
235
236/// Create statistics routes
237pub fn routes<T>(
238    queue: Arc<T>,
239) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
240where
241    T: DatabaseQueue + Send + Sync + 'static,
242{
243    let queue_filter = warp::any().map(move || queue.clone());
244
245    let overview = warp::path("stats")
246        .and(warp::path("overview"))
247        .and(warp::path::end())
248        .and(warp::get())
249        .and(queue_filter.clone())
250        .and_then(overview_handler);
251
252    let detailed = warp::path("stats")
253        .and(warp::path("detailed"))
254        .and(warp::path::end())
255        .and(warp::get())
256        .and(queue_filter.clone())
257        .and(warp::query::<StatsQuery>())
258        .and_then(detailed_stats_handler);
259
260    let trends = warp::path("stats")
261        .and(warp::path("trends"))
262        .and(warp::path::end())
263        .and(warp::get())
264        .and(queue_filter.clone())
265        .and(warp::query::<StatsQuery>())
266        .and_then(trends_handler);
267
268    let health = warp::path("stats")
269        .and(warp::path("health"))
270        .and(warp::path::end())
271        .and(warp::get())
272        .and(queue_filter)
273        .and_then(health_handler);
274
275    overview.or(detailed).or(trends).or(health)
276}
277
278/// Handler for system overview statistics
279async fn overview_handler<T>(queue: Arc<T>) -> Result<impl Reply, warp::Rejection>
280where
281    T: DatabaseQueue + Send + Sync,
282{
283    match queue.get_all_queue_stats().await {
284        Ok(all_stats) => {
285            let mut total_pending = 0;
286            let mut total_running = 0;
287            let mut total_completed = 0;
288            let mut total_failed = 0;
289            let mut total_dead = 0;
290            let mut total_throughput = 0.0;
291            let mut total_processing_time = 0.0;
292            let mut queue_count = 0;
293
294            for stats in &all_stats {
295                total_pending += stats.pending_count;
296                total_running += stats.running_count;
297                total_completed += stats.completed_count;
298                total_failed += stats.dead_count + stats.timed_out_count;
299                total_dead += stats.dead_count;
300                total_throughput += stats.statistics.throughput_per_minute;
301                total_processing_time += stats.statistics.avg_processing_time_ms;
302                queue_count += 1;
303            }
304
305            let avg_processing_time = if queue_count > 0 {
306                total_processing_time / queue_count as f64
307            } else {
308                0.0
309            };
310
311            let total_jobs = total_pending + total_running + total_completed + total_failed;
312            let overall_error_rate = if total_jobs > 0 {
313                total_failed as f64 / total_jobs as f64
314            } else {
315                0.0
316            };
317
318            // Generate system health assessment
319            let health = assess_system_health(&all_stats);
320
321            let overview = SystemOverview {
322                total_queues: queue_count,
323                total_jobs,
324                pending_jobs: total_pending,
325                running_jobs: total_running,
326                completed_jobs: total_completed,
327                failed_jobs: total_failed,
328                dead_jobs: total_dead,
329                overall_throughput: total_throughput,
330                overall_error_rate,
331                avg_processing_time_ms: avg_processing_time,
332                system_health: health,
333                uptime_seconds: 0, // TODO: Track actual uptime
334                last_updated: chrono::Utc::now(),
335            };
336
337            Ok(warp::reply::json(&ApiResponse::success(overview)))
338        }
339        Err(e) => {
340            let response = ApiResponse::<()>::error(format!("Failed to get statistics: {}", e));
341            Ok(warp::reply::json(&response))
342        }
343    }
344}
345
346/// Handler for detailed statistics
347async fn detailed_stats_handler<T>(
348    queue: Arc<T>,
349    query: StatsQuery,
350) -> Result<impl Reply, warp::Rejection>
351where
352    T: DatabaseQueue + Send + Sync,
353{
354    // For now, return basic stats. In a real implementation, this would
355    // use the time_range and other query parameters to fetch historical data
356    let _ = query;
357
358    match queue.get_all_queue_stats().await {
359        Ok(all_stats) => {
360            // Convert hammerwork stats to our API format
361            let queue_stats: Vec<QueueStats> = all_stats
362                .iter()
363                .map(|stats| QueueStats {
364                    name: stats.queue_name.clone(),
365                    pending: stats.pending_count,
366                    running: stats.running_count,
367                    completed_total: stats.completed_count,
368                    failed_total: stats.dead_count + stats.timed_out_count,
369                    dead_total: stats.dead_count,
370                    throughput_per_minute: stats.statistics.throughput_per_minute,
371                    avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
372                    error_rate: stats.statistics.error_rate,
373                    oldest_pending_age_seconds: None, // TODO: Calculate from database
374                    priority_distribution: HashMap::new(), // TODO: Get from priority stats
375                })
376                .collect();
377
378            // Mock data for other fields (TODO: implement properly)
379            let hourly_trends = Vec::new();
380            let error_patterns = Vec::new();
381            let performance_metrics = PerformanceMetrics {
382                database_response_time_ms: 5.0, // Mock value
383                average_queue_depth: 10.5,      // Mock value
384                jobs_per_second: 2.5,           // Mock value
385                memory_usage_mb: None,
386                cpu_usage_percent: None,
387                active_workers: 4,        // Mock value
388                worker_utilization: 0.75, // Mock value
389            };
390
391            // Generate overview from the stats
392            let overview = generate_overview_from_stats(&all_stats);
393
394            let detailed = DetailedStats {
395                overview,
396                queue_stats,
397                hourly_trends,
398                error_patterns,
399                performance_metrics,
400            };
401
402            Ok(warp::reply::json(&ApiResponse::success(detailed)))
403        }
404        Err(e) => {
405            let response =
406                ApiResponse::<()>::error(format!("Failed to get detailed statistics: {}", e));
407            Ok(warp::reply::json(&response))
408        }
409    }
410}
411
412/// Handler for trend analysis
413async fn trends_handler<T>(queue: Arc<T>, query: StatsQuery) -> Result<impl Reply, warp::Rejection>
414where
415    T: DatabaseQueue + Send + Sync,
416{
417    // For now, return mock trend data
418    // In a real implementation, this would query historical data based on the time range
419    let _ = (queue, query);
420
421    let trends: Vec<HourlyTrend> = (0..24)
422        .map(|hour| HourlyTrend {
423            hour: chrono::Utc::now() - chrono::Duration::hours(23 - hour),
424            completed: (hour * 10 + 50) as u64,
425            failed: (hour / 4) as u64,
426            throughput: 5.0 + (hour as f64 * 0.5),
427            avg_processing_time_ms: 100.0 + (hour as f64 * 2.0),
428            error_rate: 0.01 + (hour as f64 * 0.001),
429        })
430        .collect();
431
432    Ok(warp::reply::json(&ApiResponse::success(trends)))
433}
434
435/// Handler for system health check
436async fn health_handler<T>(queue: Arc<T>) -> Result<impl Reply, warp::Rejection>
437where
438    T: DatabaseQueue + Send + Sync,
439{
440    match queue.get_all_queue_stats().await {
441        Ok(all_stats) => {
442            let health = assess_system_health(&all_stats);
443            Ok(warp::reply::json(&ApiResponse::success(health)))
444        }
445        Err(e) => {
446            let health = SystemHealth {
447                status: "critical".to_string(),
448                database_healthy: false,
449                high_error_rate: false,
450                queue_backlog: false,
451                slow_processing: false,
452                alerts: vec![SystemAlert {
453                    severity: "critical".to_string(),
454                    message: format!("Database connection failed: {}", e),
455                    queue: None,
456                    metric: Some("database_connectivity".to_string()),
457                    value: None,
458                    threshold: None,
459                    timestamp: chrono::Utc::now(),
460                }],
461            };
462            Ok(warp::reply::json(&ApiResponse::success(health)))
463        }
464    }
465}
466
467/// Assess overall system health based on queue statistics
468fn assess_system_health(stats: &[hammerwork::stats::QueueStats]) -> SystemHealth {
469    let mut alerts = Vec::new();
470    let mut high_error_rate = false;
471    let mut queue_backlog = false;
472    let mut slow_processing = false;
473
474    for stat in stats {
475        // Check error rate
476        if stat.statistics.error_rate > 0.1 {
477            // > 10% error rate
478            high_error_rate = true;
479            alerts.push(SystemAlert {
480                severity: "warning".to_string(),
481                message: format!("High error rate in queue '{}'", stat.queue_name),
482                queue: Some(stat.queue_name.clone()),
483                metric: Some("error_rate".to_string()),
484                value: Some(stat.statistics.error_rate),
485                threshold: Some(0.1),
486                timestamp: chrono::Utc::now(),
487            });
488        }
489
490        // Check queue backlog
491        if stat.pending_count > 1000 {
492            queue_backlog = true;
493            alerts.push(SystemAlert {
494                severity: "warning".to_string(),
495                message: format!("Large backlog in queue '{}'", stat.queue_name),
496                queue: Some(stat.queue_name.clone()),
497                metric: Some("pending_count".to_string()),
498                value: Some(stat.pending_count as f64),
499                threshold: Some(1000.0),
500                timestamp: chrono::Utc::now(),
501            });
502        }
503
504        // Check processing time
505        if stat.statistics.avg_processing_time_ms > 30000.0 {
506            // > 30 seconds
507            slow_processing = true;
508            alerts.push(SystemAlert {
509                severity: "info".to_string(),
510                message: format!("Slow processing in queue '{}'", stat.queue_name),
511                queue: Some(stat.queue_name.clone()),
512                metric: Some("avg_processing_time_ms".to_string()),
513                value: Some(stat.statistics.avg_processing_time_ms),
514                threshold: Some(30000.0),
515                timestamp: chrono::Utc::now(),
516            });
517        }
518    }
519
520    let status = if alerts.iter().any(|a| a.severity == "critical") {
521        "critical"
522    } else if alerts.iter().any(|a| a.severity == "warning") {
523        "degraded"
524    } else {
525        "healthy"
526    };
527
528    SystemHealth {
529        status: status.to_string(),
530        database_healthy: true, // If we got here, DB is accessible
531        high_error_rate,
532        queue_backlog,
533        slow_processing,
534        alerts,
535    }
536}
537
538/// Generate system overview from queue statistics
539fn generate_overview_from_stats(stats: &[hammerwork::stats::QueueStats]) -> SystemOverview {
540    let mut total_pending = 0;
541    let mut total_running = 0;
542    let mut total_completed = 0;
543    let mut total_failed = 0;
544    let mut total_dead = 0;
545    let mut total_throughput = 0.0;
546    let mut total_processing_time = 0.0;
547    let queue_count = stats.len();
548
549    for stat in stats {
550        total_pending += stat.pending_count;
551        total_running += stat.running_count;
552        total_completed += stat.completed_count;
553        total_failed += stat.dead_count + stat.timed_out_count;
554        total_dead += stat.dead_count;
555        total_throughput += stat.statistics.throughput_per_minute;
556        total_processing_time += stat.statistics.avg_processing_time_ms;
557    }
558
559    let avg_processing_time = if queue_count > 0 {
560        total_processing_time / queue_count as f64
561    } else {
562        0.0
563    };
564
565    let total_jobs = total_pending + total_running + total_completed + total_failed;
566    let overall_error_rate = if total_jobs > 0 {
567        total_failed as f64 / total_jobs as f64
568    } else {
569        0.0
570    };
571
572    let health = assess_system_health(stats);
573
574    SystemOverview {
575        total_queues: queue_count as u32,
576        total_jobs,
577        pending_jobs: total_pending,
578        running_jobs: total_running,
579        completed_jobs: total_completed,
580        failed_jobs: total_failed,
581        dead_jobs: total_dead,
582        overall_throughput: total_throughput,
583        overall_error_rate,
584        avg_processing_time_ms: avg_processing_time,
585        system_health: health,
586        uptime_seconds: 0,
587        last_updated: chrono::Utc::now(),
588    }
589}
590
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully-populated query payload should round-trip into `StatsQuery`.
    #[test]
    fn test_stats_query_deserialization() {
        let json = r#"{
            "time_range": {
                "start": "2024-01-01T00:00:00Z",
                "end": "2024-01-02T00:00:00Z"
            },
            "queues": ["email", "data-processing"],
            "granularity": "hour"
        }"#;

        let parsed: StatsQuery = serde_json::from_str(json).unwrap();
        assert!(parsed.time_range.is_some());
        let queue_names = parsed.queues.as_ref().unwrap();
        assert_eq!(queue_names.len(), 2);
        assert_eq!(parsed.granularity.as_deref(), Some("hour"));
    }

    /// Serialized alerts must carry their severity and message text.
    #[test]
    fn test_system_alert_serialization() {
        let alert = SystemAlert {
            severity: "warning".to_string(),
            message: "High error rate detected".to_string(),
            queue: Some("email".to_string()),
            metric: Some("error_rate".to_string()),
            value: Some(0.15),
            threshold: Some(0.1),
            timestamp: chrono::Utc::now(),
        };

        let serialized = serde_json::to_string(&alert).unwrap();
        assert!(serialized.contains("warning"));
        assert!(serialized.contains("High error rate"));
    }
}