Skip to main content

oxios_kernel/tools/
a2a_tools.rs

1//! A2A tools — let agents communicate with other agents at runtime.
2//!
3//! Provides three tools:
4//! - `a2a_delegate` — delegate a task to another agent by capability
5//! - `a2a_send` — send a message to a specific agent
6//! - `a2a_query` — discover agents by capability or skill
7
8use async_trait::async_trait;
9use std::sync::Arc;
10
11use oxi_sdk::{AgentTool, AgentToolResult, ToolContext};
12use serde_json::{Value, json};
13use uuid::Uuid;
14
15use crate::a2a::{A2AMessage, A2AProtocol, TaskPriority, TaskSpec};
16use crate::types::AgentId;
17// ─── A2aDelegateTool ───────────────────────────────────────────────────
18
19/// Tool for delegating a task to another agent discovered by capability.
20///
21/// Usage flow:
22/// 1. Agent calls `a2a_delegate` with a description and required capability.
23/// 2. The tool queries the AgentCardRegistry for agents with that capability.
24/// 3. If found, it delegates the task via A2A and waits for the result.
25/// 4. Returns the execution result to the calling agent.
26pub struct A2aDelegateTool {
27    a2a: Arc<A2AProtocol>,
28    my_agent_id: AgentId,
29}
30
31impl A2aDelegateTool {
32    /// Create a new A2A delegate tool.
33    pub fn new(a2a: Arc<A2AProtocol>, agent_id: AgentId) -> Self {
34        Self {
35            a2a,
36            my_agent_id: agent_id,
37        }
38    }
39
40    /// Create an `A2aDelegateTool` from a [`KernelHandle`].
41    ///
42    /// Extracts the A2A protocol from the kernel's a2a facade.
43    pub fn from_kernel(kernel: &crate::kernel_handle::KernelHandle, agent_id: AgentId) -> Self {
44        Self::new(kernel.a2a.protocol().clone(), agent_id)
45    }
46}
47
48impl std::fmt::Debug for A2aDelegateTool {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("A2aDelegateTool").finish()
51    }
52}
53
54#[async_trait]
55
56impl AgentTool for A2aDelegateTool {
57    fn name(&self) -> &str {
58        "a2a_delegate"
59    }
60
61    fn label(&self) -> &str {
62        "A2A Delegate"
63    }
64
65    fn description(&self) -> &str {
66        "Delegate a task to another agent. Specify a capability (e.g. 'code-review', 'testing') \
67         and a description of the work. The system will find a suitable agent, execute the task, \
68         and return the result. This is a blocking call — it waits for the delegated agent to complete."
69    }
70
71    fn parameters_schema(&self) -> Value {
72        json!({
73            "type": "object",
74            "properties": {
75                "description": {
76                    "type": "string",
77                    "description": "Human-readable description of the task to delegate"
78                },
79                "capability": {
80                    "type": "string",
81                    "description": "Required capability of the target agent (e.g. 'code-review', 'testing', 'debugging')"
82                },
83                "payload": {
84                    "type": "object",
85                    "description": "Structured data for the task (optional)"
86                },
87                "priority": {
88                    "type": "string",
89                    "enum": ["low", "normal", "high", "critical"],
90                    "description": "Task priority (default: normal)"
91                }
92            },
93            "required": ["description", "capability"]
94        })
95    }
96
97    async fn execute(
98        &self,
99        _tool_call_id: &str,
100        params: Value,
101        _shutdown: Option<tokio::sync::oneshot::Receiver<()>>,
102        _ctx: &ToolContext,
103    ) -> Result<AgentToolResult, oxi_sdk::ToolError> {
104        let description = params["description"].as_str().unwrap_or("").to_string();
105        if description.is_empty() {
106            return Ok(AgentToolResult::error(
107                "Missing required parameter: description",
108            ));
109        }
110
111        let capability = params["capability"].as_str().unwrap_or("").to_string();
112        if capability.is_empty() {
113            return Ok(AgentToolResult::error(
114                "Missing required parameter: capability",
115            ));
116        }
117
118        let payload = params.get("payload").cloned().unwrap_or(json!({}));
119        let priority = parse_priority(params["priority"].as_str());
120
121        let my_id = self.my_agent_id;
122
123        // 1. Find agents with the required capability.
124        let candidates = match self.a2a.query_capabilities(&capability).await {
125            Ok(c) => c,
126            Err(e) => {
127                return Ok(AgentToolResult::error(format!(
128                    "Failed to query capabilities: {e}"
129                )));
130            }
131        };
132
133        if candidates.is_empty() {
134            // No agent available — guide the LLM to handle it itself.
135            return Ok(AgentToolResult::success(format!(
136                "No agents currently available with capability '{capability}'. You should handle this task yourself."
137            )));
138        }
139
140        // 2. Pick the first available agent (could be smarter — load-balancing later).
141        let target = &candidates[0];
142        let target_id = target.agent_id;
143
144        tracing::info!(
145            from = %my_id,
146            to = %target_id,
147            target_name = %target.name,
148            capability = %capability,
149            "A2A delegating task"
150        );
151
152        // 3. Create task spec and delegate via A2A.
153        let task = TaskSpec::new(&description, payload.clone()).with_priority(priority);
154        let task_id = task.task_id;
155
156        // 4. Execute via dispatch handler (blocking — waits for result).
157        match self.a2a.execute_delegation(my_id, target_id, task).await {
158            Some(Ok(result)) => Ok(AgentToolResult::success(
159                serde_json::to_string(&json!({
160                    "task_id": task_id.to_string(),
161                    "delegated_to": target.name,
162                    "delegated_to_id": target_id.to_string(),
163                    "status": "completed",
164                    "result": result,
165                }))
166                .unwrap_or_default(),
167            )),
168            Some(Err(e)) => Ok(AgentToolResult::error(format!(
169                "A2A delegation failed: {e}"
170            ))),
171            None => {
172                // No handler registered — fall back to fire-and-forget.
173                tracing::warn!("No A2A dispatch handler registered, using fire-and-forget");
174                match self
175                    .a2a
176                    .delegate_task(my_id, target_id, TaskSpec::new(&description, payload))
177                    .await
178                {
179                    Ok(_) => Ok(AgentToolResult::success(format!(
180                        "Task delegated to '{}' (no handler — fire-and-forget). Task ID: {}",
181                        target.name, task_id
182                    ))),
183                    Err(e) => Ok(AgentToolResult::error(format!("Delegation failed: {e}"))),
184                }
185            }
186        }
187    }
188}
189
190// ─── A2aSendTool ───────────────────────────────────────────────────────
191
192/// Tool for sending a direct message to a specific agent.
193///
194/// Unlike `a2a_delegate`, this sends a fire-and-forget message.
195/// Use for status updates, notifications, or simple queries.
196pub struct A2aSendTool {
197    a2a: Arc<A2AProtocol>,
198    my_agent_id: AgentId,
199}
200
201impl A2aSendTool {
202    /// Create a new A2A send tool.
203    pub fn new(a2a: Arc<A2AProtocol>, agent_id: AgentId) -> Self {
204        Self {
205            a2a,
206            my_agent_id: agent_id,
207        }
208    }
209
210    /// Create an `A2aSendTool` from a [`KernelHandle`].
211    ///
212    /// Extracts the A2A protocol from the kernel's a2a facade.
213    pub fn from_kernel(kernel: &crate::kernel_handle::KernelHandle, agent_id: AgentId) -> Self {
214        Self::new(kernel.a2a.protocol().clone(), agent_id)
215    }
216}
217
218impl std::fmt::Debug for A2aSendTool {
219    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
220        f.debug_struct("A2aSendTool").finish()
221    }
222}
223
224#[async_trait]
225
226impl AgentTool for A2aSendTool {
227    fn name(&self) -> &str {
228        "a2a_send"
229    }
230
231    fn label(&self) -> &str {
232        "A2A Send"
233    }
234
235    fn description(&self) -> &str {
236        "Send a message to a specific agent by ID. Fire-and-forget — does not wait for a response. \
237         Use for status updates, notifications, or sharing information."
238    }
239
240    fn parameters_schema(&self) -> Value {
241        json!({
242            "type": "object",
243            "properties": {
244                "target_agent_id": {
245                    "type": "string",
246                    "description": "UUID of the target agent"
247                },
248                "message_type": {
249                    "type": "string",
250                    "enum": ["status_update", "result_sharing", "handshake"],
251                    "description": "Type of message to send (default: status_update)"
252                },
253                "content": {
254                    "type": "string",
255                    "description": "The message content"
256                },
257                "task_id": {
258                    "type": "string",
259                    "description": "Task UUID this message relates to (for status_update and result_sharing)"
260                },
261                "payload": {
262                    "type": "object",
263                    "description": "Structured data to share (optional, for result_sharing)"
264                },
265                "progress": {
266                    "type": "integer",
267                    "description": "Progress percentage for status updates (0-100)"
268                }
269            },
270            "required": ["target_agent_id", "message_type", "content"]
271        })
272    }
273
274    async fn execute(
275        &self,
276        _tool_call_id: &str,
277        params: Value,
278        _shutdown: Option<tokio::sync::oneshot::Receiver<()>>,
279        _ctx: &ToolContext,
280    ) -> Result<AgentToolResult, oxi_sdk::ToolError> {
281        let target_str = params["target_agent_id"].as_str().unwrap_or("");
282        let target_id: AgentId = match Uuid::parse_str(target_str) {
283            Ok(id) => id,
284            Err(e) => {
285                return Ok(AgentToolResult::error(format!(
286                    "Invalid target_agent_id: {e}"
287                )));
288            }
289        };
290
291        let message_type = params["message_type"].as_str().unwrap_or("status_update");
292        let content = params["content"].as_str().unwrap_or("").to_string();
293        let payload = params.get("payload").cloned().unwrap_or(json!({}));
294        let progress = params["progress"].as_u64().unwrap_or(0) as u8;
295        let task_id: Uuid = params["task_id"]
296            .as_str()
297            .and_then(|s| Uuid::parse_str(s).ok())
298            .unwrap_or_else(Uuid::new_v4);
299
300        let my_id = self.my_agent_id;
301
302        let message = match message_type {
303            "status_update" => A2AMessage::StatusUpdate {
304                task_id,
305                progress,
306                message: content,
307            },
308            "result_sharing" => A2AMessage::ResultSharing {
309                task_id,
310                result: payload,
311                summary: content,
312            },
313            "handshake" => {
314                let card = self.a2a.registry().get_agent(my_id).await;
315                let (name, capabilities) = card
316                    .map(|c| (c.name, c.capabilities))
317                    .unwrap_or(("unknown".into(), vec![]));
318                A2AMessage::Handshake {
319                    agent_id: my_id,
320                    name,
321                    capabilities,
322                }
323            }
324            _ => {
325                return Ok(AgentToolResult::error(format!(
326                    "Unknown message_type: {message_type}"
327                )));
328            }
329        };
330
331        match self.a2a.send_message(my_id, target_id, message).await {
332            Ok(request_id) => Ok(AgentToolResult::success(
333                serde_json::to_string(&json!({
334                    "request_id": request_id.to_string(),
335                    "sent_to": target_str,
336                }))
337                .unwrap_or_default(),
338            )),
339            Err(e) => Ok(AgentToolResult::error(format!("Failed to send: {e}"))),
340        }
341    }
342}
343
344// ─── A2aQueryTool ──────────────────────────────────────────────────────
345
346/// Tool for discovering other agents by capability.
347///
348/// Returns a list of agent cards matching the requested capability or skill.
349pub struct A2aQueryTool {
350    a2a: Arc<A2AProtocol>,
351}
352
353impl A2aQueryTool {
354    /// Create a new A2A query tool.
355    pub fn new(a2a: Arc<A2AProtocol>) -> Self {
356        Self { a2a }
357    }
358
359    /// Create an `A2aQueryTool` from a [`KernelHandle`].
360    ///
361    /// Extracts the A2A protocol from the kernel's a2a facade.
362    pub fn from_kernel(kernel: &crate::kernel_handle::KernelHandle) -> Self {
363        Self::new(kernel.a2a.protocol().clone())
364    }
365}
366
367impl std::fmt::Debug for A2aQueryTool {
368    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
369        f.debug_struct("A2aQueryTool").finish()
370    }
371}
372
373#[async_trait]
374
375impl AgentTool for A2aQueryTool {
376    fn name(&self) -> &str {
377        "a2a_query"
378    }
379
380    fn label(&self) -> &str {
381        "A2A Query"
382    }
383
384    fn description(&self) -> &str {
385        "Discover other agents by capability or skill. Returns a list of available agents \
386         with their names, capabilities, and status."
387    }
388
389    fn parameters_schema(&self) -> Value {
390        json!({
391            "type": "object",
392            "properties": {
393                "capability": {
394                    "type": "string",
395                    "description": "Search for agents with this capability"
396                },
397                "skill": {
398                    "type": "string",
399                    "description": "Search for agents with this skill"
400                },
401                "limit": {
402                    "type": "integer",
403                    "description": "Maximum number of results (default: 10)"
404                }
405            }
406        })
407    }
408
409    async fn execute(
410        &self,
411        _tool_call_id: &str,
412        params: Value,
413        _shutdown: Option<tokio::sync::oneshot::Receiver<()>>,
414        _ctx: &ToolContext,
415    ) -> Result<AgentToolResult, oxi_sdk::ToolError> {
416        let capability = params["capability"].as_str();
417        let skill = params["skill"].as_str();
418        let limit = params["limit"].as_u64().unwrap_or(10) as usize;
419
420        let agents = if let Some(cap) = capability {
421            match self.a2a.query_capabilities(cap).await {
422                Ok(a) => a,
423                Err(e) => return Ok(AgentToolResult::error(format!("Query failed: {e}"))),
424            }
425        } else if let Some(sk) = skill {
426            match self.a2a.registry().find_agents_by_skill(sk).await {
427                Ok(a) => a,
428                Err(e) => return Ok(AgentToolResult::error(format!("Query failed: {e}"))),
429            }
430        } else {
431            // No filter — return all agents.
432            self.a2a.registry().list_agents().await
433        };
434
435        let cards: Vec<Value> = agents
436            .into_iter()
437            .take(limit)
438            .map(|card| {
439                json!({
440                    "agent_id": card.agent_id.to_string(),
441                    "name": card.name,
442                    "description": card.description,
443                    "capabilities": card.capabilities,
444                    "skills": card.skills,
445                    "status": format!("{:?}", card.status),
446                })
447            })
448            .collect();
449
450        Ok(AgentToolResult::success(
451            serde_json::to_string(&json!({
452                "agents": cards,
453                "count": cards.len(),
454            }))
455            .unwrap_or_default(),
456        ))
457    }
458}
459
460// ─── Helpers ────────────────────────────────────────────────────────────
461
462fn parse_priority(s: Option<&str>) -> TaskPriority {
463    match s {
464        Some("low") => TaskPriority::Low,
465        Some("high") => TaskPriority::High,
466        Some("critical") => TaskPriority::Critical,
467        _ => TaskPriority::Normal,
468    }
469}
470
471#[cfg(test)]
472mod tests {
473    use super::*;
474    use crate::event_bus::EventBus;
475
476    fn test_a2a() -> Arc<A2AProtocol> {
477        Arc::new(A2AProtocol::new(EventBus::new(256)))
478    }
479
480    async fn register_agent_async(a2a: &A2AProtocol, name: &str, caps: &[&str]) -> AgentId {
481        let id = Uuid::new_v4();
482        let mut card = crate::a2a::AgentCard::new(id, name, format!("Test agent: {name}"));
483        for cap in caps {
484            card = card.with_capability(*cap);
485        }
486        a2a.registry().register_agent(card).await.unwrap();
487        id
488    }
489
490    #[tokio::test]
491    async fn test_a2a_query_finds_capability() {
492        let a2a = test_a2a();
493        register_agent_async(&a2a, "reviewer", &["code-review"]).await;
494
495        let tool = A2aQueryTool::new(a2a.clone());
496        let params = json!({"capability": "code-review"});
497        let result = tool
498            .execute("tc", params, None, &ToolContext::default())
499            .await
500            .unwrap();
501        assert!(result.output.contains("reviewer"));
502        assert!(result.output.contains("1"));
503    }
504
505    #[tokio::test]
506    async fn test_a2a_query_no_match() {
507        let a2a = test_a2a();
508
509        let tool = A2aQueryTool::new(a2a.clone());
510        let params = json!({"capability": "nonexistent"});
511        let result = tool
512            .execute("tc", params, None, &ToolContext::default())
513            .await
514            .unwrap();
515        assert!(result.output.contains("0"));
516    }
517
518    #[tokio::test]
519    async fn test_a2a_query_respects_limit() {
520        let a2a = test_a2a();
521        register_agent_async(&a2a, "a1", &["test"]).await;
522        register_agent_async(&a2a, "a2", &["test"]).await;
523        register_agent_async(&a2a, "a3", &["test"]).await;
524
525        let tool = A2aQueryTool::new(a2a.clone());
526        let params = json!({"capability": "test", "limit": 2});
527        let result = tool
528            .execute("tc", params, None, &ToolContext::default())
529            .await
530            .unwrap();
531        assert!(result.output.contains("2"));
532    }
533
534    #[tokio::test]
535    async fn test_a2a_delegate_no_agents_returns_guidance() {
536        let a2a = test_a2a();
537        let agent_id = Uuid::new_v4();
538
539        let tool = A2aDelegateTool::new(a2a.clone(), agent_id);
540        let params = json!({"description": "review code", "capability": "code-review"});
541        let result = tool
542            .execute("tc", params, None, &ToolContext::default())
543            .await
544            .unwrap();
545        assert!(result.success);
546        assert!(result.output.contains("handle this task yourself"));
547    }
548
549    #[tokio::test]
550    async fn test_a2a_send_invalid_uuid() {
551        let a2a = test_a2a();
552        let agent_id = Uuid::new_v4();
553
554        let tool = A2aSendTool::new(a2a.clone(), agent_id);
555        let params = json!({"target_agent_id": "not-a-uuid", "message_type": "status_update", "content": "hello"});
556        let result = tool
557            .execute("tc", params, None, &ToolContext::default())
558            .await
559            .unwrap();
560        assert!(!result.success);
561        assert!(result.output.contains("Invalid target_agent_id"));
562    }
563
564    #[tokio::test]
565    async fn test_a2a_send_handshake() {
566        let a2a = test_a2a();
567        let my_id = Uuid::new_v4();
568        let target_id = Uuid::new_v4();
569
570        // Register self so handshake can look up name.
571        let card = crate::a2a::AgentCard::new(my_id, "me", "Test agent").with_capability("test");
572        a2a.registry().register_agent(card).await.unwrap();
573
574        let tool = A2aSendTool::new(a2a.clone(), my_id);
575        let params = json!({"target_agent_id": target_id.to_string(), "message_type": "handshake", "content": "hello"});
576        let result = tool
577            .execute("tc", params, None, &ToolContext::default())
578            .await
579            .unwrap();
580        assert!(result.success);
581
582        // Verify message in queue.
583        let msgs = a2a.receive_messages(target_id).await;
584        assert_eq!(msgs.len(), 1);
585    }
586
587    #[test]
588    fn test_parse_priority() {
589        assert!(matches!(parse_priority(Some("low")), TaskPriority::Low));
590        assert!(matches!(parse_priority(Some("high")), TaskPriority::High));
591        assert!(matches!(
592            parse_priority(Some("critical")),
593            TaskPriority::Critical
594        ));
595        assert!(matches!(parse_priority(None), TaskPriority::Normal));
596        assert!(matches!(
597            parse_priority(Some("unknown")),
598            TaskPriority::Normal
599        ));
600    }
601}