1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::Json,
9};
10use mockforge_core::contract_drift::protocol_contracts::{
11 compare_contracts, ProtocolContractRegistry,
12};
13use mockforge_core::contract_drift::{
14 GrpcContract, KafkaContract, KafkaTopicSchema, MqttContract, MqttTopicSchema, SchemaFormat,
15 TopicSchema, WebSocketContract, WebSocketMessageType,
16};
17use mockforge_core::protocol_abstraction::Protocol;
18use serde::{Deserialize, Serialize};
19use std::collections::HashMap;
20use std::sync::Arc;
21use tokio::sync::RwLock;
22
23use base64::{engine::general_purpose, Engine as _};
25
26#[derive(Clone)]
28pub struct ProtocolContractState {
29 pub registry: Arc<RwLock<ProtocolContractRegistry>>,
31 pub drift_engine: Option<Arc<mockforge_core::contract_drift::DriftBudgetEngine>>,
33 pub incident_manager: Option<Arc<mockforge_core::incidents::IncidentManager>>,
35 pub fitness_registry:
37 Option<Arc<RwLock<mockforge_core::contract_drift::FitnessFunctionRegistry>>>,
38 pub consumer_analyzer:
40 Option<Arc<RwLock<mockforge_core::contract_drift::ConsumerImpactAnalyzer>>>,
41}
42
43#[derive(Debug, Deserialize)]
45pub struct CreateGrpcContractRequest {
46 pub contract_id: String,
48 pub version: String,
50 pub descriptor_set: String,
52}
53
54#[derive(Debug, Deserialize)]
56pub struct CreateWebSocketContractRequest {
57 pub contract_id: String,
59 pub version: String,
61 pub message_types: Vec<WebSocketMessageTypeRequest>,
63}
64
65#[derive(Debug, Deserialize)]
67pub struct WebSocketMessageTypeRequest {
68 pub message_type: String,
70 pub topic: Option<String>,
72 pub schema: serde_json::Value,
74 pub direction: String,
76 pub description: Option<String>,
78 pub example: Option<serde_json::Value>,
80}
81
82#[derive(Debug, Deserialize)]
84pub struct CreateMqttContractRequest {
85 pub contract_id: String,
87 pub version: String,
89 pub topics: Vec<MqttTopicSchemaRequest>,
91}
92
93#[derive(Debug, Deserialize)]
95pub struct MqttTopicSchemaRequest {
96 pub topic: String,
98 pub qos: Option<u8>,
100 pub schema: serde_json::Value,
102 pub retained: Option<bool>,
104 pub description: Option<String>,
106 pub example: Option<serde_json::Value>,
108}
109
110#[derive(Debug, Deserialize)]
112pub struct CreateKafkaContractRequest {
113 pub contract_id: String,
115 pub version: String,
117 pub topics: Vec<KafkaTopicSchemaRequest>,
119}
120
121#[derive(Debug, Deserialize)]
123pub struct KafkaTopicSchemaRequest {
124 pub topic: String,
126 pub key_schema: Option<TopicSchemaRequest>,
128 pub value_schema: TopicSchemaRequest,
130 pub partitions: Option<u32>,
132 pub replication_factor: Option<u16>,
134 pub description: Option<String>,
136 pub evolution_rules: Option<EvolutionRulesRequest>,
138}
139
140#[derive(Debug, Deserialize)]
142pub struct TopicSchemaRequest {
143 pub format: String,
145 pub schema: serde_json::Value,
147 pub schema_id: Option<String>,
149 pub version: Option<String>,
151}
152
153#[derive(Debug, Deserialize)]
155pub struct EvolutionRulesRequest {
156 pub allow_backward_compatible: bool,
158 pub allow_forward_compatible: bool,
160 pub require_version_bump: bool,
162}
163
164#[derive(Debug, Serialize)]
166pub struct ProtocolContractResponse {
167 pub contract_id: String,
169 pub version: String,
171 pub protocol: String,
173 pub contract: serde_json::Value,
175}
176
177#[derive(Debug, Serialize)]
179pub struct ListContractsResponse {
180 pub contracts: Vec<ProtocolContractResponse>,
182 pub total: usize,
184}
185
186#[derive(Debug, Deserialize)]
188pub struct CompareContractsRequest {
189 pub old_contract_id: String,
191 pub new_contract_id: String,
193}
194
195#[derive(Debug, Deserialize)]
197pub struct ValidateMessageRequest {
198 pub operation_id: String,
200 pub payload: serde_json::Value,
202 pub content_type: Option<String>,
204 pub metadata: Option<HashMap<String, String>>,
206}
207
208pub async fn list_contracts(
210 State(state): State<ProtocolContractState>,
211 Query(params): Query<HashMap<String, String>>,
212) -> Result<Json<ListContractsResponse>, (StatusCode, Json<serde_json::Value>)> {
213 let registry = state.registry.read().await;
214
215 let protocol_filter = params.get("protocol").and_then(|p| match p.as_str() {
216 "grpc" => Some(Protocol::Grpc),
217 "websocket" => Some(Protocol::WebSocket),
218 "mqtt" => Some(Protocol::Mqtt),
219 "kafka" => Some(Protocol::Kafka),
220 _ => None,
221 });
222
223 let contracts: Vec<ProtocolContractResponse> = if let Some(protocol) = protocol_filter {
224 registry
225 .list_by_protocol(protocol)
226 .iter()
227 .map(|contract| {
228 let contract_json = contract.to_json().unwrap_or_else(|_| serde_json::json!({}));
229 ProtocolContractResponse {
230 contract_id: contract.contract_id().to_string(),
231 version: contract.version().to_string(),
232 protocol: format!("{:?}", contract.protocol()).to_lowercase(),
233 contract: contract_json,
234 }
235 })
236 .collect()
237 } else {
238 registry
239 .list()
240 .iter()
241 .map(|contract| {
242 let contract_json = contract.to_json().unwrap_or_else(|_| serde_json::json!({}));
243 ProtocolContractResponse {
244 contract_id: contract.contract_id().to_string(),
245 version: contract.version().to_string(),
246 protocol: format!("{:?}", contract.protocol()).to_lowercase(),
247 contract: contract_json,
248 }
249 })
250 .collect()
251 };
252
253 Ok(Json(ListContractsResponse {
254 total: contracts.len(),
255 contracts,
256 }))
257}
258
259pub async fn get_contract(
261 State(state): State<ProtocolContractState>,
262 Path(contract_id): Path<String>,
263) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
264 let registry = state.registry.read().await;
265
266 let contract = registry.get(&contract_id).ok_or_else(|| {
267 (
268 StatusCode::NOT_FOUND,
269 Json(serde_json::json!({
270 "error": "Contract not found",
271 "contract_id": contract_id
272 })),
273 )
274 })?;
275
276 let contract_json = contract.to_json().map_err(|e| {
277 (
278 StatusCode::INTERNAL_SERVER_ERROR,
279 Json(serde_json::json!({
280 "error": "Failed to serialize contract",
281 "message": e.to_string()
282 })),
283 )
284 })?;
285
286 Ok(Json(ProtocolContractResponse {
287 contract_id: contract.contract_id().to_string(),
288 version: contract.version().to_string(),
289 protocol: format!("{:?}", contract.protocol()).to_lowercase(),
290 contract: contract_json,
291 }))
292}
293
294pub async fn create_grpc_contract(
296 State(state): State<ProtocolContractState>,
297 Json(request): Json<CreateGrpcContractRequest>,
298) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
299 let descriptor_bytes =
301 general_purpose::STANDARD.decode(&request.descriptor_set).map_err(|e| {
302 (
303 StatusCode::BAD_REQUEST,
304 Json(serde_json::json!({
305 "error": "Invalid base64 descriptor set",
306 "message": e.to_string()
307 })),
308 )
309 })?;
310
311 let contract = GrpcContract::from_descriptor_set(
314 request.contract_id.clone(),
315 request.version.clone(),
316 &descriptor_bytes,
317 )
318 .map_err(|e| {
319 (
320 StatusCode::BAD_REQUEST,
321 Json(serde_json::json!({
322 "error": "Failed to create gRPC contract",
323 "message": e.to_string()
324 })),
325 )
326 })?;
327
328 let mut registry = state.registry.write().await;
330 registry.register(Box::new(contract));
331
332 let contract = registry.get(&request.contract_id).ok_or_else(|| {
333 (
334 StatusCode::INTERNAL_SERVER_ERROR,
335 Json(serde_json::json!({
336 "error": "Failed to retrieve registered contract",
337 "contract_id": request.contract_id
338 })),
339 )
340 })?;
341 let contract_json = contract.to_json().map_err(|e| {
342 (
343 StatusCode::INTERNAL_SERVER_ERROR,
344 Json(serde_json::json!({
345 "error": "Failed to serialize contract",
346 "message": e.to_string()
347 })),
348 )
349 })?;
350
351 Ok(Json(ProtocolContractResponse {
352 contract_id: request.contract_id,
353 version: request.version,
354 protocol: "grpc".to_string(),
355 contract: contract_json,
356 }))
357}
358
359pub async fn create_websocket_contract(
361 State(state): State<ProtocolContractState>,
362 Json(request): Json<CreateWebSocketContractRequest>,
363) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
364 let mut contract = WebSocketContract::new(request.contract_id.clone(), request.version.clone());
365
366 for msg_type_req in request.message_types {
368 let direction = match msg_type_req.direction.as_str() {
369 "inbound" => mockforge_core::contract_drift::MessageDirection::Inbound,
370 "outbound" => mockforge_core::contract_drift::MessageDirection::Outbound,
371 "bidirectional" => mockforge_core::contract_drift::MessageDirection::Bidirectional,
372 _ => {
373 return Err((
374 StatusCode::BAD_REQUEST,
375 Json(serde_json::json!({
376 "error": "Invalid direction",
377 "message": "Direction must be 'inbound', 'outbound', or 'bidirectional'"
378 })),
379 ));
380 }
381 };
382
383 let message_type = WebSocketMessageType {
384 message_type: msg_type_req.message_type,
385 topic: msg_type_req.topic,
386 schema: msg_type_req.schema,
387 direction,
388 description: msg_type_req.description,
389 example: msg_type_req.example,
390 };
391
392 contract.add_message_type(message_type).map_err(|e| {
393 (
394 StatusCode::BAD_REQUEST,
395 Json(serde_json::json!({
396 "error": "Failed to add message type",
397 "message": e.to_string()
398 })),
399 )
400 })?;
401 }
402
403 let mut registry = state.registry.write().await;
405 registry.register(Box::new(contract));
406
407 let contract = registry.get(&request.contract_id).ok_or_else(|| {
408 (
409 StatusCode::INTERNAL_SERVER_ERROR,
410 Json(serde_json::json!({
411 "error": "Failed to retrieve registered contract",
412 "contract_id": request.contract_id
413 })),
414 )
415 })?;
416 let contract_json = contract.to_json().map_err(|e| {
417 (
418 StatusCode::INTERNAL_SERVER_ERROR,
419 Json(serde_json::json!({
420 "error": "Failed to serialize contract",
421 "message": e.to_string()
422 })),
423 )
424 })?;
425
426 Ok(Json(ProtocolContractResponse {
427 contract_id: request.contract_id,
428 version: request.version,
429 protocol: "websocket".to_string(),
430 contract: contract_json,
431 }))
432}
433
434pub async fn create_mqtt_contract(
436 State(state): State<ProtocolContractState>,
437 Json(request): Json<CreateMqttContractRequest>,
438) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
439 let mut contract = MqttContract::new(request.contract_id.clone(), request.version.clone());
440
441 for topic_req in request.topics {
443 let topic_schema = MqttTopicSchema {
444 topic: topic_req.topic,
445 qos: topic_req.qos,
446 schema: topic_req.schema,
447 retained: topic_req.retained,
448 description: topic_req.description,
449 example: topic_req.example,
450 };
451
452 contract.add_topic(topic_schema).map_err(|e| {
453 (
454 StatusCode::BAD_REQUEST,
455 Json(serde_json::json!({
456 "error": "Failed to add topic",
457 "message": e.to_string()
458 })),
459 )
460 })?;
461 }
462
463 let mut registry = state.registry.write().await;
465 registry.register(Box::new(contract));
466
467 let contract = registry.get(&request.contract_id).ok_or_else(|| {
468 (
469 StatusCode::INTERNAL_SERVER_ERROR,
470 Json(serde_json::json!({
471 "error": "Failed to retrieve registered contract",
472 "contract_id": request.contract_id
473 })),
474 )
475 })?;
476 let contract_json = contract.to_json().map_err(|e| {
477 (
478 StatusCode::INTERNAL_SERVER_ERROR,
479 Json(serde_json::json!({
480 "error": "Failed to serialize contract",
481 "message": e.to_string()
482 })),
483 )
484 })?;
485
486 Ok(Json(ProtocolContractResponse {
487 contract_id: request.contract_id,
488 version: request.version,
489 protocol: "mqtt".to_string(),
490 contract: contract_json,
491 }))
492}
493
494pub async fn create_kafka_contract(
496 State(state): State<ProtocolContractState>,
497 Json(request): Json<CreateKafkaContractRequest>,
498) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
499 let mut contract = KafkaContract::new(request.contract_id.clone(), request.version.clone());
500
501 for topic_req in request.topics {
503 let format = match topic_req.value_schema.format.as_str() {
504 "json" => SchemaFormat::Json,
505 "avro" => SchemaFormat::Avro,
506 "protobuf" => SchemaFormat::Protobuf,
507 _ => {
508 return Err((
509 StatusCode::BAD_REQUEST,
510 Json(serde_json::json!({
511 "error": "Invalid schema format",
512 "message": "Format must be 'json', 'avro', or 'protobuf'"
513 })),
514 ));
515 }
516 };
517
518 let value_schema = TopicSchema {
519 format,
520 schema: topic_req.value_schema.schema,
521 schema_id: topic_req.value_schema.schema_id,
522 version: topic_req.value_schema.version,
523 };
524
525 let key_schema = topic_req.key_schema.map(|ks_req| {
526 let format = match ks_req.format.as_str() {
527 "json" => SchemaFormat::Json,
528 "avro" => SchemaFormat::Avro,
529 "protobuf" => SchemaFormat::Protobuf,
530 _ => SchemaFormat::Json, };
532
533 TopicSchema {
534 format,
535 schema: ks_req.schema,
536 schema_id: ks_req.schema_id,
537 version: ks_req.version,
538 }
539 });
540
541 let evolution_rules = topic_req.evolution_rules.map(|er_req| {
542 mockforge_core::contract_drift::EvolutionRules {
543 allow_backward_compatible: er_req.allow_backward_compatible,
544 allow_forward_compatible: er_req.allow_forward_compatible,
545 require_version_bump: er_req.require_version_bump,
546 }
547 });
548
549 let topic_schema = KafkaTopicSchema {
550 topic: topic_req.topic,
551 key_schema,
552 value_schema,
553 partitions: topic_req.partitions,
554 replication_factor: topic_req.replication_factor,
555 description: topic_req.description,
556 evolution_rules,
557 };
558
559 contract.add_topic(topic_schema).map_err(|e| {
560 (
561 StatusCode::BAD_REQUEST,
562 Json(serde_json::json!({
563 "error": "Failed to add topic",
564 "message": e.to_string()
565 })),
566 )
567 })?;
568 }
569
570 let mut registry = state.registry.write().await;
572 registry.register(Box::new(contract));
573
574 let contract = registry.get(&request.contract_id).ok_or_else(|| {
575 (
576 StatusCode::INTERNAL_SERVER_ERROR,
577 Json(serde_json::json!({
578 "error": "Failed to retrieve registered contract",
579 "contract_id": request.contract_id
580 })),
581 )
582 })?;
583 let contract_json = contract.to_json().map_err(|e| {
584 (
585 StatusCode::INTERNAL_SERVER_ERROR,
586 Json(serde_json::json!({
587 "error": "Failed to serialize contract",
588 "message": e.to_string()
589 })),
590 )
591 })?;
592
593 Ok(Json(ProtocolContractResponse {
594 contract_id: request.contract_id,
595 version: request.version,
596 protocol: "kafka".to_string(),
597 contract: contract_json,
598 }))
599}
600
601pub async fn delete_contract(
603 State(state): State<ProtocolContractState>,
604 Path(contract_id): Path<String>,
605) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
606 let mut registry = state.registry.write().await;
607
608 registry.remove(&contract_id).ok_or_else(|| {
609 (
610 StatusCode::NOT_FOUND,
611 Json(serde_json::json!({
612 "error": "Contract not found",
613 "contract_id": contract_id
614 })),
615 )
616 })?;
617
618 Ok(Json(serde_json::json!({
619 "message": "Contract deleted",
620 "contract_id": contract_id
621 })))
622}
623
624pub async fn compare_contracts_handler(
626 State(state): State<ProtocolContractState>,
627 Json(request): Json<CompareContractsRequest>,
628) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
629 let registry = state.registry.read().await;
630
631 let old_contract = registry.get(&request.old_contract_id).ok_or_else(|| {
632 (
633 StatusCode::NOT_FOUND,
634 Json(serde_json::json!({
635 "error": "Old contract not found",
636 "contract_id": request.old_contract_id
637 })),
638 )
639 })?;
640
641 let new_contract = registry.get(&request.new_contract_id).ok_or_else(|| {
642 (
643 StatusCode::NOT_FOUND,
644 Json(serde_json::json!({
645 "error": "New contract not found",
646 "contract_id": request.new_contract_id
647 })),
648 )
649 })?;
650
651 let diff_result = compare_contracts(old_contract, new_contract).await.map_err(|e| {
652 (
653 StatusCode::BAD_REQUEST,
654 Json(serde_json::json!({
655 "error": "Failed to compare contracts",
656 "message": e.to_string()
657 })),
658 )
659 })?;
660
661 let mut drift_evaluation = None;
663 if let (Some(ref drift_engine), Some(ref incident_manager)) =
664 (&state.drift_engine, &state.incident_manager)
665 {
666 let protocol = new_contract.protocol();
668
669 let operations = new_contract.operations();
671 for operation in operations {
672 let operation_id = &operation.id;
673
674 let (endpoint, method) = match &operation.operation_type {
676 mockforge_core::contract_drift::protocol_contracts::OperationType::HttpEndpoint { path, method } => {
677 (path.clone(), method.clone())
678 }
679 mockforge_core::contract_drift::protocol_contracts::OperationType::GrpcMethod { service, method } => {
680 (format!("{}.{}", service, method), "grpc".to_string())
681 }
682 mockforge_core::contract_drift::protocol_contracts::OperationType::WebSocketMessage { message_type, .. } => {
683 (message_type.clone(), "websocket".to_string())
684 }
685 mockforge_core::contract_drift::protocol_contracts::OperationType::MqttTopic { topic, qos: _ } => {
686 (topic.clone(), "mqtt".to_string())
687 }
688 mockforge_core::contract_drift::protocol_contracts::OperationType::KafkaTopic { topic, key_schema: _, value_schema: _ } => {
689 (topic.clone(), "kafka".to_string())
690 }
691 };
692
693 let drift_result = drift_engine.evaluate(&diff_result, &endpoint, &method);
697
698 let mut drift_result_with_fitness = drift_result.clone();
700 if let Some(ref fitness_registry) = state.fitness_registry {
701 let rt = tokio::runtime::Handle::try_current();
702 if let Ok(handle) = rt {
703 let fitness_results = handle.block_on(async {
704 let guard = fitness_registry.read().await;
705 guard.evaluate_all_protocol(
706 Some(old_contract),
707 new_contract,
708 &diff_result,
709 operation_id,
710 None, None, )
713 });
714 if let Ok(results) = fitness_results {
715 drift_result_with_fitness.fitness_test_results = results;
716 if drift_result_with_fitness.fitness_test_results.iter().any(|r| !r.passed)
717 {
718 drift_result_with_fitness.should_create_incident = true;
719 }
720 }
721 }
722 }
723
724 if let Some(ref consumer_analyzer) = state.consumer_analyzer {
727 let rt = tokio::runtime::Handle::try_current();
728 if let Ok(handle) = rt {
729 let impact = handle.block_on(async {
730 let guard = consumer_analyzer.read().await;
731 guard.analyze_impact_with_operation_id(
733 &endpoint,
734 &method,
735 Some(operation_id),
736 )
737 });
738 if let Some(impact) = impact {
739 drift_result_with_fitness.consumer_impact = Some(impact);
740 }
741 }
742 }
743
744 if drift_result_with_fitness.should_create_incident {
746 let incident_type = if drift_result_with_fitness.breaking_changes > 0 {
747 mockforge_core::incidents::types::IncidentType::BreakingChange
748 } else {
749 mockforge_core::incidents::types::IncidentType::ThresholdExceeded
750 };
751
752 let severity = if drift_result_with_fitness.breaking_changes > 0 {
753 mockforge_core::incidents::types::IncidentSeverity::High
754 } else if drift_result_with_fitness.potentially_breaking_changes > 0 {
755 mockforge_core::incidents::types::IncidentSeverity::Medium
756 } else {
757 mockforge_core::incidents::types::IncidentSeverity::Low
758 };
759
760 let details = serde_json::json!({
761 "breaking_changes": drift_result_with_fitness.breaking_changes,
762 "potentially_breaking_changes": drift_result_with_fitness.potentially_breaking_changes,
763 "non_breaking_changes": drift_result_with_fitness.non_breaking_changes,
764 "budget_exceeded": drift_result_with_fitness.budget_exceeded,
765 "operation_id": operation_id,
766 "operation_type": format!("{:?}", operation.operation_type),
767 });
768
769 let before_sample = Some(serde_json::json!({
770 "contract_id": old_contract.contract_id(),
771 "version": old_contract.version(),
772 "protocol": format!("{:?}", old_contract.protocol()),
773 "operation_id": operation_id,
774 }));
775
776 let after_sample = Some(serde_json::json!({
777 "contract_id": new_contract.contract_id(),
778 "version": new_contract.version(),
779 "protocol": format!("{:?}", new_contract.protocol()),
780 "operation_id": operation_id,
781 "mismatches": diff_result.mismatches,
782 }));
783
784 let _incident = incident_manager
785 .create_incident_with_samples(
786 endpoint.clone(),
787 method.clone(),
788 incident_type,
789 severity,
790 details,
791 None, None, None, None, before_sample,
796 after_sample,
797 Some(drift_result_with_fitness.fitness_test_results.clone()),
798 drift_result_with_fitness.consumer_impact.clone(),
799 Some(protocol),
800 )
801 .await;
802 }
803
804 drift_evaluation = Some(serde_json::json!({
805 "operation_id": operation_id,
806 "endpoint": endpoint,
807 "method": method,
808 "budget_exceeded": drift_result_with_fitness.budget_exceeded,
809 "breaking_changes": drift_result_with_fitness.breaking_changes,
810 "fitness_test_results": drift_result_with_fitness.fitness_test_results,
811 "consumer_impact": drift_result_with_fitness.consumer_impact,
812 }));
813 }
814 }
815
816 Ok(Json(serde_json::json!({
817 "matches": diff_result.matches,
818 "confidence": diff_result.confidence,
819 "mismatches": diff_result.mismatches,
820 "recommendations": diff_result.recommendations,
821 "corrections": diff_result.corrections,
822 "drift_evaluation": drift_evaluation,
823 })))
824}
825
826pub async fn validate_message(
828 State(state): State<ProtocolContractState>,
829 Path(contract_id): Path<String>,
830 Json(request): Json<ValidateMessageRequest>,
831) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
832 let registry = state.registry.read().await;
833
834 let contract = registry.get(&contract_id).ok_or_else(|| {
835 (
836 StatusCode::NOT_FOUND,
837 Json(serde_json::json!({
838 "error": "Contract not found",
839 "contract_id": contract_id
840 })),
841 )
842 })?;
843
844 let payload_bytes = match request.payload {
846 serde_json::Value::String(s) => {
847 general_purpose::STANDARD.decode(&s).unwrap_or_else(|_| s.into_bytes())
849 }
850 _ => serde_json::to_vec(&request.payload).map_err(|e| {
851 (
852 StatusCode::BAD_REQUEST,
853 Json(serde_json::json!({
854 "error": "Failed to serialize payload",
855 "message": e.to_string()
856 })),
857 )
858 })?,
859 };
860
861 let contract_request = mockforge_core::contract_drift::protocol_contracts::ContractRequest {
862 protocol: contract.protocol(),
863 operation_id: request.operation_id.clone(),
864 payload: payload_bytes,
865 content_type: request.content_type,
866 metadata: request.metadata.unwrap_or_default(),
867 };
868
869 let validation_result =
870 contract.validate(&request.operation_id, &contract_request).await.map_err(|e| {
871 (
872 StatusCode::BAD_REQUEST,
873 Json(serde_json::json!({
874 "error": "Validation failed",
875 "message": e.to_string()
876 })),
877 )
878 })?;
879
880 Ok(Json(serde_json::json!({
881 "valid": validation_result.valid,
882 "errors": validation_result.errors,
883 "warnings": validation_result.warnings,
884 })))
885}
886
887pub fn protocol_contracts_router(state: ProtocolContractState) -> axum::Router {
889 use axum::routing::{delete, get, post};
890
891 axum::Router::new()
892 .route("/", get(list_contracts))
893 .route("/{contract_id}", get(get_contract))
894 .route("/{contract_id}", delete(delete_contract))
895 .route("/grpc", post(create_grpc_contract))
896 .route("/websocket", post(create_websocket_contract))
897 .route("/mqtt", post(create_mqtt_contract))
898 .route("/kafka", post(create_kafka_contract))
899 .route("/compare", post(compare_contracts_handler))
900 .route("/{contract_id}/validate", post(validate_message))
901 .with_state(state)
902}