Skip to main content

routa_core/tools/
mod.rs

1//! AgentTools - Core coordination tools for multi-agent collaboration.
2//!
3//! Port of the TypeScript AgentTools from src/core/tools/agent-tools.ts
4//!
5//! Provides coordination tools:
6//!   1. listAgents        - List agents in a workspace
7//!   2. readAgentConversation - Read another agent's conversation
8//!   3. createAgent       - Create ROUTA/CRAFTER/GATE agents
9//!   4. delegate          - Assign task to agent
10//!   5. messageAgent      - Inter-agent messaging
11//!   6. reportToParent    - Completion report to parent
12//!   7. createTask        - Create a new task
13//!   8. getTask           - Get task by ID
14//!   9. listTasks         - List tasks in workspace
15//!  10. updateTaskStatus  - Update task status
16//!  11. subscribeToEvents - Subscribe to workspace events
17//!  12. unsubscribeFromEvents - Unsubscribe
18
19use serde::{Deserialize, Serialize};
20
21use crate::error::ServerError;
22use crate::events::{AgentEvent, AgentEventType, EventBus, EventSubscription};
23use crate::models::agent::{Agent, AgentRole, AgentStatus, ModelTier};
24use crate::models::message::{Message, MessageRole};
25use crate::models::task::{Task, TaskStatus};
26use crate::store::{AgentStore, ConversationStore, TaskStore};
27
28/// Result of a tool operation.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(rename_all = "camelCase")]
31pub struct ToolResult {
32    pub success: bool,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub data: Option<serde_json::Value>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub error: Option<String>,
37}
38
39impl ToolResult {
40    pub fn success(data: impl Serialize) -> Self {
41        Self {
42            success: true,
43            data: Some(serde_json::to_value(data).unwrap_or_default()),
44            error: None,
45        }
46    }
47
48    pub fn error(msg: impl Into<String>) -> Self {
49        Self {
50            success: false,
51            data: None,
52            error: Some(msg.into()),
53        }
54    }
55}
56
57/// Completion report from a child agent.
58#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(rename_all = "camelCase")]
60pub struct CompletionReport {
61    pub agent_id: String,
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub task_id: Option<String>,
64    pub summary: String,
65    pub success: bool,
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub files_modified: Option<Vec<String>>,
68}
69
70/// AgentTools provides coordination tools for multi-agent collaboration.
71pub struct AgentTools {
72    agent_store: AgentStore,
73    conversation_store: ConversationStore,
74    task_store: TaskStore,
75    event_bus: EventBus,
76}
77
78impl AgentTools {
79    pub fn new(
80        agent_store: AgentStore,
81        conversation_store: ConversationStore,
82        task_store: TaskStore,
83        event_bus: EventBus,
84    ) -> Self {
85        Self {
86            agent_store,
87            conversation_store,
88            task_store,
89            event_bus,
90        }
91    }
92
93    // ─── Tool 1: List Agents ─────────────────────────────────────────────
94
95    pub async fn list_agents(&self, workspace_id: &str) -> Result<ToolResult, ServerError> {
96        let agents = self.agent_store.list_by_workspace(workspace_id).await?;
97        let summary: Vec<serde_json::Value> = agents
98            .iter()
99            .map(|a| {
100                serde_json::json!({
101                    "id": a.id,
102                    "name": a.name,
103                    "role": a.role,
104                    "status": a.status,
105                    "parentId": a.parent_id,
106                })
107            })
108            .collect();
109        Ok(ToolResult::success(summary))
110    }
111
112    // ─── Tool 2: Read Agent Conversation ─────────────────────────────────
113
114    pub async fn read_agent_conversation(
115        &self,
116        agent_id: &str,
117        last_n: Option<usize>,
118        start_turn: Option<i32>,
119        end_turn: Option<i32>,
120        include_tool_calls: bool,
121    ) -> Result<ToolResult, ServerError> {
122        let agent = self.agent_store.get(agent_id).await?;
123        let agent = match agent {
124            Some(a) => a,
125            None => return Ok(ToolResult::error(format!("Agent not found: {agent_id}"))),
126        };
127
128        let mut messages = if let Some(n) = last_n {
129            self.conversation_store.get_last_n(agent_id, n).await?
130        } else if let (Some(start), Some(end)) = (start_turn, end_turn) {
131            self.conversation_store
132                .get_by_turn_range(agent_id, start, end)
133                .await?
134        } else {
135            self.conversation_store.get_conversation(agent_id).await?
136        };
137
138        if !include_tool_calls {
139            messages.retain(|m| m.role != MessageRole::Tool);
140        }
141
142        Ok(ToolResult::success(serde_json::json!({
143            "agentId": agent_id,
144            "agentName": agent.name,
145            "messageCount": messages.len(),
146            "messages": messages.iter().map(|m| serde_json::json!({
147                "role": m.role,
148                "content": m.content,
149                "turn": m.turn,
150                "toolName": m.tool_name,
151                "timestamp": m.timestamp.to_rfc3339(),
152            })).collect::<Vec<_>>(),
153        })))
154    }
155
156    // ─── Tool 3: Create Agent ────────────────────────────────────────────
157
158    pub async fn create_agent(
159        &self,
160        name: &str,
161        role: &str,
162        workspace_id: &str,
163        parent_id: Option<&str>,
164        model_tier: Option<&str>,
165    ) -> Result<ToolResult, ServerError> {
166        let role = match AgentRole::from_str(role) {
167            Some(r) => r,
168            None => {
169                return Ok(ToolResult::error(format!(
170                    "Invalid role: {role}. Must be one of: ROUTA, CRAFTER, GATE, DEVELOPER"
171                )))
172            }
173        };
174
175        let model_tier = model_tier
176            .and_then(ModelTier::from_str)
177            .unwrap_or(ModelTier::Smart);
178
179        let agent = Agent::new(
180            uuid::Uuid::new_v4().to_string(),
181            name.to_string(),
182            role.clone(),
183            workspace_id.to_string(),
184            parent_id.map(|s| s.to_string()),
185            Some(model_tier),
186            None,
187        );
188
189        self.agent_store.save(&agent).await?;
190
191        self.event_bus
192            .emit(AgentEvent {
193                event_type: AgentEventType::AgentCreated,
194                agent_id: agent.id.clone(),
195                workspace_id: workspace_id.to_string(),
196                data: serde_json::json!({ "name": agent.name, "role": agent.role }),
197                timestamp: chrono::Utc::now(),
198            })
199            .await;
200
201        Ok(ToolResult::success(serde_json::json!({
202            "agentId": agent.id,
203            "name": agent.name,
204            "role": agent.role,
205            "status": agent.status,
206        })))
207    }
208
209    // ─── Tool 4: Delegate Task ──────────────────────────────────────────
210
211    pub async fn delegate(
212        &self,
213        agent_id: &str,
214        task_id: &str,
215        caller_agent_id: &str,
216    ) -> Result<ToolResult, ServerError> {
217        let agent = match self.agent_store.get(agent_id).await? {
218            Some(a) => a,
219            None => return Ok(ToolResult::error(format!("Agent not found: {agent_id}"))),
220        };
221
222        let mut task = match self.task_store.get(task_id).await? {
223            Some(t) => t,
224            None => return Ok(ToolResult::error(format!("Task not found: {task_id}"))),
225        };
226
227        // Assign and activate
228        task.assigned_to = Some(agent_id.to_string());
229        task.status = TaskStatus::InProgress;
230        task.updated_at = chrono::Utc::now();
231        self.task_store.save(&task).await?;
232
233        self.agent_store
234            .update_status(agent_id, &AgentStatus::Active)
235            .await?;
236
237        // Record delegation as a conversation message
238        let message = Message::new(
239            uuid::Uuid::new_v4().to_string(),
240            agent_id.to_string(),
241            MessageRole::User,
242            format!(
243                "Task delegated: {}\nObjective: {}",
244                task.title, task.objective
245            ),
246            None,
247            None,
248            None,
249        );
250        self.conversation_store.append(&message).await?;
251
252        self.event_bus
253            .emit(AgentEvent {
254                event_type: AgentEventType::TaskAssigned,
255                agent_id: agent_id.to_string(),
256                workspace_id: agent.workspace_id.clone(),
257                data: serde_json::json!({
258                    "taskId": task_id,
259                    "callerAgentId": caller_agent_id,
260                    "taskTitle": task.title,
261                }),
262                timestamp: chrono::Utc::now(),
263            })
264            .await;
265
266        Ok(ToolResult::success(serde_json::json!({
267            "agentId": agent_id,
268            "taskId": task_id,
269            "status": "delegated",
270        })))
271    }
272
273    // ─── Tool 5: Message Agent ──────────────────────────────────────────
274
275    pub async fn message_agent(
276        &self,
277        from_agent_id: &str,
278        to_agent_id: &str,
279        message: &str,
280    ) -> Result<ToolResult, ServerError> {
281        let to_agent = match self.agent_store.get(to_agent_id).await? {
282            Some(a) => a,
283            None => {
284                return Ok(ToolResult::error(format!(
285                    "Target agent not found: {to_agent_id}"
286                )))
287            }
288        };
289
290        let msg = Message::new(
291            uuid::Uuid::new_v4().to_string(),
292            to_agent_id.to_string(),
293            MessageRole::User,
294            format!("[From agent {from_agent_id}]: {message}"),
295            None,
296            None,
297            None,
298        );
299        self.conversation_store.append(&msg).await?;
300
301        self.event_bus
302            .emit(AgentEvent {
303                event_type: AgentEventType::MessageSent,
304                agent_id: from_agent_id.to_string(),
305                workspace_id: to_agent.workspace_id.clone(),
306                data: serde_json::json!({
307                    "fromAgentId": from_agent_id,
308                    "toAgentId": to_agent_id,
309                    "messagePreview": &message[..message.len().min(200)],
310                }),
311                timestamp: chrono::Utc::now(),
312            })
313            .await;
314
315        Ok(ToolResult::success(serde_json::json!({
316            "delivered": true,
317            "toAgentId": to_agent_id,
318            "fromAgentId": from_agent_id,
319        })))
320    }
321
322    // ─── Tool 6: Report to Parent ───────────────────────────────────────
323
324    pub async fn report_to_parent(
325        &self,
326        agent_id: &str,
327        report: CompletionReport,
328    ) -> Result<ToolResult, ServerError> {
329        let agent = match self.agent_store.get(agent_id).await? {
330            Some(a) => a,
331            None => return Ok(ToolResult::error(format!("Agent not found: {agent_id}"))),
332        };
333
334        let parent_id = match &agent.parent_id {
335            Some(p) => p.clone(),
336            None => {
337                return Ok(ToolResult::error(format!(
338                    "Agent {agent_id} has no parent to report to"
339                )))
340            }
341        };
342
343        // Update task status
344        if let Some(task_id) = &report.task_id {
345            if let Some(mut task) = self.task_store.get(task_id).await? {
346                task.status = if report.success {
347                    TaskStatus::Completed
348                } else {
349                    TaskStatus::NeedsFix
350                };
351                task.completion_summary = Some(report.summary.clone());
352                task.updated_at = chrono::Utc::now();
353                self.task_store.save(&task).await?;
354            }
355        }
356
357        // Mark agent completed
358        self.agent_store
359            .update_status(agent_id, &AgentStatus::Completed)
360            .await?;
361
362        // Deliver report as message to parent
363        let content = format!(
364            "[Completion Report from {} ({})]\nTask: {:?}\nSuccess: {}\nSummary: {}\n{}",
365            agent.name,
366            agent_id,
367            report.task_id,
368            report.success,
369            report.summary,
370            report
371                .files_modified
372                .as_ref()
373                .map(|f| format!("Files Modified: {}", f.join(", ")))
374                .unwrap_or_default()
375        );
376
377        let msg = Message::new(
378            uuid::Uuid::new_v4().to_string(),
379            parent_id.clone(),
380            MessageRole::User,
381            content,
382            None,
383            None,
384            None,
385        );
386        self.conversation_store.append(&msg).await?;
387
388        self.event_bus
389            .emit(AgentEvent {
390                event_type: AgentEventType::ReportSubmitted,
391                agent_id: agent_id.to_string(),
392                workspace_id: agent.workspace_id.clone(),
393                data: serde_json::json!({
394                    "parentId": parent_id,
395                    "taskId": report.task_id,
396                    "success": report.success,
397                }),
398                timestamp: chrono::Utc::now(),
399            })
400            .await;
401
402        Ok(ToolResult::success(serde_json::json!({
403            "reported": true,
404            "parentId": parent_id,
405            "success": report.success,
406        })))
407    }
408
409    // ─── Tool 7: Create Task ────────────────────────────────────────────
410    #[allow(clippy::too_many_arguments)]
411    pub async fn create_task(
412        &self,
413        title: &str,
414        objective: &str,
415        workspace_id: &str,
416        session_id: Option<&str>,
417        scope: Option<&str>,
418        acceptance_criteria: Option<Vec<String>>,
419        verification_commands: Option<Vec<String>>,
420        test_cases: Option<Vec<String>>,
421        dependencies: Option<Vec<String>>,
422        parallel_group: Option<&str>,
423    ) -> Result<ToolResult, ServerError> {
424        let task = Task::new(
425            uuid::Uuid::new_v4().to_string(),
426            title.to_string(),
427            objective.to_string(),
428            workspace_id.to_string(),
429            session_id.map(|s| s.to_string()),
430            scope.map(|s| s.to_string()),
431            acceptance_criteria,
432            verification_commands,
433            test_cases,
434            dependencies,
435            parallel_group.map(|s| s.to_string()),
436        );
437
438        self.task_store.save(&task).await?;
439
440        Ok(ToolResult::success(serde_json::json!({
441            "taskId": task.id,
442            "title": task.title,
443            "status": task.status,
444        })))
445    }
446
447    // ─── Tool 8: Get Task ─────────────────────────────────────────────────
448
449    pub async fn get_task(&self, task_id: &str) -> Result<ToolResult, ServerError> {
450        match self.task_store.get(task_id).await? {
451            Some(task) => Ok(ToolResult::success(task)),
452            None => Ok(ToolResult::error(format!("Task not found: {task_id}"))),
453        }
454    }
455
456    // ─── Tool 9: List Tasks ───────────────────────────────────────────────
457
458    pub async fn list_tasks(&self, workspace_id: &str) -> Result<ToolResult, ServerError> {
459        let tasks = self.task_store.list_by_workspace(workspace_id).await?;
460        let summary: Vec<serde_json::Value> = tasks
461            .iter()
462            .map(|t| {
463                serde_json::json!({
464                    "id": t.id,
465                    "title": t.title,
466                    "status": t.status,
467                    "assignedTo": t.assigned_to,
468                    "verificationVerdict": t.verification_verdict,
469                })
470            })
471            .collect();
472        Ok(ToolResult::success(summary))
473    }
474
475    // ─── Tool 10: Update Task Status ────────────────────────────────────
476
477    pub async fn update_task_status(
478        &self,
479        task_id: &str,
480        status: &str,
481        agent_id: &str,
482        summary: Option<&str>,
483    ) -> Result<ToolResult, ServerError> {
484        let new_status = match TaskStatus::from_str(status) {
485            Some(s) => s,
486            None => {
487                return Ok(ToolResult::error(format!(
488                    "Invalid status: {status}. Must be one of: PENDING, IN_PROGRESS, REVIEW_REQUIRED, COMPLETED, NEEDS_FIX, BLOCKED, CANCELLED"
489                )))
490            }
491        };
492
493        let mut task = match self.task_store.get(task_id).await? {
494            Some(t) => t,
495            None => return Ok(ToolResult::error(format!("Task not found: {task_id}"))),
496        };
497
498        let old_status = task.status.clone();
499        task.status = new_status.clone();
500        if let Some(s) = summary {
501            task.completion_summary = Some(s.to_string());
502        }
503        task.updated_at = chrono::Utc::now();
504        self.task_store.save(&task).await?;
505
506        // Emit status change event
507        self.event_bus
508            .emit(AgentEvent {
509                event_type: AgentEventType::TaskStatusChanged,
510                agent_id: agent_id.to_string(),
511                workspace_id: task.workspace_id.clone(),
512                data: serde_json::json!({
513                    "taskId": task_id,
514                    "oldStatus": old_status,
515                    "newStatus": new_status,
516                    "summary": summary,
517                }),
518                timestamp: chrono::Utc::now(),
519            })
520            .await;
521
522        // Also emit TASK_COMPLETED if applicable
523        if new_status == TaskStatus::Completed {
524            self.event_bus
525                .emit(AgentEvent {
526                    event_type: AgentEventType::TaskCompleted,
527                    agent_id: agent_id.to_string(),
528                    workspace_id: task.workspace_id.clone(),
529                    data: serde_json::json!({
530                        "taskId": task_id,
531                        "taskTitle": task.title,
532                        "summary": summary,
533                    }),
534                    timestamp: chrono::Utc::now(),
535                })
536                .await;
537        }
538
539        Ok(ToolResult::success(serde_json::json!({
540            "taskId": task_id,
541            "oldStatus": old_status,
542            "newStatus": new_status,
543            "updatedAt": task.updated_at.to_rfc3339(),
544        })))
545    }
546
547    // ─── Tool 11: Subscribe to Events ───────────────────────────────────
548
549    #[allow(clippy::too_many_arguments)]
550    pub async fn subscribe_to_events(
551        &self,
552        agent_id: &str,
553        agent_name: &str,
554        event_types: Vec<String>,
555        exclude_self: bool,
556        one_shot: bool,
557        wait_group_id: Option<String>,
558        priority: i32,
559    ) -> Result<ToolResult, ServerError> {
560        let valid_types: Vec<AgentEventType> = event_types
561            .iter()
562            .filter_map(|t| AgentEventType::from_str(t))
563            .collect();
564
565        if valid_types.is_empty() {
566            return Ok(ToolResult::error(format!(
567                "No valid event types. Available: {}",
568                EventBus::all_event_types().join(", ")
569            )));
570        }
571
572        let subscription_id = uuid::Uuid::new_v4().to_string();
573        self.event_bus
574            .subscribe(EventSubscription {
575                id: subscription_id.clone(),
576                agent_id: agent_id.to_string(),
577                agent_name: agent_name.to_string(),
578                event_types: valid_types.clone(),
579                exclude_self,
580                one_shot,
581                wait_group_id: wait_group_id.clone(),
582                priority,
583            })
584            .await;
585
586        Ok(ToolResult::success(serde_json::json!({
587            "subscriptionId": subscription_id,
588            "eventTypes": valid_types,
589            "oneShot": one_shot,
590            "waitGroupId": wait_group_id,
591            "priority": priority,
592        })))
593    }
594
595    // ─── Tool 12: Unsubscribe from Events ──────────────────────────────
596
597    pub async fn unsubscribe_from_events(
598        &self,
599        subscription_id: &str,
600    ) -> Result<ToolResult, ServerError> {
601        let removed = self.event_bus.unsubscribe(subscription_id).await;
602        Ok(ToolResult::success(serde_json::json!({
603            "unsubscribed": removed,
604            "subscriptionId": subscription_id,
605        })))
606    }
607
608    // ─── Internal: Drain Pending Events ─────────────────────────────────
609
610    pub async fn drain_pending_events(&self, agent_id: &str) -> Result<ToolResult, ServerError> {
611        let events = self.event_bus.drain_pending_events(agent_id).await;
612        let event_data: Vec<serde_json::Value> = events
613            .iter()
614            .map(|e| {
615                serde_json::json!({
616                    "type": e.event_type,
617                    "agentId": e.agent_id,
618                    "data": e.data,
619                    "timestamp": e.timestamp.to_rfc3339(),
620                })
621            })
622            .collect();
623        Ok(ToolResult::success(
624            serde_json::json!({ "events": event_data }),
625        ))
626    }
627
628    // ─── Tool: Get Agent Status ───────────────────────────────────────
629
630    pub async fn get_agent_status(&self, agent_id: &str) -> Result<ToolResult, ServerError> {
631        let agent = match self.agent_store.get(agent_id).await? {
632            Some(a) => a,
633            None => return Ok(ToolResult::error(format!("Agent not found: {agent_id}"))),
634        };
635
636        let message_count = self.conversation_store.get_message_count(agent_id).await?;
637        let tasks = self.task_store.list_by_assignee(agent_id).await?;
638
639        Ok(ToolResult::success(serde_json::json!({
640            "agentId": agent.id,
641            "name": agent.name,
642            "role": agent.role,
643            "status": agent.status,
644            "modelTier": agent.model_tier,
645            "parentId": agent.parent_id,
646            "messageCount": message_count,
647            "tasks": tasks.iter().map(|t| serde_json::json!({
648                "id": t.id,
649                "title": t.title,
650                "status": t.status,
651            })).collect::<Vec<_>>(),
652        })))
653    }
654
655    // ─── Tool: Get Agent Summary ─────────────────────────────────────
656
657    pub async fn get_agent_summary(&self, agent_id: &str) -> Result<ToolResult, ServerError> {
658        let agent = match self.agent_store.get(agent_id).await? {
659            Some(a) => a,
660            None => return Ok(ToolResult::error(format!("Agent not found: {agent_id}"))),
661        };
662
663        let message_count = self.conversation_store.get_message_count(agent_id).await?;
664        let last_messages = self.conversation_store.get_last_n(agent_id, 3).await?;
665        let tasks = self.task_store.list_by_assignee(agent_id).await?;
666
667        let last_response = last_messages
668            .iter()
669            .rfind(|m| m.role == MessageRole::Assistant);
670
671        let all_messages = self.conversation_store.get_conversation(agent_id).await?;
672        let tool_call_count = all_messages
673            .iter()
674            .filter(|m| m.role == MessageRole::Tool)
675            .count();
676
677        Ok(ToolResult::success(serde_json::json!({
678            "agentId": agent.id,
679            "name": agent.name,
680            "role": agent.role,
681            "status": agent.status,
682            "messageCount": message_count,
683            "toolCallCount": tool_call_count,
684            "lastResponse": last_response.map(|m| serde_json::json!({
685                "content": &m.content[..m.content.len().min(500)],
686                "timestamp": m.timestamp.to_rfc3339(),
687            })),
688            "activeTasks": tasks.iter()
689                .filter(|t| t.status == TaskStatus::InProgress)
690                .map(|t| serde_json::json!({ "id": t.id, "title": t.title }))
691                .collect::<Vec<_>>(),
692        })))
693    }
694}