adk_server/a2a/
remote_agent.rs1use crate::a2a::{A2aClient, Part as A2aPart, Role, UpdateEvent};
2use adk_core::{Agent, Content, Event, EventStream, InvocationContext, Part, Result};
3use async_trait::async_trait;
4use std::sync::Arc;
5
/// Settings that identify a remote A2A agent and where to reach it.
///
/// `Debug` is derived so a configuration can be logged or shown in assertion
/// output, and `PartialEq`/`Eq` so two configurations can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RemoteA2aConfig {
    /// Name the agent reports through `Agent::name`.
    pub name: String,
    /// Description the agent reports through `Agent::description`.
    pub description: String,
    /// URL handed to `A2aClient::from_url` each time the agent runs.
    pub agent_url: String,
}
17
18pub struct RemoteA2aAgent {
20 config: RemoteA2aConfig,
21}
22
23impl RemoteA2aAgent {
24 pub fn new(config: RemoteA2aConfig) -> Self {
25 Self { config }
26 }
27
28 pub fn builder(name: impl Into<String>) -> RemoteA2aAgentBuilder {
29 RemoteA2aAgentBuilder::new(name)
30 }
31}
32
33#[async_trait]
34impl Agent for RemoteA2aAgent {
35 fn name(&self) -> &str {
36 &self.config.name
37 }
38
39 fn description(&self) -> &str {
40 &self.config.description
41 }
42
43 fn sub_agents(&self) -> &[Arc<dyn Agent>] {
44 &[]
45 }
46
47 async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<EventStream> {
48 let url = self.config.agent_url.clone();
49 let invocation_id = ctx.invocation_id().to_string();
50 let agent_name = self.config.name.clone();
51
52 let user_content = get_user_content_from_context(ctx.as_ref());
54
55 let stream = async_stream::stream! {
56 let client = match A2aClient::from_url(&url).await {
58 Ok(c) => c,
59 Err(e) => {
60 yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
61 return;
62 }
63 };
64
65 let message = build_a2a_message(user_content);
67
68 match client.send_streaming_message(message).await {
70 Ok(mut event_stream) => {
71 use futures::StreamExt;
72 while let Some(result) = event_stream.next().await {
73 match result {
74 Ok(update_event) => {
75 if let Some(event) = convert_update_event(&invocation_id, &agent_name, update_event) {
76 yield Ok(event);
77 }
78 }
79 Err(e) => {
80 yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
81 return;
82 }
83 }
84 }
85 }
86 Err(e) => {
87 yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
88 }
89 }
90 };
91
92 Ok(Box::pin(stream))
93 }
94}
95
/// Step-by-step constructor for a `RemoteA2aAgent`.
///
/// `Debug` and `Clone` are derived so a partially configured builder can be
/// inspected in logs and reused as a template for several agents.
#[derive(Debug, Clone)]
pub struct RemoteA2aAgentBuilder {
    /// Agent name, fixed when the builder is created.
    name: String,
    /// Agent description; starts out empty.
    description: String,
    /// Remote agent URL; `build` fails while this is still `None`.
    agent_url: Option<String>,
}
102
103impl RemoteA2aAgentBuilder {
104 pub fn new(name: impl Into<String>) -> Self {
105 Self { name: name.into(), description: String::new(), agent_url: None }
106 }
107
108 pub fn description(mut self, description: impl Into<String>) -> Self {
109 self.description = description.into();
110 self
111 }
112
113 pub fn agent_url(mut self, url: impl Into<String>) -> Self {
114 self.agent_url = Some(url.into());
115 self
116 }
117
118 pub fn build(self) -> Result<RemoteA2aAgent> {
119 let agent_url = self.agent_url.ok_or_else(|| {
120 adk_core::AdkError::Agent("RemoteA2aAgent requires agent_url".to_string())
121 })?;
122
123 Ok(RemoteA2aAgent::new(RemoteA2aConfig {
124 name: self.name,
125 description: self.description,
126 agent_url,
127 }))
128 }
129}
130
131fn get_user_content_from_context(ctx: &dyn InvocationContext) -> Option<String> {
134 let content = ctx.user_content();
135 for part in &content.parts {
136 if let Part::Text { text } = part {
137 return Some(text.clone());
138 }
139 }
140 None
141}
142
143fn build_a2a_message(content: Option<String>) -> crate::a2a::Message {
144 let text = content.unwrap_or_default();
145 crate::a2a::Message::builder()
146 .role(Role::User)
147 .parts(vec![A2aPart::text(text)])
148 .message_id(uuid::Uuid::new_v4().to_string())
149 .build()
150}
151
152fn convert_update_event(
153 invocation_id: &str,
154 agent_name: &str,
155 update: UpdateEvent,
156) -> Option<Event> {
157 match update {
158 UpdateEvent::TaskArtifactUpdate(artifact_event) => {
159 let parts: Vec<Part> = artifact_event
160 .artifact
161 .parts
162 .iter()
163 .filter_map(|p| match p {
164 A2aPart::Text { text, .. } => Some(Part::Text { text: text.clone() }),
165 _ => None,
166 })
167 .collect();
168
169 if parts.is_empty() {
170 return None;
171 }
172
173 let mut event = Event::new(invocation_id.to_string());
174 event.author = agent_name.to_string();
175 event.llm_response.content = Some(Content { role: "model".to_string(), parts });
176 event.llm_response.partial = !artifact_event.last_chunk;
177 Some(event)
178 }
179 UpdateEvent::TaskStatusUpdate(status_event) => {
180 if status_event.final_update {
182 if let Some(msg) = status_event.status.message {
183 let mut event = Event::new(invocation_id.to_string());
184 event.author = agent_name.to_string();
185 event.llm_response.content = Some(Content {
186 role: "model".to_string(),
187 parts: vec![Part::Text { text: msg }],
188 });
189 event.llm_response.turn_complete = true;
190 return Some(event);
191 }
192 }
193 None
194 }
195 }
196}
197
198fn create_error_event(invocation_id: &str, agent_name: &str, error: &str) -> Event {
199 let mut event = Event::new(invocation_id.to_string());
200 event.author = agent_name.to_string();
201 event.llm_response.error_message = Some(error.to_string());
202 event.llm_response.turn_complete = true;
203 event
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully configured builder yields an agent exposing every configured value.
    #[test]
    fn test_builder() {
        let agent = RemoteA2aAgent::builder("test")
            .description("Test agent")
            .agent_url("http://localhost:8080")
            .build()
            .unwrap();

        assert_eq!(agent.name(), "test");
        assert_eq!(agent.description(), "Test agent");
        // The URL has no public accessor, so check the stored config directly.
        assert_eq!(agent.config.agent_url, "http://localhost:8080");
        assert!(agent.sub_agents().is_empty());
    }

    /// Building without a URL fails with an agent error that names the missing field.
    #[test]
    fn test_builder_missing_url() {
        let result = RemoteA2aAgent::builder("test").build();
        assert!(result.is_err());
        assert!(matches!(
            result,
            Err(adk_core::AdkError::Agent(msg)) if msg.contains("agent_url")
        ));
    }
}