Skip to main content

mockforge_http/handlers/
forecasting.rs

1//! HTTP handlers for API change forecasting
2//!
3//! This module provides endpoints for querying and managing forecasts.
4
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::Json,
9};
10use mockforge_core::contract_drift::forecasting::{ChangeForecast, Forecaster};
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13
14#[cfg(feature = "database")]
15use chrono::{DateTime, Utc};
16#[cfg(feature = "database")]
17use mockforge_core::contract_drift::forecasting::SeasonalPattern;
18#[cfg(feature = "database")]
19use uuid::Uuid;
20
21use crate::database::Database;
22
/// Converts one `api_change_forecasts` row into a [`ChangeForecast`].
///
/// The `seasonal_patterns` column is read on a best-effort basis: if it cannot be read,
/// is not a JSON array, or does not deserialize, the forecast gets an empty pattern list
/// instead of failing the whole row.
///
/// # Errors
///
/// Returns the [`sqlx::Error`] produced by `try_get` when any other column cannot be read,
/// and [`sqlx::Error::ColumnDecode`] when `forecast_window_days` is negative and therefore
/// cannot be represented as a `u32`.
#[cfg(feature = "database")]
fn map_row_to_change_forecast(row: &sqlx::postgres::PgRow) -> Result<ChangeForecast, sqlx::Error> {
    use sqlx::Row;

    let service_id: Option<String> = row.try_get("service_id")?;
    let service_name: Option<String> = row.try_get("service_name")?;
    let endpoint: String = row.try_get("endpoint")?;
    let method: String = row.try_get("method")?;

    // The column is read as a signed integer but the forecast stores it unsigned. A plain
    // `as u32` would silently turn a negative value into a window of ~4 billion days, so
    // surface it as a decode error for this column instead.
    let raw_window_days: i32 = row.try_get("forecast_window_days")?;
    let forecast_window_days =
        u32::try_from(raw_window_days).map_err(|source| sqlx::Error::ColumnDecode {
            index: "forecast_window_days".to_string(),
            source: Box::new(source),
        })?;

    let predicted_change_probability: f64 = row.try_get("predicted_change_probability")?;
    let predicted_break_probability: f64 = row.try_get("predicted_break_probability")?;
    let next_expected_change_date: Option<DateTime<Utc>> =
        row.try_get("next_expected_change_date")?;
    let next_expected_break_date: Option<DateTime<Utc>> =
        row.try_get("next_expected_break_date")?;
    let volatility_score: f64 = row.try_get("volatility_score")?;
    let confidence: f64 = row.try_get("confidence")?;

    // Best effort: anything other than a well-formed JSON array yields no patterns.
    let seasonal_patterns: Vec<SeasonalPattern> =
        match row.try_get::<serde_json::Value, _>("seasonal_patterns") {
            Ok(value) if value.is_array() => serde_json::from_value(value).unwrap_or_default(),
            _ => Vec::new(),
        };

    let predicted_at: DateTime<Utc> = row.try_get("predicted_at")?;
    let expires_at: DateTime<Utc> = row.try_get("expires_at")?;

    Ok(ChangeForecast {
        service_id,
        service_name,
        endpoint,
        method,
        forecast_window_days,
        predicted_change_probability,
        predicted_break_probability,
        next_expected_change_date,
        next_expected_break_date,
        volatility_score,
        confidence,
        seasonal_patterns,
        predicted_at,
        expires_at,
    })
}
70
71/// State for forecasting handlers
72#[derive(Clone)]
73pub struct ForecastingState {
74    /// Forecaster engine
75    pub forecaster: Arc<Forecaster>,
76    /// Database connection (optional)
77    pub database: Option<Database>,
78}
79
80/// Query parameters for listing forecasts
81#[derive(Debug, Deserialize)]
82pub struct ListForecastsQuery {
83    /// Workspace ID filter
84    pub workspace_id: Option<String>,
85    /// Service ID filter
86    pub service_id: Option<String>,
87    /// Endpoint filter
88    pub endpoint: Option<String>,
89    /// Method filter
90    pub method: Option<String>,
91    /// Forecast window (30, 90, or 180 days)
92    pub window_days: Option<u32>,
93}
94
95/// Response for forecast list
96#[derive(Debug, Serialize)]
97pub struct ForecastListResponse {
98    /// Forecasts
99    pub forecasts: Vec<ChangeForecast>,
100    /// Total count
101    pub total: usize,
102}
103
/// Lists unexpired forecasts, optionally narrowed by the query-string filters.
///
/// GET /api/v1/forecasts
///
/// Every filter present in `params` is ANDed onto the query as an exact match. At most
/// 100 rows are returned, newest `predicted_at` first. `total` is the number of forecasts
/// in this response, not a count of all matching rows. Rows that cannot be mapped to a
/// [`ChangeForecast`] are logged and skipped.
///
/// When no database pool is available the response is an empty list.
///
/// # Errors
///
/// * `400 Bad Request` - `workspace_id` is not a valid UUID, or `window_days` does not fit
///   the signed 32-bit value bound to the query. Previously both cases were silently
///   turned into a filter that matched nothing (or an unrelated wrapped value).
/// * `500 Internal Server Error` - the database query failed.
#[cfg(feature = "database")]
pub async fn list_forecasts(
    State(state): State<ForecastingState>,
    Query(params): Query<ListForecastsQuery>,
) -> Result<Json<ForecastListResponse>, StatusCode> {
    let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) else {
        return Ok(Json(ForecastListResponse {
            forecasts: Vec::new(),
            total: 0,
        }));
    };

    // Validate the filters up front so a malformed value is reported to the client
    // rather than being bound as NULL (`workspace_id = NULL` never matches anything).
    let workspace_uuid = params
        .workspace_id
        .as_deref()
        .map(Uuid::parse_str)
        .transpose()
        .map_err(|e| {
            tracing::warn!("Rejecting invalid workspace_id filter: {}", e);
            StatusCode::BAD_REQUEST
        })?;

    let window_days = params
        .window_days
        .map(i32::try_from)
        .transpose()
        .map_err(|e| {
            tracing::warn!("Rejecting out-of-range window_days filter: {}", e);
            StatusCode::BAD_REQUEST
        })?;

    // `QueryBuilder` appends the placeholder and records the bound value in one step, so
    // the SQL text and the argument list cannot drift out of sync.
    let mut builder = sqlx::QueryBuilder::<sqlx::Postgres>::new(
        "SELECT id, workspace_id, service_id, service_name, endpoint, method,
         forecast_window_days, predicted_change_probability, predicted_break_probability,
         next_expected_change_date, next_expected_break_date, volatility_score, confidence,
         seasonal_patterns, predicted_at, expires_at
         FROM api_change_forecasts WHERE expires_at > NOW()",
    );

    if let Some(workspace) = workspace_uuid {
        builder.push(" AND workspace_id = ").push_bind(workspace);
    }
    if let Some(service_id) = params.service_id.as_deref() {
        builder.push(" AND service_id = ").push_bind(service_id);
    }
    if let Some(endpoint) = params.endpoint.as_deref() {
        builder.push(" AND endpoint = ").push_bind(endpoint);
    }
    if let Some(method) = params.method.as_deref() {
        builder.push(" AND method = ").push_bind(method);
    }
    if let Some(window) = window_days {
        builder.push(" AND forecast_window_days = ").push_bind(window);
    }

    builder.push(" ORDER BY predicted_at DESC LIMIT 100");

    let rows = builder.build().fetch_all(pool).await.map_err(|e| {
        tracing::error!("Failed to query forecasts: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    // A single bad row should not fail the whole listing: log it and move on.
    let forecasts: Vec<ChangeForecast> = rows
        .iter()
        .filter_map(|row| match map_row_to_change_forecast(row) {
            Ok(forecast) => Some(forecast),
            Err(e) => {
                tracing::warn!("Failed to map forecast row: {}", e);
                None
            }
        })
        .collect();

    let total = forecasts.len();
    Ok(Json(ForecastListResponse { forecasts, total }))
}
204
205/// List forecasts (no database)
206///
207/// GET /api/v1/forecasts
208#[cfg(not(feature = "database"))]
209pub async fn list_forecasts(
210    State(_state): State<ForecastingState>,
211    Query(_params): Query<ListForecastsQuery>,
212) -> Result<Json<ForecastListResponse>, StatusCode> {
213    Ok(Json(ForecastListResponse {
214        forecasts: Vec::new(),
215        total: 0,
216    }))
217}
218
/// Returns the unexpired forecasts recorded for a single service.
///
/// GET /api/v1/forecasts/service/{service_id}
///
/// At most 50 rows are returned, newest `predicted_at` first. The query-string filters
/// are accepted but not applied by this handler. Rows that fail to map are logged and
/// skipped. When no database pool is available the response is an empty list.
///
/// # Errors
///
/// `500 Internal Server Error` when the database query fails.
#[cfg(feature = "database")]
pub async fn get_service_forecasts(
    State(state): State<ForecastingState>,
    Path(service_id): Path<String>,
    Query(_params): Query<ListForecastsQuery>,
) -> Result<Json<ForecastListResponse>, StatusCode> {
    let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) else {
        let empty = ForecastListResponse {
            forecasts: Vec::new(),
            total: 0,
        };
        return Ok(Json(empty));
    };

    let fetched = sqlx::query(
        "SELECT * FROM api_change_forecasts
         WHERE service_id = $1 AND expires_at > NOW()
         ORDER BY predicted_at DESC LIMIT 50",
    )
    .bind(&service_id)
    .fetch_all(pool)
    .await;

    let rows = match fetched {
        Ok(rows) => rows,
        Err(e) => {
            tracing::error!("Failed to query service forecasts: {}", e);
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    // Keep every row that maps cleanly; warn about and drop the rest.
    let forecasts: Vec<ChangeForecast> = rows
        .iter()
        .filter_map(|row| {
            map_row_to_change_forecast(row)
                .map_err(|e| tracing::warn!("Failed to map service forecast row: {}", e))
                .ok()
        })
        .collect();

    Ok(Json(ForecastListResponse {
        total: forecasts.len(),
        forecasts,
    }))
}
267
268/// Get service-level forecasts (no database)
269///
270/// GET /api/v1/forecasts/service/{service_id}
271#[cfg(not(feature = "database"))]
272pub async fn get_service_forecasts(
273    State(_state): State<ForecastingState>,
274    Path(_service_id): Path<String>,
275    Query(_params): Query<ListForecastsQuery>,
276) -> Result<Json<ForecastListResponse>, StatusCode> {
277    Ok(Json(ForecastListResponse {
278        forecasts: Vec::new(),
279        total: 0,
280    }))
281}
282
/// Returns the unexpired forecasts recorded for a single endpoint.
///
/// GET /api/v1/forecasts/endpoint/{endpoint}
///
/// If the `method` query parameter is present, only forecasts whose method equals it
/// exactly are returned; otherwise all methods are included. Of the query-string filters
/// only `method` is applied here. At most 50 rows are returned, newest `predicted_at`
/// first. Rows that fail to map are logged and skipped. When no database pool is
/// available the response is an empty list.
///
/// # Errors
///
/// `500 Internal Server Error` when the database query fails.
#[cfg(feature = "database")]
pub async fn get_endpoint_forecasts(
    State(state): State<ForecastingState>,
    Path(endpoint): Path<String>,
    Query(params): Query<ListForecastsQuery>,
) -> Result<Json<ForecastListResponse>, StatusCode> {
    let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) else {
        return Ok(Json(ForecastListResponse {
            forecasts: Vec::new(),
            total: 0,
        }));
    };

    // The method filter is bound as a nullable parameter and compared with `=`.
    // It was previously fed to `LIKE`, which let `%` and `_` in the client-supplied value
    // act as wildcards and made this filter behave differently from `list_forecasts`.
    let rows = sqlx::query(
        "SELECT * FROM api_change_forecasts
         WHERE endpoint = $1 AND ($2::text IS NULL OR method = $2) AND expires_at > NOW()
         ORDER BY predicted_at DESC LIMIT 50",
    )
    .bind(&endpoint)
    .bind(params.method.as_deref())
    .fetch_all(pool)
    .await
    .map_err(|e| {
        tracing::error!("Failed to query endpoint forecasts: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    // A single bad row should not fail the whole listing: log it and move on.
    let forecasts: Vec<ChangeForecast> = rows
        .iter()
        .filter_map(|row| match map_row_to_change_forecast(row) {
            Ok(forecast) => Some(forecast),
            Err(e) => {
                tracing::warn!("Failed to map endpoint forecast row: {}", e);
                None
            }
        })
        .collect();

    let total = forecasts.len();
    Ok(Json(ForecastListResponse { forecasts, total }))
}
333
334/// Get endpoint-level forecasts (no database)
335///
336/// GET /api/v1/forecasts/endpoint/{endpoint}
337#[cfg(not(feature = "database"))]
338pub async fn get_endpoint_forecasts(
339    State(_state): State<ForecastingState>,
340    Path(_endpoint): Path<String>,
341    Query(_params): Query<ListForecastsQuery>,
342) -> Result<Json<ForecastListResponse>, StatusCode> {
343    Ok(Json(ForecastListResponse {
344        forecasts: Vec::new(),
345        total: 0,
346    }))
347}
348
349/// Request to refresh forecasts
350#[derive(Debug, Deserialize)]
351pub struct RefreshForecastsRequest {
352    /// Workspace ID
353    pub workspace_id: Option<String>,
354    /// Service ID
355    pub service_id: Option<String>,
356    /// Endpoint (optional)
357    pub endpoint: Option<String>,
358    /// Method (optional)
359    pub method: Option<String>,
360}
361
/// Regenerates forecasts from the recorded drift incidents and stores them.
///
/// POST /api/v1/forecasts/refresh
///
/// Incidents are loaded from `drift_incidents` (restricted to `workspace_id` when the
/// request provides one), grouped by `(endpoint, method)`, and each group is handed to the
/// forecaster with a 30-day window. Every forecast the forecaster produces is persisted
/// with [`store_forecast`]; a persistence failure is logged but does not abort the refresh.
/// The `service_id`, `endpoint` and `method` request fields are not used by this handler.
///
/// The JSON response reports `forecasts_generated`, the number of forecasts the forecaster
/// produced. When no database pool is available the response is
/// `{"success": false, "error": "Database not available"}`.
///
/// # Errors
///
/// * `400 Bad Request` - `workspace_id` is present but is not a valid UUID.
/// * `500 Internal Server Error` - the incident query failed or a row has no readable `id`.
#[cfg(feature = "database")]
pub async fn refresh_forecasts(
    State(state): State<ForecastingState>,
    Json(request): Json<RefreshForecastsRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    use mockforge_core::incidents::types::DriftIncident;
    use sqlx::Row;
    use std::collections::HashMap;

    let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) else {
        return Ok(Json(serde_json::json!({
            "success": false,
            "error": "Database not available"
        })));
    };

    let workspace_uuid = request
        .workspace_id
        .as_deref()
        .map(Uuid::parse_str)
        .transpose()
        .map_err(|e| {
            tracing::warn!("Rejecting invalid workspace_id: {}", e);
            StatusCode::BAD_REQUEST
        })?;

    // Query historical incidents for forecasting
    let mut incident_query = String::from(
        "SELECT id, workspace_id, endpoint, method, incident_type, severity, status,
         detected_at, details, created_at, updated_at
         FROM drift_incidents WHERE 1=1",
    );
    if workspace_uuid.is_some() {
        incident_query.push_str(" AND workspace_id = $1");
    }

    // The `$1` placeholder must have a matching bind. It was previously left unbound,
    // which made every request that carried a workspace_id fail at execution time.
    let mut incident_lookup = sqlx::query(&incident_query);
    if let Some(workspace) = workspace_uuid {
        incident_lookup = incident_lookup.bind(workspace);
    }

    let rows = incident_lookup.fetch_all(pool).await.map_err(|e| {
        tracing::error!("Failed to query drift incidents: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    // Group usable incidents by endpoint/method; each group yields at most one forecast.
    let mut endpoint_groups: HashMap<(String, String), Vec<DriftIncident>> = HashMap::new();
    for row in &rows {
        let id: Uuid = row.try_get("id").map_err(|e| {
            tracing::error!("Failed to get id from row: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;

        if let Some(incident) = drift_incident_from_row(row, id) {
            endpoint_groups
                .entry((incident.endpoint.clone(), incident.method.clone()))
                .or_default()
                .push(incident);
        }
    }

    let mut forecasts_generated = 0;
    for ((endpoint, method), group_incidents) in endpoint_groups {
        if let Some(forecast) = state.forecaster.generate_forecast(
            &group_incidents,
            request.workspace_id.clone(),
            None, // service_id
            None, // service_name
            endpoint,
            method,
            30, // forecast_window_days
        ) {
            // Persist forecast to database; a failure here is logged, not fatal.
            if let Err(e) = store_forecast(pool, &forecast, request.workspace_id.as_deref()).await {
                tracing::warn!("Failed to store forecast: {}", e);
            }
            forecasts_generated += 1;
        }
    }

    Ok(Json(serde_json::json!({
        "success": true,
        "message": "Forecasts refreshed",
        "forecasts_generated": forecasts_generated
    })))
}

/// Builds a `DriftIncident` from one `drift_incidents` row whose `id` was already read.
///
/// Returns `None` when the row should be skipped: a required column (`endpoint`, `method`,
/// `incident_type`, `severity`, `status`, `detected_at`, `created_at`, `updated_at`) cannot
/// be read, or the type/severity/status string is not one of the recognised values. An
/// unreadable `workspace_id` becomes `None` and an unreadable `details` becomes JSON null.
#[cfg(feature = "database")]
fn drift_incident_from_row(
    row: &sqlx::postgres::PgRow,
    id: Uuid,
) -> Option<mockforge_core::incidents::types::DriftIncident> {
    use mockforge_core::incidents::types::{
        DriftIncident, IncidentSeverity, IncidentStatus, IncidentType,
    };
    use sqlx::Row;

    let workspace_id = row.try_get::<Uuid, _>("workspace_id").ok();
    let endpoint: String = row.try_get("endpoint").ok()?;
    let method: String = row.try_get("method").ok()?;
    let incident_type_str: String = row.try_get("incident_type").ok()?;
    let severity_str: String = row.try_get("severity").ok()?;
    let status_str: String = row.try_get("status").ok()?;
    let detected_at: DateTime<Utc> = row.try_get("detected_at").ok()?;
    let details: serde_json::Value = row.try_get("details").unwrap_or_default();
    let created_at: DateTime<Utc> = row.try_get("created_at").ok()?;
    let updated_at: DateTime<Utc> = row.try_get("updated_at").ok()?;

    let incident_type = match incident_type_str.as_str() {
        "breaking_change" => IncidentType::BreakingChange,
        "threshold_exceeded" => IncidentType::ThresholdExceeded,
        _ => return None, // Skip invalid types
    };

    let severity = match severity_str.as_str() {
        "low" => IncidentSeverity::Low,
        "medium" => IncidentSeverity::Medium,
        "high" => IncidentSeverity::High,
        "critical" => IncidentSeverity::Critical,
        _ => return None, // Skip invalid severity
    };

    let status = match status_str.as_str() {
        "open" => IncidentStatus::Open,
        "acknowledged" => IncidentStatus::Acknowledged,
        "resolved" => IncidentStatus::Resolved,
        "closed" => IncidentStatus::Closed,
        _ => return None, // Skip invalid status
    };

    Some(DriftIncident {
        id: id.to_string(),
        budget_id: None,
        workspace_id: workspace_id.map(|u| u.to_string()),
        endpoint,
        method,
        incident_type,
        severity,
        status,
        detected_at: detected_at.timestamp(),
        resolved_at: None,
        details,
        external_ticket_id: None,
        external_ticket_url: None,
        created_at: created_at.timestamp(),
        updated_at: updated_at.timestamp(),
        sync_cycle_id: None,
        contract_diff_id: None,
        before_sample: None,
        after_sample: None,
        fitness_test_results: Vec::new(),
        affected_consumers: None,
        protocol: None,
    })
}
526
527/// Refresh forecasts (no database)
528///
529/// POST /api/v1/forecasts/refresh
530#[cfg(not(feature = "database"))]
531pub async fn refresh_forecasts(
532    State(_state): State<ForecastingState>,
533    Json(_request): Json<RefreshForecastsRequest>,
534) -> Result<Json<serde_json::Value>, StatusCode> {
535    Ok(Json(serde_json::json!({
536        "success": false,
537        "error": "Database not available"
538    })))
539}
540
/// Inserts a forecast into `api_change_forecasts`, or updates the existing row on conflict.
///
/// A fresh random UUID is used as the row `id`. `workspace_id` is parsed as a UUID; a
/// missing or unparsable value is stored as NULL. On a conflict over
/// `(workspace_id, service_id, endpoint, method, forecast_window_days)` the prediction
/// columns are overwritten and `updated_at` is set to `NOW()`.
///
/// NOTE(review): PostgreSQL treats NULLs in a unique index as distinct by default, so rows
/// with a NULL `workspace_id` or `service_id` may never hit the `ON CONFLICT` branch -
/// confirm against the table's constraint definition.
///
/// # Errors
///
/// Returns the [`sqlx::Error`] raised while executing the statement.
#[cfg(feature = "database")]
pub async fn store_forecast(
    pool: &sqlx::PgPool,
    forecast: &ChangeForecast,
    workspace_id: Option<&str>,
) -> Result<(), sqlx::Error> {
    let row_id = Uuid::new_v4();
    let workspace = match workspace_id {
        Some(raw) => Uuid::parse_str(raw).ok(),
        None => None,
    };
    let window_days = forecast.forecast_window_days as i32;
    // Serialization failure falls back to the default JSON value instead of aborting.
    let seasonal_patterns = serde_json::to_value(&forecast.seasonal_patterns).unwrap_or_default();

    let upsert = sqlx::query(
        r#"
        INSERT INTO api_change_forecasts (
            id, workspace_id, service_id, service_name, endpoint, method,
            forecast_window_days, predicted_change_probability, predicted_break_probability,
            next_expected_change_date, next_expected_break_date, volatility_score, confidence,
            seasonal_patterns, predicted_at, expires_at
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16
        )
        ON CONFLICT (workspace_id, service_id, endpoint, method, forecast_window_days)
        DO UPDATE SET
            predicted_change_probability = EXCLUDED.predicted_change_probability,
            predicted_break_probability = EXCLUDED.predicted_break_probability,
            next_expected_change_date = EXCLUDED.next_expected_change_date,
            next_expected_break_date = EXCLUDED.next_expected_break_date,
            volatility_score = EXCLUDED.volatility_score,
            confidence = EXCLUDED.confidence,
            seasonal_patterns = EXCLUDED.seasonal_patterns,
            predicted_at = EXCLUDED.predicted_at,
            expires_at = EXCLUDED.expires_at,
            updated_at = NOW()
        "#,
    )
    // Binds follow the column order of the INSERT list ($1 through $16).
    .bind(row_id)
    .bind(workspace)
    .bind(forecast.service_id.as_deref())
    .bind(forecast.service_name.as_deref())
    .bind(&forecast.endpoint)
    .bind(&forecast.method)
    .bind(window_days)
    .bind(forecast.predicted_change_probability)
    .bind(forecast.predicted_break_probability)
    .bind(forecast.next_expected_change_date)
    .bind(forecast.next_expected_break_date)
    .bind(forecast.volatility_score)
    .bind(forecast.confidence)
    .bind(seasonal_patterns)
    .bind(forecast.predicted_at)
    .bind(forecast.expires_at);

    upsert.execute(pool).await.map(|_| ())
}
596
597/// Create router for forecasting endpoints
598pub fn forecasting_router(state: ForecastingState) -> axum::Router {
599    use axum::routing::{get, post};
600    use axum::Router;
601
602    Router::new()
603        .route("/api/v1/forecasts", get(list_forecasts))
604        .route("/api/v1/forecasts/service/{service_id}", get(get_service_forecasts))
605        .route("/api/v1/forecasts/endpoint/{endpoint}", get(get_endpoint_forecasts))
606        .route("/api/v1/forecasts/refresh", post(refresh_forecasts))
607        .with_state(state)
608}