Skip to main content

adk_server/a2a/
remote_agent.rs

1use crate::a2a::{
2    A2aClient, Part as A2aPart, Role, TaskArtifactUpdateEvent, TaskStatusUpdateEvent, UpdateEvent,
3};
4use adk_core::{Agent, Content, Event, EventStream, InvocationContext, Part, Result};
5use async_trait::async_trait;
6use std::sync::Arc;
7
8/// Configuration for a remote A2A agent
9#[derive(Clone)]
10pub struct RemoteA2aConfig {
11    /// Name of the agent
12    pub name: String,
13    /// Description of the agent
14    pub description: String,
15    /// Base URL of the remote agent (e.g., "http://localhost:8080")
16    /// The agent card will be fetched from {base_url}/.well-known/agent.json
17    pub agent_url: String,
18    /// Whether to use streaming for communication.
19    /// If `None`, the agent uses streaming if the remote agent supports it.
20    pub streaming: Option<bool>,
21}
22
23/// An agent that communicates with a remote A2A agent
24pub struct RemoteA2aAgent {
25    config: RemoteA2aConfig,
26}
27
28impl RemoteA2aAgent {
29    pub fn new(config: RemoteA2aConfig) -> Self {
30        Self { config }
31    }
32
33    pub fn builder(name: impl Into<String>) -> RemoteA2aAgentBuilder {
34        RemoteA2aAgentBuilder::new(name)
35    }
36}
37
38#[async_trait]
39impl Agent for RemoteA2aAgent {
40    fn name(&self) -> &str {
41        &self.config.name
42    }
43
44    fn description(&self) -> &str {
45        &self.config.description
46    }
47
48    fn sub_agents(&self) -> &[Arc<dyn Agent>] {
49        &[]
50    }
51
52    async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<EventStream> {
53        let url = self.config.agent_url.clone();
54        let invocation_id = ctx.invocation_id().to_string();
55        let agent_name = self.config.name.clone();
56        let config_streaming = self.config.streaming;
57
58        // Get user content from context
59        let user_content = get_user_content_from_context(ctx.as_ref());
60
61        let stream = async_stream::stream! {
62            // Create A2A client
63            let client = match A2aClient::from_url(&url).await {
64                Ok(c) => c,
65                Err(e) => {
66                    yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
67                    return;
68                }
69            };
70
71            // Determine if we should use streaming
72            let use_streaming = config_streaming.unwrap_or(client.agent_card().capabilities.streaming);
73
74            // Build message from user content
75            let message = build_a2a_message(user_content);
76
77            if use_streaming {
78                // Send streaming message
79                match client.send_streaming_message(message).await {
80                    Ok(mut event_stream) => {
81                        use futures::StreamExt;
82                        while let Some(result) = event_stream.next().await {
83                            match result {
84                                Ok(update_event) => {
85                                    if let Some(event) = convert_update_event(&invocation_id, &agent_name, update_event) {
86                                        yield Ok(event);
87                                    }
88                                }
89                                Err(e) => {
90                                    yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
91                                    return;
92                                }
93                            }
94                        }
95                    }
96                    Err(e) => {
97                        yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
98                    }
99                }
100            } else {
101                // Send non-streaming message
102                match client.send_message(message).await {
103                    Ok(rpc_response) => {
104                        if let Some(result) = rpc_response.result {
105                            match serde_json::from_value::<crate::a2a::Task>(result) {
106                                Ok(task) => {
107                                    for event in convert_task_to_events(&invocation_id, &agent_name, task) {
108                                        yield Ok(event);
109                                    }
110                                }
111                                Err(e) => {
112                                    yield Ok(create_error_event(&invocation_id, &agent_name, &format!("failed to parse response: {e}")));
113                                }
114                            }
115                        } else if let Some(error) = rpc_response.error {
116                            yield Ok(create_error_event(&invocation_id, &agent_name, &format!("RPC error: {} ({})", error.message, error.code)));
117                        }
118                    }
119                    Err(e) => {
120                        yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
121                    }
122                }
123            }
124        };
125
126        Ok(Box::pin(stream))
127    }
128}
129
130/// Builder for RemoteA2aAgent
131pub struct RemoteA2aAgentBuilder {
132    name: String,
133    description: String,
134    agent_url: Option<String>,
135    streaming: Option<bool>,
136}
137
138impl RemoteA2aAgentBuilder {
139    pub fn new(name: impl Into<String>) -> Self {
140        Self { name: name.into(), description: String::new(), agent_url: None, streaming: None }
141    }
142
143    pub fn description(mut self, description: impl Into<String>) -> Self {
144        self.description = description.into();
145        self
146    }
147
148    pub fn agent_url(mut self, url: impl Into<String>) -> Self {
149        self.agent_url = Some(url.into());
150        self
151    }
152
153    /// Set whether to use streaming. If not set, auto-detects from agent capabilities.
154    pub fn streaming(mut self, streaming: bool) -> Self {
155        self.streaming = Some(streaming);
156        self
157    }
158
159    pub fn build(self) -> Result<RemoteA2aAgent> {
160        let agent_url = self
161            .agent_url
162            .ok_or_else(|| adk_core::AdkError::agent("RemoteA2aAgent requires agent_url"))?;
163
164        Ok(RemoteA2aAgent::new(RemoteA2aConfig {
165            name: self.name,
166            description: self.description,
167            agent_url,
168            streaming: self.streaming,
169        }))
170    }
171}
172
173// Helper functions
174
175fn get_user_content_from_context(ctx: &dyn InvocationContext) -> Option<String> {
176    let content = ctx.user_content();
177    for part in &content.parts {
178        if let Part::Text { text } = part {
179            return Some(text.clone());
180        }
181    }
182    None
183}
184
185fn build_a2a_message(content: Option<String>) -> crate::a2a::Message {
186    let text = content.unwrap_or_default();
187    crate::a2a::Message::builder()
188        .role(Role::User)
189        .parts(vec![A2aPart::text(text)])
190        .message_id(uuid::Uuid::new_v4().to_string())
191        .build()
192}
193
194fn convert_update_event(
195    invocation_id: &str,
196    agent_name: &str,
197    update: UpdateEvent,
198) -> Option<Event> {
199    match update {
200        UpdateEvent::TaskArtifactUpdate(artifact_event) => {
201            let parts: Vec<Part> = artifact_event
202                .artifact
203                .parts
204                .iter()
205                .filter_map(|p| match p {
206                    A2aPart::Text { text, .. } => Some(Part::Text { text: text.clone() }),
207                    _ => None,
208                })
209                .collect();
210
211            if parts.is_empty() {
212                return None;
213            }
214
215            let mut event = Event::new(invocation_id.to_string());
216            event.author = agent_name.to_string();
217            event.llm_response.content = Some(Content { role: "model".to_string(), parts });
218            event.llm_response.partial = !artifact_event.last_chunk;
219            Some(event)
220        }
221        UpdateEvent::TaskStatusUpdate(status_event) => {
222            // Only create event for final status updates with messages
223            if status_event.final_update
224                && let Some(msg) = status_event.status.message
225            {
226                let mut event = Event::new(invocation_id.to_string());
227                event.author = agent_name.to_string();
228                event.llm_response.content = Some(Content {
229                    role: "model".to_string(),
230                    parts: vec![Part::Text { text: msg }],
231                });
232                event.llm_response.turn_complete = true;
233                return Some(event);
234            }
235            None
236        }
237    }
238}
239
240fn create_error_event(invocation_id: &str, agent_name: &str, error: &str) -> Event {
241    let mut event = Event::new(invocation_id.to_string());
242    event.author = agent_name.to_string();
243    event.llm_response.error_message = Some(error.to_string());
244    event.llm_response.turn_complete = true;
245    event
246}
247
248/// Converts a non-streaming A2A Task response into a sequence of ADK Events.
249fn convert_task_to_events(
250    invocation_id: &str,
251    agent_name: &str,
252    task: crate::a2a::Task,
253) -> Vec<Event> {
254    let mut events = Vec::new();
255
256    // 1. Yield events for artifacts
257    if let Some(artifacts) = task.artifacts {
258        for artifact in artifacts {
259            let update = UpdateEvent::TaskArtifactUpdate(TaskArtifactUpdateEvent {
260                task_id: task.id.clone(),
261                context_id: task.context_id.clone(),
262                artifact,
263                append: false,
264                last_chunk: true,
265            });
266            if let Some(event) = convert_update_event(invocation_id, agent_name, update) {
267                events.push(event);
268            }
269        }
270    }
271
272    // 2. Yield events for history (agent messages)
273    if let Some(history) = task.history {
274        for msg in history {
275            if msg.role == Role::Agent {
276                let parts: Vec<Part> = msg
277                    .parts
278                    .iter()
279                    .filter_map(|p| match p {
280                        A2aPart::Text { text, .. } => Some(Part::Text { text: text.clone() }),
281                        _ => None,
282                    })
283                    .collect();
284
285                if !parts.is_empty() {
286                    let mut event = Event::new(invocation_id.to_string());
287                    event.author = agent_name.to_string();
288                    event.llm_response.content = Some(Content { role: "model".to_string(), parts });
289                    event.llm_response.turn_complete = false;
290                    events.push(event);
291                }
292            }
293        }
294    }
295
296    // 3. Yield final status event
297    let update = UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
298        task_id: task.id,
299        context_id: task.context_id,
300        status: task.status,
301        final_update: true,
302    });
303    if let Some(event) = convert_update_event(invocation_id, agent_name, update) {
304        events.push(event);
305    }
306
307    events
308}
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313
314    #[test]
315    fn test_builder() {
316        let agent = RemoteA2aAgent::builder("test")
317            .description("Test agent")
318            .agent_url("http://localhost:8080")
319            .build()
320            .unwrap();
321
322        assert_eq!(agent.name(), "test");
323        assert_eq!(agent.description(), "Test agent");
324    }
325
326    #[test]
327    fn test_builder_missing_url() {
328        let result = RemoteA2aAgent::builder("test").build();
329        assert!(result.is_err());
330    }
331
332    #[test]
333    fn test_builder_streaming_option() {
334        let agent = RemoteA2aAgent::builder("test")
335            .agent_url("http://localhost:8080")
336            .streaming(true)
337            .build()
338            .unwrap();
339
340        assert_eq!(agent.config.streaming, Some(true));
341    }
342
343    #[test]
344    fn test_builder_streaming_default_is_none() {
345        let agent =
346            RemoteA2aAgent::builder("test").agent_url("http://localhost:8080").build().unwrap();
347
348        assert_eq!(agent.config.streaming, None);
349    }
350
351    #[test]
352    fn test_convert_task_to_events_with_artifacts_and_history() {
353        use crate::a2a::{Artifact, Message, Task, TaskState, TaskStatus};
354
355        let task = Task {
356            id: "task-123".to_string(),
357            context_id: Some("ctx-456".to_string()),
358            status: TaskStatus { state: TaskState::Completed, message: Some("Done".to_string()) },
359            artifacts: Some(vec![Artifact {
360                artifact_id: "art-1".to_string(),
361                name: Some("result".to_string()),
362                description: None,
363                parts: vec![A2aPart::text("artifact content".to_string())],
364                metadata: None,
365                extensions: None,
366            }]),
367            history: Some(vec![
368                Message::builder()
369                    .role(Role::Agent)
370                    .parts(vec![A2aPart::text("Hello from agent".to_string())])
371                    .build(),
372            ]),
373        };
374
375        let events = convert_task_to_events("inv-1", "remote-agent", task);
376
377        // Should have 3 events: artifact, history message, final status
378        assert_eq!(events.len(), 3);
379
380        // Artifact event
381        assert_eq!(events[0].author, "remote-agent");
382        let content = events[0].llm_response.content.as_ref().unwrap();
383        assert_eq!(content.parts[0], Part::Text { text: "artifact content".to_string() });
384
385        // History event
386        assert_eq!(events[1].author, "remote-agent");
387        let content = events[1].llm_response.content.as_ref().unwrap();
388        assert_eq!(content.parts[0], Part::Text { text: "Hello from agent".to_string() });
389        assert!(!events[1].llm_response.turn_complete);
390
391        // Final status event
392        assert_eq!(events[2].author, "remote-agent");
393        assert!(events[2].llm_response.turn_complete);
394    }
395
396    #[test]
397    fn test_convert_task_to_events_empty_task() {
398        use crate::a2a::{Task, TaskState, TaskStatus};
399
400        let task = Task {
401            id: "task-empty".to_string(),
402            context_id: None,
403            status: TaskStatus { state: TaskState::Completed, message: None },
404            artifacts: None,
405            history: None,
406        };
407
408        let events = convert_task_to_events("inv-1", "agent", task);
409
410        // A completed task with no message and no artifacts produces no events
411        // because convert_update_event only emits for final_update with a message
412        assert!(events.is_empty());
413    }
414
415    #[test]
416    fn test_convert_task_to_events_status_with_message() {
417        use crate::a2a::{Task, TaskState, TaskStatus};
418
419        let task = Task {
420            id: "task-msg".to_string(),
421            context_id: None,
422            status: TaskStatus {
423                state: TaskState::Completed,
424                message: Some("All done".to_string()),
425            },
426            artifacts: None,
427            history: None,
428        };
429
430        let events = convert_task_to_events("inv-1", "agent", task);
431
432        assert_eq!(events.len(), 1);
433        assert!(events[0].llm_response.turn_complete);
434        let content = events[0].llm_response.content.as_ref().unwrap();
435        assert_eq!(content.parts[0], Part::Text { text: "All done".to_string() });
436    }
437}
438
439// ── A2A v1.0.0 Remote Agent ─────────────────────────────────────────────────
440
441#[cfg(feature = "a2a-v1")]
442pub mod v1_remote {
443    //! V1.0.0 remote agent wrapper.
444    //!
445    //! Implements the ADK [`Agent`] trait by communicating with a remote A2A
446    //! v1.0.0 agent. Parses the v1 [`AgentCard`] structure including
447    //! `supportedInterfaces`, selects the best interface URL (preferring
448    //! JSONRPC over HTTP+JSON), and uses [`A2aV1Client`] which sends the
449    //! `A2A-Version: 1.0` header on all requests.
450
451    use crate::a2a::client::v1_client::A2aV1Client;
452    use a2a_protocol_types::{AgentCard, AgentInterface};
453    use adk_core::{Agent, Content, Event, EventStream, InvocationContext, Part, Result};
454    use async_trait::async_trait;
455    use std::sync::Arc;
456
457    /// Configuration for a v1.0.0 remote A2A agent.
458    #[derive(Clone)]
459    pub struct RemoteA2aV1Config {
460        /// Name of the agent.
461        pub name: String,
462        /// Description of the agent.
463        pub description: String,
464        /// The v1.0.0 agent card describing the remote agent.
465        pub agent_card: AgentCard,
466        /// Whether to use streaming for communication.
467        /// If `None`, auto-detects from the agent card capabilities.
468        pub streaming: Option<bool>,
469    }
470
471    /// An agent that communicates with a remote A2A v1.0.0 agent.
472    ///
473    /// Selects the best interface URL from the agent card's
474    /// `supportedInterfaces` (preferring JSONRPC, falling back to HTTP+JSON)
475    /// and delegates to [`A2aV1Client`] for protocol-level communication.
476    pub struct RemoteA2aV1Agent {
477        config: RemoteA2aV1Config,
478    }
479
480    impl RemoteA2aV1Agent {
481        /// Creates a new v1 remote agent from the given configuration.
482        pub fn new(config: RemoteA2aV1Config) -> Self {
483            Self { config }
484        }
485
486        /// Selects the best interface from the agent card.
487        ///
488        /// Prefers JSONRPC, falls back to HTTP+JSON.
489        pub fn select_interface(card: &AgentCard) -> Option<&AgentInterface> {
490            card.supported_interfaces.iter().find(|i| i.protocol_binding == "JSONRPC").or_else(
491                || card.supported_interfaces.iter().find(|i| i.protocol_binding == "HTTP+JSON"),
492            )
493        }
494    }
495
496    #[async_trait]
497    impl Agent for RemoteA2aV1Agent {
498        fn name(&self) -> &str {
499            &self.config.name
500        }
501
502        fn description(&self) -> &str {
503            &self.config.description
504        }
505
506        fn sub_agents(&self) -> &[Arc<dyn Agent>] {
507            &[]
508        }
509
510        async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<EventStream> {
511            let card = self.config.agent_card.clone();
512            let invocation_id = ctx.invocation_id().to_string();
513            let agent_name = self.config.name.clone();
514            let config_streaming = self.config.streaming;
515
516            // Get user content from context
517            let user_content = extract_user_text(ctx.as_ref());
518
519            let stream = async_stream::stream! {
520                // Verify we have a usable interface
521                let interface = match Self::select_interface(&card) {
522                    Some(i) => i.clone(),
523                    None => {
524                        yield Ok(create_v1_error_event(
525                            &invocation_id,
526                            &agent_name,
527                            "no supported interface found in agent card (need JSONRPC or HTTP+JSON)",
528                        ));
529                        return;
530                    }
531                };
532
533                // Build a card with the selected interface for the client
534                let client = A2aV1Client::new(card.clone());
535
536                // Determine if we should use streaming
537                let use_streaming = config_streaming.unwrap_or(card.capabilities.streaming.unwrap_or(false));
538
539                // Build a v1 Message from user content
540                let message = build_v1_message(user_content);
541
542                if use_streaming {
543                    // Send streaming message and process the SSE response
544                    match client.send_streaming_message(message).await {
545                        Ok(response) => {
546                            use futures::StreamExt;
547
548                            let mut bytes_stream = response.bytes_stream();
549                            let mut buffer = String::new();
550
551                            while let Some(chunk_result) = bytes_stream.next().await {
552                                let chunk = match chunk_result {
553                                    Ok(c) => c,
554                                    Err(e) => {
555                                        yield Ok(create_v1_error_event(
556                                            &invocation_id,
557                                            &agent_name,
558                                            &format!("stream error: {e}"),
559                                        ));
560                                        break;
561                                    }
562                                };
563
564                                buffer.push_str(&String::from_utf8_lossy(&chunk));
565
566                                // Process complete SSE events (delimited by \n\n)
567                                while let Some(event_end) = buffer.find("\n\n") {
568                                    let event_data = buffer[..event_end].to_string();
569                                    buffer = buffer[event_end + 2..].to_string();
570
571                                    if let Some(data) = parse_sse_data_line(&event_data) {
572                                        if data.is_empty() {
573                                            continue;
574                                        }
575
576                                        // Parse as StreamResponse (may be wrapped in JSON-RPC or direct)
577                                        if let Some(event) = parse_stream_response(
578                                            &data,
579                                            &invocation_id,
580                                            &agent_name,
581                                        ) {
582                                            yield Ok(event);
583                                        }
584                                    }
585                                }
586                            }
587                        }
588                        Err(e) => {
589                            yield Ok(create_v1_error_event(
590                                &invocation_id,
591                                &agent_name,
592                                &format!("failed to send streaming message: {e}"),
593                            ));
594                        }
595                    }
596                } else {
597                    // Send non-streaming message
598                    match client.send_message(message).await {
599                        Ok(task) => {
600                            // Yield events for artifacts
601                            if let Some(artifacts) = task.artifacts {
602                                for artifact in artifacts {
603                                    let resp = a2a_protocol_types::events::StreamResponse::ArtifactUpdate(
604                                        a2a_protocol_types::events::TaskArtifactUpdateEvent {
605                                            task_id: task.id.clone(),
606                                            context_id: task.context_id.clone(),
607                                            artifact,
608                                            append: Some(false),
609                                            last_chunk: Some(true),
610                                            metadata: None,
611                                        }
612                                    );
613                                    if let Some(event) = convert_stream_response(&resp, &invocation_id, &agent_name) {
614                                        yield Ok(event);
615                                    }
616                                }
617                            }
618
619                            // Yield events for history (agent messages)
620                            if let Some(history) = task.history {
621                                for msg in history {
622                                    if msg.role == a2a_protocol_types::MessageRole::Agent {
623                                        use a2a_protocol_types::PartContent;
624                                        let text_parts: Vec<Part> = msg.parts.iter().filter_map(|p| {
625                                            match &p.content {
626                                                PartContent::Text(text) => Some(Part::Text { text: text.clone() }),
627                                                _ => None,
628                                            }
629                                        }).collect();
630
631                                        if !text_parts.is_empty() {
632                                            let mut event = Event::new(invocation_id.clone());
633                                            event.author = agent_name.clone();
634                                            event.llm_response.content = Some(Content { role: "model".to_string(), parts: text_parts });
635                                            event.llm_response.turn_complete = false;
636                                            yield Ok(event);
637                                        }
638                                    }
639                                }
640                            }
641
642                            // Yield final status event
643                            let resp = a2a_protocol_types::events::StreamResponse::StatusUpdate(
644                                a2a_protocol_types::events::TaskStatusUpdateEvent {
645                                    task_id: task.id,
646                                    context_id: task.context_id,
647                                    status: task.status,
648                                    metadata: None,
649                                }
650                            );
651                            if let Some(event) = convert_stream_response(&resp, &invocation_id, &agent_name) {
652                                yield Ok(event);
653                            }
654                        }
655                        Err(e) => {
656                            yield Ok(create_v1_error_event(
657                                &invocation_id,
658                                &agent_name,
659                                &format!("failed to send message: {e}"),
660                            ));
661                        }
662                    }
663                }
664
665                let _ = interface;
666            };
667
668            Ok(Box::pin(stream))
669        }
670    }
671
672    /// Extracts user text from the invocation context.
673    fn extract_user_text(ctx: &dyn InvocationContext) -> Option<String> {
674        let content = ctx.user_content();
675        for part in &content.parts {
676            if let Part::Text { text } = part {
677                return Some(text.clone());
678            }
679        }
680        None
681    }
682
683    /// Builds a v1 `Message` from user text content.
684    fn build_v1_message(content: Option<String>) -> a2a_protocol_types::Message {
685        let text = content.unwrap_or_default();
686        a2a_protocol_types::Message {
687            id: a2a_protocol_types::MessageId::new(uuid::Uuid::new_v4().to_string()),
688            role: a2a_protocol_types::MessageRole::User,
689            parts: vec![a2a_protocol_types::Part::text(text)],
690            task_id: None,
691            context_id: None,
692            reference_task_ids: None,
693            extensions: None,
694            metadata: None,
695        }
696    }
697
698    /// Parses the `data:` field from an SSE event line.
699    fn parse_sse_data_line(event: &str) -> Option<String> {
700        for line in event.lines() {
701            if let Some(data) = line.strip_prefix("data:") {
702                return Some(data.trim().to_string());
703            }
704        }
705        None
706    }
707
708    /// Attempts to parse an SSE data payload as a StreamResponse (either
709    /// direct JSON or wrapped in a JSON-RPC response) and converts it to
710    /// an ADK Event.
711    fn parse_stream_response(data: &str, invocation_id: &str, agent_name: &str) -> Option<Event> {
712        use a2a_protocol_types::events::StreamResponse;
713
714        // Try direct StreamResponse first (REST binding)
715        if let Ok(stream_resp) = serde_json::from_str::<StreamResponse>(data) {
716            return convert_stream_response(&stream_resp, invocation_id, agent_name);
717        }
718
719        // Try JSON-RPC wrapped response
720        if let Ok(rpc_value) = serde_json::from_str::<serde_json::Value>(data) {
721            if let Some(result) = rpc_value.get("result") {
722                if let Ok(stream_resp) = serde_json::from_value::<StreamResponse>(result.clone()) {
723                    return convert_stream_response(&stream_resp, invocation_id, agent_name);
724                }
725            }
726            // Check for JSON-RPC error
727            if let Some(error) = rpc_value.get("error") {
728                let message =
729                    error.get("message").and_then(|m| m.as_str()).unwrap_or("unknown error");
730                let code = error.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
731                return Some(create_v1_error_event(
732                    invocation_id,
733                    agent_name,
734                    &format!("RPC error {code}: {message}"),
735                ));
736            }
737        }
738
739        tracing::debug!("failed to parse SSE data as StreamResponse: {data}");
740        None
741    }
742
743    /// Converts a `StreamResponse` into an ADK `Event`.
744    fn convert_stream_response(
745        resp: &a2a_protocol_types::events::StreamResponse,
746        invocation_id: &str,
747        agent_name: &str,
748    ) -> Option<Event> {
749        use a2a_protocol_types::events::StreamResponse;
750
751        match resp {
752            StreamResponse::ArtifactUpdate(artifact_event) => {
753                use a2a_protocol_types::PartContent;
754                let parts: Vec<Part> = artifact_event
755                    .artifact
756                    .parts
757                    .iter()
758                    .filter_map(|p| match &p.content {
759                        PartContent::Text(text) => Some(Part::Text { text: text.clone() }),
760                        _ => None,
761                    })
762                    .collect();
763
764                if parts.is_empty() {
765                    return None;
766                }
767
768                let mut event = Event::new(invocation_id.to_string());
769                event.author = agent_name.to_string();
770                event.llm_response.content = Some(Content { role: "model".to_string(), parts });
771                event.llm_response.partial = !artifact_event.last_chunk.unwrap_or(true);
772                Some(event)
773            }
774            StreamResponse::StatusUpdate(status_event) => {
775                // In v1, the message field on TaskStatus is a Message object
776                // (not a String like in v0.3). Extract text from its parts.
777                let is_terminal = matches!(
778                    status_event.status.state,
779                    a2a_protocol_types::task::TaskState::Completed
780                        | a2a_protocol_types::task::TaskState::Failed
781                        | a2a_protocol_types::task::TaskState::Canceled
782                        | a2a_protocol_types::task::TaskState::Rejected
783                );
784
785                if let Some(ref msg) = status_event.status.message {
786                    use a2a_protocol_types::PartContent;
787                    let text_parts: Vec<Part> = msg
788                        .parts
789                        .iter()
790                        .filter_map(|p| match &p.content {
791                            PartContent::Text(text) => Some(Part::Text { text: text.clone() }),
792                            _ => None,
793                        })
794                        .collect();
795
796                    if !text_parts.is_empty() {
797                        let mut event = Event::new(invocation_id.to_string());
798                        event.author = agent_name.to_string();
799                        event.llm_response.content =
800                            Some(Content { role: "model".to_string(), parts: text_parts });
801                        event.llm_response.turn_complete = is_terminal;
802                        return Some(event);
803                    }
804                }
805
806                // For terminal states without a message, emit a turn-complete event
807                if is_terminal {
808                    let mut event = Event::new(invocation_id.to_string());
809                    event.author = agent_name.to_string();
810                    event.llm_response.turn_complete = true;
811                    return Some(event);
812                }
813
814                None
815            }
816            // Task and Message variants — emit text if available
817            _ => None,
818        }
819    }
820
821    /// Creates an error event for the v1 remote agent.
822    fn create_v1_error_event(invocation_id: &str, agent_name: &str, error: &str) -> Event {
823        let mut event = Event::new(invocation_id.to_string());
824        event.author = agent_name.to_string();
825        event.llm_response.error_message = Some(error.to_string());
826        event.llm_response.turn_complete = true;
827        event
828    }
829
830    #[cfg(test)]
831    mod tests {
832        use super::*;
833        use a2a_protocol_types::{AgentCapabilities, AgentInterface, AgentSkill};
834
835        fn make_test_card() -> AgentCard {
836            AgentCard {
837                name: "test-v1-agent".to_string(),
838                url: Some("http://localhost:9999".to_string()),
839                description: "A test v1 agent".to_string(),
840                version: "1.0.0".to_string(),
841                supported_interfaces: vec![
842                    AgentInterface {
843                        url: "http://localhost:9999/a2a".to_string(),
844                        protocol_binding: "JSONRPC".to_string(),
845                        protocol_version: "1.0".to_string(),
846                        tenant: None,
847                    },
848                    AgentInterface {
849                        url: "http://localhost:9999/rest".to_string(),
850                        protocol_binding: "HTTP+JSON".to_string(),
851                        protocol_version: "1.0".to_string(),
852                        tenant: None,
853                    },
854                ],
855                default_input_modes: vec!["text/plain".to_string()],
856                default_output_modes: vec!["text/plain".to_string()],
857                skills: vec![AgentSkill {
858                    id: "echo".to_string(),
859                    name: "Echo".to_string(),
860                    description: "Echoes input".to_string(),
861                    tags: vec![],
862                    examples: None,
863                    input_modes: None,
864                    output_modes: None,
865                    security_requirements: None,
866                }],
867                capabilities: AgentCapabilities::default(),
868                provider: None,
869                icon_url: None,
870                documentation_url: None,
871                security_schemes: None,
872                security_requirements: None,
873                signatures: None,
874            }
875        }
876
877        #[test]
878        fn select_interface_prefers_jsonrpc() {
879            let card = make_test_card();
880            let selected = RemoteA2aV1Agent::select_interface(&card).unwrap();
881            assert_eq!(selected.protocol_binding, "JSONRPC");
882            assert_eq!(selected.url, "http://localhost:9999/a2a");
883        }
884
885        #[test]
886        fn select_interface_falls_back_to_http_json() {
887            let mut card = make_test_card();
888            card.supported_interfaces.retain(|i| i.protocol_binding != "JSONRPC");
889            let selected = RemoteA2aV1Agent::select_interface(&card).unwrap();
890            assert_eq!(selected.protocol_binding, "HTTP+JSON");
891            assert_eq!(selected.url, "http://localhost:9999/rest");
892        }
893
894        #[test]
895        fn select_interface_returns_none_for_unsupported() {
896            let mut card = make_test_card();
897            card.supported_interfaces = vec![AgentInterface {
898                url: "grpc://localhost:9999".to_string(),
899                protocol_binding: "GRPC".to_string(),
900                protocol_version: "1.0".to_string(),
901                tenant: None,
902            }];
903            assert!(RemoteA2aV1Agent::select_interface(&card).is_none());
904        }
905
906        #[test]
907        fn select_interface_returns_none_for_empty() {
908            let mut card = make_test_card();
909            card.supported_interfaces = vec![];
910            assert!(RemoteA2aV1Agent::select_interface(&card).is_none());
911        }
912
913        #[test]
914        fn new_agent_stores_config() {
915            let card = make_test_card();
916            let agent = RemoteA2aV1Agent::new(RemoteA2aV1Config {
917                name: "my-agent".to_string(),
918                description: "My remote agent".to_string(),
919                agent_card: card,
920            });
921            assert_eq!(agent.name(), "my-agent");
922            assert_eq!(agent.description(), "My remote agent");
923        }
924
925        #[test]
926        fn agent_has_no_sub_agents() {
927            let card = make_test_card();
928            let agent = RemoteA2aV1Agent::new(RemoteA2aV1Config {
929                name: "test".to_string(),
930                description: "test".to_string(),
931                agent_card: card,
932            });
933            assert!(agent.sub_agents().is_empty());
934        }
935
936        #[test]
937        fn build_v1_message_with_content() {
938            let msg = build_v1_message(Some("hello".to_string()));
939            assert_eq!(msg.role, a2a_protocol_types::MessageRole::User);
940            assert_eq!(msg.parts.len(), 1);
941            assert_eq!(msg.parts[0].text_content(), Some("hello"));
942        }
943
944        #[test]
945        fn build_v1_message_without_content() {
946            let msg = build_v1_message(None);
947            assert_eq!(msg.parts[0].text_content(), Some(""));
948        }
949
950        #[test]
951        fn parse_sse_data_line_extracts_data() {
952            let event = "event: message\ndata: {\"test\": true}\n";
953            assert_eq!(parse_sse_data_line(event), Some("{\"test\": true}".to_string()));
954        }
955
956        #[test]
957        fn parse_sse_data_line_returns_none_without_data() {
958            let event = "event: ping\n";
959            assert!(parse_sse_data_line(event).is_none());
960        }
961
962        #[test]
963        fn convert_status_update_with_message() {
964            use a2a_protocol_types::events::TaskStatusUpdateEvent;
965            use a2a_protocol_types::task::{ContextId, TaskId, TaskState, TaskStatus};
966
967            let mut status = TaskStatus::new(TaskState::Completed);
968            status.message = Some(a2a_protocol_types::Message {
969                id: a2a_protocol_types::MessageId::new("msg-1"),
970                role: a2a_protocol_types::MessageRole::Agent,
971                parts: vec![a2a_protocol_types::Part::text("done!")],
972                task_id: None,
973                context_id: None,
974                reference_task_ids: None,
975                extensions: None,
976                metadata: None,
977            });
978
979            let status_event = TaskStatusUpdateEvent {
980                task_id: TaskId::new("task-1"),
981                context_id: ContextId::new("ctx-1"),
982                status,
983                metadata: None,
984            };
985
986            let resp = a2a_protocol_types::events::StreamResponse::StatusUpdate(status_event);
987            let event = convert_stream_response(&resp, "inv-1", "agent-1").unwrap();
988
989            assert_eq!(event.author, "agent-1");
990            assert!(event.llm_response.turn_complete);
991            let content = event.llm_response.content.unwrap();
992            assert_eq!(content.parts.len(), 1);
993            match &content.parts[0] {
994                Part::Text { text } => assert_eq!(text, "done!"),
995                _ => panic!("expected text part"),
996            }
997        }
998
999        #[test]
1000        fn convert_status_update_terminal_without_message() {
1001            use a2a_protocol_types::events::TaskStatusUpdateEvent;
1002            use a2a_protocol_types::task::{ContextId, TaskId, TaskState, TaskStatus};
1003
1004            let status_event = TaskStatusUpdateEvent {
1005                task_id: TaskId::new("task-1"),
1006                context_id: ContextId::new("ctx-1"),
1007                status: TaskStatus::new(TaskState::Failed),
1008                metadata: None,
1009            };
1010
1011            let resp = a2a_protocol_types::events::StreamResponse::StatusUpdate(status_event);
1012            let event = convert_stream_response(&resp, "inv-1", "agent-1").unwrap();
1013
1014            assert!(event.llm_response.turn_complete);
1015            assert!(event.llm_response.content.is_none());
1016        }
1017
1018        #[test]
1019        fn convert_status_update_non_terminal_without_message() {
1020            use a2a_protocol_types::events::TaskStatusUpdateEvent;
1021            use a2a_protocol_types::task::{ContextId, TaskId, TaskState, TaskStatus};
1022
1023            let status_event = TaskStatusUpdateEvent {
1024                task_id: TaskId::new("task-1"),
1025                context_id: ContextId::new("ctx-1"),
1026                status: TaskStatus::new(TaskState::Working),
1027                metadata: None,
1028            };
1029
1030            let resp = a2a_protocol_types::events::StreamResponse::StatusUpdate(status_event);
1031            let result = convert_stream_response(&resp, "inv-1", "agent-1");
1032
1033            // Non-terminal without message produces no event
1034            assert!(result.is_none());
1035        }
1036
1037        #[test]
1038        fn convert_artifact_update_with_text() {
1039            use a2a_protocol_types::artifact::{Artifact, ArtifactId};
1040            use a2a_protocol_types::events::TaskArtifactUpdateEvent;
1041            use a2a_protocol_types::task::{ContextId, TaskId};
1042
1043            let artifact_event = TaskArtifactUpdateEvent {
1044                task_id: TaskId::new("task-1"),
1045                context_id: ContextId::new("ctx-1"),
1046                artifact: Artifact {
1047                    id: ArtifactId::new("art-1"),
1048                    name: Some("result".to_string()),
1049                    description: None,
1050                    parts: vec![a2a_protocol_types::Part::text("artifact content")],
1051                    extensions: None,
1052                    metadata: None,
1053                },
1054                append: None,
1055                last_chunk: Some(true),
1056                metadata: None,
1057            };
1058
1059            let resp = a2a_protocol_types::events::StreamResponse::ArtifactUpdate(artifact_event);
1060            let event = convert_stream_response(&resp, "inv-1", "agent-1").unwrap();
1061
1062            assert_eq!(event.author, "agent-1");
1063            let content = event.llm_response.content.unwrap();
1064            assert_eq!(content.parts.len(), 1);
1065            match &content.parts[0] {
1066                Part::Text { text } => assert_eq!(text, "artifact content"),
1067                _ => panic!("expected text part"),
1068            }
1069            // last_chunk=true means partial=false
1070            assert!(!event.llm_response.partial);
1071        }
1072
1073        #[test]
1074        fn convert_artifact_update_partial() {
1075            use a2a_protocol_types::artifact::{Artifact, ArtifactId};
1076            use a2a_protocol_types::events::TaskArtifactUpdateEvent;
1077            use a2a_protocol_types::task::{ContextId, TaskId};
1078
1079            let artifact_event = TaskArtifactUpdateEvent {
1080                task_id: TaskId::new("task-1"),
1081                context_id: ContextId::new("ctx-1"),
1082                artifact: Artifact {
1083                    id: ArtifactId::new("art-1"),
1084                    name: None,
1085                    description: None,
1086                    parts: vec![a2a_protocol_types::Part::text("partial...")],
1087                    extensions: None,
1088                    metadata: None,
1089                },
1090                append: None,
1091                last_chunk: Some(false),
1092                metadata: None,
1093            };
1094
1095            let resp = a2a_protocol_types::events::StreamResponse::ArtifactUpdate(artifact_event);
1096            let event = convert_stream_response(&resp, "inv-1", "agent-1").unwrap();
1097
1098            assert!(event.llm_response.partial);
1099        }
1100
1101        #[test]
1102        fn create_v1_error_event_sets_fields() {
1103            let event = create_v1_error_event("inv-1", "agent-1", "something broke");
1104            assert_eq!(event.author, "agent-1");
1105            assert_eq!(event.llm_response.error_message.as_deref(), Some("something broke"));
1106            assert!(event.llm_response.turn_complete);
1107        }
1108    }
1109}