use axum::{
    extract::{Path, Query, State},
    http::StatusCode,
    response::Json,
};
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use sqlx::Row;
use std::collections::HashMap;

use super::DashboardState;

pub struct DashboardApi;

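/// Time-range parameters shared by the dashboard endpoints. When `start` is
/// omitted, `period` ("1h", "24h", "7d", "30d") is applied relative to `end`,
/// defaulting to the last hour.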
#[derive(Debug, Deserialize)]
pub struct TimeRangeQuery {
    pub start: Option<DateTime<Utc>>,
    pub end: Option<DateTime<Utc>>,
    pub period: Option<String>,
}

impl TimeRangeQuery {
    fn get_range(&self) -> (DateTime<Utc>, DateTime<Utc>) {
        let end = self.end.unwrap_or_else(Utc::now);
        let start = self.start.unwrap_or_else(|| {
            match self.period.as_deref() {
                Some("1h") => end - Duration::hours(1),
                Some("24h") => end - Duration::hours(24),
                Some("7d") => end - Duration::days(7),
                Some("30d") => end - Duration::days(30),
                _ => end - Duration::hours(1),
            }
        });
        (start, end)
    }
}

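/// Page/limit pagination parameters; `limit` is clamped to at most 1000 rows.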
#[derive(Debug, Deserialize)]
pub struct PaginationQuery {
    pub page: Option<u32>,
    pub limit: Option<u32>,
}

impl PaginationQuery {
    fn get_limit(&self) -> i64 {
        self.limit.unwrap_or(50).min(1000) as i64
    }

    fn get_offset(&self) -> i64 {
        // Derive the offset from the clamped limit so page boundaries match
        // what get_limit() actually returns when an oversized limit is sent.
        let page = self.page.unwrap_or(1).max(1) as i64;
        (page - 1) * self.get_limit()
    }
}

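/// Filters accepted by the log endpoints: level, free-text query, time
/// bounds, and a row limit (clamped to 1000).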
#[derive(Debug, Deserialize)]
pub struct LogSearchQuery {
    pub level: Option<String>,
    pub q: Option<String>,
    pub start: Option<DateTime<Utc>>,
    pub end: Option<DateTime<Utc>>,
    pub limit: Option<u32>,
}

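/// Filters accepted by the trace list endpoint. `service`, `operation`, and
/// `min_duration` are deserialized but not applied yet.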
#[derive(Debug, Deserialize)]
pub struct TraceSearchQuery {
    #[allow(dead_code)]
    pub service: Option<String>,
    #[allow(dead_code)]
    pub operation: Option<String>,
    #[allow(dead_code)]
    pub min_duration: Option<u64>,
    pub errors_only: Option<bool>,
    pub start: Option<DateTime<Utc>>,
    pub end: Option<DateTime<Utc>>,
    pub limit: Option<u32>,
}

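/// The most recent sample recorded for a metric.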
#[derive(Debug, Serialize)]
pub struct MetricSummary {
    pub name: String,
    pub kind: String,
    pub description: Option<String>,
    pub current_value: f64,
    pub labels: HashMap<String, String>,
    pub last_updated: DateTime<Utc>,
}

#[derive(Debug, Serialize)]
pub struct MetricPoint {
    pub timestamp: DateTime<Utc>,
    pub value: f64,
}

#[derive(Debug, Serialize)]
pub struct MetricSeries {
    pub name: String,
    pub labels: HashMap<String, String>,
    pub points: Vec<MetricPoint>,
}

#[derive(Debug, Serialize)]
pub struct LogEntry {
    pub id: String,
    pub timestamp: DateTime<Utc>,
    pub level: String,
    pub message: String,
    pub fields: HashMap<String, serde_json::Value>,
    pub trace_id: Option<String>,
    pub span_id: Option<String>,
}

#[derive(Debug, Serialize)]
pub struct TraceSummary {
    pub trace_id: String,
    pub root_span_name: String,
    pub service: String,
    pub duration_ms: u64,
    pub span_count: u32,
    pub error: bool,
    pub started_at: DateTime<Utc>,
}

#[derive(Debug, Serialize)]
pub struct TraceDetail {
    pub trace_id: String,
    pub spans: Vec<SpanDetail>,
}

#[derive(Debug, Serialize)]
pub struct SpanDetail {
    pub span_id: String,
    pub parent_span_id: Option<String>,
    pub name: String,
    pub service: String,
    pub kind: String,
    pub status: String,
    pub start_time: DateTime<Utc>,
    pub end_time: Option<DateTime<Utc>>,
    pub duration_ms: Option<u64>,
    pub attributes: HashMap<String, serde_json::Value>,
    pub events: Vec<SpanEvent>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SpanEvent {
    pub name: String,
    pub timestamp: DateTime<Utc>,
    pub attributes: HashMap<String, serde_json::Value>,
}

#[derive(Debug, Serialize)]
pub struct AlertSummary {
    pub id: String,
    pub rule_id: String,
    pub name: String,
    pub severity: String,
    pub status: String,
    pub metric_value: f64,
    pub threshold: f64,
    pub fired_at: DateTime<Utc>,
    pub resolved_at: Option<DateTime<Utc>>,
    pub acknowledged_at: Option<DateTime<Utc>>,
    pub acknowledged_by: Option<String>,
}

#[derive(Debug, Serialize)]
pub struct AlertRuleSummary {
    pub id: String,
    pub name: String,
    pub description: Option<String>,
    pub metric_name: String,
    pub condition: String,
    pub threshold: f64,
    pub severity: String,
    pub enabled: bool,
    pub created_at: DateTime<Utc>,
}

#[derive(Debug, Deserialize)]
pub struct CreateAlertRuleRequest {
    pub name: String,
    pub description: Option<String>,
    pub metric_name: String,
    pub condition: String,
    pub threshold: f64,
    pub duration_seconds: Option<i32>,
    pub severity: Option<String>,
    pub cooldown_seconds: Option<i32>,
}

#[derive(Debug, Deserialize)]
pub struct UpdateAlertRuleRequest {
    pub name: Option<String>,
    pub description: Option<String>,
    pub metric_name: Option<String>,
    pub condition: Option<String>,
    pub threshold: Option<f64>,
    pub duration_seconds: Option<i32>,
    pub severity: Option<String>,
    pub enabled: Option<bool>,
    pub cooldown_seconds: Option<i32>,
}

#[derive(Debug, Deserialize)]
pub struct AcknowledgeAlertRequest {
    pub acknowledged_by: String,
}

#[derive(Debug, Serialize)]
pub struct JobStats {
    pub pending: u64,
    pub running: u64,
    pub completed: u64,
    pub failed: u64,
    pub retrying: u64,
    pub dead_letter: u64,
}

#[derive(Debug, Serialize)]
pub struct WorkflowStats {
    pub running: u64,
    pub completed: u64,
    pub waiting: u64,
    pub failed: u64,
    pub compensating: u64,
}

#[derive(Debug, Serialize)]
pub struct WorkflowRun {
    pub id: String,
    pub workflow_name: String,
    pub version: Option<String>,
    pub status: String,
    pub current_step: Option<String>,
    pub started_at: DateTime<Utc>,
    pub completed_at: Option<DateTime<Utc>>,
    pub error: Option<String>,
}

#[derive(Debug, Serialize)]
pub struct JobDetail {
    pub id: String,
    pub job_type: String,
    pub status: String,
    pub priority: i32,
    pub attempts: i32,
    pub max_attempts: i32,
    pub progress_percent: Option<i32>,
    pub progress_message: Option<String>,
    pub input: Option<serde_json::Value>,
    pub output: Option<serde_json::Value>,
    pub scheduled_at: DateTime<Utc>,
    pub created_at: DateTime<Utc>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub last_error: Option<String>,
}

#[derive(Debug, Serialize)]
pub struct WorkflowDetail {
    pub id: String,
    pub workflow_name: String,
    pub version: Option<String>,
    pub status: String,
    pub input: Option<serde_json::Value>,
    pub output: Option<serde_json::Value>,
    pub current_step: Option<String>,
    pub steps: Vec<WorkflowStepDetail>,
    pub started_at: DateTime<Utc>,
    pub completed_at: Option<DateTime<Utc>>,
    pub error: Option<String>,
}

#[derive(Debug, Serialize)]
pub struct WorkflowStepDetail {
    pub name: String,
    pub status: String,
    pub result: Option<serde_json::Value>,
    pub started_at: Option<DateTime<Utc>>,
    pub completed_at: Option<DateTime<Utc>>,
    pub error: Option<String>,
}

#[derive(Debug, Serialize)]
pub struct NodeInfo {
    pub id: String,
    pub name: String,
    pub roles: Vec<String>,
    pub status: String,
    pub last_heartbeat: DateTime<Utc>,
    pub version: String,
    pub started_at: DateTime<Utc>,
}

#[derive(Debug, Serialize)]
pub struct ClusterHealth {
    pub status: String,
    pub node_count: u32,
    pub healthy_nodes: u32,
    pub leader_node: Option<String>,
    pub leaders: HashMap<String, String>,
}

#[derive(Debug, Serialize)]
pub struct SystemInfo {
    pub version: String,
    pub rust_version: String,
    pub started_at: DateTime<Utc>,
    pub uptime_seconds: u64,
}

#[derive(Debug, Serialize)]
pub struct SystemStats {
    pub http_requests_total: u64,
    pub http_requests_per_second: f64,
    pub p99_latency_ms: Option<f64>,
    pub function_calls_total: u64,
    pub active_connections: u32,
    pub active_subscriptions: u32,
    pub jobs_pending: u64,
    pub memory_used_mb: u64,
    pub cpu_usage_percent: f64,
}

#[derive(Debug, Serialize)]
pub struct ApiResponse<T> {
    pub success: bool,
    pub data: Option<T>,
    pub error: Option<String>,
}

impl<T> ApiResponse<T> {
    pub fn success(data: T) -> Self {
        Self {
            success: true,
            data: Some(data),
            error: None,
        }
    }

    pub fn error(message: impl Into<String>) -> Self {
        Self {
            success: false,
            data: None,
            error: Some(message.into()),
        }
    }
}

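/// Returns the most recent sample recorded for each metric name.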
pub async fn list_metrics(
    State(state): State<DashboardState>,
    Query(_query): Query<TimeRangeQuery>,
) -> Json<ApiResponse<Vec<MetricSummary>>> {
    let result = sqlx::query(
        r#"
        SELECT DISTINCT ON (name) name, kind, value, labels, timestamp
        FROM forge_metrics
        ORDER BY name, timestamp DESC
        "#,
    )
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let metrics: Vec<MetricSummary> = rows
                .into_iter()
                .map(|row| {
                    let labels: serde_json::Value = row.get("labels");
                    MetricSummary {
                        name: row.get("name"),
                        kind: row.get("kind"),
                        description: None,
                        current_value: row.get("value"),
                        labels: serde_json::from_value(labels).unwrap_or_default(),
                        last_updated: row.get("timestamp"),
                    }
                })
                .collect();
            Json(ApiResponse::success(metrics))
        }
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

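/// Returns the latest sample for a single metric within the requested range.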
pub async fn get_metric(
    State(state): State<DashboardState>,
    Path(name): Path<String>,
    Query(query): Query<TimeRangeQuery>,
) -> Json<ApiResponse<MetricSummary>> {
    let (start, end) = query.get_range();

    let result = sqlx::query(
        r#"
        SELECT name, kind, value, labels, timestamp
        FROM forge_metrics
        WHERE name = $1 AND timestamp >= $2 AND timestamp <= $3
        ORDER BY timestamp DESC
        LIMIT 1
        "#,
    )
    .bind(&name)
    .bind(start)
    .bind(end)
    .fetch_optional(&state.pool)
    .await;

    match result {
        Ok(Some(row)) => {
            let labels: serde_json::Value = row.get("labels");
            Json(ApiResponse::success(MetricSummary {
                name: row.get("name"),
                kind: row.get("kind"),
                description: None,
                current_value: row.get("value"),
                labels: serde_json::from_value(labels).unwrap_or_default(),
                last_updated: row.get("timestamp"),
            }))
        }
        Ok(None) => Json(ApiResponse::error(format!("Metric '{}' not found", name))),
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

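/// Returns bucketed time series for all metrics in the requested range;
/// counter samples are summed per bucket, other kinds report the bucket max.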
pub async fn get_metric_series(
    State(state): State<DashboardState>,
    Query(query): Query<TimeRangeQuery>,
) -> Json<ApiResponse<Vec<MetricSeries>>> {
    let (start, end) = query.get_range();

    // Pick a bucket width proportional to the requested window.
    let duration = end.signed_duration_since(start);
    let bucket_interval = if duration.num_hours() <= 1 {
        "1 minute"
    } else if duration.num_hours() <= 24 {
        "5 minutes"
    } else if duration.num_days() <= 7 {
        "1 hour"
    } else {
        "1 day"
    };

    // date_bin (PostgreSQL 14+) is used here because date_trunc only accepts
    // unit names ('minute', 'hour', ...), not arbitrary strides like '5 minutes'.
    let result = sqlx::query(
        r#"
        WITH bucketed AS (
            SELECT
                name,
                labels,
                kind,
                date_bin($3::interval, timestamp, TIMESTAMPTZ 'epoch') as bucket,
                SUM(value) as sum_value,
                MAX(value) as max_value,
                COUNT(*) as cnt
            FROM forge_metrics
            WHERE timestamp >= $1 AND timestamp <= $2
            GROUP BY name, labels, kind, date_bin($3::interval, timestamp, TIMESTAMPTZ 'epoch')
            ORDER BY name, bucket
        )
        SELECT
            name,
            labels,
            bucket as timestamp,
            CASE
                WHEN kind = 'counter' THEN sum_value
                ELSE max_value
            END as value
        FROM bucketed
        ORDER BY name, bucket
        "#,
    )
    .bind(start)
    .bind(end)
    .bind(bucket_interval)
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let mut series_map: HashMap<String, MetricSeries> = HashMap::new();

            for row in rows {
                let name: String = row.get("name");
                let value: f64 = row.get("value");
                let timestamp: DateTime<Utc> = row.get("timestamp");
                let labels: serde_json::Value = row.get("labels");

                // Key on name plus labels so that differently-labelled series
                // sharing a metric name are not merged into one series.
                let key = format!("{}|{}", name, labels);
                let series = series_map.entry(key).or_insert_with(|| MetricSeries {
                    name: name.clone(),
                    labels: serde_json::from_value(labels).unwrap_or_default(),
                    points: Vec::new(),
                });

                series.points.push(MetricPoint { timestamp, value });
            }

            Json(ApiResponse::success(series_map.into_values().collect()))
        }
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

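/// Lists recent log entries, optionally filtered by level and time bounds.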
pub async fn list_logs(
    State(state): State<DashboardState>,
    Query(query): Query<LogSearchQuery>,
) -> Json<ApiResponse<Vec<LogEntry>>> {
    let limit = query.limit.unwrap_or(100).min(1000) as i64;
    let level_filter = query.level.as_deref();

    let result = sqlx::query(
        r#"
        SELECT id, level, message, target, fields, trace_id, span_id, timestamp
        FROM forge_logs
        WHERE ($1::TEXT IS NULL OR level = $1)
          AND ($2::TIMESTAMPTZ IS NULL OR timestamp >= $2)
          AND ($3::TIMESTAMPTZ IS NULL OR timestamp <= $3)
        ORDER BY timestamp DESC
        LIMIT $4
        "#,
    )
    .bind(level_filter)
    .bind(query.start)
    .bind(query.end)
    .bind(limit)
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let logs: Vec<LogEntry> = rows
                .into_iter()
                .map(|row| {
                    let id: i64 = row.get("id");
                    let fields: serde_json::Value = row.get("fields");
                    LogEntry {
                        id: id.to_string(),
                        timestamp: row.get("timestamp"),
                        level: row.get("level"),
                        message: row.get("message"),
                        fields: serde_json::from_value(fields).unwrap_or_default(),
                        trace_id: row.get("trace_id"),
                        span_id: row.get("span_id"),
                    }
                })
                .collect();
            Json(ApiResponse::success(logs))
        }
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

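/// Case-insensitive substring search (ILIKE) over log messages, optionally
/// filtered by level.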
pub async fn search_logs(
    State(state): State<DashboardState>,
    Query(query): Query<LogSearchQuery>,
) -> Json<ApiResponse<Vec<LogEntry>>> {
    let limit = query.limit.unwrap_or(100).min(1000) as i64;
    let search_pattern = query.q.as_ref().map(|q| format!("%{}%", q));

    let result = sqlx::query(
        r#"
        SELECT id, level, message, target, fields, trace_id, span_id, timestamp
        FROM forge_logs
        WHERE ($1::TEXT IS NULL OR message ILIKE $1)
          AND ($2::TEXT IS NULL OR level = $2)
        ORDER BY timestamp DESC
        LIMIT $3
        "#,
    )
    .bind(&search_pattern)
    .bind(&query.level)
    .bind(limit)
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let logs: Vec<LogEntry> = rows
                .into_iter()
                .map(|row| {
                    let id: i64 = row.get("id");
                    let fields: serde_json::Value = row.get("fields");
                    LogEntry {
                        id: id.to_string(),
                        timestamp: row.get("timestamp"),
                        level: row.get("level"),
                        message: row.get("message"),
                        fields: serde_json::from_value(fields).unwrap_or_default(),
                        trace_id: row.get("trace_id"),
                        span_id: row.get("span_id"),
                    }
                })
                .collect();
            Json(ApiResponse::success(logs))
        }
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

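/// Lists recent traces aggregated per trace_id, optionally restricted to
/// traces containing at least one error span.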
pub async fn list_traces(
    State(state): State<DashboardState>,
    Query(query): Query<TraceSearchQuery>,
) -> Json<ApiResponse<Vec<TraceSummary>>> {
    let limit = query.limit.unwrap_or(50).min(1000) as i64;
    let errors_only = query.errors_only.unwrap_or(false);

    let result = sqlx::query(
        r#"
        WITH trace_stats AS (
            SELECT
                trace_id,
                MIN(started_at) as started_at,
                MAX(duration_ms) as duration_ms,
                COUNT(*) as span_count,
                BOOL_OR(status = 'error') as has_error,
                (array_agg(name ORDER BY started_at ASC))[1] as root_span_name,
                (array_agg(attributes->>'service.name' ORDER BY started_at ASC) FILTER (WHERE attributes->>'service.name' IS NOT NULL))[1] as service_name
            FROM forge_traces
            WHERE ($1::TIMESTAMPTZ IS NULL OR started_at >= $1)
              AND ($2::TIMESTAMPTZ IS NULL OR started_at <= $2)
            GROUP BY trace_id
        )
        SELECT * FROM trace_stats
        WHERE ($3::BOOLEAN = FALSE OR has_error = TRUE)
        ORDER BY started_at DESC
        LIMIT $4
        "#,
    )
    .bind(query.start)
    .bind(query.end)
    .bind(errors_only)
    .bind(limit)
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let traces: Vec<TraceSummary> = rows
                .into_iter()
                .map(|row| TraceSummary {
                    trace_id: row.get("trace_id"),
                    root_span_name: row
                        .get::<Option<String>, _>("root_span_name")
                        .unwrap_or_default(),
                    service: row
                        .get::<Option<String>, _>("service_name")
                        .unwrap_or_else(|| "unknown".to_string()),
                    duration_ms: row.get::<Option<i32>, _>("duration_ms").unwrap_or(0) as u64,
                    span_count: row.get::<i64, _>("span_count") as u32,
                    error: row.get("has_error"),
                    started_at: row.get("started_at"),
                })
                .collect();
            Json(ApiResponse::success(traces))
        }
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

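/// Returns every span recorded for one trace, ordered by start time.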
pub async fn get_trace(
    State(state): State<DashboardState>,
    Path(trace_id): Path<String>,
) -> Json<ApiResponse<TraceDetail>> {
    let result = sqlx::query(
        r#"
        SELECT trace_id, span_id, parent_span_id, name, kind, status,
               attributes, events, started_at, ended_at, duration_ms
        FROM forge_traces
        WHERE trace_id = $1
        ORDER BY started_at ASC
        "#,
    )
    .bind(&trace_id)
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) if !rows.is_empty() => {
            let spans: Vec<SpanDetail> = rows
                .into_iter()
                .map(|row| {
                    let attributes: serde_json::Value = row.get("attributes");
                    let events: serde_json::Value = row.get("events");
                    let end_time: Option<DateTime<Utc>> = row.get("ended_at");
                    let duration: Option<i32> = row.get("duration_ms");

                    let service = attributes
                        .get("service.name")
                        .and_then(|v| v.as_str())
                        .unwrap_or("unknown")
                        .to_string();

                    SpanDetail {
                        span_id: row.get("span_id"),
                        parent_span_id: row.get("parent_span_id"),
                        name: row.get("name"),
                        service,
                        kind: row.get("kind"),
                        status: row.get("status"),
                        start_time: row.get("started_at"),
                        end_time,
                        duration_ms: duration.map(|d| d as u64),
                        attributes: serde_json::from_value(attributes).unwrap_or_default(),
                        events: serde_json::from_value(events).unwrap_or_default(),
                    }
                })
                .collect();

            Json(ApiResponse::success(TraceDetail { trace_id, spans }))
        }
        Ok(_) => Json(ApiResponse::error(format!(
            "Trace '{}' not found",
            trace_id
        ))),
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

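/// Lists alert history, newest first, with pagination.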
pub async fn list_alerts(
    State(state): State<DashboardState>,
    Query(query): Query<PaginationQuery>,
) -> Json<ApiResponse<Vec<AlertSummary>>> {
    let limit = query.get_limit();
    let offset = query.get_offset();

    let result = sqlx::query(
        r#"
        SELECT id, rule_id, rule_name, metric_value, threshold, severity, status,
               triggered_at, resolved_at, acknowledged_at, acknowledged_by
        FROM forge_alerts
        ORDER BY triggered_at DESC
        LIMIT $1 OFFSET $2
        "#,
    )
    .bind(limit)
    .bind(offset)
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let alerts: Vec<AlertSummary> = rows
                .into_iter()
                .map(|row| {
                    let id: uuid::Uuid = row.get("id");
                    let rule_id: uuid::Uuid = row.get("rule_id");
                    AlertSummary {
                        id: id.to_string(),
                        rule_id: rule_id.to_string(),
                        name: row.get("rule_name"),
                        severity: row.get("severity"),
                        status: row.get("status"),
                        metric_value: row.get("metric_value"),
                        threshold: row.get("threshold"),
                        fired_at: row.get("triggered_at"),
                        resolved_at: row.get("resolved_at"),
                        acknowledged_at: row.get("acknowledged_at"),
                        acknowledged_by: row.get("acknowledged_by"),
                    }
                })
                .collect();
            Json(ApiResponse::success(alerts))
        }
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

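/// Lists alerts currently in the 'firing' state, newest first.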
pub async fn get_active_alerts(
    State(state): State<DashboardState>,
) -> Json<ApiResponse<Vec<AlertSummary>>> {
    let result = sqlx::query(
        r#"
        SELECT id, rule_id, rule_name, metric_value, threshold, severity, status,
               triggered_at, resolved_at, acknowledged_at, acknowledged_by
        FROM forge_alerts
        WHERE status = 'firing'
        ORDER BY triggered_at DESC
        "#,
    )
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let alerts: Vec<AlertSummary> = rows
                .into_iter()
                .map(|row| {
                    let id: uuid::Uuid = row.get("id");
                    let rule_id: uuid::Uuid = row.get("rule_id");
                    AlertSummary {
                        id: id.to_string(),
                        rule_id: rule_id.to_string(),
                        name: row.get("rule_name"),
                        severity: row.get("severity"),
                        status: row.get("status"),
                        metric_value: row.get("metric_value"),
                        threshold: row.get("threshold"),
                        fired_at: row.get("triggered_at"),
                        resolved_at: row.get("resolved_at"),
                        acknowledged_at: row.get("acknowledged_at"),
                        acknowledged_by: row.get("acknowledged_by"),
                    }
                })
                .collect();
            Json(ApiResponse::success(alerts))
        }
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

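/// Lists all configured alert rules, ordered by name.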
pub async fn list_alert_rules(
    State(state): State<DashboardState>,
) -> Json<ApiResponse<Vec<AlertRuleSummary>>> {
    let result = sqlx::query(
        r#"
        SELECT id, name, description, metric_name, condition, threshold, severity, enabled, created_at
        FROM forge_alert_rules
        ORDER BY name
        "#,
    )
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let rules: Vec<AlertRuleSummary> = rows
                .into_iter()
                .map(|row| {
                    let id: uuid::Uuid = row.get("id");
                    AlertRuleSummary {
                        id: id.to_string(),
                        name: row.get("name"),
                        description: row.get("description"),
                        metric_name: row.get("metric_name"),
                        condition: row.get("condition"),
                        threshold: row.get("threshold"),
                        severity: row.get("severity"),
                        enabled: row.get("enabled"),
                        created_at: row.get("created_at"),
                    }
                })
                .collect();
            Json(ApiResponse::success(rules))
        }
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

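/// Fetches a single alert rule by UUID.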
pub async fn get_alert_rule(
    State(state): State<DashboardState>,
    Path(id): Path<String>,
) -> Json<ApiResponse<AlertRuleSummary>> {
    let rule_id = match uuid::Uuid::parse_str(&id) {
        Ok(id) => id,
        Err(_) => return Json(ApiResponse::error("Invalid rule ID")),
    };

    let result = sqlx::query(
        r#"
        SELECT id, name, description, metric_name, condition, threshold, severity, enabled, created_at
        FROM forge_alert_rules
        WHERE id = $1
        "#,
    )
    .bind(rule_id)
    .fetch_optional(&state.pool)
    .await;

    match result {
        Ok(Some(row)) => {
            let id: uuid::Uuid = row.get("id");
            Json(ApiResponse::success(AlertRuleSummary {
                id: id.to_string(),
                name: row.get("name"),
                description: row.get("description"),
                metric_name: row.get("metric_name"),
                condition: row.get("condition"),
                threshold: row.get("threshold"),
                severity: row.get("severity"),
                enabled: row.get("enabled"),
                created_at: row.get("created_at"),
            }))
        }
        Ok(None) => Json(ApiResponse::error(format!("Rule '{}' not found", id))),
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

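/// Creates an alert rule. Defaults: severity "warning", duration 0 seconds,
/// cooldown 300 seconds; new rules start enabled.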
pub async fn create_alert_rule(
    State(state): State<DashboardState>,
    Json(req): Json<CreateAlertRuleRequest>,
) -> (StatusCode, Json<ApiResponse<AlertRuleSummary>>) {
    let id = uuid::Uuid::new_v4();
    let now = Utc::now();
    let severity = req.severity.as_deref().unwrap_or("warning");
    let duration_seconds = req.duration_seconds.unwrap_or(0);
    let cooldown_seconds = req.cooldown_seconds.unwrap_or(300);

    let result = sqlx::query(
        r#"
        INSERT INTO forge_alert_rules
            (id, name, description, metric_name, condition, threshold, duration_seconds, severity,
             enabled, labels, notification_channels, cooldown_seconds, created_at, updated_at)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, TRUE, '{}', '{}', $9, $10, $10)
        "#,
    )
    .bind(id)
    .bind(&req.name)
    .bind(&req.description)
    .bind(&req.metric_name)
    .bind(&req.condition)
    .bind(req.threshold)
    .bind(duration_seconds)
    .bind(severity)
    .bind(cooldown_seconds)
    .bind(now)
    .execute(&state.pool)
    .await;

    match result {
        Ok(_) => (
            StatusCode::CREATED,
            Json(ApiResponse::success(AlertRuleSummary {
                id: id.to_string(),
                name: req.name,
                description: req.description,
                metric_name: req.metric_name,
                condition: req.condition,
                threshold: req.threshold,
                severity: severity.to_string(),
                enabled: true,
                created_at: now,
            })),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiResponse::error(e.to_string())),
        ),
    }
}

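/// Partially updates an alert rule; omitted fields keep their stored values.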
pub async fn update_alert_rule(
    State(state): State<DashboardState>,
    Path(id): Path<String>,
    Json(req): Json<UpdateAlertRuleRequest>,
) -> Json<ApiResponse<AlertRuleSummary>> {
    let rule_id = match uuid::Uuid::parse_str(&id) {
        Ok(id) => id,
        Err(_) => return Json(ApiResponse::error("Invalid rule ID")),
    };

    let existing = sqlx::query(
        r#"
        SELECT id, name, description, metric_name, condition, threshold, duration_seconds,
               severity, enabled, cooldown_seconds, created_at
        FROM forge_alert_rules
        WHERE id = $1
        "#,
    )
    .bind(rule_id)
    .fetch_optional(&state.pool)
    .await;

    let existing = match existing {
        Ok(Some(row)) => row,
        Ok(None) => return Json(ApiResponse::error(format!("Rule '{}' not found", id))),
        Err(e) => return Json(ApiResponse::error(e.to_string())),
    };

    // Fall back to the stored value for any field the request omits.
    let name: String = req.name.unwrap_or_else(|| existing.get("name"));
    let description: Option<String> = req.description.or_else(|| existing.get("description"));
    let metric_name: String = req
        .metric_name
        .unwrap_or_else(|| existing.get("metric_name"));
    let condition: String = req.condition.unwrap_or_else(|| existing.get("condition"));
    let threshold: f64 = req.threshold.unwrap_or_else(|| existing.get("threshold"));
    let duration_seconds: i32 = req
        .duration_seconds
        .unwrap_or_else(|| existing.get("duration_seconds"));
    let severity: String = req.severity.unwrap_or_else(|| existing.get("severity"));
    let enabled: bool = req.enabled.unwrap_or_else(|| existing.get("enabled"));
    let cooldown_seconds: i32 = req
        .cooldown_seconds
        .unwrap_or_else(|| existing.get("cooldown_seconds"));
    let created_at: DateTime<Utc> = existing.get("created_at");

    let result = sqlx::query(
        r#"
        UPDATE forge_alert_rules
        SET name = $2, description = $3, metric_name = $4, condition = $5, threshold = $6,
            duration_seconds = $7, severity = $8, enabled = $9, cooldown_seconds = $10,
            updated_at = NOW()
        WHERE id = $1
        "#,
    )
    .bind(rule_id)
    .bind(&name)
    .bind(&description)
    .bind(&metric_name)
    .bind(&condition)
    .bind(threshold)
    .bind(duration_seconds)
    .bind(&severity)
    .bind(enabled)
    .bind(cooldown_seconds)
    .execute(&state.pool)
    .await;

    match result {
        Ok(_) => Json(ApiResponse::success(AlertRuleSummary {
            id: rule_id.to_string(),
            name,
            description,
            metric_name,
            condition,
            threshold,
            severity,
            enabled,
            created_at,
        })),
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

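/// Deletes an alert rule; responds 404 when the UUID matches no row.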
pub async fn delete_alert_rule(
    State(state): State<DashboardState>,
    Path(id): Path<String>,
) -> (StatusCode, Json<ApiResponse<()>>) {
    let rule_id = match uuid::Uuid::parse_str(&id) {
        Ok(id) => id,
        Err(_) => {
            return (
                StatusCode::BAD_REQUEST,
                Json(ApiResponse::error("Invalid rule ID")),
            );
        }
    };

    let result = sqlx::query("DELETE FROM forge_alert_rules WHERE id = $1")
        .bind(rule_id)
        .execute(&state.pool)
        .await;

    match result {
        Ok(r) if r.rows_affected() > 0 => (StatusCode::OK, Json(ApiResponse::success(()))),
        Ok(_) => (
            StatusCode::NOT_FOUND,
            Json(ApiResponse::error(format!("Rule '{}' not found", id))),
        ),
        Err(e) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(ApiResponse::error(e.to_string())),
        ),
    }
}

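/// Marks an alert as acknowledged by the user named in the request body.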
pub async fn acknowledge_alert(
    State(state): State<DashboardState>,
    Path(id): Path<String>,
    Json(req): Json<AcknowledgeAlertRequest>,
) -> Json<ApiResponse<()>> {
    let alert_id = match uuid::Uuid::parse_str(&id) {
        Ok(id) => id,
        Err(_) => return Json(ApiResponse::error("Invalid alert ID")),
    };

    let result = sqlx::query(
        r#"
        UPDATE forge_alerts
        SET acknowledged_at = NOW(), acknowledged_by = $2
        WHERE id = $1
        "#,
    )
    .bind(alert_id)
    .bind(&req.acknowledged_by)
    .execute(&state.pool)
    .await;

    match result {
        Ok(r) if r.rows_affected() > 0 => Json(ApiResponse::success(())),
        Ok(_) => Json(ApiResponse::error(format!("Alert '{}' not found", id))),
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

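/// Marks an alert as resolved, returning an error if the ID matches no row.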
pub async fn resolve_alert(
    State(state): State<DashboardState>,
    Path(id): Path<String>,
) -> Json<ApiResponse<()>> {
    let alert_id = match uuid::Uuid::parse_str(&id) {
        Ok(id) => id,
        Err(_) => return Json(ApiResponse::error("Invalid alert ID")),
    };

    let result = sqlx::query(
        r#"
        UPDATE forge_alerts
        SET status = 'resolved', resolved_at = NOW()
        WHERE id = $1
        "#,
    )
    .bind(alert_id)
    .execute(&state.pool)
    .await;

    match result {
        Ok(r) if r.rows_affected() > 0 => Json(ApiResponse::success(())),
        Ok(_) => Json(ApiResponse::error(format!("Alert '{}' not found", id))),
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

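/// Lists jobs, newest first, with pagination. Rows are returned as raw JSON
/// objects rather than typed JobDetail values.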
pub async fn list_jobs(
    State(state): State<DashboardState>,
    Query(query): Query<PaginationQuery>,
) -> Json<ApiResponse<Vec<serde_json::Value>>> {
    let limit = query.get_limit();
    let offset = query.get_offset();

    let result = sqlx::query(
        r#"
        SELECT id, job_type, status, priority, attempts, max_attempts,
               progress_percent, progress_message,
               scheduled_at, created_at, started_at, completed_at, last_error
        FROM forge_jobs
        ORDER BY created_at DESC
        LIMIT $1 OFFSET $2
        "#,
    )
    .bind(limit)
    .bind(offset)
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let jobs: Vec<serde_json::Value> = rows
                .into_iter()
                .map(|row| {
                    let id: uuid::Uuid = row.get("id");
                    serde_json::json!({
                        "id": id.to_string(),
                        "job_type": row.get::<String, _>("job_type"),
                        "status": row.get::<String, _>("status"),
                        "priority": row.get::<i32, _>("priority"),
                        "attempts": row.get::<i32, _>("attempts"),
                        "max_attempts": row.get::<i32, _>("max_attempts"),
                        "progress_percent": row.get::<Option<i32>, _>("progress_percent"),
                        "progress_message": row.get::<Option<String>, _>("progress_message"),
                        "scheduled_at": row.get::<DateTime<Utc>, _>("scheduled_at"),
                        "created_at": row.get::<DateTime<Utc>, _>("created_at"),
                        "started_at": row.get::<Option<DateTime<Utc>>, _>("started_at"),
                        "completed_at": row.get::<Option<DateTime<Utc>>, _>("completed_at"),
                        "last_error": row.get::<Option<String>, _>("last_error"),
                    })
                })
                .collect();
            Json(ApiResponse::success(jobs))
        }
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

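/// Fetches full detail for one job, including its input and output payloads.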
pub async fn get_job(
    State(state): State<DashboardState>,
    Path(id): Path<String>,
) -> Json<ApiResponse<JobDetail>> {
    let job_id = match uuid::Uuid::parse_str(&id) {
        Ok(id) => id,
        Err(_) => return Json(ApiResponse::error("Invalid job ID")),
    };

    let result = sqlx::query(
        r#"
        SELECT id, job_type, status, priority, attempts, max_attempts,
               progress_percent, progress_message, input, output,
               scheduled_at, created_at, started_at, completed_at, last_error
        FROM forge_jobs
        WHERE id = $1
        "#,
    )
    .bind(job_id)
    .fetch_optional(&state.pool)
    .await;

    match result {
        Ok(Some(row)) => {
            let id: uuid::Uuid = row.get("id");
            Json(ApiResponse::success(JobDetail {
                id: id.to_string(),
                job_type: row.get("job_type"),
                status: row.get("status"),
                priority: row.get("priority"),
                attempts: row.get("attempts"),
                max_attempts: row.get("max_attempts"),
                progress_percent: row.get("progress_percent"),
                progress_message: row.get("progress_message"),
                input: row.get("input"),
                output: row.get("output"),
                scheduled_at: row.get("scheduled_at"),
                created_at: row.get("created_at"),
                started_at: row.get("started_at"),
                completed_at: row.get("completed_at"),
                last_error: row.get("last_error"),
            }))
        }
        Ok(None) => Json(ApiResponse::error(format!("Job '{}' not found", id))),
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

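/// Counts jobs by status.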
pub async fn get_job_stats(State(state): State<DashboardState>) -> Json<ApiResponse<JobStats>> {
    let result = sqlx::query(
        r#"
        SELECT
            COUNT(*) FILTER (WHERE status = 'pending') as pending,
            COUNT(*) FILTER (WHERE status = 'running') as running,
            COUNT(*) FILTER (WHERE status = 'completed') as completed,
            COUNT(*) FILTER (WHERE status = 'failed') as failed,
            COUNT(*) FILTER (WHERE status = 'retry') as retrying,
            COUNT(*) FILTER (WHERE status = 'dead_letter') as dead_letter
        FROM forge_jobs
        "#,
    )
    .fetch_one(&state.pool)
    .await;

    match result {
        Ok(row) => Json(ApiResponse::success(JobStats {
            pending: row.get::<Option<i64>, _>("pending").unwrap_or(0) as u64,
            running: row.get::<Option<i64>, _>("running").unwrap_or(0) as u64,
            completed: row.get::<Option<i64>, _>("completed").unwrap_or(0) as u64,
            failed: row.get::<Option<i64>, _>("failed").unwrap_or(0) as u64,
            retrying: row.get::<Option<i64>, _>("retrying").unwrap_or(0) as u64,
            dead_letter: row.get::<Option<i64>, _>("dead_letter").unwrap_or(0) as u64,
        })),
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

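/// Lists workflow runs, newest first, with pagination.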
pub async fn list_workflows(
    State(state): State<DashboardState>,
    Query(query): Query<PaginationQuery>,
) -> Json<ApiResponse<Vec<WorkflowRun>>> {
    let limit = query.get_limit();
    let offset = query.get_offset();

    let result = sqlx::query(
        r#"
        SELECT id, workflow_name, version, status, current_step,
               started_at, completed_at, error
        FROM forge_workflow_runs
        ORDER BY started_at DESC
        LIMIT $1 OFFSET $2
        "#,
    )
    .bind(limit)
    .bind(offset)
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let workflows: Vec<WorkflowRun> = rows
                .into_iter()
                .map(|row| {
                    let id: uuid::Uuid = row.get("id");
                    WorkflowRun {
                        id: id.to_string(),
                        workflow_name: row.get("workflow_name"),
                        version: row.get("version"),
                        status: row.get("status"),
                        current_step: row.get("current_step"),
                        started_at: row.get("started_at"),
                        completed_at: row.get("completed_at"),
                        error: row.get("error"),
                    }
                })
                .collect();
            Json(ApiResponse::success(workflows))
        }
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

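/// Fetches one workflow run together with its recorded step history.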
pub async fn get_workflow(
    State(state): State<DashboardState>,
    Path(id): Path<String>,
) -> Json<ApiResponse<WorkflowDetail>> {
    let workflow_id = match uuid::Uuid::parse_str(&id) {
        Ok(id) => id,
        Err(_) => return Json(ApiResponse::error("Invalid workflow ID")),
    };

    let run_result = sqlx::query(
        r#"
        SELECT id, workflow_name, version, status, input, output,
               current_step, started_at, completed_at, error
        FROM forge_workflow_runs
        WHERE id = $1
        "#,
    )
    .bind(workflow_id)
    .fetch_optional(&state.pool)
    .await;

    let run = match run_result {
        Ok(Some(row)) => row,
        Ok(None) => return Json(ApiResponse::error(format!("Workflow '{}' not found", id))),
        Err(e) => return Json(ApiResponse::error(e.to_string())),
    };

    let steps_result = sqlx::query(
        r#"
        SELECT step_name, status, result, started_at, completed_at, error
        FROM forge_workflow_steps
        WHERE workflow_run_id = $1
        ORDER BY started_at ASC NULLS LAST
        "#,
    )
    .bind(workflow_id)
    .fetch_all(&state.pool)
    .await;

    // Step history is best-effort: a query error degrades to an empty list.
    let steps = match steps_result {
        Ok(rows) => rows
            .into_iter()
            .map(|row| WorkflowStepDetail {
                name: row.get("step_name"),
                status: row.get("status"),
                result: row.get("result"),
                started_at: row.get("started_at"),
                completed_at: row.get("completed_at"),
                error: row.get("error"),
            })
            .collect(),
        Err(_) => Vec::new(),
    };

    let run_id: uuid::Uuid = run.get("id");
    Json(ApiResponse::success(WorkflowDetail {
        id: run_id.to_string(),
        workflow_name: run.get("workflow_name"),
        version: run.get("version"),
        status: run.get("status"),
        input: run.get("input"),
        output: run.get("output"),
        current_step: run.get("current_step"),
        steps,
        started_at: run.get("started_at"),
        completed_at: run.get("completed_at"),
        error: run.get("error"),
    }))
}

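/// Counts workflow runs by status.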
pub async fn get_workflow_stats(
    State(state): State<DashboardState>,
) -> Json<ApiResponse<WorkflowStats>> {
    let result = sqlx::query(
        r#"
        SELECT
            COUNT(*) FILTER (WHERE status = 'running') as running,
            COUNT(*) FILTER (WHERE status = 'completed') as completed,
            COUNT(*) FILTER (WHERE status = 'waiting') as waiting,
            COUNT(*) FILTER (WHERE status = 'failed') as failed,
            COUNT(*) FILTER (WHERE status = 'compensating') as compensating
        FROM forge_workflow_runs
        "#,
    )
    .fetch_one(&state.pool)
    .await;

    match result {
        Ok(row) => Json(ApiResponse::success(WorkflowStats {
            running: row.get::<Option<i64>, _>("running").unwrap_or(0) as u64,
            completed: row.get::<Option<i64>, _>("completed").unwrap_or(0) as u64,
            waiting: row.get::<Option<i64>, _>("waiting").unwrap_or(0) as u64,
            failed: row.get::<Option<i64>, _>("failed").unwrap_or(0) as u64,
            compensating: row.get::<Option<i64>, _>("compensating").unwrap_or(0) as u64,
        })),
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

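/// Lists cluster nodes, newest first.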
pub async fn list_nodes(State(state): State<DashboardState>) -> Json<ApiResponse<Vec<NodeInfo>>> {
    let result = sqlx::query(
        r#"
        SELECT id, hostname, roles, status, last_heartbeat, version, started_at
        FROM forge_nodes
        ORDER BY started_at DESC
        "#,
    )
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let nodes: Vec<NodeInfo> = rows
                .into_iter()
                .map(|row| {
                    let id: uuid::Uuid = row.get("id");
                    let roles: Vec<String> = row.get("roles");
                    NodeInfo {
                        id: id.to_string(),
                        name: row.get("hostname"),
                        roles,
                        status: row.get("status"),
                        last_heartbeat: row.get("last_heartbeat"),
                        version: row.get::<Option<String>, _>("version").unwrap_or_default(),
                        started_at: row.get("started_at"),
                    }
                })
                .collect();
            Json(ApiResponse::success(nodes))
        }
        Err(e) => Json(ApiResponse::error(e.to_string())),
    }
}

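/// Summarizes cluster health: node counts, current per-role leaders, and an
/// overall status of "healthy", "degraded", or "unhealthy".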
pub async fn get_cluster_health(
    State(state): State<DashboardState>,
) -> Json<ApiResponse<ClusterHealth>> {
    let nodes_result = sqlx::query(
        r#"
        SELECT
            COUNT(*) as total,
            COUNT(*) FILTER (WHERE status = 'active' AND last_heartbeat > NOW() - INTERVAL '30 seconds') as healthy
        FROM forge_nodes
        "#,
    )
    .fetch_one(&state.pool)
    .await;

    let leaders_result = sqlx::query(
        r#"
        SELECT role, node_id
        FROM forge_leaders
        WHERE lease_until > NOW()
        "#,
    )
    .fetch_all(&state.pool)
    .await;

    match (nodes_result, leaders_result) {
        (Ok(nodes_row), Ok(leader_rows)) => {
            let total: i64 = nodes_row.get("total");
            let healthy: i64 = nodes_row.get("healthy");

            let mut leaders: HashMap<String, String> = HashMap::new();
            let mut leader_node: Option<String> = None;

            for row in leader_rows {
                let role: String = row.get("role");
                let node_id: uuid::Uuid = row.get("node_id");
                if role == "scheduler" {
                    leader_node = Some(node_id.to_string());
                }
                leaders.insert(role, node_id.to_string());
            }

            let status = if healthy == total && total > 0 {
                "healthy"
            } else if healthy > 0 {
                "degraded"
            } else {
                "unhealthy"
            };

            Json(ApiResponse::success(ClusterHealth {
                status: status.to_string(),
                node_count: total as u32,
                healthy_nodes: healthy as u32,
                leader_node,
                leaders,
            }))
        }
        (Err(e), _) | (_, Err(e)) => Json(ApiResponse::error(e.to_string())),
    }
}

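/// Reports the build version and uptime, approximating process start from
/// the oldest active node's start time.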
pub async fn get_system_info(State(state): State<DashboardState>) -> Json<ApiResponse<SystemInfo>> {
    let started_at = sqlx::query_scalar::<_, DateTime<Utc>>(
        "SELECT MIN(started_at) FROM forge_nodes WHERE status = 'active'",
    )
    .fetch_optional(&state.pool)
    .await
    .ok()
    .flatten()
    .unwrap_or_else(Utc::now);

    let uptime_seconds = (Utc::now() - started_at).num_seconds().max(0) as u64;

    Json(ApiResponse::success(SystemInfo {
        version: env!("CARGO_PKG_VERSION").to_string(),
        rust_version: env!("CARGO_PKG_RUST_VERSION").to_string(),
        started_at,
        uptime_seconds,
    }))
}

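/// Aggregates headline system statistics from the metrics, jobs, sessions,
/// and subscriptions tables; each sub-query falls back to a zero/None value
/// on failure instead of failing the whole response.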
pub async fn get_system_stats(
    State(state): State<DashboardState>,
) -> Json<ApiResponse<SystemStats>> {
    let jobs_pending =
        sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM forge_jobs WHERE status = 'pending'")
            .fetch_one(&state.pool)
            .await
            .unwrap_or(0) as u64;

    let active_sessions = sqlx::query_scalar::<_, i64>(
        "SELECT COUNT(*) FROM forge_sessions WHERE status = 'connected'",
    )
    .fetch_one(&state.pool)
    .await
    .unwrap_or(0) as u32;

    let active_subscriptions =
        sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM forge_subscriptions")
            .fetch_one(&state.pool)
            .await
            .unwrap_or(0) as u32;

    let http_requests_total = sqlx::query_scalar::<_, f64>(
        "SELECT COALESCE(SUM(value), 0) FROM forge_metrics WHERE name = 'http_requests_total'",
    )
    .fetch_optional(&state.pool)
    .await
    .ok()
    .flatten()
    .unwrap_or(0.0) as u64;

    let http_requests_per_second = sqlx::query_scalar::<_, f64>(
        r#"
        SELECT COALESCE(SUM(value) / 60.0, 0)
        FROM forge_metrics
        WHERE name = 'http_requests_total'
          AND timestamp > NOW() - INTERVAL '1 minute'
        "#,
    )
    .fetch_optional(&state.pool)
    .await
    .ok()
    .flatten()
    .unwrap_or(0.0);

    let function_calls_total = sqlx::query_scalar::<_, f64>(
        "SELECT COALESCE(value, 0) FROM forge_metrics WHERE name = 'forge_function_calls_total' ORDER BY timestamp DESC LIMIT 1",
    )
    .fetch_optional(&state.pool)
    .await
    .ok()
    .flatten()
    .unwrap_or(0.0) as u64;

    let cpu_usage_percent = sqlx::query_scalar::<_, f64>(
        "SELECT COALESCE(value, 0) FROM forge_metrics WHERE name = 'forge_system_cpu_usage_percent' ORDER BY timestamp DESC LIMIT 1",
    )
    .fetch_optional(&state.pool)
    .await
    .ok()
    .flatten()
    .unwrap_or(0.0);

    let memory_used_bytes = sqlx::query_scalar::<_, f64>(
        "SELECT COALESCE(value, 0) FROM forge_metrics WHERE name = 'forge_system_memory_used_bytes' ORDER BY timestamp DESC LIMIT 1",
    )
    .fetch_optional(&state.pool)
    .await
    .ok()
    .flatten()
    .unwrap_or(0.0);
    let memory_used_mb = (memory_used_bytes / 1_048_576.0) as u64;

    let p99_latency_ms: Option<f64> = sqlx::query_scalar::<_, f64>(
        r#"
        SELECT PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY value) * 1000
        FROM forge_metrics
        WHERE name = 'http_request_duration_seconds'
          AND timestamp > NOW() - INTERVAL '1 hour'
        "#,
    )
    .fetch_optional(&state.pool)
    .await
    .ok()
    .flatten();

    Json(ApiResponse::success(SystemStats {
        http_requests_total,
        http_requests_per_second,
        p99_latency_ms,
        function_calls_total,
        active_connections: active_sessions,
        active_subscriptions,
        jobs_pending,
        memory_used_mb,
        cpu_usage_percent,
    }))
}

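/// Aggregated view of a single cron, derived from its run history.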
#[derive(Debug, Clone, Serialize)]
pub struct CronSummary {
    pub name: String,
    pub schedule: String,
    pub status: String,
    pub last_run: Option<DateTime<Utc>>,
    pub last_result: Option<String>,
    pub next_run: Option<DateTime<Utc>>,
    pub avg_duration_ms: Option<f64>,
    pub success_count: i64,
    pub failure_count: i64,
}

#[derive(Debug, Clone, Serialize)]
pub struct CronExecution {
    pub id: String,
    pub cron_name: String,
    pub started_at: DateTime<Utc>,
    pub finished_at: Option<DateTime<Utc>>,
    pub duration_ms: Option<i64>,
    pub status: String,
    pub error: Option<String>,
}

#[derive(Debug, Clone, Serialize)]
pub struct CronStats {
    pub active_count: i64,
    pub paused_count: i64,
    pub total_executions_24h: i64,
    pub success_rate_24h: f64,
    pub next_scheduled_run: Option<DateTime<Utc>>,
}

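/// Summarizes crons from the forge_cron_runs history. The schedule expression,
/// pause state, and next fire time are not stored there, so placeholder values
/// are returned for those fields.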
pub async fn list_crons(
    State(state): State<DashboardState>,
) -> Json<ApiResponse<Vec<CronSummary>>> {
    let result = sqlx::query(
        r#"
        SELECT
            cron_name as name,
            MAX(scheduled_time) as last_run_at,
            MAX(CASE WHEN status = 'completed' THEN 'success' WHEN status = 'failed' THEN 'failed' ELSE status END) as last_result,
            COALESCE(AVG(EXTRACT(EPOCH FROM (completed_at - started_at)) * 1000), 0) as avg_duration_ms,
            COUNT(CASE WHEN status = 'completed' THEN 1 END) as success_count,
            COUNT(CASE WHEN status = 'failed' THEN 1 END) as failure_count
        FROM forge_cron_runs
        GROUP BY cron_name
        ORDER BY cron_name
        "#,
    )
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let crons: Vec<CronSummary> = rows
                .into_iter()
                .map(|r| CronSummary {
                    name: r.get("name"),
                    // Placeholders: the run history does not record the
                    // schedule expression, pause state, or next fire time.
                    schedule: "* * * * *".to_string(),
                    status: "active".to_string(),
                    last_run: r.get("last_run_at"),
                    last_result: r.get("last_result"),
                    next_run: None,
                    avg_duration_ms: r.try_get::<f64, _>("avg_duration_ms").ok(),
                    success_count: r.try_get::<i64, _>("success_count").unwrap_or(0),
                    failure_count: r.try_get::<i64, _>("failure_count").unwrap_or(0),
                })
                .collect();
            Json(ApiResponse::success(crons))
        }
        Err(_) => {
            // Degrade to an empty list (e.g. when the table does not exist yet).
            Json(ApiResponse::success(vec![]))
        }
    }
}

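/// Summarizes cron activity over the last 24 hours.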
pub async fn get_cron_stats(State(state): State<DashboardState>) -> Json<ApiResponse<CronStats>> {
    let stats = sqlx::query(
        r#"
        SELECT
            COUNT(DISTINCT cron_name) as active_count,
            0 as paused_count
        FROM forge_cron_runs
        "#,
    )
    .fetch_optional(&state.pool)
    .await;

    let execution_stats = sqlx::query(
        r#"
        SELECT
            COUNT(*) as total,
            COUNT(CASE WHEN status = 'completed' THEN 1 END) as success
        FROM forge_cron_runs
        WHERE started_at > NOW() - INTERVAL '24 hours'
        "#,
    )
    .fetch_optional(&state.pool)
    .await;

    match (stats, execution_stats) {
        (Ok(Some(s)), Ok(Some(e))) => {
            let total = e.try_get::<i64, _>("total").unwrap_or(0) as f64;
            let success = e.try_get::<i64, _>("success").unwrap_or(0) as f64;
            let success_rate = if total > 0.0 {
                success / total * 100.0
            } else {
                100.0
            };

            Json(ApiResponse::success(CronStats {
                active_count: s.try_get::<i64, _>("active_count").unwrap_or(0),
                paused_count: s.try_get::<i64, _>("paused_count").unwrap_or(0),
                total_executions_24h: e.try_get::<i64, _>("total").unwrap_or(0),
                success_rate_24h: success_rate,
                next_scheduled_run: None,
            }))
        }
        _ => Json(ApiResponse::success(CronStats {
            active_count: 0,
            paused_count: 0,
            total_executions_24h: 0,
            success_rate_24h: 100.0,
            next_scheduled_run: None,
        })),
    }
}

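/// Lists individual cron executions, newest first, with pagination.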
pub async fn get_cron_history(
    State(state): State<DashboardState>,
    Query(pagination): Query<PaginationQuery>,
) -> Json<ApiResponse<Vec<CronExecution>>> {
    let limit = pagination.get_limit();
    let offset = pagination.get_offset();

    let result = sqlx::query(
        r#"
        SELECT
            id::text as id,
            cron_name,
            started_at,
            completed_at as finished_at,
            EXTRACT(EPOCH FROM (completed_at - started_at)) * 1000 as duration_ms,
            CASE WHEN status = 'completed' THEN 'success' ELSE status END as status,
            error
        FROM forge_cron_runs
        ORDER BY started_at DESC
        LIMIT $1 OFFSET $2
        "#,
    )
    .bind(limit)
    .bind(offset)
    .fetch_all(&state.pool)
    .await;

    match result {
        Ok(rows) => {
            let executions: Vec<CronExecution> = rows
                .into_iter()
                .map(|r| CronExecution {
                    id: r.try_get::<String, _>("id").unwrap_or_default(),
                    cron_name: r.get("cron_name"),
                    started_at: r.get("started_at"),
                    finished_at: r.try_get("finished_at").ok(),
                    duration_ms: r.try_get::<f64, _>("duration_ms").ok().map(|d| d as i64),
                    status: r.get("status"),
                    error: r.try_get("error").ok(),
                })
                .collect();
            Json(ApiResponse::success(executions))
        }
        Err(_) => Json(ApiResponse::success(vec![])),
    }
}

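/// Accepts a manual trigger request for a cron. Dispatch is not wired up
/// here yet; the request is only logged before reporting success.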
pub async fn trigger_cron(
    State(_state): State<DashboardState>,
    Path(name): Path<String>,
) -> Json<ApiResponse<()>> {
    tracing::info!(cron = %name, "Manual cron trigger requested");
    Json(ApiResponse::success(()))
}

pub async fn pause_cron(
    State(state): State<DashboardState>,
    Path(name): Path<String>,
) -> Json<ApiResponse<()>> {
    let result = sqlx::query("UPDATE forge_crons SET status = 'paused' WHERE name = $1")
        .bind(&name)
        .execute(&state.pool)
        .await;

    match result {
        Ok(_) => {
            tracing::info!(cron = %name, "Cron paused");
            Json(ApiResponse::success(()))
        }
        Err(e) => Json(ApiResponse::error(format!("Failed to pause cron: {}", e))),
    }
}

pub async fn resume_cron(
    State(state): State<DashboardState>,
    Path(name): Path<String>,
) -> Json<ApiResponse<()>> {
    let result = sqlx::query("UPDATE forge_crons SET status = 'active' WHERE name = $1")
        .bind(&name)
        .execute(&state.pool)
        .await;

    match result {
        Ok(_) => {
            tracing::info!(cron = %name, "Cron resumed");
            Json(ApiResponse::success(()))
        }
        Err(e) => Json(ApiResponse::error(format!("Failed to resume cron: {}", e))),
    }
}

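/// A job type registered in this process's job registry.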
#[derive(Debug, Clone, Serialize)]
pub struct RegisteredJob {
    pub name: String,
    pub max_attempts: u32,
    pub priority: String,
    pub timeout_secs: u64,
    pub worker_capability: Option<String>,
}

#[derive(Debug, Clone, Serialize)]
pub struct RegisteredCron {
    pub name: String,
    pub schedule: String,
    pub timezone: String,
    pub catch_up: bool,
    pub timeout_secs: u64,
}

#[derive(Debug, Clone, Serialize)]
pub struct RegisteredWorkflow {
    pub name: String,
    pub version: u32,
    pub timeout_secs: u64,
    pub deprecated: bool,
}

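/// Lists job types registered in-process, as opposed to the job rows
/// persisted in forge_jobs.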
pub async fn list_registered_jobs(
    State(state): State<DashboardState>,
) -> Json<ApiResponse<Vec<RegisteredJob>>> {
    let jobs: Vec<RegisteredJob> = state
        .job_registry
        .jobs()
        .map(|(_, entry)| RegisteredJob {
            name: entry.info.name.to_string(),
            max_attempts: entry.info.retry.max_attempts,
            priority: format!("{:?}", entry.info.priority),
            timeout_secs: entry.info.timeout.as_secs(),
            worker_capability: entry.info.worker_capability.map(|s| s.to_string()),
        })
        .collect();
    Json(ApiResponse::success(jobs))
}

pub async fn list_registered_crons(
    State(state): State<DashboardState>,
) -> Json<ApiResponse<Vec<RegisteredCron>>> {
    let crons: Vec<RegisteredCron> = state
        .cron_registry
        .list()
        .into_iter()
        .map(|entry| RegisteredCron {
            name: entry.info.name.to_string(),
            schedule: entry.info.schedule.expression().to_string(),
            timezone: entry.info.timezone.to_string(),
            catch_up: entry.info.catch_up,
            timeout_secs: entry.info.timeout.as_secs(),
        })
        .collect();
    Json(ApiResponse::success(crons))
}

pub async fn list_registered_workflows(
    State(state): State<DashboardState>,
) -> Json<ApiResponse<Vec<RegisteredWorkflow>>> {
    let workflows: Vec<RegisteredWorkflow> = state
        .workflow_registry
        .list()
        .into_iter()
        .map(|entry| RegisteredWorkflow {
            name: entry.info.name.to_string(),
            version: entry.info.version,
            timeout_secs: entry.info.timeout.as_secs(),
            deprecated: entry.info.deprecated,
        })
        .collect();
    Json(ApiResponse::success(workflows))
}

#[derive(Debug, Deserialize)]
pub struct DispatchJobRequest {
    #[serde(default)]
    pub args: serde_json::Value,
}

#[derive(Debug, Serialize)]
pub struct DispatchJobResponse {
    pub job_id: uuid::Uuid,
}

#[derive(Debug, Deserialize)]
pub struct StartWorkflowRequest {
    #[serde(default)]
    pub input: serde_json::Value,
}

#[derive(Debug, Serialize)]
pub struct StartWorkflowResponse {
    pub workflow_id: uuid::Uuid,
}

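/// Dispatches a job by its registered type name; responds 503 when no job
/// dispatcher is configured on this node.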
pub async fn dispatch_job(
    State(state): State<DashboardState>,
    Path(job_type): Path<String>,
    Json(request): Json<DispatchJobRequest>,
) -> (StatusCode, Json<ApiResponse<DispatchJobResponse>>) {
    let dispatcher = match &state.job_dispatcher {
        Some(d) => d,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiResponse::error("Job dispatcher not available")),
            );
        }
    };

    match dispatcher.dispatch_by_name(&job_type, request.args).await {
        Ok(job_id) => (
            StatusCode::OK,
            Json(ApiResponse::success(DispatchJobResponse { job_id })),
        ),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(ApiResponse::error(e.to_string())),
        ),
    }
}

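/// Starts a workflow by its registered name; responds 503 when no workflow
/// executor is configured on this node.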
pub async fn start_workflow(
    State(state): State<DashboardState>,
    Path(workflow_name): Path<String>,
    Json(request): Json<StartWorkflowRequest>,
) -> (StatusCode, Json<ApiResponse<StartWorkflowResponse>>) {
    let executor = match &state.workflow_executor {
        Some(e) => e,
        None => {
            return (
                StatusCode::SERVICE_UNAVAILABLE,
                Json(ApiResponse::error("Workflow executor not available")),
            );
        }
    };

    match executor.start_by_name(&workflow_name, request.input).await {
        Ok(workflow_id) => (
            StatusCode::OK,
            Json(ApiResponse::success(StartWorkflowResponse { workflow_id })),
        ),
        Err(e) => (
            StatusCode::BAD_REQUEST,
            Json(ApiResponse::error(e.to_string())),
        ),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_api_response_success() {
        let response: ApiResponse<String> = ApiResponse::success("test".to_string());
        assert!(response.success);
        assert_eq!(response.data, Some("test".to_string()));
        assert!(response.error.is_none());
    }

    #[test]
    fn test_api_response_error() {
        let response: ApiResponse<String> = ApiResponse::error("failed");
        assert!(!response.success);
        assert!(response.data.is_none());
        assert_eq!(response.error, Some("failed".to_string()));
    }

    #[test]
    fn test_time_range_query_defaults() {
        let query = TimeRangeQuery {
            start: None,
            end: None,
            period: None,
        };
        let (start, end) = query.get_range();
        assert!(end > start);
        assert_eq!((end - start).num_hours(), 1);
    }

    #[test]
    fn test_time_range_query_period() {
        let query = TimeRangeQuery {
            start: None,
            end: None,
            period: Some("24h".to_string()),
        };
        let (start, end) = query.get_range();
        assert_eq!((end - start).num_hours(), 24);
    }

    #[test]
    fn test_pagination_query() {
        let query = PaginationQuery {
            page: Some(2),
            limit: Some(20),
        };
        assert_eq!(query.get_limit(), 20);
        assert_eq!(query.get_offset(), 20);
    }
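
    // Added check for PaginationQuery: oversized limits are clamped to 1000,
    // and get_offset() derives page boundaries from the same clamped limit.
    #[test]
    fn test_pagination_query_clamps_limit() {
        let query = PaginationQuery {
            page: Some(3),
            limit: Some(5000),
        };
        assert_eq!(query.get_limit(), 1000);
        assert_eq!(query.get_offset(), 2000);
    }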
}