1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::Json,
9};
10use mockforge_core::contract_drift::forecasting::{ChangeForecast, Forecaster};
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13
14#[cfg(feature = "database")]
15use chrono::{DateTime, Utc};
16#[cfg(feature = "database")]
17use mockforge_core::contract_drift::forecasting::SeasonalPattern;
18#[cfg(feature = "database")]
19use uuid::Uuid;
20
21use crate::database::Database;
22
23#[cfg(feature = "database")]
25fn map_row_to_change_forecast(row: &sqlx::postgres::PgRow) -> Result<ChangeForecast, sqlx::Error> {
26 use sqlx::Row;
27
28 let service_id: Option<String> = row.try_get("service_id")?;
29 let service_name: Option<String> = row.try_get("service_name")?;
30 let endpoint: String = row.try_get("endpoint")?;
31 let method: String = row.try_get("method")?;
32 let forecast_window_days: i32 = row.try_get("forecast_window_days")?;
33 let predicted_change_probability: f64 = row.try_get("predicted_change_probability")?;
34 let predicted_break_probability: f64 = row.try_get("predicted_break_probability")?;
35 let next_expected_change_date: Option<DateTime<Utc>> =
36 row.try_get("next_expected_change_date")?;
37 let next_expected_break_date: Option<DateTime<Utc>> =
38 row.try_get("next_expected_break_date")?;
39 let volatility_score: f64 = row.try_get("volatility_score")?;
40 let confidence: f64 = row.try_get("confidence")?;
41 let seasonal_patterns_json: serde_json::Value =
42 row.try_get("seasonal_patterns").unwrap_or_default();
43 let predicted_at: DateTime<Utc> = row.try_get("predicted_at")?;
44 let expires_at: DateTime<Utc> = row.try_get("expires_at")?;
45
46 let seasonal_patterns: Vec<SeasonalPattern> = if seasonal_patterns_json.is_array() {
48 serde_json::from_value(seasonal_patterns_json).unwrap_or_default()
49 } else {
50 Vec::new()
51 };
52
53 Ok(ChangeForecast {
54 service_id,
55 service_name,
56 endpoint,
57 method,
58 forecast_window_days: forecast_window_days as u32,
59 predicted_change_probability,
60 predicted_break_probability,
61 next_expected_change_date,
62 next_expected_break_date,
63 volatility_score,
64 confidence,
65 seasonal_patterns,
66 predicted_at,
67 expires_at,
68 })
69}
70
71#[derive(Clone)]
73pub struct ForecastingState {
74 pub forecaster: Arc<Forecaster>,
76 pub database: Option<Database>,
78}
79
80#[derive(Debug, Deserialize)]
82pub struct ListForecastsQuery {
83 pub workspace_id: Option<String>,
85 pub service_id: Option<String>,
87 pub endpoint: Option<String>,
89 pub method: Option<String>,
91 pub window_days: Option<u32>,
93}
94
95#[derive(Debug, Serialize)]
97pub struct ForecastListResponse {
98 pub forecasts: Vec<ChangeForecast>,
100 pub total: usize,
102}
103
104#[cfg(feature = "database")]
108pub async fn list_forecasts(
109 State(state): State<ForecastingState>,
110 Query(params): Query<ListForecastsQuery>,
111) -> Result<Json<ForecastListResponse>, StatusCode> {
112 let pool = match state.database.as_ref().and_then(|db| db.pool()) {
113 Some(pool) => pool,
114 None => {
115 return Ok(Json(ForecastListResponse {
116 forecasts: Vec::new(),
117 total: 0,
118 }));
119 }
120 };
121
122 let mut query = String::from(
124 "SELECT id, workspace_id, service_id, service_name, endpoint, method,
125 forecast_window_days, predicted_change_probability, predicted_break_probability,
126 next_expected_change_date, next_expected_break_date, volatility_score, confidence,
127 seasonal_patterns, predicted_at, expires_at
128 FROM api_change_forecasts WHERE expires_at > NOW()",
129 );
130
131 let mut bind_index = 1;
132
133 if params.workspace_id.is_some() {
134 query.push_str(&format!(" AND workspace_id = ${}", bind_index));
135 bind_index += 1;
136 }
137
138 if params.service_id.is_some() {
139 query.push_str(&format!(" AND service_id = ${}", bind_index));
140 bind_index += 1;
141 }
142
143 if params.endpoint.is_some() {
144 query.push_str(&format!(" AND endpoint = ${}", bind_index));
145 bind_index += 1;
146 }
147
148 if params.method.is_some() {
149 query.push_str(&format!(" AND method = ${}", bind_index));
150 bind_index += 1;
151 }
152
153 if let Some(_window) = params.window_days {
154 query.push_str(&format!(" AND forecast_window_days = ${}", bind_index));
155 }
156
157 query.push_str(" ORDER BY predicted_at DESC LIMIT 100");
158
159 let mut query_builder = sqlx::query(&query);
161
162 if let Some(ws_id) = ¶ms.workspace_id {
163 let uuid = Uuid::parse_str(ws_id).ok();
164 query_builder = query_builder.bind(uuid);
165 }
166
167 if let Some(svc_id) = ¶ms.service_id {
168 query_builder = query_builder.bind(svc_id);
169 }
170
171 if let Some(ep) = ¶ms.endpoint {
172 query_builder = query_builder.bind(ep);
173 }
174
175 if let Some(m) = ¶ms.method {
176 query_builder = query_builder.bind(m);
177 }
178
179 if let Some(window) = params.window_days {
180 query_builder = query_builder.bind(window as i32);
181 }
182
183 let rows = query_builder.fetch_all(pool).await.map_err(|e| {
185 tracing::error!("Failed to query forecasts: {}", e);
186 StatusCode::INTERNAL_SERVER_ERROR
187 })?;
188
189 let mut forecasts = Vec::new();
191 for row in rows {
192 match map_row_to_change_forecast(&row) {
193 Ok(forecast) => forecasts.push(forecast),
194 Err(e) => {
195 tracing::warn!("Failed to map forecast row: {}", e);
196 continue;
197 }
198 }
199 }
200
201 let total = forecasts.len();
202 Ok(Json(ForecastListResponse { forecasts, total }))
203}
204
205#[cfg(not(feature = "database"))]
209pub async fn list_forecasts(
210 State(_state): State<ForecastingState>,
211 Query(_params): Query<ListForecastsQuery>,
212) -> Result<Json<ForecastListResponse>, StatusCode> {
213 Ok(Json(ForecastListResponse {
214 forecasts: Vec::new(),
215 total: 0,
216 }))
217}
218
219#[cfg(feature = "database")]
223pub async fn get_service_forecasts(
224 State(state): State<ForecastingState>,
225 Path(service_id): Path<String>,
226 Query(_params): Query<ListForecastsQuery>,
227) -> Result<Json<ForecastListResponse>, StatusCode> {
228 let pool = match state.database.as_ref().and_then(|db| db.pool()) {
229 Some(pool) => pool,
230 None => {
231 return Ok(Json(ForecastListResponse {
232 forecasts: Vec::new(),
233 total: 0,
234 }));
235 }
236 };
237
238 let rows = sqlx::query(
240 "SELECT * FROM api_change_forecasts
241 WHERE service_id = $1 AND expires_at > NOW()
242 ORDER BY predicted_at DESC LIMIT 50",
243 )
244 .bind(&service_id)
245 .fetch_all(pool)
246 .await
247 .map_err(|e| {
248 tracing::error!("Failed to query service forecasts: {}", e);
249 StatusCode::INTERNAL_SERVER_ERROR
250 })?;
251
252 let mut forecasts = Vec::new();
254 for row in rows {
255 match map_row_to_change_forecast(&row) {
256 Ok(forecast) => forecasts.push(forecast),
257 Err(e) => {
258 tracing::warn!("Failed to map service forecast row: {}", e);
259 continue;
260 }
261 }
262 }
263
264 let total = forecasts.len();
265 Ok(Json(ForecastListResponse { forecasts, total }))
266}
267
268#[cfg(not(feature = "database"))]
272pub async fn get_service_forecasts(
273 State(_state): State<ForecastingState>,
274 Path(_service_id): Path<String>,
275 Query(_params): Query<ListForecastsQuery>,
276) -> Result<Json<ForecastListResponse>, StatusCode> {
277 Ok(Json(ForecastListResponse {
278 forecasts: Vec::new(),
279 total: 0,
280 }))
281}
282
283#[cfg(feature = "database")]
287pub async fn get_endpoint_forecasts(
288 State(state): State<ForecastingState>,
289 Path(endpoint): Path<String>,
290 Query(params): Query<ListForecastsQuery>,
291) -> Result<Json<ForecastListResponse>, StatusCode> {
292 let pool = match state.database.as_ref().and_then(|db| db.pool()) {
293 Some(pool) => pool,
294 None => {
295 return Ok(Json(ForecastListResponse {
296 forecasts: Vec::new(),
297 total: 0,
298 }));
299 }
300 };
301
302 let method = params.method.as_deref().unwrap_or("%");
303
304 let rows = sqlx::query(
305 "SELECT * FROM api_change_forecasts
306 WHERE endpoint = $1 AND method LIKE $2 AND expires_at > NOW()
307 ORDER BY predicted_at DESC LIMIT 50",
308 )
309 .bind(&endpoint)
310 .bind(method)
311 .fetch_all(pool)
312 .await
313 .map_err(|e| {
314 tracing::error!("Failed to query endpoint forecasts: {}", e);
315 StatusCode::INTERNAL_SERVER_ERROR
316 })?;
317
318 let mut forecasts = Vec::new();
320 for row in rows {
321 match map_row_to_change_forecast(&row) {
322 Ok(forecast) => forecasts.push(forecast),
323 Err(e) => {
324 tracing::warn!("Failed to map endpoint forecast row: {}", e);
325 continue;
326 }
327 }
328 }
329
330 let total = forecasts.len();
331 Ok(Json(ForecastListResponse { forecasts, total }))
332}
333
334#[cfg(not(feature = "database"))]
338pub async fn get_endpoint_forecasts(
339 State(_state): State<ForecastingState>,
340 Path(_endpoint): Path<String>,
341 Query(_params): Query<ListForecastsQuery>,
342) -> Result<Json<ForecastListResponse>, StatusCode> {
343 Ok(Json(ForecastListResponse {
344 forecasts: Vec::new(),
345 total: 0,
346 }))
347}
348
349#[derive(Debug, Deserialize)]
351pub struct RefreshForecastsRequest {
352 pub workspace_id: Option<String>,
354 pub service_id: Option<String>,
356 pub endpoint: Option<String>,
358 pub method: Option<String>,
360}
361
362#[cfg(feature = "database")]
366pub async fn refresh_forecasts(
367 State(state): State<ForecastingState>,
368 Json(request): Json<RefreshForecastsRequest>,
369) -> Result<Json<serde_json::Value>, StatusCode> {
370 let pool = match state.database.as_ref().and_then(|db| db.pool()) {
371 Some(pool) => pool,
372 None => {
373 return Ok(Json(serde_json::json!({
374 "success": false,
375 "error": "Database not available"
376 })));
377 }
378 };
379
380 let mut incident_query = String::from(
382 "SELECT id, workspace_id, endpoint, method, incident_type, severity, status,
383 detected_at, details, created_at, updated_at
384 FROM drift_incidents WHERE 1=1",
385 );
386
387 if let Some(_ws_id) = &request.workspace_id {
388 incident_query.push_str(" AND workspace_id = $1");
389 }
390
391 let rows = sqlx::query(&incident_query).fetch_all(pool).await.map_err(|e| {
393 tracing::error!("Failed to query drift incidents: {}", e);
394 StatusCode::INTERNAL_SERVER_ERROR
395 })?;
396
397 use mockforge_core::incidents::types::{IncidentSeverity, IncidentStatus, IncidentType};
399 use sqlx::Row;
400 let mut incidents = Vec::new();
401 for row in rows {
402 let id: uuid::Uuid = row.try_get("id").map_err(|e| {
403 tracing::error!("Failed to get id from row: {}", e);
404 StatusCode::INTERNAL_SERVER_ERROR
405 })?;
406 let workspace_id: Option<uuid::Uuid> = row.try_get("workspace_id").ok();
407 let endpoint: String = match row.try_get("endpoint") {
408 Ok(e) => e,
409 Err(_) => continue,
410 };
411 let method: String = match row.try_get("method") {
412 Ok(m) => m,
413 Err(_) => continue,
414 };
415 let incident_type_str: String = match row.try_get("incident_type") {
416 Ok(s) => s,
417 Err(_) => continue,
418 };
419 let severity_str: String = match row.try_get("severity") {
420 Ok(s) => s,
421 Err(_) => continue,
422 };
423 let status_str: String = match row.try_get("status") {
424 Ok(s) => s,
425 Err(_) => continue,
426 };
427 let detected_at: DateTime<Utc> = match row.try_get("detected_at") {
428 Ok(dt) => dt,
429 Err(_) => continue,
430 };
431 let details_json: serde_json::Value = row.try_get("details").unwrap_or_default();
432 let created_at: DateTime<Utc> = match row.try_get("created_at") {
433 Ok(dt) => dt,
434 Err(_) => continue,
435 };
436 let updated_at: DateTime<Utc> = match row.try_get("updated_at") {
437 Ok(dt) => dt,
438 Err(_) => continue,
439 };
440
441 let incident_type = match incident_type_str.as_str() {
442 "breaking_change" => IncidentType::BreakingChange,
443 "threshold_exceeded" => IncidentType::ThresholdExceeded,
444 _ => continue, };
446
447 let severity = match severity_str.as_str() {
448 "low" => IncidentSeverity::Low,
449 "medium" => IncidentSeverity::Medium,
450 "high" => IncidentSeverity::High,
451 "critical" => IncidentSeverity::Critical,
452 _ => continue, };
454
455 let status = match status_str.as_str() {
456 "open" => IncidentStatus::Open,
457 "acknowledged" => IncidentStatus::Acknowledged,
458 "resolved" => IncidentStatus::Resolved,
459 "closed" => IncidentStatus::Closed,
460 _ => continue, };
462
463 incidents.push(DriftIncident {
464 id: id.to_string(),
465 budget_id: None,
466 workspace_id: workspace_id.map(|u| u.to_string()),
467 endpoint,
468 method,
469 incident_type,
470 severity,
471 status,
472 detected_at: detected_at.timestamp(),
473 resolved_at: None,
474 details: details_json,
475 external_ticket_id: None,
476 external_ticket_url: None,
477 created_at: created_at.timestamp(),
478 updated_at: updated_at.timestamp(),
479 sync_cycle_id: None,
480 contract_diff_id: None,
481 before_sample: None,
482 after_sample: None,
483 fitness_test_results: Vec::new(),
484 affected_consumers: None,
485 protocol: None,
486 });
487 }
488
489 use mockforge_core::incidents::types::DriftIncident;
491 use std::collections::HashMap;
492 let mut forecasts_generated = 0;
493 let mut endpoint_groups: HashMap<(String, String), Vec<DriftIncident>> = HashMap::new();
494
495 for incident in incidents {
496 endpoint_groups
497 .entry((incident.endpoint.clone(), incident.method.clone()))
498 .or_default()
499 .push(incident);
500 }
501
502 for ((endpoint, method), group_incidents) in endpoint_groups {
503 if let Some(forecast) = state.forecaster.generate_forecast(
504 &group_incidents,
505 request.workspace_id.clone(),
506 None, None, endpoint,
509 method,
510 30, ) {
512 if let Err(e) = store_forecast(pool, &forecast, request.workspace_id.as_deref()).await {
514 tracing::warn!("Failed to store forecast: {}", e);
515 }
516 forecasts_generated += 1;
517 }
518 }
519
520 Ok(Json(serde_json::json!({
521 "success": true,
522 "message": "Forecasts refreshed",
523 "forecasts_generated": forecasts_generated
524 })))
525}
526
527#[cfg(not(feature = "database"))]
531pub async fn refresh_forecasts(
532 State(_state): State<ForecastingState>,
533 Json(_request): Json<RefreshForecastsRequest>,
534) -> Result<Json<serde_json::Value>, StatusCode> {
535 Ok(Json(serde_json::json!({
536 "success": false,
537 "error": "Database not available"
538 })))
539}
540
541#[cfg(feature = "database")]
543pub async fn store_forecast(
544 pool: &sqlx::PgPool,
545 forecast: &ChangeForecast,
546 workspace_id: Option<&str>,
547) -> Result<(), sqlx::Error> {
548 let id = Uuid::new_v4();
549 let workspace_uuid = workspace_id.and_then(|id| Uuid::parse_str(id).ok());
550
551 sqlx::query(
552 r#"
553 INSERT INTO api_change_forecasts (
554 id, workspace_id, service_id, service_name, endpoint, method,
555 forecast_window_days, predicted_change_probability, predicted_break_probability,
556 next_expected_change_date, next_expected_break_date, volatility_score, confidence,
557 seasonal_patterns, predicted_at, expires_at
558 ) VALUES (
559 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16
560 )
561 ON CONFLICT (workspace_id, service_id, endpoint, method, forecast_window_days)
562 DO UPDATE SET
563 predicted_change_probability = EXCLUDED.predicted_change_probability,
564 predicted_break_probability = EXCLUDED.predicted_break_probability,
565 next_expected_change_date = EXCLUDED.next_expected_change_date,
566 next_expected_break_date = EXCLUDED.next_expected_break_date,
567 volatility_score = EXCLUDED.volatility_score,
568 confidence = EXCLUDED.confidence,
569 seasonal_patterns = EXCLUDED.seasonal_patterns,
570 predicted_at = EXCLUDED.predicted_at,
571 expires_at = EXCLUDED.expires_at,
572 updated_at = NOW()
573 "#,
574 )
575 .bind(id)
576 .bind(workspace_uuid)
577 .bind(forecast.service_id.as_deref())
578 .bind(forecast.service_name.as_deref())
579 .bind(&forecast.endpoint)
580 .bind(&forecast.method)
581 .bind(forecast.forecast_window_days as i32)
582 .bind(forecast.predicted_change_probability)
583 .bind(forecast.predicted_break_probability)
584 .bind(forecast.next_expected_change_date)
585 .bind(forecast.next_expected_break_date)
586 .bind(forecast.volatility_score)
587 .bind(forecast.confidence)
588 .bind(serde_json::to_value(&forecast.seasonal_patterns).unwrap_or_default())
589 .bind(forecast.predicted_at)
590 .bind(forecast.expires_at)
591 .execute(pool)
592 .await?;
593
594 Ok(())
595}
596
597pub fn forecasting_router(state: ForecastingState) -> axum::Router {
599 use axum::routing::{get, post};
600 use axum::Router;
601
602 Router::new()
603 .route("/api/v1/forecasts", get(list_forecasts))
604 .route("/api/v1/forecasts/service/{service_id}", get(get_service_forecasts))
605 .route("/api/v1/forecasts/endpoint/{endpoint}", get(get_endpoint_forecasts))
606 .route("/api/v1/forecasts/refresh", post(refresh_forecasts))
607 .with_state(state)
608}