Skip to main content

oxios_kernel/tools/
a2a_tools.rs

1//! A2A tools — let agents communicate with other agents at runtime.
2//!
3//! Provides three tools:
4//! - `a2a_delegate` — delegate a task to another agent by capability
5//! - `a2a_send` — send a message to a specific agent
6//! - `a2a_query` — discover agents by capability or skill
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use oxi_sdk::{AgentTool, AgentToolResult, ToolContext, ToolError};
12use serde_json::{json, Value};
13use uuid::Uuid;
14
15use crate::a2a::{A2AMessage, A2AProtocol, TaskPriority, TaskSpec};
16use crate::types::AgentId;
17// ─── A2aDelegateTool ───────────────────────────────────────────────────
18
19/// Tool for delegating a task to another agent discovered by capability.
20///
21/// Usage flow:
22/// 1. Agent calls `a2a_delegate` with a description and required capability.
23/// 2. The tool queries the AgentCardRegistry for agents with that capability.
24/// 3. If found, it delegates the task via A2A and waits for the result.
25/// 4. Returns the execution result to the calling agent.
26pub struct A2aDelegateTool {
27    a2a: Arc<A2AProtocol>,
28    my_agent_id: AgentId,
29}
30
31impl A2aDelegateTool {
32    /// Create a new A2A delegate tool.
33    pub fn new(a2a: Arc<A2AProtocol>, agent_id: AgentId) -> Self {
34        Self {
35            a2a,
36            my_agent_id: agent_id,
37        }
38    }
39
40    /// Create an `A2aDelegateTool` from a [`KernelHandle`].
41    ///
42    /// Extracts the A2A protocol from the kernel's a2a facade.
43    pub fn from_kernel(kernel: &crate::kernel_handle::KernelHandle, agent_id: AgentId) -> Self {
44        Self::new(kernel.a2a.protocol().clone(), agent_id)
45    }
46}
47
48impl std::fmt::Debug for A2aDelegateTool {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("A2aDelegateTool").finish()
51    }
52}
53
54#[async_trait]
55impl AgentTool for A2aDelegateTool {
56    fn name(&self) -> &str {
57        "a2a_delegate"
58    }
59
60    fn label(&self) -> &str {
61        "A2A Delegate"
62    }
63
64    fn description(&self) -> &str {
65        "Delegate a task to another agent. Specify a capability (e.g. 'code-review', 'testing') \
66         and a description of the work. The system will find a suitable agent, execute the task, \
67         and return the result. This is a blocking call — it waits for the delegated agent to complete."
68    }
69
70    fn parameters_schema(&self) -> Value {
71        json!({
72            "type": "object",
73            "properties": {
74                "description": {
75                    "type": "string",
76                    "description": "Human-readable description of the task to delegate"
77                },
78                "capability": {
79                    "type": "string",
80                    "description": "Required capability of the target agent (e.g. 'code-review', 'testing', 'debugging')"
81                },
82                "payload": {
83                    "type": "object",
84                    "description": "Structured data for the task (optional)"
85                },
86                "priority": {
87                    "type": "string",
88                    "enum": ["low", "normal", "high", "critical"],
89                    "description": "Task priority (default: normal)"
90                }
91            },
92            "required": ["description", "capability"]
93        })
94    }
95
96    async fn execute(
97        &self,
98        _tool_call_id: &str,
99        params: Value,
100        _shutdown: Option<tokio::sync::oneshot::Receiver<()>>,
101        _ctx: &ToolContext,
102    ) -> Result<AgentToolResult, ToolError> {
103        let description = params["description"].as_str().unwrap_or("").to_string();
104        if description.is_empty() {
105            return Ok(AgentToolResult::error(
106                "Missing required parameter: description",
107            ));
108        }
109
110        let capability = params["capability"].as_str().unwrap_or("").to_string();
111        if capability.is_empty() {
112            return Ok(AgentToolResult::error(
113                "Missing required parameter: capability",
114            ));
115        }
116
117        let payload = params.get("payload").cloned().unwrap_or(json!({}));
118        let priority = parse_priority(params["priority"].as_str());
119
120        let my_id = self.my_agent_id;
121
122        // 1. Find agents with the required capability.
123        let candidates = match self.a2a.query_capabilities(&capability).await {
124            Ok(c) => c,
125            Err(e) => {
126                return Ok(AgentToolResult::error(format!(
127                    "Failed to query capabilities: {e}"
128                )))
129            }
130        };
131
132        if candidates.is_empty() {
133            // No agent available — guide the LLM to handle it itself.
134            return Ok(AgentToolResult::success(format!(
135                "No agents currently available with capability '{capability}'. You should handle this task yourself."
136            )));
137        }
138
139        // 2. Pick the first available agent (could be smarter — load-balancing later).
140        let target = &candidates[0];
141        let target_id = target.agent_id;
142
143        tracing::info!(
144            from = %my_id,
145            to = %target_id,
146            target_name = %target.name,
147            capability = %capability,
148            "A2A delegating task"
149        );
150
151        // 3. Create task spec and delegate via A2A.
152        let task = TaskSpec::new(&description, payload.clone()).with_priority(priority);
153        let task_id = task.task_id;
154
155        // 4. Execute via dispatch handler (blocking — waits for result).
156        match self.a2a.execute_delegation(my_id, target_id, task).await {
157            Some(Ok(result)) => Ok(AgentToolResult::success(
158                serde_json::to_string(&json!({
159                    "task_id": task_id.to_string(),
160                    "delegated_to": target.name,
161                    "delegated_to_id": target_id.to_string(),
162                    "status": "completed",
163                    "result": result,
164                }))
165                .unwrap_or_default(),
166            )),
167            Some(Err(e)) => Ok(AgentToolResult::error(format!(
168                "A2A delegation failed: {e}"
169            ))),
170            None => {
171                // No handler registered — fall back to fire-and-forget.
172                tracing::warn!("No A2A dispatch handler registered, using fire-and-forget");
173                match self
174                    .a2a
175                    .delegate_task(my_id, target_id, TaskSpec::new(&description, payload))
176                    .await
177                {
178                    Ok(_) => Ok(AgentToolResult::success(format!(
179                        "Task delegated to '{}' (no handler — fire-and-forget). Task ID: {}",
180                        target.name, task_id
181                    ))),
182                    Err(e) => Ok(AgentToolResult::error(format!("Delegation failed: {e}"))),
183                }
184            }
185        }
186    }
187}
188
189// ─── A2aSendTool ───────────────────────────────────────────────────────
190
191/// Tool for sending a direct message to a specific agent.
192///
193/// Unlike `a2a_delegate`, this sends a fire-and-forget message.
194/// Use for status updates, notifications, or simple queries.
195pub struct A2aSendTool {
196    a2a: Arc<A2AProtocol>,
197    my_agent_id: AgentId,
198}
199
200impl A2aSendTool {
201    /// Create a new A2A send tool.
202    pub fn new(a2a: Arc<A2AProtocol>, agent_id: AgentId) -> Self {
203        Self {
204            a2a,
205            my_agent_id: agent_id,
206        }
207    }
208
209    /// Create an `A2aSendTool` from a [`KernelHandle`].
210    ///
211    /// Extracts the A2A protocol from the kernel's a2a facade.
212    pub fn from_kernel(kernel: &crate::kernel_handle::KernelHandle, agent_id: AgentId) -> Self {
213        Self::new(kernel.a2a.protocol().clone(), agent_id)
214    }
215}
216
217impl std::fmt::Debug for A2aSendTool {
218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219        f.debug_struct("A2aSendTool").finish()
220    }
221}
222
223#[async_trait]
224impl AgentTool for A2aSendTool {
225    fn name(&self) -> &str {
226        "a2a_send"
227    }
228
229    fn label(&self) -> &str {
230        "A2A Send"
231    }
232
233    fn description(&self) -> &str {
234        "Send a message to a specific agent by ID. Fire-and-forget — does not wait for a response. \
235         Use for status updates, notifications, or sharing information."
236    }
237
238    fn parameters_schema(&self) -> Value {
239        json!({
240            "type": "object",
241            "properties": {
242                "target_agent_id": {
243                    "type": "string",
244                    "description": "UUID of the target agent"
245                },
246                "message_type": {
247                    "type": "string",
248                    "enum": ["status_update", "result_sharing", "handshake"],
249                    "description": "Type of message to send (default: status_update)"
250                },
251                "content": {
252                    "type": "string",
253                    "description": "The message content"
254                },
255                "task_id": {
256                    "type": "string",
257                    "description": "Task UUID this message relates to (for status_update and result_sharing)"
258                },
259                "payload": {
260                    "type": "object",
261                    "description": "Structured data to share (optional, for result_sharing)"
262                },
263                "progress": {
264                    "type": "integer",
265                    "description": "Progress percentage for status updates (0-100)"
266                }
267            },
268            "required": ["target_agent_id", "message_type", "content"]
269        })
270    }
271
272    async fn execute(
273        &self,
274        _tool_call_id: &str,
275        params: Value,
276        _shutdown: Option<tokio::sync::oneshot::Receiver<()>>,
277        _ctx: &ToolContext,
278    ) -> Result<AgentToolResult, ToolError> {
279        let target_str = params["target_agent_id"].as_str().unwrap_or("");
280        let target_id: AgentId = match Uuid::parse_str(target_str) {
281            Ok(id) => id,
282            Err(e) => {
283                return Ok(AgentToolResult::error(format!(
284                    "Invalid target_agent_id: {e}"
285                )))
286            }
287        };
288
289        let message_type = params["message_type"].as_str().unwrap_or("status_update");
290        let content = params["content"].as_str().unwrap_or("").to_string();
291        let payload = params.get("payload").cloned().unwrap_or(json!({}));
292        let progress = params["progress"].as_u64().unwrap_or(0) as u8;
293        let task_id: Uuid = params["task_id"]
294            .as_str()
295            .and_then(|s| Uuid::parse_str(s).ok())
296            .unwrap_or_else(Uuid::new_v4);
297
298        let my_id = self.my_agent_id;
299
300        let message = match message_type {
301            "status_update" => A2AMessage::StatusUpdate {
302                task_id,
303                progress,
304                message: content,
305            },
306            "result_sharing" => A2AMessage::ResultSharing {
307                task_id,
308                result: payload,
309                summary: content,
310            },
311            "handshake" => {
312                let card = self.a2a.registry().get_agent(my_id).await;
313                let (name, capabilities) = card
314                    .map(|c| (c.name, c.capabilities))
315                    .unwrap_or(("unknown".into(), vec![]));
316                A2AMessage::Handshake {
317                    agent_id: my_id,
318                    name,
319                    capabilities,
320                }
321            }
322            _ => {
323                return Ok(AgentToolResult::error(format!(
324                    "Unknown message_type: {message_type}"
325                )))
326            }
327        };
328
329        match self.a2a.send_message(my_id, target_id, message).await {
330            Ok(request_id) => Ok(AgentToolResult::success(
331                serde_json::to_string(&json!({
332                    "request_id": request_id.to_string(),
333                    "sent_to": target_str,
334                }))
335                .unwrap_or_default(),
336            )),
337            Err(e) => Ok(AgentToolResult::error(format!("Failed to send: {e}"))),
338        }
339    }
340}
341
342// ─── A2aQueryTool ──────────────────────────────────────────────────────
343
344/// Tool for discovering other agents by capability.
345///
346/// Returns a list of agent cards matching the requested capability or skill.
347pub struct A2aQueryTool {
348    a2a: Arc<A2AProtocol>,
349}
350
351impl A2aQueryTool {
352    /// Create a new A2A query tool.
353    pub fn new(a2a: Arc<A2AProtocol>) -> Self {
354        Self { a2a }
355    }
356
357    /// Create an `A2aQueryTool` from a [`KernelHandle`].
358    ///
359    /// Extracts the A2A protocol from the kernel's a2a facade.
360    pub fn from_kernel(kernel: &crate::kernel_handle::KernelHandle) -> Self {
361        Self::new(kernel.a2a.protocol().clone())
362    }
363}
364
365impl std::fmt::Debug for A2aQueryTool {
366    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367        f.debug_struct("A2aQueryTool").finish()
368    }
369}
370
371#[async_trait]
372impl AgentTool for A2aQueryTool {
373    fn name(&self) -> &str {
374        "a2a_query"
375    }
376
377    fn label(&self) -> &str {
378        "A2A Query"
379    }
380
381    fn description(&self) -> &str {
382        "Discover other agents by capability or skill. Returns a list of available agents \
383         with their names, capabilities, and status."
384    }
385
386    fn parameters_schema(&self) -> Value {
387        json!({
388            "type": "object",
389            "properties": {
390                "capability": {
391                    "type": "string",
392                    "description": "Search for agents with this capability"
393                },
394                "skill": {
395                    "type": "string",
396                    "description": "Search for agents with this skill"
397                },
398                "limit": {
399                    "type": "integer",
400                    "description": "Maximum number of results (default: 10)"
401                }
402            }
403        })
404    }
405
406    async fn execute(
407        &self,
408        _tool_call_id: &str,
409        params: Value,
410        _shutdown: Option<tokio::sync::oneshot::Receiver<()>>,
411        _ctx: &ToolContext,
412    ) -> Result<AgentToolResult, ToolError> {
413        let capability = params["capability"].as_str();
414        let skill = params["skill"].as_str();
415        let limit = params["limit"].as_u64().unwrap_or(10) as usize;
416
417        let agents = if let Some(cap) = capability {
418            match self.a2a.query_capabilities(cap).await {
419                Ok(a) => a,
420                Err(e) => return Ok(AgentToolResult::error(format!("Query failed: {e}"))),
421            }
422        } else if let Some(sk) = skill {
423            match self.a2a.registry().find_agents_by_skill(sk).await {
424                Ok(a) => a,
425                Err(e) => return Ok(AgentToolResult::error(format!("Query failed: {e}"))),
426            }
427        } else {
428            // No filter — return all agents.
429            self.a2a.registry().list_agents().await
430        };
431
432        let cards: Vec<Value> = agents
433            .into_iter()
434            .take(limit)
435            .map(|card| {
436                json!({
437                    "agent_id": card.agent_id.to_string(),
438                    "name": card.name,
439                    "description": card.description,
440                    "capabilities": card.capabilities,
441                    "skills": card.skills,
442                    "status": format!("{:?}", card.status),
443                })
444            })
445            .collect();
446
447        Ok(AgentToolResult::success(
448            serde_json::to_string(&json!({
449                "agents": cards,
450                "count": cards.len(),
451            }))
452            .unwrap_or_default(),
453        ))
454    }
455}
456
457// ─── Helpers ────────────────────────────────────────────────────────────
458
459fn parse_priority(s: Option<&str>) -> TaskPriority {
460    match s {
461        Some("low") => TaskPriority::Low,
462        Some("high") => TaskPriority::High,
463        Some("critical") => TaskPriority::Critical,
464        _ => TaskPriority::Normal,
465    }
466}
467
468#[cfg(test)]
469mod tests {
470    use super::*;
471    use crate::event_bus::EventBus;
472
473    fn test_a2a() -> Arc<A2AProtocol> {
474        Arc::new(A2AProtocol::new(EventBus::new(256)))
475    }
476
477    async fn register_agent_async(a2a: &A2AProtocol, name: &str, caps: &[&str]) -> AgentId {
478        let id = Uuid::new_v4();
479        let mut card = crate::a2a::AgentCard::new(id, name, format!("Test agent: {name}"));
480        for cap in caps {
481            card = card.with_capability(*cap);
482        }
483        a2a.registry().register_agent(card).await.unwrap();
484        id
485    }
486
487    #[tokio::test]
488    async fn test_a2a_query_finds_capability() {
489        let a2a = test_a2a();
490        register_agent_async(&a2a, "reviewer", &["code-review"]).await;
491
492        let tool = A2aQueryTool::new(a2a.clone());
493        let params = json!({"capability": "code-review"});
494        let result = tool
495            .execute("tc", params, None, &ToolContext::default())
496            .await
497            .unwrap();
498        assert!(result.output.contains("reviewer"));
499        assert!(result.output.contains("1"));
500    }
501
502    #[tokio::test]
503    async fn test_a2a_query_no_match() {
504        let a2a = test_a2a();
505
506        let tool = A2aQueryTool::new(a2a.clone());
507        let params = json!({"capability": "nonexistent"});
508        let result = tool
509            .execute("tc", params, None, &ToolContext::default())
510            .await
511            .unwrap();
512        assert!(result.output.contains("0"));
513    }
514
515    #[tokio::test]
516    async fn test_a2a_query_respects_limit() {
517        let a2a = test_a2a();
518        register_agent_async(&a2a, "a1", &["test"]).await;
519        register_agent_async(&a2a, "a2", &["test"]).await;
520        register_agent_async(&a2a, "a3", &["test"]).await;
521
522        let tool = A2aQueryTool::new(a2a.clone());
523        let params = json!({"capability": "test", "limit": 2});
524        let result = tool
525            .execute("tc", params, None, &ToolContext::default())
526            .await
527            .unwrap();
528        assert!(result.output.contains("2"));
529    }
530
531    #[tokio::test]
532    async fn test_a2a_delegate_no_agents_returns_guidance() {
533        let a2a = test_a2a();
534        let agent_id = Uuid::new_v4();
535
536        let tool = A2aDelegateTool::new(a2a.clone(), agent_id);
537        let params = json!({"description": "review code", "capability": "code-review"});
538        let result = tool
539            .execute("tc", params, None, &ToolContext::default())
540            .await
541            .unwrap();
542        assert!(result.success);
543        assert!(result.output.contains("handle this task yourself"));
544    }
545
546    #[tokio::test]
547    async fn test_a2a_send_invalid_uuid() {
548        let a2a = test_a2a();
549        let agent_id = Uuid::new_v4();
550
551        let tool = A2aSendTool::new(a2a.clone(), agent_id);
552        let params = json!({"target_agent_id": "not-a-uuid", "message_type": "status_update", "content": "hello"});
553        let result = tool
554            .execute("tc", params, None, &ToolContext::default())
555            .await
556            .unwrap();
557        assert!(!result.success);
558        assert!(result.output.contains("Invalid target_agent_id"));
559    }
560
561    #[tokio::test]
562    async fn test_a2a_send_handshake() {
563        let a2a = test_a2a();
564        let my_id = Uuid::new_v4();
565        let target_id = Uuid::new_v4();
566
567        // Register self so handshake can look up name.
568        let card = crate::a2a::AgentCard::new(my_id, "me", "Test agent").with_capability("test");
569        a2a.registry().register_agent(card).await.unwrap();
570
571        let tool = A2aSendTool::new(a2a.clone(), my_id);
572        let params = json!({"target_agent_id": target_id.to_string(), "message_type": "handshake", "content": "hello"});
573        let result = tool
574            .execute("tc", params, None, &ToolContext::default())
575            .await
576            .unwrap();
577        assert!(result.success);
578
579        // Verify message in queue.
580        let msgs = a2a.receive_messages(target_id).await;
581        assert_eq!(msgs.len(), 1);
582    }
583
584    #[test]
585    fn test_parse_priority() {
586        assert!(matches!(parse_priority(Some("low")), TaskPriority::Low));
587        assert!(matches!(parse_priority(Some("high")), TaskPriority::High));
588        assert!(matches!(
589            parse_priority(Some("critical")),
590            TaskPriority::Critical
591        ));
592        assert!(matches!(parse_priority(None), TaskPriority::Normal));
593        assert!(matches!(
594            parse_priority(Some("unknown")),
595            TaskPriority::Normal
596        ));
597    }
598}