Skip to main content

vtcode_core/a2a/
rpc.rs

1//! JSON-RPC 2.0 structures for A2A Protocol
2//!
3//! Implements the JSON-RPC 2.0 request/response format used by the A2A protocol,
4//! along with A2A-specific RPC method constants.
5
6use serde::{Deserialize, Serialize};
7use serde_json::Value;
8
9use super::errors::A2aErrorCode;
10
11// ============================================================================
12// A2A RPC Method Constants
13// ============================================================================
14
/// Method name for sending a message that starts or continues a task.
pub const METHOD_MESSAGE_SEND: &str = "message/send";

/// Method name for sending a message while subscribing to real-time updates over SSE.
pub const METHOD_MESSAGE_STREAM: &str = "message/stream";

/// Method name for fetching the current state of a task.
pub const METHOD_TASKS_GET: &str = "tasks/get";

/// Method name for listing tasks, optionally filtered.
pub const METHOD_TASKS_LIST: &str = "tasks/list";

/// Method name for requesting cancellation of a running task.
pub const METHOD_TASKS_CANCEL: &str = "tasks/cancel";

/// Method name for storing a task's push notification configuration.
pub const METHOD_TASKS_PUSH_CONFIG_SET: &str = "tasks/pushNotificationConfig/set";

/// Method name for reading a task's push notification configuration.
pub const METHOD_TASKS_PUSH_CONFIG_GET: &str = "tasks/pushNotificationConfig/get";

/// Method name for resubscribing to task updates after a connection interruption.
pub const METHOD_TASKS_RESUBSCRIBE: &str = "tasks/resubscribe";

/// Method name for fetching the authenticated extended agent card.
pub const METHOD_AGENT_GET_EXTENDED_CARD: &str = "agent/getAuthenticatedExtendedCard";
41
42// ============================================================================
43// JSON-RPC 2.0 Structures
44// ============================================================================
45
/// Value written to the `jsonrpc` member of every envelope built in this module.
pub const JSONRPC_VERSION: &str = "2.0";
48
49/// JSON-RPC 2.0 Request
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct JsonRpcRequest {
52    /// Protocol version (always "2.0")
53    pub jsonrpc: String,
54    /// Method name
55    pub method: String,
56    /// Method parameters
57    #[serde(skip_serializing_if = "Option::is_none")]
58    pub params: Option<Value>,
59    /// Request ID (can be string, number, or null)
60    pub id: Value,
61}
62
63impl JsonRpcRequest {
64    /// Create a new JSON-RPC request
65    pub fn new(method: impl Into<String>, params: Option<Value>, id: Value) -> Self {
66        Self {
67            jsonrpc: JSONRPC_VERSION.to_string(),
68            method: method.into(),
69            params,
70            id,
71        }
72    }
73
74    /// Create a request with a string ID
75    pub fn with_string_id(
76        method: impl Into<String>,
77        params: Option<Value>,
78        id: impl Into<String>,
79    ) -> Self {
80        Self::new(method, params, Value::String(id.into()))
81    }
82
83    /// Create a request with a numeric ID
84    pub fn with_numeric_id(method: impl Into<String>, params: Option<Value>, id: i64) -> Self {
85        Self::new(method, params, Value::Number(id.into()))
86    }
87
88    /// Create a message/send request
89    pub fn message_send(params: MessageSendParams, id: Value) -> Self {
90        Self::new(
91            METHOD_MESSAGE_SEND,
92            Some(serde_json::to_value(params).unwrap_or_default()),
93            id,
94        )
95    }
96
97    /// Create a tasks/get request
98    pub fn tasks_get(task_id: impl Into<String>, id: Value) -> Self {
99        Self::new(
100            METHOD_TASKS_GET,
101            Some(serde_json::json!({ "id": task_id.into() })),
102            id,
103        )
104    }
105}
106
107/// JSON-RPC 2.0 Response
108#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct JsonRpcResponse {
110    /// Protocol version (always "2.0")
111    pub jsonrpc: String,
112    /// Result (present on success)
113    #[serde(skip_serializing_if = "Option::is_none")]
114    pub result: Option<Value>,
115    /// Error (present on failure)
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub error: Option<JsonRpcError>,
118    /// Request ID (matches the request)
119    pub id: Value,
120}
121
122impl JsonRpcResponse {
123    /// Create a success response
124    pub fn success(result: Value, id: Value) -> Self {
125        Self {
126            jsonrpc: JSONRPC_VERSION.to_string(),
127            result: Some(result),
128            error: None,
129            id,
130        }
131    }
132
133    /// Create an error response
134    pub fn error(error: JsonRpcError, id: Value) -> Self {
135        Self {
136            jsonrpc: JSONRPC_VERSION.to_string(),
137            result: None,
138            error: Some(error),
139            id,
140        }
141    }
142
143    /// Check if this is a success response
144    pub fn is_success(&self) -> bool {
145        self.result.is_some() && self.error.is_none()
146    }
147
148    /// Check if this is an error response
149    pub fn is_error(&self) -> bool {
150        self.error.is_some()
151    }
152}
153
154/// JSON-RPC 2.0 Error
155#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct JsonRpcError {
157    /// Error code
158    pub code: i32,
159    /// Error message
160    pub message: String,
161    /// Additional error data
162    #[serde(skip_serializing_if = "Option::is_none")]
163    pub data: Option<Value>,
164}
165
166impl JsonRpcError {
167    /// Create a new error
168    pub fn new(code: i32, message: impl Into<String>) -> Self {
169        Self {
170            code,
171            message: message.into(),
172            data: None,
173        }
174    }
175
176    /// Create an error with additional data
177    pub fn with_data(code: i32, message: impl Into<String>, data: Value) -> Self {
178        Self {
179            code,
180            message: message.into(),
181            data: Some(data),
182        }
183    }
184
185    /// Create an error from A2aErrorCode
186    pub fn from_code(code: A2aErrorCode, message: impl Into<String>) -> Self {
187        Self::new(code.into(), message)
188    }
189
190    /// Create a parse error
191    pub fn parse_error(message: impl Into<String>) -> Self {
192        Self::from_code(A2aErrorCode::JsonParseError, message)
193    }
194
195    /// Create an invalid request error
196    pub fn invalid_request(message: impl Into<String>) -> Self {
197        Self::from_code(A2aErrorCode::InvalidRequest, message)
198    }
199
200    /// Create a method not found error
201    pub fn method_not_found(method: impl Into<String>) -> Self {
202        Self::from_code(
203            A2aErrorCode::MethodNotFound,
204            format!("Method not found: {}", method.into()),
205        )
206    }
207
208    /// Create an invalid params error
209    pub fn invalid_params(message: impl Into<String>) -> Self {
210        Self::from_code(A2aErrorCode::InvalidParams, message)
211    }
212
213    /// Create an internal error
214    pub fn internal_error(message: impl Into<String>) -> Self {
215        Self::from_code(A2aErrorCode::InternalError, message)
216    }
217
218    /// Create a task not found error
219    pub fn task_not_found(task_id: impl Into<String>) -> Self {
220        Self::from_code(
221            A2aErrorCode::TaskNotFound,
222            format!("Task not found: {}", task_id.into()),
223        )
224    }
225}
226
227// ============================================================================
228// Request Parameters
229// ============================================================================
230
231/// Parameters for message/send and message/stream methods
232#[derive(Debug, Clone, Serialize, Deserialize)]
233#[serde(rename_all = "camelCase")]
234pub struct MessageSendParams {
235    /// Message to send
236    pub message: super::types::Message,
237    /// Optional task ID (to continue existing task)
238    #[serde(skip_serializing_if = "Option::is_none")]
239    pub task_id: Option<String>,
240    /// Optional context ID (for conversational context)
241    #[serde(skip_serializing_if = "Option::is_none")]
242    pub context_id: Option<String>,
243    /// Optional configuration
244    #[serde(skip_serializing_if = "Option::is_none")]
245    pub configuration: Option<MessageConfiguration>,
246}
247
248impl MessageSendParams {
249    /// Create new message send params
250    pub fn new(message: super::types::Message) -> Self {
251        Self {
252            message,
253            task_id: None,
254            context_id: None,
255            configuration: None,
256        }
257    }
258
259    /// Set the task ID
260    pub fn with_task_id(mut self, task_id: impl Into<String>) -> Self {
261        self.task_id = Some(task_id.into());
262        self
263    }
264
265    /// Set the context ID
266    pub fn with_context_id(mut self, context_id: impl Into<String>) -> Self {
267        self.context_id = Some(context_id.into());
268        self
269    }
270}
271
272/// Configuration for message sending
273#[derive(Debug, Clone, Serialize, Deserialize)]
274#[serde(rename_all = "camelCase")]
275pub struct MessageConfiguration {
276    /// Accepted input MIME types
277    #[serde(skip_serializing_if = "Option::is_none")]
278    pub accepted_input_modes: Option<Vec<String>>,
279    /// Accepted output MIME types
280    #[serde(skip_serializing_if = "Option::is_none")]
281    pub accepted_output_modes: Option<Vec<String>>,
282    /// History length to include
283    #[serde(skip_serializing_if = "Option::is_none")]
284    pub history_length: Option<u32>,
285    /// Push notification configuration
286    #[serde(skip_serializing_if = "Option::is_none")]
287    pub push_notification_config: Option<PushNotificationConfig>,
288}
289
290/// Push notification configuration
291#[derive(Debug, Clone, Serialize, Deserialize)]
292#[serde(rename_all = "camelCase")]
293pub struct PushNotificationConfig {
294    /// Webhook URL to receive notifications
295    pub url: String,
296    /// Optional authentication header value
297    #[serde(skip_serializing_if = "Option::is_none")]
298    pub authentication: Option<String>,
299}
300
301/// Parameters for tasks/get method
302#[derive(Debug, Clone, Serialize, Deserialize)]
303#[serde(rename_all = "camelCase")]
304pub struct TaskQueryParams {
305    /// Task ID
306    pub id: String,
307    /// Optional history length
308    #[serde(skip_serializing_if = "Option::is_none")]
309    pub history_length: Option<u32>,
310}
311
312/// Parameters for tasks/list method
313#[derive(Debug, Clone, Serialize, Deserialize, Default)]
314#[serde(rename_all = "camelCase")]
315pub struct ListTasksParams {
316    /// Filter by context ID
317    #[serde(skip_serializing_if = "Option::is_none")]
318    pub context_id: Option<String>,
319    /// Filter by status
320    #[serde(skip_serializing_if = "Option::is_none")]
321    pub status: Option<super::types::TaskState>,
322    /// Page size
323    #[serde(skip_serializing_if = "Option::is_none")]
324    pub page_size: Option<u32>,
325    /// Page token for pagination
326    #[serde(skip_serializing_if = "Option::is_none")]
327    pub page_token: Option<String>,
328    /// History length to include
329    #[serde(skip_serializing_if = "Option::is_none")]
330    pub history_length: Option<u32>,
331    /// Filter by last updated timestamp
332    #[serde(skip_serializing_if = "Option::is_none")]
333    pub last_updated_after: Option<String>,
334    /// Include artifacts in response
335    #[serde(skip_serializing_if = "Option::is_none")]
336    pub include_artifacts: Option<bool>,
337    /// Filter by metadata
338    #[serde(skip_serializing_if = "Option::is_none")]
339    pub metadata: Option<Value>,
340}
341
342/// Result for tasks/list method
343#[derive(Debug, Clone, Serialize, Deserialize)]
344#[serde(rename_all = "camelCase")]
345pub struct ListTasksResult {
346    /// List of tasks
347    pub tasks: Vec<super::types::Task>,
348    /// Total number of tasks matching the query
349    #[serde(skip_serializing_if = "Option::is_none")]
350    pub total_size: Option<u32>,
351    /// Page size used
352    #[serde(skip_serializing_if = "Option::is_none")]
353    pub page_size: Option<u32>,
354    /// Token for next page
355    #[serde(skip_serializing_if = "Option::is_none")]
356    pub next_page_token: Option<String>,
357}
358
359/// Parameters for tasks/cancel method
360#[derive(Debug, Clone, Serialize, Deserialize)]
361#[serde(rename_all = "camelCase")]
362pub struct TaskIdParams {
363    /// Task ID
364    pub id: String,
365}
366
367/// Configuration for push notification delivery
368#[derive(Debug, Clone, Serialize, Deserialize)]
369#[serde(rename_all = "camelCase")]
370pub struct TaskPushNotificationConfig {
371    /// Task ID to configure
372    pub task_id: String,
373    /// Webhook URL for notifications
374    pub url: String,
375    /// Optional authentication header value (Bearer token, API key, etc.)
376    #[serde(skip_serializing_if = "Option::is_none")]
377    pub authentication: Option<String>,
378}
379
// ============================================================================
// Streaming Events (per A2A Specification)
// ============================================================================
387
388/// Base wrapper for streaming message response
389#[derive(Debug, Clone, Serialize, Deserialize)]
390#[serde(rename_all = "camelCase")]
391pub struct SendStreamingMessageResponse {
392    /// Event data (one of MessageEvent, TaskStatusUpdateEvent, TaskArtifactUpdateEvent)
393    #[serde(flatten)]
394    pub event: StreamingEvent,
395}
396
397/// Streaming event types (discriminated by 'type' field)
398#[derive(Debug, Clone, Serialize, Deserialize)]
399#[serde(tag = "type", rename_all = "kebab-case")]
400pub enum StreamingEvent {
401    /// Message event from agent
402    #[serde(rename = "message")]
403    Message {
404        /// The message content
405        message: super::types::Message,
406        /// Context identifier the message is associated with
407        #[serde(skip_serializing_if = "Option::is_none")]
408        context_id: Option<String>,
409        /// Type discriminator
410        #[serde(default = "default_message_kind")]
411        kind: String,
412        /// True if this is the final message for the task
413        #[serde(default)]
414        r#final: bool,
415    },
416    /// Task status update event
417    #[serde(rename = "task-status")]
418    TaskStatus {
419        /// Task identifier
420        task_id: String,
421        /// Context identifier the task is associated with
422        #[serde(skip_serializing_if = "Option::is_none")]
423        context_id: Option<String>,
424        /// The new status
425        status: super::types::TaskStatus,
426        /// Type discriminator
427        #[serde(default = "default_status_kind")]
428        kind: String,
429        /// True if this is the terminal update for the task
430        #[serde(default)]
431        r#final: bool,
432    },
433    /// Task artifact update event
434    #[serde(rename = "task-artifact")]
435    TaskArtifact {
436        /// Task identifier
437        task_id: String,
438        /// The artifact data
439        artifact: super::types::Artifact,
440        /// If true, append parts to existing artifact; if false, replace
441        #[serde(default)]
442        append: bool,
443        /// If true, indicates this is the final update for the artifact
444        #[serde(default)]
445        last_chunk: bool,
446        /// Usually false for artifacts; can signal end concurrently with status
447        #[serde(default)]
448        r#final: bool,
449    },
450}
451
/// Fallback `kind` for a message event when the input omits it.
fn default_message_kind() -> String {
    String::from("streaming-response")
}
455
/// Fallback `kind` for a task status event when the input omits it.
fn default_status_kind() -> String {
    String::from("status-update")
}
459
460impl StreamingEvent {
461    /// Check if this is a final event
462    pub fn is_final(&self) -> bool {
463        match self {
464            StreamingEvent::Message { r#final, .. } => *r#final,
465            StreamingEvent::TaskStatus { r#final, .. } => *r#final,
466            StreamingEvent::TaskArtifact { r#final, .. } => *r#final,
467        }
468    }
469
470    /// Get the task ID if present
471    pub fn task_id(&self) -> Option<&str> {
472        match self {
473            StreamingEvent::Message { .. } => None,
474            StreamingEvent::TaskStatus { task_id, .. } => Some(task_id),
475            StreamingEvent::TaskArtifact { task_id, .. } => Some(task_id),
476        }
477    }
478
479    /// Get the context ID if present
480    pub fn context_id(&self) -> Option<&str> {
481        match self {
482            StreamingEvent::Message { context_id, .. } => context_id.as_deref(),
483            StreamingEvent::TaskStatus { context_id, .. } => context_id.as_deref(),
484            StreamingEvent::TaskArtifact { .. } => None,
485        }
486    }
487}
488
#[cfg(test)]
mod tests {
    use super::super::types::{Artifact, Message, TaskState, TaskStatus};
    use super::*;

    /// The string request ID shared by several tests.
    fn req_id() -> Value {
        Value::String(String::from("req-1"))
    }

    // A request built with a string ID carries the version, method, and ID.
    #[test]
    fn test_json_rpc_request_creation() {
        let params = serde_json::json!({"message": {}});
        let request = JsonRpcRequest::with_string_id(METHOD_MESSAGE_SEND, Some(params), "req-1");

        assert_eq!(request.jsonrpc, "2.0");
        assert_eq!(request.method, "message/send");
        assert_eq!(request.id, req_id());
    }

    // A success response reports success and not error.
    #[test]
    fn test_json_rpc_response_success() {
        let result = serde_json::json!({"status": "ok"});
        let response = JsonRpcResponse::success(result, req_id());

        assert!(response.is_success());
        assert!(!response.is_error());
    }

    // An error response reports error and not success.
    #[test]
    fn test_json_rpc_response_error() {
        let response = JsonRpcResponse::error(JsonRpcError::task_not_found("task-123"), req_id());

        assert!(!response.is_success());
        assert!(response.is_error());
    }

    // `TaskNotFound` maps to the numeric code -32001.
    #[test]
    fn test_error_code_serialization() {
        let error = JsonRpcError::from_code(A2aErrorCode::TaskNotFound, "Task not found");
        assert_eq!(error.code, -32001);
    }

    // A non-final message event exposes its context ID.
    #[test]
    fn test_streaming_event_message() {
        let event = StreamingEvent::Message {
            message: Message::agent_text("Response"),
            context_id: Some(String::from("ctx-1")),
            kind: String::from("streaming-response"),
            r#final: false,
        };

        assert!(!event.is_final());
        assert_eq!(event.context_id(), Some("ctx-1"));
    }

    // A final status event exposes its task ID.
    #[test]
    fn test_streaming_event_task_status() {
        let event = StreamingEvent::TaskStatus {
            task_id: String::from("task-1"),
            context_id: None,
            status: TaskStatus::new(TaskState::Completed),
            kind: String::from("status-update"),
            r#final: true,
        };

        assert!(event.is_final());
        assert_eq!(event.task_id(), Some("task-1"));
    }

    // A non-final artifact event exposes its task ID.
    #[test]
    fn test_streaming_event_artifact() {
        let event = StreamingEvent::TaskArtifact {
            task_id: String::from("task-1"),
            artifact: Artifact::text("art-1", "Output"),
            append: false,
            last_chunk: true,
            r#final: false,
        };

        assert!(!event.is_final());
        assert_eq!(event.task_id(), Some("task-1"));
    }

    // A wrapped message event survives a JSON round trip with its kind intact.
    #[test]
    fn test_send_streaming_message_response_serialization() {
        let response = SendStreamingMessageResponse {
            event: StreamingEvent::Message {
                message: Message::agent_text("Hello"),
                context_id: Some(String::from("ctx-1")),
                kind: String::from("streaming-response"),
                r#final: false,
            },
        };

        let json = serde_json::to_string(&response).expect("serialize");
        assert!(json.contains("streaming-response"));
        assert!(json.contains("message"));

        let roundtrip: SendStreamingMessageResponse =
            serde_json::from_str(&json).expect("deserialize");
        if let StreamingEvent::Message { kind, .. } = &roundtrip.event {
            assert_eq!(kind, "streaming-response");
        } else {
            panic!("Expected Message event");
        }
    }

    // A task push notification config survives a JSON round trip.
    #[test]
    fn test_task_push_notification_config() {
        let config = TaskPushNotificationConfig {
            task_id: String::from("task-1"),
            url: String::from("https://example.com/webhook"),
            authentication: Some(String::from("Bearer token123")),
        };

        let json = serde_json::to_string(&config).expect("serialize");
        let roundtrip: TaskPushNotificationConfig =
            serde_json::from_str(&json).expect("deserialize");

        assert_eq!(roundtrip.task_id, "task-1");
        assert_eq!(roundtrip.url, "https://example.com/webhook");
        assert!(roundtrip.authentication.is_some());
    }
}