Skip to main content

mockforge_http/handlers/
forecasting.rs

1//! HTTP handlers for API change forecasting
2//!
3//! This module provides endpoints for querying and managing forecasts.
4
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::Json,
9};
10use mockforge_core::contract_drift::forecasting::{ChangeForecast, Forecaster};
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13
14#[cfg(feature = "database")]
15use chrono::{DateTime, Utc};
16#[cfg(feature = "database")]
17use mockforge_core::contract_drift::forecasting::SeasonalPattern;
18#[cfg(feature = "database")]
19use uuid::Uuid;
20
21use crate::database::Database;
22
23/// Helper function to map database row to ChangeForecast
24#[cfg(feature = "database")]
25fn map_row_to_change_forecast(row: &sqlx::postgres::PgRow) -> Result<ChangeForecast, sqlx::Error> {
26    use sqlx::Row;
27
28    let service_id: Option<String> = row.try_get("service_id")?;
29    let service_name: Option<String> = row.try_get("service_name")?;
30    let endpoint: String = row.try_get("endpoint")?;
31    let method: String = row.try_get("method")?;
32    let forecast_window_days: i32 = row.try_get("forecast_window_days")?;
33    let predicted_change_probability: f64 = row.try_get("predicted_change_probability")?;
34    let predicted_break_probability: f64 = row.try_get("predicted_break_probability")?;
35    let next_expected_change_date: Option<DateTime<Utc>> =
36        row.try_get("next_expected_change_date")?;
37    let next_expected_break_date: Option<DateTime<Utc>> =
38        row.try_get("next_expected_break_date")?;
39    let volatility_score: f64 = row.try_get("volatility_score")?;
40    let confidence: f64 = row.try_get("confidence")?;
41    let seasonal_patterns_json: serde_json::Value =
42        row.try_get("seasonal_patterns").unwrap_or_default();
43    let predicted_at: DateTime<Utc> = row.try_get("predicted_at")?;
44    let expires_at: DateTime<Utc> = row.try_get("expires_at")?;
45
46    // Parse seasonal patterns from JSONB
47    let seasonal_patterns: Vec<SeasonalPattern> = if seasonal_patterns_json.is_array() {
48        serde_json::from_value(seasonal_patterns_json).unwrap_or_default()
49    } else {
50        Vec::new()
51    };
52
53    Ok(ChangeForecast {
54        service_id,
55        service_name,
56        endpoint,
57        method,
58        forecast_window_days: forecast_window_days as u32,
59        predicted_change_probability,
60        predicted_break_probability,
61        next_expected_change_date,
62        next_expected_break_date,
63        volatility_score,
64        confidence,
65        seasonal_patterns,
66        predicted_at,
67        expires_at,
68    })
69}
70
71/// State for forecasting handlers
72#[derive(Clone)]
73pub struct ForecastingState {
74    /// Forecaster engine
75    pub forecaster: Arc<Forecaster>,
76    /// Database connection (optional)
77    pub database: Option<Database>,
78}
79
80/// Query parameters for listing forecasts
81#[derive(Debug, Deserialize)]
82pub struct ListForecastsQuery {
83    /// Workspace ID filter
84    pub workspace_id: Option<String>,
85    /// Service ID filter
86    pub service_id: Option<String>,
87    /// Endpoint filter
88    pub endpoint: Option<String>,
89    /// Method filter
90    pub method: Option<String>,
91    /// Forecast window (30, 90, or 180 days)
92    pub window_days: Option<u32>,
93}
94
95/// Response for forecast list
96#[derive(Debug, Serialize)]
97pub struct ForecastListResponse {
98    /// Forecasts
99    pub forecasts: Vec<ChangeForecast>,
100    /// Total count
101    pub total: usize,
102}
103
104/// Get forecasts
105///
106/// GET /api/v1/forecasts
107#[cfg(feature = "database")]
108pub async fn list_forecasts(
109    State(state): State<ForecastingState>,
110    Query(params): Query<ListForecastsQuery>,
111) -> Result<Json<ForecastListResponse>, StatusCode> {
112    let pool = match state.database.as_ref().and_then(|db| db.pool()) {
113        Some(pool) => pool,
114        None => {
115            return Ok(Json(ForecastListResponse {
116                forecasts: Vec::new(),
117                total: 0,
118            }));
119        }
120    };
121
122    // Build query with filters
123    let mut query = String::from(
124        "SELECT id, workspace_id, service_id, service_name, endpoint, method,
125         forecast_window_days, predicted_change_probability, predicted_break_probability,
126         next_expected_change_date, next_expected_break_date, volatility_score, confidence,
127         seasonal_patterns, predicted_at, expires_at
128         FROM api_change_forecasts WHERE expires_at > NOW()",
129    );
130
131    let mut bind_index = 1;
132
133    if params.workspace_id.is_some() {
134        query.push_str(&format!(" AND workspace_id = ${}", bind_index));
135        bind_index += 1;
136    }
137
138    if params.service_id.is_some() {
139        query.push_str(&format!(" AND service_id = ${}", bind_index));
140        bind_index += 1;
141    }
142
143    if params.endpoint.is_some() {
144        query.push_str(&format!(" AND endpoint = ${}", bind_index));
145        bind_index += 1;
146    }
147
148    if params.method.is_some() {
149        query.push_str(&format!(" AND method = ${}", bind_index));
150        bind_index += 1;
151    }
152
153    if let Some(window) = params.window_days {
154        query.push_str(&format!(" AND forecast_window_days = ${}", bind_index));
155        bind_index += 1;
156    }
157
158    query.push_str(" ORDER BY predicted_at DESC LIMIT 100");
159
160    // Build query with proper bindings using sqlx
161    let mut query_builder = sqlx::query(&query);
162
163    if let Some(ws_id) = &params.workspace_id {
164        let uuid = Uuid::parse_str(ws_id).ok();
165        query_builder = query_builder.bind(uuid);
166    }
167
168    if let Some(svc_id) = &params.service_id {
169        query_builder = query_builder.bind(svc_id);
170    }
171
172    if let Some(ep) = &params.endpoint {
173        query_builder = query_builder.bind(ep);
174    }
175
176    if let Some(m) = &params.method {
177        query_builder = query_builder.bind(m);
178    }
179
180    if let Some(window) = params.window_days {
181        query_builder = query_builder.bind(window as i32);
182    }
183
184    // Execute query
185    let rows = query_builder.fetch_all(pool).await.map_err(|e| {
186        tracing::error!("Failed to query forecasts: {}", e);
187        StatusCode::INTERNAL_SERVER_ERROR
188    })?;
189
190    // Map rows to ChangeForecast
191    let mut forecasts = Vec::new();
192    for row in rows {
193        match map_row_to_change_forecast(&row) {
194            Ok(forecast) => forecasts.push(forecast),
195            Err(e) => {
196                tracing::warn!("Failed to map forecast row: {}", e);
197                continue;
198            }
199        }
200    }
201
202    let total = forecasts.len();
203    Ok(Json(ForecastListResponse { forecasts, total }))
204}
205
206/// List forecasts (no database)
207///
208/// GET /api/v1/forecasts
209#[cfg(not(feature = "database"))]
210pub async fn list_forecasts(
211    State(_state): State<ForecastingState>,
212    Query(_params): Query<ListForecastsQuery>,
213) -> Result<Json<ForecastListResponse>, StatusCode> {
214    Ok(Json(ForecastListResponse {
215        forecasts: Vec::new(),
216        total: 0,
217    }))
218}
219
220/// Get service-level forecasts
221///
222/// GET /api/v1/forecasts/service/{service_id}
223#[cfg(feature = "database")]
224pub async fn get_service_forecasts(
225    State(state): State<ForecastingState>,
226    Path(service_id): Path<String>,
227    Query(_params): Query<ListForecastsQuery>,
228) -> Result<Json<ForecastListResponse>, StatusCode> {
229    let pool = match state.database.as_ref().and_then(|db| db.pool()) {
230        Some(pool) => pool,
231        None => {
232            return Ok(Json(ForecastListResponse {
233                forecasts: Vec::new(),
234                total: 0,
235            }));
236        }
237    };
238
239    // Query forecasts for this service
240    let rows = sqlx::query(
241        "SELECT * FROM api_change_forecasts
242         WHERE service_id = $1 AND expires_at > NOW()
243         ORDER BY predicted_at DESC LIMIT 50",
244    )
245    .bind(&service_id)
246    .fetch_all(pool)
247    .await
248    .map_err(|e| {
249        tracing::error!("Failed to query service forecasts: {}", e);
250        StatusCode::INTERNAL_SERVER_ERROR
251    })?;
252
253    // Map rows to forecasts (simplified - would need proper FromRow implementation)
254    Ok(Json(ForecastListResponse {
255        forecasts: Vec::new(),
256        total: rows.len(),
257    }))
258}
259
260/// Get service-level forecasts (no database)
261///
262/// GET /api/v1/forecasts/service/{service_id}
263#[cfg(not(feature = "database"))]
264pub async fn get_service_forecasts(
265    State(_state): State<ForecastingState>,
266    Path(_service_id): Path<String>,
267    Query(_params): Query<ListForecastsQuery>,
268) -> Result<Json<ForecastListResponse>, StatusCode> {
269    Ok(Json(ForecastListResponse {
270        forecasts: Vec::new(),
271        total: 0,
272    }))
273}
274
275/// Get endpoint-level forecasts
276///
277/// GET /api/v1/forecasts/endpoint/{endpoint}
278#[cfg(feature = "database")]
279pub async fn get_endpoint_forecasts(
280    State(state): State<ForecastingState>,
281    Path(endpoint): Path<String>,
282    Query(params): Query<ListForecastsQuery>,
283) -> Result<Json<ForecastListResponse>, StatusCode> {
284    let pool = match state.database.as_ref().and_then(|db| db.pool()) {
285        Some(pool) => pool,
286        None => {
287            return Ok(Json(ForecastListResponse {
288                forecasts: Vec::new(),
289                total: 0,
290            }));
291        }
292    };
293
294    let method = params.method.as_deref().unwrap_or("%");
295
296    let rows = sqlx::query(
297        "SELECT * FROM api_change_forecasts
298         WHERE endpoint = $1 AND method LIKE $2 AND expires_at > NOW()
299         ORDER BY predicted_at DESC LIMIT 50",
300    )
301    .bind(&endpoint)
302    .bind(method)
303    .fetch_all(pool)
304    .await
305    .map_err(|e| {
306        tracing::error!("Failed to query endpoint forecasts: {}", e);
307        StatusCode::INTERNAL_SERVER_ERROR
308    })?;
309
310    Ok(Json(ForecastListResponse {
311        forecasts: Vec::new(),
312        total: rows.len(),
313    }))
314}
315
316/// Get endpoint-level forecasts (no database)
317///
318/// GET /api/v1/forecasts/endpoint/{endpoint}
319#[cfg(not(feature = "database"))]
320pub async fn get_endpoint_forecasts(
321    State(_state): State<ForecastingState>,
322    Path(_endpoint): Path<String>,
323    Query(_params): Query<ListForecastsQuery>,
324) -> Result<Json<ForecastListResponse>, StatusCode> {
325    Ok(Json(ForecastListResponse {
326        forecasts: Vec::new(),
327        total: 0,
328    }))
329}
330
331/// Request to refresh forecasts
332#[derive(Debug, Deserialize)]
333pub struct RefreshForecastsRequest {
334    /// Workspace ID
335    pub workspace_id: Option<String>,
336    /// Service ID
337    pub service_id: Option<String>,
338    /// Endpoint (optional)
339    pub endpoint: Option<String>,
340    /// Method (optional)
341    pub method: Option<String>,
342}
343
344/// Refresh forecasts
345///
346/// POST /api/v1/forecasts/refresh
347#[cfg(feature = "database")]
348pub async fn refresh_forecasts(
349    State(state): State<ForecastingState>,
350    Json(request): Json<RefreshForecastsRequest>,
351) -> Result<Json<serde_json::Value>, StatusCode> {
352    let pool = match state.database.as_ref().and_then(|db| db.pool()) {
353        Some(pool) => pool,
354        None => {
355            return Ok(Json(serde_json::json!({
356                "success": false,
357                "error": "Database not available"
358            })));
359        }
360    };
361
362    // Query historical incidents for forecasting
363    let mut incident_query = String::from(
364        "SELECT id, workspace_id, endpoint, method, incident_type, severity, status,
365         detected_at, details, created_at, updated_at
366         FROM drift_incidents WHERE 1=1",
367    );
368
369    if let Some(ws_id) = &request.workspace_id {
370        incident_query.push_str(" AND workspace_id = $1");
371    }
372
373    // Execute query to get incidents
374    let rows = sqlx::query(&incident_query).fetch_all(pool).await.map_err(|e| {
375        tracing::error!("Failed to query drift incidents: {}", e);
376        StatusCode::INTERNAL_SERVER_ERROR
377    })?;
378
379    // Map rows to DriftIncident and generate forecasts
380    use mockforge_core::incidents::types::{IncidentSeverity, IncidentStatus, IncidentType};
381    use sqlx::Row;
382    let mut incidents = Vec::new();
383    for row in rows {
384        let id: uuid::Uuid = row.try_get("id").map_err(|e| {
385            tracing::error!("Failed to get id from row: {}", e);
386            StatusCode::INTERNAL_SERVER_ERROR
387        })?;
388        let workspace_id: Option<uuid::Uuid> = row.try_get("workspace_id").ok();
389        let endpoint: String = match row.try_get("endpoint") {
390            Ok(e) => e,
391            Err(_) => continue,
392        };
393        let method: String = match row.try_get("method") {
394            Ok(m) => m,
395            Err(_) => continue,
396        };
397        let incident_type_str: String = match row.try_get("incident_type") {
398            Ok(s) => s,
399            Err(_) => continue,
400        };
401        let severity_str: String = match row.try_get("severity") {
402            Ok(s) => s,
403            Err(_) => continue,
404        };
405        let status_str: String = match row.try_get("status") {
406            Ok(s) => s,
407            Err(_) => continue,
408        };
409        let detected_at: DateTime<Utc> = match row.try_get("detected_at") {
410            Ok(dt) => dt,
411            Err(_) => continue,
412        };
413        let details_json: serde_json::Value = row.try_get("details").unwrap_or_default();
414        let created_at: DateTime<Utc> = match row.try_get("created_at") {
415            Ok(dt) => dt,
416            Err(_) => continue,
417        };
418        let updated_at: DateTime<Utc> = match row.try_get("updated_at") {
419            Ok(dt) => dt,
420            Err(_) => continue,
421        };
422
423        let incident_type = match incident_type_str.as_str() {
424            "breaking_change" => IncidentType::BreakingChange,
425            "threshold_exceeded" => IncidentType::ThresholdExceeded,
426            _ => continue, // Skip invalid types
427        };
428
429        let severity = match severity_str.as_str() {
430            "low" => IncidentSeverity::Low,
431            "medium" => IncidentSeverity::Medium,
432            "high" => IncidentSeverity::High,
433            "critical" => IncidentSeverity::Critical,
434            _ => continue, // Skip invalid severity
435        };
436
437        let status = match status_str.as_str() {
438            "open" => IncidentStatus::Open,
439            "acknowledged" => IncidentStatus::Acknowledged,
440            "resolved" => IncidentStatus::Resolved,
441            "closed" => IncidentStatus::Closed,
442            _ => continue, // Skip invalid status
443        };
444
445        incidents.push(DriftIncident {
446            id: id.to_string(),
447            budget_id: None,
448            workspace_id: workspace_id.map(|u| u.to_string()),
449            endpoint,
450            method,
451            incident_type,
452            severity,
453            status,
454            detected_at: detected_at.timestamp(),
455            resolved_at: None,
456            details: details_json,
457            external_ticket_id: None,
458            external_ticket_url: None,
459            created_at: created_at.timestamp(),
460            updated_at: updated_at.timestamp(),
461            sync_cycle_id: None,
462            contract_diff_id: None,
463            before_sample: None,
464            after_sample: None,
465            fitness_test_results: Vec::new(),
466            affected_consumers: None,
467            protocol: None,
468        });
469    }
470
471    // Generate forecasts from incidents by grouping by endpoint/method
472    use mockforge_core::incidents::types::DriftIncident;
473    use std::collections::HashMap;
474    let mut forecasts_generated = 0;
475    let mut endpoint_groups: HashMap<(String, String), Vec<DriftIncident>> = HashMap::new();
476
477    for incident in incidents {
478        endpoint_groups
479            .entry((incident.endpoint.clone(), incident.method.clone()))
480            .or_insert_with(Vec::new)
481            .push(incident);
482    }
483
484    for ((endpoint, method), group_incidents) in endpoint_groups {
485        if let Some(_forecast) = state.forecaster.generate_forecast(
486            &group_incidents,
487            request.workspace_id.clone(),
488            None, // service_id
489            None, // service_name
490            endpoint,
491            method,
492            30, // forecast_window_days
493        ) {
494            forecasts_generated += 1;
495        }
496    }
497
498    Ok(Json(serde_json::json!({
499        "success": true,
500        "message": "Forecasts refreshed",
501        "forecasts_generated": forecasts_generated
502    })))
503}
504
505/// Refresh forecasts (no database)
506///
507/// POST /api/v1/forecasts/refresh
508#[cfg(not(feature = "database"))]
509pub async fn refresh_forecasts(
510    State(_state): State<ForecastingState>,
511    Json(_request): Json<RefreshForecastsRequest>,
512) -> Result<Json<serde_json::Value>, StatusCode> {
513    Ok(Json(serde_json::json!({
514        "success": false,
515        "error": "Database not available"
516    })))
517}
518
519/// Store a forecast in the database
520#[cfg(feature = "database")]
521pub async fn store_forecast(
522    pool: &sqlx::PgPool,
523    forecast: &ChangeForecast,
524    workspace_id: Option<&str>,
525) -> Result<(), sqlx::Error> {
526    let id = Uuid::new_v4();
527    let workspace_uuid = workspace_id.and_then(|id| Uuid::parse_str(id).ok());
528
529    sqlx::query(
530        r#"
531        INSERT INTO api_change_forecasts (
532            id, workspace_id, service_id, service_name, endpoint, method,
533            forecast_window_days, predicted_change_probability, predicted_break_probability,
534            next_expected_change_date, next_expected_break_date, volatility_score, confidence,
535            seasonal_patterns, predicted_at, expires_at
536        ) VALUES (
537            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16
538        )
539        ON CONFLICT (workspace_id, service_id, endpoint, method, forecast_window_days)
540        DO UPDATE SET
541            predicted_change_probability = EXCLUDED.predicted_change_probability,
542            predicted_break_probability = EXCLUDED.predicted_break_probability,
543            next_expected_change_date = EXCLUDED.next_expected_change_date,
544            next_expected_break_date = EXCLUDED.next_expected_break_date,
545            volatility_score = EXCLUDED.volatility_score,
546            confidence = EXCLUDED.confidence,
547            seasonal_patterns = EXCLUDED.seasonal_patterns,
548            predicted_at = EXCLUDED.predicted_at,
549            expires_at = EXCLUDED.expires_at,
550            updated_at = NOW()
551        "#,
552    )
553    .bind(id)
554    .bind(workspace_uuid)
555    .bind(forecast.service_id.as_deref())
556    .bind(forecast.service_name.as_deref())
557    .bind(&forecast.endpoint)
558    .bind(&forecast.method)
559    .bind(forecast.forecast_window_days as i32)
560    .bind(forecast.predicted_change_probability)
561    .bind(forecast.predicted_break_probability)
562    .bind(forecast.next_expected_change_date)
563    .bind(forecast.next_expected_break_date)
564    .bind(forecast.volatility_score)
565    .bind(forecast.confidence)
566    .bind(serde_json::to_value(&forecast.seasonal_patterns).unwrap_or_default())
567    .bind(forecast.predicted_at)
568    .bind(forecast.expires_at)
569    .execute(pool)
570    .await?;
571
572    Ok(())
573}
574
575/// Create router for forecasting endpoints
576pub fn forecasting_router(state: ForecastingState) -> axum::Router {
577    use axum::routing::{get, post};
578    use axum::Router;
579
580    Router::new()
581        .route("/api/v1/forecasts", get(list_forecasts))
582        .route("/api/v1/forecasts/service/{service_id}", get(get_service_forecasts))
583        .route("/api/v1/forecasts/endpoint/{endpoint}", get(get_endpoint_forecasts))
584        .route("/api/v1/forecasts/refresh", post(refresh_forecasts))
585        .with_state(state)
586}