adk_server/a2a/
remote_agent.rs

1use crate::a2a::{A2aClient, Part as A2aPart, Role, UpdateEvent};
2use adk_core::{Agent, Content, Event, EventStream, InvocationContext, Part, Result};
3use async_trait::async_trait;
4use std::sync::Arc;
5
/// Configuration for a remote A2A agent.
///
/// Holds the identity reported through the `Agent` trait (`name`,
/// `description`) and the location of the remote agent. All fields are plain
/// owned strings, so the type is cheap to clone and safe to print with
/// `{:?}` for diagnostics.
#[derive(Debug, Clone)]
pub struct RemoteA2aConfig {
    /// Name of the agent
    pub name: String,
    /// Description of the agent
    pub description: String,
    /// Base URL of the remote agent (e.g., "http://localhost:8080")
    /// The agent card will be fetched from {base_url}/.well-known/agent.json
    pub agent_url: String,
}
17
/// An agent that communicates with a remote A2A agent.
///
/// The agent itself only stores its configuration; the connection to the
/// remote side is established each time the agent is run.
pub struct RemoteA2aAgent {
    // Name, description and URL of the remote agent.
    config: RemoteA2aConfig,
}
22
23impl RemoteA2aAgent {
24    pub fn new(config: RemoteA2aConfig) -> Self {
25        Self { config }
26    }
27
28    pub fn builder(name: impl Into<String>) -> RemoteA2aAgentBuilder {
29        RemoteA2aAgentBuilder::new(name)
30    }
31}
32
33#[async_trait]
34impl Agent for RemoteA2aAgent {
35    fn name(&self) -> &str {
36        &self.config.name
37    }
38
39    fn description(&self) -> &str {
40        &self.config.description
41    }
42
43    fn sub_agents(&self) -> &[Arc<dyn Agent>] {
44        &[]
45    }
46
47    async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<EventStream> {
48        let url = self.config.agent_url.clone();
49        let invocation_id = ctx.invocation_id().to_string();
50        let agent_name = self.config.name.clone();
51
52        // Get user content from context
53        let user_content = get_user_content_from_context(ctx.as_ref());
54
55        let stream = async_stream::stream! {
56            // Create A2A client
57            let client = match A2aClient::from_url(&url).await {
58                Ok(c) => c,
59                Err(e) => {
60                    yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
61                    return;
62                }
63            };
64
65            // Build message from user content
66            let message = build_a2a_message(user_content);
67
68            // Send streaming message
69            match client.send_streaming_message(message).await {
70                Ok(mut event_stream) => {
71                    use futures::StreamExt;
72                    while let Some(result) = event_stream.next().await {
73                        match result {
74                            Ok(update_event) => {
75                                if let Some(event) = convert_update_event(&invocation_id, &agent_name, update_event) {
76                                    yield Ok(event);
77                                }
78                            }
79                            Err(e) => {
80                                yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
81                                return;
82                            }
83                        }
84                    }
85                }
86                Err(e) => {
87                    yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
88                }
89            }
90        };
91
92        Ok(Box::pin(stream))
93    }
94}
95
/// Builder for RemoteA2aAgent
///
/// Collects the agent's name, an optional description and the remote URL
/// before the agent is constructed.
pub struct RemoteA2aAgentBuilder {
    // Agent name, supplied when the builder is created.
    name: String,
    // Agent description; starts out empty.
    description: String,
    // Remote agent URL; `None` until `agent_url` is called.
    agent_url: Option<String>,
}
102
103impl RemoteA2aAgentBuilder {
104    pub fn new(name: impl Into<String>) -> Self {
105        Self { name: name.into(), description: String::new(), agent_url: None }
106    }
107
108    pub fn description(mut self, description: impl Into<String>) -> Self {
109        self.description = description.into();
110        self
111    }
112
113    pub fn agent_url(mut self, url: impl Into<String>) -> Self {
114        self.agent_url = Some(url.into());
115        self
116    }
117
118    pub fn build(self) -> Result<RemoteA2aAgent> {
119        let agent_url = self.agent_url.ok_or_else(|| {
120            adk_core::AdkError::Agent("RemoteA2aAgent requires agent_url".to_string())
121        })?;
122
123        Ok(RemoteA2aAgent::new(RemoteA2aConfig {
124            name: self.name,
125            description: self.description,
126            agent_url,
127        }))
128    }
129}
130
131// Helper functions
132
133fn get_user_content_from_context(ctx: &dyn InvocationContext) -> Option<String> {
134    let content = ctx.user_content();
135    for part in &content.parts {
136        if let Part::Text { text } = part {
137            return Some(text.clone());
138        }
139    }
140    None
141}
142
143fn build_a2a_message(content: Option<String>) -> crate::a2a::Message {
144    let text = content.unwrap_or_default();
145    crate::a2a::Message::builder()
146        .role(Role::User)
147        .parts(vec![A2aPart::text(text)])
148        .message_id(uuid::Uuid::new_v4().to_string())
149        .build()
150}
151
152fn convert_update_event(
153    invocation_id: &str,
154    agent_name: &str,
155    update: UpdateEvent,
156) -> Option<Event> {
157    match update {
158        UpdateEvent::TaskArtifactUpdate(artifact_event) => {
159            let parts: Vec<Part> = artifact_event
160                .artifact
161                .parts
162                .iter()
163                .filter_map(|p| match p {
164                    A2aPart::Text { text, .. } => Some(Part::Text { text: text.clone() }),
165                    _ => None,
166                })
167                .collect();
168
169            if parts.is_empty() {
170                return None;
171            }
172
173            let mut event = Event::new(invocation_id.to_string());
174            event.author = agent_name.to_string();
175            event.llm_response.content = Some(Content { role: "model".to_string(), parts });
176            event.llm_response.partial = !artifact_event.last_chunk;
177            Some(event)
178        }
179        UpdateEvent::TaskStatusUpdate(status_event) => {
180            // Only create event for final status updates with messages
181            if status_event.final_update {
182                if let Some(msg) = status_event.status.message {
183                    let mut event = Event::new(invocation_id.to_string());
184                    event.author = agent_name.to_string();
185                    event.llm_response.content = Some(Content {
186                        role: "model".to_string(),
187                        parts: vec![Part::Text { text: msg }],
188                    });
189                    event.llm_response.turn_complete = true;
190                    return Some(event);
191                }
192            }
193            None
194        }
195    }
196}
197
198fn create_error_event(invocation_id: &str, agent_name: &str, error: &str) -> Event {
199    let mut event = Event::new(invocation_id.to_string());
200    event.author = agent_name.to_string();
201    event.llm_response.error_message = Some(error.to_string());
202    event.llm_response.turn_complete = true;
203    event
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully specified builder yields an agent exposing the given name and description.
    #[test]
    fn test_builder() {
        let builder = RemoteA2aAgent::builder("test")
            .description("Test agent")
            .agent_url("http://localhost:8080");
        let built = builder.build().unwrap();

        assert_eq!(built.name(), "test");
        assert_eq!(built.description(), "Test agent");
    }

    /// Building without a URL is rejected.
    #[test]
    fn test_builder_missing_url() {
        let outcome = RemoteA2aAgent::builder("test").build();
        assert!(outcome.is_err());
    }
}
227}