1use axum::extract::State;
11use axum::http::StatusCode;
12use axum::response::{IntoResponse, Response};
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use std::collections::HashMap;
16use std::collections::HashSet;
17use std::sync::Arc;
18
19#[derive(Debug, Clone, Serialize, Deserialize, Default)]
23pub struct AsyncApiConfig {
24 pub enabled: bool,
26 pub spec: Option<serde_json::Value>,
28}
29
30#[derive(Clone)]
34pub(crate) struct AsyncApiState {
35 pub registered_spec: Option<Arc<Value>>,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
43pub struct ParsedChannel {
44 pub name: String,
46 pub address: String,
48 pub messages: Vec<String>,
50 pub bindings: Option<serde_json::Value>,
52}
53
54#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
56pub struct ParsedOperation {
57 pub name: String,
59 pub action: String,
61 pub channel: String,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
67pub struct ParsedMessage {
68 pub name: String,
70 pub schema: Option<serde_json::Value>,
72}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct ParseResult {
77 pub spec_version: String,
78 pub title: String,
79 pub api_version: String,
80 pub channels: Vec<ParsedChannel>,
81 pub operations: Vec<ParsedOperation>,
82 pub messages: Vec<ParsedMessage>,
83}
84
85pub fn parse_asyncapi_value(spec: &Value) -> Result<ParseResult, String> {
93 use asyncapiv3::spec::AsyncApiSpec;
94
95 if let Some(version) = spec.get("asyncapi").and_then(Value::as_str) {
98 if version != "3.0.0" {
99 return Err(format!("unsupported AsyncAPI version: {version}, expected 3.0.0"));
100 }
101 } else if spec.get("asyncapi").is_none() {
102 return Err("missing required field: asyncapi".to_string());
103 }
104
105 let raw: AsyncApiSpec =
106 serde_json::from_value(spec.clone()).map_err(|e| format!("Failed to parse AsyncAPI spec: {e}"))?;
107
108 let AsyncApiSpec::V3_0_0(v3) = raw;
109
110 let spec_version = "3.0.0".to_string();
112
113 let title = v3.info.title.clone();
114 let api_version = v3.info.version.clone();
115
116 let spec_doc =
118 serde_json::to_value(&v3).map_err(|e| format!("Failed to serialize AsyncAPI spec for $ref resolution: {e}"))?;
119
120 let channels = extract_channels(&v3, &spec_doc)?;
121 let operations = extract_operations(&v3)?;
122 let messages = extract_messages(&v3, &spec_doc)?;
123
124 Ok(ParseResult {
125 spec_version,
126 title,
127 api_version,
128 channels,
129 operations,
130 messages,
131 })
132}
133
134fn extract_channels(v3: &asyncapiv3::spec::AsyncApiV3Spec, spec_doc: &Value) -> Result<Vec<ParsedChannel>, String> {
135 use asyncapiv3::spec::common::Either;
136
137 let mut channels = Vec::new();
138
139 for (name, channel_ref_or) in &v3.channels {
140 match channel_ref_or {
141 Either::Right(channel) => {
142 let address = channel.address.clone().unwrap_or_else(|| name.clone());
143 let messages: Vec<String> = channel.messages.keys().cloned().collect();
144 let bindings = spec_doc
146 .pointer(&format!("/channels/{}/bindings", name.replace('/', "~1")))
147 .cloned();
148 channels.push(ParsedChannel {
149 name: name.clone(),
150 address,
151 messages,
152 bindings,
153 });
154 }
155 Either::Left(_) => {
156 }
158 }
159 }
160
161 channels.sort_by(|a, b| a.name.cmp(&b.name));
163 Ok(channels)
164}
165
166fn extract_operations(v3: &asyncapiv3::spec::AsyncApiV3Spec) -> Result<Vec<ParsedOperation>, String> {
167 use asyncapiv3::spec::common::Either;
168
169 let mut operations = Vec::new();
170
171 for (name, op_ref_or) in &v3.operations {
172 match op_ref_or {
173 Either::Right(op) => {
174 use asyncapiv3::spec::operation::OperationAction;
175 let action = match op.action {
176 OperationAction::Send => "send",
177 OperationAction::Receive => "receive",
178 }
179 .to_string();
180
181 let channel_ref = &op.channel.reference;
183 let channel = channel_ref
184 .strip_prefix("#/channels/")
185 .map(|s| s.split('/').next().unwrap_or(s).to_string())
186 .unwrap_or_else(|| channel_ref.clone());
187
188 operations.push(ParsedOperation {
189 name: name.clone(),
190 action,
191 channel,
192 });
193 }
194 Either::Left(_) => {}
195 }
196 }
197
198 operations.sort_by(|a, b| a.name.cmp(&b.name));
199 Ok(operations)
200}
201
202fn extract_messages(v3: &asyncapiv3::spec::AsyncApiV3Spec, spec_doc: &Value) -> Result<Vec<ParsedMessage>, String> {
203 use asyncapiv3::spec::common::Either;
204 use asyncapiv3::spec::message::Message;
205
206 let mut messages: HashMap<String, Option<Value>> = HashMap::new();
207
208 for (msg_name, msg_ref_or) in &v3.components.messages {
210 match msg_ref_or {
211 Either::Right(msg) => {
212 let schema = extract_schema_from_message(msg, spec_doc);
213 messages.insert(msg_name.clone(), schema);
214 }
215 Either::Left(reference) => {
216 if let Some(msg) = resolve_ref_as::<Message>(spec_doc, &reference.reference) {
217 let schema = extract_schema_from_message(&msg, spec_doc);
218 messages.insert(msg_name.clone(), schema);
219 }
220 }
221 }
222 }
223
224 for (channel_name, channel_ref_or) in &v3.channels {
226 match channel_ref_or {
227 Either::Right(channel) => {
228 for (msg_name, msg_ref_or) in &channel.messages {
229 let slug = channel_name.trim_start_matches('/').replace('/', "_");
230 let full_name = format!("{slug}_{msg_name}");
231 match msg_ref_or {
232 Either::Right(msg) => {
233 let schema = extract_schema_from_message(msg, spec_doc);
234 messages.entry(full_name).or_insert(schema);
235 }
236 Either::Left(_) => {
237 }
239 }
240 }
241 }
242 Either::Left(_) => {}
243 }
244 }
245
246 let mut result: Vec<ParsedMessage> = messages
247 .into_iter()
248 .map(|(name, schema)| ParsedMessage { name, schema })
249 .collect();
250 result.sort_by(|a, b| a.name.cmp(&b.name));
251 Ok(result)
252}
253
254fn extract_schema_from_message(message: &asyncapiv3::spec::message::Message, spec_doc: &Value) -> Option<Value> {
255 use asyncapiv3::spec::common::Either;
256
257 let payload = message.payload.as_ref()?;
258 match payload {
259 Either::Right(schema_or_multi) => match schema_or_multi {
260 Either::Left(schema) => serde_json::to_value(schema).ok(),
261 Either::Right(multi_format) => Some(multi_format.schema.clone()),
262 },
263 Either::Left(reference) => resolve_ref_value(spec_doc, &reference.reference).map(normalize_schema_ref_value),
264 }
265}
266
267pub fn validate_message(
273 spec: &Value,
274 channel_name: &str,
275 message_name: &str,
276 payload: &Value,
277) -> Result<(bool, Vec<String>), String> {
278 use asyncapiv3::spec::AsyncApiSpec;
279 use asyncapiv3::spec::common::Either;
280
281 let raw: AsyncApiSpec =
282 serde_json::from_value(spec.clone()).map_err(|e| format!("Failed to parse AsyncAPI spec: {e}"))?;
283 let AsyncApiSpec::V3_0_0(v3) = raw;
284
285 let spec_doc = serde_json::to_value(&v3).map_err(|e| format!("Failed to serialize spec: {e}"))?;
286
287 let channel = v3
289 .channels
290 .get(channel_name)
291 .ok_or_else(|| format!("Channel '{channel_name}' not found in spec"))?;
292
293 let channel = match channel {
294 Either::Right(c) => c,
295 Either::Left(_) => return Err(format!("Channel '{channel_name}' is a $ref, not inline")),
296 };
297
298 let msg_ref_or = channel
300 .messages
301 .get(message_name)
302 .ok_or_else(|| format!("Message '{message_name}' not found on channel '{channel_name}'"))?;
303
304 let schema = match msg_ref_or {
305 Either::Right(msg) => extract_schema_from_message(msg, &spec_doc),
306 Either::Left(reference) => {
307 use asyncapiv3::spec::message::Message;
308 resolve_ref_as::<Message>(&spec_doc, &reference.reference)
309 .and_then(|msg| extract_schema_from_message(&msg, &spec_doc))
310 }
311 };
312
313 let schema = match schema {
314 Some(s) => s,
315 None => {
316 return Ok((true, Vec::new()));
318 }
319 };
320
321 let compiled = jsonschema::validator_for(&schema).map_err(|e| format!("Failed to compile schema: {e}"))?;
323
324 let errors: Vec<String> = compiled.iter_errors(payload).map(|e| e.to_string()).collect();
325
326 Ok((errors.is_empty(), errors))
327}
328
/// Decodes one JSON Pointer reference token: `~1` becomes `/`, then `~0` becomes `~`.
///
/// The order matters: `~01` must decode to `~1`, not to `/`.
fn decode_pointer_segment(segment: &str) -> String {
    // Splitting on `~1` and re-joining with `/` performs the first substitution; the
    // `~0` substitution is then applied inside each piece.
    let pieces: Vec<String> = segment
        .split("~1")
        .map(|piece| piece.replace("~0", "~"))
        .collect();
    pieces.join("/")
}
334
/// Converts a local reference such as `#/components/messages/Foo` into the JSON
/// Pointer string expected by `serde_json::Value::pointer`.
///
/// Returns `None` for anything that does not start with `#/` (external references,
/// bare `#`, empty strings).
///
/// The segments are deliberately left escaped: `Value::pointer` decodes `~1` and `~0`
/// itself, so decoding them here would turn `#/channels/chat~1messages` into the
/// path `channels` -> `chat` -> `messages` instead of `channels` -> `chat/messages`.
fn reference_to_pointer(reference: &str) -> Option<String> {
    let raw = reference.strip_prefix("#/")?;
    Some(format!("/{raw}"))
}
344
345fn resolve_ref_value(document: &Value, reference: &str) -> Option<Value> {
346 let mut current = reference.to_string();
347 let mut visited = HashSet::new();
348
349 for _ in 0..32 {
350 if !visited.insert(current.clone()) {
351 return None;
352 }
353 let pointer = reference_to_pointer(¤t)?;
354 let value = document.pointer(&pointer)?;
355 if let Some(next_ref) = value.get("$ref").and_then(Value::as_str) {
356 current = next_ref.to_string();
357 continue;
358 }
359 return Some(value.clone());
360 }
361 None
362}
363
364fn resolve_ref_as<T: serde::de::DeserializeOwned>(document: &Value, reference: &str) -> Option<T> {
365 let value = resolve_ref_value(document, reference)?;
366 serde_json::from_value(value).ok()
367}
368
369fn normalize_schema_ref_value(value: Value) -> Value {
370 if let Some(obj) = value.as_object()
371 && obj.get("schemaFormat").is_some()
372 && let Some(schema) = obj.get("schema")
373 {
374 return schema.clone();
375 }
376 value
377}
378
379#[derive(Debug, Deserialize)]
383pub struct ParseRequest {
384 #[serde(flatten)]
385 pub spec: serde_json::Value,
386}
387
388#[derive(Debug, Serialize)]
390pub struct ValidationResponse {
391 pub valid: bool,
392 #[serde(skip_serializing_if = "Vec::is_empty")]
393 pub errors: Vec<String>,
394}
395
396#[derive(Debug, Deserialize)]
398pub struct ValidateRequest {
399 pub spec: serde_json::Value,
400 pub channel: String,
401 pub message: String,
402 pub payload: serde_json::Value,
403}
404
405pub(crate) async fn handle_asyncapi_parse(axum::extract::Json(body): axum::extract::Json<Value>) -> Response {
411 match parse_asyncapi_value(&body) {
412 Ok(result) => (StatusCode::OK, axum::Json(result)).into_response(),
413 Err(error) => problem_response(StatusCode::BAD_REQUEST, &error),
414 }
415}
416
417pub(crate) async fn handle_asyncapi_validate(
421 axum::extract::Json(body): axum::extract::Json<ValidateRequest>,
422) -> Response {
423 match validate_message(&body.spec, &body.channel, &body.message, &body.payload) {
424 Ok((valid, errors)) => (StatusCode::OK, axum::Json(ValidationResponse { valid, errors })).into_response(),
425 Err(error) => problem_response(StatusCode::BAD_REQUEST, &error),
426 }
427}
428
429pub(crate) async fn handle_asyncapi_json(State(state): State<AsyncApiState>) -> Response {
434 match &state.registered_spec {
435 Some(spec) => (StatusCode::OK, axum::Json((**spec).clone())).into_response(),
436 None => problem_response(
437 StatusCode::NOT_FOUND,
438 "No AsyncAPI spec registered. Configure ServerConfig::asyncapi.spec to register one.",
439 ),
440 }
441}
442
443fn problem_response(status: StatusCode, detail: &str) -> Response {
446 let body = serde_json::json!({
447 "type": "about:blank",
448 "title": status.canonical_reason().unwrap_or("Error"),
449 "status": status.as_u16(),
450 "detail": detail,
451 });
452 (
453 status,
454 [(
455 axum::http::header::CONTENT_TYPE,
456 spikard_core::problem::CONTENT_TYPE_PROBLEM_JSON,
457 )],
458 axum::Json(body),
459 )
460 .into_response()
461}
462
/// Unit tests for AsyncAPI parsing, message validation and `$ref` resolution.
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal spec with one inline channel (`chat`), one inline message and one
    /// `send` operation.
    fn chat_api_spec() -> Value {
        serde_json::json!({
            "asyncapi": "3.0.0",
            "info": { "title": "Chat API", "version": "1.0.0" },
            "channels": {
                "chat": {
                    "address": "/chat",
                    "messages": {
                        "ChatMessage": {
                            "payload": {
                                "type": "object",
                                "properties": {
                                    "text": { "type": "string" },
                                    "user_id": { "type": "string" }
                                },
                                "required": ["text", "user_id"]
                            }
                        }
                    }
                }
            },
            "operations": {
                "sendChat": {
                    "action": "send",
                    "channel": { "$ref": "#/channels/chat" }
                }
            },
            "components": {}
        })
    }

    /// Spec with a single `orders/new` channel whose `NewOrder` message requires
    /// `order_id`, `amount` and `currency`. Shared by the validation tests.
    fn order_api_spec() -> Value {
        serde_json::json!({
            "asyncapi": "3.0.0",
            "info": { "title": "Order API", "version": "1.0.0" },
            "channels": {
                "orders/new": {
                    "address": "orders/new",
                    "messages": {
                        "NewOrder": {
                            "payload": {
                                "type": "object",
                                "properties": {
                                    "order_id": { "type": "string" },
                                    "amount": { "type": "number" },
                                    "currency": { "type": "string" }
                                },
                                "required": ["order_id", "amount", "currency"]
                            }
                        }
                    }
                }
            },
            "operations": {},
            "components": {}
        })
    }

    #[test]
    fn test_parse_valid_spec_returns_structured_result() {
        let spec = chat_api_spec();
        let result = parse_asyncapi_value(&spec).expect("valid spec should parse");
        assert_eq!(result.spec_version, "3.0.0");
        assert_eq!(result.title, "Chat API");
        assert_eq!(result.api_version, "1.0.0");
        assert_eq!(result.channels.len(), 1);
        assert_eq!(result.channels[0].name, "chat");
        assert_eq!(result.operations.len(), 1);
        assert_eq!(result.operations[0].name, "sendChat");
        assert_eq!(result.operations[0].action, "send");
    }

    #[test]
    fn test_parse_invalid_version_returns_error() {
        let spec = serde_json::json!({
            "asyncapi": "2.0.0",
            "info": { "title": "Old API", "version": "1.0.0" },
            "channels": {}
        });
        let err = parse_asyncapi_value(&spec).expect_err("should fail for 2.0.0");
        assert!(
            err.contains("unsupported AsyncAPI version") || err.contains("2.0.0"),
            "Unexpected error: {err}"
        );
    }

    #[test]
    fn test_parse_missing_required_field_returns_error() {
        let spec = serde_json::json!({
            "info": { "title": "No version field", "version": "1.0.0" },
            "channels": {}
        });
        let err = parse_asyncapi_value(&spec).expect_err("should fail when asyncapi field is missing");
        // The version gate must report the missing field itself, not a downstream
        // deserialization failure.
        assert!(
            err.contains("missing required field: asyncapi"),
            "Unexpected error: {err}"
        );
    }

    #[test]
    fn test_validate_message_valid_payload() {
        let spec = order_api_spec();
        let payload = serde_json::json!({
            "order_id": "ORD-001",
            "amount": 99.99,
            "currency": "USD"
        });
        let (valid, errors) = validate_message(&spec, "orders/new", "NewOrder", &payload).expect("validate");
        assert!(valid, "Expected valid, but got errors: {errors:?}");
        assert!(errors.is_empty());
    }

    #[test]
    fn test_validate_message_missing_required_fields() {
        let spec = order_api_spec();
        let payload = serde_json::json!({ "order_id": "ORD-002" });
        let (valid, errors) = validate_message(&spec, "orders/new", "NewOrder", &payload).expect("validate");
        assert!(!valid, "Expected invalid");
        assert!(!errors.is_empty(), "Expected validation errors");
    }

    #[test]
    fn test_validate_message_unknown_channel_returns_error() {
        let spec = chat_api_spec();
        let payload = serde_json::json!({ "text": "hello", "user_id": "u1" });
        let result = validate_message(&spec, "nonexistent_channel", "ChatMessage", &payload);
        assert!(result.is_err(), "Should error for unknown channel");
    }

    #[test]
    fn test_parse_channel_extraction_fixture_asyncapi_channel_extraction() {
        let spec = serde_json::json!({
            "asyncapi": "3.0.0",
            "info": { "title": "Chat API", "version": "1.0.0" },
            "channels": {
                "chat/messages": {
                    "address": "chat/messages",
                    "messages": {
                        "ChatMessage": { "$ref": "#/components/messages/ChatMessage" }
                    }
                },
                "user/events": {
                    "address": "user/events",
                    "messages": {
                        "UserEvent": { "$ref": "#/components/messages/UserEvent" }
                    }
                }
            },
            "operations": {},
            "components": {
                "messages": {
                    "ChatMessage": {
                        "payload": {
                            "type": "object",
                            "properties": {
                                "text": { "type": "string" },
                                "user_id": { "type": "string" }
                            },
                            "required": ["text", "user_id"]
                        }
                    },
                    "UserEvent": {
                        "payload": {
                            "type": "object",
                            "properties": {
                                "event": { "type": "string" },
                                "user_id": { "type": "string" }
                            },
                            "required": ["event", "user_id"]
                        }
                    }
                }
            }
        });
        let result = parse_asyncapi_value(&spec).expect("should parse");
        assert_eq!(result.channels.len(), 2);
        let names: Vec<&str> = result.channels.iter().map(|c| c.name.as_str()).collect();
        assert!(names.contains(&"chat/messages"), "Missing chat/messages");
        assert!(names.contains(&"user/events"), "Missing user/events");
    }

    #[test]
    fn test_parse_operation_extraction_fixture() {
        let spec = serde_json::json!({
            "asyncapi": "3.0.0",
            "info": { "title": "Notification API", "version": "2.0.0" },
            "channels": {
                "notifications": {
                    "address": "notifications",
                    "messages": {
                        "Notification": { "$ref": "#/components/messages/Notification" }
                    }
                }
            },
            "operations": {
                "receiveNotification": {
                    "action": "receive",
                    "channel": { "$ref": "#/channels/notifications" },
                    "messages": [{ "$ref": "#/channels/notifications/messages/Notification" }]
                },
                "sendAck": {
                    "action": "send",
                    "channel": { "$ref": "#/channels/notifications" }
                }
            },
            "components": {
                "messages": {
                    "Notification": {
                        "payload": {
                            "type": "object",
                            "properties": {
                                "id": { "type": "string" },
                                "body": { "type": "string" }
                            },
                            "required": ["id", "body"]
                        }
                    }
                }
            }
        });
        let result = parse_asyncapi_value(&spec).expect("should parse");
        assert_eq!(result.channels.len(), 1);
        assert_eq!(result.operations.len(), 2);
        let op_names: Vec<&str> = result.operations.iter().map(|o| o.name.as_str()).collect();
        assert!(op_names.contains(&"receiveNotification"));
        assert!(op_names.contains(&"sendAck"));
    }

    #[test]
    fn test_parse_validate_summary_fields_fixture() {
        let spec = serde_json::json!({
            "asyncapi": "3.0.0",
            "info": {
                "title": "Inventory Service",
                "version": "3.2.1"
            },
            "channels": {
                "inventory/updates": {
                    "address": "inventory/updates",
                    "messages": {
                        "InventoryUpdate": {
                            "payload": { "type": "object" }
                        }
                    }
                }
            },
            "operations": {},
            "components": {}
        });
        let result = parse_asyncapi_value(&spec).expect("should parse");
        assert_eq!(result.spec_version, "3.0.0");
        assert_eq!(result.title, "Inventory Service");
        assert_eq!(result.api_version, "3.2.1");
        assert_eq!(result.channels.len(), 1);
    }

    #[test]
    fn test_decode_pointer_segment() {
        assert_eq!(decode_pointer_segment("hello~1world"), "hello/world");
        assert_eq!(decode_pointer_segment("test~0value"), "test~value");
    }

    #[test]
    fn test_resolve_ref_value_follows_nested_local_refs() {
        let doc = serde_json::json!({
            "components": {
                "schemas": {
                    "A": { "$ref": "#/components/schemas/B" },
                    "B": { "type": "object", "properties": { "id": { "type": "string" } } }
                }
            }
        });
        let resolved = resolve_ref_value(&doc, "#/components/schemas/A").expect("resolved schema");
        assert_eq!(resolved["type"], "object");
        assert!(resolved["properties"].get("id").is_some());
    }
}