// adk_server/a2a/remote_agent.rs

1use crate::a2a::{A2aClient, Part as A2aPart, Role, UpdateEvent};
2use adk_core::{Agent, Content, Event, EventStream, InvocationContext, Part, Result};
3use async_trait::async_trait;
4use std::sync::Arc;
5
/// Configuration for a remote A2A agent.
///
/// Derives `Debug` so the configuration can be logged and used in assertions.
#[derive(Debug, Clone)]
pub struct RemoteA2aConfig {
    /// Name of the agent.
    pub name: String,
    /// Description of the agent.
    pub description: String,
    /// Base URL of the remote agent (e.g., "http://localhost:8080").
    /// The agent card will be fetched from {base_url}/.well-known/agent.json
    pub agent_url: String,
}
17
18/// An agent that communicates with a remote A2A agent
19pub struct RemoteA2aAgent {
20    config: RemoteA2aConfig,
21}
22
23impl RemoteA2aAgent {
24    pub fn new(config: RemoteA2aConfig) -> Self {
25        Self { config }
26    }
27
28    pub fn builder(name: impl Into<String>) -> RemoteA2aAgentBuilder {
29        RemoteA2aAgentBuilder::new(name)
30    }
31}
32
33#[async_trait]
34impl Agent for RemoteA2aAgent {
35    fn name(&self) -> &str {
36        &self.config.name
37    }
38
39    fn description(&self) -> &str {
40        &self.config.description
41    }
42
43    fn sub_agents(&self) -> &[Arc<dyn Agent>] {
44        &[]
45    }
46
47    async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<EventStream> {
48        let url = self.config.agent_url.clone();
49        let invocation_id = ctx.invocation_id().to_string();
50        let agent_name = self.config.name.clone();
51
52        // Get user content from context
53        let user_content = get_user_content_from_context(ctx.as_ref());
54
55        let stream = async_stream::stream! {
56            // Create A2A client
57            let client = match A2aClient::from_url(&url).await {
58                Ok(c) => c,
59                Err(e) => {
60                    yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
61                    return;
62                }
63            };
64
65            // Build message from user content
66            let message = build_a2a_message(user_content);
67
68            // Send streaming message
69            match client.send_streaming_message(message).await {
70                Ok(mut event_stream) => {
71                    use futures::StreamExt;
72                    while let Some(result) = event_stream.next().await {
73                        match result {
74                            Ok(update_event) => {
75                                if let Some(event) = convert_update_event(&invocation_id, &agent_name, update_event) {
76                                    yield Ok(event);
77                                }
78                            }
79                            Err(e) => {
80                                yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
81                                return;
82                            }
83                        }
84                    }
85                }
86                Err(e) => {
87                    yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
88                }
89            }
90        };
91
92        Ok(Box::pin(stream))
93    }
94}
95
/// Step-by-step constructor for [`RemoteA2aAgent`].
pub struct RemoteA2aAgentBuilder {
    /// Agent name supplied to [`RemoteA2aAgentBuilder::new`].
    name: String,
    /// Agent description; empty until set.
    description: String,
    /// Remote agent URL; must be set before [`RemoteA2aAgentBuilder::build`].
    agent_url: Option<String>,
}
102
103impl RemoteA2aAgentBuilder {
104    pub fn new(name: impl Into<String>) -> Self {
105        Self { name: name.into(), description: String::new(), agent_url: None }
106    }
107
108    pub fn description(mut self, description: impl Into<String>) -> Self {
109        self.description = description.into();
110        self
111    }
112
113    pub fn agent_url(mut self, url: impl Into<String>) -> Self {
114        self.agent_url = Some(url.into());
115        self
116    }
117
118    pub fn build(self) -> Result<RemoteA2aAgent> {
119        let agent_url = self
120            .agent_url
121            .ok_or_else(|| adk_core::AdkError::agent("RemoteA2aAgent requires agent_url"))?;
122
123        Ok(RemoteA2aAgent::new(RemoteA2aConfig {
124            name: self.name,
125            description: self.description,
126            agent_url,
127        }))
128    }
129}
130
131// Helper functions
132
133fn get_user_content_from_context(ctx: &dyn InvocationContext) -> Option<String> {
134    let content = ctx.user_content();
135    for part in &content.parts {
136        if let Part::Text { text } = part {
137            return Some(text.clone());
138        }
139    }
140    None
141}
142
143fn build_a2a_message(content: Option<String>) -> crate::a2a::Message {
144    let text = content.unwrap_or_default();
145    crate::a2a::Message::builder()
146        .role(Role::User)
147        .parts(vec![A2aPart::text(text)])
148        .message_id(uuid::Uuid::new_v4().to_string())
149        .build()
150}
151
152fn convert_update_event(
153    invocation_id: &str,
154    agent_name: &str,
155    update: UpdateEvent,
156) -> Option<Event> {
157    match update {
158        UpdateEvent::TaskArtifactUpdate(artifact_event) => {
159            let parts: Vec<Part> = artifact_event
160                .artifact
161                .parts
162                .iter()
163                .filter_map(|p| match p {
164                    A2aPart::Text { text, .. } => Some(Part::Text { text: text.clone() }),
165                    _ => None,
166                })
167                .collect();
168
169            if parts.is_empty() {
170                return None;
171            }
172
173            let mut event = Event::new(invocation_id.to_string());
174            event.author = agent_name.to_string();
175            event.llm_response.content = Some(Content { role: "model".to_string(), parts });
176            event.llm_response.partial = !artifact_event.last_chunk;
177            Some(event)
178        }
179        UpdateEvent::TaskStatusUpdate(status_event) => {
180            // Only create event for final status updates with messages
181            if status_event.final_update {
182                if let Some(msg) = status_event.status.message {
183                    let mut event = Event::new(invocation_id.to_string());
184                    event.author = agent_name.to_string();
185                    event.llm_response.content = Some(Content {
186                        role: "model".to_string(),
187                        parts: vec![Part::Text { text: msg }],
188                    });
189                    event.llm_response.turn_complete = true;
190                    return Some(event);
191                }
192            }
193            None
194        }
195    }
196}
197
198fn create_error_event(invocation_id: &str, agent_name: &str, error: &str) -> Event {
199    let mut event = Event::new(invocation_id.to_string());
200    event.author = agent_name.to_string();
201    event.llm_response.error_message = Some(error.to_string());
202    event.llm_response.turn_complete = true;
203    event
204}
205
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully specified builder produces an agent exposing its name and description.
    #[test]
    fn test_builder() {
        let built = RemoteA2aAgent::builder("test")
            .description("Test agent")
            .agent_url("http://localhost:8080")
            .build();
        let agent = built.unwrap();

        assert_eq!(agent.name(), "test");
        assert_eq!(agent.description(), "Test agent");
    }

    /// Building without a URL is rejected.
    #[test]
    fn test_builder_missing_url() {
        assert!(RemoteA2aAgent::builder("test").build().is_err());
    }
}
228
229// ── A2A v1.0.0 Remote Agent ─────────────────────────────────────────────────
230
231#[cfg(feature = "a2a-v1")]
232pub mod v1_remote {
233    //! V1.0.0 remote agent wrapper.
234    //!
235    //! Implements the ADK [`Agent`] trait by communicating with a remote A2A
236    //! v1.0.0 agent. Parses the v1 [`AgentCard`] structure including
237    //! `supportedInterfaces`, selects the best interface URL (preferring
238    //! JSONRPC over HTTP+JSON), and uses [`A2aV1Client`] which sends the
239    //! `A2A-Version: 1.0` header on all requests.
240
241    use crate::a2a::client::v1_client::A2aV1Client;
242    use a2a_protocol_types::{AgentCard, AgentInterface};
243    use adk_core::{Agent, Content, Event, EventStream, InvocationContext, Part, Result};
244    use async_trait::async_trait;
245    use std::sync::Arc;
246
247    /// Configuration for a v1.0.0 remote A2A agent.
248    #[derive(Clone)]
249    pub struct RemoteA2aV1Config {
250        /// Name of the agent.
251        pub name: String,
252        /// Description of the agent.
253        pub description: String,
254        /// The v1.0.0 agent card describing the remote agent.
255        pub agent_card: AgentCard,
256    }
257
258    /// An agent that communicates with a remote A2A v1.0.0 agent.
259    ///
260    /// Selects the best interface URL from the agent card's
261    /// `supportedInterfaces` (preferring JSONRPC, falling back to HTTP+JSON)
262    /// and delegates to [`A2aV1Client`] for protocol-level communication.
263    pub struct RemoteA2aV1Agent {
264        config: RemoteA2aV1Config,
265    }
266
267    impl RemoteA2aV1Agent {
268        /// Creates a new v1 remote agent from the given configuration.
269        pub fn new(config: RemoteA2aV1Config) -> Self {
270            Self { config }
271        }
272
273        /// Selects the best interface from the agent card.
274        ///
275        /// Prefers JSONRPC, falls back to HTTP+JSON.
276        pub fn select_interface(card: &AgentCard) -> Option<&AgentInterface> {
277            card.supported_interfaces.iter().find(|i| i.protocol_binding == "JSONRPC").or_else(
278                || card.supported_interfaces.iter().find(|i| i.protocol_binding == "HTTP+JSON"),
279            )
280        }
281    }
282
283    #[async_trait]
284    impl Agent for RemoteA2aV1Agent {
285        fn name(&self) -> &str {
286            &self.config.name
287        }
288
289        fn description(&self) -> &str {
290            &self.config.description
291        }
292
293        fn sub_agents(&self) -> &[Arc<dyn Agent>] {
294            &[]
295        }
296
297        async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<EventStream> {
298            let card = self.config.agent_card.clone();
299            let invocation_id = ctx.invocation_id().to_string();
300            let agent_name = self.config.name.clone();
301
302            // Get user content from context
303            let user_content = extract_user_text(ctx.as_ref());
304
305            let stream = async_stream::stream! {
306                // Verify we have a usable interface
307                let interface = match Self::select_interface(&card) {
308                    Some(i) => i.clone(),
309                    None => {
310                        yield Ok(create_v1_error_event(
311                            &invocation_id,
312                            &agent_name,
313                            "no supported interface found in agent card (need JSONRPC or HTTP+JSON)",
314                        ));
315                        return;
316                    }
317                };
318
319                // Build a card with the selected interface for the client
320                let client = A2aV1Client::new(card.clone());
321
322                // Build a v1 Message from user content
323                let message = build_v1_message(user_content);
324
325                // Send streaming message and process the SSE response
326                match client.send_streaming_message(message).await {
327                    Ok(response) => {
328                        use futures::StreamExt;
329
330                        let mut bytes_stream = response.bytes_stream();
331                        let mut buffer = String::new();
332
333                        while let Some(chunk_result) = bytes_stream.next().await {
334                            let chunk = match chunk_result {
335                                Ok(c) => c,
336                                Err(e) => {
337                                    yield Ok(create_v1_error_event(
338                                        &invocation_id,
339                                        &agent_name,
340                                        &format!("stream error: {e}"),
341                                    ));
342                                    break;
343                                }
344                            };
345
346                            buffer.push_str(&String::from_utf8_lossy(&chunk));
347
348                            // Process complete SSE events (delimited by \n\n)
349                            while let Some(event_end) = buffer.find("\n\n") {
350                                let event_data = buffer[..event_end].to_string();
351                                buffer = buffer[event_end + 2..].to_string();
352
353                                if let Some(data) = parse_sse_data_line(&event_data) {
354                                    if data.is_empty() {
355                                        continue;
356                                    }
357
358                                    // Parse as StreamResponse (may be wrapped in JSON-RPC or direct)
359                                    if let Some(event) = parse_stream_response(
360                                        &data,
361                                        &invocation_id,
362                                        &agent_name,
363                                    ) {
364                                        yield Ok(event);
365                                    }
366                                }
367                            }
368                        }
369                    }
370                    Err(e) => {
371                        yield Ok(create_v1_error_event(
372                            &invocation_id,
373                            &agent_name,
374                            &format!("failed to send streaming message: {e}"),
375                        ));
376                    }
377                }
378
379                let _ = interface;
380            };
381
382            Ok(Box::pin(stream))
383        }
384    }
385
386    /// Extracts user text from the invocation context.
387    fn extract_user_text(ctx: &dyn InvocationContext) -> Option<String> {
388        let content = ctx.user_content();
389        for part in &content.parts {
390            if let Part::Text { text } = part {
391                return Some(text.clone());
392            }
393        }
394        None
395    }
396
397    /// Builds a v1 `Message` from user text content.
398    fn build_v1_message(content: Option<String>) -> a2a_protocol_types::Message {
399        let text = content.unwrap_or_default();
400        a2a_protocol_types::Message {
401            id: a2a_protocol_types::MessageId::new(uuid::Uuid::new_v4().to_string()),
402            role: a2a_protocol_types::MessageRole::User,
403            parts: vec![a2a_protocol_types::Part::text(text)],
404            task_id: None,
405            context_id: None,
406            reference_task_ids: None,
407            extensions: None,
408            metadata: None,
409        }
410    }
411
412    /// Parses the `data:` field from an SSE event line.
413    fn parse_sse_data_line(event: &str) -> Option<String> {
414        for line in event.lines() {
415            if let Some(data) = line.strip_prefix("data:") {
416                return Some(data.trim().to_string());
417            }
418        }
419        None
420    }
421
422    /// Attempts to parse an SSE data payload as a StreamResponse (either
423    /// direct JSON or wrapped in a JSON-RPC response) and converts it to
424    /// an ADK Event.
425    fn parse_stream_response(data: &str, invocation_id: &str, agent_name: &str) -> Option<Event> {
426        use a2a_protocol_types::events::StreamResponse;
427
428        // Try direct StreamResponse first (REST binding)
429        if let Ok(stream_resp) = serde_json::from_str::<StreamResponse>(data) {
430            return convert_stream_response(&stream_resp, invocation_id, agent_name);
431        }
432
433        // Try JSON-RPC wrapped response
434        if let Ok(rpc_value) = serde_json::from_str::<serde_json::Value>(data) {
435            if let Some(result) = rpc_value.get("result") {
436                if let Ok(stream_resp) = serde_json::from_value::<StreamResponse>(result.clone()) {
437                    return convert_stream_response(&stream_resp, invocation_id, agent_name);
438                }
439            }
440            // Check for JSON-RPC error
441            if let Some(error) = rpc_value.get("error") {
442                let message =
443                    error.get("message").and_then(|m| m.as_str()).unwrap_or("unknown error");
444                let code = error.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
445                return Some(create_v1_error_event(
446                    invocation_id,
447                    agent_name,
448                    &format!("RPC error {code}: {message}"),
449                ));
450            }
451        }
452
453        tracing::debug!("failed to parse SSE data as StreamResponse: {data}");
454        None
455    }
456
457    /// Converts a `StreamResponse` into an ADK `Event`.
458    fn convert_stream_response(
459        resp: &a2a_protocol_types::events::StreamResponse,
460        invocation_id: &str,
461        agent_name: &str,
462    ) -> Option<Event> {
463        use a2a_protocol_types::events::StreamResponse;
464
465        match resp {
466            StreamResponse::ArtifactUpdate(artifact_event) => {
467                use a2a_protocol_types::PartContent;
468                let parts: Vec<Part> = artifact_event
469                    .artifact
470                    .parts
471                    .iter()
472                    .filter_map(|p| match &p.content {
473                        PartContent::Text(text) => Some(Part::Text { text: text.clone() }),
474                        _ => None,
475                    })
476                    .collect();
477
478                if parts.is_empty() {
479                    return None;
480                }
481
482                let mut event = Event::new(invocation_id.to_string());
483                event.author = agent_name.to_string();
484                event.llm_response.content = Some(Content { role: "model".to_string(), parts });
485                event.llm_response.partial = !artifact_event.last_chunk.unwrap_or(true);
486                Some(event)
487            }
488            StreamResponse::StatusUpdate(status_event) => {
489                // In v1, the message field on TaskStatus is a Message object
490                // (not a String like in v0.3). Extract text from its parts.
491                let is_terminal = matches!(
492                    status_event.status.state,
493                    a2a_protocol_types::task::TaskState::Completed
494                        | a2a_protocol_types::task::TaskState::Failed
495                        | a2a_protocol_types::task::TaskState::Canceled
496                        | a2a_protocol_types::task::TaskState::Rejected
497                );
498
499                if let Some(ref msg) = status_event.status.message {
500                    use a2a_protocol_types::PartContent;
501                    let text_parts: Vec<Part> = msg
502                        .parts
503                        .iter()
504                        .filter_map(|p| match &p.content {
505                            PartContent::Text(text) => Some(Part::Text { text: text.clone() }),
506                            _ => None,
507                        })
508                        .collect();
509
510                    if !text_parts.is_empty() {
511                        let mut event = Event::new(invocation_id.to_string());
512                        event.author = agent_name.to_string();
513                        event.llm_response.content =
514                            Some(Content { role: "model".to_string(), parts: text_parts });
515                        event.llm_response.turn_complete = is_terminal;
516                        return Some(event);
517                    }
518                }
519
520                // For terminal states without a message, emit a turn-complete event
521                if is_terminal {
522                    let mut event = Event::new(invocation_id.to_string());
523                    event.author = agent_name.to_string();
524                    event.llm_response.turn_complete = true;
525                    return Some(event);
526                }
527
528                None
529            }
530            // Task and Message variants — emit text if available
531            _ => None,
532        }
533    }
534
535    /// Creates an error event for the v1 remote agent.
536    fn create_v1_error_event(invocation_id: &str, agent_name: &str, error: &str) -> Event {
537        let mut event = Event::new(invocation_id.to_string());
538        event.author = agent_name.to_string();
539        event.llm_response.error_message = Some(error.to_string());
540        event.llm_response.turn_complete = true;
541        event
542    }
543
544    #[cfg(test)]
545    mod tests {
546        use super::*;
547        use a2a_protocol_types::{AgentCapabilities, AgentInterface, AgentSkill};
548
549        fn make_test_card() -> AgentCard {
550            AgentCard {
551                name: "test-v1-agent".to_string(),
552                url: Some("http://localhost:9999".to_string()),
553                description: "A test v1 agent".to_string(),
554                version: "1.0.0".to_string(),
555                supported_interfaces: vec![
556                    AgentInterface {
557                        url: "http://localhost:9999/a2a".to_string(),
558                        protocol_binding: "JSONRPC".to_string(),
559                        protocol_version: "1.0".to_string(),
560                        tenant: None,
561                    },
562                    AgentInterface {
563                        url: "http://localhost:9999/rest".to_string(),
564                        protocol_binding: "HTTP+JSON".to_string(),
565                        protocol_version: "1.0".to_string(),
566                        tenant: None,
567                    },
568                ],
569                default_input_modes: vec!["text/plain".to_string()],
570                default_output_modes: vec!["text/plain".to_string()],
571                skills: vec![AgentSkill {
572                    id: "echo".to_string(),
573                    name: "Echo".to_string(),
574                    description: "Echoes input".to_string(),
575                    tags: vec![],
576                    examples: None,
577                    input_modes: None,
578                    output_modes: None,
579                    security_requirements: None,
580                }],
581                capabilities: AgentCapabilities::default(),
582                provider: None,
583                icon_url: None,
584                documentation_url: None,
585                security_schemes: None,
586                security_requirements: None,
587                signatures: None,
588            }
589        }
590
591        #[test]
592        fn select_interface_prefers_jsonrpc() {
593            let card = make_test_card();
594            let selected = RemoteA2aV1Agent::select_interface(&card).unwrap();
595            assert_eq!(selected.protocol_binding, "JSONRPC");
596            assert_eq!(selected.url, "http://localhost:9999/a2a");
597        }
598
599        #[test]
600        fn select_interface_falls_back_to_http_json() {
601            let mut card = make_test_card();
602            card.supported_interfaces.retain(|i| i.protocol_binding != "JSONRPC");
603            let selected = RemoteA2aV1Agent::select_interface(&card).unwrap();
604            assert_eq!(selected.protocol_binding, "HTTP+JSON");
605            assert_eq!(selected.url, "http://localhost:9999/rest");
606        }
607
608        #[test]
609        fn select_interface_returns_none_for_unsupported() {
610            let mut card = make_test_card();
611            card.supported_interfaces = vec![AgentInterface {
612                url: "grpc://localhost:9999".to_string(),
613                protocol_binding: "GRPC".to_string(),
614                protocol_version: "1.0".to_string(),
615                tenant: None,
616            }];
617            assert!(RemoteA2aV1Agent::select_interface(&card).is_none());
618        }
619
620        #[test]
621        fn select_interface_returns_none_for_empty() {
622            let mut card = make_test_card();
623            card.supported_interfaces = vec![];
624            assert!(RemoteA2aV1Agent::select_interface(&card).is_none());
625        }
626
627        #[test]
628        fn new_agent_stores_config() {
629            let card = make_test_card();
630            let agent = RemoteA2aV1Agent::new(RemoteA2aV1Config {
631                name: "my-agent".to_string(),
632                description: "My remote agent".to_string(),
633                agent_card: card,
634            });
635            assert_eq!(agent.name(), "my-agent");
636            assert_eq!(agent.description(), "My remote agent");
637        }
638
639        #[test]
640        fn agent_has_no_sub_agents() {
641            let card = make_test_card();
642            let agent = RemoteA2aV1Agent::new(RemoteA2aV1Config {
643                name: "test".to_string(),
644                description: "test".to_string(),
645                agent_card: card,
646            });
647            assert!(agent.sub_agents().is_empty());
648        }
649
650        #[test]
651        fn build_v1_message_with_content() {
652            let msg = build_v1_message(Some("hello".to_string()));
653            assert_eq!(msg.role, a2a_protocol_types::MessageRole::User);
654            assert_eq!(msg.parts.len(), 1);
655            assert_eq!(msg.parts[0].text_content(), Some("hello"));
656        }
657
658        #[test]
659        fn build_v1_message_without_content() {
660            let msg = build_v1_message(None);
661            assert_eq!(msg.parts[0].text_content(), Some(""));
662        }
663
664        #[test]
665        fn parse_sse_data_line_extracts_data() {
666            let event = "event: message\ndata: {\"test\": true}\n";
667            assert_eq!(parse_sse_data_line(event), Some("{\"test\": true}".to_string()));
668        }
669
670        #[test]
671        fn parse_sse_data_line_returns_none_without_data() {
672            let event = "event: ping\n";
673            assert!(parse_sse_data_line(event).is_none());
674        }
675
676        #[test]
677        fn convert_status_update_with_message() {
678            use a2a_protocol_types::events::TaskStatusUpdateEvent;
679            use a2a_protocol_types::task::{ContextId, TaskId, TaskState, TaskStatus};
680
681            let mut status = TaskStatus::new(TaskState::Completed);
682            status.message = Some(a2a_protocol_types::Message {
683                id: a2a_protocol_types::MessageId::new("msg-1"),
684                role: a2a_protocol_types::MessageRole::Agent,
685                parts: vec![a2a_protocol_types::Part::text("done!")],
686                task_id: None,
687                context_id: None,
688                reference_task_ids: None,
689                extensions: None,
690                metadata: None,
691            });
692
693            let status_event = TaskStatusUpdateEvent {
694                task_id: TaskId::new("task-1"),
695                context_id: ContextId::new("ctx-1"),
696                status,
697                metadata: None,
698            };
699
700            let resp = a2a_protocol_types::events::StreamResponse::StatusUpdate(status_event);
701            let event = convert_stream_response(&resp, "inv-1", "agent-1").unwrap();
702
703            assert_eq!(event.author, "agent-1");
704            assert!(event.llm_response.turn_complete);
705            let content = event.llm_response.content.unwrap();
706            assert_eq!(content.parts.len(), 1);
707            match &content.parts[0] {
708                Part::Text { text } => assert_eq!(text, "done!"),
709                _ => panic!("expected text part"),
710            }
711        }
712
713        #[test]
714        fn convert_status_update_terminal_without_message() {
715            use a2a_protocol_types::events::TaskStatusUpdateEvent;
716            use a2a_protocol_types::task::{ContextId, TaskId, TaskState, TaskStatus};
717
718            let status_event = TaskStatusUpdateEvent {
719                task_id: TaskId::new("task-1"),
720                context_id: ContextId::new("ctx-1"),
721                status: TaskStatus::new(TaskState::Failed),
722                metadata: None,
723            };
724
725            let resp = a2a_protocol_types::events::StreamResponse::StatusUpdate(status_event);
726            let event = convert_stream_response(&resp, "inv-1", "agent-1").unwrap();
727
728            assert!(event.llm_response.turn_complete);
729            assert!(event.llm_response.content.is_none());
730        }
731
732        #[test]
733        fn convert_status_update_non_terminal_without_message() {
734            use a2a_protocol_types::events::TaskStatusUpdateEvent;
735            use a2a_protocol_types::task::{ContextId, TaskId, TaskState, TaskStatus};
736
737            let status_event = TaskStatusUpdateEvent {
738                task_id: TaskId::new("task-1"),
739                context_id: ContextId::new("ctx-1"),
740                status: TaskStatus::new(TaskState::Working),
741                metadata: None,
742            };
743
744            let resp = a2a_protocol_types::events::StreamResponse::StatusUpdate(status_event);
745            let result = convert_stream_response(&resp, "inv-1", "agent-1");
746
747            // Non-terminal without message produces no event
748            assert!(result.is_none());
749        }
750
751        #[test]
752        fn convert_artifact_update_with_text() {
753            use a2a_protocol_types::artifact::{Artifact, ArtifactId};
754            use a2a_protocol_types::events::TaskArtifactUpdateEvent;
755            use a2a_protocol_types::task::{ContextId, TaskId};
756
757            let artifact_event = TaskArtifactUpdateEvent {
758                task_id: TaskId::new("task-1"),
759                context_id: ContextId::new("ctx-1"),
760                artifact: Artifact {
761                    id: ArtifactId::new("art-1"),
762                    name: Some("result".to_string()),
763                    description: None,
764                    parts: vec![a2a_protocol_types::Part::text("artifact content")],
765                    extensions: None,
766                    metadata: None,
767                },
768                append: None,
769                last_chunk: Some(true),
770                metadata: None,
771            };
772
773            let resp = a2a_protocol_types::events::StreamResponse::ArtifactUpdate(artifact_event);
774            let event = convert_stream_response(&resp, "inv-1", "agent-1").unwrap();
775
776            assert_eq!(event.author, "agent-1");
777            let content = event.llm_response.content.unwrap();
778            assert_eq!(content.parts.len(), 1);
779            match &content.parts[0] {
780                Part::Text { text } => assert_eq!(text, "artifact content"),
781                _ => panic!("expected text part"),
782            }
783            // last_chunk=true means partial=false
784            assert!(!event.llm_response.partial);
785        }
786
787        #[test]
788        fn convert_artifact_update_partial() {
789            use a2a_protocol_types::artifact::{Artifact, ArtifactId};
790            use a2a_protocol_types::events::TaskArtifactUpdateEvent;
791            use a2a_protocol_types::task::{ContextId, TaskId};
792
793            let artifact_event = TaskArtifactUpdateEvent {
794                task_id: TaskId::new("task-1"),
795                context_id: ContextId::new("ctx-1"),
796                artifact: Artifact {
797                    id: ArtifactId::new("art-1"),
798                    name: None,
799                    description: None,
800                    parts: vec![a2a_protocol_types::Part::text("partial...")],
801                    extensions: None,
802                    metadata: None,
803                },
804                append: None,
805                last_chunk: Some(false),
806                metadata: None,
807            };
808
809            let resp = a2a_protocol_types::events::StreamResponse::ArtifactUpdate(artifact_event);
810            let event = convert_stream_response(&resp, "inv-1", "agent-1").unwrap();
811
812            assert!(event.llm_response.partial);
813        }
814
815        #[test]
816        fn create_v1_error_event_sets_fields() {
817            let event = create_v1_error_event("inv-1", "agent-1", "something broke");
818            assert_eq!(event.author, "agent-1");
819            assert_eq!(event.llm_response.error_message.as_deref(), Some("something broke"));
820            assert!(event.llm_response.turn_complete);
821        }
822    }
823}