Skip to main content

tork_openapi/
asyncapi.rs

1//! The `AsyncApi` builder and AsyncAPI document assembly.
2//!
3//! Describes the event-driven side of the API: Server-Sent Events streams and
4//! WebSocket connections become AsyncAPI channels, with their message payloads
5//! recorded under `components.schemas`.
6
7use std::sync::Arc;
8
9use bytes::Bytes;
10use serde_json::{json, Map, Value};
11
12use tork_core::constants::APPLICATION_JSON;
13use tork_core::{
14    bytes_response, AsyncApiProvider, BoxFuture, HandlerFn, Method, RequestContext, Response,
15    Result, Route, StatusCode,
16};
17
18use crate::spec::sanitize_doc_text;
19
20/// AsyncAPI specification version emitted by the document.
21const ASYNCAPI_VERSION: &str = "3.0.0";
22/// Default path at which the document is served.
23const DEFAULT_JSON_PATH: &str = "/asyncapi.json";
24
25/// Configures AsyncAPI document generation.
26///
27/// The document describes each Server-Sent Events stream and WebSocket route as a
28/// channel, with message payload schemas under `components.schemas`.
29pub struct AsyncApi {
30    title: String,
31    version: String,
32    description: Option<String>,
33    json_path: String,
34}
35
36impl Default for AsyncApi {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42impl AsyncApi {
43    /// Creates a builder with default title, version, and document path.
44    pub fn new() -> Self {
45        Self {
46            title: "API".to_owned(),
47            version: "0.1.0".to_owned(),
48            description: None,
49            json_path: DEFAULT_JSON_PATH.to_owned(),
50        }
51    }
52
53    /// Sets the API title.
54    pub fn title(mut self, title: impl Into<String>) -> Self {
55        self.title = title.into();
56        self
57    }
58
59    /// Sets the API version.
60    pub fn version(mut self, version: impl Into<String>) -> Self {
61        self.version = version.into();
62        self
63    }
64
65    /// Sets the API description.
66    pub fn description(mut self, description: impl Into<String>) -> Self {
67        self.description = Some(description.into());
68        self
69    }
70
71    /// Sets the path at which the document is served.
72    pub fn json(mut self, path: impl Into<String>) -> Self {
73        self.json_path = path.into();
74        self
75    }
76
77    /// Builds the AsyncAPI document for the given routes as a JSON value.
78    pub fn build_document(&self, routes: &[Route]) -> Value {
79        build_document(self, routes)
80    }
81}
82
83impl AsyncApiProvider for AsyncApi {
84    fn documentation_routes(&self, registered: &[Route]) -> Vec<Route> {
85        let document = build_document(self, registered);
86        let body = serde_json::to_vec(&document).unwrap_or_default();
87        vec![spec_route(&self.json_path, Bytes::from(body))]
88    }
89}
90
91/// Builds a route that serves a pre-serialized document at `path`.
92fn spec_route(path: &str, body: Bytes) -> Route {
93    let handler: HandlerFn = Arc::new(
94        move |_ctx: RequestContext| -> BoxFuture<'static, Result<Response>> {
95            let body = body.clone();
96            Box::pin(async move { Ok(bytes_response(StatusCode::OK, APPLICATION_JSON, body)) })
97        },
98    );
99
100    Route::new(Method::GET, path.to_owned(), handler).summary("AsyncAPI specification")
101}
102
103/// Assembles the AsyncAPI document from the route table.
104fn build_document(api: &AsyncApi, routes: &[Route]) -> Value {
105    // OpenAPI 3 settings make `$ref`s point at `#/components/schemas/...`, which is
106    // also where AsyncAPI 3 keeps reusable schemas.
107    let mut generator = schemars::generate::SchemaSettings::openapi3().into_generator();
108    let mut channels: Map<String, Value> = Map::new();
109    let mut operations: Map<String, Value> = Map::new();
110
111    for route in routes {
112        let meta = route.meta();
113        if !(meta.streaming || meta.websocket) {
114            continue;
115        }
116
117        let path = route.path();
118        let name = channel_name(path);
119        let channel_ref = json!({ "$ref": format!("#/channels/{name}") });
120        let mut messages: Map<String, Value> = Map::new();
121
122        if meta.streaming {
123            if let Some(thunk) = meta.response_schema {
124                let payload = thunk(&mut generator).as_value().clone();
125                messages.insert("data".to_owned(), json!({ "payload": payload }));
126            }
127            operations.insert(
128                format!("{name}_send"),
129                json!({ "action": "send", "channel": channel_ref }),
130            );
131        } else {
132            // WebSocket: receive the incoming message, send the outgoing one.
133            if let Some(thunk) = meta.ws_incoming {
134                let payload = thunk(&mut generator).as_value().clone();
135                messages.insert("incoming".to_owned(), json!({ "payload": payload }));
136                operations.insert(
137                    format!("{name}_receive"),
138                    json!({
139                        "action": "receive",
140                        "channel": channel_ref,
141                        "messages": [{ "$ref": format!("#/channels/{name}/messages/incoming") }],
142                    }),
143                );
144            }
145            if let Some(thunk) = meta.ws_outgoing {
146                let payload = thunk(&mut generator).as_value().clone();
147                messages.insert("outgoing".to_owned(), json!({ "payload": payload }));
148                operations.insert(
149                    format!("{name}_send"),
150                    json!({
151                        "action": "send",
152                        "channel": channel_ref,
153                        "messages": [{ "$ref": format!("#/channels/{name}/messages/outgoing") }],
154                    }),
155                );
156            }
157        }
158
159        channels.insert(
160            name,
161            json!({ "address": path, "messages": Value::Object(messages) }),
162        );
163    }
164
165    let mut info = Map::new();
166    info.insert("title".to_owned(), json!(sanitize_doc_text(&api.title)));
167    info.insert("version".to_owned(), json!(api.version));
168    if let Some(description) = &api.description {
169        info.insert(
170            "description".to_owned(),
171            json!(sanitize_doc_text(description)),
172        );
173    }
174
175    let mut document = json!({
176        "asyncapi": ASYNCAPI_VERSION,
177        "info": Value::Object(info),
178        "channels": Value::Object(channels),
179        "operations": Value::Object(operations),
180    });
181
182    let definitions = generator.take_definitions(true);
183    if !definitions.is_empty() {
184        document["components"] = json!({ "schemas": Value::Object(definitions) });
185    }
186
187    document
188}
189
190/// Derives a stable channel name from a path, e.g. `/chat/{room}` -> `chat_room`.
191fn channel_name(path: &str) -> String {
192    let mut name = String::new();
193    for segment in path.split('/').filter(|segment| !segment.is_empty()) {
194        if !name.is_empty() {
195            name.push('_');
196        }
197        for ch in segment.chars() {
198            if ch.is_ascii_alphanumeric() {
199                name.push(ch);
200            } else if ch != '{' && ch != '}' {
201                name.push('_');
202            }
203        }
204    }
205    if name.is_empty() {
206        "root".to_owned()
207    } else {
208        name
209    }
210}
211
212#[cfg(test)]
213mod tests {
214    use super::*;
215    use bytes::Bytes;
216    use std::sync::Arc;
217    use tork_core::{RequestContext, Response};
218
219    #[derive(schemars::JsonSchema)]
220    #[allow(dead_code)]
221    struct ChatIn {
222        message: String,
223    }
224
225    #[derive(schemars::JsonSchema)]
226    #[allow(dead_code)]
227    struct ChatOut {
228        text: String,
229    }
230
231    #[derive(schemars::JsonSchema)]
232    #[allow(dead_code)]
233    struct Tick {
234        n: i64,
235    }
236
237    fn dummy_handler() -> HandlerFn {
238        Arc::new(
239            |_ctx: RequestContext| -> BoxFuture<'static, Result<Response>> {
240                Box::pin(async {
241                    Ok(bytes_response(
242                        StatusCode::OK,
243                        APPLICATION_JSON,
244                        Bytes::new(),
245                    ))
246                })
247            },
248        )
249    }
250
251    #[test]
252    fn documents_sse_and_websocket_channels() {
253        let routes = vec![
254            Route::new(Method::GET, "/events", dummy_handler())
255                .response_schema::<Tick>()
256                .streaming(),
257            Route::new(Method::GET, "/chat/{room}", dummy_handler())
258                .websocket()
259                .ws_incoming::<ChatIn>()
260                .ws_outgoing::<ChatOut>(),
261        ];
262
263        let document = AsyncApi::new().build_document(&routes);
264
265        assert_eq!(document["asyncapi"], "3.0.0");
266        // The SSE channel carries a data message.
267        assert_eq!(document["channels"]["events"]["address"], "/events");
268        assert!(document["channels"]["events"]["messages"]["data"].is_object());
269        // The WebSocket channel carries incoming and outgoing messages.
270        assert_eq!(document["channels"]["chat_room"]["address"], "/chat/{room}");
271        assert!(document["channels"]["chat_room"]["messages"]["incoming"].is_object());
272        assert!(document["channels"]["chat_room"]["messages"]["outgoing"].is_object());
273        // Operations describe the direction of each channel.
274        assert_eq!(
275            document["operations"]["chat_room_receive"]["action"],
276            "receive"
277        );
278        assert_eq!(document["operations"]["chat_room_send"]["action"], "send");
279        // Message payloads are registered as component schemas.
280        assert!(document["components"]["schemas"]["ChatIn"].is_object());
281        assert!(document["components"]["schemas"]["ChatOut"].is_object());
282        assert!(document["components"]["schemas"]["Tick"].is_object());
283    }
284
285    #[test]
286    fn ignores_non_realtime_routes_and_omits_empty_components() {
287        let routes = vec![Route::new(Method::GET, "/ping", dummy_handler())];
288
289        let document = AsyncApi::new().build_document(&routes);
290
291        assert!(document["channels"].as_object().unwrap().is_empty());
292        assert!(document["operations"].as_object().unwrap().is_empty());
293        assert!(document.get("components").is_none());
294    }
295
296    #[test]
297    fn documents_one_sided_stream_and_websocket_channels() {
298        let routes = vec![
299            Route::new(Method::GET, "/ticks", dummy_handler()).streaming(),
300            Route::new(Method::GET, "/in/{room}", dummy_handler())
301                .websocket()
302                .ws_incoming::<serde_json::Value>(),
303            Route::new(Method::GET, "/out/{room}", dummy_handler())
304                .websocket()
305                .ws_outgoing::<serde_json::Value>(),
306        ];
307
308        let document = AsyncApi::new().build_document(&routes);
309
310        assert_eq!(document["channels"]["ticks"]["address"], "/ticks");
311        assert!(document["channels"]["ticks"]["messages"]["data"].is_null());
312        assert!(document["channels"]["in_room"]["messages"]["incoming"].is_object());
313        assert!(document["channels"]["in_room"]["messages"]
314            .get("outgoing")
315            .is_none());
316        assert!(document["channels"]["out_room"]["messages"]["outgoing"].is_object());
317        assert!(document["channels"]["out_room"]["messages"]
318            .get("incoming")
319            .is_none());
320        assert_eq!(
321            document["operations"]["in_room_receive"]["action"],
322            "receive"
323        );
324        assert_eq!(document["operations"]["out_room_send"]["action"], "send");
325    }
326
327    #[test]
328    fn provider_registers_custom_json_route() {
329        let provider = AsyncApi::new()
330            .title("Realtime")
331            .version("2.0.0")
332            .json("/events.json");
333
334        let routes = provider.documentation_routes(&[]);
335
336        assert_eq!(routes.len(), 1);
337        assert_eq!(routes[0].path(), "/events.json");
338        assert_eq!(routes[0].method(), Method::GET);
339    }
340
341    #[test]
342    fn build_document_keeps_custom_info_fields() {
343        let routes = vec![Route::new(Method::GET, "/events", dummy_handler()).streaming()];
344        let document = AsyncApi::new()
345            .title("Realtime")
346            .version("2.0.0")
347            .description("Event stream docs")
348            .build_document(&routes);
349
350        assert_eq!(document["info"]["title"], "Realtime");
351        assert_eq!(document["info"]["version"], "2.0.0");
352        assert_eq!(document["info"]["description"], "Event stream docs");
353        assert_eq!(document["channels"]["events"]["address"], "/events");
354    }
355
356    #[test]
357    fn channel_name_covers_root_and_placeholders() {
358        assert_eq!(channel_name("/"), "root");
359        assert_eq!(channel_name("/chat/{room}/members"), "chat_room_members");
360    }
361
362    #[test]
363    fn build_document_sanitizes_info_text_fields() {
364        let routes = vec![Route::new(Method::GET, "/events", dummy_handler()).streaming()];
365        let document = AsyncApi::new()
366            .title("Realtime <unsafe>")
367            .description("Event\u{0001}`docs`")
368            .build_document(&routes);
369
370        assert_eq!(document["info"]["title"], "Realtime &lt;unsafe&gt;");
371        assert_eq!(document["info"]["description"], "Event &#x60;docs&#x60;");
372    }
373}