mockforge_ui/handlers/
protocol_contracts.rs

1//! Protocol Contracts API Handlers
2//!
3//! Handles CRUD operations for protocol contracts (gRPC, WebSocket, MQTT, Kafka).
4
5use axum::{
6    extract::{Path, Query, State},
7    http::StatusCode,
8    response::IntoResponse,
9    Json,
10};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use uuid::Uuid;
16
17/// Protocol types supported
18#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
19#[serde(rename_all = "lowercase")]
20pub enum ProtocolType {
21    Grpc,
22    Websocket,
23    Mqtt,
24    Kafka,
25}
26
27/// A protocol contract
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct ProtocolContract {
30    pub contract_id: String,
31    pub version: String,
32    pub protocol: ProtocolType,
33    pub contract: serde_json::Value,
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub created_at: Option<String>,
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub updated_at: Option<String>,
38}
39
40/// List contracts response
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct ListContractsResponse {
43    pub contracts: Vec<ProtocolContract>,
44    pub total: usize,
45}
46
47/// Create gRPC contract request
48#[derive(Debug, Clone, Deserialize)]
49pub struct CreateGrpcContractRequest {
50    pub contract_id: String,
51    pub version: String,
52    pub descriptor_set: String, // base64 encoded
53}
54
55/// WebSocket message type
56#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct WebSocketMessageType {
58    pub message_type: String,
59    #[serde(skip_serializing_if = "Option::is_none")]
60    pub topic: Option<String>,
61    pub schema: serde_json::Value,
62    pub direction: String,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub description: Option<String>,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    pub example: Option<serde_json::Value>,
67}
68
69/// Create WebSocket contract request
70#[derive(Debug, Clone, Deserialize)]
71pub struct CreateWebSocketContractRequest {
72    pub contract_id: String,
73    pub version: String,
74    pub message_types: Vec<WebSocketMessageType>,
75}
76
77/// MQTT topic schema
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct MqttTopicSchema {
80    pub topic: String,
81    #[serde(skip_serializing_if = "Option::is_none")]
82    pub qos: Option<u8>,
83    pub schema: serde_json::Value,
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub retained: Option<bool>,
86    #[serde(skip_serializing_if = "Option::is_none")]
87    pub description: Option<String>,
88    #[serde(skip_serializing_if = "Option::is_none")]
89    pub example: Option<serde_json::Value>,
90}
91
92/// Create MQTT contract request
93#[derive(Debug, Clone, Deserialize)]
94pub struct CreateMqttContractRequest {
95    pub contract_id: String,
96    pub version: String,
97    pub topics: Vec<MqttTopicSchema>,
98}
99
100/// Topic schema format
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct TopicSchema {
103    pub format: String, // json, avro, protobuf
104    pub schema: serde_json::Value,
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub schema_id: Option<String>,
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub version: Option<String>,
109}
110
111/// Evolution rules for Kafka schemas
112#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct EvolutionRules {
114    pub allow_backward_compatible: bool,
115    pub allow_forward_compatible: bool,
116    pub require_version_bump: bool,
117}
118
119/// Kafka topic schema
120#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct KafkaTopicSchema {
122    pub topic: String,
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub key_schema: Option<TopicSchema>,
125    pub value_schema: TopicSchema,
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub partitions: Option<u32>,
128    #[serde(skip_serializing_if = "Option::is_none")]
129    pub replication_factor: Option<u16>,
130    #[serde(skip_serializing_if = "Option::is_none")]
131    pub description: Option<String>,
132    #[serde(skip_serializing_if = "Option::is_none")]
133    pub example: Option<serde_json::Value>,
134    #[serde(skip_serializing_if = "Option::is_none")]
135    pub evolution_rules: Option<EvolutionRules>,
136}
137
138/// Create Kafka contract request
139#[derive(Debug, Clone, Deserialize)]
140pub struct CreateKafkaContractRequest {
141    pub contract_id: String,
142    pub version: String,
143    pub topics: Vec<KafkaTopicSchema>,
144}
145
146/// Compare contracts request
147#[derive(Debug, Clone, Deserialize)]
148pub struct CompareContractsRequest {
149    pub old_contract_id: String,
150    pub new_contract_id: String,
151}
152
153/// Contract change
154#[derive(Debug, Clone, Serialize)]
155pub struct ContractChange {
156    pub operation_id: String,
157    pub change_type: String,
158    pub description: String,
159}
160
161/// Compare contracts response
162#[derive(Debug, Clone, Serialize)]
163pub struct CompareContractsResponse {
164    pub breaking_changes: Vec<ContractChange>,
165    pub non_breaking_changes: Vec<ContractChange>,
166    pub summary: CompareSummary,
167}
168
169#[derive(Debug, Clone, Serialize)]
170pub struct CompareSummary {
171    pub total_operations: usize,
172    pub breaking_count: usize,
173    pub non_breaking_count: usize,
174}
175
176/// Validate message request
177#[derive(Debug, Clone, Deserialize)]
178pub struct ValidateMessageRequest {
179    pub operation_id: String,
180    pub message: serde_json::Value,
181    #[serde(skip_serializing_if = "Option::is_none")]
182    pub message_format: Option<String>,
183}
184
185/// Validation error
186#[derive(Debug, Clone, Serialize)]
187pub struct ValidationError {
188    pub path: String,
189    pub message: String,
190}
191
192/// Validate message response
193#[derive(Debug, Clone, Serialize)]
194pub struct ValidateMessageResponse {
195    pub valid: bool,
196    pub errors: Vec<ValidationError>,
197    pub warnings: Vec<ValidationError>,
198}
199
200/// Query parameters for listing contracts
201#[derive(Debug, Clone, Deserialize)]
202pub struct ListContractsQuery {
203    #[serde(skip_serializing_if = "Option::is_none")]
204    pub protocol: Option<String>,
205}
206
207/// State for protocol contracts
208#[derive(Clone)]
209pub struct ProtocolContractsState {
210    contracts: Arc<RwLock<HashMap<String, ProtocolContract>>>,
211}
212
213impl Default for ProtocolContractsState {
214    fn default() -> Self {
215        Self::new()
216    }
217}
218
219impl ProtocolContractsState {
220    pub fn new() -> Self {
221        Self {
222            contracts: Arc::new(RwLock::new(HashMap::new())),
223        }
224    }
225}
226
227/// List all contracts
228pub async fn list_contracts(
229    State(state): State<ProtocolContractsState>,
230    Query(query): Query<ListContractsQuery>,
231) -> impl IntoResponse {
232    let contracts = state.contracts.read().await;
233
234    let filtered: Vec<ProtocolContract> = contracts
235        .values()
236        .filter(|c| {
237            if let Some(ref protocol) = query.protocol {
238                match protocol.to_lowercase().as_str() {
239                    "grpc" => c.protocol == ProtocolType::Grpc,
240                    "websocket" => c.protocol == ProtocolType::Websocket,
241                    "mqtt" => c.protocol == ProtocolType::Mqtt,
242                    "kafka" => c.protocol == ProtocolType::Kafka,
243                    _ => true,
244                }
245            } else {
246                true
247            }
248        })
249        .cloned()
250        .collect();
251
252    let total = filtered.len();
253
254    Json(serde_json::json!({
255        "data": ListContractsResponse {
256            contracts: filtered,
257            total,
258        }
259    }))
260}
261
262/// Get a specific contract
263pub async fn get_contract(
264    State(state): State<ProtocolContractsState>,
265    Path(contract_id): Path<String>,
266) -> impl IntoResponse {
267    let contracts = state.contracts.read().await;
268
269    match contracts.get(&contract_id) {
270        Some(contract) => Json(serde_json::json!({
271            "data": contract
272        }))
273        .into_response(),
274        None => (
275            StatusCode::NOT_FOUND,
276            Json(serde_json::json!({
277                "error": format!("Contract '{}' not found", contract_id)
278            })),
279        )
280            .into_response(),
281    }
282}
283
284/// Delete a contract
285pub async fn delete_contract(
286    State(state): State<ProtocolContractsState>,
287    Path(contract_id): Path<String>,
288) -> impl IntoResponse {
289    let mut contracts = state.contracts.write().await;
290
291    match contracts.remove(&contract_id) {
292        Some(_) => Json(serde_json::json!({
293            "message": format!("Contract '{}' deleted", contract_id)
294        }))
295        .into_response(),
296        None => (
297            StatusCode::NOT_FOUND,
298            Json(serde_json::json!({
299                "error": format!("Contract '{}' not found", contract_id)
300            })),
301        )
302            .into_response(),
303    }
304}
305
306/// Create a gRPC contract
307pub async fn create_grpc_contract(
308    State(state): State<ProtocolContractsState>,
309    Json(request): Json<CreateGrpcContractRequest>,
310) -> impl IntoResponse {
311    let mut contracts = state.contracts.write().await;
312
313    let now = chrono::Utc::now().to_rfc3339();
314    let contract = ProtocolContract {
315        contract_id: request.contract_id.clone(),
316        version: request.version,
317        protocol: ProtocolType::Grpc,
318        contract: serde_json::json!({
319            "descriptor_set": request.descriptor_set
320        }),
321        created_at: Some(now.clone()),
322        updated_at: Some(now),
323    };
324
325    contracts.insert(request.contract_id.clone(), contract.clone());
326
327    (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
328}
329
330/// Create a WebSocket contract
331pub async fn create_websocket_contract(
332    State(state): State<ProtocolContractsState>,
333    Json(request): Json<CreateWebSocketContractRequest>,
334) -> impl IntoResponse {
335    let mut contracts = state.contracts.write().await;
336
337    let now = chrono::Utc::now().to_rfc3339();
338    let contract = ProtocolContract {
339        contract_id: request.contract_id.clone(),
340        version: request.version,
341        protocol: ProtocolType::Websocket,
342        contract: serde_json::json!({
343            "message_types": request.message_types
344        }),
345        created_at: Some(now.clone()),
346        updated_at: Some(now),
347    };
348
349    contracts.insert(request.contract_id.clone(), contract.clone());
350
351    (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
352}
353
354/// Create an MQTT contract
355pub async fn create_mqtt_contract(
356    State(state): State<ProtocolContractsState>,
357    Json(request): Json<CreateMqttContractRequest>,
358) -> impl IntoResponse {
359    let mut contracts = state.contracts.write().await;
360
361    let now = chrono::Utc::now().to_rfc3339();
362    let contract = ProtocolContract {
363        contract_id: request.contract_id.clone(),
364        version: request.version,
365        protocol: ProtocolType::Mqtt,
366        contract: serde_json::json!({
367            "topics": request.topics
368        }),
369        created_at: Some(now.clone()),
370        updated_at: Some(now),
371    };
372
373    contracts.insert(request.contract_id.clone(), contract.clone());
374
375    (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
376}
377
378/// Create a Kafka contract
379pub async fn create_kafka_contract(
380    State(state): State<ProtocolContractsState>,
381    Json(request): Json<CreateKafkaContractRequest>,
382) -> impl IntoResponse {
383    let mut contracts = state.contracts.write().await;
384
385    let now = chrono::Utc::now().to_rfc3339();
386    let contract = ProtocolContract {
387        contract_id: request.contract_id.clone(),
388        version: request.version,
389        protocol: ProtocolType::Kafka,
390        contract: serde_json::json!({
391            "topics": request.topics
392        }),
393        created_at: Some(now.clone()),
394        updated_at: Some(now),
395    };
396
397    contracts.insert(request.contract_id.clone(), contract.clone());
398
399    (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
400}
401
402/// Compare two contracts
403pub async fn compare_contracts(
404    State(state): State<ProtocolContractsState>,
405    Json(request): Json<CompareContractsRequest>,
406) -> impl IntoResponse {
407    let contracts = state.contracts.read().await;
408
409    let old_contract = match contracts.get(&request.old_contract_id) {
410        Some(c) => c,
411        None => {
412            return (
413                StatusCode::NOT_FOUND,
414                Json(serde_json::json!({
415                    "error": format!("Contract '{}' not found", request.old_contract_id)
416                })),
417            )
418                .into_response()
419        }
420    };
421
422    let new_contract = match contracts.get(&request.new_contract_id) {
423        Some(c) => c,
424        None => {
425            return (
426                StatusCode::NOT_FOUND,
427                Json(serde_json::json!({
428                    "error": format!("Contract '{}' not found", request.new_contract_id)
429                })),
430            )
431                .into_response()
432        }
433    };
434
435    // Simple comparison - in production this would do deep schema comparison
436    let mut breaking_changes = Vec::new();
437    let mut non_breaking_changes = Vec::new();
438
439    if old_contract.protocol != new_contract.protocol {
440        breaking_changes.push(ContractChange {
441            operation_id: "protocol".to_string(),
442            change_type: "protocol_change".to_string(),
443            description: format!(
444                "Protocol changed from {:?} to {:?}",
445                old_contract.protocol, new_contract.protocol
446            ),
447        });
448    }
449
450    if old_contract.version != new_contract.version {
451        non_breaking_changes.push(ContractChange {
452            operation_id: "version".to_string(),
453            change_type: "version_bump".to_string(),
454            description: format!(
455                "Version changed from {} to {}",
456                old_contract.version, new_contract.version
457            ),
458        });
459    }
460
461    let response = CompareContractsResponse {
462        summary: CompareSummary {
463            total_operations: breaking_changes.len() + non_breaking_changes.len(),
464            breaking_count: breaking_changes.len(),
465            non_breaking_count: non_breaking_changes.len(),
466        },
467        breaking_changes,
468        non_breaking_changes,
469    };
470
471    Json(serde_json::json!({ "data": response })).into_response()
472}
473
474/// Validate a message against a contract
475pub async fn validate_message(
476    State(state): State<ProtocolContractsState>,
477    Path(contract_id): Path<String>,
478    Json(request): Json<ValidateMessageRequest>,
479) -> impl IntoResponse {
480    let contracts = state.contracts.read().await;
481
482    match contracts.get(&contract_id) {
483        Some(_contract) => {
484            // Simple validation - in production this would validate against the actual schema
485            let response = ValidateMessageResponse {
486                valid: true,
487                errors: Vec::new(),
488                warnings: Vec::new(),
489            };
490
491            Json(serde_json::json!({ "data": response }))
492        }
493        None => Json(serde_json::json!({
494            "error": format!("Contract '{}' not found", contract_id)
495        })),
496    }
497}
498
499#[cfg(test)]
500mod tests {
501    use super::*;
502
503    #[test]
504    fn test_protocol_type_serialization() {
505        assert_eq!(serde_json::to_string(&ProtocolType::Grpc).unwrap(), "\"grpc\"");
506        assert_eq!(serde_json::to_string(&ProtocolType::Websocket).unwrap(), "\"websocket\"");
507        assert_eq!(serde_json::to_string(&ProtocolType::Mqtt).unwrap(), "\"mqtt\"");
508        assert_eq!(serde_json::to_string(&ProtocolType::Kafka).unwrap(), "\"kafka\"");
509    }
510
511    #[tokio::test]
512    async fn test_protocol_contracts_state_new() {
513        let state = ProtocolContractsState::new();
514        // Should be empty initially
515        let contracts = state.contracts.read().await;
516        assert!(contracts.is_empty());
517    }
518}