1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::Json,
9};
10use mockforge_contracts::contract_drift::forecasting::{ChangeForecast, Forecaster};
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13
14#[cfg(feature = "database")]
15use chrono::{DateTime, Utc};
16#[cfg(feature = "database")]
17use mockforge_contracts::contract_drift::forecasting::SeasonalPattern;
18#[cfg(feature = "database")]
19use uuid::Uuid;
20
21#[cfg(feature = "database")]
22use crate::database::Database;
23
/// Decodes a single `api_change_forecasts` row into a [`ChangeForecast`].
///
/// Every column except `seasonal_patterns` is required. `seasonal_patterns` is
/// best-effort: if the column cannot be read, is not a JSON array, or does not
/// deserialize, the forecast gets an empty pattern list.
///
/// # Errors
///
/// Returns the `sqlx::Error` of the first required column that is missing or
/// cannot be decoded, and a `ColumnDecode` error when `forecast_window_days`
/// holds a negative value (which cannot be represented as `u32`).
#[cfg(feature = "database")]
fn map_row_to_change_forecast(row: &sqlx::postgres::PgRow) -> Result<ChangeForecast, sqlx::Error> {
    use sqlx::Row;

    let service_id: Option<String> = row.try_get("service_id")?;
    let service_name: Option<String> = row.try_get("service_name")?;
    let endpoint: String = row.try_get("endpoint")?;
    let method: String = row.try_get("method")?;

    // The column is a signed integer; an `as u32` cast would silently turn a
    // negative value into a huge window, so reject it as a decode failure.
    let stored_window_days: i32 = row.try_get("forecast_window_days")?;
    let forecast_window_days =
        u32::try_from(stored_window_days).map_err(|source| sqlx::Error::ColumnDecode {
            index: "forecast_window_days".to_string(),
            source: Box::new(source),
        })?;

    let predicted_change_probability: f64 = row.try_get("predicted_change_probability")?;
    let predicted_break_probability: f64 = row.try_get("predicted_break_probability")?;
    let next_expected_change_date: Option<DateTime<Utc>> =
        row.try_get("next_expected_change_date")?;
    let next_expected_break_date: Option<DateTime<Utc>> =
        row.try_get("next_expected_break_date")?;
    let volatility_score: f64 = row.try_get("volatility_score")?;
    let confidence: f64 = row.try_get("confidence")?;

    // Deliberately lenient: anything other than a well-formed JSON array of
    // patterns degrades to "no seasonal patterns" instead of failing the row.
    let seasonal_patterns: Vec<SeasonalPattern> = row
        .try_get::<serde_json::Value, _>("seasonal_patterns")
        .ok()
        .filter(serde_json::Value::is_array)
        .and_then(|patterns| serde_json::from_value(patterns).ok())
        .unwrap_or_default();

    let predicted_at: DateTime<Utc> = row.try_get("predicted_at")?;
    let expires_at: DateTime<Utc> = row.try_get("expires_at")?;

    Ok(ChangeForecast {
        service_id,
        service_name,
        endpoint,
        method,
        forecast_window_days,
        predicted_change_probability,
        predicted_break_probability,
        next_expected_change_date,
        next_expected_break_date,
        volatility_score,
        confidence,
        seasonal_patterns,
        predicted_at,
        expires_at,
    })
}
71
72#[derive(Clone)]
74pub struct ForecastingState {
75 pub forecaster: Arc<Forecaster>,
77 #[cfg(feature = "database")]
79 pub database: Option<Database>,
80}
81
82#[derive(Debug, Deserialize)]
84pub struct ListForecastsQuery {
85 pub workspace_id: Option<String>,
87 pub service_id: Option<String>,
89 pub endpoint: Option<String>,
91 pub method: Option<String>,
93 pub window_days: Option<u32>,
95}
96
97#[derive(Debug, Serialize)]
99pub struct ForecastListResponse {
100 pub forecasts: Vec<ChangeForecast>,
102 pub total: usize,
104}
105
/// Lists unexpired forecasts, newest prediction first, narrowed by the
/// optional query filters.
///
/// At most 100 rows are returned. Rows that cannot be mapped to a
/// [`ChangeForecast`] are logged and skipped, so `total` is the number of
/// forecasts in the response. When no database pool is available the
/// response is an empty list.
///
/// # Errors
///
/// * `400 Bad Request` if `workspace_id` is not a valid UUID, or `window_days`
///   does not fit the signed 32-bit value bound to the query.
/// * `500 Internal Server Error` if the query fails.
#[cfg(feature = "database")]
pub async fn list_forecasts(
    State(state): State<ForecastingState>,
    Query(params): Query<ListForecastsQuery>,
) -> Result<Json<ForecastListResponse>, StatusCode> {
    let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) else {
        return Ok(Json(ForecastListResponse {
            forecasts: Vec::new(),
            total: 0,
        }));
    };

    // Reject malformed filters up front. Binding a failed UUID parse as NULL
    // would make `workspace_id = NULL` match nothing and hide the client error.
    let workspace_uuid = params
        .workspace_id
        .as_deref()
        .map(Uuid::parse_str)
        .transpose()
        .map_err(|e| {
            tracing::warn!("Invalid workspace_id filter: {}", e);
            StatusCode::BAD_REQUEST
        })?;
    let window_days = params
        .window_days
        .map(i32::try_from)
        .transpose()
        .map_err(|e| {
            tracing::warn!("Invalid window_days filter: {}", e);
            StatusCode::BAD_REQUEST
        })?;

    let mut sql = String::from(
        "SELECT id, workspace_id, service_id, service_name, endpoint, method,
                forecast_window_days, predicted_change_probability, predicted_break_probability,
                next_expected_change_date, next_expected_break_date, volatility_score, confidence,
                seasonal_patterns, predicted_at, expires_at
         FROM api_change_forecasts WHERE expires_at > NOW()",
    );

    // Column names are fixed literals; only values travel as bind parameters.
    // The order here must match the order of the `bind` calls below.
    let filters = [
        ("workspace_id", workspace_uuid.is_some()),
        ("service_id", params.service_id.is_some()),
        ("endpoint", params.endpoint.is_some()),
        ("method", params.method.is_some()),
        ("forecast_window_days", window_days.is_some()),
    ];
    let mut placeholder = 0;
    for (column, active) in filters {
        if active {
            placeholder += 1;
            sql.push_str(&format!(" AND {} = ${}", column, placeholder));
        }
    }

    sql.push_str(" ORDER BY predicted_at DESC LIMIT 100");

    let mut query = sqlx::query(&sql);
    if let Some(uuid) = workspace_uuid {
        query = query.bind(uuid);
    }
    if let Some(service_id) = &params.service_id {
        query = query.bind(service_id);
    }
    if let Some(endpoint) = &params.endpoint {
        query = query.bind(endpoint);
    }
    if let Some(method) = &params.method {
        query = query.bind(method);
    }
    if let Some(days) = window_days {
        query = query.bind(days);
    }

    let rows = query.fetch_all(pool).await.map_err(|e| {
        tracing::error!("Failed to query forecasts: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    let forecasts: Vec<ChangeForecast> = rows
        .iter()
        .filter_map(|row| match map_row_to_change_forecast(row) {
            Ok(forecast) => Some(forecast),
            Err(e) => {
                tracing::warn!("Failed to map forecast row: {}", e);
                None
            }
        })
        .collect();

    let total = forecasts.len();
    Ok(Json(ForecastListResponse { forecasts, total }))
}
206
207#[cfg(not(feature = "database"))]
211pub async fn list_forecasts(
212 State(_state): State<ForecastingState>,
213 Query(_params): Query<ListForecastsQuery>,
214) -> Result<Json<ForecastListResponse>, StatusCode> {
215 Ok(Json(ForecastListResponse {
216 forecasts: Vec::new(),
217 total: 0,
218 }))
219}
220
/// Returns the unexpired forecasts recorded for one service, newest
/// prediction first, capped at 50 rows.
///
/// The query-string parameters are accepted but not used. Rows that cannot be
/// mapped are logged and skipped. Without a database pool the response is an
/// empty list.
///
/// # Errors
///
/// `500 Internal Server Error` if the query fails.
#[cfg(feature = "database")]
pub async fn get_service_forecasts(
    State(state): State<ForecastingState>,
    Path(service_id): Path<String>,
    Query(_params): Query<ListForecastsQuery>,
) -> Result<Json<ForecastListResponse>, StatusCode> {
    let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) else {
        let empty = ForecastListResponse {
            forecasts: Vec::new(),
            total: 0,
        };
        return Ok(Json(empty));
    };

    let lookup = sqlx::query(
        "SELECT * FROM api_change_forecasts
         WHERE service_id = $1 AND expires_at > NOW()
         ORDER BY predicted_at DESC LIMIT 50",
    )
    .bind(&service_id);

    let fetched = match lookup.fetch_all(pool).await {
        Ok(fetched) => fetched,
        Err(e) => {
            tracing::error!("Failed to query service forecasts: {}", e);
            return Err(StatusCode::INTERNAL_SERVER_ERROR);
        }
    };

    let forecasts: Vec<ChangeForecast> = fetched
        .iter()
        .filter_map(|record| match map_row_to_change_forecast(record) {
            Ok(forecast) => Some(forecast),
            Err(e) => {
                tracing::warn!("Failed to map service forecast row: {}", e);
                None
            }
        })
        .collect();

    Ok(Json(ForecastListResponse {
        total: forecasts.len(),
        forecasts,
    }))
}
269
270#[cfg(not(feature = "database"))]
274pub async fn get_service_forecasts(
275 State(_state): State<ForecastingState>,
276 Path(_service_id): Path<String>,
277 Query(_params): Query<ListForecastsQuery>,
278) -> Result<Json<ForecastListResponse>, StatusCode> {
279 Ok(Json(ForecastListResponse {
280 forecasts: Vec::new(),
281 total: 0,
282 }))
283}
284
/// Returns the unexpired forecasts recorded for one endpoint, newest
/// prediction first, capped at 50 rows.
///
/// Only the `method` query parameter is consulted: when present, forecasts
/// must match it exactly; when absent, forecasts for every method are
/// returned. Rows that cannot be mapped are logged and skipped. Without a
/// database pool the response is an empty list.
///
/// NOTE(review): the endpoint arrives as a single path capture — confirm how
/// callers are expected to pass endpoints that contain `/`.
///
/// # Errors
///
/// `500 Internal Server Error` if the query fails.
#[cfg(feature = "database")]
pub async fn get_endpoint_forecasts(
    State(state): State<ForecastingState>,
    Path(endpoint): Path<String>,
    Query(params): Query<ListForecastsQuery>,
) -> Result<Json<ForecastListResponse>, StatusCode> {
    let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) else {
        return Ok(Json(ForecastListResponse {
            forecasts: Vec::new(),
            total: 0,
        }));
    };

    // The method is compared with `=` rather than `LIKE`, so `%` and `_` in
    // the client-supplied value are ordinary characters, not wildcards. A NULL
    // parameter (no `method` given) disables the filter.
    let rows = sqlx::query(
        "SELECT * FROM api_change_forecasts
         WHERE endpoint = $1 AND ($2::text IS NULL OR method = $2) AND expires_at > NOW()
         ORDER BY predicted_at DESC LIMIT 50",
    )
    .bind(&endpoint)
    .bind(params.method.as_deref())
    .fetch_all(pool)
    .await
    .map_err(|e| {
        tracing::error!("Failed to query endpoint forecasts: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    let forecasts: Vec<ChangeForecast> = rows
        .iter()
        .filter_map(|row| match map_row_to_change_forecast(row) {
            Ok(forecast) => Some(forecast),
            Err(e) => {
                tracing::warn!("Failed to map endpoint forecast row: {}", e);
                None
            }
        })
        .collect();

    let total = forecasts.len();
    Ok(Json(ForecastListResponse { forecasts, total }))
}
335
336#[cfg(not(feature = "database"))]
340pub async fn get_endpoint_forecasts(
341 State(_state): State<ForecastingState>,
342 Path(_endpoint): Path<String>,
343 Query(_params): Query<ListForecastsQuery>,
344) -> Result<Json<ForecastListResponse>, StatusCode> {
345 Ok(Json(ForecastListResponse {
346 forecasts: Vec::new(),
347 total: 0,
348 }))
349}
350
351#[derive(Debug, Deserialize)]
353pub struct RefreshForecastsRequest {
354 pub workspace_id: Option<String>,
356 pub service_id: Option<String>,
358 pub endpoint: Option<String>,
360 pub method: Option<String>,
362}
363
/// Decodes one `drift_incidents` row into a `DriftIncident` for forecasting.
///
/// Returns `Ok(None)` when the row should be skipped: a required column other
/// than `id` cannot be decoded, or `incident_type`, `severity` or `status`
/// holds a value that is not recognised here.
///
/// # Errors
///
/// Returns the decode error when the `id` column cannot be read.
#[cfg(feature = "database")]
fn drift_incident_from_row(
    row: &sqlx::postgres::PgRow,
) -> Result<Option<mockforge_foundation::incidents_types::DriftIncident>, sqlx::Error> {
    use mockforge_foundation::incidents_types::{
        DriftIncident, IncidentSeverity, IncidentStatus, IncidentType,
    };
    use sqlx::Row;

    let id: Uuid = row.try_get("id")?;
    // A workspace id that is NULL or undecodable simply becomes `None`.
    let workspace_id: Option<Uuid> = row.try_get("workspace_id").ok();

    let Ok(endpoint) = row.try_get::<String, _>("endpoint") else {
        return Ok(None);
    };
    let Ok(method) = row.try_get::<String, _>("method") else {
        return Ok(None);
    };
    let Ok(incident_type_str) = row.try_get::<String, _>("incident_type") else {
        return Ok(None);
    };
    let Ok(severity_str) = row.try_get::<String, _>("severity") else {
        return Ok(None);
    };
    let Ok(status_str) = row.try_get::<String, _>("status") else {
        return Ok(None);
    };
    let Ok(detected_at) = row.try_get::<DateTime<Utc>, _>("detected_at") else {
        return Ok(None);
    };
    let details: serde_json::Value = row.try_get("details").unwrap_or_default();
    let Ok(created_at) = row.try_get::<DateTime<Utc>, _>("created_at") else {
        return Ok(None);
    };
    let Ok(updated_at) = row.try_get::<DateTime<Utc>, _>("updated_at") else {
        return Ok(None);
    };

    let incident_type = match incident_type_str.as_str() {
        "breaking_change" => IncidentType::BreakingChange,
        "threshold_exceeded" => IncidentType::ThresholdExceeded,
        _ => return Ok(None),
    };

    let severity = match severity_str.as_str() {
        "low" => IncidentSeverity::Low,
        "medium" => IncidentSeverity::Medium,
        "high" => IncidentSeverity::High,
        "critical" => IncidentSeverity::Critical,
        _ => return Ok(None),
    };

    let status = match status_str.as_str() {
        "open" => IncidentStatus::Open,
        "acknowledged" => IncidentStatus::Acknowledged,
        "resolved" => IncidentStatus::Resolved,
        "closed" => IncidentStatus::Closed,
        _ => return Ok(None),
    };

    Ok(Some(DriftIncident {
        id: id.to_string(),
        budget_id: None,
        workspace_id: workspace_id.map(|u| u.to_string()),
        endpoint,
        method,
        incident_type,
        severity,
        status,
        detected_at: detected_at.timestamp(),
        resolved_at: None,
        details,
        external_ticket_id: None,
        external_ticket_url: None,
        created_at: created_at.timestamp(),
        updated_at: updated_at.timestamp(),
        sync_cycle_id: None,
        contract_diff_id: None,
        before_sample: None,
        after_sample: None,
        fitness_test_results: Vec::new(),
        affected_consumers: None,
        protocol: None,
    }))
}

/// Regenerates forecasts from stored drift incidents.
///
/// Incidents are loaded from `drift_incidents` (restricted to the requested
/// workspace when `workspace_id` is given), grouped by `(endpoint, method)`,
/// and each group is passed to the forecaster with a 30-day window. Every
/// forecast produced is written with `store_forecast`; a failed write is
/// logged but still counted in `forecasts_generated`.
///
/// Without a database pool the response is
/// `{"success": false, "error": "Database not available"}`.
///
/// # Errors
///
/// * `400 Bad Request` if `workspace_id` is not a valid UUID.
/// * `500 Internal Server Error` if the incident query fails or a row's `id`
///   cannot be decoded.
#[cfg(feature = "database")]
pub async fn refresh_forecasts(
    State(state): State<ForecastingState>,
    Json(request): Json<RefreshForecastsRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    use mockforge_foundation::incidents_types::DriftIncident;
    use std::collections::HashMap;

    let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) else {
        return Ok(Json(serde_json::json!({
            "success": false,
            "error": "Database not available"
        })));
    };

    let workspace_uuid = request
        .workspace_id
        .as_deref()
        .map(Uuid::parse_str)
        .transpose()
        .map_err(|e| {
            tracing::warn!("Invalid workspace_id in refresh request: {}", e);
            StatusCode::BAD_REQUEST
        })?;

    let mut incident_sql = String::from(
        "SELECT id, workspace_id, endpoint, method, incident_type, severity, status,
                detected_at, details, created_at, updated_at
         FROM drift_incidents WHERE 1=1",
    );
    if workspace_uuid.is_some() {
        incident_sql.push_str(" AND workspace_id = $1");
    }

    // The `$1` placeholder must have a bound value, otherwise the statement
    // is rejected by the database whenever a workspace filter is requested.
    let mut incident_query = sqlx::query(&incident_sql);
    if let Some(uuid) = workspace_uuid {
        incident_query = incident_query.bind(uuid);
    }

    let rows = incident_query.fetch_all(pool).await.map_err(|e| {
        tracing::error!("Failed to query drift incidents: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    let mut endpoint_groups: HashMap<(String, String), Vec<DriftIncident>> = HashMap::new();
    for row in &rows {
        let decoded = drift_incident_from_row(row).map_err(|e| {
            tracing::error!("Failed to get id from row: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;
        if let Some(incident) = decoded {
            endpoint_groups
                .entry((incident.endpoint.clone(), incident.method.clone()))
                .or_default()
                .push(incident);
        }
    }

    let mut forecasts_generated = 0;
    for ((endpoint, method), group_incidents) in endpoint_groups {
        if let Some(forecast) = state.forecaster.generate_forecast(
            &group_incidents,
            request.workspace_id.clone(),
            None,
            None,
            endpoint,
            method,
            30,
        ) {
            if let Err(e) = store_forecast(pool, &forecast, request.workspace_id.as_deref()).await {
                tracing::warn!("Failed to store forecast: {}", e);
            }
            forecasts_generated += 1;
        }
    }

    Ok(Json(serde_json::json!({
        "success": true,
        "message": "Forecasts refreshed",
        "forecasts_generated": forecasts_generated
    })))
}
528
529#[cfg(not(feature = "database"))]
533pub async fn refresh_forecasts(
534 State(_state): State<ForecastingState>,
535 Json(_request): Json<RefreshForecastsRequest>,
536) -> Result<Json<serde_json::Value>, StatusCode> {
537 Ok(Json(serde_json::json!({
538 "success": false,
539 "error": "Database not available"
540 })))
541}
542
/// Writes a forecast to `api_change_forecasts`, updating the existing row on
/// a conflict over `(workspace_id, service_id, endpoint, method,
/// forecast_window_days)`.
///
/// A fresh random id is generated for the insert. A `workspace_id` that is
/// absent or does not parse as a UUID is stored as NULL. Seasonal patterns
/// are stored as JSON; if they cannot be serialized, the default JSON value
/// is stored instead.
///
/// NOTE(review): `workspace_id` and `service_id` can be NULL here — confirm
/// the table's unique constraint treats NULLs the way this upsert expects.
///
/// # Errors
///
/// Returns the `sqlx::Error` raised while executing the statement.
#[cfg(feature = "database")]
pub async fn store_forecast(
    pool: &sqlx::PgPool,
    forecast: &ChangeForecast,
    workspace_id: Option<&str>,
) -> Result<(), sqlx::Error> {
    const UPSERT_FORECAST_SQL: &str = r#"
        INSERT INTO api_change_forecasts (
            id, workspace_id, service_id, service_name, endpoint, method,
            forecast_window_days, predicted_change_probability, predicted_break_probability,
            next_expected_change_date, next_expected_break_date, volatility_score, confidence,
            seasonal_patterns, predicted_at, expires_at
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16
        )
        ON CONFLICT (workspace_id, service_id, endpoint, method, forecast_window_days)
        DO UPDATE SET
            predicted_change_probability = EXCLUDED.predicted_change_probability,
            predicted_break_probability = EXCLUDED.predicted_break_probability,
            next_expected_change_date = EXCLUDED.next_expected_change_date,
            next_expected_break_date = EXCLUDED.next_expected_break_date,
            volatility_score = EXCLUDED.volatility_score,
            confidence = EXCLUDED.confidence,
            seasonal_patterns = EXCLUDED.seasonal_patterns,
            predicted_at = EXCLUDED.predicted_at,
            expires_at = EXCLUDED.expires_at,
            updated_at = NOW()
        "#;

    let row_id = Uuid::new_v4();
    let workspace_uuid = match workspace_id {
        Some(raw) => Uuid::parse_str(raw).ok(),
        None => None,
    };
    let window_days = forecast.forecast_window_days as i32;
    let patterns_json = serde_json::to_value(&forecast.seasonal_patterns).unwrap_or_default();

    // Bind order follows the $1..$16 placeholders in the statement above.
    let upsert = sqlx::query(UPSERT_FORECAST_SQL)
        .bind(row_id)
        .bind(workspace_uuid)
        .bind(forecast.service_id.as_deref())
        .bind(forecast.service_name.as_deref())
        .bind(&forecast.endpoint)
        .bind(&forecast.method)
        .bind(window_days)
        .bind(forecast.predicted_change_probability)
        .bind(forecast.predicted_break_probability)
        .bind(forecast.next_expected_change_date)
        .bind(forecast.next_expected_break_date)
        .bind(forecast.volatility_score)
        .bind(forecast.confidence)
        .bind(patterns_json)
        .bind(forecast.predicted_at)
        .bind(forecast.expires_at);

    upsert.execute(pool).await.map(|_| ())
}
598
599pub fn forecasting_router(state: ForecastingState) -> axum::Router {
601 use axum::routing::{get, post};
602 use axum::Router;
603
604 Router::new()
605 .route("/api/v1/forecasts", get(list_forecasts))
606 .route("/api/v1/forecasts/service/{service_id}", get(get_service_forecasts))
607 .route("/api/v1/forecasts/endpoint/{endpoint}", get(get_endpoint_forecasts))
608 .route("/api/v1/forecasts/refresh", post(refresh_forecasts))
609 .with_state(state)
610}