1use crate::a2a::{
2 A2aClient, Part as A2aPart, Role, TaskArtifactUpdateEvent, TaskStatusUpdateEvent, UpdateEvent,
3};
4use adk_core::{Agent, Content, Event, EventStream, InvocationContext, Part, Result};
5use async_trait::async_trait;
6use std::sync::Arc;
7
/// Settings that identify a remote A2A agent and how to talk to it.
///
/// `Debug` is derived so the configuration can be logged and inspected in test failures.
#[derive(Debug, Clone)]
pub struct RemoteA2aConfig {
    /// Name reported by the wrapping agent.
    pub name: String,
    /// Description reported by the wrapping agent.
    pub description: String,
    /// URL handed to `A2aClient::from_url` when the agent runs.
    pub agent_url: String,
    /// Explicit streaming choice. `None` defers to the streaming capability advertised by the
    /// remote agent card.
    pub streaming: Option<bool>,
}
22
/// Agent whose work is delegated to a remote A2A endpoint described by a [`RemoteA2aConfig`].
pub struct RemoteA2aAgent {
    // Identity and connection settings supplied at construction time.
    config: RemoteA2aConfig,
}
27
28impl RemoteA2aAgent {
29 pub fn new(config: RemoteA2aConfig) -> Self {
30 Self { config }
31 }
32
33 pub fn builder(name: impl Into<String>) -> RemoteA2aAgentBuilder {
34 RemoteA2aAgentBuilder::new(name)
35 }
36}
37
38#[async_trait]
39impl Agent for RemoteA2aAgent {
40 fn name(&self) -> &str {
41 &self.config.name
42 }
43
44 fn description(&self) -> &str {
45 &self.config.description
46 }
47
48 fn sub_agents(&self) -> &[Arc<dyn Agent>] {
49 &[]
50 }
51
52 async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<EventStream> {
53 let url = self.config.agent_url.clone();
54 let invocation_id = ctx.invocation_id().to_string();
55 let agent_name = self.config.name.clone();
56 let config_streaming = self.config.streaming;
57
58 let user_content = get_user_content_from_context(ctx.as_ref());
60
61 let stream = async_stream::stream! {
62 let client = match A2aClient::from_url(&url).await {
64 Ok(c) => c,
65 Err(e) => {
66 yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
67 return;
68 }
69 };
70
71 let use_streaming = config_streaming.unwrap_or(client.agent_card().capabilities.streaming);
73
74 let message = build_a2a_message(user_content);
76
77 if use_streaming {
78 match client.send_streaming_message(message).await {
80 Ok(mut event_stream) => {
81 use futures::StreamExt;
82 while let Some(result) = event_stream.next().await {
83 match result {
84 Ok(update_event) => {
85 if let Some(event) = convert_update_event(&invocation_id, &agent_name, update_event) {
86 yield Ok(event);
87 }
88 }
89 Err(e) => {
90 yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
91 return;
92 }
93 }
94 }
95 }
96 Err(e) => {
97 yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
98 }
99 }
100 } else {
101 match client.send_message(message).await {
103 Ok(rpc_response) => {
104 if let Some(result) = rpc_response.result {
105 match serde_json::from_value::<crate::a2a::Task>(result) {
106 Ok(task) => {
107 for event in convert_task_to_events(&invocation_id, &agent_name, task) {
108 yield Ok(event);
109 }
110 }
111 Err(e) => {
112 yield Ok(create_error_event(&invocation_id, &agent_name, &format!("failed to parse response: {e}")));
113 }
114 }
115 } else if let Some(error) = rpc_response.error {
116 yield Ok(create_error_event(&invocation_id, &agent_name, &format!("RPC error: {} ({})", error.message, error.code)));
117 }
118 }
119 Err(e) => {
120 yield Ok(create_error_event(&invocation_id, &agent_name, &e.to_string()));
121 }
122 }
123 }
124 };
125
126 Ok(Box::pin(stream))
127 }
128}
129
/// Incrementally assembles a `RemoteA2aAgent`.
///
/// `Debug` and `Clone` are derived so a partially configured builder can be inspected and reused.
#[derive(Debug, Clone)]
pub struct RemoteA2aAgentBuilder {
    // Agent name; the only value required up front.
    name: String,
    // Starts empty until `description` is called.
    description: String,
    // Must be set before `build`, which fails while this is `None`.
    agent_url: Option<String>,
    // `None` until `streaming` is called.
    streaming: Option<bool>,
}
137
138impl RemoteA2aAgentBuilder {
139 pub fn new(name: impl Into<String>) -> Self {
140 Self { name: name.into(), description: String::new(), agent_url: None, streaming: None }
141 }
142
143 pub fn description(mut self, description: impl Into<String>) -> Self {
144 self.description = description.into();
145 self
146 }
147
148 pub fn agent_url(mut self, url: impl Into<String>) -> Self {
149 self.agent_url = Some(url.into());
150 self
151 }
152
153 pub fn streaming(mut self, streaming: bool) -> Self {
155 self.streaming = Some(streaming);
156 self
157 }
158
159 pub fn build(self) -> Result<RemoteA2aAgent> {
160 let agent_url = self
161 .agent_url
162 .ok_or_else(|| adk_core::AdkError::agent("RemoteA2aAgent requires agent_url"))?;
163
164 Ok(RemoteA2aAgent::new(RemoteA2aConfig {
165 name: self.name,
166 description: self.description,
167 agent_url,
168 streaming: self.streaming,
169 }))
170 }
171}
172
173fn get_user_content_from_context(ctx: &dyn InvocationContext) -> Option<String> {
176 let content = ctx.user_content();
177 for part in &content.parts {
178 if let Part::Text { text } = part {
179 return Some(text.clone());
180 }
181 }
182 None
183}
184
185fn build_a2a_message(content: Option<String>) -> crate::a2a::Message {
186 let text = content.unwrap_or_default();
187 crate::a2a::Message::builder()
188 .role(Role::User)
189 .parts(vec![A2aPart::text(text)])
190 .message_id(uuid::Uuid::new_v4().to_string())
191 .build()
192}
193
194fn convert_update_event(
195 invocation_id: &str,
196 agent_name: &str,
197 update: UpdateEvent,
198) -> Option<Event> {
199 match update {
200 UpdateEvent::TaskArtifactUpdate(artifact_event) => {
201 let parts: Vec<Part> = artifact_event
202 .artifact
203 .parts
204 .iter()
205 .filter_map(|p| match p {
206 A2aPart::Text { text, .. } => Some(Part::Text { text: text.clone() }),
207 _ => None,
208 })
209 .collect();
210
211 if parts.is_empty() {
212 return None;
213 }
214
215 let mut event = Event::new(invocation_id.to_string());
216 event.author = agent_name.to_string();
217 event.llm_response.content = Some(Content { role: "model".to_string(), parts });
218 event.llm_response.partial = !artifact_event.last_chunk;
219 Some(event)
220 }
221 UpdateEvent::TaskStatusUpdate(status_event) => {
222 if status_event.final_update
224 && let Some(msg) = status_event.status.message
225 {
226 let mut event = Event::new(invocation_id.to_string());
227 event.author = agent_name.to_string();
228 event.llm_response.content = Some(Content {
229 role: "model".to_string(),
230 parts: vec![Part::Text { text: msg }],
231 });
232 event.llm_response.turn_complete = true;
233 return Some(event);
234 }
235 None
236 }
237 }
238}
239
240fn create_error_event(invocation_id: &str, agent_name: &str, error: &str) -> Event {
241 let mut event = Event::new(invocation_id.to_string());
242 event.author = agent_name.to_string();
243 event.llm_response.error_message = Some(error.to_string());
244 event.llm_response.turn_complete = true;
245 event
246}
247
248fn convert_task_to_events(
250 invocation_id: &str,
251 agent_name: &str,
252 task: crate::a2a::Task,
253) -> Vec<Event> {
254 let mut events = Vec::new();
255
256 if let Some(artifacts) = task.artifacts {
258 for artifact in artifacts {
259 let update = UpdateEvent::TaskArtifactUpdate(TaskArtifactUpdateEvent {
260 task_id: task.id.clone(),
261 context_id: task.context_id.clone(),
262 artifact,
263 append: false,
264 last_chunk: true,
265 });
266 if let Some(event) = convert_update_event(invocation_id, agent_name, update) {
267 events.push(event);
268 }
269 }
270 }
271
272 if let Some(history) = task.history {
274 for msg in history {
275 if msg.role == Role::Agent {
276 let parts: Vec<Part> = msg
277 .parts
278 .iter()
279 .filter_map(|p| match p {
280 A2aPart::Text { text, .. } => Some(Part::Text { text: text.clone() }),
281 _ => None,
282 })
283 .collect();
284
285 if !parts.is_empty() {
286 let mut event = Event::new(invocation_id.to_string());
287 event.author = agent_name.to_string();
288 event.llm_response.content = Some(Content { role: "model".to_string(), parts });
289 event.llm_response.turn_complete = false;
290 events.push(event);
291 }
292 }
293 }
294 }
295
296 let update = UpdateEvent::TaskStatusUpdate(TaskStatusUpdateEvent {
298 task_id: task.id,
299 context_id: task.context_id,
300 status: task.status,
301 final_update: true,
302 });
303 if let Some(event) = convert_update_event(invocation_id, agent_name, update) {
304 events.push(event);
305 }
306
307 events
308}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// URL given to the builder in these tests; `build` only stores it.
    const AGENT_URL: &str = "http://localhost:8080";

    /// Shorthand for the text part the conversion helpers are expected to emit.
    fn text_part(text: &str) -> Part {
        Part::Text { text: text.to_string() }
    }

    /// Name and description set on the builder are reported by the agent.
    #[test]
    fn test_builder() {
        let built = RemoteA2aAgent::builder("test")
            .description("Test agent")
            .agent_url(AGENT_URL)
            .build();
        let agent = built.unwrap();

        assert_eq!(agent.name(), "test");
        assert_eq!(agent.description(), "Test agent");
    }

    /// Building without a URL is rejected.
    #[test]
    fn test_builder_missing_url() {
        assert!(RemoteA2aAgent::builder("test").build().is_err());
    }

    /// An explicit streaming choice is stored in the configuration.
    #[test]
    fn test_builder_streaming_option() {
        let builder = RemoteA2aAgent::builder("test").agent_url(AGENT_URL).streaming(true);
        let agent = builder.build().unwrap();

        assert_eq!(agent.config.streaming, Some(true));
    }

    /// Without an explicit choice the streaming setting stays unset.
    #[test]
    fn test_builder_streaming_default_is_none() {
        let builder = RemoteA2aAgent::builder("test").agent_url(AGENT_URL);
        let agent = builder.build().unwrap();

        assert_eq!(agent.config.streaming, None);
    }

    /// Artifact, agent history message, and final status each produce one event, in that order.
    #[test]
    fn test_convert_task_to_events_with_artifacts_and_history() {
        use crate::a2a::{Artifact, Message, Task, TaskState, TaskStatus};

        let artifact = Artifact {
            artifact_id: "art-1".to_string(),
            name: Some("result".to_string()),
            description: None,
            parts: vec![A2aPart::text("artifact content".to_string())],
            metadata: None,
            extensions: None,
        };
        let agent_message = Message::builder()
            .role(Role::Agent)
            .parts(vec![A2aPart::text("Hello from agent".to_string())])
            .build();
        let task = Task {
            id: "task-123".to_string(),
            context_id: Some("ctx-456".to_string()),
            status: TaskStatus { state: TaskState::Completed, message: Some("Done".to_string()) },
            artifacts: Some(vec![artifact]),
            history: Some(vec![agent_message]),
        };

        let events = convert_task_to_events("inv-1", "remote-agent", task);

        assert_eq!(events.len(), 3);

        let artifact_event = &events[0];
        assert_eq!(artifact_event.author, "remote-agent");
        let content = artifact_event.llm_response.content.as_ref().unwrap();
        assert_eq!(content.parts[0], text_part("artifact content"));

        let history_event = &events[1];
        assert_eq!(history_event.author, "remote-agent");
        let content = history_event.llm_response.content.as_ref().unwrap();
        assert_eq!(content.parts[0], text_part("Hello from agent"));
        assert!(!history_event.llm_response.turn_complete);

        let status_event = &events[2];
        assert_eq!(status_event.author, "remote-agent");
        assert!(status_event.llm_response.turn_complete);
    }

    /// A task with no artifacts, history, or status message produces no events.
    #[test]
    fn test_convert_task_to_events_empty_task() {
        use crate::a2a::{Task, TaskState, TaskStatus};

        let status = TaskStatus { state: TaskState::Completed, message: None };
        let task = Task {
            id: "task-empty".to_string(),
            context_id: None,
            status,
            artifacts: None,
            history: None,
        };

        let events = convert_task_to_events("inv-1", "agent", task);

        assert!(events.is_empty());
    }

    /// A status message alone yields a single turn-completing event carrying that text.
    #[test]
    fn test_convert_task_to_events_status_with_message() {
        use crate::a2a::{Task, TaskState, TaskStatus};

        let status =
            TaskStatus { state: TaskState::Completed, message: Some("All done".to_string()) };
        let task = Task {
            id: "task-msg".to_string(),
            context_id: None,
            status,
            artifacts: None,
            history: None,
        };

        let events = convert_task_to_events("inv-1", "agent", task);

        assert_eq!(events.len(), 1);
        let only = &events[0];
        assert!(only.llm_response.turn_complete);
        let content = only.llm_response.content.as_ref().unwrap();
        assert_eq!(content.parts[0], text_part("All done"));
    }
}
438
439#[cfg(feature = "a2a-v1")]
442pub mod v1_remote {
443 use crate::a2a::client::v1_client::A2aV1Client;
452 use a2a_protocol_types::{AgentCard, AgentInterface};
453 use adk_core::{Agent, Content, Event, EventStream, InvocationContext, Part, Result};
454 use async_trait::async_trait;
455 use std::sync::Arc;
456
/// Settings for a [`RemoteA2aV1Agent`].
#[derive(Clone)]
pub struct RemoteA2aV1Config {
    /// Name reported by the wrapping agent.
    pub name: String,
    /// Description reported by the wrapping agent.
    pub description: String,
    /// Card of the remote agent; used to check for a supported interface and to build the
    /// client.
    pub agent_card: AgentCard,
    /// Explicit streaming choice. When `None`, the card's streaming capability decides, and a
    /// card without that capability set is treated as non-streaming.
    pub streaming: Option<bool>,
}
470
/// Agent whose work is delegated to a remote agent described by an agent card.
pub struct RemoteA2aV1Agent {
    // Identity, agent card, and streaming preference supplied at construction time.
    config: RemoteA2aV1Config,
}
479
480 impl RemoteA2aV1Agent {
481 pub fn new(config: RemoteA2aV1Config) -> Self {
483 Self { config }
484 }
485
486 pub fn select_interface(card: &AgentCard) -> Option<&AgentInterface> {
490 card.supported_interfaces.iter().find(|i| i.protocol_binding == "JSONRPC").or_else(
491 || card.supported_interfaces.iter().find(|i| i.protocol_binding == "HTTP+JSON"),
492 )
493 }
494 }
495
496 #[async_trait]
497 impl Agent for RemoteA2aV1Agent {
498 fn name(&self) -> &str {
499 &self.config.name
500 }
501
502 fn description(&self) -> &str {
503 &self.config.description
504 }
505
506 fn sub_agents(&self) -> &[Arc<dyn Agent>] {
507 &[]
508 }
509
510 async fn run(&self, ctx: Arc<dyn InvocationContext>) -> Result<EventStream> {
511 let card = self.config.agent_card.clone();
512 let invocation_id = ctx.invocation_id().to_string();
513 let agent_name = self.config.name.clone();
514 let config_streaming = self.config.streaming;
515
516 let user_content = extract_user_text(ctx.as_ref());
518
519 let stream = async_stream::stream! {
520 let interface = match Self::select_interface(&card) {
522 Some(i) => i.clone(),
523 None => {
524 yield Ok(create_v1_error_event(
525 &invocation_id,
526 &agent_name,
527 "no supported interface found in agent card (need JSONRPC or HTTP+JSON)",
528 ));
529 return;
530 }
531 };
532
533 let client = A2aV1Client::new(card.clone());
535
536 let use_streaming = config_streaming.unwrap_or(card.capabilities.streaming.unwrap_or(false));
538
539 let message = build_v1_message(user_content);
541
542 if use_streaming {
543 match client.send_streaming_message(message).await {
545 Ok(response) => {
546 use futures::StreamExt;
547
548 let mut bytes_stream = response.bytes_stream();
549 let mut buffer = String::new();
550
551 while let Some(chunk_result) = bytes_stream.next().await {
552 let chunk = match chunk_result {
553 Ok(c) => c,
554 Err(e) => {
555 yield Ok(create_v1_error_event(
556 &invocation_id,
557 &agent_name,
558 &format!("stream error: {e}"),
559 ));
560 break;
561 }
562 };
563
564 buffer.push_str(&String::from_utf8_lossy(&chunk));
565
566 while let Some(event_end) = buffer.find("\n\n") {
568 let event_data = buffer[..event_end].to_string();
569 buffer = buffer[event_end + 2..].to_string();
570
571 if let Some(data) = parse_sse_data_line(&event_data) {
572 if data.is_empty() {
573 continue;
574 }
575
576 if let Some(event) = parse_stream_response(
578 &data,
579 &invocation_id,
580 &agent_name,
581 ) {
582 yield Ok(event);
583 }
584 }
585 }
586 }
587 }
588 Err(e) => {
589 yield Ok(create_v1_error_event(
590 &invocation_id,
591 &agent_name,
592 &format!("failed to send streaming message: {e}"),
593 ));
594 }
595 }
596 } else {
597 match client.send_message(message).await {
599 Ok(task) => {
600 if let Some(artifacts) = task.artifacts {
602 for artifact in artifacts {
603 let resp = a2a_protocol_types::events::StreamResponse::ArtifactUpdate(
604 a2a_protocol_types::events::TaskArtifactUpdateEvent {
605 task_id: task.id.clone(),
606 context_id: task.context_id.clone(),
607 artifact,
608 append: Some(false),
609 last_chunk: Some(true),
610 metadata: None,
611 }
612 );
613 if let Some(event) = convert_stream_response(&resp, &invocation_id, &agent_name) {
614 yield Ok(event);
615 }
616 }
617 }
618
619 if let Some(history) = task.history {
621 for msg in history {
622 if msg.role == a2a_protocol_types::MessageRole::Agent {
623 use a2a_protocol_types::PartContent;
624 let text_parts: Vec<Part> = msg.parts.iter().filter_map(|p| {
625 match &p.content {
626 PartContent::Text(text) => Some(Part::Text { text: text.clone() }),
627 _ => None,
628 }
629 }).collect();
630
631 if !text_parts.is_empty() {
632 let mut event = Event::new(invocation_id.clone());
633 event.author = agent_name.clone();
634 event.llm_response.content = Some(Content { role: "model".to_string(), parts: text_parts });
635 event.llm_response.turn_complete = false;
636 yield Ok(event);
637 }
638 }
639 }
640 }
641
642 let resp = a2a_protocol_types::events::StreamResponse::StatusUpdate(
644 a2a_protocol_types::events::TaskStatusUpdateEvent {
645 task_id: task.id,
646 context_id: task.context_id,
647 status: task.status,
648 metadata: None,
649 }
650 );
651 if let Some(event) = convert_stream_response(&resp, &invocation_id, &agent_name) {
652 yield Ok(event);
653 }
654 }
655 Err(e) => {
656 yield Ok(create_v1_error_event(
657 &invocation_id,
658 &agent_name,
659 &format!("failed to send message: {e}"),
660 ));
661 }
662 }
663 }
664
665 let _ = interface;
666 };
667
668 Ok(Box::pin(stream))
669 }
670 }
671
672 fn extract_user_text(ctx: &dyn InvocationContext) -> Option<String> {
674 let content = ctx.user_content();
675 for part in &content.parts {
676 if let Part::Text { text } = part {
677 return Some(text.clone());
678 }
679 }
680 None
681 }
682
683 fn build_v1_message(content: Option<String>) -> a2a_protocol_types::Message {
685 let text = content.unwrap_or_default();
686 a2a_protocol_types::Message {
687 id: a2a_protocol_types::MessageId::new(uuid::Uuid::new_v4().to_string()),
688 role: a2a_protocol_types::MessageRole::User,
689 parts: vec![a2a_protocol_types::Part::text(text)],
690 task_id: None,
691 context_id: None,
692 reference_task_ids: None,
693 extensions: None,
694 metadata: None,
695 }
696 }
697
/// Extracts the payload of one SSE event.
///
/// Every line starting with `data:` contributes its value with surrounding whitespace
/// trimmed. An event may spread its payload over several `data:` lines; these are joined
/// with `\n`. Lines with other field names are ignored.
///
/// Returns `None` when the event has no `data:` line at all, and `Some("")` when the only
/// `data:` line is empty.
fn parse_sse_data_line(event: &str) -> Option<String> {
    let mut values =
        event.lines().filter_map(|line| line.strip_prefix("data:")).map(str::trim).peekable();

    // Distinguish "no data field" (None) from "empty data field" (Some("")).
    values.peek()?;

    Some(values.collect::<Vec<_>>().join("\n"))
}
707
708 fn parse_stream_response(data: &str, invocation_id: &str, agent_name: &str) -> Option<Event> {
712 use a2a_protocol_types::events::StreamResponse;
713
714 if let Ok(stream_resp) = serde_json::from_str::<StreamResponse>(data) {
716 return convert_stream_response(&stream_resp, invocation_id, agent_name);
717 }
718
719 if let Ok(rpc_value) = serde_json::from_str::<serde_json::Value>(data) {
721 if let Some(result) = rpc_value.get("result") {
722 if let Ok(stream_resp) = serde_json::from_value::<StreamResponse>(result.clone()) {
723 return convert_stream_response(&stream_resp, invocation_id, agent_name);
724 }
725 }
726 if let Some(error) = rpc_value.get("error") {
728 let message =
729 error.get("message").and_then(|m| m.as_str()).unwrap_or("unknown error");
730 let code = error.get("code").and_then(|c| c.as_i64()).unwrap_or(0);
731 return Some(create_v1_error_event(
732 invocation_id,
733 agent_name,
734 &format!("RPC error {code}: {message}"),
735 ));
736 }
737 }
738
739 tracing::debug!("failed to parse SSE data as StreamResponse: {data}");
740 None
741 }
742
743 fn convert_stream_response(
745 resp: &a2a_protocol_types::events::StreamResponse,
746 invocation_id: &str,
747 agent_name: &str,
748 ) -> Option<Event> {
749 use a2a_protocol_types::events::StreamResponse;
750
751 match resp {
752 StreamResponse::ArtifactUpdate(artifact_event) => {
753 use a2a_protocol_types::PartContent;
754 let parts: Vec<Part> = artifact_event
755 .artifact
756 .parts
757 .iter()
758 .filter_map(|p| match &p.content {
759 PartContent::Text(text) => Some(Part::Text { text: text.clone() }),
760 _ => None,
761 })
762 .collect();
763
764 if parts.is_empty() {
765 return None;
766 }
767
768 let mut event = Event::new(invocation_id.to_string());
769 event.author = agent_name.to_string();
770 event.llm_response.content = Some(Content { role: "model".to_string(), parts });
771 event.llm_response.partial = !artifact_event.last_chunk.unwrap_or(true);
772 Some(event)
773 }
774 StreamResponse::StatusUpdate(status_event) => {
775 let is_terminal = matches!(
778 status_event.status.state,
779 a2a_protocol_types::task::TaskState::Completed
780 | a2a_protocol_types::task::TaskState::Failed
781 | a2a_protocol_types::task::TaskState::Canceled
782 | a2a_protocol_types::task::TaskState::Rejected
783 );
784
785 if let Some(ref msg) = status_event.status.message {
786 use a2a_protocol_types::PartContent;
787 let text_parts: Vec<Part> = msg
788 .parts
789 .iter()
790 .filter_map(|p| match &p.content {
791 PartContent::Text(text) => Some(Part::Text { text: text.clone() }),
792 _ => None,
793 })
794 .collect();
795
796 if !text_parts.is_empty() {
797 let mut event = Event::new(invocation_id.to_string());
798 event.author = agent_name.to_string();
799 event.llm_response.content =
800 Some(Content { role: "model".to_string(), parts: text_parts });
801 event.llm_response.turn_complete = is_terminal;
802 return Some(event);
803 }
804 }
805
806 if is_terminal {
808 let mut event = Event::new(invocation_id.to_string());
809 event.author = agent_name.to_string();
810 event.llm_response.turn_complete = true;
811 return Some(event);
812 }
813
814 None
815 }
816 _ => None,
818 }
819 }
820
821 fn create_v1_error_event(invocation_id: &str, agent_name: &str, error: &str) -> Event {
823 let mut event = Event::new(invocation_id.to_string());
824 event.author = agent_name.to_string();
825 event.llm_response.error_message = Some(error.to_string());
826 event.llm_response.turn_complete = true;
827 event
828 }
829
#[cfg(test)]
mod tests {
    use super::*;
    use a2a_protocol_types::{AgentCapabilities, AgentInterface, AgentSkill};

    /// Card offering both a JSONRPC and an HTTP+JSON interface, with default capabilities.
    fn make_test_card() -> AgentCard {
        AgentCard {
            name: "test-v1-agent".to_string(),
            url: Some("http://localhost:9999".to_string()),
            description: "A test v1 agent".to_string(),
            version: "1.0.0".to_string(),
            supported_interfaces: vec![
                AgentInterface {
                    url: "http://localhost:9999/a2a".to_string(),
                    protocol_binding: "JSONRPC".to_string(),
                    protocol_version: "1.0".to_string(),
                    tenant: None,
                },
                AgentInterface {
                    url: "http://localhost:9999/rest".to_string(),
                    protocol_binding: "HTTP+JSON".to_string(),
                    protocol_version: "1.0".to_string(),
                    tenant: None,
                },
            ],
            default_input_modes: vec!["text/plain".to_string()],
            default_output_modes: vec!["text/plain".to_string()],
            skills: vec![AgentSkill {
                id: "echo".to_string(),
                name: "Echo".to_string(),
                description: "Echoes input".to_string(),
                tags: vec![],
                examples: None,
                input_modes: None,
                output_modes: None,
                security_requirements: None,
            }],
            capabilities: AgentCapabilities::default(),
            provider: None,
            icon_url: None,
            documentation_url: None,
            security_schemes: None,
            security_requirements: None,
            signatures: None,
        }
    }

    /// JSONRPC is chosen when both bindings are offered.
    #[test]
    fn select_interface_prefers_jsonrpc() {
        let card = make_test_card();
        let selected = RemoteA2aV1Agent::select_interface(&card).unwrap();
        assert_eq!(selected.protocol_binding, "JSONRPC");
        assert_eq!(selected.url, "http://localhost:9999/a2a");
    }

    /// HTTP+JSON is chosen when JSONRPC is not offered.
    #[test]
    fn select_interface_falls_back_to_http_json() {
        let mut card = make_test_card();
        card.supported_interfaces.retain(|i| i.protocol_binding != "JSONRPC");
        let selected = RemoteA2aV1Agent::select_interface(&card).unwrap();
        assert_eq!(selected.protocol_binding, "HTTP+JSON");
        assert_eq!(selected.url, "http://localhost:9999/rest");
    }

    /// A card that only offers an unsupported binding selects nothing.
    #[test]
    fn select_interface_returns_none_for_unsupported() {
        let mut card = make_test_card();
        card.supported_interfaces = vec![AgentInterface {
            url: "grpc://localhost:9999".to_string(),
            protocol_binding: "GRPC".to_string(),
            protocol_version: "1.0".to_string(),
            tenant: None,
        }];
        assert!(RemoteA2aV1Agent::select_interface(&card).is_none());
    }

    /// A card without interfaces selects nothing.
    #[test]
    fn select_interface_returns_none_for_empty() {
        let mut card = make_test_card();
        card.supported_interfaces = vec![];
        assert!(RemoteA2aV1Agent::select_interface(&card).is_none());
    }

    /// Name and description from the configuration are reported by the agent.
    #[test]
    fn new_agent_stores_config() {
        let card = make_test_card();
        let agent = RemoteA2aV1Agent::new(RemoteA2aV1Config {
            name: "my-agent".to_string(),
            description: "My remote agent".to_string(),
            agent_card: card,
            // The config struct has no default; every field must be given.
            streaming: None,
        });
        assert_eq!(agent.name(), "my-agent");
        assert_eq!(agent.description(), "My remote agent");
    }

    /// The agent never reports sub-agents.
    #[test]
    fn agent_has_no_sub_agents() {
        let card = make_test_card();
        let agent = RemoteA2aV1Agent::new(RemoteA2aV1Config {
            name: "test".to_string(),
            description: "test".to_string(),
            agent_card: card,
            streaming: None,
        });
        assert!(agent.sub_agents().is_empty());
    }

    /// Provided text becomes the single part of a user message.
    #[test]
    fn build_v1_message_with_content() {
        let msg = build_v1_message(Some("hello".to_string()));
        assert_eq!(msg.role, a2a_protocol_types::MessageRole::User);
        assert_eq!(msg.parts.len(), 1);
        assert_eq!(msg.parts[0].text_content(), Some("hello"));
    }

    /// Missing text becomes an empty text part.
    #[test]
    fn build_v1_message_without_content() {
        let msg = build_v1_message(None);
        assert_eq!(msg.parts[0].text_content(), Some(""));
    }

    /// The value of the `data:` line is returned trimmed.
    #[test]
    fn parse_sse_data_line_extracts_data() {
        let event = "event: message\ndata: {\"test\": true}\n";
        assert_eq!(parse_sse_data_line(event), Some("{\"test\": true}".to_string()));
    }

    /// An event without a `data:` line has no payload.
    #[test]
    fn parse_sse_data_line_returns_none_without_data() {
        let event = "event: ping\n";
        assert!(parse_sse_data_line(event).is_none());
    }

    /// A terminal status with a text message yields a turn-completing content event.
    #[test]
    fn convert_status_update_with_message() {
        use a2a_protocol_types::events::TaskStatusUpdateEvent;
        use a2a_protocol_types::task::{ContextId, TaskId, TaskState, TaskStatus};

        let mut status = TaskStatus::new(TaskState::Completed);
        status.message = Some(a2a_protocol_types::Message {
            id: a2a_protocol_types::MessageId::new("msg-1"),
            role: a2a_protocol_types::MessageRole::Agent,
            parts: vec![a2a_protocol_types::Part::text("done!")],
            task_id: None,
            context_id: None,
            reference_task_ids: None,
            extensions: None,
            metadata: None,
        });

        let status_event = TaskStatusUpdateEvent {
            task_id: TaskId::new("task-1"),
            context_id: ContextId::new("ctx-1"),
            status,
            metadata: None,
        };

        let resp = a2a_protocol_types::events::StreamResponse::StatusUpdate(status_event);
        let event = convert_stream_response(&resp, "inv-1", "agent-1").unwrap();

        assert_eq!(event.author, "agent-1");
        assert!(event.llm_response.turn_complete);
        let content = event.llm_response.content.unwrap();
        assert_eq!(content.parts.len(), 1);
        match &content.parts[0] {
            Part::Text { text } => assert_eq!(text, "done!"),
            _ => panic!("expected text part"),
        }
    }

    /// A terminal status without a message still yields a turn-completing event, without content.
    #[test]
    fn convert_status_update_terminal_without_message() {
        use a2a_protocol_types::events::TaskStatusUpdateEvent;
        use a2a_protocol_types::task::{ContextId, TaskId, TaskState, TaskStatus};

        let status_event = TaskStatusUpdateEvent {
            task_id: TaskId::new("task-1"),
            context_id: ContextId::new("ctx-1"),
            status: TaskStatus::new(TaskState::Failed),
            metadata: None,
        };

        let resp = a2a_protocol_types::events::StreamResponse::StatusUpdate(status_event);
        let event = convert_stream_response(&resp, "inv-1", "agent-1").unwrap();

        assert!(event.llm_response.turn_complete);
        assert!(event.llm_response.content.is_none());
    }

    /// A non-terminal status without a message produces no event.
    #[test]
    fn convert_status_update_non_terminal_without_message() {
        use a2a_protocol_types::events::TaskStatusUpdateEvent;
        use a2a_protocol_types::task::{ContextId, TaskId, TaskState, TaskStatus};

        let status_event = TaskStatusUpdateEvent {
            task_id: TaskId::new("task-1"),
            context_id: ContextId::new("ctx-1"),
            status: TaskStatus::new(TaskState::Working),
            metadata: None,
        };

        let resp = a2a_protocol_types::events::StreamResponse::StatusUpdate(status_event);
        let result = convert_stream_response(&resp, "inv-1", "agent-1");

        assert!(result.is_none());
    }

    /// A last-chunk artifact with text yields a non-partial content event.
    #[test]
    fn convert_artifact_update_with_text() {
        use a2a_protocol_types::artifact::{Artifact, ArtifactId};
        use a2a_protocol_types::events::TaskArtifactUpdateEvent;
        use a2a_protocol_types::task::{ContextId, TaskId};

        let artifact_event = TaskArtifactUpdateEvent {
            task_id: TaskId::new("task-1"),
            context_id: ContextId::new("ctx-1"),
            artifact: Artifact {
                id: ArtifactId::new("art-1"),
                name: Some("result".to_string()),
                description: None,
                parts: vec![a2a_protocol_types::Part::text("artifact content")],
                extensions: None,
                metadata: None,
            },
            append: None,
            last_chunk: Some(true),
            metadata: None,
        };

        let resp = a2a_protocol_types::events::StreamResponse::ArtifactUpdate(artifact_event);
        let event = convert_stream_response(&resp, "inv-1", "agent-1").unwrap();

        assert_eq!(event.author, "agent-1");
        let content = event.llm_response.content.unwrap();
        assert_eq!(content.parts.len(), 1);
        match &content.parts[0] {
            Part::Text { text } => assert_eq!(text, "artifact content"),
            _ => panic!("expected text part"),
        }
        assert!(!event.llm_response.partial);
    }

    /// An artifact explicitly flagged as not the last chunk yields a partial event.
    #[test]
    fn convert_artifact_update_partial() {
        use a2a_protocol_types::artifact::{Artifact, ArtifactId};
        use a2a_protocol_types::events::TaskArtifactUpdateEvent;
        use a2a_protocol_types::task::{ContextId, TaskId};

        let artifact_event = TaskArtifactUpdateEvent {
            task_id: TaskId::new("task-1"),
            context_id: ContextId::new("ctx-1"),
            artifact: Artifact {
                id: ArtifactId::new("art-1"),
                name: None,
                description: None,
                parts: vec![a2a_protocol_types::Part::text("partial...")],
                extensions: None,
                metadata: None,
            },
            append: None,
            last_chunk: Some(false),
            metadata: None,
        };

        let resp = a2a_protocol_types::events::StreamResponse::ArtifactUpdate(artifact_event);
        let event = convert_stream_response(&resp, "inv-1", "agent-1").unwrap();

        assert!(event.llm_response.partial);
    }

    /// Error events carry the author, the message, and the turn-complete flag.
    #[test]
    fn create_v1_error_event_sets_fields() {
        let event = create_v1_error_event("inv-1", "agent-1", "something broke");
        assert_eq!(event.author, "agent-1");
        assert_eq!(event.llm_response.error_message.as_deref(), Some("something broke"));
        assert!(event.llm_response.turn_complete);
    }
}
1109}