1mod response;
13
14use std::collections::HashMap;
15use std::convert::Infallible;
16use std::sync::Arc;
17
18use bytes::Bytes;
19use http_body_util::combinators::BoxBody;
20use hyper::body::Incoming;
21
22use a2a_protocol_types::jsonrpc::{
23 JsonRpcError, JsonRpcErrorResponse, JsonRpcId, JsonRpcRequest, JsonRpcSuccessResponse,
24 JsonRpcVersion,
25};
26
27use crate::agent_card::StaticAgentCardHandler;
28use crate::dispatch::cors::CorsConfig;
29use crate::error::ServerError;
30use crate::handler::{RequestHandler, SendMessageResult};
31use crate::serve::Dispatcher;
32use crate::streaming::build_sse_response;
33
34use response::{
35 error_response, error_response_bytes, extract_headers, json_response, parse_error_response,
36 parse_params, read_body_limited, success_response, success_response_bytes,
37};
38
39pub struct JsonRpcDispatcher {
47 handler: Arc<RequestHandler>,
48 card_handler: Option<StaticAgentCardHandler>,
49 cors: Option<CorsConfig>,
50 config: super::DispatchConfig,
51}
52
53impl JsonRpcDispatcher {
54 #[must_use]
57 pub fn new(handler: Arc<RequestHandler>) -> Self {
58 Self::with_config(handler, super::DispatchConfig::default())
59 }
60
61 #[must_use]
63 pub fn with_config(handler: Arc<RequestHandler>, config: super::DispatchConfig) -> Self {
64 let card_handler = handler
65 .agent_card
66 .as_ref()
67 .and_then(|card| StaticAgentCardHandler::new(card).ok());
68 Self {
69 handler,
70 card_handler,
71 cors: None,
72 config,
73 }
74 }
75
76 #[must_use]
81 pub fn with_cors(mut self, cors: CorsConfig) -> Self {
82 self.cors = Some(cors);
83 self
84 }
85
86 pub async fn dispatch(
93 &self,
94 req: hyper::Request<Incoming>,
95 ) -> hyper::Response<BoxBody<Bytes, Infallible>> {
96 if req.method() == "OPTIONS" {
98 if let Some(ref cors) = self.cors {
99 return cors.preflight_response();
100 }
101 return json_response(204, Vec::new());
102 }
103
104 if req.method() == "GET" && req.uri().path() == "/.well-known/agent-card.json" {
107 let mut resp = self.card_handler.as_ref().map_or_else(
108 || json_response(404, br#"{"error":"agent card not configured"}"#.to_vec()),
109 |h| h.handle(&req).map(http_body_util::BodyExt::boxed),
110 );
111 if let Some(ref cors) = self.cors {
112 cors.apply_headers(&mut resp);
113 }
114 return resp;
115 }
116
117 let mut resp = self.dispatch_inner(req).await;
118 if let Some(ref cors) = self.cors {
119 cors.apply_headers(&mut resp);
120 }
121 resp
122 }
123
124 #[allow(clippy::too_many_lines)]
126 async fn dispatch_inner(
127 &self,
128 req: hyper::Request<Incoming>,
129 ) -> hyper::Response<BoxBody<Bytes, Infallible>> {
130 if let Some(ct) = req.headers().get("content-type") {
132 let ct_str = ct.to_str().unwrap_or("");
133 if !ct_str.starts_with("application/json")
134 && !ct_str.starts_with(a2a_protocol_types::A2A_CONTENT_TYPE)
135 {
136 return parse_error_response(
137 None,
138 &format!("unsupported Content-Type: {ct_str}; expected application/json or application/a2a+json"),
139 );
140 }
141 }
142
143 if let Some(version) = req.headers().get(a2a_protocol_types::A2A_VERSION_HEADER) {
147 if let Ok(v) = version.to_str() {
148 let v = v.trim();
149 if !v.is_empty() {
151 let major = v.split('.').next().and_then(|s| s.parse::<u32>().ok());
152 if major != Some(1) {
153 return error_response(
154 None,
155 &ServerError::Protocol(a2a_protocol_types::error::A2aError::new(
156 a2a_protocol_types::error::ErrorCode::VersionNotSupported,
157 format!("unsupported A2A version: {v}; this server supports 1.x"),
158 )),
159 );
160 }
161 }
162 }
163 }
164
165 let headers = extract_headers(req.headers());
167
168 let body_bytes = match read_body_limited(
170 req.into_body(),
171 self.config.max_request_body_size,
172 self.config.body_read_timeout,
173 )
174 .await
175 {
176 Ok(bytes) => bytes,
177 Err(msg) => return parse_error_response(None, &msg),
178 };
179
180 let raw: serde_json::Value = match serde_json::from_slice(&body_bytes) {
182 Ok(v) => v,
183 Err(e) => return parse_error_response(None, &e.to_string()),
184 };
185
186 if raw.is_array() {
187 let serde_json::Value::Array(items) = raw else {
189 unreachable!()
190 };
191 if items.is_empty() {
192 return parse_error_response(None, "empty batch request");
193 }
194 if items.len() > self.config.max_batch_size {
196 return parse_error_response(
197 None,
198 &format!(
199 "batch too large: {} requests exceeds {} limit",
200 items.len(),
201 self.config.max_batch_size
202 ),
203 );
204 }
205 let mut responses: Vec<serde_json::Value> = Vec::with_capacity(items.len());
206 for item in items {
207 let rpc_req: JsonRpcRequest = match serde_json::from_value(item) {
208 Ok(r) => r,
209 Err(e) => {
210 let err_resp = JsonRpcErrorResponse::new(
212 None,
213 JsonRpcError::new(
214 a2a_protocol_types::error::ErrorCode::ParseError.as_i32(),
215 format!("Parse error: {e}"),
216 ),
217 );
218 if let Ok(v) = serde_json::to_value(&err_resp) {
219 responses.push(v);
220 }
221 continue;
222 }
223 };
224 let resp_body = self.dispatch_single_request(&rpc_req, &headers).await;
225 if let Ok(v) = serde_json::from_slice::<serde_json::Value>(&resp_body) {
226 responses.push(v);
227 }
228 }
229 let body = serde_json::to_vec(&responses).unwrap_or_default();
230 json_response(200, body)
231 } else {
232 let rpc_req: JsonRpcRequest = match serde_json::from_value(raw) {
234 Ok(r) => r,
235 Err(e) => return parse_error_response(None, &e.to_string()),
236 };
237 self.dispatch_single_request_http(&rpc_req, &headers).await
238 }
239 }
240
241 #[allow(clippy::too_many_lines)]
245 async fn dispatch_single_request_http(
246 &self,
247 rpc_req: &JsonRpcRequest,
248 headers: &HashMap<String, String>,
249 ) -> hyper::Response<BoxBody<Bytes, Infallible>> {
250 let id = rpc_req.id.clone();
251 trace_info!(method = %rpc_req.method, "dispatching JSON-RPC request");
252
253 match rpc_req.method.as_str() {
255 "SendStreamingMessage" | "message/stream" => {
256 return self.dispatch_send_message(id, rpc_req, true, headers).await;
257 }
258 "SubscribeToTask" | "tasks/subscribe" => {
259 return match parse_params::<a2a_protocol_types::params::TaskIdParams>(rpc_req) {
260 Ok(p) => match self.handler.on_resubscribe(p, Some(headers)).await {
261 Ok(reader) => build_sse_response(
262 reader,
263 Some(self.config.sse_keep_alive_interval),
264 Some(self.config.sse_channel_capacity),
265 true, ),
267 Err(e) => error_response(id, &e),
268 },
269 Err(e) => error_response(id, &e),
270 };
271 }
272 _ => {}
273 }
274
275 let body = self.dispatch_single_request(rpc_req, headers).await;
276 json_response(200, body)
277 }
278
279 #[allow(clippy::too_many_lines)]
283 async fn dispatch_single_request(
284 &self,
285 rpc_req: &JsonRpcRequest,
286 headers: &HashMap<String, String>,
287 ) -> Vec<u8> {
288 let id = rpc_req.id.clone();
289
290 match rpc_req.method.as_str() {
291 "SendMessage" | "message/send" => {
292 match self
293 .dispatch_send_message_inner(id.clone(), rpc_req, false, headers)
294 .await
295 {
296 Ok(resp) => serde_json::to_vec(&resp).unwrap_or_default(),
297 Err(body) => body,
298 }
299 }
300 "SendStreamingMessage" | "message/stream" => {
301 let err = ServerError::InvalidParams(
303 "SendStreamingMessage not supported in batch requests".into(),
304 );
305 let a2a_err = err.to_a2a_error();
306 let resp = JsonRpcErrorResponse::new(
307 id,
308 JsonRpcError::new(a2a_err.code.as_i32(), a2a_err.message),
309 );
310 serde_json::to_vec(&resp).unwrap_or_default()
311 }
312 "GetTask" | "tasks/get" => {
313 match parse_params::<a2a_protocol_types::params::TaskQueryParams>(rpc_req) {
314 Ok(p) => match self.handler.on_get_task(p, Some(headers)).await {
315 Ok(r) => success_response_bytes(id, &r),
316 Err(e) => error_response_bytes(id, &e),
317 },
318 Err(e) => error_response_bytes(id, &e),
319 }
320 }
321 "ListTasks" | "tasks/list" => {
322 match parse_params::<a2a_protocol_types::params::ListTasksParams>(rpc_req) {
323 Ok(p) => match self.handler.on_list_tasks(p, Some(headers)).await {
324 Ok(r) => success_response_bytes(id, &r),
325 Err(e) => error_response_bytes(id, &e),
326 },
327 Err(e) => error_response_bytes(id, &e),
328 }
329 }
330 "CancelTask" | "tasks/cancel" => {
331 match parse_params::<a2a_protocol_types::params::CancelTaskParams>(rpc_req) {
332 Ok(p) => match self.handler.on_cancel_task(p, Some(headers)).await {
333 Ok(r) => success_response_bytes(id, &r),
334 Err(e) => error_response_bytes(id, &e),
335 },
336 Err(e) => error_response_bytes(id, &e),
337 }
338 }
339 "SubscribeToTask" | "tasks/subscribe" => {
340 let err = ServerError::InvalidParams(
341 "SubscribeToTask not supported in batch requests".into(),
342 );
343 error_response_bytes(id, &err)
344 }
345 "CreateTaskPushNotificationConfig" | "tasks/pushNotificationConfig/set" => {
346 match parse_params::<a2a_protocol_types::push::TaskPushNotificationConfig>(rpc_req)
347 {
348 Ok(p) => match self.handler.on_set_push_config(p, Some(headers)).await {
349 Ok(r) => success_response_bytes(id, &r),
350 Err(e) => error_response_bytes(id, &e),
351 },
352 Err(e) => error_response_bytes(id, &e),
353 }
354 }
355 "GetTaskPushNotificationConfig" | "tasks/pushNotificationConfig/get" => {
356 match parse_params::<a2a_protocol_types::params::GetPushConfigParams>(rpc_req) {
357 Ok(p) => match self.handler.on_get_push_config(p, Some(headers)).await {
358 Ok(r) => success_response_bytes(id, &r),
359 Err(e) => error_response_bytes(id, &e),
360 },
361 Err(e) => error_response_bytes(id, &e),
362 }
363 }
364 "ListTaskPushNotificationConfigs" | "tasks/pushNotificationConfig/list" => {
365 match parse_params::<a2a_protocol_types::params::ListPushConfigsParams>(rpc_req) {
366 Ok(p) => match self
367 .handler
368 .on_list_push_configs(&p.task_id, p.tenant.as_deref(), Some(headers))
369 .await
370 {
371 Ok(configs) => {
372 let resp = a2a_protocol_types::responses::ListPushConfigsResponse {
373 configs,
374 next_page_token: None,
375 };
376 success_response_bytes(id, &resp)
377 }
378 Err(e) => error_response_bytes(id, &e),
379 },
380 Err(e) => error_response_bytes(id, &e),
381 }
382 }
383 "DeleteTaskPushNotificationConfig" | "tasks/pushNotificationConfig/delete" => {
384 match parse_params::<a2a_protocol_types::params::DeletePushConfigParams>(rpc_req) {
385 Ok(p) => match self.handler.on_delete_push_config(p, Some(headers)).await {
386 Ok(()) => success_response_bytes(id, &serde_json::json!({})),
387 Err(e) => error_response_bytes(id, &e),
388 },
389 Err(e) => error_response_bytes(id, &e),
390 }
391 }
392 "GetExtendedAgentCard" | "agent/authenticatedExtendedCard" => {
393 match self.handler.on_get_extended_agent_card(Some(headers)).await {
394 Ok(r) => success_response_bytes(id, &r),
395 Err(e) => error_response_bytes(id, &e),
396 }
397 }
398 other => {
399 let err = ServerError::MethodNotFound(other.to_owned());
400 error_response_bytes(id, &err)
401 }
402 }
403 }
404
405 async fn dispatch_send_message_inner(
408 &self,
409 id: JsonRpcId,
410 rpc_req: &JsonRpcRequest,
411 streaming: bool,
412 headers: &HashMap<String, String>,
413 ) -> Result<JsonRpcSuccessResponse<serde_json::Value>, Vec<u8>> {
414 let params = match parse_params::<a2a_protocol_types::params::MessageSendParams>(rpc_req) {
415 Ok(p) => p,
416 Err(e) => return Err(error_response_bytes(id, &e)),
417 };
418 match self
419 .handler
420 .on_send_message(params, streaming, Some(headers))
421 .await
422 {
423 Ok(SendMessageResult::Response(resp)) => {
424 let result = serde_json::to_value(&resp).unwrap_or(serde_json::Value::Null);
425 Ok(JsonRpcSuccessResponse {
426 jsonrpc: JsonRpcVersion,
427 id,
428 result,
429 })
430 }
431 Ok(SendMessageResult::Stream(_)) => {
432 let err = ServerError::Internal("unexpected stream response".into());
434 Err(error_response_bytes(id, &err))
435 }
436 Err(e) => Err(error_response_bytes(id, &e)),
437 }
438 }
439
440 async fn dispatch_send_message(
441 &self,
442 id: JsonRpcId,
443 rpc_req: &JsonRpcRequest,
444 streaming: bool,
445 headers: &HashMap<String, String>,
446 ) -> hyper::Response<BoxBody<Bytes, Infallible>> {
447 let params = match parse_params::<a2a_protocol_types::params::MessageSendParams>(rpc_req) {
448 Ok(p) => p,
449 Err(e) => return error_response(id, &e),
450 };
451 match self
452 .handler
453 .on_send_message(params, streaming, Some(headers))
454 .await
455 {
456 Ok(SendMessageResult::Response(resp)) => success_response(id, &resp),
457 Ok(SendMessageResult::Stream(reader)) => build_sse_response(
458 reader,
459 Some(self.config.sse_keep_alive_interval),
460 Some(self.config.sse_channel_capacity),
461 true, ),
463 Err(e) => error_response(id, &e),
464 }
465 }
466}
467
468impl std::fmt::Debug for JsonRpcDispatcher {
469 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
470 f.debug_struct("JsonRpcDispatcher").finish()
471 }
472}
473
474impl Dispatcher for JsonRpcDispatcher {
477 fn dispatch(
478 &self,
479 req: hyper::Request<Incoming>,
480 ) -> std::pin::Pin<
481 Box<dyn std::future::Future<Output = crate::serve::DispatchResponse> + Send + '_>,
482 > {
483 Box::pin(self.dispatch(req))
484 }
485}