1#![allow(deprecated)]
10
11use axum::{
12 extract::{Path, Query, State},
13 http::StatusCode,
14 response::Json,
15};
16use mockforge_core::contract_drift::protocol_contracts::{
17 compare_contracts, ProtocolContractRegistry,
18};
19use mockforge_core::contract_drift::{
20 GrpcContract, KafkaContract, MqttContract, WebSocketContract,
21};
22use mockforge_foundation::protocol::Protocol;
23use mockforge_foundation::protocol_contract_types::{
24 KafkaTopicSchema, MqttTopicSchema, SchemaFormat, TopicSchema, WebSocketMessageType,
25};
26use serde::{Deserialize, Serialize};
27use std::collections::HashMap;
28use std::sync::Arc;
29use tokio::sync::RwLock;
30
31use base64::{engine::general_purpose, Engine as _};
33
34#[derive(Clone)]
36pub struct ProtocolContractState {
37 pub registry: Arc<RwLock<ProtocolContractRegistry>>,
39 pub drift_engine: Option<Arc<mockforge_core::contract_drift::DriftBudgetEngine>>,
41 pub incident_manager: Option<Arc<mockforge_core::incidents::IncidentManager>>,
43 pub fitness_registry:
45 Option<Arc<RwLock<mockforge_core::contract_drift::FitnessFunctionRegistry>>>,
46 pub consumer_analyzer:
48 Option<Arc<RwLock<mockforge_core::contract_drift::ConsumerImpactAnalyzer>>>,
49}
50
51#[derive(Debug, Deserialize)]
53pub struct CreateGrpcContractRequest {
54 pub contract_id: String,
56 pub version: String,
58 pub descriptor_set: String,
60}
61
62#[derive(Debug, Deserialize)]
64pub struct CreateWebSocketContractRequest {
65 pub contract_id: String,
67 pub version: String,
69 pub message_types: Vec<WebSocketMessageTypeRequest>,
71}
72
73#[derive(Debug, Deserialize)]
75pub struct WebSocketMessageTypeRequest {
76 pub message_type: String,
78 pub topic: Option<String>,
80 pub schema: serde_json::Value,
82 pub direction: String,
84 pub description: Option<String>,
86 pub example: Option<serde_json::Value>,
88}
89
90#[derive(Debug, Deserialize)]
92pub struct CreateMqttContractRequest {
93 pub contract_id: String,
95 pub version: String,
97 pub topics: Vec<MqttTopicSchemaRequest>,
99}
100
101#[derive(Debug, Deserialize)]
103pub struct MqttTopicSchemaRequest {
104 pub topic: String,
106 pub qos: Option<u8>,
108 pub schema: serde_json::Value,
110 pub retained: Option<bool>,
112 pub description: Option<String>,
114 pub example: Option<serde_json::Value>,
116}
117
118#[derive(Debug, Deserialize)]
120pub struct CreateKafkaContractRequest {
121 pub contract_id: String,
123 pub version: String,
125 pub topics: Vec<KafkaTopicSchemaRequest>,
127}
128
129#[derive(Debug, Deserialize)]
131pub struct KafkaTopicSchemaRequest {
132 pub topic: String,
134 pub key_schema: Option<TopicSchemaRequest>,
136 pub value_schema: TopicSchemaRequest,
138 pub partitions: Option<u32>,
140 pub replication_factor: Option<u16>,
142 pub description: Option<String>,
144 pub evolution_rules: Option<EvolutionRulesRequest>,
146}
147
148#[derive(Debug, Deserialize)]
150pub struct TopicSchemaRequest {
151 pub format: String,
153 pub schema: serde_json::Value,
155 pub schema_id: Option<String>,
157 pub version: Option<String>,
159}
160
161#[derive(Debug, Deserialize)]
163pub struct EvolutionRulesRequest {
164 pub allow_backward_compatible: bool,
166 pub allow_forward_compatible: bool,
168 pub require_version_bump: bool,
170}
171
172#[derive(Debug, Serialize)]
174pub struct ProtocolContractResponse {
175 pub contract_id: String,
177 pub version: String,
179 pub protocol: String,
181 pub contract: serde_json::Value,
183}
184
185#[derive(Debug, Serialize)]
187pub struct ListContractsResponse {
188 pub contracts: Vec<ProtocolContractResponse>,
190 pub total: usize,
192}
193
194#[derive(Debug, Deserialize)]
196pub struct CompareContractsRequest {
197 pub old_contract_id: String,
199 pub new_contract_id: String,
201}
202
203#[derive(Debug, Deserialize)]
205pub struct ValidateMessageRequest {
206 pub operation_id: String,
208 pub payload: serde_json::Value,
210 pub content_type: Option<String>,
212 pub metadata: Option<HashMap<String, String>>,
214}
215
216pub async fn list_contracts(
218 State(state): State<ProtocolContractState>,
219 Query(params): Query<HashMap<String, String>>,
220) -> Result<Json<ListContractsResponse>, (StatusCode, Json<serde_json::Value>)> {
221 let registry = state.registry.read().await;
222
223 let protocol_filter = params.get("protocol").and_then(|p| match p.as_str() {
224 "grpc" => Some(Protocol::Grpc),
225 "websocket" => Some(Protocol::WebSocket),
226 "mqtt" => Some(Protocol::Mqtt),
227 "kafka" => Some(Protocol::Kafka),
228 _ => None,
229 });
230
231 let contracts: Vec<ProtocolContractResponse> = if let Some(protocol) = protocol_filter {
232 registry
233 .list_by_protocol(protocol)
234 .iter()
235 .map(|contract| {
236 let contract_json = contract.to_json().unwrap_or_else(|_| serde_json::json!({}));
237 ProtocolContractResponse {
238 contract_id: contract.contract_id().to_string(),
239 version: contract.version().to_string(),
240 protocol: format!("{:?}", contract.protocol()).to_lowercase(),
241 contract: contract_json,
242 }
243 })
244 .collect()
245 } else {
246 registry
247 .list()
248 .iter()
249 .map(|contract| {
250 let contract_json = contract.to_json().unwrap_or_else(|_| serde_json::json!({}));
251 ProtocolContractResponse {
252 contract_id: contract.contract_id().to_string(),
253 version: contract.version().to_string(),
254 protocol: format!("{:?}", contract.protocol()).to_lowercase(),
255 contract: contract_json,
256 }
257 })
258 .collect()
259 };
260
261 Ok(Json(ListContractsResponse {
262 total: contracts.len(),
263 contracts,
264 }))
265}
266
267pub async fn get_contract(
269 State(state): State<ProtocolContractState>,
270 Path(contract_id): Path<String>,
271) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
272 let registry = state.registry.read().await;
273
274 let contract = registry.get(&contract_id).ok_or_else(|| {
275 (
276 StatusCode::NOT_FOUND,
277 Json(serde_json::json!({
278 "error": "Contract not found",
279 "contract_id": contract_id
280 })),
281 )
282 })?;
283
284 let contract_json = contract.to_json().map_err(|e| {
285 (
286 StatusCode::INTERNAL_SERVER_ERROR,
287 Json(serde_json::json!({
288 "error": "Failed to serialize contract",
289 "message": e.to_string()
290 })),
291 )
292 })?;
293
294 Ok(Json(ProtocolContractResponse {
295 contract_id: contract.contract_id().to_string(),
296 version: contract.version().to_string(),
297 protocol: format!("{:?}", contract.protocol()).to_lowercase(),
298 contract: contract_json,
299 }))
300}
301
302pub async fn create_grpc_contract(
304 State(state): State<ProtocolContractState>,
305 Json(request): Json<CreateGrpcContractRequest>,
306) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
307 let descriptor_bytes =
309 general_purpose::STANDARD.decode(&request.descriptor_set).map_err(|e| {
310 (
311 StatusCode::BAD_REQUEST,
312 Json(serde_json::json!({
313 "error": "Invalid base64 descriptor set",
314 "message": e.to_string()
315 })),
316 )
317 })?;
318
319 let contract = GrpcContract::from_descriptor_set(
322 request.contract_id.clone(),
323 request.version.clone(),
324 &descriptor_bytes,
325 )
326 .map_err(|e| {
327 (
328 StatusCode::BAD_REQUEST,
329 Json(serde_json::json!({
330 "error": "Failed to create gRPC contract",
331 "message": e.to_string()
332 })),
333 )
334 })?;
335
336 let mut registry = state.registry.write().await;
338 registry.register(Box::new(contract));
339
340 let contract = registry.get(&request.contract_id).ok_or_else(|| {
341 (
342 StatusCode::INTERNAL_SERVER_ERROR,
343 Json(serde_json::json!({
344 "error": "Failed to retrieve registered contract",
345 "contract_id": request.contract_id
346 })),
347 )
348 })?;
349 let contract_json = contract.to_json().map_err(|e| {
350 (
351 StatusCode::INTERNAL_SERVER_ERROR,
352 Json(serde_json::json!({
353 "error": "Failed to serialize contract",
354 "message": e.to_string()
355 })),
356 )
357 })?;
358
359 Ok(Json(ProtocolContractResponse {
360 contract_id: request.contract_id,
361 version: request.version,
362 protocol: "grpc".to_string(),
363 contract: contract_json,
364 }))
365}
366
367pub async fn create_websocket_contract(
369 State(state): State<ProtocolContractState>,
370 Json(request): Json<CreateWebSocketContractRequest>,
371) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
372 let mut contract = WebSocketContract::new(request.contract_id.clone(), request.version.clone());
373
374 for msg_type_req in request.message_types {
376 let direction = match msg_type_req.direction.as_str() {
377 "inbound" => mockforge_core::contract_drift::MessageDirection::Inbound,
378 "outbound" => mockforge_core::contract_drift::MessageDirection::Outbound,
379 "bidirectional" => mockforge_core::contract_drift::MessageDirection::Bidirectional,
380 _ => {
381 return Err((
382 StatusCode::BAD_REQUEST,
383 Json(serde_json::json!({
384 "error": "Invalid direction",
385 "message": "Direction must be 'inbound', 'outbound', or 'bidirectional'"
386 })),
387 ));
388 }
389 };
390
391 let message_type = WebSocketMessageType {
392 message_type: msg_type_req.message_type,
393 topic: msg_type_req.topic,
394 schema: msg_type_req.schema,
395 direction,
396 description: msg_type_req.description,
397 example: msg_type_req.example,
398 };
399
400 contract.add_message_type(message_type).map_err(|e| {
401 (
402 StatusCode::BAD_REQUEST,
403 Json(serde_json::json!({
404 "error": "Failed to add message type",
405 "message": e.to_string()
406 })),
407 )
408 })?;
409 }
410
411 let mut registry = state.registry.write().await;
413 registry.register(Box::new(contract));
414
415 let contract = registry.get(&request.contract_id).ok_or_else(|| {
416 (
417 StatusCode::INTERNAL_SERVER_ERROR,
418 Json(serde_json::json!({
419 "error": "Failed to retrieve registered contract",
420 "contract_id": request.contract_id
421 })),
422 )
423 })?;
424 let contract_json = contract.to_json().map_err(|e| {
425 (
426 StatusCode::INTERNAL_SERVER_ERROR,
427 Json(serde_json::json!({
428 "error": "Failed to serialize contract",
429 "message": e.to_string()
430 })),
431 )
432 })?;
433
434 Ok(Json(ProtocolContractResponse {
435 contract_id: request.contract_id,
436 version: request.version,
437 protocol: "websocket".to_string(),
438 contract: contract_json,
439 }))
440}
441
442pub async fn create_mqtt_contract(
444 State(state): State<ProtocolContractState>,
445 Json(request): Json<CreateMqttContractRequest>,
446) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
447 let mut contract = MqttContract::new(request.contract_id.clone(), request.version.clone());
448
449 for topic_req in request.topics {
451 let topic_schema = MqttTopicSchema {
452 topic: topic_req.topic,
453 qos: topic_req.qos,
454 schema: topic_req.schema,
455 retained: topic_req.retained,
456 description: topic_req.description,
457 example: topic_req.example,
458 };
459
460 contract.add_topic(topic_schema).map_err(|e| {
461 (
462 StatusCode::BAD_REQUEST,
463 Json(serde_json::json!({
464 "error": "Failed to add topic",
465 "message": e.to_string()
466 })),
467 )
468 })?;
469 }
470
471 let mut registry = state.registry.write().await;
473 registry.register(Box::new(contract));
474
475 let contract = registry.get(&request.contract_id).ok_or_else(|| {
476 (
477 StatusCode::INTERNAL_SERVER_ERROR,
478 Json(serde_json::json!({
479 "error": "Failed to retrieve registered contract",
480 "contract_id": request.contract_id
481 })),
482 )
483 })?;
484 let contract_json = contract.to_json().map_err(|e| {
485 (
486 StatusCode::INTERNAL_SERVER_ERROR,
487 Json(serde_json::json!({
488 "error": "Failed to serialize contract",
489 "message": e.to_string()
490 })),
491 )
492 })?;
493
494 Ok(Json(ProtocolContractResponse {
495 contract_id: request.contract_id,
496 version: request.version,
497 protocol: "mqtt".to_string(),
498 contract: contract_json,
499 }))
500}
501
502pub async fn create_kafka_contract(
504 State(state): State<ProtocolContractState>,
505 Json(request): Json<CreateKafkaContractRequest>,
506) -> Result<Json<ProtocolContractResponse>, (StatusCode, Json<serde_json::Value>)> {
507 let mut contract = KafkaContract::new(request.contract_id.clone(), request.version.clone());
508
509 for topic_req in request.topics {
511 let format = match topic_req.value_schema.format.as_str() {
512 "json" => SchemaFormat::Json,
513 "avro" => SchemaFormat::Avro,
514 "protobuf" => SchemaFormat::Protobuf,
515 _ => {
516 return Err((
517 StatusCode::BAD_REQUEST,
518 Json(serde_json::json!({
519 "error": "Invalid schema format",
520 "message": "Format must be 'json', 'avro', or 'protobuf'"
521 })),
522 ));
523 }
524 };
525
526 let value_schema = TopicSchema {
527 format,
528 schema: topic_req.value_schema.schema,
529 schema_id: topic_req.value_schema.schema_id,
530 version: topic_req.value_schema.version,
531 };
532
533 let key_schema = topic_req.key_schema.map(|ks_req| {
534 let format = match ks_req.format.as_str() {
535 "json" => SchemaFormat::Json,
536 "avro" => SchemaFormat::Avro,
537 "protobuf" => SchemaFormat::Protobuf,
538 _ => SchemaFormat::Json, };
540
541 TopicSchema {
542 format,
543 schema: ks_req.schema,
544 schema_id: ks_req.schema_id,
545 version: ks_req.version,
546 }
547 });
548
549 let evolution_rules = topic_req.evolution_rules.map(|er_req| {
550 mockforge_core::contract_drift::EvolutionRules {
551 allow_backward_compatible: er_req.allow_backward_compatible,
552 allow_forward_compatible: er_req.allow_forward_compatible,
553 require_version_bump: er_req.require_version_bump,
554 }
555 });
556
557 let topic_schema = KafkaTopicSchema {
558 topic: topic_req.topic,
559 key_schema,
560 value_schema,
561 partitions: topic_req.partitions,
562 replication_factor: topic_req.replication_factor,
563 description: topic_req.description,
564 evolution_rules,
565 };
566
567 contract.add_topic(topic_schema).map_err(|e| {
568 (
569 StatusCode::BAD_REQUEST,
570 Json(serde_json::json!({
571 "error": "Failed to add topic",
572 "message": e.to_string()
573 })),
574 )
575 })?;
576 }
577
578 let mut registry = state.registry.write().await;
580 registry.register(Box::new(contract));
581
582 let contract = registry.get(&request.contract_id).ok_or_else(|| {
583 (
584 StatusCode::INTERNAL_SERVER_ERROR,
585 Json(serde_json::json!({
586 "error": "Failed to retrieve registered contract",
587 "contract_id": request.contract_id
588 })),
589 )
590 })?;
591 let contract_json = contract.to_json().map_err(|e| {
592 (
593 StatusCode::INTERNAL_SERVER_ERROR,
594 Json(serde_json::json!({
595 "error": "Failed to serialize contract",
596 "message": e.to_string()
597 })),
598 )
599 })?;
600
601 Ok(Json(ProtocolContractResponse {
602 contract_id: request.contract_id,
603 version: request.version,
604 protocol: "kafka".to_string(),
605 contract: contract_json,
606 }))
607}
608
609pub async fn delete_contract(
611 State(state): State<ProtocolContractState>,
612 Path(contract_id): Path<String>,
613) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
614 let mut registry = state.registry.write().await;
615
616 registry.remove(&contract_id).ok_or_else(|| {
617 (
618 StatusCode::NOT_FOUND,
619 Json(serde_json::json!({
620 "error": "Contract not found",
621 "contract_id": contract_id
622 })),
623 )
624 })?;
625
626 Ok(Json(serde_json::json!({
627 "message": "Contract deleted",
628 "contract_id": contract_id
629 })))
630}
631
632pub async fn compare_contracts_handler(
634 State(state): State<ProtocolContractState>,
635 Json(request): Json<CompareContractsRequest>,
636) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
637 let registry = state.registry.read().await;
638
639 let old_contract = registry.get(&request.old_contract_id).ok_or_else(|| {
640 (
641 StatusCode::NOT_FOUND,
642 Json(serde_json::json!({
643 "error": "Old contract not found",
644 "contract_id": request.old_contract_id
645 })),
646 )
647 })?;
648
649 let new_contract = registry.get(&request.new_contract_id).ok_or_else(|| {
650 (
651 StatusCode::NOT_FOUND,
652 Json(serde_json::json!({
653 "error": "New contract not found",
654 "contract_id": request.new_contract_id
655 })),
656 )
657 })?;
658
659 let diff_result = compare_contracts(old_contract, new_contract).await.map_err(|e| {
660 (
661 StatusCode::BAD_REQUEST,
662 Json(serde_json::json!({
663 "error": "Failed to compare contracts",
664 "message": e.to_string()
665 })),
666 )
667 })?;
668
669 let mut drift_evaluation = None;
671 if let (Some(ref drift_engine), Some(ref incident_manager)) =
672 (&state.drift_engine, &state.incident_manager)
673 {
674 let protocol = new_contract.protocol();
676
677 let operations = new_contract.operations();
679 for operation in operations {
680 let operation_id = &operation.id;
681
682 let (endpoint, method) = match &operation.operation_type {
684 mockforge_core::contract_drift::protocol_contracts::OperationType::HttpEndpoint { path, method } => {
685 (path.clone(), method.clone())
686 }
687 mockforge_core::contract_drift::protocol_contracts::OperationType::GrpcMethod { service, method } => {
688 (format!("{}.{}", service, method), "grpc".to_string())
689 }
690 mockforge_core::contract_drift::protocol_contracts::OperationType::WebSocketMessage { message_type, .. } => {
691 (message_type.clone(), "websocket".to_string())
692 }
693 mockforge_core::contract_drift::protocol_contracts::OperationType::MqttTopic { topic, qos: _ } => {
694 (topic.clone(), "mqtt".to_string())
695 }
696 mockforge_core::contract_drift::protocol_contracts::OperationType::KafkaTopic { topic, key_schema: _, value_schema: _ } => {
697 (topic.clone(), "kafka".to_string())
698 }
699 };
700
701 let drift_result = drift_engine.evaluate(&diff_result, &endpoint, &method);
705
706 let mut drift_result_with_fitness = drift_result.clone();
708 if let Some(ref fitness_registry) = state.fitness_registry {
709 let guard = fitness_registry.read().await;
710 if let Ok(results) = guard.evaluate_all_protocol(
711 Some(old_contract),
712 new_contract,
713 &diff_result,
714 operation_id,
715 None, None, ) {
718 drift_result_with_fitness.fitness_test_results = results;
719 if drift_result_with_fitness.fitness_test_results.iter().any(|r| !r.passed) {
720 drift_result_with_fitness.should_create_incident = true;
721 }
722 }
723 }
724
725 if let Some(ref consumer_analyzer) = state.consumer_analyzer {
728 let guard = consumer_analyzer.read().await;
729 let impact =
730 guard.analyze_impact_with_operation_id(&endpoint, &method, Some(operation_id));
731 if let Some(impact) = impact {
732 drift_result_with_fitness.consumer_impact = Some(impact);
733 }
734 }
735
736 if drift_result_with_fitness.should_create_incident {
738 let incident_type = if drift_result_with_fitness.breaking_changes > 0 {
739 mockforge_core::incidents::types::IncidentType::BreakingChange
740 } else {
741 mockforge_core::incidents::types::IncidentType::ThresholdExceeded
742 };
743
744 let severity = if drift_result_with_fitness.breaking_changes > 0 {
745 mockforge_core::incidents::types::IncidentSeverity::High
746 } else if drift_result_with_fitness.potentially_breaking_changes > 0 {
747 mockforge_core::incidents::types::IncidentSeverity::Medium
748 } else {
749 mockforge_core::incidents::types::IncidentSeverity::Low
750 };
751
752 let details = serde_json::json!({
753 "breaking_changes": drift_result_with_fitness.breaking_changes,
754 "potentially_breaking_changes": drift_result_with_fitness.potentially_breaking_changes,
755 "non_breaking_changes": drift_result_with_fitness.non_breaking_changes,
756 "budget_exceeded": drift_result_with_fitness.budget_exceeded,
757 "operation_id": operation_id,
758 "operation_type": format!("{:?}", operation.operation_type),
759 });
760
761 let before_sample = Some(serde_json::json!({
762 "contract_id": old_contract.contract_id(),
763 "version": old_contract.version(),
764 "protocol": format!("{:?}", old_contract.protocol()),
765 "operation_id": operation_id,
766 }));
767
768 let after_sample = Some(serde_json::json!({
769 "contract_id": new_contract.contract_id(),
770 "version": new_contract.version(),
771 "protocol": format!("{:?}", new_contract.protocol()),
772 "operation_id": operation_id,
773 "mismatches": diff_result.mismatches,
774 }));
775
776 let _incident = incident_manager
777 .create_incident_with_samples(
778 endpoint.clone(),
779 method.clone(),
780 incident_type,
781 severity,
782 details,
783 None, None, None, None, before_sample,
788 after_sample,
789 Some(drift_result_with_fitness.fitness_test_results.clone()),
790 drift_result_with_fitness.consumer_impact.clone(),
791 Some(protocol),
792 )
793 .await;
794 }
795
796 drift_evaluation = Some(serde_json::json!({
797 "operation_id": operation_id,
798 "endpoint": endpoint,
799 "method": method,
800 "budget_exceeded": drift_result_with_fitness.budget_exceeded,
801 "breaking_changes": drift_result_with_fitness.breaking_changes,
802 "fitness_test_results": drift_result_with_fitness.fitness_test_results,
803 "consumer_impact": drift_result_with_fitness.consumer_impact,
804 }));
805 }
806 }
807
808 Ok(Json(serde_json::json!({
809 "matches": diff_result.matches,
810 "confidence": diff_result.confidence,
811 "mismatches": diff_result.mismatches,
812 "recommendations": diff_result.recommendations,
813 "corrections": diff_result.corrections,
814 "drift_evaluation": drift_evaluation,
815 })))
816}
817
818pub async fn validate_message(
820 State(state): State<ProtocolContractState>,
821 Path(contract_id): Path<String>,
822 Json(request): Json<ValidateMessageRequest>,
823) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
824 let registry = state.registry.read().await;
825
826 let contract = registry.get(&contract_id).ok_or_else(|| {
827 (
828 StatusCode::NOT_FOUND,
829 Json(serde_json::json!({
830 "error": "Contract not found",
831 "contract_id": contract_id
832 })),
833 )
834 })?;
835
836 let payload_bytes = match request.payload {
838 serde_json::Value::String(s) => {
839 general_purpose::STANDARD.decode(&s).unwrap_or_else(|_| s.into_bytes())
841 }
842 _ => serde_json::to_vec(&request.payload).map_err(|e| {
843 (
844 StatusCode::BAD_REQUEST,
845 Json(serde_json::json!({
846 "error": "Failed to serialize payload",
847 "message": e.to_string()
848 })),
849 )
850 })?,
851 };
852
853 let contract_request = mockforge_core::contract_drift::protocol_contracts::ContractRequest {
854 protocol: contract.protocol(),
855 operation_id: request.operation_id.clone(),
856 payload: payload_bytes,
857 content_type: request.content_type,
858 metadata: request.metadata.unwrap_or_default(),
859 };
860
861 let validation_result =
862 contract.validate(&request.operation_id, &contract_request).await.map_err(|e| {
863 (
864 StatusCode::BAD_REQUEST,
865 Json(serde_json::json!({
866 "error": "Validation failed",
867 "message": e.to_string()
868 })),
869 )
870 })?;
871
872 Ok(Json(serde_json::json!({
873 "valid": validation_result.valid,
874 "errors": validation_result.errors,
875 "warnings": validation_result.warnings,
876 })))
877}
878
879pub fn protocol_contracts_router(state: ProtocolContractState) -> axum::Router {
881 use axum::routing::{delete, get, post};
882
883 axum::Router::new()
884 .route("/", get(list_contracts))
885 .route("/{contract_id}", get(get_contract))
886 .route("/{contract_id}", delete(delete_contract))
887 .route("/grpc", post(create_grpc_contract))
888 .route("/websocket", post(create_websocket_contract))
889 .route("/mqtt", post(create_mqtt_contract))
890 .route("/kafka", post(create_kafka_contract))
891 .route("/compare", post(compare_contracts_handler))
892 .route("/{contract_id}/validate", post(validate_message))
893 .with_state(state)
894}