1use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::collections::HashMap;
9
10#[derive(Debug)]
12pub struct AsyncApiImportResult {
13 pub channels: Vec<MockForgeChannel>,
15 pub warnings: Vec<String>,
17 pub spec_info: AsyncApiSpecInfo,
19}
20
21#[derive(Debug, Serialize)]
23pub struct MockForgeChannel {
24 pub protocol: ChannelProtocol,
26 pub name: String,
28 pub path: String,
30 pub description: Option<String>,
32 pub operations: Vec<ChannelOperation>,
34}
35
36#[derive(Debug, Serialize, Clone)]
38#[serde(rename_all = "lowercase")]
39pub enum ChannelProtocol {
40 Websocket,
42 Mqtt,
44 Kafka,
46 Amqp,
48}
49
50#[derive(Debug, Serialize)]
52pub struct ChannelOperation {
53 pub operation_type: OperationType,
55 pub message_schema: Option<Value>,
57 pub example_message: Option<Value>,
59}
60
61#[derive(Debug, Serialize)]
63#[serde(rename_all = "lowercase")]
64pub enum OperationType {
65 Subscribe,
67 Publish,
69}
70
/// Document-level metadata extracted from an AsyncAPI document.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AsyncApiSpecInfo {
    /// `info.title` of the document.
    pub title: String,
    /// `info.version` of the document (the API's own version).
    pub version: String,
    /// `info.description` of the document, when present.
    pub description: Option<String>,
    /// Value of the top-level `asyncapi` field (the specification version).
    pub asyncapi_version: String,
    /// Server addresses derived from each declared server's protocol and URL.
    pub servers: Vec<String>,
}
85
86#[derive(Debug, Deserialize)]
88struct AsyncApiSpec {
89 asyncapi: String,
90 info: AsyncApiInfo,
91 servers: Option<HashMap<String, AsyncApiServer>>,
92 channels: Option<HashMap<String, AsyncApiChannel>>,
93}
94
95#[derive(Debug, Deserialize)]
96struct AsyncApiInfo {
97 title: String,
98 version: String,
99 description: Option<String>,
100}
101
102#[derive(Debug, Deserialize)]
103struct AsyncApiServer {
104 url: String,
105 protocol: String,
106 #[serde(rename = "protocolVersion")]
107 #[allow(dead_code)]
108 protocol_version: Option<String>,
109}
110
111#[derive(Debug, Deserialize)]
112struct AsyncApiChannel {
113 description: Option<String>,
114 subscribe: Option<AsyncApiOperation>,
115 publish: Option<AsyncApiOperation>,
116}
117
118#[derive(Debug, Deserialize)]
119struct AsyncApiOperation {
120 message: Option<AsyncApiMessage>,
121}
122
123#[derive(Debug, Deserialize)]
124struct AsyncApiMessage {
125 payload: Option<Value>,
126 examples: Option<Vec<Value>>,
127}
128
129pub fn import_asyncapi_spec(
131 content: &str,
132 _base_url: Option<&str>,
133) -> Result<AsyncApiImportResult, String> {
134 let spec: AsyncApiSpec = serde_json::from_str(content)
136 .or_else(|_| {
137 serde_yaml::from_str(content)
138 .map_err(|e| format!("Failed to parse AsyncAPI spec: {}", e))
139 })
140 .map_err(|e| format!("Failed to parse AsyncAPI spec: {}", e))?;
141
142 if !spec.asyncapi.starts_with("2.") && !spec.asyncapi.starts_with("3.") {
144 return Err(format!(
145 "Unsupported AsyncAPI version: {}. Only 2.x and 3.x are supported.",
146 spec.asyncapi
147 ));
148 }
149
150 let servers = spec
152 .servers
153 .as_ref()
154 .map(|s| {
155 s.values()
156 .map(|server| format!("{}://{}", server.protocol, server.url))
157 .collect()
158 })
159 .unwrap_or_default();
160
161 let spec_info = AsyncApiSpecInfo {
162 title: spec.info.title.clone(),
163 version: spec.info.version.clone(),
164 description: spec.info.description.clone(),
165 asyncapi_version: spec.asyncapi.clone(),
166 servers,
167 };
168
169 let mut channels = Vec::new();
170 let mut warnings = Vec::new();
171
172 if let Some(channel_map) = spec.channels {
174 for (channel_name, channel_spec) in channel_map {
175 match convert_channel_to_mockforge(&channel_name, &channel_spec, &spec.servers) {
176 Ok(channel) => channels.push(channel),
177 Err(e) => {
178 warnings.push(format!("Failed to convert channel '{}': {}", channel_name, e))
179 }
180 }
181 }
182 }
183
184 Ok(AsyncApiImportResult {
185 channels,
186 warnings,
187 spec_info,
188 })
189}
190
191fn convert_channel_to_mockforge(
193 channel_name: &str,
194 channel_spec: &AsyncApiChannel,
195 servers: &Option<HashMap<String, AsyncApiServer>>,
196) -> Result<MockForgeChannel, String> {
197 let protocol = servers
199 .as_ref()
200 .and_then(|s| s.values().next())
201 .map(|server| match server.protocol.to_lowercase().as_str() {
202 "ws" | "wss" | "websocket" => ChannelProtocol::Websocket,
203 "mqtt" | "mqtts" => ChannelProtocol::Mqtt,
204 "kafka" | "kafka-secure" => ChannelProtocol::Kafka,
205 "amqp" | "amqps" => ChannelProtocol::Amqp,
206 _ => ChannelProtocol::Websocket,
207 })
208 .unwrap_or(ChannelProtocol::Websocket);
209
210 let mut operations = Vec::new();
211
212 if let Some(subscribe) = &channel_spec.subscribe {
214 let message_schema = subscribe.message.as_ref().and_then(|m| m.payload.clone());
215
216 let example_message = subscribe
217 .message
218 .as_ref()
219 .and_then(|m| m.examples.as_ref())
220 .and_then(|examples| examples.first().cloned());
221
222 operations.push(ChannelOperation {
223 operation_type: OperationType::Subscribe,
224 message_schema,
225 example_message,
226 });
227 }
228
229 if let Some(publish) = &channel_spec.publish {
231 let message_schema = publish.message.as_ref().and_then(|m| m.payload.clone());
232
233 let example_message = publish
234 .message
235 .as_ref()
236 .and_then(|m| m.examples.as_ref())
237 .and_then(|examples| examples.first().cloned());
238
239 operations.push(ChannelOperation {
240 operation_type: OperationType::Publish,
241 message_schema,
242 example_message,
243 });
244 }
245
246 Ok(MockForgeChannel {
247 protocol,
248 name: channel_name.to_string(),
249 path: format!("/{}", channel_name.trim_start_matches('/')),
250 description: channel_spec.description.clone(),
251 operations,
252 })
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// A 2.x document with one MQTT server and one publish-only channel.
    #[test]
    fn test_parse_asyncapi_2() {
        let spec = r#"
        {
            "asyncapi": "2.6.0",
            "info": {
                "title": "Test API",
                "version": "1.0.0",
                "description": "Test AsyncAPI spec"
            },
            "servers": {
                "production": {
                    "url": "localhost:1883",
                    "protocol": "mqtt"
                }
            },
            "channels": {
                "sensors/temperature": {
                    "description": "Temperature sensor data",
                    "publish": {
                        "message": {
                            "payload": {
                                "type": "object",
                                "properties": {
                                    "temperature": { "type": "number" },
                                    "unit": { "type": "string" }
                                }
                            }
                        }
                    }
                }
            }
        }
        "#;

        let import_result = import_asyncapi_spec(spec, None).expect("valid 2.x spec must import");

        assert_eq!(import_result.spec_info.title, "Test API");
        assert_eq!(import_result.spec_info.version, "1.0.0");
        assert_eq!(
            import_result.spec_info.description.as_deref(),
            Some("Test AsyncAPI spec")
        );
        assert_eq!(import_result.spec_info.asyncapi_version, "2.6.0");
        assert_eq!(
            import_result.spec_info.servers,
            vec!["mqtt://localhost:1883".to_string()]
        );
        assert!(import_result.warnings.is_empty());
        assert_eq!(import_result.channels.len(), 1);

        let channel = &import_result.channels[0];
        assert_eq!(channel.name, "sensors/temperature");
        assert_eq!(channel.path, "/sensors/temperature");
        assert_eq!(channel.description.as_deref(), Some("Temperature sensor data"));
        assert!(matches!(channel.protocol, ChannelProtocol::Mqtt));
        assert_eq!(channel.operations.len(), 1);

        let operation = &channel.operations[0];
        assert!(matches!(operation.operation_type, OperationType::Publish));
        assert!(operation.message_schema.is_some());
        assert!(operation.example_message.is_none());
    }

    /// A document declaring version 3.0.0 with one WebSocket server and one
    /// subscribe-only channel.
    #[test]
    fn test_parse_asyncapi_3() {
        let spec = r#"
        {
            "asyncapi": "3.0.0",
            "info": {
                "title": "WebSocket API",
                "version": "1.0.0"
            },
            "servers": {
                "development": {
                    "url": "ws://localhost:8080",
                    "protocol": "ws"
                }
            },
            "channels": {
                "chat/messages": {
                    "description": "Chat messages",
                    "subscribe": {
                        "message": {
                            "payload": {
                                "type": "object",
                                "properties": {
                                    "message": { "type": "string" },
                                    "sender": { "type": "string" }
                                }
                            }
                        }
                    }
                }
            }
        }
        "#;

        let import_result = import_asyncapi_spec(spec, None).expect("valid 3.x spec must import");

        assert_eq!(import_result.spec_info.title, "WebSocket API");
        assert!(import_result.spec_info.description.is_none());
        assert_eq!(import_result.channels.len(), 1);

        let channel = &import_result.channels[0];
        assert_eq!(channel.path, "/chat/messages");
        assert!(matches!(channel.protocol, ChannelProtocol::Websocket));
        assert_eq!(channel.operations.len(), 1);
        assert!(matches!(
            channel.operations[0].operation_type,
            OperationType::Subscribe
        ));
    }

    /// Versions other than 2.x / 3.x are rejected with a descriptive error.
    #[test]
    fn test_rejects_unsupported_version() {
        let spec = r#"
        {
            "asyncapi": "1.2.0",
            "info": { "title": "Old API", "version": "0.1.0" }
        }
        "#;

        let err = import_asyncapi_spec(spec, None).unwrap_err();
        assert!(err.contains("Unsupported AsyncAPI version: 1.2.0"));
    }
}