Skip to main content

mockforge_http/handlers/
protocol_contracts.rs

1//! Protocol contract management handlers
2//!
3//! This module provides HTTP handlers for managing protocol contracts (gRPC, WebSocket, MQTT, Kafka).
4
5// Per-protocol contract impls (GrpcContract, KafkaContract, MqttContract,
6// WebSocketContract, ProtocolContractRegistry) stay in mockforge-core because
7// they hold compiled jsonschema validators. Allow here until a future phase
8// extracts them to mockforge-contracts.
9#![allow(deprecated)]
10
11use axum::{
12    extract::{Path, Query, State},
13    http::StatusCode,
14    response::Json,
15};
16use mockforge_core::contract_drift::protocol_contracts::{
17    compare_contracts, ProtocolContractRegistry,
18};
19use mockforge_core::contract_drift::{
20    GrpcContract, KafkaContract, MqttContract, WebSocketContract,
21};
22use mockforge_foundation::protocol::Protocol;
23use mockforge_foundation::protocol_contract_types::{
24    KafkaTopicSchema, MqttTopicSchema, SchemaFormat, TopicSchema, WebSocketMessageType,
25};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::sync::Arc;
29use tokio::sync::RwLock;
30
31// Base64 encoding/decoding
32use base64::{engine::general_purpose, Engine as _};
33
/// Shared state for the protocol contract handlers.
///
/// Every field is an `Arc` (or an `Option` of one), so cloning this struct
/// yields another handle to the same underlying components.
#[derive(Clone)]
pub struct ProtocolContractState {
    /// Protocol contract registry. Handlers take the read lock for lookups and
    /// the write lock to register or remove contracts.
    pub registry: Arc<RwLock<ProtocolContractRegistry>>,
    /// Optional drift budget engine for evaluating contract changes.
    /// `compare_contracts_handler` only evaluates drift when both this and
    /// `incident_manager` are set.
    pub drift_engine: Option<Arc<mockforge_core::contract_drift::DriftBudgetEngine>>,
    /// Optional incident manager for creating drift incidents.
    /// Required together with `drift_engine` for drift evaluation to run.
    pub incident_manager: Option<Arc<mockforge_core::incidents::IncidentManager>>,
    /// Optional fitness function registry for evaluating fitness rules.
    /// When absent, drift evaluation proceeds without fitness test results.
    pub fitness_registry:
        Option<Arc<RwLock<mockforge_core::contract_drift::FitnessFunctionRegistry>>>,
    /// Optional consumer impact analyzer.
    /// When absent, drift evaluation carries no consumer impact information.
    pub consumer_analyzer:
        Option<Arc<RwLock<mockforge_core::contract_drift::ConsumerImpactAnalyzer>>>,
}
50
/// Request body for creating a gRPC contract (see `create_grpc_contract`).
#[derive(Debug, Deserialize)]
pub struct CreateGrpcContractRequest {
    /// Contract ID under which the contract is registered
    pub contract_id: String,
    /// Contract version
    pub version: String,
    /// Protobuf descriptor set, base64 encoded. The create handler decodes it
    /// with the standard base64 engine and rejects undecodable input with
    /// `400 Bad Request`.
    pub descriptor_set: String,
}
61
/// Request body for creating a WebSocket contract (see `create_websocket_contract`).
#[derive(Debug, Deserialize)]
pub struct CreateWebSocketContractRequest {
    /// Contract ID under which the contract is registered
    pub contract_id: String,
    /// Contract version
    pub version: String,
    /// Message types to add to the contract, processed in the order given
    pub message_types: Vec<WebSocketMessageTypeRequest>,
}
72
/// Wire representation of a single WebSocket message type inside a
/// [`CreateWebSocketContractRequest`].
#[derive(Debug, Deserialize)]
pub struct WebSocketMessageTypeRequest {
    /// Message type identifier
    pub message_type: String,
    /// Optional topic or channel name
    pub topic: Option<String>,
    /// JSON schema for this message type
    pub schema: serde_json::Value,
    /// Direction: "inbound", "outbound", or "bidirectional".
    /// Any other value makes the create handler respond with `400 Bad Request`.
    pub direction: String,
    /// Description of this message type
    pub description: Option<String>,
    /// Example message payload
    pub example: Option<serde_json::Value>,
}
89
/// Request body for creating an MQTT contract (see `create_mqtt_contract`).
#[derive(Debug, Deserialize)]
pub struct CreateMqttContractRequest {
    /// Contract ID under which the contract is registered
    pub contract_id: String,
    /// Contract version
    pub version: String,
    /// Topic schemas to add to the contract, processed in the order given
    pub topics: Vec<MqttTopicSchemaRequest>,
}
100
/// Wire representation of a single MQTT topic schema inside a
/// [`CreateMqttContractRequest`].
///
/// The create handler copies every field unchanged into an `MqttTopicSchema`.
#[derive(Debug, Deserialize)]
pub struct MqttTopicSchemaRequest {
    /// Topic name
    pub topic: String,
    /// Quality of Service level (0, 1, or 2).
    /// NOTE(review): the handler in this file does not range-check this value;
    /// confirm whether `MqttContract::add_topic` does.
    pub qos: Option<u8>,
    /// JSON schema for messages on this topic
    pub schema: serde_json::Value,
    /// Whether messages are retained
    pub retained: Option<bool>,
    /// Description of this topic
    pub description: Option<String>,
    /// Example message payload
    pub example: Option<serde_json::Value>,
}
117
/// Request body for creating a Kafka contract (see `create_kafka_contract`).
#[derive(Debug, Deserialize)]
pub struct CreateKafkaContractRequest {
    /// Contract ID under which the contract is registered
    pub contract_id: String,
    /// Contract version
    pub version: String,
    /// Topic schemas to add to the contract, processed in the order given
    pub topics: Vec<KafkaTopicSchemaRequest>,
}
128
/// Wire representation of a single Kafka topic inside a
/// [`CreateKafkaContractRequest`].
#[derive(Debug, Deserialize)]
pub struct KafkaTopicSchemaRequest {
    /// Topic name
    pub topic: String,
    /// Key schema (optional)
    pub key_schema: Option<TopicSchemaRequest>,
    /// Value schema (required)
    pub value_schema: TopicSchemaRequest,
    /// Number of partitions
    pub partitions: Option<u32>,
    /// Replication factor
    pub replication_factor: Option<u16>,
    /// Description of this topic
    pub description: Option<String>,
    /// Evolution rules for schema changes; omitted rules are stored as `None`
    pub evolution_rules: Option<EvolutionRulesRequest>,
}
147
/// Wire representation of a Kafka topic schema (used for both key and value).
#[derive(Debug, Deserialize)]
pub struct TopicSchemaRequest {
    /// Schema format: "json", "avro", or "protobuf" (matched case-sensitively
    /// by the create handler)
    pub format: String,
    /// Schema definition
    pub schema: serde_json::Value,
    /// Schema registry ID (if using schema registry)
    pub schema_id: Option<String>,
    /// Schema version
    pub version: Option<String>,
}
160
/// Wire representation of schema evolution rules for a Kafka topic.
///
/// All three flags are required in the request body (none is optional or
/// defaulted) and are copied one-to-one into `EvolutionRules` by the create
/// handler.
#[derive(Debug, Deserialize)]
pub struct EvolutionRulesRequest {
    /// Allow backward compatible changes
    pub allow_backward_compatible: bool,
    /// Allow forward compatible changes
    pub allow_forward_compatible: bool,
    /// Require explicit version bump for breaking changes
    pub require_version_bump: bool,
}
171
/// Response body describing a single protocol contract.
///
/// Returned by the get and create handlers and used as the element type of
/// [`ListContractsResponse`].
#[derive(Debug, Serialize)]
pub struct ProtocolContractResponse {
    /// Contract ID
    pub contract_id: String,
    /// Contract version
    pub version: String,
    /// Protocol type as a lowercase string (the create handlers emit "grpc",
    /// "websocket", "mqtt", or "kafka"; get/list lowercase the protocol's
    /// `Debug` name)
    pub protocol: String,
    /// Contract JSON representation as produced by the contract's `to_json`
    pub contract: serde_json::Value,
}
184
/// Response body for listing contracts.
#[derive(Debug, Serialize)]
pub struct ListContractsResponse {
    /// List of contracts
    pub contracts: Vec<ProtocolContractResponse>,
    /// Total count; `list_contracts` sets this to `contracts.len()`
    pub total: usize,
}
193
/// Request body for comparing two registered contracts.
///
/// Both IDs are looked up in the registry; a missing contract yields
/// `404 Not Found` from `compare_contracts_handler`.
#[derive(Debug, Deserialize)]
pub struct CompareContractsRequest {
    /// Old contract ID (the baseline of the comparison)
    pub old_contract_id: String,
    /// New contract ID (the contract whose operations are evaluated for drift)
    pub new_contract_id: String,
}
202
/// Request body for validating a message against a registered contract.
#[derive(Debug, Deserialize)]
pub struct ValidateMessageRequest {
    /// Operation ID (endpoint, method, topic, etc.)
    pub operation_id: String,
    /// Message payload (base64 encoded or JSON).
    ///
    /// `validate_message` treats a JSON string as base64 first and, if it does
    /// not decode, uses the string's raw UTF-8 bytes. Any other JSON value is
    /// serialized to JSON bytes.
    pub payload: serde_json::Value,
    /// Content type
    pub content_type: Option<String>,
    /// Additional metadata; an omitted map is treated as empty
    pub metadata: Option<HashMap<String, String>>,
}
215
216/// List all protocol contracts
217pub async fn list_contracts(
218    State(state): State<ProtocolContractState>,
219    Query(params): Query<HashMap<String, String>>,
220) -> Result<Json<ListContractsResponse>, (StatusCode, Json<serde_json::Value>)> {
221    let registry = state.registry.read().await;
222
223    let protocol_filter = params.get("protocol").and_then(|p| match p.as_str() {
224        "grpc" => Some(Protocol::Grpc),
225        "websocket" => Some(Protocol::WebSocket),
226        "mqtt" => Some(Protocol::Mqtt),
227        "kafka" => Some(Protocol::Kafka),
228        _ => None,
229    });
230
231    let contracts: Vec<ProtocolContractResponse> = if let Some(protocol) = protocol_filter {
232        registry
233            .list_by_protocol(protocol)
234            .iter()
235            .map(|contract| {
236                let contract_json = contract.to_json().unwrap_or_else(|_| serde_json::json!({}));
237                ProtocolContractResponse {
238                    contract_id: contract.contract_id().to_string(),
239                    version: contract.version().to_string(),
240                    protocol: format!("{:?}", contract.protocol()).to_lowercase(),
241                    contract: contract_json,
242                }
243            })
244            .collect()
245    } else {
246        registry
247            .list()
248            .iter()
249            .map(|contract| {
250                let contract_json = contract.to_json().unwrap_or_else(|_| serde_json::json!({}));
251                ProtocolContractResponse {
252                    contract_id: contract.contract_id().to_string(),
253                    version: contract.version().to_string(),
254                    protocol: format!("{:?}", contract.protocol()).to_lowercase(),
255                    contract: contract_json,
256                }
257            })
258            .collect()
259    };
260
261    Ok(Json(ListContractsResponse {
262        total: contracts.len(),
263        contracts,
264    }))
265}
266
267/// Get a specific contract
268pub async fn get_contract(
269    State(state): State<ProtocolContractState>,
270    Path(contract_id): Path<String>,
271) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
272    let registry = state.registry.read().await;
273
274    let contract = registry.get(&contract_id).ok_or_else(|| {
275        (
276            StatusCode::NOT_FOUND,
277            Json(serde_json::json!({
278                "error": "Contract not found",
279                "contract_id": contract_id
280            })),
281        )
282    })?;
283
284    let contract_json = contract.to_json().map_err(|e| {
285        (
286            StatusCode::INTERNAL_SERVER_ERROR,
287            Json(serde_json::json!({
288                "error": "Failed to serialize contract",
289                "message": e.to_string()
290            })),
291        )
292    })?;
293
294    Ok(Json(ProtocolContractResponse {
295        contract_id: contract.contract_id().to_string(),
296        version: contract.version().to_string(),
297        protocol: format!("{:?}", contract.protocol()).to_lowercase(),
298        contract: contract_json,
299    }))
300}
301
302/// Create a gRPC contract
303pub async fn create_grpc_contract(
304    State(state): State<ProtocolContractState>,
305    Json(request): Json<CreateGrpcContractRequest>,
306) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
307    // Decode base64 descriptor set
308    let descriptor_bytes =
309        general_purpose::STANDARD.decode(&request.descriptor_set).map_err(|e| {
310            (
311                StatusCode::BAD_REQUEST,
312                Json(serde_json::json!({
313                    "error": "Invalid base64 descriptor set",
314                    "message": e.to_string()
315                })),
316            )
317        })?;
318
319    // Create descriptor pool from bytes
320    // Note: GrpcContract::from_descriptor_set handles the descriptor pool creation
321    let contract = GrpcContract::from_descriptor_set(
322        request.contract_id.clone(),
323        request.version.clone(),
324        &descriptor_bytes,
325    )
326    .map_err(|e| {
327        (
328            StatusCode::BAD_REQUEST,
329            Json(serde_json::json!({
330                "error": "Failed to create gRPC contract",
331                "message": e.to_string()
332            })),
333        )
334    })?;
335
336    // Register contract
337    let mut registry = state.registry.write().await;
338    registry.register(Box::new(contract));
339
340    let contract = registry.get(&request.contract_id).ok_or_else(|| {
341        (
342            StatusCode::INTERNAL_SERVER_ERROR,
343            Json(serde_json::json!({
344                "error": "Failed to retrieve registered contract",
345                "contract_id": request.contract_id
346            })),
347        )
348    })?;
349    let contract_json = contract.to_json().map_err(|e| {
350        (
351            StatusCode::INTERNAL_SERVER_ERROR,
352            Json(serde_json::json!({
353                "error": "Failed to serialize contract",
354                "message": e.to_string()
355            })),
356        )
357    })?;
358
359    Ok(Json(ProtocolContractResponse {
360        contract_id: request.contract_id,
361        version: request.version,
362        protocol: "grpc".to_string(),
363        contract: contract_json,
364    }))
365}
366
367/// Create a WebSocket contract
368pub async fn create_websocket_contract(
369    State(state): State<ProtocolContractState>,
370    Json(request): Json<CreateWebSocketContractRequest>,
371) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
372    let mut contract = WebSocketContract::new(request.contract_id.clone(), request.version.clone());
373
374    // Add message types
375    for msg_type_req in request.message_types {
376        let direction = match msg_type_req.direction.as_str() {
377            "inbound" => mockforge_core::contract_drift::MessageDirection::Inbound,
378            "outbound" => mockforge_core::contract_drift::MessageDirection::Outbound,
379            "bidirectional" => mockforge_core::contract_drift::MessageDirection::Bidirectional,
380            _ => {
381                return Err((
382                    StatusCode::BAD_REQUEST,
383                    Json(serde_json::json!({
384                        "error": "Invalid direction",
385                        "message": "Direction must be 'inbound', 'outbound', or 'bidirectional'"
386                    })),
387                ));
388            }
389        };
390
391        let message_type = WebSocketMessageType {
392            message_type: msg_type_req.message_type,
393            topic: msg_type_req.topic,
394            schema: msg_type_req.schema,
395            direction,
396            description: msg_type_req.description,
397            example: msg_type_req.example,
398        };
399
400        contract.add_message_type(message_type).map_err(|e| {
401            (
402                StatusCode::BAD_REQUEST,
403                Json(serde_json::json!({
404                    "error": "Failed to add message type",
405                    "message": e.to_string()
406                })),
407            )
408        })?;
409    }
410
411    // Register contract
412    let mut registry = state.registry.write().await;
413    registry.register(Box::new(contract));
414
415    let contract = registry.get(&request.contract_id).ok_or_else(|| {
416        (
417            StatusCode::INTERNAL_SERVER_ERROR,
418            Json(serde_json::json!({
419                "error": "Failed to retrieve registered contract",
420                "contract_id": request.contract_id
421            })),
422        )
423    })?;
424    let contract_json = contract.to_json().map_err(|e| {
425        (
426            StatusCode::INTERNAL_SERVER_ERROR,
427            Json(serde_json::json!({
428                "error": "Failed to serialize contract",
429                "message": e.to_string()
430            })),
431        )
432    })?;
433
434    Ok(Json(ProtocolContractResponse {
435        contract_id: request.contract_id,
436        version: request.version,
437        protocol: "websocket".to_string(),
438        contract: contract_json,
439    }))
440}
441
442/// Create an MQTT contract
443pub async fn create_mqtt_contract(
444    State(state): State<ProtocolContractState>,
445    Json(request): Json<CreateMqttContractRequest>,
446) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
447    let mut contract = MqttContract::new(request.contract_id.clone(), request.version.clone());
448
449    // Add topics
450    for topic_req in request.topics {
451        let topic_schema = MqttTopicSchema {
452            topic: topic_req.topic,
453            qos: topic_req.qos,
454            schema: topic_req.schema,
455            retained: topic_req.retained,
456            description: topic_req.description,
457            example: topic_req.example,
458        };
459
460        contract.add_topic(topic_schema).map_err(|e| {
461            (
462                StatusCode::BAD_REQUEST,
463                Json(serde_json::json!({
464                    "error": "Failed to add topic",
465                    "message": e.to_string()
466                })),
467            )
468        })?;
469    }
470
471    // Register contract
472    let mut registry = state.registry.write().await;
473    registry.register(Box::new(contract));
474
475    let contract = registry.get(&request.contract_id).ok_or_else(|| {
476        (
477            StatusCode::INTERNAL_SERVER_ERROR,
478            Json(serde_json::json!({
479                "error": "Failed to retrieve registered contract",
480                "contract_id": request.contract_id
481            })),
482        )
483    })?;
484    let contract_json = contract.to_json().map_err(|e| {
485        (
486            StatusCode::INTERNAL_SERVER_ERROR,
487            Json(serde_json::json!({
488                "error": "Failed to serialize contract",
489                "message": e.to_string()
490            })),
491        )
492    })?;
493
494    Ok(Json(ProtocolContractResponse {
495        contract_id: request.contract_id,
496        version: request.version,
497        protocol: "mqtt".to_string(),
498        contract: contract_json,
499    }))
500}
501
502/// Create a Kafka contract
503pub async fn create_kafka_contract(
504    State(state): State<ProtocolContractState>,
505    Json(request): Json<CreateKafkaContractRequest>,
506) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
507    let mut contract = KafkaContract::new(request.contract_id.clone(), request.version.clone());
508
509    // Add topics
510    for topic_req in request.topics {
511        let format = match topic_req.value_schema.format.as_str() {
512            "json" => SchemaFormat::Json,
513            "avro" => SchemaFormat::Avro,
514            "protobuf" => SchemaFormat::Protobuf,
515            _ => {
516                return Err((
517                    StatusCode::BAD_REQUEST,
518                    Json(serde_json::json!({
519                        "error": "Invalid schema format",
520                        "message": "Format must be 'json', 'avro', or 'protobuf'"
521                    })),
522                ));
523            }
524        };
525
526        let value_schema = TopicSchema {
527            format,
528            schema: topic_req.value_schema.schema,
529            schema_id: topic_req.value_schema.schema_id,
530            version: topic_req.value_schema.version,
531        };
532
533        let key_schema = topic_req.key_schema.map(|ks_req| {
534            let format = match ks_req.format.as_str() {
535                "json" => SchemaFormat::Json,
536                "avro" => SchemaFormat::Avro,
537                "protobuf" => SchemaFormat::Protobuf,
538                _ => SchemaFormat::Json, // Default to JSON
539            };
540
541            TopicSchema {
542                format,
543                schema: ks_req.schema,
544                schema_id: ks_req.schema_id,
545                version: ks_req.version,
546            }
547        });
548
549        let evolution_rules = topic_req.evolution_rules.map(|er_req| {
550            mockforge_core::contract_drift::EvolutionRules {
551                allow_backward_compatible: er_req.allow_backward_compatible,
552                allow_forward_compatible: er_req.allow_forward_compatible,
553                require_version_bump: er_req.require_version_bump,
554            }
555        });
556
557        let topic_schema = KafkaTopicSchema {
558            topic: topic_req.topic,
559            key_schema,
560            value_schema,
561            partitions: topic_req.partitions,
562            replication_factor: topic_req.replication_factor,
563            description: topic_req.description,
564            evolution_rules,
565        };
566
567        contract.add_topic(topic_schema).map_err(|e| {
568            (
569                StatusCode::BAD_REQUEST,
570                Json(serde_json::json!({
571                    "error": "Failed to add topic",
572                    "message": e.to_string()
573                })),
574            )
575        })?;
576    }
577
578    // Register contract
579    let mut registry = state.registry.write().await;
580    registry.register(Box::new(contract));
581
582    let contract = registry.get(&request.contract_id).ok_or_else(|| {
583        (
584            StatusCode::INTERNAL_SERVER_ERROR,
585            Json(serde_json::json!({
586                "error": "Failed to retrieve registered contract",
587                "contract_id": request.contract_id
588            })),
589        )
590    })?;
591    let contract_json = contract.to_json().map_err(|e| {
592        (
593            StatusCode::INTERNAL_SERVER_ERROR,
594            Json(serde_json::json!({
595                "error": "Failed to serialize contract",
596                "message": e.to_string()
597            })),
598        )
599    })?;
600
601    Ok(Json(ProtocolContractResponse {
602        contract_id: request.contract_id,
603        version: request.version,
604        protocol: "kafka".to_string(),
605        contract: contract_json,
606    }))
607}
608
609/// Delete a contract
610pub async fn delete_contract(
611    State(state): State<ProtocolContractState>,
612    Path(contract_id): Path<String>,
613) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
614    let mut registry = state.registry.write().await;
615
616    registry.remove(&contract_id).ok_or_else(|| {
617        (
618            StatusCode::NOT_FOUND,
619            Json(serde_json::json!({
620                "error": "Contract not found",
621                "contract_id": contract_id
622            })),
623        )
624    })?;
625
626    Ok(Json(serde_json::json!({
627        "message": "Contract deleted",
628        "contract_id": contract_id
629    })))
630}
631
632/// Compare two contracts
633pub async fn compare_contracts_handler(
634    State(state): State<ProtocolContractState>,
635    Json(request): Json<CompareContractsRequest>,
636) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
637    let registry = state.registry.read().await;
638
639    let old_contract = registry.get(&request.old_contract_id).ok_or_else(|| {
640        (
641            StatusCode::NOT_FOUND,
642            Json(serde_json::json!({
643                "error": "Old contract not found",
644                "contract_id": request.old_contract_id
645            })),
646        )
647    })?;
648
649    let new_contract = registry.get(&request.new_contract_id).ok_or_else(|| {
650        (
651            StatusCode::NOT_FOUND,
652            Json(serde_json::json!({
653                "error": "New contract not found",
654                "contract_id": request.new_contract_id
655            })),
656        )
657    })?;
658
659    let diff_result = compare_contracts(old_contract, new_contract).await.map_err(|e| {
660        (
661            StatusCode::BAD_REQUEST,
662            Json(serde_json::json!({
663                "error": "Failed to compare contracts",
664                "message": e.to_string()
665            })),
666        )
667    })?;
668
669    // Evaluate drift and create incidents if drift engine is available
670    let mut drift_evaluation = None;
671    if let (Some(ref drift_engine), Some(ref incident_manager)) =
672        (&state.drift_engine, &state.incident_manager)
673    {
674        // Get protocol type
675        let protocol = new_contract.protocol();
676
677        // For each operation in the contract, evaluate drift
678        let operations = new_contract.operations();
679        for operation in operations {
680            let operation_id = &operation.id;
681
682            // Determine endpoint and method from operation type
683            let (endpoint, method) = match &operation.operation_type {
684                mockforge_core::contract_drift::protocol_contracts::OperationType::HttpEndpoint { path, method } => {
685                    (path.clone(), method.clone())
686                }
687                mockforge_core::contract_drift::protocol_contracts::OperationType::GrpcMethod { service, method } => {
688                    (format!("{}.{}", service, method), "grpc".to_string())
689                }
690                mockforge_core::contract_drift::protocol_contracts::OperationType::WebSocketMessage { message_type, .. } => {
691                    (message_type.clone(), "websocket".to_string())
692                }
693                mockforge_core::contract_drift::protocol_contracts::OperationType::MqttTopic { topic, qos: _ } => {
694                    (topic.clone(), "mqtt".to_string())
695                }
696                mockforge_core::contract_drift::protocol_contracts::OperationType::KafkaTopic { topic, key_schema: _, value_schema: _ } => {
697                    (topic.clone(), "kafka".to_string())
698                }
699            };
700
701            // Evaluate drift budget (for protocol contracts, we need to use evaluate_with_specs equivalent)
702            // Since we don't have OpenAPI specs for protocol contracts, we'll use a simplified evaluation
703            // that works with the diff result directly
704            let drift_result = drift_engine.evaluate(&diff_result, &endpoint, &method);
705
706            // Run fitness tests if registry is available
707            let mut drift_result_with_fitness = drift_result.clone();
708            if let Some(ref fitness_registry) = state.fitness_registry {
709                let guard = fitness_registry.read().await;
710                if let Ok(results) = guard.evaluate_all_protocol(
711                    Some(old_contract),
712                    new_contract,
713                    &diff_result,
714                    operation_id,
715                    None, // workspace_id
716                    None, // service_name
717                ) {
718                    drift_result_with_fitness.fitness_test_results = results;
719                    if drift_result_with_fitness.fitness_test_results.iter().any(|r| !r.passed) {
720                        drift_result_with_fitness.should_create_incident = true;
721                    }
722                }
723            }
724
725            // Analyze consumer impact if analyzer is available
726            // Use operation_id for more flexible protocol-specific matching
727            if let Some(ref consumer_analyzer) = state.consumer_analyzer {
728                let guard = consumer_analyzer.read().await;
729                let impact =
730                    guard.analyze_impact_with_operation_id(&endpoint, &method, Some(operation_id));
731                if let Some(impact) = impact {
732                    drift_result_with_fitness.consumer_impact = Some(impact);
733                }
734            }
735
736            // Create incident if budget is exceeded or breaking changes detected
737            if drift_result_with_fitness.should_create_incident {
738                let incident_type = if drift_result_with_fitness.breaking_changes > 0 {
739                    mockforge_core::incidents::types::IncidentType::BreakingChange
740                } else {
741                    mockforge_core::incidents::types::IncidentType::ThresholdExceeded
742                };
743
744                let severity = if drift_result_with_fitness.breaking_changes > 0 {
745                    mockforge_core::incidents::types::IncidentSeverity::High
746                } else if drift_result_with_fitness.potentially_breaking_changes > 0 {
747                    mockforge_core::incidents::types::IncidentSeverity::Medium
748                } else {
749                    mockforge_core::incidents::types::IncidentSeverity::Low
750                };
751
752                let details = serde_json::json!({
753                    "breaking_changes": drift_result_with_fitness.breaking_changes,
754                    "potentially_breaking_changes": drift_result_with_fitness.potentially_breaking_changes,
755                    "non_breaking_changes": drift_result_with_fitness.non_breaking_changes,
756                    "budget_exceeded": drift_result_with_fitness.budget_exceeded,
757                    "operation_id": operation_id,
758                    "operation_type": format!("{:?}", operation.operation_type),
759                });
760
761                let before_sample = Some(serde_json::json!({
762                    "contract_id": old_contract.contract_id(),
763                    "version": old_contract.version(),
764                    "protocol": format!("{:?}", old_contract.protocol()),
765                    "operation_id": operation_id,
766                }));
767
768                let after_sample = Some(serde_json::json!({
769                    "contract_id": new_contract.contract_id(),
770                    "version": new_contract.version(),
771                    "protocol": format!("{:?}", new_contract.protocol()),
772                    "operation_id": operation_id,
773                    "mismatches": diff_result.mismatches,
774                }));
775
776                let _incident = incident_manager
777                    .create_incident_with_samples(
778                        endpoint.clone(),
779                        method.clone(),
780                        incident_type,
781                        severity,
782                        details,
783                        None, // budget_id
784                        None, // workspace_id
785                        None, // sync_cycle_id
786                        None, // contract_diff_id
787                        before_sample,
788                        after_sample,
789                        Some(drift_result_with_fitness.fitness_test_results.clone()),
790                        drift_result_with_fitness.consumer_impact.clone(),
791                        Some(protocol),
792                    )
793                    .await;
794            }
795
796            drift_evaluation = Some(serde_json::json!({
797                "operation_id": operation_id,
798                "endpoint": endpoint,
799                "method": method,
800                "budget_exceeded": drift_result_with_fitness.budget_exceeded,
801                "breaking_changes": drift_result_with_fitness.breaking_changes,
802                "fitness_test_results": drift_result_with_fitness.fitness_test_results,
803                "consumer_impact": drift_result_with_fitness.consumer_impact,
804            }));
805        }
806    }
807
808    Ok(Json(serde_json::json!({
809        "matches": diff_result.matches,
810        "confidence": diff_result.confidence,
811        "mismatches": diff_result.mismatches,
812        "recommendations": diff_result.recommendations,
813        "corrections": diff_result.corrections,
814        "drift_evaluation": drift_evaluation,
815    })))
816}
817
818/// Validate a message against a contract
819pub async fn validate_message(
820    State(state): State<ProtocolContractState>,
821    Path(contract_id): Path<String>,
822    Json(request): Json<ValidateMessageRequest>,
823) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
824    let registry = state.registry.read().await;
825
826    let contract = registry.get(&contract_id).ok_or_else(|| {
827        (
828            StatusCode::NOT_FOUND,
829            Json(serde_json::json!({
830                "error": "Contract not found",
831                "contract_id": contract_id
832            })),
833        )
834    })?;
835
836    // Convert payload to bytes
837    let payload_bytes = match request.payload {
838        serde_json::Value::String(s) => {
839            // Try base64 decode first, then fall back to UTF-8
840            general_purpose::STANDARD.decode(&s).unwrap_or_else(|_| s.into_bytes())
841        }
842        _ => serde_json::to_vec(&request.payload).map_err(|e| {
843            (
844                StatusCode::BAD_REQUEST,
845                Json(serde_json::json!({
846                    "error": "Failed to serialize payload",
847                    "message": e.to_string()
848                })),
849            )
850        })?,
851    };
852
853    let contract_request = mockforge_core::contract_drift::protocol_contracts::ContractRequest {
854        protocol: contract.protocol(),
855        operation_id: request.operation_id.clone(),
856        payload: payload_bytes,
857        content_type: request.content_type,
858        metadata: request.metadata.unwrap_or_default(),
859    };
860
861    let validation_result =
862        contract.validate(&request.operation_id, &contract_request).await.map_err(|e| {
863            (
864                StatusCode::BAD_REQUEST,
865                Json(serde_json::json!({
866                    "error": "Validation failed",
867                    "message": e.to_string()
868                })),
869            )
870        })?;
871
872    Ok(Json(serde_json::json!({
873        "valid": validation_result.valid,
874        "errors": validation_result.errors,
875        "warnings": validation_result.warnings,
876    })))
877}
878
879/// Get contract router
880pub fn protocol_contracts_router(state: ProtocolContractState) -> axum::Router {
881    use axum::routing::{delete, get, post};
882
883    axum::Router::new()
884        .route("/", get(list_contracts))
885        .route("/{contract_id}", get(get_contract))
886        .route("/{contract_id}", delete(delete_contract))
887        .route("/grpc", post(create_grpc_contract))
888        .route("/websocket", post(create_websocket_contract))
889        .route("/mqtt", post(create_mqtt_contract))
890        .route("/kafka", post(create_kafka_contract))
891        .route("/compare", post(compare_contracts_handler))
892        .route("/{contract_id}/validate", post(validate_message))
893        .with_state(state)
894}