use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context, Result};
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use uuid::Uuid;

use super::{
    Inbox, PermissionConfig, PermissionPolicy, PermissionRule, ProcessManager, SlotManager,
    SpawnOptions,
};
use crate::db::MissionDB;
use crate::types::{
    CreateTaskInput, EventType, InboxMessage, Slot, SlotsConfig, Task, TaskStatus, TaskUpdate,
};

/// Execution mode selector stored by `MissionControl` as its default mode.
///
/// NOTE(review): the precise runtime meaning of each variant is decided by
/// code outside this file; the variant docs below only describe the name
/// each one maps to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ExecutionMode {
    /// Rendered as `"batch"`. This is the mode used when none is configured.
    Batch,
    /// Rendered as `"pty"`.
    Pty,
}

impl ExecutionMode {
    /// Returns the stable lowercase name of the mode (`"batch"` or `"pty"`).
    pub const fn as_str(&self) -> &'static str {
        match *self {
            ExecutionMode::Batch => "batch",
            ExecutionMode::Pty => "pty",
        }
    }
}

impl Default for ExecutionMode {
    /// `Batch`, the same fallback `MissionControl::new` applies when no mode
    /// is supplied.
    fn default() -> Self {
        ExecutionMode::Batch
    }
}

impl std::fmt::Display for ExecutionMode {
    /// Writes the same text as [`ExecutionMode::as_str`], honouring width and
    /// alignment flags.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.pad(self.as_str())
    }
}

38pub struct MissionControlOptions {
40 pub db_path: PathBuf,
42 pub slots_config_path: PathBuf,
44 pub permission_config_path: Option<PathBuf>,
46 pub logs_dir: Option<PathBuf>,
48 pub default_mode: Option<ExecutionMode>,
50}
51
52pub struct MissionControl {
56 db: Arc<MissionDB>,
57 slot_manager: SlotManager,
58 process_manager: ProcessManager,
59 permission_policy: PermissionPolicy,
60 inbox: Inbox,
61 started: RwLock<bool>,
62 logs_dir: PathBuf,
63 default_mode: RwLock<ExecutionMode>,
64}
65
66impl MissionControl {
67 pub fn new(options: MissionControlOptions) -> Result<Self> {
69 let db = Arc::new(MissionDB::open(&options.db_path)?);
71
72 let logs_dir = options
74 .logs_dir
75 .unwrap_or_else(|| options.db_path.parent().unwrap().join("logs"));
76
77 let default_mode = options.default_mode.unwrap_or(ExecutionMode::Batch);
78
79 let slot_manager = SlotManager::new(Arc::clone(&db));
81 let process_manager = ProcessManager::new(Arc::clone(&db), logs_dir.clone());
82 let inbox = Inbox::new(Arc::clone(&db));
83
84 let permission_config_path = options.permission_config_path.unwrap_or_else(|| {
86 options
87 .db_path
88 .parent()
89 .unwrap()
90 .join("config")
91 .join("permissions.yaml")
92 });
93 let permission_policy = PermissionPolicy::new(&permission_config_path);
94
95 let mc = Self {
97 db,
98 slot_manager,
99 process_manager,
100 permission_policy,
101 inbox,
102 started: RwLock::new(false),
103 logs_dir,
104 default_mode: RwLock::new(default_mode),
105 };
106
107 mc.load_slots_config(&options.slots_config_path)?;
108
109 info!("MissionControl initialized");
110 Ok(mc)
111 }
112
113 fn load_slots_config(&self, config_path: &Path) -> Result<()> {
115 let content = std::fs::read_to_string(config_path)?;
116 let config: SlotsConfig = serde_yaml::from_str(&content)?;
117
118 self.slot_manager.load_slots(config.slots.clone());
120
121 for slot_config in &config.slots {
123 if let Some(slot) = self.slot_manager.get_slot(&slot_config.id) {
124 self.process_manager.init_slot(&slot);
125 }
126 }
127
128 info!(count = config.slots.len(), "Slots loaded");
129 Ok(())
130 }
131
132 pub async fn start(&self) -> Result<()> {
134 let mut started = self.started.write().await;
135 if *started {
136 return Ok(());
137 }
138 *started = true;
139
140 info!("MissionControl started");
141 Ok(())
142 }
143
144 pub async fn stop(&self) -> Result<()> {
146 let mut started = self.started.write().await;
147 if !*started {
148 return Ok(());
149 }
150 *started = false;
151
152 self.process_manager.shutdown().await;
154
155 info!("MissionControl stopped");
156 Ok(())
157 }
158
159 pub fn submit(&self, role: &str, prompt: &str) -> Result<String> {
163 let input = CreateTaskInput {
164 role: role.to_string(),
165 prompt: prompt.to_string(),
166 };
167 let task = self.create_task(input)?;
168 Ok(task.id)
169 }
170
171 pub async fn ask_expert(
173 &self,
174 role: &str,
175 question: &str,
176 _timeout_ms: u64,
177 ) -> Result<String> {
178 let input = CreateTaskInput {
179 role: role.to_string(),
180 prompt: question.to_string(),
181 };
182 let task = self.create_task(input)?;
183
184 self.process_task(&task).await
186 }
187
188 fn create_task(&self, input: CreateTaskInput) -> Result<Task> {
190 let now = chrono::Utc::now().timestamp_millis();
191 let task = Task {
192 id: Uuid::new_v4().to_string(),
193 role: input.role.clone(),
194 prompt: input.prompt.clone(),
195 status: TaskStatus::Queued,
196 slot_id: None,
197 session_id: None,
198 result: None,
199 error: None,
200 created_at: now,
201 started_at: None,
202 finished_at: None,
203 };
204
205 let _ = self.db.insert_task(&task);
206 let data = serde_json::json!({ "role": input.role });
207 let _ = self.db.insert_event(&task.id, EventType::TaskCreated, Some(&data), now);
208
209 info!(task_id = %task.id, role = %input.role, "Task created");
210 Ok(task)
211 }
212
213 async fn process_task(&self, task: &Task) -> Result<String> {
215 let slots = self.slot_manager.get_slots_by_role(&task.role);
217 if slots.is_empty() {
218 return Err(anyhow!("No slot found for role: {}", task.role));
219 }
220
221 let mut target_slot: Option<Slot> = None;
223 for slot in &slots {
224 if self.process_manager.is_available(&slot.config.id) {
225 target_slot = Some(slot.clone());
226 break;
227 }
228 }
229
230 if target_slot.is_none() {
232 for slot in &slots {
233 if let Some(status) = self.process_manager.get_status(&slot.config.id) {
234 if status.status == super::AgentStatus::Stopped {
235 self.process_manager
236 .spawn(slot, SpawnOptions::default())
237 .await?;
238 target_slot = Some(slot.clone());
239 break;
240 }
241 }
242 }
243 }
244
245 let target_slot =
246 target_slot.ok_or_else(|| anyhow!("No available slot for role: {}", task.role))?;
247
248 let now = chrono::Utc::now().timestamp_millis();
249
250 let _ = self.db.update_task(
252 &task.id,
253 &TaskUpdate {
254 status: Some(TaskStatus::Running),
255 slot_id: Some(target_slot.config.id.clone()),
256 started_at: Some(now),
257 ..Default::default()
258 },
259 );
260
261 let data = serde_json::json!({ "slotId": target_slot.config.id });
262 let _ = self.db.insert_event(&task.id, EventType::TaskStarted, Some(&data), now);
263
264 info!(task_id = %task.id, slot_id = %target_slot.config.id, "Task started");
265
266 match self.process_manager.execute_task(&target_slot, task).await {
268 Ok(result) => {
269 let now = chrono::Utc::now().timestamp_millis();
270
271 let _ = self.db.update_task(
273 &task.id,
274 &TaskUpdate {
275 status: Some(TaskStatus::Done),
276 session_id: Some(result.session_id.clone()),
277 result: Some(result.result.clone()),
278 finished_at: Some(now),
279 ..Default::default()
280 },
281 );
282
283 let data = serde_json::json!({ "resultLength": result.result.len() });
284 let _ = self.db.insert_event(&task.id, EventType::TaskDone, Some(&data), now);
285
286 self.inbox.add_message(&task.id, &task.role, &result.result);
288
289 info!(task_id = %task.id, "Task completed");
290 Ok(result.result)
291 }
292 Err(e) => {
293 let error_msg = e.to_string();
294 let now = chrono::Utc::now().timestamp_millis();
295
296 let _ = self.db.update_task(
297 &task.id,
298 &TaskUpdate {
299 status: Some(TaskStatus::Failed),
300 error: Some(error_msg.clone()),
301 finished_at: Some(now),
302 ..Default::default()
303 },
304 );
305
306 let data = serde_json::json!({ "error": error_msg });
307 let _ = self.db.insert_event(&task.id, EventType::TaskFailed, Some(&data), now);
308
309 error!(task_id = %task.id, error = %error_msg, "Task failed");
310 Err(e)
311 }
312 }
313 }
314
315 pub fn get_status(&self, task_id: &str) -> Option<Task> {
317 self.db.get_task(task_id).ok().flatten()
318 }
319
320 pub async fn cancel(&self, task_id: &str) -> Result<bool> {
322 let task = match self.db.get_task(task_id).ok().flatten() {
323 Some(t) => t,
324 None => return Ok(false),
325 };
326
327 let now = chrono::Utc::now().timestamp_millis();
328
329 if task.status == TaskStatus::Queued {
330 let _ = self.db.update_task(
331 task_id,
332 &TaskUpdate {
333 status: Some(TaskStatus::Cancelled),
334 finished_at: Some(now),
335 ..Default::default()
336 },
337 );
338 return Ok(true);
339 }
340
341 if task.status == TaskStatus::Running {
342 if let Some(slot_id) = &task.slot_id {
343 self.process_manager.kill(slot_id).await?;
344 let _ = self.db.update_task(
345 task_id,
346 &TaskUpdate {
347 status: Some(TaskStatus::Cancelled),
348 finished_at: Some(now),
349 ..Default::default()
350 },
351 );
352 return Ok(true);
353 }
354 }
355
356 Ok(false)
357 }
358
359 pub async fn spawn_agent(
363 &self,
364 slot_id: &str,
365 options: Option<SpawnOptions>,
366 ) -> Result<super::AgentProcess> {
367 let slot = self
368 .slot_manager
369 .get_slot(slot_id)
370 .ok_or_else(|| anyhow!("Slot not found: {}", slot_id))?;
371 self.process_manager
372 .spawn(&slot, options.unwrap_or_default())
373 .await
374 }
375
376 pub async fn kill_agent(&self, slot_id: &str) -> Result<()> {
378 self.process_manager.kill(slot_id).await
379 }
380
381 pub async fn restart_agent(
383 &self,
384 slot_id: &str,
385 options: Option<SpawnOptions>,
386 ) -> Result<super::AgentProcess> {
387 let slot = self
388 .slot_manager
389 .get_slot(slot_id)
390 .ok_or_else(|| anyhow!("Slot not found: {}", slot_id))?;
391 self.process_manager
392 .restart(&slot, options.unwrap_or_default())
393 .await
394 }
395
396 pub fn get_agents(&self) -> Vec<super::AgentProcess> {
398 self.process_manager.get_all_status()
399 }
400
401 pub fn get_agent(&self, slot_id: &str) -> Option<super::AgentProcess> {
403 self.process_manager.get_status(slot_id)
404 }
405
406 pub fn get_inbox(&self, unread_only: bool, limit: usize) -> Vec<InboxMessage> {
410 self.inbox.get_messages(unread_only, limit)
411 }
412
413 pub fn mark_inbox_read(&self, message_id: &str) {
415 self.inbox.mark_read(message_id);
416 }
417
418 pub fn list_slots(&self) -> Vec<Slot> {
422 self.slot_manager.get_all_slots()
423 }
424
425 pub fn reset_slot_session(&self, slot_id: &str) {
427 self.slot_manager.reset_session(slot_id);
428 }
429
430 pub fn get_stats(&self) -> MissionStats {
434 let process_stats = self.process_manager.get_stats();
435 let slot_stats = self.slot_manager.get_stats();
436
437 MissionStats {
438 tasks: TaskStats {
439 queued: self
440 .db
441 .get_tasks_by_status(TaskStatus::Queued)
442 .map(|v| v.len())
443 .unwrap_or(0),
444 running: self
445 .db
446 .get_tasks_by_status(TaskStatus::Running)
447 .map(|v| v.len())
448 .unwrap_or(0),
449 done: self
450 .db
451 .get_tasks_by_status(TaskStatus::Done)
452 .map(|v| v.len())
453 .unwrap_or(0),
454 failed: self
455 .db
456 .get_tasks_by_status(TaskStatus::Failed)
457 .map(|v| v.len())
458 .unwrap_or(0),
459 },
460 agents: AgentStats {
461 total: process_stats.total,
462 stopped: process_stats.stopped,
463 idle: process_stats.idle,
464 busy: process_stats.busy,
465 },
466 slots: SlotStats {
467 total: slot_stats.total,
468 by_role: slot_stats.by_role,
469 },
470 inbox: InboxStats {
471 unread: self.inbox.get_unread_count(),
472 },
473 }
474 }
475
476 pub async fn get_default_mode(&self) -> ExecutionMode {
478 *self.default_mode.read().await
479 }
480
481 pub async fn set_default_mode(&self, mode: ExecutionMode) {
483 *self.default_mode.write().await = mode;
484 info!(mode = %mode.as_str(), "Default execution mode changed");
485 }
486
487 pub fn get_permission_config(&self) -> PermissionConfig {
491 self.permission_policy.get_config()
492 }
493
494 pub fn set_role_permission(&self, role: &str, rule: PermissionRule) {
496 self.permission_policy.set_role_rule(role, rule);
497 info!(role = %role, "Role permission updated");
498 }
499
500 pub fn set_slot_permission(&self, slot_id: &str, rule: PermissionRule) {
502 self.permission_policy.set_slot_rule(slot_id, rule);
503 info!(slot_id = %slot_id, "Slot permission updated");
504 }
505
506 pub fn add_role_auto_allow(&self, role: &str, pattern: &str) {
508 self.permission_policy.add_role_auto_allow(role, pattern);
509 info!(role = %role, pattern = %pattern, "Added role auto_allow");
510 }
511
512 pub fn add_slot_auto_allow(&self, slot_id: &str, pattern: &str) {
514 self.permission_policy.add_slot_auto_allow(slot_id, pattern);
515 info!(slot_id = %slot_id, pattern = %pattern, "Added slot auto_allow");
516 }
517
518 pub fn reload_permission_config(&self) {
520 self.permission_policy.reload();
521 info!("Permission config reloaded");
522 }
523
524 pub fn check_permission(
526 &self,
527 slot_id: &str,
528 role: &str,
529 tool_name: &str,
530 ) -> super::PermissionDecision {
531 self.permission_policy
532 .check_permission(slot_id, role, tool_name)
533 }
534}
535
/// Number of tasks per status, as counted by `MissionControl::get_stats`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TaskStats {
    /// Tasks with status `Queued`.
    pub queued: usize,
    /// Tasks with status `Running`.
    pub running: usize,
    /// Tasks with status `Done`.
    pub done: usize,
    /// Tasks with status `Failed`.
    pub failed: usize,
}

/// Agent counters copied from the process manager's statistics.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AgentStats {
    /// Total number of agents.
    pub total: usize,
    /// Agents reported as stopped.
    pub stopped: usize,
    /// Agents reported as idle.
    pub idle: usize,
    /// Agents reported as busy.
    pub busy: usize,
}

/// Slot counters copied from the slot manager's statistics.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SlotStats {
    /// Total number of slots.
    pub total: usize,
    /// Number of slots per role name.
    pub by_role: std::collections::HashMap<String, usize>,
}

/// Inbox counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct InboxStats {
    /// Number of unread inbox messages.
    pub unread: usize,
}

/// Snapshot returned by `MissionControl::get_stats`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MissionStats {
    /// Task counts by status.
    pub tasks: TaskStats,
    /// Agent process counts.
    pub agents: AgentStats,
    /// Slot counts.
    pub slots: SlotStats,
    /// Inbox counts.
    pub inbox: InboxStats,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Slots fixture: one `worker` slot and one `specialist` slot.
    const SLOTS_YAML: &str = r#"
slots:
  - id: slot-1
    role: worker
    description: Test worker slot
  - id: slot-2
    role: specialist
    description: Test specialist slot
"#;

    /// Writes the slots fixture into `dir` and returns
    /// `(db_path, slots_config_path)` for that directory.
    fn create_test_config(dir: &Path) -> (PathBuf, PathBuf) {
        let slots_config_path = dir.join("slots.yaml");
        std::fs::write(&slots_config_path, SLOTS_YAML).unwrap();

        (dir.join("mission.db"), slots_config_path)
    }

    /// Builds a `MissionControl` rooted in `dir`, leaving every optional path
    /// at its default.
    fn mission_control_in(dir: &Path, default_mode: Option<ExecutionMode>) -> MissionControl {
        let (db_path, slots_config_path) = create_test_config(dir);

        MissionControl::new(MissionControlOptions {
            db_path,
            slots_config_path,
            permission_config_path: None,
            logs_dir: None,
            default_mode,
        })
        .unwrap()
    }

    #[tokio::test]
    async fn test_create_mission_control() {
        let dir = tempdir().unwrap();
        let mc = mission_control_in(dir.path(), None);

        assert_eq!(mc.list_slots().len(), 2);
    }

    #[tokio::test]
    async fn test_stats() {
        let dir = tempdir().unwrap();
        let mc = mission_control_in(dir.path(), None);

        let stats = mc.get_stats();
        assert_eq!(stats.slots.total, 2);
        assert_eq!(stats.agents.total, 2);
        assert_eq!(stats.agents.stopped, 2);
    }

    #[tokio::test]
    async fn test_default_mode() {
        let dir = tempdir().unwrap();
        let mc = mission_control_in(dir.path(), Some(ExecutionMode::Pty));

        assert_eq!(mc.get_default_mode().await, ExecutionMode::Pty);

        mc.set_default_mode(ExecutionMode::Batch).await;
        assert_eq!(mc.get_default_mode().await, ExecutionMode::Batch);
    }
}
657}