1use axum::{
6 extract::{Path, Query, State},
7 http::StatusCode,
8 response::IntoResponse,
9 Json,
10};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use tokio::sync::RwLock;
15use uuid::Uuid;
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
19#[serde(rename_all = "lowercase")]
20pub enum ProtocolType {
21 Grpc,
22 Websocket,
23 Mqtt,
24 Kafka,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct ProtocolContract {
30 pub contract_id: String,
31 pub version: String,
32 pub protocol: ProtocolType,
33 pub contract: serde_json::Value,
34 #[serde(skip_serializing_if = "Option::is_none")]
35 pub created_at: Option<String>,
36 #[serde(skip_serializing_if = "Option::is_none")]
37 pub updated_at: Option<String>,
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct ListContractsResponse {
43 pub contracts: Vec<ProtocolContract>,
44 pub total: usize,
45}
46
47#[derive(Debug, Clone, Deserialize)]
49pub struct CreateGrpcContractRequest {
50 pub contract_id: String,
51 pub version: String,
52 pub descriptor_set: String, }
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct WebSocketMessageType {
58 pub message_type: String,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pub topic: Option<String>,
61 pub schema: serde_json::Value,
62 pub direction: String,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 pub description: Option<String>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 pub example: Option<serde_json::Value>,
67}
68
69#[derive(Debug, Clone, Deserialize)]
71pub struct CreateWebSocketContractRequest {
72 pub contract_id: String,
73 pub version: String,
74 pub message_types: Vec<WebSocketMessageType>,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct MqttTopicSchema {
80 pub topic: String,
81 #[serde(skip_serializing_if = "Option::is_none")]
82 pub qos: Option<u8>,
83 pub schema: serde_json::Value,
84 #[serde(skip_serializing_if = "Option::is_none")]
85 pub retained: Option<bool>,
86 #[serde(skip_serializing_if = "Option::is_none")]
87 pub description: Option<String>,
88 #[serde(skip_serializing_if = "Option::is_none")]
89 pub example: Option<serde_json::Value>,
90}
91
92#[derive(Debug, Clone, Deserialize)]
94pub struct CreateMqttContractRequest {
95 pub contract_id: String,
96 pub version: String,
97 pub topics: Vec<MqttTopicSchema>,
98}
99
100#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct TopicSchema {
103 pub format: String, pub schema: serde_json::Value,
105 #[serde(skip_serializing_if = "Option::is_none")]
106 pub schema_id: Option<String>,
107 #[serde(skip_serializing_if = "Option::is_none")]
108 pub version: Option<String>,
109}
110
111#[derive(Debug, Clone, Serialize, Deserialize)]
113pub struct EvolutionRules {
114 pub allow_backward_compatible: bool,
115 pub allow_forward_compatible: bool,
116 pub require_version_bump: bool,
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct KafkaTopicSchema {
122 pub topic: String,
123 #[serde(skip_serializing_if = "Option::is_none")]
124 pub key_schema: Option<TopicSchema>,
125 pub value_schema: TopicSchema,
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub partitions: Option<u32>,
128 #[serde(skip_serializing_if = "Option::is_none")]
129 pub replication_factor: Option<u16>,
130 #[serde(skip_serializing_if = "Option::is_none")]
131 pub description: Option<String>,
132 #[serde(skip_serializing_if = "Option::is_none")]
133 pub example: Option<serde_json::Value>,
134 #[serde(skip_serializing_if = "Option::is_none")]
135 pub evolution_rules: Option<EvolutionRules>,
136}
137
138#[derive(Debug, Clone, Deserialize)]
140pub struct CreateKafkaContractRequest {
141 pub contract_id: String,
142 pub version: String,
143 pub topics: Vec<KafkaTopicSchema>,
144}
145
146#[derive(Debug, Clone, Deserialize)]
148pub struct CompareContractsRequest {
149 pub old_contract_id: String,
150 pub new_contract_id: String,
151}
152
153#[derive(Debug, Clone, Serialize)]
155pub struct ContractChange {
156 pub operation_id: String,
157 pub change_type: String,
158 pub description: String,
159}
160
161#[derive(Debug, Clone, Serialize)]
163pub struct CompareContractsResponse {
164 pub breaking_changes: Vec<ContractChange>,
165 pub non_breaking_changes: Vec<ContractChange>,
166 pub summary: CompareSummary,
167}
168
169#[derive(Debug, Clone, Serialize)]
170pub struct CompareSummary {
171 pub total_operations: usize,
172 pub breaking_count: usize,
173 pub non_breaking_count: usize,
174}
175
176#[derive(Debug, Clone, Deserialize)]
178pub struct ValidateMessageRequest {
179 pub operation_id: String,
180 pub message: serde_json::Value,
181 #[serde(skip_serializing_if = "Option::is_none")]
182 pub message_format: Option<String>,
183}
184
185#[derive(Debug, Clone, Serialize)]
187pub struct ValidationError {
188 pub path: String,
189 pub message: String,
190}
191
192#[derive(Debug, Clone, Serialize)]
194pub struct ValidateMessageResponse {
195 pub valid: bool,
196 pub errors: Vec<ValidationError>,
197 pub warnings: Vec<ValidationError>,
198}
199
200#[derive(Debug, Clone, Deserialize)]
202pub struct ListContractsQuery {
203 #[serde(skip_serializing_if = "Option::is_none")]
204 pub protocol: Option<String>,
205}
206
207#[derive(Clone)]
209pub struct ProtocolContractsState {
210 contracts: Arc<RwLock<HashMap<String, ProtocolContract>>>,
211}
212
213impl Default for ProtocolContractsState {
214 fn default() -> Self {
215 Self::new()
216 }
217}
218
219impl ProtocolContractsState {
220 pub fn new() -> Self {
221 Self {
222 contracts: Arc::new(RwLock::new(HashMap::new())),
223 }
224 }
225}
226
227pub async fn list_contracts(
229 State(state): State<ProtocolContractsState>,
230 Query(query): Query<ListContractsQuery>,
231) -> impl IntoResponse {
232 let contracts = state.contracts.read().await;
233
234 let filtered: Vec<ProtocolContract> = contracts
235 .values()
236 .filter(|c| {
237 if let Some(ref protocol) = query.protocol {
238 match protocol.to_lowercase().as_str() {
239 "grpc" => c.protocol == ProtocolType::Grpc,
240 "websocket" => c.protocol == ProtocolType::Websocket,
241 "mqtt" => c.protocol == ProtocolType::Mqtt,
242 "kafka" => c.protocol == ProtocolType::Kafka,
243 _ => true,
244 }
245 } else {
246 true
247 }
248 })
249 .cloned()
250 .collect();
251
252 let total = filtered.len();
253
254 Json(serde_json::json!({
255 "data": ListContractsResponse {
256 contracts: filtered,
257 total,
258 }
259 }))
260}
261
262pub async fn get_contract(
264 State(state): State<ProtocolContractsState>,
265 Path(contract_id): Path<String>,
266) -> impl IntoResponse {
267 let contracts = state.contracts.read().await;
268
269 match contracts.get(&contract_id) {
270 Some(contract) => Json(serde_json::json!({
271 "data": contract
272 }))
273 .into_response(),
274 None => (
275 StatusCode::NOT_FOUND,
276 Json(serde_json::json!({
277 "error": format!("Contract '{}' not found", contract_id)
278 })),
279 )
280 .into_response(),
281 }
282}
283
284pub async fn delete_contract(
286 State(state): State<ProtocolContractsState>,
287 Path(contract_id): Path<String>,
288) -> impl IntoResponse {
289 let mut contracts = state.contracts.write().await;
290
291 match contracts.remove(&contract_id) {
292 Some(_) => Json(serde_json::json!({
293 "message": format!("Contract '{}' deleted", contract_id)
294 }))
295 .into_response(),
296 None => (
297 StatusCode::NOT_FOUND,
298 Json(serde_json::json!({
299 "error": format!("Contract '{}' not found", contract_id)
300 })),
301 )
302 .into_response(),
303 }
304}
305
306pub async fn create_grpc_contract(
308 State(state): State<ProtocolContractsState>,
309 Json(request): Json<CreateGrpcContractRequest>,
310) -> impl IntoResponse {
311 let mut contracts = state.contracts.write().await;
312
313 let now = chrono::Utc::now().to_rfc3339();
314 let contract = ProtocolContract {
315 contract_id: request.contract_id.clone(),
316 version: request.version,
317 protocol: ProtocolType::Grpc,
318 contract: serde_json::json!({
319 "descriptor_set": request.descriptor_set
320 }),
321 created_at: Some(now.clone()),
322 updated_at: Some(now),
323 };
324
325 contracts.insert(request.contract_id.clone(), contract.clone());
326
327 (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
328}
329
330pub async fn create_websocket_contract(
332 State(state): State<ProtocolContractsState>,
333 Json(request): Json<CreateWebSocketContractRequest>,
334) -> impl IntoResponse {
335 let mut contracts = state.contracts.write().await;
336
337 let now = chrono::Utc::now().to_rfc3339();
338 let contract = ProtocolContract {
339 contract_id: request.contract_id.clone(),
340 version: request.version,
341 protocol: ProtocolType::Websocket,
342 contract: serde_json::json!({
343 "message_types": request.message_types
344 }),
345 created_at: Some(now.clone()),
346 updated_at: Some(now),
347 };
348
349 contracts.insert(request.contract_id.clone(), contract.clone());
350
351 (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
352}
353
354pub async fn create_mqtt_contract(
356 State(state): State<ProtocolContractsState>,
357 Json(request): Json<CreateMqttContractRequest>,
358) -> impl IntoResponse {
359 let mut contracts = state.contracts.write().await;
360
361 let now = chrono::Utc::now().to_rfc3339();
362 let contract = ProtocolContract {
363 contract_id: request.contract_id.clone(),
364 version: request.version,
365 protocol: ProtocolType::Mqtt,
366 contract: serde_json::json!({
367 "topics": request.topics
368 }),
369 created_at: Some(now.clone()),
370 updated_at: Some(now),
371 };
372
373 contracts.insert(request.contract_id.clone(), contract.clone());
374
375 (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
376}
377
378pub async fn create_kafka_contract(
380 State(state): State<ProtocolContractsState>,
381 Json(request): Json<CreateKafkaContractRequest>,
382) -> impl IntoResponse {
383 let mut contracts = state.contracts.write().await;
384
385 let now = chrono::Utc::now().to_rfc3339();
386 let contract = ProtocolContract {
387 contract_id: request.contract_id.clone(),
388 version: request.version,
389 protocol: ProtocolType::Kafka,
390 contract: serde_json::json!({
391 "topics": request.topics
392 }),
393 created_at: Some(now.clone()),
394 updated_at: Some(now),
395 };
396
397 contracts.insert(request.contract_id.clone(), contract.clone());
398
399 (StatusCode::CREATED, Json(serde_json::json!({ "data": contract })))
400}
401
402pub async fn compare_contracts(
404 State(state): State<ProtocolContractsState>,
405 Json(request): Json<CompareContractsRequest>,
406) -> impl IntoResponse {
407 let contracts = state.contracts.read().await;
408
409 let old_contract = match contracts.get(&request.old_contract_id) {
410 Some(c) => c,
411 None => {
412 return (
413 StatusCode::NOT_FOUND,
414 Json(serde_json::json!({
415 "error": format!("Contract '{}' not found", request.old_contract_id)
416 })),
417 )
418 .into_response()
419 }
420 };
421
422 let new_contract = match contracts.get(&request.new_contract_id) {
423 Some(c) => c,
424 None => {
425 return (
426 StatusCode::NOT_FOUND,
427 Json(serde_json::json!({
428 "error": format!("Contract '{}' not found", request.new_contract_id)
429 })),
430 )
431 .into_response()
432 }
433 };
434
435 let mut breaking_changes = Vec::new();
437 let mut non_breaking_changes = Vec::new();
438
439 if old_contract.protocol != new_contract.protocol {
440 breaking_changes.push(ContractChange {
441 operation_id: "protocol".to_string(),
442 change_type: "protocol_change".to_string(),
443 description: format!(
444 "Protocol changed from {:?} to {:?}",
445 old_contract.protocol, new_contract.protocol
446 ),
447 });
448 }
449
450 if old_contract.version != new_contract.version {
451 non_breaking_changes.push(ContractChange {
452 operation_id: "version".to_string(),
453 change_type: "version_bump".to_string(),
454 description: format!(
455 "Version changed from {} to {}",
456 old_contract.version, new_contract.version
457 ),
458 });
459 }
460
461 let response = CompareContractsResponse {
462 summary: CompareSummary {
463 total_operations: breaking_changes.len() + non_breaking_changes.len(),
464 breaking_count: breaking_changes.len(),
465 non_breaking_count: non_breaking_changes.len(),
466 },
467 breaking_changes,
468 non_breaking_changes,
469 };
470
471 Json(serde_json::json!({ "data": response })).into_response()
472}
473
474pub async fn validate_message(
476 State(state): State<ProtocolContractsState>,
477 Path(contract_id): Path<String>,
478 Json(request): Json<ValidateMessageRequest>,
479) -> impl IntoResponse {
480 let contracts = state.contracts.read().await;
481
482 match contracts.get(&contract_id) {
483 Some(_contract) => {
484 let response = ValidateMessageResponse {
486 valid: true,
487 errors: Vec::new(),
488 warnings: Vec::new(),
489 };
490
491 Json(serde_json::json!({ "data": response }))
492 }
493 None => Json(serde_json::json!({
494 "error": format!("Contract '{}' not found", contract_id)
495 })),
496 }
497}
498
499#[cfg(test)]
500mod tests {
501 use super::*;
502
503 #[test]
504 fn test_protocol_type_serialization() {
505 assert_eq!(serde_json::to_string(&ProtocolType::Grpc).unwrap(), "\"grpc\"");
506 assert_eq!(serde_json::to_string(&ProtocolType::Websocket).unwrap(), "\"websocket\"");
507 assert_eq!(serde_json::to_string(&ProtocolType::Mqtt).unwrap(), "\"mqtt\"");
508 assert_eq!(serde_json::to_string(&ProtocolType::Kafka).unwrap(), "\"kafka\"");
509 }
510
511 #[tokio::test]
512 async fn test_protocol_contracts_state_new() {
513 let state = ProtocolContractsState::new();
514 let contracts = state.contracts.read().await;
516 assert!(contracts.is_empty());
517 }
518}