mockforge_http/handlers/
protocol_contracts.rs

1//! Protocol contract management handlers
2//!
3//! This module provides HTTP handlers for managing protocol contracts (gRPC, WebSocket, MQTT, Kafka).
4
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::Json,
9};
10use mockforge_core::contract_drift::protocol_contracts::{
11    compare_contracts, ProtocolContractRegistry,
12};
13use mockforge_core::contract_drift::{
14    GrpcContract, KafkaContract, KafkaTopicSchema, MqttContract, MqttTopicSchema, SchemaFormat,
15    TopicSchema, WebSocketContract, WebSocketMessageType,
16};
17use mockforge_core::protocol_abstraction::Protocol;
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::Arc;
21use tokio::sync::RwLock;
22
23// Base64 encoding/decoding
24use base64::{engine::general_purpose, Engine as _};
25
26/// State for protocol contract handlers
27#[derive(Clone)]
28pub struct ProtocolContractState {
29    /// Protocol contract registry
30    pub registry: Arc<RwLock<ProtocolContractRegistry>>,
31    /// Optional drift budget engine for evaluating contract changes
32    pub drift_engine: Option<Arc<mockforge_core::contract_drift::DriftBudgetEngine>>,
33    /// Optional incident manager for creating drift incidents
34    pub incident_manager: Option<Arc<mockforge_core::incidents::IncidentManager>>,
35    /// Optional fitness function registry for evaluating fitness rules
36    pub fitness_registry:
37        Option<Arc<RwLock<mockforge_core::contract_drift::FitnessFunctionRegistry>>>,
38    /// Optional consumer impact analyzer
39    pub consumer_analyzer:
40        Option<Arc<RwLock<mockforge_core::contract_drift::ConsumerImpactAnalyzer>>>,
41}
42
43/// Request to create a gRPC contract
44#[derive(Debug, Deserialize)]
45pub struct CreateGrpcContractRequest {
46    /// Contract ID
47    pub contract_id: String,
48    /// Contract version
49    pub version: String,
50    /// Protobuf descriptor set (base64 encoded)
51    pub descriptor_set: String,
52}
53
54/// Request to create a WebSocket contract
55#[derive(Debug, Deserialize)]
56pub struct CreateWebSocketContractRequest {
57    /// Contract ID
58    pub contract_id: String,
59    /// Contract version
60    pub version: String,
61    /// Message types
62    pub message_types: Vec<WebSocketMessageTypeRequest>,
63}
64
65/// Request for a WebSocket message type
66#[derive(Debug, Deserialize)]
67pub struct WebSocketMessageTypeRequest {
68    /// Message type identifier
69    pub message_type: String,
70    /// Optional topic or channel name
71    pub topic: Option<String>,
72    /// JSON schema for this message type
73    pub schema: serde_json::Value,
74    /// Direction: "inbound", "outbound", or "bidirectional"
75    pub direction: String,
76    /// Description of this message type
77    pub description: Option<String>,
78    /// Example message payload
79    pub example: Option<serde_json::Value>,
80}
81
82/// Request to create an MQTT contract
83#[derive(Debug, Deserialize)]
84pub struct CreateMqttContractRequest {
85    /// Contract ID
86    pub contract_id: String,
87    /// Contract version
88    pub version: String,
89    /// Topic schemas
90    pub topics: Vec<MqttTopicSchemaRequest>,
91}
92
93/// Request for an MQTT topic schema
94#[derive(Debug, Deserialize)]
95pub struct MqttTopicSchemaRequest {
96    /// Topic name
97    pub topic: String,
98    /// Quality of Service level (0, 1, or 2)
99    pub qos: Option<u8>,
100    /// JSON schema for messages on this topic
101    pub schema: serde_json::Value,
102    /// Whether messages are retained
103    pub retained: Option<bool>,
104    /// Description of this topic
105    pub description: Option<String>,
106    /// Example message payload
107    pub example: Option<serde_json::Value>,
108}
109
110/// Request to create a Kafka contract
111#[derive(Debug, Deserialize)]
112pub struct CreateKafkaContractRequest {
113    /// Contract ID
114    pub contract_id: String,
115    /// Contract version
116    pub version: String,
117    /// Topic schemas
118    pub topics: Vec<KafkaTopicSchemaRequest>,
119}
120
121/// Request for a Kafka topic schema
122#[derive(Debug, Deserialize)]
123pub struct KafkaTopicSchemaRequest {
124    /// Topic name
125    pub topic: String,
126    /// Key schema (optional)
127    pub key_schema: Option<TopicSchemaRequest>,
128    /// Value schema (required)
129    pub value_schema: TopicSchemaRequest,
130    /// Number of partitions
131    pub partitions: Option<u32>,
132    /// Replication factor
133    pub replication_factor: Option<u16>,
134    /// Description of this topic
135    pub description: Option<String>,
136    /// Evolution rules for schema changes
137    pub evolution_rules: Option<EvolutionRulesRequest>,
138}
139
140/// Request for a topic schema (key or value)
141#[derive(Debug, Deserialize)]
142pub struct TopicSchemaRequest {
143    /// Schema format: "json", "avro", or "protobuf"
144    pub format: String,
145    /// Schema definition
146    pub schema: serde_json::Value,
147    /// Schema registry ID (if using schema registry)
148    pub schema_id: Option<String>,
149    /// Schema version
150    pub version: Option<String>,
151}
152
153/// Request for evolution rules
154#[derive(Debug, Deserialize)]
155pub struct EvolutionRulesRequest {
156    /// Allow backward compatible changes
157    pub allow_backward_compatible: bool,
158    /// Allow forward compatible changes
159    pub allow_forward_compatible: bool,
160    /// Require explicit version bump for breaking changes
161    pub require_version_bump: bool,
162}
163
164/// Response for protocol contract operations
165#[derive(Debug, Serialize)]
166pub struct ProtocolContractResponse {
167    /// Contract ID
168    pub contract_id: String,
169    /// Contract version
170    pub version: String,
171    /// Protocol type
172    pub protocol: String,
173    /// Contract JSON representation
174    pub contract: serde_json::Value,
175}
176
177/// Response for listing contracts
178#[derive(Debug, Serialize)]
179pub struct ListContractsResponse {
180    /// List of contracts
181    pub contracts: Vec<ProtocolContractResponse>,
182    /// Total count
183    pub total: usize,
184}
185
186/// Request to compare contracts
187#[derive(Debug, Deserialize)]
188pub struct CompareContractsRequest {
189    /// Old contract ID
190    pub old_contract_id: String,
191    /// New contract ID
192    pub new_contract_id: String,
193}
194
195/// Request to validate a message
196#[derive(Debug, Deserialize)]
197pub struct ValidateMessageRequest {
198    /// Operation ID (endpoint, method, topic, etc.)
199    pub operation_id: String,
200    /// Message payload (base64 encoded or JSON)
201    pub payload: serde_json::Value,
202    /// Content type
203    pub content_type: Option<String>,
204    /// Additional metadata
205    pub metadata: Option<HashMap<String, String>>,
206}
207
208/// List all protocol contracts
209pub async fn list_contracts(
210    State(state): State<ProtocolContractState>,
211    Query(params): Query<HashMap<String, String>>,
212) -> Result<Json<ListContractsResponse>, (StatusCode, Json<serde_json::Value>)> {
213    let registry = state.registry.read().await;
214
215    let protocol_filter = params.get("protocol").and_then(|p| match p.as_str() {
216        "grpc" => Some(Protocol::Grpc),
217        "websocket" => Some(Protocol::WebSocket),
218        "mqtt" => Some(Protocol::Mqtt),
219        "kafka" => Some(Protocol::Kafka),
220        _ => None,
221    });
222
223    let contracts: Vec<ProtocolContractResponse> = if let Some(protocol) = protocol_filter {
224        registry
225            .list_by_protocol(protocol)
226            .iter()
227            .map(|contract| {
228                let contract_json = contract.to_json().unwrap_or_else(|_| serde_json::json!({}));
229                ProtocolContractResponse {
230                    contract_id: contract.contract_id().to_string(),
231                    version: contract.version().to_string(),
232                    protocol: format!("{:?}", contract.protocol()).to_lowercase(),
233                    contract: contract_json,
234                }
235            })
236            .collect()
237    } else {
238        registry
239            .list()
240            .iter()
241            .map(|contract| {
242                let contract_json = contract.to_json().unwrap_or_else(|_| serde_json::json!({}));
243                ProtocolContractResponse {
244                    contract_id: contract.contract_id().to_string(),
245                    version: contract.version().to_string(),
246                    protocol: format!("{:?}", contract.protocol()).to_lowercase(),
247                    contract: contract_json,
248                }
249            })
250            .collect()
251    };
252
253    Ok(Json(ListContractsResponse {
254        total: contracts.len(),
255        contracts,
256    }))
257}
258
259/// Get a specific contract
260pub async fn get_contract(
261    State(state): State<ProtocolContractState>,
262    Path(contract_id): Path<String>,
263) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
264    let registry = state.registry.read().await;
265
266    let contract = registry.get(&contract_id).ok_or_else(|| {
267        (
268            StatusCode::NOT_FOUND,
269            Json(serde_json::json!({
270                "error": "Contract not found",
271                "contract_id": contract_id
272            })),
273        )
274    })?;
275
276    let contract_json = contract.to_json().map_err(|e| {
277        (
278            StatusCode::INTERNAL_SERVER_ERROR,
279            Json(serde_json::json!({
280                "error": "Failed to serialize contract",
281                "message": e.to_string()
282            })),
283        )
284    })?;
285
286    Ok(Json(ProtocolContractResponse {
287        contract_id: contract.contract_id().to_string(),
288        version: contract.version().to_string(),
289        protocol: format!("{:?}", contract.protocol()).to_lowercase(),
290        contract: contract_json,
291    }))
292}
293
294/// Create a gRPC contract
295pub async fn create_grpc_contract(
296    State(state): State<ProtocolContractState>,
297    Json(request): Json<CreateGrpcContractRequest>,
298) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
299    // Decode base64 descriptor set
300    let descriptor_bytes =
301        general_purpose::STANDARD.decode(&request.descriptor_set).map_err(|e| {
302            (
303                StatusCode::BAD_REQUEST,
304                Json(serde_json::json!({
305                    "error": "Invalid base64 descriptor set",
306                    "message": e.to_string()
307                })),
308            )
309        })?;
310
311    // Create descriptor pool from bytes
312    // Note: GrpcContract::from_descriptor_set handles the descriptor pool creation
313    let contract = GrpcContract::from_descriptor_set(
314        request.contract_id.clone(),
315        request.version.clone(),
316        &descriptor_bytes,
317    )
318    .map_err(|e| {
319        (
320            StatusCode::BAD_REQUEST,
321            Json(serde_json::json!({
322                "error": "Failed to create gRPC contract",
323                "message": e.to_string()
324            })),
325        )
326    })?;
327
328    // Register contract
329    let mut registry = state.registry.write().await;
330    registry.register(Box::new(contract));
331
332    let contract = registry.get(&request.contract_id).ok_or_else(|| {
333        (
334            StatusCode::INTERNAL_SERVER_ERROR,
335            Json(serde_json::json!({
336                "error": "Failed to retrieve registered contract",
337                "contract_id": request.contract_id
338            })),
339        )
340    })?;
341    let contract_json = contract.to_json().map_err(|e| {
342        (
343            StatusCode::INTERNAL_SERVER_ERROR,
344            Json(serde_json::json!({
345                "error": "Failed to serialize contract",
346                "message": e.to_string()
347            })),
348        )
349    })?;
350
351    Ok(Json(ProtocolContractResponse {
352        contract_id: request.contract_id,
353        version: request.version,
354        protocol: "grpc".to_string(),
355        contract: contract_json,
356    }))
357}
358
359/// Create a WebSocket contract
360pub async fn create_websocket_contract(
361    State(state): State<ProtocolContractState>,
362    Json(request): Json<CreateWebSocketContractRequest>,
363) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
364    let mut contract = WebSocketContract::new(request.contract_id.clone(), request.version.clone());
365
366    // Add message types
367    for msg_type_req in request.message_types {
368        let direction = match msg_type_req.direction.as_str() {
369            "inbound" => mockforge_core::contract_drift::MessageDirection::Inbound,
370            "outbound" => mockforge_core::contract_drift::MessageDirection::Outbound,
371            "bidirectional" => mockforge_core::contract_drift::MessageDirection::Bidirectional,
372            _ => {
373                return Err((
374                    StatusCode::BAD_REQUEST,
375                    Json(serde_json::json!({
376                        "error": "Invalid direction",
377                        "message": "Direction must be 'inbound', 'outbound', or 'bidirectional'"
378                    })),
379                ));
380            }
381        };
382
383        let message_type = WebSocketMessageType {
384            message_type: msg_type_req.message_type,
385            topic: msg_type_req.topic,
386            schema: msg_type_req.schema,
387            direction,
388            description: msg_type_req.description,
389            example: msg_type_req.example,
390        };
391
392        contract.add_message_type(message_type).map_err(|e| {
393            (
394                StatusCode::BAD_REQUEST,
395                Json(serde_json::json!({
396                    "error": "Failed to add message type",
397                    "message": e.to_string()
398                })),
399            )
400        })?;
401    }
402
403    // Register contract
404    let mut registry = state.registry.write().await;
405    registry.register(Box::new(contract));
406
407    let contract = registry.get(&request.contract_id).ok_or_else(|| {
408        (
409            StatusCode::INTERNAL_SERVER_ERROR,
410            Json(serde_json::json!({
411                "error": "Failed to retrieve registered contract",
412                "contract_id": request.contract_id
413            })),
414        )
415    })?;
416    let contract_json = contract.to_json().map_err(|e| {
417        (
418            StatusCode::INTERNAL_SERVER_ERROR,
419            Json(serde_json::json!({
420                "error": "Failed to serialize contract",
421                "message": e.to_string()
422            })),
423        )
424    })?;
425
426    Ok(Json(ProtocolContractResponse {
427        contract_id: request.contract_id,
428        version: request.version,
429        protocol: "websocket".to_string(),
430        contract: contract_json,
431    }))
432}
433
434/// Create an MQTT contract
435pub async fn create_mqtt_contract(
436    State(state): State<ProtocolContractState>,
437    Json(request): Json<CreateMqttContractRequest>,
438) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
439    let mut contract = MqttContract::new(request.contract_id.clone(), request.version.clone());
440
441    // Add topics
442    for topic_req in request.topics {
443        let topic_schema = MqttTopicSchema {
444            topic: topic_req.topic,
445            qos: topic_req.qos,
446            schema: topic_req.schema,
447            retained: topic_req.retained,
448            description: topic_req.description,
449            example: topic_req.example,
450        };
451
452        contract.add_topic(topic_schema).map_err(|e| {
453            (
454                StatusCode::BAD_REQUEST,
455                Json(serde_json::json!({
456                    "error": "Failed to add topic",
457                    "message": e.to_string()
458                })),
459            )
460        })?;
461    }
462
463    // Register contract
464    let mut registry = state.registry.write().await;
465    registry.register(Box::new(contract));
466
467    let contract = registry.get(&request.contract_id).ok_or_else(|| {
468        (
469            StatusCode::INTERNAL_SERVER_ERROR,
470            Json(serde_json::json!({
471                "error": "Failed to retrieve registered contract",
472                "contract_id": request.contract_id
473            })),
474        )
475    })?;
476    let contract_json = contract.to_json().map_err(|e| {
477        (
478            StatusCode::INTERNAL_SERVER_ERROR,
479            Json(serde_json::json!({
480                "error": "Failed to serialize contract",
481                "message": e.to_string()
482            })),
483        )
484    })?;
485
486    Ok(Json(ProtocolContractResponse {
487        contract_id: request.contract_id,
488        version: request.version,
489        protocol: "mqtt".to_string(),
490        contract: contract_json,
491    }))
492}
493
494/// Create a Kafka contract
495pub async fn create_kafka_contract(
496    State(state): State<ProtocolContractState>,
497    Json(request): Json<CreateKafkaContractRequest>,
498) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
499    let mut contract = KafkaContract::new(request.contract_id.clone(), request.version.clone());
500
501    // Add topics
502    for topic_req in request.topics {
503        let format = match topic_req.value_schema.format.as_str() {
504            "json" => SchemaFormat::Json,
505            "avro" => SchemaFormat::Avro,
506            "protobuf" => SchemaFormat::Protobuf,
507            _ => {
508                return Err((
509                    StatusCode::BAD_REQUEST,
510                    Json(serde_json::json!({
511                        "error": "Invalid schema format",
512                        "message": "Format must be 'json', 'avro', or 'protobuf'"
513                    })),
514                ));
515            }
516        };
517
518        let value_schema = TopicSchema {
519            format,
520            schema: topic_req.value_schema.schema,
521            schema_id: topic_req.value_schema.schema_id,
522            version: topic_req.value_schema.version,
523        };
524
525        let key_schema = topic_req.key_schema.map(|ks_req| {
526            let format = match ks_req.format.as_str() {
527                "json" => SchemaFormat::Json,
528                "avro" => SchemaFormat::Avro,
529                "protobuf" => SchemaFormat::Protobuf,
530                _ => SchemaFormat::Json, // Default to JSON
531            };
532
533            TopicSchema {
534                format,
535                schema: ks_req.schema,
536                schema_id: ks_req.schema_id,
537                version: ks_req.version,
538            }
539        });
540
541        let evolution_rules = topic_req.evolution_rules.map(|er_req| {
542            mockforge_core::contract_drift::EvolutionRules {
543                allow_backward_compatible: er_req.allow_backward_compatible,
544                allow_forward_compatible: er_req.allow_forward_compatible,
545                require_version_bump: er_req.require_version_bump,
546            }
547        });
548
549        let topic_schema = KafkaTopicSchema {
550            topic: topic_req.topic,
551            key_schema,
552            value_schema,
553            partitions: topic_req.partitions,
554            replication_factor: topic_req.replication_factor,
555            description: topic_req.description,
556            evolution_rules,
557        };
558
559        contract.add_topic(topic_schema).map_err(|e| {
560            (
561                StatusCode::BAD_REQUEST,
562                Json(serde_json::json!({
563                    "error": "Failed to add topic",
564                    "message": e.to_string()
565                })),
566            )
567        })?;
568    }
569
570    // Register contract
571    let mut registry = state.registry.write().await;
572    registry.register(Box::new(contract));
573
574    let contract = registry.get(&request.contract_id).ok_or_else(|| {
575        (
576            StatusCode::INTERNAL_SERVER_ERROR,
577            Json(serde_json::json!({
578                "error": "Failed to retrieve registered contract",
579                "contract_id": request.contract_id
580            })),
581        )
582    })?;
583    let contract_json = contract.to_json().map_err(|e| {
584        (
585            StatusCode::INTERNAL_SERVER_ERROR,
586            Json(serde_json::json!({
587                "error": "Failed to serialize contract",
588                "message": e.to_string()
589            })),
590        )
591    })?;
592
593    Ok(Json(ProtocolContractResponse {
594        contract_id: request.contract_id,
595        version: request.version,
596        protocol: "kafka".to_string(),
597        contract: contract_json,
598    }))
599}
600
601/// Delete a contract
602pub async fn delete_contract(
603    State(state): State<ProtocolContractState>,
604    Path(contract_id): Path<String>,
605) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
606    let mut registry = state.registry.write().await;
607
608    registry.remove(&contract_id).ok_or_else(|| {
609        (
610            StatusCode::NOT_FOUND,
611            Json(serde_json::json!({
612                "error": "Contract not found",
613                "contract_id": contract_id
614            })),
615        )
616    })?;
617
618    Ok(Json(serde_json::json!({
619        "message": "Contract deleted",
620        "contract_id": contract_id
621    })))
622}
623
624/// Compare two contracts
625pub async fn compare_contracts_handler(
626    State(state): State<ProtocolContractState>,
627    Json(request): Json<CompareContractsRequest>,
628) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
629    let registry = state.registry.read().await;
630
631    let old_contract = registry.get(&request.old_contract_id).ok_or_else(|| {
632        (
633            StatusCode::NOT_FOUND,
634            Json(serde_json::json!({
635                "error": "Old contract not found",
636                "contract_id": request.old_contract_id
637            })),
638        )
639    })?;
640
641    let new_contract = registry.get(&request.new_contract_id).ok_or_else(|| {
642        (
643            StatusCode::NOT_FOUND,
644            Json(serde_json::json!({
645                "error": "New contract not found",
646                "contract_id": request.new_contract_id
647            })),
648        )
649    })?;
650
651    let diff_result = compare_contracts(old_contract, new_contract).await.map_err(|e| {
652        (
653            StatusCode::BAD_REQUEST,
654            Json(serde_json::json!({
655                "error": "Failed to compare contracts",
656                "message": e.to_string()
657            })),
658        )
659    })?;
660
661    // Evaluate drift and create incidents if drift engine is available
662    let mut drift_evaluation = None;
663    if let (Some(ref drift_engine), Some(ref incident_manager)) =
664        (&state.drift_engine, &state.incident_manager)
665    {
666        // Get protocol type
667        let protocol = new_contract.protocol();
668
669        // For each operation in the contract, evaluate drift
670        let operations = new_contract.operations();
671        for operation in operations {
672            let operation_id = &operation.id;
673
674            // Determine endpoint and method from operation type
675            let (endpoint, method) = match &operation.operation_type {
676                mockforge_core::contract_drift::protocol_contracts::OperationType::HttpEndpoint { path, method } => {
677                    (path.clone(), method.clone())
678                }
679                mockforge_core::contract_drift::protocol_contracts::OperationType::GrpcMethod { service, method } => {
680                    (format!("{}.{}", service, method), "grpc".to_string())
681                }
682                mockforge_core::contract_drift::protocol_contracts::OperationType::WebSocketMessage { message_type, .. } => {
683                    (message_type.clone(), "websocket".to_string())
684                }
685                mockforge_core::contract_drift::protocol_contracts::OperationType::MqttTopic { topic, qos: _ } => {
686                    (topic.clone(), "mqtt".to_string())
687                }
688                mockforge_core::contract_drift::protocol_contracts::OperationType::KafkaTopic { topic, key_schema: _, value_schema: _ } => {
689                    (topic.clone(), "kafka".to_string())
690                }
691            };
692
693            // Evaluate drift budget (for protocol contracts, we need to use evaluate_with_specs equivalent)
694            // Since we don't have OpenAPI specs for protocol contracts, we'll use a simplified evaluation
695            // that works with the diff result directly
696            let drift_result = drift_engine.evaluate(&diff_result, &endpoint, &method);
697
698            // Run fitness tests if registry is available
699            let mut drift_result_with_fitness = drift_result.clone();
700            if let Some(ref fitness_registry) = state.fitness_registry {
701                let rt = tokio::runtime::Handle::try_current();
702                if let Ok(handle) = rt {
703                    let fitness_results = handle.block_on(async {
704                        let guard = fitness_registry.read().await;
705                        guard.evaluate_all_protocol(
706                            Some(old_contract),
707                            new_contract,
708                            &diff_result,
709                            operation_id,
710                            None, // workspace_id
711                            None, // service_name
712                        )
713                    });
714                    if let Ok(results) = fitness_results {
715                        drift_result_with_fitness.fitness_test_results = results;
716                        if drift_result_with_fitness.fitness_test_results.iter().any(|r| !r.passed)
717                        {
718                            drift_result_with_fitness.should_create_incident = true;
719                        }
720                    }
721                }
722            }
723
724            // Analyze consumer impact if analyzer is available
725            // Use operation_id for more flexible protocol-specific matching
726            if let Some(ref consumer_analyzer) = state.consumer_analyzer {
727                let rt = tokio::runtime::Handle::try_current();
728                if let Ok(handle) = rt {
729                    let impact = handle.block_on(async {
730                        let guard = consumer_analyzer.read().await;
731                        // Use analyze_impact_with_operation_id for better protocol support
732                        guard.analyze_impact_with_operation_id(
733                            &endpoint,
734                            &method,
735                            Some(operation_id),
736                        )
737                    });
738                    if let Some(impact) = impact {
739                        drift_result_with_fitness.consumer_impact = Some(impact);
740                    }
741                }
742            }
743
744            // Create incident if budget is exceeded or breaking changes detected
745            if drift_result_with_fitness.should_create_incident {
746                let incident_type = if drift_result_with_fitness.breaking_changes > 0 {
747                    mockforge_core::incidents::types::IncidentType::BreakingChange
748                } else {
749                    mockforge_core::incidents::types::IncidentType::ThresholdExceeded
750                };
751
752                let severity = if drift_result_with_fitness.breaking_changes > 0 {
753                    mockforge_core::incidents::types::IncidentSeverity::High
754                } else if drift_result_with_fitness.potentially_breaking_changes > 0 {
755                    mockforge_core::incidents::types::IncidentSeverity::Medium
756                } else {
757                    mockforge_core::incidents::types::IncidentSeverity::Low
758                };
759
760                let details = serde_json::json!({
761                    "breaking_changes": drift_result_with_fitness.breaking_changes,
762                    "potentially_breaking_changes": drift_result_with_fitness.potentially_breaking_changes,
763                    "non_breaking_changes": drift_result_with_fitness.non_breaking_changes,
764                    "budget_exceeded": drift_result_with_fitness.budget_exceeded,
765                    "operation_id": operation_id,
766                    "operation_type": format!("{:?}", operation.operation_type),
767                });
768
769                let before_sample = Some(serde_json::json!({
770                    "contract_id": old_contract.contract_id(),
771                    "version": old_contract.version(),
772                    "protocol": format!("{:?}", old_contract.protocol()),
773                    "operation_id": operation_id,
774                }));
775
776                let after_sample = Some(serde_json::json!({
777                    "contract_id": new_contract.contract_id(),
778                    "version": new_contract.version(),
779                    "protocol": format!("{:?}", new_contract.protocol()),
780                    "operation_id": operation_id,
781                    "mismatches": diff_result.mismatches,
782                }));
783
784                let _incident = incident_manager
785                    .create_incident_with_samples(
786                        endpoint.clone(),
787                        method.clone(),
788                        incident_type,
789                        severity,
790                        details,
791                        None, // budget_id
792                        None, // workspace_id
793                        None, // sync_cycle_id
794                        None, // contract_diff_id
795                        before_sample,
796                        after_sample,
797                        Some(drift_result_with_fitness.fitness_test_results.clone()),
798                        drift_result_with_fitness.consumer_impact.clone(),
799                        Some(protocol),
800                    )
801                    .await;
802            }
803
804            drift_evaluation = Some(serde_json::json!({
805                "operation_id": operation_id,
806                "endpoint": endpoint,
807                "method": method,
808                "budget_exceeded": drift_result_with_fitness.budget_exceeded,
809                "breaking_changes": drift_result_with_fitness.breaking_changes,
810                "fitness_test_results": drift_result_with_fitness.fitness_test_results,
811                "consumer_impact": drift_result_with_fitness.consumer_impact,
812            }));
813        }
814    }
815
816    Ok(Json(serde_json::json!({
817        "matches": diff_result.matches,
818        "confidence": diff_result.confidence,
819        "mismatches": diff_result.mismatches,
820        "recommendations": diff_result.recommendations,
821        "corrections": diff_result.corrections,
822        "drift_evaluation": drift_evaluation,
823    })))
824}
825
826/// Validate a message against a contract
827pub async fn validate_message(
828    State(state): State<ProtocolContractState>,
829    Path(contract_id): Path<String>,
830    Json(request): Json<ValidateMessageRequest>,
831) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
832    let registry = state.registry.read().await;
833
834    let contract = registry.get(&contract_id).ok_or_else(|| {
835        (
836            StatusCode::NOT_FOUND,
837            Json(serde_json::json!({
838                "error": "Contract not found",
839                "contract_id": contract_id
840            })),
841        )
842    })?;
843
844    // Convert payload to bytes
845    let payload_bytes = match request.payload {
846        serde_json::Value::String(s) => {
847            // Try base64 decode first, then fall back to UTF-8
848            general_purpose::STANDARD.decode(&s).unwrap_or_else(|_| s.into_bytes())
849        }
850        _ => serde_json::to_vec(&request.payload).map_err(|e| {
851            (
852                StatusCode::BAD_REQUEST,
853                Json(serde_json::json!({
854                    "error": "Failed to serialize payload",
855                    "message": e.to_string()
856                })),
857            )
858        })?,
859    };
860
861    let contract_request = mockforge_core::contract_drift::protocol_contracts::ContractRequest {
862        protocol: contract.protocol(),
863        operation_id: request.operation_id.clone(),
864        payload: payload_bytes,
865        content_type: request.content_type,
866        metadata: request.metadata.unwrap_or_default(),
867    };
868
869    let validation_result =
870        contract.validate(&request.operation_id, &contract_request).await.map_err(|e| {
871            (
872                StatusCode::BAD_REQUEST,
873                Json(serde_json::json!({
874                    "error": "Validation failed",
875                    "message": e.to_string()
876                })),
877            )
878        })?;
879
880    Ok(Json(serde_json::json!({
881        "valid": validation_result.valid,
882        "errors": validation_result.errors,
883        "warnings": validation_result.warnings,
884    })))
885}
886
887/// Get contract router
888pub fn protocol_contracts_router(state: ProtocolContractState) -> axum::Router {
889    use axum::routing::{delete, get, post};
890
891    axum::Router::new()
892        .route("/", get(list_contracts))
893        .route("/{contract_id}", get(get_contract))
894        .route("/{contract_id}", delete(delete_contract))
895        .route("/grpc", post(create_grpc_contract))
896        .route("/websocket", post(create_websocket_contract))
897        .route("/mqtt", post(create_mqtt_contract))
898        .route("/kafka", post(create_kafka_contract))
899        .route("/compare", post(compare_contracts_handler))
900        .route("/{contract_id}/validate", post(validate_message))
901        .with_state(state)
902}