Skip to main content

oxios_kernel/tools/
a2a_tools.rs

1//! A2A tools — let agents communicate with other agents at runtime.
2//!
3//! Provides three tools:
4//! - `a2a_delegate` — delegate a task to another agent by capability
5//! - `a2a_send` — send a message to a specific agent
6//! - `a2a_query` — discover agents by capability or skill
7
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use oxi_sdk::{AgentTool, AgentToolResult, ToolContext, ToolError};
12use serde_json::{json, Value};
13use uuid::Uuid;
14
15use crate::a2a::{A2AMessage, A2AProtocol, TaskPriority, TaskSpec};
16use crate::types::AgentId;
17// ─── A2aDelegateTool ───────────────────────────────────────────────────
18
19/// Tool for delegating a task to another agent discovered by capability.
20///
21/// Usage flow:
22/// 1. Agent calls `a2a_delegate` with a description and required capability.
23/// 2. The tool queries the AgentCardRegistry for agents with that capability.
24/// 3. If found, it delegates the task via A2A and waits for the result.
25/// 4. Returns the execution result to the calling agent.
26pub struct A2aDelegateTool {
27    a2a: Arc<A2AProtocol>,
28    my_agent_id: AgentId,
29}
30
31impl A2aDelegateTool {
32    /// Create a new A2A delegate tool.
33    pub fn new(a2a: Arc<A2AProtocol>, agent_id: AgentId) -> Self {
34        Self {
35            a2a,
36            my_agent_id: agent_id,
37        }
38    }
39
40    /// Create an `A2aDelegateTool` from a [`KernelHandle`].
41    ///
42    /// Extracts the A2A protocol from the kernel's a2a facade.
43    pub fn from_kernel(kernel: &crate::kernel_handle::KernelHandle, agent_id: AgentId) -> Self {
44        Self::new(kernel.a2a.protocol().clone(), agent_id)
45    }
46}
47
48impl std::fmt::Debug for A2aDelegateTool {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        f.debug_struct("A2aDelegateTool").finish()
51    }
52}
53
54#[async_trait]
55impl AgentTool for A2aDelegateTool {
56    fn name(&self) -> &str {
57        "a2a_delegate"
58    }
59
60    fn label(&self) -> &str {
61        "A2A Delegate"
62    }
63
64    fn description(&self) -> &str {
65        "Delegate a task to another agent. Specify a capability (e.g. 'code-review', 'testing') \
66         and a description of the work. The system will find a suitable agent, execute the task, \
67         and return the result. This is a blocking call — it waits for the delegated agent to complete."
68    }
69
70    fn parameters_schema(&self) -> Value {
71        json!({
72            "type": "object",
73            "properties": {
74                "description": {
75                    "type": "string",
76                    "description": "Human-readable description of the task to delegate"
77                },
78                "capability": {
79                    "type": "string",
80                    "description": "Required capability of the target agent (e.g. 'code-review', 'testing', 'debugging')"
81                },
82                "payload": {
83                    "type": "object",
84                    "description": "Structured data for the task (optional)"
85                },
86                "priority": {
87                    "type": "string",
88                    "enum": ["low", "normal", "high", "critical"],
89                    "description": "Task priority (default: normal)"
90                }
91            },
92            "required": ["description", "capability"]
93        })
94    }
95
96    async fn execute(
97        &self,
98        _tool_call_id: &str,
99        params: Value,
100        _shutdown: Option<tokio::sync::oneshot::Receiver<()>>,
101        _ctx: &ToolContext,
102    ) -> Result<AgentToolResult, ToolError> {
103        let description = params["description"].as_str().unwrap_or("").to_string();
104        if description.is_empty() {
105            return Ok(AgentToolResult::error(
106                "Missing required parameter: description",
107            ));
108        }
109
110        let capability = params["capability"].as_str().unwrap_or("").to_string();
111        if capability.is_empty() {
112            return Ok(AgentToolResult::error(
113                "Missing required parameter: capability",
114            ));
115        }
116
117        let payload = params.get("payload").cloned().unwrap_or(json!({}));
118        let priority = parse_priority(params["priority"].as_str());
119
120        let my_id = self.my_agent_id;
121
122        // 1. Find agents with the required capability.
123        let candidates = match self.a2a.query_capabilities(&capability).await {
124            Ok(c) => c,
125            Err(e) => {
126                return Ok(AgentToolResult::error(format!(
127                    "Failed to query capabilities: {e}"
128                )))
129            }
130        };
131
132        if candidates.is_empty() {
133            // No agent available — guide the LLM to handle it itself.
134            return Ok(AgentToolResult::success(format!(
135                "No agents currently available with capability '{}'. You should handle this task yourself.",
136                capability
137            )));
138        }
139
140        // 2. Pick the first available agent (could be smarter — load-balancing later).
141        let target = &candidates[0];
142        let target_id = target.agent_id;
143
144        tracing::info!(
145            from = %my_id,
146            to = %target_id,
147            target_name = %target.name,
148            capability = %capability,
149            "A2A delegating task"
150        );
151
152        // 3. Create task spec and delegate via A2A.
153        let task = TaskSpec::new(&description, payload.clone()).with_priority(priority);
154        let task_id = task.task_id;
155
156        // 4. Execute via dispatch handler (blocking — waits for result).
157        match self.a2a.execute_delegation(my_id, target_id, task).await {
158            Some(Ok(result)) => Ok(AgentToolResult::success(
159                serde_json::to_string(&json!({
160                    "task_id": task_id.to_string(),
161                    "delegated_to": target.name,
162                    "delegated_to_id": target_id.to_string(),
163                    "status": "completed",
164                    "result": result,
165                }))
166                .unwrap_or_default(),
167            )),
168            Some(Err(e)) => Ok(AgentToolResult::error(format!(
169                "A2A delegation failed: {}",
170                e
171            ))),
172            None => {
173                // No handler registered — fall back to fire-and-forget.
174                tracing::warn!("No A2A dispatch handler registered, using fire-and-forget");
175                match self
176                    .a2a
177                    .delegate_task(my_id, target_id, TaskSpec::new(&description, payload))
178                    .await
179                {
180                    Ok(_) => Ok(AgentToolResult::success(format!(
181                        "Task delegated to '{}' (no handler — fire-and-forget). Task ID: {}",
182                        target.name, task_id
183                    ))),
184                    Err(e) => Ok(AgentToolResult::error(format!("Delegation failed: {e}"))),
185                }
186            }
187        }
188    }
189}
190
191// ─── A2aSendTool ───────────────────────────────────────────────────────
192
193/// Tool for sending a direct message to a specific agent.
194///
195/// Unlike `a2a_delegate`, this sends a fire-and-forget message.
196/// Use for status updates, notifications, or simple queries.
197pub struct A2aSendTool {
198    a2a: Arc<A2AProtocol>,
199    my_agent_id: AgentId,
200}
201
202impl A2aSendTool {
203    /// Create a new A2A send tool.
204    pub fn new(a2a: Arc<A2AProtocol>, agent_id: AgentId) -> Self {
205        Self {
206            a2a,
207            my_agent_id: agent_id,
208        }
209    }
210
211    /// Create an `A2aSendTool` from a [`KernelHandle`].
212    ///
213    /// Extracts the A2A protocol from the kernel's a2a facade.
214    pub fn from_kernel(kernel: &crate::kernel_handle::KernelHandle, agent_id: AgentId) -> Self {
215        Self::new(kernel.a2a.protocol().clone(), agent_id)
216    }
217}
218
219impl std::fmt::Debug for A2aSendTool {
220    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221        f.debug_struct("A2aSendTool").finish()
222    }
223}
224
225#[async_trait]
226impl AgentTool for A2aSendTool {
227    fn name(&self) -> &str {
228        "a2a_send"
229    }
230
231    fn label(&self) -> &str {
232        "A2A Send"
233    }
234
235    fn description(&self) -> &str {
236        "Send a message to a specific agent by ID. Fire-and-forget — does not wait for a response. \
237         Use for status updates, notifications, or sharing information."
238    }
239
240    fn parameters_schema(&self) -> Value {
241        json!({
242            "type": "object",
243            "properties": {
244                "target_agent_id": {
245                    "type": "string",
246                    "description": "UUID of the target agent"
247                },
248                "message_type": {
249                    "type": "string",
250                    "enum": ["status_update", "result_sharing", "handshake"],
251                    "description": "Type of message to send (default: status_update)"
252                },
253                "content": {
254                    "type": "string",
255                    "description": "The message content"
256                },
257                "task_id": {
258                    "type": "string",
259                    "description": "Task UUID this message relates to (for status_update and result_sharing)"
260                },
261                "payload": {
262                    "type": "object",
263                    "description": "Structured data to share (optional, for result_sharing)"
264                },
265                "progress": {
266                    "type": "integer",
267                    "description": "Progress percentage for status updates (0-100)"
268                }
269            },
270            "required": ["target_agent_id", "message_type", "content"]
271        })
272    }
273
274    async fn execute(
275        &self,
276        _tool_call_id: &str,
277        params: Value,
278        _shutdown: Option<tokio::sync::oneshot::Receiver<()>>,
279        _ctx: &ToolContext,
280    ) -> Result<AgentToolResult, ToolError> {
281        let target_str = params["target_agent_id"].as_str().unwrap_or("");
282        let target_id: AgentId = match Uuid::parse_str(target_str) {
283            Ok(id) => id,
284            Err(e) => {
285                return Ok(AgentToolResult::error(format!(
286                    "Invalid target_agent_id: {e}"
287                )))
288            }
289        };
290
291        let message_type = params["message_type"].as_str().unwrap_or("status_update");
292        let content = params["content"].as_str().unwrap_or("").to_string();
293        let payload = params.get("payload").cloned().unwrap_or(json!({}));
294        let progress = params["progress"].as_u64().unwrap_or(0) as u8;
295        let task_id: Uuid = params["task_id"]
296            .as_str()
297            .and_then(|s| Uuid::parse_str(s).ok())
298            .unwrap_or_else(Uuid::new_v4);
299
300        let my_id = self.my_agent_id;
301
302        let message = match message_type {
303            "status_update" => A2AMessage::StatusUpdate {
304                task_id,
305                progress,
306                message: content,
307            },
308            "result_sharing" => A2AMessage::ResultSharing {
309                task_id,
310                result: payload,
311                summary: content,
312            },
313            "handshake" => {
314                let card = self.a2a.registry().get_agent(my_id).await;
315                let (name, capabilities) = card
316                    .map(|c| (c.name, c.capabilities))
317                    .unwrap_or(("unknown".into(), vec![]));
318                A2AMessage::Handshake {
319                    agent_id: my_id,
320                    name,
321                    capabilities,
322                }
323            }
324            _ => {
325                return Ok(AgentToolResult::error(format!(
326                    "Unknown message_type: {message_type}"
327                )))
328            }
329        };
330
331        match self.a2a.send_message(my_id, target_id, message).await {
332            Ok(request_id) => Ok(AgentToolResult::success(
333                serde_json::to_string(&json!({
334                    "request_id": request_id.to_string(),
335                    "sent_to": target_str,
336                }))
337                .unwrap_or_default(),
338            )),
339            Err(e) => Ok(AgentToolResult::error(format!("Failed to send: {e}"))),
340        }
341    }
342}
343
344// ─── A2aQueryTool ──────────────────────────────────────────────────────
345
346/// Tool for discovering other agents by capability.
347///
348/// Returns a list of agent cards matching the requested capability or skill.
349pub struct A2aQueryTool {
350    a2a: Arc<A2AProtocol>,
351}
352
353impl A2aQueryTool {
354    /// Create a new A2A query tool.
355    pub fn new(a2a: Arc<A2AProtocol>) -> Self {
356        Self { a2a }
357    }
358
359    /// Create an `A2aQueryTool` from a [`KernelHandle`].
360    ///
361    /// Extracts the A2A protocol from the kernel's a2a facade.
362    pub fn from_kernel(kernel: &crate::kernel_handle::KernelHandle) -> Self {
363        Self::new(kernel.a2a.protocol().clone())
364    }
365}
366
367impl std::fmt::Debug for A2aQueryTool {
368    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
369        f.debug_struct("A2aQueryTool").finish()
370    }
371}
372
373#[async_trait]
374impl AgentTool for A2aQueryTool {
375    fn name(&self) -> &str {
376        "a2a_query"
377    }
378
379    fn label(&self) -> &str {
380        "A2A Query"
381    }
382
383    fn description(&self) -> &str {
384        "Discover other agents by capability or skill. Returns a list of available agents \
385         with their names, capabilities, and status."
386    }
387
388    fn parameters_schema(&self) -> Value {
389        json!({
390            "type": "object",
391            "properties": {
392                "capability": {
393                    "type": "string",
394                    "description": "Search for agents with this capability"
395                },
396                "skill": {
397                    "type": "string",
398                    "description": "Search for agents with this skill"
399                },
400                "limit": {
401                    "type": "integer",
402                    "description": "Maximum number of results (default: 10)"
403                }
404            }
405        })
406    }
407
408    async fn execute(
409        &self,
410        _tool_call_id: &str,
411        params: Value,
412        _shutdown: Option<tokio::sync::oneshot::Receiver<()>>,
413        _ctx: &ToolContext,
414    ) -> Result<AgentToolResult, ToolError> {
415        let capability = params["capability"].as_str();
416        let skill = params["skill"].as_str();
417        let limit = params["limit"].as_u64().unwrap_or(10) as usize;
418
419        let agents = if let Some(cap) = capability {
420            match self.a2a.query_capabilities(cap).await {
421                Ok(a) => a,
422                Err(e) => return Ok(AgentToolResult::error(format!("Query failed: {e}"))),
423            }
424        } else if let Some(sk) = skill {
425            match self.a2a.registry().find_agents_by_skill(sk).await {
426                Ok(a) => a,
427                Err(e) => return Ok(AgentToolResult::error(format!("Query failed: {e}"))),
428            }
429        } else {
430            // No filter — return all agents.
431            self.a2a.registry().list_agents().await
432        };
433
434        let cards: Vec<Value> = agents
435            .into_iter()
436            .take(limit)
437            .map(|card| {
438                json!({
439                    "agent_id": card.agent_id.to_string(),
440                    "name": card.name,
441                    "description": card.description,
442                    "capabilities": card.capabilities,
443                    "skills": card.skills,
444                    "status": format!("{:?}", card.status),
445                })
446            })
447            .collect();
448
449        Ok(AgentToolResult::success(
450            serde_json::to_string(&json!({
451                "agents": cards,
452                "count": cards.len(),
453            }))
454            .unwrap_or_default(),
455        ))
456    }
457}
458
459// ─── Helpers ────────────────────────────────────────────────────────────
460
461fn parse_priority(s: Option<&str>) -> TaskPriority {
462    match s {
463        Some("low") => TaskPriority::Low,
464        Some("high") => TaskPriority::High,
465        Some("critical") => TaskPriority::Critical,
466        _ => TaskPriority::Normal,
467    }
468}
469
470#[cfg(test)]
471mod tests {
472    use super::*;
473    use crate::event_bus::EventBus;
474    use crate::types::AgentStatus;
475
476    fn test_a2a() -> Arc<A2AProtocol> {
477        Arc::new(A2AProtocol::new(EventBus::new(256)))
478    }
479
480    fn register_test_agent(a2a: &A2AProtocol, name: &str, caps: &[&str]) -> AgentId {
481        let id = Uuid::new_v4();
482        let mut card = crate::a2a::AgentCard::new(id, name, format!("Test agent: {name}"));
483        for cap in caps {
484            card = card.with_capability(*cap);
485        }
486        // Caller must be in async context.
487        // Use `futures::executor::block_on` or call from #[tokio::test].
488        id
489    }
490
491    async fn register_agent_async(a2a: &A2AProtocol, name: &str, caps: &[&str]) -> AgentId {
492        let id = Uuid::new_v4();
493        let mut card = crate::a2a::AgentCard::new(id, name, format!("Test agent: {name}"));
494        for cap in caps {
495            card = card.with_capability(*cap);
496        }
497        a2a.registry().register_agent(card).await.unwrap();
498        id
499    }
500
501    #[tokio::test]
502    async fn test_a2a_query_finds_capability() {
503        let a2a = test_a2a();
504        register_agent_async(&a2a, "reviewer", &["code-review"]).await;
505
506        let tool = A2aQueryTool::new(a2a.clone());
507        let params = json!({"capability": "code-review"});
508        let result = tool.execute("tc", params, None).await.unwrap();
509        assert!(result.output.contains("reviewer"));
510        assert!(result.output.contains("1"));
511    }
512
513    #[tokio::test]
514    async fn test_a2a_query_no_match() {
515        let a2a = test_a2a();
516
517        let tool = A2aQueryTool::new(a2a.clone());
518        let params = json!({"capability": "nonexistent"});
519        let result = tool.execute("tc", params, None).await.unwrap();
520        assert!(result.output.contains("0"));
521    }
522
523    #[tokio::test]
524    async fn test_a2a_query_respects_limit() {
525        let a2a = test_a2a();
526        register_agent_async(&a2a, "a1", &["test"]).await;
527        register_agent_async(&a2a, "a2", &["test"]).await;
528        register_agent_async(&a2a, "a3", &["test"]).await;
529
530        let tool = A2aQueryTool::new(a2a.clone());
531        let params = json!({"capability": "test", "limit": 2});
532        let result = tool.execute("tc", params, None).await.unwrap();
533        assert!(result.output.contains("2"));
534    }
535
536    #[tokio::test]
537    async fn test_a2a_delegate_no_agents_returns_guidance() {
538        let a2a = test_a2a();
539        let agent_id = Uuid::new_v4();
540
541        let tool = A2aDelegateTool::new(a2a.clone(), agent_id);
542        let params = json!({"description": "review code", "capability": "code-review"});
543        let result = tool.execute("tc", params, None).await.unwrap();
544        assert!(result.success);
545        assert!(result.output.contains("handle this task yourself"));
546    }
547
548    #[tokio::test]
549    async fn test_a2a_send_invalid_uuid() {
550        let a2a = test_a2a();
551        let agent_id = Uuid::new_v4();
552
553        let tool = A2aSendTool::new(a2a.clone(), agent_id);
554        let params = json!({"target_agent_id": "not-a-uuid", "message_type": "status_update", "content": "hello"});
555        let result = tool.execute("tc", params, None).await.unwrap();
556        assert!(!result.success);
557        assert!(result.output.contains("Invalid target_agent_id"));
558    }
559
560    #[tokio::test]
561    async fn test_a2a_send_handshake() {
562        let a2a = test_a2a();
563        let my_id = Uuid::new_v4();
564        let target_id = Uuid::new_v4();
565
566        // Register self so handshake can look up name.
567        let card = crate::a2a::AgentCard::new(my_id, "me", "Test agent").with_capability("test");
568        a2a.registry().register_agent(card).await.unwrap();
569
570        let tool = A2aSendTool::new(a2a.clone(), my_id);
571        let params = json!({"target_agent_id": target_id.to_string(), "message_type": "handshake", "content": "hello"});
572        let result = tool.execute("tc", params, None).await.unwrap();
573        assert!(result.success);
574
575        // Verify message in queue.
576        let msgs = a2a.receive_messages(target_id).await;
577        assert_eq!(msgs.len(), 1);
578    }
579
580    #[test]
581    fn test_parse_priority() {
582        assert!(matches!(parse_priority(Some("low")), TaskPriority::Low));
583        assert!(matches!(parse_priority(Some("high")), TaskPriority::High));
584        assert!(matches!(
585            parse_priority(Some("critical")),
586            TaskPriority::Critical
587        ));
588        assert!(matches!(parse_priority(None), TaskPriority::Normal));
589        assert!(matches!(
590            parse_priority(Some("unknown")),
591            TaskPriority::Normal
592        ));
593    }
594}