1use crate::a2a::{
2 AgentCard, JsonRpcRequest, JsonRpcResponse, Message, MessageSendParams,
3 TaskArtifactUpdateEvent, TaskStatusUpdateEvent, UpdateEvent,
4};
5use adk_core::Result;
6use futures::stream::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9
10pub struct A2aClient {
12 http_client: reqwest::Client,
13 agent_card: AgentCard,
14}
15
16impl A2aClient {
17 pub fn new(agent_card: AgentCard) -> Self {
19 Self { http_client: reqwest::Client::new(), agent_card }
20 }
21
22 pub async fn resolve_agent_card(base_url: &str) -> Result<AgentCard> {
24 let url = format!("{}/.well-known/agent.json", base_url.trim_end_matches('/'));
25
26 let client = reqwest::Client::new();
27 let response =
28 client.get(&url).send().await.map_err(|e| {
29 adk_core::AdkError::Agent(format!("Failed to fetch agent card: {}", e))
30 })?;
31
32 if !response.status().is_success() {
33 return Err(adk_core::AdkError::Agent(format!(
34 "Failed to fetch agent card: HTTP {}",
35 response.status()
36 )));
37 }
38
39 let card: AgentCard = response
40 .json()
41 .await
42 .map_err(|e| adk_core::AdkError::Agent(format!("Failed to parse agent card: {}", e)))?;
43
44 Ok(card)
45 }
46
47 pub async fn from_url(base_url: &str) -> Result<Self> {
49 let card = Self::resolve_agent_card(base_url).await?;
50 Ok(Self::new(card))
51 }
52
53 pub fn agent_card(&self) -> &AgentCard {
55 &self.agent_card
56 }
57
58 pub async fn send_message(&self, message: Message) -> Result<JsonRpcResponse> {
60 let request = JsonRpcRequest {
61 jsonrpc: "2.0".to_string(),
62 method: "message/send".to_string(),
63 params: Some(
64 serde_json::to_value(MessageSendParams { message, config: None })
65 .map_err(|e| adk_core::AdkError::Agent(e.to_string()))?,
66 ),
67 id: Some(Value::String(uuid::Uuid::new_v4().to_string())),
68 };
69
70 let response = self
71 .http_client
72 .post(&self.agent_card.url)
73 .json(&request)
74 .send()
75 .await
76 .map_err(|e| adk_core::AdkError::Agent(format!("Request failed: {}", e)))?;
77
78 if !response.status().is_success() {
79 return Err(adk_core::AdkError::Agent(format!(
80 "Request failed: HTTP {}",
81 response.status()
82 )));
83 }
84
85 let rpc_response: JsonRpcResponse = response
86 .json()
87 .await
88 .map_err(|e| adk_core::AdkError::Agent(format!("Failed to parse response: {}", e)))?;
89
90 Ok(rpc_response)
91 }
92
93 pub async fn send_streaming_message(
95 &self,
96 message: Message,
97 ) -> Result<Pin<Box<dyn Stream<Item = Result<UpdateEvent>> + Send>>> {
98 let stream_url = format!("{}/stream", self.agent_card.url.trim_end_matches('/'));
99
100 let request = JsonRpcRequest {
101 jsonrpc: "2.0".to_string(),
102 method: "message/stream".to_string(),
103 params: Some(
104 serde_json::to_value(MessageSendParams { message, config: None })
105 .map_err(|e| adk_core::AdkError::Agent(e.to_string()))?,
106 ),
107 id: Some(Value::String(uuid::Uuid::new_v4().to_string())),
108 };
109
110 let response = self
111 .http_client
112 .post(&stream_url)
113 .json(&request)
114 .send()
115 .await
116 .map_err(|e| adk_core::AdkError::Agent(format!("Request failed: {}", e)))?;
117
118 if !response.status().is_success() {
119 return Err(adk_core::AdkError::Agent(format!(
120 "Request failed: HTTP {}",
121 response.status()
122 )));
123 }
124
125 let stream = async_stream::stream! {
127 let mut bytes_stream = response.bytes_stream();
128 let mut buffer = String::new();
129
130 use futures::StreamExt;
131 while let Some(chunk_result) = bytes_stream.next().await {
132 let chunk = match chunk_result {
133 Ok(c) => c,
134 Err(e) => {
135 yield Err(adk_core::AdkError::Agent(format!("Stream error: {}", e)));
136 break;
137 }
138 };
139
140 buffer.push_str(&String::from_utf8_lossy(&chunk));
141
142 while let Some(event_end) = buffer.find("\n\n") {
144 let event_data = buffer[..event_end].to_string();
145 buffer = buffer[event_end + 2..].to_string();
146
147 if let Some(data) = parse_sse_data(&event_data) {
149 if data.is_empty() {
151 continue;
152 }
153
154 match serde_json::from_str::<JsonRpcResponse>(&data) {
156 Ok(rpc_response) => {
157 if let Some(result) = rpc_response.result {
158 if let Ok(status_event) = serde_json::from_value::<TaskStatusUpdateEvent>(result.clone()) {
160 yield Ok(UpdateEvent::TaskStatusUpdate(status_event));
161 } else if let Ok(artifact_event) = serde_json::from_value::<TaskArtifactUpdateEvent>(result) {
162 yield Ok(UpdateEvent::TaskArtifactUpdate(artifact_event));
163 }
164 } else if let Some(error) = rpc_response.error {
165 yield Err(adk_core::AdkError::Agent(format!(
166 "RPC error: {} ({})",
167 error.message, error.code
168 )));
169 }
170 }
171 Err(e) => {
172 tracing::debug!("Failed to parse SSE data: {}", e);
174 }
175 }
176 }
177 }
178 }
179 };
180
181 Ok(Box::pin(stream))
182 }
183}
184
185fn parse_sse_data(event: &str) -> Option<String> {
187 for line in event.lines() {
188 if let Some(data) = line.strip_prefix("data:") {
189 return Some(data.trim().to_string());
190 }
191 }
192 None
193}
194
195#[cfg(test)]
196mod tests {
197 use super::*;
198
199 #[test]
200 fn test_parse_sse_data() {
201 let event = "event: message\ndata: {\"test\": true}\n";
202 assert_eq!(parse_sse_data(event), Some("{\"test\": true}".to_string()));
203 }
204
205 #[test]
206 fn test_parse_sse_data_no_data() {
207 let event = "event: ping\n";
208 assert_eq!(parse_sse_data(event), None);
209 }
210}