Skip to main content

mockforge_http/handlers/
forecasting.rs

1//! HTTP handlers for API change forecasting
2//!
3//! This module provides endpoints for querying and managing forecasts.
4
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::Json,
9};
10use mockforge_contracts::contract_drift::forecasting::{ChangeForecast, Forecaster};
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13
14#[cfg(feature = "database")]
15use chrono::{DateTime, Utc};
16#[cfg(feature = "database")]
17use mockforge_contracts::contract_drift::forecasting::SeasonalPattern;
18#[cfg(feature = "database")]
19use uuid::Uuid;
20
21#[cfg(feature = "database")]
22use crate::database::Database;
23
24/// Helper function to map database row to ChangeForecast
25#[cfg(feature = "database")]
26fn map_row_to_change_forecast(row: &sqlx::postgres::PgRow) -> Result<ChangeForecast, sqlx::Error> {
27    use sqlx::Row;
28
29    let service_id: Option<String> = row.try_get("service_id")?;
30    let service_name: Option<String> = row.try_get("service_name")?;
31    let endpoint: String = row.try_get("endpoint")?;
32    let method: String = row.try_get("method")?;
33    let forecast_window_days: i32 = row.try_get("forecast_window_days")?;
34    let predicted_change_probability: f64 = row.try_get("predicted_change_probability")?;
35    let predicted_break_probability: f64 = row.try_get("predicted_break_probability")?;
36    let next_expected_change_date: Option<DateTime<Utc>> =
37        row.try_get("next_expected_change_date")?;
38    let next_expected_break_date: Option<DateTime<Utc>> =
39        row.try_get("next_expected_break_date")?;
40    let volatility_score: f64 = row.try_get("volatility_score")?;
41    let confidence: f64 = row.try_get("confidence")?;
42    let seasonal_patterns_json: serde_json::Value =
43        row.try_get("seasonal_patterns").unwrap_or_default();
44    let predicted_at: DateTime<Utc> = row.try_get("predicted_at")?;
45    let expires_at: DateTime<Utc> = row.try_get("expires_at")?;
46
47    // Parse seasonal patterns from JSONB
48    let seasonal_patterns: Vec<SeasonalPattern> = if seasonal_patterns_json.is_array() {
49        serde_json::from_value(seasonal_patterns_json).unwrap_or_default()
50    } else {
51        Vec::new()
52    };
53
54    Ok(ChangeForecast {
55        service_id,
56        service_name,
57        endpoint,
58        method,
59        forecast_window_days: forecast_window_days as u32,
60        predicted_change_probability,
61        predicted_break_probability,
62        next_expected_change_date,
63        next_expected_break_date,
64        volatility_score,
65        confidence,
66        seasonal_patterns,
67        predicted_at,
68        expires_at,
69    })
70}
71
72/// State for forecasting handlers
73#[derive(Clone)]
74pub struct ForecastingState {
75    /// Forecaster engine
76    pub forecaster: Arc<Forecaster>,
77    /// Database connection (optional)
78    #[cfg(feature = "database")]
79    pub database: Option<Database>,
80}
81
82/// Query parameters for listing forecasts
83#[derive(Debug, Deserialize)]
84pub struct ListForecastsQuery {
85    /// Workspace ID filter
86    pub workspace_id: Option<String>,
87    /// Service ID filter
88    pub service_id: Option<String>,
89    /// Endpoint filter
90    pub endpoint: Option<String>,
91    /// Method filter
92    pub method: Option<String>,
93    /// Forecast window (30, 90, or 180 days)
94    pub window_days: Option<u32>,
95}
96
97/// Response for forecast list
98#[derive(Debug, Serialize)]
99pub struct ForecastListResponse {
100    /// Forecasts
101    pub forecasts: Vec<ChangeForecast>,
102    /// Total count
103    pub total: usize,
104}
105
106/// Get forecasts
107///
108/// GET /api/v1/forecasts
109#[cfg(feature = "database")]
110pub async fn list_forecasts(
111    State(state): State<ForecastingState>,
112    Query(params): Query<ListForecastsQuery>,
113) -> Result<Json<ForecastListResponse>, StatusCode> {
114    let pool = match state.database.as_ref().and_then(|db| db.pool()) {
115        Some(pool) => pool,
116        None => {
117            return Ok(Json(ForecastListResponse {
118                forecasts: Vec::new(),
119                total: 0,
120            }));
121        }
122    };
123
124    // Build query with filters
125    let mut query = String::from(
126        "SELECT id, workspace_id, service_id, service_name, endpoint, method,
127         forecast_window_days, predicted_change_probability, predicted_break_probability,
128         next_expected_change_date, next_expected_break_date, volatility_score, confidence,
129         seasonal_patterns, predicted_at, expires_at
130         FROM api_change_forecasts WHERE expires_at > NOW()",
131    );
132
133    let mut bind_index = 1;
134
135    if params.workspace_id.is_some() {
136        query.push_str(&format!(" AND workspace_id = ${}", bind_index));
137        bind_index += 1;
138    }
139
140    if params.service_id.is_some() {
141        query.push_str(&format!(" AND service_id = ${}", bind_index));
142        bind_index += 1;
143    }
144
145    if params.endpoint.is_some() {
146        query.push_str(&format!(" AND endpoint = ${}", bind_index));
147        bind_index += 1;
148    }
149
150    if params.method.is_some() {
151        query.push_str(&format!(" AND method = ${}", bind_index));
152        bind_index += 1;
153    }
154
155    if let Some(_window) = params.window_days {
156        query.push_str(&format!(" AND forecast_window_days = ${}", bind_index));
157    }
158
159    query.push_str(" ORDER BY predicted_at DESC LIMIT 100");
160
161    // Build query with proper bindings using sqlx
162    let mut query_builder = sqlx::query(&query);
163
164    if let Some(ws_id) = &params.workspace_id {
165        let uuid = Uuid::parse_str(ws_id).ok();
166        query_builder = query_builder.bind(uuid);
167    }
168
169    if let Some(svc_id) = &params.service_id {
170        query_builder = query_builder.bind(svc_id);
171    }
172
173    if let Some(ep) = &params.endpoint {
174        query_builder = query_builder.bind(ep);
175    }
176
177    if let Some(m) = &params.method {
178        query_builder = query_builder.bind(m);
179    }
180
181    if let Some(window) = params.window_days {
182        query_builder = query_builder.bind(window as i32);
183    }
184
185    // Execute query
186    let rows = query_builder.fetch_all(pool).await.map_err(|e| {
187        tracing::error!("Failed to query forecasts: {}", e);
188        StatusCode::INTERNAL_SERVER_ERROR
189    })?;
190
191    // Map rows to ChangeForecast
192    let mut forecasts = Vec::new();
193    for row in rows {
194        match map_row_to_change_forecast(&row) {
195            Ok(forecast) => forecasts.push(forecast),
196            Err(e) => {
197                tracing::warn!("Failed to map forecast row: {}", e);
198                continue;
199            }
200        }
201    }
202
203    let total = forecasts.len();
204    Ok(Json(ForecastListResponse { forecasts, total }))
205}
206
207/// List forecasts (no database)
208///
209/// GET /api/v1/forecasts
210#[cfg(not(feature = "database"))]
211pub async fn list_forecasts(
212    State(_state): State<ForecastingState>,
213    Query(_params): Query<ListForecastsQuery>,
214) -> Result<Json<ForecastListResponse>, StatusCode> {
215    Ok(Json(ForecastListResponse {
216        forecasts: Vec::new(),
217        total: 0,
218    }))
219}
220
221/// Get service-level forecasts
222///
223/// GET /api/v1/forecasts/service/{service_id}
224#[cfg(feature = "database")]
225pub async fn get_service_forecasts(
226    State(state): State<ForecastingState>,
227    Path(service_id): Path<String>,
228    Query(_params): Query<ListForecastsQuery>,
229) -> Result<Json<ForecastListResponse>, StatusCode> {
230    let pool = match state.database.as_ref().and_then(|db| db.pool()) {
231        Some(pool) => pool,
232        None => {
233            return Ok(Json(ForecastListResponse {
234                forecasts: Vec::new(),
235                total: 0,
236            }));
237        }
238    };
239
240    // Query forecasts for this service
241    let rows = sqlx::query(
242        "SELECT * FROM api_change_forecasts
243         WHERE service_id = $1 AND expires_at > NOW()
244         ORDER BY predicted_at DESC LIMIT 50",
245    )
246    .bind(&service_id)
247    .fetch_all(pool)
248    .await
249    .map_err(|e| {
250        tracing::error!("Failed to query service forecasts: {}", e);
251        StatusCode::INTERNAL_SERVER_ERROR
252    })?;
253
254    // Map rows to forecasts
255    let mut forecasts = Vec::new();
256    for row in rows {
257        match map_row_to_change_forecast(&row) {
258            Ok(forecast) => forecasts.push(forecast),
259            Err(e) => {
260                tracing::warn!("Failed to map service forecast row: {}", e);
261                continue;
262            }
263        }
264    }
265
266    let total = forecasts.len();
267    Ok(Json(ForecastListResponse { forecasts, total }))
268}
269
270/// Get service-level forecasts (no database)
271///
272/// GET /api/v1/forecasts/service/{service_id}
273#[cfg(not(feature = "database"))]
274pub async fn get_service_forecasts(
275    State(_state): State<ForecastingState>,
276    Path(_service_id): Path<String>,
277    Query(_params): Query<ListForecastsQuery>,
278) -> Result<Json<ForecastListResponse>, StatusCode> {
279    Ok(Json(ForecastListResponse {
280        forecasts: Vec::new(),
281        total: 0,
282    }))
283}
284
285/// Get endpoint-level forecasts
286///
287/// GET /api/v1/forecasts/endpoint/{endpoint}
288#[cfg(feature = "database")]
289pub async fn get_endpoint_forecasts(
290    State(state): State<ForecastingState>,
291    Path(endpoint): Path<String>,
292    Query(params): Query<ListForecastsQuery>,
293) -> Result<Json<ForecastListResponse>, StatusCode> {
294    let pool = match state.database.as_ref().and_then(|db| db.pool()) {
295        Some(pool) => pool,
296        None => {
297            return Ok(Json(ForecastListResponse {
298                forecasts: Vec::new(),
299                total: 0,
300            }));
301        }
302    };
303
304    let method = params.method.as_deref().unwrap_or("%");
305
306    let rows = sqlx::query(
307        "SELECT * FROM api_change_forecasts
308         WHERE endpoint = $1 AND method LIKE $2 AND expires_at > NOW()
309         ORDER BY predicted_at DESC LIMIT 50",
310    )
311    .bind(&endpoint)
312    .bind(method)
313    .fetch_all(pool)
314    .await
315    .map_err(|e| {
316        tracing::error!("Failed to query endpoint forecasts: {}", e);
317        StatusCode::INTERNAL_SERVER_ERROR
318    })?;
319
320    // Map rows to forecasts
321    let mut forecasts = Vec::new();
322    for row in rows {
323        match map_row_to_change_forecast(&row) {
324            Ok(forecast) => forecasts.push(forecast),
325            Err(e) => {
326                tracing::warn!("Failed to map endpoint forecast row: {}", e);
327                continue;
328            }
329        }
330    }
331
332    let total = forecasts.len();
333    Ok(Json(ForecastListResponse { forecasts, total }))
334}
335
336/// Get endpoint-level forecasts (no database)
337///
338/// GET /api/v1/forecasts/endpoint/{endpoint}
339#[cfg(not(feature = "database"))]
340pub async fn get_endpoint_forecasts(
341    State(_state): State<ForecastingState>,
342    Path(_endpoint): Path<String>,
343    Query(_params): Query<ListForecastsQuery>,
344) -> Result<Json<ForecastListResponse>, StatusCode> {
345    Ok(Json(ForecastListResponse {
346        forecasts: Vec::new(),
347        total: 0,
348    }))
349}
350
351/// Request to refresh forecasts
352#[derive(Debug, Deserialize)]
353pub struct RefreshForecastsRequest {
354    /// Workspace ID
355    pub workspace_id: Option<String>,
356    /// Service ID
357    pub service_id: Option<String>,
358    /// Endpoint (optional)
359    pub endpoint: Option<String>,
360    /// Method (optional)
361    pub method: Option<String>,
362}
363
364/// Refresh forecasts
365///
366/// POST /api/v1/forecasts/refresh
367#[cfg(feature = "database")]
368pub async fn refresh_forecasts(
369    State(state): State<ForecastingState>,
370    Json(request): Json<RefreshForecastsRequest>,
371) -> Result<Json<serde_json::Value>, StatusCode> {
372    let pool = match state.database.as_ref().and_then(|db| db.pool()) {
373        Some(pool) => pool,
374        None => {
375            return Ok(Json(serde_json::json!({
376                "success": false,
377                "error": "Database not available"
378            })));
379        }
380    };
381
382    // Query historical incidents for forecasting
383    let mut incident_query = String::from(
384        "SELECT id, workspace_id, endpoint, method, incident_type, severity, status,
385         detected_at, details, created_at, updated_at
386         FROM drift_incidents WHERE 1=1",
387    );
388
389    if let Some(_ws_id) = &request.workspace_id {
390        incident_query.push_str(" AND workspace_id = $1");
391    }
392
393    // Execute query to get incidents
394    let rows = sqlx::query(&incident_query).fetch_all(pool).await.map_err(|e| {
395        tracing::error!("Failed to query drift incidents: {}", e);
396        StatusCode::INTERNAL_SERVER_ERROR
397    })?;
398
399    // Map rows to DriftIncident and generate forecasts
400    use mockforge_core::incidents::types::{IncidentSeverity, IncidentStatus, IncidentType};
401    use sqlx::Row;
402    let mut incidents = Vec::new();
403    for row in rows {
404        let id: Uuid = row.try_get("id").map_err(|e| {
405            tracing::error!("Failed to get id from row: {}", e);
406            StatusCode::INTERNAL_SERVER_ERROR
407        })?;
408        let workspace_id: Option<Uuid> = row.try_get("workspace_id").ok();
409        let endpoint: String = match row.try_get("endpoint") {
410            Ok(e) => e,
411            Err(_) => continue,
412        };
413        let method: String = match row.try_get("method") {
414            Ok(m) => m,
415            Err(_) => continue,
416        };
417        let incident_type_str: String = match row.try_get("incident_type") {
418            Ok(s) => s,
419            Err(_) => continue,
420        };
421        let severity_str: String = match row.try_get("severity") {
422            Ok(s) => s,
423            Err(_) => continue,
424        };
425        let status_str: String = match row.try_get("status") {
426            Ok(s) => s,
427            Err(_) => continue,
428        };
429        let detected_at: DateTime<Utc> = match row.try_get("detected_at") {
430            Ok(dt) => dt,
431            Err(_) => continue,
432        };
433        let details_json: serde_json::Value = row.try_get("details").unwrap_or_default();
434        let created_at: DateTime<Utc> = match row.try_get("created_at") {
435            Ok(dt) => dt,
436            Err(_) => continue,
437        };
438        let updated_at: DateTime<Utc> = match row.try_get("updated_at") {
439            Ok(dt) => dt,
440            Err(_) => continue,
441        };
442
443        let incident_type = match incident_type_str.as_str() {
444            "breaking_change" => IncidentType::BreakingChange,
445            "threshold_exceeded" => IncidentType::ThresholdExceeded,
446            _ => continue, // Skip invalid types
447        };
448
449        let severity = match severity_str.as_str() {
450            "low" => IncidentSeverity::Low,
451            "medium" => IncidentSeverity::Medium,
452            "high" => IncidentSeverity::High,
453            "critical" => IncidentSeverity::Critical,
454            _ => continue, // Skip invalid severity
455        };
456
457        let status = match status_str.as_str() {
458            "open" => IncidentStatus::Open,
459            "acknowledged" => IncidentStatus::Acknowledged,
460            "resolved" => IncidentStatus::Resolved,
461            "closed" => IncidentStatus::Closed,
462            _ => continue, // Skip invalid status
463        };
464
465        incidents.push(DriftIncident {
466            id: id.to_string(),
467            budget_id: None,
468            workspace_id: workspace_id.map(|u| u.to_string()),
469            endpoint,
470            method,
471            incident_type,
472            severity,
473            status,
474            detected_at: detected_at.timestamp(),
475            resolved_at: None,
476            details: details_json,
477            external_ticket_id: None,
478            external_ticket_url: None,
479            created_at: created_at.timestamp(),
480            updated_at: updated_at.timestamp(),
481            sync_cycle_id: None,
482            contract_diff_id: None,
483            before_sample: None,
484            after_sample: None,
485            fitness_test_results: Vec::new(),
486            affected_consumers: None,
487            protocol: None,
488        });
489    }
490
491    // Generate forecasts from incidents by grouping by endpoint/method
492    use mockforge_core::incidents::types::DriftIncident;
493    use std::collections::HashMap;
494    let mut forecasts_generated = 0;
495    let mut endpoint_groups: HashMap<(String, String), Vec<DriftIncident>> = HashMap::new();
496
497    for incident in incidents {
498        endpoint_groups
499            .entry((incident.endpoint.clone(), incident.method.clone()))
500            .or_default()
501            .push(incident);
502    }
503
504    for ((endpoint, method), group_incidents) in endpoint_groups {
505        if let Some(forecast) = state.forecaster.generate_forecast(
506            &group_incidents,
507            request.workspace_id.clone(),
508            None, // service_id
509            None, // service_name
510            endpoint,
511            method,
512            30, // forecast_window_days
513        ) {
514            // Persist forecast to database
515            if let Err(e) = store_forecast(pool, &forecast, request.workspace_id.as_deref()).await {
516                tracing::warn!("Failed to store forecast: {}", e);
517            }
518            forecasts_generated += 1;
519        }
520    }
521
522    Ok(Json(serde_json::json!({
523        "success": true,
524        "message": "Forecasts refreshed",
525        "forecasts_generated": forecasts_generated
526    })))
527}
528
529/// Refresh forecasts (no database)
530///
531/// POST /api/v1/forecasts/refresh
532#[cfg(not(feature = "database"))]
533pub async fn refresh_forecasts(
534    State(_state): State<ForecastingState>,
535    Json(_request): Json<RefreshForecastsRequest>,
536) -> Result<Json<serde_json::Value>, StatusCode> {
537    Ok(Json(serde_json::json!({
538        "success": false,
539        "error": "Database not available"
540    })))
541}
542
543/// Store a forecast in the database
544#[cfg(feature = "database")]
545pub async fn store_forecast(
546    pool: &sqlx::PgPool,
547    forecast: &ChangeForecast,
548    workspace_id: Option<&str>,
549) -> Result<(), sqlx::Error> {
550    let id = Uuid::new_v4();
551    let workspace_uuid = workspace_id.and_then(|id| Uuid::parse_str(id).ok());
552
553    sqlx::query(
554        r#"
555        INSERT INTO api_change_forecasts (
556            id, workspace_id, service_id, service_name, endpoint, method,
557            forecast_window_days, predicted_change_probability, predicted_break_probability,
558            next_expected_change_date, next_expected_break_date, volatility_score, confidence,
559            seasonal_patterns, predicted_at, expires_at
560        ) VALUES (
561            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16
562        )
563        ON CONFLICT (workspace_id, service_id, endpoint, method, forecast_window_days)
564        DO UPDATE SET
565            predicted_change_probability = EXCLUDED.predicted_change_probability,
566            predicted_break_probability = EXCLUDED.predicted_break_probability,
567            next_expected_change_date = EXCLUDED.next_expected_change_date,
568            next_expected_break_date = EXCLUDED.next_expected_break_date,
569            volatility_score = EXCLUDED.volatility_score,
570            confidence = EXCLUDED.confidence,
571            seasonal_patterns = EXCLUDED.seasonal_patterns,
572            predicted_at = EXCLUDED.predicted_at,
573            expires_at = EXCLUDED.expires_at,
574            updated_at = NOW()
575        "#,
576    )
577    .bind(id)
578    .bind(workspace_uuid)
579    .bind(forecast.service_id.as_deref())
580    .bind(forecast.service_name.as_deref())
581    .bind(&forecast.endpoint)
582    .bind(&forecast.method)
583    .bind(forecast.forecast_window_days as i32)
584    .bind(forecast.predicted_change_probability)
585    .bind(forecast.predicted_break_probability)
586    .bind(forecast.next_expected_change_date)
587    .bind(forecast.next_expected_break_date)
588    .bind(forecast.volatility_score)
589    .bind(forecast.confidence)
590    .bind(serde_json::to_value(&forecast.seasonal_patterns).unwrap_or_default())
591    .bind(forecast.predicted_at)
592    .bind(forecast.expires_at)
593    .execute(pool)
594    .await?;
595
596    Ok(())
597}
598
599/// Create router for forecasting endpoints
600pub fn forecasting_router(state: ForecastingState) -> axum::Router {
601    use axum::routing::{get, post};
602    use axum::Router;
603
604    Router::new()
605        .route("/api/v1/forecasts", get(list_forecasts))
606        .route("/api/v1/forecasts/service/{service_id}", get(get_service_forecasts))
607        .route("/api/v1/forecasts/endpoint/{endpoint}", get(get_endpoint_forecasts))
608        .route("/api/v1/forecasts/refresh", post(refresh_forecasts))
609        .with_state(state)
610}