1use serde::{Deserialize, Serialize};
20
21use crate::error::ServerError;
22use crate::events::{AgentEvent, AgentEventType, EventBus, EventSubscription};
23use crate::models::agent::{Agent, AgentRole, AgentStatus, ModelTier};
24use crate::models::message::{Message, MessageRole};
25use crate::models::task::{Task, TaskStatus};
26use crate::store::{AgentStore, ConversationStore, TaskStore};
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(rename_all = "camelCase")]
31pub struct ToolResult {
32 pub success: bool,
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub data: Option<serde_json::Value>,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub error: Option<String>,
37}
38
39impl ToolResult {
40 pub fn success(data: impl Serialize) -> Self {
41 Self {
42 success: true,
43 data: Some(serde_json::to_value(data).unwrap_or_default()),
44 error: None,
45 }
46 }
47
48 pub fn error(msg: impl Into<String>) -> Self {
49 Self {
50 success: false,
51 data: None,
52 error: Some(msg.into()),
53 }
54 }
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(rename_all = "camelCase")]
60pub struct CompletionReport {
61 pub agent_id: String,
62 #[serde(skip_serializing_if = "Option::is_none")]
63 pub task_id: Option<String>,
64 pub summary: String,
65 pub success: bool,
66 #[serde(skip_serializing_if = "Option::is_none")]
67 pub files_modified: Option<Vec<String>>,
68}
69
70pub struct AgentTools {
72 agent_store: AgentStore,
73 conversation_store: ConversationStore,
74 task_store: TaskStore,
75 event_bus: EventBus,
76}
77
78impl AgentTools {
79 pub fn new(
80 agent_store: AgentStore,
81 conversation_store: ConversationStore,
82 task_store: TaskStore,
83 event_bus: EventBus,
84 ) -> Self {
85 Self {
86 agent_store,
87 conversation_store,
88 task_store,
89 event_bus,
90 }
91 }
92
93 pub async fn list_agents(&self, workspace_id: &str) -> Result<ToolResult, ServerError> {
96 let agents = self.agent_store.list_by_workspace(workspace_id).await?;
97 let summary: Vec<serde_json::Value> = agents
98 .iter()
99 .map(|a| {
100 serde_json::json!({
101 "id": a.id,
102 "name": a.name,
103 "role": a.role,
104 "status": a.status,
105 "parentId": a.parent_id,
106 })
107 })
108 .collect();
109 Ok(ToolResult::success(summary))
110 }
111
112 pub async fn read_agent_conversation(
115 &self,
116 agent_id: &str,
117 last_n: Option<usize>,
118 start_turn: Option<i32>,
119 end_turn: Option<i32>,
120 include_tool_calls: bool,
121 ) -> Result<ToolResult, ServerError> {
122 let agent = self.agent_store.get(agent_id).await?;
123 let agent = match agent {
124 Some(a) => a,
125 None => return Ok(ToolResult::error(format!("Agent not found: {agent_id}"))),
126 };
127
128 let mut messages = if let Some(n) = last_n {
129 self.conversation_store.get_last_n(agent_id, n).await?
130 } else if let (Some(start), Some(end)) = (start_turn, end_turn) {
131 self.conversation_store
132 .get_by_turn_range(agent_id, start, end)
133 .await?
134 } else {
135 self.conversation_store.get_conversation(agent_id).await?
136 };
137
138 if !include_tool_calls {
139 messages.retain(|m| m.role != MessageRole::Tool);
140 }
141
142 Ok(ToolResult::success(serde_json::json!({
143 "agentId": agent_id,
144 "agentName": agent.name,
145 "messageCount": messages.len(),
146 "messages": messages.iter().map(|m| serde_json::json!({
147 "role": m.role,
148 "content": m.content,
149 "turn": m.turn,
150 "toolName": m.tool_name,
151 "timestamp": m.timestamp.to_rfc3339(),
152 })).collect::<Vec<_>>(),
153 })))
154 }
155
156 pub async fn create_agent(
159 &self,
160 name: &str,
161 role: &str,
162 workspace_id: &str,
163 parent_id: Option<&str>,
164 model_tier: Option<&str>,
165 ) -> Result<ToolResult, ServerError> {
166 let role = match AgentRole::from_str(role) {
167 Some(r) => r,
168 None => {
169 return Ok(ToolResult::error(format!(
170 "Invalid role: {role}. Must be one of: ROUTA, CRAFTER, GATE, DEVELOPER"
171 )))
172 }
173 };
174
175 let model_tier = model_tier
176 .and_then(ModelTier::from_str)
177 .unwrap_or(ModelTier::Smart);
178
179 let agent = Agent::new(
180 uuid::Uuid::new_v4().to_string(),
181 name.to_string(),
182 role.clone(),
183 workspace_id.to_string(),
184 parent_id.map(|s| s.to_string()),
185 Some(model_tier),
186 None,
187 );
188
189 self.agent_store.save(&agent).await?;
190
191 self.event_bus
192 .emit(AgentEvent {
193 event_type: AgentEventType::AgentCreated,
194 agent_id: agent.id.clone(),
195 workspace_id: workspace_id.to_string(),
196 data: serde_json::json!({ "name": agent.name, "role": agent.role }),
197 timestamp: chrono::Utc::now(),
198 })
199 .await;
200
201 Ok(ToolResult::success(serde_json::json!({
202 "agentId": agent.id,
203 "name": agent.name,
204 "role": agent.role,
205 "status": agent.status,
206 })))
207 }
208
209 pub async fn delegate(
212 &self,
213 agent_id: &str,
214 task_id: &str,
215 caller_agent_id: &str,
216 ) -> Result<ToolResult, ServerError> {
217 let agent = match self.agent_store.get(agent_id).await? {
218 Some(a) => a,
219 None => return Ok(ToolResult::error(format!("Agent not found: {agent_id}"))),
220 };
221
222 let mut task = match self.task_store.get(task_id).await? {
223 Some(t) => t,
224 None => return Ok(ToolResult::error(format!("Task not found: {task_id}"))),
225 };
226
227 task.assigned_to = Some(agent_id.to_string());
229 task.status = TaskStatus::InProgress;
230 task.updated_at = chrono::Utc::now();
231 self.task_store.save(&task).await?;
232
233 self.agent_store
234 .update_status(agent_id, &AgentStatus::Active)
235 .await?;
236
237 let message = Message::new(
239 uuid::Uuid::new_v4().to_string(),
240 agent_id.to_string(),
241 MessageRole::User,
242 format!(
243 "Task delegated: {}\nObjective: {}",
244 task.title, task.objective
245 ),
246 None,
247 None,
248 None,
249 );
250 self.conversation_store.append(&message).await?;
251
252 self.event_bus
253 .emit(AgentEvent {
254 event_type: AgentEventType::TaskAssigned,
255 agent_id: agent_id.to_string(),
256 workspace_id: agent.workspace_id.clone(),
257 data: serde_json::json!({
258 "taskId": task_id,
259 "callerAgentId": caller_agent_id,
260 "taskTitle": task.title,
261 }),
262 timestamp: chrono::Utc::now(),
263 })
264 .await;
265
266 Ok(ToolResult::success(serde_json::json!({
267 "agentId": agent_id,
268 "taskId": task_id,
269 "status": "delegated",
270 })))
271 }
272
273 pub async fn message_agent(
276 &self,
277 from_agent_id: &str,
278 to_agent_id: &str,
279 message: &str,
280 ) -> Result<ToolResult, ServerError> {
281 let to_agent = match self.agent_store.get(to_agent_id).await? {
282 Some(a) => a,
283 None => {
284 return Ok(ToolResult::error(format!(
285 "Target agent not found: {to_agent_id}"
286 )))
287 }
288 };
289
290 let msg = Message::new(
291 uuid::Uuid::new_v4().to_string(),
292 to_agent_id.to_string(),
293 MessageRole::User,
294 format!("[From agent {from_agent_id}]: {message}"),
295 None,
296 None,
297 None,
298 );
299 self.conversation_store.append(&msg).await?;
300
301 self.event_bus
302 .emit(AgentEvent {
303 event_type: AgentEventType::MessageSent,
304 agent_id: from_agent_id.to_string(),
305 workspace_id: to_agent.workspace_id.clone(),
306 data: serde_json::json!({
307 "fromAgentId": from_agent_id,
308 "toAgentId": to_agent_id,
309 "messagePreview": &message[..message.len().min(200)],
310 }),
311 timestamp: chrono::Utc::now(),
312 })
313 .await;
314
315 Ok(ToolResult::success(serde_json::json!({
316 "delivered": true,
317 "toAgentId": to_agent_id,
318 "fromAgentId": from_agent_id,
319 })))
320 }
321
322 pub async fn report_to_parent(
325 &self,
326 agent_id: &str,
327 report: CompletionReport,
328 ) -> Result<ToolResult, ServerError> {
329 let agent = match self.agent_store.get(agent_id).await? {
330 Some(a) => a,
331 None => return Ok(ToolResult::error(format!("Agent not found: {agent_id}"))),
332 };
333
334 let parent_id = match &agent.parent_id {
335 Some(p) => p.clone(),
336 None => {
337 return Ok(ToolResult::error(format!(
338 "Agent {agent_id} has no parent to report to"
339 )))
340 }
341 };
342
343 if let Some(task_id) = &report.task_id {
345 if let Some(mut task) = self.task_store.get(task_id).await? {
346 task.status = if report.success {
347 TaskStatus::Completed
348 } else {
349 TaskStatus::NeedsFix
350 };
351 task.completion_summary = Some(report.summary.clone());
352 task.updated_at = chrono::Utc::now();
353 self.task_store.save(&task).await?;
354 }
355 }
356
357 self.agent_store
359 .update_status(agent_id, &AgentStatus::Completed)
360 .await?;
361
362 let content = format!(
364 "[Completion Report from {} ({})]\nTask: {:?}\nSuccess: {}\nSummary: {}\n{}",
365 agent.name,
366 agent_id,
367 report.task_id,
368 report.success,
369 report.summary,
370 report
371 .files_modified
372 .as_ref()
373 .map(|f| format!("Files Modified: {}", f.join(", ")))
374 .unwrap_or_default()
375 );
376
377 let msg = Message::new(
378 uuid::Uuid::new_v4().to_string(),
379 parent_id.clone(),
380 MessageRole::User,
381 content,
382 None,
383 None,
384 None,
385 );
386 self.conversation_store.append(&msg).await?;
387
388 self.event_bus
389 .emit(AgentEvent {
390 event_type: AgentEventType::ReportSubmitted,
391 agent_id: agent_id.to_string(),
392 workspace_id: agent.workspace_id.clone(),
393 data: serde_json::json!({
394 "parentId": parent_id,
395 "taskId": report.task_id,
396 "success": report.success,
397 }),
398 timestamp: chrono::Utc::now(),
399 })
400 .await;
401
402 Ok(ToolResult::success(serde_json::json!({
403 "reported": true,
404 "parentId": parent_id,
405 "success": report.success,
406 })))
407 }
408
409 #[allow(clippy::too_many_arguments)]
411 pub async fn create_task(
412 &self,
413 title: &str,
414 objective: &str,
415 workspace_id: &str,
416 session_id: Option<&str>,
417 scope: Option<&str>,
418 acceptance_criteria: Option<Vec<String>>,
419 verification_commands: Option<Vec<String>>,
420 test_cases: Option<Vec<String>>,
421 dependencies: Option<Vec<String>>,
422 parallel_group: Option<&str>,
423 ) -> Result<ToolResult, ServerError> {
424 let task = Task::new(
425 uuid::Uuid::new_v4().to_string(),
426 title.to_string(),
427 objective.to_string(),
428 workspace_id.to_string(),
429 session_id.map(|s| s.to_string()),
430 scope.map(|s| s.to_string()),
431 acceptance_criteria,
432 verification_commands,
433 test_cases,
434 dependencies,
435 parallel_group.map(|s| s.to_string()),
436 );
437
438 self.task_store.save(&task).await?;
439
440 Ok(ToolResult::success(serde_json::json!({
441 "taskId": task.id,
442 "title": task.title,
443 "status": task.status,
444 })))
445 }
446
447 pub async fn get_task(&self, task_id: &str) -> Result<ToolResult, ServerError> {
450 match self.task_store.get(task_id).await? {
451 Some(task) => Ok(ToolResult::success(task)),
452 None => Ok(ToolResult::error(format!("Task not found: {task_id}"))),
453 }
454 }
455
456 pub async fn list_tasks(&self, workspace_id: &str) -> Result<ToolResult, ServerError> {
459 let tasks = self.task_store.list_by_workspace(workspace_id).await?;
460 let summary: Vec<serde_json::Value> = tasks
461 .iter()
462 .map(|t| {
463 serde_json::json!({
464 "id": t.id,
465 "title": t.title,
466 "status": t.status,
467 "assignedTo": t.assigned_to,
468 "verificationVerdict": t.verification_verdict,
469 })
470 })
471 .collect();
472 Ok(ToolResult::success(summary))
473 }
474
475 pub async fn update_task_status(
478 &self,
479 task_id: &str,
480 status: &str,
481 agent_id: &str,
482 summary: Option<&str>,
483 ) -> Result<ToolResult, ServerError> {
484 let new_status = match TaskStatus::from_str(status) {
485 Some(s) => s,
486 None => {
487 return Ok(ToolResult::error(format!(
488 "Invalid status: {status}. Must be one of: PENDING, IN_PROGRESS, REVIEW_REQUIRED, COMPLETED, NEEDS_FIX, BLOCKED, CANCELLED"
489 )))
490 }
491 };
492
493 let mut task = match self.task_store.get(task_id).await? {
494 Some(t) => t,
495 None => return Ok(ToolResult::error(format!("Task not found: {task_id}"))),
496 };
497
498 let old_status = task.status.clone();
499 task.status = new_status.clone();
500 if let Some(s) = summary {
501 task.completion_summary = Some(s.to_string());
502 }
503 task.updated_at = chrono::Utc::now();
504 self.task_store.save(&task).await?;
505
506 self.event_bus
508 .emit(AgentEvent {
509 event_type: AgentEventType::TaskStatusChanged,
510 agent_id: agent_id.to_string(),
511 workspace_id: task.workspace_id.clone(),
512 data: serde_json::json!({
513 "taskId": task_id,
514 "oldStatus": old_status,
515 "newStatus": new_status,
516 "summary": summary,
517 }),
518 timestamp: chrono::Utc::now(),
519 })
520 .await;
521
522 if new_status == TaskStatus::Completed {
524 self.event_bus
525 .emit(AgentEvent {
526 event_type: AgentEventType::TaskCompleted,
527 agent_id: agent_id.to_string(),
528 workspace_id: task.workspace_id.clone(),
529 data: serde_json::json!({
530 "taskId": task_id,
531 "taskTitle": task.title,
532 "summary": summary,
533 }),
534 timestamp: chrono::Utc::now(),
535 })
536 .await;
537 }
538
539 Ok(ToolResult::success(serde_json::json!({
540 "taskId": task_id,
541 "oldStatus": old_status,
542 "newStatus": new_status,
543 "updatedAt": task.updated_at.to_rfc3339(),
544 })))
545 }
546
547 #[allow(clippy::too_many_arguments)]
550 pub async fn subscribe_to_events(
551 &self,
552 agent_id: &str,
553 agent_name: &str,
554 event_types: Vec<String>,
555 exclude_self: bool,
556 one_shot: bool,
557 wait_group_id: Option<String>,
558 priority: i32,
559 ) -> Result<ToolResult, ServerError> {
560 let valid_types: Vec<AgentEventType> = event_types
561 .iter()
562 .filter_map(|t| AgentEventType::from_str(t))
563 .collect();
564
565 if valid_types.is_empty() {
566 return Ok(ToolResult::error(format!(
567 "No valid event types. Available: {}",
568 EventBus::all_event_types().join(", ")
569 )));
570 }
571
572 let subscription_id = uuid::Uuid::new_v4().to_string();
573 self.event_bus
574 .subscribe(EventSubscription {
575 id: subscription_id.clone(),
576 agent_id: agent_id.to_string(),
577 agent_name: agent_name.to_string(),
578 event_types: valid_types.clone(),
579 exclude_self,
580 one_shot,
581 wait_group_id: wait_group_id.clone(),
582 priority,
583 })
584 .await;
585
586 Ok(ToolResult::success(serde_json::json!({
587 "subscriptionId": subscription_id,
588 "eventTypes": valid_types,
589 "oneShot": one_shot,
590 "waitGroupId": wait_group_id,
591 "priority": priority,
592 })))
593 }
594
595 pub async fn unsubscribe_from_events(
598 &self,
599 subscription_id: &str,
600 ) -> Result<ToolResult, ServerError> {
601 let removed = self.event_bus.unsubscribe(subscription_id).await;
602 Ok(ToolResult::success(serde_json::json!({
603 "unsubscribed": removed,
604 "subscriptionId": subscription_id,
605 })))
606 }
607
608 pub async fn drain_pending_events(&self, agent_id: &str) -> Result<ToolResult, ServerError> {
611 let events = self.event_bus.drain_pending_events(agent_id).await;
612 let event_data: Vec<serde_json::Value> = events
613 .iter()
614 .map(|e| {
615 serde_json::json!({
616 "type": e.event_type,
617 "agentId": e.agent_id,
618 "data": e.data,
619 "timestamp": e.timestamp.to_rfc3339(),
620 })
621 })
622 .collect();
623 Ok(ToolResult::success(
624 serde_json::json!({ "events": event_data }),
625 ))
626 }
627
628 pub async fn get_agent_status(&self, agent_id: &str) -> Result<ToolResult, ServerError> {
631 let agent = match self.agent_store.get(agent_id).await? {
632 Some(a) => a,
633 None => return Ok(ToolResult::error(format!("Agent not found: {agent_id}"))),
634 };
635
636 let message_count = self.conversation_store.get_message_count(agent_id).await?;
637 let tasks = self.task_store.list_by_assignee(agent_id).await?;
638
639 Ok(ToolResult::success(serde_json::json!({
640 "agentId": agent.id,
641 "name": agent.name,
642 "role": agent.role,
643 "status": agent.status,
644 "modelTier": agent.model_tier,
645 "parentId": agent.parent_id,
646 "messageCount": message_count,
647 "tasks": tasks.iter().map(|t| serde_json::json!({
648 "id": t.id,
649 "title": t.title,
650 "status": t.status,
651 })).collect::<Vec<_>>(),
652 })))
653 }
654
655 pub async fn get_agent_summary(&self, agent_id: &str) -> Result<ToolResult, ServerError> {
658 let agent = match self.agent_store.get(agent_id).await? {
659 Some(a) => a,
660 None => return Ok(ToolResult::error(format!("Agent not found: {agent_id}"))),
661 };
662
663 let message_count = self.conversation_store.get_message_count(agent_id).await?;
664 let last_messages = self.conversation_store.get_last_n(agent_id, 3).await?;
665 let tasks = self.task_store.list_by_assignee(agent_id).await?;
666
667 let last_response = last_messages
668 .iter()
669 .rfind(|m| m.role == MessageRole::Assistant);
670
671 let all_messages = self.conversation_store.get_conversation(agent_id).await?;
672 let tool_call_count = all_messages
673 .iter()
674 .filter(|m| m.role == MessageRole::Tool)
675 .count();
676
677 Ok(ToolResult::success(serde_json::json!({
678 "agentId": agent.id,
679 "name": agent.name,
680 "role": agent.role,
681 "status": agent.status,
682 "messageCount": message_count,
683 "toolCallCount": tool_call_count,
684 "lastResponse": last_response.map(|m| serde_json::json!({
685 "content": &m.content[..m.content.len().min(500)],
686 "timestamp": m.timestamp.to_rfc3339(),
687 })),
688 "activeTasks": tasks.iter()
689 .filter(|t| t.status == TaskStatus::InProgress)
690 .map(|t| serde_json::json!({ "id": t.id, "title": t.title }))
691 .collect::<Vec<_>>(),
692 })))
693 }
694}