Skip to main content

mockforge_ui/handlers/
protocol_contracts.rs

1//! Protocol Contracts API Handlers
2//!
3//! Handles CRUD operations for protocol contracts (gRPC, WebSocket, MQTT, Kafka).
4
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::IntoResponse,
9    Json,
10};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
16/// Protocol types supported
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
18#[serde(rename_all = "lowercase")]
19pub enum ProtocolType {
20    Grpc,
21    Websocket,
22    Mqtt,
23    Kafka,
24}
25
26/// A protocol contract
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct ProtocolContract {
29    pub contract_id: String,
30    pub version: String,
31    pub protocol: ProtocolType,
32    pub contract: serde_json::Value,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub created_at: Option<String>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub updated_at: Option<String>,
37}
38
39/// List contracts response
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct ListContractsResponse {
42    pub contracts: Vec<ProtocolContract>,
43    pub total: usize,
44}
45
46/// Create gRPC contract request
47#[derive(Debug, Clone, Deserialize)]
48pub struct CreateGrpcContractRequest {
49    pub contract_id: String,
50    pub version: String,
51    pub descriptor_set: String, // base64 encoded
52}
53
54/// WebSocket message type
55#[derive(Debug, Clone, Serialize, Deserialize)]
56pub struct WebSocketMessageType {
57    pub message_type: String,
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub topic: Option<String>,
60    pub schema: serde_json::Value,
61    pub direction: String,
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub description: Option<String>,
64    #[serde(skip_serializing_if = "Option::is_none")]
65    pub example: Option<serde_json::Value>,
66}
67
68/// Create WebSocket contract request
69#[derive(Debug, Clone, Deserialize)]
70pub struct CreateWebSocketContractRequest {
71    pub contract_id: String,
72    pub version: String,
73    pub message_types: Vec<WebSocketMessageType>,
74}
75
76/// MQTT topic schema
77#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct MqttTopicSchema {
79    pub topic: String,
80    #[serde(skip_serializing_if = "Option::is_none")]
81    pub qos: Option<u8>,
82    pub schema: serde_json::Value,
83    #[serde(skip_serializing_if = "Option::is_none")]
84    pub retained: Option<bool>,
85    #[serde(skip_serializing_if = "Option::is_none")]
86    pub description: Option<String>,
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub example: Option<serde_json::Value>,
89}
90
91/// Create MQTT contract request
92#[derive(Debug, Clone, Deserialize)]
93pub struct CreateMqttContractRequest {
94    pub contract_id: String,
95    pub version: String,
96    pub topics: Vec<MqttTopicSchema>,
97}
98
99/// Topic schema format
100#[derive(Debug, Clone, Serialize, Deserialize)]
101pub struct TopicSchema {
102    pub format: String, // json, avro, protobuf
103    pub schema: serde_json::Value,
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub schema_id: Option<String>,
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub version: Option<String>,
108}
109
110/// Evolution rules for Kafka schemas
111#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct EvolutionRules {
113    pub allow_backward_compatible: bool,
114    pub allow_forward_compatible: bool,
115    pub require_version_bump: bool,
116}
117
118/// Kafka topic schema
119#[derive(Debug, Clone, Serialize, Deserialize)]
120pub struct KafkaTopicSchema {
121    pub topic: String,
122    #[serde(skip_serializing_if = "Option::is_none")]
123    pub key_schema: Option<TopicSchema>,
124    pub value_schema: TopicSchema,
125    #[serde(skip_serializing_if = "Option::is_none")]
126    pub partitions: Option<u32>,
127    #[serde(skip_serializing_if = "Option::is_none")]
128    pub replication_factor: Option<u16>,
129    #[serde(skip_serializing_if = "Option::is_none")]
130    pub description: Option<String>,
131    #[serde(skip_serializing_if = "Option::is_none")]
132    pub example: Option<serde_json::Value>,
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub evolution_rules: Option<EvolutionRules>,
135}
136
137/// Create Kafka contract request
138#[derive(Debug, Clone, Deserialize)]
139pub struct CreateKafkaContractRequest {
140    pub contract_id: String,
141    pub version: String,
142    pub topics: Vec<KafkaTopicSchema>,
143}
144
145/// Compare contracts request
146#[derive(Debug, Clone, Deserialize)]
147pub struct CompareContractsRequest {
148    pub old_contract_id: String,
149    pub new_contract_id: String,
150}
151
152/// Contract change
153#[derive(Debug, Clone, Serialize)]
154pub struct ContractChange {
155    pub operation_id: String,
156    pub change_type: String,
157    pub description: String,
158}
159
160/// Compare contracts response
161#[derive(Debug, Clone, Serialize)]
162pub struct CompareContractsResponse {
163    pub breaking_changes: Vec<ContractChange>,
164    pub non_breaking_changes: Vec<ContractChange>,
165    pub summary: CompareSummary,
166}
167
168#[derive(Debug, Clone, Serialize)]
169pub struct CompareSummary {
170    pub total_operations: usize,
171    pub breaking_count: usize,
172    pub non_breaking_count: usize,
173}
174
175/// Validate message request
176#[derive(Debug, Clone, Deserialize)]
177pub struct ValidateMessageRequest {
178    pub operation_id: String,
179    pub message: serde_json::Value,
180    #[serde(skip_serializing_if = "Option::is_none")]
181    pub message_format: Option<String>,
182}
183
184/// Validation error
185#[derive(Debug, Clone, Serialize)]
186pub struct ValidationError {
187    pub path: String,
188    pub message: String,
189}
190
191/// Validate message response
192#[derive(Debug, Clone, Serialize)]
193pub struct ValidateMessageResponse {
194    pub valid: bool,
195    pub errors: Vec<ValidationError>,
196    pub warnings: Vec<ValidationError>,
197}
198
199/// Query parameters for listing contracts
200#[derive(Debug, Clone, Deserialize)]
201pub struct ListContractsQuery {
202    #[serde(skip_serializing_if = "Option::is_none")]
203    pub protocol: Option<String>,
204}
205
206/// State for protocol contracts
207#[derive(Clone)]
208pub struct ProtocolContractsState {
209    contracts: Arc<RwLock<HashMap<String, ProtocolContract>>>,
210}
211
212impl Default for ProtocolContractsState {
213    fn default() -> Self {
214        Self::new()
215    }
216}
217
218impl ProtocolContractsState {
219    pub fn new() -> Self {
220        Self {
221            contracts: Arc::new(RwLock::new(HashMap::new())),
222        }
223    }
224}
225
226/// List all contracts
227pub async fn list_contracts(
228    State(state): State<ProtocolContractsState>,
229    Query(query): Query<ListContractsQuery>,
230) -> impl IntoResponse {
231    let contracts = state.contracts.read().await;
232
233    let filtered: Vec<ProtocolContract> = contracts
234        .values()
235        .filter(|c| {
236            if let Some(ref protocol) = query.protocol {
237                match protocol.to_lowercase().as_str() {
238                    "grpc" => c.protocol == ProtocolType::Grpc,
239                    "websocket" => c.protocol == ProtocolType::Websocket,
240                    "mqtt" => c.protocol == ProtocolType::Mqtt,
241                    "kafka" => c.protocol == ProtocolType::Kafka,
242                    _ => true,
243                }
244            } else {
245                true
246            }
247        })
248        .cloned()
249        .collect();
250
251    let total = filtered.len();
252
253    Json(serde_json::json!({
254        "data": ListContractsResponse {
255            contracts: filtered,
256            total,
257        }
258    }))
259}
260
261/// Get a specific contract
262pub async fn get_contract(
263    State(state): State<ProtocolContractsState>,
264    Path(contract_id): Path<String>,
265) -> impl IntoResponse {
266    let contracts = state.contracts.read().await;
267
268    match contracts.get(&contract_id) {
269        Some(contract) => Json(serde_json::json!({
270            "data": contract
271        }))
272        .into_response(),
273        None => (
274            StatusCode::NOT_FOUND,
275            Json(serde_json::json!({
276                "error": format!("Contract '{}' not found", contract_id)
277            })),
278        )
279            .into_response(),
280    }
281}
282
283/// Delete a contract
284pub async fn delete_contract(
285    State(state): State<ProtocolContractsState>,
286    Path(contract_id): Path<String>,
287) -> impl IntoResponse {
288    let mut contracts = state.contracts.write().await;
289
290    match contracts.remove(&contract_id) {
291        Some(_) => Json(serde_json::json!({
292            "message": format!("Contract '{}' deleted", contract_id)
293        }))
294        .into_response(),
295        None => (
296            StatusCode::NOT_FOUND,
297            Json(serde_json::json!({
298                "error": format!("Contract '{}' not found", contract_id)
299            })),
300        )
301            .into_response(),
302    }
303}
304
305/// Create a gRPC contract
306pub async fn create_grpc_contract(
307    State(state): State<ProtocolContractsState>,
308    Json(request): Json<CreateGrpcContractRequest>,
309) -> impl IntoResponse {
310    let mut contracts = state.contracts.write().await;
311
312    let now = chrono::Utc::now().to_rfc3339();
313    let contract = ProtocolContract {
314        contract_id: request.contract_id.clone(),
315        version: request.version,
316        protocol: ProtocolType::Grpc,
317        contract: serde_json::json!({
318            "descriptor_set": request.descriptor_set
319        }),
320        created_at: Some(now.clone()),
321        updated_at: Some(now),
322    };
323
324    contracts.insert(request.contract_id.clone(), contract.clone());
325
326    (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
327}
328
329/// Create a WebSocket contract
330pub async fn create_websocket_contract(
331    State(state): State<ProtocolContractsState>,
332    Json(request): Json<CreateWebSocketContractRequest>,
333) -> impl IntoResponse {
334    let mut contracts = state.contracts.write().await;
335
336    let now = chrono::Utc::now().to_rfc3339();
337    let contract = ProtocolContract {
338        contract_id: request.contract_id.clone(),
339        version: request.version,
340        protocol: ProtocolType::Websocket,
341        contract: serde_json::json!({
342            "message_types": request.message_types
343        }),
344        created_at: Some(now.clone()),
345        updated_at: Some(now),
346    };
347
348    contracts.insert(request.contract_id.clone(), contract.clone());
349
350    (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
351}
352
353/// Create an MQTT contract
354pub async fn create_mqtt_contract(
355    State(state): State<ProtocolContractsState>,
356    Json(request): Json<CreateMqttContractRequest>,
357) -> impl IntoResponse {
358    let mut contracts = state.contracts.write().await;
359
360    let now = chrono::Utc::now().to_rfc3339();
361    let contract = ProtocolContract {
362        contract_id: request.contract_id.clone(),
363        version: request.version,
364        protocol: ProtocolType::Mqtt,
365        contract: serde_json::json!({
366            "topics": request.topics
367        }),
368        created_at: Some(now.clone()),
369        updated_at: Some(now),
370    };
371
372    contracts.insert(request.contract_id.clone(), contract.clone());
373
374    (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
375}
376
377/// Create a Kafka contract
378pub async fn create_kafka_contract(
379    State(state): State<ProtocolContractsState>,
380    Json(request): Json<CreateKafkaContractRequest>,
381) -> impl IntoResponse {
382    let mut contracts = state.contracts.write().await;
383
384    let now = chrono::Utc::now().to_rfc3339();
385    let contract = ProtocolContract {
386        contract_id: request.contract_id.clone(),
387        version: request.version,
388        protocol: ProtocolType::Kafka,
389        contract: serde_json::json!({
390            "topics": request.topics
391        }),
392        created_at: Some(now.clone()),
393        updated_at: Some(now),
394    };
395
396    contracts.insert(request.contract_id.clone(), contract.clone());
397
398    (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
399}
400
401/// Compare two contracts
402pub async fn compare_contracts(
403    State(state): State<ProtocolContractsState>,
404    Json(request): Json<CompareContractsRequest>,
405) -> impl IntoResponse {
406    let contracts = state.contracts.read().await;
407
408    let old_contract = match contracts.get(&request.old_contract_id) {
409        Some(c) => c,
410        None => {
411            return (
412                StatusCode::NOT_FOUND,
413                Json(serde_json::json!({
414                    "error": format!("Contract '{}' not found", request.old_contract_id)
415                })),
416            )
417                .into_response()
418        }
419    };
420
421    let new_contract = match contracts.get(&request.new_contract_id) {
422        Some(c) => c,
423        None => {
424            return (
425                StatusCode::NOT_FOUND,
426                Json(serde_json::json!({
427                    "error": format!("Contract '{}' not found", request.new_contract_id)
428                })),
429            )
430                .into_response()
431        }
432    };
433
434    // Simple comparison - in production this would do deep schema comparison
435    let mut breaking_changes = Vec::new();
436    let mut non_breaking_changes = Vec::new();
437
438    if old_contract.protocol != new_contract.protocol {
439        breaking_changes.push(ContractChange {
440            operation_id: "protocol".to_string(),
441            change_type: "protocol_change".to_string(),
442            description: format!(
443                "Protocol changed from {:?} to {:?}",
444                old_contract.protocol, new_contract.protocol
445            ),
446        });
447    }
448
449    if old_contract.version != new_contract.version {
450        non_breaking_changes.push(ContractChange {
451            operation_id: "version".to_string(),
452            change_type: "version_bump".to_string(),
453            description: format!(
454                "Version changed from {} to {}",
455                old_contract.version, new_contract.version
456            ),
457        });
458    }
459
460    let response = CompareContractsResponse {
461        summary: CompareSummary {
462            total_operations: breaking_changes.len() + non_breaking_changes.len(),
463            breaking_count: breaking_changes.len(),
464            non_breaking_count: non_breaking_changes.len(),
465        },
466        breaking_changes,
467        non_breaking_changes,
468    };
469
470    Json(serde_json::json!({ "data": response })).into_response()
471}
472
473/// Validate a message against a contract
474pub async fn validate_message(
475    State(state): State<ProtocolContractsState>,
476    Path(contract_id): Path<String>,
477    Json(_request): Json<ValidateMessageRequest>,
478) -> impl IntoResponse {
479    let contracts = state.contracts.read().await;
480
481    match contracts.get(&contract_id) {
482        Some(_contract) => {
483            // Simple validation - in production this would validate against the actual schema
484            let response = ValidateMessageResponse {
485                valid: true,
486                errors: Vec::new(),
487                warnings: Vec::new(),
488            };
489
490            Json(serde_json::json!({ "data": response }))
491        }
492        None => Json(serde_json::json!({
493            "error": format!("Contract '{}' not found", contract_id)
494        })),
495    }
496}
497
498#[cfg(test)]
499mod tests {
500    use super::*;
501
502    #[test]
503    fn test_protocol_type_serialization() {
504        assert_eq!(serde_json::to_string(&ProtocolType::Grpc).unwrap(), "\"grpc\"");
505        assert_eq!(serde_json::to_string(&ProtocolType::Websocket).unwrap(), "\"websocket\"");
506        assert_eq!(serde_json::to_string(&ProtocolType::Mqtt).unwrap(), "\"mqtt\"");
507        assert_eq!(serde_json::to_string(&ProtocolType::Kafka).unwrap(), "\"kafka\"");
508    }
509
510    #[tokio::test]
511    async fn test_protocol_contracts_state_new() {
512        let state = ProtocolContractsState::new();
513        // Should be empty initially
514        let contracts = state.contracts.read().await;
515        assert!(contracts.is_empty());
516    }
517}