1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::Json,
9};
10use mockforge_core::contract_drift::forecasting::{ChangeForecast, Forecaster};
11use serde::{Deserialize, Serialize};
12use std::sync::Arc;
13
14#[cfg(feature = "database")]
15use chrono::{DateTime, Utc};
16#[cfg(feature = "database")]
17use mockforge_core::contract_drift::forecasting::SeasonalPattern;
18#[cfg(feature = "database")]
19use uuid::Uuid;
20
21use crate::database::Database;
22
/// Builds a [`ChangeForecast`] from a database row carrying the forecast columns
/// (`service_id`, `service_name`, `endpoint`, `method`, `forecast_window_days`,
/// the two probabilities, the two expected dates, `volatility_score`,
/// `confidence`, `seasonal_patterns`, `predicted_at`, `expires_at`).
///
/// `seasonal_patterns` is best-effort: if the column cannot be read, or its JSON
/// does not deserialize into a list of [`SeasonalPattern`], an empty list is used.
///
/// # Errors
///
/// Returns the underlying [`sqlx::Error`] when any other column is missing or has
/// an unexpected type, and [`sqlx::Error::ColumnDecode`] when
/// `forecast_window_days` is negative and therefore cannot be represented as `u32`.
#[cfg(feature = "database")]
fn map_row_to_change_forecast(row: &sqlx::postgres::PgRow) -> Result<ChangeForecast, sqlx::Error> {
    use sqlx::Row;

    Ok(ChangeForecast {
        service_id: row.try_get("service_id")?,
        service_name: row.try_get("service_name")?,
        endpoint: row.try_get("endpoint")?,
        method: row.try_get("method")?,
        // A plain `as u32` cast would turn a negative value into a huge window;
        // reject such rows instead so callers can skip or report them.
        forecast_window_days: u32::try_from(row.try_get::<i32, _>("forecast_window_days")?)
            .map_err(|e| sqlx::Error::ColumnDecode {
                index: "forecast_window_days".to_string(),
                source: Box::new(e),
            })?,
        predicted_change_probability: row.try_get("predicted_change_probability")?,
        predicted_break_probability: row.try_get("predicted_break_probability")?,
        next_expected_change_date: row.try_get("next_expected_change_date")?,
        next_expected_break_date: row.try_get("next_expected_break_date")?,
        volatility_score: row.try_get("volatility_score")?,
        confidence: row.try_get("confidence")?,
        // Deserializing a non-array JSON value into a `Vec` fails, so the single
        // fallback below also covers NULL / non-array column contents.
        seasonal_patterns: row
            .try_get::<serde_json::Value, _>("seasonal_patterns")
            .ok()
            .and_then(|json| serde_json::from_value::<Vec<SeasonalPattern>>(json).ok())
            .unwrap_or_default(),
        predicted_at: row.try_get("predicted_at")?,
        expires_at: row.try_get("expires_at")?,
    })
}
70
71#[derive(Clone)]
73pub struct ForecastingState {
74 pub forecaster: Arc<Forecaster>,
76 pub database: Option<Database>,
78}
79
80#[derive(Debug, Deserialize)]
82pub struct ListForecastsQuery {
83 pub workspace_id: Option<String>,
85 pub service_id: Option<String>,
87 pub endpoint: Option<String>,
89 pub method: Option<String>,
91 pub window_days: Option<u32>,
93}
94
95#[derive(Debug, Serialize)]
97pub struct ForecastListResponse {
98 pub forecasts: Vec<ChangeForecast>,
100 pub total: usize,
102}
103
/// Lists unexpired forecasts from `api_change_forecasts`, newest prediction first,
/// capped at 100 rows.
///
/// Every field of [`ListForecastsQuery`] that is present adds an equality filter
/// on the column of the same name (`window_days` filters `forecast_window_days`).
/// Rows that cannot be mapped to a [`ChangeForecast`] are logged and skipped, so
/// `total` is the number of forecasts actually returned.
///
/// When no database pool is configured an empty list is returned.
///
/// # Errors
///
/// * `400 Bad Request` if `workspace_id` is not a valid UUID or `window_days`
///   does not fit the integer type used for the column filter.
/// * `500 Internal Server Error` if the query fails.
#[cfg(feature = "database")]
pub async fn list_forecasts(
    State(state): State<ForecastingState>,
    Query(params): Query<ListForecastsQuery>,
) -> Result<Json<ForecastListResponse>, StatusCode> {
    let pool = match state.database.as_ref().and_then(|db| db.pool()) {
        Some(pool) => pool,
        None => {
            return Ok(Json(ForecastListResponse {
                forecasts: Vec::new(),
                total: 0,
            }));
        }
    };

    // Validate typed filters up front. Binding an unparsable UUID as NULL would
    // make `workspace_id = NULL` silently match nothing.
    let workspace_uuid = params
        .workspace_id
        .as_deref()
        .map(Uuid::parse_str)
        .transpose()
        .map_err(|e| {
            tracing::warn!("Invalid workspace_id filter: {}", e);
            StatusCode::BAD_REQUEST
        })?;
    let window_days = params
        .window_days
        .map(i32::try_from)
        .transpose()
        .map_err(|e| {
            tracing::warn!("Invalid window_days filter: {}", e);
            StatusCode::BAD_REQUEST
        })?;

    // QueryBuilder numbers the placeholders itself, so each filter's SQL fragment
    // and its bound value are added together and cannot drift apart.
    let mut builder = sqlx::QueryBuilder::<sqlx::Postgres>::new(
        "SELECT id, workspace_id, service_id, service_name, endpoint, method,
         forecast_window_days, predicted_change_probability, predicted_break_probability,
         next_expected_change_date, next_expected_break_date, volatility_score, confidence,
         seasonal_patterns, predicted_at, expires_at
         FROM api_change_forecasts WHERE expires_at > NOW()",
    );

    if let Some(ws_id) = workspace_uuid {
        builder.push(" AND workspace_id = ").push_bind(ws_id);
    }
    if let Some(svc_id) = params.service_id.as_deref() {
        builder.push(" AND service_id = ").push_bind(svc_id);
    }
    if let Some(ep) = params.endpoint.as_deref() {
        builder.push(" AND endpoint = ").push_bind(ep);
    }
    if let Some(m) = params.method.as_deref() {
        builder.push(" AND method = ").push_bind(m);
    }
    if let Some(window) = window_days {
        builder.push(" AND forecast_window_days = ").push_bind(window);
    }
    builder.push(" ORDER BY predicted_at DESC LIMIT 100");

    let rows = builder.build().fetch_all(pool).await.map_err(|e| {
        tracing::error!("Failed to query forecasts: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    let forecasts: Vec<ChangeForecast> = rows
        .iter()
        .filter_map(|row| match map_row_to_change_forecast(row) {
            Ok(forecast) => Some(forecast),
            Err(e) => {
                tracing::warn!("Failed to map forecast row: {}", e);
                None
            }
        })
        .collect();

    let total = forecasts.len();
    Ok(Json(ForecastListResponse { forecasts, total }))
}
205
206#[cfg(not(feature = "database"))]
210pub async fn list_forecasts(
211 State(_state): State<ForecastingState>,
212 Query(_params): Query<ListForecastsQuery>,
213) -> Result<Json<ForecastListResponse>, StatusCode> {
214 Ok(Json(ForecastListResponse {
215 forecasts: Vec::new(),
216 total: 0,
217 }))
218}
219
/// Lists unexpired forecasts for the service given in the path, newest
/// prediction first, capped at 50 rows.
///
/// The query-string filters are accepted but not applied here. Rows that cannot
/// be mapped to a [`ChangeForecast`] are logged and skipped, so `total` is the
/// number of forecasts actually returned.
///
/// When no database pool is configured an empty list is returned.
///
/// # Errors
///
/// `500 Internal Server Error` if the query fails.
#[cfg(feature = "database")]
pub async fn get_service_forecasts(
    State(state): State<ForecastingState>,
    Path(service_id): Path<String>,
    Query(_params): Query<ListForecastsQuery>,
) -> Result<Json<ForecastListResponse>, StatusCode> {
    let pool = match state.database.as_ref().and_then(|db| db.pool()) {
        Some(pool) => pool,
        None => {
            return Ok(Json(ForecastListResponse {
                forecasts: Vec::new(),
                total: 0,
            }));
        }
    };

    let rows = sqlx::query(
        "SELECT * FROM api_change_forecasts
         WHERE service_id = $1 AND expires_at > NOW()
         ORDER BY predicted_at DESC LIMIT 50",
    )
    .bind(&service_id)
    .fetch_all(pool)
    .await
    .map_err(|e| {
        tracing::error!("Failed to query service forecasts: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    // Previously the fetched rows were dropped and only counted, so the response
    // carried a non-zero `total` with an empty `forecasts` list.
    let forecasts: Vec<ChangeForecast> = rows
        .iter()
        .filter_map(|row| match map_row_to_change_forecast(row) {
            Ok(forecast) => Some(forecast),
            Err(e) => {
                tracing::warn!("Failed to map forecast row: {}", e);
                None
            }
        })
        .collect();

    let total = forecasts.len();
    Ok(Json(ForecastListResponse { forecasts, total }))
}
259
260#[cfg(not(feature = "database"))]
264pub async fn get_service_forecasts(
265 State(_state): State<ForecastingState>,
266 Path(_service_id): Path<String>,
267 Query(_params): Query<ListForecastsQuery>,
268) -> Result<Json<ForecastListResponse>, StatusCode> {
269 Ok(Json(ForecastListResponse {
270 forecasts: Vec::new(),
271 total: 0,
272 }))
273}
274
/// Lists unexpired forecasts for the endpoint given in the path, newest
/// prediction first, capped at 50 rows.
///
/// The optional `method` query parameter is passed to SQL `LIKE`; when it is
/// absent the pattern `%` is used, which matches every method. The other
/// query-string filters are not applied here. Rows that cannot be mapped to a
/// [`ChangeForecast`] are logged and skipped, so `total` is the number of
/// forecasts actually returned.
///
/// When no database pool is configured an empty list is returned.
///
/// # Errors
///
/// `500 Internal Server Error` if the query fails.
#[cfg(feature = "database")]
pub async fn get_endpoint_forecasts(
    State(state): State<ForecastingState>,
    Path(endpoint): Path<String>,
    Query(params): Query<ListForecastsQuery>,
) -> Result<Json<ForecastListResponse>, StatusCode> {
    let pool = match state.database.as_ref().and_then(|db| db.pool()) {
        Some(pool) => pool,
        None => {
            return Ok(Json(ForecastListResponse {
                forecasts: Vec::new(),
                total: 0,
            }));
        }
    };

    let method = params.method.as_deref().unwrap_or("%");

    let rows = sqlx::query(
        "SELECT * FROM api_change_forecasts
         WHERE endpoint = $1 AND method LIKE $2 AND expires_at > NOW()
         ORDER BY predicted_at DESC LIMIT 50",
    )
    .bind(&endpoint)
    .bind(method)
    .fetch_all(pool)
    .await
    .map_err(|e| {
        tracing::error!("Failed to query endpoint forecasts: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    // Previously the fetched rows were dropped and only counted, so the response
    // carried a non-zero `total` with an empty `forecasts` list.
    let forecasts: Vec<ChangeForecast> = rows
        .iter()
        .filter_map(|row| match map_row_to_change_forecast(row) {
            Ok(forecast) => Some(forecast),
            Err(e) => {
                tracing::warn!("Failed to map forecast row: {}", e);
                None
            }
        })
        .collect();

    let total = forecasts.len();
    Ok(Json(ForecastListResponse { forecasts, total }))
}
315
316#[cfg(not(feature = "database"))]
320pub async fn get_endpoint_forecasts(
321 State(_state): State<ForecastingState>,
322 Path(_endpoint): Path<String>,
323 Query(_params): Query<ListForecastsQuery>,
324) -> Result<Json<ForecastListResponse>, StatusCode> {
325 Ok(Json(ForecastListResponse {
326 forecasts: Vec::new(),
327 total: 0,
328 }))
329}
330
331#[derive(Debug, Deserialize)]
333pub struct RefreshForecastsRequest {
334 pub workspace_id: Option<String>,
336 pub service_id: Option<String>,
338 pub endpoint: Option<String>,
340 pub method: Option<String>,
342}
343
/// Regenerates forecasts from the rows of `drift_incidents`.
///
/// Incidents are loaded (restricted to `request.workspace_id` when it is given),
/// grouped by `(endpoint, method)`, and each group is handed to the forecaster
/// with `30` as the final argument (presumably the forecast window in days).
/// The response reports how many groups produced a forecast.
///
/// Only `workspace_id` is read from the request; `service_id`, `endpoint` and
/// `method` are currently not used. Rows with an unreadable required column or
/// an unrecognised type/severity/status string are skipped.
///
/// When no database pool is configured the handler answers `200` with
/// `{"success": false, "error": "Database not available"}`.
///
/// NOTE(review): generated forecasts are only counted here, not persisted —
/// confirm whether they should be written to the forecast table.
///
/// # Errors
///
/// * `400 Bad Request` if `workspace_id` is not a valid UUID.
/// * `500 Internal Server Error` if the incident query fails or a row has no
///   readable `id`.
#[cfg(feature = "database")]
pub async fn refresh_forecasts(
    State(state): State<ForecastingState>,
    Json(request): Json<RefreshForecastsRequest>,
) -> Result<Json<serde_json::Value>, StatusCode> {
    use mockforge_core::incidents::types::DriftIncident;
    use sqlx::Row;
    use std::collections::HashMap;

    let pool = match state.database.as_ref().and_then(|db| db.pool()) {
        Some(pool) => pool,
        None => {
            return Ok(Json(serde_json::json!({
                "success": false,
                "error": "Database not available"
            })));
        }
    };

    let workspace_uuid = request
        .workspace_id
        .as_deref()
        .map(uuid::Uuid::parse_str)
        .transpose()
        .map_err(|e| {
            tracing::warn!("Invalid workspace_id in refresh request: {}", e);
            StatusCode::BAD_REQUEST
        })?;

    let mut incident_query = String::from(
        "SELECT id, workspace_id, endpoint, method, incident_type, severity, status,
         detected_at, details, created_at, updated_at
         FROM drift_incidents WHERE 1=1",
    );
    if workspace_uuid.is_some() {
        incident_query.push_str(" AND workspace_id = $1");
    }

    // The `$1` placeholder must be matched by a bound value; without the bind the
    // statement fails for every request that carries a workspace id.
    let mut query = sqlx::query(&incident_query);
    if let Some(ws_id) = workspace_uuid {
        query = query.bind(ws_id);
    }

    let rows = query.fetch_all(pool).await.map_err(|e| {
        tracing::error!("Failed to query drift incidents: {}", e);
        StatusCode::INTERNAL_SERVER_ERROR
    })?;

    let mut endpoint_groups: HashMap<(String, String), Vec<DriftIncident>> = HashMap::new();
    for row in &rows {
        // A missing id aborts the whole request; any other bad column only skips the row.
        let id: uuid::Uuid = row.try_get("id").map_err(|e| {
            tracing::error!("Failed to get id from row: {}", e);
            StatusCode::INTERNAL_SERVER_ERROR
        })?;

        if let Some(incident) = drift_incident_from_row(id, row) {
            endpoint_groups
                .entry((incident.endpoint.clone(), incident.method.clone()))
                .or_default()
                .push(incident);
        }
    }

    let mut forecasts_generated = 0;
    for ((endpoint, method), group_incidents) in endpoint_groups {
        let forecast = state.forecaster.generate_forecast(
            &group_incidents,
            request.workspace_id.clone(),
            None,
            None,
            endpoint,
            method,
            30,
        );
        if forecast.is_some() {
            forecasts_generated += 1;
        }
    }

    Ok(Json(serde_json::json!({
        "success": true,
        "message": "Forecasts refreshed",
        "forecasts_generated": forecasts_generated
    })))
}

/// Decodes one `drift_incidents` row into a `DriftIncident`, returning `None`
/// when a required column cannot be read or holds an unrecognised
/// type/severity/status string.
#[cfg(feature = "database")]
fn drift_incident_from_row(
    id: uuid::Uuid,
    row: &sqlx::postgres::PgRow,
) -> Option<mockforge_core::incidents::types::DriftIncident> {
    use mockforge_core::incidents::types::{
        DriftIncident, IncidentSeverity, IncidentStatus, IncidentType,
    };
    use sqlx::Row;

    // An unreadable workspace id is tolerated and treated as "no workspace".
    let workspace_id = row.try_get::<uuid::Uuid, _>("workspace_id").ok();
    let endpoint: String = row.try_get("endpoint").ok()?;
    let method: String = row.try_get("method").ok()?;
    let incident_type_str: String = row.try_get("incident_type").ok()?;
    let severity_str: String = row.try_get("severity").ok()?;
    let status_str: String = row.try_get("status").ok()?;
    let detected_at: DateTime<Utc> = row.try_get("detected_at").ok()?;
    let details: serde_json::Value = row.try_get("details").unwrap_or_default();
    let created_at: DateTime<Utc> = row.try_get("created_at").ok()?;
    let updated_at: DateTime<Utc> = row.try_get("updated_at").ok()?;

    let incident_type = match incident_type_str.as_str() {
        "breaking_change" => IncidentType::BreakingChange,
        "threshold_exceeded" => IncidentType::ThresholdExceeded,
        _ => return None,
    };
    let severity = match severity_str.as_str() {
        "low" => IncidentSeverity::Low,
        "medium" => IncidentSeverity::Medium,
        "high" => IncidentSeverity::High,
        "critical" => IncidentSeverity::Critical,
        _ => return None,
    };
    let status = match status_str.as_str() {
        "open" => IncidentStatus::Open,
        "acknowledged" => IncidentStatus::Acknowledged,
        "resolved" => IncidentStatus::Resolved,
        "closed" => IncidentStatus::Closed,
        _ => return None,
    };

    Some(DriftIncident {
        id: id.to_string(),
        budget_id: None,
        workspace_id: workspace_id.map(|u| u.to_string()),
        endpoint,
        method,
        incident_type,
        severity,
        status,
        detected_at: detected_at.timestamp(),
        resolved_at: None,
        details,
        external_ticket_id: None,
        external_ticket_url: None,
        created_at: created_at.timestamp(),
        updated_at: updated_at.timestamp(),
        sync_cycle_id: None,
        contract_diff_id: None,
        before_sample: None,
        after_sample: None,
        fitness_test_results: Vec::new(),
        affected_consumers: None,
        protocol: None,
    })
}
504
505#[cfg(not(feature = "database"))]
509pub async fn refresh_forecasts(
510 State(_state): State<ForecastingState>,
511 Json(_request): Json<RefreshForecastsRequest>,
512) -> Result<Json<serde_json::Value>, StatusCode> {
513 Ok(Json(serde_json::json!({
514 "success": false,
515 "error": "Database not available"
516 })))
517}
518
/// Inserts `forecast` into `api_change_forecasts`, or updates the existing row
/// on a conflict over
/// `(workspace_id, service_id, endpoint, method, forecast_window_days)`.
///
/// A fresh random id is generated for the insert. `workspace_id` is stored as a
/// UUID; a value that does not parse is logged and stored as NULL. A
/// `forecast_window_days` larger than `i32::MAX` is saturated rather than being
/// wrapped to a negative number. If the seasonal patterns cannot be serialized,
/// the default JSON value (null) is stored.
///
/// # Errors
///
/// Returns the [`sqlx::Error`] produced by executing the statement.
#[cfg(feature = "database")]
pub async fn store_forecast(
    pool: &sqlx::PgPool,
    forecast: &ChangeForecast,
    workspace_id: Option<&str>,
) -> Result<(), sqlx::Error> {
    let id = Uuid::new_v4();
    let workspace_uuid = workspace_id.and_then(|raw| match Uuid::parse_str(raw) {
        Ok(parsed) => Some(parsed),
        Err(e) => {
            tracing::warn!("Ignoring invalid workspace_id for stored forecast: {}", e);
            None
        }
    });
    let window_days = i32::try_from(forecast.forecast_window_days).unwrap_or(i32::MAX);
    let seasonal_patterns = serde_json::to_value(&forecast.seasonal_patterns).unwrap_or_default();

    sqlx::query(
        r#"
        INSERT INTO api_change_forecasts (
            id, workspace_id, service_id, service_name, endpoint, method,
            forecast_window_days, predicted_change_probability, predicted_break_probability,
            next_expected_change_date, next_expected_break_date, volatility_score, confidence,
            seasonal_patterns, predicted_at, expires_at
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16
        )
        ON CONFLICT (workspace_id, service_id, endpoint, method, forecast_window_days)
        DO UPDATE SET
            predicted_change_probability = EXCLUDED.predicted_change_probability,
            predicted_break_probability = EXCLUDED.predicted_break_probability,
            next_expected_change_date = EXCLUDED.next_expected_change_date,
            next_expected_break_date = EXCLUDED.next_expected_break_date,
            volatility_score = EXCLUDED.volatility_score,
            confidence = EXCLUDED.confidence,
            seasonal_patterns = EXCLUDED.seasonal_patterns,
            predicted_at = EXCLUDED.predicted_at,
            expires_at = EXCLUDED.expires_at,
            updated_at = NOW()
        "#,
    )
    .bind(id)
    .bind(workspace_uuid)
    .bind(forecast.service_id.as_deref())
    .bind(forecast.service_name.as_deref())
    .bind(&forecast.endpoint)
    .bind(&forecast.method)
    .bind(window_days)
    .bind(forecast.predicted_change_probability)
    .bind(forecast.predicted_break_probability)
    .bind(forecast.next_expected_change_date)
    .bind(forecast.next_expected_break_date)
    .bind(forecast.volatility_score)
    .bind(forecast.confidence)
    .bind(seasonal_patterns)
    .bind(forecast.predicted_at)
    .bind(forecast.expires_at)
    .execute(pool)
    .await?;

    Ok(())
}
574
575pub fn forecasting_router(state: ForecastingState) -> axum::Router {
577 use axum::routing::{get, post};
578 use axum::Router;
579
580 Router::new()
581 .route("/api/v1/forecasts", get(list_forecasts))
582 .route("/api/v1/forecasts/service/{service_id}", get(get_service_forecasts))
583 .route("/api/v1/forecasts/endpoint/{endpoint}", get(get_endpoint_forecasts))
584 .route("/api/v1/forecasts/refresh", post(refresh_forecasts))
585 .with_state(state)
586}