tower_a2a/codec/
jsonrpc.rs

1//! JSON-RPC 2.0 codec for A2A protocol
2//!
3//! This codec wraps A2A operations in JSON-RPC 2.0 envelopes for compatibility
4//! with agents that use the JSON-RPC protocol binding.
5
6use bytes::Bytes;
7use serde::{Deserialize, Serialize};
8use serde_json::Value;
9use uuid::Uuid;
10
11use crate::{
12    codec::Codec,
13    protocol::{error::A2AError, operation::A2AOperation},
14    service::response::A2AResponse,
15};
16
17use super::json::JsonCodec;
18
19/// JSON-RPC 2.0 request envelope
20#[derive(Debug, Serialize)]
21struct JsonRpcRequest {
22    jsonrpc: String,
23    method: String,
24    params: Value,
25    id: String,
26}
27
28/// JSON-RPC 2.0 response envelope
29#[derive(Debug, Deserialize)]
30#[allow(unused)]
31struct JsonRpcResponse {
32    jsonrpc: String,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    result: Option<Value>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    error: Option<JsonRpcError>,
37    id: Value,
38}
39
40/// JSON-RPC 2.0 error object
41#[derive(Debug, Deserialize)]
42#[allow(unused)]
43struct JsonRpcError {
44    code: i64,
45    message: String,
46    #[serde(skip_serializing_if = "Option::is_none")]
47    data: Option<Value>,
48}
49
50/// JSON-RPC 2.0 codec that wraps A2A operations
51///
52/// This codec implements the JSON-RPC 2.0 protocol binding for A2A.
53/// It wraps operations in JSON-RPC request envelopes and unwraps responses.
54#[derive(Debug, Clone)]
55pub struct JsonRpcCodec {
56    /// Inner JSON codec for encoding the params
57    inner: JsonCodec,
58}
59
60impl JsonRpcCodec {
61    /// Create a new JSON-RPC codec
62    pub fn new() -> Self {
63        Self {
64            inner: JsonCodec::new(),
65        }
66    }
67
68    /// Map an A2A operation to a JSON-RPC method name
69    fn operation_to_method(operation: &A2AOperation) -> &'static str {
70        match operation {
71            A2AOperation::SendMessage { stream, .. } => {
72                if *stream {
73                    "message/stream"
74                } else {
75                    "message/send"
76                }
77            }
78            A2AOperation::GetTask { .. } => "task/get",
79            A2AOperation::ListTasks { .. } => "task/list",
80            A2AOperation::CancelTask { .. } => "task/cancel",
81            A2AOperation::DiscoverAgent => "agent/discover",
82            A2AOperation::SubscribeTask { .. } => "task/subscribe",
83            A2AOperation::RegisterWebhook { .. } => "webhook/register",
84        }
85    }
86}
87
88impl Default for JsonRpcCodec {
89    fn default() -> Self {
90        Self::new()
91    }
92}
93
94impl Codec for JsonRpcCodec {
95    fn encode_request(&self, operation: &A2AOperation) -> Result<Bytes, A2AError> {
96        // Encode the operation using the inner JSON codec
97        let params_bytes = self.inner.encode_request(operation)?;
98        let params: Value = serde_json::from_slice(&params_bytes)?;
99
100        // Wrap in JSON-RPC 2.0 envelope
101        let request = JsonRpcRequest {
102            jsonrpc: "2.0".to_string(),
103            method: Self::operation_to_method(operation).to_string(),
104            params,
105            id: Uuid::now_v7().to_string(),
106        };
107
108        let bytes = serde_json::to_vec(&request)?;
109        Ok(Bytes::from(bytes))
110    }
111
112    fn decode_response(
113        &self,
114        body: &[u8],
115        operation: &A2AOperation,
116    ) -> Result<A2AResponse, A2AError> {
117        // Empty responses
118        if body.is_empty() {
119            return Ok(A2AResponse::Empty);
120        }
121
122        // Parse JSON-RPC response envelope
123        let jsonrpc_response: JsonRpcResponse = serde_json::from_slice(body)
124            .map_err(|e| A2AError::Protocol(format!("Failed to parse JSON-RPC response: {}", e)))?;
125
126        // Check for JSON-RPC error
127        if let Some(error) = jsonrpc_response.error {
128            return Err(A2AError::Protocol(format!(
129                "JSON-RPC error {}: {}",
130                error.code, error.message
131            )));
132        }
133
134        // Extract result
135        let result = jsonrpc_response.result.ok_or_else(|| {
136            A2AError::Protocol("JSON-RPC response missing 'result' field".to_string())
137        })?;
138
139        // Decode the result using the inner JSON codec
140        let result_bytes = serde_json::to_vec(&result)?;
141        self.inner.decode_response(&result_bytes, operation)
142    }
143
144    fn content_type(&self) -> &str {
145        "application/json"
146    }
147}
148
149#[cfg(test)]
150mod tests {
151    use crate::protocol::message::Message;
152
153    use super::*;
154
155    #[test]
156    fn test_encode_send_message() {
157        let codec = JsonRpcCodec::new();
158        let message = Message::user("Hello");
159
160        let operation = A2AOperation::SendMessage {
161            message,
162            stream: false,
163            context_id: None,
164            task_id: None,
165        };
166
167        let bytes = codec.encode_request(&operation).unwrap();
168        assert!(!bytes.is_empty());
169
170        // Verify it's valid JSON-RPC
171        let json: Value = serde_json::from_slice(&bytes).unwrap();
172        assert_eq!(json["jsonrpc"], "2.0");
173        assert_eq!(json["method"], "message/send");
174        assert!(json["params"].is_object());
175        assert!(json["id"].is_string());
176    }
177
178    #[test]
179    fn test_encode_streaming_message() {
180        let codec = JsonRpcCodec::new();
181        let message = Message::user("Hello");
182
183        let operation = A2AOperation::SendMessage {
184            message,
185            stream: true,
186            context_id: None,
187            task_id: None,
188        };
189
190        let bytes = codec.encode_request(&operation).unwrap();
191        let json: Value = serde_json::from_slice(&bytes).unwrap();
192        assert_eq!(json["method"], "message/stream");
193    }
194
195    #[test]
196    fn test_operation_method_mapping() {
197        let message = Message::user("test");
198
199        let op = A2AOperation::SendMessage {
200            message: message.clone(),
201            stream: false,
202            context_id: None,
203            task_id: None,
204        };
205        assert_eq!(JsonRpcCodec::operation_to_method(&op), "message/send");
206
207        let op = A2AOperation::SendMessage {
208            message,
209            stream: true,
210            context_id: None,
211            task_id: None,
212        };
213        assert_eq!(JsonRpcCodec::operation_to_method(&op), "message/stream");
214
215        let op = A2AOperation::GetTask {
216            task_id: "task-123".to_string(),
217        };
218        assert_eq!(JsonRpcCodec::operation_to_method(&op), "task/get");
219
220        let op = A2AOperation::CancelTask {
221            task_id: "task-123".to_string(),
222        };
223        assert_eq!(JsonRpcCodec::operation_to_method(&op), "task/cancel");
224
225        let op = A2AOperation::DiscoverAgent;
226        assert_eq!(JsonRpcCodec::operation_to_method(&op), "agent/discover");
227    }
228
229    #[test]
230    fn test_decode_success_response() {
231        let codec = JsonRpcCodec::new();
232        let json = r#"{
233            "jsonrpc": "2.0",
234            "result": {
235                "id": "task-123",
236                "status": "submitted",
237                "input": {
238                    "role": "user",
239                    "parts": [{"text": "Hello"}]
240                },
241                "createdAt": "2024-01-01T00:00:00Z"
242            },
243            "id": "req-123"
244        }"#;
245
246        let operation = A2AOperation::GetTask {
247            task_id: "task-123".to_string(),
248        };
249
250        let response = codec.decode_response(json.as_bytes(), &operation).unwrap();
251
252        match response {
253            A2AResponse::Task(task) => {
254                assert_eq!(task.id, "task-123");
255            }
256            _ => panic!("Expected Task response"),
257        }
258    }
259
260    #[test]
261    fn test_decode_error_response() {
262        let codec = JsonRpcCodec::new();
263        let json = r#"{
264            "jsonrpc": "2.0",
265            "error": {
266                "code": -32600,
267                "message": "Invalid Request"
268            },
269            "id": "req-123"
270        }"#;
271
272        let operation = A2AOperation::GetTask {
273            task_id: "task-123".to_string(),
274        };
275
276        let result = codec.decode_response(json.as_bytes(), &operation);
277        assert!(result.is_err());
278
279        match result {
280            Err(A2AError::Protocol(msg)) => {
281                assert!(msg.contains("-32600"));
282                assert!(msg.contains("Invalid Request"));
283            }
284            _ => panic!("Expected Protocol error"),
285        }
286    }
287
288    #[test]
289    fn test_decode_missing_result() {
290        let codec = JsonRpcCodec::new();
291        let json = r#"{
292            "jsonrpc": "2.0",
293            "id": "req-123"
294        }"#;
295
296        let operation = A2AOperation::GetTask {
297            task_id: "task-123".to_string(),
298        };
299
300        let result = codec.decode_response(json.as_bytes(), &operation);
301        assert!(result.is_err());
302
303        match result {
304            Err(A2AError::Protocol(msg)) => {
305                assert!(msg.contains("missing 'result' field"));
306            }
307            _ => panic!("Expected Protocol error"),
308        }
309    }
310
311    #[test]
312    fn test_content_type() {
313        let codec = JsonRpcCodec::new();
314        assert_eq!(codec.content_type(), "application/json");
315    }
316}