1use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use uuid::Uuid;
10
11use crate::{
12 codec::Codec,
13 protocol::{error::A2AError, operation::A2AOperation},
14 service::response::A2AResponse,
15};
16
17use super::json::JsonCodec;
18
19#[derive(Debug, Serialize)]
21struct JsonRpcRequest {
22 jsonrpc: String,
23 method: String,
24 params: Value,
25 id: String,
26}
27
28#[derive(Debug, Deserialize)]
30#[allow(unused)]
31struct JsonRpcResponse {
32 jsonrpc: String,
33 #[serde(skip_serializing_if = "Option::is_none")]
34 result: Option<Value>,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 error: Option<JsonRpcError>,
37 id: Value,
38}
39
40#[derive(Debug, Deserialize)]
42#[allow(unused)]
43struct JsonRpcError {
44 code: i64,
45 message: String,
46 #[serde(skip_serializing_if = "Option::is_none")]
47 data: Option<Value>,
48}
49
50#[derive(Debug, Clone)]
55pub struct JsonRpcCodec {
56 inner: JsonCodec,
58}
59
60impl JsonRpcCodec {
61 pub fn new() -> Self {
63 Self {
64 inner: JsonCodec::new(),
65 }
66 }
67
68 fn operation_to_method(operation: &A2AOperation) -> &'static str {
70 match operation {
71 A2AOperation::SendMessage { stream, .. } => {
72 if *stream {
73 "message/stream"
74 } else {
75 "message/send"
76 }
77 }
78 A2AOperation::GetTask { .. } => "task/get",
79 A2AOperation::ListTasks { .. } => "task/list",
80 A2AOperation::CancelTask { .. } => "task/cancel",
81 A2AOperation::DiscoverAgent => "agent/discover",
82 A2AOperation::SubscribeTask { .. } => "task/subscribe",
83 A2AOperation::RegisterWebhook { .. } => "webhook/register",
84 }
85 }
86}
87
88impl Default for JsonRpcCodec {
89 fn default() -> Self {
90 Self::new()
91 }
92}
93
94impl Codec for JsonRpcCodec {
95 fn encode_request(&self, operation: &A2AOperation) -> Result<Bytes, A2AError> {
96 let params_bytes = self.inner.encode_request(operation)?;
98 let params: Value = serde_json::from_slice(¶ms_bytes)?;
99
100 let request = JsonRpcRequest {
102 jsonrpc: "2.0".to_string(),
103 method: Self::operation_to_method(operation).to_string(),
104 params,
105 id: Uuid::now_v7().to_string(),
106 };
107
108 let bytes = serde_json::to_vec(&request)?;
109 Ok(Bytes::from(bytes))
110 }
111
112 fn decode_response(
113 &self,
114 body: &[u8],
115 operation: &A2AOperation,
116 ) -> Result<A2AResponse, A2AError> {
117 if body.is_empty() {
119 return Ok(A2AResponse::Empty);
120 }
121
122 let jsonrpc_response: JsonRpcResponse = serde_json::from_slice(body)
124 .map_err(|e| A2AError::Protocol(format!("Failed to parse JSON-RPC response: {}", e)))?;
125
126 if let Some(error) = jsonrpc_response.error {
128 return Err(A2AError::Protocol(format!(
129 "JSON-RPC error {}: {}",
130 error.code, error.message
131 )));
132 }
133
134 let result = jsonrpc_response.result.ok_or_else(|| {
136 A2AError::Protocol("JSON-RPC response missing 'result' field".to_string())
137 })?;
138
139 let result_bytes = serde_json::to_vec(&result)?;
141 self.inner.decode_response(&result_bytes, operation)
142 }
143
144 fn content_type(&self) -> &str {
145 "application/json"
146 }
147}
148
#[cfg(test)]
mod tests {
    use crate::protocol::message::Message;

    use super::*;

    /// Builds a `SendMessage` operation carrying a user message with `text`.
    fn send_message_op(text: &'static str, stream: bool) -> A2AOperation {
        A2AOperation::SendMessage {
            message: Message::user(text),
            stream,
            context_id: None,
            task_id: None,
        }
    }

    /// Builds the `GetTask` operation used by the decoding tests.
    fn get_task_op() -> A2AOperation {
        A2AOperation::GetTask {
            task_id: "task-123".to_string(),
        }
    }

    /// Extracts the message of a protocol error, panicking on any other outcome.
    fn expect_protocol_error(outcome: Result<A2AResponse, A2AError>) -> String {
        assert!(outcome.is_err());
        match outcome {
            Err(A2AError::Protocol(msg)) => msg,
            _ => panic!("Expected Protocol error"),
        }
    }

    #[test]
    fn test_encode_send_message() {
        let codec = JsonRpcCodec::new();
        let operation = send_message_op("Hello", false);

        let bytes = codec.encode_request(&operation).unwrap();
        assert!(!bytes.is_empty());

        // The encoded request must be a complete JSON-RPC envelope.
        let json: Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(json["jsonrpc"], "2.0");
        assert_eq!(json["method"], "message/send");
        assert!(json["params"].is_object());
        assert!(json["id"].is_string());
    }

    #[test]
    fn test_encode_streaming_message() {
        let codec = JsonRpcCodec::new();
        let operation = send_message_op("Hello", true);

        let bytes = codec.encode_request(&operation).unwrap();
        let json: Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(json["method"], "message/stream");
    }

    #[test]
    fn test_operation_method_mapping() {
        let op = send_message_op("test", false);
        assert_eq!(JsonRpcCodec::operation_to_method(&op), "message/send");

        let op = send_message_op("test", true);
        assert_eq!(JsonRpcCodec::operation_to_method(&op), "message/stream");

        let op = get_task_op();
        assert_eq!(JsonRpcCodec::operation_to_method(&op), "task/get");

        let op = A2AOperation::CancelTask {
            task_id: "task-123".to_string(),
        };
        assert_eq!(JsonRpcCodec::operation_to_method(&op), "task/cancel");

        let op = A2AOperation::DiscoverAgent;
        assert_eq!(JsonRpcCodec::operation_to_method(&op), "agent/discover");
    }

    #[test]
    fn test_decode_success_response() {
        let codec = JsonRpcCodec::new();
        let json = r#"{
            "jsonrpc": "2.0",
            "result": {
                "id": "task-123",
                "status": "submitted",
                "input": {
                    "role": "user",
                    "parts": [{"text": "Hello"}]
                },
                "createdAt": "2024-01-01T00:00:00Z"
            },
            "id": "req-123"
        }"#;

        let response = codec
            .decode_response(json.as_bytes(), &get_task_op())
            .unwrap();

        match response {
            A2AResponse::Task(task) => {
                assert_eq!(task.id, "task-123");
            }
            _ => panic!("Expected Task response"),
        }
    }

    #[test]
    fn test_decode_error_response() {
        let codec = JsonRpcCodec::new();
        let json = r#"{
            "jsonrpc": "2.0",
            "error": {
                "code": -32600,
                "message": "Invalid Request"
            },
            "id": "req-123"
        }"#;

        let msg = expect_protocol_error(codec.decode_response(json.as_bytes(), &get_task_op()));
        assert!(msg.contains("-32600"));
        assert!(msg.contains("Invalid Request"));
    }

    #[test]
    fn test_decode_missing_result() {
        let codec = JsonRpcCodec::new();
        let json = r#"{
            "jsonrpc": "2.0",
            "id": "req-123"
        }"#;

        let msg = expect_protocol_error(codec.decode_response(json.as_bytes(), &get_task_op()));
        assert!(msg.contains("missing 'result' field"));
    }

    #[test]
    fn test_content_type() {
        let codec = JsonRpcCodec::new();
        assert_eq!(codec.content_type(), "application/json");
    }
}