1use serde::{Deserialize, Serialize};
20
21use crate::error::ServerError;
22use crate::events::{AgentEvent, AgentEventType, EventBus, EventSubscription};
23use crate::models::agent::{Agent, AgentRole, AgentStatus, ModelTier};
24use crate::models::message::{Message, MessageRole};
25use crate::models::task::{Task, TaskStatus};
26use crate::store::{AgentStore, ConversationStore, TaskStore};
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
30#[serde(rename_all = "camelCase")]
31pub struct ToolResult {
32 pub success: bool,
33 #[serde(skip_serializing_if = "Option::is_none")]
34 pub data: Option<serde_json::Value>,
35 #[serde(skip_serializing_if = "Option::is_none")]
36 pub error: Option<String>,
37}
38
39impl ToolResult {
40 pub fn success(data: impl Serialize) -> Self {
41 Self {
42 success: true,
43 data: Some(serde_json::to_value(data).unwrap_or_default()),
44 error: None,
45 }
46 }
47
48 pub fn error(msg: impl Into<String>) -> Self {
49 Self {
50 success: false,
51 data: None,
52 error: Some(msg.into()),
53 }
54 }
55}
56
57#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(rename_all = "camelCase")]
60pub struct CompletionReport {
61 pub agent_id: String,
62 #[serde(skip_serializing_if = "Option::is_none")]
63 pub task_id: Option<String>,
64 pub summary: String,
65 pub success: bool,
66 #[serde(skip_serializing_if = "Option::is_none")]
67 pub files_modified: Option<Vec<String>>,
68}
69
70pub struct AgentTools {
72 agent_store: AgentStore,
73 conversation_store: ConversationStore,
74 task_store: TaskStore,
75 event_bus: EventBus,
76}
77
78impl AgentTools {
79 pub fn new(
80 agent_store: AgentStore,
81 conversation_store: ConversationStore,
82 task_store: TaskStore,
83 event_bus: EventBus,
84 ) -> Self {
85 Self {
86 agent_store,
87 conversation_store,
88 task_store,
89 event_bus,
90 }
91 }
92
93 pub async fn list_agents(&self, workspace_id: &str) -> Result<ToolResult, ServerError> {
96 let agents = self.agent_store.list_by_workspace(workspace_id).await?;
97 let summary: Vec<serde_json::Value> = agents
98 .iter()
99 .map(|a| {
100 serde_json::json!({
101 "id": a.id,
102 "name": a.name,
103 "role": a.role,
104 "status": a.status,
105 "parentId": a.parent_id,
106 })
107 })
108 .collect();
109 Ok(ToolResult::success(summary))
110 }
111
112 pub async fn read_agent_conversation(
115 &self,
116 agent_id: &str,
117 last_n: Option<usize>,
118 start_turn: Option<i32>,
119 end_turn: Option<i32>,
120 include_tool_calls: bool,
121 ) -> Result<ToolResult, ServerError> {
122 let agent = self.agent_store.get(agent_id).await?;
123 let agent = match agent {
124 Some(a) => a,
125 None => return Ok(ToolResult::error(format!("Agent not found: {}", agent_id))),
126 };
127
128 let mut messages = if let Some(n) = last_n {
129 self.conversation_store.get_last_n(agent_id, n).await?
130 } else if let (Some(start), Some(end)) = (start_turn, end_turn) {
131 self.conversation_store
132 .get_by_turn_range(agent_id, start, end)
133 .await?
134 } else {
135 self.conversation_store.get_conversation(agent_id).await?
136 };
137
138 if !include_tool_calls {
139 messages.retain(|m| m.role != MessageRole::Tool);
140 }
141
142 Ok(ToolResult::success(serde_json::json!({
143 "agentId": agent_id,
144 "agentName": agent.name,
145 "messageCount": messages.len(),
146 "messages": messages.iter().map(|m| serde_json::json!({
147 "role": m.role,
148 "content": m.content,
149 "turn": m.turn,
150 "toolName": m.tool_name,
151 "timestamp": m.timestamp.to_rfc3339(),
152 })).collect::<Vec<_>>(),
153 })))
154 }
155
156 pub async fn create_agent(
159 &self,
160 name: &str,
161 role: &str,
162 workspace_id: &str,
163 parent_id: Option<&str>,
164 model_tier: Option<&str>,
165 ) -> Result<ToolResult, ServerError> {
166 let role = match AgentRole::from_str(role) {
167 Some(r) => r,
168 None => {
169 return Ok(ToolResult::error(format!(
170 "Invalid role: {}. Must be one of: ROUTA, CRAFTER, GATE, DEVELOPER",
171 role
172 )))
173 }
174 };
175
176 let model_tier = model_tier
177 .and_then(ModelTier::from_str)
178 .unwrap_or(ModelTier::Smart);
179
180 let agent = Agent::new(
181 uuid::Uuid::new_v4().to_string(),
182 name.to_string(),
183 role.clone(),
184 workspace_id.to_string(),
185 parent_id.map(|s| s.to_string()),
186 Some(model_tier),
187 None,
188 );
189
190 self.agent_store.save(&agent).await?;
191
192 self.event_bus
193 .emit(AgentEvent {
194 event_type: AgentEventType::AgentCreated,
195 agent_id: agent.id.clone(),
196 workspace_id: workspace_id.to_string(),
197 data: serde_json::json!({ "name": agent.name, "role": agent.role }),
198 timestamp: chrono::Utc::now(),
199 })
200 .await;
201
202 Ok(ToolResult::success(serde_json::json!({
203 "agentId": agent.id,
204 "name": agent.name,
205 "role": agent.role,
206 "status": agent.status,
207 })))
208 }
209
210 pub async fn delegate(
213 &self,
214 agent_id: &str,
215 task_id: &str,
216 caller_agent_id: &str,
217 ) -> Result<ToolResult, ServerError> {
218 let agent = match self.agent_store.get(agent_id).await? {
219 Some(a) => a,
220 None => return Ok(ToolResult::error(format!("Agent not found: {}", agent_id))),
221 };
222
223 let mut task = match self.task_store.get(task_id).await? {
224 Some(t) => t,
225 None => return Ok(ToolResult::error(format!("Task not found: {}", task_id))),
226 };
227
228 task.assigned_to = Some(agent_id.to_string());
230 task.status = TaskStatus::InProgress;
231 task.updated_at = chrono::Utc::now();
232 self.task_store.save(&task).await?;
233
234 self.agent_store
235 .update_status(agent_id, &AgentStatus::Active)
236 .await?;
237
238 let message = Message::new(
240 uuid::Uuid::new_v4().to_string(),
241 agent_id.to_string(),
242 MessageRole::User,
243 format!(
244 "Task delegated: {}\nObjective: {}",
245 task.title, task.objective
246 ),
247 None,
248 None,
249 None,
250 );
251 self.conversation_store.append(&message).await?;
252
253 self.event_bus
254 .emit(AgentEvent {
255 event_type: AgentEventType::TaskAssigned,
256 agent_id: agent_id.to_string(),
257 workspace_id: agent.workspace_id.clone(),
258 data: serde_json::json!({
259 "taskId": task_id,
260 "callerAgentId": caller_agent_id,
261 "taskTitle": task.title,
262 }),
263 timestamp: chrono::Utc::now(),
264 })
265 .await;
266
267 Ok(ToolResult::success(serde_json::json!({
268 "agentId": agent_id,
269 "taskId": task_id,
270 "status": "delegated",
271 })))
272 }
273
274 pub async fn message_agent(
277 &self,
278 from_agent_id: &str,
279 to_agent_id: &str,
280 message: &str,
281 ) -> Result<ToolResult, ServerError> {
282 let to_agent = match self.agent_store.get(to_agent_id).await? {
283 Some(a) => a,
284 None => {
285 return Ok(ToolResult::error(format!(
286 "Target agent not found: {}",
287 to_agent_id
288 )))
289 }
290 };
291
292 let msg = Message::new(
293 uuid::Uuid::new_v4().to_string(),
294 to_agent_id.to_string(),
295 MessageRole::User,
296 format!("[From agent {}]: {}", from_agent_id, message),
297 None,
298 None,
299 None,
300 );
301 self.conversation_store.append(&msg).await?;
302
303 self.event_bus
304 .emit(AgentEvent {
305 event_type: AgentEventType::MessageSent,
306 agent_id: from_agent_id.to_string(),
307 workspace_id: to_agent.workspace_id.clone(),
308 data: serde_json::json!({
309 "fromAgentId": from_agent_id,
310 "toAgentId": to_agent_id,
311 "messagePreview": &message[..message.len().min(200)],
312 }),
313 timestamp: chrono::Utc::now(),
314 })
315 .await;
316
317 Ok(ToolResult::success(serde_json::json!({
318 "delivered": true,
319 "toAgentId": to_agent_id,
320 "fromAgentId": from_agent_id,
321 })))
322 }
323
324 pub async fn report_to_parent(
327 &self,
328 agent_id: &str,
329 report: CompletionReport,
330 ) -> Result<ToolResult, ServerError> {
331 let agent = match self.agent_store.get(agent_id).await? {
332 Some(a) => a,
333 None => return Ok(ToolResult::error(format!("Agent not found: {}", agent_id))),
334 };
335
336 let parent_id = match &agent.parent_id {
337 Some(p) => p.clone(),
338 None => {
339 return Ok(ToolResult::error(format!(
340 "Agent {} has no parent to report to",
341 agent_id
342 )))
343 }
344 };
345
346 if let Some(task_id) = &report.task_id {
348 if let Some(mut task) = self.task_store.get(task_id).await? {
349 task.status = if report.success {
350 TaskStatus::Completed
351 } else {
352 TaskStatus::NeedsFix
353 };
354 task.completion_summary = Some(report.summary.clone());
355 task.updated_at = chrono::Utc::now();
356 self.task_store.save(&task).await?;
357 }
358 }
359
360 self.agent_store
362 .update_status(agent_id, &AgentStatus::Completed)
363 .await?;
364
365 let content = format!(
367 "[Completion Report from {} ({})]\nTask: {:?}\nSuccess: {}\nSummary: {}\n{}",
368 agent.name,
369 agent_id,
370 report.task_id,
371 report.success,
372 report.summary,
373 report
374 .files_modified
375 .as_ref()
376 .map(|f| format!("Files Modified: {}", f.join(", ")))
377 .unwrap_or_default()
378 );
379
380 let msg = Message::new(
381 uuid::Uuid::new_v4().to_string(),
382 parent_id.clone(),
383 MessageRole::User,
384 content,
385 None,
386 None,
387 None,
388 );
389 self.conversation_store.append(&msg).await?;
390
391 self.event_bus
392 .emit(AgentEvent {
393 event_type: AgentEventType::ReportSubmitted,
394 agent_id: agent_id.to_string(),
395 workspace_id: agent.workspace_id.clone(),
396 data: serde_json::json!({
397 "parentId": parent_id,
398 "taskId": report.task_id,
399 "success": report.success,
400 }),
401 timestamp: chrono::Utc::now(),
402 })
403 .await;
404
405 Ok(ToolResult::success(serde_json::json!({
406 "reported": true,
407 "parentId": parent_id,
408 "success": report.success,
409 })))
410 }
411
412 #[allow(clippy::too_many_arguments)]
414 pub async fn create_task(
415 &self,
416 title: &str,
417 objective: &str,
418 workspace_id: &str,
419 session_id: Option<&str>,
420 scope: Option<&str>,
421 acceptance_criteria: Option<Vec<String>>,
422 verification_commands: Option<Vec<String>>,
423 test_cases: Option<Vec<String>>,
424 dependencies: Option<Vec<String>>,
425 parallel_group: Option<&str>,
426 ) -> Result<ToolResult, ServerError> {
427 let task = Task::new(
428 uuid::Uuid::new_v4().to_string(),
429 title.to_string(),
430 objective.to_string(),
431 workspace_id.to_string(),
432 session_id.map(|s| s.to_string()),
433 scope.map(|s| s.to_string()),
434 acceptance_criteria,
435 verification_commands,
436 test_cases,
437 dependencies,
438 parallel_group.map(|s| s.to_string()),
439 );
440
441 self.task_store.save(&task).await?;
442
443 Ok(ToolResult::success(serde_json::json!({
444 "taskId": task.id,
445 "title": task.title,
446 "status": task.status,
447 })))
448 }
449
450 pub async fn get_task(&self, task_id: &str) -> Result<ToolResult, ServerError> {
453 match self.task_store.get(task_id).await? {
454 Some(task) => Ok(ToolResult::success(task)),
455 None => Ok(ToolResult::error(format!("Task not found: {}", task_id))),
456 }
457 }
458
459 pub async fn list_tasks(&self, workspace_id: &str) -> Result<ToolResult, ServerError> {
462 let tasks = self.task_store.list_by_workspace(workspace_id).await?;
463 let summary: Vec<serde_json::Value> = tasks
464 .iter()
465 .map(|t| {
466 serde_json::json!({
467 "id": t.id,
468 "title": t.title,
469 "status": t.status,
470 "assignedTo": t.assigned_to,
471 "verificationVerdict": t.verification_verdict,
472 })
473 })
474 .collect();
475 Ok(ToolResult::success(summary))
476 }
477
478 pub async fn update_task_status(
481 &self,
482 task_id: &str,
483 status: &str,
484 agent_id: &str,
485 summary: Option<&str>,
486 ) -> Result<ToolResult, ServerError> {
487 let new_status = match TaskStatus::from_str(status) {
488 Some(s) => s,
489 None => {
490 return Ok(ToolResult::error(format!(
491 "Invalid status: {}. Must be one of: PENDING, IN_PROGRESS, REVIEW_REQUIRED, COMPLETED, NEEDS_FIX, BLOCKED, CANCELLED",
492 status
493 )))
494 }
495 };
496
497 let mut task = match self.task_store.get(task_id).await? {
498 Some(t) => t,
499 None => return Ok(ToolResult::error(format!("Task not found: {}", task_id))),
500 };
501
502 let old_status = task.status.clone();
503 task.status = new_status.clone();
504 if let Some(s) = summary {
505 task.completion_summary = Some(s.to_string());
506 }
507 task.updated_at = chrono::Utc::now();
508 self.task_store.save(&task).await?;
509
510 self.event_bus
512 .emit(AgentEvent {
513 event_type: AgentEventType::TaskStatusChanged,
514 agent_id: agent_id.to_string(),
515 workspace_id: task.workspace_id.clone(),
516 data: serde_json::json!({
517 "taskId": task_id,
518 "oldStatus": old_status,
519 "newStatus": new_status,
520 "summary": summary,
521 }),
522 timestamp: chrono::Utc::now(),
523 })
524 .await;
525
526 if new_status == TaskStatus::Completed {
528 self.event_bus
529 .emit(AgentEvent {
530 event_type: AgentEventType::TaskCompleted,
531 agent_id: agent_id.to_string(),
532 workspace_id: task.workspace_id.clone(),
533 data: serde_json::json!({
534 "taskId": task_id,
535 "taskTitle": task.title,
536 "summary": summary,
537 }),
538 timestamp: chrono::Utc::now(),
539 })
540 .await;
541 }
542
543 Ok(ToolResult::success(serde_json::json!({
544 "taskId": task_id,
545 "oldStatus": old_status,
546 "newStatus": new_status,
547 "updatedAt": task.updated_at.to_rfc3339(),
548 })))
549 }
550
551 #[allow(clippy::too_many_arguments)]
554 pub async fn subscribe_to_events(
555 &self,
556 agent_id: &str,
557 agent_name: &str,
558 event_types: Vec<String>,
559 exclude_self: bool,
560 one_shot: bool,
561 wait_group_id: Option<String>,
562 priority: i32,
563 ) -> Result<ToolResult, ServerError> {
564 let valid_types: Vec<AgentEventType> = event_types
565 .iter()
566 .filter_map(|t| AgentEventType::from_str(t))
567 .collect();
568
569 if valid_types.is_empty() {
570 return Ok(ToolResult::error(format!(
571 "No valid event types. Available: {}",
572 EventBus::all_event_types().join(", ")
573 )));
574 }
575
576 let subscription_id = uuid::Uuid::new_v4().to_string();
577 self.event_bus
578 .subscribe(EventSubscription {
579 id: subscription_id.clone(),
580 agent_id: agent_id.to_string(),
581 agent_name: agent_name.to_string(),
582 event_types: valid_types.clone(),
583 exclude_self,
584 one_shot,
585 wait_group_id: wait_group_id.clone(),
586 priority,
587 })
588 .await;
589
590 Ok(ToolResult::success(serde_json::json!({
591 "subscriptionId": subscription_id,
592 "eventTypes": valid_types,
593 "oneShot": one_shot,
594 "waitGroupId": wait_group_id,
595 "priority": priority,
596 })))
597 }
598
599 pub async fn unsubscribe_from_events(
602 &self,
603 subscription_id: &str,
604 ) -> Result<ToolResult, ServerError> {
605 let removed = self.event_bus.unsubscribe(subscription_id).await;
606 Ok(ToolResult::success(serde_json::json!({
607 "unsubscribed": removed,
608 "subscriptionId": subscription_id,
609 })))
610 }
611
612 pub async fn drain_pending_events(&self, agent_id: &str) -> Result<ToolResult, ServerError> {
615 let events = self.event_bus.drain_pending_events(agent_id).await;
616 let event_data: Vec<serde_json::Value> = events
617 .iter()
618 .map(|e| {
619 serde_json::json!({
620 "type": e.event_type,
621 "agentId": e.agent_id,
622 "data": e.data,
623 "timestamp": e.timestamp.to_rfc3339(),
624 })
625 })
626 .collect();
627 Ok(ToolResult::success(
628 serde_json::json!({ "events": event_data }),
629 ))
630 }
631
632 pub async fn get_agent_status(&self, agent_id: &str) -> Result<ToolResult, ServerError> {
635 let agent = match self.agent_store.get(agent_id).await? {
636 Some(a) => a,
637 None => return Ok(ToolResult::error(format!("Agent not found: {}", agent_id))),
638 };
639
640 let message_count = self.conversation_store.get_message_count(agent_id).await?;
641 let tasks = self.task_store.list_by_assignee(agent_id).await?;
642
643 Ok(ToolResult::success(serde_json::json!({
644 "agentId": agent.id,
645 "name": agent.name,
646 "role": agent.role,
647 "status": agent.status,
648 "modelTier": agent.model_tier,
649 "parentId": agent.parent_id,
650 "messageCount": message_count,
651 "tasks": tasks.iter().map(|t| serde_json::json!({
652 "id": t.id,
653 "title": t.title,
654 "status": t.status,
655 })).collect::<Vec<_>>(),
656 })))
657 }
658
659 pub async fn get_agent_summary(&self, agent_id: &str) -> Result<ToolResult, ServerError> {
662 let agent = match self.agent_store.get(agent_id).await? {
663 Some(a) => a,
664 None => return Ok(ToolResult::error(format!("Agent not found: {}", agent_id))),
665 };
666
667 let message_count = self.conversation_store.get_message_count(agent_id).await?;
668 let last_messages = self.conversation_store.get_last_n(agent_id, 3).await?;
669 let tasks = self.task_store.list_by_assignee(agent_id).await?;
670
671 let last_response = last_messages
672 .iter()
673 .rfind(|m| m.role == MessageRole::Assistant);
674
675 let all_messages = self.conversation_store.get_conversation(agent_id).await?;
676 let tool_call_count = all_messages
677 .iter()
678 .filter(|m| m.role == MessageRole::Tool)
679 .count();
680
681 Ok(ToolResult::success(serde_json::json!({
682 "agentId": agent.id,
683 "name": agent.name,
684 "role": agent.role,
685 "status": agent.status,
686 "messageCount": message_count,
687 "toolCallCount": tool_call_count,
688 "lastResponse": last_response.map(|m| serde_json::json!({
689 "content": &m.content[..m.content.len().min(500)],
690 "timestamp": m.timestamp.to_rfc3339(),
691 })),
692 "activeTasks": tasks.iter()
693 .filter(|t| t.status == TaskStatus::InProgress)
694 .map(|t| serde_json::json!({ "id": t.id, "title": t.title }))
695 .collect::<Vec<_>>(),
696 })))
697 }
698}