mockforge_http/handlers/
protocol_contracts.rs

1//! Protocol contract management handlers
2//!
3//! This module provides HTTP handlers for managing protocol contracts (gRPC, WebSocket, MQTT, Kafka).
4
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::Json,
9};
10use mockforge_core::contract_drift::protocol_contracts::{
11    compare_contracts, ProtocolContractRegistry,
12};
13use mockforge_core::contract_drift::{
14    GrpcContract, KafkaContract, KafkaTopicSchema, MqttContract, MqttTopicSchema, SchemaFormat,
15    TopicSchema, WebSocketContract, WebSocketMessageType,
16};
17use mockforge_core::protocol_abstraction::Protocol;
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::Arc;
21use tokio::sync::RwLock;
22
23// Base64 encoding/decoding
24use base64::{engine::general_purpose, Engine as _};
25
26/// State for protocol contract handlers
27#[derive(Clone)]
28pub struct ProtocolContractState {
29    /// Protocol contract registry
30    pub registry: Arc<RwLock<ProtocolContractRegistry>>,
31    /// Optional drift budget engine for evaluating contract changes
32    pub drift_engine: Option<Arc<mockforge_core::contract_drift::DriftBudgetEngine>>,
33    /// Optional incident manager for creating drift incidents
34    pub incident_manager: Option<Arc<mockforge_core::incidents::IncidentManager>>,
35    /// Optional fitness function registry for evaluating fitness rules
36    pub fitness_registry:
37        Option<Arc<RwLock<mockforge_core::contract_drift::FitnessFunctionRegistry>>>,
38    /// Optional consumer impact analyzer
39    pub consumer_analyzer:
40        Option<Arc<RwLock<mockforge_core::contract_drift::ConsumerImpactAnalyzer>>>,
41}
42
43/// Request to create a gRPC contract
44#[derive(Debug, Deserialize)]
45pub struct CreateGrpcContractRequest {
46    /// Contract ID
47    pub contract_id: String,
48    /// Contract version
49    pub version: String,
50    /// Protobuf descriptor set (base64 encoded)
51    pub descriptor_set: String,
52}
53
54/// Request to create a WebSocket contract
55#[derive(Debug, Deserialize)]
56pub struct CreateWebSocketContractRequest {
57    /// Contract ID
58    pub contract_id: String,
59    /// Contract version
60    pub version: String,
61    /// Message types
62    pub message_types: Vec<WebSocketMessageTypeRequest>,
63}
64
65/// Request for a WebSocket message type
66#[derive(Debug, Deserialize)]
67pub struct WebSocketMessageTypeRequest {
68    /// Message type identifier
69    pub message_type: String,
70    /// Optional topic or channel name
71    pub topic: Option<String>,
72    /// JSON schema for this message type
73    pub schema: serde_json::Value,
74    /// Direction: "inbound", "outbound", or "bidirectional"
75    pub direction: String,
76    /// Description of this message type
77    pub description: Option<String>,
78    /// Example message payload
79    pub example: Option<serde_json::Value>,
80}
81
82/// Request to create an MQTT contract
83#[derive(Debug, Deserialize)]
84pub struct CreateMqttContractRequest {
85    /// Contract ID
86    pub contract_id: String,
87    /// Contract version
88    pub version: String,
89    /// Topic schemas
90    pub topics: Vec<MqttTopicSchemaRequest>,
91}
92
93/// Request for an MQTT topic schema
94#[derive(Debug, Deserialize)]
95pub struct MqttTopicSchemaRequest {
96    /// Topic name
97    pub topic: String,
98    /// Quality of Service level (0, 1, or 2)
99    pub qos: Option<u8>,
100    /// JSON schema for messages on this topic
101    pub schema: serde_json::Value,
102    /// Whether messages are retained
103    pub retained: Option<bool>,
104    /// Description of this topic
105    pub description: Option<String>,
106    /// Example message payload
107    pub example: Option<serde_json::Value>,
108}
109
110/// Request to create a Kafka contract
111#[derive(Debug, Deserialize)]
112pub struct CreateKafkaContractRequest {
113    /// Contract ID
114    pub contract_id: String,
115    /// Contract version
116    pub version: String,
117    /// Topic schemas
118    pub topics: Vec<KafkaTopicSchemaRequest>,
119}
120
121/// Request for a Kafka topic schema
122#[derive(Debug, Deserialize)]
123pub struct KafkaTopicSchemaRequest {
124    /// Topic name
125    pub topic: String,
126    /// Key schema (optional)
127    pub key_schema: Option<TopicSchemaRequest>,
128    /// Value schema (required)
129    pub value_schema: TopicSchemaRequest,
130    /// Number of partitions
131    pub partitions: Option<u32>,
132    /// Replication factor
133    pub replication_factor: Option<u16>,
134    /// Description of this topic
135    pub description: Option<String>,
136    /// Evolution rules for schema changes
137    pub evolution_rules: Option<EvolutionRulesRequest>,
138}
139
140/// Request for a topic schema (key or value)
141#[derive(Debug, Deserialize)]
142pub struct TopicSchemaRequest {
143    /// Schema format: "json", "avro", or "protobuf"
144    pub format: String,
145    /// Schema definition
146    pub schema: serde_json::Value,
147    /// Schema registry ID (if using schema registry)
148    pub schema_id: Option<String>,
149    /// Schema version
150    pub version: Option<String>,
151}
152
153/// Request for evolution rules
154#[derive(Debug, Deserialize)]
155pub struct EvolutionRulesRequest {
156    /// Allow backward compatible changes
157    pub allow_backward_compatible: bool,
158    /// Allow forward compatible changes
159    pub allow_forward_compatible: bool,
160    /// Require explicit version bump for breaking changes
161    pub require_version_bump: bool,
162}
163
164/// Response for protocol contract operations
165#[derive(Debug, Serialize)]
166pub struct ProtocolContractResponse {
167    /// Contract ID
168    pub contract_id: String,
169    /// Contract version
170    pub version: String,
171    /// Protocol type
172    pub protocol: String,
173    /// Contract JSON representation
174    pub contract: serde_json::Value,
175}
176
177/// Response for listing contracts
178#[derive(Debug, Serialize)]
179pub struct ListContractsResponse {
180    /// List of contracts
181    pub contracts: Vec<ProtocolContractResponse>,
182    /// Total count
183    pub total: usize,
184}
185
186/// Request to compare contracts
187#[derive(Debug, Deserialize)]
188pub struct CompareContractsRequest {
189    /// Old contract ID
190    pub old_contract_id: String,
191    /// New contract ID
192    pub new_contract_id: String,
193}
194
195/// Request to validate a message
196#[derive(Debug, Deserialize)]
197pub struct ValidateMessageRequest {
198    /// Operation ID (endpoint, method, topic, etc.)
199    pub operation_id: String,
200    /// Message payload (base64 encoded or JSON)
201    pub payload: serde_json::Value,
202    /// Content type
203    pub content_type: Option<String>,
204    /// Additional metadata
205    pub metadata: Option<HashMap<String, String>>,
206}
207
208/// List all protocol contracts
209pub async fn list_contracts(
210    State(state): State<ProtocolContractState>,
211    Query(params): Query<HashMap<String, String>>,
212) -> Result<Json<ListContractsResponse>, (StatusCode, Json<serde_json::Value>)> {
213    let registry = state.registry.read().await;
214
215    let protocol_filter = params.get("protocol").and_then(|p| match p.as_str() {
216        "grpc" => Some(Protocol::Grpc),
217        "websocket" => Some(Protocol::WebSocket),
218        "mqtt" => Some(Protocol::Mqtt),
219        "kafka" => Some(Protocol::Kafka),
220        _ => None,
221    });
222
223    let contracts: Vec<ProtocolContractResponse> = if let Some(protocol) = protocol_filter {
224        registry
225            .list_by_protocol(protocol)
226            .iter()
227            .map(|contract| {
228                let contract_json = contract.to_json().unwrap_or_else(|_| serde_json::json!({}));
229                ProtocolContractResponse {
230                    contract_id: contract.contract_id().to_string(),
231                    version: contract.version().to_string(),
232                    protocol: format!("{:?}", contract.protocol()).to_lowercase(),
233                    contract: contract_json,
234                }
235            })
236            .collect()
237    } else {
238        registry
239            .list()
240            .iter()
241            .map(|contract| {
242                let contract_json = contract.to_json().unwrap_or_else(|_| serde_json::json!({}));
243                ProtocolContractResponse {
244                    contract_id: contract.contract_id().to_string(),
245                    version: contract.version().to_string(),
246                    protocol: format!("{:?}", contract.protocol()).to_lowercase(),
247                    contract: contract_json,
248                }
249            })
250            .collect()
251    };
252
253    Ok(Json(ListContractsResponse {
254        total: contracts.len(),
255        contracts,
256    }))
257}
258
259/// Get a specific contract
260pub async fn get_contract(
261    State(state): State<ProtocolContractState>,
262    Path(contract_id): Path<String>,
263) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
264    let registry = state.registry.read().await;
265
266    let contract = registry.get(&contract_id).ok_or_else(|| {
267        (
268            StatusCode::NOT_FOUND,
269            Json(serde_json::json!({
270                "error": "Contract not found",
271                "contract_id": contract_id
272            })),
273        )
274    })?;
275
276    let contract_json = contract.to_json().map_err(|e| {
277        (
278            StatusCode::INTERNAL_SERVER_ERROR,
279            Json(serde_json::json!({
280                "error": "Failed to serialize contract",
281                "message": e.to_string()
282            })),
283        )
284    })?;
285
286    Ok(Json(ProtocolContractResponse {
287        contract_id: contract.contract_id().to_string(),
288        version: contract.version().to_string(),
289        protocol: format!("{:?}", contract.protocol()).to_lowercase(),
290        contract: contract_json,
291    }))
292}
293
294/// Create a gRPC contract
295pub async fn create_grpc_contract(
296    State(state): State<ProtocolContractState>,
297    Json(request): Json<CreateGrpcContractRequest>,
298) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
299    // Decode base64 descriptor set
300    let descriptor_bytes =
301        general_purpose::STANDARD.decode(&request.descriptor_set).map_err(|e| {
302            (
303                StatusCode::BAD_REQUEST,
304                Json(serde_json::json!({
305                    "error": "Invalid base64 descriptor set",
306                    "message": e.to_string()
307                })),
308            )
309        })?;
310
311    // Create descriptor pool from bytes
312    // Note: GrpcContract::from_descriptor_set handles the descriptor pool creation
313    let contract = GrpcContract::from_descriptor_set(
314        request.contract_id.clone(),
315        request.version.clone(),
316        &descriptor_bytes,
317    )
318    .map_err(|e| {
319        (
320            StatusCode::BAD_REQUEST,
321            Json(serde_json::json!({
322                "error": "Failed to create gRPC contract",
323                "message": e.to_string()
324            })),
325        )
326    })?;
327
328    // Register contract
329    let mut registry = state.registry.write().await;
330    registry.register(Box::new(contract));
331
332    let contract = registry.get(&request.contract_id).unwrap();
333    let contract_json = contract.to_json().map_err(|e| {
334        (
335            StatusCode::INTERNAL_SERVER_ERROR,
336            Json(serde_json::json!({
337                "error": "Failed to serialize contract",
338                "message": e.to_string()
339            })),
340        )
341    })?;
342
343    Ok(Json(ProtocolContractResponse {
344        contract_id: request.contract_id,
345        version: request.version,
346        protocol: "grpc".to_string(),
347        contract: contract_json,
348    }))
349}
350
351/// Create a WebSocket contract
352pub async fn create_websocket_contract(
353    State(state): State<ProtocolContractState>,
354    Json(request): Json<CreateWebSocketContractRequest>,
355) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
356    let mut contract = WebSocketContract::new(request.contract_id.clone(), request.version.clone());
357
358    // Add message types
359    for msg_type_req in request.message_types {
360        let direction = match msg_type_req.direction.as_str() {
361            "inbound" => mockforge_core::contract_drift::MessageDirection::Inbound,
362            "outbound" => mockforge_core::contract_drift::MessageDirection::Outbound,
363            "bidirectional" => mockforge_core::contract_drift::MessageDirection::Bidirectional,
364            _ => {
365                return Err((
366                    StatusCode::BAD_REQUEST,
367                    Json(serde_json::json!({
368                        "error": "Invalid direction",
369                        "message": "Direction must be 'inbound', 'outbound', or 'bidirectional'"
370                    })),
371                ));
372            }
373        };
374
375        let message_type = WebSocketMessageType {
376            message_type: msg_type_req.message_type,
377            topic: msg_type_req.topic,
378            schema: msg_type_req.schema,
379            direction,
380            description: msg_type_req.description,
381            example: msg_type_req.example,
382        };
383
384        contract.add_message_type(message_type).map_err(|e| {
385            (
386                StatusCode::BAD_REQUEST,
387                Json(serde_json::json!({
388                    "error": "Failed to add message type",
389                    "message": e.to_string()
390                })),
391            )
392        })?;
393    }
394
395    // Register contract
396    let mut registry = state.registry.write().await;
397    registry.register(Box::new(contract));
398
399    let contract = registry.get(&request.contract_id).unwrap();
400    let contract_json = contract.to_json().map_err(|e| {
401        (
402            StatusCode::INTERNAL_SERVER_ERROR,
403            Json(serde_json::json!({
404                "error": "Failed to serialize contract",
405                "message": e.to_string()
406            })),
407        )
408    })?;
409
410    Ok(Json(ProtocolContractResponse {
411        contract_id: request.contract_id,
412        version: request.version,
413        protocol: "websocket".to_string(),
414        contract: contract_json,
415    }))
416}
417
418/// Create an MQTT contract
419pub async fn create_mqtt_contract(
420    State(state): State<ProtocolContractState>,
421    Json(request): Json<CreateMqttContractRequest>,
422) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
423    let mut contract = MqttContract::new(request.contract_id.clone(), request.version.clone());
424
425    // Add topics
426    for topic_req in request.topics {
427        let topic_schema = MqttTopicSchema {
428            topic: topic_req.topic,
429            qos: topic_req.qos,
430            schema: topic_req.schema,
431            retained: topic_req.retained,
432            description: topic_req.description,
433            example: topic_req.example,
434        };
435
436        contract.add_topic(topic_schema).map_err(|e| {
437            (
438                StatusCode::BAD_REQUEST,
439                Json(serde_json::json!({
440                    "error": "Failed to add topic",
441                    "message": e.to_string()
442                })),
443            )
444        })?;
445    }
446
447    // Register contract
448    let mut registry = state.registry.write().await;
449    registry.register(Box::new(contract));
450
451    let contract = registry.get(&request.contract_id).unwrap();
452    let contract_json = contract.to_json().map_err(|e| {
453        (
454            StatusCode::INTERNAL_SERVER_ERROR,
455            Json(serde_json::json!({
456                "error": "Failed to serialize contract",
457                "message": e.to_string()
458            })),
459        )
460    })?;
461
462    Ok(Json(ProtocolContractResponse {
463        contract_id: request.contract_id,
464        version: request.version,
465        protocol: "mqtt".to_string(),
466        contract: contract_json,
467    }))
468}
469
470/// Create a Kafka contract
471pub async fn create_kafka_contract(
472    State(state): State<ProtocolContractState>,
473    Json(request): Json<CreateKafkaContractRequest>,
474) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
475    let mut contract = KafkaContract::new(request.contract_id.clone(), request.version.clone());
476
477    // Add topics
478    for topic_req in request.topics {
479        let format = match topic_req.value_schema.format.as_str() {
480            "json" => SchemaFormat::Json,
481            "avro" => SchemaFormat::Avro,
482            "protobuf" => SchemaFormat::Protobuf,
483            _ => {
484                return Err((
485                    StatusCode::BAD_REQUEST,
486                    Json(serde_json::json!({
487                        "error": "Invalid schema format",
488                        "message": "Format must be 'json', 'avro', or 'protobuf'"
489                    })),
490                ));
491            }
492        };
493
494        let value_schema = TopicSchema {
495            format,
496            schema: topic_req.value_schema.schema,
497            schema_id: topic_req.value_schema.schema_id,
498            version: topic_req.value_schema.version,
499        };
500
501        let key_schema = topic_req.key_schema.map(|ks_req| {
502            let format = match ks_req.format.as_str() {
503                "json" => SchemaFormat::Json,
504                "avro" => SchemaFormat::Avro,
505                "protobuf" => SchemaFormat::Protobuf,
506                _ => SchemaFormat::Json, // Default to JSON
507            };
508
509            TopicSchema {
510                format,
511                schema: ks_req.schema,
512                schema_id: ks_req.schema_id,
513                version: ks_req.version,
514            }
515        });
516
517        let evolution_rules = topic_req.evolution_rules.map(|er_req| {
518            mockforge_core::contract_drift::EvolutionRules {
519                allow_backward_compatible: er_req.allow_backward_compatible,
520                allow_forward_compatible: er_req.allow_forward_compatible,
521                require_version_bump: er_req.require_version_bump,
522            }
523        });
524
525        let topic_schema = KafkaTopicSchema {
526            topic: topic_req.topic,
527            key_schema,
528            value_schema,
529            partitions: topic_req.partitions,
530            replication_factor: topic_req.replication_factor,
531            description: topic_req.description,
532            evolution_rules,
533        };
534
535        contract.add_topic(topic_schema).map_err(|e| {
536            (
537                StatusCode::BAD_REQUEST,
538                Json(serde_json::json!({
539                    "error": "Failed to add topic",
540                    "message": e.to_string()
541                })),
542            )
543        })?;
544    }
545
546    // Register contract
547    let mut registry = state.registry.write().await;
548    registry.register(Box::new(contract));
549
550    let contract = registry.get(&request.contract_id).unwrap();
551    let contract_json = contract.to_json().map_err(|e| {
552        (
553            StatusCode::INTERNAL_SERVER_ERROR,
554            Json(serde_json::json!({
555                "error": "Failed to serialize contract",
556                "message": e.to_string()
557            })),
558        )
559    })?;
560
561    Ok(Json(ProtocolContractResponse {
562        contract_id: request.contract_id,
563        version: request.version,
564        protocol: "kafka".to_string(),
565        contract: contract_json,
566    }))
567}
568
569/// Delete a contract
570pub async fn delete_contract(
571    State(state): State<ProtocolContractState>,
572    Path(contract_id): Path<String>,
573) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
574    let mut registry = state.registry.write().await;
575
576    registry.remove(&contract_id).ok_or_else(|| {
577        (
578            StatusCode::NOT_FOUND,
579            Json(serde_json::json!({
580                "error": "Contract not found",
581                "contract_id": contract_id
582            })),
583        )
584    })?;
585
586    Ok(Json(serde_json::json!({
587        "message": "Contract deleted",
588        "contract_id": contract_id
589    })))
590}
591
592/// Compare two contracts
593pub async fn compare_contracts_handler(
594    State(state): State<ProtocolContractState>,
595    Json(request): Json<CompareContractsRequest>,
596) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
597    let registry = state.registry.read().await;
598
599    let old_contract = registry.get(&request.old_contract_id).ok_or_else(|| {
600        (
601            StatusCode::NOT_FOUND,
602            Json(serde_json::json!({
603                "error": "Old contract not found",
604                "contract_id": request.old_contract_id
605            })),
606        )
607    })?;
608
609    let new_contract = registry.get(&request.new_contract_id).ok_or_else(|| {
610        (
611            StatusCode::NOT_FOUND,
612            Json(serde_json::json!({
613                "error": "New contract not found",
614                "contract_id": request.new_contract_id
615            })),
616        )
617    })?;
618
619    let diff_result = compare_contracts(old_contract, new_contract).await.map_err(|e| {
620        (
621            StatusCode::BAD_REQUEST,
622            Json(serde_json::json!({
623                "error": "Failed to compare contracts",
624                "message": e.to_string()
625            })),
626        )
627    })?;
628
629    // Evaluate drift and create incidents if drift engine is available
630    let mut drift_evaluation = None;
631    if let (Some(ref drift_engine), Some(ref incident_manager)) =
632        (&state.drift_engine, &state.incident_manager)
633    {
634        // Get protocol type
635        let protocol = new_contract.protocol();
636
637        // For each operation in the contract, evaluate drift
638        let operations = new_contract.operations();
639        for operation in operations {
640            let operation_id = &operation.id;
641
642            // Determine endpoint and method from operation type
643            let (endpoint, method) = match &operation.operation_type {
644                mockforge_core::contract_drift::protocol_contracts::OperationType::HttpEndpoint { path, method } => {
645                    (path.clone(), method.clone())
646                }
647                mockforge_core::contract_drift::protocol_contracts::OperationType::GrpcMethod { service, method } => {
648                    (format!("{}.{}", service, method), "grpc".to_string())
649                }
650                mockforge_core::contract_drift::protocol_contracts::OperationType::WebSocketMessage { message_type, .. } => {
651                    (message_type.clone(), "websocket".to_string())
652                }
653                mockforge_core::contract_drift::protocol_contracts::OperationType::MqttTopic { topic, qos: _ } => {
654                    (topic.clone(), "mqtt".to_string())
655                }
656                mockforge_core::contract_drift::protocol_contracts::OperationType::KafkaTopic { topic, key_schema: _, value_schema: _ } => {
657                    (topic.clone(), "kafka".to_string())
658                }
659            };
660
661            // Evaluate drift budget (for protocol contracts, we need to use evaluate_with_specs equivalent)
662            // Since we don't have OpenAPI specs for protocol contracts, we'll use a simplified evaluation
663            // that works with the diff result directly
664            let drift_result = drift_engine.evaluate(&diff_result, &endpoint, &method);
665
666            // Run fitness tests if registry is available
667            let mut drift_result_with_fitness = drift_result.clone();
668            if let Some(ref fitness_registry) = state.fitness_registry {
669                let rt = tokio::runtime::Handle::try_current();
670                if let Ok(handle) = rt {
671                    let fitness_results = handle.block_on(async {
672                        let guard = fitness_registry.read().await;
673                        guard.evaluate_all_protocol(
674                            Some(old_contract),
675                            new_contract,
676                            &diff_result,
677                            operation_id,
678                            None, // workspace_id
679                            None, // service_name
680                        )
681                    });
682                    if let Ok(results) = fitness_results {
683                        drift_result_with_fitness.fitness_test_results = results;
684                        if drift_result_with_fitness.fitness_test_results.iter().any(|r| !r.passed)
685                        {
686                            drift_result_with_fitness.should_create_incident = true;
687                        }
688                    }
689                }
690            }
691
692            // Analyze consumer impact if analyzer is available
693            // Use operation_id for more flexible protocol-specific matching
694            if let Some(ref consumer_analyzer) = state.consumer_analyzer {
695                let rt = tokio::runtime::Handle::try_current();
696                if let Ok(handle) = rt {
697                    let impact = handle.block_on(async {
698                        let guard = consumer_analyzer.read().await;
699                        // Use analyze_impact_with_operation_id for better protocol support
700                        guard.analyze_impact_with_operation_id(
701                            &endpoint,
702                            &method,
703                            Some(operation_id),
704                        )
705                    });
706                    if let Some(impact) = impact {
707                        drift_result_with_fitness.consumer_impact = Some(impact);
708                    }
709                }
710            }
711
712            // Create incident if budget is exceeded or breaking changes detected
713            if drift_result_with_fitness.should_create_incident {
714                let incident_type = if drift_result_with_fitness.breaking_changes > 0 {
715                    mockforge_core::incidents::types::IncidentType::BreakingChange
716                } else {
717                    mockforge_core::incidents::types::IncidentType::ThresholdExceeded
718                };
719
720                let severity = if drift_result_with_fitness.breaking_changes > 0 {
721                    mockforge_core::incidents::types::IncidentSeverity::High
722                } else if drift_result_with_fitness.potentially_breaking_changes > 0 {
723                    mockforge_core::incidents::types::IncidentSeverity::Medium
724                } else {
725                    mockforge_core::incidents::types::IncidentSeverity::Low
726                };
727
728                let details = serde_json::json!({
729                    "breaking_changes": drift_result_with_fitness.breaking_changes,
730                    "potentially_breaking_changes": drift_result_with_fitness.potentially_breaking_changes,
731                    "non_breaking_changes": drift_result_with_fitness.non_breaking_changes,
732                    "budget_exceeded": drift_result_with_fitness.budget_exceeded,
733                    "operation_id": operation_id,
734                    "operation_type": format!("{:?}", operation.operation_type),
735                });
736
737                let before_sample = Some(serde_json::json!({
738                    "contract_id": old_contract.contract_id(),
739                    "version": old_contract.version(),
740                    "protocol": format!("{:?}", old_contract.protocol()),
741                    "operation_id": operation_id,
742                }));
743
744                let after_sample = Some(serde_json::json!({
745                    "contract_id": new_contract.contract_id(),
746                    "version": new_contract.version(),
747                    "protocol": format!("{:?}", new_contract.protocol()),
748                    "operation_id": operation_id,
749                    "mismatches": diff_result.mismatches,
750                }));
751
752                let _incident = incident_manager
753                    .create_incident_with_samples(
754                        endpoint.clone(),
755                        method.clone(),
756                        incident_type,
757                        severity,
758                        details,
759                        None, // budget_id
760                        None, // workspace_id
761                        None, // sync_cycle_id
762                        None, // contract_diff_id
763                        before_sample,
764                        after_sample,
765                        Some(drift_result_with_fitness.fitness_test_results.clone()),
766                        drift_result_with_fitness.consumer_impact.clone(),
767                        Some(protocol),
768                    )
769                    .await;
770            }
771
772            drift_evaluation = Some(serde_json::json!({
773                "operation_id": operation_id,
774                "endpoint": endpoint,
775                "method": method,
776                "budget_exceeded": drift_result_with_fitness.budget_exceeded,
777                "breaking_changes": drift_result_with_fitness.breaking_changes,
778                "fitness_test_results": drift_result_with_fitness.fitness_test_results,
779                "consumer_impact": drift_result_with_fitness.consumer_impact,
780            }));
781        }
782    }
783
784    Ok(Json(serde_json::json!({
785        "matches": diff_result.matches,
786        "confidence": diff_result.confidence,
787        "mismatches": diff_result.mismatches,
788        "recommendations": diff_result.recommendations,
789        "corrections": diff_result.corrections,
790        "drift_evaluation": drift_evaluation,
791    })))
792}
793
794/// Validate a message against a contract
795pub async fn validate_message(
796    State(state): State<ProtocolContractState>,
797    Path(contract_id): Path<String>,
798    Json(request): Json<ValidateMessageRequest>,
799) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
800    let registry = state.registry.read().await;
801
802    let contract = registry.get(&contract_id).ok_or_else(|| {
803        (
804            StatusCode::NOT_FOUND,
805            Json(serde_json::json!({
806                "error": "Contract not found",
807                "contract_id": contract_id
808            })),
809        )
810    })?;
811
812    // Convert payload to bytes
813    let payload_bytes = match request.payload {
814        serde_json::Value::String(s) => {
815            // Try base64 decode first, then fall back to UTF-8
816            general_purpose::STANDARD.decode(&s).unwrap_or_else(|_| s.into_bytes())
817        }
818        _ => serde_json::to_vec(&request.payload).map_err(|e| {
819            (
820                StatusCode::BAD_REQUEST,
821                Json(serde_json::json!({
822                    "error": "Failed to serialize payload",
823                    "message": e.to_string()
824                })),
825            )
826        })?,
827    };
828
829    let contract_request = mockforge_core::contract_drift::protocol_contracts::ContractRequest {
830        protocol: contract.protocol(),
831        operation_id: request.operation_id.clone(),
832        payload: payload_bytes,
833        content_type: request.content_type,
834        metadata: request.metadata.unwrap_or_default(),
835    };
836
837    let validation_result =
838        contract.validate(&request.operation_id, &contract_request).await.map_err(|e| {
839            (
840                StatusCode::BAD_REQUEST,
841                Json(serde_json::json!({
842                    "error": "Validation failed",
843                    "message": e.to_string()
844                })),
845            )
846        })?;
847
848    Ok(Json(serde_json::json!({
849        "valid": validation_result.valid,
850        "errors": validation_result.errors,
851        "warnings": validation_result.warnings,
852    })))
853}
854
855/// Get contract router
856pub fn protocol_contracts_router(state: ProtocolContractState) -> axum::Router {
857    use axum::routing::{delete, get, post};
858
859    axum::Router::new()
860        .route("/", get(list_contracts))
861        .route("/{contract_id}", get(get_contract))
862        .route("/{contract_id}", delete(delete_contract))
863        .route("/grpc", post(create_grpc_contract))
864        .route("/websocket", post(create_websocket_contract))
865        .route("/mqtt", post(create_mqtt_contract))
866        .route("/kafka", post(create_kafka_contract))
867        .route("/compare", post(compare_contracts_handler))
868        .route("/{contract_id}/validate", post(validate_message))
869        .with_state(state)
870}