adk_server/a2a/
client.rs

1use crate::a2a::{
2    AgentCard, JsonRpcRequest, JsonRpcResponse, Message, MessageSendParams,
3    TaskArtifactUpdateEvent, TaskStatusUpdateEvent, UpdateEvent,
4};
5use adk_core::Result;
6use futures::stream::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9
10/// A2A client for communicating with remote A2A agents
11pub struct A2aClient {
12    http_client: reqwest::Client,
13    agent_card: AgentCard,
14}
15
16impl A2aClient {
17    /// Create a new A2A client from an agent card
18    pub fn new(agent_card: AgentCard) -> Self {
19        Self { http_client: reqwest::Client::new(), agent_card }
20    }
21
22    /// Resolve an agent card from a URL (fetch from /.well-known/agent.json)
23    pub async fn resolve_agent_card(base_url: &str) -> Result<AgentCard> {
24        let url = format!("{}/.well-known/agent.json", base_url.trim_end_matches('/'));
25
26        let client = reqwest::Client::new();
27        let response =
28            client.get(&url).send().await.map_err(|e| {
29                adk_core::AdkError::Agent(format!("Failed to fetch agent card: {}", e))
30            })?;
31
32        if !response.status().is_success() {
33            return Err(adk_core::AdkError::Agent(format!(
34                "Failed to fetch agent card: HTTP {}",
35                response.status()
36            )));
37        }
38
39        let card: AgentCard = response
40            .json()
41            .await
42            .map_err(|e| adk_core::AdkError::Agent(format!("Failed to parse agent card: {}", e)))?;
43
44        Ok(card)
45    }
46
47    /// Create a client by resolving an agent card from a URL
48    pub async fn from_url(base_url: &str) -> Result<Self> {
49        let card = Self::resolve_agent_card(base_url).await?;
50        Ok(Self::new(card))
51    }
52
53    /// Get the agent card
54    pub fn agent_card(&self) -> &AgentCard {
55        &self.agent_card
56    }
57
58    /// Send a message to the remote agent (blocking/non-streaming)
59    pub async fn send_message(&self, message: Message) -> Result<JsonRpcResponse> {
60        let request = JsonRpcRequest {
61            jsonrpc: "2.0".to_string(),
62            method: "message/send".to_string(),
63            params: Some(
64                serde_json::to_value(MessageSendParams { message, config: None })
65                    .map_err(|e| adk_core::AdkError::Agent(e.to_string()))?,
66            ),
67            id: Some(Value::String(uuid::Uuid::new_v4().to_string())),
68        };
69
70        let response = self
71            .http_client
72            .post(&self.agent_card.url)
73            .json(&request)
74            .send()
75            .await
76            .map_err(|e| adk_core::AdkError::Agent(format!("Request failed: {}", e)))?;
77
78        if !response.status().is_success() {
79            return Err(adk_core::AdkError::Agent(format!(
80                "Request failed: HTTP {}",
81                response.status()
82            )));
83        }
84
85        let rpc_response: JsonRpcResponse = response
86            .json()
87            .await
88            .map_err(|e| adk_core::AdkError::Agent(format!("Failed to parse response: {}", e)))?;
89
90        Ok(rpc_response)
91    }
92
93    /// Send a message and receive streaming events via SSE
94    pub async fn send_streaming_message(
95        &self,
96        message: Message,
97    ) -> Result<Pin<Box<dyn Stream<Item = Result<UpdateEvent>> + Send>>> {
98        let stream_url = format!("{}/stream", self.agent_card.url.trim_end_matches('/'));
99
100        let request = JsonRpcRequest {
101            jsonrpc: "2.0".to_string(),
102            method: "message/stream".to_string(),
103            params: Some(
104                serde_json::to_value(MessageSendParams { message, config: None })
105                    .map_err(|e| adk_core::AdkError::Agent(e.to_string()))?,
106            ),
107            id: Some(Value::String(uuid::Uuid::new_v4().to_string())),
108        };
109
110        let response = self
111            .http_client
112            .post(&stream_url)
113            .json(&request)
114            .send()
115            .await
116            .map_err(|e| adk_core::AdkError::Agent(format!("Request failed: {}", e)))?;
117
118        if !response.status().is_success() {
119            return Err(adk_core::AdkError::Agent(format!(
120                "Request failed: HTTP {}",
121                response.status()
122            )));
123        }
124
125        // Parse SSE stream
126        let stream = async_stream::stream! {
127            let mut bytes_stream = response.bytes_stream();
128            let mut buffer = String::new();
129
130            use futures::StreamExt;
131            while let Some(chunk_result) = bytes_stream.next().await {
132                let chunk = match chunk_result {
133                    Ok(c) => c,
134                    Err(e) => {
135                        yield Err(adk_core::AdkError::Agent(format!("Stream error: {}", e)));
136                        break;
137                    }
138                };
139
140                buffer.push_str(&String::from_utf8_lossy(&chunk));
141
142                // Process complete SSE events
143                while let Some(event_end) = buffer.find("\n\n") {
144                    let event_data = buffer[..event_end].to_string();
145                    buffer = buffer[event_end + 2..].to_string();
146
147                    // Parse SSE event
148                    if let Some(data) = parse_sse_data(&event_data) {
149                        // Skip done events
150                        if data.is_empty() {
151                            continue;
152                        }
153
154                        // Parse JSON-RPC response
155                        match serde_json::from_str::<JsonRpcResponse>(&data) {
156                            Ok(rpc_response) => {
157                                if let Some(result) = rpc_response.result {
158                                    // Try to parse as different event types
159                                    if let Ok(status_event) = serde_json::from_value::<TaskStatusUpdateEvent>(result.clone()) {
160                                        yield Ok(UpdateEvent::TaskStatusUpdate(status_event));
161                                    } else if let Ok(artifact_event) = serde_json::from_value::<TaskArtifactUpdateEvent>(result) {
162                                        yield Ok(UpdateEvent::TaskArtifactUpdate(artifact_event));
163                                    }
164                                } else if let Some(error) = rpc_response.error {
165                                    yield Err(adk_core::AdkError::Agent(format!(
166                                        "RPC error: {} ({})",
167                                        error.message, error.code
168                                    )));
169                                }
170                            }
171                            Err(e) => {
172                                // Skip parse errors for non-JSON data
173                                tracing::debug!("Failed to parse SSE data: {}", e);
174                            }
175                        }
176                    }
177                }
178            }
179        };
180
181        Ok(Box::pin(stream))
182    }
183}
184
185/// Parse the data field from an SSE event
186fn parse_sse_data(event: &str) -> Option<String> {
187    for line in event.lines() {
188        if let Some(data) = line.strip_prefix("data:") {
189            return Some(data.trim().to_string());
190        }
191    }
192    None
193}
194
195#[cfg(test)]
196mod tests {
197    use super::*;
198
199    #[test]
200    fn test_parse_sse_data() {
201        let event = "event: message\ndata: {\"test\": true}\n";
202        assert_eq!(parse_sse_data(event), Some("{\"test\": true}".to_string()));
203    }
204
205    #[test]
206    fn test_parse_sse_data_no_data() {
207        let event = "event: ping\n";
208        assert_eq!(parse_sse_data(event), None);
209    }
210}