1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::Json,
9};
10use chrono::{DateTime, Utc};
11use mockforge_core::ai_contract_diff::{ContractDiffAnalyzer, ContractDiffConfig};
12use mockforge_core::incidents::semantic_manager::{SemanticIncident, SemanticIncidentManager};
13use mockforge_core::openapi::OpenApiSpec;
14use serde::{Deserialize, Serialize};
15use std::sync::Arc;
16use uuid::Uuid;
17
18use crate::database::Database;
19
/// Converts one `semantic_drift_incidents` row into a [`SemanticIncident`].
///
/// Required columns (`id`, `endpoint`, `method`, the three enum label columns, the two
/// score columns and `detected_at` / `created_at` / `updated_at`) propagate decoding
/// failures to the caller. Optional columns become `None`, and the JSON columns become
/// `serde_json::Value::Null`, when they are NULL or cannot be decoded.
///
/// Enum labels are normalised to lowercase `snake_case` before matching, so both
/// `"description_change"` and `"DescriptionChange"` are recognised. The writer in this
/// file persists `{:?}` output, which presumably yields CamelCase variant names; without
/// normalisation such rows would all collapse onto the fallback variants.
///
/// # Errors
///
/// Returns the underlying [`sqlx::Error`] when a required column is missing or cannot be
/// decoded into the expected Rust type.
#[cfg(feature = "database")]
fn map_row_to_semantic_incident(
    row: &sqlx::postgres::PgRow,
) -> Result<SemanticIncident, sqlx::Error> {
    use mockforge_core::ai_contract_diff::semantic_analyzer::SemanticChangeType;
    use mockforge_core::incidents::types::{IncidentSeverity, IncidentStatus};
    use sqlx::Row;

    /// Lowercases a stored label and inserts `_` at lower-to-upper case boundaries
    /// (`"DescriptionChange"` -> `"description_change"`, `"OPEN"` -> `"open"`).
    fn normalize_label(raw: &str) -> String {
        let mut label = String::with_capacity(raw.len() + 4);
        let mut previous: Option<char> = None;
        for ch in raw.trim().chars() {
            if ch.is_ascii_uppercase() {
                let starts_new_word = matches!(
                    previous,
                    Some(p) if p.is_ascii_lowercase() || p.is_ascii_digit()
                );
                if starts_new_word {
                    label.push('_');
                }
                label.push(ch.to_ascii_lowercase());
            } else {
                label.push(ch);
            }
            previous = Some(ch);
        }
        label
    }

    let id: uuid::Uuid = row.try_get("id")?;
    let workspace_id: Option<uuid::Uuid> = row.try_get("workspace_id").ok();
    let endpoint: String = row.try_get("endpoint")?;
    let method: String = row.try_get("method")?;
    let semantic_change_type_str: String = row.try_get("semantic_change_type")?;
    let severity_str: String = row.try_get("severity")?;
    let status_str: String = row.try_get("status")?;
    let semantic_confidence: f64 = row.try_get("semantic_confidence")?;
    let soft_breaking_score: f64 = row.try_get("soft_breaking_score")?;
    let llm_analysis: serde_json::Value = row.try_get("llm_analysis").unwrap_or_default();
    let before_semantic_state: serde_json::Value =
        row.try_get("before_semantic_state").unwrap_or_default();
    let after_semantic_state: serde_json::Value =
        row.try_get("after_semantic_state").unwrap_or_default();
    let details_json: serde_json::Value = row.try_get("details").unwrap_or_default();
    let related_drift_incident_id: Option<uuid::Uuid> =
        row.try_get("related_drift_incident_id").ok();
    let contract_diff_id: Option<String> = row.try_get("contract_diff_id").ok();
    let external_ticket_id: Option<String> = row.try_get("external_ticket_id").ok();
    let external_ticket_url: Option<String> = row.try_get("external_ticket_url").ok();
    let detected_at: DateTime<Utc> = row.try_get("detected_at")?;
    let created_at: DateTime<Utc> = row.try_get("created_at")?;
    let acknowledged_at: Option<DateTime<Utc>> = row.try_get("acknowledged_at").ok();
    let resolved_at: Option<DateTime<Utc>> = row.try_get("resolved_at").ok();
    let closed_at: Option<DateTime<Utc>> = row.try_get("closed_at").ok();
    let updated_at: DateTime<Utc> = row.try_get("updated_at")?;

    // Unknown labels keep the historical fallbacks, but are now logged instead of being
    // silently rewritten.
    let semantic_change_type = match normalize_label(&semantic_change_type_str).as_str() {
        "description_change" => SemanticChangeType::DescriptionChange,
        "enum_narrowing" => SemanticChangeType::EnumNarrowing,
        "nullability_change" => SemanticChangeType::NullabilityChange,
        "error_code_removed" => SemanticChangeType::ErrorCodeRemoved,
        "meaning_shift" => SemanticChangeType::MeaningShift,
        _ => {
            tracing::warn!(
                "Unknown semantic_change_type '{}', defaulting to meaning_shift",
                semantic_change_type_str
            );
            SemanticChangeType::MeaningShift
        }
    };

    let severity = match normalize_label(&severity_str).as_str() {
        "low" => IncidentSeverity::Low,
        "medium" => IncidentSeverity::Medium,
        "high" => IncidentSeverity::High,
        "critical" => IncidentSeverity::Critical,
        _ => {
            tracing::warn!("Unknown severity '{}', defaulting to medium", severity_str);
            IncidentSeverity::Medium
        }
    };

    let status = match normalize_label(&status_str).as_str() {
        "open" => IncidentStatus::Open,
        "acknowledged" => IncidentStatus::Acknowledged,
        "resolved" => IncidentStatus::Resolved,
        "closed" => IncidentStatus::Closed,
        _ => {
            tracing::warn!("Unknown status '{}', defaulting to open", status_str);
            IncidentStatus::Open
        }
    };

    Ok(SemanticIncident {
        id: id.to_string(),
        workspace_id: workspace_id.map(|u| u.to_string()),
        endpoint,
        method,
        semantic_change_type,
        severity,
        status,
        semantic_confidence,
        soft_breaking_score,
        llm_analysis,
        before_semantic_state,
        after_semantic_state,
        details: details_json,
        related_drift_incident_id: related_drift_incident_id.map(|u| u.to_string()),
        contract_diff_id,
        external_ticket_id,
        external_ticket_url,
        // Timestamps are exposed as Unix seconds.
        detected_at: detected_at.timestamp(),
        created_at: created_at.timestamp(),
        acknowledged_at: acknowledged_at.map(|dt| dt.timestamp()),
        resolved_at: resolved_at.map(|dt| dt.timestamp()),
        closed_at: closed_at.map(|dt| dt.timestamp()),
        updated_at: updated_at.timestamp(),
    })
}
110
111#[derive(Clone)]
113pub struct SemanticDriftState {
114 pub manager: Arc<SemanticIncidentManager>,
116 pub database: Option<Database>,
118}
119
120#[derive(Debug, Deserialize)]
122pub struct ListSemanticIncidentsQuery {
123 pub workspace_id: Option<String>,
125 pub endpoint: Option<String>,
127 pub method: Option<String>,
129 pub status: Option<String>,
131 pub limit: Option<usize>,
133}
134
135#[derive(Debug, Serialize)]
137pub struct SemanticIncidentListResponse {
138 pub incidents: Vec<SemanticIncident>,
140 pub total: usize,
142}
143
144pub async fn list_semantic_incidents(
148 State(state): State<SemanticDriftState>,
149 Query(params): Query<ListSemanticIncidentsQuery>,
150) -> Result<Json<SemanticIncidentListResponse>, StatusCode> {
151 #[cfg(feature = "database")]
153 if let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) {
154 let mut query = String::from(
155 "SELECT id, workspace_id, endpoint, method, semantic_change_type, severity, status,
156 semantic_confidence, soft_breaking_score, llm_analysis, before_semantic_state,
157 after_semantic_state, details, related_drift_incident_id, contract_diff_id,
158 external_ticket_id, external_ticket_url, detected_at, created_at, acknowledged_at,
159 resolved_at, closed_at, updated_at
160 FROM semantic_drift_incidents WHERE 1=1",
161 );
162
163 let mut bind_index = 1;
164
165 if let Some(ws_id) = ¶ms.workspace_id {
166 query.push_str(&format!(" AND workspace_id = ${}", bind_index));
167 bind_index += 1;
168 }
169
170 if let Some(ep) = ¶ms.endpoint {
171 query.push_str(&format!(" AND endpoint = ${}", bind_index));
172 bind_index += 1;
173 }
174
175 if let Some(m) = ¶ms.method {
176 query.push_str(&format!(" AND method = ${}", bind_index));
177 bind_index += 1;
178 }
179
180 if let Some(status_str) = ¶ms.status {
181 query.push_str(&format!(" AND status = ${}", bind_index));
182 bind_index += 1;
183 }
184
185 let limit = params.limit.unwrap_or(100);
186 query.push_str(&format!(" ORDER BY detected_at DESC LIMIT {}", limit));
187
188 let rows = sqlx::query(&query).fetch_all(pool).await.map_err(|e| {
190 tracing::error!("Failed to query semantic incidents: {}", e);
191 StatusCode::INTERNAL_SERVER_ERROR
192 })?;
193
194 let mut incidents = Vec::new();
196 for row in rows {
197 match map_row_to_semantic_incident(&row) {
198 Ok(incident) => incidents.push(incident),
199 Err(e) => {
200 tracing::warn!("Failed to map semantic incident row: {}", e);
201 continue;
202 }
203 }
204 }
205 if !incidents.is_empty() {
206 return Ok(Json(SemanticIncidentListResponse {
207 total: incidents.len(),
208 incidents,
209 }));
210 }
211 }
213
214 let status = params.status.as_deref().and_then(|s| match s {
216 "open" => Some(mockforge_core::incidents::types::IncidentStatus::Open),
217 "acknowledged" => Some(mockforge_core::incidents::types::IncidentStatus::Acknowledged),
218 "resolved" => Some(mockforge_core::incidents::types::IncidentStatus::Resolved),
219 "closed" => Some(mockforge_core::incidents::types::IncidentStatus::Closed),
220 _ => None,
221 });
222
223 let incidents = state
224 .manager
225 .list_incidents(
226 params.workspace_id.as_deref(),
227 params.endpoint.as_deref(),
228 params.method.as_deref(),
229 status,
230 params.limit,
231 )
232 .await;
233
234 Ok(Json(SemanticIncidentListResponse {
235 total: incidents.len(),
236 incidents,
237 }))
238}
239
240pub async fn get_semantic_incident(
244 State(state): State<SemanticDriftState>,
245 Path(id): Path<String>,
246) -> Result<Json<SemanticIncident>, StatusCode> {
247 #[cfg(feature = "database")]
249 if let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) {
250 let row = sqlx::query("SELECT * FROM semantic_drift_incidents WHERE id = $1")
251 .bind(&id)
252 .fetch_optional(pool)
253 .await
254 .map_err(|e| {
255 tracing::error!("Failed to query semantic incident: {}", e);
256 StatusCode::INTERNAL_SERVER_ERROR
257 })?;
258
259 if let Some(row) = row {
260 match map_row_to_semantic_incident(&row) {
261 Ok(incident) => return Ok(Json(incident)),
262 Err(e) => {
263 tracing::warn!("Failed to map semantic incident: {}", e);
264 }
266 }
267 }
268 }
269
270 match state.manager.get_incident(&id).await {
272 Some(incident) => Ok(Json(incident)),
273 None => Err(StatusCode::NOT_FOUND),
274 }
275}
276
277#[derive(Debug, Deserialize)]
279pub struct AnalyzeSemanticDriftRequest {
280 pub before_spec: String,
282 pub after_spec: String,
284 pub endpoint: String,
286 pub method: String,
288 pub workspace_id: Option<String>,
290}
291
292pub async fn analyze_semantic_drift(
296 State(state): State<SemanticDriftState>,
297 Json(request): Json<AnalyzeSemanticDriftRequest>,
298) -> Result<Json<serde_json::Value>, StatusCode> {
299 let before_spec = OpenApiSpec::from_string(&request.before_spec, None)
301 .map_err(|_| StatusCode::BAD_REQUEST)?;
302 let after_spec =
303 OpenApiSpec::from_string(&request.after_spec, None).map_err(|_| StatusCode::BAD_REQUEST)?;
304
305 let config = ContractDiffConfig::default();
307 let analyzer =
308 ContractDiffAnalyzer::new(config).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
309
310 let semantic_result = analyzer
312 .compare_specs(&before_spec, &after_spec, &request.endpoint, &request.method)
313 .await
314 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
315
316 if let Some(result) = semantic_result {
317 if result.semantic_confidence >= 0.65 {
319 let incident = state
320 .manager
321 .create_incident(
322 &result,
323 request.endpoint.clone(),
324 request.method.clone(),
325 request.workspace_id.clone(),
326 None, None, )
329 .await;
330
331 #[cfg(feature = "database")]
333 if let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) {
334 if let Err(e) = store_semantic_incident(pool, &incident).await {
335 tracing::warn!("Failed to store semantic incident in database: {}", e);
336 }
337 }
338
339 return Ok(Json(serde_json::json!({
340 "success": true,
341 "semantic_drift_detected": true,
342 "incident_id": incident.id,
343 "semantic_confidence": result.semantic_confidence,
344 "soft_breaking_score": result.soft_breaking_score,
345 "change_type": format!("{:?}", result.change_type),
346 })));
347 }
348 }
349
350 Ok(Json(serde_json::json!({
351 "success": true,
352 "semantic_drift_detected": false,
353 "message": "No significant semantic drift detected"
354 })))
355}
356
/// Inserts a semantic incident, or refreshes its lifecycle fields when the id already exists.
///
/// On conflict only `status`, `acknowledged_at`, `resolved_at`, `closed_at` and
/// `updated_at` are overwritten. Those lifecycle timestamps are part of the INSERT
/// column list so that `EXCLUDED.*` carries the incident's real values; if they were
/// omitted, every upsert would reset them to the column defaults.
///
/// Enum columns are written as lowercase `snake_case` labels (`"open"`,
/// `"description_change"`), the spelling that the row mapper and the `status` filter in
/// this file expect.
///
/// NOTE(review): an `incident.id` that is not a valid UUID is stored under a freshly
/// generated UUID, so such a row cannot later be looked up by the original id.
///
/// # Errors
///
/// Returns the [`sqlx::Error`] reported by the database when the statement fails.
#[cfg(feature = "database")]
async fn store_semantic_incident(
    pool: &sqlx::PgPool,
    incident: &SemanticIncident,
) -> Result<(), sqlx::Error> {
    /// Renders a value's `Debug` form as a lowercase `snake_case` label
    /// (`DescriptionChange` -> `description_change`); assumes unit-like enum variants.
    fn snake_case_label(value: &impl std::fmt::Debug) -> String {
        let debug = format!("{:?}", value);
        let mut label = String::with_capacity(debug.len() + 4);
        let mut previous: Option<char> = None;
        for ch in debug.chars() {
            if ch.is_ascii_uppercase() {
                let starts_new_word = matches!(
                    previous,
                    Some(p) if p.is_ascii_lowercase() || p.is_ascii_digit()
                );
                if starts_new_word {
                    label.push('_');
                }
                label.push(ch.to_ascii_lowercase());
            } else {
                label.push(ch);
            }
            previous = Some(ch);
        }
        label
    }

    /// Converts Unix seconds into a UTC timestamp; `None` when out of range.
    fn to_utc(secs: i64) -> Option<DateTime<Utc>> {
        DateTime::<Utc>::from_timestamp(secs, 0)
    }

    let id = Uuid::parse_str(&incident.id).unwrap_or_else(|_| Uuid::new_v4());
    let workspace_uuid = incident.workspace_id.as_ref().and_then(|id| Uuid::parse_str(id).ok());
    let related_uuid = incident
        .related_drift_incident_id
        .as_ref()
        .and_then(|id| Uuid::parse_str(id).ok());

    sqlx::query(
        r#"
        INSERT INTO semantic_drift_incidents (
            id, workspace_id, endpoint, method, semantic_change_type, severity, status,
            semantic_confidence, soft_breaking_score, llm_analysis, before_semantic_state,
            after_semantic_state, details, related_drift_incident_id, contract_diff_id,
            external_ticket_id, external_ticket_url, detected_at, created_at,
            acknowledged_at, resolved_at, closed_at, updated_at
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17,
            $18, $19, $20, $21, $22, $23
        )
        ON CONFLICT (id) DO UPDATE SET
            status = EXCLUDED.status,
            acknowledged_at = EXCLUDED.acknowledged_at,
            resolved_at = EXCLUDED.resolved_at,
            closed_at = EXCLUDED.closed_at,
            updated_at = EXCLUDED.updated_at
        "#,
    )
    .bind(id)
    .bind(workspace_uuid)
    .bind(&incident.endpoint)
    .bind(&incident.method)
    .bind(snake_case_label(&incident.semantic_change_type))
    .bind(snake_case_label(&incident.severity))
    .bind(snake_case_label(&incident.status))
    .bind(incident.semantic_confidence)
    .bind(incident.soft_breaking_score)
    .bind(&incident.llm_analysis)
    .bind(&incident.before_semantic_state)
    .bind(&incident.after_semantic_state)
    .bind(&incident.details)
    .bind(related_uuid)
    .bind(incident.contract_diff_id.as_deref())
    .bind(incident.external_ticket_id.as_deref())
    .bind(incident.external_ticket_url.as_deref())
    // Required timestamps fall back to "now" when the stored seconds are out of range.
    .bind(to_utc(incident.detected_at).unwrap_or_else(Utc::now))
    .bind(to_utc(incident.created_at).unwrap_or_else(Utc::now))
    // Lifecycle timestamps stay NULL until the incident reaches that state.
    .bind(incident.acknowledged_at.and_then(to_utc))
    .bind(incident.resolved_at.and_then(to_utc))
    .bind(incident.closed_at.and_then(to_utc))
    .bind(to_utc(incident.updated_at).unwrap_or_else(Utc::now))
    .execute(pool)
    .await?;

    Ok(())
}
413
414pub fn semantic_drift_router(state: SemanticDriftState) -> axum::Router {
416 use axum::routing::{get, post};
417 use axum::Router;
418
419 Router::new()
420 .route("/api/v1/semantic-drift/incidents", get(list_semantic_incidents))
421 .route("/api/v1/semantic-drift/incidents/{id}", get(get_semantic_incident))
422 .route("/api/v1/semantic-drift/analyze", post(analyze_semantic_drift))
423 .with_state(state)
424}