1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::IntoResponse,
9 Json,
10};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15
/// Transport/messaging protocol a stored contract describes.
///
/// Serialized and deserialized as the lowercase variant name
/// (`"grpc"`, `"websocket"`, `"mqtt"`, `"kafka"`) via `rename_all = "lowercase"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ProtocolType {
    /// gRPC contract.
    Grpc,
    /// WebSocket contract.
    Websocket,
    /// MQTT contract.
    Mqtt,
    /// Kafka contract.
    Kafka,
}
25
/// A stored protocol contract as kept in the in-memory registry and returned by the API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProtocolContract {
    /// Identifier of the contract; also used as the key in the registry map.
    pub contract_id: String,
    /// Caller-supplied version string (stored as given, not parsed here).
    pub version: String,
    /// Protocol this contract belongs to.
    pub protocol: ProtocolType,
    /// Protocol-specific payload as free-form JSON; its shape depends on `protocol`.
    pub contract: serde_json::Value,
    /// Creation timestamp as a string; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub created_at: Option<String>,
    /// Last-update timestamp as a string; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub updated_at: Option<String>,
}
38
/// Payload of the contract listing endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListContractsResponse {
    /// Contracts that matched the (optional) filter.
    pub contracts: Vec<ProtocolContract>,
    /// Number of entries in `contracts`.
    pub total: usize,
}
45
/// Request body for registering a gRPC contract.
#[derive(Debug, Clone, Deserialize)]
pub struct CreateGrpcContractRequest {
    /// Identifier to store the contract under.
    pub contract_id: String,
    /// Caller-supplied version string.
    pub version: String,
    /// Descriptor set as a string; stored verbatim under the `descriptor_set` key.
    /// NOTE(review): presumably an encoded protobuf descriptor set — confirm the expected encoding.
    pub descriptor_set: String,
}
53
/// Description of a single message type within a WebSocket contract.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebSocketMessageType {
    /// Name of the message type.
    pub message_type: String,
    /// Optional topic associated with the message; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub topic: Option<String>,
    /// Schema of the message as free-form JSON (not validated in this file).
    pub schema: serde_json::Value,
    /// Direction of the message as a free-form string.
    /// NOTE(review): the accepted values are not constrained here — confirm against callers.
    pub direction: String,
    /// Optional human-readable description; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Optional example message; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub example: Option<serde_json::Value>,
}
67
/// Request body for registering a WebSocket contract.
#[derive(Debug, Clone, Deserialize)]
pub struct CreateWebSocketContractRequest {
    /// Identifier to store the contract under.
    pub contract_id: String,
    /// Caller-supplied version string.
    pub version: String,
    /// Message types that make up the contract; stored under the `message_types` key.
    pub message_types: Vec<WebSocketMessageType>,
}
75
/// Description of a single topic within an MQTT contract.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MqttTopicSchema {
    /// Topic name.
    pub topic: String,
    /// Optional QoS value; not range-checked here. Omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub qos: Option<u8>,
    /// Schema of the topic payload as free-form JSON (not validated in this file).
    pub schema: serde_json::Value,
    /// Optional retained flag; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub retained: Option<bool>,
    /// Optional human-readable description; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Optional example payload; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub example: Option<serde_json::Value>,
}
90
/// Request body for registering an MQTT contract.
#[derive(Debug, Clone, Deserialize)]
pub struct CreateMqttContractRequest {
    /// Identifier to store the contract under.
    pub contract_id: String,
    /// Caller-supplied version string.
    pub version: String,
    /// Topics that make up the contract; stored under the `topics` key.
    pub topics: Vec<MqttTopicSchema>,
}
98
/// Schema attached to the key or value of a Kafka topic.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicSchema {
    /// Schema format as a free-form string.
    /// NOTE(review): the accepted format names are not constrained here — confirm against callers.
    pub format: String,
    /// Schema definition as free-form JSON (not validated in this file).
    pub schema: serde_json::Value,
    /// Optional schema identifier; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema_id: Option<String>,
    /// Optional schema version; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub version: Option<String>,
}
109
/// Flags describing how a Kafka topic schema may evolve.
///
/// The flags are only stored and echoed back; nothing in this file enforces them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EvolutionRules {
    /// Whether backward-compatible changes are allowed.
    pub allow_backward_compatible: bool,
    /// Whether forward-compatible changes are allowed.
    pub allow_forward_compatible: bool,
    /// Whether a version bump is required.
    pub require_version_bump: bool,
}
117
/// Description of a single topic within a Kafka contract.
///
/// Every `Option` field is omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaTopicSchema {
    /// Topic name.
    pub topic: String,
    /// Optional schema of the record key.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub key_schema: Option<TopicSchema>,
    /// Schema of the record value (required).
    pub value_schema: TopicSchema,
    /// Optional partition count.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub partitions: Option<u32>,
    /// Optional replication factor.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub replication_factor: Option<u16>,
    /// Optional human-readable description.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Optional example record.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub example: Option<serde_json::Value>,
    /// Optional schema evolution rules.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub evolution_rules: Option<EvolutionRules>,
}
136
/// Request body for registering a Kafka contract.
#[derive(Debug, Clone, Deserialize)]
pub struct CreateKafkaContractRequest {
    /// Identifier to store the contract under.
    pub contract_id: String,
    /// Caller-supplied version string.
    pub version: String,
    /// Topics that make up the contract; stored under the `topics` key.
    pub topics: Vec<KafkaTopicSchema>,
}
144
/// Request body for comparing two stored contracts.
#[derive(Debug, Clone, Deserialize)]
pub struct CompareContractsRequest {
    /// Identifier of the contract treated as the baseline.
    pub old_contract_id: String,
    /// Identifier of the contract treated as the candidate.
    pub new_contract_id: String,
}
151
/// A single difference detected between two contracts.
#[derive(Debug, Clone, Serialize)]
pub struct ContractChange {
    /// Identifier of the element that changed (this file uses `"protocol"` and `"version"`).
    pub operation_id: String,
    /// Machine-readable change category (this file uses `"protocol_change"` and `"version_bump"`).
    pub change_type: String,
    /// Human-readable description of the change.
    pub description: String,
}
159
/// Result of comparing two contracts.
#[derive(Debug, Clone, Serialize)]
pub struct CompareContractsResponse {
    /// Changes classified as breaking.
    pub breaking_changes: Vec<ContractChange>,
    /// Changes classified as non-breaking.
    pub non_breaking_changes: Vec<ContractChange>,
    /// Aggregate counts over both lists.
    pub summary: CompareSummary,
}
167
/// Aggregate counts for a contract comparison.
#[derive(Debug, Clone, Serialize)]
pub struct CompareSummary {
    /// Total number of reported changes (`compare_contracts` sets it to breaking + non-breaking).
    pub total_operations: usize,
    /// Number of breaking changes.
    pub breaking_count: usize,
    /// Number of non-breaking changes.
    pub non_breaking_count: usize,
}
174
175#[derive(Debug, Clone, Deserialize)]
177pub struct ValidateMessageRequest {
178 pub operation_id: String,
179 pub message: serde_json::Value,
180 #[serde(skip_serializing_if = "Option::is_none")]
181 pub message_format: Option<String>,
182}
183
/// A single validation finding (used for both errors and warnings).
#[derive(Debug, Clone, Serialize)]
pub struct ValidationError {
    /// Location within the message the finding refers to.
    pub path: String,
    /// Human-readable explanation of the finding.
    pub message: String,
}
190
/// Result of validating a message against a contract.
#[derive(Debug, Clone, Serialize)]
pub struct ValidateMessageResponse {
    /// Whether the message was accepted.
    pub valid: bool,
    /// Findings reported as errors.
    pub errors: Vec<ValidationError>,
    /// Findings reported as warnings.
    pub warnings: Vec<ValidationError>,
}
198
199#[derive(Debug, Clone, Deserialize)]
201pub struct ListContractsQuery {
202 #[serde(skip_serializing_if = "Option::is_none")]
203 pub protocol: Option<String>,
204}
205
/// Shared in-memory registry of protocol contracts, keyed by contract ID.
///
/// The map sits behind `Arc<RwLock<..>>`, so cloning the state yields another
/// handle to the same underlying map rather than a copy of its contents.
#[derive(Clone)]
pub struct ProtocolContractsState {
    /// Contract ID -> stored contract.
    contracts: Arc<RwLock<HashMap<String, ProtocolContract>>>,
}
211
212impl Default for ProtocolContractsState {
213 fn default() -> Self {
214 Self::new()
215 }
216}
217
218impl ProtocolContractsState {
219 pub fn new() -> Self {
220 Self {
221 contracts: Arc::new(RwLock::new(HashMap::new())),
222 }
223 }
224}
225
226pub async fn list_contracts(
228 State(state): State<ProtocolContractsState>,
229 Query(query): Query<ListContractsQuery>,
230) -> impl IntoResponse {
231 let contracts = state.contracts.read().await;
232
233 let filtered: Vec<ProtocolContract> = contracts
234 .values()
235 .filter(|c| {
236 if let Some(ref protocol) = query.protocol {
237 match protocol.to_lowercase().as_str() {
238 "grpc" => c.protocol == ProtocolType::Grpc,
239 "websocket" => c.protocol == ProtocolType::Websocket,
240 "mqtt" => c.protocol == ProtocolType::Mqtt,
241 "kafka" => c.protocol == ProtocolType::Kafka,
242 _ => true,
243 }
244 } else {
245 true
246 }
247 })
248 .cloned()
249 .collect();
250
251 let total = filtered.len();
252
253 Json(serde_json::json!({
254 "data": ListContractsResponse {
255 contracts: filtered,
256 total,
257 }
258 }))
259}
260
261pub async fn get_contract(
263 State(state): State<ProtocolContractsState>,
264 Path(contract_id): Path<String>,
265) -> impl IntoResponse {
266 let contracts = state.contracts.read().await;
267
268 match contracts.get(&contract_id) {
269 Some(contract) => Json(serde_json::json!({
270 "data": contract
271 }))
272 .into_response(),
273 None => (
274 StatusCode::NOT_FOUND,
275 Json(serde_json::json!({
276 "error": format!("Contract '{}' not found", contract_id)
277 })),
278 )
279 .into_response(),
280 }
281}
282
283pub async fn delete_contract(
285 State(state): State<ProtocolContractsState>,
286 Path(contract_id): Path<String>,
287) -> impl IntoResponse {
288 let mut contracts = state.contracts.write().await;
289
290 match contracts.remove(&contract_id) {
291 Some(_) => Json(serde_json::json!({
292 "message": format!("Contract '{}' deleted", contract_id)
293 }))
294 .into_response(),
295 None => (
296 StatusCode::NOT_FOUND,
297 Json(serde_json::json!({
298 "error": format!("Contract '{}' not found", contract_id)
299 })),
300 )
301 .into_response(),
302 }
303}
304
305pub async fn create_grpc_contract(
307 State(state): State<ProtocolContractsState>,
308 Json(request): Json<CreateGrpcContractRequest>,
309) -> impl IntoResponse {
310 let mut contracts = state.contracts.write().await;
311
312 let now = chrono::Utc::now().to_rfc3339();
313 let contract = ProtocolContract {
314 contract_id: request.contract_id.clone(),
315 version: request.version,
316 protocol: ProtocolType::Grpc,
317 contract: serde_json::json!({
318 "descriptor_set": request.descriptor_set
319 }),
320 created_at: Some(now.clone()),
321 updated_at: Some(now),
322 };
323
324 contracts.insert(request.contract_id.clone(), contract.clone());
325
326 (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
327}
328
329pub async fn create_websocket_contract(
331 State(state): State<ProtocolContractsState>,
332 Json(request): Json<CreateWebSocketContractRequest>,
333) -> impl IntoResponse {
334 let mut contracts = state.contracts.write().await;
335
336 let now = chrono::Utc::now().to_rfc3339();
337 let contract = ProtocolContract {
338 contract_id: request.contract_id.clone(),
339 version: request.version,
340 protocol: ProtocolType::Websocket,
341 contract: serde_json::json!({
342 "message_types": request.message_types
343 }),
344 created_at: Some(now.clone()),
345 updated_at: Some(now),
346 };
347
348 contracts.insert(request.contract_id.clone(), contract.clone());
349
350 (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
351}
352
353pub async fn create_mqtt_contract(
355 State(state): State<ProtocolContractsState>,
356 Json(request): Json<CreateMqttContractRequest>,
357) -> impl IntoResponse {
358 let mut contracts = state.contracts.write().await;
359
360 let now = chrono::Utc::now().to_rfc3339();
361 let contract = ProtocolContract {
362 contract_id: request.contract_id.clone(),
363 version: request.version,
364 protocol: ProtocolType::Mqtt,
365 contract: serde_json::json!({
366 "topics": request.topics
367 }),
368 created_at: Some(now.clone()),
369 updated_at: Some(now),
370 };
371
372 contracts.insert(request.contract_id.clone(), contract.clone());
373
374 (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
375}
376
377pub async fn create_kafka_contract(
379 State(state): State<ProtocolContractsState>,
380 Json(request): Json<CreateKafkaContractRequest>,
381) -> impl IntoResponse {
382 let mut contracts = state.contracts.write().await;
383
384 let now = chrono::Utc::now().to_rfc3339();
385 let contract = ProtocolContract {
386 contract_id: request.contract_id.clone(),
387 version: request.version,
388 protocol: ProtocolType::Kafka,
389 contract: serde_json::json!({
390 "topics": request.topics
391 }),
392 created_at: Some(now.clone()),
393 updated_at: Some(now),
394 };
395
396 contracts.insert(request.contract_id.clone(), contract.clone());
397
398 (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
399}
400
401pub async fn compare_contracts(
403 State(state): State<ProtocolContractsState>,
404 Json(request): Json<CompareContractsRequest>,
405) -> impl IntoResponse {
406 let contracts = state.contracts.read().await;
407
408 let old_contract = match contracts.get(&request.old_contract_id) {
409 Some(c) => c,
410 None => {
411 return (
412 StatusCode::NOT_FOUND,
413 Json(serde_json::json!({
414 "error": format!("Contract '{}' not found", request.old_contract_id)
415 })),
416 )
417 .into_response()
418 }
419 };
420
421 let new_contract = match contracts.get(&request.new_contract_id) {
422 Some(c) => c,
423 None => {
424 return (
425 StatusCode::NOT_FOUND,
426 Json(serde_json::json!({
427 "error": format!("Contract '{}' not found", request.new_contract_id)
428 })),
429 )
430 .into_response()
431 }
432 };
433
434 let mut breaking_changes = Vec::new();
436 let mut non_breaking_changes = Vec::new();
437
438 if old_contract.protocol != new_contract.protocol {
439 breaking_changes.push(ContractChange {
440 operation_id: "protocol".to_string(),
441 change_type: "protocol_change".to_string(),
442 description: format!(
443 "Protocol changed from {:?} to {:?}",
444 old_contract.protocol, new_contract.protocol
445 ),
446 });
447 }
448
449 if old_contract.version != new_contract.version {
450 non_breaking_changes.push(ContractChange {
451 operation_id: "version".to_string(),
452 change_type: "version_bump".to_string(),
453 description: format!(
454 "Version changed from {} to {}",
455 old_contract.version, new_contract.version
456 ),
457 });
458 }
459
460 let response = CompareContractsResponse {
461 summary: CompareSummary {
462 total_operations: breaking_changes.len() + non_breaking_changes.len(),
463 breaking_count: breaking_changes.len(),
464 non_breaking_count: non_breaking_changes.len(),
465 },
466 breaking_changes,
467 non_breaking_changes,
468 };
469
470 Json(serde_json::json!({ "data": response })).into_response()
471}
472
473pub async fn validate_message(
475 State(state): State<ProtocolContractsState>,
476 Path(contract_id): Path<String>,
477 Json(_request): Json<ValidateMessageRequest>,
478) -> impl IntoResponse {
479 let contracts = state.contracts.read().await;
480
481 match contracts.get(&contract_id) {
482 Some(_contract) => {
483 let response = ValidateMessageResponse {
485 valid: true,
486 errors: Vec::new(),
487 warnings: Vec::new(),
488 };
489
490 Json(serde_json::json!({ "data": response }))
491 }
492 None => Json(serde_json::json!({
493 "error": format!("Contract '{}' not found", contract_id)
494 })),
495 }
496}
497
#[cfg(test)]
mod tests {
    use super::*;

    /// Every protocol variant serializes to its lowercase JSON string.
    #[test]
    fn test_protocol_type_serialization() {
        let cases = [
            (ProtocolType::Grpc, "\"grpc\""),
            (ProtocolType::Websocket, "\"websocket\""),
            (ProtocolType::Mqtt, "\"mqtt\""),
            (ProtocolType::Kafka, "\"kafka\""),
        ];

        for (variant, expected) in cases.iter() {
            let encoded = serde_json::to_string(variant).unwrap();
            assert_eq!(encoded, *expected);
        }
    }

    /// A freshly created state starts with an empty registry.
    #[tokio::test]
    async fn test_protocol_contracts_state_new() {
        let state = ProtocolContractsState::new();
        let registry = state.contracts.read().await;
        let is_empty = registry.is_empty();
        assert!(is_empty);
    }
}