1use super::ApiResponse;
113use hammerwork::queue::DatabaseQueue;
114use serde::{Deserialize, Serialize};
115use std::collections::HashMap;
116use std::sync::Arc;
117use warp::{Filter, Reply};
118
/// Aggregated point-in-time statistics across every known queue.
///
/// Produced by the `/stats/overview` endpoint and embedded in `DetailedStats`.
#[derive(Debug, Serialize)]
pub struct SystemOverview {
    /// Number of queues contributing to these totals.
    pub total_queues: u32,
    /// Sum of pending, running, completed and failed jobs.
    pub total_jobs: u64,
    pub pending_jobs: u64,
    pub running_jobs: u64,
    pub completed_jobs: u64,
    /// Dead + timed-out jobs (see the aggregation in the handlers).
    pub failed_jobs: u64,
    pub dead_jobs: u64,
    /// Sum of per-queue throughput (jobs per minute).
    pub overall_throughput: f64,
    /// `failed_jobs / total_jobs` in `[0, 1]`; 0 when there are no jobs.
    pub overall_error_rate: f64,
    /// Unweighted mean of the per-queue average processing times.
    pub avg_processing_time_ms: f64,
    pub system_health: SystemHealth,
    /// Process uptime; 0 when built without access to system state.
    pub uptime_seconds: u64,
    pub last_updated: chrono::DateTime<chrono::Utc>,
}
136
/// Overall health assessment derived from per-queue statistics.
#[derive(Debug, Serialize)]
pub struct SystemHealth {
    /// One of "healthy", "degraded", or "critical".
    pub status: String,
    /// False only when the stats query itself failed.
    pub database_healthy: bool,
    /// True when any queue's error rate exceeded the 10% threshold.
    pub high_error_rate: bool,
    /// True when any queue has more than 1000 pending jobs.
    pub queue_backlog: bool,
    /// True when any queue's average processing time exceeds 30s.
    pub slow_processing: bool,
    /// Individual alerts explaining the flags above.
    pub alerts: Vec<SystemAlert>,
}
147
/// A single health finding attached to a `SystemHealth` report.
#[derive(Debug, Serialize)]
pub struct SystemAlert {
    /// "info", "warning", or "critical".
    pub severity: String,
    /// Human-readable description of the finding.
    pub message: String,
    /// Queue the alert concerns, when it is queue-specific.
    pub queue: Option<String>,
    /// Name of the metric that triggered the alert (e.g. "error_rate").
    pub metric: Option<String>,
    /// Observed value of the metric, when applicable.
    pub value: Option<f64>,
    /// Threshold the value was compared against.
    pub threshold: Option<f64>,
    /// When the alert was generated.
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
159
/// Full statistics report returned by the `/stats/detailed` endpoint.
#[derive(Debug, Serialize)]
pub struct DetailedStats {
    /// System-wide roll-up totals.
    pub overview: SystemOverview,
    /// Per-queue breakdown.
    pub queue_stats: Vec<QueueStats>,
    /// Hour-by-hour activity over the trailing 24 hours.
    pub hourly_trends: Vec<HourlyTrend>,
    /// Grouped dead-job error messages.
    pub error_patterns: Vec<ErrorPattern>,
    /// Derived performance indicators.
    pub performance_metrics: PerformanceMetrics,
}
169
/// API-facing statistics for a single queue.
///
/// Distinct from `hammerwork::stats::QueueStats`, from which it is built.
#[derive(Debug, Serialize)]
pub struct QueueStats {
    pub name: String,
    pub pending: u64,
    pub running: u64,
    pub completed_total: u64,
    /// Dead + timed-out jobs.
    pub failed_total: u64,
    pub dead_total: u64,
    pub throughput_per_minute: f64,
    pub avg_processing_time_ms: f64,
    pub error_rate: f64,
    /// Age of the oldest sampled pending job; `None` when the lookup failed
    /// or no pending job was found.
    pub oldest_pending_age_seconds: Option<u64>,
    /// Share of jobs per priority level, keyed by lowercase priority name.
    pub priority_distribution: HashMap<String, f32>,
}
185
/// Activity within one hour-long bucket of the trailing 24-hour window.
#[derive(Debug, Serialize)]
pub struct HourlyTrend {
    /// Start of the hour bucket (UTC).
    pub hour: chrono::DateTime<chrono::Utc>,
    pub completed: u64,
    pub failed: u64,
    /// Jobs processed per second within the bucket.
    pub throughput: f64,
    pub avg_processing_time_ms: f64,
    /// `failed / (completed + failed)` for the bucket; 0 when empty.
    pub error_rate: f64,
}
196
/// A group of similar dead-job errors, classified by `extract_error_type`.
#[derive(Debug, Serialize)]
pub struct ErrorPattern {
    /// Coarse classification, e.g. "Timeout Error".
    pub error_type: String,
    /// Number of sampled dead jobs matching this pattern.
    pub count: u64,
    /// Share of all dead jobs, in percent (0-100).
    pub percentage: f64,
    /// One representative error message from the group.
    pub sample_message: String,
    pub first_seen: chrono::DateTime<chrono::Utc>,
    pub last_seen: chrono::DateTime<chrono::Utc>,
    /// Queues in which this error pattern was observed.
    pub affected_queues: Vec<String>,
}
208
/// Derived performance indicators for the whole system.
#[derive(Debug, Serialize)]
pub struct PerformanceMetrics {
    /// Heuristic estimate derived from processing times, not a measured
    /// round-trip (see `calculate_performance_metrics`).
    pub database_response_time_ms: f64,
    /// Mean pending-job count across queues.
    pub average_queue_depth: f64,
    pub jobs_per_second: f64,
    /// Not currently collected; always `None`.
    pub memory_usage_mb: Option<f64>,
    /// Not currently collected; always `None`.
    pub cpu_usage_percent: Option<f64>,
    /// Sum of currently running jobs, used as a proxy for busy workers.
    pub active_workers: u32,
    /// Running jobs as a fraction of all jobs; 0 when there are none.
    pub worker_utilization: f64,
}
220
/// UTC time window supplied in a `StatsQuery`.
#[derive(Debug, Deserialize)]
pub struct TimeRange {
    pub start: chrono::DateTime<chrono::Utc>,
    pub end: chrono::DateTime<chrono::Utc>,
}
227
/// Optional filters accepted by the stats endpoints.
///
/// NOTE(review): the handlers currently parse these parameters but do not
/// apply them (`let _ = query`); the full data set is always returned.
#[derive(Debug, Deserialize)]
pub struct StatsQuery {
    pub time_range: Option<TimeRange>,
    pub queues: Option<Vec<String>>,
    /// Requested bucket size, e.g. "hour".
    pub granularity: Option<String>,
}
235
236pub fn routes<T>(
238 queue: Arc<T>,
239 system_state: Arc<tokio::sync::RwLock<crate::api::system::SystemState>>,
240) -> impl Filter<Extract = impl Reply, Error = warp::Rejection> + Clone
241where
242 T: DatabaseQueue + Send + Sync + 'static,
243{
244 let queue_filter = warp::any().map(move || queue.clone());
245 let state_filter = warp::any().map(move || system_state.clone());
246
247 let overview = warp::path("stats")
248 .and(warp::path("overview"))
249 .and(warp::path::end())
250 .and(warp::get())
251 .and(queue_filter.clone())
252 .and(state_filter.clone())
253 .and_then(overview_handler);
254
255 let detailed = warp::path("stats")
256 .and(warp::path("detailed"))
257 .and(warp::path::end())
258 .and(warp::get())
259 .and(queue_filter.clone())
260 .and(warp::query::<StatsQuery>())
261 .and_then(detailed_stats_handler);
262
263 let trends = warp::path("stats")
264 .and(warp::path("trends"))
265 .and(warp::path::end())
266 .and(warp::get())
267 .and(queue_filter.clone())
268 .and(warp::query::<StatsQuery>())
269 .and_then(trends_handler);
270
271 let health = warp::path("stats")
272 .and(warp::path("health"))
273 .and(warp::path::end())
274 .and(warp::get())
275 .and(queue_filter)
276 .and_then(health_handler);
277
278 overview.or(detailed).or(trends).or(health)
279}
280
281async fn overview_handler<T>(
283 queue: Arc<T>,
284 system_state: Arc<tokio::sync::RwLock<crate::api::system::SystemState>>,
285) -> Result<impl Reply, warp::Rejection>
286where
287 T: DatabaseQueue + Send + Sync,
288{
289 match queue.get_all_queue_stats().await {
290 Ok(all_stats) => {
291 let mut total_pending = 0;
292 let mut total_running = 0;
293 let mut total_completed = 0;
294 let mut total_failed = 0;
295 let mut total_dead = 0;
296 let mut total_throughput = 0.0;
297 let mut total_processing_time = 0.0;
298 let mut queue_count = 0;
299
300 for stats in &all_stats {
301 total_pending += stats.pending_count;
302 total_running += stats.running_count;
303 total_completed += stats.completed_count;
304 total_failed += stats.dead_count + stats.timed_out_count;
305 total_dead += stats.dead_count;
306 total_throughput += stats.statistics.throughput_per_minute;
307 total_processing_time += stats.statistics.avg_processing_time_ms;
308 queue_count += 1;
309 }
310
311 let avg_processing_time = if queue_count > 0 {
312 total_processing_time / queue_count as f64
313 } else {
314 0.0
315 };
316
317 let total_jobs = total_pending + total_running + total_completed + total_failed;
318 let overall_error_rate = if total_jobs > 0 {
319 total_failed as f64 / total_jobs as f64
320 } else {
321 0.0
322 };
323
324 let health = assess_system_health(&all_stats);
326
327 let overview = SystemOverview {
328 total_queues: queue_count,
329 total_jobs,
330 pending_jobs: total_pending,
331 running_jobs: total_running,
332 completed_jobs: total_completed,
333 failed_jobs: total_failed,
334 dead_jobs: total_dead,
335 overall_throughput: total_throughput,
336 overall_error_rate,
337 avg_processing_time_ms: avg_processing_time,
338 system_health: health,
339 uptime_seconds: {
340 let state = system_state.read().await;
341 state.uptime_seconds() as u64
342 },
343 last_updated: chrono::Utc::now(),
344 };
345
346 Ok(warp::reply::json(&ApiResponse::success(overview)))
347 }
348 Err(e) => {
349 let response = ApiResponse::<()>::error(format!("Failed to get statistics: {}", e));
350 Ok(warp::reply::json(&response))
351 }
352 }
353}
354
/// `GET /stats/detailed` — full statistics report: system overview,
/// per-queue breakdown, hourly trends, error patterns, and derived
/// performance metrics.
async fn detailed_stats_handler<T>(
    queue: Arc<T>,
    query: StatsQuery,
) -> Result<impl Reply, warp::Rejection>
where
    T: DatabaseQueue + Send + Sync,
{
    // Query filters (time range, queue subset, granularity) are parsed but
    // not yet applied; the complete data set is always returned.
    let _ = query;

    match queue.get_all_queue_stats().await {
        Ok(all_stats) => {
            let mut queue_stats: Vec<QueueStats> = Vec::new();

            // NOTE(review): each iteration issues two extra sequential
            // queries per queue; consider batching if queue counts grow.
            for stats in all_stats.iter() {
                let oldest_pending_age_seconds =
                    calculate_oldest_pending_age(&queue, &stats.queue_name).await;

                let priority_distribution =
                    get_priority_distribution(&queue, &stats.queue_name).await;

                queue_stats.push(QueueStats {
                    name: stats.queue_name.clone(),
                    pending: stats.pending_count,
                    running: stats.running_count,
                    completed_total: stats.completed_count,
                    // "Failed" counts both dead and timed-out jobs.
                    failed_total: stats.dead_count + stats.timed_out_count,
                    dead_total: stats.dead_count,
                    throughput_per_minute: stats.statistics.throughput_per_minute,
                    avg_processing_time_ms: stats.statistics.avg_processing_time_ms,
                    error_rate: stats.statistics.error_rate,
                    oldest_pending_age_seconds,
                    priority_distribution,
                });
            }

            let hourly_trends = generate_hourly_trends(&queue, &all_stats).await;
            let error_patterns = generate_error_patterns(&queue, &all_stats).await;
            let performance_metrics = calculate_performance_metrics(&all_stats);

            // Overview is rebuilt from the same stats snapshot; note its
            // uptime_seconds stays 0 here (no system state available).
            let overview = generate_overview_from_stats(&all_stats);

            let detailed = DetailedStats {
                overview,
                queue_stats,
                hourly_trends,
                error_patterns,
                performance_metrics,
            };

            Ok(warp::reply::json(&ApiResponse::success(detailed)))
        }
        Err(e) => {
            let response =
                ApiResponse::<()>::error(format!("Failed to get detailed statistics: {}", e));
            Ok(warp::reply::json(&response))
        }
    }
}
420
421async fn trends_handler<T>(queue: Arc<T>, query: StatsQuery) -> Result<impl Reply, warp::Rejection>
423where
424 T: DatabaseQueue + Send + Sync,
425{
426 let _ = (queue, query);
429
430 let trends: Vec<HourlyTrend> = (0..24)
431 .map(|hour| HourlyTrend {
432 hour: chrono::Utc::now() - chrono::Duration::hours(23 - hour),
433 completed: (hour * 10 + 50) as u64,
434 failed: (hour / 4) as u64,
435 throughput: 5.0 + (hour as f64 * 0.5),
436 avg_processing_time_ms: 100.0 + (hour as f64 * 2.0),
437 error_rate: 0.01 + (hour as f64 * 0.001),
438 })
439 .collect();
440
441 Ok(warp::reply::json(&ApiResponse::success(trends)))
442}
443
444async fn health_handler<T>(queue: Arc<T>) -> Result<impl Reply, warp::Rejection>
446where
447 T: DatabaseQueue + Send + Sync,
448{
449 match queue.get_all_queue_stats().await {
450 Ok(all_stats) => {
451 let health = assess_system_health(&all_stats);
452 Ok(warp::reply::json(&ApiResponse::success(health)))
453 }
454 Err(e) => {
455 let health = SystemHealth {
456 status: "critical".to_string(),
457 database_healthy: false,
458 high_error_rate: false,
459 queue_backlog: false,
460 slow_processing: false,
461 alerts: vec![SystemAlert {
462 severity: "critical".to_string(),
463 message: format!("Database connection failed: {}", e),
464 queue: None,
465 metric: Some("database_connectivity".to_string()),
466 value: None,
467 threshold: None,
468 timestamp: chrono::Utc::now(),
469 }],
470 };
471 Ok(warp::reply::json(&ApiResponse::success(health)))
472 }
473 }
474}
475
476fn assess_system_health(stats: &[hammerwork::stats::QueueStats]) -> SystemHealth {
478 let mut alerts = Vec::new();
479 let mut high_error_rate = false;
480 let mut queue_backlog = false;
481 let mut slow_processing = false;
482
483 for stat in stats {
484 if stat.statistics.error_rate > 0.1 {
486 high_error_rate = true;
488 alerts.push(SystemAlert {
489 severity: "warning".to_string(),
490 message: format!("High error rate in queue '{}'", stat.queue_name),
491 queue: Some(stat.queue_name.clone()),
492 metric: Some("error_rate".to_string()),
493 value: Some(stat.statistics.error_rate),
494 threshold: Some(0.1),
495 timestamp: chrono::Utc::now(),
496 });
497 }
498
499 if stat.pending_count > 1000 {
501 queue_backlog = true;
502 alerts.push(SystemAlert {
503 severity: "warning".to_string(),
504 message: format!("Large backlog in queue '{}'", stat.queue_name),
505 queue: Some(stat.queue_name.clone()),
506 metric: Some("pending_count".to_string()),
507 value: Some(stat.pending_count as f64),
508 threshold: Some(1000.0),
509 timestamp: chrono::Utc::now(),
510 });
511 }
512
513 if stat.statistics.avg_processing_time_ms > 30000.0 {
515 slow_processing = true;
517 alerts.push(SystemAlert {
518 severity: "info".to_string(),
519 message: format!("Slow processing in queue '{}'", stat.queue_name),
520 queue: Some(stat.queue_name.clone()),
521 metric: Some("avg_processing_time_ms".to_string()),
522 value: Some(stat.statistics.avg_processing_time_ms),
523 threshold: Some(30000.0),
524 timestamp: chrono::Utc::now(),
525 });
526 }
527 }
528
529 let status = if alerts.iter().any(|a| a.severity == "critical") {
530 "critical"
531 } else if alerts.iter().any(|a| a.severity == "warning") {
532 "degraded"
533 } else {
534 "healthy"
535 };
536
537 SystemHealth {
538 status: status.to_string(),
539 database_healthy: true, high_error_rate,
541 queue_backlog,
542 slow_processing,
543 alerts,
544 }
545}
546
547fn generate_overview_from_stats(stats: &[hammerwork::stats::QueueStats]) -> SystemOverview {
549 let mut total_pending = 0;
550 let mut total_running = 0;
551 let mut total_completed = 0;
552 let mut total_failed = 0;
553 let mut total_dead = 0;
554 let mut total_throughput = 0.0;
555 let mut total_processing_time = 0.0;
556 let queue_count = stats.len();
557
558 for stat in stats {
559 total_pending += stat.pending_count;
560 total_running += stat.running_count;
561 total_completed += stat.completed_count;
562 total_failed += stat.dead_count + stat.timed_out_count;
563 total_dead += stat.dead_count;
564 total_throughput += stat.statistics.throughput_per_minute;
565 total_processing_time += stat.statistics.avg_processing_time_ms;
566 }
567
568 let avg_processing_time = if queue_count > 0 {
569 total_processing_time / queue_count as f64
570 } else {
571 0.0
572 };
573
574 let total_jobs = total_pending + total_running + total_completed + total_failed;
575 let overall_error_rate = if total_jobs > 0 {
576 total_failed as f64 / total_jobs as f64
577 } else {
578 0.0
579 };
580
581 let health = assess_system_health(stats);
582
583 SystemOverview {
584 total_queues: queue_count as u32,
585 total_jobs,
586 pending_jobs: total_pending,
587 running_jobs: total_running,
588 completed_jobs: total_completed,
589 failed_jobs: total_failed,
590 dead_jobs: total_dead,
591 overall_throughput: total_throughput,
592 overall_error_rate,
593 avg_processing_time_ms: avg_processing_time,
594 system_health: health,
595 uptime_seconds: 0,
596 last_updated: chrono::Utc::now(),
597 }
598}
599
600async fn calculate_oldest_pending_age<T>(queue: &Arc<T>, queue_name: &str) -> Option<u64>
602where
603 T: DatabaseQueue + Send + Sync,
604{
605 match queue.get_ready_jobs(queue_name, 100).await {
607 Ok(jobs) => {
608 let now = chrono::Utc::now();
609 jobs.iter()
610 .filter(|job| matches!(job.status, hammerwork::job::JobStatus::Pending))
611 .map(|job| {
612 let age = now - job.created_at;
613 age.num_seconds() as u64
614 })
615 .max()
616 }
617 Err(_) => None,
618 }
619}
620
621async fn get_priority_distribution<T>(queue: &Arc<T>, queue_name: &str) -> HashMap<String, f32>
623where
624 T: DatabaseQueue + Send + Sync,
625{
626 match queue.get_priority_stats(queue_name).await {
627 Ok(priority_stats) => priority_stats
628 .priority_distribution
629 .into_iter()
630 .map(|(priority, percentage)| {
631 let priority_name = match priority {
632 hammerwork::priority::JobPriority::Background => "background",
633 hammerwork::priority::JobPriority::Low => "low",
634 hammerwork::priority::JobPriority::Normal => "normal",
635 hammerwork::priority::JobPriority::High => "high",
636 hammerwork::priority::JobPriority::Critical => "critical",
637 };
638 (priority_name.to_string(), percentage)
639 })
640 .collect(),
641 Err(_) => HashMap::new(),
642 }
643}
644
/// Build 24 one-hour trend buckets covering the trailing day, oldest first.
///
/// Completed counts and processing times come from the jobs actually
/// completed in each hour (sampled at up to 1000 jobs per bucket); failure
/// counts are only an approximation — see the inline notes.
async fn generate_hourly_trends<T>(
    queue: &Arc<T>,
    all_stats: &[hammerwork::stats::QueueStats],
) -> Vec<HourlyTrend>
where
    T: DatabaseQueue + Send + Sync,
{
    let now = chrono::Utc::now();
    let mut trends = Vec::new();

    for i in 0..24 {
        // i == 0 is the oldest bucket (23 hours ago), i == 23 the newest.
        let hour_start = now - chrono::Duration::hours(23 - i);
        let hour_end = hour_start + chrono::Duration::hours(1);

        let mut hour_completed = 0u64;
        let mut hour_failed = 0u64;
        let mut hour_processing_times = Vec::new();

        if let Ok(completed_jobs) = queue
            .get_jobs_completed_in_range(None, hour_start, hour_end, Some(1000))
            .await
        {
            hour_completed = completed_jobs.len() as u64;

            for job in completed_jobs {
                // Only jobs carrying both timestamps contribute to the
                // per-hour processing-time average.
                if let (Some(started_at), Some(completed_at)) = (job.started_at, job.completed_at) {
                    let processing_time = (completed_at - started_at).num_milliseconds() as f64;
                    hour_processing_times.push(processing_time);
                }
            }
        }

        // NOTE(review): `get_error_frequencies` appears to count errors from
        // `hour_start` up to now rather than per bucket, so the per-hour
        // failure count below is an even-spread estimate — confirm against
        // the queue implementation.
        if let Ok(error_frequencies) = queue.get_error_frequencies(None, hour_start).await {
            let total_errors_since_start = error_frequencies.values().sum::<u64>();

            if i < 3 {
                // Oldest buckets: average over the buckets seen so far.
                hour_failed = total_errors_since_start / ((i + 1) as u64).max(1);
            } else {
                // Remaining buckets: spread evenly across the 24-hour window.
                hour_failed = total_errors_since_start / 24;
            }
        }

        // Jobs per second within the one-hour bucket.
        let hour_throughput = (hour_completed + hour_failed) as f64 / 3600.0;

        let avg_processing_time_ms = if !hour_processing_times.is_empty() {
            hour_processing_times.iter().sum::<f64>() / hour_processing_times.len() as f64
        } else {
            // No samples this hour: fall back to the mean of the per-queue
            // averages, or 0 when there are no queues at all.
            if !all_stats.is_empty() {
                all_stats
                    .iter()
                    .map(|s| s.statistics.avg_processing_time_ms)
                    .sum::<f64>()
                    / all_stats.len() as f64
            } else {
                0.0
            }
        };

        let error_rate = if (hour_completed + hour_failed) > 0 {
            hour_failed as f64 / (hour_completed + hour_failed) as f64
        } else {
            0.0
        };

        trends.push(HourlyTrend {
            hour: hour_start,
            completed: hour_completed,
            failed: hour_failed,
            throughput: hour_throughput,
            avg_processing_time_ms,
            error_rate,
        });
    }

    trends
}
735
736async fn generate_error_patterns<T>(
738 queue: &Arc<T>,
739 all_stats: &[hammerwork::stats::QueueStats],
740) -> Vec<ErrorPattern>
741where
742 T: DatabaseQueue + Send + Sync,
743{
744 let mut error_patterns = Vec::new();
745 let total_errors = all_stats.iter().map(|s| s.dead_count).sum::<u64>();
746
747 if total_errors == 0 {
748 return error_patterns;
749 }
750
751 let mut error_messages = Vec::new();
753 for stats in all_stats {
754 if let Ok(dead_jobs) = queue
755 .get_dead_jobs_by_queue(&stats.queue_name, Some(20), Some(0))
756 .await
757 {
758 for job in dead_jobs {
759 if let Some(error_msg) = job.error_message {
760 error_messages.push((error_msg, job.failed_at.unwrap_or(job.created_at)));
761 }
762 }
763 }
764 }
765
766 let mut error_counts = std::collections::HashMap::new();
768 let mut error_first_seen = std::collections::HashMap::new();
769
770 for (error_msg, failed_at) in error_messages {
771 let error_type = extract_error_type(&error_msg);
772 let count = error_counts.entry(error_type.clone()).or_insert(0);
773 *count += 1;
774
775 error_first_seen
776 .entry(error_type.clone())
777 .or_insert_with(|| (error_msg, failed_at));
778 }
779
780 for (error_type, count) in error_counts {
782 let percentage = (count as f64 / total_errors as f64) * 100.0;
783 let (sample_message, first_seen) = error_first_seen.get(&error_type).unwrap();
784
785 error_patterns.push(ErrorPattern {
786 error_type: error_type.clone(),
787 count,
788 percentage,
789 sample_message: sample_message.clone(),
790 first_seen: *first_seen,
791 last_seen: chrono::Utc::now(), affected_queues: vec![error_type], });
794 }
795
796 error_patterns.sort_by(|a, b| b.count.cmp(&a.count));
798
799 error_patterns
800}
801
802fn calculate_performance_metrics(
804 all_stats: &[hammerwork::stats::QueueStats],
805) -> PerformanceMetrics {
806 let total_jobs = all_stats
807 .iter()
808 .map(|s| s.pending_count + s.running_count + s.completed_count + s.dead_count)
809 .sum::<u64>();
810 let total_throughput = all_stats
811 .iter()
812 .map(|s| s.statistics.throughput_per_minute)
813 .sum::<f64>();
814 let avg_processing_time = if !all_stats.is_empty() {
815 all_stats
816 .iter()
817 .map(|s| s.statistics.avg_processing_time_ms)
818 .sum::<f64>()
819 / all_stats.len() as f64
820 } else {
821 0.0
822 };
823
824 let average_queue_depth = if !all_stats.is_empty() {
825 all_stats
826 .iter()
827 .map(|s| s.pending_count as f64)
828 .sum::<f64>()
829 / all_stats.len() as f64
830 } else {
831 0.0
832 };
833
834 let database_response_time_ms = if avg_processing_time > 0.0 {
836 (avg_processing_time * 0.1).max(1.0).min(100.0) } else {
838 2.0
839 };
840
841 PerformanceMetrics {
842 database_response_time_ms,
843 average_queue_depth,
844 jobs_per_second: total_throughput / 60.0, memory_usage_mb: None, cpu_usage_percent: None, active_workers: all_stats.iter().map(|s| s.running_count as u32).sum(),
848 worker_utilization: if total_jobs > 0 {
849 all_stats.iter().map(|s| s.running_count).sum::<u64>() as f64 / total_jobs as f64
850 } else {
851 0.0
852 },
853 }
854}
855
/// Classify a raw error message into a coarse error-type bucket.
///
/// Matching is case-insensitive (the previous version only matched two exact
/// capitalizations per keyword, so e.g. "TIMEOUT" or "Invalid" fell through
/// to the generic fallback). Unrecognized messages fall back to
/// "<first word> Error", or "Unknown Error" for empty/whitespace input.
fn extract_error_type(error_msg: &str) -> String {
    let lower = error_msg.to_lowercase();

    if lower.contains("timeout") {
        "Timeout Error".to_string()
    } else if lower.contains("connection") {
        "Connection Error".to_string()
    } else if lower.contains("parse") || lower.contains("invalid") {
        "Parse Error".to_string()
    } else if lower.contains("permission") || lower.contains("forbidden") {
        "Permission Error".to_string()
    } else if lower.contains("not found") {
        "Not Found Error".to_string()
    } else {
        error_msg
            .split_whitespace()
            .next()
            .map(|s| format!("{} Error", s))
            .unwrap_or_else(|| "Unknown Error".to_string())
    }
}
884
#[cfg(test)]
mod tests {
    use super::*;

    /// `StatsQuery` must deserialize the full query payload: an RFC 3339
    /// time range, a list of queue names, and a granularity string.
    #[test]
    fn test_stats_query_deserialization() {
        let json = r#"{
            "time_range": {
                "start": "2024-01-01T00:00:00Z",
                "end": "2024-01-02T00:00:00Z"
            },
            "queues": ["email", "data-processing"],
            "granularity": "hour"
        }"#;

        let query: StatsQuery = serde_json::from_str(json).unwrap();
        assert!(query.time_range.is_some());
        assert_eq!(query.queues.as_ref().unwrap().len(), 2);
        assert_eq!(query.granularity, Some("hour".to_string()));
    }

    /// `SystemAlert` must serialize its severity and message verbatim so
    /// clients can display them.
    #[test]
    fn test_system_alert_serialization() {
        let alert = SystemAlert {
            severity: "warning".to_string(),
            message: "High error rate detected".to_string(),
            queue: Some("email".to_string()),
            metric: Some("error_rate".to_string()),
            value: Some(0.15),
            threshold: Some(0.1),
            timestamp: chrono::Utc::now(),
        };

        let json = serde_json::to_string(&alert).unwrap();
        assert!(json.contains("warning"));
        assert!(json.contains("High error rate"));
    }
}