1use std::sync::Arc;
8
9use bytes::Bytes;
10use serde_json::{json, Map, Value};
11
12use tork_core::constants::APPLICATION_JSON;
13use tork_core::{
14 bytes_response, AsyncApiProvider, BoxFuture, HandlerFn, Method, RequestContext, Response,
15 Result, Route, StatusCode,
16};
17
18use crate::spec::sanitize_doc_text;
19
20const ASYNCAPI_VERSION: &str = "3.0.0";
22const DEFAULT_JSON_PATH: &str = "/asyncapi.json";
24
25pub struct AsyncApi {
30 title: String,
31 version: String,
32 description: Option<String>,
33 json_path: String,
34}
35
36impl Default for AsyncApi {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42impl AsyncApi {
43 pub fn new() -> Self {
45 Self {
46 title: "API".to_owned(),
47 version: "0.1.0".to_owned(),
48 description: None,
49 json_path: DEFAULT_JSON_PATH.to_owned(),
50 }
51 }
52
53 pub fn title(mut self, title: impl Into<String>) -> Self {
55 self.title = title.into();
56 self
57 }
58
59 pub fn version(mut self, version: impl Into<String>) -> Self {
61 self.version = version.into();
62 self
63 }
64
65 pub fn description(mut self, description: impl Into<String>) -> Self {
67 self.description = Some(description.into());
68 self
69 }
70
71 pub fn json(mut self, path: impl Into<String>) -> Self {
73 self.json_path = path.into();
74 self
75 }
76
77 pub fn build_document(&self, routes: &[Route]) -> Value {
79 build_document(self, routes)
80 }
81}
82
83impl AsyncApiProvider for AsyncApi {
84 fn documentation_routes(&self, registered: &[Route]) -> Vec<Route> {
85 let document = build_document(self, registered);
86 let body = serde_json::to_vec(&document).unwrap_or_default();
87 vec![spec_route(&self.json_path, Bytes::from(body))]
88 }
89}
90
91fn spec_route(path: &str, body: Bytes) -> Route {
93 let handler: HandlerFn = Arc::new(
94 move |_ctx: RequestContext| -> BoxFuture<'static, Result<Response>> {
95 let body = body.clone();
96 Box::pin(async move { Ok(bytes_response(StatusCode::OK, APPLICATION_JSON, body)) })
97 },
98 );
99
100 Route::new(Method::GET, path.to_owned(), handler).summary("AsyncAPI specification")
101}
102
103fn build_document(api: &AsyncApi, routes: &[Route]) -> Value {
105 let mut generator = schemars::generate::SchemaSettings::openapi3().into_generator();
108 let mut channels: Map<String, Value> = Map::new();
109 let mut operations: Map<String, Value> = Map::new();
110
111 for route in routes {
112 let meta = route.meta();
113 if !(meta.streaming || meta.websocket) {
114 continue;
115 }
116
117 let path = route.path();
118 let name = channel_name(path);
119 let channel_ref = json!({ "$ref": format!("#/channels/{name}") });
120 let mut messages: Map<String, Value> = Map::new();
121
122 if meta.streaming {
123 if let Some(thunk) = meta.response_schema {
124 let payload = thunk(&mut generator).as_value().clone();
125 messages.insert("data".to_owned(), json!({ "payload": payload }));
126 }
127 operations.insert(
128 format!("{name}_send"),
129 json!({ "action": "send", "channel": channel_ref }),
130 );
131 } else {
132 if let Some(thunk) = meta.ws_incoming {
134 let payload = thunk(&mut generator).as_value().clone();
135 messages.insert("incoming".to_owned(), json!({ "payload": payload }));
136 operations.insert(
137 format!("{name}_receive"),
138 json!({
139 "action": "receive",
140 "channel": channel_ref,
141 "messages": [{ "$ref": format!("#/channels/{name}/messages/incoming") }],
142 }),
143 );
144 }
145 if let Some(thunk) = meta.ws_outgoing {
146 let payload = thunk(&mut generator).as_value().clone();
147 messages.insert("outgoing".to_owned(), json!({ "payload": payload }));
148 operations.insert(
149 format!("{name}_send"),
150 json!({
151 "action": "send",
152 "channel": channel_ref,
153 "messages": [{ "$ref": format!("#/channels/{name}/messages/outgoing") }],
154 }),
155 );
156 }
157 }
158
159 channels.insert(
160 name,
161 json!({ "address": path, "messages": Value::Object(messages) }),
162 );
163 }
164
165 let mut info = Map::new();
166 info.insert("title".to_owned(), json!(sanitize_doc_text(&api.title)));
167 info.insert("version".to_owned(), json!(api.version));
168 if let Some(description) = &api.description {
169 info.insert(
170 "description".to_owned(),
171 json!(sanitize_doc_text(description)),
172 );
173 }
174
175 let mut document = json!({
176 "asyncapi": ASYNCAPI_VERSION,
177 "info": Value::Object(info),
178 "channels": Value::Object(channels),
179 "operations": Value::Object(operations),
180 });
181
182 let definitions = generator.take_definitions(true);
183 if !definitions.is_empty() {
184 document["components"] = json!({ "schemas": Value::Object(definitions) });
185 }
186
187 document
188}
189
190fn channel_name(path: &str) -> String {
192 let mut name = String::new();
193 for segment in path.split('/').filter(|segment| !segment.is_empty()) {
194 if !name.is_empty() {
195 name.push('_');
196 }
197 for ch in segment.chars() {
198 if ch.is_ascii_alphanumeric() {
199 name.push(ch);
200 } else if ch != '{' && ch != '}' {
201 name.push('_');
202 }
203 }
204 }
205 if name.is_empty() {
206 "root".to_owned()
207 } else {
208 name
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 use super::*;
215 use bytes::Bytes;
216 use std::sync::Arc;
217 use tork_core::{RequestContext, Response};
218
219 #[derive(schemars::JsonSchema)]
220 #[allow(dead_code)]
221 struct ChatIn {
222 message: String,
223 }
224
225 #[derive(schemars::JsonSchema)]
226 #[allow(dead_code)]
227 struct ChatOut {
228 text: String,
229 }
230
231 #[derive(schemars::JsonSchema)]
232 #[allow(dead_code)]
233 struct Tick {
234 n: i64,
235 }
236
237 fn dummy_handler() -> HandlerFn {
238 Arc::new(
239 |_ctx: RequestContext| -> BoxFuture<'static, Result<Response>> {
240 Box::pin(async {
241 Ok(bytes_response(
242 StatusCode::OK,
243 APPLICATION_JSON,
244 Bytes::new(),
245 ))
246 })
247 },
248 )
249 }
250
251 #[test]
252 fn documents_sse_and_websocket_channels() {
253 let routes = vec![
254 Route::new(Method::GET, "/events", dummy_handler())
255 .response_schema::<Tick>()
256 .streaming(),
257 Route::new(Method::GET, "/chat/{room}", dummy_handler())
258 .websocket()
259 .ws_incoming::<ChatIn>()
260 .ws_outgoing::<ChatOut>(),
261 ];
262
263 let document = AsyncApi::new().build_document(&routes);
264
265 assert_eq!(document["asyncapi"], "3.0.0");
266 assert_eq!(document["channels"]["events"]["address"], "/events");
268 assert!(document["channels"]["events"]["messages"]["data"].is_object());
269 assert_eq!(document["channels"]["chat_room"]["address"], "/chat/{room}");
271 assert!(document["channels"]["chat_room"]["messages"]["incoming"].is_object());
272 assert!(document["channels"]["chat_room"]["messages"]["outgoing"].is_object());
273 assert_eq!(
275 document["operations"]["chat_room_receive"]["action"],
276 "receive"
277 );
278 assert_eq!(document["operations"]["chat_room_send"]["action"], "send");
279 assert!(document["components"]["schemas"]["ChatIn"].is_object());
281 assert!(document["components"]["schemas"]["ChatOut"].is_object());
282 assert!(document["components"]["schemas"]["Tick"].is_object());
283 }
284
285 #[test]
286 fn ignores_non_realtime_routes_and_omits_empty_components() {
287 let routes = vec![Route::new(Method::GET, "/ping", dummy_handler())];
288
289 let document = AsyncApi::new().build_document(&routes);
290
291 assert!(document["channels"].as_object().unwrap().is_empty());
292 assert!(document["operations"].as_object().unwrap().is_empty());
293 assert!(document.get("components").is_none());
294 }
295
296 #[test]
297 fn documents_one_sided_stream_and_websocket_channels() {
298 let routes = vec![
299 Route::new(Method::GET, "/ticks", dummy_handler()).streaming(),
300 Route::new(Method::GET, "/in/{room}", dummy_handler())
301 .websocket()
302 .ws_incoming::<serde_json::Value>(),
303 Route::new(Method::GET, "/out/{room}", dummy_handler())
304 .websocket()
305 .ws_outgoing::<serde_json::Value>(),
306 ];
307
308 let document = AsyncApi::new().build_document(&routes);
309
310 assert_eq!(document["channels"]["ticks"]["address"], "/ticks");
311 assert!(document["channels"]["ticks"]["messages"]["data"].is_null());
312 assert!(document["channels"]["in_room"]["messages"]["incoming"].is_object());
313 assert!(document["channels"]["in_room"]["messages"]
314 .get("outgoing")
315 .is_none());
316 assert!(document["channels"]["out_room"]["messages"]["outgoing"].is_object());
317 assert!(document["channels"]["out_room"]["messages"]
318 .get("incoming")
319 .is_none());
320 assert_eq!(
321 document["operations"]["in_room_receive"]["action"],
322 "receive"
323 );
324 assert_eq!(document["operations"]["out_room_send"]["action"], "send");
325 }
326
327 #[test]
328 fn provider_registers_custom_json_route() {
329 let provider = AsyncApi::new()
330 .title("Realtime")
331 .version("2.0.0")
332 .json("/events.json");
333
334 let routes = provider.documentation_routes(&[]);
335
336 assert_eq!(routes.len(), 1);
337 assert_eq!(routes[0].path(), "/events.json");
338 assert_eq!(routes[0].method(), Method::GET);
339 }
340
341 #[test]
342 fn build_document_keeps_custom_info_fields() {
343 let routes = vec![Route::new(Method::GET, "/events", dummy_handler()).streaming()];
344 let document = AsyncApi::new()
345 .title("Realtime")
346 .version("2.0.0")
347 .description("Event stream docs")
348 .build_document(&routes);
349
350 assert_eq!(document["info"]["title"], "Realtime");
351 assert_eq!(document["info"]["version"], "2.0.0");
352 assert_eq!(document["info"]["description"], "Event stream docs");
353 assert_eq!(document["channels"]["events"]["address"], "/events");
354 }
355
356 #[test]
357 fn channel_name_covers_root_and_placeholders() {
358 assert_eq!(channel_name("/"), "root");
359 assert_eq!(channel_name("/chat/{room}/members"), "chat_room_members");
360 }
361
362 #[test]
363 fn build_document_sanitizes_info_text_fields() {
364 let routes = vec![Route::new(Method::GET, "/events", dummy_handler()).streaming()];
365 let document = AsyncApi::new()
366 .title("Realtime <unsafe>")
367 .description("Event\u{0001}`docs`")
368 .build_document(&routes);
369
370 assert_eq!(document["info"]["title"], "Realtime <unsafe>");
371 assert_eq!(document["info"]["description"], "Event `docs`");
372 }
373}