Skip to main content

mockforge_intelligence/handlers/
semantic_drift.rs

1//! HTTP handlers for semantic drift incidents
2//!
3//! This module provides endpoints for managing semantic drift incidents.
4
5// ContractDiffAnalyzer stays in core (LLM-bound).
6#![allow(deprecated)]
7
8use crate::ai_contract_diff::{ContractDiffAnalyzer, ContractDiffConfig};
9use crate::incidents::semantic_manager::{SemanticIncident, SemanticIncidentManager};
10use axum::{
11    extract::{Path, Query, State},
12    http::StatusCode,
13    response::Json,
14};
15use mockforge_openapi::OpenApiSpec;
16use serde::{Deserialize, Serialize};
17use std::sync::Arc;
18
19#[cfg(feature = "database")]
20use chrono::{DateTime, Utc};
21#[cfg(feature = "database")]
22use uuid::Uuid;
23
24#[cfg(feature = "database")]
25use crate::database::Database;
26
/// Converts one `semantic_drift_incidents` row into a [`SemanticIncident`].
///
/// Required columns (`id`, `endpoint`, `method`, the three enum columns, the two
/// scores and `detected_at` / `created_at` / `updated_at`) propagate their decode
/// error to the caller. Optional columns are read best-effort: a NULL, missing or
/// undecodable value becomes `None` (JSON `null` for the JSON columns).
///
/// Enum columns are matched case-insensitively and ignoring underscores, so both
/// the snake_case spelling (`"description_change"`, `"open"`) and the `Debug`-style
/// spelling (`"DescriptionChange"`, `"Open"`) that rows may have been written with
/// are recognised. Unrecognised values are logged and mapped to the fallbacks
/// `MeaningShift`, `Medium` and `Open`.
///
/// # Errors
///
/// Returns the underlying [`sqlx::Error`] when a required column cannot be decoded.
#[cfg(feature = "database")]
fn map_row_to_semantic_incident(
    row: &sqlx::postgres::PgRow,
) -> Result<SemanticIncident, sqlx::Error> {
    use mockforge_foundation::contract_diff_types::SemanticChangeType;
    use mockforge_foundation::incidents_types::{IncidentSeverity, IncidentStatus};
    use sqlx::Row;

    /// Comparison key for an enum column: ASCII-lowercased with underscores removed,
    /// so `"DescriptionChange"` and `"description_change"` compare equal.
    fn enum_key(raw: &str) -> String {
        raw.chars().filter(|c| *c != '_').map(|c| c.to_ascii_lowercase()).collect()
    }

    let id: Uuid = row.try_get("id")?;
    // Nullable columns are decoded as `Option<T>` so NULL is an ordinary value;
    // any remaining decode failure is still tolerated and treated as "absent".
    let workspace_id = row.try_get::<Option<Uuid>, _>("workspace_id").ok().flatten();
    let endpoint: String = row.try_get("endpoint")?;
    let method: String = row.try_get("method")?;
    let semantic_change_type_str: String = row.try_get("semantic_change_type")?;
    let severity_str: String = row.try_get("severity")?;
    let status_str: String = row.try_get("status")?;
    let semantic_confidence: f64 = row.try_get("semantic_confidence")?;
    let soft_breaking_score: f64 = row.try_get("soft_breaking_score")?;
    let llm_analysis: serde_json::Value = row.try_get("llm_analysis").unwrap_or_default();
    let before_semantic_state: serde_json::Value =
        row.try_get("before_semantic_state").unwrap_or_default();
    let after_semantic_state: serde_json::Value =
        row.try_get("after_semantic_state").unwrap_or_default();
    let details_json: serde_json::Value = row.try_get("details").unwrap_or_default();
    let related_drift_incident_id =
        row.try_get::<Option<Uuid>, _>("related_drift_incident_id").ok().flatten();
    let contract_diff_id = row.try_get::<Option<String>, _>("contract_diff_id").ok().flatten();
    let external_ticket_id =
        row.try_get::<Option<String>, _>("external_ticket_id").ok().flatten();
    let external_ticket_url =
        row.try_get::<Option<String>, _>("external_ticket_url").ok().flatten();
    let detected_at: DateTime<Utc> = row.try_get("detected_at")?;
    let created_at: DateTime<Utc> = row.try_get("created_at")?;
    let acknowledged_at =
        row.try_get::<Option<DateTime<Utc>>, _>("acknowledged_at").ok().flatten();
    let resolved_at = row.try_get::<Option<DateTime<Utc>>, _>("resolved_at").ok().flatten();
    let closed_at = row.try_get::<Option<DateTime<Utc>>, _>("closed_at").ok().flatten();
    let updated_at: DateTime<Utc> = row.try_get("updated_at")?;

    // Parse semantic change type ("nullablechange" is the Debug spelling of the
    // variant that the snake_case vocabulary calls "nullability_change").
    let semantic_change_type = match enum_key(&semantic_change_type_str).as_str() {
        "descriptionchange" => SemanticChangeType::DescriptionChange,
        "enumnarrowing" => SemanticChangeType::EnumNarrowing,
        "nullabilitychange" | "nullablechange" => SemanticChangeType::NullableChange,
        "errorcoderemoved" => SemanticChangeType::ErrorCodeRemoved,
        "meaningshift" => SemanticChangeType::MeaningShift,
        _ => {
            tracing::warn!(
                "Unknown semantic_change_type '{}' for incident {}; defaulting to MeaningShift",
                semantic_change_type_str,
                id
            );
            SemanticChangeType::MeaningShift
        }
    };

    // Parse severity
    let severity = match enum_key(&severity_str).as_str() {
        "low" => IncidentSeverity::Low,
        "medium" => IncidentSeverity::Medium,
        "high" => IncidentSeverity::High,
        "critical" => IncidentSeverity::Critical,
        _ => {
            tracing::warn!(
                "Unknown severity '{}' for incident {}; defaulting to Medium",
                severity_str,
                id
            );
            IncidentSeverity::Medium
        }
    };

    // Parse status
    let status = match enum_key(&status_str).as_str() {
        "open" => IncidentStatus::Open,
        "acknowledged" => IncidentStatus::Acknowledged,
        "resolved" => IncidentStatus::Resolved,
        "closed" => IncidentStatus::Closed,
        _ => {
            tracing::warn!(
                "Unknown status '{}' for incident {}; defaulting to Open",
                status_str,
                id
            );
            IncidentStatus::Open
        }
    };

    Ok(SemanticIncident {
        id: id.to_string(),
        workspace_id: workspace_id.map(|u| u.to_string()),
        endpoint,
        method,
        semantic_change_type,
        severity,
        status,
        semantic_confidence,
        soft_breaking_score,
        llm_analysis,
        before_semantic_state,
        after_semantic_state,
        details: details_json,
        related_drift_incident_id: related_drift_incident_id.map(|u| u.to_string()),
        contract_diff_id,
        external_ticket_id,
        external_ticket_url,
        // The in-memory model stores whole seconds since the Unix epoch.
        detected_at: detected_at.timestamp(),
        created_at: created_at.timestamp(),
        acknowledged_at: acknowledged_at.map(|dt| dt.timestamp()),
        resolved_at: resolved_at.map(|dt| dt.timestamp()),
        closed_at: closed_at.map(|dt| dt.timestamp()),
        updated_at: updated_at.timestamp(),
    })
}
116
117/// State for semantic drift handlers
118#[derive(Clone)]
119pub struct SemanticDriftState {
120    /// Semantic incident manager
121    pub manager: Arc<SemanticIncidentManager>,
122    /// Database connection (optional)
123    #[cfg(feature = "database")]
124    pub database: Option<Database>,
125}
126
127/// Query parameters for listing semantic incidents
128#[derive(Debug, Deserialize)]
129pub struct ListSemanticIncidentsQuery {
130    /// Workspace ID filter
131    pub workspace_id: Option<String>,
132    /// Endpoint filter
133    pub endpoint: Option<String>,
134    /// Method filter
135    pub method: Option<String>,
136    /// Status filter
137    pub status: Option<String>,
138    /// Limit results
139    pub limit: Option<usize>,
140}
141
142/// Response for semantic incident list
143#[derive(Debug, Serialize)]
144pub struct SemanticIncidentListResponse {
145    /// Incidents
146    pub incidents: Vec<SemanticIncident>,
147    /// Total count
148    pub total: usize,
149}
150
151/// List semantic drift incidents
152///
153/// GET /api/v1/semantic-drift/incidents
154pub async fn list_semantic_incidents(
155    State(state): State<SemanticDriftState>,
156    Query(params): Query<ListSemanticIncidentsQuery>,
157) -> Result<Json<SemanticIncidentListResponse>, StatusCode> {
158    // Try database first, fallback to in-memory
159    #[cfg(feature = "database")]
160    if let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) {
161        let mut query = String::from(
162            "SELECT id, workspace_id, endpoint, method, semantic_change_type, severity, status,
163             semantic_confidence, soft_breaking_score, llm_analysis, before_semantic_state,
164             after_semantic_state, details, related_drift_incident_id, contract_diff_id,
165             external_ticket_id, external_ticket_url, detected_at, created_at, acknowledged_at,
166             resolved_at, closed_at, updated_at
167             FROM semantic_drift_incidents WHERE 1=1",
168        );
169
170        let mut bind_index = 1;
171
172        if params.workspace_id.is_some() {
173            query.push_str(&format!(" AND workspace_id = ${}", bind_index));
174            bind_index += 1;
175        }
176
177        if params.endpoint.is_some() {
178            query.push_str(&format!(" AND endpoint = ${}", bind_index));
179            bind_index += 1;
180        }
181
182        if params.method.is_some() {
183            query.push_str(&format!(" AND method = ${}", bind_index));
184            bind_index += 1;
185        }
186
187        if params.status.is_some() {
188            query.push_str(&format!(" AND status = ${}", bind_index));
189            bind_index += 1;
190        }
191
192        let limit = params.limit.unwrap_or(100);
193        query.push_str(&format!(" ORDER BY detected_at DESC LIMIT {}", limit));
194
195        // Execute query - use fetch_all for SELECT queries.
196        // Binds match the order in which filters were appended above.
197        let _ = bind_index; // silence unused-assignment after the last filter
198        let mut q = sqlx::query(&query);
199        if let Some(ws_id) = &params.workspace_id {
200            q = q.bind(ws_id);
201        }
202        if let Some(ep) = &params.endpoint {
203            q = q.bind(ep);
204        }
205        if let Some(m) = &params.method {
206            q = q.bind(m);
207        }
208        if let Some(s) = &params.status {
209            q = q.bind(s);
210        }
211        let rows = q.fetch_all(pool).await.map_err(|e| {
212            tracing::error!("Failed to query semantic incidents: {}", e);
213            StatusCode::INTERNAL_SERVER_ERROR
214        })?;
215
216        // Map rows to SemanticIncident
217        let mut incidents = Vec::new();
218        for row in rows {
219            match map_row_to_semantic_incident(&row) {
220                Ok(incident) => incidents.push(incident),
221                Err(e) => {
222                    tracing::warn!("Failed to map semantic incident row: {}", e);
223                    continue;
224                }
225            }
226        }
227        if !incidents.is_empty() {
228            return Ok(Json(SemanticIncidentListResponse {
229                total: incidents.len(),
230                incidents,
231            }));
232        }
233        // Fall through to in-memory manager if no database results
234    }
235
236    // Fallback to in-memory manager
237    let status = params.status.as_deref().and_then(|s| match s {
238        "open" => Some(mockforge_foundation::incidents_types::IncidentStatus::Open),
239        "acknowledged" => Some(mockforge_foundation::incidents_types::IncidentStatus::Acknowledged),
240        "resolved" => Some(mockforge_foundation::incidents_types::IncidentStatus::Resolved),
241        "closed" => Some(mockforge_foundation::incidents_types::IncidentStatus::Closed),
242        _ => None,
243    });
244
245    let incidents = state
246        .manager
247        .list_incidents(
248            params.workspace_id.as_deref(),
249            params.endpoint.as_deref(),
250            params.method.as_deref(),
251            status,
252            params.limit,
253        )
254        .await;
255
256    Ok(Json(SemanticIncidentListResponse {
257        total: incidents.len(),
258        incidents,
259    }))
260}
261
262/// Get a specific semantic incident
263///
264/// GET /api/v1/semantic-drift/incidents/{id}
265pub async fn get_semantic_incident(
266    State(state): State<SemanticDriftState>,
267    Path(id): Path<String>,
268) -> Result<Json<SemanticIncident>, StatusCode> {
269    // Try database first
270    #[cfg(feature = "database")]
271    if let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) {
272        let row = sqlx::query("SELECT * FROM semantic_drift_incidents WHERE id = $1")
273            .bind(&id)
274            .fetch_optional(pool)
275            .await
276            .map_err(|e| {
277                tracing::error!("Failed to query semantic incident: {}", e);
278                StatusCode::INTERNAL_SERVER_ERROR
279            })?;
280
281        if let Some(row) = row {
282            match map_row_to_semantic_incident(&row) {
283                Ok(incident) => return Ok(Json(incident)),
284                Err(e) => {
285                    tracing::warn!("Failed to map semantic incident: {}", e);
286                    // Fall through to in-memory
287                }
288            }
289        }
290    }
291
292    // Fallback to in-memory manager
293    match state.manager.get_incident(&id).await {
294        Some(incident) => Ok(Json(incident)),
295        None => Err(StatusCode::NOT_FOUND),
296    }
297}
298
299/// Request to analyze semantic drift
300#[derive(Debug, Deserialize)]
301pub struct AnalyzeSemanticDriftRequest {
302    /// Before spec (OpenAPI YAML/JSON)
303    pub before_spec: String,
304    /// After spec (OpenAPI YAML/JSON)
305    pub after_spec: String,
306    /// Endpoint path
307    pub endpoint: String,
308    /// HTTP method
309    pub method: String,
310    /// Workspace ID (optional)
311    pub workspace_id: Option<String>,
312}
313
314/// Analyze semantic drift between two specs
315///
316/// POST /api/v1/semantic-drift/analyze
317pub async fn analyze_semantic_drift(
318    State(state): State<SemanticDriftState>,
319    Json(request): Json<AnalyzeSemanticDriftRequest>,
320) -> Result<Json<serde_json::Value>, StatusCode> {
321    // Parse specs
322    let before_spec = OpenApiSpec::from_string(&request.before_spec, None)
323        .map_err(|_| StatusCode::BAD_REQUEST)?;
324    let after_spec =
325        OpenApiSpec::from_string(&request.after_spec, None).map_err(|_| StatusCode::BAD_REQUEST)?;
326
327    // Create analyzer
328    let config = ContractDiffConfig::default();
329    let analyzer =
330        ContractDiffAnalyzer::new(config).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
331
332    // Run semantic analysis
333    let semantic_result = analyzer
334        .compare_specs(&before_spec, &after_spec, &request.endpoint, &request.method)
335        .await
336        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
337
338    if let Some(result) = semantic_result {
339        // Create semantic incident if threshold met
340        if result.semantic_confidence >= 0.65 {
341            let incident = state
342                .manager
343                .create_incident(
344                    &result,
345                    request.endpoint.clone(),
346                    request.method.clone(),
347                    request.workspace_id.clone(),
348                    None, // related_drift_incident_id
349                    None, // contract_diff_id
350                )
351                .await;
352
353            // Store in database if available
354            #[cfg(feature = "database")]
355            if let Some(pool) = state.database.as_ref().and_then(|db| db.pool()) {
356                if let Err(e) = store_semantic_incident(pool, &incident).await {
357                    tracing::warn!("Failed to store semantic incident in database: {}", e);
358                }
359            }
360
361            return Ok(Json(serde_json::json!({
362                "success": true,
363                "semantic_drift_detected": true,
364                "incident_id": incident.id,
365                "semantic_confidence": result.semantic_confidence,
366                "soft_breaking_score": result.soft_breaking_score,
367                "change_type": format!("{:?}", result.change_type),
368            })));
369        }
370    }
371
372    Ok(Json(serde_json::json!({
373        "success": true,
374        "semantic_drift_detected": false,
375        "message": "No significant semantic drift detected"
376    })))
377}
378
/// Inserts a semantic incident into `semantic_drift_incidents`, or updates its
/// lifecycle columns if a row with the same id already exists.
///
/// Enum columns are written in the snake_case vocabulary that
/// `map_row_to_semantic_incident` reads back (`"description_change"`, `"medium"`,
/// `"open"`, ...). The lifecycle timestamps `acknowledged_at`, `resolved_at` and
/// `closed_at` are part of the insert so that the `ON CONFLICT` update, which
/// copies them from `EXCLUDED`, persists the incident's values instead of
/// resetting them to NULL.
///
/// Workspace and related-incident ids that are not valid UUIDs are stored as NULL.
/// An incident id that is not a valid UUID is replaced by a freshly generated one
/// (and a warning is logged), so such an incident can never hit the conflict path.
///
/// # Errors
///
/// Returns the [`sqlx::Error`] produced by executing the statement.
#[cfg(feature = "database")]
async fn store_semantic_incident(
    pool: &sqlx::PgPool,
    incident: &SemanticIncident,
) -> Result<(), sqlx::Error> {
    use mockforge_foundation::contract_diff_types::SemanticChangeType;
    use mockforge_foundation::incidents_types::{IncidentSeverity, IncidentStatus};

    let id = Uuid::parse_str(&incident.id).unwrap_or_else(|_| {
        tracing::warn!(
            "Semantic incident id '{}' is not a UUID; storing it under a generated id",
            incident.id
        );
        Uuid::new_v4()
    });
    let workspace_uuid = incident.workspace_id.as_deref().and_then(|id| Uuid::parse_str(id).ok());
    let related_uuid = incident
        .related_drift_incident_id
        .as_deref()
        .and_then(|id| Uuid::parse_str(id).ok());

    // The wildcard arms only matter if the enums have variants this module does
    // not know about; those are stored as their lowercased Debug name.
    let change_type_value = match &incident.semantic_change_type {
        SemanticChangeType::DescriptionChange => "description_change".to_string(),
        SemanticChangeType::EnumNarrowing => "enum_narrowing".to_string(),
        SemanticChangeType::NullableChange => "nullability_change".to_string(),
        SemanticChangeType::ErrorCodeRemoved => "error_code_removed".to_string(),
        SemanticChangeType::MeaningShift => "meaning_shift".to_string(),
        #[allow(unreachable_patterns)]
        other => format!("{:?}", other).to_lowercase(),
    };
    let severity_value = match &incident.severity {
        IncidentSeverity::Low => "low".to_string(),
        IncidentSeverity::Medium => "medium".to_string(),
        IncidentSeverity::High => "high".to_string(),
        IncidentSeverity::Critical => "critical".to_string(),
        #[allow(unreachable_patterns)]
        other => format!("{:?}", other).to_lowercase(),
    };
    let status_value = match &incident.status {
        IncidentStatus::Open => "open".to_string(),
        IncidentStatus::Acknowledged => "acknowledged".to_string(),
        IncidentStatus::Resolved => "resolved".to_string(),
        IncidentStatus::Closed => "closed".to_string(),
        #[allow(unreachable_patterns)]
        other => format!("{:?}", other).to_lowercase(),
    };

    // Incident timestamps are whole seconds since the Unix epoch.
    let to_utc = |secs: i64| DateTime::<Utc>::from_timestamp(secs, 0);

    sqlx::query(
        r#"
        INSERT INTO semantic_drift_incidents (
            id, workspace_id, endpoint, method, semantic_change_type, severity, status,
            semantic_confidence, soft_breaking_score, llm_analysis, before_semantic_state,
            after_semantic_state, details, related_drift_incident_id, contract_diff_id,
            external_ticket_id, external_ticket_url, detected_at, created_at, updated_at,
            acknowledged_at, resolved_at, closed_at
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20,
            $21, $22, $23
        )
        ON CONFLICT (id) DO UPDATE SET
            status = EXCLUDED.status,
            acknowledged_at = EXCLUDED.acknowledged_at,
            resolved_at = EXCLUDED.resolved_at,
            closed_at = EXCLUDED.closed_at,
            updated_at = EXCLUDED.updated_at
        "#,
    )
    .bind(id)
    .bind(workspace_uuid)
    .bind(&incident.endpoint)
    .bind(&incident.method)
    .bind(change_type_value)
    .bind(severity_value)
    .bind(status_value)
    .bind(incident.semantic_confidence)
    .bind(incident.soft_breaking_score)
    .bind(&incident.llm_analysis)
    .bind(&incident.before_semantic_state)
    .bind(&incident.after_semantic_state)
    .bind(&incident.details)
    .bind(related_uuid)
    .bind(incident.contract_diff_id.as_deref())
    .bind(incident.external_ticket_id.as_deref())
    .bind(incident.external_ticket_url.as_deref())
    // Required timestamps fall back to "now" when the stored seconds are out of range.
    .bind(to_utc(incident.detected_at).unwrap_or_else(Utc::now))
    .bind(to_utc(incident.created_at).unwrap_or_else(Utc::now))
    .bind(to_utc(incident.updated_at).unwrap_or_else(Utc::now))
    // Optional lifecycle timestamps stay NULL when unset or out of range.
    .bind(incident.acknowledged_at.and_then(to_utc))
    .bind(incident.resolved_at.and_then(to_utc))
    .bind(incident.closed_at.and_then(to_utc))
    .execute(pool)
    .await?;

    Ok(())
}
435
436/// Create router for semantic drift endpoints
437pub fn semantic_drift_router(state: SemanticDriftState) -> axum::Router {
438    use axum::routing::{get, post};
439    use axum::Router;
440
441    Router::new()
442        .route("/api/v1/semantic-drift/incidents", get(list_semantic_incidents))
443        .route("/api/v1/semantic-drift/incidents/{id}", get(get_semantic_incident))
444        .route("/api/v1/semantic-drift/analyze", post(analyze_semantic_drift))
445        .with_state(state)
446}