// Source: mockforge_import/import/asyncapi_import.rs

//! AsyncAPI specification import functionality
//!
//! This module handles parsing AsyncAPI 2.x/3.x specifications and converting them
//! to MockForge WebSocket, MQTT, Kafka, and AMQP configurations.
5
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8use std::collections::HashMap;
9
10/// Import result for AsyncAPI specs
11#[derive(Debug)]
12pub struct AsyncApiImportResult {
13    /// Converted channels from the AsyncAPI spec
14    pub channels: Vec<MockForgeChannel>,
15    /// Warnings encountered during import
16    pub warnings: Vec<String>,
17    /// Extracted specification metadata
18    pub spec_info: AsyncApiSpecInfo,
19}
20
21/// MockForge channel structure for AsyncAPI import
22#[derive(Debug, Serialize)]
23pub struct MockForgeChannel {
24    /// Protocol used by this channel
25    pub protocol: ChannelProtocol,
26    /// Channel name
27    pub name: String,
28    /// Channel path/endpoint
29    pub path: String,
30    /// Optional channel description
31    pub description: Option<String>,
32    /// Operations available on this channel
33    pub operations: Vec<ChannelOperation>,
34}
35
36/// Channel protocol type
37#[derive(Debug, Serialize, Clone)]
38#[serde(rename_all = "lowercase")]
39pub enum ChannelProtocol {
40    /// WebSocket protocol
41    Websocket,
42    /// MQTT protocol
43    Mqtt,
44    /// Kafka protocol
45    Kafka,
46    /// AMQP protocol
47    Amqp,
48}
49
50/// Channel operation (subscribe/publish)
51#[derive(Debug, Serialize)]
52pub struct ChannelOperation {
53    /// Type of operation (subscribe or publish)
54    pub operation_type: OperationType,
55    /// JSON schema for messages
56    pub message_schema: Option<Value>,
57    /// Example message payload
58    pub example_message: Option<Value>,
59}
60
61/// Operation type for channels
62#[derive(Debug, Serialize)]
63#[serde(rename_all = "lowercase")]
64pub enum OperationType {
65    /// Subscribe to messages
66    Subscribe,
67    /// Publish messages
68    Publish,
69}
70
/// Metadata extracted from an AsyncAPI document.
///
/// Plain owned data, so it derives `Clone`, `PartialEq`, and `Eq` to let
/// callers copy it and compare it in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AsyncApiSpecInfo {
    /// Specification title
    pub title: String,
    /// Specification version
    pub version: String,
    /// Optional specification description
    pub description: Option<String>,
    /// AsyncAPI version used
    pub asyncapi_version: String,
    /// List of server URLs
    pub servers: Vec<String>,
}
85
86/// AsyncAPI 2.x/3.x specification structure (simplified)
87#[derive(Debug, Deserialize)]
88struct AsyncApiSpec {
89    asyncapi: String,
90    info: AsyncApiInfo,
91    servers: Option<HashMap<String, AsyncApiServer>>,
92    channels: Option<HashMap<String, AsyncApiChannel>>,
93}
94
95#[derive(Debug, Deserialize)]
96struct AsyncApiInfo {
97    title: String,
98    version: String,
99    description: Option<String>,
100}
101
102#[derive(Debug, Deserialize)]
103struct AsyncApiServer {
104    url: String,
105    protocol: String,
106    #[serde(rename = "protocolVersion")]
107    #[allow(dead_code)]
108    protocol_version: Option<String>,
109}
110
111#[derive(Debug, Deserialize)]
112struct AsyncApiChannel {
113    description: Option<String>,
114    subscribe: Option<AsyncApiOperation>,
115    publish: Option<AsyncApiOperation>,
116}
117
118#[derive(Debug, Deserialize)]
119struct AsyncApiOperation {
120    message: Option<AsyncApiMessage>,
121}
122
123#[derive(Debug, Deserialize)]
124struct AsyncApiMessage {
125    payload: Option<Value>,
126    examples: Option<Vec<Value>>,
127}
128
129/// Import an AsyncAPI specification
130pub fn import_asyncapi_spec(
131    content: &str,
132    _base_url: Option<&str>,
133) -> Result<AsyncApiImportResult, String> {
134    // Try parsing as JSON first, then YAML
135    let spec: AsyncApiSpec = serde_json::from_str(content)
136        .or_else(|_| {
137            serde_yaml::from_str(content)
138                .map_err(|e| format!("Failed to parse AsyncAPI spec: {}", e))
139        })
140        .map_err(|e| format!("Failed to parse AsyncAPI spec: {}", e))?;
141
142    // Validate AsyncAPI version
143    if !spec.asyncapi.starts_with("2.") && !spec.asyncapi.starts_with("3.") {
144        return Err(format!(
145            "Unsupported AsyncAPI version: {}. Only 2.x and 3.x are supported.",
146            spec.asyncapi
147        ));
148    }
149
150    // Extract spec info
151    let servers = spec
152        .servers
153        .as_ref()
154        .map(|s| {
155            s.values()
156                .map(|server| format!("{}://{}", server.protocol, server.url))
157                .collect()
158        })
159        .unwrap_or_default();
160
161    let spec_info = AsyncApiSpecInfo {
162        title: spec.info.title.clone(),
163        version: spec.info.version.clone(),
164        description: spec.info.description.clone(),
165        asyncapi_version: spec.asyncapi.clone(),
166        servers,
167    };
168
169    let mut channels = Vec::new();
170    let mut warnings = Vec::new();
171
172    // Process channels
173    if let Some(channel_map) = spec.channels {
174        for (channel_name, channel_spec) in channel_map {
175            match convert_channel_to_mockforge(&channel_name, &channel_spec, &spec.servers) {
176                Ok(channel) => channels.push(channel),
177                Err(e) => {
178                    warnings.push(format!("Failed to convert channel '{}': {}", channel_name, e))
179                }
180            }
181        }
182    }
183
184    Ok(AsyncApiImportResult {
185        channels,
186        warnings,
187        spec_info,
188    })
189}
190
191/// Convert AsyncAPI channel to MockForge channel
192fn convert_channel_to_mockforge(
193    channel_name: &str,
194    channel_spec: &AsyncApiChannel,
195    servers: &Option<HashMap<String, AsyncApiServer>>,
196) -> Result<MockForgeChannel, String> {
197    // Determine protocol from first server or default to WebSocket
198    let protocol = servers
199        .as_ref()
200        .and_then(|s| s.values().next())
201        .map(|server| match server.protocol.to_lowercase().as_str() {
202            "ws" | "wss" | "websocket" => ChannelProtocol::Websocket,
203            "mqtt" | "mqtts" => ChannelProtocol::Mqtt,
204            "kafka" | "kafka-secure" => ChannelProtocol::Kafka,
205            "amqp" | "amqps" => ChannelProtocol::Amqp,
206            _ => ChannelProtocol::Websocket,
207        })
208        .unwrap_or(ChannelProtocol::Websocket);
209
210    let mut operations = Vec::new();
211
212    // Process subscribe operation
213    if let Some(subscribe) = &channel_spec.subscribe {
214        let message_schema = subscribe.message.as_ref().and_then(|m| m.payload.clone());
215
216        let example_message = subscribe
217            .message
218            .as_ref()
219            .and_then(|m| m.examples.as_ref())
220            .and_then(|examples| examples.first().cloned());
221
222        operations.push(ChannelOperation {
223            operation_type: OperationType::Subscribe,
224            message_schema,
225            example_message,
226        });
227    }
228
229    // Process publish operation
230    if let Some(publish) = &channel_spec.publish {
231        let message_schema = publish.message.as_ref().and_then(|m| m.payload.clone());
232
233        let example_message = publish
234            .message
235            .as_ref()
236            .and_then(|m| m.examples.as_ref())
237            .and_then(|examples| examples.first().cloned());
238
239        operations.push(ChannelOperation {
240            operation_type: OperationType::Publish,
241            message_schema,
242            example_message,
243        });
244    }
245
246    Ok(MockForgeChannel {
247        protocol,
248        name: channel_name.to_string(),
249        path: format!("/{}", channel_name.trim_start_matches('/')),
250        description: channel_spec.description.clone(),
251        operations,
252    })
253}
254
#[cfg(test)]
mod tests {
    use super::*;

    /// A 2.x document with one MQTT server and one publish-only channel.
    #[test]
    fn test_parse_asyncapi_2() {
        let spec = r#"
        {
            "asyncapi": "2.6.0",
            "info": {
                "title": "Test API",
                "version": "1.0.0",
                "description": "Test AsyncAPI spec"
            },
            "servers": {
                "production": {
                    "url": "localhost:1883",
                    "protocol": "mqtt"
                }
            },
            "channels": {
                "sensors/temperature": {
                    "description": "Temperature sensor data",
                    "publish": {
                        "message": {
                            "payload": {
                                "type": "object",
                                "properties": {
                                    "temperature": { "type": "number" },
                                    "unit": { "type": "string" }
                                }
                            }
                        }
                    }
                }
            }
        }
        "#;

        let import_result =
            import_asyncapi_spec(spec, None).expect("valid 2.x spec should import");

        assert_eq!(import_result.spec_info.title, "Test API");
        assert_eq!(import_result.spec_info.version, "1.0.0");
        assert_eq!(
            import_result.spec_info.description.as_deref(),
            Some("Test AsyncAPI spec")
        );
        assert_eq!(import_result.spec_info.asyncapi_version, "2.6.0");
        assert_eq!(
            import_result.spec_info.servers,
            vec!["mqtt://localhost:1883".to_string()]
        );
        assert!(import_result.warnings.is_empty());
        assert_eq!(import_result.channels.len(), 1);

        let channel = &import_result.channels[0];
        assert!(matches!(channel.protocol, ChannelProtocol::Mqtt));
        assert_eq!(channel.name, "sensors/temperature");
        assert_eq!(channel.path, "/sensors/temperature");
        assert_eq!(channel.description.as_deref(), Some("Temperature sensor data"));
        assert_eq!(channel.operations.len(), 1);

        let operation = &channel.operations[0];
        assert!(matches!(operation.operation_type, OperationType::Publish));
        assert!(operation.message_schema.is_some());
        assert!(operation.example_message.is_none());
    }

    /// A document declaring version 3.0.0 with one WebSocket server and one
    /// subscribe-only channel.
    #[test]
    fn test_parse_asyncapi_3() {
        let spec = r#"
        {
            "asyncapi": "3.0.0",
            "info": {
                "title": "WebSocket API",
                "version": "1.0.0"
            },
            "servers": {
                "development": {
                    "url": "ws://localhost:8080",
                    "protocol": "ws"
                }
            },
            "channels": {
                "chat/messages": {
                    "description": "Chat messages",
                    "subscribe": {
                        "message": {
                            "payload": {
                                "type": "object",
                                "properties": {
                                    "message": { "type": "string" },
                                    "sender": { "type": "string" }
                                }
                            }
                        }
                    }
                }
            }
        }
        "#;

        let import_result =
            import_asyncapi_spec(spec, None).expect("valid 3.x spec should import");

        assert_eq!(import_result.spec_info.title, "WebSocket API");
        assert_eq!(import_result.spec_info.asyncapi_version, "3.0.0");
        assert!(import_result.spec_info.description.is_none());
        assert!(import_result.warnings.is_empty());
        assert_eq!(import_result.channels.len(), 1);

        let channel = &import_result.channels[0];
        assert!(matches!(channel.protocol, ChannelProtocol::Websocket));
        assert_eq!(channel.path, "/chat/messages");
        assert_eq!(channel.operations.len(), 1);
        assert!(matches!(
            channel.operations[0].operation_type,
            OperationType::Subscribe
        ));
    }
}
344}