Skip to main content

routa_core/tools/
mod.rs

1//! AgentTools - Core coordination tools for multi-agent collaboration.
2//!
3//! Port of the TypeScript AgentTools from src/core/tools/agent-tools.ts
4//!
5//! Provides coordination tools:
6//!   1. listAgents        - List agents in a workspace
7//!   2. readAgentConversation - Read another agent's conversation
8//!   3. createAgent       - Create ROUTA/CRAFTER/GATE agents
9//!   4. delegate          - Assign task to agent
10//!   5. messageAgent      - Inter-agent messaging
11//!   6. reportToParent    - Completion report to parent
12//!   7. createTask        - Create a new task
13//!   8. getTask           - Get task by ID
14//!   9. listTasks         - List tasks in workspace
15//!  10. updateTaskStatus  - Update task status
16//!  11. subscribeToEvents - Subscribe to workspace events
17//!  12. unsubscribeFromEvents - Unsubscribe
18
19use serde::{Deserialize, Serialize};
20
21use crate::error::ServerError;
22use crate::events::{AgentEvent, AgentEventType, EventBus, EventSubscription};
23use crate::models::agent::{Agent, AgentRole, AgentStatus, ModelTier};
24use crate::models::message::{Message, MessageRole};
25use crate::models::task::{Task, TaskStatus};
26use crate::store::{AgentStore, ConversationStore, TaskStore};
27
28/// Result of a tool operation.
29#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(rename_all = "camelCase")]
31pub struct ToolResult {
32    pub success: bool,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub data: Option<serde_json::Value>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub error: Option<String>,
37}
38
39impl ToolResult {
40    pub fn success(data: impl Serialize) -> Self {
41        Self {
42            success: true,
43            data: Some(serde_json::to_value(data).unwrap_or_default()),
44            error: None,
45        }
46    }
47
48    pub fn error(msg: impl Into<String>) -> Self {
49        Self {
50            success: false,
51            data: None,
52            error: Some(msg.into()),
53        }
54    }
55}
56
57/// Completion report from a child agent.
58#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(rename_all = "camelCase")]
60pub struct CompletionReport {
61    pub agent_id: String,
62    #[serde(skip_serializing_if = "Option::is_none")]
63    pub task_id: Option<String>,
64    pub summary: String,
65    pub success: bool,
66    #[serde(skip_serializing_if = "Option::is_none")]
67    pub files_modified: Option<Vec<String>>,
68}
69
70/// AgentTools provides coordination tools for multi-agent collaboration.
71pub struct AgentTools {
72    agent_store: AgentStore,
73    conversation_store: ConversationStore,
74    task_store: TaskStore,
75    event_bus: EventBus,
76}
77
78impl AgentTools {
79    pub fn new(
80        agent_store: AgentStore,
81        conversation_store: ConversationStore,
82        task_store: TaskStore,
83        event_bus: EventBus,
84    ) -> Self {
85        Self {
86            agent_store,
87            conversation_store,
88            task_store,
89            event_bus,
90        }
91    }
92
93    // ─── Tool 1: List Agents ─────────────────────────────────────────────
94
95    pub async fn list_agents(&self, workspace_id: &str) -> Result<ToolResult, ServerError> {
96        let agents = self.agent_store.list_by_workspace(workspace_id).await?;
97        let summary: Vec<serde_json::Value> = agents
98            .iter()
99            .map(|a| {
100                serde_json::json!({
101                    "id": a.id,
102                    "name": a.name,
103                    "role": a.role,
104                    "status": a.status,
105                    "parentId": a.parent_id,
106                })
107            })
108            .collect();
109        Ok(ToolResult::success(summary))
110    }
111
112    // ─── Tool 2: Read Agent Conversation ─────────────────────────────────
113
114    pub async fn read_agent_conversation(
115        &self,
116        agent_id: &str,
117        last_n: Option<usize>,
118        start_turn: Option<i32>,
119        end_turn: Option<i32>,
120        include_tool_calls: bool,
121    ) -> Result<ToolResult, ServerError> {
122        let agent = self.agent_store.get(agent_id).await?;
123        let agent = match agent {
124            Some(a) => a,
125            None => return Ok(ToolResult::error(format!("Agent not found: {}", agent_id))),
126        };
127
128        let mut messages = if let Some(n) = last_n {
129            self.conversation_store.get_last_n(agent_id, n).await?
130        } else if let (Some(start), Some(end)) = (start_turn, end_turn) {
131            self.conversation_store
132                .get_by_turn_range(agent_id, start, end)
133                .await?
134        } else {
135            self.conversation_store.get_conversation(agent_id).await?
136        };
137
138        if !include_tool_calls {
139            messages.retain(|m| m.role != MessageRole::Tool);
140        }
141
142        Ok(ToolResult::success(serde_json::json!({
143            "agentId": agent_id,
144            "agentName": agent.name,
145            "messageCount": messages.len(),
146            "messages": messages.iter().map(|m| serde_json::json!({
147                "role": m.role,
148                "content": m.content,
149                "turn": m.turn,
150                "toolName": m.tool_name,
151                "timestamp": m.timestamp.to_rfc3339(),
152            })).collect::<Vec<_>>(),
153        })))
154    }
155
156    // ─── Tool 3: Create Agent ────────────────────────────────────────────
157
158    pub async fn create_agent(
159        &self,
160        name: &str,
161        role: &str,
162        workspace_id: &str,
163        parent_id: Option<&str>,
164        model_tier: Option<&str>,
165    ) -> Result<ToolResult, ServerError> {
166        let role = match AgentRole::from_str(role) {
167            Some(r) => r,
168            None => {
169                return Ok(ToolResult::error(format!(
170                    "Invalid role: {}. Must be one of: ROUTA, CRAFTER, GATE, DEVELOPER",
171                    role
172                )))
173            }
174        };
175
176        let model_tier = model_tier
177            .and_then(ModelTier::from_str)
178            .unwrap_or(ModelTier::Smart);
179
180        let agent = Agent::new(
181            uuid::Uuid::new_v4().to_string(),
182            name.to_string(),
183            role.clone(),
184            workspace_id.to_string(),
185            parent_id.map(|s| s.to_string()),
186            Some(model_tier),
187            None,
188        );
189
190        self.agent_store.save(&agent).await?;
191
192        self.event_bus
193            .emit(AgentEvent {
194                event_type: AgentEventType::AgentCreated,
195                agent_id: agent.id.clone(),
196                workspace_id: workspace_id.to_string(),
197                data: serde_json::json!({ "name": agent.name, "role": agent.role }),
198                timestamp: chrono::Utc::now(),
199            })
200            .await;
201
202        Ok(ToolResult::success(serde_json::json!({
203            "agentId": agent.id,
204            "name": agent.name,
205            "role": agent.role,
206            "status": agent.status,
207        })))
208    }
209
210    // ─── Tool 4: Delegate Task ──────────────────────────────────────────
211
212    pub async fn delegate(
213        &self,
214        agent_id: &str,
215        task_id: &str,
216        caller_agent_id: &str,
217    ) -> Result<ToolResult, ServerError> {
218        let agent = match self.agent_store.get(agent_id).await? {
219            Some(a) => a,
220            None => return Ok(ToolResult::error(format!("Agent not found: {}", agent_id))),
221        };
222
223        let mut task = match self.task_store.get(task_id).await? {
224            Some(t) => t,
225            None => return Ok(ToolResult::error(format!("Task not found: {}", task_id))),
226        };
227
228        // Assign and activate
229        task.assigned_to = Some(agent_id.to_string());
230        task.status = TaskStatus::InProgress;
231        task.updated_at = chrono::Utc::now();
232        self.task_store.save(&task).await?;
233
234        self.agent_store
235            .update_status(agent_id, &AgentStatus::Active)
236            .await?;
237
238        // Record delegation as a conversation message
239        let message = Message::new(
240            uuid::Uuid::new_v4().to_string(),
241            agent_id.to_string(),
242            MessageRole::User,
243            format!(
244                "Task delegated: {}\nObjective: {}",
245                task.title, task.objective
246            ),
247            None,
248            None,
249            None,
250        );
251        self.conversation_store.append(&message).await?;
252
253        self.event_bus
254            .emit(AgentEvent {
255                event_type: AgentEventType::TaskAssigned,
256                agent_id: agent_id.to_string(),
257                workspace_id: agent.workspace_id.clone(),
258                data: serde_json::json!({
259                    "taskId": task_id,
260                    "callerAgentId": caller_agent_id,
261                    "taskTitle": task.title,
262                }),
263                timestamp: chrono::Utc::now(),
264            })
265            .await;
266
267        Ok(ToolResult::success(serde_json::json!({
268            "agentId": agent_id,
269            "taskId": task_id,
270            "status": "delegated",
271        })))
272    }
273
274    // ─── Tool 5: Message Agent ──────────────────────────────────────────
275
276    pub async fn message_agent(
277        &self,
278        from_agent_id: &str,
279        to_agent_id: &str,
280        message: &str,
281    ) -> Result<ToolResult, ServerError> {
282        let to_agent = match self.agent_store.get(to_agent_id).await? {
283            Some(a) => a,
284            None => {
285                return Ok(ToolResult::error(format!(
286                    "Target agent not found: {}",
287                    to_agent_id
288                )))
289            }
290        };
291
292        let msg = Message::new(
293            uuid::Uuid::new_v4().to_string(),
294            to_agent_id.to_string(),
295            MessageRole::User,
296            format!("[From agent {}]: {}", from_agent_id, message),
297            None,
298            None,
299            None,
300        );
301        self.conversation_store.append(&msg).await?;
302
303        self.event_bus
304            .emit(AgentEvent {
305                event_type: AgentEventType::MessageSent,
306                agent_id: from_agent_id.to_string(),
307                workspace_id: to_agent.workspace_id.clone(),
308                data: serde_json::json!({
309                    "fromAgentId": from_agent_id,
310                    "toAgentId": to_agent_id,
311                    "messagePreview": &message[..message.len().min(200)],
312                }),
313                timestamp: chrono::Utc::now(),
314            })
315            .await;
316
317        Ok(ToolResult::success(serde_json::json!({
318            "delivered": true,
319            "toAgentId": to_agent_id,
320            "fromAgentId": from_agent_id,
321        })))
322    }
323
324    // ─── Tool 6: Report to Parent ───────────────────────────────────────
325
326    pub async fn report_to_parent(
327        &self,
328        agent_id: &str,
329        report: CompletionReport,
330    ) -> Result<ToolResult, ServerError> {
331        let agent = match self.agent_store.get(agent_id).await? {
332            Some(a) => a,
333            None => return Ok(ToolResult::error(format!("Agent not found: {}", agent_id))),
334        };
335
336        let parent_id = match &agent.parent_id {
337            Some(p) => p.clone(),
338            None => {
339                return Ok(ToolResult::error(format!(
340                    "Agent {} has no parent to report to",
341                    agent_id
342                )))
343            }
344        };
345
346        // Update task status
347        if let Some(task_id) = &report.task_id {
348            if let Some(mut task) = self.task_store.get(task_id).await? {
349                task.status = if report.success {
350                    TaskStatus::Completed
351                } else {
352                    TaskStatus::NeedsFix
353                };
354                task.completion_summary = Some(report.summary.clone());
355                task.updated_at = chrono::Utc::now();
356                self.task_store.save(&task).await?;
357            }
358        }
359
360        // Mark agent completed
361        self.agent_store
362            .update_status(agent_id, &AgentStatus::Completed)
363            .await?;
364
365        // Deliver report as message to parent
366        let content = format!(
367            "[Completion Report from {} ({})]\nTask: {:?}\nSuccess: {}\nSummary: {}\n{}",
368            agent.name,
369            agent_id,
370            report.task_id,
371            report.success,
372            report.summary,
373            report
374                .files_modified
375                .as_ref()
376                .map(|f| format!("Files Modified: {}", f.join(", ")))
377                .unwrap_or_default()
378        );
379
380        let msg = Message::new(
381            uuid::Uuid::new_v4().to_string(),
382            parent_id.clone(),
383            MessageRole::User,
384            content,
385            None,
386            None,
387            None,
388        );
389        self.conversation_store.append(&msg).await?;
390
391        self.event_bus
392            .emit(AgentEvent {
393                event_type: AgentEventType::ReportSubmitted,
394                agent_id: agent_id.to_string(),
395                workspace_id: agent.workspace_id.clone(),
396                data: serde_json::json!({
397                    "parentId": parent_id,
398                    "taskId": report.task_id,
399                    "success": report.success,
400                }),
401                timestamp: chrono::Utc::now(),
402            })
403            .await;
404
405        Ok(ToolResult::success(serde_json::json!({
406            "reported": true,
407            "parentId": parent_id,
408            "success": report.success,
409        })))
410    }
411
412    // ─── Tool 7: Create Task ────────────────────────────────────────────
413    #[allow(clippy::too_many_arguments)]
414    pub async fn create_task(
415        &self,
416        title: &str,
417        objective: &str,
418        workspace_id: &str,
419        session_id: Option<&str>,
420        scope: Option<&str>,
421        acceptance_criteria: Option<Vec<String>>,
422        verification_commands: Option<Vec<String>>,
423        test_cases: Option<Vec<String>>,
424        dependencies: Option<Vec<String>>,
425        parallel_group: Option<&str>,
426    ) -> Result<ToolResult, ServerError> {
427        let task = Task::new(
428            uuid::Uuid::new_v4().to_string(),
429            title.to_string(),
430            objective.to_string(),
431            workspace_id.to_string(),
432            session_id.map(|s| s.to_string()),
433            scope.map(|s| s.to_string()),
434            acceptance_criteria,
435            verification_commands,
436            test_cases,
437            dependencies,
438            parallel_group.map(|s| s.to_string()),
439        );
440
441        self.task_store.save(&task).await?;
442
443        Ok(ToolResult::success(serde_json::json!({
444            "taskId": task.id,
445            "title": task.title,
446            "status": task.status,
447        })))
448    }
449
450    // ─── Tool 8: Get Task ─────────────────────────────────────────────────
451
452    pub async fn get_task(&self, task_id: &str) -> Result<ToolResult, ServerError> {
453        match self.task_store.get(task_id).await? {
454            Some(task) => Ok(ToolResult::success(task)),
455            None => Ok(ToolResult::error(format!("Task not found: {}", task_id))),
456        }
457    }
458
459    // ─── Tool 9: List Tasks ───────────────────────────────────────────────
460
461    pub async fn list_tasks(&self, workspace_id: &str) -> Result<ToolResult, ServerError> {
462        let tasks = self.task_store.list_by_workspace(workspace_id).await?;
463        let summary: Vec<serde_json::Value> = tasks
464            .iter()
465            .map(|t| {
466                serde_json::json!({
467                    "id": t.id,
468                    "title": t.title,
469                    "status": t.status,
470                    "assignedTo": t.assigned_to,
471                    "verificationVerdict": t.verification_verdict,
472                })
473            })
474            .collect();
475        Ok(ToolResult::success(summary))
476    }
477
478    // ─── Tool 10: Update Task Status ────────────────────────────────────
479
480    pub async fn update_task_status(
481        &self,
482        task_id: &str,
483        status: &str,
484        agent_id: &str,
485        summary: Option<&str>,
486    ) -> Result<ToolResult, ServerError> {
487        let new_status = match TaskStatus::from_str(status) {
488            Some(s) => s,
489            None => {
490                return Ok(ToolResult::error(format!(
491                    "Invalid status: {}. Must be one of: PENDING, IN_PROGRESS, REVIEW_REQUIRED, COMPLETED, NEEDS_FIX, BLOCKED, CANCELLED",
492                    status
493                )))
494            }
495        };
496
497        let mut task = match self.task_store.get(task_id).await? {
498            Some(t) => t,
499            None => return Ok(ToolResult::error(format!("Task not found: {}", task_id))),
500        };
501
502        let old_status = task.status.clone();
503        task.status = new_status.clone();
504        if let Some(s) = summary {
505            task.completion_summary = Some(s.to_string());
506        }
507        task.updated_at = chrono::Utc::now();
508        self.task_store.save(&task).await?;
509
510        // Emit status change event
511        self.event_bus
512            .emit(AgentEvent {
513                event_type: AgentEventType::TaskStatusChanged,
514                agent_id: agent_id.to_string(),
515                workspace_id: task.workspace_id.clone(),
516                data: serde_json::json!({
517                    "taskId": task_id,
518                    "oldStatus": old_status,
519                    "newStatus": new_status,
520                    "summary": summary,
521                }),
522                timestamp: chrono::Utc::now(),
523            })
524            .await;
525
526        // Also emit TASK_COMPLETED if applicable
527        if new_status == TaskStatus::Completed {
528            self.event_bus
529                .emit(AgentEvent {
530                    event_type: AgentEventType::TaskCompleted,
531                    agent_id: agent_id.to_string(),
532                    workspace_id: task.workspace_id.clone(),
533                    data: serde_json::json!({
534                        "taskId": task_id,
535                        "taskTitle": task.title,
536                        "summary": summary,
537                    }),
538                    timestamp: chrono::Utc::now(),
539                })
540                .await;
541        }
542
543        Ok(ToolResult::success(serde_json::json!({
544            "taskId": task_id,
545            "oldStatus": old_status,
546            "newStatus": new_status,
547            "updatedAt": task.updated_at.to_rfc3339(),
548        })))
549    }
550
551    // ─── Tool 11: Subscribe to Events ───────────────────────────────────
552
553    #[allow(clippy::too_many_arguments)]
554    pub async fn subscribe_to_events(
555        &self,
556        agent_id: &str,
557        agent_name: &str,
558        event_types: Vec<String>,
559        exclude_self: bool,
560        one_shot: bool,
561        wait_group_id: Option<String>,
562        priority: i32,
563    ) -> Result<ToolResult, ServerError> {
564        let valid_types: Vec<AgentEventType> = event_types
565            .iter()
566            .filter_map(|t| AgentEventType::from_str(t))
567            .collect();
568
569        if valid_types.is_empty() {
570            return Ok(ToolResult::error(format!(
571                "No valid event types. Available: {}",
572                EventBus::all_event_types().join(", ")
573            )));
574        }
575
576        let subscription_id = uuid::Uuid::new_v4().to_string();
577        self.event_bus
578            .subscribe(EventSubscription {
579                id: subscription_id.clone(),
580                agent_id: agent_id.to_string(),
581                agent_name: agent_name.to_string(),
582                event_types: valid_types.clone(),
583                exclude_self,
584                one_shot,
585                wait_group_id: wait_group_id.clone(),
586                priority,
587            })
588            .await;
589
590        Ok(ToolResult::success(serde_json::json!({
591            "subscriptionId": subscription_id,
592            "eventTypes": valid_types,
593            "oneShot": one_shot,
594            "waitGroupId": wait_group_id,
595            "priority": priority,
596        })))
597    }
598
599    // ─── Tool 12: Unsubscribe from Events ──────────────────────────────
600
601    pub async fn unsubscribe_from_events(
602        &self,
603        subscription_id: &str,
604    ) -> Result<ToolResult, ServerError> {
605        let removed = self.event_bus.unsubscribe(subscription_id).await;
606        Ok(ToolResult::success(serde_json::json!({
607            "unsubscribed": removed,
608            "subscriptionId": subscription_id,
609        })))
610    }
611
612    // ─── Internal: Drain Pending Events ─────────────────────────────────
613
614    pub async fn drain_pending_events(&self, agent_id: &str) -> Result<ToolResult, ServerError> {
615        let events = self.event_bus.drain_pending_events(agent_id).await;
616        let event_data: Vec<serde_json::Value> = events
617            .iter()
618            .map(|e| {
619                serde_json::json!({
620                    "type": e.event_type,
621                    "agentId": e.agent_id,
622                    "data": e.data,
623                    "timestamp": e.timestamp.to_rfc3339(),
624                })
625            })
626            .collect();
627        Ok(ToolResult::success(
628            serde_json::json!({ "events": event_data }),
629        ))
630    }
631
632    // ─── Tool: Get Agent Status ───────────────────────────────────────
633
634    pub async fn get_agent_status(&self, agent_id: &str) -> Result<ToolResult, ServerError> {
635        let agent = match self.agent_store.get(agent_id).await? {
636            Some(a) => a,
637            None => return Ok(ToolResult::error(format!("Agent not found: {}", agent_id))),
638        };
639
640        let message_count = self.conversation_store.get_message_count(agent_id).await?;
641        let tasks = self.task_store.list_by_assignee(agent_id).await?;
642
643        Ok(ToolResult::success(serde_json::json!({
644            "agentId": agent.id,
645            "name": agent.name,
646            "role": agent.role,
647            "status": agent.status,
648            "modelTier": agent.model_tier,
649            "parentId": agent.parent_id,
650            "messageCount": message_count,
651            "tasks": tasks.iter().map(|t| serde_json::json!({
652                "id": t.id,
653                "title": t.title,
654                "status": t.status,
655            })).collect::<Vec<_>>(),
656        })))
657    }
658
659    // ─── Tool: Get Agent Summary ─────────────────────────────────────
660
661    pub async fn get_agent_summary(&self, agent_id: &str) -> Result<ToolResult, ServerError> {
662        let agent = match self.agent_store.get(agent_id).await? {
663            Some(a) => a,
664            None => return Ok(ToolResult::error(format!("Agent not found: {}", agent_id))),
665        };
666
667        let message_count = self.conversation_store.get_message_count(agent_id).await?;
668        let last_messages = self.conversation_store.get_last_n(agent_id, 3).await?;
669        let tasks = self.task_store.list_by_assignee(agent_id).await?;
670
671        let last_response = last_messages
672            .iter()
673            .rfind(|m| m.role == MessageRole::Assistant);
674
675        let all_messages = self.conversation_store.get_conversation(agent_id).await?;
676        let tool_call_count = all_messages
677            .iter()
678            .filter(|m| m.role == MessageRole::Tool)
679            .count();
680
681        Ok(ToolResult::success(serde_json::json!({
682            "agentId": agent.id,
683            "name": agent.name,
684            "role": agent.role,
685            "status": agent.status,
686            "messageCount": message_count,
687            "toolCallCount": tool_call_count,
688            "lastResponse": last_response.map(|m| serde_json::json!({
689                "content": &m.content[..m.content.len().min(500)],
690                "timestamp": m.timestamp.to_rfc3339(),
691            })),
692            "activeTasks": tasks.iter()
693                .filter(|t| t.status == TaskStatus::InProgress)
694                .map(|t| serde_json::json!({ "id": t.id, "title": t.title }))
695                .collect::<Vec<_>>(),
696        })))
697    }
698}